Skip to content

Commit

Permalink
added stream semaphores
Browse files Browse the repository at this point in the history
  • Loading branch information
mizrael committed Jan 21, 2024
1 parent 5a4145f commit 44f3c76
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions src/EvenireDB/FileEventsRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal class FileEventsRepository : IEventsRepository
{
private readonly FileEventsRepositoryConfig _config;
private readonly ConcurrentDictionary<string, byte[]> _eventTypes = new();
private readonly ConcurrentDictionary<Guid, SemaphoreSlim> _streamLocks = new();
private readonly IExtentInfoProvider _extentInfoProvider;

public FileEventsRepository(
Expand Down Expand Up @@ -112,12 +113,15 @@ await dataStream.ReadAsync(dataBufferMem, cancellationToken)
public async ValueTask AppendAsync(Guid streamId, IEnumerable<Event> events, CancellationToken cancellationToken = default)
{
var extentInfo = _extentInfoProvider.GetLatest(streamId);
using var dataStream = new FileStream(extentInfo.DataPath, FileMode.Append, FileAccess.Write, FileShare.Read);
using var headersStream = new FileStream(extentInfo.HeadersPath, FileMode.Append, FileAccess.Write, FileShare.Read);

var eventsCount = events.Count();

using var dataStream = new FileStream(extentInfo.DataPath, FileMode.Append, FileAccess.Write, FileShare.Read);
using var headersStream = new FileStream(extentInfo.HeadersPath, FileMode.Append, FileAccess.Write, FileShare.Read);
byte[] headerBuffer = ArrayPool<byte>.Shared.Rent(RawEventHeader.SIZE * eventsCount);

var semaphore = _streamLocks.GetOrAdd(streamId, static _ => new SemaphoreSlim(1, 1));
semaphore.Wait(cancellationToken);

try
{
Expand Down Expand Up @@ -150,6 +154,7 @@ await headersStream.WriteAsync(headerBuffer, 0, RawEventHeader.SIZE * eventsCoun
finally
{
ArrayPool<byte>.Shared.Return(headerBuffer);
semaphore.Release();
}

await dataStream.FlushAsync().ConfigureAwait(false);
Expand Down

0 comments on commit 44f3c76

Please sign in to comment.