Namespace faabric

namespace faabric
namespace batch_scheduler

Typedefs

typedef std::pair<std::shared_ptr<BatchExecuteRequest>, std::shared_ptr<SchedulingDecision>> InFlightPair
typedef std::map<int32_t, InFlightPair> InFlightReqs
typedef std::shared_ptr<HostState> Host
typedef std::map<std::string, Host> HostMap

Enums

enum DecisionType

Values:

enumerator NO_DECISION_TYPE
enumerator NEW
enumerator DIST_CHANGE
enumerator SCALE_CHANGE
enum SchedulingTopologyHint

Values:

enumerator NONE
enumerator CACHED
enumerator FORCE_LOCAL
enumerator NEVER_ALONE
enumerator UNDERFULL
enum MigrationStrategy

Values:

enumerator BIN_PACK
enumerator EMPTY_HOSTS

Functions

std::shared_ptr<BatchScheduler> getBatchScheduler()
void resetBatchScheduler()
void resetBatchScheduler(const std::string &newMode)
DecisionCache &getSchedulingDecisionCache()
static std::map<std::string, int> getHostFreqCount(std::shared_ptr<SchedulingDecision> decision)
static std::shared_ptr<SchedulingDecision> minimiseNumOfMigrations(std::shared_ptr<SchedulingDecision> newDecision, std::shared_ptr<SchedulingDecision> oldDecision)
static std::map<std::string, int> getHostFreqCount(std::shared_ptr<SchedulingDecision> decision)
static std::shared_ptr<SchedulingDecision> minimiseNumOfMigrations(std::shared_ptr<SchedulingDecision> newDecision, std::shared_ptr<SchedulingDecision> oldDecision)
static HostMap deepCopyHostMap(const HostMap &hostMap)
static void filterHosts(HostMap &hostMap, const InFlightReqs &inFlightReqs, std::shared_ptr<faabric::BatchExecuteRequest> req)
static std::map<std::string, int> getHostFreqCount(std::shared_ptr<SchedulingDecision> decision)
static std::shared_ptr<SchedulingDecision> minimiseNumOfMigrations(std::shared_ptr<SchedulingDecision> newDecision, std::shared_ptr<SchedulingDecision> oldDecision)
static std::set<std::string> filterHosts(HostMap &hostMap)

Variables

const std::unordered_map<std::string, SchedulingTopologyHint> strToTopologyHint = {{"NONE", SchedulingTopologyHint::NONE}, {"CACHED", SchedulingTopologyHint::CACHED}, {"FORCE_LOCAL", SchedulingTopologyHint::FORCE_LOCAL}, {"NEVER_ALONE", SchedulingTopologyHint::NEVER_ALONE}, {"UNDERFULL", SchedulingTopologyHint::UNDERFULL},}
const std::unordered_map<SchedulingTopologyHint, std::string> topologyHintToStr = {{SchedulingTopologyHint::NONE, "NONE"}, {SchedulingTopologyHint::CACHED, "CACHED"}, {SchedulingTopologyHint::FORCE_LOCAL, "FORCE_LOCAL"}, {SchedulingTopologyHint::NEVER_ALONE, "NEVER_ALONE"}, {SchedulingTopologyHint::UNDERFULL, "UNDERFULL"},}
static std::shared_ptr<BatchScheduler> batchScheduler = nullptr
class BatchScheduler

Subclassed by faabric::batch_scheduler::BinPackScheduler, faabric::batch_scheduler::CompactScheduler, faabric::batch_scheduler::SpotScheduler

Public Functions

virtual std::shared_ptr<SchedulingDecision> makeSchedulingDecision(HostMap &hostMap, const InFlightReqs &inFlightReqs, std::shared_ptr<faabric::BatchExecuteRequest> req) = 0

Public Static Functions

static DecisionType getDecisionType(const InFlightReqs &inFlightReqs, std::shared_ptr<faabric::BatchExecuteRequest> req)
class BinPackScheduler : public faabric::batch_scheduler::BatchScheduler

Public Functions

virtual std::shared_ptr<SchedulingDecision> makeSchedulingDecision(HostMap &hostMap, const InFlightReqs &inFlightReqs, std::shared_ptr<faabric::BatchExecuteRequest> req) override
class CachedDecision
#include <DecisionCache.h>

A record of a decision already taken for the given size of batch request for the given function. This doesn’t contain the messages themselves, just the hosts and group ID that was used.

Public Functions

CachedDecision(const std::vector<std::string> &hostsIn, int groupIdIn)
inline std::vector<std::string> getHosts()
inline int getGroupId() const
class CompactScheduler : public faabric::batch_scheduler::BatchScheduler

Public Functions

virtual std::shared_ptr<SchedulingDecision> makeSchedulingDecision(HostMap &hostMap, const InFlightReqs &inFlightReqs, std::shared_ptr<faabric::BatchExecuteRequest> req) override
class DecisionCache
#include <DecisionCache.h>

Repository for cached scheduling decisions. Object is not thread safe as we assume only a single executor will be caching decisions for a given function and size of batch request on one host at a time.

Public Functions

std::shared_ptr<CachedDecision> getCachedDecision(std::shared_ptr<faabric::BatchExecuteRequest> req)
void addCachedDecision(std::shared_ptr<BatchExecuteRequest> req, SchedulingDecision &decision)
void clear()
struct HostState

Public Functions

inline HostState(const std::string &ipIn, int slotsIn, int usedSlotsIn)

Public Members

std::string ip
int slots
int usedSlots
class SchedulingDecision

Public Functions

SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn)
bool operator==(const SchedulingDecision &rhs) const = default
bool isSingleHost() const

Work out if this decision is all in one host. If the decision is completely on another host, we still count it as not being on a single host, as this host will be the main.

Will always return false if single host optimisations are switched off.

void addMessage(const std::string &host, const faabric::Message &msg)
void addMessage(const std::string &host, int32_t messageId, int32_t appIdx)
void addMessage(const std::string &host, int32_t messageId, int32_t appIdx, int32_t groupIdx)
void addMessageInPosition(int32_t pos, const std::string &host, int32_t messageId, int32_t appIdx, int32_t groupIdx, int32_t mpiPort)
int32_t removeMessage(int32_t messageId)
std::set<std::string> uniqueHosts()
void print(const std::string &logLevel = "debug")

Public Members

uint32_t appId = 0
int32_t groupId = 0
int32_t nFunctions = 0
std::vector<std::string> hosts
std::vector<int32_t> messageIds
std::vector<int32_t> appIdxs
std::vector<int32_t> groupIdxs
std::vector<int32_t> mpiPorts
std::string returnHost

Public Static Functions

static SchedulingDecision fromPointToPointMappings(faabric::PointToPointMappings &mappings)
class SpotScheduler : public faabric::batch_scheduler::BatchScheduler

Public Functions

virtual std::shared_ptr<SchedulingDecision> makeSchedulingDecision(HostMap &hostMap, const InFlightReqs &inFlightReqs, std::shared_ptr<faabric::BatchExecuteRequest> req) override
namespace endpoint

Typedefs

using header = beast::http::field

Enums

enum class EndpointMode

Values:

enumerator SIGNAL
enumerator BG_THREAD
class FaabricEndpoint

Public Functions

FaabricEndpoint()
FaabricEndpoint(int port, int threadCount, std::shared_ptr<HttpRequestHandler> requestHandlerIn)
FaabricEndpoint(const FaabricEndpoint&) = delete
FaabricEndpoint(FaabricEndpoint&&) = delete
FaabricEndpoint &operator=(const FaabricEndpoint&) = delete
FaabricEndpoint &operator=(FaabricEndpoint&&) = delete
virtual ~FaabricEndpoint()
void start(EndpointMode mode = EndpointMode::SIGNAL)
void stop()
class FaabricEndpointHandler : public faabric::endpoint::HttpRequestHandler, public std::enable_shared_from_this<FaabricEndpointHandler>

Public Functions

virtual void onRequest(HttpRequestContext &&ctx, faabric::util::BeastHttpRequest &&request) override
struct HttpRequestContext

Public Members

asio::io_context &ioc
asio::any_io_executor executor
std::function<void(faabric::util::BeastHttpResponse&&)> sendFunction
class HttpRequestHandler

Subclassed by faabric::endpoint::FaabricEndpointHandler, faabric::planner::PlannerEndpointHandler

Public Functions

virtual void onRequest(HttpRequestContext &&ctx, faabric::util::BeastHttpRequest &&request) = 0
namespace detail
class EndpointState

Public Functions

inline EndpointState(int threadCountIn)

Public Members

asio::io_context ioc
std::vector<std::jthread> ioThreads
namespace executor

Functions

void setExecutorFactory(std::shared_ptr<ExecutorFactory> fac)
std::shared_ptr<ExecutorFactory> getExecutorFactory()

Variables

static thread_local std::shared_ptr<ExecutorContext> context = nullptr
static std::shared_ptr<ExecutorFactory> _factory
class ChainedCallException : public faabric::util::FaabricException

Public Functions

inline explicit ChainedCallException(std::string message)
class Executor

Public Functions

explicit Executor(faabric::Message &msg)
virtual ~Executor()
void executeTasks(std::vector<int> msgIdxs, std::shared_ptr<faabric::BatchExecuteRequest> req)
virtual void shutdown()

Shuts down the executor and clears all its state, including its thread pool.

This must be called before destructing an executor. This is because the tidy-up requires implementations of virtual methods held in subclasses, that may depend on state that those subclass instances hold. Because destructors run in inheritance order, this means that state may have been destructed before the executor destructor runs.

virtual void reset(faabric::Message &msg)
virtual int32_t executeTask(int threadPoolIdx, int msgIdx, std::shared_ptr<faabric::BatchExecuteRequest> req)
bool tryClaim()
void claim()
void releaseClaim()
std::shared_ptr<faabric::util::SnapshotData> getMainThreadSnapshot(faabric::Message &msg, bool createIfNotExists = false)
long getMillisSinceLastExec()
virtual std::span<uint8_t> getMemoryView()
virtual void restore(const std::string &snapshotKey)
faabric::Message &getBoundMessage()
bool isExecuting()
inline bool isShutdown()
void addChainedMessage(const faabric::Message &msg)
const faabric::Message &getChainedMessage(int messageId)
std::set<unsigned int> getChainedMessageIds()
std::vector<faabric::util::SnapshotDiff> mergeDirtyRegions(const Message &msg, const std::vector<char> &extraDirtyPages = {})
void setThreadResult(faabric::Message &msg, int32_t returnValue, const std::string &key, const std::vector<faabric::util::SnapshotDiff> &diffs)
virtual void setMemorySize(size_t newSize)

