Skip to content

Commit

Permalink
feat: Add task handler api c++ (#334)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddiakiteaneo authored Jul 12, 2023
2 parents 9bc0291 + 7d892ba commit 213ed8c
Show file tree
Hide file tree
Showing 6 changed files with 627 additions and 64 deletions.
67 changes: 4 additions & 63 deletions packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include "worker_common.grpc.pb.h"
#include "worker_service.grpc.pb.h"

#include "Worker/ArmoniKWorker.h"
#include "Worker/TaskHandler.h"

using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
Expand All @@ -23,68 +26,6 @@ using namespace armonik::api::grpc::v1::worker;
using namespace armonik::api::worker;
using namespace armonik::api::common::utils;

/**
* @brief Implements the Worker service.
*/
class WorkerServiceImpl final : public Worker::Service {
private:
armonik::api::common::serilog::serilog logger;

public:
/**
* @brief Constructs a WorkerServiceImpl object.
*/
WorkerServiceImpl() : logger(armonik::api::common::serilog::logging_format::SEQ) {
logger.info("Build Service WorkerServiceImpl");
logger.add_property("class", "WorkerServiceImpl");
logger.add_property("Worker", "ArmoniK.Api.Cpp");
}

/**
* @brief Implements the Process method of the Worker service.
*
* @param context The ServerContext object.
* @param reader The ServerReader object.
* @param response The ProcessReply object.
*
* @return The status of the method.
*/
Status Process(::grpc::ServerContext *context,
::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader,
::armonik::api::grpc::v1::worker::ProcessReply *response) override {
// Implementation of the Process method
logger.info("Receive new request From C++ Worker");
auto output = armonik::api::grpc::v1::Output();
*output.mutable_ok() = armonik::api::grpc::v1::Empty();
ProcessRequest req;
reader->Read(&req);
*response->mutable_output() = output;

logger.info("Finish call C++");

return grpc::Status::OK;
}

/**
* @brief Implements the HealthCheck method of the Worker service.
*
* @param context The ServerContext object.
* @param request The Empty object.
* @param response The HealthCheckReply object.
*
* @return The status of the method.
*/
Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) override {
// Implementation of the HealthCheck method
logger.info("HealthCheck request OK");

response->set_status(HealthCheckReply_ServingStatus_SERVING);

return Status::OK;
}
};

int main(int argc, char **argv) {
std::cout << "Starting C++ worker..." << std::endl;

Expand All @@ -94,7 +35,7 @@ int main(int argc, char **argv) {
config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock");

config->get_compute_plane();
WorkerServer::create<WorkerServiceImpl, bool>(config)->run();
WorkerServer::create<ArmoniKWorker, bool>(config)->run();

std::cout << "Stooping Server..." << std::endl;
return 0;
Expand Down
54 changes: 54 additions & 0 deletions packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include <iostream>
#include <memory>
#include <string>

#include <grpc++/grpc++.h>

#include "grpcpp/support/sync_stream.h"
#include "objects.pb.h"

#include "utils/RootConfiguration.h"
#include "utils/WorkerServer.h"
#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

#include "Worker/TaskHandler.h"

class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service {
private:
armonik::api::common::serilog::serilog logger_;
std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent_;
void (*processing_function_)(TaskHandler taskHandler);

public:
/**
* @brief Constructs a ArmoniKWorker object.
*/
ArmoniKWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent,
void (*processing_function)(TaskHandler task_handler));

/**
* @brief Implements the Process method of the Worker service.
*
* @param context The ServerContext object.
* @param reader The request iterator
* @param response The ProcessReply object.
*
* @return The status of the method.
*/
grpc::Status Process(::grpc::ServerContext *context,
::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader,
::armonik::api::grpc::v1::worker::ProcessReply *response) override;

/**
* @brief Implements the HealthCheck method of the Worker service.
*
* @param context The ServerContext object.
* @param request The Empty object.
* @param response The HealthCheckReply object.
*
* @return The status of the method.
*/
grpc::Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) override;
};
99 changes: 99 additions & 0 deletions packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once
#include <future>
#include <string>

#include "agent_common.pb.h"
#include "agent_service.grpc.pb.h"

#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

// #include "SessionContext.h"

