Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send data between agent and worker through files instead of stream-based requests #414

Merged
merged 2 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 15 additions & 49 deletions Protos/V1/agent_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,41 +47,17 @@ message CreateTaskReply {
string communication_token = 4; /** Communication token received by the worker during task processing */
}

// Request to retrieve data
message DataRequest {
string communication_token = 1; /** Communication token received by the worker during task processing */
string key = 2;
// Id of the result that will be retrieved
string result_id = 2;
}

message DataReply {
message Init {
string key = 1;
oneof has_result {
DataChunk data = 2;
string error = 3;
}
}
string communication_token = 1; /** Communication token received by the worker during task processing */
oneof type {
Init init = 2;
DataChunk data = 3;
string error = 4;
}
}

message Result {
oneof type {
InitKeyedDataStream init = 1;
DataChunk data = 2;
}
string communication_token = 3; /** Communication token received by the worker during task processing */
}

message ResultReply {
string communication_token = 3; /** Communication token received by the worker during task processing */
oneof type {
Empty Ok = 1;
string Error = 2;
}
// Response when data is available in the shared folder
message DataResponse {
// Id of the result that will be retrieved
string result_id = 2;
}

/*
Expand Down Expand Up @@ -156,7 +132,7 @@ message SubmitTasksResponse {
}

/*
* Request for creating results without data
* Request for creating results with data
*/
message CreateResultsRequest {
/**
Expand All @@ -180,11 +156,9 @@ message CreateResultsResponse {
}

/*
* Request for uploading results data through stream.
* Data must be sent in multiple chunks.
* Only one result can be uploaded.
* Request for notifying results data are available in files.
*/
message UploadResultDataRequest {
message NotifyResultDataRequest {
/**
* The metadata to identify the result to update.
*/
Expand All @@ -195,23 +169,15 @@ message UploadResultDataRequest {

/**
* The possible messages that constitute a UploadResultDataRequest
* They should be sent in the following order:
* - id
* - data_chunk (stream can have multiple data_chunk messages that represent data divided in several parts)
*
* Data chunk cannot exceed the size returned by the GetServiceConfiguration rpc method
*/
oneof type {
ResultIdentifier id = 1; /** The identifier of the result to which add data. */
bytes data_chunk = 2; /** A chunk of data. */
}
repeated ResultIdentifier ids = 1; /** The identifier of the result to which add data. */
string communication_token = 4; /** Communication token received by the worker during task processing */
}

/*
* Response for uploading data with stream for result
* Response for notifying data file availability for result
* Received when data are successfully copied to the ObjectStorage
*/
message UploadResultDataResponse {
string result_id = 1; /** The Id of the result to which data were added */
string communication_token = 2; /** Communication token received by the worker during task processing */
message NotifyResultDataResponse {
repeated string result_ids = 1; /** The Id of the result to which data were added */
}
33 changes: 26 additions & 7 deletions Protos/V1/agent_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import "agent_common.proto";
option csharp_namespace = "ArmoniK.Api.gRPC.V1.Agent";

service Agent {
rpc CreateTask(stream CreateTaskRequest) returns (CreateTaskReply);

/**
* Create the metadata of multiple results at once
* Data have to be uploaded separately
Expand All @@ -19,18 +21,35 @@ service Agent {
rpc CreateResults(CreateResultsRequest) returns (CreateResultsResponse) {}

/**
* Upload data for result with stream
* Notify Agent that a data file representing the Result to upload is available in the shared folder
* The name of the file should be the result id
* Blocks until data are stored in Object Storage
*/
rpc UploadResultData(stream UploadResultDataRequest) returns (UploadResultDataResponse) {}
rpc NotifyResultData(NotifyResultDataRequest) returns (NotifyResultDataResponse) {}

/**
* Create tasks metadata and submit task for processing.
*/
rpc SubmitTasks(SubmitTasksRequest) returns (SubmitTasksResponse) {}

rpc CreateTask(stream CreateTaskRequest) returns (CreateTaskReply);
rpc GetResourceData(DataRequest) returns (stream DataReply);
rpc GetCommonData(DataRequest) returns (stream DataReply);
rpc GetDirectData(DataRequest) returns (stream DataReply);
rpc SendResult(stream Result) returns (ResultReply);
/**
* Retrieve Resource Data from the Agent
* Data is stored in the shared folder between Agent and Worker as a file with the result id as name
* Blocks until data are available in the shared folder
*/
rpc GetResourceData(DataRequest) returns (DataResponse);

/**
* Retrieve Resource Data from the Agent
* Data is stored in the shared folder between Agent and Worker as a file with the result id as name
* Blocks until data are available in the shared folder
*/
rpc GetCommonData(DataRequest) returns (DataResponse);

/**
* Retrieve Resource Data from the Agent
* Data is stored in the shared folder between Agent and Worker as a file with the result id as name
* Blocks until data are available in the shared folder
*/
rpc GetDirectData(DataRequest) returns (DataResponse);
}
34 changes: 9 additions & 25 deletions Protos/V1/worker_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,19 @@ import "objects.proto";
option csharp_namespace = "ArmoniK.Api.gRPC.V1.Worker";

message ProcessRequest {
message ComputeRequest {
message InitRequest {
Configuration configuration = 1;
string session_id = 2;
string task_id = 3;
TaskOptions task_options = 4;
repeated string expected_output_keys = 5;
DataChunk payload = 6;
}
message InitData {
oneof type {
string key = 1;
bool last_data = 2;
}
}
oneof type {
InitRequest init_request = 1;
DataChunk payload = 2;
InitData init_data = 3;
DataChunk data = 4;
}
}
string communication_token = 1;
ComputeRequest compute = 2;
string session_id = 2;
string task_id = 3;
TaskOptions task_options = 4;
repeated string expected_output_keys = 5;
string payload_id = 6;
repeated string data_dependencies = 7;
string data_folder = 8;
Configuration configuration = 9;
}

message ProcessReply {
string communication_token = 1;
Output output = 2;
Output output = 1;
}

message HealthCheckReply {
Expand Down
2 changes: 1 addition & 1 deletion Protos/V1/worker_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import "worker_common.proto";
option csharp_namespace = "ArmoniK.Api.gRPC.V1.Worker";

service Worker {
rpc Process(stream ProcessRequest) returns (ProcessReply);
rpc Process(ProcessRequest) returns (ProcessReply);
rpc HealthCheck(Empty) returns (HealthCheckReply);
}
113 changes: 40 additions & 73 deletions packages/csharp/ArmoniK.Api.Mock/Services/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Linq;
using System.Threading.Tasks;

using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Agent;

using Grpc.Core;
Expand All @@ -36,63 +36,6 @@ public override Task<CreateTaskReply> CreateTask(IAsyncStreamReader<CreateTaskRe
CreationStatusList = new CreateTaskReply.Types.CreationStatusList(),
});

/// <inheritdocs />
[Count]
public override async Task GetCommonData(DataRequest request,
IServerStreamWriter<DataReply> responseStream,
ServerCallContext context)
=> await responseStream.WriteAsync(new DataReply
{
Data = new DataChunk
{
DataComplete = true,
},
})
.ConfigureAwait(false);

/// <inheritdocs />
[Count]
public override async Task GetDirectData(DataRequest request,
IServerStreamWriter<DataReply> responseStream,
ServerCallContext context)
=> await responseStream.WriteAsync(new DataReply
{
Data = new DataChunk
{
DataComplete = true,
},
})
.ConfigureAwait(false);

/// <inheritdocs />
[Count]
public override async Task GetResourceData(DataRequest request,
IServerStreamWriter<DataReply> responseStream,
ServerCallContext context)
=> await responseStream.WriteAsync(new DataReply
{
Data = new DataChunk
{
DataComplete = true,
},
})
.ConfigureAwait(false);

/// <inheritdocs />
[Count]
public override async Task<ResultReply> SendResult(IAsyncStreamReader<Result> requestStream,
ServerCallContext context)
{
await foreach (var _ in requestStream.ReadAllAsync())
{
}

return new ResultReply
{
Ok = new Empty(),
};
}

/// <inheritdocs />
[Count]
public override Task<CreateResultsMetaDataResponse> CreateResultsMetaData(CreateResultsMetaDataRequest request,
Expand All @@ -111,21 +54,6 @@ public override Task<SubmitTasksResponse> SubmitTasks(SubmitTasksRequest request
CommunicationToken = request.CommunicationToken,
});

/// <inheritdocs />
[Count]
public override async Task<UploadResultDataResponse> UploadResultData(IAsyncStreamReader<UploadResultDataRequest> requestStream,
ServerCallContext context)
{
await foreach (var _ in requestStream.ReadAllAsync())
{
}

return new UploadResultDataResponse
{
ResultId = "result-id",
CommunicationToken = "communication-token",
};
}

/// <inheritdocs />
[Count]
Expand All @@ -135,4 +63,43 @@ public override Task<CreateResultsResponse> CreateResults(CreateResultsRequest r
{
CommunicationToken = request.CommunicationToken,
});

/// <inheritdocs />
[Count]
public override Task<DataResponse> GetCommonData(DataRequest request,
ServerCallContext context)
=> Task.FromResult(new DataResponse
{
ResultId = request.ResultId,
});

/// <inheritdocs />
[Count]
public override Task<DataResponse> GetDirectData(DataRequest request,
ServerCallContext context)
=> Task.FromResult(new DataResponse
{
ResultId = request.ResultId,
});

/// <inheritdocs />
[Count]
public override Task<DataResponse> GetResourceData(DataRequest request,
ServerCallContext context)
=> Task.FromResult(new DataResponse
{
ResultId = request.ResultId,
});

/// <inheritdocs />
[Count]
public override Task<NotifyResultDataResponse> NotifyResultData(NotifyResultDataRequest request,
ServerCallContext context)
=> Task.FromResult(new NotifyResultDataResponse
{
ResultIds =
{
request.Ids.Select(identifier => identifier.ResultId),
},
});
}
Loading
Loading