Skip to content

Commit

Permalink
updated events provider to use new in memory cache
Browse files Browse the repository at this point in the history
  • Loading branch information
mizrael committed Oct 23, 2023
1 parent 387e195 commit 203ef1d
Show file tree
Hide file tree
Showing 13 changed files with 151 additions and 117 deletions.
1 change: 0 additions & 1 deletion src/EvenireDB.Benchmark/EvenireDB.Benchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.8" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="7.0.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
<PackageReference Include="ObjectLayoutInspector" Version="0.1.4" />
</ItemGroup>
Expand Down
9 changes: 4 additions & 5 deletions src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using BenchmarkDotNet.Engines;
using EvenireDB;
using EvenireDB.Common;
using Microsoft.Extensions.Caching.Memory;
using EvenireDB.Utils;
using System.Threading.Channels;

public class EventsProviderBenckmarks
Expand All @@ -14,7 +14,7 @@ public class EventsProviderBenckmarks
private EventsProvider _sut;

[Params(10, 100, 1000)]
public int EventsCount;
public uint EventsCount;

[GlobalSetup]
public void GlobalSetup()
Expand All @@ -29,14 +29,13 @@ public void GlobalSetup()
var repoConfig = new FileEventsRepositoryConfig(dataPath);
var repo = new FileEventsRepository(repoConfig, factory);

var options = new MemoryCacheOptions();
var cache = new MemoryCache(options);
var cache = new LRUCache<Guid, CachedEvents>(this.EventsCount);

var channel = Channel.CreateUnbounded<IncomingEventsGroup>();

_sut = new EventsProvider(EventsProviderConfig.Default, repo, cache, channel.Writer);

var events = Enumerable.Range(0, this.EventsCount).Select(i => factory.Create(Guid.NewGuid(), "lorem", _data)).ToArray();
var events = Enumerable.Range(0, (int)this.EventsCount).Select(i => factory.Create(Guid.NewGuid(), "lorem", _data)).ToArray();
Task.WaitAll(_sut.AppendAsync(_streamId, events).AsTask());
}

Expand Down
1 change: 0 additions & 1 deletion src/EvenireDB.Server/EvenireDB.Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
<PackageReference Include="Asp.Versioning.Http" Version="7.1.0" />
<PackageReference Include="Grpc.AspNetCore" Version="2.32.0" />
<PackageReference Include="Google.Protobuf" Version="3.24.4" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
9 changes: 7 additions & 2 deletions src/EvenireDB.Server/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using EvenireDB;
using EvenireDB.Server.Routes;
using EvenireDB.Utils;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.Options;
using System.Reflection;
Expand Down Expand Up @@ -35,12 +36,16 @@
});

