Skip to content

Commit

Permalink
Merge pull request #10 from mizrael/memory-consumption
Browse files Browse the repository at this point in the history
added memory watcher
  • Loading branch information
mizrael authored Oct 23, 2023
2 parents 68209ff + 3e1d15f commit 0f761ae
Show file tree
Hide file tree
Showing 15 changed files with 301 additions and 92 deletions.
4 changes: 3 additions & 1 deletion src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using EvenireDB;
using EvenireDB.Common;
using EvenireDB.Utils;
using Microsoft.Extensions.Logging.Abstractions;
using System.Threading.Channels;

public class EventsProviderBenckmarks
Expand Down Expand Up @@ -30,10 +31,11 @@ public void GlobalSetup()
var repo = new FileEventsRepository(repoConfig, factory);

var cache = new LRUCache<Guid, CachedEvents>(this.EventsCount);
var logger = new NullLogger<EventsProvider>();

var channel = Channel.CreateUnbounded<IncomingEventsGroup>();

_sut = new EventsProvider(EventsProviderConfig.Default, repo, cache, channel.Writer);
_sut = new EventsProvider(EventsProviderConfig.Default, repo, cache, channel.Writer, logger);

var events = Enumerable.Range(0, (int)this.EventsCount).Select(i => factory.Create(Guid.NewGuid(), "lorem", _data)).ToArray();
Task.WaitAll(_sut.AppendAsync(_streamId, events).AsTask());
Expand Down
65 changes: 31 additions & 34 deletions src/EvenireDB.Server/IncomingEventsPersistenceWorker.cs
Original file line number Diff line number Diff line change
@@ -1,47 +1,44 @@
using EvenireDB;
using System.Threading.Channels;
using System.Threading.Channels;

public class IncomingEventsPersistenceWorker : BackgroundService
namespace EvenireDB.Server
{
private readonly ChannelReader<IncomingEventsGroup> _reader;
private readonly IEventsRepository _repo;
private readonly ILogger<IncomingEventsPersistenceWorker> _logger;

public IncomingEventsPersistenceWorker(ChannelReader<IncomingEventsGroup> reader, IEventsRepository repo, ILogger<IncomingEventsPersistenceWorker> logger)
public class IncomingEventsPersistenceWorker : BackgroundService
{
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_repo = repo ?? throw new ArgumentNullException(nameof(repo));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
private readonly ChannelReader<IncomingEventsGroup> _reader;
private readonly IEventsRepository _repo;
private readonly ILogger<IncomingEventsPersistenceWorker> _logger;

protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
await Task.Factory.StartNew(async () =>
public IncomingEventsPersistenceWorker(ChannelReader<IncomingEventsGroup> reader, IEventsRepository repo, ILogger<IncomingEventsPersistenceWorker> logger)
{
await this.ExecuteAsyncCore(cancellationToken).ConfigureAwait(false);
}, cancellationToken);
}
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_repo = repo ?? throw new ArgumentNullException(nameof(repo));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

private async Task ExecuteAsyncCore(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested || await _reader.WaitToReadAsync(cancellationToken))
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
while (_reader.TryRead(out IncomingEventsGroup? group) && group is not null)
await Task.Factory.StartNew(async () =>
{
try
{
await _repo.AppendAsync(group.AggregateId, group.Events, cancellationToken)
.ConfigureAwait(false);
}
catch (Exception ex)
await ExecuteAsyncCore(cancellationToken).ConfigureAwait(false);
}, cancellationToken);
}

private async Task ExecuteAsyncCore(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested || await _reader.WaitToReadAsync(cancellationToken))
{
while (_reader.TryRead(out IncomingEventsGroup? group) && group is not null)
{
_logger.LogError(
ex,
"an error has occurred while persisting events group for aggregate {AggregateId}: {Error}",
group.AggregateId,
ex.Message);
try
{
await _repo.AppendAsync(group.AggregateId, group.Events, cancellationToken)
.ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.EventsGroupPersistenceError(group.AggregateId, ex.Message);
}
}

}
}
}
Expand Down
23 changes: 23 additions & 0 deletions src/EvenireDB.Server/LogMessages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace EvenireDB.Server
{
public static partial class LogMessages
{
[LoggerMessage(
EventId = 1,
Level = LogLevel.Warning,
Message = "Memory usage is {MemoryUsage} bytes, more than the allowed value: {MaxAllowedAllocatedBytes}. Dropping some cached streams")]
public static partial void HighMemoryUsageDetected(this ILogger logger, long memoryUsage, long maxAllowedAllocatedBytes);

[LoggerMessage(
EventId = 2,
Level = LogLevel.Debug,
Message = "Memory usage is {MemoryUsage} / {MaxAllowedAllocatedBytes} bytes")]
public static partial void MemoryUsageBelowTreshold(this ILogger logger, long memoryUsage, long maxAllowedAllocatedBytes);

[LoggerMessage(
EventId = 3,
Level = LogLevel.Error,
Message = "an error has occurred while persisting events group for stream {StreamId}: {Error}")]
public static partial void EventsGroupPersistenceError(this ILogger logger, Guid streamId, string error);
}
}
51 changes: 51 additions & 0 deletions src/EvenireDB.Server/MemoryWatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using EvenireDB.Utils;
using System.Diagnostics;