Public Members

std::string id
class ExecutorContext
#include <ExecutorContext.h>

Globally-accessible wrapper that allows executing applications to query their execution context. The context is thread-local, so applications can query which specific message they are executing.

Public Functions

ExecutorContext(Executor *executorIn, std::shared_ptr<faabric::BatchExecuteRequest> reqIn, int msgIdx)
inline Executor *getExecutor()
inline std::shared_ptr<faabric::BatchExecuteRequest> getBatchRequest()
inline faabric::Message &getMsg()
inline int getMsgIdx()

Public Static Functions

static bool isSet()
static void set(Executor *executorIn, std::shared_ptr<faabric::BatchExecuteRequest> reqIn, int msgIdxIn)
static void unset()
static std::shared_ptr<ExecutorContext> get()
class ExecutorContextException : public faabric::util::FaabricException

Public Functions

inline explicit ExecutorContextException(std::string message)
class ExecutorFactory

Public Functions

inline virtual ~ExecutorFactory()
virtual std::shared_ptr<Executor> createExecutor(faabric::Message &msg) = 0
virtual void flushHost()
class ExecutorTask

Public Functions

ExecutorTask() = default
ExecutorTask(int messageIndexIn, std::shared_ptr<BatchExecuteRequest> reqIn)
ExecutorTask(const ExecutorTask &other) = delete
ExecutorTask &operator=(const ExecutorTask &other) = delete
ExecutorTask(ExecutorTask &&other) = default
ExecutorTask &operator=(ExecutorTask &&other) = default

Public Members

std::shared_ptr<BatchExecuteRequest> req
int messageIndex = 0
namespace mpi

Typedefs

typedef faabric::util::FixedCapacityQueue<MpiMessage> InMemoryMpiQueue

Enums

enum MpiMessageType

Values:

enumerator NORMAL
enumerator BARRIER_JOIN
enumerator BARRIER_DONE
enumerator SCATTER
enumerator GATHER
enumerator ALLGATHER
enumerator REDUCE
enumerator SCAN
enumerator ALLREDUCE
enumerator ALLTOALL
enumerator ALLTOALL_PACKED
enumerator SENDRECV
enumerator BROADCAST
enumerator UNACKED_MPI_MESSAGE
enumerator HANDSHAKE

Functions

inline size_t payloadSize(const MpiMessage &msg)
inline size_t msgSize(const MpiMessage &msg)
void serializeMpiMsg(std::vector<uint8_t> &buffer, const MpiMessage &msg)
void parseMpiMsg(const std::vector<uint8_t> &bytes, MpiMessage *msg)
std::vector<MpiMessage> getMpiMockedMessages(int sendRank)
MpiWorldRegistry &getMpiWorldRegistry()
static int32_t getAsyncRequestId(int sendRank, int recvRank, bool isSend)
static std::tuple<int, int, bool> getRanksFromRequestId(int32_t requestId)
void checkSendRecvMatch(faabric_datatype_t *sendType, int sendCount, faabric_datatype_t *recvType, int recvCount)

Variables

static std::mutex mockMutex
static std::map<int, std::vector<MpiMessage>> mpiMockedMessages
static thread_local MpiRankState rankState
const uint8_t iSendMagic = 0xFF
const uint8_t iRecvMagic = 0x00
class MpiContext

Public Functions

MpiContext()
int createWorld(faabric::Message &msg)
void joinWorld(faabric::Message &msg)
bool getIsMpi() const
int getRank() const
int getWorldId() const
struct MpiMessage

Public Members

int32_t id
int32_t worldId
int32_t sendRank
int32_t recvRank
int32_t typeSize
int32_t count
int32_t requestId
MpiMessageType messageType
void *buffer
struct MpiRankState

Public Functions

inline void reset()

Public Members

int msgCount = 1
faabric::Message *msg = nullptr
std::vector<std::unique_ptr<std::list<MpiMessage>>> unackedMessageBuffers
std::unique_ptr<faabric::util::FaabricCpuSet> pinnedCpu
std::vector<std::unique_ptr<faabric::transport::tcp::SendSocket>> sendSockets
std::unique_ptr<faabric::transport::tcp::RecvSocket> recvSocket
std::vector<int> recvConnPool
class MpiWorld

Public Functions

MpiWorld()
void create(faabric::Message &call, int newId, int newSize)
void broadcastHostsToRanks()
void initialiseFromMsg(faabric::Message &msg)
void initialiseRankFromMsg(faabric::Message &msg)
std::string getHostForRank(int rank)
std::string getUser()
std::string getFunction()
int getId() const
int getSize() const
bool destroy()
void getCartesianRank(int rank, int maxDims, const int *dims, int *periods, int *coords)
void getRankFromCoords(int *rank, int *coords)
void shiftCartesianCoords(int rank, int direction, int disp, int *source, int *destination)
void send(int sendRank, int recvRank, const uint8_t *buffer, faabric_datatype_t *dataType, int count, MpiMessageType messageType = MpiMessageType::NORMAL)
int isend(int sendRank, int recvRank, const uint8_t *buffer, faabric_datatype_t *dataType, int count, MpiMessageType messageType = MpiMessageType::NORMAL)
void broadcast(int rootRank, int thisRank, uint8_t *buffer, faabric_datatype_t *dataType, int count, MpiMessageType messageType = MpiMessageType::NORMAL)
void recv(int sendRank, int recvRank, uint8_t *buffer, faabric_datatype_t *dataType, int count, MPI_Status *status, MpiMessageType messageType = MpiMessageType::NORMAL)
int irecv(int sendRank, int recvRank, uint8_t *buffer, faabric_datatype_t *dataType, int count, MpiMessageType messageType = MpiMessageType::NORMAL)
void awaitAsyncRequest(int requestId)
void sendRecv(uint8_t *sendBuffer, int sendcount, faabric_datatype_t *sendDataType, int sendRank, uint8_t *recvBuffer, int recvCount, faabric_datatype_t *recvDataType, int recvRank, int myRank, MPI_Status *status)
void scatter(int sendRank, int recvRank, const uint8_t *sendBuffer, faabric_datatype_t *sendType, int sendCount, uint8_t *recvBuffer, faabric_datatype_t *recvType, int recvCount)
void gather(int sendRank, int recvRank, const uint8_t *sendBuffer, faabric_datatype_t *sendType, int sendCount, uint8_t *recvBuffer, faabric_datatype_t *recvType, int recvCount)
void allGather(int rank, const uint8_t *sendBuffer, faabric_datatype_t *sendType, int sendCount, uint8_t *recvBuffer, faabric_datatype_t *recvType, int recvCount)
void reduce(int sendRank, int recvRank, uint8_t *sendBuffer, uint8_t *recvBuffer, faabric_datatype_t *datatype, int count, faabric_op_t *operation)
void allReduce(int rank, uint8_t *sendBuffer, uint8_t *recvBuffer, faabric_datatype_t *datatype, int count, faabric_op_t *operation)
void op_reduce(faabric_op_t *operation, faabric_datatype_t *datatype, int count, uint8_t *inBuffer, uint8_t *resultBuffer)
void scan(int rank, uint8_t *sendBuffer, uint8_t *recvBuffer, faabric_datatype_t *datatype, int count, faabric_op_t *operation)
void allToAll(int rank, uint8_t *sendBuffer, faabric_datatype_t *sendType, int sendCount, uint8_t *recvBuffer, faabric_datatype_t *recvType, int recvCount)
void probe(int sendRank, int recvRank, MPI_Status *status)
void barrier(int thisRank)
std::shared_ptr<InMemoryMpiQueue> getLocalQueue(int sendRank, int recvRank)
long getLocalQueueSize(int sendRank, int recvRank)
void overrideHost(const std::string &newHost)
double getWTime()
void prepareMigration(int newGroupId, int thisRank, bool thisRankMustMigrate)
class MpiWorldRegistry

Public Functions

MpiWorldRegistry() = default
MpiWorld &createWorld(faabric::Message &msg, int worldId, std::string hostOverride = "")
MpiWorld &getOrInitialiseWorld(faabric::Message &msg)
MpiWorld &getWorld(int worldId)
bool worldExists(int worldId)
void clearWorld(int worldId)
void clear()
namespace planner

Typedefs

typedef std::promise<std::shared_ptr<faabric::Message>> MessageResultPromise
typedef std::shared_ptr<MessageResultPromise> MessageResultPromisePtr
using header = beast::http::field

Enums

enum FlushType

Values:

enumerator NoFlushType
enumerator Hosts
enumerator Executors
enumerator SchedulingState
enum PlannerCalls

Values:

enumerator NoPlanerCall
enumerator Ping
enumerator GetAvailableHosts
enumerator RegisterHost
enumerator RemoveHost
enumerator SetMessageResult
enumerator GetMessageResult
enumerator GetBatchResults
enumerator GetSchedulingDecision
enumerator GetNumMigrations
enumerator CallBatch
enumerator PreloadSchedulingDecision

Functions

Planner &getPlanner()
PlannerClient &getPlannerClient()
int availableOpenMpSlots(int appId, const std::string &mainHost, const std::map<std::string, std::shared_ptr<Host>> &hostMap, const faabric::batch_scheduler::InFlightReqs &inFlightReqs)
static void claimHostSlots(std::shared_ptr<Host> host, int slotsToClaim = 1)
static void releaseHostSlots(std::shared_ptr<Host> host, int slotsToRelease = 1)
static int claimHostMpiPort(std::shared_ptr<Host> host)
static void releaseHostMpiPort(std::shared_ptr<Host> host, int mpiPort)
static void printHostState(std::map<std::string, std::shared_ptr<Host>> hostMap, const std::string &logLevel = "debug")
static faabric::batch_scheduler::HostMap convertToBatchSchedHostMap(std::map<std::string, std::shared_ptr<Host>> hostMapIn, const std::set<std::string> &nextEvictedHostIps)
class KeepAliveThread : public faabric::util::PeriodicBackgroundThread

