Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mizrael committed Jan 21, 2024
1 parent a52e0ed commit baa2ae8
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 57 deletions.
6 changes: 4 additions & 2 deletions src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using BenchmarkDotNet.Engines;
using EvenireDB;
using EvenireDB.Common;
using EvenireDB.Extents;
using EvenireDB.Utils;
using Microsoft.Extensions.Logging.Abstractions;
using System.Threading.Channels;
Expand All @@ -26,8 +27,9 @@ public void GlobalSetup()
if (!Directory.Exists(dataPath))
Directory.CreateDirectory(dataPath);

var repoConfig = new FileEventsRepositoryConfig(dataPath);
var repo = new FileEventsRepository(repoConfig);
var repoConfig = new FileEventsRepositoryConfig();
var extentInfoProvider = new ExtentInfoProvider(new ExtentInfoProviderConfig(dataPath));
var repo = new FileEventsRepository(repoConfig, extentInfoProvider);

var cache = new EventsCache(
new NullLogger<EventsCache>(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
using BenchmarkDotNet.Attributes;
using EvenireDB;
using Microsoft.Diagnostics.Tracing.Parsers.Kernel;
using EvenireDB.Extents;
using System.Buffers;
using System.Runtime.CompilerServices;

public class FileEventsRepositoryWriteBenckmarks
{
private readonly static byte[] _data = Enumerable.Repeat((byte)42, 100).ToArray();

private FileEventsRepositoryConfig _repoConfig;
private IExtentInfoProvider _extentInfoProvider;
private IEventDataValidator _factory;

private Event[] BuildEvents(int count)
Expand All @@ -24,14 +24,15 @@ public void Setup()
Directory.CreateDirectory(dataPath);

_factory = new EventDataValidator(500_000);
_repoConfig = new FileEventsRepositoryConfig(dataPath);
_extentInfoProvider = new ExtentInfoProvider(new ExtentInfoProviderConfig(dataPath));
_repoConfig = new FileEventsRepositoryConfig();
}

[Benchmark(Baseline = true)]
[ArgumentsSource(nameof(Data))]
public async Task WriteAsync_Baseline(Event[] events)
{
var sut = new FileEventsRepository(_repoConfig);
var sut = new FileEventsRepository(_repoConfig, _extentInfoProvider);
await sut.AppendAsync(Guid.NewGuid(), events);
}

Expand Down
4 changes: 1 addition & 3 deletions src/EvenireDB/EvenireDB.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>

<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
Expand All @@ -20,7 +19,6 @@

<ItemGroup>
<InternalsVisibleTo Include="EvenireDB.Benchmark" />

<InternalsVisibleTo Include="EvenireDB.Tests" />

<!-- Required for NSubstitute -->
Expand Down
5 changes: 5 additions & 0 deletions src/EvenireDB/ExtentInfoProviderConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

namespace EvenireDB
{
internal record ExtentInfoProviderConfig(string BasePath);
}
8 changes: 8 additions & 0 deletions src/EvenireDB/Extents/ExtentInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace EvenireDB.Extents
{
public readonly struct ExtentInfo
{
public readonly required string DataPath { get; init; }
public readonly required string HeadersPath { get; init; }
}
}
26 changes: 26 additions & 0 deletions src/EvenireDB/Extents/ExtentInfoProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace EvenireDB.Extents
{
internal class ExtentInfoProvider : IExtentInfoProvider
{
private readonly ExtentInfoProviderConfig _config;

public ExtentInfoProvider(ExtentInfoProviderConfig config)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
if (!Directory.Exists(_config.BasePath))
Directory.CreateDirectory(config.BasePath);
}

public ExtentInfo GetLatest(Guid streamId)
{
// TODO: tests
var key = streamId.ToString("N");
int extentNumber = 0; // TODO: calculate
return new ExtentInfo
{
DataPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_data.dat"),
HeadersPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_headers.dat"),
};
}
}
}
7 changes: 7 additions & 0 deletions src/EvenireDB/Extents/IExtentInfoProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace EvenireDB.Extents
{
public interface IExtentInfoProvider
{
ExtentInfo GetLatest(Guid streamId);
}
}
49 changes: 21 additions & 28 deletions src/EvenireDB/FileEventsRepository.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,35 @@
using EvenireDB.Common;
using EvenireDB.Extents;
using System.Buffers;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Text;

namespace EvenireDB
{
public class FileEventsRepository : IEventsRepository
internal class FileEventsRepository : IEventsRepository
{
private readonly ConcurrentDictionary<string, byte[]> _eventTypes = new();
private readonly FileEventsRepositoryConfig _config;
private readonly ConcurrentDictionary<string, byte[]> _eventTypes = new();
private readonly IExtentInfoProvider _extentInfoProvider;

private const string DataFileSuffix = "_data";
private const string HeadersFileSuffix = "_headers";

public FileEventsRepository(FileEventsRepositoryConfig config)
public FileEventsRepository(
FileEventsRepositoryConfig config,
IExtentInfoProvider extentInfoProvider)
{
_extentInfoProvider = extentInfoProvider ?? throw new ArgumentNullException(nameof(extentInfoProvider));
_config = config ?? throw new ArgumentNullException(nameof(config));

if (!Directory.Exists(_config.BasePath))
Directory.CreateDirectory(config.BasePath);
}

private string GetStreamPath(Guid streamId, string type)
=> Path.Combine(_config.BasePath, $"{streamId}{type}.dat");

public async IAsyncEnumerable<Event> ReadAsync(
Guid streamId,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
string headersPath = GetStreamPath(streamId, HeadersFileSuffix);
string dataPath = GetStreamPath(streamId, DataFileSuffix);
if (!File.Exists(headersPath) || !File.Exists(dataPath))
{
var extentInfo = _extentInfoProvider.GetLatest(streamId);
if (!File.Exists(extentInfo.DataPath) || !File.Exists(extentInfo.HeadersPath))
yield break;

using var headersStream = new FileStream(headersPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
using var headersStream = new FileStream(extentInfo.HeadersPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);

var headers = new List<RawEventHeader>();
int dataBufferSize = 0;
Expand Down Expand Up @@ -74,12 +69,12 @@ public async IAsyncEnumerable<Event> ReadAsync(

// now we read the data for all the events in a single buffer
// so that we can parse it directly, avoiding accessing the file any longer
using var dataStream = new FileStream(dataPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
using var dataStream = new FileStream(extentInfo.DataPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
dataStream.Position = headers[0].DataPosition;

var dataBuffer = ArrayPool<byte>.Shared.Rent(dataBufferSize);
var dataBufferMem = dataBuffer.AsMemory(0, dataBufferSize);

try
{
await dataStream.ReadAsync(dataBufferMem, cancellationToken)
Expand All @@ -96,7 +91,7 @@ await dataStream.ReadAsync(dataBufferMem, cancellationToken)
// if skip is specified, when calculating the source offset we need to subtract the position of the first block of data
// because the stream position was already set after opening it
long srcOffset = headers[i].DataPosition - headers[0].DataPosition;

// need to copy the memory here as the source array is rented
// AND we need to give the event data to the calling client
dataBufferMem.Slice((int)srcOffset, headers[i].EventDataLength)
Expand All @@ -110,16 +105,14 @@ await dataStream.ReadAsync(dataBufferMem, cancellationToken)
finally
{
ArrayPool<byte>.Shared.Return(dataBuffer);
}
}
}

public async ValueTask AppendAsync(Guid streamId, IEnumerable<Event> events, CancellationToken cancellationToken = default)
{
string dataPath = GetStreamPath(streamId, DataFileSuffix);
using var dataStream = new FileStream(dataPath, FileMode.Append, FileAccess.Write, FileShare.Read);

string headersPath = GetStreamPath(streamId, HeadersFileSuffix);
using var headersStream = new FileStream(headersPath, FileMode.Append, FileAccess.Write, FileShare.Read);
var extentInfo = _extentInfoProvider.GetLatest(streamId);
using var dataStream = new FileStream(extentInfo.DataPath, FileMode.Append, FileAccess.Write, FileShare.Read);
using var headersStream = new FileStream(extentInfo.HeadersPath, FileMode.Append, FileAccess.Write, FileShare.Read);

var eventsCount = events.Count();

Expand All @@ -133,7 +126,7 @@ public async ValueTask AppendAsync(Guid streamId, IEnumerable<Event> events, Can

var eventType = _eventTypes.GetOrAdd(@event.Type, static type =>
{
var dest = new byte[Constants.MAX_EVENT_TYPE_LENGTH];
var dest = new byte[Constants.MAX_EVENT_TYPE_LENGTH];
Encoding.UTF8.GetBytes(type, dest);
return dest;
});
Expand All @@ -155,7 +148,7 @@ await headersStream.WriteAsync(headerBuffer, 0, RawEventHeader.SIZE, cancellatio
finally
{
ArrayPool<byte>.Shared.Return(headerBuffer);
}
}

await dataStream.FlushAsync().ConfigureAwait(false);
await headersStream.FlushAsync().ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion src/EvenireDB/FileEventsRepositoryConfig.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

namespace EvenireDB
{
public record FileEventsRepositoryConfig(string BasePath, uint MaxPageSize = 100);
public record FileEventsRepositoryConfig(uint MaxPageSize = 100);
}
7 changes: 5 additions & 2 deletions src/EvenireDB/IServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using EvenireDB.Server;
using EvenireDB.Extents;
using EvenireDB.Server;
using EvenireDB.Utils;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -38,6 +39,7 @@ public static IServiceCollection AddEvenire(this IServiceCollection services, Ev
{
return new EventDataValidator(settings.MaxEventDataSize);
})
.AddSingleton(new FileEventsRepositoryConfig(settings.MaxEventsPageSizeFromDisk))
.AddSingleton(ctx =>
{
var dataPath = settings.DataFolder;
Expand All @@ -49,8 +51,9 @@ public static IServiceCollection AddEvenire(this IServiceCollection services, Ev
dataPath = Path.Combine(AppContext.BaseDirectory, dataPath);
}

return new FileEventsRepositoryConfig(dataPath, settings.MaxEventsPageSizeFromDisk);
return new ExtentInfoProviderConfig(dataPath);
})
.AddSingleton<IExtentInfoProvider, ExtentInfoProvider>()
.AddSingleton<IEventsRepository, FileEventsRepository>()
.AddHostedService<IncomingEventsPersistenceWorker>()
.AddSingleton(ctx =>
Expand Down
2 changes: 1 addition & 1 deletion src/EvenireDB/RawEventHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace EvenireDB
{
//TODO: evaluate https://github.com/MessagePack-CSharp/MessagePack-CSharp
//TODO: evaluate https://github.com/Cysharp/MemoryPack or https://github.com/MessagePack-CSharp/MessagePack-CSharp
internal readonly struct RawEventHeader
{
public readonly long EventIdTimestamp;
Expand Down
7 changes: 3 additions & 4 deletions tests/EvenireDB.Tests/DataFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
public class DataFixture : IAsyncLifetime
{
private const string BaseDataPath = "./data/";
private readonly List<FileEventsRepositoryConfig> _configs = new();
private readonly IEventDataValidator _factory = new EventDataValidator(1000);
private readonly List<ExtentInfoProviderConfig> _configs = new();

public FileEventsRepositoryConfig CreateRepoConfig(Guid aggregateId)
internal ExtentInfoProviderConfig CreateExtentsConfig(Guid aggregateId)
{
var path = Path.Combine(BaseDataPath, aggregateId.ToString());
Directory.CreateDirectory(path);
var config = new FileEventsRepositoryConfig(path);
var config = new ExtentInfoProviderConfig(path);
_configs.Add(config);
return config;
}
Expand Down
28 changes: 16 additions & 12 deletions tests/EvenireDB.Tests/FileEventsRepositoryTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using EvenireDB.Extents;

namespace EvenireDB.Tests
{
public class FileEventsRepositoryTests : IClassFixture<DataFixture>
Expand All @@ -17,12 +19,13 @@ public async Task AppendAsync_should_write_events(int eventsCount, int expectedF
var events = _fixture.BuildEvents(eventsCount, new byte[] { 0x42 });

var streamId = Guid.NewGuid();
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config);
var config = _fixture.CreateExtentsConfig(streamId);
var extentInfoProvider = new ExtentInfoProvider(config);
var sut = new FileEventsRepository(new FileEventsRepositoryConfig(), extentInfoProvider);
await sut.AppendAsync(streamId, events).ConfigureAwait(false);

var eventsFilePath = Path.Combine(config.BasePath, streamId + "_data.dat");
var bytes = File.ReadAllBytes(eventsFilePath);
var extentInfo = extentInfoProvider.GetLatest(streamId);
var bytes = File.ReadAllBytes(extentInfo.DataPath);
Assert.Equal(expectedFileSize, bytes.Length);
}

Expand All @@ -35,13 +38,14 @@ public async Task AppendAsync_should_append_events(int batchesCount, int eventsP
.ToArray();

var streamId = Guid.NewGuid();
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config);
var config = _fixture.CreateExtentsConfig(streamId);
var extentInfoProvider = new ExtentInfoProvider(config);
var sut = new FileEventsRepository(new FileEventsRepositoryConfig(), extentInfoProvider);
foreach (var events in batches)
await sut.AppendAsync(streamId, events).ConfigureAwait(false);

var eventsFilePath = Path.Combine(config.BasePath, streamId + "_data.dat");
var bytes = File.ReadAllBytes(eventsFilePath);
var extentInfo = extentInfoProvider.GetLatest(streamId);
var bytes = File.ReadAllBytes(extentInfo.DataPath);
Assert.Equal(expectedFileSize, bytes.Length);
}

Expand All @@ -53,8 +57,8 @@ public async Task ReadAsync_should_read_entire_stream(int eventsCount)
var expectedEvents = _fixture.BuildEvents(eventsCount);

var streamId = Guid.NewGuid();
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config);
var config = _fixture.CreateExtentsConfig(streamId);
var sut = new FileEventsRepository(new FileEventsRepositoryConfig(), new ExtentInfoProvider(config));
await sut.AppendAsync(streamId, expectedEvents).ConfigureAwait(false);

var events = await sut.ReadAsync(streamId).ToArrayAsync().ConfigureAwait(false);
Expand All @@ -78,8 +82,8 @@ public async Task ReadAsync_should_read_events_appended_in_batches(int batchesCo
.ToArray();

var streamId = Guid.NewGuid();
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config);
var config = _fixture.CreateExtentsConfig(streamId);
var sut = new FileEventsRepository(new FileEventsRepositoryConfig(), new ExtentInfoProvider(config));
foreach (var events in batches)
await sut.AppendAsync(streamId, events).ConfigureAwait(false);

Expand Down

0 comments on commit baa2ae8

Please sign in to comment.