Namespace faabric::transport

namespace transport

Enums

enum class MessageResponseCode

Types of message send/receive outcomes.

Values:

enumerator SUCCESS
enumerator TERM
enumerator TIMEOUT
enumerator ERROR
enum MessageEndpointConnectType

Values:

enumerator BIND
enumerator CONNECT
enum class SocketType

Values:

enumerator pair
enumerator pub
enumerator sub
enumerator pull
enumerator push
enumerator rep
enumerator req
enum PointToPointCall

Values:

enumerator MAPPING
enumerator MESSAGE
enumerator LOCK_GROUP
enumerator LOCK_GROUP_RECURSIVE
enumerator UNLOCK_GROUP
enumerator UNLOCK_GROUP_RECURSIVE

Functions

PointToPointBroker &getPointToPointBroker()
std::vector<std::pair<std::string, faabric::PointToPointMappings>> getSentMappings()
std::vector<std::pair<std::string, faabric::PointToPointMessage>> getSentPointToPointMessages()
std::vector<std::tuple<std::string, faabric::transport::PointToPointCall, faabric::PointToPointMessage>> getSentLockMessages()
void clearSentMessages()
static std::shared_ptr<PointToPointClient> getClient(const std::string &host)
std::string getPointToPointKey(int groupId, int sendIdx, int recvIdx)
std::string getPointToPointKey(int groupId, int recvIdx)
auto getEndpointPtrs(const std::string &label)

Variables

static std::map<MessageResponseCode, std::string> MessageResponseCodeText = {{MessageResponseCode::SUCCESS, "Success"}, {MessageResponseCode::TERM, "Connection terminated"}, {MessageResponseCode::TIMEOUT, "Message timed out"}, {MessageResponseCode::ERROR, "Error"},}
static faabric::util::ConcurrentMap<int, std::shared_ptr<PointToPointGroup>> groups
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<std::tuple<std::unique_ptr<AsyncInternalRecvMessageEndpoint>, std::unique_ptr<AsyncInternalSendMessageEndpoint>, std::atomic_int32_t>>> endpoints
thread_local absl::flat_hash_set<std::string> threadEndpoints
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<PointToPointClient>> clients
thread_local int currentGroupId = NO_CURRENT_GROUP_ID
thread_local std::vector<int> sentMsgCount
thread_local std::vector<int> recvMsgCount
thread_local std::vector<std::list<Message>> outOfOrderMsgs
static std::mutex mockMutex
static std::vector<std::pair<std::string, faabric::PointToPointMappings>> sentMappings
static std::vector<std::pair<std::string, faabric::PointToPointMessage>> sentMessages
static std::vector<std::tuple<std::string, faabric::transport::PointToPointCall, faabric::PointToPointMessage>> sentLockMessages
class AsyncDirectRecvEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

AsyncDirectRecvEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
class AsyncDirectSendEndpoint : public faabric::transport::MessageEndpoint

Public Functions

AsyncDirectSendEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void send(uint8_t header, const uint8_t *data, size_t dataSize)
class AsyncFanMessageEndpoint : public faabric::transport::FanMessageEndpoint

Public Functions

AsyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
class AsyncInternalRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

AsyncInternalRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
class AsyncInternalSendMessageEndpoint : public faabric::transport::MessageEndpoint

Public Functions

AsyncInternalSendMessageEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void send(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNumber = NO_SEQUENCE_NUM)
class AsyncRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

AsyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
AsyncRecvMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
class AsyncSendMessageEndpoint : public faabric::transport::MessageEndpoint

Public Functions

AsyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void send(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNum = NO_SEQUENCE_NUM)
class FanMessageEndpoint : public faabric::transport::MessageEndpoint

Subclassed by faabric::transport::AsyncFanMessageEndpoint, faabric::transport::SyncFanMessageEndpoint

Public Functions

FanMessageEndpoint(int portIn, int timeoutMs, SocketType socketType, bool isAsync)
MessageContext attachFanOut()
Message recv(const MessageContext &ctx)
void sendResponse(const MessageContext &ctx, uint8_t header, const uint8_t *data, size_t dataSize)
void stop()
class Message
#include <Message.h>

Represents message data passed around the transport layer. Essentially an array of bytes, with a size and a flag to say whether there’s more data to follow.

Messages are not copyable, only movable, as they will regularly contain large amounts of data.

Public Functions