Public Functions

virtual void doWork() override
void setRequest(std::shared_ptr<RegisterHostRequest> thisHostReqIn)

Public Members

std::shared_ptr<RegisterHostRequest> thisHostReq = nullptr
class Planner

Public Functions

Planner()
PlannerConfig getConfig()
void printConfig() const
std::string getPolicy()
void setPolicy(const std::string &newPolicy)
bool reset()
bool flush(faabric::planner::FlushType flushType)
std::vector<std::shared_ptr<Host>> getAvailableHosts()
bool registerHost(const Host &hostIn, bool overwrite)
void removeHost(const Host &hostIn)
void setMessageResult(std::shared_ptr<faabric::Message> msg)
std::shared_ptr<faabric::Message> getMessageResult(std::shared_ptr<faabric::Message> msg)
void preloadSchedulingDecision(int appId, std::shared_ptr<batch_scheduler::SchedulingDecision> decision)
std::shared_ptr<batch_scheduler::SchedulingDecision> getPreloadedSchedulingDecision(int32_t appId, std::shared_ptr<BatchExecuteRequest> ber)
std::shared_ptr<faabric::BatchExecuteRequestStatus> getBatchResults(int32_t appId)
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> getSchedulingDecision(std::shared_ptr<BatchExecuteRequest> req)
faabric::batch_scheduler::InFlightReqs getInFlightReqs()
int getNumMigrations()
std::set<std::string> getNextEvictedHostIps()
std::map<int32_t, std::shared_ptr<BatchExecuteRequest>> getEvictedReqs()
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> callBatch(std::shared_ptr<BatchExecuteRequest> req)
void setNextEvictedVm(const std::set<std::string> &vmIp)
struct PlannerCache

Public Members

std::unordered_map<uint32_t, MessageResultPromisePtr> plannerResults
std::set<std::string> pushedSnapshots
class PlannerClient : public faabric::transport::MessageEndpointClient

Public Functions

PlannerClient()
PlannerClient(const std::string &plannerIp)
void ping()
void clearCache()
std::vector<Host> getAvailableHosts()
int registerHost(std::shared_ptr<RegisterHostRequest> req)
void removeHost(std::shared_ptr<RemoveHostRequest> req)
void setMessageResult(std::shared_ptr<faabric::Message> msg)
void setMessageResultLocally(std::shared_ptr<faabric::Message> msg)
faabric::Message getMessageResult(int appId, int msgId, int timeoutMs)
faabric::Message getMessageResult(const faabric::Message &msg, int timeoutMs)
std::shared_ptr<faabric::BatchExecuteRequestStatus> getBatchResults(std::shared_ptr<faabric::BatchExecuteRequest> req)
faabric::batch_scheduler::SchedulingDecision callFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req)
faabric::batch_scheduler::SchedulingDecision getSchedulingDecision(std::shared_ptr<faabric::BatchExecuteRequest> req)
int getNumMigrations()
void preloadSchedulingDecision(std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> preloadDec)
class PlannerEndpointHandler : public faabric::endpoint::HttpRequestHandler, public std::enable_shared_from_this<PlannerEndpointHandler>

Public Functions

virtual void onRequest(faabric::endpoint::HttpRequestContext &&ctx, faabric::util::BeastHttpRequest &&request) override
class PlannerServer : public faabric::transport::MessageEndpointServer

Public Functions

PlannerServer()
struct PlannerState

Public Members

std::string policy
std::map<std::string, std::shared_ptr<Host>> hostMap
std::map<int, std::map<int, std::shared_ptr<faabric::Message>>> appResults
std::map<int, std::vector<std::string>> appResultWaiters
faabric::batch_scheduler::InFlightReqs inFlightReqs
std::map<int, std::shared_ptr<batch_scheduler::SchedulingDecision>> preloadedSchedulingDecisions
std::atomic<int> numMigrations = 0
std::map<int, std::shared_ptr<BatchExecuteRequest>> evictedRequests
std::set<std::string> nextEvictedHostIps
namespace redis

Typedefs

using UniqueRedisReply = std::unique_ptr<redisReply, decltype(&freeReplyObject)>

Enums

enum RedisRole

Values:

enumerator QUEUE
enumerator STATE

Functions

UniqueRedisReply wrapReply(redisReply *r)
UniqueRedisReply safeRedisCommand(redisContext *c, const char *format, ...)
long getLongFromReply(const redisReply &reply)
std::vector<uint8_t> getBytesFromReply(const redisReply &reply)
void getBytesFromReply(const std::string &key, const redisReply &reply, uint8_t *buffer, size_t bufferLen)
long extractScriptResult(const redisReply &reply)

—&#8212; Lua scripts —&#8212;

std::set<std::string> extractStringSetFromReply(const redisReply &reply)
class Redis

Public Functions

~Redis()
void ping()

—&#8212; Standard Redis commands —&#8212;

std::vector<uint8_t> get(const std::string &key)
size_t strlen(const std::string &key)
void get(const std::string &key, uint8_t *buffer, size_t size)
void set(const std::string &key, const std::vector<uint8_t> &value)
void set(const std::string &key, const uint8_t *value, size_t size)
void del(const std::string &key)
long getCounter(const std::string &key)
long incr(const std::string &key)
long decr(const std::string &key)
long incrByLong(const std::string &key, long val)
long decrByLong(const std::string &key, long val)
void setRange(const std::string &key, long offset, const uint8_t *value, size_t size)
void setRangePipeline(const std::string &key, long offset, const uint8_t *value, size_t size)
void flushPipeline(long pipelineLength)
void getRange(const std::string &key, uint8_t *buffer, size_t bufferLen, long start, long end)

Note that start/end are both inclusive

void sadd(const std::string &key, const std::string &value)
void srem(const std::string &key, const std::string &value)
long scard(const std::string &key)
bool sismember(const std::string &key, const std::string &value)
std::string srandmember(const std::string &key)
std::set<std::string> smembers(const std::string &key)
std::set<std::string> sdiff(const std::string &keyA, const std::string &keyB)
std::set<std::string> sinter(const std::string &keyA, const std::string &keyB)
int lpushLong(const std::string &key, long val)
int rpushLong(const std::string &key, long val)
void flushAll()
long listLength(const std::string &queueName)
long getTtl(const std::string &key)
void expire(const std::string &key, long expiry)
void refresh()
uint32_t acquireLock(const std::string &key, int expirySeconds)

—&#8212; Locking —&#8212;

void releaseLock(const std::string &key, uint32_t lockId)
void delIfEq(const std::string &key, uint32_t value)
bool setnxex(const std::string &key, long value, int expirySeconds)
long getLong(const std::string &key)
void setLong(const std::string &key, long value)
void enqueue(const std::string &queueName, const std::string &value)

—&#8212; Queueing —&#8212;

void enqueueBytes(const std::string &queueName, const std::vector<uint8_t> &value)
void enqueueBytes(const std::string &queueName, const uint8_t *buffer, size_t bufferLen)
std::string dequeue(const std::string &queueName, int timeout = DEFAULT_TIMEOUT)
std::vector<uint8_t> dequeueBytes(const std::string &queueName, int timeout = DEFAULT_TIMEOUT)
size_t dequeueBytes(const std::string &queueName, uint8_t *buffer, size_t bufferLen, int timeout = DEFAULT_TIMEOUT)
void dequeueMultiple(const std::string &queueName, uint8_t *buff, long buffLen, long nElems)
void publishSchedulerResult(const std::string &key, const std::string &status_key, const std::vector<uint8_t> &result)

Public Static Functions

static Redis &getQueue()

—&#8212; Factories —&#8212;

static Redis &getState()

—&#8212; Utils —&#8212;

class RedisInstance

Public Functions

explicit RedisInstance(RedisRole role)

Public Members

std::string delifeqSha
std::string schedPublishSha
std::string ip
std::string hostname
int port
class RedisNoResponseException : public faabric::util::FaabricException

Public Functions

inline explicit RedisNoResponseException(std::string message)
namespace runner
class FaabricMain

Public Functions

FaabricMain(std::shared_ptr<faabric::executor::ExecutorFactory> fac)
void startBackground()
void startRunner()
void startFunctionCallServer()
void startStateServer()
void startSnapshotServer()
void startPointToPointServer()
void shutdown()
namespace scheduler

Typedefs

typedef faabric::util::Queue<faabric::Message> InMemoryMessageQueue
typedef std::pair<std::string, InMemoryMessageQueue*> InMemoryMessageQueuePair

Enums

enum FunctionCalls

Values:

enumerator NoFunctionCall
enumerator ExecuteFunctions
enumerator Flush
enumerator SetMessageResult

Functions

std::vector<std::pair<std::string, faabric::Message>> getFunctionCalls()
std::vector<std::pair<std::string, faabric::EmptyRequest>> getFlushCalls()
std::vector<std::pair<std::string, std::shared_ptr<faabric::BatchExecuteRequest>>> getBatchRequests()
std::vector<std::pair<std::string, std::shared_ptr<faabric::Message>>> getMessageResults()
void clearMockRequests()
std::shared_ptr<FunctionCallClient> getFunctionCallClient(const std::string &otherHost)
void clearFunctionCallClients()
Scheduler &getScheduler()
std::string getChainedKey(unsigned int msgId)

Variables

static std::mutex mockMutex
static std::vector<std::pair<std::string, faabric::Message>> functionCalls
static std::vector<std::pair<std::string, faabric::EmptyRequest>> flushCalls
static std::vector<std::pair<std::string, std::shared_ptr<faabric::BatchExecuteRequest>>> batchMessages
static std::vector<std::pair<std::string, std::shared_ptr<faabric::Message>>> messageResults
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<faabric::scheduler::FunctionCallClient>> functionCallClients
class FunctionCallClient : public faabric::transport::MessageEndpointClient

Public Functions

explicit FunctionCallClient(const std::string &hostIn)
void sendFlush()
void executeFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req)
void setMessageResult(std::shared_ptr<faabric::Message> msg)
class FunctionCallServer : public faabric::transport::MessageEndpointServer

