Skip to content

Commit

Permalink
fix: all pr reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
ereali-aneo committed Jun 6, 2024
1 parent f68950c commit 4a7588c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 38 deletions.
4 changes: 2 additions & 2 deletions csharp/native/DynamicSubmission/Client/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
FROM mcr.microsoft.com/dotnet/runtime:6.0 AS base
FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base

WORKDIR /app

FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["DynamicSubmission/Client/ArmoniK.Samples.DynamicSubmission.Client.csproj", "DynamicSubmission/Client/"]
RUN dotnet restore "DynamicSubmission/Client/ArmoniK.Samples.DynamicSubmission.Client.csproj"
Expand Down
5 changes: 2 additions & 3 deletions csharp/native/DynamicSubmission/Client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
using System.CommandLine;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -130,7 +129,7 @@ internal static async Task Run(string endpoint,
{
new CreateResultsRequest.Types.ResultCreate
{
Data = UnsafeByteOperations.UnsafeWrap(Encoding.ASCII.GetBytes(JsonSerializer.Serialize(input))),
Data = UnsafeByteOperations.UnsafeWrap(input.Serialize()),
Name = "Payload",
},
},
Expand Down Expand Up @@ -170,7 +169,7 @@ await eventClient.WaitForResultsAsync(createSessionReply.SessionId,
resultId,
CancellationToken.None);
// Test if the result is equal to the sum of 0 to n
var res = uint.Parse(Encoding.ASCII.GetString(result));
var res = BitConverter.ToUInt32(result);
var expectedResult = n * (n + 1) / 2;

// Throw an exception if the result is wrong
Expand Down
7 changes: 5 additions & 2 deletions csharp/native/DynamicSubmission/Common/Payload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ public static Table Deserialize(byte[] payload)
return res ?? new Table();
}

public static byte[] Serialize(Table table)
/// <summary>
/// </summary>
/// <returns></returns>
public byte[] Serialize()
{
var res = Encoding.ASCII.GetBytes(JsonSerializer.Serialize(table));
var res = Encoding.ASCII.GetBytes(JsonSerializer.Serialize(this));
return res;
}
}
Expand Down
53 changes: 22 additions & 31 deletions csharp/native/DynamicSubmission/Worker/DynamicSubmission.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

using ArmoniK.Api.Common.Channel.Utils;
Expand All @@ -12,6 +10,7 @@
using ArmoniK.Api.gRPC.V1.Agent;
using ArmoniK.Api.Worker.Worker;
using ArmoniK.Samples.DynamicSubmission.Common;
using ArmoniK.Utils;

using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
Expand Down Expand Up @@ -72,14 +71,14 @@ public override async Task<Output> Process(ITaskHandler taskHandler)
logger_.LogInformation("nbrOfTasks = {nbrOfTasks}",
nbrOfTasks);

// Create all subtable needed for subtask arguments
// Create all sub-table needed for subtask arguments
IList<Table> inputs = new List<Table>();
logger_.LogDebug("Table List create start splitting");

for (uint i = 0; i / tableInput.Threshold < nbrOfTasks; i += tableInput.Threshold)
{
var len = tableInput.Threshold;
if (i / tableInput.Threshold == nbrOfTasks)
if (i / tableInput.Threshold == nbrOfTasks - 1 && tableInput.Size % tableInput.Threshold != 0)
{
len = tableInput.Size % tableInput.Threshold;
}
Expand All @@ -100,24 +99,22 @@ public override async Task<Output> Process(ITaskHandler taskHandler)
PartitionId = taskHandler.TaskOptions.PartitionId,
};

logger_.LogDebug("Create ResultsMetaData for Subworkers");

var subTaskResultIds = (await taskHandler.CreateResultsMetaDataAsync(Enumerable.Range(1,
(int)nbrOfTasks)
.Select(i => new CreateResultsMetaDataRequest.Types.ResultCreate
{
Name = "Result_" + i,
}))).Results.Select(data => data.ResultId);
logger_.LogDebug("Created ResultsMetaData for Subworkers");
logger_.LogDebug("Create Result Async");
}))).Results.Select(data => data.ResultId)
.AsIList();
logger_.LogDebug("Created ResultsMetaData for Sub-workers");

var payloadIds = (await taskHandler.CreateResultsAsync(inputs.Select((table,
i) => new CreateResultsRequest.Types.ResultCreate
{
Data = UnsafeByteOperations.UnsafeWrap(Table.Serialize(table)),
Data = UnsafeByteOperations.UnsafeWrap(table.Serialize()),
Name = "Payload_" + (i + 1),
}))).Results.Select(data => data.ResultId);
logger_.LogDebug("Created Results Async for Subworkers Submit Tasks Async ");
logger_.LogDebug("Created Results Async for Sub-workers Submit Tasks Async ");

await taskHandler.SubmitTasksAsync(new List<SubmitTasksRequest.Types.TaskCreation>(subTaskResultIds.Zip(payloadIds)
.Select(tuple => new SubmitTasksRequest.Types.TaskCreation
Expand All @@ -129,17 +126,19 @@ await taskHandler.SubmitTasksAsync(new List<SubmitTasksRequest.Types.TaskCreatio
},
})),
taskOptions);
logger_.LogDebug("Sub-workers submitted");

logger_.LogDebug("Submitting Aggregation");
logger_.LogDebug("Submitting Aggregation start");

var payload = await taskHandler.CreateResultsAsync(new List<CreateResultsRequest.Types.ResultCreate>
{
new()
{
Data = UnsafeByteOperations.UnsafeWrap(Encoding.ASCII.GetBytes(JsonSerializer.Serialize(tableInput))),
Data = UnsafeByteOperations.UnsafeWrap(tableInput.Serialize()),
Name = "Payload",
},
});
logger_.LogDebug("Created Results Async for Sub-workers Submit Tasks Async ");

var payloadId = payload.Results.Single()
.ResultId;
Expand All @@ -160,40 +159,32 @@ await taskHandler.SubmitTasksAsync(new List<SubmitTasksRequest.Types.TaskCreatio
},
},
taskOptions);
logger_.LogDebug("Aggregation submitted");
}
else
{
logger_.LogDebug("Debug : Enter in subtask process");

uint result = 0;

for (uint i = 0; taskHandler.DataDependencies.Count == 0 && i < size; i++)
if (taskHandler.DataDependencies.Any())
{
result += tableInput.Values[i];
result = taskHandler.DataDependencies.Values.Select(res => BitConverter.ToUInt32(res,
0))
.Aggregate((u,
u1) => u + u1);
}

// if we have dependencies this mean that it is the aggregation task
// We add all result of the dependencies together to get the final result
if (taskHandler.DataDependencies.Count > 0)
else
{
logger_.LogDebug("Debug : Enter in aggregation process");

var results = taskHandler.DataDependencies.Values.Select(res => uint.Parse(Encoding.ASCII.GetString(res)));

for (var i = 0; i < taskHandler.DataDependencies.Count; i++)
{
logger_.LogInformation("Info : result = {result} ",
result);

result += results.ElementAt(i);
//logger_.LogInformation("Info : result = {result} Sum with next result ", result);
//result = (uint)results.Sum(c => c);
}
result = tableInput.Values.Aggregate((u,
u1) => u + u1);
}

// We get the result of the task using through the handler
await taskHandler.SendResult(resultId,
Encoding.ASCII.GetBytes($"{result}"))
BitConverter.GetBytes(result))
.ConfigureAwait(false);
}
}
Expand Down

0 comments on commit 4a7588c

Please sign in to comment.