Message(size_t bufferSize)
Message(nng_msg *nngMsg)
Message(MessageResponseCode responseCodeIn)
~Message()
Message(const Message &other) = delete
Message &operator=(const Message &other) = delete
inline Message(Message &&other)
inline Message &operator=(Message &&other)
inline MessageResponseCode getResponseCode()
inline std::span<uint8_t> allData()
inline std::span<const uint8_t> allData() const
std::span<char> data()
std::span<const char> data() const
std::span<uint8_t> udata()
std::span<const uint8_t> udata() const
std::vector<uint8_t> dataCopy() const
inline uint8_t getMessageCode() const
inline uint64_t getDeclaredDataSize() const
inline int getSequenceNum() const
class MessageContext

Public Functions

MessageContext() = default
inline explicit MessageContext(nng_ctx context)
MessageContext(const MessageContext&) = delete
MessageContext &operator=(const MessageContext&) = delete
inline MessageContext(MessageContext &&rhs)
inline MessageContext &operator=(MessageContext &&rhs)
inline ~MessageContext()

Public Members

nng_ctx context = NNG_CTX_INITIALIZER
class MessageEndpoint

Subclassed by faabric::transport::AsyncDirectSendEndpoint, faabric::transport::AsyncInternalSendMessageEndpoint, faabric::transport::AsyncSendMessageEndpoint, faabric::transport::FanMessageEndpoint, faabric::transport::RecvMessageEndpoint, faabric::transport::SyncSendMessageEndpoint

Public Functions

MessageEndpoint(const std::string &hostIn, int portIn, int timeoutMsIn)
MessageEndpoint(const std::string &addressIn, int timeoutMsIn)
MessageEndpoint &operator=(const MessageEndpoint&) = delete
MessageEndpoint(const MessageEndpoint &ctx) = delete
virtual ~MessageEndpoint()
std::string getAddress()
class MessageEndpointClient

Subclassed by faabric::planner::PlannerClient, faabric::scheduler::FunctionCallClient, faabric::snapshot::SnapshotClient, faabric::state::StateClient, faabric::transport::PointToPointClient

Public Functions

MessageEndpointClient(std::string hostIn, int asyncPort, int syncPort, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void asyncSend(int header, google::protobuf::Message *msg, int sequenceNum = NO_SEQUENCE_NUM)
void asyncSend(int header, const uint8_t *buffer, size_t bufferSize, int sequenceNum = NO_SEQUENCE_NUM)
void syncSend(int header, google::protobuf::Message *msg, google::protobuf::Message *response)
void syncSend(int header, const uint8_t *buffer, size_t bufferSize, google::protobuf::Message *response)
class MessageEndpointServer

Subclassed by faabric::planner::PlannerServer, faabric::scheduler::FunctionCallServer, faabric::snapshot::SnapshotServer, faabric::state::StateServer, faabric::transport::PointToPointServer

Public Functions

MessageEndpointServer(int asyncPortIn, int syncPortIn, const std::string &inprocLabelIn, int nThreadsIn)
virtual void start(int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)

We need to guarantee to callers of this function that, when it returns, the server will be ready to use.

virtual void stop()
virtual void onWorkerStop()
void setRequestLatch()
void awaitRequestLatch()
int getNThreads()
class MessageEndpointServerHandler

Public Functions

MessageEndpointServerHandler(MessageEndpointServer *serverIn, bool asyncIn, const std::string &inprocLabelIn, int nThreadsIn)
void start(int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void join()
class MessageTimeoutException : public faabric::util::FaabricException

Public Functions

inline explicit MessageTimeoutException(std::string message)
class PointToPointBroker

Public Functions

PointToPointBroker()
std::string getHostForReceiver(int groupId, int recvIdx)
int getMpiPortForReceiver(int groupId, int recvIdx)
std::set<std::string> setUpLocalMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision)
void setAndSendMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision)
void sendMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision, const std::set<std::string> &hostList)
void waitForMappingsOnThisHost(int groupId)
std::set<int> getIdxsRegisteredForGroup(int groupId)
std::set<std::string> getHostsRegisteredForGroup(int groupId)
void updateHostForIdx(int groupId, int groupIdx, std::string newHost)
void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t *buffer, size_t bufferSize, std::string hostHint, bool mustOrderMsg = false)
void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t *buffer, size_t bufferSize, bool mustOrderMsg = false, int sequenceNum = NO_SEQUENCE_NUM, std::string hostHint = "")
std::vector<uint8_t> recvMessage(int groupId, int sendIdx, int recvIdx, bool mustOrderMsg = false)
void clearGroup(int groupId)
void clear()
void resetThreadLocalCache()
void postMigrationHook(faabric::Message &msg)
class PointToPointClient : public faabric::transport::MessageEndpointClient