Public Functions

FunctionCallServer()
class Scheduler

Public Functions

Scheduler()
~Scheduler()
void executeBatch(std::shared_ptr<faabric::BatchExecuteRequest> req)
void reset()
void resetThreadLocalCache()
void shutdown()
inline bool isShutdown()
void broadcastSnapshotDelete(const faabric::Message &msg, const std::string &snapshotKey)
int reapStaleExecutors()
long getFunctionExecutorCount(const faabric::Message &msg)
void setThreadResultLocally(uint32_t appId, uint32_t msgId, int32_t returnValue, faabric::transport::Message &message)

Caches a message along with the thread result, to allow the thread result to refer to data held in that message (i.e. snapshot diffs). The message will be destroyed once the thread result is consumed.

std::vector<std::pair<uint32_t, int32_t>> awaitThreadResults(std::shared_ptr<faabric::BatchExecuteRequest> req, int timeoutMs = DEFAULT_THREAD_RESULT_TIMEOUT_MS)
size_t getCachedMessageCount()
std::string getThisHost()
void addHostToGlobalSet()
void addHostToGlobalSet(const std::string &host, std::shared_ptr<faabric::HostResources> overwriteResources = nullptr)
void removeHostFromGlobalSet(const std::string &host)
void setThisHostResources(faabric::HostResources &res)
std::vector<faabric::Message> getRecordedMessages()
void clearRecordedMessages()
std::shared_ptr<faabric::PendingMigration> checkForMigrationOpportunities(faabric::Message &msg, int overwriteNewGroupId = 0)
class SchedulerReaperThread : public faabric::util::PeriodicBackgroundThread
#include <Scheduler.h>

Background thread that periodically checks to see if any executors have become stale (i.e. not handled any requests in a given timeout). If any are found, they are removed.

Public Functions

virtual void doWork() override
namespace snapshot

Enums

enum SnapshotCalls

Values:

enumerator NoSnapshotCall
enumerator PushSnapshot
enumerator PushSnapshotUpdate
enumerator DeleteSnapshot
enumerator ThreadResult

Functions

std::vector<std::pair<std::string, std::shared_ptr<faabric::util::SnapshotData>>> getSnapshotPushes()
std::vector<std::pair<std::string, std::vector<faabric::util::SnapshotDiff>>> getSnapshotDiffPushes()
std::vector<std::pair<std::string, std::string>> getSnapshotDeletes()
std::vector<std::pair<std::string, MockThreadResult>> getThreadResults()
void clearMockSnapshotRequests()
std::shared_ptr<SnapshotClient> getSnapshotClient(const std::string &otherHost)
void clearSnapshotClients()
SnapshotRegistry &getSnapshotRegistry()

Variables

static std::mutex mockMutex
static std::vector<std::pair<std::string, std::shared_ptr<faabric::util::SnapshotData>>> snapshotPushes
static std::vector<std::pair<std::string, std::vector<faabric::util::SnapshotDiff>>> snapshotDiffPushes
static std::vector<std::pair<std::string, std::string>> snapshotDeletes
static std::vector<std::pair<std::string, MockThreadResult>> threadResults
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<faabric::snapshot::SnapshotClient>> snapshotClients
struct MockThreadResult

Public Members

uint32_t msgId = 0
int res = 0
std::string key
std::vector<faabric::util::SnapshotDiff> diffs
class SnapshotClient : public faabric::transport::MessageEndpointClient

Public Functions

explicit SnapshotClient(const std::string &hostIn)
void pushSnapshot(const std::string &key, std::shared_ptr<faabric::util::SnapshotData> data)
void pushSnapshotUpdate(std::string snapshotKey, const std::shared_ptr<faabric::util::SnapshotData> &data, const std::vector<faabric::util::SnapshotDiff> &diffs)
void pushThreadResult(uint32_t appId, uint32_t messageId, int returnValue, const std::string &key, const std::vector<faabric::util::SnapshotDiff> &diffs)
class SnapshotRegistry

Public Functions

SnapshotRegistry() = default
std::shared_ptr<faabric::util::SnapshotData> getSnapshot(const std::string &key)
bool snapshotExists(const std::string &key)
void registerSnapshot(const std::string &key, std::shared_ptr<faabric::util::SnapshotData> data)
void deleteSnapshot(const std::string &key)
size_t getSnapshotCount()
void clear()
class SnapshotServer : public faabric::transport::MessageEndpointServer

Public Functions

SnapshotServer()
namespace state

WARNING - key-value objects are shared between threads, BUT hiredis is not thread-safe, so make sure you always retrieve the reference to Redis inline rather than sharing a reference within the class.

Enums

enum InMemoryStateKeyStatus

Values:

enumerator NOT_MAIN
enumerator MAIN
enum StateCalls

Values:

enumerator NoStateCall
enumerator Pull
enumerator Push
enumerator Size
enumerator Append
enumerator ClearAppended
enumerator PullAppended
enumerator Delete

Functions

InMemoryStateRegistry &getInMemoryStateRegistry()
State &getGlobalState()
static std::string getMasterKey(const std::string &user, const std::string &key)
class AppendedInMemoryState

Public Functions

inline AppendedInMemoryState(size_t lengthIn, std::unique_ptr<uint8_t[]> &&dataIn)

Public Members

size_t length
std::unique_ptr<uint8_t[]> data
class InMemoryStateKeyValue : public faabric::state::StateKeyValue

Public Functions

InMemoryStateKeyValue(const std::string &userIn, const std::string &keyIn, size_t sizeIn, const std::string &thisIPIn)
InMemoryStateKeyValue(const std::string &userIn, const std::string &keyIn, const std::string &thisIPIn)
bool isMaster()
AppendedInMemoryState &getAppendedValue(uint idx)

Public Static Functions

static size_t getStateSizeFromRemote(const std::string &userIn, const std::string &keyIn, const std::string &thisIPIn)
static void deleteFromRemote(const std::string &userIn, const std::string &keyIn, const std::string &thisIPIn)
static void clearAll(bool global)
class InMemoryStateRegistry

Public Functions

InMemoryStateRegistry() = default
std::string getMasterIP(const std::string &user, const std::string &key, const std::string &thisIP, bool claim)
std::string getMasterIPForOtherMaster(const std::string &userIn, const std::string &keyIn, const std::string &thisIP)
void clear()
class RedisStateKeyValue : public faabric::state::StateKeyValue

Public Functions

RedisStateKeyValue(const std::string &userIn, const std::string &keyIn, size_t sizeIn)
RedisStateKeyValue(const std::string &userIn, const std::string &keyIn)

Public Static Functions

static size_t getStateSizeFromRemote(const std::string &userIn, const std::string &keyIn)
static void deleteFromRemote(const std::string &userIn, const std::string &keyIn)
static void clearAll(bool global)
class State

Public Functions

explicit State(std::string thisIPIn)
size_t getStateSize(const std::string &user, const std::string &keyIn)
std::shared_ptr<StateKeyValue> getKV(const std::string &user, const std::string &key, size_t size)
std::shared_ptr<StateKeyValue> getKV(const std::string &user, const std::string &key)
void forceClearAll(bool global)
void deleteKV(const std::string &userIn, const std::string &keyIn)
void deleteKVLocally(const std::string &userIn, const std::string &keyIn)
size_t getKVCount()
std::string getThisIP()
class StateChunk

Public Functions

inline StateChunk(long offsetIn, size_t lengthIn, uint8_t *dataIn)
inline StateChunk(long offsetIn, std::vector<uint8_t> &data)

Public Members

long offset
size_t length
uint8_t *data
class StateClient : public faabric::transport::MessageEndpointClient

Public Functions

explicit StateClient(const std::string &userIn, const std::string &keyIn, const std::string &hostIn)
void pushChunks(const std::vector<StateChunk> &chunks)
void pullChunks(const std::vector<StateChunk> &chunks, uint8_t *bufferStart)
void append(const uint8_t *data, size_t length)
void pullAppended(uint8_t *buffer, size_t length, long nValues)
void clearAppended()
size_t stateSize()
void deleteState()
void lock()
void unlock()

Public Members

const std::string user
const std::string key
class StateKeyValue

Subclassed by faabric::state::InMemoryStateKeyValue, faabric::state::RedisStateKeyValue

Public Functions

StateKeyValue(const std::string &userIn, const std::string &keyIn, size_t sizeIn)
StateKeyValue(const std::string &userIn, const std::string &keyIn)
void get(uint8_t *buffer)
uint8_t *get()
void getChunk(long offset, uint8_t *buffer, size_t length)
uint8_t *getChunk(long offset, long len)
std::vector<StateChunk> getAllChunks()
void set(const uint8_t *buffer)
void setChunk(long offset, const uint8_t *buffer, size_t length)
void append(const uint8_t *buffer, size_t length)
void getAppended(uint8_t *buffer, size_t length, long nValues)
void clearAppended()
void mapSharedMemory(void *destination, long pagesOffset, long nPages)
void unmapSharedMemory(void *mappedAddr)
void pull()
void pushPartial()
void pushPartialMask(const std::shared_ptr<StateKeyValue> &maskKv)
void lockRead()
void unlockRead()
void lockWrite()
void unlockWrite()
void flagDirty()
void flagChunkDirty(long offset, long len)
size_t size() const
size_t getSharedMemorySize() const
void pushFull()

Public Members

const std::string user
const std::string key

Public Static Functions

static uint32_t waitOnRedisRemoteLock(const std::string &redisKey)
class StateKeyValueException : public runtime_error

Public Functions

inline explicit StateKeyValueException(const std::string &message)
class StateServer : public faabric::transport::MessageEndpointServer

Public Functions

explicit StateServer(State &stateIn)
namespace transport

Enums

enum class MessageResponseCode

Types of message send/ receive outcomes.

Values:

enumerator SUCCESS
enumerator TERM
enumerator TIMEOUT
enumerator ERROR
enum MessageEndpointConnectType

Values:

enumerator BIND
enumerator CONNECT
enum class SocketType

Values:

enumerator pair
enumerator pub
enumerator sub
enumerator pull
enumerator push
enumerator rep
enumerator req
enum PointToPointCall

