From 8b5bd8ec8b56fd0cc2c16c571a5a904255da1bab Mon Sep 17 00:00:00 2001 From: Carlos Date: Mon, 6 May 2024 17:36:00 -0400 Subject: [PATCH] scheduler: tag executors with app id (#434) Thus far, we tagged executors with the (user, function) pair. This is needed in multi-threaded environments, where different threads must share the same executor. However, using only the (user, function) pair prevents the execution concurrent applications with the same pair. Instead, the correct thing to do is to track applications using the (user, function) pair in conjunction with the application id. --- src/scheduler/Scheduler.cpp | 11 ++++++----- src/util/func.cpp | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index b9375b88a..5c16c6666 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -243,7 +243,7 @@ int Scheduler::reapStaleExecutors() long Scheduler::getFunctionExecutorCount(const faabric::Message& msg) { faabric::util::SharedLock lock(mx); - const std::string funcStr = faabric::util::funcToString(msg, false); + const std::string funcStr = faabric::util::funcToString(msg, true); return executors[funcStr].size(); } @@ -252,7 +252,8 @@ void Scheduler::executeBatch(std::shared_ptr req) faabric::util::FullLock lock(mx); bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS; - auto funcStr = faabric::util::funcToString(req); + auto funcStr = faabric::util::funcToString(req->messages(0), true); + int nMessages = req->messages_size(); // Records for tests - copy messages before execution to avoid races @@ -265,8 +266,8 @@ void Scheduler::executeBatch(std::shared_ptr req) // For threads we only need one executor, for anything else we want // one Executor per function in flight. if (isThreads) { - // Threads use the existing executor. We assume there's only - // one running at a time. + // Threads use the existing executor. There must only be one at a time, + // thus we use a function string that includes the app id std::vector>& thisExecutors = executors[funcStr]; @@ -335,7 +336,7 @@ std::shared_ptr Scheduler::claimExecutor( faabric::Message& msg, faabric::util::FullLock& schedulerLock) { - std::string funcStr = faabric::util::funcToString(msg, false); + std::string funcStr = faabric::util::funcToString(msg, true); std::vector>& thisExecutors = executors[funcStr]; diff --git a/src/util/func.cpp b/src/util/func.cpp index c3c73bb72..04788e8cd 100644 --- a/src/util/func.cpp +++ b/src/util/func.cpp @@ -24,7 +24,7 @@ std::string funcToString(const faabric::Message& msg, bool includeId) std::string str = msg.user() + "/" + msg.function(); if (includeId) { - str += ":" + std::to_string(msg.id()); + str += ":" + std::to_string(msg.appid()); } return str;