Skip to content

Commit

Permalink
fix: various fixes (#362)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddiakiteaneo authored Jul 21, 2023
2 parents bb89fdf + 3d8ef3d commit 7aaea17
Show file tree
Hide file tree
Showing 31 changed files with 302 additions and 167 deletions.
6 changes: 6 additions & 0 deletions packages/cpp/ArmoniK.Api.Client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ list(APPEND PROTO_GENERATED_FILES ${PROTO_GENERATED_MESSAGES})

target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_unsecure ArmoniK.Api.Common)

if(MSVC)
target_compile_options(${PROJECT_NAME} PRIVATE /W4)
else()
target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wextra)
endif()

target_compile_definitions(${PROJECT_NAME} PUBLIC API_CLIENT_NAMESPACE=${NAMESPACE})

target_include_directories(${PROJECT_NAME}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace API_CLIENT_NAMESPACE {
*/
struct payload_data {
std::string keys;
std::vector<char> payload;
std::string payload;
std::vector<std::string> dependencies;
};

Expand Down Expand Up @@ -91,14 +91,15 @@ class SubmitterClient {
*/
std::tuple<std::vector<std::string>, std::vector<std::string>>
submit_tasks_with_dependencies(std::string session_id, armonik::api::grpc::v1::TaskOptions task_options,
const std::vector<payload_data> &payloads_with_dependencies, int max_retries);
const std::vector<payload_data> &payloads_with_dependencies,
[[maybe_unused]] int max_retries);

/**
* @brief Get result without streaming.
* @param result_request The vector of result requests.
* @return A vector containing the data associated to the result
*/
std::future<std::vector<std::byte>> get_result_async(const armonik::api::grpc::v1::ResultRequest &result_request);
std::future<std::string> get_result_async(const armonik::api::grpc::v1::ResultRequest &result_request);
};

} // namespace API_CLIENT_NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
#include <sstream>
#include <string>

