Skip to content

Commit

Permalink
added streams info
Browse files Browse the repository at this point in the history
  • Loading branch information
mizrael committed Jun 8, 2024
1 parent e164d9a commit 858879c
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 115 deletions.
2 changes: 1 addition & 1 deletion src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void GlobalSetup()
Directory.CreateDirectory(dataPath);

var repoConfig = new FileEventsRepositoryConfig();
var extentInfoProvider = new ExtentInfoProvider(new ExtentInfoProviderConfig(dataPath));
var extentInfoProvider = new StreamInfoProvider(new StreamInfoProviderConfig(dataPath));
var dataRepo = new DataRepository();
var headersRepo = new HeadersRepository();
var repo = new EventsProvider(headersRepo, dataRepo, extentInfoProvider);
Expand Down
115 changes: 65 additions & 50 deletions src/EvenireDB.Server/Routes/StreamsRoutes.cs
Original file line number Diff line number Diff line change
@@ -1,64 +1,79 @@
using EvenireDB.Common;
using EvenireDB.Extents;
using EvenireDB.Server.DTO;
using Microsoft.AspNetCore.Mvc;

namespace EvenireDB.Server.Routes
namespace EvenireDB.Server.Routes;
public static class StreamsRoutes
{
//TODO: add endpoint to get all streams
public static class StreamsRoutes
public static WebApplication MapEventsRoutes(this WebApplication app)
{
public static WebApplication MapEventsRoutes(this WebApplication app)
{
var api = app.NewVersionedApi();
var v1 = api.MapGroup("/api/v{version:apiVersion}/streams")
.HasApiVersion(1.0);
v1.MapGet("/{streamId:guid}/events", GetEvents).WithName(nameof(GetEvents));
v1.MapPost("/{streamId:guid}/events", SaveEvents).WithName(nameof(SaveEvents));
var api = app.NewVersionedApi();
var v1 = api.MapGroup("/api/v{version:apiVersion}/streams")
.HasApiVersion(1.0);
v1.MapGet("", GetStreams).WithName(nameof(GetStreams));
v1.MapGet("/{streamId:guid}", GetStreamInfo).WithName(nameof(GetStreamInfo));
v1.MapGet("/{streamId:guid}/events", GetEvents).WithName(nameof(GetEvents));
v1.MapPost("/{streamId:guid}/events", SaveEvents).WithName(nameof(SaveEvents));

return app;
}
return app;
}

private static async IAsyncEnumerable<EventDTO> GetEvents(
[FromServices] IEventsReader reader,
Guid streamId,
[FromQuery(Name = "pos")] uint startPosition = 0,
[FromQuery(Name = "dir")] Direction direction = Direction.Forward)
{
await foreach (var @event in reader.ReadAsync(streamId, direction: direction, startPosition: startPosition).ConfigureAwait(false))
yield return EventDTO.FromModel(@event);
}
private static IResult GetStreams(
[FromServices] IStreamInfoProvider provider)
{
var streams = provider.GetStreamsInfo();
return Results.Ok(streams);
}

private static async ValueTask<IResult> SaveEvents(
[FromServices] EventMapper mapper,
[FromServices] IEventsWriter writer,
Guid streamId,
[FromQuery(Name = "version")] int? expectedVersion,
[FromBody] EventDataDTO[]? dtos)
{
if(dtos is null)
return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, "No events provided"));
private static IResult GetStreamInfo(
[FromServices] IStreamInfoProvider provider,
Guid streamId)
{
var result = provider.GetStreamInfo(streamId);
return Results.Ok(result);
}

EventData[] events;
private static async IAsyncEnumerable<EventDTO> GetEvents(
[FromServices] IEventsReader reader,
Guid streamId,
[FromQuery(Name = "pos")] uint startPosition = 0,
[FromQuery(Name = "dir")] Direction direction = Direction.Forward)
{
await foreach (var @event in reader.ReadAsync(streamId, direction: direction, startPosition: startPosition).ConfigureAwait(false))
yield return EventDTO.FromModel(@event);
}

try
{
events = mapper.ToModels(dtos);
}
catch(Exception ex)
{
return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, ex.Message));
}
private static async ValueTask<IResult> SaveEvents(
[FromServices] EventMapper mapper,
[FromServices] IEventsWriter writer,
Guid streamId,
[FromQuery(Name = "version")] int? expectedVersion,
[FromBody] EventDataDTO[]? dtos)
{
if (dtos is null)
return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, "No events provided"));

EventData[] events;

var result = await writer.AppendAsync(streamId, events, expectedVersion)
.ConfigureAwait(false);
return result switch
{
FailureResult { Code: ErrorCodes.DuplicateEvent } d => Results.Conflict(new ApiError(ErrorCodes.DuplicateEvent, d.Message)),
FailureResult { Code: ErrorCodes.VersionMismatch } d => Results.BadRequest(new ApiError(ErrorCodes.VersionMismatch, d.Message)),
FailureResult { Code: ErrorCodes.BadRequest } d => Results.BadRequest(new ApiError(ErrorCodes.BadRequest, d.Message)),
FailureResult => Results.StatusCode(500),
_ => Results.AcceptedAtRoute(nameof(GetEvents), new { streamId })
};
try
{
events = mapper.ToModels(dtos);
}
}
catch (Exception ex)
{
return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, ex.Message));
}

