-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add task handler api c++ (#334)
- Loading branch information
Showing
6 changed files
with
627 additions
and
64 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
54 changes: 54 additions & 0 deletions
54
packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
#include <iostream> | ||
#include <memory> | ||
#include <string> | ||
|
||
#include <grpc++/grpc++.h> | ||
|
||
#include "grpcpp/support/sync_stream.h" | ||
#include "objects.pb.h" | ||
|
||
#include "utils/RootConfiguration.h" | ||
#include "utils/WorkerServer.h" | ||
#include "worker_common.pb.h" | ||
#include "worker_service.grpc.pb.h" | ||
|
||
#include "Worker/TaskHandler.h" | ||
|
||
class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service { | ||
private: | ||
armonik::api::common::serilog::serilog logger_; | ||
std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent_; | ||
void (*processing_function_)(TaskHandler taskHandler); | ||
|
||
public: | ||
/** | ||
* @brief Constructs a ArmoniKWorker object. | ||
*/ | ||
ArmoniKWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent, | ||
void (*processing_function)(TaskHandler task_handler)); | ||
|
||
/** | ||
* @brief Implements the Process method of the Worker service. | ||
* | ||
* @param context The ServerContext object. | ||
* @param reader The request iterator | ||
* @param response The ProcessReply object. | ||
* | ||
* @return The status of the method. | ||
*/ | ||
grpc::Status Process(::grpc::ServerContext *context, | ||
::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader, | ||
::armonik::api::grpc::v1::worker::ProcessReply *response) override; | ||
|
||
/** | ||
* @brief Implements the HealthCheck method of the Worker service. | ||
* | ||
* @param context The ServerContext object. | ||
* @param request The Empty object. | ||
* @param response The HealthCheckReply object. | ||
* | ||
* @return The status of the method. | ||
*/ | ||
grpc::Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, | ||
::armonik::api::grpc::v1::worker::HealthCheckReply *response) override; | ||
}; |
99 changes: 99 additions & 0 deletions
99
packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
#pragma once | ||
#include <future> | ||
#include <string> | ||
|
||
#include "agent_common.pb.h" | ||
#include "agent_service.grpc.pb.h" | ||
|
||
#include "worker_common.pb.h" | ||
#include "worker_service.grpc.pb.h" | ||
|
||
// #include "SessionContext.h" | ||
|
||
/** | ||
* @brief The TaskHandler class provides methods to create and handle tasks | ||
* | ||
*/ | ||
class TaskHandler { | ||
|
||
private: | ||
grpc::ClientContext context_; | ||
std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> stub_; | ||
std::shared_ptr<grpc::ServerReader<armonik::api::grpc::v1::worker::ProcessRequest>> request_iterator_; | ||
std::string session_id_; | ||
std::string task_id_; | ||
armonik::api::grpc::v1::TaskOptions task_options_; | ||
google::protobuf::RepeatedPtrField<std::string> expected_result_; | ||
std::vector<std::byte> payload_; | ||
std::vector<std::byte> data_dependencies_; | ||
std::string token_; | ||
armonik::api::grpc::v1::Configuration config_; | ||
|
||
public: | ||
/** | ||
* @brief Construct a new Task Handler object | ||
* | ||
* @param client the agent client | ||
* @param request_iterator The request iterator | ||
*/ | ||
TaskHandler(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> client, | ||
std::shared_ptr<grpc::ServerReader<armonik::api::grpc::v1::worker::ProcessRequest>> request_iterator); | ||
|
||
/** | ||
* @brief Initialise the task handler | ||
* | ||
*/ | ||
void init(); | ||
|
||
/** | ||
* @brief Create a task_chunk_stream. | ||
* | ||
* @param task_request a task request | ||
* @param is_last A boolean indicating if this is the last request. | ||
* @param chunk_max_size Maximum chunk size. | ||
* @return std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>> | ||
*/ | ||
static std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>> | ||
task_chunk_stream(armonik::api::grpc::v1::TaskRequest task_request, bool is_last, size_t chunk_max_size); | ||
|
||
/** | ||
* @brief Convert task_requests to request_stream. | ||
* | ||
* @param task_requests List of task requests | ||
* @param task_options The Task Options used for this batch of tasks | ||
* @param chunk_max_size Maximum chunk size. | ||
* @return std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>> | ||
*/ | ||
static auto to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests, | ||
armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size) | ||
-> std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>; | ||
|
||
/** | ||
* @brief Create a tasks async object | ||
* @param task_options The Task Options used for this batch of tasks | ||
* @param task_requests List of task requests | ||
* @return Successfully sent task | ||
*/ | ||
std::future<armonik::api::grpc::v1::agent::CreateTaskReply> | ||
create_tasks_async(armonik::api::grpc::v1::TaskOptions task_options, | ||
const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests); | ||
|
||
/** | ||
* @brief Send task result | ||
* | ||
* @param key the key of result | ||
* @param data The result data | ||
* @return A future containing a vector of ResultReply | ||
*/ | ||
std::future<std::vector<armonik::api::grpc::v1::agent::ResultReply>> send_result(std::string key, | ||
std::vector<std::byte> &data); | ||
|
||
/** | ||
* @brief Get the result ids object | ||
* | ||
* @param results The results data | ||
* @return std::vector<std::string> list of result ids | ||
*/ | ||
std::vector<std::string> | ||
get_result_ids(std::vector<armonik::api::grpc::v1::agent::CreateResultsMetaDataRequest_ResultCreate> results); | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 91 additions & 0 deletions
91
packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
#include <iostream> | ||
#include <memory> | ||
#include <string> | ||
|
||
#include <grpc++/grpc++.h> | ||
|
||
#include "grpcpp/support/sync_stream.h" | ||
#include "objects.pb.h" | ||
|
||
#include "utils/RootConfiguration.h" | ||
#include "utils/WorkerServer.h" | ||
#include "worker_common.pb.h" | ||
#include "worker_service.grpc.pb.h" | ||
|
||
#include "Worker/ArmoniKWorker.h" | ||
#include "Worker/TaskHandler.h" | ||
|
||
using grpc::Channel; | ||
using grpc::ClientContext; | ||
using grpc::Status; | ||
|
||
using armonik::api::common::utils::IConfiguration; | ||
using armonik::api::grpc::v1::TaskOptions; | ||
|
||
using namespace armonik::api::grpc::v1::worker; | ||
using namespace armonik::api::worker; | ||
using namespace armonik::api::common::utils; | ||
|
||
/** | ||
* @brief Constructs a ArmoniKWorker object. | ||
*/ | ||
ArmoniKWorker::ArmoniKWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent, | ||
void (*processing_function)(TaskHandler task_handler)) | ||
: logger_(armonik::api::common::serilog::logging_format::SEQ) { | ||
logger_.info("Build Service ArmoniKWorker"); | ||
logger_.add_property("class", "ArmoniKWorker"); | ||
logger_.add_property("Worker", "ArmoniK.Api.Cpp"); | ||
agent_ = std::move(agent); | ||
processing_function_ = processing_function; | ||
} | ||
|
||
/** | ||
* @brief Implements the Process method of the Worker service. | ||
* | ||
* @param context The ServerContext object. | ||
* @param reader The request iterator | ||
* @param response The ProcessReply object. | ||
* | ||
* @return The status of the method. | ||
*/ | ||
Status ArmoniKWorker::Process(::grpc::ServerContext *context, ::grpc::ServerReader<ProcessRequest> *reader, | ||
::armonik::api::grpc::v1::worker::ProcessReply *response) { | ||
|
||
logger_.info("Receive new request From C++ real Worker"); | ||
|
||
auto output = armonik::api::grpc::v1::Output(); | ||
*output.mutable_ok() = armonik::api::grpc::v1::Empty(); | ||
// ProcessRequest req; | ||
// reader->Read(&req); | ||
*response->mutable_output() = output; | ||
|
||
std::shared_ptr<grpc::ServerReader<ProcessRequest>> request_iterator = | ||
std::make_shared<grpc::ServerReader<ProcessRequest>>(*reader); | ||
|
||
TaskHandler task_handler(std::move(agent_), request_iterator); | ||
|
||
task_handler.init(); | ||
|
||
logger_.info("Finish call C++"); | ||
|
||
return grpc::Status::OK; | ||
} | ||
|
||
/** | ||
* @brief Implements the HealthCheck method of the Worker service. | ||
* | ||
* @param context The ServerContext object. | ||
* @param request The Empty object. | ||
* @param response The HealthCheckReply object. | ||
* | ||
* @return The status of the method. | ||
*/ | ||
Status ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, | ||
::armonik::api::grpc::v1::worker::HealthCheckReply *response) { | ||
// Implementation of the HealthCheck method | ||
logger_.info("HealthCheck request OK"); | ||
|
||
response->set_status(HealthCheckReply_ServingStatus_SERVING); | ||
|
||
return Status::OK; | ||
} |
Oops, something went wrong.