Skip to content

Commit

Permalink
added stream start position
Browse files Browse the repository at this point in the history
  • Loading branch information
mizrael committed Sep 28, 2023
1 parent c02b5e7 commit c3ea763
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/EvenireDB.Benchmark/FileEventsRepositoryBenckmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void Setup()
public async Task WriteAsync_Baseline(IEvent[] events)
{
var sut = new FileEventsRepository(_repoConfig, _factory);
await sut.WriteAsync(Guid.NewGuid(), events);
await sut.AppendAsync(Guid.NewGuid(), events);
}

public IEnumerable<IEvent[]> Data()
Expand Down
2 changes: 1 addition & 1 deletion src/EvenireDB.Server/IncomingEventsPersistenceWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private async Task ExecuteAsyncCore(CancellationToken cancellationToken)
{
try
{
await _repo.WriteAsync(group.AggregateId, group.Events, cancellationToken)
await _repo.AppendAsync(group.AggregateId, group.Events, cancellationToken)
.ConfigureAwait(false);
}
catch (Exception ex)
Expand Down
8 changes: 8 additions & 0 deletions src/EvenireDB/Direction.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

using System.Runtime.CompilerServices;

namespace EvenireDB
{
//TODO: consider add versioning on events file
Expand All @@ -8,4 +10,10 @@ public enum Direction
Forward = 0,
Backward = 1,
}

public enum StreamPosition : long
{
Start = 0,
End = long.MaxValue
}
}
22 changes: 11 additions & 11 deletions src/EvenireDB/EventsProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ private async ValueTask<List<IEvent>> ReadAllPersistedAsync(Guid streamId, Cance
{
List<IEvent> results = new();
IEnumerable<IEvent> tmpEvents;
int skip = 0;
long startPosition = 0;
while (true)
{
tmpEvents = await _repo.ReadAsync(streamId, direction: Direction.Forward, skip: skip, cancellationToken: cancellationToken).ConfigureAwait(false);
tmpEvents = await _repo.ReadAsync(streamId, direction: Direction.Forward, startPosition: startPosition, cancellationToken: cancellationToken).ConfigureAwait(false);
if (tmpEvents is null || tmpEvents.Count() == 0)
break;
results.AddRange(tmpEvents);
skip += tmpEvents.Count();
startPosition += tmpEvents.Count();
}
return results;
}
Expand Down Expand Up @@ -66,24 +66,24 @@ private async ValueTask<CachedEvents> EnsureCachedEventsAsync(Guid streamId, str
return entry;
}

public async ValueTask<IEnumerable<IEvent>> GetPageAsync(Guid streamId, int skip = 0, CancellationToken cancellationToken = default)
public async ValueTask<IEnumerable<IEvent>> GetPageAsync(Guid streamId, long startPosition = 0, CancellationToken cancellationToken = default)
{
if (skip < 0)
throw new ArgumentOutOfRangeException(nameof(skip));
if (startPosition < 0)
throw new ArgumentOutOfRangeException(nameof(startPosition));

var key = streamId.ToString();

CachedEvents entry = await EnsureCachedEventsAsync(streamId, key, cancellationToken).ConfigureAwait(false);

if (entry.Events == null || entry.Events.Count == 0 || entry.Events.Count < skip)
if (entry.Events == null || entry.Events.Count == 0 || entry.Events.Count < startPosition)
return Enumerable.Empty<IEvent>();

var end = Math.Min(skip + _config.MaxPageSize, entry.Events.Count);
var count = end - skip;
var end = Math.Min(startPosition + _config.MaxPageSize, entry.Events.Count);
var count = end - startPosition;
var results = new IEvent[count];
var j = 0;
for (var i = skip; i < end; i++)
results[j++] = entry.Events[i];
for (var i = startPosition; i < end; i++)
results[j++] = entry.Events[(int)i]; //TODO: I'm not very proud of this cast to int.
entry.Events.Except(results);
return results;
}
Expand Down
22 changes: 15 additions & 7 deletions src/EvenireDB/FileEventsRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private string GetStreamPath(Guid streamId, string type)
private async Task<(List<RawEventHeader>, int)> ReadHeadersAsync(
string headersPath,
Direction direction,
int skip,
long startPosition,
CancellationToken cancellationToken)
{
var headers = new List<RawEventHeader>(_config.MaxPageSize);
Expand All @@ -42,7 +42,7 @@ private string GetStreamPath(Guid streamId, string type)

headersStream.Position =
direction == Direction.Forward ?
RawEventHeader.SIZE * skip :
RawEventHeader.SIZE * startPosition :
headersStream.Length - RawEventHeader.SIZE * _config.MaxPageSize;

int dataBufferSize = 0; // TODO: this should probably be a long, but it would then require chunking from the data stream
Expand Down Expand Up @@ -72,10 +72,18 @@ private string GetStreamPath(Guid streamId, string type)
return (headers, dataBufferSize);
}

public async ValueTask<IEnumerable<IEvent>> ReadAsync(Guid streamId, Direction direction = Direction.Forward, int skip = 0, CancellationToken cancellationToken = default)
public async ValueTask<IEnumerable<IEvent>> ReadAsync(
Guid streamId,
Direction direction = Direction.Forward,
long startPosition = (long)StreamPosition.Start,
CancellationToken cancellationToken = default)
{
if (skip < 0)
throw new ArgumentOutOfRangeException(nameof(skip));
if (startPosition < 0)
throw new ArgumentOutOfRangeException(nameof(startPosition));

if(direction == Direction.Backward &&
startPosition != (long)StreamPosition.End)
throw new ArgumentOutOfRangeException(nameof(startPosition), "When reading backwards, the start position must be set to StreamPosition.End");

string headersPath = GetStreamPath(streamId, HeadersFileSuffix);
string dataPath = GetStreamPath(streamId, DataFileSuffix);
Expand All @@ -85,7 +93,7 @@ public async ValueTask<IEnumerable<IEvent>> ReadAsync(Guid streamId, Direction d
// first, read all the headers, which are stored sequentially
// this will allow later on pulling the data for all the events in one go

var (headers, dataBufferSize) = await this.ReadHeadersAsync(headersPath, direction, skip, cancellationToken)
var (headers, dataBufferSize) = await this.ReadHeadersAsync(headersPath, direction, startPosition, cancellationToken)
.ConfigureAwait(false);

// we exit early if no headers found
Expand Down Expand Up @@ -128,7 +136,7 @@ await dataStream.ReadAsync(dataBuffer, 0, dataBufferSize, cancellationToken)
return results;
}

public async ValueTask WriteAsync(Guid streamId, IEnumerable<IEvent> events, CancellationToken cancellationToken = default)
public async ValueTask AppendAsync(Guid streamId, IEnumerable<IEvent> events, CancellationToken cancellationToken = default)
{
string dataPath = GetStreamPath(streamId, DataFileSuffix);
using var dataStream = new FileStream(dataPath, FileMode.Append, FileAccess.Write, FileShare.Read);
Expand Down
4 changes: 2 additions & 2 deletions src/EvenireDB/IEventsRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
public interface IEventsRepository
{
ValueTask<IEnumerable<IEvent>> ReadAsync(Guid streamId, Direction direction = Direction.Forward, int skip = 0, CancellationToken cancellationToken = default);
ValueTask WriteAsync(Guid streamId, IEnumerable<IEvent> events, CancellationToken cancellationToken = default);
ValueTask<IEnumerable<IEvent>> ReadAsync(Guid streamId, Direction direction = Direction.Forward, long startPosition = (long)StreamPosition.Start, CancellationToken cancellationToken = default);
ValueTask AppendAsync(Guid streamId, IEnumerable<IEvent> events, CancellationToken cancellationToken = default);
}
}
4 changes: 2 additions & 2 deletions tests/EvenireDB.Server.Tests/IncomingEventsSubscriberTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public async Task Service_should_handle_exceptions_gracefully()
{
var channel = Channel.CreateBounded<IncomingEventsGroup>(10);
var repo = Substitute.For<IEventsRepository>();
repo.WhenForAnyArgs(r => r.WriteAsync(Arg.Any<Guid>(), null, default))
repo.WhenForAnyArgs(r => r.AppendAsync(Arg.Any<Guid>(), null, default))
.Throw<Exception>();

var logger = Substitute.For<ILogger<IncomingEventsPersistenceWorker>>();
Expand All @@ -29,7 +29,7 @@ public async Task Service_should_handle_exceptions_gracefully()
await sut.StopAsync(default);

await repo.ReceivedWithAnyArgs(groups.Length)
.WriteAsync(Arg.Any<Guid>(), null, default);
.AppendAsync(Arg.Any<Guid>(), null, default);
}
}
}
42 changes: 28 additions & 14 deletions tests/EvenireDB.Tests/FileEventsRepositoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ public FileEventsRepositoryTests(DataFixture fixture)
[Theory]
[InlineData(1, 1)]
[InlineData(10, 10)]
public async Task WriteAsync_should_write_events(int eventsCount, int expectedFileSize)
public async Task AppendAsync_should_write_events(int eventsCount, int expectedFileSize)
{
var events = _fixture.BuildEvents(eventsCount, new byte[] { 0x42 });

var streamId = Guid.NewGuid();
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config, new EventFactory(1000));
await sut.WriteAsync(streamId, events).ConfigureAwait(false);
await sut.AppendAsync(streamId, events).ConfigureAwait(false);

