Skip to content

Commit

Permalink
Added PR suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrasseur-aneo committed Sep 15, 2023
1 parent b05644c commit de4eafa
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 197 deletions.
50 changes: 25 additions & 25 deletions packages/cpp/ArmoniK.Api.Client/header/submitter/ResultsClient.h
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
#ifndef ARMONIK_API_RESULTSCLIENT_H
#define ARMONIK_API_RESULTSCLIENT_H

#include <results_service.grpc.pb.h>

namespace armonik {
namespace api {
namespace client {
class ResultsClient {
public:
explicit ResultsClient(std::unique_ptr<armonik::api::grpc::v1::results::Results::Stub> stub)
: stub(std::move(stub)) {}

std::map<std::string, std::string> create_results(absl::string_view session_id,
const std::vector<std::string> &names);
void upload_result_data(const std::string &session_id, const std::string &result_id, absl::string_view payload);

private:
std::unique_ptr<armonik::api::grpc::v1::results::Results::Stub> stub;
};
} // namespace client
} // namespace api
} // namespace armonik

#endif // ARMONIK_API_RESULTSCLIENT_H
#ifndef ARMONIK_API_RESULTSCLIENT_H
#define ARMONIK_API_RESULTSCLIENT_H

#include <results_service.grpc.pb.h>

namespace armonik {
namespace api {
namespace client {
class ResultsClient {
public:
explicit ResultsClient(std::unique_ptr<armonik::api::grpc::v1::results::Results::Stub> stub)
: stub(std::move(stub)) {}

std::map<std::string, std::string> create_results(absl::string_view session_id,
const std::vector<std::string> &names);
void upload_result_data(const std::string &session_id, const std::string &result_id, absl::string_view payload);

private:
std::unique_ptr<armonik::api::grpc::v1::results::Results::Stub> stub;
};
} // namespace client
} // namespace api
} // namespace armonik

#endif // ARMONIK_API_RESULTSCLIENT_H
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,4 @@ class SubmitterClient {

} // namespace client
} // namespace api
} // namespace armonik
} // namespace armonik
174 changes: 86 additions & 88 deletions packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp
Original file line number Diff line number Diff line change
@@ -1,88 +1,86 @@
#include "submitter/ResultsClient.h"
#include "exceptions/ArmoniKApiException.h"
#include <sstream>

namespace armonik {
namespace api {
namespace client {

std::map<std::string, std::string> ResultsClient::create_results(absl::string_view session_id,
const std::vector<std::string> &names) {
std::map<std::string, std::string> mapping;
::grpc::ClientContext context;
armonik::api::grpc::v1::results::CreateResultsMetaDataRequest results_request;
armonik::api::grpc::v1::results::CreateResultsMetaDataResponse results_response;

// Creates the result creation requests
std::vector<armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate> results_create;
results_create.reserve(names.size());
for (auto &&name : names) {
armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate result_create;
result_create.set_name(name);
results_create.push_back(result_create);
}

results_request.mutable_results()->Add(results_create.begin(), results_create.end());
results_request.mutable_session_id()->assign(session_id.data(), session_id.size());

// Creates the results
auto status = stub->CreateResultsMetaData(&context, results_request, &results_response);

if (!status.ok()) {
std::stringstream message;
message << "Error: " << status.error_code() << ": " << status.error_message()
<< ". details : " << status.error_details() << std::endl;
auto str = message.str();
std::cerr << "Could not create results for submit: " << str << std::endl;
throw armonik::api::common::exceptions::ArmoniKApiException(str);
}

for (auto &&res : results_response.results()) {
mapping.insert({res.name(), res.result_id()});
}
return mapping;
}
void ResultsClient::upload_result_data(const std::string &session_id, const std::string &result_id,
absl::string_view payload) {
::grpc::ClientContext context;
armonik::api::grpc::v1::results::ResultsServiceConfigurationResponse configuration;
auto status = stub->GetServiceConfiguration(&context, armonik::api::grpc::v1::Empty(), &configuration);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to get result configuration : " +
status.error_message());
}

size_t maxChunkSize = configuration.data_chunk_max_size();

armonik::api::grpc::v1::results::UploadResultDataResponse response;
// response.set_allocated_result(new armonik::api::grpc::v1::results::ResultRaw());
::grpc::ClientContext streamContext;
auto stream = stub->UploadResultData(&streamContext, &response);
armonik::api::grpc::v1::results::UploadResultDataRequest request;
request.mutable_id()->set_session_id(session_id);
request.mutable_id()->set_result_id(result_id);
stream->Write(request);
size_t offset = 0;

while (offset < payload.size()) {
size_t chunkSize = std::min(maxChunkSize, payload.size() - offset);
auto chunk = payload.substr(offset, chunkSize);
request.mutable_data_chunk()->assign(chunk.data(), chunk.size());
if (!stream->Write(request)) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to continue upload result");
}
offset += chunkSize;
}

if (!stream->WritesDone()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to upload result");
}
status = stream->Finish();
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to finish upload result " +
status.error_message());
}
}
} // namespace client
} // namespace api
} // namespace armonik
#include "submitter/ResultsClient.h"
#include "exceptions/ArmoniKApiException.h"
#include <sstream>

