From c3ea763f4c5ac84bc6f75e992087069574ce9667 Mon Sep 17 00:00:00 2001 From: David Guida Date: Wed, 27 Sep 2023 21:22:45 -0400 Subject: [PATCH] added stream start position --- .../FileEventsRepositoryBenckmarks.cs | 2 +- .../IncomingEventsPersistenceWorker.cs | 2 +- src/EvenireDB/Direction.cs | 8 ++++ src/EvenireDB/EventsProvider.cs | 22 +++++----- src/EvenireDB/FileEventsRepository.cs | 22 ++++++---- src/EvenireDB/IEventsRepository.cs | 4 +- .../IncomingEventsSubscriberTests.cs | 4 +- .../FileEventsRepositoryTests.cs | 42 ++++++++++++------- 8 files changed, 68 insertions(+), 38 deletions(-) diff --git a/src/EvenireDB.Benchmark/FileEventsRepositoryBenckmarks.cs b/src/EvenireDB.Benchmark/FileEventsRepositoryBenckmarks.cs index 86a0d31..bebfe56 100644 --- a/src/EvenireDB.Benchmark/FileEventsRepositoryBenckmarks.cs +++ b/src/EvenireDB.Benchmark/FileEventsRepositoryBenckmarks.cs @@ -29,7 +29,7 @@ public void Setup() public async Task WriteAsync_Baseline(IEvent[] events) { var sut = new FileEventsRepository(_repoConfig, _factory); - await sut.WriteAsync(Guid.NewGuid(), events); + await sut.AppendAsync(Guid.NewGuid(), events); } public IEnumerable Data() diff --git a/src/EvenireDB.Server/IncomingEventsPersistenceWorker.cs b/src/EvenireDB.Server/IncomingEventsPersistenceWorker.cs index 5d59ed6..c0906e3 100644 --- a/src/EvenireDB.Server/IncomingEventsPersistenceWorker.cs +++ b/src/EvenireDB.Server/IncomingEventsPersistenceWorker.cs @@ -31,7 +31,7 @@ private async Task ExecuteAsyncCore(CancellationToken cancellationToken) { try { - await _repo.WriteAsync(group.AggregateId, group.Events, cancellationToken) + await _repo.AppendAsync(group.AggregateId, group.Events, cancellationToken) .ConfigureAwait(false); } catch (Exception ex) diff --git a/src/EvenireDB/Direction.cs b/src/EvenireDB/Direction.cs index 8d20822..c9bd871 100644 --- a/src/EvenireDB/Direction.cs +++ b/src/EvenireDB/Direction.cs @@ -1,4 +1,6 @@ +using System.Runtime.CompilerServices; + namespace EvenireDB { //TODO: consider add versioning on events file @@ -8,4 +10,10 @@ public enum Direction Forward = 0, Backward = 1, } + + public enum StreamPosition : long + { + Start = 0, + End = long.MaxValue + } } \ No newline at end of file diff --git a/src/EvenireDB/EventsProvider.cs b/src/EvenireDB/EventsProvider.cs index f355d46..cc29c00 100644 --- a/src/EvenireDB/EventsProvider.cs +++ b/src/EvenireDB/EventsProvider.cs @@ -28,14 +28,14 @@ private async ValueTask> ReadAllPersistedAsync(Guid streamId, Cance { List results = new(); IEnumerable tmpEvents; - int skip = 0; + long startPosition = 0; while (true) { - tmpEvents = await _repo.ReadAsync(streamId, direction: Direction.Forward, skip: skip, cancellationToken: cancellationToken).ConfigureAwait(false); + tmpEvents = await _repo.ReadAsync(streamId, direction: Direction.Forward, startPosition: startPosition, cancellationToken: cancellationToken).ConfigureAwait(false); if (tmpEvents is null || tmpEvents.Count() == 0) break; results.AddRange(tmpEvents); - skip += tmpEvents.Count(); + startPosition += tmpEvents.Count(); } return results; } @@ -66,24 +66,24 @@ private async ValueTask EnsureCachedEventsAsync(Guid streamId, str return entry; } - public async ValueTask> GetPageAsync(Guid streamId, int skip = 0, CancellationToken cancellationToken = default) + public async ValueTask> GetPageAsync(Guid streamId, long startPosition = 0, CancellationToken cancellationToken = default) { - if (skip < 0) - throw new ArgumentOutOfRangeException(nameof(skip)); + if (startPosition < 0) + throw new ArgumentOutOfRangeException(nameof(startPosition)); var key = streamId.ToString(); CachedEvents entry = await EnsureCachedEventsAsync(streamId, key, cancellationToken).ConfigureAwait(false); - if (entry.Events == null || entry.Events.Count == 0 || entry.Events.Count < skip) + if (entry.Events == null || entry.Events.Count == 0 || entry.Events.Count < startPosition) return Enumerable.Empty(); - var end = Math.Min(skip + _config.MaxPageSize, entry.Events.Count); - var count = end - skip; + var end = Math.Min(startPosition + _config.MaxPageSize, entry.Events.Count); + var count = end - startPosition; var results = new IEvent[count]; var j = 0; - for (var i = skip; i < end; i++) - results[j++] = entry.Events[i]; + for (var i = startPosition; i < end; i++) + results[j++] = entry.Events[(int)i]; //TODO: I'm not very proud of this cast to int. entry.Events.Except(results); return results; } diff --git a/src/EvenireDB/FileEventsRepository.cs b/src/EvenireDB/FileEventsRepository.cs index bc1eb2e..af7fca1 100644 --- a/src/EvenireDB/FileEventsRepository.cs +++ b/src/EvenireDB/FileEventsRepository.cs @@ -33,7 +33,7 @@ private string GetStreamPath(Guid streamId, string type) private async Task<(List, int)> ReadHeadersAsync( string headersPath, Direction direction, - int skip, + long startPosition, CancellationToken cancellationToken) { var headers = new List(_config.MaxPageSize); @@ -42,7 +42,7 @@ private string GetStreamPath(Guid streamId, string type) headersStream.Position = direction == Direction.Forward ? - RawEventHeader.SIZE * skip : + RawEventHeader.SIZE * startPosition : headersStream.Length - RawEventHeader.SIZE * _config.MaxPageSize; int dataBufferSize = 0; // TODO: this should probably be a long, but it would then require chunking from the data stream @@ -72,10 +72,18 @@ private string GetStreamPath(Guid streamId, string type) return (headers, dataBufferSize); } - public async ValueTask> ReadAsync(Guid streamId, Direction direction = Direction.Forward, int skip = 0, CancellationToken cancellationToken = default) + public async ValueTask> ReadAsync( + Guid streamId, + Direction direction = Direction.Forward, + long startPosition = (long)StreamPosition.Start, + CancellationToken cancellationToken = default) { - if (skip < 0) - throw new ArgumentOutOfRangeException(nameof(skip)); + if (startPosition < 0) + throw new ArgumentOutOfRangeException(nameof(startPosition)); + + if(direction == Direction.Backward && + startPosition != (long)StreamPosition.End) + throw new ArgumentOutOfRangeException(nameof(startPosition), "When reading backwards, the start position must be set to StreamPosition.End"); string headersPath = GetStreamPath(streamId, HeadersFileSuffix); string dataPath = GetStreamPath(streamId, DataFileSuffix); @@ -85,7 +93,7 @@ public async ValueTask> ReadAsync(Guid streamId, Direction d // first, read all the headers, which are stored sequentially // this will allow later on pulling the data for all the events in one go - var (headers, dataBufferSize) = await this.ReadHeadersAsync(headersPath, direction, skip, cancellationToken) + var (headers, dataBufferSize) = await this.ReadHeadersAsync(headersPath, direction, startPosition, cancellationToken) .ConfigureAwait(false); // we exit early if no headers found @@ -128,7 +136,7 @@ await dataStream.ReadAsync(dataBuffer, 0, dataBufferSize, cancellationToken) return results; } - public async ValueTask WriteAsync(Guid streamId, IEnumerable events, CancellationToken cancellationToken = default) + public async ValueTask AppendAsync(Guid streamId, IEnumerable events, CancellationToken cancellationToken = default) { string dataPath = GetStreamPath(streamId, DataFileSuffix); using var dataStream = new FileStream(dataPath, FileMode.Append, FileAccess.Write, FileShare.Read); diff --git a/src/EvenireDB/IEventsRepository.cs b/src/EvenireDB/IEventsRepository.cs index b4bb3be..a45dff9 100644 --- a/src/EvenireDB/IEventsRepository.cs +++ b/src/EvenireDB/IEventsRepository.cs @@ -2,7 +2,7 @@ { public interface IEventsRepository { - ValueTask> ReadAsync(Guid streamId, Direction direction = Direction.Forward, int skip = 0, CancellationToken cancellationToken = default); - ValueTask WriteAsync(Guid streamId, IEnumerable events, CancellationToken cancellationToken = default); + ValueTask> ReadAsync(Guid streamId, Direction direction = Direction.Forward, long startPosition = (long)StreamPosition.Start, CancellationToken cancellationToken = default); + ValueTask AppendAsync(Guid streamId, IEnumerable events, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/tests/EvenireDB.Server.Tests/IncomingEventsSubscriberTests.cs b/tests/EvenireDB.Server.Tests/IncomingEventsSubscriberTests.cs index c4918f9..45be753 100644 --- a/tests/EvenireDB.Server.Tests/IncomingEventsSubscriberTests.cs +++ b/tests/EvenireDB.Server.Tests/IncomingEventsSubscriberTests.cs @@ -10,7 +10,7 @@ public async Task Service_should_handle_exceptions_gracefully() { var channel = Channel.CreateBounded(10); var repo = Substitute.For(); - repo.WhenForAnyArgs(r => r.WriteAsync(Arg.Any(), null, default)) + repo.WhenForAnyArgs(r => r.AppendAsync(Arg.Any(), null, default)) .Throw(); var logger = Substitute.For>(); @@ -29,7 +29,7 @@ public async Task Service_should_handle_exceptions_gracefully() await sut.StopAsync(default); await repo.ReceivedWithAnyArgs(groups.Length) - .WriteAsync(Arg.Any(), null, default); + .AppendAsync(Arg.Any(), null, default); } } } \ No newline at end of file diff --git a/tests/EvenireDB.Tests/FileEventsRepositoryTests.cs b/tests/EvenireDB.Tests/FileEventsRepositoryTests.cs index 9cb5e8a..2e56d53 100644 --- a/tests/EvenireDB.Tests/FileEventsRepositoryTests.cs +++ b/tests/EvenireDB.Tests/FileEventsRepositoryTests.cs @@ -12,14 +12,14 @@ public FileEventsRepositoryTests(DataFixture fixture) [Theory] [InlineData(1, 1)] [InlineData(10, 10)] - public async Task WriteAsync_should_write_events(int eventsCount, int expectedFileSize) + public async Task AppendAsync_should_write_events(int eventsCount, int expectedFileSize) { var events = _fixture.BuildEvents(eventsCount, new byte[] { 0x42 }); var streamId = Guid.NewGuid(); var config = _fixture.CreateRepoConfig(streamId); var sut = new FileEventsRepository(config, new EventFactory(1000)); - await sut.WriteAsync(streamId, events).ConfigureAwait(false); + await sut.AppendAsync(streamId, events).ConfigureAwait(false); var eventsFilePath = Path.Combine(config.BasePath, streamId + "_data.dat"); var bytes = File.ReadAllBytes(eventsFilePath); @@ -28,7 +28,7 @@ public async Task WriteAsync_should_write_events(int eventsCount, int expectedFi [Theory] [InlineData(10, 10, 100)] - public async Task WriteAsync_should_append_events(int batchesCount, int eventsPerBatch, int expectedFileSize) + public async Task AppendAsync_should_append_events(int batchesCount, int eventsPerBatch, int expectedFileSize) { var batches = Enumerable.Range(0, batchesCount) .Select(b => _fixture.BuildEvents(eventsPerBatch, new byte[] { 0x42 })) @@ -38,7 +38,7 @@ public async Task WriteAsync_should_append_events(int batchesCount, int eventsPe var config = _fixture.CreateRepoConfig(streamId); var sut = new FileEventsRepository(config, new EventFactory(1000)); foreach(var events in batches) - await sut.WriteAsync(streamId, events).ConfigureAwait(false); + await sut.AppendAsync(streamId, events).ConfigureAwait(false); var eventsFilePath = Path.Combine(config.BasePath, streamId + "_data.dat"); var bytes = File.ReadAllBytes(eventsFilePath); @@ -55,7 +55,7 @@ public async Task ReadAsync_should_read_events(int eventsCount) var streamId = Guid.NewGuid(); var config = _fixture.CreateRepoConfig(streamId); var sut = new FileEventsRepository(config, new EventFactory(1000)); - await sut.WriteAsync(streamId, expectedEvents).ConfigureAwait(false); + await sut.AppendAsync(streamId, expectedEvents).ConfigureAwait(false); var events = await sut.ReadAsync(streamId).ConfigureAwait(false); events.Should().NotBeNullOrEmpty() @@ -71,13 +71,27 @@ public async Task ReadAsync_should_read_events_backwards() var streamId = Guid.NewGuid(); var config = _fixture.CreateRepoConfig(streamId); var sut = new FileEventsRepository(config, new EventFactory(1000)); - await sut.WriteAsync(streamId, inputEvents).ConfigureAwait(false); + await sut.AppendAsync(streamId, inputEvents).ConfigureAwait(false); - var events = await sut.ReadAsync(streamId, direction: Direction.Backward).ConfigureAwait(false); + var events = await sut.ReadAsync(streamId, direction: Direction.Backward, startPosition: (long)StreamPosition.End).ConfigureAwait(false); events.Should().NotBeNullOrEmpty() .And.BeEquivalentTo(expectedEvents); } + [Fact] + public async Task ReadAsync_should_throw_when_reading_events_backwards_and_position_not_stream_end() + { + var inputEvents = _fixture.BuildEvents(142); + var expectedEvents = inputEvents.Reverse().ToArray(); + + var streamId = Guid.NewGuid(); + var config = _fixture.CreateRepoConfig(streamId); + var sut = new FileEventsRepository(config, new EventFactory(1000)); + await sut.AppendAsync(streamId, inputEvents).ConfigureAwait(false); + + await Assert.ThrowsAsync(async () => await sut.ReadAsync(streamId, direction: Direction.Backward, startPosition: 42)); + } + [Theory] [InlineData(10, 10, 100)] public async Task ReadAsync_should_read_subsequent_events(int batchesCount, int eventsPerBatch, int expectedFileSize) @@ -90,7 +104,7 @@ public async Task ReadAsync_should_read_subsequent_events(int batchesCount, int var config = _fixture.CreateRepoConfig(streamId); var sut = new FileEventsRepository(config, new EventFactory(1000)); foreach (var events in batches) - await sut.WriteAsync(streamId, events).ConfigureAwait(false); + await sut.AppendAsync(streamId, events).ConfigureAwait(false); var expectedEvents = batches.SelectMany(e => e).ToArray(); @@ -109,9 +123,9 @@ public async Task ReadAsync_should_read_events_when_offset_provided() var config = _fixture.CreateRepoConfig(streamId); var sut = new FileEventsRepository(config, new EventFactory(1000)); - await sut.WriteAsync(streamId, expectedEvents).ConfigureAwait(false); + await sut.AppendAsync(streamId, expectedEvents).ConfigureAwait(false); - var events = await sut.ReadAsync(streamId, skip: 42).ConfigureAwait(false); + var events = await sut.ReadAsync(streamId, startPosition: 42).ConfigureAwait(false); events.Should().NotBeNullOrEmpty() .And.HaveCount(100); events.ElementAt(0).Id.Should().Be(expectedEvent.Id); @@ -125,9 +139,9 @@ public async Task ReadAsync_should_read_only_page_size_events() var streamId = Guid.NewGuid(); var config = _fixture.CreateRepoConfig(streamId); var sut = new FileEventsRepository(config, new EventFactory(1000)); - await sut.WriteAsync(streamId, expectedEvents).ConfigureAwait(false); + await sut.AppendAsync(streamId, expectedEvents).ConfigureAwait(false); - var events = await sut.ReadAsync(streamId, skip: 0).ConfigureAwait(false); + var events = await sut.ReadAsync(streamId, startPosition: 0).ConfigureAwait(false); events.Should().NotBeNullOrEmpty() .And.HaveCount(100); } @@ -140,9 +154,9 @@ public async Task ReadAsync_should_return_empty_collection_when_skip_to_high() var streamId = Guid.NewGuid(); var config = _fixture.CreateRepoConfig(streamId); var sut = new FileEventsRepository(config, new EventFactory(1000)); - await sut.WriteAsync(streamId, expectedEvents).ConfigureAwait(false); + await sut.AppendAsync(streamId, expectedEvents).ConfigureAwait(false); - var events = await sut.ReadAsync(streamId, skip: 11).ConfigureAwait(false); + var events = await sut.ReadAsync(streamId, startPosition: 11).ConfigureAwait(false); events.Should().NotBeNull() .And.BeEmpty(); }