Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add task handler api c++ #334

Merged
merged 14 commits into from
Jul 12, 2023
67 changes: 4 additions & 63 deletions packages/cpp/ArmoniK.Api.Worker.Tests/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include "worker_common.grpc.pb.h"
#include "worker_service.grpc.pb.h"

#include "Worker/ArmoniKWorker.h"
#include "Worker/TaskHandler.h"

using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
Expand All @@ -23,68 +26,6 @@ using namespace armonik::api::grpc::v1::worker;
using namespace armonik::api::worker;
using namespace armonik::api::common::utils;

/**
* @brief Implements the Worker service.
*/
class WorkerServiceImpl final : public Worker::Service {
private:
armonik::api::common::serilog::serilog logger;

public:
/**
* @brief Constructs a WorkerServiceImpl object.
*/
WorkerServiceImpl() : logger(armonik::api::common::serilog::logging_format::SEQ) {
logger.info("Build Service WorkerServiceImpl");
logger.add_property("class", "WorkerServiceImpl");
logger.add_property("Worker", "ArmoniK.Api.Cpp");
}

/**
* @brief Implements the Process method of the Worker service.
*
* @param context The ServerContext object.
* @param reader The ServerReader object.
* @param response The ProcessReply object.
*
* @return The status of the method.
*/
Status Process(::grpc::ServerContext *context,
::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader,
::armonik::api::grpc::v1::worker::ProcessReply *response) override {
// Implementation of the Process method
logger.info("Receive new request From C++ Worker");
auto output = armonik::api::grpc::v1::Output();
*output.mutable_ok() = armonik::api::grpc::v1::Empty();
ProcessRequest req;
reader->Read(&req);
*response->mutable_output() = output;

logger.info("Finish call C++");

return grpc::Status::OK;
}

/**
* @brief Implements the HealthCheck method of the Worker service.
*
* @param context The ServerContext object.
* @param request The Empty object.
* @param response The HealthCheckReply object.
*
* @return The status of the method.
*/
Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) override {
// Implementation of the HealthCheck method
logger.info("HealthCheck request OK");

response->set_status(HealthCheckReply_ServingStatus_SERVING);

return Status::OK;
}
};

int main(int argc, char **argv) {
std::cout << "Starting C++ worker..." << std::endl;

Expand All @@ -94,7 +35,7 @@ int main(int argc, char **argv) {
config->set("ComputePlane__AgentChannel__Address", "/cache/armonik_agent.sock");

config->get_compute_plane();
WorkerServer::create<WorkerServiceImpl, bool>(config)->run();
WorkerServer::create<ArmoniKWorker, bool>(config)->run();

std::cout << "Stooping Server..." << std::endl;
return 0;
Expand Down
54 changes: 54 additions & 0 deletions packages/cpp/ArmoniK.Api.Worker/header/Worker/ArmoniKWorker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include <iostream>
#include <memory>
#include <string>

#include <grpc++/grpc++.h>

#include "grpcpp/support/sync_stream.h"
#include "objects.pb.h"

#include "utils/RootConfiguration.h"
#include "utils/WorkerServer.h"
#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

#include "Worker/TaskHandler.h"

class ArmoniKWorker final : public armonik::api::grpc::v1::worker::Worker::Service {
private:
armonik::api::common::serilog::serilog logger_;
std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent_;
void (*processing_function_)(TaskHandler taskHandler);

public:
/**
* @brief Constructs a ArmoniKWorker object.
*/
ArmoniKWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent,
void (*processing_function)(TaskHandler task_handler));

/**
* @brief Implements the Process method of the Worker service.
*
* @param context The ServerContext object.
* @param reader The request iterator
* @param response The ProcessReply object.
*
* @return The status of the method.
*/
grpc::Status Process(::grpc::ServerContext *context,
::grpc::ServerReader<::armonik::api::grpc::v1::worker::ProcessRequest> *reader,
::armonik::api::grpc::v1::worker::ProcessReply *response) override;

/**
* @brief Implements the HealthCheck method of the Worker service.
*
* @param context The ServerContext object.
* @param request The Empty object.
* @param response The HealthCheckReply object.
*
* @return The status of the method.
*/
grpc::Status HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) override;
};
99 changes: 99 additions & 0 deletions packages/cpp/ArmoniK.Api.Worker/header/Worker/TaskHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once
#include <future>
#include <string>

#include "agent_common.pb.h"
#include "agent_service.grpc.pb.h"

#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

// #include "SessionContext.h"

