Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-factor SchedulingDecisions #342

Merged
merged 1 commit into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ add_library(faabric
$<TARGET_OBJECTS:redis_obj>
$<TARGET_OBJECTS:runner_obj>
$<TARGET_OBJECTS:scheduler_obj>
$<TARGET_OBJECTS:scheduling_util_obj>
$<TARGET_OBJECTS:snapshot_obj>
$<TARGET_OBJECTS:state_obj>
$<TARGET_OBJECTS:transport_obj>
Expand Down
18 changes: 7 additions & 11 deletions include/faabric/batch-scheduler/BatchScheduler.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
#pragma once

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/util/batch.h>
#include <faabric/util/scheduling.h>

#include <string>

#define DO_NOT_MIGRATE -98
#define DO_NOT_MIGRATE_DECISION \
faabric::util::SchedulingDecision(DO_NOT_MIGRATE, DO_NOT_MIGRATE)
SchedulingDecision(DO_NOT_MIGRATE, DO_NOT_MIGRATE)
#define NOT_ENOUGH_SLOTS -99
#define NOT_ENOUGH_SLOTS_DECISION \
faabric::util::SchedulingDecision(NOT_ENOUGH_SLOTS, NOT_ENOUGH_SLOTS)
SchedulingDecision(NOT_ENOUGH_SLOTS, NOT_ENOUGH_SLOTS)

namespace faabric::batch_scheduler {

// TODO: move BatchExecuteRequest here

// TODO: move SchedulingDecision here?
typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
std::shared_ptr<faabric::util::SchedulingDecision>>
std::shared_ptr<SchedulingDecision>>
InFlightPair;

typedef std::map<int32_t, InFlightPair> InFlightReqs;
Expand Down Expand Up @@ -72,8 +69,7 @@ class BatchScheduler
const InFlightReqs& inFlightReqs,
std::shared_ptr<faabric::BatchExecuteRequest> req);

virtual std::shared_ptr<faabric::util::SchedulingDecision>
makeSchedulingDecision(
virtual std::shared_ptr<SchedulingDecision> makeSchedulingDecision(
const HostMap& hostMap,
const InFlightReqs& inFlightReqs,
std::shared_ptr<faabric::BatchExecuteRequest> req) = 0;
Expand Down Expand Up @@ -111,8 +107,8 @@ class BatchScheduler
// ----------

virtual bool isFirstDecisionBetter(
std::shared_ptr<faabric::util::SchedulingDecision> decisionA,
std::shared_ptr<faabric::util::SchedulingDecision> decisionB) = 0;
std::shared_ptr<SchedulingDecision> decisionA,
std::shared_ptr<SchedulingDecision> decisionB) = 0;

virtual std::vector<Host> getSortedHosts(
const HostMap& hostMap,
Expand Down
8 changes: 4 additions & 4 deletions include/faabric/batch-scheduler/BinPackScheduler.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
#pragma once

#include <faabric/batch-scheduler/BatchScheduler.h>
#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/util/batch.h>
#include <faabric/util/scheduling.h>
#include <string>

namespace faabric::batch_scheduler {

class BinPackScheduler final : public BatchScheduler
{
public:
std::shared_ptr<faabric::util::SchedulingDecision> makeSchedulingDecision(
std::shared_ptr<SchedulingDecision> makeSchedulingDecision(
const HostMap& hostMap,
const InFlightReqs& inFlightReqs,
std::shared_ptr<faabric::BatchExecuteRequest> req) override;

private:
bool isFirstDecisionBetter(
std::shared_ptr<faabric::util::SchedulingDecision> decisionA,
std::shared_ptr<faabric::util::SchedulingDecision> decisionB) override;
std::shared_ptr<SchedulingDecision> decisionA,
std::shared_ptr<SchedulingDecision> decisionB) override;

std::vector<Host> getSortedHosts(
const HostMap& hostMap,
Expand Down
52 changes: 52 additions & 0 deletions include/faabric/batch-scheduler/DecisionCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <faabric/batch-scheduler/SchedulingDecision.h>

#include <string>
#include <vector>

namespace faabric::batch_scheduler {
/**
* A record of a decision already taken for the given size of batch request
* for the given function. This doesn't contain the messages themselves,
* just the hosts and group ID that was used.
*/
class CachedDecision
{
  public:
    /**
     * Creates a record from a list of hosts and a group ID.
     *
     * The constructor is defined out of line.
     * NOTE(review): presumably it copies hostsIn/groupIdIn into the members
     * below - confirm against the implementation.
     */
    CachedDecision(const std::vector<std::string>& hostsIn, int groupIdIn);

    /**
     * Returns a copy of the stored host list.
     *
     * Const-qualified (like getGroupId) so that the record can be read
     * through a const reference or a pointer-to-const.
     */
    std::vector<std::string> getHosts() const { return hosts; }

    /**
     * Returns the stored group ID.
     */
    int getGroupId() const { return groupId; }

  private:
    // Hosts recorded for this decision
    std::vector<std::string> hosts;

    // Group ID recorded for this decision (0 until set by the constructor)
    int groupId = 0;
};

/**
* Repository for cached scheduling decisions. Object is not thread safe as we
* assume only a single executor will be caching decisions for a given function
* and size of batch request on one host at a time.
*/
class DecisionCache
{
public:
// Returns the cached decision record associated with the given request, as
// a shared pointer.
// NOTE(review): presumably returns nullptr when nothing is cached for this
// request - confirm against the implementation.
std::shared_ptr<CachedDecision> getCachedDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req);

// Records the given scheduling decision for the given request.
// NOTE(review): decision is taken by non-const reference; whether it is
// modified is not visible from this declaration - confirm.
void addCachedDecision(std::shared_ptr<BatchExecuteRequest> req,
SchedulingDecision& decision);

// NOTE(review): presumably drops every entry in cachedDecisions - confirm.
void clear();

private:
// Builds a string for the given request.
// NOTE(review): presumably this is the key used in cachedDecisions, derived
// from the function and the batch size - confirm.
std::string getCacheKey(std::shared_ptr<BatchExecuteRequest> req);

// Cached decision records, indexed by a string key
std::unordered_map<std::string, std::shared_ptr<CachedDecision>>
cachedDecisions;
};

// Returns a reference to a DecisionCache.
// NOTE(review): presumably a single shared instance per process - confirm
// against the definition, and note that DecisionCache documents itself as
// not thread safe.
DecisionCache& getSchedulingDecisionCache();
}
Original file line number Diff line number Diff line change
@@ -1,58 +1,8 @@
#pragma once

#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

#include <faabric/proto/faabric.pb.h>
#include <faabric/util/locks.h>

namespace faabric::util {

class SchedulingDecision
{
public:
static SchedulingDecision fromPointToPointMappings(
faabric::PointToPointMappings& mappings);

SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn);

uint32_t appId = 0;

int32_t groupId = 0;

int32_t nFunctions = 0;

std::vector<std::string> hosts;

std::vector<int32_t> messageIds;

std::vector<int32_t> appIdxs;

std::vector<int32_t> groupIdxs;

std::string returnHost;

/**
* Work out if this decision is all on this host. If the decision is
* completely on *another* host, we still count it as not being on a single
* host, as this host will be the master.
*
* Will always return false if single host optimisations are switched off.
*/
bool isSingleHost();

void addMessage(const std::string& host, const faabric::Message& msg);

void addMessage(const std::string& host, int32_t messageId, int32_t appIdx);

void addMessage(const std::string& host,
int32_t messageId,
int32_t appIdx,
int32_t groupIdx);
};

namespace faabric::batch_scheduler {
// Scheduling topology hints help the scheduler decide which host to assign new
// requests in a batch.
// - NONE: bin-packs requests to slots in hosts starting from the master
Expand Down Expand Up @@ -92,59 +42,60 @@ const std::unordered_map<SchedulingTopologyHint, std::string>
{ SchedulingTopologyHint::UNDERFULL, "UNDERFULL" },
};

/**
* A record of a decision already taken for the given size of batch request
* for the given function. This doesn't contain the messages themselves,
* just the hosts and group ID that was used.
*/
class CachedDecision
// TODO(planner-schedule): remove these strategies
// Migration strategies help the scheduler decide whether the scheduling decision
// for a batch request could be changed with the new set of available resources.
// - BIN_PACK: sort hosts by the number of functions from the batch they are
// running. Bin-pack batches in increasing order to hosts in
// decreasing order.
// - EMPTY_HOSTS: pack batches in increasing order to empty hosts.
enum MigrationStrategy
{
BIN_PACK,
EMPTY_HOSTS
};

class SchedulingDecision
{
public:
CachedDecision(const std::vector<std::string>& hostsIn, int groupIdIn);
static SchedulingDecision fromPointToPointMappings(
faabric::PointToPointMappings& mappings);

std::vector<std::string> getHosts() { return hosts; }
SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn);

int getGroupId() const { return groupId; }
uint32_t appId = 0;

int32_t groupId = 0;

int32_t nFunctions = 0;

private:
std::vector<std::string> hosts;
int groupId = 0;
};

/**
* Repository for cached scheduling decisions. Object is not thread safe as we
* assume only a single executor will be caching decisions for a given function
* and size of batch request on one host at a time.
*/
class DecisionCache
{
public:
std::shared_ptr<CachedDecision> getCachedDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req);
std::vector<int32_t> messageIds;

void addCachedDecision(std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision);
std::vector<int32_t> appIdxs;

void clear();
std::vector<int32_t> groupIdxs;

private:
std::string getCacheKey(std::shared_ptr<faabric::BatchExecuteRequest> req);
std::string returnHost;

std::unordered_map<std::string, std::shared_ptr<CachedDecision>>
cachedDecisions;
};
/**
* Work out if this decision is all on this host. If the decision is
* completely on *another* host, we still count it as not being on a single
* host, as this host will be the master.
*
* Will always return false if single host optimisations are switched off.
*/
bool isSingleHost();

DecisionCache& getSchedulingDecisionCache();
void addMessage(const std::string& host, const faabric::Message& msg);

// Migration strategies help the scheduler decide whether the scheduling decision
// for a batch request could be changed with the new set of available resources.
// - BIN_PACK: sort hosts by the number of functions from the batch they are
// running. Bin-pack batches in increasing order to hosts in
// decreasing order.
// - EMPTY_HOSTS: pack batches in increasing order to empty hosts.
enum MigrationStrategy
{
BIN_PACK,
EMPTY_HOSTS
void addMessage(const std::string& host, int32_t messageId, int32_t appIdx);

void addMessage(const std::string& host,
int32_t messageId,
int32_t appIdx,
int32_t groupIdx);
};

}
32 changes: 16 additions & 16 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/planner/PlannerClient.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallClient.h>
Expand All @@ -14,7 +15,6 @@
#include <faabric/util/dirty.h>
#include <faabric/util/memory.h>
#include <faabric/util/queue.h>
#include <faabric/util/scheduling.h>
#include <faabric/util/snapshot.h>
#include <faabric/util/timing.h>

Expand All @@ -26,7 +26,7 @@
namespace faabric::scheduler {

typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
std::shared_ptr<faabric::util::SchedulingDecision>>
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision>>
InFlightPair;

class Scheduler;
Expand Down Expand Up @@ -186,17 +186,17 @@ class Scheduler

~Scheduler();

faabric::util::SchedulingDecision makeSchedulingDecision(
faabric::batch_scheduler::SchedulingDecision makeSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingTopologyHint topologyHint =
faabric::util::SchedulingTopologyHint::NONE);
faabric::batch_scheduler::SchedulingTopologyHint topologyHint =
faabric::batch_scheduler::SchedulingTopologyHint::NONE);

faabric::util::SchedulingDecision callFunctions(
faabric::batch_scheduler::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req);

faabric::util::SchedulingDecision callFunctions(
faabric::batch_scheduler::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& hint);
faabric::batch_scheduler::SchedulingDecision& hint);