Values:

enumerator MAPPING
enumerator MESSAGE
enumerator LOCK_GROUP
enumerator LOCK_GROUP_RECURSIVE
enumerator UNLOCK_GROUP
enumerator UNLOCK_GROUP_RECURSIVE

Functions

PointToPointBroker &getPointToPointBroker()
std::vector<std::pair<std::string, faabric::PointToPointMappings>> getSentMappings()
std::vector<std::pair<std::string, faabric::PointToPointMessage>> getSentPointToPointMessages()
std::vector<std::tuple<std::string, faabric::transport::PointToPointCall, faabric::PointToPointMessage>> getSentLockMessages()
void clearSentMessages()
static std::shared_ptr<PointToPointClient> getClient(const std::string &host)
std::string getPointToPointKey(int groupId, int sendIdx, int recvIdx)
std::string getPointToPointKey(int groupId, int recvIdx)
auto getEndpointPtrs(const std::string &label)

Variables

static std::map<MessageResponseCode, std::string> MessageResponseCodeText = {{MessageResponseCode::SUCCESS, "Success"}, {MessageResponseCode::TERM, "Connection terminated"}, {MessageResponseCode::TIMEOUT, "Message timed out"}, {MessageResponseCode::ERROR, "Error"},}
static faabric::util::ConcurrentMap<int, std::shared_ptr<PointToPointGroup>> groups
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<std::tuple<std::unique_ptr<AsyncInternalRecvMessageEndpoint>, std::unique_ptr<AsyncInternalSendMessageEndpoint>, std::atomic_int32_t>>> endpoints
thread_local absl::flat_hash_set<std::string> threadEndpoints
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<PointToPointClient>> clients
thread_local int currentGroupId = NO_CURRENT_GROUP_ID
thread_local std::vector<int> sentMsgCount
thread_local std::vector<int> recvMsgCount
thread_local std::vector<std::list<Message>> outOfOrderMsgs
static std::mutex mockMutex
static std::vector<std::pair<std::string, faabric::PointToPointMappings>> sentMappings
static std::vector<std::pair<std::string, faabric::PointToPointMessage>> sentMessages
static std::vector<std::tuple<std::string, faabric::transport::PointToPointCall, faabric::PointToPointMessage>> sentLockMessages
class AsyncDirectRecvEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

AsyncDirectRecvEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
class AsyncDirectSendEndpoint : public faabric::transport::MessageEndpoint

Public Functions

AsyncDirectSendEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void send(uint8_t header, const uint8_t *data, size_t dataSize)
class AsyncFanMessageEndpoint : public faabric::transport::FanMessageEndpoint

Public Functions

AsyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
class AsyncInternalRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

AsyncInternalRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
class AsyncInternalSendMessageEndpoint : public faabric::transport::MessageEndpoint

Public Functions

AsyncInternalSendMessageEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void send(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNumber = NO_SEQUENCE_NUM)
class AsyncRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

AsyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
AsyncRecvMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
class AsyncSendMessageEndpoint : public faabric::transport::MessageEndpoint

Public Functions

AsyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void send(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNum = NO_SEQUENCE_NUM)
class FanMessageEndpoint : public faabric::transport::MessageEndpoint

Subclassed by faabric::transport::AsyncFanMessageEndpoint, faabric::transport::SyncFanMessageEndpoint

Public Functions

FanMessageEndpoint(int portIn, int timeoutMs, SocketType socketType, bool isAsync)
MessageContext attachFanOut()
Message recv(const MessageContext &ctx)
void sendResponse(const MessageContext &ctx, uint8_t header, const uint8_t *data, size_t dataSize)
void stop()
class Message
#include <Message.h>

Represents message data passed around the transport layer. Essentially an array of bytes, with a size and a flag to say whether there’s more data to follow.

Messages are not copyable, only movable, as they will regularly contain large amounts of data.

Public Functions

Message(size_t bufferSize)
Message(nng_msg *nngMsg)
Message(MessageResponseCode responseCodeIn)
~Message()
Message(const Message &other) = delete
Message &operator=(const Message &other) = delete
inline Message(Message &&other)
inline Message &operator=(Message &&other)
inline MessageResponseCode getResponseCode()
inline std::span<uint8_t> allData()
inline std::span<const uint8_t> allData() const
std::span<char> data()
std::span<const char> data() const
std::span<uint8_t> udata()
std::span<const uint8_t> udata() const
std::vector<uint8_t> dataCopy() const
inline uint8_t getMessageCode() const
inline uint64_t getDeclaredDataSize() const
inline int getSequenceNum() const
class MessageContext

Public Functions

MessageContext() = default
inline explicit MessageContext(nng_ctx context)
MessageContext(const MessageContext&) = delete
MessageContext &operator=(const MessageContext&) = delete
inline MessageContext(MessageContext &&rhs)
inline MessageContext &operator=(MessageContext &&rhs)
inline ~MessageContext()

Public Members

nng_ctx context = NNG_CTX_INITIALIZER
class MessageEndpoint

Subclassed by faabric::transport::AsyncDirectSendEndpoint, faabric::transport::AsyncInternalSendMessageEndpoint, faabric::transport::AsyncSendMessageEndpoint, faabric::transport::FanMessageEndpoint, faabric::transport::RecvMessageEndpoint, faabric::transport::SyncSendMessageEndpoint

Public Functions

MessageEndpoint(const std::string &hostIn, int portIn, int timeoutMsIn)
MessageEndpoint(const std::string &addressIn, int timeoutMsIn)
MessageEndpoint &operator=(const MessageEndpoint&) = delete
MessageEndpoint(const MessageEndpoint &ctx) = delete
virtual ~MessageEndpoint()
std::string getAddress()
class MessageEndpointClient

Subclassed by faabric::planner::PlannerClient, faabric::scheduler::FunctionCallClient, faabric::snapshot::SnapshotClient, faabric::state::StateClient, faabric::transport::PointToPointClient

Public Functions

MessageEndpointClient(std::string hostIn, int asyncPort, int syncPort, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void asyncSend(int header, google::protobuf::Message *msg, int sequenceNum = NO_SEQUENCE_NUM)
void asyncSend(int header, const uint8_t *buffer, size_t bufferSize, int sequenceNum = NO_SEQUENCE_NUM)
void syncSend(int header, google::protobuf::Message *msg, google::protobuf::Message *response)
void syncSend(int header, const uint8_t *buffer, size_t bufferSize, google::protobuf::Message *response)
class MessageEndpointServer

Subclassed by faabric::planner::PlannerServer, faabric::scheduler::FunctionCallServer, faabric::snapshot::SnapshotServer, faabric::state::StateServer, faabric::transport::PointToPointServer

Public Functions

MessageEndpointServer(int asyncPortIn, int syncPortIn, const std::string &inprocLabelIn, int nThreadsIn)
virtual void start(int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)

We need to guarantee to callers of this function, that when it returns, the server will be ready to use.

virtual void stop()
virtual void onWorkerStop()
void setRequestLatch()
void awaitRequestLatch()
int getNThreads()
class MessageEndpointServerHandler

Public Functions

MessageEndpointServerHandler(MessageEndpointServer *serverIn, bool asyncIn, const std::string &inprocLabelIn, int nThreadsIn)
void start(int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void join()
class MessageTimeoutException : public faabric::util::FaabricException

Public Functions

inline explicit MessageTimeoutException(std::string message)
class PointToPointBroker

Public Functions

PointToPointBroker()
std::string getHostForReceiver(int groupId, int recvIdx)
int getMpiPortForReceiver(int groupId, int recvIdx)
std::set<std::string> setUpLocalMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision)
void setAndSendMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision)
void sendMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision, const std::set<std::string> &hostList)
void waitForMappingsOnThisHost(int groupId)
std::set<int> getIdxsRegisteredForGroup(int groupId)
std::set<std::string> getHostsRegisteredForGroup(int groupId)
void updateHostForIdx(int groupId, int groupIdx, std::string newHost)
void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t *buffer, size_t bufferSize, std::string hostHint, bool mustOrderMsg = false)
void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t *buffer, size_t bufferSize, bool mustOrderMsg = false, int sequenceNum = NO_SEQUENCE_NUM, std::string hostHint = "")
std::vector<uint8_t> recvMessage(int groupId, int sendIdx, int recvIdx, bool mustOrderMsg = false)
void clearGroup(int groupId)
void clear()
void resetThreadLocalCache()
void postMigrationHook(faabric::Message &msg)
class PointToPointClient : public faabric::transport::MessageEndpointClient

Public Functions

PointToPointClient(const std::string &hostIn)
void sendMappings(faabric::PointToPointMappings &mappings)
void sendMessage(faabric::PointToPointMessage &msg, int sequenceNum = NO_SEQUENCE_NUM)
void groupLock(int appId, int groupId, int groupIdx, bool recursive = false)
void groupUnlock(int appId, int groupId, int groupIdx, bool recursive = false)
class PointToPointGroup

Public Functions

PointToPointGroup(int appId, int groupIdIn, int groupSizeIn, bool isSingleHostIn)
void lock(int groupIdx, bool recursive)
void unlock(int groupIdx, bool recursive)
int getLockOwner(bool recursive)
void localLock()
void localUnlock()
bool localTryLock()
void barrier(int groupIdx)
void notify(int groupIdx)
int getNotifyCount()

Public Static Functions

static std::shared_ptr<PointToPointGroup> getGroup(int groupId)
static std::shared_ptr<PointToPointGroup> getOrAwaitGroup(int groupId)
static bool groupExists(int groupId)
static void addGroup(int appId, int groupId, int groupSize, bool isSingleHost)
static void addGroupIfNotExists(int appId, int groupId, int groupSize)
static void clearGroup(int groupId)
static void clear()
class PointToPointServer : public faabric::transport::MessageEndpointServer

Public Functions

PointToPointServer()
class RecvMessageEndpoint : public faabric::transport::MessageEndpoint

Subclassed by faabric::transport::AsyncDirectRecvEndpoint, faabric::transport::AsyncInternalRecvMessageEndpoint, faabric::transport::AsyncRecvMessageEndpoint, faabric::transport::SyncRecvMessageEndpoint

Public Functions