namespace EvenireDB.Server
{
public record MemoryWatcherSettings(
TimeSpan Interval,
long MaxAllowedAllocatedBytes);

public class MemoryWatcher : BackgroundService
{
private readonly MemoryWatcherSettings _settings;
private readonly ILogger<MemoryWatcher> _logger;
private readonly IServiceProvider _sp;

public MemoryWatcher(MemoryWatcherSettings settings, ILogger<MemoryWatcher> logger, IServiceProvider sp)
{
_settings = settings;
_logger = logger;
_sp = sp;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
using var process = Process.GetCurrentProcess();

bool needDrop = process.PrivateMemorySize64 > _settings.MaxAllowedAllocatedBytes;
if (needDrop)
{
_logger.HighMemoryUsageDetected(process.PrivateMemorySize64, _settings.MaxAllowedAllocatedBytes);

using var scope = _sp.CreateScope();
var cache = scope.ServiceProvider.GetRequiredService<ICache<Guid, CachedEvents>>();

var dropCount = cache.Count / 3;
cache.DropOldest(dropCount);

GC.Collect();
}
else
{
_logger.MemoryUsageBelowTreshold(process.PrivateMemorySize64, _settings.MaxAllowedAllocatedBytes);
}

await Task.Delay(_settings.Interval, stoppingToken);
}
}
}
}
10 changes: 8 additions & 2 deletions src/EvenireDB.Server/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using EvenireDB;
using EvenireDB.Server;
using EvenireDB.Server.Routes;
using EvenireDB.Utils;
using Microsoft.AspNetCore.Server.Kestrel.Core;
Expand Down Expand Up @@ -40,7 +41,7 @@
.AddSingleton<ICache<Guid, CachedEvents>>(ctx =>
{
var serverConfig = ctx.GetRequiredService<IOptions<ServerConfig>>().Value;
return new LRUCache<Guid, CachedEvents>(serverConfig.CacheCapacity);
return new LRUCache<Guid, CachedEvents>(serverConfig.MaxInMemoryStreamsCount);
})
.AddSingleton(ctx =>
{
Expand Down Expand Up @@ -72,7 +73,12 @@
return new FileEventsRepositoryConfig(dataPath, serverConfig.MaxEventsPageSizeFromDisk);
})
.AddSingleton<IEventsRepository, FileEventsRepository>()
.AddHostedService<IncomingEventsPersistenceWorker>();
.AddHostedService<IncomingEventsPersistenceWorker>()
.AddSingleton(ctx =>
{
var serverConfig = ctx.GetRequiredService<IOptions<ServerConfig>>().Value;
return new MemoryWatcherSettings(serverConfig.MemoryWatcherInterval, serverConfig.MaxAllowedAllocatedBytes);
}).AddHostedService<MemoryWatcher>();

var version = Assembly.GetExecutingAssembly().GetName().Version;

