Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mizrael committed Oct 26, 2023
1 parent 668d30f commit 6488dc3
Show file tree
Hide file tree
Showing 15 changed files with 168 additions and 142 deletions.
4 changes: 2 additions & 2 deletions src/EvenireDB.Server/Grpc/EventsService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ namespace EvenireDB.Server.Grpc
{
public class EventsService : EventsGrpcService.EventsGrpcServiceBase
{
private readonly EventsProvider _provider;
private readonly IEventsProvider _provider;
private readonly IEventFactory _eventFactory;

public EventsService(EventsProvider provider, IEventFactory eventFactory)
public EventsService(IEventsProvider provider, IEventFactory eventFactory)
{
_provider = provider ?? throw new ArgumentNullException(nameof(provider));
_eventFactory = eventFactory ?? throw new ArgumentNullException(nameof(eventFactory));
Expand Down
30 changes: 0 additions & 30 deletions src/EvenireDB.Server/LogMessages.cs

This file was deleted.

58 changes: 4 additions & 54 deletions src/EvenireDB.Server/Program.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
using EvenireDB;
using EvenireDB.Server;
using EvenireDB.Server.Routes;
using EvenireDB.Utils;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.Options;
using System.Reflection;
using System.Threading.Channels;

var builder = WebApplication.CreateBuilder(args);

Expand All @@ -19,66 +16,19 @@
.AddJsonFile($"appsettings.{builder.Environment.EnvironmentName}.json", optional: true, reloadOnChange: true)
.AddEnvironmentVariables();

var serverConfig = builder.Configuration.GetSection("Server").Get<EvenireServerSettings>();

builder.WebHost.ConfigureKestrel((context, options) =>
{
var serverConfig = context.Configuration.GetRequiredSection("Server").Get<ServerConfig>();

options.ListenAnyIP(serverConfig.GrpcPort, o => o.Protocols = HttpProtocols.Http2);
options.ListenAnyIP(serverConfig.HttpPort, o => o.Protocols = HttpProtocols.Http1);
});

builder.Services.AddGrpc();

var channel = Channel.CreateUnbounded<IncomingEventsGroup>(new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});

builder.Services
.Configure<ServerConfig>(builder.Configuration.GetSection("Server"))
.AddSingleton<ICache<Guid, CachedEvents>>(ctx =>
{
var serverConfig = ctx.GetRequiredService<IOptions<ServerConfig>>().Value;
return new LRUCache<Guid, CachedEvents>(serverConfig.MaxInMemoryStreamsCount);
})
.AddSingleton(ctx =>
{
var serverConfig = ctx.GetRequiredService<IOptions<ServerConfig>>().Value;
return new EventsProviderConfig(serverConfig.MaxPageSizeToClient);
})
.AddSingleton<EventsProvider>()
.AddSingleton(channel.Writer)
.AddSingleton(channel.Reader)
.AddSingleton<IEventFactory>(ctx =>
{
var serverConfig = ctx.GetRequiredService<IOptions<ServerConfig>>().Value;
return new EventFactory(serverConfig.MaxEventDataSize);
})
.AddSingleton<EventMapper>()
.AddSingleton(ctx =>
{
var serverConfig = ctx.GetRequiredService<IOptions<ServerConfig>>().Value;

var dataPath = serverConfig.DataFolder;
if (string.IsNullOrWhiteSpace(dataPath))
dataPath = Path.Combine(AppContext.BaseDirectory, "data");
else
{
if (!Path.IsPathFullyQualified(dataPath))
dataPath = Path.Combine(AppContext.BaseDirectory, dataPath);
}

return new FileEventsRepositoryConfig(dataPath, serverConfig.MaxEventsPageSizeFromDisk);
})
.AddSingleton<IEventsRepository, FileEventsRepository>()
.AddHostedService<IncomingEventsPersistenceWorker>()
.AddSingleton(ctx =>
{
var serverConfig = ctx.GetRequiredService<IOptions<ServerConfig>>().Value;
return new MemoryWatcherSettings(serverConfig.MemoryWatcherInterval, serverConfig.MaxAllowedAllocatedBytes);
}).AddHostedService<MemoryWatcher>();
.AddEvenire(serverConfig)
.AddSingleton<EventMapper>();

var version = Assembly.GetExecutingAssembly().GetName().Version;

Expand Down
4 changes: 2 additions & 2 deletions src/EvenireDB.Server/Routes/EventsRoutes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static WebApplication MapEventsRoutes(this WebApplication app)
}

private static async IAsyncEnumerable<EventDTO> GetEvents(
[FromServices] EventsProvider provider,
[FromServices] IEventsProvider provider,
Guid streamId,
[FromQuery(Name = "pos")] uint startPosition = 0,
[FromQuery(Name = "dir")] Direction direction = Direction.Forward)
Expand All @@ -29,7 +29,7 @@ private static async IAsyncEnumerable<EventDTO> GetEvents(

private static async ValueTask<IResult> SaveEvents(
[FromServices] EventMapper mapper,
[FromServices] EventsProvider provider,
[FromServices] IEventsProvider provider,
Guid streamId,
[FromBody] EventDTO[]? dtos)
{
Expand Down
35 changes: 0 additions & 35 deletions src/EvenireDB.Server/ServerConfig.cs

This file was deleted.

4 changes: 4 additions & 0 deletions src/EvenireDB/CachedEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace EvenireDB
{
internal record CachedEvents(List<IEvent> Events, SemaphoreSlim Semaphore);
}
11 changes: 11 additions & 0 deletions src/EvenireDB/EvenireDB.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,22 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\EvenireDB.Common\EvenireDB.Common.csproj" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="EvenireDB.Benchmark" />

<InternalsVisibleTo Include="EvenireDB.Tests" />

<!-- Required for NSubstitute -->
<InternalsVisibleTo Include="DynamicProxyGenAssembly2"/>
</ItemGroup>

</Project>
38 changes: 38 additions & 0 deletions src/EvenireDB/EvenireServerSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace EvenireDB
{
public record class EvenireServerSettings
{
/// <summary>
/// max page size returned to clients
/// </summary>
public uint MaxPageSizeToClient { get; init; } = 100;

/// <summary>
/// max page size when parsing events from disk
/// </summary>
public uint MaxEventsPageSizeFromDisk { get; init; } = 100;

/// <summary>
/// max allowed event data size
/// </summary>
public uint MaxEventDataSize { get; init; } = 500_000;

public string? DataFolder { get; init; }

public int HttpPort { get; init; } = 16281;
public int GrpcPort { get; init; } = 16282;

/// <summary>
/// max number of streams to cache in memory
/// </summary>
public uint MaxInMemoryStreamsCount { get; init; } = 1000;

/// <summary>
/// max allowed memory allocated by the process. If exceeded, the system will try to recover
/// some memory by dropping some cached streams
/// </summary>
public long MaxAllowedAllocatedBytes { get; init; } = 1_000_000_000; // TODO: consider making this a function of max allowed streams count and max event data size

public TimeSpan MemoryWatcherInterval { get; init; } = TimeSpan.FromMinutes(2);
}
}
24 changes: 11 additions & 13 deletions src/EvenireDB/EventsProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@

namespace EvenireDB
{
public record CachedEvents(List<IEvent> Events, SemaphoreSlim Semaphore);

// TODO: logging
// TODO: append to a transaction log
public class EventsProvider
internal class EventsProvider : IEventsProvider
{
private readonly ICache<Guid, CachedEvents> _cache;
private readonly EventsProviderConfig _config;
Expand Down Expand Up @@ -47,8 +45,8 @@ private ValueTask<CachedEvents> EnsureCachedEventsAsync(Guid streamId, Cancellat

public async IAsyncEnumerable<IEvent> ReadAsync(
Guid streamId,
StreamPosition startPosition,
Direction direction = Direction.Forward,
StreamPosition startPosition,
Direction direction = Direction.Forward,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (startPosition < 0)
Expand All @@ -58,7 +56,7 @@ public async IAsyncEnumerable<IEvent> ReadAsync(

if (entry?.Events == null || entry.Events.Count == 0)
yield break;

uint totalCount = (uint)entry.Events.Count;
uint pos = startPosition;

Expand All @@ -69,24 +67,24 @@ public async IAsyncEnumerable<IEvent> ReadAsync(

uint j = 0, i = pos,
finalCount = Math.Min(_config.MaxPageSize, totalCount - i);

while (j++ != finalCount)
{
yield return entry.Events[(int)i++];
}
}
else
{
if(startPosition == StreamPosition.End)
{
if (startPosition == StreamPosition.End)
pos = totalCount - 1;

if (pos >= totalCount)
yield break;

uint j = 0, i = pos,
finalCount = Math.Min(_config.MaxPageSize, i + 1);
while(j++ != finalCount)

while (j++ != finalCount)
{
yield return entry.Events[(int)i--];
}
Expand All @@ -110,7 +108,7 @@ public async ValueTask<IOperationResult> AppendAsync(Guid streamId, IEnumerable<
return FailureResult.DuplicateEvent(duplicate);

var group = new IncomingEventsGroup(streamId, incomingEvents);
if(!_writer.TryWrite(group))
if (!_writer.TryWrite(group))
return FailureResult.CannotInitiateWrite(streamId);

UpdateCache(streamId, incomingEvents, entry);
Expand Down
3 changes: 0 additions & 3 deletions src/EvenireDB/FileEventsRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
using System.Runtime.CompilerServices;
using System.Text;

[assembly: InternalsVisibleTo("EvenireDB.Benchmark")]
[assembly: InternalsVisibleTo("EvenireDB.Tests")]

namespace EvenireDB
{
public class FileEventsRepository : IEventsRepository
Expand Down
11 changes: 11 additions & 0 deletions src/EvenireDB/IEventsProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using EvenireDB.Common;
using System.Runtime.CompilerServices;

namespace EvenireDB
{
public interface IEventsProvider
{
ValueTask<IOperationResult> AppendAsync(Guid streamId, IEnumerable<IEvent> incomingEvents, CancellationToken cancellationToken = default);
IAsyncEnumerable<IEvent> ReadAsync(Guid streamId, StreamPosition startPosition, Direction direction = Direction.Forward, [EnumeratorCancellation] CancellationToken cancellationToken = default);

Check warning on line 9 in src/EvenireDB/IEventsProvider.cs

View workflow job for this annotation

GitHub Actions / build

The EnumeratorCancellationAttribute applied to parameter 'cancellationToken' will have no effect. The attribute is only effective on a parameter of type CancellationToken in an async-iterator method returning IAsyncEnumerable
}
}
Loading

0 comments on commit 6488dc3

Please sign in to comment.