Skip to content

Commit

Permalink
refactor: Add namespaces to client and worker (#352)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrasseur-aneo authored Jul 13, 2023
2 parents 213ed8c + fd39fd7 commit a4a6357
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 40 deletions.
2 changes: 2 additions & 0 deletions packages/cpp/ArmoniK.Api.Client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ list(APPEND PROTO_GENERATED_FILES ${PROTO_GENERATED_MESSAGES})

target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_unsecure ArmoniK.Api.Common)

target_compile_definitions(${PROJECT_NAME} PUBLIC API_CLIENT_NAMESPACE=${NAMESPACE})

target_include_directories(${PROJECT_NAME}
PUBLIC
"$<BUILD_INTERFACE:${PROJECT_BUILD_DIR}>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "submitter_common.pb.h"
#include "submitter_service.grpc.pb.h"

namespace API_CLIENT_NAMESPACE {

/**
* @brief Data structure for task payload
* @param keys The expected output keys
Expand Down Expand Up @@ -98,3 +100,5 @@ class SubmitterClient {
*/
std::future<std::vector<std::byte>> get_result_async(const armonik::api::grpc::v1::ResultRequest &result_request);
};

} // namespace API_CLIENT_NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ using namespace armonik::api::grpc::v1::submitter;
*
* @param stub the gRPC client stub
*/
SubmitterClient::SubmitterClient(std::unique_ptr<Submitter::StubInterface> stub) { stub_ = std::move(stub); }
API_CLIENT_NAMESPACE::SubmitterClient::SubmitterClient(std::unique_ptr<Submitter::StubInterface> stub) {
stub_ = std::move(stub);
}