RecvMessageEndpoint(int portIn, int timeoutMs, SocketType socketType)

Constructor for external TCP sockets

RecvMessageEndpoint(std::string inProcLabel, int timeoutMs, SocketType socketType, MessageEndpointConnectType connectType)

Constructor for internal inproc sockets

inline virtual ~RecvMessageEndpoint()
virtual Message recv()
class SyncFanMessageEndpoint : public faabric::transport::FanMessageEndpoint

Public Functions

SyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
class SyncRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint

Public Functions

SyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
SyncRecvMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
virtual Message recv() override
void sendResponse(uint8_t header, const uint8_t *data, size_t dataSize)
class SyncSendMessageEndpoint : public faabric::transport::MessageEndpoint

Public Functions

SyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
void sendRaw(const uint8_t *data, size_t dataSize)
Message sendAwaitResponse(uint8_t header, const uint8_t *data, size_t dataSize)
namespace tcp

Functions

void setReuseAddr(int connFd)
void setNoDelay(int connFd)
void setQuickAck(int connFd)
void setNonBlocking(int connFd)
void setBlocking(int connFd)
bool isNonBlocking(int connFd)
void setBusyPolling(int connFd)
void setRecvTimeoutMs(int connFd, int timeoutMs)
void setSendTimeoutMs(int connFd, int timeoutMs)
void setRecvBufferSize(int connFd, size_t bufferSizeBytes)
void setSendBufferSize(int connFd, size_t bufferSizeBytes)

Variables

const int SocketTimeoutMs = 5000
const size_t SocketBufferSizeBytes = 16777216
class Address

Public Functions

Address(const std::string &host, int port)
Address(int port)
sockaddr *get() const
class RecvSocket

Public Functions

RecvSocket(int port, const std::string &host = ANY_HOST)
RecvSocket(const RecvSocket &recvSocket) = delete
~RecvSocket()
void listen()
int accept()
void recvOne(int conn, uint8_t *buffer, size_t bufferSize)
class SendSocket

Public Functions

SendSocket(const std::string &host, int port)
void dial()
void sendOne(const uint8_t *buffer, size_t bufferSize)
class Socket

Public Functions

Socket()
Socket(int connFd)
Socket(const Socket &socket) = delete
~Socket()
inline int get() const
namespace util

Typedefs

using BeastHttpRequest = beast::http::request<beast::http::string_body>
using BeastHttpResponse = beast::http::response<beast::http::string_body>
typedef std::chrono::steady_clock::time_point TimePoint
typedef std::unique_lock<std::mutex> UniqueLock
typedef std::unique_lock<std::shared_mutex> FullLock
typedef std::shared_lock<std::shared_mutex> SharedLock
typedef std::unique_ptr<uint8_t[], std::function<void(uint8_t*)>> MemoryRegion

Enums

enum DeltaCommand

Values:

enumerator DELTACMD_TOTAL_SIZE
enumerator DELTACMD_ZSTD_COMPRESSED_COMMANDS
enumerator DELTACMD_DELTA_OVERWRITE
enumerator DELTACMD_DELTA_XOR
enumerator DELTACMD_END
enum SnapshotDataType

Defines the permitted datatypes for snapshot diffs. Each has a predefined length, except for the raw option which is used for generic streams of bytes.

Values:

enumerator Raw
enumerator Bool
enumerator Int
enumerator Long
enumerator Float
enumerator Double
enum SnapshotMergeOperation

Defines the operation to perform when merging the diff with the original snapshot.

WARNING: this enum forms part of the API with executing applications, so make sure they are updated accordingly when it’s changed.

Values:

enumerator Bytewise
enumerator Sum
enumerator Product
enumerator Subtract
enumerator Max
enumerator Min
enumerator Ignore
enumerator XOR

Functions

std::shared_ptr<faabric::BatchExecuteRequest> batchExecFactory()
std::shared_ptr<faabric::BatchExecuteRequest> batchExecFactory(const std::string &user, const std::string &function, int count = 1)
bool isBatchExecRequestValid(std::shared_ptr<faabric::BatchExecuteRequest> ber)
void updateBatchExecAppId(std::shared_ptr<faabric::BatchExecuteRequest> ber, int newAppId)
void updateBatchExecGroupId(std::shared_ptr<faabric::BatchExecuteRequest> ber, int newGroupId)
std::shared_ptr<faabric::BatchExecuteRequestStatus> batchExecStatusFactory(int32_t appId)
std::shared_ptr<faabric::BatchExecuteRequestStatus> batchExecStatusFactory(std::shared_ptr<faabric::BatchExecuteRequest> ber)
int getNumFinishedMessagesInBatch(std::shared_ptr<faabric::BatchExecuteRequestStatus> berStatus)
std::vector<uint8_t> stringToBytes(const std::string &str)
int bytesToInt(const std::vector<uint8_t> &bytes)
std::string bytesToString(const std::vector<uint8_t> &bytes)
std::string formatByteArrayToIntString(const std::vector<uint8_t> &bytes)
void trimTrailingZeros(std::vector<uint8_t> &vectorIn)
int safeCopyToBuffer(const std::vector<uint8_t> &dataIn, uint8_t *buffer, int bufferLen)
int safeCopyToBuffer(const uint8_t *dataIn, int dataLen, uint8_t *buffer, int bufferLen)
std::string byteArrayToHexString(const uint8_t *data, int dataSize)
std::vector<uint8_t> hexStringToByteArray(const std::string &hexString)
template<typename T>
std::string intToHexString(T i)
template<class T>
T unalignedRead(const uint8_t *bytes)
template<class T>
void unalignedWrite(const T &value, uint8_t *destination)
template<class T>
void appendBytesOf(std::vector<uint8_t> &container, T value)
template<class T>
size_t readBytesOf(const std::vector<uint8_t> &container, size_t offset, T *outValue)
template<typename T>
std::vector<uint8_t> valueToBytes(T val)
Clock &getGlobalClock()
template<typename T>
bool compareArrays(T *v1, T *v2, int size)
SystemConfig &getSystemConfig()
void setUpCrashHandler(int sig = -1)
void handleCrash(int sig)
std::vector<uint8_t> serializeDelta(const DeltaSettings &cfg, const uint8_t *oldDataStart, size_t oldDataLen, const uint8_t *newDataStart, size_t newDataLen)
void applyDelta(const std::vector<uint8_t> &delta, std::function<void(uint32_t)> setDataSize, std::function<uint8_t*()> getDataPointer)
std::shared_ptr<DirtyTracker> getDirtyTracker()

Returns the dirty tracker singleton. The dirty tracking mode is determined in the system config.

void resetDirtyTracker()

Resets the dirty tracker singleton (e.g. if the config has been changed).

std::string getEnvVar(const std::string &key, const std::string &deflt)
std::string setEnvVar(const std::string &varName, const std::string &value)
void unsetEnvVar(const std::string &varName)
unsigned int getUsableCores()
ExecGraphNode getFunctionExecGraphNode(int appId, int msgId)
ExecGraph getFunctionExecGraph(const faabric::Message &msg)
void logChainedFunction(faabric::Message &parentMessage, const faabric::Message &chainedMessage)
std::set<unsigned int> getChainedFunctions(const faabric::Message &msg)
int countExecGraphNodes(const ExecGraph &graph)
std::set<std::string> getExecGraphHosts(const ExecGraph &graph)
std::vector<std::string> getMpiRankHostsFromExecGraph(const ExecGraph &graph)
std::pair<std::vector<std::string>, std::vector<std::string>> getMigratedMpiRankHostsFromExecGraph(const ExecGraph &graph)
std::string execNodeToJson(const ExecGraphNode &node)
std::string execGraphToJson(const ExecGraph &graph)
void addDetail(faabric::Message &msg, const std::string &key, const std::string &value)
void incrementCounter(faabric::Message &msg, const std::string &key, const int valueToIncrement = 1)
std::string readFileToString(const std::string &path)
std::vector<uint8_t> readFileToBytes(const std::string &path)
void writeBytesToFile(const std::string &path, const std::vector<uint8_t> &data)
bool isWasm(const std::vector<uint8_t> &bytes)
std::string funcToString(const faabric::Message &msg, bool includeId)
std::string funcToString(const std::shared_ptr<faabric::BatchExecuteRequest> &req)
unsigned int setMessageId(faabric::Message &msg)
std::string buildAsyncResponse(const faabric::Message &msg)
std::shared_ptr<faabric::Message> messageFactoryShared(const std::string &user, const std::string &function)
faabric::Message messageFactory(const std::string &user, const std::string &function)
std::string resultKeyFromMessageId(unsigned int mid)
std::string statusKeyFromMessageId(unsigned int mid)
std::vector<uint8_t> messageToBytes(const faabric::Message &msg)
std::vector<std::string> getArgvForMessage(const faabric::Message &msg)
std::string getMainThreadSnapshotKey(const faabric::Message &msg)
unsigned int generateGid()
std::unique_ptr<FaabricCpuSet> pinThreadToFreeCpu(pthread_t thread)
std::string messageToJson(const google::protobuf::Message &msg)
void jsonToMessage(const std::string &jsonStr, google::protobuf::Message *msg)
void initLogging()
inline void *malloc(std::size_t size)
inline void free(void *ptr)
inline void *realloc(void *ptr, std::size_t newSize)
void mergeManyDirtyPages(std::vector<char> &dest, const std::vector<std::vector<char>> &source)
void mergeDirtyPages(std::vector<char> &dest, const std::vector<char> &source)
bool isPageAligned(const void *ptr)
size_t getRequiredHostPages(size_t nBytes)
size_t getRequiredHostPagesRoundDown(size_t nBytes)
size_t alignOffsetDown(size_t offset)
AlignedChunk getPageAlignedChunk(long offset, long length)
MemoryRegion allocatePrivateMemory(size_t size)
MemoryRegion allocateSharedMemory(size_t size)
MemoryRegion allocateVirtualMemory(size_t size)
void claimVirtualMemory(std::span<uint8_t> region)
void mapMemoryPrivate(std::span<uint8_t> target, int fd)
void mapMemoryShared(std::span<uint8_t> target, int fd)
void resizeFd(int fd, size_t size)
void writeToFd(int fd, off_t offset, std::span<const uint8_t> data)
int createFd(size_t size, const std::string &fdLabel)
void appendDataToFd(int fd, std::span<uint8_t> data)
std::string getIPFromHostname(const std::string &hostname)
std::string getPrimaryIPForThisHost(const std::string &interface)

