Skip to content

Commit

Permalink
feat: Agent and Worker send data with files
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem committed Sep 18, 2023
1 parent ee8d926 commit 67e01e8
Show file tree
Hide file tree
Showing 38 changed files with 660 additions and 2,619 deletions.
2 changes: 1 addition & 1 deletion Base/src/ArmoniK.Core.Base.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Core" Version="3.12.0" />
<PackageReference Include="ArmoniK.Api.Core" Version="3.13.0-jgdatafromfiles.12.1676231" />
<PackageReference Include="JetBrains.Annotations" Version="2023.2.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="7.0.10" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="7.0.0" />
Expand Down
2 changes: 1 addition & 1 deletion Common/src/ArmoniK.Core.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


<ItemGroup>
<PackageReference Include="ArmoniK.Api.Core" Version="3.12.0" />
<PackageReference Include="ArmoniK.Api.Core" Version="3.13.0-jgdatafromfiles.12.1676231" />
<PackageReference Include="Calzolari.Grpc.AspNetCore.Validation" Version="6.3.0" />
<PackageReference Include="Grpc.HealthCheck" Version="2.55.0" />
<PackageReference Include="JetBrains.Annotations" Version="2023.2.0" />
Expand Down
10 changes: 10 additions & 0 deletions Common/src/Injection/Options/Pollster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,14 @@ public class Pollster
/// This happens in parallel of the execution of another task
/// </summary>
public TimeSpan TimeoutBeforeNextAcquisition { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// Shared folder between Agent and Worker
/// </summary>
public string SharedCacheFolder { get; set; } = "/cache/shared";

/// <summary>
/// Internal cache for data
/// </summary>
public string InternalCacheFolder { get; set; } = "/cache/internal";
}
2 changes: 2 additions & 0 deletions Common/src/Pollster/AgentHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public async Task<IAgent> Start(string token,
ILogger logger,
SessionData sessionData,
TaskData taskData,
string folder,
CancellationToken cancellationToken)
{
try
Expand All @@ -144,6 +145,7 @@ public async Task<IAgent> Start(string token,
taskTable_,
sessionData,
taskData,
folder,
token,
logger);

Expand Down
198 changes: 0 additions & 198 deletions Common/src/Pollster/ComputeRequestQueue.cs

This file was deleted.

60 changes: 23 additions & 37 deletions Common/src/Pollster/DataPrefetcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,17 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Api.Common.Utils;
using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Worker;
using ArmoniK.Core.Base;
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Core.Common.Exceptions;
using ArmoniK.Core.Common.Storage;

using Google.Protobuf;

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -84,58 +79,49 @@ await objectStorage_.Init(cancellationToken)
/// Method used to prefetch data before executing a task
/// </summary>
/// <param name="taskData">Task metadata</param>
/// <param name="folder"></param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// Queue containing the request containing the data for the task which can be sent to the worker
/// </returns>
/// <exception cref="ObjectDataNotFoundException">input data are not found</exception>
/// <exception cref="InvalidOperationException">invalid transition between states</exception>
public async Task<Queue<ProcessRequest.Types.ComputeRequest>> PrefetchDataAsync(TaskData taskData,
CancellationToken cancellationToken)
public async Task PrefetchDataAsync(TaskData taskData,
string folder,
CancellationToken cancellationToken)
{
using var activity = activitySource_?.StartActivity();
using var sessionScope = logger_.BeginPropertyScope(("sessionId", taskData.SessionId));

activity?.AddEvent(new ActivityEvent("Load payload"));

var payloadChunks = await objectStorage_.GetValuesAsync(taskData.PayloadId,
cancellationToken)
.Select(bytes => UnsafeByteOperations.UnsafeWrap(bytes))
.ToListAsync(cancellationToken)
.ConfigureAwait(false);

var computeRequests = new ComputeRequestQueue(logger_);
computeRequests.Init(PayloadConfiguration.MaxChunkSize,
taskData.SessionId,
taskData.TaskId,
taskData.Options.ToGrpcTaskOptions(),
payloadChunks.FirstOrDefault(),
taskData.ExpectedOutputIds);

for (var i = 1; i < payloadChunks.Count; i++)
await using (var fs = new FileStream(Path.Combine(folder,
taskData.PayloadId),
FileMode.OpenOrCreate))
{
computeRequests.AddPayloadChunk(payloadChunks[i]);
await using var w = new BinaryWriter(fs);
await foreach (var chunk in objectStorage_.GetValuesAsync(taskData.PayloadId,
cancellationToken)
.ConfigureAwait(false))
{
w.Write(chunk);
}
}

computeRequests.CompletePayload();

foreach (var dataDependency in taskData.DataDependencies)
{
var dependencyChunks = await objectStorage_.GetValuesAsync(dataDependency,
cancellationToken)
.Select(bytes => UnsafeByteOperations.UnsafeWrap(bytes))
.ToListAsync(cancellationToken)
.ConfigureAwait(false);

computeRequests.InitDataDependency(dataDependency);
foreach (var chunk in dependencyChunks)
await using var fs = new FileStream(Path.Combine(folder,
dataDependency),
FileMode.OpenOrCreate);
await using var w = new BinaryWriter(fs);
await foreach (var chunk in objectStorage_.GetValuesAsync(dataDependency,
cancellationToken)
.ConfigureAwait(false))
{
computeRequests.AddDataDependencyChunk(chunk);
w.Write(chunk);
}

computeRequests.CompleteDataDependency();
}

return computeRequests.GetQueue();
}
}
2 changes: 2 additions & 0 deletions Common/src/Pollster/IAgentHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public interface IAgentHandler
/// <param name="logger">Logger that may be injected into the handler that embed preconfigured scopes</param>
/// <param name="sessionData">Session metadata</param>
/// <param name="taskData">Task metadata</param>
/// <param name="folder">Shared folder between Agent and Worker</param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// Task representing the asynchronous execution of the method
Expand All @@ -54,5 +55,6 @@ Task<IAgent> Start(string token,
ILogger logger,
SessionData sessionData,
TaskData taskData,
string folder,
CancellationToken cancellationToken);
}
Loading

0 comments on commit 67e01e8

Please sign in to comment.