diff --git a/packages/cpp/ArmoniK.Api.Client/header/submitter/ResultsClient.h b/packages/cpp/ArmoniK.Api.Client/header/submitter/ResultsClient.h index fe4a8e943..93b07631d 100644 --- a/packages/cpp/ArmoniK.Api.Client/header/submitter/ResultsClient.h +++ b/packages/cpp/ArmoniK.Api.Client/header/submitter/ResultsClient.h @@ -1,25 +1,25 @@ -#ifndef ARMONIK_API_RESULTSCLIENT_H -#define ARMONIK_API_RESULTSCLIENT_H - -#include - -namespace armonik { -namespace api { -namespace client { -class ResultsClient { -public: - explicit ResultsClient(std::unique_ptr stub) - : stub(std::move(stub)) {} - - std::map create_results(absl::string_view session_id, - const std::vector &names); - void upload_result_data(const std::string &session_id, const std::string &result_id, absl::string_view payload); - -private: - std::unique_ptr stub; -}; -} // namespace client -} // namespace api -} // namespace armonik - -#endif // ARMONIK_API_RESULTSCLIENT_H +#ifndef ARMONIK_API_RESULTSCLIENT_H +#define ARMONIK_API_RESULTSCLIENT_H + +#include + +namespace armonik { +namespace api { +namespace client { +class ResultsClient { +public: + explicit ResultsClient(std::unique_ptr stub) + : stub(std::move(stub)) {} + + std::map create_results(absl::string_view session_id, + const std::vector &names); + void upload_result_data(const std::string &session_id, const std::string &result_id, absl::string_view payload); + +private: + std::unique_ptr stub; +}; +} // namespace client +} // namespace api +} // namespace armonik + +#endif // ARMONIK_API_RESULTSCLIENT_H diff --git a/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h b/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h index 5f4e7e735..9eefa6857 100644 --- a/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h +++ b/packages/cpp/ArmoniK.Api.Client/header/submitter/SubmitterClient.h @@ -107,4 +107,4 @@ class SubmitterClient { } // namespace client } // namespace api -} // namespace armonik \ No newline at end of file +} // namespace armonik diff --git a/packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp b/packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp index b8c3ae3c1..ad354fdd2 100644 --- a/packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp +++ b/packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp @@ -1,88 +1,86 @@ -#include "submitter/ResultsClient.h" -#include "exceptions/ArmoniKApiException.h" -#include - -namespace armonik { -namespace api { -namespace client { - -std::map ResultsClient::create_results(absl::string_view session_id, - const std::vector &names) { - std::map mapping; - ::grpc::ClientContext context; - armonik::api::grpc::v1::results::CreateResultsMetaDataRequest results_request; - armonik::api::grpc::v1::results::CreateResultsMetaDataResponse results_response; - - // Creates the result creation requests - std::vector results_create; - results_create.reserve(names.size()); - for (auto &&name : names) { - armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate result_create; - result_create.set_name(name); - results_create.push_back(result_create); - } - - results_request.mutable_results()->Add(results_create.begin(), results_create.end()); - results_request.mutable_session_id()->assign(session_id.data(), session_id.size()); - - // Creates the results - auto status = stub->CreateResultsMetaData(&context, results_request, &results_response); - - if (!status.ok()) { - std::stringstream message; - message << "Error: " << status.error_code() << ": " << status.error_message() - << ". details : " << status.error_details() << std::endl; - auto str = message.str(); - std::cerr << "Could not create results for submit: " << str << std::endl; - throw armonik::api::common::exceptions::ArmoniKApiException(str); - } - - for (auto &&res : results_response.results()) { - mapping.insert({res.name(), res.result_id()}); - } - return mapping; -} -void ResultsClient::upload_result_data(const std::string &session_id, const std::string &result_id, - absl::string_view payload) { - ::grpc::ClientContext context; - armonik::api::grpc::v1::results::ResultsServiceConfigurationResponse configuration; - auto status = stub->GetServiceConfiguration(&context, armonik::api::grpc::v1::Empty(), &configuration); - if (!status.ok()) { - throw armonik::api::common::exceptions::ArmoniKApiException("Unable to get result configuration : " + - status.error_message()); - } - - size_t maxChunkSize = configuration.data_chunk_max_size(); - - armonik::api::grpc::v1::results::UploadResultDataResponse response; - // response.set_allocated_result(new armonik::api::grpc::v1::results::ResultRaw()); - ::grpc::ClientContext streamContext; - auto stream = stub->UploadResultData(&streamContext, &response); - armonik::api::grpc::v1::results::UploadResultDataRequest request; - request.mutable_id()->set_session_id(session_id); - request.mutable_id()->set_result_id(result_id); - stream->Write(request); - size_t offset = 0; - - while (offset < payload.size()) { - size_t chunkSize = std::min(maxChunkSize, payload.size() - offset); - auto chunk = payload.substr(offset, chunkSize); - request.mutable_data_chunk()->assign(chunk.data(), chunk.size()); - if (!stream->Write(request)) { - throw armonik::api::common::exceptions::ArmoniKApiException("Unable to continue upload result"); - } - offset += chunkSize; - } - - if (!stream->WritesDone()) { - throw armonik::api::common::exceptions::ArmoniKApiException("Unable to upload result"); - } - status = stream->Finish(); - if (!status.ok()) { - throw armonik::api::common::exceptions::ArmoniKApiException("Unable to finish upload result " + - status.error_message()); - } -} -} // namespace client -} // namespace api -} // namespace armonik \ No newline at end of file +#include "submitter/ResultsClient.h" +#include "exceptions/ArmoniKApiException.h" +#include + +namespace armonik { +namespace api { +namespace client { + +std::map ResultsClient::create_results(absl::string_view session_id, + const std::vector &names) { + std::map mapping; + ::grpc::ClientContext context; + armonik::api::grpc::v1::results::CreateResultsMetaDataRequest results_request; + armonik::api::grpc::v1::results::CreateResultsMetaDataResponse results_response; + + // Creates the result creation requests + std::vector results_create; + results_create.reserve(names.size()); + for (auto &&name : names) { + armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate result_create; + result_create.set_name(name); + results_create.push_back(result_create); + } + + results_request.mutable_results()->Add(results_create.begin(), results_create.end()); + results_request.mutable_session_id()->assign(session_id.data(), session_id.size()); + + // Creates the results + auto status = stub->CreateResultsMetaData(&context, results_request, &results_response); + + if (!status.ok()) { + std::stringstream message; + message << "Error: " << status.error_code() << ": " << status.error_message() + << ". details : " << status.error_details() << std::endl; + auto str = message.str(); + std::cerr << "Could not create results for submit: " << str << std::endl; + throw armonik::api::common::exceptions::ArmoniKApiException(str); + } + + for (auto &&res : results_response.results()) { + mapping.insert({res.name(), res.result_id()}); + } + return mapping; +} +void ResultsClient::upload_result_data(const std::string &session_id, const std::string &result_id, + absl::string_view payload) { + ::grpc::ClientContext context; + armonik::api::grpc::v1::results::ResultsServiceConfigurationResponse configuration; + auto status = stub->GetServiceConfiguration(&context, armonik::api::grpc::v1::Empty(), &configuration); + if (!status.ok()) { + throw armonik::api::common::exceptions::ArmoniKApiException("Unable to get result configuration : " + + status.error_message()); + } + + size_t maxChunkSize = configuration.data_chunk_max_size(); + + armonik::api::grpc::v1::results::UploadResultDataResponse response; + // response.set_allocated_result(new armonik::api::grpc::v1::results::ResultRaw()); + ::grpc::ClientContext streamContext; + auto stream = stub->UploadResultData(&streamContext, &response); + armonik::api::grpc::v1::results::UploadResultDataRequest request; + request.mutable_id()->set_session_id(session_id); + request.mutable_id()->set_result_id(result_id); + stream->Write(request); + + while (!payload.empty()) { + auto chunk = payload.substr(0, maxChunkSize); + request.mutable_data_chunk()->assign(chunk.data(), chunk.size()); + if (!stream->Write(request)) { + throw armonik::api::common::exceptions::ArmoniKApiException("Unable to continue upload result"); + } + payload = payload.substr(maxChunkSize); + } + + if (!stream->WritesDone()) { + throw armonik::api::common::exceptions::ArmoniKApiException("Unable to upload result"); + } + status = stream->Finish(); + if (!status.ok()) { + throw armonik::api::common::exceptions::ArmoniKApiException("Unable to finish upload result " + + status.error_message()); + } +} +} // namespace client +} // namespace api +} // namespace armonik diff --git a/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKApiException.h b/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKApiException.h index a8fe30996..d26fe9a3d 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKApiException.h +++ b/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKApiException.h @@ -1,19 +1,19 @@ -#ifndef ARMONIK_API_ARMONIKAPIEXCEPTION_H -#define ARMONIK_API_ARMONIKAPIEXCEPTION_H - -#include -namespace armonik { -namespace api { -namespace common { -namespace exceptions { - -class ArmoniKApiException : public std::runtime_error { -public: - explicit ArmoniKApiException(const std::string &message) : runtime_error(message) {} -}; -} // namespace exceptions -} // namespace common -} // namespace api -} // namespace armonik - -#endif // ARMONIK_API_ARMONIKAPIEXCEPTION_H +#ifndef ARMONIK_API_ARMONIKAPIEXCEPTION_H +#define ARMONIK_API_ARMONIKAPIEXCEPTION_H + +#include +namespace armonik { +namespace api { +namespace common { +namespace exceptions { + +class ArmoniKApiException : public std::runtime_error { +public: + explicit ArmoniKApiException(const std::string &message) : runtime_error(message) {} +}; +} // namespace exceptions +} // namespace common +} // namespace api +} // namespace armonik + +#endif // ARMONIK_API_ARMONIKAPIEXCEPTION_H diff --git a/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKTaskError.h b/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKTaskError.h index a968074f7..5118cf15f 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKTaskError.h +++ b/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKTaskError.h @@ -1,42 +1,42 @@ -#ifndef ARMONIK_API_ARMONIKTASKERROR_H -#define ARMONIK_API_ARMONIKTASKERROR_H - -#include "ArmoniKApiException.h" -#include -#include -#include -namespace armonik { -namespace api { -namespace common { -namespace exceptions { - -class ArmoniKTaskError : public ArmoniKApiException { -public: - explicit ArmoniKTaskError(const std::string &message, const armonik::api::grpc::v1::TaskError &task_error) - : ArmoniKApiException(message) { - std::stringstream ss; - ss << "TaskId : " << task_error.task_id() << " Errors : "; - for (auto &&e : task_error.errors()) { - std::string status = armonik::api::grpc::v1::task_status::TaskStatus_Name(e.task_status()); - status_details.emplace_back(status, e.detail()); - ss << '\n' << status << " : " << e.detail(); - } - details = std::string(ArmoniKApiException::what()) + " : " + ss.str(); - taskId_ = task_error.task_id(); - } - [[nodiscard]] const char *what() const noexcept override { return details.c_str(); } - const std::string &taskId() { return taskId_; } - const std::vector> &error_details() { return status_details; } - -private: - std::string details; - std::string taskId_; - std::vector> status_details; -}; - -} // namespace exceptions -} // namespace common -} // namespace api -} // namespace armonik - -#endif // ARMONIK_API_ARMONIKTASKERROR_H +#ifndef ARMONIK_API_ARMONIKTASKERROR_H +#define ARMONIK_API_ARMONIKTASKERROR_H + +#include "ArmoniKApiException.h" +#include +#include +#include +namespace armonik { +namespace api { +namespace common { +namespace exceptions { + +class ArmoniKTaskError : public ArmoniKApiException { +public: + explicit ArmoniKTaskError(const std::string &message, const armonik::api::grpc::v1::TaskError &task_error) + : ArmoniKApiException(message) { + std::stringstream ss; + ss << "TaskId : " << task_error.task_id() << " Errors : "; + for (auto &&e : task_error.errors()) { + std::string status = armonik::api::grpc::v1::task_status::TaskStatus_Name(e.task_status()); + status_details.emplace_back(status, e.detail()); + ss << '\n' << status << " : " << e.detail(); + } + details = std::string(ArmoniKApiException::what()) + " : " + ss.str(); + taskId_ = task_error.task_id(); + } + [[nodiscard]] const char *what() const noexcept override { return details.c_str(); } + const std::string &taskId() { return taskId_; } + const std::vector> &error_details() { return status_details; } + +private: + std::string details; + std::string taskId_; + std::vector> status_details; +}; + +} // namespace exceptions +} // namespace common +} // namespace api +} // namespace armonik + +#endif // ARMONIK_API_ARMONIKTASKERROR_H diff --git a/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKTaskNotCompletedException.h b/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKTaskNotCompletedException.h index 9b33e129b..800a6ceeb 100644 --- a/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKTaskNotCompletedException.h +++ b/packages/cpp/ArmoniK.Api.Common/header/exceptions/ArmoniKTaskNotCompletedException.h @@ -1,21 +1,21 @@ -#ifndef ARMONIK_API_ARMONIKTASKNOTCOMPLETEDEXCEPTION_H -#define ARMONIK_API_ARMONIKTASKNOTCOMPLETEDEXCEPTION_H - -#include "ArmoniKApiException.h" -namespace armonik { -namespace api { -namespace common { -namespace exceptions { - -class ArmoniKTaskNotCompletedException : public ArmoniKApiException { -public: - explicit ArmoniKTaskNotCompletedException(const std::string &taskId, const std::string &message = "") - : ArmoniKApiException("Task " + taskId + " not completed. " + message), taskId(taskId) {} - const std::string taskId; -}; -} // namespace exceptions -} // namespace common -} // namespace api -} // namespace armonik - -#endif // ARMONIK_API_ARMONIKTASKNOTCOMPLETEDEXCEPTION_H +#ifndef ARMONIK_API_ARMONIKTASKNOTCOMPLETEDEXCEPTION_H +#define ARMONIK_API_ARMONIKTASKNOTCOMPLETEDEXCEPTION_H + +#include "ArmoniKApiException.h" +namespace armonik { +namespace api { +namespace common { +namespace exceptions { + +class ArmoniKTaskNotCompletedException : public ArmoniKApiException { +public: + explicit ArmoniKTaskNotCompletedException(const std::string &taskId, const std::string &message = "") + : ArmoniKApiException("Task " + taskId + " not completed. " + message), taskId(taskId) {} + const std::string taskId; +}; +} // namespace exceptions +} // namespace common +} // namespace api +} // namespace armonik + +#endif // ARMONIK_API_ARMONIKTASKNOTCOMPLETEDEXCEPTION_H diff --git a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp index aa888b3c6..8b2820d92 100644 --- a/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp +++ b/packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp @@ -55,7 +55,7 @@ void armonik::api::worker::TaskHandler::init() { config_ = std::move(*init_request->mutable_configuration()); auto *datachunk = &init_request->payload(); - payload_.clear(); + assert(payload_.empty()); payload_.append(datachunk->data()); while (!datachunk->data_complete()) {