Skip to content

Commit

Permalink
fix: fix tests (#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrasseur-aneo authored Jul 25, 2023
2 parents 7aaea17 + 8b75d64 commit ab31d7c
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 60 deletions.
20 changes: 20 additions & 0 deletions packages/cpp/ArmoniK.Api.Client/header/submitter/ResultsClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#ifndef ARMONIK_API_RESULTSCLIENT_H
#define ARMONIK_API_RESULTSCLIENT_H

#include <results_service.grpc.pb.h>

namespace API_CLIENT_NAMESPACE {
class ResultsClient {
public:
explicit ResultsClient(std::unique_ptr<armonik::api::grpc::v1::results::Results::Stub> stub)
: stub(std::move(stub)) {}

std::map<std::string, std::string> create_results(std::string_view session_id, const std::vector<std::string> &names);
void upload_result_data(const std::string &session_id, const std::string &result_id, std::string_view payload);

private:
std::unique_ptr<armonik::api::grpc::v1::results::Results::Stub> stub;
};
} // namespace API_CLIENT_NAMESPACE

#endif // ARMONIK_API_RESULTSCLIENT_H
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class SubmitterClient {
* @return A vector containing the data associated to the result
*/
std::future<std::string> get_result_async(const armonik::api::grpc::v1::ResultRequest &result_request);

std::map<std::string, armonik::api::grpc::v1::result_status::ResultStatus>
get_result_status(const std::string &session_id, const std::vector<std::string> &result_ids);
};

} // namespace API_CLIENT_NAMESPACE
84 changes: 84 additions & 0 deletions packages/cpp/ArmoniK.Api.Client/source/submitter/ResultsClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#include "submitter/ResultsClient.h"
#include "exceptions/ArmoniKApiException.h"
#include <sstream>

namespace API_CLIENT_NAMESPACE {

std::map<std::string, std::string> ResultsClient::create_results(std::string_view session_id,
const std::vector<std::string> &names) {
std::map<std::string, std::string> mapping;
grpc::ClientContext context;
armonik::api::grpc::v1::results::CreateResultsMetaDataRequest results_request;
armonik::api::grpc::v1::results::CreateResultsMetaDataResponse results_response;

// Creates the result creation requests
std::vector<armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate> results_create;
results_create.reserve(names.size());
for (auto &&name : names) {
armonik::api::grpc::v1::results::CreateResultsMetaDataRequest_ResultCreate result_create;
result_create.set_name(name);
results_create.push_back(result_create);
}

results_request.mutable_results()->Add(results_create.begin(), results_create.end());
*results_request.mutable_session_id() = session_id;

// Creates the results
auto status = stub->CreateResultsMetaData(&context, results_request, &results_response);

if (!status.ok()) {
std::stringstream message;
message << "Error: " << status.error_code() << ": " << status.error_message()
<< ". details : " << status.error_details() << std::endl;
auto str = message.str();
std::cerr << "Could not create results for submit: " << str << std::endl;
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(str);
}

for (auto &&res : results_response.results()) {
mapping.insert({res.name(), res.result_id()});
}
return mapping;
}
void ResultsClient::upload_result_data(const std::string &session_id, const std::string &result_id,
std::string_view payload) {
grpc::ClientContext context;
armonik::api::grpc::v1::results::ResultsServiceConfigurationResponse configuration;
auto status = stub->GetServiceConfiguration(&context, armonik::api::grpc::v1::Empty(), &configuration);
if (!status.ok()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Unable to get result configuration : " +
status.error_message());
}

size_t maxChunkSize = configuration.data_chunk_max_size();

armonik::api::grpc::v1::results::UploadResultDataResponse response;
// response.set_allocated_result(new armonik::api::grpc::v1::results::ResultRaw());
grpc::ClientContext streamContext;
auto stream = stub->UploadResultData(&streamContext, &response);
armonik::api::grpc::v1::results::UploadResultDataRequest request;
request.mutable_id()->set_session_id(session_id);
request.mutable_id()->set_result_id(result_id);
stream->Write(request);
size_t offset = 0;

while (offset < payload.size()) {
size_t chunkSize = std::min(maxChunkSize, payload.size() - offset);

*request.mutable_data_chunk() = payload.substr(offset, chunkSize);
if (!stream->Write(request)) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Unable to continue upload result");
}
offset += chunkSize;
}

if (!stream->WritesDone()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Unable to upload result");
}
status = stream->Finish();
if (!status.ok()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Unable to finish upload result " +
status.error_message());
}
}
} // namespace API_CLIENT_NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,26 @@ std::future<std::string> API_CLIENT_NAMESPACE::SubmitterClient::get_result_async
return result_data;
});
}
std::map<std::string, armonik::api::grpc::v1::result_status::ResultStatus>
ArmoniK::Api::Client::SubmitterClient::get_result_status(const std::string &session_id,
const std::vector<std::string> &result_ids) {
grpc::ClientContext context;
armonik::api::grpc::v1::submitter::GetResultStatusRequest request;
armonik::api::grpc::v1::submitter::GetResultStatusReply reply;

request.set_session_id(session_id);
request.mutable_result_ids()->Add(result_ids.begin(), result_ids.end());

auto status = stub_->GetResultStatus(&context, request, &reply);
if (!status.ok()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Couldn't get result status : " +
status.error_message());
}

