Skip to content

Commit

Permalink
it works :)
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaume-chervet committed Nov 10, 2023
1 parent 6686b2a commit 544da0a
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 32 deletions.
4 changes: 3 additions & 1 deletion src/SlimData/Commands/LogSnapshotCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using DotNext.IO;
using DotNext.Runtime.Serialization;
using DotNext.Text;
using Newtonsoft.Json;

namespace RaftNode;

Expand Down Expand Up @@ -98,6 +99,7 @@ public async ValueTask WriteToAsync<TWriter>(TWriter writer, CancellationToken t
}
}

Console.WriteLine("1 Writing snapshot ReadFromAsync" + JsonConvert.SerializeObject(keysValues));
}

#pragma warning disable CA2252
Expand Down Expand Up @@ -149,7 +151,7 @@ public static async ValueTask<LogSnapshotCommand> ReadFromAsync<TReader>(TReader
hashsets.Add(key, hashset);
}

Console.WriteLine("1 Reading snapshot ReadFromAsync");
Console.WriteLine("1 Reading snapshot ReadFromAsync" + JsonConvert.SerializeObject(keysValues));
return new LogSnapshotCommand(keysValues, hashsets, queues);
}
}
2 changes: 1 addition & 1 deletion src/SlimData/SimplePersistentState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public override async ValueTask WriteToAsync<TWriter>(TWriter writer, Cancellati
foreach (var (key, value) in hashset.Value)
{
await writer.WriteStringAsync(key.AsMemory(), context, LengthFormat.Plain, token);
await writer.WriteStringAsync(key.AsMemory(), context, LengthFormat.Plain, token);
await writer.WriteStringAsync(value.AsMemory(), context, LengthFormat.Plain, token);
}
}
}
Expand Down
28 changes: 16 additions & 12 deletions src/SlimData/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,23 +158,27 @@ await context.Response.WriteAsync("Key key is empty or value is not a number",
await cluster.ApplyReadBarrierAsync(context.RequestAborted);

IList<string> values = new List<string>();
var queue = ((ISupplier<SupplierPayload>)provider).Invoke().Queues[key];
for (var i = 0; i < count; i++)
var queues = ((ISupplier<SupplierPayload>)provider).Invoke().Queues;
if (queues.ContainsKey(key))
{
if (queue.Count <= i)
var queue = ((ISupplier<SupplierPayload>)provider).Invoke().Queues[key];
for (var i = 0; i < count; i++)
{
break;
if (queue.Count <= i)
{
break;
}

values.Add(queue[i]);
}

values.Add(queue[i]);
await context.Response.WriteAsync(JsonConvert.SerializeObject(values), context.RequestAborted);
var logEntry =
provider.interpreter.CreateLogEntry(new ListRightPopCommand() { Key = key, Count = count },
cluster.Term);
await provider.AppendAsync(logEntry, source.Token);
await provider.CommitAsync(source.Token);
}

await context.Response.WriteAsync(JsonConvert.SerializeObject(values), context.RequestAborted);
var logEntry =
provider.interpreter.CreateLogEntry(new ListRightPopCommand() { Key = key, Count = count },
cluster.Term);
await provider.AppendAsync(logEntry, source.Token);
await provider.CommitAsync(source.Token);
}
catch (Exception e)
{
Expand Down
21 changes: 12 additions & 9 deletions src/SlimFaas/Database/SlimDataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public Task<string> GetAsync(string key) {
public async Task SetAsync(string key, string value)
{
var multipart = new MultipartFormDataContent();
multipart.Add(new StringContent(key), value);
multipart.Add(new StringContent(value), key);

var response = await _httpClient.PostAsync(new Uri("http://localhost:3262/AddKeyValue"), multipart);
}
Expand All @@ -47,7 +47,14 @@ public Task<IDictionary<string, string>> HashGetAllAsync(string key) {
return data.Hashsets.TryGetValue(key, out var value) ? Task.FromResult((IDictionary<string, string>)value) : Task.FromResult((IDictionary<string, string>)new Dictionary<string, string>());
}

public Task ListLeftPushAsync(string key, string field) => throw new NotImplementedException();
public Task ListLeftPushAsync(string key, string field) {
var request = new HttpRequestMessage(HttpMethod.Post, new Uri("http://localhost:3262/ListLeftPush"));
var multipart = new MultipartFormDataContent();
multipart.Add(new StringContent(field), key);
request.Content = multipart;
var response = _httpClient.SendAsync(request);
return Task.CompletedTask;
}

public async Task<IList<string>> ListRightPopAsync(string key, long count = 1)
{
Expand All @@ -58,17 +65,13 @@ public async Task<IList<string>> ListRightPopAsync(string key, long count = 1)
request.Content = multipart;
var response = await _httpClient.SendAsync(request);
var json = await response.Content.ReadAsStringAsync();
if (string.IsNullOrEmpty(json))
{
return new List<string>();
}
return JsonConvert.DeserializeObject<IList<string>>(json);
return string.IsNullOrEmpty(json) ? new List<string>() : JsonConvert.DeserializeObject<IList<string>>(json);
}

public Task<long> ListLengthAsync(string key) {

var data = ((ISupplier<SupplierPayload>)_simplePersistentState).Invoke();
return data.Queues.TryGetValue(key, out var value) ? Task.FromResult((long)value.Count) : Task.FromResult(0L);
var result = data.Queues.TryGetValue(key, out var value) ? Task.FromResult((long)value.Count) : Task.FromResult(0L);
return result;
}
}
#pragma warning restore CA2252
9 changes: 6 additions & 3 deletions src/SlimFaas/MasterService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace SlimFaas;
using Newtonsoft.Json;

namespace SlimFaas;

public interface IMasterService
{
Expand All @@ -18,12 +20,13 @@ public class MasterService : IMasterService

public MasterService(IRedisService redisService)
{
_redisService = redisService;
_redisService = redisService;
}

public async Task CheckAsync()
{
var dictionary= await _redisService.HashGetAllAsync(SlimFaasMaster);
Console.WriteLine(JsonConvert.SerializeObject(dictionary));
if (dictionary.Count == 0)
{
await _redisService.HashSetAsync(SlimFaasMaster, new Dictionary<string, string>
Expand Down Expand Up @@ -59,4 +62,4 @@ public async Task CheckAsync()
}
}

}
}
1 change: 0 additions & 1 deletion src/SlimFaas/SlimDataSynchronizationWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
#pragma warning disable CA2252
Startup.ClusterMembers.Add($"http://{pod.Ip}:3262");

}
// Starter.StartNode("http", 3262);
#pragma warning restore CA2252
Expand Down
11 changes: 6 additions & 5 deletions src/SlimFaas/SlimWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ private async Task DoOneCycle(CancellationToken stoppingToken, Dictionary<string
var numberLimitProcessingTasks = ComputeNumberLimitProcessingTasks(slimFaas, function);
setTickLastCallCounterDictionary[functionDeployment]++;
var functionReplicas = function.Replicas;
await UpdateTickLastCallIfRequestStillInProgress(functionReplicas, setTickLastCallCounterDictionary,
var queueLenght = await UpdateTickLastCallIfRequestStillInProgress(functionReplicas, setTickLastCallCounterDictionary,
functionDeployment, numberProcessingTasks);
if (functionReplicas == 0) continue;
if (functionReplicas == 0 || queueLenght <= 0 ) continue;
var isAnyContainerStarted = function.Pods?.Any(p => p.Ready.HasValue && p.Ready.Value);
if(!isAnyContainerStarted.HasValue || !isAnyContainerStarted.Value) continue;
if (numberProcessingTasks >= numberLimitProcessingTasks) continue;
Expand Down Expand Up @@ -90,20 +90,21 @@ private async Task SendHttpRequestToFunction(Dictionary<string, IList<RequestToW
}
}

private async Task UpdateTickLastCallIfRequestStillInProgress(int? functionReplicas,
private async Task<long> UpdateTickLastCallIfRequestStillInProgress(int? functionReplicas,
Dictionary<string, int> setTickLastCallCounterDictionnary, string functionDeployment, int numberProcessingTasks)
{
var counterLimit = functionReplicas == 0 ? 10 : 300;

var queueLenght = await _queue.CountAsync(functionDeployment);
if (setTickLastCallCounterDictionnary[functionDeployment] > counterLimit)
{
setTickLastCallCounterDictionnary[functionDeployment] = 0;
var queueLenght = await _queue.CountAsync(functionDeployment);

if (queueLenght > 0 || numberProcessingTasks > 0)
{
_historyHttpService.SetTickLastCall(functionDeployment, DateTime.Now.Ticks);
}
}
return queueLenght;
}

private static int? ComputeNumberLimitProcessingTasks(SlimFaasDeploymentInformation slimFaas,
Expand Down

0 comments on commit 544da0a

Please sign in to comment.