Returns the IP for the given interface, or picks one based on an “appropriate” interface name.

faabric::PointToPointMappings ptpMappingsFromSchedulingDecision(std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> decision)
std::string randomString(int len)
std::string randomStringFromSet(const std::unordered_set<std::string> &s)
void diffArrayRegions(std::vector<SnapshotDiff> &diffs, uint32_t startOffset, uint32_t endOffset, std::span<const uint8_t> a, std::span<const uint8_t> b)
template<typename T>
inline bool calculateDiffValue(const uint8_t *original, uint8_t *updated, SnapshotMergeOperation operation)
template<typename T>
inline T applyDiffValue(const uint8_t *original, const uint8_t *diff, SnapshotMergeOperation operation)
std::string snapshotDataTypeStr(SnapshotDataType dt)
std::string snapshotMergeOpStr(SnapshotMergeOperation op)
std::string keyForUser(const std::string &user, const std::string &key)
void maskDouble(unsigned int *maskArray, unsigned long idx)
bool isAllWhitespace(const std::string &input)
bool startsWith(const std::string &input, const std::string &subStr)
bool endsWith(const std::string &input, const std::string &subStr)
bool contains(const std::string &input, const std::string &subStr)
std::string removeSubstr(const std::string &input, const std::string &toErase)
bool stringIsInt(const std::string &input)
template<class T>
std::string vectorToString(std::vector<T> vec)
void setTestMode(bool val)
void setMockMode(bool val)
bool isTestMode()
bool isMockMode()
faabric::util::TimePoint startTimer()
long getTimeDiffNanos(const faabric::util::TimePoint &begin)
long getTimeDiffMicros(const faabric::util::TimePoint &begin)
double getTimeDiffMillis(const faabric::util::TimePoint &begin)
void logEndTimer(const std::string &label, const faabric::util::TimePoint &begin)
void startGlobalTimer()
void printTimerTotals()
uint64_t timespecToNanos(struct timespec *nativeTimespec)
void nanosToTimespec(uint64_t nanos, struct timespec *nativeTimespec)
void *pageAlignAddress(void *faultAddr)
int countExecGraphNode(const ExecGraphNode &node)
std::set<std::string> getExecGraphHostsForNode(const ExecGraphNode &node)
std::vector<std::string> getMpiRankHostsFromExecGraphNode(const ExecGraphNode &node)
size_t writeDataCallback(void *ptr, size_t size, size_t nmemb, void *stream)
static std::unique_ptr<FaabricCpuSet> getNextFreeCpu()
static void doPinThreadToCpu(pthread_t thread, cpu_set_t *cpuSet)
MemoryRegion doAlloc(size_t size, int prot, int flags)
void mapMemory(std::span<uint8_t> target, int fd, int flags)

Variables

constexpr uint8_t DELTA_PROTOCOL_VERSION = 1
constexpr int DELTA_ZSTD_COMPRESS_LEVEL = 1
const int NO_CPU_IDX = -1
const int GHA_CPU_IDX = -2
static const long HOST_PAGE_SIZE = sysconf(_SC_PAGESIZE)
static thread_local DirtyTrackingRecord tracking
static ThreadSafeDirtyTrackingRecord globalTracking
static std::shared_ptr<DirtyTracker> tracker = nullptr
static long uffd = -1
static bool uffdWriteProtect = false
static bool uffdSigbus = false
static int closeFd = -1
static std::shared_ptr<std::jthread> eventThread = nullptr
static FreeCpus freeCpus
static std::unordered_map<std::string, std::string> ipMap
static std::mutex hostnameMx
static std::atomic<bool> testMode = false
static std::atomic<bool> mockMode = false
struct AlignedChunk

Public Members

long originalOffset = 0
long originalLength = 0
long nBytesOffset = 0
long nBytesLength = 0
long nPagesOffset = 0
long nPagesLength = 0
long offsetRemainder = 0
class Barrier

Public Functions

explicit Barrier(int countIn, std::function<void()> completionFunctionIn, int timeoutMsIn)
void wait()

Public Static Functions

static std::shared_ptr<Barrier> create(int count, std::function<void()> completionFunctionIn, int timeoutMs = DEFAULT_BARRIER_TIMEOUT_MS)
static std::shared_ptr<Barrier> create(int count, int timeoutMs = DEFAULT_BARRIER_TIMEOUT_MS)
class ChainedCallFailedException : public faabric::util::FaabricException

Public Functions

inline explicit ChainedCallFailedException(std::string message)
class Clock

Public Functions

Clock()
const TimePoint now()
long epochMillis()
long timeDiff(const TimePoint &t1, const TimePoint &t2)
long timeDiffNano(const TimePoint &t1, const TimePoint &t2)
long timeDiffMicro(const TimePoint &t1, const TimePoint &t2)
template<class Key, class Value>
class ConcurrentMap

Public Types

using key_type = typename UnderlyingMap::key_type
using mapped_type = typename UnderlyingMap::value_type
using size_type = typename UnderlyingMap::size_type
using difference_type = typename UnderlyingMap::difference_type

Public Functions

ConcurrentMap() = default
inline ConcurrentMap(size_t initialCapacity)
ConcurrentMap(const ConcurrentMap&) = delete
ConcurrentMap &operator=(const ConcurrentMap&) = delete
ConcurrentMap(ConcurrentMap&&) = default
ConcurrentMap &operator=(ConcurrentMap&&) = default
inline void swap(ConcurrentMap<Key, Value> &other)
inline bool isEmpty() const
inline size_t size() const
inline size_t capacity() const
inline void reserve(size_t count)
inline void rehash(size_t count)
inline void clear()
template<class K = Key>
inline bool erase(const KeyArg<K> &key)
template<class P = std::pair<Key, Value>>
inline bool insert(P &&pair)
template<class K = Key>
inline bool insertOrAssign(K &&key, Value &&value)
template<class K = Key, class ...Args>
inline bool tryEmplace(K &&key, Args&&... args)
template<class K = Key, class ...Args>
inline std::pair<bool, Value> tryEmplaceShared(K &&key, Args&&... args)
requires detail
template<class K = Key, std::invocable<const Value&> F>
inline bool inspect(const KeyArg<K> &key, F inspector) const
template<class K = Key>
inline std::optional<Value> get(const KeyArg<K> &key) const
requires std
template<std::predicate<const Key&, const Value&> F>
inline void eraseIf(F predicate)
struct DeltaSettings

Public Functions

explicit DeltaSettings(const std::string &definition)
std::string toString() const

Public Members

bool usePages = true
size_t pageSize = 4096
bool xorWithOld = true
bool useZstd = true
int zstdLevel = 1
class DirtyTracker

Subclassed by faabric::util::NoneDirtyTracker, faabric::util::SegfaultDirtyTracker, faabric::util::SoftPTEDirtyTracker, faabric::util::UffdDirtyTracker

Public Functions

inline DirtyTracker(const std::string &modeIn)
virtual void clearAll() = 0
virtual std::string getType() = 0
virtual void startTracking(std::span<uint8_t> region) = 0
virtual void stopTracking(std::span<uint8_t> region) = 0
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) = 0
virtual void startThreadLocalTracking(std::span<uint8_t> region) = 0
virtual void stopThreadLocalTracking(std::span<uint8_t> region) = 0
virtual std::vector<char> getThreadLocalDirtyPages(std::span<uint8_t> region) = 0
virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) = 0
class DirtyTrackingRecord

Wrapper around the actual bookkeeping behind dirty tracking. This is deliberately not thread-safe as it will only ever be used in TLS for signal handlers.

Subclassed by faabric::util::ThreadSafeDirtyTrackingRecord

Public Functions

DirtyTrackingRecord() = default
inline virtual void trackRegion(std::span<uint8_t> region)
inline virtual void markPage(void *addr)
inline virtual bool isInitialised()
inline virtual int getNPages()
inline virtual std::vector<char> getDirtyFlags()
inline virtual void reset()
struct ExecGraph

Public Members

ExecGraphNode rootNode
struct ExecGraphNode

Public Members

faabric::Message msg
std::vector<ExecGraphNode> children
class ExecGraphNodeNotFoundException : public faabric::util::FaabricException

Public Functions

inline explicit ExecGraphNodeNotFoundException(std::string message)
class FaabricCpuSet

Public Functions

FaabricCpuSet(int cpuIdxIn = NO_CPU_IDX)
FaabricCpuSet &operator=(const FaabricCpuSet&) = delete
FaabricCpuSet(const FaabricCpuSet&) = delete
~FaabricCpuSet()
inline cpu_set_t *get()
class FaabricException : public exception

Subclassed by faabric::executor::ChainedCallException, faabric::executor::ExecutorContextException, faabric::redis::RedisNoResponseException, faabric::transport::MessageTimeoutException, faabric::util::ChainedCallFailedException, faabric::util::ExecGraphNodeNotFoundException, faabric::util::FunctionFrozenException, faabric::util::FunctionMigratedException, faabric::util::JsonSerialisationException, faabric::util::QueueTimeoutException

Public Functions

inline explicit FaabricException(std::string message)
inline const char *what() const noexcept override
template<typename T>
class FixedCapacityQueue

Public Functions