Expand Down
18 changes: 13 additions & 5 deletions src/EvenireDB.Server/ServerConfig.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
internal record class ServerConfig
{
/// <summary>
/// max number of streams to cache in memory
/// </summary>
public uint CacheCapacity { get; init; } = 1000;

/// <summary>
/// max page size returned to clients
/// </summary>
Expand All @@ -24,4 +19,17 @@ internal record class ServerConfig

public int HttpPort { get; init; } = 16281;
public int GrpcPort { get; init; } = 16282;

/// <summary>
/// max number of streams to cache in memory
/// </summary>
public uint MaxInMemoryStreamsCount { get; init; } = 1000;

/// <summary>
/// max allowed memory allocated by the process. If exceeded, the system will try to recover
/// some memory by dropping some cached streams
/// </summary>
public long MaxAllowedAllocatedBytes { get; init; } = 1_000_000_000; // TODO: consider making this a function of max allowed streams count and max event data size

public TimeSpan MemoryWatcherInterval { get; init; } = TimeSpan.FromMinutes(2);
}
11 changes: 8 additions & 3 deletions src/EvenireDB.Server/appsettings.Development.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
{
"Logging": {
"Console": {
"IncludeScopes": true
},
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
"Default": "Warning",
"EvenireDB": "Trace"
}
},
"Server": {
"HttpPort": 5001,
"GrpcPort": 5002
"GrpcPort": 5002,
"MemoryWatcherInterval": "0:00:10",
"MaxAllowedAllocatedBytes": 50000000
}
}
6 changes: 3 additions & 3 deletions src/EvenireDB.Server/appsettings.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
"Default": "Warning",
"EvenireDB": "Error"
}
},
"AllowedHosts": "*",
Expand All @@ -11,6 +11,6 @@
"MaxPageSizeToClient": 100,
"MaxEventsPageSizeFromDisk": 100,
"MaxEventDataSize": 500000,
"DataFolder": "./data"
"DataFolder": "./data"
}
}
4 changes: 4 additions & 0 deletions src/EvenireDB/EvenireDB.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\EvenireDB.Common\EvenireDB.Common.csproj" />
</ItemGroup>
Expand Down
16 changes: 11 additions & 5 deletions src/EvenireDB/EventsProvider.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using EvenireDB.Common;
using EvenireDB.Utils;
using Microsoft.Extensions.Logging;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

Expand All @@ -15,21 +16,26 @@ public class EventsProvider
private readonly EventsProviderConfig _config;
private readonly ChannelWriter<IncomingEventsGroup> _writer;
private readonly IEventsRepository _repo;
private readonly ILogger<EventsProvider> _logger;

public EventsProvider(
EventsProviderConfig config,
IEventsRepository repo,
ICache<Guid, CachedEvents> cache,
ChannelWriter<IncomingEventsGroup> writer)
EventsProviderConfig config,
IEventsRepository repo,
ICache<Guid, CachedEvents> cache,
ChannelWriter<IncomingEventsGroup> writer,
ILogger<EventsProvider> logger)
{
_cache = cache ?? throw new ArgumentNullException(nameof(cache));
_config = config ?? throw new ArgumentNullException(nameof(config));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
_repo = repo ?? throw new ArgumentNullException(nameof(repo));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

private async ValueTask<CachedEvents> EventsFactory(Guid streamId, CancellationToken cancellationToken)
{
_logger.ReadingStreamFromRepository(streamId);

var persistedEvents = new List<IEvent>();
await foreach (var @event in _repo.ReadAsync(streamId, cancellationToken))
persistedEvents.Add(@event);
Expand Down Expand Up @@ -119,7 +125,7 @@ public async ValueTask<IOperationResult> AppendAsync(Guid streamId, IEnumerable<
private void UpdateCache(Guid streamId, IEnumerable<IEvent> incomingEvents, CachedEvents entry)
{
entry.Events.AddRange(incomingEvents);
_cache.Update(streamId, entry);
_cache.AddOrUpdate(streamId, entry);
}

private static bool HasDuplicateEvent(IEnumerable<IEvent> incomingEvents, CachedEvents entry, out IEvent? duplicate)
Expand Down
13 changes: 13 additions & 0 deletions src/EvenireDB/LogMessages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Microsoft.Extensions.Logging;

namespace EvenireDB
{
public static partial class LogMessages
{
[LoggerMessage(
EventId = 0,
Level = LogLevel.Warning,
Message = "Reading stream '{StreamId}' from repository")]
public static partial void ReadingStreamFromRepository(this ILogger logger, Guid streamId);
}
}
5 changes: 4 additions & 1 deletion src/EvenireDB/Utils/ICache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
public interface ICache<TKey, TValue> where TKey : notnull
{
ValueTask<TValue> GetOrAddAsync(TKey key, Func<TKey, CancellationToken, ValueTask<TValue>> valueFactory, CancellationToken cancellationToken = default);
void Update(TKey key, TValue value);
void AddOrUpdate(TKey key, TValue value);
void DropOldest(uint maxCount);

public uint Count { get; }
}
}
Loading

0 comments on commit 0f761ae

Please sign in to comment.