Skip to content

Commit

Permalink
refactor: Changes in worker and task handler (#368)
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo authored Aug 2, 2023
2 parents da9b639 + 5d304cc commit b8800ae
Show file tree
Hide file tree
Showing 18 changed files with 218 additions and 244 deletions.

This file was deleted.

This file was deleted.

10 changes: 5 additions & 5 deletions packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once
#include <sstream>
#include <utils/IConfiguration.h>
#include <utils/Configuration.h>

/**
* @brief The armonik namespace contains classes and functions related to the Armonik API.
Expand All @@ -13,9 +13,9 @@ class ComputePlane {
public:
/**
* @brief Constructs a ComputePlane object with the given configuration.
* @param configuration The IConfiguration object containing address information.
* @param configuration The Configuration object containing address information.
*/
ComputePlane(const utils::IConfiguration &configuration) {
ComputePlane(const utils::Configuration &configuration) {
set_worker_address(configuration.get("ComputePlane__WorkerChannel__Address"));
set_agent_address(configuration.get(std::string("ComputePlane__AgentChannel__Address")));
}
Expand All @@ -24,7 +24,7 @@ class ComputePlane {
* @brief Returns the server address.
* @return A reference to the server address string.
*/
[[nodiscard]] std::string_view get_server_address() const { return worker_address_; }
[[nodiscard]] const std::string &get_server_address() const { return worker_address_; }

/**
* @brief Sets the worker address with the given socket address.
Expand Down Expand Up @@ -52,7 +52,7 @@ class ComputePlane {
* @brief Returns the agent address.
* @return A reference to the agent address string.
*/
[[nodiscard]] std::string_view get_agent_address() const { return agent_address_; }
[[nodiscard]] const std::string &get_agent_address() const { return agent_address_; }

private:
std::string worker_address_; ///< The worker address string.
Expand Down
4 changes: 2 additions & 2 deletions packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#ifndef ARMONIK_API_CONTROLPLANE_H
#define ARMONIK_API_CONTROLPLANE_H

#include "utils/IConfiguration.h"
#include "utils/Configuration.h"

namespace API_COMMON_NAMESPACE::options {
class ControlPlane {
public:
ControlPlane(const utils::IConfiguration &config) {
ControlPlane(const utils::Configuration &config) {
endpoint_ = config.get(EndpointKey);
user_cert_pem_path_ = config.get(UserCertKey);
user_key_pem_path_ = config.get(UserKeyKey);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* @file IConfiguration.h
* @file Configuration.h
* @brief Interface for a configuration class that stores and manages key-value pairs.
*/

Expand All @@ -18,21 +18,18 @@ class ControlPlane;

namespace API_COMMON_NAMESPACE::utils {
/**
* @class IConfiguration
* @class Configuration
* @brief Interface for a configuration class that stores and manages key-value pairs.
*/
class IConfiguration {
class Configuration {
public:
/**
* @brief Default constructor.
*/
IConfiguration() = default;

/**
* @brief Default virtual destructor.
*/
virtual ~IConfiguration() = default;
Configuration() noexcept = default;
Configuration(const Configuration &) = default;
Configuration(Configuration &&) noexcept = default;

Configuration &operator=(const Configuration &) = default;
Configuration &operator=(Configuration &&) noexcept = default;
~Configuration() = default;
/**
* @brief Get the value associated with the given key.
* @param string Key to look up.
Expand All @@ -48,10 +45,10 @@ class IConfiguration {
void set(const std::string &string, const std::string &value);

/**
* @brief Set the values from another IConfiguration object.
* @param other IConfiguration object to copy values from.
* @brief Set the values from another Configuration object.
* @param other Configuration object to copy values from.
*/
void set(const IConfiguration &other);
void set(const Configuration &other);

/**
* @brief List defined values of this configuration.
Expand All @@ -62,15 +59,15 @@ class IConfiguration {
/**
* @brief Add JSON configuration from a file.
* @param file_path Path to the JSON file.
* @return Reference to the current IConfiguration object.
* @return Reference to the current Configuration object.
*/
IConfiguration &add_json_configuration(std::string_view file_path);
Configuration &add_json_configuration(std::string_view file_path);

/**
* @brief Add environment variable configuration.
* @return Reference to the current IConfiguration object.
* @return Reference to the current Configuration object.
*/
IConfiguration &add_env_configuration();
Configuration &add_env_configuration();

/**
* @brief Get the current ComputePlane configuration.
Expand Down
23 changes: 9 additions & 14 deletions packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@
* @brief Header file for the EnvConfiguration class
*/

#include "utils/IConfiguration.h"
#include "utils/Configuration.h"

namespace API_COMMON_NAMESPACE::utils {
/**
* @class EnvConfiguration
* @brief An implementation of IConfiguration that handles environment variables
*/
class EnvConfiguration : public IConfiguration {
public:
/**
* @brief Default constructor
*/
EnvConfiguration() { add_env_configuration(); }
};
} // namespace API_COMMON_NAMESPACE::utils
namespace API_COMMON_NAMESPACE::utils::EnvConfiguration {
inline void fromEnv(Configuration &config) { config.add_env_configuration(); }
inline Configuration fromEnv() {
Configuration config;
config.add_env_configuration();
return config;
}
} // namespace API_COMMON_NAMESPACE::utils::EnvConfiguration
39 changes: 16 additions & 23 deletions packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
#pragma once
/**
* @file JsonConfiguration.h
* @brief Definition of a JSON configuration class that inherits from IConfiguration.
* @brief Definition of a JSON configuration class that inherits from Configuration.
*/
#include "utils/IConfiguration.h"
#include "utils/Configuration.h"

namespace API_COMMON_NAMESPACE::utils {
/**
* @class JsonConfiguration
* @brief JSON configuration class that inherits from IConfiguration.
*/
class JsonConfiguration : public IConfiguration {
private:
JsonConfiguration() = default;

public:
/**
* @brief Constructor that takes a JSON file path.
* @param filepath JSON file path to be used for configuration.
*/
explicit JsonConfiguration(const std::string &filepath);

static void fromPath(IConfiguration &config, std::string_view filepath);
static JsonConfiguration fromString(const std::string &json_string);
static void fromString(IConfiguration &config, const std::string &json_string);
};
} // namespace API_COMMON_NAMESPACE::utils
namespace API_COMMON_NAMESPACE::utils::JsonConfiguration {
void fromPath(Configuration &config, std::string_view filepath);
void fromString(Configuration &config, std::string_view json_string);
inline Configuration fromPath(std::string_view filepath) {
Configuration config;
fromPath(config, filepath);
return config;
}
inline Configuration fromString(std::string_view json_string) {
Configuration config;
fromString(config, json_string);
return config;
}
} // namespace API_COMMON_NAMESPACE::utils::JsonConfiguration
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
#include "utils/IConfiguration.h"
#include "utils/Configuration.h"

#include "options/ComputePlane.h"
#include "options/ControlPlane.h"
#include "utils/JsonConfiguration.h"

namespace API_COMMON_NAMESPACE::utils {
IConfiguration &IConfiguration::add_json_configuration(std::string_view file_path) {
Configuration &Configuration::add_json_configuration(std::string_view file_path) {
JsonConfiguration::fromPath(*this, file_path);
return *this;
}

IConfiguration &IConfiguration::add_env_configuration() {
Configuration &Configuration::add_env_configuration() {
use_environment_ = true;
above_env_keys_.clear();
return *this;
}

options::ComputePlane IConfiguration::get_compute_plane() const { return *this; }
options::ComputePlane Configuration::get_compute_plane() const { return *this; }

void IConfiguration::set(const IConfiguration &other) {
void Configuration::set(const Configuration &other) {
for (auto &&[key, value] : other.list()) {
set(key, value);
}
}
void IConfiguration::set(const std::string &key, const std::string &value) {
void Configuration::set(const std::string &key, const std::string &value) {
if (use_environment_) {
above_env_keys_.insert(key);
}
options_[key] = value;
}

std::string IConfiguration::get(const std::string &string) const {
std::string Configuration::get(const std::string &string) const {
if (use_environment_ && above_env_keys_.find(string) == above_env_keys_.end()) {
char *value = std::getenv(string.c_str());
if (value != nullptr) {
Expand All @@ -41,7 +41,7 @@ std::string IConfiguration::get(const std::string &string) const {
return position == options_.end() ? "" : position->second;
}

const std::map<std::string, std::string> &IConfiguration::list() const { return options_; }
options::ControlPlane IConfiguration::get_control_plane() const { return *this; }
const std::map<std::string, std::string> &Configuration::list() const { return options_; }
options::ControlPlane Configuration::get_control_plane() const { return *this; }

} // namespace API_COMMON_NAMESPACE::utils
18 changes: 4 additions & 14 deletions packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ using namespace simdjson;
* @param prefix Prefix for the key
* @param element json element
*/
void populate(API_COMMON_NAMESPACE::utils::IConfiguration &config, const std::string &prefix,
void populate(API_COMMON_NAMESPACE::utils::Configuration &config, const std::string &prefix,
const dom::element &element) {
switch (element.type()) {
case dom::element_type::ARRAY: {
Expand All @@ -33,17 +33,7 @@ void populate(API_COMMON_NAMESPACE::utils::IConfiguration &config, const std::st
}
}

API_COMMON_NAMESPACE::utils::JsonConfiguration::JsonConfiguration(const std::string &json_path) {
fromPath(*this, json_path);
}

API_COMMON_NAMESPACE::utils::JsonConfiguration
API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(const std::string &json_string) {
JsonConfiguration config;
fromString(config, json_string);
return config;
}
void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPACE::utils::IConfiguration &config,
void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPACE::utils::Configuration &config,
std::string_view filepath) {
dom::parser parser;
dom::element elem;
Expand All @@ -54,8 +44,8 @@ void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPA
std::cerr << "Unable to load json file " << filepath << " : " << e.what();
}
}
void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMESPACE::utils::IConfiguration &config,
const std::string &json_string) {
void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMESPACE::utils::Configuration &config,
std::string_view json_string) {
dom::parser parser;
populate(config, "", parser.parse(padded_string(json_string)));
}
4 changes: 2 additions & 2 deletions packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "results_service.grpc.pb.h"
#include "submitter/ResultsClient.h"

using ArmoniK::Api::Common::utils::IConfiguration;
using ArmoniK::Api::Common::utils::Configuration;
using armonik::api::grpc::v1::TaskOptions;
using armonik::api::grpc::v1::submitter::CreateSessionReply;
using armonik::api::grpc::v1::submitter::CreateSessionRequest;
Expand All @@ -44,7 +44,7 @@ using namespace ArmoniK::Api::Common::serilog;
*/
void init(std::shared_ptr<Channel> &channel, TaskOptions &default_task_options) {

EnvConfiguration configuration;
Configuration configuration;
// auto server = std::make_shared<EnvConfiguration>(configuration_t);

configuration.add_json_configuration("appsettings.json").add_env_configuration();
Expand Down
50 changes: 29 additions & 21 deletions packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,51 @@ using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;

using ArmoniK::Api::Common::utils::IConfiguration;
using ArmoniK::Api::Common::utils::Configuration;
using armonik::api::grpc::v1::TaskOptions;

using namespace armonik::api::grpc::v1::worker;
using namespace ArmoniK::Api::Common::utils;

ArmoniK::Api::Worker::ProcessStatus computer(ArmoniK::Api::Worker::TaskHandler &handler) {
std::cout << "Call computer" << std::endl;
std::cout << "SizePayload : " << handler.getPayload().size() << "\nSize DD : " << handler.getDataDependencies().size()
<< "\n Expected results : " << handler.getExpectedResults().size() << std::endl;

try {
if (!handler.getExpectedResults().empty()) {
auto res = handler.send_result(handler.getExpectedResults()[0], handler.getPayload()).get();
if (res.has_error()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(res.error());
class TestWorker : public ArmoniK::Api::Worker::ArmoniKWorker {
public:
explicit TestWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent)
: ArmoniKWorker(std::move(agent)) {}

ArmoniK::Api::Worker::ProcessStatus Execute(ArmoniK::Api::Worker::TaskHandler &taskHandler) override {
std::cout << "Call computer" << std::endl;
std::cout << "SizePayload : " << taskHandler.getPayload().size()
<< "\nSize DD : " << taskHandler.getDataDependencies().size()
<< "\n Expected results : " << taskHandler.getExpectedResults().size() << std::endl;

try {
if (!taskHandler.getExpectedResults().empty()) {
auto res = taskHandler.send_result(taskHandler.getExpectedResults()[0], taskHandler.getPayload()).get();
if (res.has_error()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(res.error());
}
}

} catch (const std::exception &e) {
std::cout << "Error sending result " << e.what() << std::endl;
return ArmoniK::Api::Worker::ProcessStatus(e.what());
}

} catch (const std::exception &e) {
std::cout << "Error sending result " << e.what() << std::endl;
return ArmoniK::Api::Worker::ProcessStatus(e.what());
return ArmoniK::Api::Worker::ProcessStatus::Ok;
}

return ArmoniK::Api::Worker::ProcessStatus::OK;
}
};

int main(int argc, char **argv) {
std::cout << "Starting C++ worker..." << std::endl;

std::shared_ptr<IConfiguration> config = std::make_shared<IConfiguration>();
Configuration config;
config.add_json_configuration("appsettings.json").add_env_configuration();

config->set("ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock");
config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock");
config.set("ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock");
config.set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock");

try {
ArmoniK::Api::Worker::WorkerServer::create<ArmoniK::Api::Worker::ArmoniKWorker>(config, &computer)->run();
ArmoniK::Api::Worker::WorkerServer::create<TestWorker>(config)->run();
} catch (const std::exception &e) {
std::cout << "Error in worker" << e.what() << std::endl;
}
Expand Down
Loading

0 comments on commit b8800ae

Please sign in to comment.