From 44f3c76fda7838507fa0ab4b62b769f4beedc744 Mon Sep 17 00:00:00 2001 From: David Guida <1432872+mizrael@users.noreply.github.com> Date: Sun, 21 Jan 2024 14:26:52 -0500 Subject: [PATCH] added stream semaphores --- src/EvenireDB/FileEventsRepository.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/EvenireDB/FileEventsRepository.cs b/src/EvenireDB/FileEventsRepository.cs index 19b891f..677e054 100644 --- a/src/EvenireDB/FileEventsRepository.cs +++ b/src/EvenireDB/FileEventsRepository.cs @@ -12,6 +12,7 @@ internal class FileEventsRepository : IEventsRepository { private readonly FileEventsRepositoryConfig _config; private readonly ConcurrentDictionary _eventTypes = new(); + private readonly ConcurrentDictionary _streamLocks = new(); private readonly IExtentInfoProvider _extentInfoProvider; public FileEventsRepository( @@ -112,12 +113,15 @@ await dataStream.ReadAsync(dataBufferMem, cancellationToken) public async ValueTask AppendAsync(Guid streamId, IEnumerable events, CancellationToken cancellationToken = default) { var extentInfo = _extentInfoProvider.GetLatest(streamId); - using var dataStream = new FileStream(extentInfo.DataPath, FileMode.Append, FileAccess.Write, FileShare.Read); - using var headersStream = new FileStream(extentInfo.HeadersPath, FileMode.Append, FileAccess.Write, FileShare.Read); var eventsCount = events.Count(); + using var dataStream = new FileStream(extentInfo.DataPath, FileMode.Append, FileAccess.Write, FileShare.Read); + using var headersStream = new FileStream(extentInfo.HeadersPath, FileMode.Append, FileAccess.Write, FileShare.Read); byte[] headerBuffer = ArrayPool.Shared.Rent(RawEventHeader.SIZE * eventsCount); + + var semaphore = _streamLocks.GetOrAdd(streamId, static _ => new SemaphoreSlim(1, 1)); + semaphore.Wait(cancellationToken); try { @@ -150,6 +154,7 @@ await headersStream.WriteAsync(headerBuffer, 0, RawEventHeader.SIZE * eventsCoun finally { ArrayPool.Shared.Return(headerBuffer); + semaphore.Release(); } await dataStream.FlushAsync().ConfigureAwait(false);