inline FixedCapacityQueue(int capacity)
inline FixedCapacityQueue()
inline void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
inline void dequeueIfPresent(T *res)
inline T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
inline T *peek(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
inline void drain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
inline long size()
inline void reset()
class FlagWaiter : public std::enable_shared_from_this<FlagWaiter>

Public Functions

FlagWaiter(int timeoutMsIn = DEFAULT_FLAG_WAIT_MS)
void waitOnFlag()
void setFlag(bool value)
class FreeCpus

Public Functions

inline FreeCpus()

Public Members

std::vector<std::unique_ptr<std::atomic<bool>>> cpuVec
class FunctionFrozenException : public faabric::util::FaabricException

Public Functions

inline explicit FunctionFrozenException(std::string message)
class FunctionMigratedException : public faabric::util::FaabricException

Public Functions

inline explicit FunctionMigratedException(std::string message)
class JsonSerialisationException : public faabric::util::FaabricException

Public Functions

inline explicit JsonSerialisationException(std::string message)
class Latch : public std::enable_shared_from_this<Latch>

Public Functions

explicit Latch(int countIn, int timeoutMsIn)
void wait()

Public Static Functions

static std::shared_ptr<Latch> create(int count, int timeoutMs = DEFAULT_LATCH_TIMEOUT_MS)
class NoneDirtyTracker : public faabric::util::DirtyTracker

Public Functions

NoneDirtyTracker(const std::string &modeIn)
virtual void clearAll() override
inline virtual std::string getType() override
virtual void startTracking(std::span<uint8_t> region) override
virtual void stopTracking(std::span<uint8_t> region) override
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) override
virtual void startThreadLocalTracking(std::span<uint8_t> region) override
virtual void stopThreadLocalTracking(std::span<uint8_t> region) override
virtual std::vector<char> getThreadLocalDirtyPages(std::span<uint8_t> region) override
virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override
class PeriodicBackgroundThread
#include <PeriodicBackgroundThread.h>

Wrapper around periodic background thread that repeatedly does some arbitrary work after a given interval.

Subclassed by faabric::planner::KeepAliveThread, faabric::scheduler::SchedulerReaperThread

Public Functions

void start(int intervalSecondsIn)

Start the background thread with the given wake-up interval in seconds.

void stop()

Stop and wait for this thread to finish.

virtual void doWork() = 0
virtual void tidyUp()
inline int getIntervalSeconds()
template<typename T>
class Queue

Public Functions

inline void enqueue(T value)
inline void dequeueIfPresent(T *res)
inline T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
inline T *peek(long timeoutMs = 0)
inline void waitToDrain(long timeoutMs)
inline void drain()
inline long size()
inline void reset()
class QueueTimeoutException : public faabric::util::FaabricException

Public Functions

inline explicit QueueTimeoutException(std::string message)
class SegfaultDirtyTracker : public faabric::util::DirtyTracker

Public Functions

SegfaultDirtyTracker(const std::string &modeIn)
virtual void clearAll() override
inline virtual std::string getType() override
virtual void startTracking(std::span<uint8_t> region) override
virtual void stopTracking(std::span<uint8_t> region) override
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) override
virtual void startThreadLocalTracking(std::span<uint8_t> region) override
virtual void stopThreadLocalTracking(std::span<uint8_t> region) override
virtual std::vector<char> getThreadLocalDirtyPages(std::span<uint8_t> region) override
virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override

Public Static Functions

static void handler(int sig, siginfo_t *info, void *ucontext) noexcept
class SnapshotData

Public Functions

SnapshotData() = default
explicit SnapshotData(size_t sizeIn)
SnapshotData(size_t sizeIn, size_t maxSizeIn)
explicit SnapshotData(std::span<const uint8_t> dataIn)
SnapshotData(std::span<const uint8_t> dataIn, size_t maxSizeIn)
SnapshotData(const SnapshotData&) = delete
SnapshotData &operator=(const SnapshotData&) = delete
~SnapshotData()
void copyInData(std::span<const uint8_t> buffer, uint32_t offset = 0)
const uint8_t *getDataPtr(uint32_t offset = 0)
std::vector<uint8_t> getDataCopy()
std::vector<uint8_t> getDataCopy(uint32_t offset, size_t dataSize)
void mapToMemory(std::span<uint8_t> target)
void addMergeRegion(uint32_t offset, size_t length, SnapshotDataType dataType, SnapshotMergeOperation operation)
void fillGapsWithBytewiseRegions()
void clearMergeRegions()
std::vector<SnapshotMergeRegion> getMergeRegions()
size_t getQueuedDiffsCount()
void applyDiffs(const std::vector<SnapshotDiff> &diffs)
void applyDiff(const SnapshotDiff &diff)
void queueDiffs(const std::vector<SnapshotDiff> &diffs)
int writeQueuedDiffs()
inline size_t getSize() const
inline size_t getMaxSize() const
std::vector<SnapshotDiff> getTrackedChanges()
void clearTrackedChanges()
std::vector<faabric::util::SnapshotDiff> diffWithDirtyRegions(std::span<uint8_t> updated, const std::vector<char> &dirtyRegions)
class SnapshotDiff
#include <snapshot.h>

Represents a modification to a snapshot (a.k.a. a dirty region). Specifies the modified data, as well as its offset within the snapshot.

Each diff does not own the data, it just provides a span pointing at the original data.

Public Functions

SnapshotDiff() = default
SnapshotDiff(SnapshotDataType dataTypeIn, SnapshotMergeOperation operationIn, uint32_t offsetIn, std::span<const uint8_t> dataIn)
inline SnapshotDataType getDataType() const
inline SnapshotMergeOperation getOperation() const
inline uint32_t getOffset() const
inline std::span<const uint8_t> getData() const
std::vector<uint8_t> getDataCopy() const
class SnapshotMergeRegion
#include <snapshot.h>

Defines how diffs in the given snapshot region should be interpreted wrt the original snapshot.

For example, it may specify that the change should be treated as an integer that needs to be summed (i.e. the diff is the current value minus the original), or just as a region of raw bytes that needs to be XORed with the original.

A merge region specifies an offset, length, data type and operation, e.g. an integer to be summed at offset 100 with a length of 4.

Public Functions

SnapshotMergeRegion() = default
SnapshotMergeRegion(uint32_t offsetIn, size_t lengthIn, SnapshotDataType dataTypeIn, SnapshotMergeOperation operationIn)
void addDiffs(std::vector<SnapshotDiff> &diffs, std::span<const uint8_t> originalData, std::span<uint8_t> updatedData, const std::vector<char> &dirtyRegions)
inline bool operator<(const SnapshotMergeRegion &other) const

This allows us to sort the merge regions which is important for diffing purposes.

inline bool operator==(const SnapshotMergeRegion &other) const

Public Members

uint32_t offset = 0
size_t length = 0
SnapshotDataType dataType = SnapshotDataType::Raw
SnapshotMergeOperation operation = SnapshotMergeOperation::Bytewise
class SoftPTEDirtyTracker : public faabric::util::DirtyTracker

Public Functions

SoftPTEDirtyTracker(const std::string &modeIn)
~SoftPTEDirtyTracker()
virtual void clearAll() override
inline virtual std::string getType() override
virtual void startTracking(std::span<uint8_t> region) override
virtual void stopTracking(std::span<uint8_t> region) override
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) override
virtual void startThreadLocalTracking(std::span<uint8_t> region) override
virtual void stopThreadLocalTracking(std::span<uint8_t> region) override
virtual std::vector<char> getThreadLocalDirtyPages(std::span<uint8_t> region) override
virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override
template<typename T>
class SpinLockQueue

Public Functions

inline void enqueue(T &value)
inline T dequeue()
inline long size()
inline void drain()
inline void reset()
class SystemConfig

Public Functions

SystemConfig()
void print()
void reset()

Public Members

std::string serialisation
std::string logLevel
std::string logFile
std::string stateMode
std::string deltaSnapshotEncoding
std::string redisStateHost
std::string redisQueueHost
std::string redisPort
int overrideCpuCount
int overrideFreeCpuStart
std::string batchSchedulerMode
int globalMessageTimeout
int boundTimeout
int reaperIntervalSeconds
int defaultMpiWorldSize
std::string endpointInterface
std::string endpointHost
int endpointPort
int endpointNumThreads
int functionServerThreads
int stateServerThreads
int snapshotServerThreads
int pointToPointServerThreads
std::string dirtyTrackingMode
std::string diffingMode
std::string plannerHost
int plannerPort
class ThreadSafeDirtyTrackingRecord : public faabric::util::DirtyTrackingRecord

Thread-safe wrapper around dirty tracking data, necessary for use with the background event thread in the userfaultfd tracker. Although the logic around the interaction between the event thread and the main thread should in theory protect against concurrent accesses, we need this to keep TSan happy.

Public Functions

ThreadSafeDirtyTrackingRecord() = default
inline virtual void trackRegion(std::span<uint8_t> region) override
inline virtual void markPage(void *addr) override
inline virtual std::vector<char> getDirtyFlags() override
inline virtual bool isInitialised() override
inline virtual int getNPages() override
inline virtual void reset() override
class TokenPool

Public Functions

explicit TokenPool(int nTokens)
int getToken()

Blocking call to get an available token

void releaseToken(int token)
void reset()
int size()
int taken()
int free()
class UffdDirtyTracker : public faabric::util::DirtyTracker
#include <dirty.h>

Dirty tracking implementation using userfaultfd to write-protect pages, then handle the resulting userspace events when they are written to.

The dirty tracking mode can be one of four options:

  • uffd - uses the SIGBUS handler to catch events triggered by accessing missing pages in demand-zero paged memory.

  • uffd-wp - same as uffd but adds write-protected events to catch subsequent writes to write-protected pages.

  • uffd-thread - same as uffd, but using a background event thread to handle events. This has the benefit of distinguishing between read and write missing page events.

  • uffd-thread-wp - same as uffd-thread, but adds write-protected events.

See the docs for more info on these different approaches: https://www.kernel.org/doc/html/latest/admin-guide/mm/userfaultfd.html

Public Functions

UffdDirtyTracker(const std::string &modeIn)
~UffdDirtyTracker()
virtual void clearAll() override
inline virtual std::string getType() override
virtual void startTracking(std::span<uint8_t> region) override
virtual void stopTracking(std::span<uint8_t> region) override
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) override
virtual void startThreadLocalTracking(std::span<uint8_t> region) override
virtual void stopThreadLocalTracking(std::span<uint8_t> region) override
virtual std::vector<char> getThreadLocalDirtyPages(std::span<uint8_t> region) override
virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override

Public Static Functions

static void sigbusHandler(int sig, siginfo_t *info, void *ucontext) noexcept
namespace detail
template<typename>
struct is_shared_ptr : public false_type
template<typename Pointee>
struct is_shared_ptr<std::shared_ptr<Pointee>> : public true_type