Skip to content

Commit

Permalink
fix: Fix task handler in cpp (#365)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrasseur-aneo authored Jul 26, 2023
2 parents ab31d7c + c4865f6 commit 2f3fefb
Show file tree
Hide file tree
Showing 9 changed files with 633 additions and 637 deletions.
6 changes: 3 additions & 3 deletions packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ void init(std::shared_ptr<Channel> &channel, TaskOptions &default_task_options)
EnvConfiguration configuration;
// auto server = std::make_shared<EnvConfiguration>(configuration_t);

configuration.add_json_configuration("appsetting.json").add_env_configuration();
configuration.add_json_configuration("appsettings.json").add_env_configuration();

std::string server_address = configuration.get("ArmoniK_Client_Server");
std::string server_address = configuration.get("Grpc__EndPoint");

std::cout << " Server address " << server_address << std::endl;

Expand All @@ -61,7 +61,7 @@ void init(std::shared_ptr<Channel> &channel, TaskOptions &default_task_options)
default_task_options.mutable_options()->insert({"key2", "value2"});
default_task_options.mutable_max_duration()->set_seconds(3600);
default_task_options.mutable_max_duration()->set_nanos(0);
default_task_options.set_max_retries(3);
default_task_options.set_max_retries(1);
default_task_options.set_priority(1);
default_task_options.set_partition_id("");
default_task_options.set_application_name("my-app");
Expand Down
5 changes: 4 additions & 1 deletion packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ using namespace ArmoniK::Api::Common::utils;

ArmoniK::Api::Worker::ProcessStatus computer(ArmoniK::Api::Worker::TaskHandler &handler) {
std::cout << "Call computer" << std::endl;
std::cout << "SizePayload : " << handler.getPayload().size() << "\nSize DD : " << handler.getDataDependencies().size()
<< "\n Expected results : " << handler.getExpectedResults().size() << std::endl;

try {
if (!handler.getExpectedResults().empty()) {
auto res = handler.send_result(handler.getExpectedResults()[0], "test").get();
auto res = handler.send_result(handler.getExpectedResults()[0], handler.getPayload()).get();
if (res.has_error()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(res.error());
}
Expand Down
60 changes: 30 additions & 30 deletions packages/cpp/ArmoniK.Api.Worker/header/Worker/ProcessStatus.h
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
#ifndef ARMONIK_API_PROCESSSTATUS_H
#define ARMONIK_API_PROCESSSTATUS_H

#include <string>
#include <utility>

namespace API_WORKER_NAMESPACE {
class ProcessStatus {
public:
ProcessStatus() : ProcessStatus(true, "") {}
explicit ProcessStatus(const char *error_message) : ProcessStatus(false, std::string(error_message)) {}
explicit ProcessStatus(std::string error_message) : ProcessStatus(false, std::move(error_message)) {}

[[nodiscard]] bool ok() const { return ok_; }
[[nodiscard]] const std::string &details() const { return details_; }

static const ProcessStatus OK;
static const ProcessStatus ERROR;

private:
explicit ProcessStatus(bool ok, std::string error_message = "") {
ok_ = ok;
details_ = std::move(error_message);
}
bool ok_ = true;
std::string details_;
};
} // namespace API_WORKER_NAMESPACE

#endif // ARMONIK_API_PROCESSSTATUS_H
#ifndef ARMONIK_API_PROCESSSTATUS_H
#define ARMONIK_API_PROCESSSTATUS_H

#include <string>
#include <utility>

namespace API_WORKER_NAMESPACE {
class ProcessStatus {
public:
ProcessStatus() : ProcessStatus(true, "") {}
explicit ProcessStatus(const char *error_message) : ProcessStatus(false, std::string(error_message)) {}
explicit ProcessStatus(std::string error_message) : ProcessStatus(false, std::move(error_message)) {}

[[nodiscard]] bool ok() const { return ok_; }
[[nodiscard]] const std::string &details() const { return details_; }

static const ProcessStatus OK;
static const ProcessStatus ERROR;

private:
explicit ProcessStatus(bool ok, std::string error_message = "") {
ok_ = ok;
details_ = std::move(error_message);
}
bool ok_ = true;
std::string details_;
};
} // namespace API_WORKER_NAMESPACE

#endif // ARMONIK_API_PROCESSSTATUS_H
300 changes: 150 additions & 150 deletions packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h
Original file line number Diff line number Diff line change
@@ -1,150 +1,150 @@
#pragma once
#include <future>
#include <string>

#include "agent_common.pb.h"
#include "agent_service.grpc.pb.h"

#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

namespace API_WORKER_NAMESPACE {

// #include "SessionContext.h"

/**
* @brief The TaskHandler class provides methods to create and handle tasks
*
*/
class TaskHandler {

private:
grpc::ClientContext context_;
std::shared_ptr<armonik::api::grpc::v1::agent::Agent::Stub> stub_;
std::shared_ptr<grpc::ServerReader<armonik::api::grpc::v1::worker::ProcessRequest>> request_iterator_;
std::string session_id_;
std::string task_id_;
armonik::api::grpc::v1::TaskOptions task_options_;
google::protobuf::RepeatedPtrField<std::string> expected_result_;
std::vector<std::byte> payload_;
std::vector<std::byte> data_dependencies_;
std::string token_;
armonik::api::grpc::v1::Configuration config_;

public:
/**
* @brief Construct a new Task Handler object
*
* @param client the agent client
* @param request_iterator The request iterator
*/
TaskHandler(std::shared_ptr<armonik::api::grpc::v1::agent::Agent::Stub> client,
std::shared_ptr<grpc::ServerReader<armonik::api::grpc::v1::worker::ProcessRequest>> request_iterator);

/**
* @brief Initialise the task handler
*
*/
void init();

/**
* @brief Create a task_chunk_stream.
*
* @param task_request a task request
* @param is_last A boolean indicating if this is the last request.
* @param chunk_max_size Maximum chunk size.
* @return std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>
*/
static std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>
task_chunk_stream(armonik::api::grpc::v1::TaskRequest task_request, bool is_last, const std::string &token,
size_t chunk_max_size);

/**
* @brief Convert task_requests to request_stream.
*
* @param task_requests List of task requests
* @param task_options The Task Options used for this batch of tasks
* @param chunk_max_size Maximum chunk size.
* @return std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
*/
static std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests,
armonik::api::grpc::v1::TaskOptions task_options, const std::string &token, size_t chunk_max_size);

/**
* @brief Create a tasks async object
* @param task_options The Task Options used for this batch of tasks
* @param task_requests List of task requests
* @return Successfully sent task
*/
std::future<armonik::api::grpc::v1::agent::CreateTaskReply>
create_tasks_async(armonik::api::grpc::v1::TaskOptions task_options,
const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests);

/**
* @brief Send task result
*
* @param key the key of result
* @param data The result data
* @return A future containing a vector of ResultReply
*/
std::future<armonik::api::grpc::v1::agent::ResultReply> send_result(const std::string &key, const std::string &data);

/**
* @brief Get the result ids object
*
* @param results The results data
* @return std::vector<std::string> list of result ids
*/
std::vector<std::string>
get_result_ids(std::vector<armonik::api::grpc::v1::agent::CreateResultsMetaDataRequest_ResultCreate> results);

/**
* @brief Get the Session Id object
*
* @return std::string
*/
std::string getSessionId();

/**
* @brief Get the Task Id object
*
* @return std::string
*/
std::string getTaskId();
/**
* @brief Get the Payload object
*
* @return std::vector<std::byte>
*/
std::vector<std::byte> getPayload();
/**
* @brief Get the Data Dependencies object
*
* @return std::vector<std::byte>
*/
std::vector<std::byte> getDataDependencies();

/**
* @brief Get the Task Options object
*
* @return armonik::api::grpc::v1::TaskOptions
*/
armonik::api::grpc::v1::TaskOptions getTaskOptions();

/**
* @brief Get the Expected Results object
*
* @return google::protobuf::RepeatedPtrField<std::string>
*/
google::protobuf::RepeatedPtrField<std::string> getExpectedResults();

/**
* @brief Get the Configuration object
*
* @return armonik::api::grpc::v1::Configuration
*/
armonik::api::grpc::v1::Configuration getConfiguration();
};

} // namespace API_WORKER_NAMESPACE
#pragma once
#include <future>
#include <string>

#include "agent_common.pb.h"
#include "agent_service.grpc.pb.h"

#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

namespace API_WORKER_NAMESPACE {

// #include "SessionContext.h"

/**
* @brief The TaskHandler class provides methods to create and handle tasks
*
*/
class TaskHandler {

private:
grpc::ClientContext context_;
std::shared_ptr<armonik::api::grpc::v1::agent::Agent::Stub> stub_;
std::shared_ptr<grpc::ServerReader<armonik::api::grpc::v1::worker::ProcessRequest>> request_iterator_;
std::string session_id_;
std::string task_id_;
armonik::api::grpc::v1::TaskOptions task_options_;
google::protobuf::RepeatedPtrField<std::string> expected_result_;
std::string payload_;
std::map<std::string, std::string> data_dependencies_;
std::string token_;
armonik::api::grpc::v1::Configuration config_;

public:
/**
* @brief Construct a new Task Handler object
*
* @param client the agent client
* @param request_iterator The request iterator
*/
TaskHandler(std::shared_ptr<armonik::api::grpc::v1::agent::Agent::Stub> client,
std::shared_ptr<grpc::ServerReader<armonik::api::grpc::v1::worker::ProcessRequest>> request_iterator);

/**
* @brief Initialise the task handler
*
*/
void init();

/**
* @brief Create a task_chunk_stream.
*
* @param task_request a task request
* @param is_last A boolean indicating if this is the last request.
* @param chunk_max_size Maximum chunk size.
* @return std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>
*/
static std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>
task_chunk_stream(armonik::api::grpc::v1::TaskRequest task_request, bool is_last, const std::string &token,
size_t chunk_max_size);

/**
* @brief Convert task_requests to request_stream.
*
* @param task_requests List of task requests
* @param task_options The Task Options used for this batch of tasks
* @param chunk_max_size Maximum chunk size.
* @return std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
*/
static std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests,
armonik::api::grpc::v1::TaskOptions task_options, const std::string &token, size_t chunk_max_size);

/**
* @brief Create a tasks async object
* @param task_options The Task Options used for this batch of tasks
* @param task_requests List of task requests
* @return Successfully sent task
*/
std::future<armonik::api::grpc::v1::agent::CreateTaskReply>
create_tasks_async(armonik::api::grpc::v1::TaskOptions task_options,
const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests);

/**
* @brief Send task result
*
* @param key the key of result
* @param data The result data
* @return A future containing a vector of ResultReply
*/
std::future<armonik::api::grpc::v1::agent::ResultReply> send_result(const std::string &key, const std::string &data);

/**
* @brief Get the result ids object
*
* @param results The results data
* @return std::vector<std::string> list of result ids
*/
std::vector<std::string>
get_result_ids(std::vector<armonik::api::grpc::v1::agent::CreateResultsMetaDataRequest_ResultCreate> results);

/**
* @brief Get the Session Id object
*
* @return std::string
*/
std::string getSessionId();

/**
* @brief Get the Task Id object
*
* @return std::string
*/
std::string getTaskId();
/**
* @brief Get the Payload object
*
* @return std::vector<std::byte>
*/
std::string getPayload();
/**
* @brief Get the Data Dependencies object
*
* @return std::vector<std::byte>
*/
std::map<std::string, std::string> getDataDependencies();

/**
* @brief Get the Task Options object
*
* @return armonik::api::grpc::v1::TaskOptions
*/
armonik::api::grpc::v1::TaskOptions getTaskOptions();

/**
* @brief Get the Expected Results object
*
* @return google::protobuf::RepeatedPtrField<std::string>
*/
google::protobuf::RepeatedPtrField<std::string> getExpectedResults();

/**
* @brief Get the Configuration object
*
* @return armonik::api::grpc::v1::Configuration
*/
armonik::api::grpc::v1::Configuration getConfiguration();
};

} // namespace API_WORKER_NAMESPACE
Loading

0 comments on commit 2f3fefb

Please sign in to comment.