Skip to content

Commit

Permalink
fix: Added session client wrapper to cpp (#434)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrasseur-aneo authored Oct 20, 2023
2 parents 98fc452 + 89acb04 commit 99f57cd
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 0 deletions.
67 changes: 67 additions & 0 deletions packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include "sessions_common.pb.h"
#include "sessions_service.grpc.pb.h"

namespace armonik {
namespace api {
namespace client {

/**
* Session client wrapper
*/
class SessionsClient {
public:
explicit SessionsClient(std::unique_ptr<armonik::api::grpc::v1::sessions::Sessions::StubInterface> stub)
: stub(std::move(stub)){};

/**
* Create a new session
* @param default_task_options Default task options for the session
* @param partitions Partitions the session will be able to send tasks to
* @return Session id
*/
std::string create_session(armonik::api::grpc::v1::TaskOptions default_task_options,
const std::vector<std::string> &partitions = {});

/**
* Get informations about the given session
* @param session_id Session id
* @return SessionRaw object containing information about the session
*/
armonik::api::grpc::v1::sessions::SessionRaw get_session(std::string session_id);

/**
* Cancel a session
* @param session_id Session id
* @return SessionRaw object containing information about the session
*/
armonik::api::grpc::v1::sessions::SessionRaw cancel_session(std::string session_id);

/**
* List the Sessions
* @param filters Filter to be used
* @param total Output for the total of session available for this request (used for pagination)
* @param page Page to request, use -1 to get all pages.
* @param page_size Size of the requested page, ignored if page is -1
* @param sort How the sessions are sorted, ascending creation date by default
* @return List of sessions
*
* @note If the sessions corresponding to the filters change while this call is going for page==-1,
* or between calls, then the returned values may not be consistent depending on the sorting used.
* For example, a sort by ascending creation date (the default) will be stable if sessions are being created in
* between requests.
*/
std::vector<armonik::api::grpc::v1::sessions::SessionRaw>
list_sessions(armonik::api::grpc::v1::sessions::Filters filters, int32_t &total, int32_t page = -1,
int32_t page_size = 500,
armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort = default_sort);

private:
std::unique_ptr<armonik::api::grpc::v1::sessions::Sessions::StubInterface> stub;
static const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort default_sort;
};

} // namespace client
} // namespace api
} // namespace armonik
108 changes: 108 additions & 0 deletions packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include <utility>

#include "exceptions/ArmoniKApiException.h"
#include "sessions/SessionsClient.h"

static armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort get_default_sort() {
armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort;
sort.mutable_field()->mutable_session_raw_field()->set_field(
armonik::api::grpc::v1::sessions::SESSION_RAW_ENUM_FIELD_CREATED_AT);
sort.set_direction(armonik::api::grpc::v1::sort_direction::SORT_DIRECTION_ASC);
return sort;
}
const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort armonik::api::client::SessionsClient::default_sort =
get_default_sort();

std::string
armonik::api::client::SessionsClient::create_session(armonik::api::grpc::v1::TaskOptions default_task_options,
const std::vector<std::string> &partitions) {
::grpc::ClientContext context;
armonik::api::grpc::v1::sessions::CreateSessionRequest request;
armonik::api::grpc::v1::sessions::CreateSessionReply response;

*request.mutable_default_task_option() = std::move(default_task_options);
request.mutable_partition_ids()->Add(partitions.begin(), partitions.end());

auto status = stub->CreateSession(&context, request, &response);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Could not create session : " + status.error_message());
}
return std::move(*response.mutable_session_id());
}

armonik::api::grpc::v1::sessions::SessionRaw armonik::api::client::SessionsClient::get_session(std::string session_id) {
::grpc::ClientContext context;
armonik::api::grpc::v1::sessions::GetSessionRequest request;
armonik::api::grpc::v1::sessions::GetSessionResponse response;

request.set_session_id(std::move(session_id));

auto status = stub->GetSession(&context, request, &response);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Could not get session : " + status.error_message());
}
return std::move(*response.mutable_session());
}

armonik::api::grpc::v1::sessions::SessionRaw
armonik::api::client::SessionsClient::cancel_session(std::string session_id) {
::grpc::ClientContext context;
armonik::api::grpc::v1::sessions::CancelSessionRequest request;
armonik::api::grpc::v1::sessions::CancelSessionResponse response;

request.set_session_id(std::move(session_id));
auto status = stub->CancelSession(&context, request, &response);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Could not cancel session : " + status.error_message());
}
return std::move(*response.mutable_session());
}