/**
* @brief The TaskHandler class provides methods to create and handle tasks
*
*/
class TaskHandler {

private:
grpc::ClientContext context_;
std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> stub_;
std::shared_ptr<grpc::ServerReader<armonik::api::grpc::v1::worker::ProcessRequest>> request_iterator_;
std::string session_id_;
std::string task_id_;
armonik::api::grpc::v1::TaskOptions task_options_;
google::protobuf::RepeatedPtrField<std::string> expected_result_;
std::vector<std::byte> payload_;
std::vector<std::byte> data_dependencies_;
std::string token_;
armonik::api::grpc::v1::Configuration config_;

public:
/**
* @brief Construct a new Task Handler object
*
* @param client the agent client
* @param request_iterator The request iterator
*/
TaskHandler(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> client,
std::shared_ptr<grpc::ServerReader<armonik::api::grpc::v1::worker::ProcessRequest>> request_iterator);

/**
* @brief Initialise the task handler
*
*/
void init();

/**
* @brief Create a task_chunk_stream.
*
* @param task_request a task request
* @param is_last A boolean indicating if this is the last request.
* @param chunk_max_size Maximum chunk size.
* @return std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>
*/
static std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>
task_chunk_stream(armonik::api::grpc::v1::TaskRequest task_request, bool is_last, size_t chunk_max_size);

/**
* @brief Convert task_requests to request_stream.
*
* @param task_requests List of task requests
* @param task_options The Task Options used for this batch of tasks
* @param chunk_max_size Maximum chunk size.
* @return std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>
*/
static auto to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests,
armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size)
-> std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>;
Comment on lines +67 to +69
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static auto to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests,
armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size)
-> std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>>;
static std::vector<std::future<std::vector<armonik::api::grpc::v1::agent::CreateTaskRequest>>> to_request_stream(const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests,
armonik::api::grpc::v1::TaskOptions task_options, size_t chunk_max_size);


/**
* @brief Create a tasks async object
* @param task_options The Task Options used for this batch of tasks
* @param task_requests List of task requests
* @return Successfully sent task
*/
std::future<armonik::api::grpc::v1::agent::CreateTaskReply>
create_tasks_async(armonik::api::grpc::v1::TaskOptions task_options,
const std::vector<armonik::api::grpc::v1::TaskRequest> &task_requests);

/**
* @brief Send task result
*
* @param key the key of result
* @param data The result data
* @return A future containing a vector of ResultReply
*/
std::future<std::vector<armonik::api::grpc::v1::agent::ResultReply>> send_result(std::string key,
std::vector<std::byte> &data);

/**
* @brief Get the result ids object
*
* @param results The results data
* @return std::vector<std::string> list of result ids
*/
std::vector<std::string>
get_result_ids(std::vector<armonik::api::grpc::v1::agent::CreateResultsMetaDataRequest_ResultCreate> results);
};
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class WorkerServer {
// Create a stub for the Submitter service
worker_server->agent_stub = Agent::NewStub(worker_server->channel);

worker_server->builder_.RegisterService(new Worker());
worker_server->builder_.RegisterService(new Worker(std::move(worker_server->agent_stub), nullptr));
worker_server->logger.info("Finish to register new worker");

return worker_server;
Expand Down
91 changes: 91 additions & 0 deletions packages/cpp/ArmoniK.Api.Worker/source/Worker/ArmoniKWorker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#include <iostream>
#include <memory>
#include <string>

#include <grpc++/grpc++.h>

#include "grpcpp/support/sync_stream.h"
#include "objects.pb.h"

#include "utils/RootConfiguration.h"
#include "utils/WorkerServer.h"
#include "worker_common.pb.h"
#include "worker_service.grpc.pb.h"

#include "Worker/ArmoniKWorker.h"
#include "Worker/TaskHandler.h"

using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;

using armonik::api::common::utils::IConfiguration;
using armonik::api::grpc::v1::TaskOptions;

using namespace armonik::api::grpc::v1::worker;
using namespace armonik::api::worker;
using namespace armonik::api::common::utils;

/**
* @brief Constructs a ArmoniKWorker object.
*/
ArmoniKWorker::ArmoniKWorker(std::unique_ptr<armonik::api::grpc::v1::agent::Agent::Stub> agent,
void (*processing_function)(TaskHandler task_handler))
: logger_(armonik::api::common::serilog::logging_format::SEQ) {
logger_.info("Build Service ArmoniKWorker");
logger_.add_property("class", "ArmoniKWorker");
logger_.add_property("Worker", "ArmoniK.Api.Cpp");
agent_ = std::move(agent);
processing_function_ = processing_function;
}

/**
* @brief Implements the Process method of the Worker service.
*
* @param context The ServerContext object.
* @param reader The request iterator
* @param response The ProcessReply object.
*
* @return The status of the method.
*/
Status ArmoniKWorker::Process(::grpc::ServerContext *context, ::grpc::ServerReader<ProcessRequest> *reader,
::armonik::api::grpc::v1::worker::ProcessReply *response) {

logger_.info("Receive new request From C++ real Worker");

auto output = armonik::api::grpc::v1::Output();
*output.mutable_ok() = armonik::api::grpc::v1::Empty();
// ProcessRequest req;
// reader->Read(&req);
*response->mutable_output() = output;

std::shared_ptr<grpc::ServerReader<ProcessRequest>> request_iterator =
std::make_shared<grpc::ServerReader<ProcessRequest>>(*reader);

TaskHandler task_handler(std::move(agent_), request_iterator);

task_handler.init();

logger_.info("Finish call C++");

return grpc::Status::OK;
}

/**
* @brief Implements the HealthCheck method of the Worker service.
*
* @param context The ServerContext object.
* @param request The Empty object.
* @param response The HealthCheckReply object.
*
* @return The status of the method.
*/
Status ArmoniKWorker::HealthCheck(::grpc::ServerContext *context, const ::armonik::api::grpc::v1::Empty *request,
::armonik::api::grpc::v1::worker::HealthCheckReply *response) {
// Implementation of the HealthCheck method
logger_.info("HealthCheck request OK");

response->set_status(HealthCheckReply_ServingStatus_SERVING);

return Status::OK;
}
Loading