From 50dfeaf66f69a51184052ad50a32a02ff04df181 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Wed, 18 Oct 2023 12:20:37 +0200 Subject: [PATCH 1/3] Added session client --- .../header/sessions/SessionsClient.h | 66 +++++++++++ .../source/sessions/SessionsClient.cpp | 109 ++++++++++++++++++ .../source/SessionClientTest.cpp | 103 +++++++++++++++++ 3 files changed, 278 insertions(+) create mode 100644 packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h create mode 100644 packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp create mode 100644 packages/cpp/ArmoniK.Api.Tests/source/SessionClientTest.cpp diff --git a/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h b/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h new file mode 100644 index 000000000..0224ce7cd --- /dev/null +++ b/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h @@ -0,0 +1,66 @@ +#pragma once + +#include "sessions_common.pb.h" +#include "sessions_service.grpc.pb.h" + +namespace armonik { +namespace api { +namespace client { + +/** + * Session client wrapper + */ +class SessionsClient final { +public: + explicit SessionsClient(std::unique_ptr stub); + + /** + * Create a new session + * @param default_task_options Default task options for the session + * @param partitions Partitions the session will be able to send tasks to + * @return Session id + */ + std::string create_session(const armonik::api::grpc::v1::TaskOptions &default_task_options, + const std::vector &partitions = {}); + + /** + * Get informations about the given session + * @param session_id Session id + * @return SessionRaw object containing information about the session + */ + armonik::api::grpc::v1::sessions::SessionRaw get_session(const std::string &session_id); + + /** + * Cancel a session + * @param session_id Session id + * @return SessionRaw object containing information about the session + */ + armonik::api::grpc::v1::sessions::SessionRaw cancel_session(const std::string &session_id); + + /** + * List the Sessions + * @param filters Filter to be used + * @param total Output for the total of session available for this request (used for pagination) + * @param page Page to request, use -1 to get all pages. + * @param page_size Size of the requested page, ignored if page is -1 + * @param sort How the sessions are sorted, ascending creation date by default + * @return List of sessions + * + * @note If the sessions corresponding to the filters change while this call is going for page==-1, + * or between calls, then the returned values may not be consistent depending on the sorting used. + * For example, a sort by ascending creation date (the default) will be stable if sessions are being created in + * between requests. + */ + std::vector + list_sessions(const armonik::api::grpc::v1::sessions::Filters &filters, int32_t &total, int32_t page = -1, + int32_t page_size = 500, + const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort &sort = default_sort); + +private: + std::unique_ptr stub; + static const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort default_sort; +}; + +} // namespace client +} // namespace api +} // namespace armonik diff --git a/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp b/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp new file mode 100644 index 000000000..249efc2dc --- /dev/null +++ b/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp @@ -0,0 +1,109 @@ +#include "sessions/SessionsClient.h" +#include "exceptions/ArmoniKApiException.h" + +armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort get_default_sort() { + armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort; + sort.mutable_field()->mutable_session_raw_field()->set_field( + armonik::api::grpc::v1::sessions::SESSION_RAW_ENUM_FIELD_CREATED_AT); + sort.set_direction(armonik::api::grpc::v1::sort_direction::SORT_DIRECTION_ASC); + return sort; +} +const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort armonik::api::client::SessionsClient::default_sort = + get_default_sort(); + +armonik::api::client::SessionsClient::SessionsClient( + std::unique_ptr stub) + : stub(std::move(stub)) {} + +std::string +armonik::api::client::SessionsClient::create_session(const armonik::api::grpc::v1::TaskOptions &default_task_options, + const std::vector &partitions) { + ::grpc::ClientContext context; + armonik::api::grpc::v1::sessions::CreateSessionRequest request; + armonik::api::grpc::v1::sessions::CreateSessionReply response; + + *request.mutable_default_task_option() = default_task_options; + request.mutable_partition_ids()->Add(partitions.begin(), partitions.end()); + + auto status = stub->CreateSession(&context, request, &response); + if (!status.ok()) { + throw armonik::api::common::exceptions::ArmoniKApiException("Could not create session : " + status.error_message()); + } + return response.session_id(); +} + +armonik::api::grpc::v1::sessions::SessionRaw +armonik::api::client::SessionsClient::get_session(const std::string &session_id) { + ::grpc::ClientContext context; + armonik::api::grpc::v1::sessions::GetSessionRequest request; + armonik::api::grpc::v1::sessions::GetSessionResponse response; + + request.set_session_id(session_id); + + auto status = stub->GetSession(&context, request, &response); + if (!status.ok()) { + throw armonik::api::common::exceptions::ArmoniKApiException("Could not get session : " + status.error_message()); + } + return response.session(); +} + +armonik::api::grpc::v1::sessions::SessionRaw +armonik::api::client::SessionsClient::cancel_session(const std::string &session_id) { + ::grpc::ClientContext context; + armonik::api::grpc::v1::sessions::CancelSessionRequest request; + armonik::api::grpc::v1::sessions::CancelSessionResponse response; + + request.set_session_id(session_id); + auto status = stub->CancelSession(&context, request, &response); + if (!status.ok()) { + throw armonik::api::common::exceptions::ArmoniKApiException("Could not cancel session : " + status.error_message()); + } + return response.session(); +} + +std::vector armonik::api::client::SessionsClient::list_sessions( + const armonik::api::grpc::v1::sessions::Filters &filters, int32_t &total, int32_t page, int32_t page_size, + const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort &sort) { + armonik::api::grpc::v1::sessions::ListSessionsRequest request; + armonik::api::grpc::v1::sessions::ListSessionsResponse response; + + *request.mutable_filters() = filters; + *request.mutable_sort() = sort; + request.set_page_size(page_size); + + if (page >= 0) { + request.set_page(page); + ::grpc::ClientContext context; + auto status = stub->ListSessions(&context, request, &response); + if (!status.ok()) { + throw armonik::api::common::exceptions::ArmoniKApiException("Unable to list sessions : " + + status.error_message()); + } + total = response.total(); + return {response.sessions().begin(), response.sessions().end()}; + } else { + std::vector rawSessions; + int current_page = 0; + do { + request.set_page(current_page); + ::grpc::ClientContext context; + auto status = stub->ListSessions(&context, request, &response); + if (!status.ok()) { + throw armonik::api::common::exceptions::ArmoniKApiException("Unable to list sessions : " + + status.error_message()); + } + // Append only the additional sessions + // If the current_page is a re-request, this will add only the new information + rawSessions.insert(rawSessions.end(), + response.sessions().begin() + ((int32_t)rawSessions.size() - current_page * page_size), + response.sessions().end()); + if (response.sessions_size() >= page_size) { + ++current_page; + } + + response.clear_sessions(); + } while ((int32_t)rawSessions.size() < response.total()); + total = response.total(); + return rawSessions; + } +} diff --git a/packages/cpp/ArmoniK.Api.Tests/source/SessionClientTest.cpp b/packages/cpp/ArmoniK.Api.Tests/source/SessionClientTest.cpp new file mode 100644 index 000000000..39616eb33 --- /dev/null +++ b/packages/cpp/ArmoniK.Api.Tests/source/SessionClientTest.cpp @@ -0,0 +1,103 @@ +#include + +#include "common.h" +#include "logger/formatter.h" +#include "logger/logger.h" +#include "logger/writer.h" + +#include "sessions/SessionsClient.h" + +using Logger = armonik::api::common::logger::Logger; + +TEST(Sessions, can_create_session) { + Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)}; + std::shared_ptr<::grpc::Channel> channel; + armonik::api::grpc::v1::TaskOptions task_options; + init(channel, task_options, log); + + armonik::api::client::SessionsClient client(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel)); + + std::string response; + ASSERT_NO_THROW(response = client.create_session(task_options)); + ASSERT_FALSE(response.empty()); + + ASSERT_TRUE(client.get_session(response).status() == armonik::api::grpc::v1::session_status::SESSION_STATUS_RUNNING); +} + +TEST(Sessions, can_cancel_session) { + Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)}; + std::shared_ptr<::grpc::Channel> channel; + armonik::api::grpc::v1::TaskOptions task_options; + init(channel, task_options, log); + + armonik::api::client::SessionsClient client(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel)); + + std::string session_id = client.create_session(task_options); + ASSERT_TRUE(client.get_session(session_id).status() == + armonik::api::grpc::v1::session_status::SESSION_STATUS_RUNNING); + + armonik::api::grpc::v1::sessions::SessionRaw response; + ASSERT_NO_THROW(response = client.cancel_session(session_id)); + ASSERT_EQ(response.session_id(), session_id); + ASSERT_TRUE(client.get_session(session_id).status() == + armonik::api::grpc::v1::session_status::SESSION_STATUS_CANCELLED); +} + +TEST(Sessions, can_get_session) { + Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)}; + std::shared_ptr<::grpc::Channel> channel; + armonik::api::grpc::v1::TaskOptions task_options; + init(channel, task_options, log); + + armonik::api::client::SessionsClient client(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel)); + + std::string session_id = client.create_session(task_options); + + armonik::api::grpc::v1::sessions::SessionRaw response; + ASSERT_NO_THROW(response = client.get_session(session_id)); + ASSERT_EQ(response.session_id(), session_id); +} + +TEST(Sessions, can_list_sessions) { + Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)}; + std::shared_ptr<::grpc::Channel> channel; + armonik::api::grpc::v1::TaskOptions task_options; + init(channel, task_options, log); + + auto client = armonik::api::client::SessionsClient(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel)); + std::string session_ids; + size_t expected_n_sessions = 5; + for (size_t i = 0; i < expected_n_sessions; i++) { + ASSERT_NO_THROW(client.create_session(task_options)); + } + + armonik::api::grpc::v1::sessions::Filters filters; + int total; + auto list = client.list_sessions(filters, total); + ASSERT_GE(list.size(), expected_n_sessions); + ASSERT_GE(total, expected_n_sessions); +} + +TEST(Sessions, can_list_sessions_small_page) { + Logger log{armonik::api::common::logger::writer_console(), armonik::api::common::logger::formatter_plain(true)}; + std::shared_ptr<::grpc::Channel> channel; + armonik::api::grpc::v1::TaskOptions task_options; + init(channel, task_options, log); + + auto client = armonik::api::client::SessionsClient(armonik::api::grpc::v1::sessions::Sessions::NewStub(channel)); + std::string session_ids; + size_t expected_n_sessions = 5; + for (size_t i = 0; i < expected_n_sessions; i++) { + ASSERT_NO_THROW(client.create_session(task_options)); + } + + armonik::api::grpc::v1::sessions::Filters filters; + int total; + auto list = client.list_sessions(filters, total, 0, 2); + ASSERT_EQ(list.size(), 2); + ASSERT_GE(total, expected_n_sessions); + + list = client.list_sessions(filters, total, -1, 2); + ASSERT_GE(list.size(), expected_n_sessions); + ASSERT_GE(total, expected_n_sessions); +} From 114c2af30667ce2910a596b0ed9b014f0b82da90 Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Thu, 19 Oct 2023 11:23:09 +0200 Subject: [PATCH 2/3] PR suggestions --- .../header/sessions/SessionsClient.h | 9 ++++---- .../source/sessions/SessionsClient.cpp | 21 +++++++------------ 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h b/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h index 0224ce7cd..b54d02d8b 100644 --- a/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h +++ b/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h @@ -10,9 +10,10 @@ namespace client { /** * Session client wrapper */ -class SessionsClient final { +class SessionsClient { public: - explicit SessionsClient(std::unique_ptr stub); + explicit SessionsClient(std::unique_ptr stub) + : stub(std::move(stub)){}; /** * Create a new session @@ -28,14 +29,14 @@ class SessionsClient final { * @param session_id Session id * @return SessionRaw object containing information about the session */ - armonik::api::grpc::v1::sessions::SessionRaw get_session(const std::string &session_id); + armonik::api::grpc::v1::sessions::SessionRaw get_session(std::string session_id); /** * Cancel a session * @param session_id Session id * @return SessionRaw object containing information about the session */ - armonik::api::grpc::v1::sessions::SessionRaw cancel_session(const std::string &session_id); + armonik::api::grpc::v1::sessions::SessionRaw cancel_session(std::string session_id); /** * List the Sessions diff --git a/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp b/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp index 249efc2dc..6cc59f046 100644 --- a/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp +++ b/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp @@ -1,7 +1,7 @@ #include "sessions/SessionsClient.h" #include "exceptions/ArmoniKApiException.h" -armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort get_default_sort() { +static armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort get_default_sort() { armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort; sort.mutable_field()->mutable_session_raw_field()->set_field( armonik::api::grpc::v1::sessions::SESSION_RAW_ENUM_FIELD_CREATED_AT); @@ -11,10 +11,6 @@ armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort get_default_sort() { const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort armonik::api::client::SessionsClient::default_sort = get_default_sort(); -armonik::api::client::SessionsClient::SessionsClient( - std::unique_ptr stub) - : stub(std::move(stub)) {} - std::string armonik::api::client::SessionsClient::create_session(const armonik::api::grpc::v1::TaskOptions &default_task_options, const std::vector &partitions) { @@ -29,36 +25,35 @@ armonik::api::client::SessionsClient::create_session(const armonik::api::grpc::v if (!status.ok()) { throw armonik::api::common::exceptions::ArmoniKApiException("Could not create session : " + status.error_message()); } - return response.session_id(); + return std::move(*response.mutable_session_id()); } -armonik::api::grpc::v1::sessions::SessionRaw -armonik::api::client::SessionsClient::get_session(const std::string &session_id) { +armonik::api::grpc::v1::sessions::SessionRaw armonik::api::client::SessionsClient::get_session(std::string session_id) { ::grpc::ClientContext context; armonik::api::grpc::v1::sessions::GetSessionRequest request; armonik::api::grpc::v1::sessions::GetSessionResponse response; - request.set_session_id(session_id); + request.set_session_id(std::move(session_id)); auto status = stub->GetSession(&context, request, &response); if (!status.ok()) { throw armonik::api::common::exceptions::ArmoniKApiException("Could not get session : " + status.error_message()); } - return response.session(); + return std::move(*response.mutable_session()); } armonik::api::grpc::v1::sessions::SessionRaw -armonik::api::client::SessionsClient::cancel_session(const std::string &session_id) { +armonik::api::client::SessionsClient::cancel_session(std::string session_id) { ::grpc::ClientContext context; armonik::api::grpc::v1::sessions::CancelSessionRequest request; armonik::api::grpc::v1::sessions::CancelSessionResponse response; - request.set_session_id(session_id); + request.set_session_id(std::move(session_id)); auto status = stub->CancelSession(&context, request, &response); if (!status.ok()) { throw armonik::api::common::exceptions::ArmoniKApiException("Could not cancel session : " + status.error_message()); } - return response.session(); + return std::move(*response.mutable_session()); } std::vector armonik::api::client::SessionsClient::list_sessions( From 89acb0443ac70551aeb1fe2c252f6eff44bbcaad Mon Sep 17 00:00:00 2001 From: Dylan Brasseur Date: Thu, 19 Oct 2023 11:42:23 +0200 Subject: [PATCH 3/3] PR suggestions --- .../header/sessions/SessionsClient.h | 6 +++--- .../source/sessions/SessionsClient.cpp | 20 +++++++++++-------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h b/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h index b54d02d8b..349748160 100644 --- a/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h +++ b/packages/cpp/ArmoniK.Api.Client/header/sessions/SessionsClient.h @@ -21,7 +21,7 @@ class SessionsClient { * @param partitions Partitions the session will be able to send tasks to * @return Session id */ - std::string create_session(const armonik::api::grpc::v1::TaskOptions &default_task_options, + std::string create_session(armonik::api::grpc::v1::TaskOptions default_task_options, const std::vector &partitions = {}); /** @@ -53,9 +53,9 @@ class SessionsClient { * between requests. */ std::vector - list_sessions(const armonik::api::grpc::v1::sessions::Filters &filters, int32_t &total, int32_t page = -1, + list_sessions(armonik::api::grpc::v1::sessions::Filters filters, int32_t &total, int32_t page = -1, int32_t page_size = 500, - const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort &sort = default_sort); + armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort = default_sort); private: std::unique_ptr stub; diff --git a/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp b/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp index 6cc59f046..7780b2f18 100644 --- a/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp +++ b/packages/cpp/ArmoniK.Api.Client/source/sessions/SessionsClient.cpp @@ -1,5 +1,7 @@ -#include "sessions/SessionsClient.h" +#include + #include "exceptions/ArmoniKApiException.h" +#include "sessions/SessionsClient.h" static armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort get_default_sort() { armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort; @@ -12,13 +14,13 @@ const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort armonik::api:: get_default_sort(); std::string -armonik::api::client::SessionsClient::create_session(const armonik::api::grpc::v1::TaskOptions &default_task_options, +armonik::api::client::SessionsClient::create_session(armonik::api::grpc::v1::TaskOptions default_task_options, const std::vector &partitions) { ::grpc::ClientContext context; armonik::api::grpc::v1::sessions::CreateSessionRequest request; armonik::api::grpc::v1::sessions::CreateSessionReply response; - *request.mutable_default_task_option() = default_task_options; + *request.mutable_default_task_option() = std::move(default_task_options); request.mutable_partition_ids()->Add(partitions.begin(), partitions.end()); auto status = stub->CreateSession(&context, request, &response); @@ -56,14 +58,15 @@ armonik::api::client::SessionsClient::cancel_session(std::string session_id) { return std::move(*response.mutable_session()); } -std::vector armonik::api::client::SessionsClient::list_sessions( - const armonik::api::grpc::v1::sessions::Filters &filters, int32_t &total, int32_t page, int32_t page_size, - const armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort &sort) { +std::vector +armonik::api::client::SessionsClient::list_sessions(armonik::api::grpc::v1::sessions::Filters filters, int32_t &total, + int32_t page, int32_t page_size, + armonik::api::grpc::v1::sessions::ListSessionsRequest::Sort sort) { armonik::api::grpc::v1::sessions::ListSessionsRequest request; armonik::api::grpc::v1::sessions::ListSessionsResponse response; - *request.mutable_filters() = filters; - *request.mutable_sort() = sort; + *request.mutable_filters() = std::move(filters); + *request.mutable_sort() = std::move(sort); request.set_page_size(page_size); if (page >= 0) { @@ -99,6 +102,7 @@ std::vector armonik::api::client:: response.clear_sessions(); } while ((int32_t)rawSessions.size() < response.total()); total = response.total(); + // NRVO return rawSessions; } }