Namespace faabric
-
namespace faabric
-
namespace batch_scheduler
Typedefs
-
typedef std::pair<std::shared_ptr<BatchExecuteRequest>, std::shared_ptr<SchedulingDecision>> InFlightPair
-
typedef std::map<int32_t, InFlightPair> InFlightReqs
-
typedef std::shared_ptr<HostState> Host
-
typedef std::map<std::string, Host> HostMap
Enums
-
enum DecisionType
Values:
-
enumerator NO_DECISION_TYPE
-
enumerator NEW
-
enumerator DIST_CHANGE
-
enumerator SCALE_CHANGE
-
enumerator NO_DECISION_TYPE
-
enum SchedulingTopologyHint
Values:
-
enumerator NONE
-
enumerator CACHED
-
enumerator FORCE_LOCAL
-
enumerator NEVER_ALONE
-
enumerator UNDERFULL
-
enumerator NONE
-
enum MigrationStrategy
Values:
-
enumerator BIN_PACK
-
enumerator EMPTY_HOSTS
-
enumerator BIN_PACK
Functions
-
std::shared_ptr<BatchScheduler> getBatchScheduler()
-
void resetBatchScheduler()
-
void resetBatchScheduler(const std::string &newMode)
-
DecisionCache &getSchedulingDecisionCache()
-
static std::map<std::string, int> getHostFreqCount(std::shared_ptr<SchedulingDecision> decision)
-
static std::shared_ptr<SchedulingDecision> minimiseNumOfMigrations(std::shared_ptr<SchedulingDecision> newDecision, std::shared_ptr<SchedulingDecision> oldDecision)
-
static std::map<std::string, int> getHostFreqCount(std::shared_ptr<SchedulingDecision> decision)
-
static std::shared_ptr<SchedulingDecision> minimiseNumOfMigrations(std::shared_ptr<SchedulingDecision> newDecision, std::shared_ptr<SchedulingDecision> oldDecision)
-
static std::set<std::string> filterHosts(HostMap &hostMap)
Variables
-
const std::unordered_map<std::string, SchedulingTopologyHint> strToTopologyHint = {{"NONE", SchedulingTopologyHint::NONE}, {"CACHED", SchedulingTopologyHint::CACHED}, {"FORCE_LOCAL", SchedulingTopologyHint::FORCE_LOCAL}, {"NEVER_ALONE", SchedulingTopologyHint::NEVER_ALONE}, {"UNDERFULL", SchedulingTopologyHint::UNDERFULL},}
-
const std::unordered_map<SchedulingTopologyHint, std::string> topologyHintToStr = {{SchedulingTopologyHint::NONE, "NONE"}, {SchedulingTopologyHint::CACHED, "CACHED"}, {SchedulingTopologyHint::FORCE_LOCAL, "FORCE_LOCAL"}, {SchedulingTopologyHint::NEVER_ALONE, "NEVER_ALONE"}, {SchedulingTopologyHint::UNDERFULL, "UNDERFULL"},}
-
static std::shared_ptr<BatchScheduler> batchScheduler = nullptr
-
class BatchScheduler
Subclassed by faabric::batch_scheduler::BinPackScheduler, faabric::batch_scheduler::CompactScheduler, faabric::batch_scheduler::SpotScheduler
Public Functions
Public Static Functions
-
class BinPackScheduler : public faabric::batch_scheduler::BatchScheduler
Public Functions
-
class CachedDecision
- #include <DecisionCache.h>
A record of a decision already taken for the given size of batch request for the given function. This doesn’t contain the messages themselves, just the hosts and group ID that were used.
Public Functions
-
CachedDecision(const std::vector<std::string> &hostsIn, int groupIdIn)
-
inline std::vector<std::string> getHosts()
-
inline int getGroupId() const
-
CachedDecision(const std::vector<std::string> &hostsIn, int groupIdIn)
-
class CompactScheduler : public faabric::batch_scheduler::BatchScheduler
Public Functions
-
class DecisionCache
- #include <DecisionCache.h>
Repository for cached scheduling decisions. This object is not thread-safe, as we assume only a single executor will be caching decisions for a given function and size of batch request on one host at a time.
Public Functions
-
void clear()
-
void clear()
-
struct HostState
Public Functions
-
inline HostState(const std::string &ipIn, int slotsIn, int usedSlotsIn)
Public Members
-
std::string ip
-
int slots
-
int usedSlots
-
inline HostState(const std::string &ipIn, int slotsIn, int usedSlotsIn)
-
class SchedulingDecision
Public Functions
-
SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn)
-
bool operator==(const SchedulingDecision &rhs) const = default
-
bool isSingleHost() const
Work out whether this decision is entirely on one host. If the decision is entirely on another host, we still count it as not being on a single host, because this host will still be the main host.
Will always return false if single-host optimisations are switched off.
-
void addMessage(const std::string &host, const faabric::Message &msg)
-
void addMessage(const std::string &host, int32_t messageId, int32_t appIdx)
-
void addMessage(const std::string &host, int32_t messageId, int32_t appIdx, int32_t groupIdx)
-
void addMessageInPosition(int32_t pos, const std::string &host, int32_t messageId, int32_t appIdx, int32_t groupIdx, int32_t mpiPort)
-
int32_t removeMessage(int32_t messageId)
-
std::set<std::string> uniqueHosts()
-
void print(const std::string &logLevel = "debug")
Public Members
-
uint32_t appId = 0
-
int32_t groupId = 0
-
int32_t nFunctions = 0
-
std::vector<std::string> hosts
-
std::vector<int32_t> messageIds
-
std::vector<int32_t> appIdxs
-
std::vector<int32_t> groupIdxs
-
std::vector<int32_t> mpiPorts
-
std::string returnHost
Public Static Functions
-
static SchedulingDecision fromPointToPointMappings(faabric::PointToPointMappings &mappings)
-
SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn)
-
class SpotScheduler : public faabric::batch_scheduler::BatchScheduler
Public Functions
-
typedef std::pair<std::shared_ptr<BatchExecuteRequest>, std::shared_ptr<SchedulingDecision>> InFlightPair
-
namespace endpoint
Typedefs
-
using header = beast::http::field
Enums
-
enum class EndpointMode
Values:
-
enumerator SIGNAL
-
enumerator BG_THREAD
-
enumerator SIGNAL
-
class FaabricEndpoint
Public Functions
-
FaabricEndpoint()
-
FaabricEndpoint(const FaabricEndpoint&) = delete
-
FaabricEndpoint(FaabricEndpoint&&) = delete
-
FaabricEndpoint &operator=(const FaabricEndpoint&) = delete
-
FaabricEndpoint &operator=(FaabricEndpoint&&) = delete
-
virtual ~FaabricEndpoint()
-
void start(EndpointMode mode = EndpointMode::SIGNAL)
-
void stop()
-
FaabricEndpoint()
-
class FaabricEndpointHandler : public faabric::endpoint::HttpRequestHandler, public std::enable_shared_from_this<FaabricEndpointHandler>
Public Functions
-
virtual void onRequest(HttpRequestContext &&ctx, faabric::util::BeastHttpRequest &&request) override
-
virtual void onRequest(HttpRequestContext &&ctx, faabric::util::BeastHttpRequest &&request) override
-
struct HttpRequestContext
Public Members
-
asio::io_context &ioc
-
asio::any_io_executor executor
-
std::function<void(faabric::util::BeastHttpResponse&&)> sendFunction
-
asio::io_context &ioc
-
class HttpRequestHandler
Subclassed by faabric::endpoint::FaabricEndpointHandler, faabric::planner::PlannerEndpointHandler
Public Functions
-
virtual void onRequest(HttpRequestContext &&ctx, faabric::util::BeastHttpRequest &&request) = 0
-
virtual void onRequest(HttpRequestContext &&ctx, faabric::util::BeastHttpRequest &&request) = 0
-
namespace detail
-
class EndpointState
Public Functions
-
inline EndpointState(int threadCountIn)
Public Members
-
asio::io_context ioc
-
std::vector<std::jthread> ioThreads
-
inline EndpointState(int threadCountIn)
-
class EndpointState
-
using header = beast::http::field
-
namespace executor
-
Variables
-
static thread_local std::shared_ptr<ExecutorContext> context = nullptr
-
static std::shared_ptr<ExecutorFactory> _factory
-
class ChainedCallException : public faabric::util::FaabricException
Public Functions
-
inline explicit ChainedCallException(std::string message)
-
inline explicit ChainedCallException(std::string message)
-
class Executor
Public Functions
-
explicit Executor(faabric::Message &msg)
-
virtual ~Executor()
-
virtual void shutdown()
Shuts down the executor and clears all its state, including its thread pool.
This must be called before destructing an executor. This is because the tidy-up requires implementations of virtual methods held in subclasses, which may depend on state that those subclass instances hold. Because destructors run from the most-derived class down to the base class, that subclass state may already have been destructed by the time the executor destructor runs.
-
virtual void reset(faabric::Message &msg)
-
bool tryClaim()
-
void claim()
-
void releaseClaim()
-
std::shared_ptr<faabric::util::SnapshotData> getMainThreadSnapshot(faabric::Message &msg, bool createIfNotExists = false)
-
long getMillisSinceLastExec()
-
virtual std::span<uint8_t> getMemoryView()
-
virtual void restore(const std::string &snapshotKey)
-
faabric::Message &getBoundMessage()
-
bool isExecuting()
-
inline bool isShutdown()
-
void addChainedMessage(const faabric::Message &msg)
-
const faabric::Message &getChainedMessage(int messageId)
-
std::set<unsigned int> getChainedMessageIds()
-
std::vector<faabric::util::SnapshotDiff> mergeDirtyRegions(const Message &msg, const std::vector<char> &extraDirtyPages = {})
-
void setThreadResult(faabric::Message &msg, int32_t returnValue, const std::string &key, const std::vector<faabric::util::SnapshotDiff> &diffs)
-
virtual void setMemorySize(size_t newSize)
Public Members
-
std::string id
-
explicit Executor(faabric::Message &msg)
-
class ExecutorContext
- #include <ExecutorContext.h>
Globally-accessible wrapper that allows executing applications to query their execution context. The context is thread-local, so applications can query which specific message they are executing.
Public Functions
-
inline Executor *getExecutor()
-
inline std::shared_ptr<faabric::BatchExecuteRequest> getBatchRequest()
-
inline faabric::Message &getMsg()
-
inline int getMsgIdx()
Public Static Functions
-
static bool isSet()
-
static void unset()
-
static std::shared_ptr<ExecutorContext> get()
-
inline Executor *getExecutor()
-
class ExecutorContextException : public faabric::util::FaabricException
Public Functions
-
inline explicit ExecutorContextException(std::string message)
-
inline explicit ExecutorContextException(std::string message)
-
class ExecutorFactory
-
class ExecutorTask
Public Functions
-
ExecutorTask() = default
-
ExecutorTask(const ExecutorTask &other) = delete
-
ExecutorTask &operator=(const ExecutorTask &other) = delete
-
ExecutorTask(ExecutorTask &&other) = default
-
ExecutorTask &operator=(ExecutorTask &&other) = default
Public Members
-
std::shared_ptr<BatchExecuteRequest> req
-
int messageIndex = 0
-
ExecutorTask() = default
-
static thread_local std::shared_ptr<ExecutorContext> context = nullptr
-
namespace mpi
Typedefs
-
typedef faabric::util::FixedCapacityQueue<MpiMessage> InMemoryMpiQueue
Enums
-
enum MpiMessageType
Values:
-
enumerator NORMAL
-
enumerator BARRIER_JOIN
-
enumerator BARRIER_DONE
-
enumerator SCATTER
-
enumerator GATHER
-
enumerator ALLGATHER
-
enumerator REDUCE
-
enumerator SCAN
-
enumerator ALLREDUCE
-
enumerator ALLTOALL
-
enumerator ALLTOALL_PACKED
-
enumerator SENDRECV
-
enumerator BROADCAST
-
enumerator UNACKED_MPI_MESSAGE
-
enumerator HANDSHAKE
-
enumerator NORMAL
Functions
-
inline size_t payloadSize(const MpiMessage &msg)
-
inline size_t msgSize(const MpiMessage &msg)
-
void serializeMpiMsg(std::vector<uint8_t> &buffer, const MpiMessage &msg)
-
void parseMpiMsg(const std::vector<uint8_t> &bytes, MpiMessage *msg)
-
std::vector<MpiMessage> getMpiMockedMessages(int sendRank)
-
MpiWorldRegistry &getMpiWorldRegistry()
-
static int32_t getAsyncRequestId(int sendRank, int recvRank, bool isSend)
-
static std::tuple<int, int, bool> getRanksFromRequestId(int32_t requestId)
-
void checkSendRecvMatch(faabric_datatype_t *sendType, int sendCount, faabric_datatype_t *recvType, int recvCount)
Variables
-
static std::mutex mockMutex
-
static std::map<int, std::vector<MpiMessage>> mpiMockedMessages
-
static thread_local MpiRankState rankState
-
const uint8_t iSendMagic = 0xFF
-
const uint8_t iRecvMagic = 0x00
-
class MpiContext
-
struct MpiMessage
Public Members
-
int32_t id
-
int32_t worldId
-
int32_t sendRank
-
int32_t recvRank
-
int32_t typeSize
-
int32_t count
-
int32_t requestId
-
MpiMessageType messageType
-
void *buffer
-
int32_t id
-
struct MpiRankState
Public Functions
-
inline void reset()
Public Members
-
int msgCount = 1
-
faabric::Message *msg = nullptr
-
std::vector<std::unique_ptr<std::list<MpiMessage>>> unackedMessageBuffers
-
std::unique_ptr<faabric::util::FaabricCpuSet> pinnedCpu
-
std::vector<std::unique_ptr<faabric::transport::tcp::SendSocket>> sendSockets
-
std::unique_ptr<faabric::transport::tcp::RecvSocket> recvSocket
-
std::vector<int> recvConnPool
-
inline void reset()
-
class MpiWorld
Public Functions
-
MpiWorld()
-
void create(faabric::Message &call, int newId, int newSize)
-
void broadcastHostsToRanks()
-
void initialiseFromMsg(faabric::Message &msg)
-
void initialiseRankFromMsg(faabric::Message &msg)
-
std::string getHostForRank(int rank)
-
std::string getUser()
-
std::string getFunction()
-
int getId() const
-
int getSize() const
-
bool destroy()
-
void getCartesianRank(int rank, int maxDims, const int *dims, int *periods, int *coords)
-
void getRankFromCoords(int *rank, int *coords)
-
void shiftCartesianCoords(int rank, int direction, int disp, int *source, int *destination)
-
void send(int sendRank, int recvRank, const uint8_t *buffer, faabric_datatype_t *dataType, int count, MpiMessageType messageType = MpiMessageType::NORMAL)
-
int isend(int sendRank, int recvRank, const uint8_t *buffer, faabric_datatype_t *dataType, int count, MpiMessageType messageType = MpiMessageType::NORMAL)
-
void broadcast(int rootRank, int thisRank, uint8_t *buffer, faabric_datatype_t *dataType, int count, MpiMessageType messageType = MpiMessageType::NORMAL)
-
void recv(int sendRank, int recvRank, uint8_t *buffer, faabric_datatype_t *dataType, int count, MPI_Status *status, MpiMessageType messageType = MpiMessageType::NORMAL)
-
int irecv(int sendRank, int recvRank, uint8_t *buffer, faabric_datatype_t *dataType, int count, MpiMessageType messageType = MpiMessageType::NORMAL)
-
void awaitAsyncRequest(int requestId)
-
void sendRecv(uint8_t *sendBuffer, int sendcount, faabric_datatype_t *sendDataType, int sendRank, uint8_t *recvBuffer, int recvCount, faabric_datatype_t *recvDataType, int recvRank, int myRank, MPI_Status *status)
-
void scatter(int sendRank, int recvRank, const uint8_t *sendBuffer, faabric_datatype_t *sendType, int sendCount, uint8_t *recvBuffer, faabric_datatype_t *recvType, int recvCount)
-
void gather(int sendRank, int recvRank, const uint8_t *sendBuffer, faabric_datatype_t *sendType, int sendCount, uint8_t *recvBuffer, faabric_datatype_t *recvType, int recvCount)
-
void allGather(int rank, const uint8_t *sendBuffer, faabric_datatype_t *sendType, int sendCount, uint8_t *recvBuffer, faabric_datatype_t *recvType, int recvCount)
-
void reduce(int sendRank, int recvRank, uint8_t *sendBuffer, uint8_t *recvBuffer, faabric_datatype_t *datatype, int count, faabric_op_t *operation)
-
void allReduce(int rank, uint8_t *sendBuffer, uint8_t *recvBuffer, faabric_datatype_t *datatype, int count, faabric_op_t *operation)
-
void op_reduce(faabric_op_t *operation, faabric_datatype_t *datatype, int count, uint8_t *inBuffer, uint8_t *resultBuffer)
-
void scan(int rank, uint8_t *sendBuffer, uint8_t *recvBuffer, faabric_datatype_t *datatype, int count, faabric_op_t *operation)
-
void allToAll(int rank, uint8_t *sendBuffer, faabric_datatype_t *sendType, int sendCount, uint8_t *recvBuffer, faabric_datatype_t *recvType, int recvCount)
-
void probe(int sendRank, int recvRank, MPI_Status *status)
-
void barrier(int thisRank)
-
std::shared_ptr<InMemoryMpiQueue> getLocalQueue(int sendRank, int recvRank)
-
long getLocalQueueSize(int sendRank, int recvRank)
-
void overrideHost(const std::string &newHost)
-
double getWTime()
-
void prepareMigration(int newGroupId, int thisRank, bool thisRankMustMigrate)
-
MpiWorld()
-
class MpiWorldRegistry
-
typedef faabric::util::FixedCapacityQueue<MpiMessage> InMemoryMpiQueue
-
namespace planner
Typedefs
-
typedef std::promise<std::shared_ptr<faabric::Message>> MessageResultPromise
-
typedef std::shared_ptr<MessageResultPromise> MessageResultPromisePtr
-
using header = beast::http::field
Enums
-
enum FlushType
Values:
-
enumerator NoFlushType
-
enumerator Hosts
-
enumerator Executors
-
enumerator SchedulingState
-
enumerator NoFlushType
-
enum PlannerCalls
Values:
-
enumerator NoPlanerCall
-
enumerator Ping
-
enumerator GetAvailableHosts
-
enumerator RegisterHost
-
enumerator RemoveHost
-
enumerator SetMessageResult
-
enumerator GetMessageResult
-
enumerator GetBatchResults
-
enumerator GetSchedulingDecision
-
enumerator GetNumMigrations
-
enumerator CallBatch
-
enumerator PreloadSchedulingDecision
-
enumerator NoPlanerCall
-
class KeepAliveThread : public faabric::util::PeriodicBackgroundThread
Public Functions
-
virtual void doWork() override
Public Members
-
std::shared_ptr<RegisterHostRequest> thisHostReq = nullptr
-
virtual void doWork() override
-
class Planner
Public Functions
-
Planner()
-
PlannerConfig getConfig()
-
void printConfig() const
-
std::string getPolicy()
-
void setPolicy(const std::string &newPolicy)
-
bool reset()
-
std::vector<std::shared_ptr<Host>> getAvailableHosts()
-
bool registerHost(const Host &hostIn, bool overwrite)
-
void removeHost(const Host &hostIn)
-
std::shared_ptr<faabric::BatchExecuteRequestStatus> getBatchResults(int32_t appId)
-
faabric::batch_scheduler::InFlightReqs getInFlightReqs()
-
int getNumMigrations()
-
std::set<std::string> getNextEvictedHostIps()
-
std::map<int32_t, std::shared_ptr<BatchExecuteRequest>> getEvictedReqs()
-
void setNextEvictedVm(const std::set<std::string> &vmIp)
-
Planner()
-
struct PlannerCache
Public Members
-
std::unordered_map<uint32_t, MessageResultPromisePtr> plannerResults
-
std::set<std::string> pushedSnapshots
-
std::unordered_map<uint32_t, MessageResultPromisePtr> plannerResults
-
class PlannerClient : public faabric::transport::MessageEndpointClient
Public Functions
-
PlannerClient()
-
PlannerClient(const std::string &plannerIp)
-
void ping()
-
void clearCache()
-
std::vector<Host> getAvailableHosts()
-
faabric::Message getMessageResult(int appId, int msgId, int timeoutMs)
-
int getNumMigrations()
-
PlannerClient()
-
class PlannerEndpointHandler : public faabric::endpoint::HttpRequestHandler, public std::enable_shared_from_this<PlannerEndpointHandler>
Public Functions
-
virtual void onRequest(faabric::endpoint::HttpRequestContext &&ctx, faabric::util::BeastHttpRequest &&request) override
-
virtual void onRequest(faabric::endpoint::HttpRequestContext &&ctx, faabric::util::BeastHttpRequest &&request) override
-
class PlannerServer : public faabric::transport::MessageEndpointServer
Public Functions
-
PlannerServer()
-
PlannerServer()
-
struct PlannerState
Public Members
-
std::string policy
-
std::map<std::string, std::shared_ptr<Host>> hostMap
-
std::map<int, std::map<int, std::shared_ptr<faabric::Message>>> appResults
-
std::map<int, std::vector<std::string>> appResultWaiters
-
faabric::batch_scheduler::InFlightReqs inFlightReqs
-
std::map<int, std::shared_ptr<batch_scheduler::SchedulingDecision>> preloadedSchedulingDecisions
-
std::atomic<int> numMigrations = 0
-
std::map<int, std::shared_ptr<BatchExecuteRequest>> evictedRequests
-
std::set<std::string> nextEvictedHostIps
-
std::string policy
-
typedef std::promise<std::shared_ptr<faabric::Message>> MessageResultPromise
-
namespace redis
Typedefs
-
using UniqueRedisReply = std::unique_ptr<redisReply, decltype(&freeReplyObject)>
Enums
-
enum RedisRole
Values:
-
enumerator QUEUE
-
enumerator STATE
-
enumerator QUEUE
Functions
-
UniqueRedisReply wrapReply(redisReply *r)
-
UniqueRedisReply safeRedisCommand(redisContext *c, const char *format, ...)
-
long getLongFromReply(const redisReply &reply)
-
std::vector<uint8_t> getBytesFromReply(const redisReply &reply)
-
void getBytesFromReply(const std::string &key, const redisReply &reply, uint8_t *buffer, size_t bufferLen)
-
long extractScriptResult(const redisReply &reply)
—— Lua scripts ——
-
std::set<std::string> extractStringSetFromReply(const redisReply &reply)
-
class Redis
Public Functions
-
~Redis()
-
void ping()
—— Standard Redis commands ——
-
std::vector<uint8_t> get(const std::string &key)
-
size_t strlen(const std::string &key)
-
void get(const std::string &key, uint8_t *buffer, size_t size)
-
void set(const std::string &key, const std::vector<uint8_t> &value)
-
void set(const std::string &key, const uint8_t *value, size_t size)
-
void del(const std::string &key)
-
long getCounter(const std::string &key)
-
long incr(const std::string &key)
-
long decr(const std::string &key)
-
long incrByLong(const std::string &key, long val)
-
long decrByLong(const std::string &key, long val)
-
void setRange(const std::string &key, long offset, const uint8_t *value, size_t size)
-
void setRangePipeline(const std::string &key, long offset, const uint8_t *value, size_t size)
-
void flushPipeline(long pipelineLength)
-
void getRange(const std::string &key, uint8_t *buffer, size_t bufferLen, long start, long end)
Note that start and end are both inclusive.
-
void sadd(const std::string &key, const std::string &value)
-
void srem(const std::string &key, const std::string &value)
-
long scard(const std::string &key)
-
bool sismember(const std::string &key, const std::string &value)
-
std::string srandmember(const std::string &key)
-
std::set<std::string> smembers(const std::string &key)
-
std::set<std::string> sdiff(const std::string &keyA, const std::string &keyB)
-
std::set<std::string> sinter(const std::string &keyA, const std::string &keyB)
-
int lpushLong(const std::string &key, long val)
-
int rpushLong(const std::string &key, long val)
-
void flushAll()
-
long listLength(const std::string &queueName)
-
long getTtl(const std::string &key)
-
void expire(const std::string &key, long expiry)
-
void refresh()
-
uint32_t acquireLock(const std::string &key, int expirySeconds)
—— Locking ——
-
void releaseLock(const std::string &key, uint32_t lockId)
-
void delIfEq(const std::string &key, uint32_t value)
-
bool setnxex(const std::string &key, long value, int expirySeconds)
-
long getLong(const std::string &key)
-
void setLong(const std::string &key, long value)
-
void enqueue(const std::string &queueName, const std::string &value)
—— Queueing ——
-
void enqueueBytes(const std::string &queueName, const std::vector<uint8_t> &value)
-
void enqueueBytes(const std::string &queueName, const uint8_t *buffer, size_t bufferLen)
-
std::string dequeue(const std::string &queueName, int timeout = DEFAULT_TIMEOUT)
-
std::vector<uint8_t> dequeueBytes(const std::string &queueName, int timeout = DEFAULT_TIMEOUT)
-
size_t dequeueBytes(const std::string &queueName, uint8_t *buffer, size_t bufferLen, int timeout = DEFAULT_TIMEOUT)
-
void dequeueMultiple(const std::string &queueName, uint8_t *buff, long buffLen, long nElems)
-
void publishSchedulerResult(const std::string &key, const std::string &status_key, const std::vector<uint8_t> &result)
-
~Redis()
-
class RedisInstance
Public Functions
-
explicit RedisInstance(RedisRole role)
Public Members
-
std::string delifeqSha
-
std::string schedPublishSha
-
std::string ip
-
std::string hostname
-
int port
-
explicit RedisInstance(RedisRole role)
-
class RedisNoResponseException : public faabric::util::FaabricException
Public Functions
-
inline explicit RedisNoResponseException(std::string message)
-
inline explicit RedisNoResponseException(std::string message)
-
using UniqueRedisReply = std::unique_ptr<redisReply, decltype(&freeReplyObject)>
-
namespace runner
-
class FaabricMain
Public Functions
-
void startBackground()
-
void startRunner()
-
void startFunctionCallServer()
-
void startStateServer()
-
void startSnapshotServer()
-
void startPointToPointServer()
-
void shutdown()
-
void startBackground()
-
class FaabricMain
-
namespace scheduler
Typedefs
-
typedef std::pair<std::string, InMemoryMessageQueue*> InMemoryMessageQueuePair
Enums
-
enum FunctionCalls
Values:
-
enumerator NoFunctionCall
-
enumerator ExecuteFunctions
-
enumerator Flush
-
enumerator SetMessageResult
-
enumerator NoFunctionCall
Functions
-
std::vector<std::pair<std::string, faabric::Message>> getFunctionCalls()
-
std::vector<std::pair<std::string, faabric::EmptyRequest>> getFlushCalls()
-
std::vector<std::pair<std::string, std::shared_ptr<faabric::BatchExecuteRequest>>> getBatchRequests()
-
std::vector<std::pair<std::string, std::shared_ptr<faabric::Message>>> getMessageResults()
-
void clearMockRequests()
-
std::shared_ptr<FunctionCallClient> getFunctionCallClient(const std::string &otherHost)
-
void clearFunctionCallClients()
-
Scheduler &getScheduler()
-
std::string getChainedKey(unsigned int msgId)
Variables
-
static std::mutex mockMutex
-
static std::vector<std::pair<std::string, faabric::Message>> functionCalls
-
static std::vector<std::pair<std::string, faabric::EmptyRequest>> flushCalls
-
static std::vector<std::pair<std::string, std::shared_ptr<faabric::BatchExecuteRequest>>> batchMessages
-
static std::vector<std::pair<std::string, std::shared_ptr<faabric::Message>>> messageResults
-
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<faabric::scheduler::FunctionCallClient>> functionCallClients
-
class FunctionCallClient : public faabric::transport::MessageEndpointClient
Public Functions
-
explicit FunctionCallClient(const std::string &hostIn)
-
void sendFlush()
-
explicit FunctionCallClient(const std::string &hostIn)
-
class FunctionCallServer : public faabric::transport::MessageEndpointServer
Public Functions
-
FunctionCallServer()
-
FunctionCallServer()
-
class Scheduler
Public Functions
-
Scheduler()
-
~Scheduler()
-
void reset()
-
void resetThreadLocalCache()
-
void shutdown()
-
inline bool isShutdown()
-
void broadcastSnapshotDelete(const faabric::Message &msg, const std::string &snapshotKey)
-
int reapStaleExecutors()
-
long getFunctionExecutorCount(const faabric::Message &msg)
-
void setThreadResultLocally(uint32_t appId, uint32_t msgId, int32_t returnValue, faabric::transport::Message &message)
Caches a message along with the thread result, to allow the thread result to refer to data held in that message (i.e. snapshot diffs). The message will be destroyed once the thread result is consumed.
-
size_t getCachedMessageCount()
-
std::string getThisHost()
-
void addHostToGlobalSet()
-
void removeHostFromGlobalSet(const std::string &host)
-
void setThisHostResources(faabric::HostResources &res)
-
std::vector<faabric::Message> getRecordedMessages()
-
void clearRecordedMessages()
-
Scheduler()
-
class SchedulerReaperThread : public faabric::util::PeriodicBackgroundThread
- #include <Scheduler.h>
Background thread that periodically checks to see if any executors have become stale (i.e. have not handled any requests within a given timeout). If any are found, they are removed.
Public Functions
-
virtual void doWork() override
-
virtual void doWork() override
-
typedef std::pair<std::string, InMemoryMessageQueue*> InMemoryMessageQueuePair
-
namespace snapshot
Enums
-
enum SnapshotCalls
Values:
-
enumerator NoSnapshotCall
-
enumerator PushSnapshot
-
enumerator PushSnapshotUpdate
-
enumerator DeleteSnapshot
-
enumerator ThreadResult
-
enumerator NoSnapshotCall
Functions
-
std::vector<std::pair<std::string, std::shared_ptr<faabric::util::SnapshotData>>> getSnapshotPushes()
-
std::vector<std::pair<std::string, std::vector<faabric::util::SnapshotDiff>>> getSnapshotDiffPushes()
-
std::vector<std::pair<std::string, std::string>> getSnapshotDeletes()
-
std::vector<std::pair<std::string, MockThreadResult>> getThreadResults()
-
void clearMockSnapshotRequests()
-
std::shared_ptr<SnapshotClient> getSnapshotClient(const std::string &otherHost)
-
void clearSnapshotClients()
-
SnapshotRegistry &getSnapshotRegistry()
Variables
-
static std::mutex mockMutex
-
static std::vector<std::pair<std::string, std::shared_ptr<faabric::util::SnapshotData>>> snapshotPushes
-
static std::vector<std::pair<std::string, std::vector<faabric::util::SnapshotDiff>>> snapshotDiffPushes
-
static std::vector<std::pair<std::string, std::string>> snapshotDeletes
-
static std::vector<std::pair<std::string, MockThreadResult>> threadResults
-
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<faabric::snapshot::SnapshotClient>> snapshotClients
-
struct MockThreadResult
Public Members
-
uint32_t msgId = 0
-
int res = 0
-
std::string key
-
std::vector<faabric::util::SnapshotDiff> diffs
-
uint32_t msgId = 0
-
class SnapshotClient : public faabric::transport::MessageEndpointClient
Public Functions
-
explicit SnapshotClient(const std::string &hostIn)
-
void pushThreadResult(uint32_t appId, uint32_t messageId, int returnValue, const std::string &key, const std::vector<faabric::util::SnapshotDiff> &diffs)
-
explicit SnapshotClient(const std::string &hostIn)
-
class SnapshotRegistry
Public Functions
-
SnapshotRegistry() = default
-
std::shared_ptr<faabric::util::SnapshotData> getSnapshot(const std::string &key)
-
bool snapshotExists(const std::string &key)
-
void deleteSnapshot(const std::string &key)
-
size_t getSnapshotCount()
-
void clear()
-
SnapshotRegistry() = default
-
class SnapshotServer : public faabric::transport::MessageEndpointServer
Public Functions
-
SnapshotServer()
-
SnapshotServer()
-
enum SnapshotCalls
-
namespace state
WARNING: key-value objects are shared between threads, but hiredis is not thread-safe, so make sure you always retrieve the reference to Redis inline rather than sharing a reference within the class.
Enums
-
enum InMemoryStateKeyStatus
Values:
-
enumerator NOT_MAIN
-
enumerator MAIN
-
enumerator NOT_MAIN
-
enum StateCalls
Values:
-
enumerator NoStateCall
-
enumerator Pull
-
enumerator Push
-
enumerator Size
-
enumerator Append
-
enumerator ClearAppended
-
enumerator PullAppended
-
enumerator Delete
-
enumerator NoStateCall
Functions
-
InMemoryStateRegistry &getInMemoryStateRegistry()
-
State &getGlobalState()
-
static std::string getMasterKey(const std::string &user, const std::string &key)
-
class AppendedInMemoryState
Public Functions
-
inline AppendedInMemoryState(size_t lengthIn, std::unique_ptr<uint8_t[]> &&dataIn)
Public Members
-
size_t length
-
std::unique_ptr<uint8_t[]> data
-
inline AppendedInMemoryState(size_t lengthIn, std::unique_ptr<uint8_t[]> &&dataIn)
-
class InMemoryStateKeyValue : public faabric::state::StateKeyValue
Public Functions
-
InMemoryStateKeyValue(const std::string &userIn, const std::string &keyIn, size_t sizeIn, const std::string &thisIPIn)
-
InMemoryStateKeyValue(const std::string &userIn, const std::string &keyIn, const std::string &thisIPIn)
-
bool isMaster()
-
AppendedInMemoryState &getAppendedValue(uint idx)
Public Static Functions
-
static size_t getStateSizeFromRemote(const std::string &userIn, const std::string &keyIn, const std::string &thisIPIn)
-
static void deleteFromRemote(const std::string &userIn, const std::string &keyIn, const std::string &thisIPIn)
-
static void clearAll(bool global)
-
InMemoryStateKeyValue(const std::string &userIn, const std::string &keyIn, size_t sizeIn, const std::string &thisIPIn)
-
class InMemoryStateRegistry
Public Functions
-
InMemoryStateRegistry() = default
-
std::string getMasterIP(const std::string &user, const std::string &key, const std::string &thisIP, bool claim)
-
std::string getMasterIPForOtherMaster(const std::string &userIn, const std::string &keyIn, const std::string &thisIP)
-
void clear()
-
InMemoryStateRegistry() = default
-
class RedisStateKeyValue : public faabric::state::StateKeyValue
Public Functions
-
RedisStateKeyValue(const std::string &userIn, const std::string &keyIn, size_t sizeIn)
-
RedisStateKeyValue(const std::string &userIn, const std::string &keyIn)
Public Static Functions
-
static size_t getStateSizeFromRemote(const std::string &userIn, const std::string &keyIn)
-
static void deleteFromRemote(const std::string &userIn, const std::string &keyIn)
-
static void clearAll(bool global)
-
RedisStateKeyValue(const std::string &userIn, const std::string &keyIn, size_t sizeIn)
-
class State
Public Functions
-
explicit State(std::string thisIPIn)
-
size_t getStateSize(const std::string &user, const std::string &keyIn)
-
std::shared_ptr<StateKeyValue> getKV(const std::string &user, const std::string &key, size_t size)
-
std::shared_ptr<StateKeyValue> getKV(const std::string &user, const std::string &key)
-
void forceClearAll(bool global)
-
void deleteKV(const std::string &userIn, const std::string &keyIn)
-
void deleteKVLocally(const std::string &userIn, const std::string &keyIn)
-
size_t getKVCount()
-
std::string getThisIP()
-
explicit State(std::string thisIPIn)
-
class StateChunk
Public Functions
-
inline StateChunk(long offsetIn, size_t lengthIn, uint8_t *dataIn)
-
inline StateChunk(long offsetIn, std::vector<uint8_t> &data)
Public Members
-
long offset
-
size_t length
-
uint8_t *data
-
inline StateChunk(long offsetIn, size_t lengthIn, uint8_t *dataIn)
-
class StateClient : public faabric::transport::MessageEndpointClient
Public Functions
-
explicit StateClient(const std::string &userIn, const std::string &keyIn, const std::string &hostIn)
-
void pushChunks(const std::vector<StateChunk> &chunks)
-
void pullChunks(const std::vector<StateChunk> &chunks, uint8_t *bufferStart)
-
void append(const uint8_t *data, size_t length)
-
void pullAppended(uint8_t *buffer, size_t length, long nValues)
-
void clearAppended()
-
size_t stateSize()
-
void deleteState()
-
void lock()
-
void unlock()
Public Members
-
const std::string user
-
const std::string key
-
explicit StateClient(const std::string &userIn, const std::string &keyIn, const std::string &hostIn)
-
class StateKeyValue
Subclassed by faabric::state::InMemoryStateKeyValue, faabric::state::RedisStateKeyValue
Public Functions
-
StateKeyValue(const std::string &userIn, const std::string &keyIn, size_t sizeIn)
-
StateKeyValue(const std::string &userIn, const std::string &keyIn)
-
void get(uint8_t *buffer)
-
uint8_t *get()
-
void getChunk(long offset, uint8_t *buffer, size_t length)
-
uint8_t *getChunk(long offset, long len)
-
std::vector<StateChunk> getAllChunks()
-
void set(const uint8_t *buffer)
-
void setChunk(long offset, const uint8_t *buffer, size_t length)
-
void append(const uint8_t *buffer, size_t length)
-
void getAppended(uint8_t *buffer, size_t length, long nValues)
-
void clearAppended()
-
void pull()
-
void pushPartial()
-
void lockRead()
-
void unlockRead()
-
void lockWrite()
-
void unlockWrite()
-
void flagDirty()
-
void flagChunkDirty(long offset, long len)
-
size_t size() const
-
void pushFull()
Public Members
-
const std::string user
-
const std::string key
Public Static Functions
-
static uint32_t waitOnRedisRemoteLock(const std::string &redisKey)
-
StateKeyValue(const std::string &userIn, const std::string &keyIn, size_t sizeIn)
-
class StateKeyValueException : public runtime_error
Public Functions
-
inline explicit StateKeyValueException(const std::string &message)
-
inline explicit StateKeyValueException(const std::string &message)
-
class StateServer : public faabric::transport::MessageEndpointServer
Public Functions
-
explicit StateServer(State &stateIn)
-
explicit StateServer(State &stateIn)
-
enum InMemoryStateKeyStatus
-
namespace transport
Enums
-
enum class MessageResponseCode
Types of message send/receive outcomes.
Values:
-
enumerator SUCCESS
-
enumerator TERM
-
enumerator TIMEOUT
-
enumerator ERROR
-
enumerator SUCCESS
-
enum MessageEndpointConnectType
Values:
-
enumerator BIND
-
enumerator CONNECT
-
enumerator BIND
-
enum class SocketType
Values:
-
enumerator pair
-
enumerator pub
-
enumerator sub
-
enumerator pull
-
enumerator push
-
enumerator rep
-
enumerator req
-
enumerator pair
-
enum PointToPointCall
Values:
-
enumerator MAPPING
-
enumerator MESSAGE
-
enumerator LOCK_GROUP
-
enumerator LOCK_GROUP_RECURSIVE
-
enumerator UNLOCK_GROUP
-
enumerator UNLOCK_GROUP_RECURSIVE
-
enumerator MAPPING
Functions
-
PointToPointBroker &getPointToPointBroker()
-
std::vector<std::pair<std::string, faabric::PointToPointMappings>> getSentMappings()
-
std::vector<std::pair<std::string, faabric::PointToPointMessage>> getSentPointToPointMessages()
-
std::vector<std::tuple<std::string, faabric::transport::PointToPointCall, faabric::PointToPointMessage>> getSentLockMessages()
-
void clearSentMessages()
-
static std::shared_ptr<PointToPointClient> getClient(const std::string &host)
-
std::string getPointToPointKey(int groupId, int sendIdx, int recvIdx)
-
std::string getPointToPointKey(int groupId, int recvIdx)
-
auto getEndpointPtrs(const std::string &label)
Variables
-
static std::map<MessageResponseCode, std::string> MessageResponseCodeText = {{MessageResponseCode::SUCCESS, "Success"}, {MessageResponseCode::TERM, "Connection terminated"}, {MessageResponseCode::TIMEOUT, "Message timed out"}, {MessageResponseCode::ERROR, "Error"},}
-
static faabric::util::ConcurrentMap<int, std::shared_ptr<PointToPointGroup>> groups
-
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<std::tuple<std::unique_ptr<AsyncInternalRecvMessageEndpoint>, std::unique_ptr<AsyncInternalSendMessageEndpoint>, std::atomic_int32_t>>> endpoints
-
thread_local absl::flat_hash_set<std::string> threadEndpoints
-
static faabric::util::ConcurrentMap<std::string, std::shared_ptr<PointToPointClient>> clients
-
thread_local int currentGroupId = NO_CURRENT_GROUP_ID
-
thread_local std::vector<int> sentMsgCount
-
thread_local std::vector<int> recvMsgCount
-
thread_local std::vector<std::list<Message>> outOfOrderMsgs
-
static std::mutex mockMutex
-
static std::vector<std::pair<std::string, faabric::PointToPointMappings>> sentMappings
-
static std::vector<std::pair<std::string, faabric::PointToPointMessage>> sentMessages
-
static std::vector<std::tuple<std::string, faabric::transport::PointToPointCall, faabric::PointToPointMessage>> sentLockMessages
-
class AsyncDirectRecvEndpoint : public faabric::transport::RecvMessageEndpoint
Public Functions
-
AsyncDirectRecvEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
virtual Message recv() override
-
AsyncDirectRecvEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncDirectSendEndpoint : public faabric::transport::MessageEndpoint
Public Functions
-
AsyncDirectSendEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void send(uint8_t header, const uint8_t *data, size_t dataSize)
-
AsyncDirectSendEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncFanMessageEndpoint : public faabric::transport::FanMessageEndpoint
Public Functions
-
AsyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
AsyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncInternalRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint
Public Functions
-
AsyncInternalRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
virtual Message recv() override
-
AsyncInternalRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncInternalSendMessageEndpoint : public faabric::transport::MessageEndpoint
Public Functions
-
AsyncInternalSendMessageEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void send(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNumber = NO_SEQUENCE_NUM)
-
AsyncInternalSendMessageEndpoint(const std::string &inProcLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint
Public Functions
-
AsyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
AsyncRecvMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
virtual Message recv() override
-
AsyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class AsyncSendMessageEndpoint : public faabric::transport::MessageEndpoint
Public Functions
-
AsyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void send(uint8_t header, const uint8_t *data, size_t dataSize, int sequenceNum = NO_SEQUENCE_NUM)
-
AsyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class FanMessageEndpoint : public faabric::transport::MessageEndpoint
Subclassed by faabric::transport::AsyncFanMessageEndpoint, faabric::transport::SyncFanMessageEndpoint
Public Functions
-
FanMessageEndpoint(int portIn, int timeoutMs, SocketType socketType, bool isAsync)
-
MessageContext attachFanOut()
-
Message recv(const MessageContext &ctx)
-
void sendResponse(const MessageContext &ctx, uint8_t header, const uint8_t *data, size_t dataSize)
-
void stop()
-
FanMessageEndpoint(int portIn, int timeoutMs, SocketType socketType, bool isAsync)
-
class Message
- #include <Message.h>
Represents message data passed around the transport layer. Essentially an array of bytes, with a size and a flag to say whether there’s more data to follow.
Messages are not copyable, only movable, as they will regularly contain large amounts of data.
Public Functions
-
Message(size_t bufferSize)
-
Message(nng_msg *nngMsg)
-
Message(MessageResponseCode responseCodeIn)
-
~Message()
-
Message(const Message &other) = delete
-
inline Message(Message &&other)
-
inline MessageResponseCode getResponseCode()
-
inline std::span<uint8_t> allData()
-
inline std::span<const uint8_t> allData() const
-
std::span<char> data()
-
std::span<const char> data() const
-
std::span<uint8_t> udata()
-
std::span<const uint8_t> udata() const
-
std::vector<uint8_t> dataCopy() const
-
inline uint8_t getMessageCode() const
-
inline uint64_t getDeclaredDataSize() const
-
inline int getSequenceNum() const
-
Message(size_t bufferSize)
-
class MessageContext
Public Functions
-
MessageContext() = default
-
inline explicit MessageContext(nng_ctx context)
-
MessageContext(const MessageContext&) = delete
-
MessageContext &operator=(const MessageContext&) = delete
-
inline MessageContext(MessageContext &&rhs)
-
inline MessageContext &operator=(MessageContext &&rhs)
-
inline ~MessageContext()
Public Members
-
nng_ctx context = NNG_CTX_INITIALIZER
-
MessageContext() = default
-
class MessageEndpoint
Subclassed by faabric::transport::AsyncDirectSendEndpoint, faabric::transport::AsyncInternalSendMessageEndpoint, faabric::transport::AsyncSendMessageEndpoint, faabric::transport::FanMessageEndpoint, faabric::transport::RecvMessageEndpoint, faabric::transport::SyncSendMessageEndpoint
Public Functions
-
MessageEndpoint(const std::string &hostIn, int portIn, int timeoutMsIn)
-
MessageEndpoint(const std::string &addressIn, int timeoutMsIn)
-
MessageEndpoint &operator=(const MessageEndpoint&) = delete
-
MessageEndpoint(const MessageEndpoint &ctx) = delete
-
virtual ~MessageEndpoint()
-
std::string getAddress()
-
MessageEndpoint(const std::string &hostIn, int portIn, int timeoutMsIn)
-
class MessageEndpointClient
Subclassed by faabric::planner::PlannerClient, faabric::scheduler::FunctionCallClient, faabric::snapshot::SnapshotClient, faabric::state::StateClient, faabric::transport::PointToPointClient
Public Functions
-
MessageEndpointClient(std::string hostIn, int asyncPort, int syncPort, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void asyncSend(int header, google::protobuf::Message *msg, int sequenceNum = NO_SEQUENCE_NUM)
-
void asyncSend(int header, const uint8_t *buffer, size_t bufferSize, int sequenceNum = NO_SEQUENCE_NUM)
-
void syncSend(int header, google::protobuf::Message *msg, google::protobuf::Message *response)
-
void syncSend(int header, const uint8_t *buffer, size_t bufferSize, google::protobuf::Message *response)
-
MessageEndpointClient(std::string hostIn, int asyncPort, int syncPort, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class MessageEndpointServer
Subclassed by faabric::planner::PlannerServer, faabric::scheduler::FunctionCallServer, faabric::snapshot::SnapshotServer, faabric::state::StateServer, faabric::transport::PointToPointServer
Public Functions
-
MessageEndpointServer(int asyncPortIn, int syncPortIn, const std::string &inprocLabelIn, int nThreadsIn)
-
virtual void start(int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
We need to guarantee to callers of this function that, when it returns, the server will be ready to use.
-
virtual void stop()
-
virtual void onWorkerStop()
-
void setRequestLatch()
-
void awaitRequestLatch()
-
int getNThreads()
-
MessageEndpointServer(int asyncPortIn, int syncPortIn, const std::string &inprocLabelIn, int nThreadsIn)
-
class MessageEndpointServerHandler
Public Functions
-
MessageEndpointServerHandler(MessageEndpointServer *serverIn, bool asyncIn, const std::string &inprocLabelIn, int nThreadsIn)
-
void start(int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void join()
-
MessageEndpointServerHandler(MessageEndpointServer *serverIn, bool asyncIn, const std::string &inprocLabelIn, int nThreadsIn)
-
class MessageTimeoutException : public faabric::util::FaabricException
Public Functions
-
inline explicit MessageTimeoutException(std::string message)
-
inline explicit MessageTimeoutException(std::string message)
-
class PointToPointBroker
Public Functions
-
PointToPointBroker()
-
std::string getHostForReceiver(int groupId, int recvIdx)
-
int getMpiPortForReceiver(int groupId, int recvIdx)
-
std::set<std::string> setUpLocalMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision)
-
void setAndSendMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision)
-
void sendMappingsFromSchedulingDecision(const faabric::batch_scheduler::SchedulingDecision &decision, const std::set<std::string> &hostList)
-
void waitForMappingsOnThisHost(int groupId)
-
std::set<int> getIdxsRegisteredForGroup(int groupId)
-
std::set<std::string> getHostsRegisteredForGroup(int groupId)
-
void updateHostForIdx(int groupId, int groupIdx, std::string newHost)
-
void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t *buffer, size_t bufferSize, std::string hostHint, bool mustOrderMsg = false)
-
void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t *buffer, size_t bufferSize, bool mustOrderMsg = false, int sequenceNum = NO_SEQUENCE_NUM, std::string hostHint = "")
-
std::vector<uint8_t> recvMessage(int groupId, int sendIdx, int recvIdx, bool mustOrderMsg = false)
-
void clearGroup(int groupId)
-
void clear()
-
void resetThreadLocalCache()
-
void postMigrationHook(faabric::Message &msg)
-
PointToPointBroker()
-
class PointToPointClient : public faabric::transport::MessageEndpointClient
Public Functions
-
PointToPointClient(const std::string &hostIn)
-
void sendMappings(faabric::PointToPointMappings &mappings)
-
void sendMessage(faabric::PointToPointMessage &msg, int sequenceNum = NO_SEQUENCE_NUM)
-
void groupLock(int appId, int groupId, int groupIdx, bool recursive = false)
-
void groupUnlock(int appId, int groupId, int groupIdx, bool recursive = false)
-
PointToPointClient(const std::string &hostIn)
-
class PointToPointGroup
Public Functions
-
PointToPointGroup(int appId, int groupIdIn, int groupSizeIn, bool isSingleHostIn)
-
void lock(int groupIdx, bool recursive)
-
void unlock(int groupIdx, bool recursive)
-
int getLockOwner(bool recursive)
-
void localLock()
-
void localUnlock()
-
bool localTryLock()
-
void barrier(int groupIdx)
-
void notify(int groupIdx)
-
int getNotifyCount()
Public Static Functions
-
static std::shared_ptr<PointToPointGroup> getGroup(int groupId)
-
static std::shared_ptr<PointToPointGroup> getOrAwaitGroup(int groupId)
-
static bool groupExists(int groupId)
-
static void addGroup(int appId, int groupId, int groupSize, bool isSingleHost)
-
static void addGroupIfNotExists(int appId, int groupId, int groupSize)
-
static void clearGroup(int groupId)
-
static void clear()
-
PointToPointGroup(int appId, int groupIdIn, int groupSizeIn, bool isSingleHostIn)
-
class PointToPointServer : public faabric::transport::MessageEndpointServer
Public Functions
-
PointToPointServer()
-
PointToPointServer()
-
class RecvMessageEndpoint : public faabric::transport::MessageEndpoint
Subclassed by faabric::transport::AsyncDirectRecvEndpoint, faabric::transport::AsyncInternalRecvMessageEndpoint, faabric::transport::AsyncRecvMessageEndpoint, faabric::transport::SyncRecvMessageEndpoint
Public Functions
-
RecvMessageEndpoint(int portIn, int timeoutMs, SocketType socketType)
Constructor for external TCP sockets
-
RecvMessageEndpoint(std::string inProcLabel, int timeoutMs, SocketType socketType, MessageEndpointConnectType connectType)
Constructor for internal inproc sockets
-
inline virtual ~RecvMessageEndpoint()
-
virtual Message recv()
-
RecvMessageEndpoint(int portIn, int timeoutMs, SocketType socketType)
-
class SyncFanMessageEndpoint : public faabric::transport::FanMessageEndpoint
Public Functions
-
SyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
SyncFanMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class SyncRecvMessageEndpoint : public faabric::transport::RecvMessageEndpoint
Public Functions
-
SyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
SyncRecvMessageEndpoint(int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
virtual Message recv() override
-
void sendResponse(uint8_t header, const uint8_t *data, size_t dataSize)
-
SyncRecvMessageEndpoint(const std::string &inprocLabel, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
class SyncSendMessageEndpoint : public faabric::transport::MessageEndpoint
Public Functions
-
SyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
void sendRaw(const uint8_t *data, size_t dataSize)
-
Message sendAwaitResponse(uint8_t header, const uint8_t *data, size_t dataSize)
-
SyncSendMessageEndpoint(const std::string &hostIn, int portIn, int timeoutMs = DEFAULT_SOCKET_TIMEOUT_MS)
-
namespace tcp
Functions
-
void setReuseAddr(int connFd)
-
void setNoDelay(int connFd)
-
void setQuickAck(int connFd)
-
void setNonBlocking(int connFd)
-
void setBlocking(int connFd)
-
bool isNonBlocking(int connFd)
-
void setBusyPolling(int connFd)
-
void setRecvTimeoutMs(int connFd, int timeoutMs)
-
void setSendTimeoutMs(int connFd, int timeoutMs)
-
void setRecvBufferSize(int connFd, size_t bufferSizeBytes)
-
void setSendBufferSize(int connFd, size_t bufferSizeBytes)
Variables
-
const int SocketTimeoutMs = 5000
-
const size_t SocketBufferSizeBytes = 16777216
-
class Address
Public Functions
-
Address(const std::string &host, int port)
-
Address(int port)
-
sockaddr *get() const
-
Address(const std::string &host, int port)
-
class RecvSocket
Public Functions
-
RecvSocket(int port, const std::string &host = ANY_HOST)
-
RecvSocket(const RecvSocket &recvSocket) = delete
-
~RecvSocket()
-
void listen()
-
int accept()
-
void recvOne(int conn, uint8_t *buffer, size_t bufferSize)
-
RecvSocket(int port, const std::string &host = ANY_HOST)
-
class SendSocket
Public Functions
-
SendSocket(const std::string &host, int port)
-
void dial()
-
void sendOne(const uint8_t *buffer, size_t bufferSize)
-
SendSocket(const std::string &host, int port)
-
class Socket
Public Functions
-
Socket()
-
Socket(int connFd)
-
Socket(const Socket &socket) = delete
-
~Socket()
-
inline int get() const
-
Socket()
-
void setReuseAddr(int connFd)
-
enum class MessageResponseCode
-
namespace util
Typedefs
-
using BeastHttpRequest = beast::http::request<beast::http::string_body>
-
using BeastHttpResponse = beast::http::response<beast::http::string_body>
-
typedef std::chrono::steady_clock::time_point TimePoint
-
typedef std::unique_lock<std::mutex> UniqueLock
-
typedef std::unique_lock<std::shared_mutex> FullLock
-
typedef std::unique_ptr<uint8_t[], std::function<void(uint8_t*)>> MemoryRegion
Enums
-
enum DeltaCommand
Values:
-
enumerator DELTACMD_TOTAL_SIZE
-
enumerator DELTACMD_ZSTD_COMPRESSED_COMMANDS
-
enumerator DELTACMD_DELTA_OVERWRITE
-
enumerator DELTACMD_DELTA_XOR
-
enumerator DELTACMD_END
-
enumerator DELTACMD_TOTAL_SIZE
-
enum SnapshotDataType
Defines the permitted datatypes for snapshot diffs. Each has a predefined length, except for the raw option which is used for generic streams of bytes.
Values:
-
enumerator Raw
-
enumerator Bool
-
enumerator Int
-
enumerator Long
-
enumerator Float
-
enumerator Double
-
enumerator Raw
-
enum SnapshotMergeOperation
Defines the operation to perform when merging the diff with the original snapshot.
WARNING: this enum forms part of the API with executing applications, so make sure they are updated accordingly when it’s changed.
Values:
-
enumerator Bytewise
-
enumerator Sum
-
enumerator Product
-
enumerator Subtract
-
enumerator Max
-
enumerator Min
-
enumerator Ignore
-
enumerator XOR
-
enumerator Bytewise
Functions
-
std::shared_ptr<faabric::BatchExecuteRequest> batchExecFactory()
-
std::shared_ptr<faabric::BatchExecuteRequest> batchExecFactory(const std::string &user, const std::string &function, int count = 1)
-
std::shared_ptr<faabric::BatchExecuteRequestStatus> batchExecStatusFactory(int32_t appId)
-
std::vector<uint8_t> stringToBytes(const std::string &str)
-
int bytesToInt(const std::vector<uint8_t> &bytes)
-
std::string bytesToString(const std::vector<uint8_t> &bytes)
-
std::string formatByteArrayToIntString(const std::vector<uint8_t> &bytes)
-
void trimTrailingZeros(std::vector<uint8_t> &vectorIn)
-
int safeCopyToBuffer(const std::vector<uint8_t> &dataIn, uint8_t *buffer, int bufferLen)
-
int safeCopyToBuffer(const uint8_t *dataIn, int dataLen, uint8_t *buffer, int bufferLen)
-
std::string byteArrayToHexString(const uint8_t *data, int dataSize)
-
std::vector<uint8_t> hexStringToByteArray(const std::string &hexString)
-
template<typename T>
std::string intToHexString(T i)
-
template<class T>
T unalignedRead(const uint8_t *bytes)
-
template<class T>
void unalignedWrite(const T &value, uint8_t *destination)
-
template<class T>
void appendBytesOf(std::vector<uint8_t> &container, T value)
-
template<class T>
size_t readBytesOf(const std::vector<uint8_t> &container, size_t offset, T *outValue)
-
template<typename T>
std::vector<uint8_t> valueToBytes(T val)
-
Clock &getGlobalClock()
-
SystemConfig &getSystemConfig()
-
void setUpCrashHandler(int sig = -1)
-
void handleCrash(int sig)
-
std::vector<uint8_t> serializeDelta(const DeltaSettings &cfg, const uint8_t *oldDataStart, size_t oldDataLen, const uint8_t *newDataStart, size_t newDataLen)
-
void applyDelta(const std::vector<uint8_t> &delta, std::function<void(uint32_t)> setDataSize, std::function<uint8_t*()> getDataPointer)
-
std::shared_ptr<DirtyTracker> getDirtyTracker()
Returns the dirty tracker singleton. The dirty tracking mode is determined in the system config.
-
void resetDirtyTracker()
Resets the dirty tracker singleton (e.g. if the config has been changed).
-
std::string getEnvVar(const std::string &key, const std::string &deflt)
-
std::string setEnvVar(const std::string &varName, const std::string &value)
-
void unsetEnvVar(const std::string &varName)
-
unsigned int getUsableCores()
-
ExecGraphNode getFunctionExecGraphNode(int appId, int msgId)
-
std::set<unsigned int> getChainedFunctions(const faabric::Message &msg)
-
int countExecGraphNodes(const ExecGraph &graph)
-
std::set<std::string> getExecGraphHosts(const ExecGraph &graph)
-
std::vector<std::string> getMpiRankHostsFromExecGraph(const ExecGraph &graph)
-
std::pair<std::vector<std::string>, std::vector<std::string>> getMigratedMpiRankHostsFromExecGraph(const ExecGraph &graph)
-
std::string execNodeToJson(const ExecGraphNode &node)
-
std::string execGraphToJson(const ExecGraph &graph)
-
void addDetail(faabric::Message &msg, const std::string &key, const std::string &value)
-
void incrementCounter(faabric::Message &msg, const std::string &key, const int valueToIncrement = 1)
-
std::string readFileToString(const std::string &path)
-
std::vector<uint8_t> readFileToBytes(const std::string &path)
-
void writeBytesToFile(const std::string &path, const std::vector<uint8_t> &data)
-
bool isWasm(const std::vector<uint8_t> &bytes)
-
std::string funcToString(const faabric::Message &msg, bool includeId)
-
unsigned int setMessageId(faabric::Message &msg)
-
std::string buildAsyncResponse(const faabric::Message &msg)
-
faabric::Message messageFactory(const std::string &user, const std::string &function)
-
std::string resultKeyFromMessageId(unsigned int mid)
-
std::string statusKeyFromMessageId(unsigned int mid)
-
std::vector<uint8_t> messageToBytes(const faabric::Message &msg)
-
std::vector<std::string> getArgvForMessage(const faabric::Message &msg)
-
std::string getMainThreadSnapshotKey(const faabric::Message &msg)
-
unsigned int generateGid()
-
std::unique_ptr<FaabricCpuSet> pinThreadToFreeCpu(pthread_t thread)
-
std::string messageToJson(const google::protobuf::Message &msg)
-
void jsonToMessage(const std::string &jsonStr, google::protobuf::Message *msg)
-
void initLogging()
-
inline void *malloc(std::size_t size)
-
inline void free(void *ptr)
-
inline void *realloc(void *ptr, std::size_t newSize)
-
void mergeManyDirtyPages(std::vector<char> &dest, const std::vector<std::vector<char>> &source)
-
void mergeDirtyPages(std::vector<char> &dest, const std::vector<char> &source)
-
bool isPageAligned(const void *ptr)
-
size_t getRequiredHostPages(size_t nBytes)
-
size_t getRequiredHostPagesRoundDown(size_t nBytes)
-
size_t alignOffsetDown(size_t offset)
-
AlignedChunk getPageAlignedChunk(long offset, long length)
-
MemoryRegion allocatePrivateMemory(size_t size)
-
MemoryRegion allocateVirtualMemory(size_t size)
-
void claimVirtualMemory(std::span<uint8_t> region)
-
void mapMemoryPrivate(std::span<uint8_t> target, int fd)
-
void resizeFd(int fd, size_t size)
-
void writeToFd(int fd, off_t offset, std::span<const uint8_t> data)
-
int createFd(size_t size, const std::string &fdLabel)
-
void appendDataToFd(int fd, std::span<uint8_t> data)
-
std::string getIPFromHostname(const std::string &hostname)
-
std::string getPrimaryIPForThisHost(const std::string &interface)
Returns the IP for the given interface, or picks one based on an “appropriate” interface name.
-
std::string randomString(int len)
-
std::string randomStringFromSet(const std::unordered_set<std::string> &s)
-
void diffArrayRegions(std::vector<SnapshotDiff> &diffs, uint32_t startOffset, uint32_t endOffset, std::span<const uint8_t> a, std::span<const uint8_t> b)
-
template<typename T>
inline bool calculateDiffValue(const uint8_t *original, uint8_t *updated, SnapshotMergeOperation operation)
-
template<typename T>
inline T applyDiffValue(const uint8_t *original, const uint8_t *diff, SnapshotMergeOperation operation)
-
std::string snapshotDataTypeStr(SnapshotDataType dt)
-
std::string snapshotMergeOpStr(SnapshotMergeOperation op)
-
std::string keyForUser(const std::string &user, const std::string &key)
-
void maskDouble(unsigned int *maskArray, unsigned long idx)
-
bool isAllWhitespace(const std::string &input)
-
bool startsWith(const std::string &input, const std::string &subStr)
-
bool endsWith(const std::string &input, const std::string &subStr)
-
bool contains(const std::string &input, const std::string &subStr)
-
std::string removeSubstr(const std::string &input, const std::string &toErase)
-
bool stringIsInt(const std::string &input)
-
template<class T>
std::string vectorToString(std::vector<T> vec)
-
void setTestMode(bool val)
-
void setMockMode(bool val)
-
bool isTestMode()
-
bool isMockMode()
-
void startGlobalTimer()
-
void printTimerTotals()
-
uint64_t timespecToNanos(struct timespec *nativeTimespec)
-
void nanosToTimespec(uint64_t nanos, struct timespec *nativeTimespec)
-
void *pageAlignAddress(void *faultAddr)
-
int countExecGraphNode(const ExecGraphNode &node)
-
std::set<std::string> getExecGraphHostsForNode(const ExecGraphNode &node)
-
std::vector<std::string> getMpiRankHostsFromExecGraphNode(const ExecGraphNode &node)
-
size_t writeDataCallback(void *ptr, size_t size, size_t nmemb, void *stream)
-
static std::unique_ptr<FaabricCpuSet> getNextFreeCpu()
-
static void doPinThreadToCpu(pthread_t thread, cpu_set_t *cpuSet)
-
MemoryRegion doAlloc(size_t size, int prot, int flags)
-
void mapMemory(std::span<uint8_t> target, int fd, int flags)
Variables
-
constexpr uint8_t DELTA_PROTOCOL_VERSION = 1
-
constexpr int DELTA_ZSTD_COMPRESS_LEVEL = 1
-
const int NO_CPU_IDX = -1
-
const int GHA_CPU_IDX = -2
-
static const long HOST_PAGE_SIZE = sysconf(_SC_PAGESIZE)
-
static thread_local DirtyTrackingRecord tracking
-
static ThreadSafeDirtyTrackingRecord globalTracking
-
static std::shared_ptr<DirtyTracker> tracker = nullptr
-
static long uffd = -1
-
static bool uffdWriteProtect = false
-
static bool uffdSigbus = false
-
static int closeFd = -1
-
static std::shared_ptr<std::jthread> eventThread = nullptr
-
static FreeCpus freeCpus
-
static std::unordered_map<std::string, std::string> ipMap
-
static std::mutex hostnameMx
-
static std::atomic<bool> testMode = false
-
static std::atomic<bool> mockMode = false
-
struct AlignedChunk
Public Members
-
long originalOffset = 0
-
long originalLength = 0
-
long nBytesOffset = 0
-
long nBytesLength = 0
-
long nPagesOffset = 0
-
long nPagesLength = 0
-
long offsetRemainder = 0
-
long originalOffset = 0
-
class Barrier
Public Functions
-
explicit Barrier(int countIn, std::function<void()> completionFunctionIn, int timeoutMsIn)
-
void wait()
-
explicit Barrier(int countIn, std::function<void()> completionFunctionIn, int timeoutMsIn)
-
class ChainedCallFailedException : public faabric::util::FaabricException
Public Functions
-
inline explicit ChainedCallFailedException(std::string message)
-
inline explicit ChainedCallFailedException(std::string message)
-
class Clock
-
template<class Key, class Value>
class ConcurrentMap
Public Types
-
using key_type = typename UnderlyingMap::key_type
-
using mapped_type = typename UnderlyingMap::value_type
-
using size_type = typename UnderlyingMap::size_type
-
using difference_type = typename UnderlyingMap::difference_type
Public Functions
-
ConcurrentMap() = default
-
inline ConcurrentMap(size_t initialCapacity)
-
ConcurrentMap(const ConcurrentMap&) = delete
-
ConcurrentMap &operator=(const ConcurrentMap&) = delete
-
ConcurrentMap(ConcurrentMap&&) = default
-
ConcurrentMap &operator=(ConcurrentMap&&) = default
-
inline void swap(ConcurrentMap<Key, Value> &other)
-
inline bool isEmpty() const
-
inline size_t size() const
-
inline size_t capacity() const
-
inline void reserve(size_t count)
-
inline void rehash(size_t count)
-
inline void clear()
-
using key_type = typename UnderlyingMap::key_type
-
struct DeltaSettings
Public Functions
-
explicit DeltaSettings(const std::string &definition)
-
std::string toString() const
Public Members
-
bool usePages = true
-
size_t pageSize = 4096
-
bool xorWithOld = true
-
bool useZstd = true
-
int zstdLevel = 1
-
explicit DeltaSettings(const std::string &definition)
-
class DirtyTracker
Subclassed by faabric::util::NoneDirtyTracker, faabric::util::SegfaultDirtyTracker, faabric::util::SoftPTEDirtyTracker, faabric::util::UffdDirtyTracker
Public Functions
-
inline DirtyTracker(const std::string &modeIn)
-
virtual void clearAll() = 0
-
virtual std::string getType() = 0
-
virtual void startTracking(std::span<uint8_t> region) = 0
-
virtual void stopTracking(std::span<uint8_t> region) = 0
-
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) = 0
-
virtual void startThreadLocalTracking(std::span<uint8_t> region) = 0
-
virtual void stopThreadLocalTracking(std::span<uint8_t> region) = 0
-
virtual std::vector<char> getThreadLocalDirtyPages(std::span<uint8_t> region) = 0
-
virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) = 0
-
inline DirtyTracker(const std::string &modeIn)
-
class DirtyTrackingRecord
Wrapper around the actual bookkeeping behind dirty tracking. This is deliberately not thread-safe as it will only ever be used in TLS for signal handlers.
Subclassed by faabric::util::ThreadSafeDirtyTrackingRecord
Public Functions
-
DirtyTrackingRecord() = default
-
inline virtual void trackRegion(std::span<uint8_t> region)
-
inline virtual void markPage(void *addr)
-
inline virtual bool isInitialised()
-
inline virtual int getNPages()
-
inline virtual std::vector<char> getDirtyFlags()
-
inline virtual void reset()
-
DirtyTrackingRecord() = default
-
struct ExecGraph
Public Members
-
ExecGraphNode rootNode
-
ExecGraphNode rootNode
-
struct ExecGraphNode
-
class ExecGraphNodeNotFoundException : public faabric::util::FaabricException
Public Functions
-
inline explicit ExecGraphNodeNotFoundException(std::string message)
-
inline explicit ExecGraphNodeNotFoundException(std::string message)
-
class FaabricCpuSet
Public Functions
-
FaabricCpuSet(int cpuIdxIn = NO_CPU_IDX)
-
FaabricCpuSet &operator=(const FaabricCpuSet&) = delete
-
FaabricCpuSet(const FaabricCpuSet&) = delete
-
~FaabricCpuSet()
-
inline cpu_set_t *get()
-
FaabricCpuSet(int cpuIdxIn = NO_CPU_IDX)
-
class FaabricException : public exception
Subclassed by faabric::executor::ChainedCallException, faabric::executor::ExecutorContextException, faabric::redis::RedisNoResponseException, faabric::transport::MessageTimeoutException, faabric::util::ChainedCallFailedException, faabric::util::ExecGraphNodeNotFoundException, faabric::util::FunctionFrozenException, faabric::util::FunctionMigratedException, faabric::util::JsonSerialisationException, faabric::util::QueueTimeoutException
Public Functions
-
inline explicit FaabricException(std::string message)
-
inline const char *what() const noexcept override
-
inline explicit FaabricException(std::string message)
-
template<typename T>
class FixedCapacityQueue
Public Functions
-
inline FixedCapacityQueue(int capacity)
-
inline FixedCapacityQueue()
-
inline void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
-
inline void dequeueIfPresent(T *res)
-
inline T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
-
inline T *peek(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
-
inline void drain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
-
inline long size()
-
inline void reset()
-
inline FixedCapacityQueue(int capacity)
-
class FlagWaiter : public std::enable_shared_from_this<FlagWaiter>
Public Functions
-
FlagWaiter(int timeoutMsIn = DEFAULT_FLAG_WAIT_MS)
-
void waitOnFlag()
-
void setFlag(bool value)
-
FlagWaiter(int timeoutMsIn = DEFAULT_FLAG_WAIT_MS)
-
class FreeCpus
Public Functions
-
inline FreeCpus()
Public Members
-
std::vector<std::unique_ptr<std::atomic<bool>>> cpuVec
-
inline FreeCpus()
-
class FunctionFrozenException : public faabric::util::FaabricException
Public Functions
-
inline explicit FunctionFrozenException(std::string message)
-
inline explicit FunctionFrozenException(std::string message)
-
class FunctionMigratedException : public faabric::util::FaabricException
Public Functions
-
inline explicit FunctionMigratedException(std::string message)
-
inline explicit FunctionMigratedException(std::string message)
-
class JsonSerialisationException : public faabric::util::FaabricException
Public Functions
-
inline explicit JsonSerialisationException(std::string message)
-
inline explicit JsonSerialisationException(std::string message)
-
class Latch : public std::enable_shared_from_this<Latch>
Public Functions
-
explicit Latch(int countIn, int timeoutMsIn)
-
void wait()
Public Static Functions
-
static std::shared_ptr<Latch> create(int count, int timeoutMs = DEFAULT_LATCH_TIMEOUT_MS)
-
explicit Latch(int countIn, int timeoutMsIn)
-
class NoneDirtyTracker : public faabric::util::DirtyTracker
Public Functions
-
NoneDirtyTracker(const std::string &modeIn)
-
virtual void clearAll() override
-
inline virtual std::string getType() override
-
virtual void startTracking(std::span<uint8_t> region) override
-
virtual void stopTracking(std::span<uint8_t> region) override
-
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) override
-
virtual void startThreadLocalTracking(std::span<uint8_t> region) override
-
virtual void stopThreadLocalTracking(std::span<uint8_t> region) override
-
virtual std::vector<char> getThreadLocalDirtyPages(std::span<uint8_t> region) override
-
virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override
-
NoneDirtyTracker(const std::string &modeIn)
-
class PeriodicBackgroundThread
- #include <PeriodicBackgroundThread.h>
Wrapper around a periodic background thread that repeatedly does some arbitrary work after a given interval.
Subclassed by faabric::planner::KeepAliveThread, faabric::scheduler::SchedulerReaperThread
Public Functions
-
void start(int intervalSecondsIn)
Start the background thread with the given wake-up interval in seconds.
-
void stop()
Stop and wait for this thread to finish.
-
virtual void doWork() = 0
-
virtual void tidyUp()
-
inline int getIntervalSeconds()
-
void start(int intervalSecondsIn)
-
template<typename T>
class Queue
-
class QueueTimeoutException : public faabric::util::FaabricException
Public Functions
-
inline explicit QueueTimeoutException(std::string message)
-
inline explicit QueueTimeoutException(std::string message)
-
class SegfaultDirtyTracker : public faabric::util::DirtyTracker
Public Functions
-
SegfaultDirtyTracker(const std::string &modeIn)
-
virtual void clearAll() override
-
inline virtual std::string getType() override
-
virtual void startTracking(std::span<uint8_t> region) override
-
virtual void stopTracking(std::span<uint8_t> region) override
-
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) override
-
virtual void startThreadLocalTracking(std::span<uint8_t> region) override
-
virtual void stopThreadLocalTracking(std::span<uint8_t> region) override
-
virtual std::vector<char> getThreadLocalDirtyPages(std::span<uint8_t> region) override
-
virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override
Public Static Functions
-
static void handler(int sig, siginfo_t *info, void *ucontext) noexcept
-
SegfaultDirtyTracker(const std::string &modeIn)
-
class SnapshotData
Public Functions
-
SnapshotData() = default
-
explicit SnapshotData(size_t sizeIn)
-
SnapshotData(size_t sizeIn, size_t maxSizeIn)
-
explicit SnapshotData(std::span<const uint8_t> dataIn)
-
SnapshotData(std::span<const uint8_t> dataIn, size_t maxSizeIn)
-
SnapshotData(const SnapshotData&) = delete
-
SnapshotData &operator=(const SnapshotData&) = delete
-
~SnapshotData()
-
void copyInData(std::span<const uint8_t> buffer, uint32_t offset = 0)
-
const uint8_t *getDataPtr(uint32_t offset = 0)
-
std::vector<uint8_t> getDataCopy()
-
std::vector<uint8_t> getDataCopy(uint32_t offset, size_t dataSize)
-
void mapToMemory(std::span<uint8_t> target)
-
void addMergeRegion(uint32_t offset, size_t length, SnapshotDataType dataType, SnapshotMergeOperation operation)
-
void fillGapsWithBytewiseRegions()
-
void clearMergeRegions()
-
std::vector<SnapshotMergeRegion> getMergeRegions()
-
size_t getQueuedDiffsCount()
-
void applyDiffs(const std::vector<SnapshotDiff> &diffs)
-
void applyDiff(const SnapshotDiff &diff)
-
void queueDiffs(const std::vector<SnapshotDiff> &diffs)
-
int writeQueuedDiffs()
-
inline size_t getSize() const
-
inline size_t getMaxSize() const
-
std::vector<SnapshotDiff> getTrackedChanges()
-
void clearTrackedChanges()
-
std::vector<faabric::util::SnapshotDiff> diffWithDirtyRegions(std::span<uint8_t> updated, const std::vector<char> &dirtyRegions)
-
SnapshotData() = default
-
class SnapshotDiff
- #include <snapshot.h>
Represents a modification to a snapshot (a.k.a. a dirty region). Specifies the modified data, as well as its offset within the snapshot.
Each diff does not own the data, it just provides a span pointing at the original data.
Public Functions
-
SnapshotDiff() = default
-
SnapshotDiff(SnapshotDataType dataTypeIn, SnapshotMergeOperation operationIn, uint32_t offsetIn, std::span<const uint8_t> dataIn)
-
inline SnapshotDataType getDataType() const
-
inline SnapshotMergeOperation getOperation() const
-
inline uint32_t getOffset() const
-
inline std::span<const uint8_t> getData() const
-
std::vector<uint8_t> getDataCopy() const
-
SnapshotDiff() = default
-
class SnapshotMergeRegion
- #include <snapshot.h>
Defines how diffs in the given snapshot region should be interpreted with respect to the original snapshot.
For example, it may specify that the change should be treated as an integer that needs to be summed (i.e. the diff is the current value minus the original), or just as a region of raw bytes that needs to be XORed with the original.
A merge region specifies an offset, length, data type and operation, e.g. an integer to be summed at offset 100 with a length of 4.
Public Functions
-
SnapshotMergeRegion() = default
-
SnapshotMergeRegion(uint32_t offsetIn, size_t lengthIn, SnapshotDataType dataTypeIn, SnapshotMergeOperation operationIn)
-
void addDiffs(std::vector<SnapshotDiff> &diffs, std::span<const uint8_t> originalData, std::span<uint8_t> updatedData, const std::vector<char> &dirtyRegions)
-
inline bool operator<(const SnapshotMergeRegion &other) const
This allows us to sort the merge regions, which is important for diffing purposes.
-
inline bool operator==(const SnapshotMergeRegion &other) const
Public Members
-
uint32_t offset = 0
-
size_t length = 0
-
SnapshotDataType dataType = SnapshotDataType::Raw
-
SnapshotMergeOperation operation = SnapshotMergeOperation::Bytewise
-
SnapshotMergeRegion() = default
-
class SoftPTEDirtyTracker : public faabric::util::DirtyTracker
Public Functions
-
SoftPTEDirtyTracker(const std::string &modeIn)
-
~SoftPTEDirtyTracker()
-
virtual void clearAll() override
-
inline virtual std::string getType() override
-
virtual void startTracking(std::span<uint8_t> region) override
-
virtual void stopTracking(std::span<uint8_t> region) override
-
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) override
-
virtual void startThreadLocalTracking(std::span<uint8_t> region) override
-
virtual void stopThreadLocalTracking(std::span<uint8_t> region) override
-
virtual std::vector<char> getThreadLocalDirtyPages(std::span<uint8_t> region) override
-
virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override
-
SoftPTEDirtyTracker(const std::string &modeIn)
-
template<typename T>
class SpinLockQueue
-
class SystemConfig
Public Functions
-
SystemConfig()
-
void print()
-
void reset()
Public Members
-
std::string serialisation
-
std::string logLevel
-
std::string logFile
-
std::string stateMode
-
std::string deltaSnapshotEncoding
-
std::string redisStateHost
-
std::string redisQueueHost
-
std::string redisPort
-
int overrideCpuCount
-
int overrideFreeCpuStart
-
std::string batchSchedulerMode
-
int globalMessageTimeout
-
int boundTimeout
-
int reaperIntervalSeconds
-
int defaultMpiWorldSize
-
std::string endpointInterface
-
std::string endpointHost
-
int endpointPort
-
int endpointNumThreads
-
int functionServerThreads
-
int stateServerThreads
-
int snapshotServerThreads
-
int pointToPointServerThreads
-
std::string dirtyTrackingMode
-
std::string diffingMode
-
std::string plannerHost
-
int plannerPort
-
SystemConfig()
-
class ThreadSafeDirtyTrackingRecord : public faabric::util::DirtyTrackingRecord
Thread-safe wrapper around dirty tracking data, necessary for use with the background event thread in the userfaultfd tracker. Although the logic around the interaction between the event thread and the main thread should in theory protect against concurrent accesses, we need this to keep TSan happy.
Public Functions
-
ThreadSafeDirtyTrackingRecord() = default
-
inline virtual void trackRegion(std::span<uint8_t> region) override
-
inline virtual void markPage(void *addr) override
-
inline virtual std::vector<char> getDirtyFlags() override
-
inline virtual bool isInitialised() override
-
inline virtual int getNPages() override
-
inline virtual void reset() override
-
ThreadSafeDirtyTrackingRecord() = default
-
class TokenPool
Public Functions
-
explicit TokenPool(int nTokens)
-
int getToken()
Blocking call to get an available token
-
void releaseToken(int token)
-
void reset()
-
int size()
-
int taken()
-
int free()
-
explicit TokenPool(int nTokens)
-
class UffdDirtyTracker : public faabric::util::DirtyTracker
- #include <dirty.h>
Dirty tracking implementation using userfaultfd to write-protect pages, then handle the resulting userspace events when they are written to.
The dirty tracking mode can be one of four options:
`uffd` - uses the `SIGBUS` handler to catch events triggered by accessing missing pages in demand-zero paged memory.
`uffd-wp` - same as `uffd`, but adds write-protected events to catch subsequent writes to write-protected pages.
`uffd-thread` - same as `uffd`, but using a background event thread to handle events. This has the benefit of distinguishing between read and write missing page events.
`uffd-thread-wp` - same as `uffd-thread`, but adds write-protected events.
See the docs for more info on these different approaches: https://www.kernel.org/doc/html/latest/admin-guide/mm/userfaultfd.html
Public Functions
-
UffdDirtyTracker(const std::string &modeIn)
-
~UffdDirtyTracker()
-
virtual void clearAll() override
-
inline virtual std::string getType() override
-
virtual void startTracking(std::span<uint8_t> region) override
-
virtual void stopTracking(std::span<uint8_t> region) override
-
virtual std::vector<char> getDirtyPages(std::span<uint8_t> region) override
-
virtual void startThreadLocalTracking(std::span<uint8_t> region) override
-
virtual void stopThreadLocalTracking(std::span<uint8_t> region) override
-
virtual std::vector<char> getThreadLocalDirtyPages(std::span<uint8_t> region) override
-
virtual std::vector<char> getBothDirtyPages(std::span<uint8_t> region) override
Public Static Functions
-
static void sigbusHandler(int sig, siginfo_t *info, void *ucontext) noexcept
-
namespace detail
-
using BeastHttpRequest = beast::http::request<beast::http::string_body>
-
namespace batch_scheduler