Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Added session client wrapper to cpp #434

Merged
merged 3 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include "sessions_common.pb.h"
#include "sessions_service.grpc.pb.h"

namespace armonik {
namespace api {
namespace client {

/**
* Session client wrapper
*/
class SessionsClient {
public:
explicit SessionsClient(std::unique_ptr<armonik::api::grpc::v1::sessions::Sessions::StubInterface> stub)
: stub(std::move(stub)){};

/**
* Create a new session
* @param default_task_options Default task options for the session
* @param partitions Partitions the session will be able to send tasks to
* @return Session id
*/
std::string create_session(armonik::api::grpc::v1::TaskOptions default_task_options,
const std::vector<std::string> &partitions = {});

/**
* Get informations about the given session
* @param session_id Session id
* @return SessionRaw object containing information about the session
*/
armonik::api::grpc::v1::sessions::SessionRaw get_session(std::string session_id);

/**
* Cancel a session
* @param session_id Session id
* @return SessionRaw object containing information about the session
*/
armonik::api::grpc::v1::sessions::SessionRaw cancel_session(std::string session_id);

/**
* List the Sessions
* @param filters Filter to be used
* @param total Output for the total of session available for this request (used for pagination)
* @param page Page to request, use -1 to get all pages.
* @param page_size Size of the requested page, ignored if page is -1
* @param sort How the sessions are sorted, ascending creation date by default
* @return List of sessions
*
* @note If the sessions corresponding to the filters change while this call is going for page==-1,
* or between calls, then the returned values may not be consistent depending on the sorting used.
* For example, a sort by ascending creation date (the default) will be stable if sessions are being created in
* between requests.
*/
std::vector<armonik::api::grpc::v1::sessions::SessionRaw>
list_sessions(armonik::api::grpc::v1::sessions::Filters filters, int32_t &total, int32_t page = -1,
int32_t page_size = 500,
armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort = default_sort);

private:
std::unique_ptr<armonik::api::grpc::v1::sessions::Sessions::StubInterface> stub;
static const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort default_sort;
};

} // namespace client
} // namespace api
} // namespace armonik
108 changes: 108 additions & 0 deletions packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include <utility>

#include "exceptions/ArmoniKApiException.h"
#include "sessions/SessionsClient.h"

static armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort get_default_sort() {
armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort;
sort.mutable_field()->mutable_session_raw_field()->set_field(
armonik::api::grpc::v1::sessions::SESSION_RAW_ENUM_FIELD_CREATED_AT);
sort.set_direction(armonik::api::grpc::v1::sort_direction::SORT_DIRECTION_ASC);
return sort;
}
const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort armonik::api::client::SessionsClient::default_sort =
get_default_sort();

std::string
armonik::api::client::SessionsClient::create_session(armonik::api::grpc::v1::TaskOptions default_task_options,
const std::vector<std::string> &partitions) {
::grpc::ClientContext context;
armonik::api::grpc::v1::sessions::CreateSessionRequest request;
armonik::api::grpc::v1::sessions::CreateSessionReply response;

*request.mutable_default_task_option() = std::move(default_task_options);
request.mutable_partition_ids()->Add(partitions.begin(), partitions.end());

auto status = stub->CreateSession(&context, request, &response);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Could not create session : " + status.error_message());
}
return std::move(*response.mutable_session_id());
}

armonik::api::grpc::v1::sessions::SessionRaw armonik::api::client::SessionsClient::get_session(std::string session_id) {
::grpc::ClientContext context;
armonik::api::grpc::v1::sessions::GetSessionRequest request;
armonik::api::grpc::v1::sessions::GetSessionResponse response;

request.set_session_id(std::move(session_id));

auto status = stub->GetSession(&context, request, &response);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Could not get session : " + status.error_message());
}
return std::move(*response.mutable_session());
}

armonik::api::grpc::v1::sessions::SessionRaw
armonik::api::client::SessionsClient::cancel_session(std::string session_id) {
::grpc::ClientContext context;
armonik::api::grpc::v1::sessions::CancelSessionRequest request;
armonik::api::grpc::v1::sessions::CancelSessionResponse response;

request.set_session_id(std::move(session_id));
auto status = stub->CancelSession(&context, request, &response);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Could not cancel session : " + status.error_message());
}
return std::move(*response.mutable_session());
}

