File PointToPointBroker.h

Defines

DEFAULT_DISTRIBUTED_TIMEOUT_MS
POINT_TO_POINT_MAIN_IDX

namespace faabric
namespace transport
class PointToPointBroker

Public Functions

PointToPointBroker()
std::string getHostForReceiver(int groupId, int recvIdx)
int getMpiPortForReceiver(int groupId, int recvIdx)
std::set<std::string> setUpLocalMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision)
void setAndSendMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision)
void sendMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision, const std::set<std::string> &hostList)
void waitForMappingsOnThisHost(int groupId)
std::set<int> getIdxsRegisteredForGroup(int groupId)
std::set<std::string> getHostsRegisteredForGroup(int groupId)
void updateHostForIdx(int groupId, int groupIdx, std::string newHost)
void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t *buffer, size_t bufferSize, std::string hostHint, bool mustOrderMsg = false)
void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t *buffer, size_t bufferSize, bool mustOrderMsg = false, int sequenceNum = NO_SEQUENCE_NUM, std::string hostHint = "")
std::vector<uint8_t> recvMessage(int groupId, int sendIdx, int recvIdx, bool mustOrderMsg = false)
void clearGroup(int groupId)
void clear()
void resetThreadLocalCache()
void postMigrationHook(faabric::Message &msg)

Private Functions

std::shared_ptr<faabric::util::FlagWaiter> getGroupFlag(int groupId)
Message doRecvMessage(int groupId, int sendIdx, int recvIdx)
void initSequenceCounters(int groupId)
int getAndIncrementSentMsgCount(int groupId, int recvIdx)
void incrementRecvMsgCount(int groupId, int sendIdx)
int getExpectedSeqNum(int groupId, int sendIdx)

Private Members

faabric::util::SystemConfig &conf
std::shared_mutex brokerMutex
std::unordered_map<int, std::set<int>> groupIdIdxsMap
std::unordered_map<std::string, std::string> mappings
std::unordered_map<std::string, int> mpiPortMappings
std::unordered_map<int, std::shared_ptr<faabric::util::FlagWaiter>> groupFlags

class PointToPointGroup

Public Functions

PointToPointGroup(int appId, int groupIdIn, int groupSizeIn, bool isSingleHostIn)
void lock(int groupIdx, bool recursive)
void unlock(int groupIdx, bool recursive)
int getLockOwner(bool recursive)
void localLock()
void localUnlock()
bool localTryLock()
void barrier(int groupIdx)
void notify(int groupIdx)
int getNotifyCount()

Public Static Functions

static std::shared_ptr<PointToPointGroup> getGroup(int groupId)
static std::shared_ptr<PointToPointGroup> getOrAwaitGroup(int groupId)
static bool groupExists(int groupId)
static void addGroup(int appId, int groupId, int groupSize, bool isSingleHost)
static void addGroupIfNotExists(int appId, int groupId, int groupSize)
static void clearGroup(int groupId)
static void clear()

Private Functions

void notifyLocked(int groupIdx)

Private Members

faabric::util::SystemConfig &conf
std::string mainHost
int appId = 0
int groupId = 0
int groupSize = 0
bool isSingleHost
std::shared_mutex mx
faabric::transport::PointToPointBroker &ptpBroker
std::timed_mutex localMx
std::recursive_timed_mutex localRecursiveMx
std::barrier<void (*)()> localBarrier
std::stack<int> recursiveLockOwners
std::atomic<int> lockOwnerIdx = -1
std::queue<int> lockWaiters