var result = await writer.AppendAsync(streamId, events, expectedVersion)
.ConfigureAwait(false);
return result switch
{
FailureResult { Code: ErrorCodes.DuplicateEvent } d => Results.Conflict(new ApiError(ErrorCodes.DuplicateEvent, d.Message)),
FailureResult { Code: ErrorCodes.VersionMismatch } d => Results.BadRequest(new ApiError(ErrorCodes.VersionMismatch, d.Message)),
FailureResult { Code: ErrorCodes.BadRequest } d => Results.BadRequest(new ApiError(ErrorCodes.BadRequest, d.Message)),
FailureResult => Results.StatusCode(500),
_ => Results.AcceptedAtRoute(nameof(GetEvents), new { streamId })
};
}
}
5 changes: 0 additions & 5 deletions src/EvenireDB/ExtentInfoProviderConfig.cs

This file was deleted.

13 changes: 6 additions & 7 deletions src/EvenireDB/Extents/ExtentInfo.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
namespace EvenireDB.Extents
namespace EvenireDB.Extents;

public readonly struct ExtentInfo
{
public readonly struct ExtentInfo
{
public readonly required string DataPath { get; init; }
public readonly required string HeadersPath { get; init; }
}
}
public readonly required string DataPath { get; init; }
public readonly required string HeadersPath { get; init; }
}
26 changes: 0 additions & 26 deletions src/EvenireDB/Extents/ExtentInfoProvider.cs

This file was deleted.

7 changes: 0 additions & 7 deletions src/EvenireDB/Extents/IExtentInfoProvider.cs

This file was deleted.

8 changes: 8 additions & 0 deletions src/EvenireDB/Extents/IStreamInfoProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace EvenireDB.Extents;

public interface IStreamInfoProvider
{
ExtentInfo GetExtentInfo(Guid streamId);
IEnumerable<StreamInfo> GetStreamsInfo();
StreamInfo GetStreamInfo(Guid streamId);
}
7 changes: 7 additions & 0 deletions src/EvenireDB/Extents/StreamInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace EvenireDB.Extents;

public readonly record struct StreamInfo(
Guid StreamId,
long EventsCount,
DateTimeOffset CreatedAt,
DateTimeOffset LastAccessedAt);
53 changes: 53 additions & 0 deletions src/EvenireDB/Extents/StreamInfoProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System.Runtime.InteropServices;

namespace EvenireDB.Extents;

internal record StreamInfoProviderConfig(string BasePath);

internal class StreamInfoProvider : IStreamInfoProvider
{
private readonly StreamInfoProviderConfig _config;
private readonly int _headerSize = Marshal.SizeOf<RawHeader>();

public StreamInfoProvider(StreamInfoProviderConfig config)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
if (!Directory.Exists(_config.BasePath))
Directory.CreateDirectory(config.BasePath);
}

public ExtentInfo GetExtentInfo(Guid streamId)
{
// TODO: tests
var key = streamId.ToString("N");
int extentNumber = 0; // TODO: calculate
return new ExtentInfo
{
DataPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_data.dat"),
HeadersPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_headers.dat"),
};
}

public StreamInfo GetStreamInfo(Guid streamId)
{
var extent = this.GetExtentInfo(streamId);

var fileInfo = new FileInfo(extent.HeadersPath);
var headersCount = fileInfo.Length / _headerSize;
return new StreamInfo(
streamId,
headersCount,
fileInfo.CreationTimeUtc,
fileInfo.LastWriteTimeUtc);
}

public IEnumerable<StreamInfo> GetStreamsInfo()
{
var headersFiles = Directory.GetFiles(_config.BasePath, "*_headers.dat");
foreach(var headerFile in headersFiles)
{
var key = Path.GetFileNameWithoutExtension(headerFile).Split('_')[0];
yield return GetStreamInfo(Guid.Parse(key));
}
}
}
4 changes: 2 additions & 2 deletions src/EvenireDB/IServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public static IServiceCollection AddEvenire(this IServiceCollection services, Ev
dataPath = Path.Combine(AppContext.BaseDirectory, dataPath);
}

return new ExtentInfoProviderConfig(dataPath);
return new StreamInfoProviderConfig(dataPath);
})
.AddSingleton<IExtentInfoProvider, ExtentInfoProvider>()
.AddSingleton<IStreamInfoProvider, StreamInfoProvider>()
.AddSingleton<IDataRepository, DataRepository>()
.AddSingleton<IHeadersRepository, HeadersRepository>()
.AddSingleton<IEventsProvider, EventsProvider>()
Expand Down
8 changes: 4 additions & 4 deletions src/EvenireDB/Persistence/EventsProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ namespace EvenireDB.Persistence;

