From c0fff8d2032ce20312b02888476f17e1f91cdd53 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Mon, 18 Mar 2024 10:58:14 +0000 Subject: [PATCH] threads: elastically scale-up --- src/planner/Planner.cpp | 56 ++++++++++++++++++++++++++++++++++++++++- src/proto/faabric.proto | 3 +++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp index 1eeff755f..c5b106cec 100644 --- a/src/planner/Planner.cpp +++ b/src/planner/Planner.cpp @@ -27,6 +27,14 @@ namespace faabric::planner { // Utility Functions // ---------------------- +static int availableSlots(std::shared_ptr host) +{ + int availableSlots = host->slots() - host->usedslots(); + assert(availableSlots >= 0); + + return availableSlots; +} + static void claimHostSlots(std::shared_ptr host, int slotsToClaim = 1) { host->set_usedslots(host->usedslots() + slotsToClaim); @@ -769,8 +777,46 @@ Planner::callBatch(std::shared_ptr req) // does not modify it auto hostMapCopy = convertToBatchSchedHostMap(state.hostMap, state.nextEvictedHostIps); + bool isScaleChange = + decisionType == faabric::batch_scheduler::DecisionType::SCALE_CHANGE; bool isDistChange = decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE; + bool existsPreloadedDec = state.preloadedSchedulingDecisions.contains(appId); + + // For a SCALE_CHANGE decision (i.e. fork) with the elastic flag set, we + // want to scale up to as many available cores as possible in the app's + // main host (bypass this logic if we have pre-loaded a decision) + if (isScaleChange && req->elasticscalehint() && !existsPreloadedDec) { + SPDLOG_INFO("App {} requested to elastically scale-up", appId); + auto oldDec = state.inFlightReqs.at(appId).second; + auto mainHost = oldDec->hosts.at(0); + + int numAvail = availableSlots(state.hostMap.at(mainHost)); + int numRequested = req->messages_size(); + int lastMsgIdx = req->messages(numRequested - 1).groupidx(); + for (int itr = 0; itr < (numAvail - numRequested); itr++) { + // Differentiate between the position in the message array (itr) + // and the new group index. Usually, in a fork, they would be + // offset by one + int msgIdx = lastMsgIdx + itr + 1; + SPDLOG_DEBUG("Adding elastically scaled up msg idx {} (app: {})", msgIdx, appId); + + // To add a new message, copy from the last, and update the indexes + *req->add_messages() = req->messages(numRequested - 1); + req->mutable_messages(numRequested + itr)->set_appidx(msgIdx); + req->mutable_messages(numRequested + itr)->set_groupidx(msgIdx); + + // Also update the message id to make sure we can wait-for and + // clean-up the resources we use + req->mutable_messages(numRequested + itr)->set_id(faabric::util::generateGid()); + } + + if (numAvail > numRequested) { + SPDLOG_INFO("Elastically scaled-up app {} ({} -> {})", appId, numRequested, numAvail); + } else { + SPDLOG_INFO("Decided NOT to elastically scaled-up app {}", appId); + } + } // For a DIST_CHANGE decision (i.e. migration) we want to try to imrpove // on the old decision (we don't care the one we send), so we make sure @@ -797,8 +843,14 @@ Planner::callBatch(std::shared_ptr req) // (e.g. if we want to force a migration). Note that we don't want to check // pre-loaded decisions for dist-change requests std::shared_ptr decision = nullptr; - if (!isDistChange && state.preloadedSchedulingDecisions.contains(appId)) { + if (!isDistChange && existsPreloadedDec) { decision = getPreloadedSchedulingDecision(appId, req); + + // In general, after a scale change decision (that has been preloaded) + // it is safe to remove it + if (isScaleChange) { + state.preloadedSchedulingDecisions.erase(appId); + } } else if (isNew && isMpi) { mpiReq = std::make_shared(); *mpiReq = *req; @@ -1139,6 +1191,8 @@ void Planner::dispatchSchedulingDecision( hostRequests[thisHost]->set_singlehost(isSingleHost); // Propagate the single host hint hostRequests[thisHost]->set_singlehosthint(req->singlehosthint()); + // Propagate the elastic scaling hint + hostRequests[thisHost]->set_elasticscalehint(req->elasticscalehint()); } *hostRequests[thisHost]->add_messages() = msg; diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto index 490e1cca2..56abf85dd 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -54,6 +54,9 @@ message BatchExecuteRequest { // Hint set by the user to hint that this execution should all be in a // single host bool singleHostHint = 11; + + // Hint set by the user to make scale-up requests elastic + bool elasticScaleHint = 12; } message BatchExecuteRequestStatus {