Skip to content

Commit

Permalink
Apply format
Browse files Browse the repository at this point in the history
  • Loading branch information
ddiakiteaneo committed Jul 13, 2023
1 parent 835148d commit fd39fd7
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "submitter_common.pb.h"
#include "submitter_service.grpc.pb.h"

namespace API_CLIENT_NAMESPACE{
namespace API_CLIENT_NAMESPACE {

/**
* @brief Data structure for task payload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ using namespace armonik::api::grpc::v1::submitter;
*
* @param stub the gRPC client stub
*/
API_CLIENT_NAMESPACE::SubmitterClient::SubmitterClient(std::unique_ptr<Submitter::StubInterface> stub) { stub_ = std::move(stub); }
API_CLIENT_NAMESPACE::SubmitterClient::SubmitterClient(std::unique_ptr<Submitter::StubInterface> stub) {
stub_ = std::move(stub);
}

/**
* @brief Create a new session.
* @param partition_ids The partitions ids.
* @param default_task_options The default task options.
*/
std::string API_CLIENT_NAMESPACE::SubmitterClient::create_session(TaskOptions default_task_options,
const std::vector<std::string> &partition_ids = {}) {
const std::vector<std::string> &partition_ids = {}) {
CreateSessionRequest request;
*request.mutable_default_task_option() = std::move(default_task_options);
for (const auto &partition_id : partition_ids) {
Expand Down Expand Up @@ -65,8 +67,9 @@ std::string API_CLIENT_NAMESPACE::SubmitterClient::create_session(TaskOptions de
* @return A vector of futures containing CreateLargeTaskRequest objects.
*/
std::vector<std::future<std::vector<CreateLargeTaskRequest>>>
API_CLIENT_NAMESPACE::SubmitterClient::to_request_stream(const std::vector<TaskRequest> &task_requests, std::string session_id,
TaskOptions task_options, const size_t chunk_max_size) {
API_CLIENT_NAMESPACE::SubmitterClient::to_request_stream(const std::vector<TaskRequest> &task_requests,
std::string session_id, TaskOptions task_options,
const size_t chunk_max_size) {
std::vector<std::future<std::vector<CreateLargeTaskRequest>>> async_chunk_payload_tasks;
async_chunk_payload_tasks.push_back(
std::async([session_id = std::move(session_id), task_options = std::move(task_options)]() mutable {
Expand Down Expand Up @@ -98,7 +101,8 @@ API_CLIENT_NAMESPACE::SubmitterClient::to_request_stream(const std::vector<TaskR
* @return A future containing a vector of CreateLargeTaskRequest objects.
*/
std::future<std::vector<CreateLargeTaskRequest>>
API_CLIENT_NAMESPACE::SubmitterClient::task_chunk_stream(const TaskRequest &task_request, bool is_last, size_t chunk_max_size) {
API_CLIENT_NAMESPACE::SubmitterClient::task_chunk_stream(const TaskRequest &task_request, bool is_last,
size_t chunk_max_size) {
return std::async(std::launch::async, [&task_request, chunk_max_size, is_last]() {
std::vector<CreateLargeTaskRequest> requests;
armonik::api::grpc::v1::InitTaskRequest header_task_request;
Expand Down Expand Up @@ -172,8 +176,9 @@ API_CLIENT_NAMESPACE::SubmitterClient::task_chunk_stream(const TaskRequest &task
* @param task_requests A vector of TaskRequest objects.
* @return A future containing a CreateTaskReply object.
*/
std::future<CreateTaskReply> API_CLIENT_NAMESPACE::SubmitterClient::create_tasks_async(std::string session_id, TaskOptions task_options,
const std::vector<TaskRequest> &task_requests) {
std::future<CreateTaskReply>
API_CLIENT_NAMESPACE::SubmitterClient::create_tasks_async(std::string session_id, TaskOptions task_options,
const std::vector<TaskRequest> &task_requests) {
return std::async(std::launch::async, [this, task_requests, session_id = std::move(session_id),
task_options = std::move(task_options)]() mutable {
armonik::api::grpc::v1::Configuration config_response;
Expand Down Expand Up @@ -230,9 +235,9 @@ std::future<CreateTaskReply> API_CLIENT_NAMESPACE::SubmitterClient::create_tasks
* @return A vector of task IDs.
*/
std::tuple<std::vector<std::string>, std::vector<std::string>>
API_CLIENT_NAMESPACE::SubmitterClient::submit_tasks_with_dependencies(std::string session_id, TaskOptions task_options,
const std::vector<payload_data> &payloads_with_dependencies,
int max_retries = 5) {
API_CLIENT_NAMESPACE::SubmitterClient::submit_tasks_with_dependencies(
std::string session_id, TaskOptions task_options, const std::vector<payload_data> &payloads_with_dependencies,
int max_retries = 5) {
std::vector<std::string> task_ids;
std::vector<std::string> failed_task_ids;
std::vector<TaskRequest> requests;
Expand Down Expand Up @@ -283,7 +288,8 @@ API_CLIENT_NAMESPACE::SubmitterClient::submit_tasks_with_dependencies(std::strin
* @param result_request A vector of ResultRequest objects.
* @return A future containing data result.
*/
std::future<std::vector<std::byte>> API_CLIENT_NAMESPACE::SubmitterClient::get_result_async(const ResultRequest &result_request) {
std::future<std::vector<std::byte>>
API_CLIENT_NAMESPACE::SubmitterClient::get_result_async(const ResultRequest &result_request) {
return std::async(std::launch::async, [this, &result_request]() {
ResultReply result_writer;
ClientContext context_configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ TEST(testMock, submitTask) {
log.enrich([&](serilog_context &ctx) { ctx.add("fieldTestValue", 1); });
log.add_property("time", time(nullptr));

::putenv((char*)"GRPC_DNS_RESOLVER=native");
::putenv((char *)"GRPC_DNS_RESOLVER=native");

std::cout << "Starting client..." << std::endl;

Expand Down
4 changes: 2 additions & 2 deletions packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

#include "Worker/TaskHandler.h"

namespace API_WORKER_NAMESPACE{
namespace API_WORKER_NAMESPACE {

class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service {
private:
armonik::api::common::serilog::serilog logger_;
Expand Down
7 changes: 4 additions & 3 deletions packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

namespace API_WORKER_NAMESPACE{
namespace API_WORKER_NAMESPACE {

// #include "SessionContext.h"

Expand Down Expand Up @@ -66,8 +66,9 @@ class TaskHandler {
* @param chunk_max_size Maximum chunk size.
* @return std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
*/
static std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>> to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests,
armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size);
static std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests,
armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size);

/**
* @brief Create a tasks async object
Expand Down
12 changes: 7 additions & 5 deletions packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using namespace armonik::api::common::utils;
* @brief Constructs a ArmoniKWorker object.
*/
API_WORKER_NAMESPACE::ArmoniKWorker::ArmoniKWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent,
void (*processing_function)(TaskHandler task_handler))
void (*processing_function)(TaskHandler task_handler))
: logger_(armonik::api::common::serilog::logging_format::SEQ) {
logger_.info("Build Service ArmoniKWorker");
logger_.add_property("class", "ArmoniKWorker");
Expand All @@ -48,8 +48,9 @@ API_WORKER_NAMESPACE::ArmoniKWorker::ArmoniKWorker(std::unique_ptr<armonik::api:
*
* @return The status of the method.
*/
Status API_WORKER_NAMESPACE::ArmoniKWorker::Process(::grpc::ServerContext *context, ::grpc::ServerReader<ProcessRequest> *reader,
::armonik::api::grpc::v1::worker::ProcessReply *response) {
Status API_WORKER_NAMESPACE::ArmoniKWorker::Process(::grpc::ServerContext *context,
::grpc::ServerReader<ProcessRequest> *reader,
::armonik::api::grpc::v1::worker::ProcessReply *response) {

logger_.info("Receive new request From C++ real Worker");

Expand Down Expand Up @@ -80,8 +81,9 @@ Status API_WORKER_NAMESPACE::ArmoniKWorker::Process(::grpc::ServerContext *conte
*
* @return The status of the method.
*/
Status API_WORKER_NAMESPACE::ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) {
Status API_WORKER_NAMESPACE::ArmoniKWorker::HealthCheck(::grpc::ServerContext *context,
const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) {
// Implementation of the HealthCheck method
logger_.info("HealthCheck request OK");

Expand Down
21 changes: 12 additions & 9 deletions packages/cpp/ArmoniK.Api.Worker/source/Worker/TaskHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using namespace armonik::api::grpc::v1::agent;
* @param request_iterator The request iterator
*/
API_WORKER_NAMESPACE::TaskHandler::TaskHandler(std::unique_ptr<Agent::Stub> client,
std::shared_ptr<grpc::ServerReader<ProcessRequest>> request_iterator) {
std::shared_ptr<grpc::ServerReader<ProcessRequest>> request_iterator) {
stub_ = std::move(client);
request_iterator_ = std::move(request_iterator);
}
Expand Down Expand Up @@ -145,8 +145,8 @@ void API_WORKER_NAMESPACE::TaskHandler::init() {
* @param chunk_max_size Maximum chunk size.
* @return std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>
*/
std::future<std::vector<CreateTaskRequest>> API_WORKER_NAMESPACE::TaskHandler::task_chunk_stream(TaskRequest task_request, bool is_last,
size_t chunk_max_size) {
std::future<std::vector<CreateTaskRequest>>
API_WORKER_NAMESPACE::TaskHandler::task_chunk_stream(TaskRequest task_request, bool is_last, size_t chunk_max_size) {
return std::async(std::launch::async, [task_request = std::move(task_request), chunk_max_size, is_last]() {
std::vector<CreateTaskRequest> requests;
armonik::api::grpc::v1::InitTaskRequest header_task_request;
Expand Down Expand Up @@ -220,8 +220,8 @@ std::future<std::vector<CreateTaskRequest>> API_WORKER_NAMESPACE::TaskHandler::t
* @return std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
*/
std::vector<std::future<std::vector<CreateTaskRequest>>>
API_WORKER_NAMESPACE::TaskHandler::to_request_stream(const std::vector<TaskRequest> &task_requests, TaskOptions task_options,
const size_t chunk_max_size) {
API_WORKER_NAMESPACE::TaskHandler::to_request_stream(const std::vector<TaskRequest> &task_requests,
TaskOptions task_options, const size_t chunk_max_size) {
std::vector<std::future<std::vector<CreateTaskRequest>>> async_chunk_payload_tasks;

async_chunk_payload_tasks.push_back(std::async([task_options = std::move(task_options)]() mutable {
Expand Down Expand Up @@ -249,8 +249,9 @@ API_WORKER_NAMESPACE::TaskHandler::to_request_stream(const std::vector<TaskReque
* @param task_requests List of task requests
* @return Successfully sent task
*/
std::future<CreateTaskReply> API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options,
const std::vector<TaskRequest> &task_requests) {
std::future<CreateTaskReply>
API_WORKER_NAMESPACE::TaskHandler::create_tasks_async(TaskOptions task_options,
const std::vector<TaskRequest> &task_requests) {
return std::async(std::launch::async, [this, &task_requests, &task_options]() mutable {
size_t chunk = config_.data_chunk_max_size();

Expand Down Expand Up @@ -288,7 +289,8 @@ std::future<CreateTaskReply> API_WORKER_NAMESPACE::TaskHandler::create_tasks_asy
* @param data The result data
* @return A future containing a vector of ResultReply
*/
std::future<std::vector<ResultReply>> API_WORKER_NAMESPACE::TaskHandler::send_result(std::string key, std::vector<std::byte> &data) {
std::future<std::vector<ResultReply>> API_WORKER_NAMESPACE::TaskHandler::send_result(std::string key,
std::vector<std::byte> &data) {
return std::async(std::launch::async, [this, key, data]() {
std::vector<ResultReply> result;

Expand Down Expand Up @@ -356,7 +358,8 @@ std::future<std::vector<ResultReply>> API_WORKER_NAMESPACE::TaskHandler::send_re
* @param results The results data
* @return std::vector<std::string> list of result ids
*/
std::vector<std::string> API_WORKER_NAMESPACE::TaskHandler::get_result_ids(std::vector<CreateResultsMetaDataRequest_ResultCreate> results) {
std::vector<std::string>
API_WORKER_NAMESPACE::TaskHandler::get_result_ids(std::vector<CreateResultsMetaDataRequest_ResultCreate> results) {
std::vector<std::string> result_ids;

grpc::ClientContext context_client_writer;
Expand Down

0 comments on commit fd39fd7

Please sign in to comment.