void reset();

Expand Down Expand Up @@ -351,15 +351,15 @@ class Scheduler

std::unordered_map<std::string, std::set<std::string>> registeredHosts;

faabric::util::SchedulingDecision doSchedulingDecision(
faabric::batch_scheduler::SchedulingDecision doSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingTopologyHint topologyHint);
faabric::batch_scheduler::SchedulingTopologyHint topologyHint);

faabric::util::SchedulingDecision doCallFunctions(
faabric::batch_scheduler::SchedulingDecision doCallFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision,
faabric::batch_scheduler::SchedulingDecision& decision,
faabric::util::FullLock& lock,
faabric::util::SchedulingTopologyHint topologyHint);
faabric::batch_scheduler::SchedulingTopologyHint topologyHint);

std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
Expand All @@ -386,15 +386,15 @@ class Scheduler

std::vector<std::shared_ptr<faabric::PendingMigrations>>
doCheckForMigrationOpportunities(
faabric::util::MigrationStrategy migrationStrategy =
faabric::util::MigrationStrategy::BIN_PACK);
faabric::batch_scheduler::MigrationStrategy migrationStrategy =
faabric::batch_scheduler::MigrationStrategy::BIN_PACK);

void broadcastPendingMigrations(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

void doStartFunctionMigrationThread(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision);
faabric::batch_scheduler::SchedulingDecision& decision);
};

}
Loading