var eventsFilePath = Path.Combine(config.BasePath, streamId + "_data.dat");
var bytes = File.ReadAllBytes(eventsFilePath);
Expand All @@ -28,7 +28,7 @@ public async Task WriteAsync_should_write_events(int eventsCount, int expectedFi

[Theory]
[InlineData(10, 10, 100)]
public async Task WriteAsync_should_append_events(int batchesCount, int eventsPerBatch, int expectedFileSize)
public async Task AppendAsync_should_append_events(int batchesCount, int eventsPerBatch, int expectedFileSize)
{
var batches = Enumerable.Range(0, batchesCount)
.Select(b => _fixture.BuildEvents(eventsPerBatch, new byte[] { 0x42 }))
Expand All @@ -38,7 +38,7 @@ public async Task WriteAsync_should_append_events(int batchesCount, int eventsPe
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config, new EventFactory(1000));
foreach(var events in batches)
await sut.WriteAsync(streamId, events).ConfigureAwait(false);
await sut.AppendAsync(streamId, events).ConfigureAwait(false);

var eventsFilePath = Path.Combine(config.BasePath, streamId + "_data.dat");
var bytes = File.ReadAllBytes(eventsFilePath);
Expand All @@ -55,7 +55,7 @@ public async Task ReadAsync_should_read_events(int eventsCount)
var streamId = Guid.NewGuid();
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config, new EventFactory(1000));
await sut.WriteAsync(streamId, expectedEvents).ConfigureAwait(false);
await sut.AppendAsync(streamId, expectedEvents).ConfigureAwait(false);

var events = await sut.ReadAsync(streamId).ConfigureAwait(false);
events.Should().NotBeNullOrEmpty()
Expand All @@ -71,13 +71,27 @@ public async Task ReadAsync_should_read_events_backwards()
var streamId = Guid.NewGuid();
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config, new EventFactory(1000));
await sut.WriteAsync(streamId, inputEvents).ConfigureAwait(false);
await sut.AppendAsync(streamId, inputEvents).ConfigureAwait(false);

