From 835148d3d8bb5839cb942692751d82c667433ead Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Thu, 13 Jul 2023 15:52:00 +0200 Subject: [PATCH 1/2] Add namespaces to client and worker --- packages/cpp/ArmoniK.Api.Client/CMakeLists.txt | 2 ++ .../header/submitter/SubmitterClient.h | 4 ++++ .../source/submitter/SubmitterClient.cpp | 14 +++++++------- packages/cpp/ArmoniK.Api.Tests/CMakeLists.txt | 3 +++ .../source/SubmitterCLientTest.cpp | 12 ++++++------ .../cpp/ArmoniK.Api.Worker.Tests/CMakeLists.txt | 2 ++ .../cpp/ArmoniK.Api.Worker.Tests/source/main.cpp | 4 ++-- packages/cpp/ArmoniK.Api.Worker/CMakeLists.txt | 2 ++ .../header/Worker/ArmoniKWorker.h | 4 ++++ .../ArmoniK.Api.Worker/header/Worker/TaskHandler.h | 9 ++++++--- .../source/Worker/ArmoniKWorker.cpp | 6 +++--- .../source/Worker/TaskHandler.cpp | 14 +++++++------- 12 files changed, 48 insertions(+), 28 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Client/CMakeLists.txt b/packages/cpp/ArmoniK.Api.Client/CMakeLists.txt index bb053d743..d88f0dcc0 100644 --- a/packages/cpp/ArmoniK.Api.Client/CMakeLists.txt +++ b/packages/cpp/ArmoniK.Api.Client/CMakeLists.txt @@ -75,6 +75,8 @@ list(APPEND PROTO_GENERATED_FILES ${PROTO_GENERATED_MESSAGES}) target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_unsecure ArmoniK.Api.Common) +target_compile_definitions(${PROJECT_NAME} PUBLIC API_CLIENT_NAMESPACE=${NAMESPACE}) + target_include_directories(${PROJECT_NAME} PUBLIC "$" diff --git a/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h b/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h index 2226516ea..06f216baf 100644 --- a/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h +++ b/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h @@ -10,6 +10,8 @@ #include "submitter_common.pb.h" #include "submitter_service.grpc.pb.h" +namespace API_CLIENT_NAMESPACE{ + /** * @brief Data structure for task payload * @param keys The expected output keys @@ -98,3 +100,5 @@ class SubmitterClient { */ std::future> get_result_async(const armonik::api::grpc::v1::ResultRequest &result_request); }; + +} // namespace API_CLIENT_NAMESPACE \ No newline at end of file diff --git a/packages/cpp/ArmoniK.Api.Client/source/submitter/SubmitterClient.cpp b/packages/cpp/ArmoniK.Api.Client/source/submitter/SubmitterClient.cpp index dabaad6b2..269d1effb 100644 --- a/packages/cpp/ArmoniK.Api.Client/source/submitter/SubmitterClient.cpp +++ b/packages/cpp/ArmoniK.Api.Client/source/submitter/SubmitterClient.cpp @@ -28,14 +28,14 @@ using namespace armonik::api::grpc::v1::submitter; * * @param stub the gRPC client stub */ -SubmitterClient::SubmitterClient(std::unique_ptr stub) { stub_ = std::move(stub); } +API_CLIENT_NAMESPACE::SubmitterClient::SubmitterClient(std::unique_ptr stub) { stub_ = std::move(stub); } /** * @brief Create a new session. * @param partition_ids The partitions ids. * @param default_task_options The default task options. */ -std::string SubmitterClient::create_session(TaskOptions default_task_options, +std::string API_CLIENT_NAMESPACE::SubmitterClient::create_session(TaskOptions default_task_options, const std::vector &partition_ids = {}) { CreateSessionRequest request; *request.mutable_default_task_option() = std::move(default_task_options); @@ -65,7 +65,7 @@ std::string SubmitterClient::create_session(TaskOptions default_task_options, * @return A vector of futures containing CreateLargeTaskRequest objects. */ std::vector>> -SubmitterClient::to_request_stream(const std::vector &task_requests, std::string session_id, +API_CLIENT_NAMESPACE::SubmitterClient::to_request_stream(const std::vector &task_requests, std::string session_id, TaskOptions task_options, const size_t chunk_max_size) { std::vector>> async_chunk_payload_tasks; async_chunk_payload_tasks.push_back( @@ -98,7 +98,7 @@ SubmitterClient::to_request_stream(const std::vector &task_requests * @return A future containing a vector of CreateLargeTaskRequest objects. */ std::future> -SubmitterClient::task_chunk_stream(const TaskRequest &task_request, bool is_last, size_t chunk_max_size) { +API_CLIENT_NAMESPACE::SubmitterClient::task_chunk_stream(const TaskRequest &task_request, bool is_last, size_t chunk_max_size) { return std::async(std::launch::async, [&task_request, chunk_max_size, is_last]() { std::vector requests; armonik::api::grpc::v1::InitTaskRequest header_task_request; @@ -172,7 +172,7 @@ SubmitterClient::task_chunk_stream(const TaskRequest &task_request, bool is_last * @param task_requests A vector of TaskRequest objects. * @return A future containing a CreateTaskReply object. */ -std::future SubmitterClient::create_tasks_async(std::string session_id, TaskOptions task_options, +std::future API_CLIENT_NAMESPACE::SubmitterClient::create_tasks_async(std::string session_id, TaskOptions task_options, const std::vector &task_requests) { return std::async(std::launch::async, [this, task_requests, session_id = std::move(session_id), task_options = std::move(task_options)]() mutable { @@ -230,7 +230,7 @@ std::future SubmitterClient::create_tasks_async(std::string ses * @return A vector of task IDs. */ std::tuple, std::vector> -SubmitterClient::submit_tasks_with_dependencies(std::string session_id, TaskOptions task_options, +API_CLIENT_NAMESPACE::SubmitterClient::submit_tasks_with_dependencies(std::string session_id, TaskOptions task_options, const std::vector &payloads_with_dependencies, int max_retries = 5) { std::vector task_ids; @@ -283,7 +283,7 @@ SubmitterClient::submit_tasks_with_dependencies(std::string session_id, TaskOpti * @param result_request A vector of ResultRequest objects. * @return A future containing data result. */ -std::future> SubmitterClient::get_result_async(const ResultRequest &result_request) { +std::future> API_CLIENT_NAMESPACE::SubmitterClient::get_result_async(const ResultRequest &result_request) { return std::async(std::launch::async, [this, &result_request]() { ResultReply result_writer; ClientContext context_configuration; diff --git a/packages/cpp/ArmoniK.Api.Tests/CMakeLists.txt b/packages/cpp/ArmoniK.Api.Tests/CMakeLists.txt index e619938e7..8fc5d39d4 100644 --- a/packages/cpp/ArmoniK.Api.Tests/CMakeLists.txt +++ b/packages/cpp/ArmoniK.Api.Tests/CMakeLists.txt @@ -22,6 +22,8 @@ target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_ set(PROTO_BINARY_DIR "${BUILD_DIR}/${PROJECT_NAME}/") set(PROTO_IMPORT_DIRS "${PROTO_FILES_DIR}") +target_compile_definitions(${PROJECT_NAME} PUBLIC API_TEST_NAMESPACE=${NAMESPACE}) + target_include_directories(${PROJECT_NAME} PUBLIC "$" @@ -40,6 +42,7 @@ include(FetchContent) FetchContent_Declare( googletest URL https://github.com/google/googletest/archive/03597a01ee50ed33e9dfd640b249b4be3799d395.zip + DOWNLOAD_EXTRACT_TIMESTAMP TRUE ) # For Windows: Prevent overriding the parent project's compiler/linker settings set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) diff --git a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterCLientTest.cpp b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterCLientTest.cpp index 0872d3999..4f6c09cf8 100644 --- a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterCLientTest.cpp +++ b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterCLientTest.cpp @@ -84,7 +84,7 @@ TEST(testMock, createSession) { std::unique_ptr stub = Submitter::NewStub(channel); // EXPECT_CALL(*stub, CreateSession(_, _, _)).Times(AtLeast(1)); - SubmitterClient submitter(std::move(stub)); + ArmoniK::Api::Client::SubmitterClient submitter(std::move(stub)); std::string session_id = submitter.create_session(task_options, partition_ids); std::cout << "create_session response: " << session_id << std::endl; @@ -102,7 +102,7 @@ TEST(testMock, submitTask) { log.enrich([&](serilog_context &ctx) { ctx.add("fieldTestValue", 1); }); log.add_property("time", time(nullptr)); - ::putenv("GRPC_DNS_RESOLVER=native"); + ::putenv((char*)"GRPC_DNS_RESOLVER=native"); std::cout << "Starting client..." << std::endl; @@ -125,17 +125,17 @@ TEST(testMock, submitTask) { CreateSessionReply reply; grpc::ClientContext context; - SubmitterClient submitter(std::move(stub)); + ArmoniK::Api::Client::SubmitterClient submitter(std::move(stub)); const std::vector &partition_ids = {"cpp"}; std::string session_id = submitter.create_session(task_options, partition_ids); ASSERT_FALSE(session_id.empty()); try { - std::vector payloads; + std::vector payloads; for (int i = 0; i < 10; i++) { - payload_data data; + ArmoniK::Api::Client::payload_data data; data.keys = armonik::api::common::utils::GuuId::generate_uuid(); data.payload = {'a', 'r', 'm', 'o', 'n', 'i', 'k'}; data.dependencies = {}; @@ -177,7 +177,7 @@ TEST(testMock, getResult) { // EXPECT_CALL(*stub, TryGetResultStreamRaw(_, _)).Times(AtLeast(1)); std::unique_ptr stub = Submitter::NewStub(channel); - SubmitterClient submitter(std::move(stub)); + ArmoniK::Api::Client::SubmitterClient submitter(std::move(stub)); auto result = submitter.get_result_async(result_request); diff --git a/packages/cpp/ArmoniK.Api.Worker.Tests/CMakeLists.txt b/packages/cpp/ArmoniK.Api.Worker.Tests/CMakeLists.txt index 2847414b5..9409962b9 100644 --- a/packages/cpp/ArmoniK.Api.Worker.Tests/CMakeLists.txt +++ b/packages/cpp/ArmoniK.Api.Worker.Tests/CMakeLists.txt @@ -23,6 +23,8 @@ target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_ set(PROTO_BINARY_DIR "${BUILD_DIR}/${PROJECT_NAME}/") set(PROTO_IMPORT_DIRS "${PROTO_FILES_DIR}") +target_compile_definitions(${PROJECT_NAME} PUBLIC API_WORKER_TEST_NAMESPACE=${NAMESPACE}) + target_include_directories(${PROJECT_NAME} PUBLIC "$" diff --git a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp index b6e8d093e..d810b921a 100644 --- a/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp +++ b/packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp @@ -9,7 +9,7 @@ #include "utils/RootConfiguration.h" #include "utils/WorkerServer.h" -#include "worker_common.grpc.pb.h" +#include "worker_common.pb.h" #include "worker_service.grpc.pb.h" #include "Worker/ArmoniKWorker.h" @@ -35,7 +35,7 @@ int main(int argc, char **argv) { config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock"); config->get_compute_plane(); - WorkerServer::create(config)->run(); + WorkerServer::create(config)->run(); std::cout << "Stooping Server..." << std::endl; return 0; diff --git a/packages/cpp/ArmoniK.Api.Worker/CMakeLists.txt b/packages/cpp/ArmoniK.Api.Worker/CMakeLists.txt index 06dc4d720..dc22c5498 100644 --- a/packages/cpp/ArmoniK.Api.Worker/CMakeLists.txt +++ b/packages/cpp/ArmoniK.Api.Worker/CMakeLists.txt @@ -33,6 +33,8 @@ add_library(${PROJECT_NAME} STATIC ${PROTO_GENERATED_FILES} ${SRC_CLIENT_FILES} target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_unsecure ArmoniK.Api.Common ${PROTO_TARGET}) +target_compile_definitions(${PROJECT_NAME} PUBLIC API_WORKER_NAMESPACE=${NAMESPACE}) + target_include_directories(${PROJECT_NAME} PUBLIC "$" diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h index 6dd8766f5..bf8f9aa1e 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -14,6 +14,8 @@ #include "Worker/TaskHandler.h" +namespace API_WORKER_NAMESPACE{ + class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service { private: armonik::api::common::serilog::serilog logger_; @@ -52,3 +54,5 @@ class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Servi grpc::Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, ::armonik::api::grpc::v1::worker::HealthCheckReply *response) override; }; + +} // namespace API_WORKER_NAMESPACE \ No newline at end of file diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h index 008e793e8..6dba67631 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h @@ -8,6 +8,8 @@ #include "worker_common.pb.h" #include "worker_service.grpc.pb.h" +namespace API_WORKER_NAMESPACE{ + // #include "SessionContext.h" /** @@ -64,9 +66,8 @@ class TaskHandler { * @param chunk_max_size Maximum chunk size. * @return std::vector>> */ - static auto to_request_stream(const std::vector &task_requests, - armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size) - -> std::vector>>; + static std::vector>> to_request_stream(const std::vector &task_requests, + armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size); /** * @brief Create a tasks async object @@ -97,3 +98,5 @@ class TaskHandler { std::vector get_result_ids(std::vector results); }; + +} // namespace API_WORKER_NAMESPACE diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index ccd658ff9..adf96dfa2 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -29,7 +29,7 @@ using namespace armonik::api::common::utils; /** * @brief Constructs a ArmoniKWorker object. */ -ArmoniKWorker::ArmoniKWorker(std::unique_ptr agent, +API_WORKER_NAMESPACE::ArmoniKWorker::ArmoniKWorker(std::unique_ptr agent, void (*processing_function)(TaskHandler task_handler)) : logger_(armonik::api::common::serilog::logging_format::SEQ) { logger_.info("Build Service ArmoniKWorker"); @@ -48,7 +48,7 @@ ArmoniKWorker::ArmoniKWorker(std::unique_ptr *reader, +Status API_WORKER_NAMESPACE::ArmoniKWorker::Process(::grpc::ServerContext *context, ::grpc::ServerReader *reader, ::armonik::api::grpc::v1::worker::ProcessReply *response) { logger_.info("Receive new request From C++ real Worker"); @@ -80,7 +80,7 @@ Status ArmoniKWorker::Process(::grpc::ServerContext *context, ::grpc::ServerRead * * @return The status of the method. */ -Status ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, +Status API_WORKER_NAMESPACE::ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, ::armonik::api::grpc::v1::worker::HealthCheckReply *response) { // Implementation of the HealthCheck method logger_.info("HealthCheck request OK"); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index a1fb2e0d6..1b7c49d86 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -29,7 +29,7 @@ using namespace armonik::api::grpc::v1::agent; * @param client the agent client * @param request_iterator The request iterator */ -TaskHandler::TaskHandler(std::unique_ptr client, +API_WORKER_NAMESPACE::TaskHandler::TaskHandler(std::unique_ptr client, std::shared_ptr> request_iterator) { stub_ = std::move(client); request_iterator_ = std::move(request_iterator); @@ -39,7 +39,7 @@ TaskHandler::TaskHandler(std::unique_ptr client, * @brief Initialise the task handler * */ -void TaskHandler::init() { +void API_WORKER_NAMESPACE::TaskHandler::init() { ProcessRequest Request; // bool status = request_iterator_->Read(&Request); if (!request_iterator_->Read(&Request)) { @@ -145,7 +145,7 @@ void TaskHandler::init() { * @param chunk_max_size Maximum chunk size. * @return std::future> */ -std::future> TaskHandler::task_chunk_stream(TaskRequest task_request, bool is_last, +std::future> API_WORKER_NAMESPACE::TaskHandler::task_chunk_stream(TaskRequest task_request, bool is_last, size_t chunk_max_size) { return std::async(std::launch::async, [task_request = std::move(task_request), chunk_max_size, is_last]() { std::vector requests; @@ -220,7 +220,7 @@ std::future> TaskHandler::task_chunk_stream(TaskR * @return std::vector>> */ std::vector>> -TaskHandler::to_request_stream(const std::vector &task_requests, TaskOptions task_options, +API_WORKER_NAMESPACE::TaskHandler::to_request_stream(const std::vector &task_requests, TaskOptions task_options, const size_t chunk_max_size) { std::vector>> async_chunk_payload_tasks; @@ -249,7 +249,7 @@ TaskHandler::to_request_stream(const std::vector &task_requests, Ta * @param task_requests List of task requests * @return Successfully sent task */ -std::future TaskHandler::create_tasks_async(TaskOptions task_options, +std::future API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options, const std::vector &task_requests) { return std::async(std::launch::async, [this, &task_requests, &task_options]() mutable { size_t chunk = config_.data_chunk_max_size(); @@ -288,7 +288,7 @@ std::future TaskHandler::create_tasks_async(TaskOptions task_op * @param data The result data * @return A future containing a vector of ResultReply */ -std::future> TaskHandler::send_result(std::string key, std::vector &data) { +std::future> API_WORKER_NAMESPACE::TaskHandler::send_result(std::string key, std::vector &data) { return std::async(std::launch::async, [this, key, data]() { std::vector result; @@ -356,7 +356,7 @@ std::future> TaskHandler::send_result(std::string key, * @param results The results data * @return std::vector list of result ids */ -std::vector TaskHandler::get_result_ids(std::vector results) { +std::vector API_WORKER_NAMESPACE::TaskHandler::get_result_ids(std::vector results) { std::vector result_ids; grpc::ClientContext context_client_writer; From fd39fd7eb8f986f9958ed4b30eacc3c3491168c2 Mon Sep 17 00:00:00 2001 From: ddiakiteaneo Date: Thu, 13 Jul 2023 16:20:41 +0200 Subject: [PATCH 2/2] Apply format --- .../header/submitter/SubmitterClient.h | 2 +- .../source/submitter/SubmitterClient.cpp | 28 +++++++++++-------- .../source/SubmitterCLientTest.cpp | 2 +- .../header/Worker/ArmoniKWorker.h | 4 +-- .../header/Worker/TaskHandler.h | 7 +++-- .../source/Worker/ArmoniKWorker.cpp | 12 ++++---- .../source/Worker/TaskHandler.cpp | 21 ++++++++------ 7 files changed, 44 insertions(+), 32 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h b/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h index 06f216baf..0aa8662ba 100644 --- a/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h +++ b/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h @@ -10,7 +10,7 @@ #include "submitter_common.pb.h" #include "submitter_service.grpc.pb.h" -namespace API_CLIENT_NAMESPACE{ +namespace API_CLIENT_NAMESPACE { /** * @brief Data structure for task payload diff --git a/packages/cpp/ArmoniK.Api.Client/source/submitter/SubmitterClient.cpp b/packages/cpp/ArmoniK.Api.Client/source/submitter/SubmitterClient.cpp index 269d1effb..c379a0c09 100644 --- a/packages/cpp/ArmoniK.Api.Client/source/submitter/SubmitterClient.cpp +++ b/packages/cpp/ArmoniK.Api.Client/source/submitter/SubmitterClient.cpp @@ -28,7 +28,9 @@ using namespace armonik::api::grpc::v1::submitter; * * @param stub the gRPC client stub */ -API_CLIENT_NAMESPACE::SubmitterClient::SubmitterClient(std::unique_ptr stub) { stub_ = std::move(stub); } +API_CLIENT_NAMESPACE::SubmitterClient::SubmitterClient(std::unique_ptr stub) { + stub_ = std::move(stub); +} /** * @brief Create a new session. @@ -36,7 +38,7 @@ API_CLIENT_NAMESPACE::SubmitterClient::SubmitterClient(std::unique_ptr &partition_ids = {}) { + const std::vector &partition_ids = {}) { CreateSessionRequest request; *request.mutable_default_task_option() = std::move(default_task_options); for (const auto &partition_id : partition_ids) { @@ -65,8 +67,9 @@ std::string API_CLIENT_NAMESPACE::SubmitterClient::create_session(TaskOptions de * @return A vector of futures containing CreateLargeTaskRequest objects. */ std::vector>> -API_CLIENT_NAMESPACE::SubmitterClient::to_request_stream(const std::vector &task_requests, std::string session_id, - TaskOptions task_options, const size_t chunk_max_size) { +API_CLIENT_NAMESPACE::SubmitterClient::to_request_stream(const std::vector &task_requests, + std::string session_id, TaskOptions task_options, + const size_t chunk_max_size) { std::vector>> async_chunk_payload_tasks; async_chunk_payload_tasks.push_back( std::async([session_id = std::move(session_id), task_options = std::move(task_options)]() mutable { @@ -98,7 +101,8 @@ API_CLIENT_NAMESPACE::SubmitterClient::to_request_stream(const std::vector> -API_CLIENT_NAMESPACE::SubmitterClient::task_chunk_stream(const TaskRequest &task_request, bool is_last, size_t chunk_max_size) { +API_CLIENT_NAMESPACE::SubmitterClient::task_chunk_stream(const TaskRequest &task_request, bool is_last, + size_t chunk_max_size) { return std::async(std::launch::async, [&task_request, chunk_max_size, is_last]() { std::vector requests; armonik::api::grpc::v1::InitTaskRequest header_task_request; @@ -172,8 +176,9 @@ API_CLIENT_NAMESPACE::SubmitterClient::task_chunk_stream(const TaskRequest &task * @param task_requests A vector of TaskRequest objects. * @return A future containing a CreateTaskReply object. */ -std::future API_CLIENT_NAMESPACE::SubmitterClient::create_tasks_async(std::string session_id, TaskOptions task_options, - const std::vector &task_requests) { +std::future +API_CLIENT_NAMESPACE::SubmitterClient::create_tasks_async(std::string session_id, TaskOptions task_options, + const std::vector &task_requests) { return std::async(std::launch::async, [this, task_requests, session_id = std::move(session_id), task_options = std::move(task_options)]() mutable { armonik::api::grpc::v1::Configuration config_response; @@ -230,9 +235,9 @@ std::future API_CLIENT_NAMESPACE::SubmitterClient::create_tasks * @return A vector of task IDs. */ std::tuple, std::vector> -API_CLIENT_NAMESPACE::SubmitterClient::submit_tasks_with_dependencies(std::string session_id, TaskOptions task_options, - const std::vector &payloads_with_dependencies, - int max_retries = 5) { +API_CLIENT_NAMESPACE::SubmitterClient::submit_tasks_with_dependencies( + std::string session_id, TaskOptions task_options, const std::vector &payloads_with_dependencies, + int max_retries = 5) { std::vector task_ids; std::vector failed_task_ids; std::vector requests; @@ -283,7 +288,8 @@ API_CLIENT_NAMESPACE::SubmitterClient::submit_tasks_with_dependencies(std::strin * @param result_request A vector of ResultRequest objects. * @return A future containing data result. */ -std::future> API_CLIENT_NAMESPACE::SubmitterClient::get_result_async(const ResultRequest &result_request) { +std::future> +API_CLIENT_NAMESPACE::SubmitterClient::get_result_async(const ResultRequest &result_request) { return std::async(std::launch::async, [this, &result_request]() { ResultReply result_writer; ClientContext context_configuration; diff --git a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterCLientTest.cpp b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterCLientTest.cpp index 4f6c09cf8..057a86fd0 100644 --- a/packages/cpp/ArmoniK.Api.Tests/source/SubmitterCLientTest.cpp +++ b/packages/cpp/ArmoniK.Api.Tests/source/SubmitterCLientTest.cpp @@ -102,7 +102,7 @@ TEST(testMock, submitTask) { log.enrich([&](serilog_context &ctx) { ctx.add("fieldTestValue", 1); }); log.add_property("time", time(nullptr)); - ::putenv((char*)"GRPC_DNS_RESOLVER=native"); + ::putenv((char *)"GRPC_DNS_RESOLVER=native"); std::cout << "Starting client..." << std::endl; diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h index bf8f9aa1e..9f53a250c 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h @@ -14,8 +14,8 @@ #include "Worker/TaskHandler.h" -namespace API_WORKER_NAMESPACE{ - +namespace API_WORKER_NAMESPACE { + class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service { private: armonik::api::common::serilog::serilog logger_; diff --git a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h index 6dba67631..74aa2a0b3 100644 --- a/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h +++ b/packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h @@ -8,7 +8,7 @@ #include "worker_common.pb.h" #include "worker_service.grpc.pb.h" -namespace API_WORKER_NAMESPACE{ +namespace API_WORKER_NAMESPACE { // #include "SessionContext.h" @@ -66,8 +66,9 @@ class TaskHandler { * @param chunk_max_size Maximum chunk size. * @return std::vector>> */ - static std::vector>> to_request_stream(const std::vector &task_requests, - armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size); + static std::vector>> + to_request_stream(const std::vector &task_requests, + armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size); /** * @brief Create a tasks async object diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp index adf96dfa2..cff96b433 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp @@ -30,7 +30,7 @@ using namespace armonik::api::common::utils; * @brief Constructs a ArmoniKWorker object. */ API_WORKER_NAMESPACE::ArmoniKWorker::ArmoniKWorker(std::unique_ptr agent, - void (*processing_function)(TaskHandler task_handler)) + void (*processing_function)(TaskHandler task_handler)) : logger_(armonik::api::common::serilog::logging_format::SEQ) { logger_.info("Build Service ArmoniKWorker"); logger_.add_property("class", "ArmoniKWorker"); @@ -48,8 +48,9 @@ API_WORKER_NAMESPACE::ArmoniKWorker::ArmoniKWorker(std::unique_ptr *reader, - ::armonik::api::grpc::v1::worker::ProcessReply *response) { +Status API_WORKER_NAMESPACE::ArmoniKWorker::Process(::grpc::ServerContext *context, + ::grpc::ServerReader *reader, + ::armonik::api::grpc::v1::worker::ProcessReply *response) { logger_.info("Receive new request From C++ real Worker"); @@ -80,8 +81,9 @@ Status API_WORKER_NAMESPACE::ArmoniKWorker::Process(::grpc::ServerContext *conte * * @return The status of the method. */ -Status API_WORKER_NAMESPACE::ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request, - ::armonik::api::grpc::v1::worker::HealthCheckReply *response) { +Status API_WORKER_NAMESPACE::ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, + const ::armonik::api::grpc::v1::Empty *request, + ::armonik::api::grpc::v1::worker::HealthCheckReply *response) { // Implementation of the HealthCheck method logger_.info("HealthCheck request OK"); diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index 1b7c49d86..133c59c5f 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -30,7 +30,7 @@ using namespace armonik::api::grpc::v1::agent; * @param request_iterator The request iterator */ API_WORKER_NAMESPACE::TaskHandler::TaskHandler(std::unique_ptr client, - std::shared_ptr> request_iterator) { + std::shared_ptr> request_iterator) { stub_ = std::move(client); request_iterator_ = std::move(request_iterator); } @@ -145,8 +145,8 @@ void API_WORKER_NAMESPACE::TaskHandler::init() { * @param chunk_max_size Maximum chunk size. * @return std::future> */ -std::future> API_WORKER_NAMESPACE::TaskHandler::task_chunk_stream(TaskRequest task_request, bool is_last, - size_t chunk_max_size) { +std::future> +API_WORKER_NAMESPACE::TaskHandler::task_chunk_stream(TaskRequest task_request, bool is_last, size_t chunk_max_size) { return std::async(std::launch::async, [task_request = std::move(task_request), chunk_max_size, is_last]() { std::vector requests; armonik::api::grpc::v1::InitTaskRequest header_task_request; @@ -220,8 +220,8 @@ std::future> API_WORKER_NAMESPACE::TaskHandler::t * @return std::vector>> */ std::vector>> -API_WORKER_NAMESPACE::TaskHandler::to_request_stream(const std::vector &task_requests, TaskOptions task_options, - const size_t chunk_max_size) { +API_WORKER_NAMESPACE::TaskHandler::to_request_stream(const std::vector &task_requests, + TaskOptions task_options, const size_t chunk_max_size) { std::vector>> async_chunk_payload_tasks; async_chunk_payload_tasks.push_back(std::async([task_options = std::move(task_options)]() mutable { @@ -249,8 +249,9 @@ API_WORKER_NAMESPACE::TaskHandler::to_request_stream(const std::vector API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options, - const std::vector &task_requests) { +std::future +API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options, + const std::vector &task_requests) { return std::async(std::launch::async, [this, &task_requests, &task_options]() mutable { size_t chunk = config_.data_chunk_max_size(); @@ -288,7 +289,8 @@ std::future API_WORKER_NAMESPACE::TaskHandler::create_tasks_asy * @param data The result data * @return A future containing a vector of ResultReply */ -std::future> API_WORKER_NAMESPACE::TaskHandler::send_result(std::string key, std::vector &data) { +std::future> API_WORKER_NAMESPACE::TaskHandler::send_result(std::string key, + std::vector &data) { return std::async(std::launch::async, [this, key, data]() { std::vector result; @@ -356,7 +358,8 @@ std::future> API_WORKER_NAMESPACE::TaskHandler::send_re * @param results The results data * @return std::vector list of result ids */ -std::vector API_WORKER_NAMESPACE::TaskHandler::get_result_ids(std::vector results) { +std::vector +API_WORKER_NAMESPACE::TaskHandler::get_result_ids(std::vector results) { std::vector result_ids; grpc::ClientContext context_client_writer;