namespace armonik {
namespace api {
namespace client {

std::map<std::string, std::string> ResultsClient::create_results(absl::string_view session_id,
const std::vector<std::string> &names) {
std::map<std::string, std::string> mapping;
::grpc::ClientContext context;
armonik::api::grpc::v1::results::CreateResultsMetaDataRequest results_request;
armonik::api::grpc::v1::results::CreateResultsMetaDataResponse results_response;

// Creates the result creation requests
std::vector<armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate> results_create;
results_create.reserve(names.size());
for (auto &&name : names) {
armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate result_create;
result_create.set_name(name);
results_create.push_back(result_create);
}

results_request.mutable_results()->Add(results_create.begin(), results_create.end());
results_request.mutable_session_id()->assign(session_id.data(), session_id.size());

// Creates the results
auto status = stub->CreateResultsMetaData(&context, results_request, &results_response);

if (!status.ok()) {
std::stringstream message;
message << "Error: " << status.error_code() << ": " << status.error_message()
<< ". details : " << status.error_details() << std::endl;
auto str = message.str();
std::cerr << "Could not create results for submit: " << str << std::endl;
throw armonik::api::common::exceptions::ArmoniKApiException(str);
}

for (auto &&res : results_response.results()) {
mapping.insert({res.name(), res.result_id()});
}
return mapping;
}
void ResultsClient::upload_result_data(const std::string &session_id, const std::string &result_id,
absl::string_view payload) {
::grpc::ClientContext context;
armonik::api::grpc::v1::results::ResultsServiceConfigurationResponse configuration;
auto status = stub->GetServiceConfiguration(&context, armonik::api::grpc::v1::Empty(), &configuration);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to get result configuration : " +
status.error_message());
}

size_t maxChunkSize = configuration.data_chunk_max_size();

armonik::api::grpc::v1::results::UploadResultDataResponse response;
// response.set_allocated_result(new armonik::api::grpc::v1::results::ResultRaw());
::grpc::ClientContext streamContext;
auto stream = stub->UploadResultData(&streamContext, &response);
armonik::api::grpc::v1::results::UploadResultDataRequest request;
request.mutable_id()->set_session_id(session_id);
request.mutable_id()->set_result_id(result_id);
stream->Write(request);

while (!payload.empty()) {
auto chunk = payload.substr(0, maxChunkSize);
request.mutable_data_chunk()->assign(chunk.data(), chunk.size());
if (!stream->Write(request)) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to continue upload result");
}
payload = payload.substr(maxChunkSize);
}

if (!stream->WritesDone()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to upload result");
}
status = stream->Finish();
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to finish upload result " +
status.error_message());
}
}
} // namespace client
} // namespace api
} // namespace armonik
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
#ifndef ARMONIK_API_ARMONIKAPIEXCEPTION_H
#define ARMONIK_API_ARMONIKAPIEXCEPTION_H