#include "exceptions/ArmoniKApiException.h"
#include "exceptions/ArmoniKTaskError.h"
#include "exceptions/ArmoniKTaskNotCompletedException.h"
#include "objects.pb.h"
#include "submitter_common.pb.h"
#include "submitter_service.grpc.pb.h"
Expand Down Expand Up @@ -243,15 +246,14 @@ API_CLIENT_NAMESPACE::SubmitterClient::submit_tasks_with_dependencies(
std::vector<TaskRequest> requests;
for (auto &payload : payloads_with_dependencies) {
TaskRequest request;
auto &bytes = payload.payload;

request.add_expected_output_keys(payload.keys);

*request.mutable_payload() = std::string(bytes.begin(), bytes.end());
*request.mutable_payload() = payload.payload;

*request.mutable_data_dependencies() = {payload.dependencies.begin(), payload.dependencies.end()};

requests.push_back(std::move(request));
requests.push_back(request);
}

auto tasks_async = create_tasks_async(std::move(session_id), std::move(task_options), requests);
Expand Down Expand Up @@ -288,56 +290,46 @@ API_CLIENT_NAMESPACE::SubmitterClient::submit_tasks_with_dependencies(
* @param result_request A vector of ResultRequest objects.
* @return A future containing data result.
*/
std::future<std::vector<std::byte>>
API_CLIENT_NAMESPACE::SubmitterClient::get_result_async(const ResultRequest &result_request) {
std::future<std::string> API_CLIENT_NAMESPACE::SubmitterClient::get_result_async(const ResultRequest &result_request) {
return std::async(std::launch::async, [this, &result_request]() {
ResultReply result_writer;
ClientContext context_configuration;
ResultReply reply;
ClientContext context_result;
armonik::api::grpc::v1::Configuration config_response;

const auto config_status =
stub_->GetServiceConfiguration(&context_configuration, armonik::api::grpc::v1::Empty(), &config_response);

size_t size = 0;
if (config_status.ok()) {
size = config_response.data_chunk_max_size();
} else {
throw std::runtime_error("Fail to get service configuration");
}

auto streamingCall = stub_->TryGetResultStream(&context_result, result_request);

if (!streamingCall) {
throw std::runtime_error("Fail to get result");
}

std::vector<std::byte> result_data;
for (size_t count = 0; count < size; count++) {
streamingCall->WaitForInitialMetadata();
streamingCall->Read(&result_writer);
std::string dataString;
switch (result_writer.type_case()) {
std::string result_data;
bool dataComplete = false;
while (streamingCall->Read(&reply)) {
size_t offset = result_data.size();
switch (reply.type_case()) {
case ResultReply::kResult:
dataString = result_writer.result().data();
result_data.resize(dataString.length());
std::memcpy(result_data.data(), dataString.data(), dataString.size());

switch (reply.result().type_case()) {
case armonik::api::grpc::v1::DataChunk::kData:
result_data.resize(offset + reply.result().data().size());
std::memcpy(result_data.data() + offset, reply.result().data().data(), reply.result().data().size());
dataComplete = false; // Setting to false in case we receive stuff out of order
break;
case armonik::api::grpc::v1::DataChunk::kDataComplete:
dataComplete = true;
break;
case armonik::api::grpc::v1::DataChunk::TYPE_NOT_SET:
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Issue with server, invalid data chunk");
}
break;
case ResultReply::kError:
throw std::runtime_error("Error in task ");

throw ArmoniK::Api::Common::exceptions::ArmoniKTaskError("Can't get result because it's in error",
reply.error());
case ResultReply::kNotCompletedTask:
throw std::runtime_error("Task not completed");

throw ArmoniK::Api::Common::exceptions::ArmoniKTaskNotCompletedException(reply.not_completed_task());
case ResultReply::TYPE_NOT_SET:
throw std::runtime_error("Issue with the Server");

default:
throw std::runtime_error("Unknown return type !");
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Issue with server, invalid reply");
}
}

if (!dataComplete) {
throw ArmoniK::Api::Common::exceptions::ArmoniKApiException("Retrieved data is incomplete");
}

return result_data;
});
}
7 changes: 7 additions & 0 deletions packages/cpp/ArmoniK.Api.Common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ file(MAKE_DIRECTORY ${PROJECT_BUILD_DIR})
add_library(${PROJECT_NAME} STATIC ${PROTO_GENERATED_FILES} ${SRC_CLIENT_FILES} ${HEADER_CLIENT_FILES} ${simdjson_SOURCE_DIR}/singleheader/simdjson.cpp ${simdjson_SOURCE_DIR}/singleheader/simdjson.h)

target_link_libraries(${PROJECT_NAME} PUBLIC protobuf::libprotobuf gRPC::grpc++_unsecure)
target_compile_definitions(${PROJECT_NAME} PUBLIC API_COMMON_NAMESPACE=${NAMESPACE})

if(MSVC)
target_compile_options(${PROJECT_NAME} PRIVATE /W4)
else()
target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wextra)
endif()

set(PROTO_BINARY_DIR "${PROJECT_BUILD_DIR}")
set(PROTO_IMPORT_DIRS "${PROTO_FILES_DIR}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#ifndef ARMONIK_API_ARMONIKAPIEXCEPTION_H
#define ARMONIK_API_ARMONIKAPIEXCEPTION_H

#include <stdexcept>
namespace API_COMMON_NAMESPACE::exceptions {

class ArmoniKApiException : public std::runtime_error {
public:
explicit ArmoniKApiException(const std::string &message) : runtime_error(message) {}
};
} // namespace API_COMMON_NAMESPACE::exceptions

#endif // ARMONIK_API_ARMONIKAPIEXCEPTION_H
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#ifndef ARMONIK_API_ARMONIKTASKERROR_H
#define ARMONIK_API_ARMONIKTASKERROR_H

#include "ArmoniKApiException.h"
#include <objects.pb.h>
#include <sstream>
#include <vector>
namespace API_COMMON_NAMESPACE::exceptions {

class ArmoniKTaskError : public ArmoniKApiException {
public:
explicit ArmoniKTaskError(const std::string &message, const armonik::api::grpc::v1::TaskError &task_error)
: ArmoniKApiException(message) {
std::stringstream ss;
ss << "TaskId : " << task_error.task_id() << " Errors : ";
for (auto &&e : task_error.errors()) {
std::string status = armonik::api::grpc::v1::task_status::TaskStatus_Name(e.task_status());
status_details.emplace_back(status, e.detail());
ss << '\n' << status << " : " << e.detail();
}
details = std::string(ArmoniKApiException::what()) + " : " + ss.str();
taskId_ = task_error.task_id();
}
[[nodiscard]] const char *what() const noexcept override { return details.c_str(); }
const std::string &taskId() { return taskId_; }
const std::vector<std::pair<std::string, std::string>> &error_details() { return status_details; }

private:
std::string details;
std::string taskId_;
std::vector<std::pair<std::string, std::string>> status_details;
};

} // namespace API_COMMON_NAMESPACE::exceptions

#endif // ARMONIK_API_ARMONIKTASKERROR_H
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef ARMONIK_API_ARMONIKTASKNOTCOMPLETEDEXCEPTION_H
#define ARMONIK_API_ARMONIKTASKNOTCOMPLETEDEXCEPTION_H

#include "ArmoniKApiException.h"
namespace API_COMMON_NAMESPACE::exceptions {

class ArmoniKTaskNotCompletedException : public ArmoniKApiException {
public:
explicit ArmoniKTaskNotCompletedException(const std::string &taskId, const std::string &message = "")
: ArmoniKApiException("Task " + taskId + " not completed. " + message), taskId(taskId) {}
const std::string taskId;
};
} // namespace API_COMMON_NAMESPACE::exceptions

#endif // ARMONIK_API_ARMONIKTASKNOTCOMPLETEDEXCEPTION_H
4 changes: 2 additions & 2 deletions packages/cpp/ArmoniK.Api.Common/header/options/ComputePlane.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/**
* @brief The armonik namespace contains classes and functions related to the Armonik API.
*/
namespace armonik::api::common::options {
namespace API_COMMON_NAMESPACE::options {
/**
* @brief The ComputePlane class manages the communication addresses for workers and agents.
*/
Expand Down Expand Up @@ -58,4 +58,4 @@ class ComputePlane {
std::string worker_address_; ///< The worker address string.
std::string agent_address_; ///< The agent address string.
};
}; // namespace armonik::api::common::options
}; // namespace API_COMMON_NAMESPACE::options
4 changes: 2 additions & 2 deletions packages/cpp/ArmoniK.Api.Common/header/options/ControlPlane.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "utils/IConfiguration.h"

namespace armonik::api::common::options {
namespace API_COMMON_NAMESPACE::options {
class ControlPlane {
public:
ControlPlane(const utils::IConfiguration &config) {
Expand Down Expand Up @@ -37,6 +37,6 @@ class ControlPlane {
std::string ca_cert_pem_path_;
bool sslValidation_;
};
} // namespace armonik::api::common::options
} // namespace API_COMMON_NAMESPACE::options

#endif // ARMONIK_API_CONTROLPLANE_H
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#pragma once

/**
* @namespace armonik::api::common::options
* @namespace API_COMMON_NAMESPACE::options
* @brief This namespace contains common options for the armonik API.
*/
namespace armonik::api::common::options {
namespace API_COMMON_NAMESPACE::options {
/**
* @enum grpc_socket_type
* @brief Enumerates the types of gRPC sockets supported by armonik API.
Expand All @@ -13,4 +13,4 @@ enum grpc_socket_type {
tcp = 1, /**< @brief TCP/IP socket type */
UnixDomainSocket = 2 /**< @brief Unix domain socket type */
};
}; // namespace armonik::api::common::options
}; // namespace API_COMMON_NAMESPACE::options
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/**
* @brief This namespace encapsulates the Serilog logging system used in the Armonik API.
*/
namespace armonik::api::common::serilog {
namespace API_COMMON_NAMESPACE::serilog {
/**
* @brief A struct representing a Serilog context with support for adding and retrieving properties.
*/
Expand Down Expand Up @@ -74,4 +74,4 @@ struct serilog_context {
serilog_properties_vector_t _properties;
///< The vector of Serilog properties pairs.
};
} // namespace armonik::api::common::serilog
} // namespace API_COMMON_NAMESPACE::serilog
4 changes: 2 additions & 2 deletions packages/cpp/ArmoniK.Api.Common/header/serilog/serilog.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "serilog/serilog_typedef.h"
#include "utils/StringsUtils.h"

namespace armonik::api::common::serilog
namespace API_COMMON_NAMESPACE::serilog
/**
@brief Increments the logging level.
Expand Down Expand Up @@ -807,4 +807,4 @@ class serilog {
shared_instance().init(level, level_serilog);
}
};
} // namespace armonik::api::common::serilog
} // namespace API_COMMON_NAMESPACE::serilog
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#include "utils/StringsUtils.h"
#include <vector>

/// @namespace armonik::api::common::serilog
/// @namespace API_COMMON_NAMESPACE::serilog
/// @brief A namespace for serilog functionality in the Armonik API
namespace armonik::api::common::serilog {
namespace API_COMMON_NAMESPACE::serilog {
/// @typedef serilog_properties_pair_t
/// @brief A pair containing a string as a key and a json_string as a value for serilog properties
using serilog_properties_pair_t = std::pair<std::string, utils::json_string>;
Expand Down Expand Up @@ -34,4 +34,4 @@ enum logging_format {
CONSOLE = 0,
SEQ = 1,
};
} // namespace armonik::api::common::serilog
} // namespace API_COMMON_NAMESPACE::serilog
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

#include "utils/IConfiguration.h"

namespace armonik::api::common::utils {
namespace API_COMMON_NAMESPACE::utils {
/**
* @class EnvConfiguration
* @brief An implementation of IConfiguration that handles environment variables
Expand All @@ -19,4 +19,4 @@ class EnvConfiguration : public IConfiguration {
*/
EnvConfiguration() { add_env_configuration(); }
};
} // namespace armonik::api::common::utils
} // namespace API_COMMON_NAMESPACE::utils
6 changes: 3 additions & 3 deletions packages/cpp/ArmoniK.Api.Common/header/utils/GuuId.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
#include <random>

/**
* @brief The armonik::api::common::utils namespace provides utility classes and functions for the Armonik API.
* @brief The API_COMMON_NAMESPACE::utils namespace provides utility classes and functions for the Armonik API.
*/
namespace armonik::api::common::utils {
namespace API_COMMON_NAMESPACE::utils {
/**
* @class GuuId
* @brief The GuuId class provides a static method for generating UUIDs.
Expand Down Expand Up @@ -44,4 +44,4 @@ class GuuId {
return uuid;
}
};
} // namespace armonik::api::common::utils
} // namespace API_COMMON_NAMESPACE::utils
8 changes: 4 additions & 4 deletions packages/cpp/ArmoniK.Api.Common/header/utils/IConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
#include <string>
#include <vector>

namespace armonik::api::common::options {
namespace API_COMMON_NAMESPACE::options {
class ComputePlane;
class ControlPlane;
} // namespace armonik::api::common::options
} // namespace API_COMMON_NAMESPACE::options

namespace armonik::api::common::utils {
namespace API_COMMON_NAMESPACE::utils {
/**
* @class IConfiguration
* @brief Interface for a configuration class that stores and manages key-value pairs.
Expand Down Expand Up @@ -92,4 +92,4 @@ class IConfiguration {
std::set<std::string> above_env_keys_;
bool use_environment_ = false;
};
} // namespace armonik::api::common::utils
} // namespace API_COMMON_NAMESPACE::utils
Loading

0 comments on commit 7aaea17

Please sign in to comment.