From 0619f7ae62b0e1e6c15a520c0c44ca7e9574dc89 Mon Sep 17 00:00:00 2001 From: ddubuc Date: Thu, 4 May 2023 09:22:06 +0200 Subject: [PATCH 01/14] add fix for powershell script From 226e10d2edead6f403ce1ce0b58c67e05e2a02cf Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Wed, 14 Jun 2023 17:16:02 +0200 Subject: [PATCH 02/14] Removing extra lines in ci --- .github/workflows/ci.yml | 10 +++++++--- .github/workflows/publish-edge.yml | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e467b028b..403559b12 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -96,8 +96,7 @@ jobs: - name: Check Diff id: check-diff run: | - cd packages/cpp - git ls-files *.{c,h,hpp,cpp,cc} | xargs clang-format -style=file:.clang-format -i + git ls-files *.{c,h,hpp,cpp,cc} | xargs clang-format -style=google -i DIFF="$(git diff --name-only)" if [ -z "$DIFF" ]; then @@ -236,7 +235,7 @@ jobs: path: packages/python/coverage.xml build-cpp-packages: - name: Build C++ Packages + name: Release C++ Packages runs-on: ubuntu-latest timeout-minutes: 30 steps: @@ -249,3 +248,8 @@ jobs: run: | cd packages/cpp/tools/ ./compile.sh + + - name: Run the client + run: | + cd ../build/Armonik.Api.Client.Tests + ./Armonik.Api.Client.Tests diff --git a/.github/workflows/publish-edge.yml b/.github/workflows/publish-edge.yml index b5085a430..9556b672a 100644 --- a/.github/workflows/publish-edge.yml +++ b/.github/workflows/publish-edge.yml @@ -230,5 +230,5 @@ jobs: uses: pypa/gh-action-pypi-publish@release/v1 with: password: ${{ secrets.PYPI_API_TOKEN }} - print-hash: true - packages-dir: packages/python/pkg/ + print_hash: true + packages_dir: packages/python/pkg/ From 4e41b55b24ddfcefc2cbb2c49d02332d55006003 Mon Sep 17 00:00:00 2001 From: ddubuc Date: Thu, 4 May 2023 02:34:14 +0200 Subject: [PATCH 03/14] Fix issue with Windows compilation From 7050c97c4bad6893e6fddbd151c8af898424b4a9 Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Mon, 19 Jun 2023 20:10:12 +0200 Subject: [PATCH 04/14] Removing windows setup script From a95c3cf59c035a9ba565b3c01e887675d573f320 Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Fri, 30 Jun 2023 11:50:53 +0200 Subject: [PATCH 05/14] Adding task handler --- .../header/Worker/TaskHandler.h | 94 +++++ .../source/Worker/TaskHandler.cpp | 350 ++++++++++++++++++ 2 files changed, 444 insertions(+) create mode 100644 packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h create mode 100644 packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h new file mode 100644 index 000000000..62f20c88a --- /dev/null +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h @@ -0,0 +1,94 @@ +#pragma once +#include +#include + +#include "agent_common.pb.h" +#include "agent_service.grpc.pb.h" + +#include "worker_common.pb.h" +#include "worker_service.grpc.pb.h" + +// #include "SessionContext.h" + +/** + * @brief The TaskHandler class provides methods to create and handle tasks + * + */ +class TaskHandler { + +private: + grpc::ClientContext context_; + std::unique_ptr stub_; + std::unique_ptr> request_iterator_; + std::string session_id_; + std::string task_id_; + armonik::api::grpc::v1::TaskOptions task_options_; + google::protobuf::RepeatedPtrField expected_result_; + std::string token_; + armonik::api::grpc::v1::Configuration config_; + + +public: + /** + * @brief Construct a new Task Handler object + * + * @param client + * @param request_iterator + */ + TaskHandler(std::unique_ptr client, + std::unique_ptr> request_iterator); + + /** + * @brief + * + */ + void init(); + + /** + * @brief + * + * @param task_request + * @param is_last + * @param chunk_max_size + * @return std::future> + */ + static std::future> + task_chunk_stream(armonik::api::grpc::v1::TaskRequest task_request, bool is_last, size_t chunk_max_size); + + /** + * @brief + * + * @param task_requests + * @param task_options + * @param chunk_max_size + * @return std::vector>> + */ + static auto to_request_stream(const std::vector &task_requests, + armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size) + -> std::vector>>; + + /** + * @brief Create a tasks async object + * @param channel + * @param session_id + * @param task_options + * @param task_requests + * @return std::future + */ + std::future + create_tasks_async(std::string &session_id, + armonik::api::grpc::v1::TaskOptions task_options, + const std::vector &task_requests); + + /** + * @brief + * + * @param channel + * @param data + * @return std::future> + */ + std::future> + send_result(std::vector &data); + + std::vector get_result_ids(std::vector results); +}; diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp new file mode 100644 index 000000000..e6b33223c --- /dev/null +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -0,0 +1,350 @@ +#include "Worker/TaskHandler.h" + +#include +#include +#include + +#include "agent_common.pb.h" +#include "agent_service.grpc.pb.h" + +#include "worker_common.pb.h" +#include "worker_service.grpc.pb.h" + +using armonik::api::grpc::v1::ResultRequest; +using armonik::api::grpc::v1::TaskOptions; +using armonik::api::grpc::v1::TaskRequest; +using armonik::api::grpc::v1::agent::Agent; +using armonik::api::grpc::v1::agent::CreateTaskReply; +using armonik::api::grpc::v1::agent::CreateTaskRequest; +using armonik::api::grpc::v1::worker::ProcessRequest; +using grpc::Channel; +using grpc::ChannelInterface; +using grpc::ClientContext; +using grpc::Status; +using namespace armonik::api::grpc::v1::agent; + +TaskHandler::TaskHandler(std::unique_ptr client, + std::unique_ptr> request_iterator) { + stub_ = std::move(client); + request_iterator_ = std::move(request_iterator); +} + + +void TaskHandler::init() { + ProcessRequest Request; + std::unique_ptr> request_iterator; + request_iterator->Read(&Request); + if (!request_iterator->Read(&Request)) { + throw std::runtime_error("Request stream ended unexpectedly."); + } + + if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kInitRequest) { + throw std::runtime_error("Expected a Compute request type with InitRequest to start the stream."); + } + auto &init_request = Request.compute().init_request(); + session_id_ = init_request.session_id(); + task_id_ = init_request.task_id(); + task_options_ = init_request.task_options(); + expected_result_ = init_request.expected_output_keys(); + token_ = Request.communication_token(); + config_ = init_request.configuration(); + + std::vector payload; + if (init_request.payload().data_complete()) { + std::string temp = init_request.payload().data(); + payload.resize(temp.length()); + for (size_t i = 0; i < temp.length(); i++) { + payload[i] = static_cast(temp[i]); + } + + } else { + std::vector chuncks; + auto datachunck = init_request.payload(); + + chuncks.push_back(datachunck.data()); + + while (!datachunck.data_complete()) { + if (request_iterator->Read(&Request)) { + throw std::runtime_error("Request stream ended unexpectedly."); + } + if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kPayload) { + throw std::runtime_error("Expected a Compute request type with Payload to continue the stream."); + } + + datachunck = Request.compute().payload(); + + chuncks.push_back(datachunck.data()); + } + + size_t size = chuncks.size(); + + auto payload_data = new std::byte[size]; + int address = 0; + for (auto iter = chuncks.begin(); iter != chuncks.end(); iter++){ + std::string temp_str = *iter; + + for (size_t i = 0; i < temp_str.length(); i++) { + payload_data[i + address] = static_cast(temp_str[i]); + } + address += temp_str.length(); + } + + payload.resize(size); + std::copy_n(payload_data, size, std::back_inserter(payload)); + } + + std::vector data_dependencies; + + armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::InitData init_data; + + do { + if (request_iterator->Read(&Request)) { + throw std::runtime_error("Request stream ended unexpectedly."); + } + if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kInitData) { + throw std::runtime_error("Expected a Compute request type with InitData to continue the stream."); + } + + init_data = Request.compute().init_data(); + if (!init_data.key().empty()) { + std::vector chuncks; + while (true) { + if (request_iterator->Read(&Request)) { + throw std::runtime_error("Request stream ended unexpectedly."); + } + if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kData) { + throw std::runtime_error("Expected a Compute request type with Data to continue the stream."); + } + + auto datachunck = Request.compute().data(); + if (datachunck.type_case() == armonik::api::grpc::v1::DataChunk::kData) { + chuncks.push_back(datachunck.data()); + } + + if (datachunck.type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { + throw std::runtime_error("Expected a Compute request type with a DataChunk Payload to continue the stream."); + } + + if (datachunck.type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { + break; + } + } + + size_t size = chuncks.size(); + + auto data = new int8_t[size]; + + size_t start = 0; + + for (size_t count = 0; count <= size; count++) { + std::string temp_str = chuncks[count]; + + for (size_t i = 0; i < temp_str.length(); i++) { + data[i + count * size] = temp_str[i]; + } + } + + data_dependencies.push_back(reinterpret_cast(data)); + } + + } while (!init_data.key().empty()); +} + +std::future> TaskHandler::task_chunk_stream(TaskRequest task_request, + bool is_last, size_t chunk_max_size) { + return std::async(std::launch::async, [task_request = std::move(task_request), chunk_max_size, is_last]() { + std::vector requests; + armonik::api::grpc::v1::InitTaskRequest header_task_request; + armonik::api::grpc::v1::TaskRequestHeader header; + + header.mutable_data_dependencies()->Add(task_request.data_dependencies().begin(), + task_request.data_dependencies().end()); + header.mutable_expected_output_keys()->Add(task_request.expected_output_keys().begin(), + task_request.expected_output_keys().end()); + *header_task_request.mutable_header() = std::move(header); + + CreateTaskRequest create_init_task_request; + *create_init_task_request.mutable_init_task() = std::move(header_task_request); + + requests.push_back(std::move(create_init_task_request)); + + if (task_request.payload().empty()) { + CreateTaskRequest empty_task_request; + + armonik::api::grpc::v1::DataChunk task_payload; + *task_payload.mutable_data() = {}; + *empty_task_request.mutable_task_payload() = std::move(task_payload); + requests.push_back(std::move(empty_task_request)); + } + + size_t start = 0; + + while (start < task_request.payload().size()) { + + size_t chunk_size = std::min(chunk_max_size, task_request.payload().size() - start); + + CreateTaskRequest chunk_task_request; + + armonik::api::grpc::v1::DataChunk task_payload; + + *task_payload.mutable_data() = task_request.payload().substr(start, chunk_size); + *chunk_task_request.mutable_task_payload() = std::move(task_payload); + + requests.push_back(std::move(chunk_task_request)); + + start += chunk_size; + } + + CreateTaskRequest complete_task_request; + armonik::api::grpc::v1::DataChunk end_payload; + + end_payload.set_data_complete(true); + *complete_task_request.mutable_task_payload() = std::move(end_payload); + requests.push_back(std::move(complete_task_request)); + + if (is_last) { + CreateTaskRequest last_task_request; + armonik::api::grpc::v1::InitTaskRequest init_task_request; + + init_task_request.set_last_task(true); + *last_task_request.mutable_init_task() = std::move(init_task_request); + + requests.push_back(std::move(last_task_request)); + } + + return requests; + }); +} + +std::vector>> +TaskHandler::to_request_stream(const std::vector &task_requests, TaskOptions task_options, + const size_t chunk_max_size) { + std::vector>> async_chunk_payload_tasks; + + async_chunk_payload_tasks.push_back(std::async([task_options = std::move(task_options)]()mutable { + CreateTaskRequest_InitRequest create_task_request_init; + *create_task_request_init.mutable_task_options() = std::move(task_options); + + CreateTaskRequest create_task_request; + *create_task_request.mutable_init_request() = std::move(create_task_request_init); + + return std::vector{std::move(create_task_request)}; + })); + + for (auto task_request = task_requests.begin(); task_request != task_requests.end(); ++task_request) { + const bool is_last = task_request == task_requests.end() - 1 ? true : false; + + async_chunk_payload_tasks.push_back(task_chunk_stream(*task_request, is_last, chunk_max_size)); + } + + return async_chunk_payload_tasks; +} + +std::future TaskHandler::create_tasks_async(std::string &session_id, TaskOptions task_options, + const std::vector &task_requests) { + return std::async(std::launch::async, [this, &task_requests, &session_id, &task_options]()mutable { + + size_t chunk = config_.data_chunk_max_size(); + + + CreateTaskReply reply{}; + + reply.set_allocated_creation_status_list(new armonik::api::grpc::v1::agent::CreateTaskReply_CreationStatusList()); + grpc::ClientContext context_client_writer; + auto stream(stub_->CreateTask(&context_client_writer, &reply)); + + auto create_task_request_async = to_request_stream(task_requests, std::move(task_options), chunk); + + for (auto &f : create_task_request_async) { + for (auto &create_task_request : f.get()) { + stream->Write(create_task_request); + } + } + + stream->WritesDone(); + grpc::Status status = stream->Finish(); + if (!status.ok()) { + std::stringstream message; + message << "Error: " << status.error_code() << ": " << status.error_message() + << ". details : " << status.error_details() << std::endl; + throw std::runtime_error(message.str().c_str()); + } + + return reply; + }); +} + +std::future> TaskHandler::send_result(std::vector &data) { + return std::async(std::launch::async, [this, data]() { + std::vector result; + + grpc::ClientContext context_client_writer; + + auto reply = new ResultReply; + + size_t chunck; + size_t max_chunck = config_.data_chunk_max_size(); + const size_t data_size = data.size(); + size_t start = 0; + + auto stream = stub_->SendResult(&context_client_writer, reply); + + while (start < data_size) { + chunck = std::min(max_chunck, data_size - start); + + Result msg; + + armonik::api::grpc::v1::DataChunk result_msg; + std::string data_str; + int count = 0; + for (int i = start; i < start + chunck; i++) { + data_str[count++] = static_cast(data[i]); + } + + *result_msg.mutable_data() = std::move(data_str); + + *msg.mutable_data() = std::move(result_msg); + + stream->Write(msg); + + start += chunck; + } + + armonik::api::grpc::v1::DataChunk result_completed; + result_completed.set_data_complete(true); + + Result end_msg; + *end_msg.mutable_data() = std::move(result_completed); + + stream->Write(std::move(end_msg)); + + stream->WritesDone(); + grpc::Status status = stream->Finish(); + + if (!status.ok()) { + std::stringstream message; + message << "Error: " << status.error_code() << ": " << status.error_message() + << ". details: " << status.error_details() << std::endl; + throw std::runtime_error(message.str().c_str()); + } + + delete reply; + return result; + }); + +} + +std::vector TaskHandler::get_result_ids(std::vector results) { + std::vector result_ids; + + grpc::ClientContext context_client_writer; + CreateResultsMetaDataRequest request; + CreateResultsMetaDataResponse reply; + + request.mutable_results(); + request.set_session_id(session_id_); + + Status status = stub_->CreateResultsMetaData(&context_client_writer, std::move(request), &reply); + + return result_ids; +} From f36b7553ea73e42a87c87a4976a6bb1e475afcab Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Tue, 4 Jul 2023 12:00:00 +0200 Subject: [PATCH 06/14] Complete task handler --- .../header/Worker/TaskHandler.h | 54 ++++++++------- .../source/Worker/TaskHandler.cpp | 69 ++++++++++++++++--- 2 files changed, 90 insertions(+), 33 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h index 62f20c88a..feb74a1c0 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h @@ -32,35 +32,35 @@ class TaskHandler { /** * @brief Construct a new Task Handler object * - * @param client - * @param request_iterator + * @param client the agent client + * @param request_iterator The request iterator */ TaskHandler(std::unique_ptr client, std::unique_ptr> request_iterator); /** - * @brief + * @brief Initialise the task handler * */ void init(); /** - * @brief + * @brief Create a task_chunk_stream. * - * @param task_request - * @param is_last - * @param chunk_max_size + * @param task_request a task request + * @param is_last A boolean indicating if this is the last request. + * @param chunk_max_size Maximum chunk size. * @return std::future> */ static std::future> task_chunk_stream(armonik::api::grpc::v1::TaskRequest task_request, bool is_last, size_t chunk_max_size); /** - * @brief + * @brief Convert task_requests to request_stream. * - * @param task_requests - * @param task_options - * @param chunk_max_size + * @param task_requests List of task requests + * @param task_options The Task Options used for this batch of tasks + * @param chunk_max_size Maximum chunk size. * @return std::vector>> */ static auto to_request_stream(const std::vector &task_requests, @@ -69,26 +69,30 @@ class TaskHandler { /** * @brief Create a tasks async object - * @param channel - * @param session_id - * @param task_options - * @param task_requests - * @return std::future + * @param task_options The Task Options used for this batch of tasks + * @param task_requests List of task requests + * @return Successfully sent task */ std::future - create_tasks_async(std::string &session_id, - armonik::api::grpc::v1::TaskOptions task_options, + create_tasks_async(armonik::api::grpc::v1::TaskOptions task_options, const std::vector &task_requests); /** - * @brief - * - * @param channel - * @param data - * @return std::future> + * @brief Send task result + * + * @param key the key of result + * @param data The result data + * @return A future containing a vector of ResultReply */ std::future> - send_result(std::vector &data); + send_result(std::string key, std::vector &data); - std::vector get_result_ids(std::vector results); + /** + * @brief Get the result ids object + * + * @param results The results data + * @return std::vector list of result ids + */ + std::vector + get_result_ids(std::vector results); }; diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index e6b33223c..6a0650d2f 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -23,13 +23,22 @@ using grpc::ClientContext; using grpc::Status; using namespace armonik::api::grpc::v1::agent; +/** + * @brief Construct a new Task Handler object + * + * @param client the agent client + * @param request_iterator The request iterator + */ TaskHandler::TaskHandler(std::unique_ptr client, std::unique_ptr> request_iterator) { stub_ = std::move(client); request_iterator_ = std::move(request_iterator); } - +/** + * @brief Initialise the task handler + * + */ void TaskHandler::init() { ProcessRequest Request; std::unique_ptr> request_iterator; @@ -150,6 +159,14 @@ void TaskHandler::init() { } while (!init_data.key().empty()); } +/** + * @brief Create a task_chunk_stream. + * + * @param task_request a task request + * @param is_last A boolean indicating if this is the last request. + * @param chunk_max_size Maximum chunk size. + * @return std::future> + */ std::future> TaskHandler::task_chunk_stream(TaskRequest task_request, bool is_last, size_t chunk_max_size) { return std::async(std::launch::async, [task_request = std::move(task_request), chunk_max_size, is_last]() { @@ -216,6 +233,14 @@ std::future> TaskHandler::task_chunk_stream(TaskR }); } +/** + * @brief Convert task_requests to request_stream. + * + * @param task_requests List of task requests + * @param task_options The Task Options used for this batch of tasks + * @param chunk_max_size Maximum chunk size. + * @return std::vector>> + */ std::vector>> TaskHandler::to_request_stream(const std::vector &task_requests, TaskOptions task_options, const size_t chunk_max_size) { @@ -240,9 +265,15 @@ TaskHandler::to_request_stream(const std::vector &task_requests, Ta return async_chunk_payload_tasks; } -std::future TaskHandler::create_tasks_async(std::string &session_id, TaskOptions task_options, +/** + * @brief Create a tasks async object + * @param task_options The Task Options used for this batch of tasks + * @param task_requests List of task requests + * @return Successfully sent task + */ +std::future TaskHandler::create_tasks_async(TaskOptions task_options, const std::vector &task_requests) { - return std::async(std::launch::async, [this, &task_requests, &session_id, &task_options]()mutable { + return std::async(std::launch::async, [this, &task_requests, &task_options]()mutable { size_t chunk = config_.data_chunk_max_size(); @@ -274,8 +305,15 @@ std::future TaskHandler::create_tasks_async(std::string &sessio }); } -std::future> TaskHandler::send_result(std::vector &data) { - return std::async(std::launch::async, [this, data]() { +/** + * @brief Send task result + * + * @param key the key of result + * @param data The result data + * @return A future containing a vector of ResultReply + */ +std::future> TaskHandler::send_result(std::string key, std::vector &data) { + return std::async(std::launch::async, [this, key, data]() { std::vector result; grpc::ClientContext context_client_writer; @@ -289,6 +327,9 @@ std::future> TaskHandler::send_result(std::vectorSendResult(&context_client_writer, reply); + Result init_msg; + init_msg.mutable_init()->set_key(key); + while (start < data_size) { chunck = std::min(max_chunck, data_size - start); @@ -334,17 +375,29 @@ std::future> TaskHandler::send_result(std::vector TaskHandler::get_result_ids(std::vector results) { +/** + * @brief Get the result ids object + * + * @param results The results data + * @return std::vector list of result ids + */ +std::vector TaskHandler::get_result_ids(std::vector results) { std::vector result_ids; grpc::ClientContext context_client_writer; CreateResultsMetaDataRequest request; CreateResultsMetaDataResponse reply; - request.mutable_results(); + *request.mutable_results() = {results.begin(), results.end()}; request.set_session_id(session_id_); - Status status = stub_->CreateResultsMetaData(&context_client_writer, std::move(request), &reply); + Status status = stub_->CreateResultsMetaData(&context_client_writer, request, &reply); + + auto results_reply = reply.results(); + + for (auto &result_reply : results_reply) { + result_ids.push_back(result_reply.result_id()); + } return result_ids; } From 7e86e911ac513eb189da3a0ad284a750bbef99fe Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Tue, 4 Jul 2023 12:03:28 +0200 Subject: [PATCH 07/14] Adding armoniK worker --- .../header/Worker/ArmoniKWorker.h | 37 ++++++++++ .../source/Worker/ArmoniKWorker.cpp | 72 +++++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100644 packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h create mode 100644 packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h new file mode 100644 index 000000000..1e8c6c462 --- /dev/null +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -0,0 +1,37 @@ +#include +#include +#include + +#include + +#include "grpcpp/support/sync_stream.h" +#include "objects.pb.h" + +#include "utils/RootConfiguration.h" +#include "utils/WorkerServer.h" +#include "worker_common.grpc.pb.h" +#include "worker_service.grpc.pb.h" + +#include "Worker/TaskHandler.h" + +class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service { +private: + armonik::api::common::serilog::serilog logger_; + +public: + /** + * @brief Constructs a ArmoniKWorker object. + */ + ArmoniKWorker() : logger_(armonik::api::common::serilog::logging_format::SEQ) { + logger_.info("Build Service ArmoniKWorker"); + logger_.add_property("class", "ArmoniKWorker"); + logger_.add_property("Worker", "ArmoniK.Api.Cpp"); + } + + grpc::Status Process( + std::unique_ptr agent, + std::unique_ptr> request_iterator); + + grpc::Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, + ::armonik::api::grpc::v1::worker::HealthCheckReply *response) override; +}; diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp new file mode 100644 index 000000000..194f7e4e4 --- /dev/null +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -0,0 +1,72 @@ +#include +#include +#include + +#include + +#include "grpcpp/support/sync_stream.h" +#include "objects.pb.h" + +#include "utils/RootConfiguration.h" +#include "utils/WorkerServer.h" +#include "worker_common.grpc.pb.h" +#include "worker_service.grpc.pb.h" + +#include "Worker/TaskHandler.h" +#include "Worker/ArmoniKWorker.h" + + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +using armonik::api::common::utils::IConfiguration; +using armonik::api::grpc::v1::TaskOptions; + +using namespace armonik::api::grpc::v1::worker; +using namespace armonik::api::worker; +using namespace armonik::api::common::utils; + +/** +* @brief Implements the Process method of the Worker service. +* +* @param agent The agent object. +* @param request_iterator The request iterator +* +* @return The status of the method. +*/ +Status ArmoniKWorker::Process( + std::unique_ptr agent, + std::unique_ptr> request_iterator) { + + logger_.info("Receive new request From C++ Worker"); + TaskHandler taskHandler(std::move(agent), std::move(request_iterator)); + taskHandler.init(); + std::string key; + std::vector data{'a'}; + auto result = taskHandler.send_result(key, data); + + logger_.info("Finish call C++"); + + return grpc::Status::OK; +} + + +/** +* @brief Implements the HealthCheck method of the Worker service. +* +* @param context The ServerContext object. +* @param request The Empty object. +* @param response The HealthCheckReply object. +* +* @return The status of the method. +*/ +Status ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, + ::armonik::api::grpc::v1::worker::HealthCheckReply *response) { + // Implementation of the HealthCheck method + logger_.info("HealthCheck request OK"); + + response->set_status(HealthCheckReply_ServingStatus_SERVING); + + return Status::OK; +} From 89dd98ea8681df59dcf8ba2b04747ec53a00d235 Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Tue, 4 Jul 2023 14:45:13 +0200 Subject: [PATCH 08/14] Restore ci files --- .github/workflows/ci.yml | 4 ++-- .github/workflows/publish-edge.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 403559b12..5d0fa470b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -96,7 +96,7 @@ jobs: - name: Check Diff id: check-diff run: | - git ls-files *.{c,h,hpp,cpp,cc} | xargs clang-format -style=google -i + git ls-files *.{c,h,hpp,cpp,cc} | xargs clang-format -style=file:.clang-format -i DIFF="$(git diff --name-only)" if [ -z "$DIFF" ]; then @@ -235,7 +235,7 @@ jobs: path: packages/python/coverage.xml build-cpp-packages: - name: Release C++ Packages + name: Build C++ Packages runs-on: ubuntu-latest timeout-minutes: 30 steps: diff --git a/.github/workflows/publish-edge.yml b/.github/workflows/publish-edge.yml index 9556b672a..b5085a430 100644 --- a/.github/workflows/publish-edge.yml +++ b/.github/workflows/publish-edge.yml @@ -230,5 +230,5 @@ jobs: uses: pypa/gh-action-pypi-publish@release/v1 with: password: ${{ secrets.PYPI_API_TOKEN }} - print_hash: true - packages_dir: packages/python/pkg/ + print-hash: true + packages-dir: packages/python/pkg/ From 36a863165ab293b55a72601c4626614320c07859 Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Tue, 4 Jul 2023 14:46:26 +0200 Subject: [PATCH 09/14] Restore ci files --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5d0fa470b..16d6e58c3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -96,6 +96,7 @@ jobs: - name: Check Diff id: check-diff run: | + cd packages/cpp git ls-files *.{c,h,hpp,cpp,cc} | xargs clang-format -style=file:.clang-format -i DIFF="$(git diff --name-only)" From e52386b04a92199b60d9df239b48c649964bde55 Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Tue, 4 Jul 2023 14:47:52 +0200 Subject: [PATCH 10/14] Restore ci files --- .github/workflows/ci.yml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 16d6e58c3..e467b028b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -249,8 +249,3 @@ jobs: run: | cd packages/cpp/tools/ ./compile.sh - - - name: Run the client - run: | - cd ../build/Armonik.Api.Client.Tests - ./Armonik.Api.Client.Tests From 8fac1010d380b0f8a50eeb407f735cb25323f902 Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Tue, 4 Jul 2023 16:58:03 +0200 Subject: [PATCH 11/14] Apply format patch --- .../header/Worker/ArmoniKWorker.h | 6 +-- .../header/Worker/TaskHandler.h | 19 ++++----- .../source/Worker/ArmoniKWorker.cpp | 41 +++++++++---------- .../source/Worker/TaskHandler.cpp | 33 +++++++-------- 4 files changed, 46 insertions(+), 53 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h index 1e8c6c462..39d013dc7 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -28,9 +28,9 @@ class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Servi logger_.add_property("Worker", "ArmoniK.Api.Cpp"); } - grpc::Status Process( - std::unique_ptr agent, - std::unique_ptr> request_iterator); + grpc::Status + Process(std::unique_ptr agent, + std::unique_ptr> request_iterator); grpc::Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, ::armonik::api::grpc::v1::worker::HealthCheckReply *response) override; diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h index feb74a1c0..2615ebd62 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h @@ -27,11 +27,10 @@ class TaskHandler { std::string token_; armonik::api::grpc::v1::Configuration config_; - public: /** * @brief Construct a new Task Handler object - * + * * @param client the agent client * @param request_iterator The request iterator */ @@ -40,28 +39,28 @@ class TaskHandler { /** * @brief Initialise the task handler - * + * */ void init(); /** * @brief Create a task_chunk_stream. - * + * * @param task_request a task request * @param is_last A boolean indicating if this is the last request. * @param chunk_max_size Maximum chunk size. - * @return std::future> + * @return std::future> */ static std::future> task_chunk_stream(armonik::api::grpc::v1::TaskRequest task_request, bool is_last, size_t chunk_max_size); /** * @brief Convert task_requests to request_stream. - * + * * @param task_requests List of task requests * @param task_options The Task Options used for this batch of tasks * @param chunk_max_size Maximum chunk size. - * @return std::vector>> + * @return std::vector>> */ static auto to_request_stream(const std::vector &task_requests, armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size) @@ -84,12 +83,12 @@ class TaskHandler { * @param data The result data * @return A future containing a vector of ResultReply */ - std::future> - send_result(std::string key, std::vector &data); + std::future> send_result(std::string key, + std::vector &data); /** * @brief Get the result ids object - * + * * @param results The results data * @return std::vector list of result ids */ diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index 194f7e4e4..ef897bd05 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -12,9 +12,8 @@ #include "worker_common.grpc.pb.h" #include "worker_service.grpc.pb.h" -#include "Worker/TaskHandler.h" #include "Worker/ArmoniKWorker.h" - +#include "Worker/TaskHandler.h" using grpc::Channel; using grpc::ClientContext; @@ -28,16 +27,15 @@ using namespace armonik::api::worker; using namespace armonik::api::common::utils; /** -* @brief Implements the Process method of the Worker service. -* -* @param agent The agent object. -* @param request_iterator The request iterator -* -* @return The status of the method. -*/ -Status ArmoniKWorker::Process( - std::unique_ptr agent, - std::unique_ptr> request_iterator) { + * @brief Implements the Process method of the Worker service. + * + * @param agent The agent object. + * @param request_iterator The request iterator + * + * @return The status of the method. + */ +Status ArmoniKWorker::Process(std::unique_ptr agent, + std::unique_ptr> request_iterator) { logger_.info("Receive new request From C++ Worker"); TaskHandler taskHandler(std::move(agent), std::move(request_iterator)); @@ -51,18 +49,17 @@ Status ArmoniKWorker::Process( return grpc::Status::OK; } - /** -* @brief Implements the HealthCheck method of the Worker service. -* -* @param context The ServerContext object. -* @param request The Empty object. -* @param response The HealthCheckReply object. -* -* @return The status of the method. -*/ + * @brief Implements the HealthCheck method of the Worker service. + * + * @param context The ServerContext object. + * @param request The Empty object. + * @param response The HealthCheckReply object. + * + * @return The status of the method. + */ Status ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, - ::armonik::api::grpc::v1::worker::HealthCheckReply *response) { + ::armonik::api::grpc::v1::worker::HealthCheckReply *response) { // Implementation of the HealthCheck method logger_.info("HealthCheck request OK"); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index 6a0650d2f..b5786c422 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -1,8 +1,8 @@ #include "Worker/TaskHandler.h" +#include #include #include -#include #include "agent_common.pb.h" #include "agent_service.grpc.pb.h" @@ -25,7 +25,7 @@ using namespace armonik::api::grpc::v1::agent; /** * @brief Construct a new Task Handler object - * + * * @param client the agent client * @param request_iterator The request iterator */ @@ -37,7 +37,7 @@ TaskHandler::TaskHandler(std::unique_ptr client, /** * @brief Initialise the task handler - * + * */ void TaskHandler::init() { ProcessRequest Request; @@ -89,7 +89,7 @@ void TaskHandler::init() { auto payload_data = new std::byte[size]; int address = 0; - for (auto iter = chuncks.begin(); iter != chuncks.end(); iter++){ + for (auto iter = chuncks.begin(); iter != chuncks.end(); iter++) { std::string temp_str = *iter; for (size_t i = 0; i < temp_str.length(); i++) { @@ -153,7 +153,7 @@ void TaskHandler::init() { } } - data_dependencies.push_back(reinterpret_cast(data)); + data_dependencies.push_back(reinterpret_cast(data)); } } while (!init_data.key().empty()); @@ -161,14 +161,14 @@ void TaskHandler::init() { /** * @brief Create a task_chunk_stream. - * + * * @param task_request a task request * @param is_last A boolean indicating if this is the last request. * @param chunk_max_size Maximum chunk size. - * @return std::future> + * @return std::future> */ -std::future> TaskHandler::task_chunk_stream(TaskRequest task_request, - bool is_last, size_t chunk_max_size) { +std::future> TaskHandler::task_chunk_stream(TaskRequest task_request, bool is_last, + size_t chunk_max_size) { return std::async(std::launch::async, [task_request = std::move(task_request), chunk_max_size, is_last]() { std::vector requests; armonik::api::grpc::v1::InitTaskRequest header_task_request; @@ -235,18 +235,18 @@ std::future> TaskHandler::task_chunk_stream(TaskR /** * @brief Convert task_requests to request_stream. - * + * * @param task_requests List of task requests * @param task_options The Task Options used for this batch of tasks * @param chunk_max_size Maximum chunk size. - * @return std::vector>> + * @return std::vector>> */ std::vector>> TaskHandler::to_request_stream(const std::vector &task_requests, TaskOptions task_options, const size_t chunk_max_size) { std::vector>> async_chunk_payload_tasks; - async_chunk_payload_tasks.push_back(std::async([task_options = std::move(task_options)]()mutable { + async_chunk_payload_tasks.push_back(std::async([task_options = std::move(task_options)]() mutable { CreateTaskRequest_InitRequest create_task_request_init; *create_task_request_init.mutable_task_options() = std::move(task_options); @@ -273,11 +273,9 @@ TaskHandler::to_request_stream(const std::vector &task_requests, Ta */ std::future TaskHandler::create_tasks_async(TaskOptions task_options, const std::vector &task_requests) { - return std::async(std::launch::async, [this, &task_requests, &task_options]()mutable { - + return std::async(std::launch::async, [this, &task_requests, &task_options]() mutable { size_t chunk = config_.data_chunk_max_size(); - CreateTaskReply reply{}; reply.set_allocated_creation_status_list(new armonik::api::grpc::v1::agent::CreateTaskReply_CreationStatusList()); @@ -339,7 +337,7 @@ std::future> TaskHandler::send_result(std::string key, std::string data_str; int count = 0; for (int i = start; i < start + chunck; i++) { - data_str[count++] = static_cast(data[i]); + data_str[count++] = static_cast(data[i]); } *result_msg.mutable_data() = std::move(data_str); @@ -372,12 +370,11 @@ std::future> TaskHandler::send_result(std::string key, delete reply; return result; }); - } /** * @brief Get the result ids object - * + * * @param results The results data * @return std::vector list of result ids */ From 25b81a7b02024e35f56abbfbce9ecd81363a15f3 Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Tue, 11 Jul 2023 19:00:49 +0200 Subject: [PATCH 12/14] Refactor worker --- .../ArmoniK.Api.Worker.Tests/source/main.cpp | 67 +---------- .../header/Worker/ArmoniKWorker.h | 33 ++++-- .../header/Worker/TaskHandler.h | 6 +- .../header/utils/WorkerServer.h | 2 +- .../source/Worker/ArmoniKWorker.cpp | 45 +++++-- .../source/Worker/TaskHandler.cpp | 112 +++++++----------- 6 files changed, 114 insertions(+), 151 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp index a19c39f99..b6e8d093e 100644 --- a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp +++ b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp @@ -12,6 +12,9 @@ #include "worker_common.grpc.pb.h" #include "worker_service.grpc.pb.h" +#include "Worker/ArmoniKWorker.h" +#include "Worker/TaskHandler.h" + using grpc::Channel; using grpc::ClientContext; using grpc::Status; @@ -23,68 +26,6 @@ using namespace armonik::api::grpc::v1::worker; using namespace armonik::api::worker; using namespace armonik::api::common::utils; -/** - * @brief Implements the Worker service. - */ -class WorkerServiceImpl final : public Worker::Service { -private: - armonik::api::common::serilog::serilog logger; - -public: - /** - * @brief Constructs a WorkerServiceImpl object. - */ - WorkerServiceImpl() : logger(armonik::api::common::serilog::logging_format::SEQ) { - logger.info("Build Service WorkerServiceImpl"); - logger.add_property("class", "WorkerServiceImpl"); - logger.add_property("Worker", "ArmoniK.Api.Cpp"); - } - - /** - * @brief Implements the Process method of the Worker service. - * - * @param context The ServerContext object. - * @param reader The ServerReader object. - * @param response The ProcessReply object. - * - * @return The status of the method. - */ - Status Process(::grpc::ServerContext *context, - ::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader, - ::armonik::api::grpc::v1::worker::ProcessReply *response) override { - // Implementation of the Process method - logger.info("Receive new request From C++ Worker"); - auto output = armonik::api::grpc::v1::Output(); - *output.mutable_ok() = armonik::api::grpc::v1::Empty(); - ProcessRequest req; - reader->Read(&req); - *response->mutable_output() = output; - - logger.info("Finish call C++"); - - return grpc::Status::OK; - } - - /** - * @brief Implements the HealthCheck method of the Worker service. - * - * @param context The ServerContext object. - * @param request The Empty object. - * @param response The HealthCheckReply object. - * - * @return The status of the method. - */ - Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, - ::armonik::api::grpc::v1::worker::HealthCheckReply *response) override { - // Implementation of the HealthCheck method - logger.info("HealthCheck request OK"); - - response->set_status(HealthCheckReply_ServingStatus_SERVING); - - return Status::OK; - } -}; - int main(int argc, char **argv) { std::cout << "Starting C++ worker..." << std::endl; @@ -94,7 +35,7 @@ int main(int argc, char **argv) { config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock"); config->get_compute_plane(); - WorkerServer::create(config)->run(); + WorkerServer::create(config)->run(); std::cout << "Stooping Server..." << std::endl; return 0; diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h index 39d013dc7..efbced2dd 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -17,21 +17,38 @@ class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service { private: armonik::api::common::serilog::serilog logger_; + std::unique_ptr agent_; + void (*processing_function_)(TaskHandler taskHandler); public: /** * @brief Constructs a ArmoniKWorker object. */ - ArmoniKWorker() : logger_(armonik::api::common::serilog::logging_format::SEQ) { - logger_.info("Build Service ArmoniKWorker"); - logger_.add_property("class", "ArmoniKWorker"); - logger_.add_property("Worker", "ArmoniK.Api.Cpp"); - } + ArmoniKWorker(std::unique_ptr agent, + void (*processing_function)(TaskHandler task_handler)); - grpc::Status - Process(std::unique_ptr agent, - std::unique_ptr> request_iterator); + /** + * @brief Implements the Process method of the Worker service. + * + * @param context The ServerContext object. + * @param reader The request iterator + * @param response The ProcessReply object. + * + * @return The status of the method. + */ + grpc::Status Process(::grpc::ServerContext *context, + ::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader, + ::armonik::api::grpc::v1::worker::ProcessReply *response) override; + /** + * @brief Implements the HealthCheck method of the Worker service. + * + * @param context The ServerContext object. + * @param request The Empty object. + * @param response The HealthCheckReply object. + * + * @return The status of the method. + */ grpc::Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, ::armonik::api::grpc::v1::worker::HealthCheckReply *response) override; }; diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h index 2615ebd62..008e793e8 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h @@ -19,11 +19,13 @@ class TaskHandler { private: grpc::ClientContext context_; std::unique_ptr stub_; - std::unique_ptr> request_iterator_; + std::shared_ptr> request_iterator_; std::string session_id_; std::string task_id_; armonik::api::grpc::v1::TaskOptions task_options_; google::protobuf::RepeatedPtrField expected_result_; + std::vector payload_; + std::vector data_dependencies_; std::string token_; armonik::api::grpc::v1::Configuration config_; @@ -35,7 +37,7 @@ class TaskHandler { * @param request_iterator The request iterator */ TaskHandler(std::unique_ptr client, - std::unique_ptr> request_iterator); + std::shared_ptr> request_iterator); /** * @brief Initialise the task handler diff --git a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h index d0572b485..5d25b84ab 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/utils/WorkerServer.h @@ -75,7 +75,7 @@ class WorkerServer { // Create a stub for the Submitter service worker_server->agent_stub = Agent::NewStub(worker_server->channel); - worker_server->builder_.RegisterService(new Worker()); + worker_server->builder_.RegisterService(new Worker(std::move(worker_server->agent_stub), nullptr)); worker_server->logger.info("Finish to register new worker"); return worker_server; diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index ef897bd05..0406e0f11 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -26,23 +26,46 @@ using namespace armonik::api::grpc::v1::worker; using namespace armonik::api::worker; using namespace armonik::api::common::utils; +/** + * @brief Constructs a ArmoniKWorker object. + */ +ArmoniKWorker::ArmoniKWorker(std::unique_ptr agent, + void (*processing_function)(TaskHandler task_handler)) + : logger_(armonik::api::common::serilog::logging_format::SEQ) { + logger_.info("Build Service ArmoniKWorker"); + logger_.add_property("class", "ArmoniKWorker"); + logger_.add_property("Worker", "ArmoniK.Api.Cpp"); + agent_ = std::move(agent); + processing_function_ = processing_function; +} + /** * @brief Implements the Process method of the Worker service. * - * @param agent The agent object. - * @param request_iterator The request iterator + * @param context The ServerContext object. + * @param reader The request iterator + * @param response The ProcessReply object. * * @return The status of the method. */ -Status ArmoniKWorker::Process(std::unique_ptr agent, - std::unique_ptr> request_iterator) { - - logger_.info("Receive new request From C++ Worker"); - TaskHandler taskHandler(std::move(agent), std::move(request_iterator)); - taskHandler.init(); - std::string key; - std::vector data{'a'}; - auto result = taskHandler.send_result(key, data); +Status ArmoniKWorker::Process(::grpc::ServerContext *context, + ::grpc::ServerReader *reader, + ::armonik::api::grpc::v1::worker::ProcessReply *response) { + + logger_.info("Receive new request From C++ real Worker"); + + auto output = armonik::api::grpc::v1::Output(); + *output.mutable_ok() = armonik::api::grpc::v1::Empty(); + // ProcessRequest req; + // reader->Read(&req); + *response->mutable_output() = output; + + std::shared_ptr> request_iterator = + std::make_shared>(*reader); + + TaskHandler task_handler(std::move(agent_), request_iterator); + + task_handler.init(); logger_.info("Finish call C++"); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index b5786c422..13e6d6d94 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -30,7 +30,7 @@ using namespace armonik::api::grpc::v1::agent; * @param request_iterator The request iterator */ TaskHandler::TaskHandler(std::unique_ptr client, - std::unique_ptr> request_iterator) { + std::shared_ptr> request_iterator) { stub_ = std::move(client); request_iterator_ = std::move(request_iterator); } @@ -41,9 +41,8 @@ TaskHandler::TaskHandler(std::unique_ptr client, */ void TaskHandler::init() { ProcessRequest Request; - std::unique_ptr> request_iterator; - request_iterator->Read(&Request); - if (!request_iterator->Read(&Request)) { + // bool status = request_iterator_->Read(&Request); + if (!request_iterator_->Read(&Request)) { throw std::runtime_error("Request stream ended unexpectedly."); } @@ -58,56 +57,41 @@ void TaskHandler::init() { token_ = Request.communication_token(); config_ = init_request.configuration(); - std::vector payload; - if (init_request.payload().data_complete()) { - std::string temp = init_request.payload().data(); - payload.resize(temp.length()); - for (size_t i = 0; i < temp.length(); i++) { - payload[i] = static_cast(temp[i]); - } + std::vector chunks; + auto datachunk = init_request.payload(); - } else { - std::vector chuncks; - auto datachunck = init_request.payload(); + chunks.push_back(datachunk.data()); - chuncks.push_back(datachunck.data()); + while (!datachunk.data_complete()) { + if (!request_iterator_->Read(&Request)) { + throw std::runtime_error("Request stream ended unexpectedly."); + } + if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kPayload) { + throw std::runtime_error("Expected a Compute request type with Payload to continue the stream."); + } - while (!datachunck.data_complete()) { - if (request_iterator->Read(&Request)) { - throw std::runtime_error("Request stream ended unexpectedly."); + datachunk = Request.compute().payload(); + if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kData) { + payload_.reserve(payload_.size() + datachunk.data().size()); + for(auto c : datachunk.data()){ + payload_.push_back(std::byte(c)); } - if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kPayload) { - throw std::runtime_error("Expected a Compute request type with Payload to continue the stream."); - } - - datachunck = Request.compute().payload(); - - chuncks.push_back(datachunck.data()); } - size_t size = chuncks.size(); - - auto payload_data = new std::byte[size]; - int address = 0; - for (auto iter = chuncks.begin(); iter != chuncks.end(); iter++) { - std::string temp_str = *iter; + if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { + throw std::runtime_error("Expected a Compute request type with a DataChunk Payload to continue the stream."); + } - for (size_t i = 0; i < temp_str.length(); i++) { - payload_data[i + address] = static_cast(temp_str[i]); - } - address += temp_str.length(); + if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { + break; } - payload.resize(size); - std::copy_n(payload_data, size, std::back_inserter(payload)); } - - std::vector data_dependencies; - + armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::InitData init_data; do { - if (request_iterator->Read(&Request)) { + if (!request_iterator_->Read(&Request)) { throw std::runtime_error("Request stream ended unexpectedly."); } if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kInitData) { @@ -116,44 +100,40 @@ void TaskHandler::init() { init_data = Request.compute().init_data(); if (!init_data.key().empty()) { - std::vector chuncks; + std::vector chunks_dep; + ProcessRequest dep_request; while (true) { - if (request_iterator->Read(&Request)) { + if (!request_iterator_->Read(&dep_request)) { throw std::runtime_error("Request stream ended unexpectedly."); } - if (Request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kData) { + if (dep_request.compute().type_case() != armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::kData) { throw std::runtime_error("Expected a Compute request type with Data to continue the stream."); } - auto datachunck = Request.compute().data(); - if (datachunck.type_case() == armonik::api::grpc::v1::DataChunk::kData) { - chuncks.push_back(datachunck.data()); + auto datachunk = dep_request.compute().data(); + if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kData) { + data_dependencies_.reserve(data_dependencies_.size() + datachunk.data().size()); + for(auto c : datachunk.data()){ + data_dependencies_.push_back(std::byte(c)); + } } - if (datachunck.type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { + if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET) { throw std::runtime_error("Expected a Compute request type with a DataChunk Payload to continue the stream."); } - if (datachunck.type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { + if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { break; } } - size_t size = chuncks.size(); - - auto data = new int8_t[size]; - - size_t start = 0; - - for (size_t count = 0; count <= size; count++) { - std::string temp_str = chuncks[count]; - - for (size_t i = 0; i < temp_str.length(); i++) { - data[i + count * size] = temp_str[i]; + for (auto&& chunk : chunks_dep) { + data_dependencies_.reserve(data_dependencies_.size()+chunk.size()); + for(auto c : chunk){ + data_dependencies_.push_back(std::byte(c)); } } - data_dependencies.push_back(reinterpret_cast(data)); } } while (!init_data.key().empty()); @@ -318,8 +298,8 @@ std::future> TaskHandler::send_result(std::string key, auto reply = new ResultReply; - size_t chunck; - size_t max_chunck = config_.data_chunk_max_size(); + size_t chunk; + size_t max_chunk = config_.data_chunk_max_size(); const size_t data_size = data.size(); size_t start = 0; @@ -329,14 +309,14 @@ std::future> TaskHandler::send_result(std::string key, init_msg.mutable_init()->set_key(key); while (start < data_size) { - chunck = std::min(max_chunck, data_size - start); + chunk = std::min(max_chunk, data_size - start); Result msg; armonik::api::grpc::v1::DataChunk result_msg; std::string data_str; int count = 0; - for (int i = start; i < start + chunck; i++) { + for (int i = start; i < start + chunk; i++) { data_str[count++] = static_cast(data[i]); } @@ -346,7 +326,7 @@ std::future> TaskHandler::send_result(std::string key, stream->Write(msg); - start += chunck; + start += chunk; } armonik::api::grpc::v1::DataChunk result_completed; From f2d698653a42ee5c88683fa225c8f3ad7b113150 Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Tue, 11 Jul 2023 19:10:27 +0200 Subject: [PATCH 13/14] Apply format --- .../header/Worker/ArmoniKWorker.h | 4 ++-- .../source/Worker/ArmoniKWorker.cpp | 11 +++++------ .../source/Worker/TaskHandler.cpp | 14 ++++++-------- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h index efbced2dd..f2fbe8c14 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -37,8 +37,8 @@ class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Servi * @return The status of the method. */ grpc::Status Process(::grpc::ServerContext *context, - ::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader, - ::armonik::api::grpc::v1::worker::ProcessReply *response) override; + ::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader, + ::armonik::api::grpc::v1::worker::ProcessReply *response) override; /** * @brief Implements the HealthCheck method of the Worker service. diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index 0406e0f11..d0d6254c7 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -30,8 +30,8 @@ using namespace armonik::api::common::utils; * @brief Constructs a ArmoniKWorker object. */ ArmoniKWorker::ArmoniKWorker(std::unique_ptr agent, - void (*processing_function)(TaskHandler task_handler)) - : logger_(armonik::api::common::serilog::logging_format::SEQ) { + void (*processing_function)(TaskHandler task_handler)) + : logger_(armonik::api::common::serilog::logging_format::SEQ) { logger_.info("Build Service ArmoniKWorker"); logger_.add_property("class", "ArmoniKWorker"); logger_.add_property("Worker", "ArmoniK.Api.Cpp"); @@ -48,9 +48,8 @@ ArmoniKWorker::ArmoniKWorker(std::unique_ptr *reader, - ::armonik::api::grpc::v1::worker::ProcessReply *response) { +Status ArmoniKWorker::Process(::grpc::ServerContext *context, ::grpc::ServerReader *reader, + ::armonik::api::grpc::v1::worker::ProcessReply *response) { logger_.info("Receive new request From C++ real Worker"); @@ -65,7 +64,7 @@ Status ArmoniKWorker::Process(::grpc::ServerContext *context, TaskHandler task_handler(std::move(agent_), request_iterator); - task_handler.init(); + task_handler.init(); logger_.info("Finish call C++"); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index 13e6d6d94..a1fb2e0d6 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -73,7 +73,7 @@ void TaskHandler::init() { datachunk = Request.compute().payload(); if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kData) { payload_.reserve(payload_.size() + datachunk.data().size()); - for(auto c : datachunk.data()){ + for (auto c : datachunk.data()) { payload_.push_back(std::byte(c)); } } @@ -85,9 +85,8 @@ void TaskHandler::init() { if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kDataComplete) { break; } - } - + armonik::api::grpc::v1::worker::ProcessRequest_ComputeRequest::InitData init_data; do { @@ -113,7 +112,7 @@ void TaskHandler::init() { auto datachunk = dep_request.compute().data(); if (datachunk.type_case() == armonik::api::grpc::v1::DataChunk::kData) { data_dependencies_.reserve(data_dependencies_.size() + datachunk.data().size()); - for(auto c : datachunk.data()){ + for (auto c : datachunk.data()) { data_dependencies_.push_back(std::byte(c)); } } @@ -127,13 +126,12 @@ void TaskHandler::init() { } } - for (auto&& chunk : chunks_dep) { - data_dependencies_.reserve(data_dependencies_.size()+chunk.size()); - for(auto c : chunk){ + for (auto &&chunk : chunks_dep) { + data_dependencies_.reserve(data_dependencies_.size() + chunk.size()); + for (auto c : chunk) { data_dependencies_.push_back(std::byte(c)); } } - } } while (!init_data.key().empty()); From 7d892ba25318e206942c7fd875aae6bbcbce4746 Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Wed, 12 Jul 2023 12:23:08 +0200 Subject: [PATCH 14/14] Fix includes --- packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h | 2 +- packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h index f2fbe8c14..6dd8766f5 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -9,7 +9,7 @@ #include "utils/RootConfiguration.h" #include "utils/WorkerServer.h" -#include "worker_common.grpc.pb.h" +#include "worker_common.pb.h" #include "worker_service.grpc.pb.h" #include "Worker/TaskHandler.h" diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index d0d6254c7..ccd658ff9 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -9,7 +9,7 @@ #include "utils/RootConfiguration.h" #include "utils/WorkerServer.h" -#include "worker_common.grpc.pb.h" +#include "worker_common.pb.h" #include "worker_service.grpc.pb.h" #include "Worker/ArmoniKWorker.h"