var events = await sut.ReadAsync(streamId, direction: Direction.Backward).ConfigureAwait(false);
var events = await sut.ReadAsync(streamId, direction: Direction.Backward, startPosition: (long)StreamPosition.End).ConfigureAwait(false);
events.Should().NotBeNullOrEmpty()
.And.BeEquivalentTo(expectedEvents);
}

[Fact]
public async Task ReadAsync_should_throw_when_reading_events_backwards_and_position_not_stream_end()
{
var inputEvents = _fixture.BuildEvents(142);
var expectedEvents = inputEvents.Reverse().ToArray();

var streamId = Guid.NewGuid();
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config, new EventFactory(1000));
await sut.AppendAsync(streamId, inputEvents).ConfigureAwait(false);

await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await sut.ReadAsync(streamId, direction: Direction.Backward, startPosition: 42));
}

[Theory]
[InlineData(10, 10, 100)]
public async Task ReadAsync_should_read_subsequent_events(int batchesCount, int eventsPerBatch, int expectedFileSize)
Expand All @@ -90,7 +104,7 @@ public async Task ReadAsync_should_read_subsequent_events(int batchesCount, int
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config, new EventFactory(1000));
foreach (var events in batches)
await sut.WriteAsync(streamId, events).ConfigureAwait(false);
await sut.AppendAsync(streamId, events).ConfigureAwait(false);

var expectedEvents = batches.SelectMany(e => e).ToArray();

Expand All @@ -109,9 +123,9 @@ public async Task ReadAsync_should_read_events_when_offset_provided()
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config, new EventFactory(1000));

await sut.WriteAsync(streamId, expectedEvents).ConfigureAwait(false);
await sut.AppendAsync(streamId, expectedEvents).ConfigureAwait(false);

var events = await sut.ReadAsync(streamId, skip: 42).ConfigureAwait(false);
var events = await sut.ReadAsync(streamId, startPosition: 42).ConfigureAwait(false);
events.Should().NotBeNullOrEmpty()
.And.HaveCount(100);
events.ElementAt(0).Id.Should().Be(expectedEvent.Id);
Expand All @@ -125,9 +139,9 @@ public async Task ReadAsync_should_read_only_page_size_events()
var streamId = Guid.NewGuid();
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config, new EventFactory(1000));
await sut.WriteAsync(streamId, expectedEvents).ConfigureAwait(false);
await sut.AppendAsync(streamId, expectedEvents).ConfigureAwait(false);

var events = await sut.ReadAsync(streamId, skip: 0).ConfigureAwait(false);
var events = await sut.ReadAsync(streamId, startPosition: 0).ConfigureAwait(false);
events.Should().NotBeNullOrEmpty()
.And.HaveCount(100);
}
Expand All @@ -140,9 +154,9 @@ public async Task ReadAsync_should_return_empty_collection_when_skip_to_high()
var streamId = Guid.NewGuid();
var config = _fixture.CreateRepoConfig(streamId);
var sut = new FileEventsRepository(config, new EventFactory(1000));
await sut.WriteAsync(streamId, expectedEvents).ConfigureAwait(false);
await sut.AppendAsync(streamId, expectedEvents).ConfigureAwait(false);

var events = await sut.ReadAsync(streamId, skip: 11).ConfigureAwait(false);
var events = await sut.ReadAsync(streamId, startPosition: 11).ConfigureAwait(false);
events.Should().NotBeNull()
.And.BeEmpty();
}
Expand Down

0 comments on commit c3ea763

Please sign in to comment.