/**
* @brief The TaskHandler class provides methods to create and handle tasks
*
*/
class TaskHandler {

private:
grpc::ClientContext context_;
std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> stub_;
std::shared_ptr<grpc::ServerReader<armonik::api::grpc::v1::worker::ProcessRequest>> request_iterator_;
std::string session_id_;
std::string task_id_;
armonik::api::grpc::v1::TaskOptions task_options_;
google::protobuf::RepeatedPtrField<std::string> expected_result_;
std::vector<std::byte> payload_;
std::vector<std::byte> data_dependencies_;
std::string token_;
armonik::api::grpc::v1::Configuration config_;

public:
/**
* @brief Construct a new Task Handler object
*
* @param client the agent client
* @param request_iterator The request iterator
*/
TaskHandler(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> client,
std::shared_ptr<grpc::ServerReader<armonik::api::grpc::v1::worker::ProcessRequest>> request_iterator);

/**
* @brief Initialise the task handler
*
*/
void init();

/**
* @brief Create a task_chunk_stream.
*
* @param task_request a task request
* @param is_last A boolean indicating if this is the last request.
* @param chunk_max_size Maximum chunk size.
* @return std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>
*/
static std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>
task_chunk_stream(armonik::api::grpc::v1::TaskRequest task_request, bool is_last, size_t chunk_max_size);

/**
* @brief Convert task_requests to request_stream.
*
* @param task_requests List of task requests
* @param task_options The Task Options used for this batch of tasks
* @param chunk_max_size Maximum chunk size.
* @return std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
*/
static auto to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests,
armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size)
-> std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>;

/**
* @brief Create a tasks async object
* @param task_options The Task Options used for this batch of tasks
* @param task_requests List of task requests
* @return Successfully sent task
*/
std::future<armonik::api::grpc::v1::agent::CreateTaskReply>
create_tasks_async(armonik::api::grpc::v1::TaskOptions task_options,
const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests);

/**
* @brief Send task result
*
* @param key the key of result
* @param data The result data
* @return A future containing a vector of ResultReply
*/
std::future<std::vector<armonik::api::grpc::v1::agent::ResultReply>> send_result(std::string key,
std::vector<std::byte> &data);

/**
* @brief Get the result ids object
*
* @param results The results data
* @return std::vector<std::string> list of result ids
*/
std::vector<std::string>
get_result_ids(std::vector<armonik::api::grpc::v1::agent::CreateResultsMetaDataRequest_ResultCreate> results);
};
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class WorkerServer {
// Create a stub for the Submitter service
worker_server->agent_stub = Agent::NewStub(worker_server->channel);

worker_server->builder_.RegisterService(new Worker());
worker_server->builder_.RegisterService(new Worker(std::move(worker_server->agent_stub), nullptr));
worker_server->logger.info("Finish to register new worker");

return worker_server;
Expand Down
91 changes: 91 additions & 0 deletions packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#include <iostream>
#include <memory>
#include <string>

#include <grpc++/grpc++.h>

#include "grpcpp/support/sync_stream.h"
#include "objects.pb.h"

#include "utils/RootConfiguration.h"
#include "utils/WorkerServer.h"
#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

#include "Worker/ArmoniKWorker.h"
#include "Worker/TaskHandler.h"

using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;

using armonik::api::common::utils::IConfiguration;
using armonik::api::grpc::v1::TaskOptions;

using namespace armonik::api::grpc::v1::worker;
using namespace armonik::api::worker;
using namespace armonik::api::common::utils;

/**
* @brief Constructs a ArmoniKWorker object.
*/
ArmoniKWorker::ArmoniKWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent,
void (*processing_function)(TaskHandler task_handler))
: logger_(armonik::api::common::serilog::logging_format::SEQ) {
logger_.info("Build Service ArmoniKWorker");
logger_.add_property("class", "ArmoniKWorker");
logger_.add_property("Worker", "ArmoniK.Api.Cpp");
agent_ = std::move(agent);
processing_function_ = processing_function;
}

/**
* @brief Implements the Process method of the Worker service.
*
* @param context The ServerContext object.
* @param reader The request iterator
* @param response The ProcessReply object.
*
* @return The status of the method.
*/
Status ArmoniKWorker::Process(::grpc::ServerContext *context, ::grpc::ServerReader<ProcessRequest> *reader,
::armonik::api::grpc::v1::worker::ProcessReply *response) {

logger_.info("Receive new request From C++ real Worker");

auto output = armonik::api::grpc::v1::Output();
*output.mutable_ok() = armonik::api::grpc::v1::Empty();
// ProcessRequest req;
// reader->Read(&req);
*response->mutable_output() = output;

std::shared_ptr<grpc::ServerReader<ProcessRequest>> request_iterator =
std::make_shared<grpc::ServerReader<ProcessRequest>>(*reader);

TaskHandler task_handler(std::move(agent_), request_iterator);

task_handler.init();

logger_.info("Finish call C++");

return grpc::Status::OK;
}

/**
* @brief Implements the HealthCheck method of the Worker service.
*
* @param context The ServerContext object.
* @param request The Empty object.
* @param response The HealthCheckReply object.
*
* @return The status of the method.
*/
Status ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) {
// Implementation of the HealthCheck method
logger_.info("HealthCheck request OK");

response->set_status(HealthCheckReply_ServingStatus_SERVING);

return Status::OK;
}
Loading

0 comments on commit 213ed8c

Please sign in to comment.