Skip to content

Commit

Permalink
fix: all pr reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
ereali-aneo committed Jun 6, 2024
1 parent f68950c commit 390cc28
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 48 deletions.
4 changes: 2 additions & 2 deletions csharp/native/DynamicSubmission/Client/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
FROM mcr.microsoft.com/dotnet/runtime:6.0 AS base
FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base

WORKDIR /app

FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["DynamicSubmission/Client/ArmoniK.Samples.DynamicSubmission.Client.csproj", "DynamicSubmission/Client/"]
RUN dotnet restore "DynamicSubmission/Client/ArmoniK.Samples.DynamicSubmission.Client.csproj"
Expand Down
7 changes: 4 additions & 3 deletions csharp/native/DynamicSubmission/Client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -130,7 +131,7 @@ internal static async Task Run(string endpoint,
{
new CreateResultsRequest.Types.ResultCreate
{
Data = UnsafeByteOperations.UnsafeWrap(Encoding.ASCII.GetBytes(JsonSerializer.Serialize(input))),
Data = UnsafeByteOperations.UnsafeWrap(input.Serialize()),
Name = "Payload",
},
},
Expand Down Expand Up @@ -170,7 +171,7 @@ await eventClient.WaitForResultsAsync(createSessionReply.SessionId,
resultId,
CancellationToken.None);
// Test if the result is equal to the sum of 0 to n
var res = uint.Parse(Encoding.ASCII.GetString(result));
var res = BitConverter.ToUInt32(result);
var expectedResult = n * (n + 1) / 2;

// Throw an exception if the result is wrong
Expand Down Expand Up @@ -201,7 +202,7 @@ public static async Task<int> Main(string[] args)
// Describe the application and its purpose
var rootCommand =
new
RootCommand("Calculate the sum of a table with subtask and aggregation example for ArmoniK.\nIt sends a task to ArmoniK with a table containing 0, 1, 2..., n subdivided it in threshold sized table and create a new task , aggregate their result and addition them to get the final result");
RootCommand($"Calculate the sum of a table with subtask and aggregation example for ArmoniK.\nIt sends a task to ArmoniK with a table containing 0, 1, 2..., n subdivided it in threshold sized table and create a new task , aggregate their result and addition them to get the final result");
// Add the options to the parser
rootCommand.AddOption(endpoint);
rootCommand.AddOption(partition);
Expand Down
20 changes: 12 additions & 8 deletions csharp/native/DynamicSubmission/Common/Payload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ namespace ArmoniK.Samples.DynamicSubmission.Common
{
public record Table
{
public uint Size { get; init; }
public uint Threshold { get; init; }
public uint[] Values { get; set; }

public Table(uint size,
uint threshold)
{
Expand All @@ -37,7 +41,7 @@ public Table(uint size,
Values = new uint[size];
for (uint i = 0; i < size; i++)
{
Values[i] = i + 1;
Values[i] = (i + 1);
}
}

Expand All @@ -57,23 +61,23 @@ public Table(uint start,
Values = new uint[size];
for (uint i = 0; i < size; i++)
{
Values[i] = i + start;
Values[i] = (i + start);
}
}

public uint Size { get; init; }
public uint Threshold { get; init; }
public uint[] Values { get; set; }

public static Table Deserialize(byte[] payload)
{
var res = JsonSerializer.Deserialize<Table>(Encoding.ASCII.GetString(payload));
return res ?? new Table();
}

public static byte[] Serialize(Table table)
/// <summary>
///
/// </summary>
/// <returns></returns>
public byte[] Serialize()
{
var res = Encoding.ASCII.GetBytes(JsonSerializer.Serialize(table));
var res = Encoding.ASCII.GetBytes(JsonSerializer.Serialize(this));
return res;
}
}
Expand Down
66 changes: 31 additions & 35 deletions csharp/native/DynamicSubmission/Worker/DynamicSubmission.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System.Text;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Numerics;
using System.Text.Json;
using System.Threading.Tasks;

using ArmoniK.Api.Common.Channel.Utils;
using ArmoniK.Api.Common.Options;
Expand All @@ -16,10 +17,15 @@
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;

using System.Threading.Tasks;

using ArmoniK.Utils;

using Microsoft.Extensions.Logging;

using Empty = ArmoniK.Api.gRPC.V1.Empty;


namespace ArmoniK.Samples.DynamicSubmission.Worker
{
public class DynamicSubmissionWorker : WorkerStreamWrapper
Expand Down Expand Up @@ -66,20 +72,20 @@ public override async Task<Output> Process(ITaskHandler taskHandler)
{
logger_.LogDebug("Started Calculate nbrOfTasks");
// Calculate the nbr of tasks to create
var nbrOfTasks = size / tableInput.Threshold + (size % tableInput.Threshold != 0
? 1U
: 0U);
var nbrOfTasks = (size / tableInput.Threshold) + (size % tableInput.Threshold != 0
? 1U
: 0U);
logger_.LogInformation("nbrOfTasks = {nbrOfTasks}",
nbrOfTasks);

// Create all subtable needed for subtask arguments
// Create all sub-table needed for subtask arguments
IList<Table> inputs = new List<Table>();
logger_.LogDebug("Table List create start splitting");

for (uint i = 0; i / tableInput.Threshold < nbrOfTasks; i += tableInput.Threshold)
{
var len = tableInput.Threshold;
if (i / tableInput.Threshold == nbrOfTasks)
if (i / tableInput.Threshold == nbrOfTasks - 1 && tableInput.Size % tableInput.Threshold != 0)
{
len = tableInput.Size % tableInput.Threshold;
}
Expand All @@ -100,24 +106,22 @@ public override async Task<Output> Process(ITaskHandler taskHandler)
PartitionId = taskHandler.TaskOptions.PartitionId,
};

logger_.LogDebug("Create ResultsMetaData for Subworkers");

var subTaskResultIds = (await taskHandler.CreateResultsMetaDataAsync(Enumerable.Range(1,
(int)nbrOfTasks)
.Select(i => new CreateResultsMetaDataRequest.Types.ResultCreate
{
Name = "Result_" + i,
}))).Results.Select(data => data.ResultId);
logger_.LogDebug("Created ResultsMetaData for Subworkers");
logger_.LogDebug("Create Result Async");
}))).Results.Select(data => data.ResultId).AsIList();
logger_.LogDebug("Created ResultsMetaData for Sub-workers");

var payloadIds = (await taskHandler.CreateResultsAsync(inputs.Select((table,
i) => new CreateResultsRequest.Types.ResultCreate
{
Data = UnsafeByteOperations.UnsafeWrap(Table.Serialize(table)),
Data =
UnsafeByteOperations.UnsafeWrap(table.Serialize()),
Name = "Payload_" + (i + 1),
}))).Results.Select(data => data.ResultId);
logger_.LogDebug("Created Results Async for Subworkers Submit Tasks Async ");
logger_.LogDebug("Created Results Async for Sub-workers Submit Tasks Async ");

await taskHandler.SubmitTasksAsync(new List<SubmitTasksRequest.Types.TaskCreation>(subTaskResultIds.Zip(payloadIds)
.Select(tuple => new SubmitTasksRequest.Types.TaskCreation
Expand All @@ -127,19 +131,22 @@ await taskHandler.SubmitTasksAsync(new List<SubmitTasksRequest.Types.TaskCreatio
{
tuple.First,
},
})),
})
),
taskOptions);
logger_.LogDebug("Sub-workers submitted");

