diff --git a/packages/cpp/ArmoniK.Api.Client/header/serializer/ArgsSerializer.h b/packages/cpp/ArmoniK.Api.Client/header/serializer/ArgsSerializer.h deleted file mode 100644 index 6f70f09be..000000000 --- a/packages/cpp/ArmoniK.Api.Client/header/serializer/ArgsSerializer.h +++ /dev/null @@ -1 +0,0 @@ -#pragma once diff --git a/packages/cpp/ArmoniK.Api.Client/header/serializer/Serializer.h b/packages/cpp/ArmoniK.Api.Client/header/serializer/Serializer.h deleted file mode 100644 index 6f70f09be..000000000 --- a/packages/cpp/ArmoniK.Api.Client/header/serializer/Serializer.h +++ /dev/null @@ -1 +0,0 @@ -#pragma once diff --git a/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h b/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h index 7bcdcac88..7c3a72686 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h +++ b/packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h @@ -1,6 +1,6 @@ #pragma once #include -#include +#include /** * @brief The armonik namespace contains classes and functions related to the Armonik API. @@ -13,9 +13,9 @@ class ComputePlane { public: /** * @brief Constructs a ComputePlane object with the given configuration. - * @param configuration The IConfiguration object containing address information. + * @param configuration The Configuration object containing address information. */ - ComputePlane(const utils::IConfiguration &configuration) { + ComputePlane(const utils::Configuration &configuration) { set_worker_address(configuration.get("ComputePlane__WorkerChannel__Address")); set_agent_address(configuration.get(std::string("ComputePlane__AgentChannel__Address"))); } @@ -24,7 +24,7 @@ class ComputePlane { * @brief Returns the server address. * @return A reference to the server address string. */ - [[nodiscard]] std::string_view get_server_address() const { return worker_address_; } + [[nodiscard]] const std::string &get_server_address() const { return worker_address_; } /** * @brief Sets the worker address with the given socket address. @@ -52,7 +52,7 @@ class ComputePlane { * @brief Returns the agent address. * @return A reference to the agent address string. */ - [[nodiscard]] std::string_view get_agent_address() const { return agent_address_; } + [[nodiscard]] const std::string &get_agent_address() const { return agent_address_; } private: std::string worker_address_; ///< The worker address string. diff --git a/packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h b/packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h index 7d90027e9..2bf455949 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h +++ b/packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h @@ -1,12 +1,12 @@ #ifndef ARMONIK_API_CONTROLPLANE_H #define ARMONIK_API_CONTROLPLANE_H -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" namespace API_COMMON_NAMESPACE::options { class ControlPlane { public: - ControlPlane(const utils::IConfiguration &config) { + ControlPlane(const utils::Configuration &config) { endpoint_ = config.get(EndpointKey); user_cert_pem_path_ = config.get(UserCertKey); user_key_pem_path_ = config.get(UserKeyKey); diff --git a/packages/cpp/ArmoniK.Api.Common/header/utils/IConfiguration.h b/packages/cpp/ArmoniK.Api.Common/header/utils/Configuration.h similarity index 71% rename from packages/cpp/ArmoniK.Api.Common/header/utils/IConfiguration.h rename to packages/cpp/ArmoniK.Api.Common/header/utils/Configuration.h index 363664a5d..efb60b715 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/utils/IConfiguration.h +++ b/packages/cpp/ArmoniK.Api.Common/header/utils/Configuration.h @@ -1,5 +1,5 @@ /** - * @file IConfiguration.h + * @file Configuration.h * @brief Interface for a configuration class that stores and manages key-value pairs. */ @@ -18,21 +18,18 @@ class ControlPlane; namespace API_COMMON_NAMESPACE::utils { /** - * @class IConfiguration + * @class Configuration * @brief Interface for a configuration class that stores and manages key-value pairs. */ -class IConfiguration { +class Configuration { public: - /** - * @brief Default constructor. - */ - IConfiguration() = default; - - /** - * @brief Default virtual destructor. - */ - virtual ~IConfiguration() = default; + Configuration() noexcept = default; + Configuration(const Configuration &) = default; + Configuration(Configuration &&) noexcept = default; + Configuration &operator=(const Configuration &) = default; + Configuration &operator=(Configuration &&) noexcept = default; + ~Configuration() = default; /** * @brief Get the value associated with the given key. * @param string Key to look up. @@ -48,10 +45,10 @@ class IConfiguration { void set(const std::string &string, const std::string &value); /** - * @brief Set the values from another IConfiguration object. - * @param other IConfiguration object to copy values from. + * @brief Set the values from another Configuration object. + * @param other Configuration object to copy values from. */ - void set(const IConfiguration &other); + void set(const Configuration &other); /** * @brief List defined values of this configuration. @@ -62,15 +59,15 @@ class IConfiguration { /** * @brief Add JSON configuration from a file. * @param file_path Path to the JSON file. - * @return Reference to the current IConfiguration object. + * @return Reference to the current Configuration object. */ - IConfiguration &add_json_configuration(std::string_view file_path); + Configuration &add_json_configuration(std::string_view file_path); /** * @brief Add environment variable configuration. - * @return Reference to the current IConfiguration object. + * @return Reference to the current Configuration object. */ - IConfiguration &add_env_configuration(); + Configuration &add_env_configuration(); /** * @brief Get the current ComputePlane configuration. diff --git a/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h b/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h index fc30431ad..496bfe853 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h +++ b/packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h @@ -5,18 +5,13 @@ * @brief Header file for the EnvConfiguration class */ -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" -namespace API_COMMON_NAMESPACE::utils { -/** - * @class EnvConfiguration - * @brief An implementation of IConfiguration that handles environment variables - */ -class EnvConfiguration : public IConfiguration { -public: - /** - * @brief Default constructor - */ - EnvConfiguration() { add_env_configuration(); } -}; -} // namespace API_COMMON_NAMESPACE::utils +namespace API_COMMON_NAMESPACE::utils::EnvConfiguration { +inline void fromEnv(Configuration &config) { config.add_env_configuration(); } +inline Configuration fromEnv() { + Configuration config; + config.add_env_configuration(); + return config; +} +} // namespace API_COMMON_NAMESPACE::utils::EnvConfiguration diff --git a/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h b/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h index 8d035b5c7..2a4c8b238 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h +++ b/packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h @@ -1,28 +1,21 @@ #pragma once /** * @file JsonConfiguration.h - * @brief Definition of a JSON configuration class that inherits from IConfiguration. + * @brief Definition of a JSON configuration class that inherits from Configuration. */ -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" -namespace API_COMMON_NAMESPACE::utils { -/** - * @class JsonConfiguration - * @brief JSON configuration class that inherits from IConfiguration. - */ -class JsonConfiguration : public IConfiguration { -private: - JsonConfiguration() = default; - -public: - /** - * @brief Constructor that takes a JSON file path. - * @param filepath JSON file path to be used for configuration. - */ - explicit JsonConfiguration(const std::string &filepath); - - static void fromPath(IConfiguration &config, std::string_view filepath); - static JsonConfiguration fromString(const std::string &json_string); - static void fromString(IConfiguration &config, const std::string &json_string); -}; -} // namespace API_COMMON_NAMESPACE::utils +namespace API_COMMON_NAMESPACE::utils::JsonConfiguration { +void fromPath(Configuration &config, std::string_view filepath); +void fromString(Configuration &config, std::string_view json_string); +inline Configuration fromPath(std::string_view filepath) { + Configuration config; + fromPath(config, filepath); + return config; +} +inline Configuration fromString(std::string_view json_string) { + Configuration config; + fromString(config, json_string); + return config; +} +} // namespace API_COMMON_NAMESPACE::utils::JsonConfiguration diff --git a/packages/cpp/ArmoniK.Api.Common/source/utils/IConfiguration.cpp b/packages/cpp/ArmoniK.Api.Common/source/utils/Configuration.cpp similarity index 55% rename from packages/cpp/ArmoniK.Api.Common/source/utils/IConfiguration.cpp rename to packages/cpp/ArmoniK.Api.Common/source/utils/Configuration.cpp index bc8e302fc..5ac8cdbfd 100644 --- a/packages/cpp/ArmoniK.Api.Common/source/utils/IConfiguration.cpp +++ b/packages/cpp/ArmoniK.Api.Common/source/utils/Configuration.cpp @@ -1,36 +1,36 @@ -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" #include "options/ComputePlane.h" #include "options/ControlPlane.h" #include "utils/JsonConfiguration.h" namespace API_COMMON_NAMESPACE::utils { -IConfiguration &IConfiguration::add_json_configuration(std::string_view file_path) { +Configuration &Configuration::add_json_configuration(std::string_view file_path) { JsonConfiguration::fromPath(*this, file_path); return *this; } -IConfiguration &IConfiguration::add_env_configuration() { +Configuration &Configuration::add_env_configuration() { use_environment_ = true; above_env_keys_.clear(); return *this; } -options::ComputePlane IConfiguration::get_compute_plane() const { return *this; } +options::ComputePlane Configuration::get_compute_plane() const { return *this; } -void IConfiguration::set(const IConfiguration &other) { +void Configuration::set(const Configuration &other) { for (auto &&[key, value] : other.list()) { set(key, value); } } -void IConfiguration::set(const std::string &key, const std::string &value) { +void Configuration::set(const std::string &key, const std::string &value) { if (use_environment_) { above_env_keys_.insert(key); } options_[key] = value; } -std::string IConfiguration::get(const std::string &string) const { +std::string Configuration::get(const std::string &string) const { if (use_environment_ && above_env_keys_.find(string) == above_env_keys_.end()) { char *value = std::getenv(string.c_str()); if (value != nullptr) { @@ -41,7 +41,7 @@ std::string IConfiguration::get(const std::string &string) const { return position == options_.end() ? "" : position->second; } -const std::map &IConfiguration::list() const { return options_; } -options::ControlPlane IConfiguration::get_control_plane() const { return *this; } +const std::map &Configuration::list() const { return options_; } +options::ControlPlane Configuration::get_control_plane() const { return *this; } } // namespace API_COMMON_NAMESPACE::utils diff --git a/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp b/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp index b7c923c5c..a639be866 100644 --- a/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp +++ b/packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp @@ -11,7 +11,7 @@ using namespace simdjson; * @param prefix Prefix for the key * @param element json element */ -void populate(API_COMMON_NAMESPACE::utils::IConfiguration &config, const std::string &prefix, +void populate(API_COMMON_NAMESPACE::utils::Configuration &config, const std::string &prefix, const dom::element &element) { switch (element.type()) { case dom::element_type::ARRAY: { @@ -33,17 +33,7 @@ void populate(API_COMMON_NAMESPACE::utils::IConfiguration &config, const std::st } } -API_COMMON_NAMESPACE::utils::JsonConfiguration::JsonConfiguration(const std::string &json_path) { - fromPath(*this, json_path); -} - -API_COMMON_NAMESPACE::utils::JsonConfiguration -API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(const std::string &json_string) { - JsonConfiguration config; - fromString(config, json_string); - return config; -} -void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPACE::utils::IConfiguration &config, +void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPACE::utils::Configuration &config, std::string_view filepath) { dom::parser parser; dom::element elem; @@ -54,8 +44,8 @@ void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPA std::cerr << "Unable to load json file " << filepath << " : " << e.what(); } } -void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMESPACE::utils::IConfiguration &config, - const std::string &json_string) { +void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMESPACE::utils::Configuration &config, + std::string_view json_string) { dom::parser parser; populate(config, "", parser.parse(padded_string(json_string))); } diff --git a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp index b9350fda7..5867ad02e 100644 --- a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp +++ b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp @@ -21,7 +21,7 @@ #include "results_service.grpc.pb.h" #include "submitter/ResultsClient.h" -using ArmoniK::Api::Common::utils::IConfiguration; +using ArmoniK::Api::Common::utils::Configuration; using armonik::api::grpc::v1::TaskOptions; using armonik::api::grpc::v1::submitter::CreateSessionReply; using armonik::api::grpc::v1::submitter::CreateSessionRequest; @@ -44,7 +44,7 @@ using namespace ArmoniK::Api::Common::serilog; */ void init(std::shared_ptr &channel, TaskOptions &default_task_options) { - EnvConfiguration configuration; + Configuration configuration; // auto server = std::make_shared(configuration_t); configuration.add_json_configuration("appsettings.json").add_env_configuration(); diff --git a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp index 205b5eec0..d69e888ba 100644 --- a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp +++ b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp @@ -20,43 +20,51 @@ using grpc::Channel; using grpc::ClientContext; using grpc::Status; -using ArmoniK::Api::Common::utils::IConfiguration; +using ArmoniK::Api::Common::utils::Configuration; using armonik::api::grpc::v1::TaskOptions; using namespace armonik::api::grpc::v1::worker; using namespace ArmoniK::Api::Common::utils; -ArmoniK::Api::Worker::ProcessStatus computer(ArmoniK::Api::Worker::TaskHandler &handler) { - std::cout << "Call computer" << std::endl; - std::cout << "SizePayload : " << handler.getPayload().size() << "\nSize DD : " << handler.getDataDependencies().size() - << "\n Expected results : " << handler.getExpectedResults().size() << std::endl; - - try { - if (!handler.getExpectedResults().empty()) { - auto res = handler.send_result(handler.getExpectedResults()[0], handler.getPayload()).get(); - if (res.has_error()) { - throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(res.error()); +class TestWorker : public ArmoniK::Api::Worker::ArmoniKWorker { +public: + explicit TestWorker(std::unique_ptr agent) + : ArmoniKWorker(std::move(agent)) {} + + ArmoniK::Api::Worker::ProcessStatus Execute(ArmoniK::Api::Worker::TaskHandler &taskHandler) override { + std::cout << "Call computer" << std::endl; + std::cout << "SizePayload : " << taskHandler.getPayload().size() + << "\nSize DD : " << taskHandler.getDataDependencies().size() + << "\n Expected results : " << taskHandler.getExpectedResults().size() << std::endl; + + try { + if (!taskHandler.getExpectedResults().empty()) { + auto res = taskHandler.send_result(taskHandler.getExpectedResults()[0], taskHandler.getPayload()).get(); + if (res.has_error()) { + throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(res.error()); + } } + + } catch (const std::exception &e) { + std::cout << "Error sending result " << e.what() << std::endl; + return ArmoniK::Api::Worker::ProcessStatus(e.what()); } - } catch (const std::exception &e) { - std::cout << "Error sending result " << e.what() << std::endl; - return ArmoniK::Api::Worker::ProcessStatus(e.what()); + return ArmoniK::Api::Worker::ProcessStatus::Ok; } - - return ArmoniK::Api::Worker::ProcessStatus::OK; -} +}; int main(int argc, char **argv) { std::cout << "Starting C++ worker..." << std::endl; - std::shared_ptr config = std::make_shared(); + Configuration config; + config.add_json_configuration("appsettings.json").add_env_configuration(); - config->set("ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock"); - config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock"); + config.set("ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock"); + config.set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock"); try { - ArmoniK::Api::Worker::WorkerServer::create(config, &computer)->run(); + ArmoniK::Api::Worker::WorkerServer::create(config)->run(); } catch (const std::exception &e) { std::cout << "Error in worker" << e.what() << std::endl; } diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h index 1ac7b9fe2..fc483d03c 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -1,3 +1,4 @@ +#pragma once #include #include #include @@ -7,7 +8,7 @@ #include "grpcpp/support/sync_stream.h" #include "objects.pb.h" -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" #include "utils/WorkerServer.h" #include "worker_common.pb.h" #include "worker_service.grpc.pb.h" @@ -17,18 +18,16 @@ namespace API_WORKER_NAMESPACE { -class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service { +class ArmoniKWorker : public armonik::api::grpc::v1::worker::Worker::Service { private: ArmoniK::Api::Common::serilog::serilog logger_; std::unique_ptr agent_; - std::function processing_function_; public: /** * @brief Constructs a ArmoniKWorker object. */ - ArmoniKWorker(std::unique_ptr agent, - std::function processing_function); + ArmoniKWorker(std::unique_ptr agent); /** * @brief Implements the Process method of the Worker service. @@ -43,6 +42,13 @@ class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Servi ::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader, ::armonik::api::grpc::v1::worker::ProcessReply *response) override; + /** + * @brief Function which does the actual work + * @param taskHandler Task handler + * @return Process status + */ + virtual ProcessStatus Execute(TaskHandler &taskHandler) = 0; + /** * @brief Implements the HealthCheck method of the Worker service. * diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h index e1e666b7b..4b2536499 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h @@ -12,10 +12,19 @@ class ProcessStatus { explicit ProcessStatus(std::string error_message) : ProcessStatus(false, std::move(error_message)) {} [[nodiscard]] bool ok() const { return ok_; } - [[nodiscard]] const std::string &details() const { return details_; } + [[nodiscard]] const std::string &details() const & { return details_; } + [[nodiscard]] std::string &&details() && { return std::move(details_); } + void set_ok() { + ok_ = true; + details_.clear(); + } + void set_error(std::string details) { + ok_ = false; + details_ = std::move(details); + } - static const ProcessStatus OK; - static const ProcessStatus ERROR; + static const ProcessStatus Ok; + static const ProcessStatus Error; private: explicit ProcessStatus(bool ok, std::string error_message = "") { diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h index 5d5b8be5c..1f799286a 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h @@ -20,12 +20,12 @@ class TaskHandler { private: grpc::ClientContext context_; - std::shared_ptr stub_; - std::shared_ptr> request_iterator_; + armonik::api::grpc::v1::agent::Agent::Stub &stub_; + grpc::ServerReader &request_iterator_; std::string session_id_; std::string task_id_; armonik::api::grpc::v1::TaskOptions task_options_; - google::protobuf::RepeatedPtrField expected_result_; + std::vector expected_result_; std::string payload_; std::map data_dependencies_; std::string token_; @@ -38,8 +38,8 @@ class TaskHandler { * @param client the agent client * @param request_iterator The request iterator */ - TaskHandler(std::shared_ptr client, - std::shared_ptr> request_iterator); + TaskHandler(armonik::api::grpc::v1::agent::Agent::Stub &client, + grpc::ServerReader &request_iterator); /** * @brief Initialise the task handler @@ -88,7 +88,7 @@ class TaskHandler { * @param data The result data * @return A future containing a vector of ResultReply */ - std::future send_result(const std::string &key, const std::string &data); + std::future send_result(std::string key, std::string_view data); /** * @brief Get the result ids object @@ -104,47 +104,47 @@ class TaskHandler { * * @return std::string */ - std::string getSessionId(); + const std::string &getSessionId() const; /** * @brief Get the Task Id object * * @return std::string */ - std::string getTaskId(); + const std::string &getTaskId() const; /** * @brief Get the Payload object * * @return std::vector */ - std::string getPayload(); + const std::string &getPayload() const; /** * @brief Get the Data Dependencies object * * @return std::vector */ - std::map getDataDependencies(); + const std::map &getDataDependencies() const; /** * @brief Get the Task Options object * * @return armonik::api::grpc::v1::TaskOptions */ - armonik::api::grpc::v1::TaskOptions getTaskOptions(); + const armonik::api::grpc::v1::TaskOptions &getTaskOptions() const; /** * @brief Get the Expected Results object * * @return google::protobuf::RepeatedPtrField */ - google::protobuf::RepeatedPtrField getExpectedResults(); + const std::vector &getExpectedResults() const; /** * @brief Get the Configuration object * * @return armonik::api::grpc::v1::Configuration */ - armonik::api::grpc::v1::Configuration getConfiguration(); + const armonik::api::grpc::v1::Configuration &getConfiguration() const; }; } // namespace API_WORKER_NAMESPACE diff --git a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h index c065a000d..72429a7a5 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h @@ -18,7 +18,7 @@ #include "options/ComputePlane.h" #include "serilog/SerilogContext.h" #include "serilog/serilog.h" -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" using namespace armonik::api::grpc::v1::agent; @@ -33,59 +33,47 @@ class WorkerServer { private: ::grpc::ServerBuilder builder_; - std::shared_ptr configuration_; + std::unique_ptr<::grpc::Server> instance_server; ///< Unique pointer to the gRPC server instance + std::shared_ptr<::grpc::Channel> channel; ///< Shared pointer to the gRPC channel public: /** * @brief Constructor for the WorkerServer class. - * @param configuration A shared pointer to the IConfiguration object. + * @param configuration A shared pointer to the Configuration object. */ - WorkerServer(std::shared_ptr configuration) - : configuration_(std::move(configuration)) { - logger.enrich([&](Common::serilog::serilog_context &ctx) { ctx.add("threadId", std::this_thread::get_id()); }); - + explicit WorkerServer(const Common::utils::Configuration &configuration) { + logger.enrich([](Common::serilog::serilog_context &ctx) { ctx.add("threadId", std::this_thread::get_id()); }); logger.add_property("container", "ArmoniK.Worker"); + logger.info("Creating worker"); + Common::options::ComputePlane compute_plane(configuration); + + builder_.AddListeningPort(compute_plane.get_server_address(), ::grpc::InsecureServerCredentials()); + builder_.SetMaxReceiveMessageSize(-1); + + logger.info("Initialize and register worker"); + + // Create a gRPC channel to communicate with the server + channel = CreateChannel(compute_plane.get_agent_address(), ::grpc::InsecureChannelCredentials()); } /** * @brief Create a WorkerServer instance with the given configuration. * @tparam Worker The worker class to be used * @tparam Args Argument types to construct the worker, apart from the agent stub - * @param configuration Shared pointer to the IConfiguration object + * @param configuration Shared pointer to the Configuration object * @param args Arguments to construct the worker, apart from the agent stub * @return A shared pointer to the created WorkerServer instance */ template - static std::shared_ptr create(const std::shared_ptr configuration, - Args... args) { - configuration->add_json_configuration("appsettings.json").add_env_configuration(); - auto worker_server = std::make_shared(configuration); - worker_server->logger.info("Creating worker"); - Common::options::ComputePlane compute_plane(*configuration); - - worker_server->builder_.AddListeningPort(std::string(compute_plane.get_server_address()), - ::grpc::InsecureServerCredentials()); - worker_server->builder_.SetMaxReceiveMessageSize(-1); - - worker_server->logger.info("Initialize and register worker"); - - // Create a gRPC channel to communicate with the server - worker_server->channel = - CreateChannel(std::string(compute_plane.get_agent_address()), ::grpc::InsecureChannelCredentials()); - - // Create a stub for the Submitter service - worker_server->agent_stub = Agent::NewStub(worker_server->channel); - - worker_server->builder_.RegisterService(new Worker(std::move(worker_server->agent_stub), args...)); + static std::shared_ptr create(Common::utils::Configuration configuration, Args &&...args) { + auto worker_server = std::make_shared(std::move(configuration)); + worker_server->builder_.RegisterService( + new Worker(Agent::NewStub(worker_server->channel), static_cast(args)...)); worker_server->logger.info("Finish to register new worker"); return worker_server; } - std::unique_ptr<::grpc::Server> instance_server; ///< Unique pointer to the gRPC server instance - std::shared_ptr<::grpc::Channel> channel; ///< Shared pointer to the gRPC channel - std::unique_ptr agent_stub; ///< Proxy to communicate with the agent - void run() { instance_server = builder_.BuildAndStart(); instance_server->Wait(); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index 34a356b90..0c1696c6b 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -8,7 +8,7 @@ #include "grpcpp/support/sync_stream.h" #include "objects.pb.h" -#include "utils/IConfiguration.h" +#include "utils/Configuration.h" #include "utils/WorkerServer.h" #include "worker_common.pb.h" #include "worker_service.grpc.pb.h" @@ -20,7 +20,7 @@ using grpc::Channel; using grpc::ClientContext; using grpc::Status; -using ArmoniK::Api::Common::utils::IConfiguration; +using ArmoniK::Api::Common::utils::Configuration; using armonik::api::grpc::v1::TaskOptions; using namespace armonik::api::grpc::v1::worker; @@ -29,14 +29,12 @@ using namespace ArmoniK::Api::Common::utils; /** * @brief Constructs a ArmoniKWorker object. */ -API_WORKER_NAMESPACE::ArmoniKWorker::ArmoniKWorker(std::unique_ptr agent, - std::function processing_function) +API_WORKER_NAMESPACE::ArmoniKWorker::ArmoniKWorker(std::unique_ptr agent) : logger_(ArmoniK::Api::Common::serilog::logging_format::SEQ) { logger_.info("Build Service ArmoniKWorker"); logger_.add_property("class", "ArmoniKWorker"); logger_.add_property("Worker", "ArmoniK.Api.Cpp"); agent_ = std::move(agent); - processing_function_ = std::move(processing_function); } /** @@ -54,27 +52,25 @@ Status API_WORKER_NAMESPACE::ArmoniKWorker::Process([[maybe_unused]] ::grpc::Ser logger_.debug("Receive new request From C++ Worker"); - // Encapsulate the pointer without deleting it out of scope - std::shared_ptr> iterator(reader, [](void *) {}); - std::shared_ptr agent(agent_.get(), [](void *) {}); - - TaskHandler task_handler(agent, iterator); + TaskHandler task_handler(*agent_, *reader); task_handler.init(); - - ProcessStatus status = processing_function_(task_handler); - - logger_.debug("Finish call C++"); - - armonik::api::grpc::v1::Output output; - if (status.ok()) { - *output.mutable_ok() = armonik::api::grpc::v1::Empty(); - } else { - output.mutable_error()->set_details(status.details()); + try { + ProcessStatus status = Execute(task_handler); + + logger_.debug("Finish call C++"); + + armonik::api::grpc::v1::Output output; + if (status.ok()) { + *output.mutable_ok() = armonik::api::grpc::v1::Empty(); + } else { + output.mutable_error()->set_details(std::move(status).details()); + } + *response->mutable_output() = std::move(output); + } catch (const std::exception &e) { + return {grpc::StatusCode::UNAVAILABLE, "Error processing task", e.what()}; } - *response->mutable_output() = output; - return grpc::Status::OK; } diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp index 413cf79ae..7c9a685d1 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ProcessStatus.cpp @@ -1,4 +1,4 @@ #include "Worker/ProcessStatus.h" -const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::OK; -const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::ERROR(false); \ No newline at end of file +const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::Ok; +const API_WORKER_NAMESPACE::ProcessStatus API_WORKER_NAMESPACE::ProcessStatus::Error(false); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index 55ebe96b5..5f524e049 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -29,11 +29,9 @@ using namespace armonik::api::grpc::v1::agent; * @param client the agent client * @param request_iterator The request iterator */ -API_WORKER_NAMESPACE::TaskHandler::TaskHandler(std::shared_ptr client, - std::shared_ptr> request_iterator) { - stub_ = std::move(client); - request_iterator_ = std::move(request_iterator); -} +API_WORKER_NAMESPACE::TaskHandler::TaskHandler(Agent::Stub &client, + grpc::ServerReader &request_iterator) + : stub_(client), request_iterator_(request_iterator) {} /** * @brief Initialise the task handler @@ -41,68 +39,65 @@ API_WORKER_NAMESPACE::TaskHandler::TaskHandler(std::shared_ptr clie */ void API_WORKER_NAMESPACE::TaskHandler::init() { ProcessRequest Request; - if (!request_iterator_->Read(&Request)) { + if (!request_iterator_.Read(&Request)) { throw std::runtime_error("Request stream ended unexpectedly."); } if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kInitRequest) { throw std::runtime_error("Expected a Compute request type with InitRequest to start the stream."); } - auto &init_request = Request.compute().init_request(); - session_id_ = init_request.session_id(); - task_id_ = init_request.task_id(); - task_options_ = init_request.task_options(); - expected_result_ = init_request.expected_output_keys(); + auto *init_request = Request.mutable_compute()->mutable_init_request(); + session_id_ = init_request->session_id(); + task_id_ = init_request->task_id(); + task_options_ = init_request->task_options(); + expected_result_.assign(std::make_move_iterator(init_request->mutable_expected_output_keys()->begin()), + std::make_move_iterator(init_request->mutable_expected_output_keys()->end())); token_ = Request.communication_token(); - config_ = init_request.configuration(); + config_ = std::move(*init_request->mutable_configuration()); std::vector chunks; - auto datachunk = init_request.payload(); - payload_.resize(datachunk.data().size()); - std::memcpy(payload_.data(), datachunk.data().data(), datachunk.data().size()); + auto *datachunk = &init_request->payload(); + payload_.resize(datachunk->data().size()); + std::memcpy(payload_.data(), datachunk->data().data(), datachunk->data().size()); - while (!datachunk.data_complete()) { - if (!request_iterator_->Read(&Request)) { + while (!datachunk->data_complete()) { + if (!request_iterator_.Read(&Request)) { throw std::runtime_error("Request stream ended unexpectedly."); } if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kPayload) { throw std::runtime_error("Expected a Compute request type with Payload to continue the stream."); } - datachunk = Request.compute().payload(); - if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kData) { + datachunk = &Request.compute().payload(); + if (datachunk->type_case() == armonik::api::grpc::v1::DataChunk::kData) { size_t prev_size = payload_.size(); - payload_.resize(payload_.size() + datachunk.data().size()); - std::memcpy(payload_.data() + prev_size, datachunk.data().data(), datachunk.data().size()); - } - - if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { + payload_.resize(payload_.size() + datachunk->data().size()); + std::memcpy(payload_.data() + prev_size, datachunk->data().data(), datachunk->data().size()); + } else if (datachunk->type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { throw std::runtime_error("Expected a Compute request type with a DataChunk Payload to continue the stream."); - } - - if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { + } else if (datachunk->type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { break; } } - armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::InitData init_data; + armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::InitData *init_data; do { - if (!request_iterator_->Read(&Request)) { + if (!request_iterator_.Read(&Request)) { throw std::runtime_error("Request stream ended unexpectedly."); } if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kInitData) { throw std::runtime_error("Expected a Compute request type with InitData to continue the stream."); } - init_data = Request.compute().init_data(); - if (init_data.type_case() == armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest_InitData::kKey) { - const std::string &key = init_data.key(); + init_data = Request.mutable_compute()->mutable_init_data(); + if (init_data->type_case() == armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest_InitData::kKey) { + const std::string &key = init_data->key(); std::string data_dep; while (true) { ProcessRequest dep_request; - if (!request_iterator_->Read(&dep_request)) { + if (!request_iterator_.Read(&dep_request)) { throw std::runtime_error("Request stream ended unexpectedly."); } if (dep_request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kData) { @@ -114,19 +109,15 @@ void API_WORKER_NAMESPACE::TaskHandler::init() { size_t prevSize = data_dep.size(); data_dep.resize(prevSize + chunk.data().size()); std::memcpy(data_dep.data() + prevSize, chunk.data().data(), chunk.data().size()); - } - - if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { + } else if (datachunk->type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { throw std::runtime_error("Expected a Compute request type with a DataChunk Payload to continue the stream."); - } - - if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { + } else if (datachunk->type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { break; } } data_dependencies_[key] = data_dep; } - } while (init_data.type_case() == armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest_InitData::kKey); + } while (init_data->type_case() == armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest_InitData::kKey); } /** @@ -259,7 +250,7 @@ API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options, reply.set_allocated_creation_status_list(new armonik::api::grpc::v1::agent::CreateTaskReply_CreationStatusList()); grpc::ClientContext context_client_writer; - auto stream(stub_->CreateTask(&context_client_writer, &reply)); + auto stream(stub_.CreateTask(&context_client_writer, &reply)); auto create_task_request_async = to_request_stream(task_requests, std::move(task_options), token_, chunk); @@ -289,9 +280,8 @@ API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options, * @param data The result data * @return A future containing a vector of ResultReply */ -std::future API_WORKER_NAMESPACE::TaskHandler::send_result(const std::string &key, - const std::string &data) { - return std::async(std::launch::async, [this, key, data]() { +std::future API_WORKER_NAMESPACE::TaskHandler::send_result(std::string key, std::string_view data) { + return std::async(std::launch::async, [this, key = std::move(key), data]() mutable { grpc::ClientContext context_client_writer; ResultReply reply; @@ -300,10 +290,10 @@ std::future API_WORKER_NAMESPACE::TaskHandler::send_result(const st const size_t data_size = data.size(); size_t start = 0; - auto stream = stub_->SendResult(&context_client_writer, &reply); + auto stream = stub_.SendResult(&context_client_writer, &reply); Result init_msg; - init_msg.mutable_init()->set_key(key); + init_msg.mutable_init()->set_key(std::move(key)); init_msg.set_communication_token(token_); stream->Write(init_msg); @@ -315,7 +305,7 @@ std::future API_WORKER_NAMESPACE::TaskHandler::send_result(const st msg.set_communication_token(token_); auto chunk = msg.mutable_data(); chunk->mutable_data()->resize(chunkSize); - std::memcpy(chunk->mutable_data()->data(), data.c_str() + start, chunkSize); + std::memcpy(chunk->mutable_data()->data(), data.data() + start, chunkSize); stream->Write(msg); @@ -357,7 +347,7 @@ API_WORKER_NAMESPACE::TaskHandler::get_result_ids(std::vectorCreateResultsMetaData(&context_client_writer, request, &reply); + Status status = stub_.CreateResultsMetaData(&context_client_writer, request, &reply); auto results_reply = reply.results(); @@ -373,28 +363,28 @@ API_WORKER_NAMESPACE::TaskHandler::get_result_ids(std::vector */ -std::string API_WORKER_NAMESPACE::TaskHandler::getPayload() { return payload_; } +const std::string &API_WORKER_NAMESPACE::TaskHandler::getPayload() const { return payload_; } /** * @brief Get the Data Dependencies object * * @return std::vector */ -std::map API_WORKER_NAMESPACE::TaskHandler::getDataDependencies() { +const std::map &API_WORKER_NAMESPACE::TaskHandler::getDataDependencies() const { return data_dependencies_; } @@ -403,14 +393,16 @@ std::map API_WORKER_NAMESPACE::TaskHandler::getDataDep * * @return armonik::api::grpc::v1::TaskOptions */ -armonik::api::grpc::v1::TaskOptions API_WORKER_NAMESPACE::TaskHandler::getTaskOptions() { return task_options_; } +const armonik::api::grpc::v1::TaskOptions &API_WORKER_NAMESPACE::TaskHandler::getTaskOptions() const { + return task_options_; +} /** * @brief Get the Expected Results object * * @return google::protobuf::RepeatedPtrField */ -google::protobuf::RepeatedPtrField API_WORKER_NAMESPACE::TaskHandler::getExpectedResults() { +const std::vector &API_WORKER_NAMESPACE::TaskHandler::getExpectedResults() const { return expected_result_; } @@ -419,4 +411,6 @@ google::protobuf::RepeatedPtrField API_WORKER_NAMESPACE::TaskHandle * * @return armonik::api::grpc::v1::Configuration */ -armonik::api::grpc::v1::Configuration API_WORKER_NAMESPACE::TaskHandler::getConfiguration() { return config_; } \ No newline at end of file +const armonik::api::grpc::v1::Configuration &API_WORKER_NAMESPACE::TaskHandler::getConfiguration() const { + return config_; +}