#include <stdexcept>
namespace armonik {
namespace api {
namespace common {
namespace exceptions {

class ArmoniKApiException : public std::runtime_error {
public:
explicit ArmoniKApiException(const std::string &message) : runtime_error(message) {}
};
} // namespace exceptions
} // namespace common
} // namespace api
} // namespace armonik

#endif // ARMONIK_API_ARMONIKAPIEXCEPTION_H
#ifndef ARMONIK_API_ARMONIKAPIEXCEPTION_H
#define ARMONIK_API_ARMONIKAPIEXCEPTION_H

#include <stdexcept>
namespace armonik {
namespace api {
namespace common {
namespace exceptions {

class ArmoniKApiException : public std::runtime_error {
public:
explicit ArmoniKApiException(const std::string &message) : runtime_error(message) {}
};
} // namespace exceptions
} // namespace common
} // namespace api
} // namespace armonik

#endif // ARMONIK_API_ARMONIKAPIEXCEPTION_H
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
#ifndef ARMONIK_API_ARMONIKTASKERROR_H
#define ARMONIK_API_ARMONIKTASKERROR_H

#include "ArmoniKApiException.h"
#include <objects.pb.h>
#include <sstream>
#include <vector>
namespace armonik {
namespace api {
namespace common {
namespace exceptions {

class ArmoniKTaskError : public ArmoniKApiException {
public:
explicit ArmoniKTaskError(const std::string &message, const armonik::api::grpc::v1::TaskError &task_error)
: ArmoniKApiException(message) {
std::stringstream ss;
ss << "TaskId : " << task_error.task_id() << " Errors : ";
for (auto &&e : task_error.errors()) {
std::string status = armonik::api::grpc::v1::task_status::TaskStatus_Name(e.task_status());
status_details.emplace_back(status, e.detail());
ss << '\n' << status << " : " << e.detail();
}
details = std::string(ArmoniKApiException::what()) + " : " + ss.str();
taskId_ = task_error.task_id();
}
[[nodiscard]] const char *what() const noexcept override { return details.c_str(); }
const std::string &taskId() { return taskId_; }
const std::vector<std::pair<std::string, std::string>> &error_details() { return status_details; }

private:
std::string details;
std::string taskId_;
std::vector<std::pair<std::string, std::string>> status_details;
};

} // namespace exceptions
} // namespace common
} // namespace api
} // namespace armonik

#endif // ARMONIK_API_ARMONIKTASKERROR_H
#ifndef ARMONIK_API_ARMONIKTASKERROR_H
#define ARMONIK_API_ARMONIKTASKERROR_H

#include "ArmoniKApiException.h"
#include <objects.pb.h>
#include <sstream>
#include <vector>
namespace armonik {
namespace api {
namespace common {
namespace exceptions {

class ArmoniKTaskError : public ArmoniKApiException {
public:
explicit ArmoniKTaskError(const std::string &message, const armonik::api::grpc::v1::TaskError &task_error)
: ArmoniKApiException(message) {
std::stringstream ss;
ss << "TaskId : " << task_error.task_id() << " Errors : ";
for (auto &&e : task_error.errors()) {
std::string status = armonik::api::grpc::v1::task_status::TaskStatus_Name(e.task_status());
status_details.emplace_back(status, e.detail());
ss << '\n' << status << " : " << e.detail();
}
details = std::string(ArmoniKApiException::what()) + " : " + ss.str();
taskId_ = task_error.task_id();
}
[[nodiscard]] const char *what() const noexcept override { return details.c_str(); }
const std::string &taskId() { return taskId_; }
const std::vector<std::pair<std::string, std::string>> &error_details() { return status_details; }

private:
std::string details;
std::string taskId_;
std::vector<std::pair<std::string, std::string>> status_details;
};

} // namespace exceptions
} // namespace common
} // namespace api
} // namespace armonik

#endif // ARMONIK_API_ARMONIKTASKERROR_H
Loading

0 comments on commit de4eafa

Please sign in to comment.