logger_.LogDebug("Submitting Aggregation");
logger_.LogDebug("Submitting Aggregation start");

var payload = await taskHandler.CreateResultsAsync(new List<CreateResultsRequest.Types.ResultCreate>
{
new()
{
Data = UnsafeByteOperations.UnsafeWrap(Encoding.ASCII.GetBytes(JsonSerializer.Serialize(tableInput))),
Data = UnsafeByteOperations.UnsafeWrap(tableInput.Serialize()),
Name = "Payload",
},
});
logger_.LogDebug("Created Results Async for Sub-workers Submit Tasks Async ");

var payloadId = payload.Results.Single()
.ResultId;
Expand All @@ -160,40 +167,29 @@ await taskHandler.SubmitTasksAsync(new List<SubmitTasksRequest.Types.TaskCreatio
},
},
taskOptions);
logger_.LogDebug("Aggregation submitted");

}
else
{
logger_.LogDebug("Debug : Enter in subtask process");

uint result = 0;

for (uint i = 0; taskHandler.DataDependencies.Count == 0 && i < size; i++)
if (taskHandler.DataDependencies.Any())
{
result += tableInput.Values[i];
result = taskHandler.DataDependencies.Values.Select(res => BitConverter.ToUInt32(res, 0)).Aggregate((u, u1) => u + u1);
}

// if we have dependencies this mean that it is the aggregation task
// We add all result of the dependencies together to get the final result
if (taskHandler.DataDependencies.Count > 0)
else
{
logger_.LogDebug("Debug : Enter in aggregation process");

var results = taskHandler.DataDependencies.Values.Select(res => uint.Parse(Encoding.ASCII.GetString(res)));

for (var i = 0; i < taskHandler.DataDependencies.Count; i++)
{
logger_.LogInformation("Info : result = {result} ",
result);

result += results.ElementAt(i);
//logger_.LogInformation("Info : result = {result} Sum with next result ", result);
//result = (uint)results.Sum(c => c);
}
result = tableInput.Values.Aggregate((u, u1) => u + u1);
}

// We get the result of the task using through the handler
await taskHandler.SendResult(resultId,
Encoding.ASCII.GetBytes($"{result}"))
BitConverter.GetBytes(result))
.ConfigureAwait(false);
}
}
Expand Down

0 comments on commit 390cc28

Please sign in to comment.