std::map<std::string, armonik::api::grpc::v1::result_status::ResultStatus> statuses;
for (auto &&id_status : reply.id_statuses()) {
statuses[id_status.result_id()] = id_status.status();
}

return statuses;
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromPath(API_COMMON_NAMESPA
dom::element elem;
try {
elem = parser.load(std::string(filepath));
populate(config, "", elem);
} catch (const std::exception &e) {
std::cerr << "Unable to load json file " << filepath << " : " << e.what();
}
populate(config, "", elem);
}
void API_COMMON_NAMESPACE::utils::JsonConfiguration::fromString(API_COMMON_NAMESPACE::utils::IConfiguration &config,
const std::string &json_string) {
Expand Down
111 changes: 101 additions & 10 deletions packages/cpp/ArmoniK.Api.Tests/source/SubmitterClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
#include "utils/GuuId.h"
#include "utils/StringsUtils.h"

#include "results_common.pb.h"
#include "results_service.grpc.pb.h"
#include "submitter/ResultsClient.h"

using ArmoniK::Api::Common::utils::IConfiguration;
using armonik::api::grpc::v1::TaskOptions;
using armonik::api::grpc::v1::submitter::CreateSessionReply;
Expand Down Expand Up @@ -59,7 +63,7 @@ void init(std::shared_ptr<Channel> &channel, TaskOptions &default_task_options)
default_task_options.mutable_max_duration()->set_nanos(0);
default_task_options.set_max_retries(3);
default_task_options.set_priority(1);
default_task_options.set_partition_id("cpp");
default_task_options.set_partition_id("");
default_task_options.set_application_name("my-app");
default_task_options.set_application_version("1.0");
default_task_options.set_application_namespace("my-namespace");
Expand All @@ -75,12 +79,12 @@ TEST(testMock, createSession) {
CreateSessionReply reply;
CreateSessionRequest request;

const std::vector<std::string> &partition_ids = {"cpp"};
const std::vector<std::string> &partition_ids = {""};

TaskOptions task_options;
init(channel, task_options);

ASSERT_EQ(task_options.partition_id(), "cpp");
ASSERT_EQ(task_options.partition_id(), "");

std::unique_ptr<Submitter::StubInterface> stub = Submitter::NewStub(channel);
// EXPECT_CALL(*stub, CreateSession(_, _, _)).Times(AtLeast(1));
Expand Down Expand Up @@ -126,17 +130,29 @@ TEST(testMock, submitTask) {
grpc::ClientContext context;

ArmoniK::Api::Client::SubmitterClient submitter(std::move(stub));
const std::vector<std::string> &partition_ids = {"cpp"};
const std::vector<std::string> &partition_ids = {""};
std::string session_id = submitter.create_session(task_options, partition_ids);

ASSERT_FALSE(session_id.empty());

ArmoniK::Api::Client::ResultsClient results(armonik::api::grpc::v1::results::Results::NewStub(channel));
std::vector<std::string> names;
names.reserve(10);
for (int i = 0; i < 10; i++) {
names.push_back(ArmoniK::Api::Common::utils::GuuId::generate_uuid());
}
auto result_mapping = results.create_results(session_id, names);
int j = 0;
for (auto &&[k, v] : result_mapping) {
names[j++] = v;
}

try {
std::vector<ArmoniK::Api::Client::payload_data> payloads;

for (int i = 0; i < 10; i++) {
ArmoniK::Api::Client::payload_data data;
data.keys = ArmoniK::Api::Common::utils::GuuId::generate_uuid();
data.keys = names[i];
data.payload = {'a', 'r', 'm', 'o', 'n', 'i', 'k'};
data.dependencies = {};
payloads.push_back(data);
Expand All @@ -148,10 +164,12 @@ TEST(testMock, submitTask) {
out << "Generate task_ids : " << task_id;
log.info(out.str());
}

for (const auto &failed_task_id : failed_task_ids) {
std::stringstream out;
out << "Failed task_ids : " << failed_task_id;
log.info(out.str());
throw;
}
} catch (std::exception &e) {
log.error(e.what());
Expand All @@ -160,26 +178,99 @@ TEST(testMock, submitTask) {
log.info("Stopping client...OK");
}

TEST(testMock, testWorker) {
std::shared_ptr<Channel> channel;

CreateSessionReply reply;
CreateSessionRequest request;

const std::vector<std::string> &partition_ids = {""};

TaskOptions task_options;

init(channel, task_options);

auto stub = armonik::api::grpc::v1::results::Results::NewStub(channel);

grpc::ClientContext context;

std::unique_ptr<Submitter::StubInterface> stub_client = Submitter::NewStub(channel);
ArmoniK::Api::Client::SubmitterClient submitter(std::move(stub_client));
std::string session_id = submitter.create_session(task_options, partition_ids);

auto name = "test";

armonik::api::grpc::v1::results::CreateResultsMetaDataRequest request_create;
request_create.set_session_id(session_id);
ArmoniK::Api::Client::ResultsClient results(armonik::api::grpc::v1::results::Results::NewStub(channel));
auto mapping = results.create_results(session_id, {name});
ASSERT_TRUE(mapping.size() == 1);

std::vector<ArmoniK::Api::Client::payload_data> payloads;
ArmoniK::Api::Client::payload_data data;
data.keys = mapping[name];
data.payload = "armonik";
data.dependencies = {};
payloads.push_back(data);
const auto [task_ids, failed] = submitter.submit_tasks_with_dependencies(session_id, task_options, payloads, 5);

while (true) {
auto status = submitter.get_result_status(session_id, {mapping[name]})[mapping[name]];
if (status == armonik::api::grpc::v1::result_status::RESULT_STATUS_COMPLETED ||
status == armonik::api::grpc::v1::result_status::RESULT_STATUS_ABORTED) {
ASSERT_NE(armonik::api::grpc::v1::result_status::RESULT_STATUS_ABORTED, status);
break;
}
}

armonik::api::grpc::v1::ResultRequest result_request;
result_request.set_session(session_id);
result_request.set_result_id(mapping[name]);
auto result_payload = submitter.get_result_async(result_request).get();
ASSERT_TRUE(!result_payload.empty());
}

TEST(testMock, getResult) {
// MockStubInterface stub;
std::shared_ptr<Channel> channel;

CreateSessionReply reply;
CreateSessionRequest request;

const std::vector<std::string> &partition_ids = {"cpp"};
const std::vector<std::string> &partition_ids = {""};

TaskOptions task_options;
armonik::api::grpc::v1::ResultRequest result_request;

init(channel, task_options);

auto stub = armonik::api::grpc::v1::results::Results::NewStub(channel);

grpc::ClientContext context;

std::unique_ptr<Submitter::StubInterface> stub_client = Submitter::NewStub(channel);
ArmoniK::Api::Client::SubmitterClient submitter(std::move(stub_client));
std::string session_id = submitter.create_session(task_options, partition_ids);

auto name = "test";

armonik::api::grpc::v1::results::CreateResultsMetaDataRequest request_create;
request_create.set_session_id(session_id);
ArmoniK::Api::Client::ResultsClient results(armonik::api::grpc::v1::results::Results::NewStub(channel));
auto mapping = results.create_results(session_id, {name});
ASSERT_TRUE(mapping.size() == 1);

std::string payload = "TestPayload";
results.upload_result_data(session_id, mapping[name], payload);

// EXPECT_CALL(*stub, GetServiceConfiguration(_, _, _)).Times(AtLeast(1));
// EXPECT_CALL(*stub, TryGetResultStreamRaw(_, _)).Times(AtLeast(1));

std::unique_ptr<Submitter::StubInterface> stub = Submitter::NewStub(channel);
ArmoniK::Api::Client::SubmitterClient submitter(std::move(stub));
result_request.set_result_id(mapping[name]);
result_request.set_session(session_id);

auto result = submitter.get_result_async(result_request);
auto result = submitter.get_result_async(result_request).get();

ASSERT_FALSE(result.get().empty());
ASSERT_FALSE(result.empty());
ASSERT_EQ(payload, result);
}
24 changes: 21 additions & 3 deletions packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "Worker/ArmoniKWorker.h"
#include "Worker/ProcessStatus.h"
#include "Worker/TaskHandler.h"
#include "exceptions/ArmoniKApiException.h"

using grpc::Channel;
using grpc::ClientContext;
Expand All @@ -26,7 +27,20 @@ using namespace armonik::api::grpc::v1::worker;
using namespace ArmoniK::Api::Common::utils;

ArmoniK::Api::Worker::ProcessStatus computer(ArmoniK::Api::Worker::TaskHandler &handler) {
handler.send_result(handler.getExpectedResults()[0], "test");
std::cout << "Call computer" << std::endl;
try {
if (!handler.getExpectedResults().empty()) {
auto res = handler.send_result(handler.getExpectedResults()[0], "test").get();
if (res.has_error()) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException(res.error());
}
}

} catch (const std::exception &e) {
std::cout << "Error sending result " << e.what() << std::endl;
return ArmoniK::Api::Worker::ProcessStatus(e.what());
}

return ArmoniK::Api::Worker::ProcessStatus::OK;
}

Expand All @@ -38,8 +52,12 @@ int main(int argc, char **argv) {
config->set("ComputePlane__WorkerChannel__Address", "/cache/armonik_worker.sock");
config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock");

ArmoniK::Api::Worker::WorkerServer::create<ArmoniK::Api::Worker::ArmoniKWorker>(config, &computer)->run();
try {
ArmoniK::Api::Worker::WorkerServer::create<ArmoniK::Api::Worker::ArmoniKWorker>(config, &computer)->run();
} catch (const std::exception &e) {
std::cout << "Error in worker" << e.what() << std::endl;
}

std::cout << "Stooping Server..." << std::endl;
std::cout << "Stopping Server..." << std::endl;
return 0;
}
Loading

0 comments on commit ab31d7c

Please sign in to comment.