/**
* @brief Create a new session.
* @param partition_ids The partitions ids.
* @param default_task_options The default task options.
*/
std::string SubmitterClient::create_session(TaskOptions default_task_options,
const std::vector<std::string> &partition_ids = {}) {
std::string API_CLIENT_NAMESPACE::SubmitterClient::create_session(TaskOptions default_task_options,
const std::vector<std::string> &partition_ids = {}) {
CreateSessionRequest request;
*request.mutable_default_task_option() = std::move(default_task_options);
for (const auto &partition_id : partition_ids) {
Expand Down Expand Up @@ -65,8 +67,9 @@ std::string SubmitterClient::create_session(TaskOptions default_task_options,
* @return A vector of futures containing CreateLargeTaskRequest objects.
*/
std::vector<std::future<std::vector<CreateLargeTaskRequest>>>
SubmitterClient::to_request_stream(const std::vector<TaskRequest> &task_requests, std::string session_id,
TaskOptions task_options, const size_t chunk_max_size) {
API_CLIENT_NAMESPACE::SubmitterClient::to_request_stream(const std::vector<TaskRequest> &task_requests,
std::string session_id, TaskOptions task_options,
const size_t chunk_max_size) {
std::vector<std::future<std::vector<CreateLargeTaskRequest>>> async_chunk_payload_tasks;
async_chunk_payload_tasks.push_back(
std::async([session_id = std::move(session_id), task_options = std::move(task_options)]() mutable {
Expand Down Expand Up @@ -98,7 +101,8 @@ SubmitterClient::to_request_stream(const std::vector<TaskRequest> &task_requests
* @return A future containing a vector of CreateLargeTaskRequest objects.
*/
std::future<std::vector<CreateLargeTaskRequest>>
SubmitterClient::task_chunk_stream(const TaskRequest &task_request, bool is_last, size_t chunk_max_size) {
API_CLIENT_NAMESPACE::SubmitterClient::task_chunk_stream(const TaskRequest &task_request, bool is_last,
size_t chunk_max_size) {
return std::async(std::launch::async, [&task_request, chunk_max_size, is_last]() {
std::vector<CreateLargeTaskRequest> requests;
armonik::api::grpc::v1::InitTaskRequest header_task_request;
Expand Down Expand Up @@ -172,8 +176,9 @@ SubmitterClient::task_chunk_stream(const TaskRequest &task_request, bool is_last
* @param task_requests A vector of TaskRequest objects.
* @return A future containing a CreateTaskReply object.
*/
std::future<CreateTaskReply> SubmitterClient::create_tasks_async(std::string session_id, TaskOptions task_options,
const std::vector<TaskRequest> &task_requests) {
std::future<CreateTaskReply>
API_CLIENT_NAMESPACE::SubmitterClient::create_tasks_async(std::string session_id, TaskOptions task_options,
const std::vector<TaskRequest> &task_requests) {
return std::async(std::launch::async, [this, task_requests, session_id = std::move(session_id),
task_options = std::move(task_options)]() mutable {
armonik::api::grpc::v1::Configuration config_response;
Expand Down Expand Up @@ -230,9 +235,9 @@ std::future<CreateTaskReply> SubmitterClient::create_tasks_async(std::string ses
* @return A vector of task IDs.
*/
std::tuple<std::vector<std::string>, std::vector<std::string>>
SubmitterClient::submit_tasks_with_dependencies(std::string session_id, TaskOptions task_options,
const std::vector<payload_data> &payloads_with_dependencies,
int max_retries = 5) {
API_CLIENT_NAMESPACE::SubmitterClient::submit_tasks_with_dependencies(
std::string session_id, TaskOptions task_options, const std::vector<payload_data> &payloads_with_dependencies,
int max_retries = 5) {
std::vector<std::string> task_ids;
std::vector<std::string> failed_task_ids;
std::vector<TaskRequest> requests;
Expand Down Expand Up @@ -283,7 +288,8 @@ SubmitterClient::submit_tasks_with_dependencies(std::string session_id, TaskOpti
* @param result_request A vector of ResultRequest objects.
* @return A future containing data result.
*/
std::future<std::vector<std::byte>> SubmitterClient::get_result_async(const ResultRequest &result_request) {
std::future<std::vector<std::byte>>
API_CLIENT_NAMESPACE::SubmitterClient::get_result_async(const ResultRequest &result_request) {
return std::async(std::launch::async, [this, &result_request]() {
ResultReply result_writer;
ClientContext context_configuration;
Expand Down
3 changes: 3 additions & 0 deletions packages/cpp/ArmoniK.Api.Tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_
set(PROTO_BINARY_DIR "${BUILD_DIR}/${PROJECT_NAME}/")
set(PROTO_IMPORT_DIRS "${PROTO_FILES_DIR}")

target_compile_definitions(${PROJECT_NAME} PUBLIC API_TEST_NAMESPACE=${NAMESPACE})

target_include_directories(${PROJECT_NAME}
PUBLIC
"$<BUILD_INTERFACE:${HEADER_FILES_DIR}>"
Expand All @@ -40,6 +42,7 @@ include(FetchContent)
FetchContent_Declare(
googletest
URL https://github.com/google/googletest/archive/03597a01ee50ed33e9dfd640b249b4be3799d395.zip
DOWNLOAD_EXTRACT_TIMESTAMP TRUE
)
# For Windows: Prevent overriding the parent project's compiler/linker settings
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
Expand Down
12 changes: 6 additions & 6 deletions packages/cpp/ArmoniK.Api.Tests/source/SubmitterCLientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ TEST(testMock, createSession) {

std::unique_ptr<Submitter::StubInterface> stub = Submitter::NewStub(channel);
// EXPECT_CALL(*stub, CreateSession(_, _, _)).Times(AtLeast(1));
SubmitterClient submitter(std::move(stub));
ArmoniK::Api::Client::SubmitterClient submitter(std::move(stub));
std::string session_id = submitter.create_session(task_options, partition_ids);

std::cout << "create_session response: " << session_id << std::endl;
Expand All @@ -102,7 +102,7 @@ TEST(testMock, submitTask) {
log.enrich([&](serilog_context &ctx) { ctx.add("fieldTestValue", 1); });
log.add_property("time", time(nullptr));

::putenv("GRPC_DNS_RESOLVER=native");
::putenv((char *)"GRPC_DNS_RESOLVER=native");

std::cout << "Starting client..." << std::endl;

Expand All @@ -125,17 +125,17 @@ TEST(testMock, submitTask) {
CreateSessionReply reply;
grpc::ClientContext context;

SubmitterClient submitter(std::move(stub));
ArmoniK::Api::Client::SubmitterClient submitter(std::move(stub));
const std::vector<std::string> &partition_ids = {"cpp"};
std::string session_id = submitter.create_session(task_options, partition_ids);

ASSERT_FALSE(session_id.empty());

try {
std::vector<payload_data> payloads;
std::vector<ArmoniK::Api::Client::payload_data> payloads;

for (int i = 0; i < 10; i++) {
payload_data data;
ArmoniK::Api::Client::payload_data data;
data.keys = armonik::api::common::utils::GuuId::generate_uuid();
data.payload = {'a', 'r', 'm', 'o', 'n', 'i', 'k'};
data.dependencies = {};
Expand Down Expand Up @@ -177,7 +177,7 @@ TEST(testMock, getResult) {
// EXPECT_CALL(*stub, TryGetResultStreamRaw(_, _)).Times(AtLeast(1));

std::unique_ptr<Submitter::StubInterface> stub = Submitter::NewStub(channel);
SubmitterClient submitter(std::move(stub));
ArmoniK::Api::Client::SubmitterClient submitter(std::move(stub));

auto result = submitter.get_result_async(result_request);

Expand Down
2 changes: 2 additions & 0 deletions packages/cpp/ArmoniK.Api.Worker.Tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_
set(PROTO_BINARY_DIR "${BUILD_DIR}/${PROJECT_NAME}/")
set(PROTO_IMPORT_DIRS "${PROTO_FILES_DIR}")

target_compile_definitions(${PROJECT_NAME} PUBLIC API_WORKER_TEST_NAMESPACE=${NAMESPACE})

target_include_directories(${PROJECT_NAME}
PUBLIC
"$<BUILD_INTERFACE:${HEADER_FILES_DIR}>"
Expand Down
4 changes: 2 additions & 2 deletions packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

#include "utils/RootConfiguration.h"
#include "utils/WorkerServer.h"
#include "worker_common.grpc.pb.h"
#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

#include "Worker/ArmoniKWorker.h"
Expand All @@ -35,7 +35,7 @@ int main(int argc, char **argv) {
config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock");

config->get_compute_plane();
WorkerServer::create<ArmoniKWorker, bool>(config)->run();
WorkerServer::create<ArmoniK::Api::Worker::ArmoniKWorker, bool>(config)->run();

std::cout << "Stooping Server..." << std::endl;
return 0;
Expand Down
2 changes: 2 additions & 0 deletions packages/cpp/ArmoniK.Api.Worker/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ add_library(${PROJECT_NAME} STATIC ${PROTO_GENERATED_FILES} ${SRC_CLIENT_FILES}

target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_unsecure ArmoniK.Api.Common ${PROTO_TARGET})

target_compile_definitions(${PROJECT_NAME} PUBLIC API_WORKER_NAMESPACE=${NAMESPACE})

target_include_directories(${PROJECT_NAME}
PUBLIC
"$<BUILD_INTERFACE:${BUILD_DIR}/ArmoniK.Api.Common>"
Expand Down
4 changes: 4 additions & 0 deletions packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "Worker/TaskHandler.h"

namespace API_WORKER_NAMESPACE {

class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service {
private:
armonik::api::common::serilog::serilog logger_;
Expand Down Expand Up @@ -52,3 +54,5 @@ class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Servi
grpc::Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) override;
};

} // namespace API_WORKER_NAMESPACE
10 changes: 7 additions & 3 deletions packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

namespace API_WORKER_NAMESPACE {

// #include "SessionContext.h"

/**
Expand Down Expand Up @@ -64,9 +66,9 @@ class TaskHandler {
* @param chunk_max_size Maximum chunk size.
* @return std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
*/
static auto to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests,
armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size)
-> std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>;
static std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests,
armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size);

/**
* @brief Create a tasks async object
Expand Down Expand Up @@ -97,3 +99,5 @@ class TaskHandler {
std::vector<std::string>
get_result_ids(std::vector<armonik::api::grpc::v1::agent::CreateResultsMetaDataRequest_ResultCreate> results);
};

} // namespace API_WORKER_NAMESPACE
14 changes: 8 additions & 6 deletions packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ using namespace armonik::api::common::utils;
/**
* @brief Constructs a ArmoniKWorker object.
*/
ArmoniKWorker::ArmoniKWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent,
void (*processing_function)(TaskHandler task_handler))
API_WORKER_NAMESPACE::ArmoniKWorker::ArmoniKWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent,
void (*processing_function)(TaskHandler task_handler))
: logger_(armonik::api::common::serilog::logging_format::SEQ) {
logger_.info("Build Service ArmoniKWorker");
logger_.add_property("class", "ArmoniKWorker");
Expand All @@ -48,8 +48,9 @@ ArmoniKWorker::ArmoniKWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agen
*
* @return The status of the method.
*/
Status ArmoniKWorker::Process(::grpc::ServerContext *context, ::grpc::ServerReader<ProcessRequest> *reader,
::armonik::api::grpc::v1::worker::ProcessReply *response) {
Status API_WORKER_NAMESPACE::ArmoniKWorker::Process(::grpc::ServerContext *context,
::grpc::ServerReader<ProcessRequest> *reader,
::armonik::api::grpc::v1::worker::ProcessReply *response) {

logger_.info("Receive new request From C++ real Worker");

Expand Down Expand Up @@ -80,8 +81,9 @@ Status ArmoniKWorker::Process(::grpc::ServerContext *context, ::grpc::ServerRead
*
* @return The status of the method.
*/
Status ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) {
Status API_WORKER_NAMESPACE::ArmoniKWorker::HealthCheck(::grpc::ServerContext *context,
const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) {
// Implementation of the HealthCheck method
logger_.info("HealthCheck request OK");

Expand Down
25 changes: 14 additions & 11 deletions packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ using namespace armonik::api::grpc::v1::agent;
* @param client the agent client
* @param request_iterator The request iterator
*/
TaskHandler::TaskHandler(std::unique_ptr<Agent::Stub> client,
std::shared_ptr<grpc::ServerReader<ProcessRequest>> request_iterator) {
API_WORKER_NAMESPACE::TaskHandler::TaskHandler(std::unique_ptr<Agent::Stub> client,
std::shared_ptr<grpc::ServerReader<ProcessRequest>> request_iterator) {
stub_ = std::move(client);
request_iterator_ = std::move(request_iterator);
}
Expand All @@ -39,7 +39,7 @@ TaskHandler::TaskHandler(std::unique_ptr<Agent::Stub> client,
* @brief Initialise the task handler
*
*/
void TaskHandler::init() {
void API_WORKER_NAMESPACE::TaskHandler::init() {
ProcessRequest Request;
// bool status = request_iterator_->Read(&Request);
if (!request_iterator_->Read(&Request)) {
Expand Down Expand Up @@ -145,8 +145,8 @@ void TaskHandler::init() {
* @param chunk_max_size Maximum chunk size.
* @return std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>
*/
std::future<std::vector<CreateTaskRequest>> TaskHandler::task_chunk_stream(TaskRequest task_request, bool is_last,
size_t chunk_max_size) {
std::future<std::vector<CreateTaskRequest>>
API_WORKER_NAMESPACE::TaskHandler::task_chunk_stream(TaskRequest task_request, bool is_last, size_t chunk_max_size) {
return std::async(std::launch::async, [task_request = std::move(task_request), chunk_max_size, is_last]() {
std::vector<CreateTaskRequest> requests;
armonik::api::grpc::v1::InitTaskRequest header_task_request;
Expand Down Expand Up @@ -220,8 +220,8 @@ std::future<std::vector<CreateTaskRequest>> TaskHandler::task_chunk_stream(TaskR
* @return std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
*/
std::vector<std::future<std::vector<CreateTaskRequest>>>
TaskHandler::to_request_stream(const std::vector<TaskRequest> &task_requests, TaskOptions task_options,
const size_t chunk_max_size) {
API_WORKER_NAMESPACE::TaskHandler::to_request_stream(const std::vector<TaskRequest> &task_requests,
TaskOptions task_options, const size_t chunk_max_size) {
std::vector<std::future<std::vector<CreateTaskRequest>>> async_chunk_payload_tasks;

async_chunk_payload_tasks.push_back(std::async([task_options = std::move(task_options)]() mutable {
Expand Down Expand Up @@ -249,8 +249,9 @@ TaskHandler::to_request_stream(const std::vector<TaskRequest> &task_requests, Ta
* @param task_requests List of task requests
* @return Successfully sent task
*/
std::future<CreateTaskReply> TaskHandler::create_tasks_async(TaskOptions task_options,
const std::vector<TaskRequest> &task_requests) {
std::future<CreateTaskReply>
API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options,
const std::vector<TaskRequest> &task_requests) {
return std::async(std::launch::async, [this, &task_requests, &task_options]() mutable {
size_t chunk = config_.data_chunk_max_size();

Expand Down Expand Up @@ -288,7 +289,8 @@ std::future<CreateTaskReply> TaskHandler::create_tasks_async(TaskOptions task_op
* @param data The result data
* @return A future containing a vector of ResultReply
*/
std::future<std::vector<ResultReply>> TaskHandler::send_result(std::string key, std::vector<std::byte> &data) {
std::future<std::vector<ResultReply>> API_WORKER_NAMESPACE::TaskHandler::send_result(std::string key,
std::vector<std::byte> &data) {
return std::async(std::launch::async, [this, key, data]() {
std::vector<ResultReply> result;

Expand Down Expand Up @@ -356,7 +358,8 @@ std::future<std::vector<ResultReply>> TaskHandler::send_result(std::string key,
* @param results The results data
* @return std::vector<std::string> list of result ids
*/
std::vector<std::string> TaskHandler::get_result_ids(std::vector<CreateResultsMetaDataRequest_ResultCreate> results) {
std::vector<std::string>
API_WORKER_NAMESPACE::TaskHandler::get_result_ids(std::vector<CreateResultsMetaDataRequest_ResultCreate> results) {
std::vector<std::string> result_ids;

grpc::ClientContext context_client_writer;
Expand Down

0 comments on commit a4a6357

Please sign in to comment.