std::vector<armonik::api::grpc::v1::sessions::SessionRaw>
armonik::api::client::SessionsClient::list_sessions(armonik::api::grpc::v1::sessions::Filters filters, int32_t &total,
int32_t page, int32_t page_size,
armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort) {
armonik::api::grpc::v1::sessions::ListSessionsRequest request;
armonik::api::grpc::v1::sessions::ListSessionsResponse response;

*request.mutable_filters() = std::move(filters);
*request.mutable_sort() = std::move(sort);
request.set_page_size(page_size);

if (page >= 0) {
request.set_page(page);
::grpc::ClientContext context;
auto status = stub->ListSessions(&context, request, &response);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to list sessions : " +
status.error_message());
}
total = response.total();
return {response.sessions().begin(), response.sessions().end()};
} else {
std::vector<armonik::api::grpc::v1::sessions::SessionRaw> rawSessions;
int current_page = 0;
do {
request.set_page(current_page);
::grpc::ClientContext context;
auto status = stub->ListSessions(&context, request, &response);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to list sessions : " +
status.error_message());
}
// Append only the additional sessions
// If the current_page is a re-request, this will add only the new information
rawSessions.insert(rawSessions.end(),
response.sessions().begin() + ((int32_t)rawSessions.size() - current_page * page_size),
response.sessions().end());
if (response.sessions_size() >= page_size) {
++current_page;
}

response.clear_sessions();
} while ((int32_t)rawSessions.size() < response.total());
total = response.total();
// NRVO
return rawSessions;
}
}
103 changes: 103 additions & 0 deletions packages/cpp/ArmoniK.Api.Tests/source/SessionClientTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#include <gtest/gtest.h>

#include "common.h"
#include "logger/formatter.h"
#include "logger/logger.h"
#include "logger/writer.h"

#include "sessions/SessionsClient.h"

using Logger = armonik::api::common::logger::Logger;

TEST(Sessions, can_create_session) {
Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)};
std::shared_ptr<::grpc::Channel> channel;
armonik::api::grpc::v1::TaskOptions task_options;
init(channel, task_options, log);

armonik::api::client::SessionsClient client(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel));

std::string response;
ASSERT_NO_THROW(response = client.create_session(task_options));
ASSERT_FALSE(response.empty());

ASSERT_TRUE(client.get_session(response).status() == armonik::api::grpc::v1::session_status::SESSION_STATUS_RUNNING);
}

TEST(Sessions, can_cancel_session) {
Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)};
std::shared_ptr<::grpc::Channel> channel;
armonik::api::grpc::v1::TaskOptions task_options;
init(channel, task_options, log);

armonik::api::client::SessionsClient client(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel));

std::string session_id = client.create_session(task_options);
ASSERT_TRUE(client.get_session(session_id).status() ==
armonik::api::grpc::v1::session_status::SESSION_STATUS_RUNNING);

armonik::api::grpc::v1::sessions::SessionRaw response;
ASSERT_NO_THROW(response = client.cancel_session(session_id));
ASSERT_EQ(response.session_id(), session_id);
ASSERT_TRUE(client.get_session(session_id).status() ==
armonik::api::grpc::v1::session_status::SESSION_STATUS_CANCELLED);
}

TEST(Sessions, can_get_session) {
Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)};
std::shared_ptr<::grpc::Channel> channel;
armonik::api::grpc::v1::TaskOptions task_options;
init(channel, task_options, log);

armonik::api::client::SessionsClient client(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel));

std::string session_id = client.create_session(task_options);

armonik::api::grpc::v1::sessions::SessionRaw response;
ASSERT_NO_THROW(response = client.get_session(session_id));
ASSERT_EQ(response.session_id(), session_id);
}

TEST(Sessions, can_list_sessions) {
Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)};
std::shared_ptr<::grpc::Channel> channel;
armonik::api::grpc::v1::TaskOptions task_options;
init(channel, task_options, log);

auto client = armonik::api::client::SessionsClient(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel));
std::string session_ids;
size_t expected_n_sessions = 5;
for (size_t i = 0; i < expected_n_sessions; i++) {
ASSERT_NO_THROW(client.create_session(task_options));
}

armonik::api::grpc::v1::sessions::Filters filters;
int total;
auto list = client.list_sessions(filters, total);
ASSERT_GE(list.size(), expected_n_sessions);
ASSERT_GE(total, expected_n_sessions);
}

TEST(Sessions, can_list_sessions_small_page) {
Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)};
std::shared_ptr<::grpc::Channel> channel;
armonik::api::grpc::v1::TaskOptions task_options;
init(channel, task_options, log);

auto client = armonik::api::client::SessionsClient(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel));
std::string session_ids;
size_t expected_n_sessions = 5;
for (size_t i = 0; i < expected_n_sessions; i++) {
ASSERT_NO_THROW(client.create_session(task_options));
}

armonik::api::grpc::v1::sessions::Filters filters;
int total;
auto list = client.list_sessions(filters, total, 0, 2);
ASSERT_EQ(list.size(), 2);
ASSERT_GE(total, expected_n_sessions);

list = client.list_sessions(filters, total, -1, 2);
ASSERT_GE(list.size(), expected_n_sessions);
ASSERT_GE(total, expected_n_sessions);
}

0 comments on commit 99f57cd

Please sign in to comment.