File MessageEndpoint.h

Defines

DEFAULT_SOCKET_TIMEOUT_MS
LINGER_MS
namespace faabric
namespace transport

Enums

enum MessageEndpointConnectType

Values:

enumerator BIND
enumerator CONNECT
enum class SocketType

Values:

enumerator pair
enumerator pub
enumerator sub
enumerator pull
enumerator push
enumerator rep
enumerator req
class AsyncDirectRecvEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

AsyncDirectRecvEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
class AsyncDirectSendEndpoint : public faabric::transport::MessageEndpoint

Public Functions

AsyncDirectSendEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void send(uint8_t header, const uint8_t *data, size_t dataSize)
class AsyncFanMessageEndpoint : public faabric::transport::FanMessageEndpoint

Public Functions

AsyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
class AsyncInternalRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

AsyncInternalRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
class AsyncInternalSendMessageEndpoint : public faabric::transport::MessageEndpoint

Public Functions

AsyncInternalSendMessageEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void send(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNumber = NO_SEQUENCE_NUM)
class AsyncRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

AsyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
AsyncRecvMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
class AsyncSendMessageEndpoint : public faabric::transport::MessageEndpoint

Public Functions

AsyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void send(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNum = NO_SEQUENCE_NUM)
class FanMessageEndpoint : public faabric::transport::MessageEndpoint

Subclassed by faabric::transport::AsyncFanMessageEndpoint, faabric::transport::SyncFanMessageEndpoint

Public Functions

FanMessageEndpoint(int portIn, int timeoutMs, SocketType socketType, bool isAsync)
MessageContext attachFanOut()
Message recv(const MessageContext &ctx)
void sendResponse(const MessageContext &ctx, uint8_t header, const uint8_t *data, size_t dataSize)
void stop()

Private Members

std::string controlSockAddress
bool isAsync
class MessageContext

Public Functions

MessageContext() = default
inline explicit MessageContext(nng_ctx context)
MessageContext(const MessageContext&) = delete
MessageContext &operator=(const MessageContext&) = delete
inline MessageContext(MessageContext &&rhs)
inline MessageContext &operator=(MessageContext &&rhs)
inline ~MessageContext()

Public Members

nng_ctx context = NNG_CTX_INITIALIZER
class MessageEndpoint

Subclassed by faabric::transport::AsyncDirectSendEndpoint, faabric::transport::AsyncInternalSendMessageEndpoint, faabric::transport::AsyncSendMessageEndpoint, faabric::transport::FanMessageEndpoint, faabric::transport::RecvMessageEndpoint, faabric::transport::SyncSendMessageEndpoint

Public Functions

MessageEndpoint(const std::string &hostIn, int portIn, int timeoutMsIn)
MessageEndpoint(const std::string &addressIn, int timeoutMsIn)
MessageEndpoint &operator=(const MessageEndpoint&) = delete
MessageEndpoint(const MessageEndpoint &ctx) = delete
virtual ~MessageEndpoint()
std::string getAddress()

Protected Functions

void setUpSocket(SocketType socketType, MessageEndpointConnectType connectType)

Sets up the socket for this endpoint; all sockets are created here. It sets the timeouts and catches errors raised during socket creation, and it also logs and validates the socket type and connection type being used.

void sendMessage(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNumber = NO_SEQUENCE_NUM, std::optional<nng_ctx> context = std::nullopt)
Message recvMessage(bool async, std::optional<nng_ctx> context = std::nullopt)
MessageContext createContext()
void close()

Protected Attributes

const std::string address
const int timeoutMs = -1
const std::thread::id tid
const int id = -1
int lingerMs = -1
nng_socket socket = NNG_SOCKET_INITIALIZER
std::variant<std::monostate, nng_dialer, nng_listener> connectionManager
class MessageTimeoutException : public faabric::util::FaabricException

Public Functions

inline explicit MessageTimeoutException(std::string message)
class RecvMessageEndpoint : public faabric::transport::MessageEndpoint

Subclassed by faabric::transport::AsyncDirectRecvEndpoint, faabric::transport::AsyncInternalRecvMessageEndpoint, faabric::transport::AsyncRecvMessageEndpoint, faabric::transport::SyncRecvMessageEndpoint

Public Functions

RecvMessageEndpoint(int portIn, int timeoutMs, SocketType socketType)

Constructor for external TCP sockets, addressed by port number.

RecvMessageEndpoint(std::string inProcLabel, int timeoutMs, SocketType socketType, MessageEndpointConnectType connectType)

Constructor for internal inproc sockets, addressed by an in-process label.

inline virtual ~RecvMessageEndpoint()
virtual Message recv()

Protected Functions

Message doRecv(bool async)
class SyncFanMessageEndpoint : public faabric::transport::FanMessageEndpoint

Public Functions

SyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
class SyncRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

SyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
SyncRecvMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
void sendResponse(uint8_t header, const uint8_t *data, size_t dataSize)
class SyncSendMessageEndpoint : public faabric::transport::MessageEndpoint

Public Functions

SyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void sendRaw(const uint8_t *data, size_t dataSize)
Message sendAwaitResponse(uint8_t header, const uint8_t *data, size_t dataSize)