Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: C++14 support and various installation fixes #413

Merged
merged 8 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions packages/cpp/ArmoniK.Api.Client/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
set(PROJECT_NAME ArmoniK.Api.Client)
set(NAMESPACE ArmoniK::Api::Client)
set(ARMONIK_INSTALL_INCLUDE_DIR ${CMAKE_INSTALL_INCLUDEDIR}/armonik/client)
set(PROJECT_BUILD_DIR ${BUILD_DIR}/${PROJECT_NAME})

Expand Down Expand Up @@ -28,16 +27,20 @@ set(PROTO_MESSAGES
list(TRANSFORM PROTO_FILES PREPEND "${PROTO_FILES_DIR}/")
list(TRANSFORM PROTO_MESSAGES PREPEND "${PROTO_FILES_DIR}/")

find_package(Protobuf REQUIRED)
find_package(gRPC CONFIG REQUIRED)
find_package(Threads)

SET(SOURCES_FILES_DIR "${CMAKE_CURRENT_SOURCE_DIR}/source")
SET(HEADER_FILES_DIR "${CMAKE_CURRENT_SOURCE_DIR}/header")

FILE(GLOB_RECURSE SRC_CLIENT_FILES ${SOURCES_FILES_DIR}/*.cpp)
FILE(GLOB_RECURSE HEADER_CLIENT_FILES ${HEADER_FILES_DIR}/*.h)

# Trouver les packages requis
if (UNIX)
find_package(Protobuf REQUIRED)
else()
find_package(Protobuf CONFIG REQUIRED)
endif()
find_package(gRPC CONFIG REQUIRED)
find_package(Threads)

file(MAKE_DIRECTORY ${PROJECT_BUILD_DIR})

Expand Down Expand Up @@ -73,11 +76,10 @@ set_source_files_properties(${PROTO_GENERATED_FILES} PROPERTIES SKIP_UNITY_BUILD
list(APPEND PROTO_GENERATED_FILES ${PROTO_GENERATED_MESSAGES})

target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_unsecure ArmoniK.Api.Common)
set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 14)

setup_options(${PROJECT_NAME})

target_compile_definitions(${PROJECT_NAME} PUBLIC API_CLIENT_NAMESPACE=${NAMESPACE})

target_include_directories(${PROJECT_NAME}
PUBLIC
"$<BUILD_INTERFACE:${PROJECT_BUILD_DIR}>"
Expand Down
45 changes: 25 additions & 20 deletions packages/cpp/ArmoniK.Api.Client/header/submitter/ResultsClient.h
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
#ifndef ARMONIK_API_RESULTSCLIENT_H
#define ARMONIK_API_RESULTSCLIENT_H

#include <results_service.grpc.pb.h>

namespace API_CLIENT_NAMESPACE {
class ResultsClient {
public:
explicit ResultsClient(std::unique_ptr<armonik::api::grpc::v1::results::Results::Stub> stub)
: stub(std::move(stub)) {}

std::map<std::string, std::string> create_results(std::string_view session_id, const std::vector<std::string> &names);
void upload_result_data(const std::string &session_id, const std::string &result_id, std::string_view payload);

private:
std::unique_ptr<armonik::api::grpc::v1::results::Results::Stub> stub;
};
} // namespace API_CLIENT_NAMESPACE

#endif // ARMONIK_API_RESULTSCLIENT_H
#ifndef ARMONIK_API_RESULTSCLIENT_H
#define ARMONIK_API_RESULTSCLIENT_H

#include <results_service.grpc.pb.h>

namespace armonik {
namespace api {
namespace client {
class ResultsClient {
public:
explicit ResultsClient(std::unique_ptr<armonik::api::grpc::v1::results::Results::Stub> stub)
: stub(std::move(stub)) {}

std::map<std::string, std::string> create_results(absl::string_view session_id,
const std::vector<std::string> &names);
void upload_result_data(const std::string &session_id, const std::string &result_id, absl::string_view payload);

private:
std::unique_ptr<armonik::api::grpc::v1::results::Results::Stub> stub;
};
} // namespace client
} // namespace api
} // namespace armonik

#endif // ARMONIK_API_RESULTSCLIENT_H
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
#include "submitter_common.pb.h"
#include "submitter_service.grpc.pb.h"

namespace API_CLIENT_NAMESPACE {
namespace armonik {
namespace api {
namespace client {

/**
* @brief Data structure for task payload
Expand All @@ -30,7 +32,6 @@ struct payload_data {
*/
class SubmitterClient {
private:
grpc::ClientContext context_;
std::unique_ptr<armonik::api::grpc::v1::submitter::Submitter::StubInterface> stub_;

public:
Expand Down Expand Up @@ -89,10 +90,9 @@ class SubmitterClient {
* @param max_retries The maximum number of retries for submitting tasks.
* @return A vector of submitted task IDs.
*/
std::tuple<std::vector<std::string>, std::vector<std::string>>
std::pair<std::vector<std::string>, std::vector<std::string>>
submit_tasks_with_dependencies(std::string session_id, armonik::api::grpc::v1::TaskOptions task_options,
const std::vector<payload_data> &payloads_with_dependencies,
[[maybe_unused]] int max_retries);
const std::vector<payload_data> &payloads_with_dependencies, int max_retries);

/**
* @brief Get result without streaming.
Expand All @@ -105,4 +105,6 @@ class SubmitterClient {
get_result_status(const std::string &session_id, const std::vector<std::string> &result_ids);
};

} // namespace API_CLIENT_NAMESPACE
} // namespace client
} // namespace api
} // namespace armonik
170 changes: 86 additions & 84 deletions packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp
Original file line number Diff line number Diff line change
@@ -1,84 +1,86 @@
#include "submitter/ResultsClient.h"
#include "exceptions/ArmoniKApiException.h"
#include <sstream>

namespace API_CLIENT_NAMESPACE {

std::map<std::string, std::string> ResultsClient::create_results(std::string_view session_id,
const std::vector<std::string> &names) {
std::map<std::string, std::string> mapping;
grpc::ClientContext context;
armonik::api::grpc::v1::results::CreateResultsMetaDataRequest results_request;
armonik::api::grpc::v1::results::CreateResultsMetaDataResponse results_response;

// Creates the result creation requests
std::vector<armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate> results_create;
results_create.reserve(names.size());
for (auto &&name : names) {
armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate result_create;
result_create.set_name(name);
results_create.push_back(result_create);
}

results_request.mutable_results()->Add(results_create.begin(), results_create.end());
*results_request.mutable_session_id() = session_id;

// Creates the results
auto status = stub->CreateResultsMetaData(&context, results_request, &results_response);

if (!status.ok()) {
std::stringstream message;
message << "Error: " << status.error_code() << ": " << status.error_message()
<< ". details : " << status.error_details() << std::endl;
auto str = message.str();
std::cerr << "Could not create results for submit: " << str << std::endl;
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(str);
}

for (auto &&res : results_response.results()) {
mapping.insert({res.name(), res.result_id()});
}
return mapping;
}
void ResultsClient::upload_result_data(const std::string &session_id, const std::string &result_id,
std::string_view payload) {
grpc::ClientContext context;
armonik::api::grpc::v1::results::ResultsServiceConfigurationResponse configuration;
auto status = stub->GetServiceConfiguration(&context, armonik::api::grpc::v1::Empty(), &configuration);
if (!status.ok()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Unable to get result configuration : " +
status.error_message());
}

size_t maxChunkSize = configuration.data_chunk_max_size();

armonik::api::grpc::v1::results::UploadResultDataResponse response;
// response.set_allocated_result(new armonik::api::grpc::v1::results::ResultRaw());
grpc::ClientContext streamContext;
auto stream = stub->UploadResultData(&streamContext, &response);
armonik::api::grpc::v1::results::UploadResultDataRequest request;
request.mutable_id()->set_session_id(session_id);
request.mutable_id()->set_result_id(result_id);
stream->Write(request);
size_t offset = 0;

while (offset < payload.size()) {
size_t chunkSize = std::min(maxChunkSize, payload.size() - offset);

*request.mutable_data_chunk() = payload.substr(offset, chunkSize);
if (!stream->Write(request)) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Unable to continue upload result");
}
offset += chunkSize;
}

if (!stream->WritesDone()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Unable to upload result");
}
status = stream->Finish();
if (!status.ok()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Unable to finish upload result " +
status.error_message());
}
}
} // namespace API_CLIENT_NAMESPACE
#include "submitter/ResultsClient.h"
#include "exceptions/ArmoniKApiException.h"
#include <sstream>

namespace armonik {
namespace api {
namespace client {

std::map<std::string, std::string> ResultsClient::create_results(absl::string_view session_id,
const std::vector<std::string> &names) {
std::map<std::string, std::string> mapping;
::grpc::ClientContext context;
armonik::api::grpc::v1::results::CreateResultsMetaDataRequest results_request;
armonik::api::grpc::v1::results::CreateResultsMetaDataResponse results_response;

// Creates the result creation requests
std::vector<armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate> results_create;
results_create.reserve(names.size());
for (auto &&name : names) {
armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate result_create;
result_create.set_name(name);
results_create.push_back(result_create);
}

results_request.mutable_results()->Add(results_create.begin(), results_create.end());
results_request.mutable_session_id()->assign(session_id.data(), session_id.size());

// Creates the results
auto status = stub->CreateResultsMetaData(&context, results_request, &results_response);

if (!status.ok()) {
std::stringstream message;
message << "Error: " << status.error_code() << ": " << status.error_message()
<< ". details : " << status.error_details() << std::endl;
auto str = message.str();
std::cerr << "Could not create results for submit: " << str << std::endl;
throw armonik::api::common::exceptions::ArmoniKApiException(str);
}

for (auto &&res : results_response.results()) {
mapping.insert({res.name(), res.result_id()});
}
return mapping;
}
void ResultsClient::upload_result_data(const std::string &session_id, const std::string &result_id,
absl::string_view payload) {
::grpc::ClientContext context;
armonik::api::grpc::v1::results::ResultsServiceConfigurationResponse configuration;
auto status = stub->GetServiceConfiguration(&context, armonik::api::grpc::v1::Empty(), &configuration);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to get result configuration : " +
status.error_message());
}

size_t maxChunkSize = configuration.data_chunk_max_size();

armonik::api::grpc::v1::results::UploadResultDataResponse response;
// response.set_allocated_result(new armonik::api::grpc::v1::results::ResultRaw());
::grpc::ClientContext streamContext;
auto stream = stub->UploadResultData(&streamContext, &response);
armonik::api::grpc::v1::results::UploadResultDataRequest request;
request.mutable_id()->set_session_id(session_id);
request.mutable_id()->set_result_id(result_id);
stream->Write(request);

while (!payload.empty()) {
auto chunk = payload.substr(0, maxChunkSize);
request.mutable_data_chunk()->assign(chunk.data(), chunk.size());
if (!stream->Write(request)) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to continue upload result");
}
payload = payload.substr(maxChunkSize);
}

if (!stream->WritesDone()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to upload result");
}
status = stream->Finish();
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to finish upload result " +
status.error_message());
}
}
} // namespace client
} // namespace api
} // namespace armonik
Loading
Loading