Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Changes in worker and task handler #368

Merged
merged 10 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

10 changes: 5 additions & 5 deletions packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once
#include <sstream>
#include <utils/IConfiguration.h>
#include <utils/Configuration.h>

/**
* @brief The armonik namespace contains classes and functions related to the Armonik API.
Expand All @@ -13,9 +13,9 @@ class ComputePlane {
public:
/**
* @brief Constructs a ComputePlane object with the given configuration.
* @param configuration The IConfiguration object containing address information.
* @param configuration The Configuration object containing address information.
*/
ComputePlane(const utils::IConfiguration &configuration) {
ComputePlane(const utils::Configuration &configuration) {
set_worker_address(configuration.get("ComputePlane__WorkerChannel__Address"));
set_agent_address(configuration.get(std::string("ComputePlane__AgentChannel__Address")));
}
Expand All @@ -24,7 +24,7 @@ class ComputePlane {
* @brief Returns the server address.
* @return A reference to the server address string.
*/
[[nodiscard]] std::string_view get_server_address() const { return worker_address_; }
[[nodiscard]] const std::string &get_server_address() const { return worker_address_; }

/**
* @brief Sets the worker address with the given socket address.
Expand Down Expand Up @@ -52,7 +52,7 @@ class ComputePlane {
* @brief Returns the agent address.
* @return A reference to the agent address string.
*/
[[nodiscard]] std::string_view get_agent_address() const { return agent_address_; }
[[nodiscard]] const std::string &get_agent_address() const { return agent_address_; }

private:
std::string worker_address_; ///< The worker address string.
Expand Down
4 changes: 2 additions & 2 deletions packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#ifndef ARMONIK_API_CONTROLPLANE_H
#define ARMONIK_API_CONTROLPLANE_H

#include "utils/IConfiguration.h"
#include "utils/Configuration.h"

namespace API_COMMON_NAMESPACE::options {
class ControlPlane {
public:
ControlPlane(const utils::IConfiguration &config) {
ControlPlane(const utils::Configuration &config) {
endpoint_ = config.get(EndpointKey);
user_cert_pem_path_ = config.get(UserCertKey);
user_key_pem_path_ = config.get(UserKeyKey);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* @file IConfiguration.h
* @file Configuration.h
* @brief Interface for a configuration class that stores and manages key-value pairs.
*/

Expand All @@ -18,21 +18,18 @@ class ControlPlane;

namespace API_COMMON_NAMESPACE::utils {
/**
* @class IConfiguration
* @class Configuration
* @brief Interface for a configuration class that stores and manages key-value pairs.
*/
class IConfiguration {
class Configuration {
public:
/**
* @brief Default constructor.
*/
IConfiguration() = default;

/**
* @brief Default virtual destructor.
*/
virtual ~IConfiguration() = default;
Configuration() noexcept = default;
Configuration(const Configuration &) = default;
Configuration(Configuration &&) noexcept = default;

Configuration &operator=(const Configuration &) = default;
Configuration &operator=(Configuration &&) noexcept = default;
~Configuration() = default;
/**
* @brief Get the value associated with the given key.
* @param string Key to look up.
Expand All @@ -48,10 +45,10 @@ class IConfiguration {
void set(const std::string &string, const std::string &value);

/**
* @brief Set the values from another IConfiguration object.
* @param other IConfiguration object to copy values from.
* @brief Set the values from another Configuration object.
* @param other Configuration object to copy values from.
*/
void set(const IConfiguration &other);
void set(const Configuration &other);

/**
* @brief List defined values of this configuration.
Expand All @@ -62,15 +59,15 @@ class IConfiguration {
/**
* @brief Add JSON configuration from a file.
* @param file_path Path to the JSON file.
* @return Reference to the current IConfiguration object.
* @return Reference to the current Configuration object.
*/
IConfiguration &add_json_configuration(std::string_view file_path);
Configuration &add_json_configuration(std::string_view file_path);

/**
* @brief Add environment variable configuration.
* @return Reference to the current IConfiguration object.
* @return Reference to the current Configuration object.
*/
IConfiguration &add_env_configuration();
Configuration &add_env_configuration();

/**
* @brief Get the current ComputePlane configuration.
Expand Down
23 changes: 9 additions & 14 deletions packages/cpp/ArmoniK.Api.Common/header/utils/EnvConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@
* @brief Header file for the EnvConfiguration class
*/

#include "utils/IConfiguration.h"
#include "utils/Configuration.h"

namespace API_COMMON_NAMESPACE::utils {
/**
* @class EnvConfiguration
* @brief An implementation of IConfiguration that handles environment variables
*/
class EnvConfiguration : public IConfiguration {
public:
/**
* @brief Default constructor
*/
EnvConfiguration() { add_env_configuration(); }
};
} // namespace API_COMMON_NAMESPACE::utils
namespace API_COMMON_NAMESPACE::utils::EnvConfiguration {
inline void fromEnv(Configuration &config) { config.add_env_configuration(); }
inline Configuration fromEnv() {
Configuration config;
config.add_env_configuration();
return config;
}
} // namespace API_COMMON_NAMESPACE::utils::EnvConfiguration
39 changes: 16 additions & 23 deletions packages/cpp/ArmoniK.Api.Common/header/utils/JsonConfiguration.h
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
#pragma once
/**
* @file JsonConfiguration.h
* @brief Definition of a JSON configuration class that inherits from IConfiguration.
* @brief Definition of a JSON configuration class that inherits from Configuration.
*/
#include "utils/IConfiguration.h"
#include "utils/Configuration.h"

namespace API_COMMON_NAMESPACE::utils {
/**
* @class JsonConfiguration
* @brief JSON configuration class that inherits from IConfiguration.
*/
class JsonConfiguration : public IConfiguration {
private:
JsonConfiguration() = default;

public:
/**
* @brief Constructor that takes a JSON file path.
* @param filepath JSON file path to be used for configuration.
*/
explicit JsonConfiguration(const std::string &filepath);

static void fromPath(IConfiguration &config, std::string_view filepath);
static JsonConfiguration fromString(const std::string &json_string);
static void fromString(IConfiguration &config, const std::string &json_string);
};
} // namespace API_COMMON_NAMESPACE::utils
namespace API_COMMON_NAMESPACE::utils::JsonConfiguration {
void fromPath(Configuration &config, std::string_view filepath);
void fromString(Configuration &config, std::string_view json_string);
inline Configuration fromPath(std::string_view filepath) {
Configuration config;
fromPath(config, filepath);
return config;
}
inline Configuration fromString(std::string_view json_string) {
Configuration config;
fromString(config, json_string);
return config;
}
} // namespace API_COMMON_NAMESPACE::utils::JsonConfiguration
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
#include "utils/IConfiguration.h"
#include "utils/Configuration.h"

#include "options/ComputePlane.h"
#include "options/ControlPlane.h"
#include "utils/JsonConfiguration.h"

namespace API_COMMON_NAMESPACE::utils {
IConfiguration &IConfiguration::add_json_configuration(std::string_view file_path) {
Configuration &Configuration::add_json_configuration(std::string_view file_path) {
JsonConfiguration::fromPath(*this, file_path);
return *this;
}

IConfiguration &IConfiguration::add_env_configuration() {
Configuration &Configuration::add_env_configuration() {
use_environment_ = true;
above_env_keys_.clear();
return *this;
}

options::ComputePlane IConfiguration::get_compute_plane() const { return *this; }
options::ComputePlane Configuration::get_compute_plane() const { return *this; }

void IConfiguration::set(const IConfiguration &other) {
void Configuration::set(const Configuration &other) {
for (auto &&[key, value] : other.list()) {
set(key, value);
}
}
void IConfiguration::set(const std::string &key, const std::string &value) {
void Configuration::set(const std::string &key, const std::string &value) {
if (use_environment_) {
above_env_keys_.insert(key);
}
options_[key] = value;
}

std::string IConfiguration::get(const std::string &string) const {
std::string Configuration::get(const std::string &string) const {
if (use_environment_ && above_env_keys_.find(string) == above_env_keys_.end()) {
char *value = std::getenv(string.c_str());
if (value != nullptr) {
Expand All @@ -41,7 +41,7 @@ std::string IConfiguration::get(const std::string &string) const {
return position == options_.end() ? "" : position->second;
}

const std::map<std::string, std::string> &IConfiguration::list() const { return options_; }
options::ControlPlane IConfiguration::get_control_plane() const { return *this; }
const std::map<std::string, std::string> &Configuration::list() const { return options_; }
options::ControlPlane Configuration::get_control_plane() const { return *this; }

} // namespace API_COMMON_NAMESPACE::utils
20 changes: 5 additions & 15 deletions packages/cpp/ArmoniK.Api.Common/source/utils/JsonConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ using namespace simdjson;
* @param prefix Prefix for the key
* @param element json element
*/
void populate(API_COMMON_NAMESPACE::utils::IConfiguration &config, const std::string &prefix,
void populate(API_COMMON_NAMESPACE::utils::Configuration &config, const std::string &prefix,
const dom::element &element) {
switch (element.type()) {
case dom::element_type::ARRAY: {
Expand All @@ -33,17 +33,7 @@ void populate(API_COMMON_NAMESPACE::utils::IConfiguration &config, const std::st
}
}

API_COMMON_NAMESPACE::utils::JsonConfiguration::JsonConfiguration(const std::string &json_path) {
fromPath(*this, json_path);
}

API_COMMON_NAMESPACE::utils::JsonConfiguration
API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(const std::string &json_string) {
JsonConfiguration config;
fromString(config, json_string);
return config;
}
void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPACE::utils::IConfiguration &config,
void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPACE::utils::Configuration &config,
std::string_view filepath) {
dom::parser parser;
dom::element elem;
Expand All @@ -54,8 +44,8 @@ void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPA
std::cerr << "Unable to load json file " << filepath << " : " << e.what();
}
}
void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMESPACE::utils::IConfiguration &config,
const std::string &json_string) {
void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMESPACE::utils::Configuration &config,
std::string_view json_string) {
dom::parser parser;
populate(config, "", parser.parse(padded_string(json_string)));
}
}
4 changes: 2 additions & 2 deletions packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "results_service.grpc.pb.h"
#include "submitter/ResultsClient.h"

using ArmoniK::Api::Common::utils::IConfiguration;
using ArmoniK::Api::Common::utils::Configuration;
using armonik::api::grpc::v1::TaskOptions;
using armonik::api::grpc::v1::submitter::CreateSessionReply;
using armonik::api::grpc::v1::submitter::CreateSessionRequest;
Expand All @@ -44,7 +44,7 @@ using namespace ArmoniK::Api::Common::serilog;
*/
void init(std::shared_ptr<Channel> &channel, TaskOptions &default_task_options) {

EnvConfiguration configuration;
Configuration configuration;
// auto server = std::make_shared<EnvConfiguration>(configuration_t);

configuration.add_json_configuration("appsettings.json").add_env_configuration();
Expand Down
50 changes: 29 additions & 21 deletions packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,51 @@ using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;

using ArmoniK::Api::Common::utils::IConfiguration;
using ArmoniK::Api::Common::utils::Configuration;
using armonik::api::grpc::v1::TaskOptions;

using namespace armonik::api::grpc::v1::worker;
using namespace ArmoniK::Api::Common::utils;

ArmoniK::Api::Worker::ProcessStatus computer(ArmoniK::Api::Worker::TaskHandler &handler) {
std::cout << "Call computer" << std::endl;
std::cout << "SizePayload : " << handler.getPayload().size() << "\nSize DD : " << handler.getDataDependencies().size()
<< "\n Expected results : " << handler.getExpectedResults().size() << std::endl;

try {
if (!handler.getExpectedResults().empty()) {
auto res = handler.send_result(handler.getExpectedResults()[0], handler.getPayload()).get();
if (res.has_error()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(res.error());
class TestWorker : public ArmoniK::Api::Worker::ArmoniKWorker {
public:
explicit TestWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent)
: ArmoniKWorker(std::move(agent)) {}

ArmoniK::Api::Worker::ProcessStatus Execute(ArmoniK::Api::Worker::TaskHandler &taskHandler) override {
std::cout << "Call computer" << std::endl;
std::cout << "SizePayload : " << taskHandler.getPayload().size()
<< "\nSize DD : " << taskHandler.getDataDependencies().size()
<< "\n Expected results : " << taskHandler.getExpectedResults().size() << std::endl;

try {
if (!taskHandler.getExpectedResults().empty()) {
auto res = taskHandler.send_result(taskHandler.getExpectedResults()[0], taskHandler.getPayload()).get();
if (res.has_error()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(res.error());
}
}

} catch (const std::exception &e) {
std::cout << "Error sending result " << e.what() << std::endl;
return ArmoniK::Api::Worker::ProcessStatus(e.what());
}

} catch (const std::exception &e) {
std::cout << "Error sending result " << e.what() << std::endl;
return ArmoniK::Api::Worker::ProcessStatus(e.what());
return ArmoniK::Api::Worker::ProcessStatus::Ok;
}

return ArmoniK::Api::Worker::ProcessStatus::OK;
}
};

int main(int argc, char **argv) {
std::cout << "Starting C++ worker..." << std::endl;

std::shared_ptr<IConfiguration> config = std::make_shared<IConfiguration>();
Configuration config;
config.add_json_configuration("appsettings.json").add_env_configuration();

config->set("ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock");
config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock");
config.set("ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock");
config.set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock");

try {
ArmoniK::Api::Worker::WorkerServer::create<ArmoniK::Api::Worker::ArmoniKWorker>(config, &computer)->run();
ArmoniK::Api::Worker::WorkerServer::create<TestWorker>(config)->run();
} catch (const std::exception &e) {
std::cout << "Error in worker" << e.what() << std::endl;
}
Expand Down
Loading