Public Functions

PointToPointClient(const std::string &hostIn)
void sendMappings(faabric::PointToPointMappings &mappings)
void sendMessage(faabric::PointToPointMessage &msg, int sequenceNum = NO_SEQUENCE_NUM)
void groupLock(int appId, int groupId, int groupIdx, bool recursive = false)
void groupUnlock(int appId, int groupId, int groupIdx, bool recursive = false)
class PointToPointGroup

Public Functions

PointToPointGroup(int appId, int groupIdIn, int groupSizeIn, bool isSingleHostIn)
void lock(int groupIdx, bool recursive)
void unlock(int groupIdx, bool recursive)
int getLockOwner(bool recursive)
void localLock()
void localUnlock()
bool localTryLock()
void barrier(int groupIdx)
void notify(int groupIdx)
int getNotifyCount()

Public Static Functions

static std::shared_ptr<PointToPointGroup> getGroup(int groupId)
static std::shared_ptr<PointToPointGroup> getOrAwaitGroup(int groupId)
static bool groupExists(int groupId)
static void addGroup(int appId, int groupId, int groupSize, bool isSingleHost)
static void addGroupIfNotExists(int appId, int groupId, int groupSize)
static void clearGroup(int groupId)
static void clear()
class PointToPointServer : public faabric::transport::MessageEndpointServer

Public Functions

PointToPointServer()
class RecvMessageEndpoint : public faabric::transport::MessageEndpoint

Subclassed by faabric::transport::AsyncDirectRecvEndpoint, faabric::transport::AsyncInternalRecvMessageEndpoint, faabric::transport::AsyncRecvMessageEndpoint, faabric::transport::SyncRecvMessageEndpoint

Public Functions

RecvMessageEndpoint(int portIn, int timeoutMs, SocketType socketType)

Constructor for external TCP sockets

RecvMessageEndpoint(std::string inProcLabel, int timeoutMs, SocketType socketType, MessageEndpointConnectType connectType)

Constructor for internal inproc sockets

inline virtual ~RecvMessageEndpoint()
virtual Message recv()
class SyncFanMessageEndpoint : public faabric::transport::FanMessageEndpoint

Public Functions

SyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
class SyncRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

SyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
SyncRecvMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
void sendResponse(uint8_t header, const uint8_t *data, size_t dataSize)
class SyncSendMessageEndpoint : public faabric::transport::MessageEndpoint

Public Functions

SyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void sendRaw(const uint8_t *data, size_t dataSize)
Message sendAwaitResponse(uint8_t header, const uint8_t *data, size_t dataSize)
namespace tcp

Functions

void setReuseAddr(int connFd)
void setNoDelay(int connFd)
void setQuickAck(int connFd)
void setNonBlocking(int connFd)
void setBlocking(int connFd)
bool isNonBlocking(int connFd)
void setBusyPolling(int connFd)
void setRecvTimeoutMs(int connFd, int timeoutMs)
void setSendTimeoutMs(int connFd, int timeoutMs)
void setRecvBufferSize(int connFd, size_t bufferSizeBytes)
void setSendBufferSize(int connFd, size_t bufferSizeBytes)

Variables

const int SocketTimeoutMs = 5000
const size_t SocketBufferSizeBytes = 16777216
class Address

Public Functions

Address(const std::string &host, int port)
Address(int port)
sockaddr *get() const
class RecvSocket

Public Functions

RecvSocket(int port, const std::string &host = ANY_HOST)
RecvSocket(const RecvSocket &recvSocket) = delete
~RecvSocket()
void listen()
int accept()
void recvOne(int conn, uint8_t *buffer, size_t bufferSize)
class SendSocket

Public Functions

SendSocket(const std::string &host, int port)
void dial()
void sendOne(const uint8_t *buffer, size_t bufferSize)
class Socket

Public Functions

Socket()
Socket(int connFd)
Socket(const Socket &socket) = delete
~Socket()
inline int get() const