builder.Services
.AddMemoryCache()
.Configure<ServerConfig>(builder.Configuration.GetSection("Server"))
.AddSingleton<ICache<Guid, CachedEvents>>(ctx =>
{
var serverConfig = ctx.GetRequiredService<IOptions<ServerConfig>>().Value;
return new LRUCache<Guid, CachedEvents>(serverConfig.CacheCapacity);
})
.AddSingleton(ctx =>
{
var serverConfig = ctx.GetRequiredService<IOptions<ServerConfig>>().Value;
return new EventsProviderConfig(serverConfig.CacheDuration, serverConfig.MaxPageSizeToClient);
return new EventsProviderConfig(serverConfig.MaxPageSizeToClient);
})
.AddSingleton<EventsProvider>()
.AddSingleton(channel.Writer)
Expand Down
7 changes: 5 additions & 2 deletions src/EvenireDB.Server/ServerConfig.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
internal record class ServerConfig
{
public TimeSpan CacheDuration { get; init; } = TimeSpan.FromHours(4);

/// <summary>
/// max number of streams to cache in memory
/// </summary>
public uint CacheCapacity { get; init; } = 1000;

/// <summary>
/// max page size returned to clients
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/EvenireDB.Server/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
},
"AllowedHosts": "*",
"Server": {
"CacheDuration": "4:00:00",
"CacheCapacity": 1000000,
"MaxPageSizeToClient": 100,
"MaxEventsPageSizeFromDisk": 100,
"MaxEventDataSize": 500000,
Expand Down
4 changes: 0 additions & 4 deletions src/EvenireDB/EvenireDB.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\EvenireDB.Common\EvenireDB.Common.csproj" />
</ItemGroup>
Expand Down
57 changes: 20 additions & 37 deletions src/EvenireDB/EventsProvider.cs
Original file line number Diff line number Diff line change
@@ -1,60 +1,44 @@
using EvenireDB.Common;
using Microsoft.Extensions.Caching.Memory;
using EvenireDB.Utils;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

namespace EvenireDB
{
public record CachedEvents(List<IEvent> Events, SemaphoreSlim Semaphore);

// TODO: logging
// TODO: append to a transaction log
// TODO: consider dumping to disk when memory consumption is approaching a threshold (check IMemoryCache.GetCurrentStatistics() )
public class EventsProvider
{
private readonly IMemoryCache _cache;
private readonly ICache<Guid, CachedEvents> _cache;
private readonly EventsProviderConfig _config;
private readonly ChannelWriter<IncomingEventsGroup> _writer;
private readonly IEventsRepository _repo;
private readonly static SemaphoreSlim _lock = new(1,1);

internal record CachedEvents(List<IEvent> Events, SemaphoreSlim Semaphore);

public EventsProvider(EventsProviderConfig config, IEventsRepository repo, IMemoryCache cache, ChannelWriter<IncomingEventsGroup> writer)
public EventsProvider(
EventsProviderConfig config,
IEventsRepository repo,
ICache<Guid, CachedEvents> cache,
ChannelWriter<IncomingEventsGroup> writer)
{
_cache = cache ?? throw new ArgumentNullException(nameof(cache));
_config = config ?? throw new ArgumentNullException(nameof(config));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
_repo = repo ?? throw new ArgumentNullException(nameof(repo));
}

private async ValueTask<CachedEvents> EnsureCachedEventsAsync(Guid streamId, CancellationToken cancellationToken)
private async ValueTask<CachedEvents> EventsFactory(Guid streamId, CancellationToken cancellationToken)
{
var key = streamId.ToString();

var entry = _cache.Get<CachedEvents>(key);
if (entry is not null)
return entry;

_lock.Wait(cancellationToken);
try
{
entry = _cache.Get<CachedEvents>(key);
if (entry is null)
{
var persistedEvents = new List<IEvent>();
await foreach(var @event in _repo.ReadAsync(streamId, cancellationToken))
persistedEvents.Add(@event);
entry = new CachedEvents(persistedEvents, new SemaphoreSlim(1, 1));

_cache.Set(key, entry, _config.CacheDuration);
}
}
finally
{
_lock.Release();
}
return entry;
var persistedEvents = new List<IEvent>();
await foreach (var @event in _repo.ReadAsync(streamId, cancellationToken))
persistedEvents.Add(@event);
return new CachedEvents(persistedEvents, new SemaphoreSlim(1));
}

private ValueTask<CachedEvents> EnsureCachedEventsAsync(Guid streamId, CancellationToken cancellationToken)
=> _cache.GetOrAddAsync(streamId, this.EventsFactory, cancellationToken);

public async IAsyncEnumerable<IEvent> ReadAsync(
Guid streamId,
StreamPosition startPosition,
Expand All @@ -66,7 +50,7 @@ public async IAsyncEnumerable<IEvent> ReadAsync(

CachedEvents entry = await EnsureCachedEventsAsync(streamId, cancellationToken).ConfigureAwait(false);

if (entry.Events == null || entry.Events.Count == 0)
if (entry?.Events == null || entry.Events.Count == 0)
yield break;

uint totalCount = (uint)entry.Events.Count;
Expand Down Expand Up @@ -105,8 +89,7 @@ public async IAsyncEnumerable<IEvent> ReadAsync(

public async ValueTask<IOperationResult> AppendAsync(Guid streamId, IEnumerable<IEvent> incomingEvents, CancellationToken cancellationToken = default)
{
if (incomingEvents is null)
throw new ArgumentNullException(nameof(incomingEvents));
ArgumentNullException.ThrowIfNull(incomingEvents, nameof(incomingEvents));

if (!incomingEvents.Any())
return new SuccessResult();
Expand Down Expand Up @@ -136,7 +119,7 @@ public async ValueTask<IOperationResult> AppendAsync(Guid streamId, IEnumerable<
private void UpdateCache(Guid streamId, IEnumerable<IEvent> incomingEvents, CachedEvents entry)
{
entry.Events.AddRange(incomingEvents);
_cache.Set(streamId.ToString(), entry, _config.CacheDuration);
_cache.Update(streamId, entry);
}

private static bool HasDuplicateEvent(IEnumerable<IEvent> incomingEvents, CachedEvents entry, out IEvent? duplicate)
Expand Down
6 changes: 2 additions & 4 deletions src/EvenireDB/EventsProviderConfig.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
namespace EvenireDB
{
public record EventsProviderConfig(
TimeSpan CacheDuration,
uint MaxPageSize)
public record EventsProviderConfig(uint MaxPageSize)
{
public readonly static EventsProviderConfig Default = new(TimeSpan.FromHours(4), 100);
public readonly static EventsProviderConfig Default = new(100);
}
}
8 changes: 8 additions & 0 deletions src/EvenireDB/Utils/ICache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace EvenireDB.Utils
{
public interface ICache<TKey, TValue> where TKey : notnull
{
ValueTask<TValue> GetOrAddAsync(TKey key, Func<TKey, CancellationToken, ValueTask<TValue>> valueFactory, CancellationToken cancellationToken = default);
void Update(TKey key, TValue value);
}
}
32 changes: 23 additions & 9 deletions src/EvenireDB/Utils/LRUCache.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
namespace EvenireDB.Utils
{
public class LRUCache<TKey, TValue> : IDisposable
where TKey : notnull
// TODO: drop entries if memory consumption is approaching a threshold
// TODO: add item expiration
public class LRUCache<TKey, TValue> : IDisposable, ICache<TKey, TValue> where TKey : notnull
{
private class Node
{
public TKey Key { get; init; }

Check warning on line 9 in src/EvenireDB/Utils/LRUCache.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable property 'Key' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.
public TValue Value { get; init; }
public TValue Value { get; set; }

Check warning on line 10 in src/EvenireDB/Utils/LRUCache.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable property 'Value' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.
public Node? Next { get; set; }
public Node? Previous { get; set; }
}
Expand All @@ -16,7 +17,7 @@ private class Node
private Node? _head;
private Node? _tail;

private object _lock = new ();
private object _lock = new();
private readonly Dictionary<TKey, SemaphoreSlim> _semaphores;

private bool _disposed;
Expand All @@ -28,8 +29,21 @@ public LRUCache(uint capacity)
_semaphores = new Dictionary<TKey, SemaphoreSlim>((int)capacity);
}

public void Update(TKey key, TValue value)
{
if (!_cache.ContainsKey(key))
throw new KeyNotFoundException($"invalid key: {key}");

SemaphoreSlim semaphore = GetSemaphore(key);
semaphore.Wait();

_cache[key].Value = value;

semaphore.Release();
}

public async ValueTask<TValue> GetOrAddAsync(
TKey key,
TKey key,
Func<TKey, CancellationToken, ValueTask<TValue>> valueFactory,
CancellationToken cancellationToken = default)
{
Expand All @@ -45,15 +59,15 @@ public async ValueTask<TValue> GetOrAddAsync(
}
else
{
MoveToHead(node);
MoveToHead(node);
}

return node.Value;

Check warning on line 65 in src/EvenireDB/Utils/LRUCache.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
}

private async Task<LRUCache<TKey, TValue>.Node?> AddAsync(
TKey key,
Func<TKey, CancellationToken, ValueTask<TValue>> valueFactory,
TKey key,
Func<TKey, CancellationToken, ValueTask<TValue>> valueFactory,
CancellationToken cancellationToken)
{
var value = await valueFactory(key, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -90,7 +104,7 @@ private SemaphoreSlim GetSemaphore(TKey key)
semaphore = new SemaphoreSlim(1, 1);
_semaphores.Add(key, semaphore);
}

return semaphore;
}
}
Expand Down
Loading

0 comments on commit 203ef1d

Please sign in to comment.