Skip to content

Commit

Permalink
feat: Added channel retry arguments (#427)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrasseur-aneo authored Oct 11, 2023
2 parents 9ce98b2 + 9b6a876 commit 165f373
Show file tree
Hide file tree
Showing 20 changed files with 374 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@aneoconsultingfr/armonik.api.angular",
"version": "3.13.1",
"version": "3.13.2",
"description": "gRPC API to interact with ArmoniK built for Angular",
"license": "Apache-2.0",
"homepage": "https://github.com/aneoconsulting/ArmoniK.Api#readme",
Expand Down
47 changes: 33 additions & 14 deletions packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,61 @@
#define ARMONIK_API_CONTROLPLANE_H

#include "utils/Configuration.h"
#include <cmath>
#include <google/protobuf/duration.pb.h>

namespace armonik {
namespace api {
namespace common {
namespace options {
class ControlPlane {
public:
ControlPlane(const utils::Configuration &config) {
endpoint_ = config.get(EndpointKey);
user_cert_pem_path_ = config.get(UserCertKey);
user_key_pem_path_ = config.get(UserKeyKey);
user_p12_path_ = config.get(UserP12Key);
ca_cert_pem_path_ = config.get(CaCertKey);
sslValidation_ = config.get(SSLValidationKey) != "disable";
}
ControlPlane(const utils::Configuration &config);

[[nodiscard]] absl::string_view getEndpoint() const { return endpoint_; }
[[nodiscard]] absl::string_view getUserCertPemPath() const { return user_cert_pem_path_; }
[[nodiscard]] absl::string_view getUserKeyPemPath() const { return user_key_pem_path_; }
[[nodiscard]] absl::string_view getUserP12Path() const { return user_p12_path_; }
[[nodiscard]] absl::string_view getCaCertPemPath() const { return ca_cert_pem_path_; }
[[nodiscard]] bool isSslValidation() const { return sslValidation_; }
absl::string_view getEndpoint() const { return endpoint_; }
absl::string_view getUserCertPemPath() const { return user_cert_pem_path_; }
absl::string_view getUserKeyPemPath() const { return user_key_pem_path_; }
absl::string_view getUserP12Path() const { return user_p12_path_; }
absl::string_view getCaCertPemPath() const { return ca_cert_pem_path_; }
bool isSslValidation() const { return sslValidation_; }
const google::protobuf::Duration &getKeepAliveTime() const { return keep_alive_time_; }
const google::protobuf::Duration &getKeepAliveTimeInterval() const { return keep_alive_time_interval_; }
const google::protobuf::Duration &getMaxIdleTime() const { return max_idle_time_; }
int getMaxAttempts() const { return max_attempts_; }
double getBackoffMultiplier() const { return backoff_multiplier_; }
const google::protobuf::Duration &getInitialBackoff() const { return initial_backoff_; }
const google::protobuf::Duration &getMaxBackoff() const { return max_backoff_; }
const google::protobuf::Duration &getRequestTimeout() const { return request_timeout_; }

static constexpr char EndpointKey[] = "Grpc__EndPoint";
static constexpr char UserCertKey[] = "Grpc__ClientCert";
static constexpr char UserKeyKey[] = "Grpc__ClientKey";
static constexpr char UserP12Key[] = "Grpc__ClientP12";
static constexpr char CaCertKey[] = "Grpc__CaCert";
static constexpr char SSLValidationKey[] = "Grpc__SSLValidation";
static constexpr char KeepAliveTimeKey[] = "Grpc__KeepAliveTime";
static constexpr char KeepAliveTimeIntervalKey[] = "Grpc__KeepAliveTimeInterval";
static constexpr char MaxIdleTimeKey[] = "Grpc__MaxIdleTime";
static constexpr char MaxAttemptsKey[] = "Grpc__MaxAttempts";
static constexpr char BackoffMultiplierKey[] = "Grpc__BackoffMultiplier";
static constexpr char InitialBackOffKey[] = "Grpc__InitialBackOff";
static constexpr char MaxBackOffKey[] = "Grpc__MaxBackOff";
static constexpr char RequestTimeoutKey[] = "Grpc__RequestTimeout";

private:
std::string endpoint_;
std::string user_cert_pem_path_;
std::string user_key_pem_path_;
std::string user_p12_path_;
std::string ca_cert_pem_path_;
::google::protobuf::Duration keep_alive_time_;
::google::protobuf::Duration keep_alive_time_interval_;
::google::protobuf::Duration max_idle_time_;
int max_attempts_{};
double backoff_multiplier_{};
::google::protobuf::Duration initial_backoff_;
::google::protobuf::Duration max_backoff_;
::google::protobuf::Duration request_timeout_;
bool sslValidation_;
};
} // namespace options
Expand Down
33 changes: 33 additions & 0 deletions packages/cpp/ArmoniK.Api.Common/header/utils/ChannelArguments.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include "utils/Configuration.h"
#include <grpcpp/support/channel_arguments.h>

namespace armonik {
namespace api {
namespace common {
namespace utils {
/**
* Get custom channel arguments for channel creation
* @param config Configuration
* @return Channel arguments
*/
::grpc::ChannelArguments getChannelArguments(const Configuration &config);

/**
* Get custom channel arguments for channel creation
* @param config Control Plane configuration
* @return Channel arguments
*/
::grpc::ChannelArguments getChannelArguments(const options::ControlPlane &config);

/**
* Generate the service config for the channel arguments
* @param config Control Plane configuration
* @return Json of the service
*/
std::string getServiceConfigJson(const armonik::api::common::options::ControlPlane &config);
} // namespace utils
} // namespace common
} // namespace api
} // namespace armonik
33 changes: 33 additions & 0 deletions packages/cpp/ArmoniK.Api.Common/header/utils/Utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include "google/protobuf/duration.pb.h"

namespace armonik {
namespace api {
namespace common {
namespace utils {
/**
* Creates a duration from the given values
* @param days Days
* @param hours Hours
* @param minutes Minutes
* @param seconds Seconds
* @param nanoseconds Nanoseconds
* @return Duration with the right value
* @note Make sure that the resulting number of seconds and the nanoseconds are of the same sign for the duration to be
* valid
*/
::google::protobuf::Duration duration_from_values(long long int days = 0, long long int hours = 0,
long long int minutes = 0, long long int seconds = 0,
int nanoseconds = 0);

/**
* Creates a duration from timespan string
* @param timespan string with format [-][d.]hh:mm:ss[.fffffffff]
* @return Duration in accordance with timespan
*/
::google::protobuf::Duration duration_from_timespan(const std::string &timespan);
} // namespace utils
} // namespace common
} // namespace api
} // namespace armonik
32 changes: 32 additions & 0 deletions packages/cpp/ArmoniK.Api.Common/source/options/ControlPlane.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "options/ControlPlane.h"
#include "utils/Configuration.h"
#include "utils/Utils.h"

armonik::api::common::options::ControlPlane::ControlPlane(const utils::Configuration &config) {
endpoint_ = config.get(EndpointKey);
user_cert_pem_path_ = config.get(UserCertKey);
user_key_pem_path_ = config.get(UserKeyKey);
user_p12_path_ = config.get(UserP12Key);
ca_cert_pem_path_ = config.get(CaCertKey);
sslValidation_ = config.get(SSLValidationKey) != "disable";

keep_alive_time_ = config.get(KeepAliveTimeKey).empty() ? utils::duration_from_values(0, 0, 0, 30)
: utils::duration_from_timespan(config.get(KeepAliveTimeKey));
keep_alive_time_interval_ = config.get(KeepAliveTimeIntervalKey).empty()
? utils::duration_from_values(0, 0, 0, 30)
: utils::duration_from_timespan(config.get(KeepAliveTimeIntervalKey));
max_idle_time_ = config.get(MaxIdleTimeKey).empty() ? utils::duration_from_values(0, 0, 5)
: utils::duration_from_timespan(config.get(MaxIdleTimeKey));
long attempts = std::strtol(config.get(MaxAttemptsKey).c_str(), nullptr, 10);
max_attempts_ = (attempts <= 0 || attempts >= INT_MAX) ? 5 : (int)attempts;
backoff_multiplier_ = strtod(config.get(BackoffMultiplierKey).c_str(), nullptr);
backoff_multiplier_ = backoff_multiplier_ == 0 || backoff_multiplier_ == HUGE_VAL ? 1.5 : backoff_multiplier_;
initial_backoff_ = config.get(InitialBackOffKey).empty()
? utils::duration_from_values(0, 0, 0, 1)
: utils::duration_from_timespan(config.get(InitialBackOffKey));
max_backoff_ = config.get(MaxBackOffKey).empty() ? utils::duration_from_values(0, 0, 0, 5)
: utils::duration_from_timespan(config.get(MaxBackOffKey));
request_timeout_ = config.get(RequestTimeoutKey).empty()
? utils::duration_from_values(366)
: utils::duration_from_timespan(config.get(RequestTimeoutKey));
}
53 changes: 53 additions & 0 deletions packages/cpp/ArmoniK.Api.Common/source/utils/ChannelArguments.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include "utils/ChannelArguments.h"
#include "options/ControlPlane.h"
#include <google/protobuf/util/json_util.h>
#include <sstream>

template <typename T, typename Tin> T saturate_cast(Tin in, T max_value = std::numeric_limits<T>::max()) {
if (in > max_value) {
return max_value;
}
return static_cast<T>(in);
}

int getMilliseconds(const google::protobuf::Duration &duration) {
return saturate_cast<int>(duration.seconds() * 1000) + (duration.nanos() / 1000000);
}

std::string
armonik::api::common::utils::getServiceConfigJson(const armonik::api::common::options::ControlPlane &config) {
std::stringstream ss;
std::string initialBackoff, maxBackoff, timeout;
auto status = google::protobuf::util::MessageToJsonString(config.getInitialBackoff(), &initialBackoff);
if (!status.ok()) {
throw std::invalid_argument("Initial backoff is invalid" + status.ToString());
}
status = google::protobuf::util::MessageToJsonString(config.getMaxBackoff(), &maxBackoff);
if (!status.ok()) {
throw std::invalid_argument("Max backoff is invalid" + status.ToString());
}
status = google::protobuf::util::MessageToJsonString(config.getRequestTimeout(), &timeout);
if (!status.ok()) {
throw std::invalid_argument("Timeout is invalid" + status.ToString());
}
ss << R"({ "methodConfig": [{ "name": [{}], )"
<< R"("timeout" : )" << timeout << ',' << R"("retryPolicy" : {)"
<< R"("backoffMultiplier": )" << config.getBackoffMultiplier() << ',' << R"("initialBackoff":)" << initialBackoff
<< ","
<< R"("maxBackoff":)" << maxBackoff << ","
<< R"("maxAttempts":)" << config.getMaxAttempts() << ','
<< R"("retryableStatusCodes": [ "UNAVAILABLE", "ABORTED", "UNKNOWN" ])"
<< "}}]}";
return ss.str();
}

::grpc::ChannelArguments armonik::api::common::utils::getChannelArguments(const Configuration &config) {
return getChannelArguments(config.get_control_plane());
}
::grpc::ChannelArguments armonik::api::common::utils::getChannelArguments(const options::ControlPlane &config) {
::grpc::ChannelArguments args;
args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, getMilliseconds(config.getKeepAliveTime()));
args.SetInt(GRPC_ARG_MAX_CONNECTION_IDLE_MS, getMilliseconds(config.getMaxIdleTime()));
args.SetServiceConfigJSON(getServiceConfigJson(config));
return args;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ constexpr char armonik::api::common::options::ControlPlane::SSLValidationKey[];
constexpr char armonik::api::common::options::ControlPlane::UserCertKey[];
constexpr char armonik::api::common::options::ControlPlane::UserKeyKey[];
constexpr char armonik::api::common::options::ControlPlane::UserP12Key[];
constexpr char armonik::api::common::options::ControlPlane::KeepAliveTimeKey[];
constexpr char armonik::api::common::options::ControlPlane::KeepAliveTimeIntervalKey[];
constexpr char armonik::api::common::options::ControlPlane::MaxIdleTimeKey[];
constexpr char armonik::api::common::options::ControlPlane::MaxAttemptsKey[];
constexpr char armonik::api::common::options::ControlPlane::BackoffMultiplierKey[];
constexpr char armonik::api::common::options::ControlPlane::InitialBackOffKey[];
constexpr char armonik::api::common::options::ControlPlane::MaxBackOffKey[];
constexpr char armonik::api::common::options::ControlPlane::RequestTimeoutKey[];

namespace armonik {
namespace api {
Expand Down
69 changes: 69 additions & 0 deletions packages/cpp/ArmoniK.Api.Common/source/utils/Utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include "utils/Utils.h"
#include <absl/strings/str_split.h>
#include <cmath>
#include <iomanip>

namespace armonik {
namespace api {
namespace common {
namespace utils {

::google::protobuf::Duration duration_from_values(long long int days, long long int hours, long long int minutes,
long long int seconds, int nanoseconds) {
::google::protobuf::Duration duration;
duration.set_seconds(days * 86400 + 3600 * hours + 60 * minutes + seconds);
duration.set_nanos(nanoseconds);
return duration;
}

/**
* Creates a duration from timespan string
* @param timespan string with format [-][d.]hh:mm:ss[.fffffffff]
* @return Duration in accordance with timespan
*/
::google::protobuf::Duration duration_from_timespan(const std::string &timespan) {
auto splitted = absl::StrSplit(timespan, ':');
std::vector<std::string> sections(splitted.begin(), splitted.end());
long days = 0, hours, minutes, seconds;
if (sections.size() != 3) {
throw std::invalid_argument("timespan is not of the format [-][d.]hh:mm:ss[.fffffffff]");
}
// Split the days.hours
auto subsplitted = absl::StrSplit(sections[0], '.');
std::vector<std::string> subsplit(subsplitted.begin(), subsplitted.end());
if (subsplit.size() > 2) {
throw std::invalid_argument("timespan is not of the format [-][d.]hh:mm:ss[.fffffffff]");
}
// Sign is only present in the first section
int sign = absl::StrContains(subsplit[0], '-') ? -1 : 1;
if (subsplit.size() == 2) {
days = std::strtol(subsplit[0].c_str(), nullptr, 10);
hours = sign * std::strtol(subsplit[1].c_str(), nullptr, 10);
} else {
hours = std::strtol(subsplit[0].c_str(), nullptr, 10);
}

minutes = sign * std::strtol(sections[1].c_str(), nullptr, 10);
subsplitted = absl::StrSplit(sections[2], '.');
std::vector<std::string> subsplit_sec(subsplitted.begin(), subsplitted.end());
if (subsplit_sec.size() > 2) {
throw std::invalid_argument("timespan is not of the format [-][d.]hh:mm:ss[.fffffffff]");
}
int nanos = 0;
seconds = sign * std::strtol(subsplit_sec[0].c_str(), nullptr, 10);
if (subsplit_sec.size() == 2) {
if (subsplit_sec[1].length() >= 9) {
nanos = sign * (int)std::strtol(subsplit_sec[1].substr(0, 9).c_str(), nullptr, 10);
} else {
nanos = sign *
(int)std::strtol((subsplit_sec[1] + std::string(9 - subsplit_sec[1].length(), '0')).c_str(), nullptr, 10);
}
}

return duration_from_values(days, hours, minutes, seconds, nanos);
}

} // namespace utils
} // namespace common
} // namespace api
} // namespace armonik
64 changes: 64 additions & 0 deletions packages/cpp/ArmoniK.Api.Tests/source/ChannelOptionsTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#include "common.h"
#include "options/ControlPlane.h"
#include "utils/Configuration.h"
#include <grpcpp/create_channel.h>
#include <gtest/gtest.h>

#include "submitter/SubmitterClient.h"
#include "utils/ChannelArguments.h"

armonik::api::grpc::v1::TaskOptions default_task_options() {
armonik::api::grpc::v1::TaskOptions default_task_options;
default_task_options.mutable_options()->insert({"key1", "value1"});
default_task_options.mutable_options()->insert({"key2", "value2"});
default_task_options.mutable_max_duration()->set_seconds(3600);
default_task_options.mutable_max_duration()->set_nanos(0);
default_task_options.set_max_retries(1);
default_task_options.set_priority(1);
default_task_options.set_partition_id("");
default_task_options.set_application_name("my-app");
default_task_options.set_application_version("1.0");
default_task_options.set_application_namespace("my-namespace");
default_task_options.set_application_service("my-service");
default_task_options.set_engine_type("Unified");
return default_task_options;
}

TEST(Options, no_options) {
armonik::api::common::utils::Configuration configuration;
configuration.add_json_configuration("appsettings.json").add_env_configuration();

std::string server_address = configuration.get("Grpc__EndPoint");
auto channel = ::grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());
armonik::api::client::SubmitterClient client(armonik::api::grpc::v1::submitter::Submitter::NewStub(channel));
ASSERT_NO_THROW(client.create_session(default_task_options(), {}));
}

TEST(Options, default_options) {
armonik::api::common::utils::Configuration configuration;
configuration.add_json_configuration("appsettings.json").add_env_configuration();

std::string server_address = configuration.get("Grpc__EndPoint");
auto args = armonik::api::common::utils::getChannelArguments(configuration);
auto channel = ::grpc::CreateCustomChannel(server_address, grpc::InsecureChannelCredentials(), args);
armonik::api::client::SubmitterClient client(armonik::api::grpc::v1::submitter::Submitter::NewStub(channel));
ASSERT_NO_THROW(client.create_session(default_task_options(), {}));
}

TEST(Options, test_timeout) {
armonik::api::common::utils::Configuration configuration;
configuration.add_json_configuration("appsettings.json").add_env_configuration();

std::string server_address = configuration.get("Grpc__EndPoint");
configuration.set(armonik::api::common::options::ControlPlane::RequestTimeoutKey, "0:0:0.001"); // 1ms, way too short
armonik::api::client::SubmitterClient client(armonik::api::grpc::v1::submitter::Submitter::NewStub(
::grpc::CreateCustomChannel(server_address, grpc::InsecureChannelCredentials(),
armonik::api::common::utils::getChannelArguments(configuration))));
ASSERT_ANY_THROW(client.create_session(default_task_options(), {}));
configuration.set(armonik::api::common::options::ControlPlane::RequestTimeoutKey,
"0:0:10"); // 10s, should have finished by now
client = armonik::api::client::SubmitterClient(armonik::api::grpc::v1::submitter::Submitter::NewStub(
::grpc::CreateCustomChannel(server_address, grpc::InsecureChannelCredentials(),
armonik::api::common::utils::getChannelArguments(configuration))));
ASSERT_NO_THROW(client.create_session(default_task_options(), {}));
}
Loading

0 comments on commit 165f373

Please sign in to comment.