internal class EventsProvider : IEventsProvider
{
private readonly IExtentInfoProvider _extentInfoProvider;
private readonly IStreamInfoProvider _extentInfoProvider;
private readonly IHeadersRepository _headersRepo;
private readonly IDataRepository _dataRepo;
private readonly ConcurrentDictionary<Guid, SemaphoreSlim> _streamLocks = new();

public EventsProvider(IHeadersRepository headersRepo, IDataRepository dataRepo, IExtentInfoProvider extentInfoProvider)
public EventsProvider(IHeadersRepository headersRepo, IDataRepository dataRepo, IStreamInfoProvider extentInfoProvider)
{
_headersRepo = headersRepo;
_dataRepo = dataRepo;
Expand All @@ -24,7 +24,7 @@ public async IAsyncEnumerable<Event> ReadAsync(
int? take = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var extentInfo = _extentInfoProvider.Get(streamId);
var extentInfo = _extentInfoProvider.GetExtentInfo(streamId);
if (!File.Exists(extentInfo.DataPath) || !File.Exists(extentInfo.HeadersPath))
yield break;

Expand All @@ -39,7 +39,7 @@ public async ValueTask AppendAsync(
Guid streamId,
IEnumerable<Event> events, CancellationToken cancellationToken = default)
{
var extentInfo = _extentInfoProvider.Get(streamId);
var extentInfo = _extentInfoProvider.GetExtentInfo(streamId);

var semaphore = _streamLocks.GetOrAdd(streamId, _ => new SemaphoreSlim(1, 1));
await semaphore.WaitAsync(cancellationToken);
Expand Down
16 changes: 7 additions & 9 deletions src/EvenireDB/Persistence/HeadersRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace EvenireDB.Persistence;

internal class HeadersRepository : IHeadersRepository
{
private readonly int _headerSize = Marshal.SizeOf<RawHeader>();

public async ValueTask AppendAsync(ExtentInfo extentInfo, IAsyncEnumerable<RawHeader> headers, CancellationToken cancellationToken = default)
{
var headerSize = Marshal.SizeOf<RawHeader>();
Expand Down Expand Up @@ -38,24 +40,20 @@ public async IAsyncEnumerable<RawHeader> ReadAsync(
int? take = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var headerSize = Marshal.SizeOf<RawHeader>();

var streamBufferSize = headerSize * take.GetValueOrDefault(100);
var streamBufferSize = _headerSize * take.GetValueOrDefault(100);

using var stream = new FileStream(extentInfo.HeadersPath, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: streamBufferSize, useAsync: true);

byte[] buffer = ArrayPool<byte>.Shared.Rent(headerSize);
byte[] buffer = ArrayPool<byte>.Shared.Rent(_headerSize);

try
{


if (skip.HasValue)
stream.Seek(skip.Value * headerSize, SeekOrigin.Begin);
stream.Seek(skip.Value * _headerSize, SeekOrigin.Begin);

while (!cancellationToken.IsCancellationRequested)
{
var bytesRead = await stream.ReadAsync(buffer, 0, headerSize, cancellationToken);
var bytesRead = await stream.ReadAsync(buffer, 0, _headerSize, cancellationToken);
if (bytesRead == 0)
yield break;

Expand All @@ -66,7 +64,7 @@ public async IAsyncEnumerable<RawHeader> ReadAsync(
yield break;
}

var header = MemoryMarshal.Read<RawHeader>(buffer.AsSpan(0, headerSize));
var header = MemoryMarshal.Read<RawHeader>(buffer.AsSpan(0, _headerSize));
yield return header;
}
}
Expand Down
8 changes: 4 additions & 4 deletions tests/EvenireDB.Tests/EventsProviderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ public EventsProviderTests(DataFixture fixture)
_fixture = fixture;
}

private EventsProvider CreateSut(Guid streamId, out IExtentInfoProvider extentInfoProvider)
private EventsProvider CreateSut(Guid streamId, out IStreamInfoProvider extentInfoProvider)
{
var config = _fixture.CreateExtentsConfig(streamId);
extentInfoProvider = new ExtentInfoProvider(config);
extentInfoProvider = new StreamInfoProvider(config);
var dataRepo = new DataRepository();
var headersRepo = new HeadersRepository();
return new EventsProvider(headersRepo, dataRepo, extentInfoProvider);
Expand All @@ -34,7 +34,7 @@ public async Task AppendAsync_should_write_events(int eventsCount, int expectedF

await sut.AppendAsync(streamId, events);

var extentInfo = extentInfoProvider.Get(streamId);
var extentInfo = extentInfoProvider.GetExtentInfo(streamId);
var bytes = File.ReadAllBytes(extentInfo.DataPath);
Assert.Equal(expectedFileSize, bytes.Length);
}
Expand All @@ -54,7 +54,7 @@ public async Task AppendAsync_should_append_events(int batchesCount, int eventsP
foreach (var events in batches)
await sut.AppendAsync(streamId, events);

var extentInfo = extentInfoProvider.Get(streamId);
var extentInfo = extentInfoProvider.GetExtentInfo(streamId);
var bytes = File.ReadAllBytes(extentInfo.DataPath);
Assert.Equal(expectedFileSize, bytes.Length);
}
Expand Down

0 comments on commit 858879c

Please sign in to comment.