std::vector<armonik::api::grpc::v1::sessions::SessionRaw>
armonik::api::client::SessionsClient::list_sessions(armonik::api::grpc::v1::sessions::Filters filters, int32_t &total,
int32_t page, int32_t page_size,
armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort) {
armonik::api::grpc::v1::sessions::ListSessionsRequest request;
armonik::api::grpc::v1::sessions::ListSessionsResponse response;

*request.mutable_filters() = std::move(filters);
*request.mutable_sort() = std::move(sort);
request.set_page_size(page_size);

if (page >= 0) {
request.set_page(page);
::grpc::ClientContext context;
auto status = stub->ListSessions(&context, request, &response);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to list sessions : " +
status.error_message());
}
total = response.total();
return {response.sessions().begin(), response.sessions().end()};
} else {
std::vector<armonik::api::grpc::v1::sessions::SessionRaw> rawSessions;
int current_page = 0;
do {
request.set_page(current_page);
::grpc::ClientContext context;
auto status = stub->ListSessions(&context, request, &response);
if (!status.ok()) {
throw armonik::api::common::exceptions::ArmoniKApiException("Unable to list sessions : " +
status.error_message());
}
// Append only the additional sessions
// If the current_page is a re-request, this will add only the new information
rawSessions.insert(rawSessions.end(),
response.sessions().begin() + ((int32_t)rawSessions.size() - current_page * page_size),
response.sessions().end());
if (response.sessions_size() >= page_size) {
++current_page;
}

response.clear_sessions();
} while ((int32_t)rawSessions.size() < response.total());
total = response.total();
// NRVO
return rawSessions;
lemaitre-aneo marked this conversation as resolved.
Show resolved Hide resolved
}
}
103 changes: 103 additions & 0 deletions packages/cpp/ArmoniK.Api.Tests/source/SessionClientTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#include <gtest/gtest.h>

#include "common.h"
#include "logger/formatter.h"
#include "logger/logger.h"
#include "logger/writer.h"

#include "sessions/SessionsClient.h"

using Logger = armonik::api::common::logger::Logger;

TEST(Sessions, can_create_session) {
Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)};
std::shared_ptr<::grpc::Channel> channel;
armonik::api::grpc::v1::TaskOptions task_options;
init(channel, task_options, log);

armonik::api::client::SessionsClient client(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel));

std::string response;
ASSERT_NO_THROW(response = client.create_session(task_options));
ASSERT_FALSE(response.empty());

ASSERT_TRUE(client.get_session(response).status() == armonik::api::grpc::v1::session_status::SESSION_STATUS_RUNNING);
}

TEST(Sessions, can_cancel_session) {
Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)};
std::shared_ptr<::grpc::Channel> channel;
armonik::api::grpc::v1::TaskOptions task_options;
init(channel, task_options, log);

armonik::api::client::SessionsClient client(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel));

std::string session_id = client.create_session(task_options);
ASSERT_TRUE(client.get_session(session_id).status() ==
armonik::api::grpc::v1::session_status::SESSION_STATUS_RUNNING);

armonik::api::grpc::v1::sessions::SessionRaw response;
ASSERT_NO_THROW(response = client.cancel_session(session_id));
ASSERT_EQ(response.session_id(), session_id);
ASSERT_TRUE(client.get_session(session_id).status() ==
armonik::api::grpc::v1::session_status::SESSION_STATUS_CANCELLED);
}

TEST(Sessions, can_get_session) {
Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)};
std::shared_ptr<::grpc::Channel> channel;
armonik::api::grpc::v1::TaskOptions task_options;
init(channel, task_options, log);

armonik::api::client::SessionsClient client(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel));

std::string session_id = client.create_session(task_options);

armonik::api::grpc::v1::sessions::SessionRaw response;
ASSERT_NO_THROW(response = client.get_session(session_id));
ASSERT_EQ(response.session_id(), session_id);
}

TEST(Sessions, can_list_sessions) {
Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)};
std::shared_ptr<::grpc::Channel> channel;
armonik::api::grpc::v1::TaskOptions task_options;
init(channel, task_options, log);

auto client = armonik::api::client::SessionsClient(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel));
std::string session_ids;
size_t expected_n_sessions = 5;
for (size_t i = 0; i < expected_n_sessions; i++) {
ASSERT_NO_THROW(client.create_session(task_options));
}

armonik::api::grpc::v1::sessions::Filters filters;
int total;
auto list = client.list_sessions(filters, total);
ASSERT_GE(list.size(), expected_n_sessions);
ASSERT_GE(total, expected_n_sessions);
}

TEST(Sessions, can_list_sessions_small_page) {
Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)};
std::shared_ptr<::grpc::Channel> channel;
armonik::api::grpc::v1::TaskOptions task_options;
init(channel, task_options, log);

auto client = armonik::api::client::SessionsClient(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel));
std::string session_ids;
size_t expected_n_sessions = 5;
for (size_t i = 0; i < expected_n_sessions; i++) {
ASSERT_NO_THROW(client.create_session(task_options));
}

armonik::api::grpc::v1::sessions::Filters filters;
int total;
auto list = client.list_sessions(filters, total, 0, 2);
ASSERT_EQ(list.size(), 2);
ASSERT_GE(total, expected_n_sessions);

list = client.list_sessions(filters, total, -1, 2);
ASSERT_GE(list.size(), expected_n_sessions);
ASSERT_GE(total, expected_n_sessions);
}
Loading