Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mizrael committed Sep 28, 2023
1 parent c3ea763 commit 7988211
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ public void GlobalSetup()
[Benchmark(Baseline = true)]
public async Task GetPageAsync_Baseline()
{
(await _sut.GetPageAsync(_streamId).ConfigureAwait(false)).Consume(_consumer);
(await _sut.ReadAsync(_streamId).ConfigureAwait(false)).Consume(_consumer);
}
}
4 changes: 2 additions & 2 deletions src/EvenireDB.Client/EventsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public EventsClient(HttpClient httpClient)
_httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
}

public async Task<IEnumerable<Event>> GetAsync(Guid streamId, int skip = 0, CancellationToken cancellationToken = default)
public async Task<IEnumerable<Event>> GetAsync(Guid streamId, int startPosition = 0, CancellationToken cancellationToken = default)
{
var response = await _httpClient.GetAsync($"/api/v1/events/{streamId}?skip={skip}", HttpCompletionOption.ResponseHeadersRead, cancellationToken)
var response = await _httpClient.GetAsync($"/api/v1/events/{streamId}?pos={startPosition}", HttpCompletionOption.ResponseHeadersRead, cancellationToken)
.ConfigureAwait(false);
response.EnsureSuccessStatusCode();
var results = await response.Content.ReadFromJsonAsync<Event[]>(cancellationToken: cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion src/EvenireDB.Client/IEventsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
public interface IEventsClient
{
Task AppendAsync(Guid streamId, IEnumerable<Event> events, CancellationToken cancellationToken = default);
Task<IEnumerable<Event>> GetAsync(Guid streamId, int skip = 0, CancellationToken cancellationToken = default);
Task<IEnumerable<Event>> GetAsync(Guid streamId, int startPosition = 0, CancellationToken cancellationToken = default);
}
}
4 changes: 2 additions & 2 deletions src/EvenireDB.Server/Routes/EventsRoutes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ public static WebApplication MapEventsRoutes(this WebApplication app)
private static async ValueTask<IEnumerable<EventDTO>> GetEvents(
[FromServices] EventsProvider provider,
Guid streamId,
[FromQuery(Name = "skip")] int skip = 0)
[FromQuery(Name = "pos")] int startPosition = 0)
{
var events = await provider.GetPageAsync(streamId, skip).ConfigureAwait(false);
var events = await provider.ReadAsync(streamId, startPosition: startPosition).ConfigureAwait(false);
return (events is null) ?
Array.Empty<EventDTO>() :
events.Select(@event => EventDTO.FromModel(@event));
Expand Down
3 changes: 0 additions & 3 deletions src/EvenireDB/Direction.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@

using System.Runtime.CompilerServices;

namespace EvenireDB
{
//TODO: consider add versioning on events file
Expand Down
14 changes: 11 additions & 3 deletions src/EvenireDB/EventsProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private async ValueTask<CachedEvents> EnsureCachedEventsAsync(Guid streamId, str
if (entry is null)
{
var persistedEvents = await this.ReadAllPersistedAsync(streamId, cancellationToken)
.ConfigureAwait(false);
.ConfigureAwait(false);
entry = new CachedEvents(persistedEvents, new SemaphoreSlim(1, 1));

_cache.Set(key, entry, _config.CacheDuration);
Expand All @@ -66,11 +66,19 @@ private async ValueTask<CachedEvents> EnsureCachedEventsAsync(Guid streamId, str
return entry;
}

public async ValueTask<IEnumerable<IEvent>> GetPageAsync(Guid streamId, long startPosition = 0, CancellationToken cancellationToken = default)
public async ValueTask<IEnumerable<IEvent>> ReadAsync(
Guid streamId,
Direction direction = Direction.Forward,
long startPosition = (long)StreamPosition.Start,
CancellationToken cancellationToken = default)
{
if (startPosition < 0)
throw new ArgumentOutOfRangeException(nameof(startPosition));

if (direction == Direction.Backward &&
startPosition != (long)StreamPosition.End)
throw new ArgumentOutOfRangeException(nameof(startPosition), "When reading backwards, the start position must be set to StreamPosition.End");

var key = streamId.ToString();

CachedEvents entry = await EnsureCachedEventsAsync(streamId, key, cancellationToken).ConfigureAwait(false);
Expand All @@ -83,7 +91,7 @@ public async ValueTask<IEnumerable<IEvent>> GetPageAsync(Guid streamId, long sta
var results = new IEvent[count];
var j = 0;
for (var i = startPosition; i < end; i++)
results[j++] = entry.Events[(int)i]; //TODO: I'm not very proud of this cast to int.
results[j++] = entry.Events[(int)i]; // TODO: I don't like this cast here.
entry.Events.Except(results);
return results;
}
Expand Down
1 change: 0 additions & 1 deletion src/EvenireDB/FileEventsRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

namespace EvenireDB
{

public class FileEventsRepository : IEventsRepository
{
private readonly ConcurrentDictionary<string, byte[]> _eventTypes = new();
Expand Down
31 changes: 27 additions & 4 deletions tests/EvenireDB.Tests/EventsProviderTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Caching.Memory;
using System.IO;
using System.Threading.Channels;

namespace EvenireDB.Tests
Expand All @@ -8,19 +9,41 @@ public class EventsProviderTests
private readonly static byte[] _defaultData = new byte[] { 0x42 };

[Fact]
public async Task GetPageAsync_should_return_empty_collection_when_data_not_available()
public async Task ReadAsync_should_return_empty_collection_when_data_not_available()
{
var repo = Substitute.For<IEventsRepository>();
var cache = Substitute.For<IMemoryCache>();
var channel = Channel.CreateUnbounded<IncomingEventsGroup>();
var sut = new EventsProvider(EventsProviderConfig.Default, repo, cache, channel.Writer);

var events = await sut.GetPageAsync(Guid.NewGuid());
var events = await sut.ReadAsync(Guid.NewGuid());
events.Should().NotBeNull().And.BeEmpty();
}

[Fact]
public async Task GetPageAsync_should_pull_data_from_repo_on_cache_miss()
public async Task ReadAsync_should_throw_when_reading_events_backwards_and_position_not_stream_end()
{
var repo = Substitute.For<IEventsRepository>();
var cache = Substitute.For<IMemoryCache>();
var channel = Channel.CreateUnbounded<IncomingEventsGroup>();
var sut = new EventsProvider(EventsProviderConfig.Default, repo, cache, channel.Writer);

await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await sut.ReadAsync(Guid.NewGuid(), direction: Direction.Backward, startPosition: 42));
}

[Fact]
public async Task ReadAsync_should_throw_when_start_position_below_zero()
{
var repo = Substitute.For<IEventsRepository>();
var cache = Substitute.For<IMemoryCache>();
var channel = Channel.CreateUnbounded<IncomingEventsGroup>();
var sut = new EventsProvider(EventsProviderConfig.Default, repo, cache, channel.Writer);

await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await sut.ReadAsync(Guid.NewGuid(), startPosition: -1));
}

[Fact]
public async Task ReadAsync_should_pull_data_from_repo_on_cache_miss()
{
var streamId = Guid.NewGuid();

Expand All @@ -39,7 +62,7 @@ public async Task GetPageAsync_should_pull_data_from_repo_on_cache_miss()
var channel = Channel.CreateUnbounded<IncomingEventsGroup>();
var sut = new EventsProvider(EventsProviderConfig.Default, repo, cache, channel.Writer);

var events = await sut.GetPageAsync(streamId);
var events = await sut.ReadAsync(streamId);
events.Should().NotBeNullOrEmpty();
}

Expand Down

0 comments on commit 7988211

Please sign in to comment.