From 41f7677fee86f2475826c3e7a834bf9589e1975e Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Wed, 26 Jul 2023 18:05:35 +0200 Subject: [PATCH 01/10] Changes in worker and task handler --- .../header/serializer/ArgsSerializer.h | 1 - .../header/serializer/Serializer.h | 1 - .../ArmoniK.Api.Worker.Tests/source/main.cpp | 41 +++++++++++-------- .../header/Worker/ArmoniKWorker.h | 13 ++++-- .../header/Worker/TaskHandler.h | 2 +- .../header/utils/WorkerServer.h | 2 +- .../source/Worker/ArmoniKWorker.cpp | 6 +-- .../source/Worker/TaskHandler.cpp | 5 +-- 8 files changed, 39 insertions(+), 32 deletions(-) delete mode 100644 packages/cpp/ArmoniK.Api.Client/header/serializer/ArgsSerializer.h delete mode 100644 packages/cpp/ArmoniK.Api.Client/header/serializer/Serializer.h diff --git a/packages/cpp/ArmoniK.Api.Client/header/serializer/ArgsSerializer.h b/packages/cpp/ArmoniK.Api.Client/header/serializer/ArgsSerializer.h deleted file mode 100644 index 6f70f09be..000000000 --- a/packages/cpp/ArmoniK.Api.Client/header/serializer/ArgsSerializer.h +++ /dev/null @@ -1 +0,0 @@ -#pragma once diff --git a/packages/cpp/ArmoniK.Api.Client/header/serializer/Serializer.h b/packages/cpp/ArmoniK.Api.Client/header/serializer/Serializer.h deleted file mode 100644 index 6f70f09be..000000000 --- a/packages/cpp/ArmoniK.Api.Client/header/serializer/Serializer.h +++ /dev/null @@ -1 +0,0 @@ -#pragma once diff --git a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp index 205b5eec0..1295391da 100644 --- a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp +++ b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp @@ -26,26 +26,33 @@ using armonik::api::grpc::v1::TaskOptions; using namespace armonik::api::grpc::v1::worker; using namespace ArmoniK::Api::Common::utils; -ArmoniK::Api::Worker::ProcessStatus computer(ArmoniK::Api::Worker::TaskHandler &handler) { - std::cout << "Call computer" << std::endl; - std::cout << "SizePayload : " << handler.getPayload().size() << "\nSize DD : " << handler.getDataDependencies().size() - << "\n Expected results : " << handler.getExpectedResults().size() << std::endl; - - try { - if (!handler.getExpectedResults().empty()) { - auto res = handler.send_result(handler.getExpectedResults()[0], handler.getPayload()).get(); - if (res.has_error()) { - throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(res.error()); +class Computer final : public ArmoniK::Api::Worker::ArmoniKWorker { +public: + explicit Computer(std::unique_ptr agent) + : ArmoniKWorker(std::move(agent)) {} + + ArmoniK::Api::Worker::ProcessStatus Execute(ArmoniK::Api::Worker::TaskHandler &taskHandler) override { + std::cout << "Call computer" << std::endl; + std::cout << "SizePayload : " << taskHandler.getPayload().size() + << "\nSize DD : " << taskHandler.getDataDependencies().size() + << "\n Expected results : " << taskHandler.getExpectedResults().size() << std::endl; + + try { + if (!taskHandler.getExpectedResults().empty()) { + auto res = taskHandler.send_result(taskHandler.getExpectedResults()[0], taskHandler.getPayload()).get(); + if (res.has_error()) { + throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(res.error()); + } } + + } catch (const std::exception &e) { + std::cout << "Error sending result " << e.what() << std::endl; + return ArmoniK::Api::Worker::ProcessStatus(e.what()); } - } catch (const std::exception &e) { - std::cout << "Error sending result " << e.what() << std::endl; - return ArmoniK::Api::Worker::ProcessStatus(e.what()); + return ArmoniK::Api::Worker::ProcessStatus::OK; } - - return ArmoniK::Api::Worker::ProcessStatus::OK; -} +}; int main(int argc, char **argv) { std::cout << "Starting C++ worker..." << std::endl; @@ -56,7 +63,7 @@ int main(int argc, char **argv) { config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock"); try { - ArmoniK::Api::Worker::WorkerServer::create(config, &computer)->run(); + ArmoniK::Api::Worker::WorkerServer::create(config)->run(); } catch (const std::exception &e) { std::cout << "Error in worker" << e.what() << std::endl; } diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h index 1ac7b9fe2..0e1da2efd 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -17,18 +17,16 @@ namespace API_WORKER_NAMESPACE { -class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service { +class ArmoniKWorker : public armonik::api::grpc::v1::worker::Worker::Service { private: ArmoniK::Api::Common::serilog::serilog logger_; std::unique_ptr agent_; - std::function processing_function_; public: /** * @brief Constructs a ArmoniKWorker object. */ - ArmoniKWorker(std::unique_ptr agent, - std::function processing_function); + ArmoniKWorker(std::unique_ptr agent); /** * @brief Implements the Process method of the Worker service. @@ -43,6 +41,13 @@ class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Servi ::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader, ::armonik::api::grpc::v1::worker::ProcessReply *response) override; + /** + * @brief Function which does the actual work + * @param taskHandler Task handler + * @return Process status + */ + virtual ProcessStatus Execute(TaskHandler &taskHandler) = 0; + /** * @brief Implements the HealthCheck method of the Worker service. * diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h index 5d5b8be5c..3eb893b01 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h @@ -88,7 +88,7 @@ class TaskHandler { * @param data The result data * @return A future containing a vector of ResultReply */ - std::future send_result(const std::string &key, const std::string &data); + std::future send_result(const std::string &key, std::string_view data); /** * @brief Get the result ids object diff --git a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h index c065a000d..e3e12228d 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h @@ -56,7 +56,7 @@ class WorkerServer { * @return A shared pointer to the created WorkerServer instance */ template - static std::shared_ptr create(const std::shared_ptr configuration, + static std::shared_ptr create(const std::shared_ptr &configuration, Args... args) { configuration->add_json_configuration("appsettings.json").add_env_configuration(); auto worker_server = std::make_shared(configuration); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index 34a356b90..e4f63642c 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -29,14 +29,12 @@ using namespace ArmoniK::Api::Common::utils; /** * @brief Constructs a ArmoniKWorker object. */ -API_WORKER_NAMESPACE::ArmoniKWorker::ArmoniKWorker(std::unique_ptr agent, - std::function processing_function) +API_WORKER_NAMESPACE::ArmoniKWorker::ArmoniKWorker(std::unique_ptr agent) : logger_(ArmoniK::Api::Common::serilog::logging_format::SEQ) { logger_.info("Build Service ArmoniKWorker"); logger_.add_property("class", "ArmoniKWorker"); logger_.add_property("Worker", "ArmoniK.Api.Cpp"); agent_ = std::move(agent); - processing_function_ = std::move(processing_function); } /** @@ -62,7 +60,7 @@ Status API_WORKER_NAMESPACE::ArmoniKWorker::Process([[maybe_unused]] ::grpc::Ser task_handler.init(); - ProcessStatus status = processing_function_(task_handler); + ProcessStatus status = Execute(task_handler); logger_.debug("Finish call C++"); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index 55ebe96b5..b625667bb 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -289,8 +289,7 @@ API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options, * @param data The result data * @return A future containing a vector of ResultReply */ -std::future API_WORKER_NAMESPACE::TaskHandler::send_result(const std::string &key, - const std::string &data) { +std::future API_WORKER_NAMESPACE::TaskHandler::send_result(const std::string &key, std::string_view data) { return std::async(std::launch::async, [this, key, data]() { grpc::ClientContext context_client_writer; @@ -315,7 +314,7 @@ std::future API_WORKER_NAMESPACE::TaskHandler::send_result(const st msg.set_communication_token(token_); auto chunk = msg.mutable_data(); chunk->mutable_data()->resize(chunkSize); - std::memcpy(chunk->mutable_data()->data(), data.c_str() + start, chunkSize); + std::memcpy(chunk->mutable_data()->data(), data.data() + start, chunkSize); stream->Write(msg); From 0576d69fe08b196fe16765407c769688cf33d816 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Thu, 27 Jul 2023 11:45:09 +0200 Subject: [PATCH 02/10] no copy const taskhandler --- .../header/Worker/ProcessStatus.h | 4 ++-- .../header/Worker/TaskHandler.h | 16 +++++++-------- .../source/Worker/ProcessStatus.cpp | 4 ++-- .../source/Worker/TaskHandler.cpp | 20 +++++++++++-------- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h index e1e666b7b..081584ce8 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h @@ -14,8 +14,8 @@ class ProcessStatus { [[nodiscard]] bool ok() const { return ok_; } [[nodiscard]] const std::string &details() const { return details_; } - static const ProcessStatus OK; - static const ProcessStatus ERROR; + static const ProcessStatus PROCESS_OK; + static const ProcessStatus PROCESS_ERROR; private: explicit ProcessStatus(bool ok, std::string error_message = "") { diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h index 3eb893b01..e161917ce 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h @@ -25,7 +25,7 @@ class TaskHandler { std::string session_id_; std::string task_id_; armonik::api::grpc::v1::TaskOptions task_options_; - google::protobuf::RepeatedPtrField expected_result_; + std::vector expected_result_; std::string payload_; std::map data_dependencies_; std::string token_; @@ -104,47 +104,47 @@ class TaskHandler { * * @return std::string */ - std::string getSessionId(); + const std::string &getSessionId() const; /** * @brief Get the Task Id object * * @return std::string */ - std::string getTaskId(); + const std::string &getTaskId() const; /** * @brief Get the Payload object * * @return std::vector */ - std::string getPayload(); + const std::string &getPayload() const; /** * @brief Get the Data Dependencies object * * @return std::vector */ - std::map getDataDependencies(); + const std::map &getDataDependencies() const; /** * @brief Get the Task Options object * * @return armonik::api::grpc::v1::TaskOptions */ - armonik::api::grpc::v1::TaskOptions getTaskOptions(); + const armonik::api::grpc::v1::TaskOptions &getTaskOptions() const; /** * @brief Get the Expected Results object * * @return google::protobuf::RepeatedPtrField */ - google::protobuf::RepeatedPtrField getExpectedResults(); + const std::vector &getExpectedResults() const; /** * @brief Get the Configuration object * * @return armonik::api::grpc::v1::Configuration */ - armonik::api::grpc::v1::Configuration getConfiguration(); + const armonik::api::grpc::v1::Configuration &getConfiguration() const; }; } // namespace API_WORKER_NAMESPACE diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp index 413cf79ae..9ced5a9be 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp @@ -1,4 +1,4 @@ #include "Worker/ProcessStatus.h" -const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::OK; -const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::ERROR(false); \ No newline at end of file +const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::PROCESS_OK; +const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::PROCESS_ERROR(false); \ No newline at end of file diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index b625667bb..e456301f5 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -52,7 +52,7 @@ void API_WORKER_NAMESPACE::TaskHandler::init() { session_id_ = init_request.session_id(); task_id_ = init_request.task_id(); task_options_ = init_request.task_options(); - expected_result_ = init_request.expected_output_keys(); + expected_result_.assign(init_request.expected_output_keys().begin(), init_request.expected_output_keys().end()); token_ = Request.communication_token(); config_ = init_request.configuration(); @@ -372,28 +372,28 @@ API_WORKER_NAMESPACE::TaskHandler::get_result_ids(std::vector */ -std::string API_WORKER_NAMESPACE::TaskHandler::getPayload() { return payload_; } +const std::string &API_WORKER_NAMESPACE::TaskHandler::getPayload() const { return payload_; } /** * @brief Get the Data Dependencies object * * @return std::vector */ -std::map API_WORKER_NAMESPACE::TaskHandler::getDataDependencies() { +const std::map &API_WORKER_NAMESPACE::TaskHandler::getDataDependencies() const { return data_dependencies_; } @@ -402,14 +402,16 @@ std::map API_WORKER_NAMESPACE::TaskHandler::getDataDep * * @return armonik::api::grpc::v1::TaskOptions */ -armonik::api::grpc::v1::TaskOptions API_WORKER_NAMESPACE::TaskHandler::getTaskOptions() { return task_options_; } +const armonik::api::grpc::v1::TaskOptions &API_WORKER_NAMESPACE::TaskHandler::getTaskOptions() const { + return task_options_; +} /** * @brief Get the Expected Results object * * @return google::protobuf::RepeatedPtrField */ -google::protobuf::RepeatedPtrField API_WORKER_NAMESPACE::TaskHandler::getExpectedResults() { +const std::vector &API_WORKER_NAMESPACE::TaskHandler::getExpectedResults() const { return expected_result_; } @@ -418,4 +420,6 @@ google::protobuf::RepeatedPtrField API_WORKER_NAMESPACE::TaskHandle * * @return armonik::api::grpc::v1::Configuration */ -armonik::api::grpc::v1::Configuration API_WORKER_NAMESPACE::TaskHandler::getConfiguration() { return config_; } \ No newline at end of file +const armonik::api::grpc::v1::Configuration &API_WORKER_NAMESPACE::TaskHandler::getConfiguration() const { + return config_; +} \ No newline at end of file From e5a7a366ed072a96cc83769a0c45829536e41c68 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Thu, 27 Jul 2023 14:45:00 +0200 Subject: [PATCH 03/10] Corrections from PR --- .../header/Worker/ProcessStatus.h | 12 ++++++++++-- .../source/Worker/ProcessStatus.cpp | 4 ++-- .../source/Worker/TaskHandler.cpp | 16 +++++++++------- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h index 081584ce8..9a1908949 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h @@ -13,9 +13,17 @@ class ProcessStatus { [[nodiscard]] bool ok() const { return ok_; } [[nodiscard]] const std::string &details() const { return details_; } + void set_ok() { + ok_ = true; + details_.clear(); + } + void set_error(const std::string &details) { + ok_ = false; + details_ = details; + } - static const ProcessStatus PROCESS_OK; - static const ProcessStatus PROCESS_ERROR; + static const ProcessStatus Ok; + static const ProcessStatus Error; private: explicit ProcessStatus(bool ok, std::string error_message = "") { diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp index 9ced5a9be..0ce75614b 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp @@ -1,4 +1,4 @@ #include "Worker/ProcessStatus.h" -const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::PROCESS_OK; -const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::PROCESS_ERROR(false); \ No newline at end of file +const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::Ok; +const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::Error(false); \ No newline at end of file diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index e456301f5..0ea911200 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -48,17 +48,19 @@ void API_WORKER_NAMESPACE::TaskHandler::init() { if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kInitRequest) { throw std::runtime_error("Expected a Compute request type with InitRequest to start the stream."); } - auto &init_request = Request.compute().init_request(); - session_id_ = init_request.session_id(); - task_id_ = init_request.task_id(); - task_options_ = init_request.task_options(); - expected_result_.assign(init_request.expected_output_keys().begin(), init_request.expected_output_keys().end()); + auto init_request = Request.mutable_compute()->mutable_init_request(); + session_id_ = init_request->session_id(); + task_id_ = init_request->task_id(); + task_options_ = init_request->task_options(); + expected_result_.assign(std::make_move_iterator(init_request->mutable_expected_output_keys()->begin()), + std::make_move_iterator(init_request->mutable_expected_output_keys()->end())); token_ = Request.communication_token(); - config_ = init_request.configuration(); + config_ = init_request->configuration(); std::vector chunks; - auto datachunk = init_request.payload(); + auto datachunk = init_request->payload(); + payload_.clear(); payload_.resize(datachunk.data().size()); std::memcpy(payload_.data(), datachunk.data().data(), datachunk.data().size()); From 43827555b6642c01866f666c528866adfdf6fb00 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Thu, 27 Jul 2023 14:58:32 +0200 Subject: [PATCH 04/10] Added missing include guard --- packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h index 0e1da2efd..d1337c47e 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -1,3 +1,4 @@ +#pragma once #include #include #include From 53b4b9cfd5ac748c2939124e040ec773110ec142 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Thu, 27 Jul 2023 16:56:02 +0200 Subject: [PATCH 05/10] Fix WorkerServer --- .../ArmoniK.Api.Worker.Tests/source/main.cpp | 9 +-- .../header/utils/WorkerServer.h | 59 +++++++++++-------- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp index 1295391da..9ab8ded51 100644 --- a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp +++ b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp @@ -50,17 +50,18 @@ class Computer final : public ArmoniK::Api::Worker::ArmoniKWorker { return ArmoniK::Api::Worker::ProcessStatus(e.what()); } - return ArmoniK::Api::Worker::ProcessStatus::OK; + return ArmoniK::Api::Worker::ProcessStatus::Ok; } }; int main(int argc, char **argv) { std::cout << "Starting C++ worker..." << std::endl; - std::shared_ptr config = std::make_shared(); + IConfiguration config; + config.add_json_configuration("appsettings.json").add_env_configuration(); - config->set("ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock"); - config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock"); + config.set("ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock"); + config.set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock"); try { ArmoniK::Api::Worker::WorkerServer::create(config)->run(); diff --git a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h index e3e12228d..e995a4b03 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h @@ -33,18 +33,32 @@ class WorkerServer { private: ::grpc::ServerBuilder builder_; - std::shared_ptr configuration_; + Common::utils::IConfiguration configuration_; + std::unique_ptr<::grpc::Server> instance_server; ///< Unique pointer to the gRPC server instance + std::shared_ptr<::grpc::Channel> channel; ///< Shared pointer to the gRPC channel + std::unique_ptr agent_stub; ///< Proxy to communicate with the agent public: /** * @brief Constructor for the WorkerServer class. * @param configuration A shared pointer to the IConfiguration object. */ - WorkerServer(std::shared_ptr configuration) - : configuration_(std::move(configuration)) { + WorkerServer(const ArmoniK::Api::Common::utils::IConfiguration &configuration) : configuration_(configuration) { logger.enrich([&](Common::serilog::serilog_context &ctx) { ctx.add("threadId", std::this_thread::get_id()); }); - logger.add_property("container", "ArmoniK.Worker"); + logger.info("Creating worker"); + Common::options::ComputePlane compute_plane(configuration); + + builder_.AddListeningPort(std::string(compute_plane.get_server_address()), ::grpc::InsecureServerCredentials()); + builder_.SetMaxReceiveMessageSize(-1); + + logger.info("Initialize and register worker"); + + // Create a gRPC channel to communicate with the server + channel = CreateChannel(std::string(compute_plane.get_agent_address()), ::grpc::InsecureChannelCredentials()); + + // Create a stub for the Submitter service + agent_stub = Agent::NewStub(channel); } /** @@ -56,35 +70,28 @@ class WorkerServer { * @return A shared pointer to the created WorkerServer instance */ template - static std::shared_ptr create(const std::shared_ptr &configuration, - Args... args) { - configuration->add_json_configuration("appsettings.json").add_env_configuration(); + static std::shared_ptr create(const Common::utils::IConfiguration &configuration, Args... args) { auto worker_server = std::make_shared(configuration); - worker_server->logger.info("Creating worker"); - Common::options::ComputePlane compute_plane(*configuration); - - worker_server->builder_.AddListeningPort(std::string(compute_plane.get_server_address()), - ::grpc::InsecureServerCredentials()); - worker_server->builder_.SetMaxReceiveMessageSize(-1); - - worker_server->logger.info("Initialize and register worker"); - - // Create a gRPC channel to communicate with the server - worker_server->channel = - CreateChannel(std::string(compute_plane.get_agent_address()), ::grpc::InsecureChannelCredentials()); - - // Create a stub for the Submitter service - worker_server->agent_stub = Agent::NewStub(worker_server->channel); - worker_server->builder_.RegisterService(new Worker(std::move(worker_server->agent_stub), args...)); worker_server->logger.info("Finish to register new worker"); return worker_server; } - std::unique_ptr<::grpc::Server> instance_server; ///< Unique pointer to the gRPC server instance - std::shared_ptr<::grpc::Channel> channel; ///< Shared pointer to the gRPC channel - std::unique_ptr agent_stub; ///< Proxy to communicate with the agent + /** + * @brief Create a WorkerServer instance with the given configuration. + * @tparam Worker The worker class to be used + * @param configuration Shared pointer to the IConfiguration object + * @return A shared pointer to the created WorkerServer instance + */ + template + static std::shared_ptr create(const Common::utils::IConfiguration &configuration) { + auto worker_server = std::make_shared(configuration); + worker_server->builder_.RegisterService(new Worker(std::move(worker_server->agent_stub))); + worker_server->logger.info("Finish to register new worker"); + + return worker_server; + } void run() { instance_server = builder_.BuildAndStart(); From c64a5b8465cc2a4a0590b22cec215d8032dd2969 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Fri, 28 Jul 2023 15:05:59 +0200 Subject: [PATCH 06/10] Catching exception --- .../source/Worker/ArmoniKWorker.cpp | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index e4f63642c..5a2c5be3c 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -59,20 +59,23 @@ Status API_WORKER_NAMESPACE::ArmoniKWorker::Process([[maybe_unused]] ::grpc::Ser TaskHandler task_handler(agent, iterator); task_handler.init(); + try { + ProcessStatus status = Execute(task_handler); - ProcessStatus status = Execute(task_handler); + logger_.debug("Finish call C++"); - logger_.debug("Finish call C++"); + armonik::api::grpc::v1::Output output; + if (status.ok()) { + *output.mutable_ok() = armonik::api::grpc::v1::Empty(); + } else { + output.mutable_error()->set_details(status.details()); + } - armonik::api::grpc::v1::Output output; - if (status.ok()) { - *output.mutable_ok() = armonik::api::grpc::v1::Empty(); - } else { - output.mutable_error()->set_details(status.details()); + *response->mutable_output() = output; + } catch (const std::exception &e) { + return {grpc::StatusCode::UNAVAILABLE, "Error processing task", e.what()}; } - *response->mutable_output() = output; - return grpc::Status::OK; } From ea35a08d9da6a5efbd5b4e5b4d8bf0b126e8dc7e Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Mon, 31 Jul 2023 14:44:56 +0200 Subject: [PATCH 07/10] Changed name of test worker --- packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp index 9ab8ded51..340c881e1 100644 --- a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp +++ b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp @@ -26,9 +26,9 @@ using armonik::api::grpc::v1::TaskOptions; using namespace armonik::api::grpc::v1::worker; using namespace ArmoniK::Api::Common::utils; -class Computer final : public ArmoniK::Api::Worker::ArmoniKWorker { +class TestWorker : public ArmoniK::Api::Worker::ArmoniKWorker { public: - explicit Computer(std::unique_ptr agent) + explicit TestWorker(std::unique_ptr agent) : ArmoniKWorker(std::move(agent)) {} ArmoniK::Api::Worker::ProcessStatus Execute(ArmoniK::Api::Worker::TaskHandler &taskHandler) override { @@ -64,7 +64,7 @@ int main(int argc, char **argv) { config.set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock"); try { - ArmoniK::Api::Worker::WorkerServer::create(config)->run(); + ArmoniK::Api::Worker::WorkerServer::create(config)->run(); } catch (const std::exception &e) { std::cout << "Error in worker" << e.what() << std::endl; } From 42ec5a9835ddca03f9c3a53ddff1023c1ec13dd5 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Tue, 1 Aug 2023 14:08:40 +0200 Subject: [PATCH 08/10] PR suggestions --- .../header/options/ComputePlane.h | 10 +-- .../header/options/ControlPlane.h | 4 +- .../{IConfiguration.h => Configuration.h} | 35 ++++----- .../header/utils/EnvConfiguration.h | 6 +- .../header/utils/JsonConfiguration.h | 12 +-- .../{IConfiguration.cpp => Configuration.cpp} | 18 ++--- .../source/utils/JsonConfiguration.cpp | 6 +- .../source/SubmitterClientTest.cpp | 2 +- .../ArmoniK.Api.Worker.Tests/source/main.cpp | 4 +- .../header/Worker/ArmoniKWorker.h | 2 +- .../header/Worker/ProcessStatus.h | 4 +- .../header/Worker/TaskHandler.h | 10 +-- .../header/utils/WorkerServer.h | 44 ++++------- .../source/Worker/ArmoniKWorker.cpp | 13 +--- .../source/Worker/TaskHandler.cpp | 75 ++++++++----------- 15 files changed, 104 insertions(+), 141 deletions(-) rename packages/cpp/ArmoniK.Api.Common/header/utils/{IConfiguration.h => Configuration.h} (71%) rename packages/cpp/ArmoniK.Api.Common/source/utils/{IConfiguration.cpp => Configuration.cpp} (55%) diff --git a/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h b/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h index 7bcdcac88..7c3a72686 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h +++ b/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h @@ -1,6 +1,6 @@ #pragma once #include -#include +#include /** * @brief The armonik namespace contains classes and functions related to the Armonik API. @@ -13,9 +13,9 @@ class ComputePlane { public: /** * @brief Constructs a ComputePlane object with the given configuration. - * @param configuration The IConfiguration object containing address information. + * @param configuration The Configuration object containing address information. */ - ComputePlane(const utils::IConfiguration &configuration) { + ComputePlane(const utils::Configuration &configuration) { set_worker_address(configuration.get("ComputePlane__WorkerChannel__Address")); set_agent_address(configuration.get(std::string("ComputePlane__AgentChannel__Address"))); } @@ -24,7 +24,7 @@ class ComputePlane { * @brief Returns the server address. * @return A reference to the server address string. */ - [[nodiscard]] std::string_view get_server_address() const { return worker_address_; } + [[nodiscard]] const std::string &get_server_address() const { return worker_address_; } /** * @brief Sets the worker address with the given socket address. @@ -52,7 +52,7 @@ class ComputePlane { * @brief Returns the agent address. * @return A reference to the agent address string. */ - [[nodiscard]] std::string_view get_agent_address() const { return agent_address_; } + [[nodiscard]] const std::string &get_agent_address() const { return agent_address_; } private: std::string worker_address_; ///< The worker address string. diff --git a/packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h b/packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h index 7d90027e9..2bf455949 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h +++ b/packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h @@ -1,12 +1,12 @@ #ifndef ARMONIK_API_CONTROLPLANE_H #define ARMONIK_API_CONTROLPLANE_H -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" namespace API_COMMON_NAMESPACE::options { class ControlPlane { public: - ControlPlane(const utils::IConfiguration &config) { + ControlPlane(const utils::Configuration &config) { endpoint_ = config.get(EndpointKey); user_cert_pem_path_ = config.get(UserCertKey); user_key_pem_path_ = config.get(UserKeyKey); diff --git a/packages/cpp/ArmoniK.Api.Common/header/utils/IConfiguration.h b/packages/cpp/ArmoniK.Api.Common/header/utils/Configuration.h similarity index 71% rename from packages/cpp/ArmoniK.Api.Common/header/utils/IConfiguration.h rename to packages/cpp/ArmoniK.Api.Common/header/utils/Configuration.h index 363664a5d..efb60b715 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/utils/IConfiguration.h +++ b/packages/cpp/ArmoniK.Api.Common/header/utils/Configuration.h @@ -1,5 +1,5 @@ /** - * @file IConfiguration.h + * @file Configuration.h * @brief Interface for a configuration class that stores and manages key-value pairs. */ @@ -18,21 +18,18 @@ class ControlPlane; namespace API_COMMON_NAMESPACE::utils { /** - * @class IConfiguration + * @class Configuration * @brief Interface for a configuration class that stores and manages key-value pairs. */ -class IConfiguration { +class Configuration { public: - /** - * @brief Default constructor. - */ - IConfiguration() = default; - - /** - * @brief Default virtual destructor. - */ - virtual ~IConfiguration() = default; + Configuration() noexcept = default; + Configuration(const Configuration &) = default; + Configuration(Configuration &&) noexcept = default; + Configuration &operator=(const Configuration &) = default; + Configuration &operator=(Configuration &&) noexcept = default; + ~Configuration() = default; /** * @brief Get the value associated with the given key. * @param string Key to look up. @@ -48,10 +45,10 @@ class IConfiguration { void set(const std::string &string, const std::string &value); /** - * @brief Set the values from another IConfiguration object. - * @param other IConfiguration object to copy values from. + * @brief Set the values from another Configuration object. + * @param other Configuration object to copy values from. */ - void set(const IConfiguration &other); + void set(const Configuration &other); /** * @brief List defined values of this configuration. @@ -62,15 +59,15 @@ class IConfiguration { /** * @brief Add JSON configuration from a file. * @param file_path Path to the JSON file. - * @return Reference to the current IConfiguration object. + * @return Reference to the current Configuration object. */ - IConfiguration &add_json_configuration(std::string_view file_path); + Configuration &add_json_configuration(std::string_view file_path); /** * @brief Add environment variable configuration. - * @return Reference to the current IConfiguration object. + * @return Reference to the current Configuration object. */ - IConfiguration &add_env_configuration(); + Configuration &add_env_configuration(); /** * @brief Get the current ComputePlane configuration. diff --git a/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h b/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h index fc30431ad..3e6f1df6e 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h +++ b/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h @@ -5,14 +5,14 @@ * @brief Header file for the EnvConfiguration class */ -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" namespace API_COMMON_NAMESPACE::utils { /** * @class EnvConfiguration - * @brief An implementation of IConfiguration that handles environment variables + * @brief An implementation of Configuration that handles environment variables */ -class EnvConfiguration : public IConfiguration { +class EnvConfiguration : public Configuration { public: /** * @brief Default constructor diff --git a/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h b/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h index 8d035b5c7..5d594775a 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h +++ b/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h @@ -1,16 +1,16 @@ #pragma once /** * @file JsonConfiguration.h - * @brief Definition of a JSON configuration class that inherits from IConfiguration. + * @brief Definition of a JSON configuration class that inherits from Configuration. */ -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" namespace API_COMMON_NAMESPACE::utils { /** * @class JsonConfiguration - * @brief JSON configuration class that inherits from IConfiguration. + * @brief JSON configuration class that inherits from Configuration. */ -class JsonConfiguration : public IConfiguration { +class JsonConfiguration : public Configuration { private: JsonConfiguration() = default; @@ -21,8 +21,8 @@ class JsonConfiguration : public IConfiguration { */ explicit JsonConfiguration(const std::string &filepath); - static void fromPath(IConfiguration &config, std::string_view filepath); + static void fromPath(Configuration &config, std::string_view filepath); static JsonConfiguration fromString(const std::string &json_string); - static void fromString(IConfiguration &config, const std::string &json_string); + static void fromString(Configuration &config, const std::string &json_string); }; } // namespace API_COMMON_NAMESPACE::utils diff --git a/packages/cpp/ArmoniK.Api.Common/source/utils/IConfiguration.cpp b/packages/cpp/ArmoniK.Api.Common/source/utils/Configuration.cpp similarity index 55% rename from packages/cpp/ArmoniK.Api.Common/source/utils/IConfiguration.cpp rename to packages/cpp/ArmoniK.Api.Common/source/utils/Configuration.cpp index bc8e302fc..5ac8cdbfd 100644 --- a/packages/cpp/ArmoniK.Api.Common/source/utils/IConfiguration.cpp +++ b/packages/cpp/ArmoniK.Api.Common/source/utils/Configuration.cpp @@ -1,36 +1,36 @@ -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" #include "options/ComputePlane.h" #include "options/ControlPlane.h" #include "utils/JsonConfiguration.h" namespace API_COMMON_NAMESPACE::utils { -IConfiguration &IConfiguration::add_json_configuration(std::string_view file_path) { +Configuration &Configuration::add_json_configuration(std::string_view file_path) { JsonConfiguration::fromPath(*this, file_path); return *this; } -IConfiguration &IConfiguration::add_env_configuration() { +Configuration &Configuration::add_env_configuration() { use_environment_ = true; above_env_keys_.clear(); return *this; } -options::ComputePlane IConfiguration::get_compute_plane() const { return *this; } +options::ComputePlane Configuration::get_compute_plane() const { return *this; } -void IConfiguration::set(const IConfiguration &other) { +void Configuration::set(const Configuration &other) { for (auto &&[key, value] : other.list()) { set(key, value); } } -void IConfiguration::set(const std::string &key, const std::string &value) { +void Configuration::set(const std::string &key, const std::string &value) { if (use_environment_) { above_env_keys_.insert(key); } options_[key] = value; } -std::string IConfiguration::get(const std::string &string) const { +std::string Configuration::get(const std::string &string) const { if (use_environment_ && above_env_keys_.find(string) == above_env_keys_.end()) { char *value = std::getenv(string.c_str()); if (value != nullptr) { @@ -41,7 +41,7 @@ std::string IConfiguration::get(const std::string &string) const { return position == options_.end() ? "" : position->second; } -const std::map &IConfiguration::list() const { return options_; } -options::ControlPlane IConfiguration::get_control_plane() const { return *this; } +const std::map &Configuration::list() const { return options_; } +options::ControlPlane Configuration::get_control_plane() const { return *this; } } // namespace API_COMMON_NAMESPACE::utils diff --git a/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp b/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp index b7c923c5c..c66671ee9 100644 --- a/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp +++ b/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp @@ -11,7 +11,7 @@ using namespace simdjson; * @param prefix Prefix for the key * @param element json element */ -void populate(API_COMMON_NAMESPACE::utils::IConfiguration &config, const std::string &prefix, +void populate(API_COMMON_NAMESPACE::utils::Configuration &config, const std::string &prefix, const dom::element &element) { switch (element.type()) { case dom::element_type::ARRAY: { @@ -43,7 +43,7 @@ API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(const std::string &js fromString(config, json_string); return config; } -void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPACE::utils::IConfiguration &config, +void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPACE::utils::Configuration &config, std::string_view filepath) { dom::parser parser; dom::element elem; @@ -54,7 +54,7 @@ void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPA std::cerr << "Unable to load json file " << filepath << " : " << e.what(); } } -void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMESPACE::utils::IConfiguration &config, +void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMESPACE::utils::Configuration &config, const std::string &json_string) { dom::parser parser; populate(config, "", parser.parse(padded_string(json_string))); diff --git a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp index b9350fda7..cf0a71536 100644 --- a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp +++ b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp @@ -21,7 +21,7 @@ #include "results_service.grpc.pb.h" #include "submitter/ResultsClient.h" -using ArmoniK::Api::Common::utils::IConfiguration; +using ArmoniK::Api::Common::utils::Configuration; using armonik::api::grpc::v1::TaskOptions; using armonik::api::grpc::v1::submitter::CreateSessionReply; using armonik::api::grpc::v1::submitter::CreateSessionRequest; diff --git a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp index 340c881e1..d69e888ba 100644 --- a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp +++ b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp @@ -20,7 +20,7 @@ using grpc::Channel; using grpc::ClientContext; using grpc::Status; -using ArmoniK::Api::Common::utils::IConfiguration; +using ArmoniK::Api::Common::utils::Configuration; using armonik::api::grpc::v1::TaskOptions; using namespace armonik::api::grpc::v1::worker; @@ -57,7 +57,7 @@ class TestWorker : public ArmoniK::Api::Worker::ArmoniKWorker { int main(int argc, char **argv) { std::cout << "Starting C++ worker..." << std::endl; - IConfiguration config; + Configuration config; config.add_json_configuration("appsettings.json").add_env_configuration(); config.set("ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock"); diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h index d1337c47e..fc483d03c 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -8,7 +8,7 @@ #include "grpcpp/support/sync_stream.h" #include "objects.pb.h" -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" #include "utils/WorkerServer.h" #include "worker_common.pb.h" #include "worker_service.grpc.pb.h" diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h index 9a1908949..a81a521e9 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h @@ -17,9 +17,9 @@ class ProcessStatus { ok_ = true; details_.clear(); } - void set_error(const std::string &details) { + void set_error(std::string details) { ok_ = false; - details_ = details; + details_ = std::move(details); } static const ProcessStatus Ok; diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h index e161917ce..1f799286a 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h @@ -20,8 +20,8 @@ class TaskHandler { private: grpc::ClientContext context_; - std::shared_ptr stub_; - std::shared_ptr> request_iterator_; + armonik::api::grpc::v1::agent::Agent::Stub &stub_; + grpc::ServerReader &request_iterator_; std::string session_id_; std::string task_id_; armonik::api::grpc::v1::TaskOptions task_options_; @@ -38,8 +38,8 @@ class TaskHandler { * @param client the agent client * @param request_iterator The request iterator */ - TaskHandler(std::shared_ptr client, - std::shared_ptr> request_iterator); + TaskHandler(armonik::api::grpc::v1::agent::Agent::Stub &client, + grpc::ServerReader &request_iterator); /** * @brief Initialise the task handler @@ -88,7 +88,7 @@ class TaskHandler { * @param data The result data * @return A future containing a vector of ResultReply */ - std::future send_result(const std::string &key, std::string_view data); + std::future send_result(std::string key, std::string_view data); /** * @brief Get the result ids object diff --git a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h index e995a4b03..09177594b 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h @@ -18,7 +18,7 @@ #include "options/ComputePlane.h" #include "serilog/SerilogContext.h" #include "serilog/serilog.h" -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" using namespace armonik::api::grpc::v1::agent; @@ -33,61 +33,43 @@ class WorkerServer { private: ::grpc::ServerBuilder builder_; - Common::utils::IConfiguration configuration_; + Common::utils::Configuration configuration_; std::unique_ptr<::grpc::Server> instance_server; ///< Unique pointer to the gRPC server instance std::shared_ptr<::grpc::Channel> channel; ///< Shared pointer to the gRPC channel - std::unique_ptr agent_stub; ///< Proxy to communicate with the agent public: /** * @brief Constructor for the WorkerServer class. - * @param configuration A shared pointer to the IConfiguration object. + * @param configuration A shared pointer to the Configuration object. */ - WorkerServer(const ArmoniK::Api::Common::utils::IConfiguration &configuration) : configuration_(configuration) { - logger.enrich([&](Common::serilog::serilog_context &ctx) { ctx.add("threadId", std::this_thread::get_id()); }); + explicit WorkerServer(Common::utils::Configuration configuration) : configuration_(std::move(configuration)) { + logger.enrich([](Common::serilog::serilog_context &ctx) { ctx.add("threadId", std::this_thread::get_id()); }); logger.add_property("container", "ArmoniK.Worker"); logger.info("Creating worker"); - Common::options::ComputePlane compute_plane(configuration); + Common::options::ComputePlane compute_plane(configuration_); - builder_.AddListeningPort(std::string(compute_plane.get_server_address()), ::grpc::InsecureServerCredentials()); + builder_.AddListeningPort(compute_plane.get_server_address(), ::grpc::InsecureServerCredentials()); builder_.SetMaxReceiveMessageSize(-1); logger.info("Initialize and register worker"); // Create a gRPC channel to communicate with the server - channel = CreateChannel(std::string(compute_plane.get_agent_address()), ::grpc::InsecureChannelCredentials()); - - // Create a stub for the Submitter service - agent_stub = Agent::NewStub(channel); + channel = CreateChannel(compute_plane.get_agent_address(), ::grpc::InsecureChannelCredentials()); } /** * @brief Create a WorkerServer instance with the given configuration. * @tparam Worker The worker class to be used * @tparam Args Argument types to construct the worker, apart from the agent stub - * @param configuration Shared pointer to the IConfiguration object + * @param configuration Shared pointer to the Configuration object * @param args Arguments to construct the worker, apart from the agent stub * @return A shared pointer to the created WorkerServer instance */ template - static std::shared_ptr create(const Common::utils::IConfiguration &configuration, Args... args) { - auto worker_server = std::make_shared(configuration); - worker_server->builder_.RegisterService(new Worker(std::move(worker_server->agent_stub), args...)); - worker_server->logger.info("Finish to register new worker"); - - return worker_server; - } - - /** - * @brief Create a WorkerServer instance with the given configuration. - * @tparam Worker The worker class to be used - * @param configuration Shared pointer to the IConfiguration object - * @return A shared pointer to the created WorkerServer instance - */ - template - static std::shared_ptr create(const Common::utils::IConfiguration &configuration) { - auto worker_server = std::make_shared(configuration); - worker_server->builder_.RegisterService(new Worker(std::move(worker_server->agent_stub))); + static std::shared_ptr create(Common::utils::Configuration configuration, Args &&...args) { + auto worker_server = std::make_shared(std::move(configuration)); + worker_server->builder_.RegisterService( + new Worker(Agent::NewStub(worker_server->channel), static_cast(args)...)); worker_server->logger.info("Finish to register new worker"); return worker_server; diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index 5a2c5be3c..75eb91924 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -8,7 +8,7 @@ #include "grpcpp/support/sync_stream.h" #include "objects.pb.h" -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" #include "utils/WorkerServer.h" #include "worker_common.pb.h" #include "worker_service.grpc.pb.h" @@ -20,7 +20,7 @@ using grpc::Channel; using grpc::ClientContext; using grpc::Status; -using ArmoniK::Api::Common::utils::IConfiguration; +using ArmoniK::Api::Common::utils::Configuration; using armonik::api::grpc::v1::TaskOptions; using namespace armonik::api::grpc::v1::worker; @@ -52,11 +52,7 @@ Status API_WORKER_NAMESPACE::ArmoniKWorker::Process([[maybe_unused]] ::grpc::Ser logger_.debug("Receive new request From C++ Worker"); - // Encapsulate the pointer without deleting it out of scope - std::shared_ptr> iterator(reader, [](void *) {}); - std::shared_ptr agent(agent_.get(), [](void *) {}); - - TaskHandler task_handler(agent, iterator); + TaskHandler task_handler(*agent_, *reader); task_handler.init(); try { @@ -70,8 +66,7 @@ Status API_WORKER_NAMESPACE::ArmoniKWorker::Process([[maybe_unused]] ::grpc::Ser } else { output.mutable_error()->set_details(status.details()); } - - *response->mutable_output() = output; + *response->mutable_output() = std::move(output); } catch (const std::exception &e) { return {grpc::StatusCode::UNAVAILABLE, "Error processing task", e.what()}; } diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index 0ea911200..d6813a9c5 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -29,11 +29,9 @@ using namespace armonik::api::grpc::v1::agent; * @param client the agent client * @param request_iterator The request iterator */ -API_WORKER_NAMESPACE::TaskHandler::TaskHandler(std::shared_ptr client, - std::shared_ptr> request_iterator) { - stub_ = std::move(client); - request_iterator_ = std::move(request_iterator); -} +API_WORKER_NAMESPACE::TaskHandler::TaskHandler(Agent::Stub &client, + grpc::ServerReader &request_iterator) + : stub_(client), request_iterator_(request_iterator) {} /** * @brief Initialise the task handler @@ -41,70 +39,65 @@ API_WORKER_NAMESPACE::TaskHandler::TaskHandler(std::shared_ptr clie */ void API_WORKER_NAMESPACE::TaskHandler::init() { ProcessRequest Request; - if (!request_iterator_->Read(&Request)) { + if (!request_iterator_.Read(&Request)) { throw std::runtime_error("Request stream ended unexpectedly."); } if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kInitRequest) { throw std::runtime_error("Expected a Compute request type with InitRequest to start the stream."); } - auto init_request = Request.mutable_compute()->mutable_init_request(); + auto *init_request = Request.mutable_compute()->mutable_init_request(); session_id_ = init_request->session_id(); task_id_ = init_request->task_id(); task_options_ = init_request->task_options(); expected_result_.assign(std::make_move_iterator(init_request->mutable_expected_output_keys()->begin()), std::make_move_iterator(init_request->mutable_expected_output_keys()->end())); token_ = Request.communication_token(); - config_ = init_request->configuration(); + config_ = std::move(*init_request->mutable_configuration()); std::vector chunks; - auto datachunk = init_request->payload(); - payload_.clear(); - payload_.resize(datachunk.data().size()); - std::memcpy(payload_.data(), datachunk.data().data(), datachunk.data().size()); + auto *datachunk = &init_request->payload(); + payload_.resize(datachunk->data().size()); + std::memcpy(payload_.data(), datachunk->data().data(), datachunk->data().size()); - while (!datachunk.data_complete()) { - if (!request_iterator_->Read(&Request)) { + while (!datachunk->data_complete()) { + if (!request_iterator_.Read(&Request)) { throw std::runtime_error("Request stream ended unexpectedly."); } if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kPayload) { throw std::runtime_error("Expected a Compute request type with Payload to continue the stream."); } - datachunk = Request.compute().payload(); - if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kData) { + datachunk = &Request.compute().payload(); + if (datachunk->type_case() == armonik::api::grpc::v1::DataChunk::kData) { size_t prev_size = payload_.size(); - payload_.resize(payload_.size() + datachunk.data().size()); - std::memcpy(payload_.data() + prev_size, datachunk.data().data(), datachunk.data().size()); - } - - if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { + payload_.resize(payload_.size() + datachunk->data().size()); + std::memcpy(payload_.data() + prev_size, datachunk->data().data(), datachunk->data().size()); + } else if (datachunk->type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { throw std::runtime_error("Expected a Compute request type with a DataChunk Payload to continue the stream."); - } - - if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { + } else if (datachunk->type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { break; } } - armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::InitData init_data; + armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::InitData *init_data; do { - if (!request_iterator_->Read(&Request)) { + if (!request_iterator_.Read(&Request)) { throw std::runtime_error("Request stream ended unexpectedly."); } if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kInitData) { throw std::runtime_error("Expected a Compute request type with InitData to continue the stream."); } - init_data = Request.compute().init_data(); - if (init_data.type_case() == armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest_InitData::kKey) { - const std::string &key = init_data.key(); + init_data = Request.mutable_compute()->mutable_init_data(); + if (init_data->type_case() == armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest_InitData::kKey) { + const std::string &key = init_data->key(); std::string data_dep; while (true) { ProcessRequest dep_request; - if (!request_iterator_->Read(&dep_request)) { + if (!request_iterator_.Read(&dep_request)) { throw std::runtime_error("Request stream ended unexpectedly."); } if (dep_request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kData) { @@ -116,19 +109,15 @@ void API_WORKER_NAMESPACE::TaskHandler::init() { size_t prevSize = data_dep.size(); data_dep.resize(prevSize + chunk.data().size()); std::memcpy(data_dep.data() + prevSize, chunk.data().data(), chunk.data().size()); - } - - if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { + } else if (datachunk->type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { throw std::runtime_error("Expected a Compute request type with a DataChunk Payload to continue the stream."); - } - - if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { + } else if (datachunk->type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { break; } } data_dependencies_[key] = data_dep; } - } while (init_data.type_case() == armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest_InitData::kKey); + } while (init_data->type_case() == armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest_InitData::kKey); } /** @@ -261,7 +250,7 @@ API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options, reply.set_allocated_creation_status_list(new armonik::api::grpc::v1::agent::CreateTaskReply_CreationStatusList()); grpc::ClientContext context_client_writer; - auto stream(stub_->CreateTask(&context_client_writer, &reply)); + auto stream(stub_.CreateTask(&context_client_writer, &reply)); auto create_task_request_async = to_request_stream(task_requests, std::move(task_options), token_, chunk); @@ -291,8 +280,8 @@ API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options, * @param data The result data * @return A future containing a vector of ResultReply */ -std::future API_WORKER_NAMESPACE::TaskHandler::send_result(const std::string &key, std::string_view data) { - return std::async(std::launch::async, [this, key, data]() { +std::future API_WORKER_NAMESPACE::TaskHandler::send_result(std::string key, std::string_view data) { + return std::async(std::launch::async, [this, key = std::move(key), data]() mutable { grpc::ClientContext context_client_writer; ResultReply reply; @@ -301,10 +290,10 @@ std::future API_WORKER_NAMESPACE::TaskHandler::send_result(const st const size_t data_size = data.size(); size_t start = 0; - auto stream = stub_->SendResult(&context_client_writer, &reply); + auto stream = stub_.SendResult(&context_client_writer, &reply); Result init_msg; - init_msg.mutable_init()->set_key(key); + init_msg.mutable_init()->set_key(std::move(key)); init_msg.set_communication_token(token_); stream->Write(init_msg); @@ -358,7 +347,7 @@ API_WORKER_NAMESPACE::TaskHandler::get_result_ids(std::vectorCreateResultsMetaData(&context_client_writer, request, &reply); + Status status = stub_.CreateResultsMetaData(&context_client_writer, request, &reply); auto results_reply = reply.results(); From 9b53e9ff265641b609818c7d7e34029f6125da73 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Tue, 1 Aug 2023 16:29:48 +0200 Subject: [PATCH 09/10] More PR suggestions --- .../header/utils/EnvConfiguration.h | 21 +++++------ .../header/utils/JsonConfiguration.h | 35 ++++++++----------- .../source/utils/JsonConfiguration.cpp | 14 ++------ .../source/SubmitterClientTest.cpp | 2 +- .../header/Worker/ProcessStatus.h | 3 +- .../source/Worker/ArmoniKWorker.cpp | 2 +- 6 files changed, 28 insertions(+), 49 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h b/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h index 3e6f1df6e..496bfe853 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h +++ b/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h @@ -7,16 +7,11 @@ #include "utils/Configuration.h" -namespace API_COMMON_NAMESPACE::utils { -/** - * @class EnvConfiguration - * @brief An implementation of Configuration that handles environment variables - */ -class EnvConfiguration : public Configuration { -public: - /** - * @brief Default constructor - */ - EnvConfiguration() { add_env_configuration(); } -}; -} // namespace API_COMMON_NAMESPACE::utils +namespace API_COMMON_NAMESPACE::utils::EnvConfiguration { +inline void fromEnv(Configuration &config) { config.add_env_configuration(); } +inline Configuration fromEnv() { + Configuration config; + config.add_env_configuration(); + return config; +} +} // namespace API_COMMON_NAMESPACE::utils::EnvConfiguration diff --git a/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h b/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h index 5d594775a..2a4c8b238 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h +++ b/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h @@ -5,24 +5,17 @@ */ #include "utils/Configuration.h" -namespace API_COMMON_NAMESPACE::utils { -/** - * @class JsonConfiguration - * @brief JSON configuration class that inherits from Configuration. - */ -class JsonConfiguration : public Configuration { -private: - JsonConfiguration() = default; - -public: - /** - * @brief Constructor that takes a JSON file path. - * @param filepath JSON file path to be used for configuration. - */ - explicit JsonConfiguration(const std::string &filepath); - - static void fromPath(Configuration &config, std::string_view filepath); - static JsonConfiguration fromString(const std::string &json_string); - static void fromString(Configuration &config, const std::string &json_string); -}; -} // namespace API_COMMON_NAMESPACE::utils +namespace API_COMMON_NAMESPACE::utils::JsonConfiguration { +void fromPath(Configuration &config, std::string_view filepath); +void fromString(Configuration &config, std::string_view json_string); +inline Configuration fromPath(std::string_view filepath) { + Configuration config; + fromPath(config, filepath); + return config; +} +inline Configuration fromString(std::string_view json_string) { + Configuration config; + fromString(config, json_string); + return config; +} +} // namespace API_COMMON_NAMESPACE::utils::JsonConfiguration diff --git a/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp b/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp index c66671ee9..f3af75f35 100644 --- a/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp +++ b/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp @@ -33,16 +33,6 @@ void populate(API_COMMON_NAMESPACE::utils::Configuration &config, const std::str } } -API_COMMON_NAMESPACE::utils::JsonConfiguration::JsonConfiguration(const std::string &json_path) { - fromPath(*this, json_path); -} - -API_COMMON_NAMESPACE::utils::JsonConfiguration -API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(const std::string &json_string) { - JsonConfiguration config; - fromString(config, json_string); - return config; -} void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPACE::utils::Configuration &config, std::string_view filepath) { dom::parser parser; @@ -55,7 +45,7 @@ void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPA } } void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMESPACE::utils::Configuration &config, - const std::string &json_string) { + std::string_view json_string) { dom::parser parser; populate(config, "", parser.parse(padded_string(json_string))); -} +} \ No newline at end of file diff --git a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp index cf0a71536..5867ad02e 100644 --- a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp +++ b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp @@ -44,7 +44,7 @@ using namespace ArmoniK::Api::Common::serilog; */ void init(std::shared_ptr &channel, TaskOptions &default_task_options) { - EnvConfiguration configuration; + Configuration configuration; // auto server = std::make_shared(configuration_t); configuration.add_json_configuration("appsettings.json").add_env_configuration(); diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h index a81a521e9..4b2536499 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h @@ -12,7 +12,8 @@ class ProcessStatus { explicit ProcessStatus(std::string error_message) : ProcessStatus(false, std::move(error_message)) {} [[nodiscard]] bool ok() const { return ok_; } - [[nodiscard]] const std::string &details() const { return details_; } + [[nodiscard]] const std::string &details() const & { return details_; } + [[nodiscard]] std::string &&details() && { return std::move(details_); } void set_ok() { ok_ = true; details_.clear(); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index 75eb91924..0c1696c6b 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -64,7 +64,7 @@ Status API_WORKER_NAMESPACE::ArmoniKWorker::Process([[maybe_unused]] ::grpc::Ser if (status.ok()) { *output.mutable_ok() = armonik::api::grpc::v1::Empty(); } else { - output.mutable_error()->set_details(status.details()); + output.mutable_error()->set_details(std::move(status).details()); } *response->mutable_output() = std::move(output); } catch (const std::exception &e) { From 5d304cc76ec4cefdb961dc4a80b70c6bed8e1e83 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Wed, 2 Aug 2023 10:11:54 +0200 Subject: [PATCH 10/10] Fixed worker config --- .../ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp | 2 +- packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h | 5 ++--- .../cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp | 2 +- .../cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp b/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp index f3af75f35..a639be866 100644 --- a/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp +++ b/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp @@ -48,4 +48,4 @@ void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMES std::string_view json_string) { dom::parser parser; populate(config, "", parser.parse(padded_string(json_string))); -} \ No newline at end of file +} diff --git a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h index 09177594b..72429a7a5 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h @@ -33,7 +33,6 @@ class WorkerServer { private: ::grpc::ServerBuilder builder_; - Common::utils::Configuration configuration_; std::unique_ptr<::grpc::Server> instance_server; ///< Unique pointer to the gRPC server instance std::shared_ptr<::grpc::Channel> channel; ///< Shared pointer to the gRPC channel @@ -42,11 +41,11 @@ class WorkerServer { * @brief Constructor for the WorkerServer class. * @param configuration A shared pointer to the Configuration object. */ - explicit WorkerServer(Common::utils::Configuration configuration) : configuration_(std::move(configuration)) { + explicit WorkerServer(const Common::utils::Configuration &configuration) { logger.enrich([](Common::serilog::serilog_context &ctx) { ctx.add("threadId", std::this_thread::get_id()); }); logger.add_property("container", "ArmoniK.Worker"); logger.info("Creating worker"); - Common::options::ComputePlane compute_plane(configuration_); + Common::options::ComputePlane compute_plane(configuration); builder_.AddListeningPort(compute_plane.get_server_address(), ::grpc::InsecureServerCredentials()); builder_.SetMaxReceiveMessageSize(-1); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp index 0ce75614b..7c9a685d1 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp @@ -1,4 +1,4 @@ #include "Worker/ProcessStatus.h" const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::Ok; -const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::Error(false); \ No newline at end of file +const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::Error(false); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index d6813a9c5..5f524e049 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -413,4 +413,4 @@ const std::vector &API_WORKER_NAMESPACE::TaskHandler::getExpectedRe */ const armonik::api::grpc::v1::Configuration &API_WORKER_NAMESPACE::TaskHandler::getConfiguration() const { return config_; -} \ No newline at end of file +}