Namespace faabric::transport
-
namespace transport
Enums
-
enum class MessageResponseCode
Types of message send/ receive outcomes.
Values:
-
enumerator SUCCESS
-
enumerator TERM
-
enumerator TIMEOUT
-
enumerator ERROR
-
enumerator SUCCESS
-
enum MessageEndpointConnectType
Values:
-
enumerator BIND
-
enumerator CONNECT
-
enumerator BIND
-
enum class SocketType
Values:
-
enumerator pair
-
enumerator pub
-
enumerator sub
-
enumerator pull
-
enumerator push
-
enumerator rep
-
enumerator req
-
enumerator pair
-
enum PointToPointCall
Values:
-
enumerator MAPPING
-
enumerator MESSAGE
-
enumerator LOCK_GROUP
-
enumerator LOCK_GROUP_RECURSIVE
-
enumerator UNLOCK_GROUP
-
enumerator UNLOCK_GROUP_RECURSIVE
-
enumerator MAPPING
Functions
-
PointToPointBroker &getPointToPointBroker()
-
std::vector<std::pair<std::string, faabric::PointToPointMappings>> getSentMappings()
-
std::vector<std::pair<std::string, faabric::PointToPointMessage>> getSentPointToPointMessages()
-
std::vector<std::tuple<std::string, faabric::transport::PointToPointCall, faabric::PointToPointMessage>> getSentLockMessages()
-
void clearSentMessages()
-
static std::shared_ptr<PointToPointClient> getClient(const std::string &host)
-
std::string getPointToPointKey(int groupId, int sendIdx, int recvIdx)
-
std::string getPointToPointKey(int groupId, int recvIdx)
-
auto getEndpointPtrs(const std::string &label)
Variables
-
static std::map<MessageResponseCode, std::string> MessageResponseCodeText = {{MessageResponseCode::SUCCESS, "Success"}, {MessageResponseCode::TERM, "Connection terminated"}, {MessageResponseCode::TIMEOUT, "Message timed out"}, {MessageResponseCode::ERROR, "Error"},}
-
static faabric::util::ConcurrentMap<int, std::shared_ptr<PointToPointGroup>> groups
-
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<std::tuple<std::unique_ptr<AsyncInternalRecvMessageEndpoint>, std::unique_ptr<AsyncInternalSendMessageEndpoint>, std::atomic_int32_t>>> endpoints
-
thread_local absl::flat_hash_set<std::string> threadEndpoints
-
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<PointToPointClient>> clients
-
thread_local int currentGroupId = NO_CURRENT_GROUP_ID
-
thread_local std::vector<int> sentMsgCount
-
thread_local std::vector<int> recvMsgCount
-
thread_local std::vector<std::list<Message>> outOfOrderMsgs
-
static std::mutex mockMutex
-
static std::vector<std::pair<std::string, faabric::PointToPointMappings>> sentMappings
-
static std::vector<std::pair<std::string, faabric::PointToPointMessage>> sentMessages
-
static std::vector<std::tuple<std::string, faabric::transport::PointToPointCall, faabric::PointToPointMessage>> sentLockMessages
-
class AsyncDirectRecvEndpoint : public faabric::transport::RecvMessageEndpoint
Public Functions
-
AsyncDirectRecvEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
virtual Message recv() override
-
AsyncDirectRecvEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncDirectSendEndpoint : public faabric::transport::MessageEndpoint
Public Functions
-
AsyncDirectSendEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void send(uint8_t header, const uint8_t *data, size_t dataSize)
-
AsyncDirectSendEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncFanMessageEndpoint : public faabric::transport::FanMessageEndpoint
Public Functions
-
AsyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
AsyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncInternalRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint
Public Functions
-
AsyncInternalRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
virtual Message recv() override
-
AsyncInternalRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncInternalSendMessageEndpoint : public faabric::transport::MessageEndpoint
Public Functions
-
AsyncInternalSendMessageEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void send(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNumber = NO_SEQUENCE_NUM)
-
AsyncInternalSendMessageEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint
Public Functions
-
AsyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
AsyncRecvMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
virtual Message recv() override
-
AsyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncSendMessageEndpoint : public faabric::transport::MessageEndpoint
Public Functions
-
AsyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void send(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNum = NO_SEQUENCE_NUM)
-
AsyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class FanMessageEndpoint : public faabric::transport::MessageEndpoint
Subclassed by faabric::transport::AsyncFanMessageEndpoint, faabric::transport::SyncFanMessageEndpoint
Public Functions
-
FanMessageEndpoint(int portIn, int timeoutMs, SocketType socketType, bool isAsync)
-
MessageContext attachFanOut()
-
Message recv(const MessageContext &ctx)
-
void sendResponse(const MessageContext &ctx, uint8_t header, const uint8_t *data, size_t dataSize)
-
void stop()
-
FanMessageEndpoint(int portIn, int timeoutMs, SocketType socketType, bool isAsync)
-
class Message
- #include <Message.h>
Represents message data passed around the transport layer. Essentially an array of bytes, with a size and a flag to say whether there’s more data to follow.
Messages are not copyable, only movable, as they will regularly contain large amounts of data.
Public Functions
-
Message(size_t bufferSize)
-
Message(nng_msg *nngMsg)
-
Message(MessageResponseCode responseCodeIn)
-
~Message()
-
Message(const Message &other) = delete
-
inline Message(Message &&other)
-
inline MessageResponseCode getResponseCode()
-
inline std::span<uint8_t> allData()
-
inline std::span<const uint8_t> allData() const
-
std::span<char> data()
-
std::span<const char> data() const
-
std::span<uint8_t> udata()
-
std::span<const uint8_t> udata() const
-
std::vector<uint8_t> dataCopy() const
-
inline uint8_t getMessageCode() const
-
inline uint64_t getDeclaredDataSize() const
-
inline int getSequenceNum() const
-
Message(size_t bufferSize)
-
class MessageContext
Public Functions
-
MessageContext() = default
-
inline explicit MessageContext(nng_ctx context)
-
MessageContext(const MessageContext&) = delete
-
MessageContext &operator=(const MessageContext&) = delete
-
inline MessageContext(MessageContext &&rhs)
-
inline MessageContext &operator=(MessageContext &&rhs)
-
inline ~MessageContext()
Public Members
-
nng_ctx context = NNG_CTX_INITIALIZER
-
MessageContext() = default
-
class MessageEndpoint
Subclassed by faabric::transport::AsyncDirectSendEndpoint, faabric::transport::AsyncInternalSendMessageEndpoint, faabric::transport::AsyncSendMessageEndpoint, faabric::transport::FanMessageEndpoint, faabric::transport::RecvMessageEndpoint, faabric::transport::SyncSendMessageEndpoint
Public Functions
-
MessageEndpoint(const std::string &hostIn, int portIn, int timeoutMsIn)
-
MessageEndpoint(const std::string &addressIn, int timeoutMsIn)
-
MessageEndpoint &operator=(const MessageEndpoint&) = delete
-
MessageEndpoint(const MessageEndpoint &ctx) = delete
-
virtual ~MessageEndpoint()
-
std::string getAddress()
-
MessageEndpoint(const std::string &hostIn, int portIn, int timeoutMsIn)
-
class MessageEndpointClient
Subclassed by faabric::planner::PlannerClient, faabric::scheduler::FunctionCallClient, faabric::snapshot::SnapshotClient, faabric::state::StateClient, faabric::transport::PointToPointClient
Public Functions
-
MessageEndpointClient(std::string hostIn, int asyncPort, int syncPort, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void asyncSend(int header, google::protobuf::Message *msg, int sequenceNum = NO_SEQUENCE_NUM)
-
void asyncSend(int header, const uint8_t *buffer, size_t bufferSize, int sequenceNum = NO_SEQUENCE_NUM)
-
void syncSend(int header, google::protobuf::Message *msg, google::protobuf::Message *response)
-
void syncSend(int header, const uint8_t *buffer, size_t bufferSize, google::protobuf::Message *response)
-
MessageEndpointClient(std::string hostIn, int asyncPort, int syncPort, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class MessageEndpointServer
Subclassed by faabric::planner::PlannerServer, faabric::scheduler::FunctionCallServer, faabric::snapshot::SnapshotServer, faabric::state::StateServer, faabric::transport::PointToPointServer
Public Functions
-
MessageEndpointServer(int asyncPortIn, int syncPortIn, const std::string &inprocLabelIn, int nThreadsIn)
-
virtual void start(int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
We need to guarantee to callers of this function, that when it returns, the server will be ready to use.
-
virtual void stop()
-
virtual void onWorkerStop()
-
void setRequestLatch()
-
void awaitRequestLatch()
-
int getNThreads()
-
MessageEndpointServer(int asyncPortIn, int syncPortIn, const std::string &inprocLabelIn, int nThreadsIn)
-
class MessageEndpointServerHandler
Public Functions
-
MessageEndpointServerHandler(MessageEndpointServer *serverIn, bool asyncIn, const std::string &inprocLabelIn, int nThreadsIn)
-
void start(int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void join()
-
MessageEndpointServerHandler(MessageEndpointServer *serverIn, bool asyncIn, const std::string &inprocLabelIn, int nThreadsIn)
-
class MessageTimeoutException : public faabric::util::FaabricException
Public Functions
-
inline explicit MessageTimeoutException(std::string message)
-
inline explicit MessageTimeoutException(std::string message)
-
class PointToPointBroker
Public Functions
-
PointToPointBroker()
-
std::string getHostForReceiver(int groupId, int recvIdx)
-
int getMpiPortForReceiver(int groupId, int recvIdx)
-
std::set<std::string> setUpLocalMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision)
-
void setAndSendMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision)
-
void sendMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision, const std::set<std::string> &hostList)
-
void waitForMappingsOnThisHost(int groupId)
-
std::set<int> getIdxsRegisteredForGroup(int groupId)
-
std::set<std::string> getHostsRegisteredForGroup(int groupId)
-
void updateHostForIdx(int groupId, int groupIdx, std::string newHost)
-
void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t *buffer, size_t bufferSize, std::string hostHint, bool mustOrderMsg = false)
-
void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t *buffer, size_t bufferSize, bool mustOrderMsg = false, int sequenceNum = NO_SEQUENCE_NUM, std::string hostHint = "")
-
std::vector<uint8_t> recvMessage(int groupId, int sendIdx, int recvIdx, bool mustOrderMsg = false)
-
void clearGroup(int groupId)
-
void clear()
-
void resetThreadLocalCache()
-
void postMigrationHook(faabric::Message &msg)
-
PointToPointBroker()
-
class PointToPointClient : public faabric::transport::MessageEndpointClient
Public Functions
-
PointToPointClient(const std::string &hostIn)
-
void sendMappings(faabric::PointToPointMappings &mappings)
-
void sendMessage(faabric::PointToPointMessage &msg, int sequenceNum = NO_SEQUENCE_NUM)
-
void groupLock(int appId, int groupId, int groupIdx, bool recursive = false)
-
void groupUnlock(int appId, int groupId, int groupIdx, bool recursive = false)
-
PointToPointClient(const std::string &hostIn)
-
class PointToPointGroup
Public Functions
-
PointToPointGroup(int appId, int groupIdIn, int groupSizeIn, bool isSingleHostIn)
-
void lock(int groupIdx, bool recursive)
-
void unlock(int groupIdx, bool recursive)
-
int getLockOwner(bool recursive)
-
void localLock()
-
void localUnlock()
-
bool localTryLock()
-
void barrier(int groupIdx)
-
void notify(int groupIdx)
-
int getNotifyCount()
Public Static Functions
-
static std::shared_ptr<PointToPointGroup> getGroup(int groupId)
-
static std::shared_ptr<PointToPointGroup> getOrAwaitGroup(int groupId)
-
static bool groupExists(int groupId)
-
static void addGroup(int appId, int groupId, int groupSize, bool isSingleHost)
-
static void addGroupIfNotExists(int appId, int groupId, int groupSize)
-
static void clearGroup(int groupId)
-
static void clear()
-
PointToPointGroup(int appId, int groupIdIn, int groupSizeIn, bool isSingleHostIn)
-
class PointToPointServer : public faabric::transport::MessageEndpointServer
Public Functions
-
PointToPointServer()
-
PointToPointServer()
-
class RecvMessageEndpoint : public faabric::transport::MessageEndpoint
Subclassed by faabric::transport::AsyncDirectRecvEndpoint, faabric::transport::AsyncInternalRecvMessageEndpoint, faabric::transport::AsyncRecvMessageEndpoint, faabric::transport::SyncRecvMessageEndpoint
Public Functions
-
RecvMessageEndpoint(int portIn, int timeoutMs, SocketType socketType)
Constructor for external TCP sockets
-
RecvMessageEndpoint(std::string inProcLabel, int timeoutMs, SocketType socketType, MessageEndpointConnectType connectType)
Constructor for internal inproc sockets
-
inline virtual ~RecvMessageEndpoint()
-
virtual Message recv()
-
RecvMessageEndpoint(int portIn, int timeoutMs, SocketType socketType)
-
class SyncFanMessageEndpoint : public faabric::transport::FanMessageEndpoint
Public Functions
-
SyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
SyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class SyncRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint
Public Functions
-
SyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
SyncRecvMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
virtual Message recv() override
-
void sendResponse(uint8_t header, const uint8_t *data, size_t dataSize)
-
SyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class SyncSendMessageEndpoint : public faabric::transport::MessageEndpoint
Public Functions
-
SyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void sendRaw(const uint8_t *data, size_t dataSize)
-
Message sendAwaitResponse(uint8_t header, const uint8_t *data, size_t dataSize)
-
SyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
namespace tcp
Functions
-
void setReuseAddr(int connFd)
-
void setNoDelay(int connFd)
-
void setQuickAck(int connFd)
-
void setNonBlocking(int connFd)
-
void setBlocking(int connFd)
-
bool isNonBlocking(int connFd)
-
void setBusyPolling(int connFd)
-
void setRecvTimeoutMs(int connFd, int timeoutMs)
-
void setSendTimeoutMs(int connFd, int timeoutMs)
-
void setRecvBufferSize(int connFd, size_t bufferSizeBytes)
-
void setSendBufferSize(int connFd, size_t bufferSizeBytes)
Variables
-
const int SocketTimeoutMs = 5000
-
const size_t SocketBufferSizeBytes = 16777216
-
class Address
Public Functions
-
Address(const std::string &host, int port)
-
Address(int port)
-
sockaddr *get() const
-
Address(const std::string &host, int port)
-
class RecvSocket
Public Functions
-
RecvSocket(int port, const std::string &host = ANY_HOST)
-
RecvSocket(const RecvSocket &recvSocket) = delete
-
~RecvSocket()
-
void listen()
-
int accept()
-
void recvOne(int conn, uint8_t *buffer, size_t bufferSize)
-
RecvSocket(int port, const std::string &host = ANY_HOST)
-
class SendSocket
Public Functions
-
SendSocket(const std::string &host, int port)
-
void dial()
-
void sendOne(const uint8_t *buffer, size_t bufferSize)
-
SendSocket(const std::string &host, int port)
-
class Socket
Public Functions
-
Socket()
-
Socket(int connFd)
-
Socket(const Socket &socket) = delete
-
~Socket()
-
inline int get() const
-
Socket()
-
void setReuseAddr(int connFd)
-
enum class MessageResponseCode