Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaume-chervet committed Nov 13, 2024
1 parent 99c6ed0 commit 0f8a3fc
Show file tree
Hide file tree
Showing 9 changed files with 15 additions and 15 deletions.
7 changes: 4 additions & 3 deletions src/SlimData/Endpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

namespace SlimData;


[MemoryPackable]
public partial record QueueItemStatus(string Id, int HttpCode);

public class Endpoints
{
public delegate Task RespondDelegate(IRaftCluster cluster, SlimPersistentState provider,
Expand Down Expand Up @@ -178,9 +182,6 @@ public static async Task ListLeftPushCommand(SlimPersistentState provider, strin
await cluster.ReplicateAsync(logEntry, source.Token);
}

[MemoryPackable]
public partial record QueueItemStatus(string Id, int HttpCode);

public static Task ListSetQueueItemStatus(HttpContext context)
{
return DoAsync(context, async (cluster, provider, source) =>
Expand Down
3 changes: 1 addition & 2 deletions src/SlimData/ListItems.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Text.Json.Serialization;
using MemoryPack;
using MemoryPack;

namespace SlimData;

Expand Down
2 changes: 1 addition & 1 deletion src/SlimFaas/Database/DatabaseMockService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public Task<long> ListLengthAsync(string key)
return Task.FromResult<long>(list.Count);
}

public async Task ListSetQueueItemStatus(string key, List<Endpoints.QueueItemStatus> queueItemStatus)
public async Task ListSetQueueItemStatus(string key, List<QueueItemStatus> queueItemStatus)
{
await Task.Delay(100);
}
Expand Down
2 changes: 1 addition & 1 deletion src/SlimFaas/Database/IDatabaseService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ public interface IDatabaseService
Task ListLeftPushAsync(string key, byte[] field);
Task<IList<QueueData>> ListRightPopAsync(string key, int count = 1);
Task<long> ListLengthAsync(string key);
Task ListSetQueueItemStatus(string key, List<Endpoints.QueueItemStatus> queueItemStatus);
Task ListSetQueueItemStatus(string key, List<QueueItemStatus> queueItemStatus);
}
2 changes: 1 addition & 1 deletion src/SlimFaas/Database/ISlimFaasQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ public interface ISlimFaasQueue
{
Task EnqueueAsync(string key, byte[] message);
Task<IList<QueueData>> DequeueAsync(string key, long count = 1);
Task ListSetQueueItemStatusAsync(string key, List<Endpoints.QueueItemStatus> queueItemStatus);
Task ListSetQueueItemStatusAsync(string key, List<QueueItemStatus> queueItemStatus);
public Task<long> CountAsync(string key);
}
4 changes: 2 additions & 2 deletions src/SlimFaas/Database/SlimDataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ private async Task<IList<QueueData>> DoListRightPopAsync(string key, int count =
}
}

public async Task ListSetQueueItemStatus(string key, List<Endpoints.QueueItemStatus> queueItemStatus)
public async Task ListSetQueueItemStatus(string key, List<QueueItemStatus> queueItemStatus)
{
await Retry.Do(() => DoListSetQueueItemStatus(key, queueItemStatus), _retryInterval, logger, MaxAttemptCount);
}

private async Task DoListSetQueueItemStatus(string key, List<Endpoints.QueueItemStatus> queueItemStatus)
private async Task DoListSetQueueItemStatus(string key, List<QueueItemStatus> queueItemStatus)
{

EndPoint endpoint = await GetAndWaitForLeader();
Expand Down
2 changes: 1 addition & 1 deletion src/SlimFaas/Database/SlimFaasQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public async Task<IList<QueueData>> DequeueAsync(string key, long count = 1)
return data;
}

public async Task ListSetQueueItemStatusAsync(string key, List<Endpoints.QueueItemStatus> queueItemStatus) => await databaseService.ListSetQueueItemStatus($"{KeyPrefix}{key}", queueItemStatus);
public async Task ListSetQueueItemStatusAsync(string key, List<QueueItemStatus> queueItemStatus) => await databaseService.ListSetQueueItemStatus($"{KeyPrefix}{key}", queueItemStatus);

public async Task<long> CountAsync(string key) => await databaseService.ListLengthAsync($"{KeyPrefix}{key}");
}
6 changes: 3 additions & 3 deletions src/SlimFaas/SlimWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private async Task<int> ManageProcessingTasksAsync(ISlimFaasQueue slimFaasQueue,
{
processingTasks.Add(functionDeployment, new List<RequestToWait>());
}
var queueItemStatusList = new List<Endpoints.QueueItemStatus>();
var queueItemStatusList = new List<QueueItemStatus>();
List<RequestToWait> httpResponseMessagesToDelete = new();
foreach (RequestToWait processing in processingTasks[functionDeployment])
{
Expand All @@ -160,11 +160,11 @@ private async Task<int> ManageProcessingTasksAsync(ISlimFaasQueue slimFaasQueue,
processing.CustomRequest.Method, processing.CustomRequest.Path, processing.CustomRequest.Query,
httpResponseMessage.StatusCode);
httpResponseMessagesToDelete.Add(processing);
queueItemStatusList.Add(new Endpoints.QueueItemStatus(processing.id, statusCode));
queueItemStatusList.Add(new QueueItemStatus(processing.id, statusCode));
}
catch (Exception e)
{
queueItemStatusList.Add(new Endpoints.QueueItemStatus(processing.id, 500));
queueItemStatusList.Add(new QueueItemStatus(processing.id, 500));
httpResponseMessagesToDelete.Add(processing);
logger.LogWarning("Request Error: {Message} {StackTrace}", e.Message, e.StackTrace);
historyHttpService.SetTickLastCall(functionDeployment, DateTime.UtcNow.Ticks);
Expand Down
2 changes: 1 addition & 1 deletion tests/SlimFaas.Tests/SlimProxyMiddlewareTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ internal class MemorySlimFaasQueue : ISlimFaasQueue

public Task<long> CountAsync(string key) => throw new NotImplementedException();

public Task ListSetQueueItemStatusAsync(string key, List<Endpoints.QueueItemStatus> queueItemStatus) => throw new NotImplementedException();
public Task ListSetQueueItemStatusAsync(string key, List<QueueItemStatus> queueItemStatus) => throw new NotImplementedException();

public async Task EnqueueAsync(string key, byte[] message) => await Task.Delay(100);
}
Expand Down

0 comments on commit 0f8a3fc

Please sign in to comment.