Skip to content

Commit

Permalink
Expiring set lock free (Azure#6577)
Browse files Browse the repository at this point in the history
ConcurrentExpiringSet -> Lock-free solution to make ConcurrentExpiringSet not leak the cleanup task for the period of delayBetweenCleanups

Properly cancels the `Task.Delay` if cleanup was scheduled to not leak the task for the duration of the timeout
Removes the `.Keys` access that locks the whole concurrent dictionary
Makes sure to remove the key only if the snapshot matches by using collection remove
  • Loading branch information
danielmarbach authored and nemakam committed Jul 29, 2019
1 parent e9c533e commit 412acbe
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ protected override async Task OnClosingAsync()
}
await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false);
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);
this.requestResponseLockedMessages.Close();
}

protected virtual async Task<IList<Message>> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,97 @@
namespace Microsoft.Azure.ServiceBus.Primitives
{
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

sealed class ConcurrentExpiringSet<TKey>
{
readonly ConcurrentDictionary<TKey, DateTime> dictionary;
readonly object cleanupSynObject = new object();
bool cleanupScheduled;
static TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30);
readonly ICollection<KeyValuePair<TKey, DateTime>> dictionaryAsCollection;
readonly CancellationTokenSource tokenSource = new CancellationTokenSource();
volatile TaskCompletionSource<bool> cleanupTaskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
int closeSignaled;
bool closed;
static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30);

public ConcurrentExpiringSet()
{
this.dictionary = new ConcurrentDictionary<TKey, DateTime>();
this.dictionaryAsCollection = dictionary;
_ = CollectExpiredEntriesAsync(tokenSource.Token);
}

public void AddOrUpdate(TKey key, DateTime expiration)
{
this.ThrowIfClosed();

this.dictionary[key] = expiration;
this.ScheduleCleanup();
this.cleanupTaskCompletionSource.TrySetResult(true);
}

public bool Contains(TKey key)
{
this.ThrowIfClosed();

return this.dictionary.TryGetValue(key, out var expiration) && expiration > DateTime.UtcNow;
}

void ScheduleCleanup()
public void Close()
{
lock (this.cleanupSynObject)
if (Interlocked.Exchange(ref this.closeSignaled, 1) != 0)
{
if (this.cleanupScheduled || this.dictionary.Count <= 0)
{
return;
}

this.cleanupScheduled = true;
Task.Run(async () => await this.CollectExpiredEntriesAsync().ConfigureAwait(false));
return;
}

this.closed = true;

this.tokenSource.Cancel();
this.cleanupTaskCompletionSource.TrySetCanceled();
this.dictionary.Clear();
this.tokenSource.Dispose();
}

async Task CollectExpiredEntriesAsync()
async Task CollectExpiredEntriesAsync(CancellationToken token)
{
await Task.Delay(delayBetweenCleanups);

lock (this.cleanupSynObject)
while (!token.IsCancellationRequested)
{
this.cleanupScheduled = false;
}
try
{
await this.cleanupTaskCompletionSource.Task.ConfigureAwait(false);
await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return;
}

foreach (var key in this.dictionary.Keys)
{
if (DateTime.UtcNow > this.dictionary[key])
var isEmpty = true;
var utcNow = DateTime.UtcNow;
foreach (var kvp in this.dictionary)
{
this.dictionary.TryRemove(key, out _);
isEmpty = false;
var expiration = kvp.Value;
if (utcNow > expiration)
{
this.dictionaryAsCollection.Remove(kvp);
}
}

if (isEmpty)
{
this.cleanupTaskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}
}

this.ScheduleCleanup();
void ThrowIfClosed()
{
if (closed)
{
throw new ObjectDisposedException($"ConcurrentExpiringSet has already been closed. Please create a new set instead.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives
{
using Microsoft.Azure.ServiceBus.Primitives;
using System;
using System.Threading.Tasks;
using Xunit;

public class ConcurrentExpiringSetTests
Expand All @@ -27,5 +28,40 @@ public void Contains_returns_false_for_expired_entry()
set.AddOrUpdate("testKey", DateTime.UtcNow - TimeSpan.FromSeconds(5));
Assert.False(set.Contains("testKey"), "The set should return false for an expired entry.");
}

[Fact]
public void Contains_throws_after_close()
{
var set = new ConcurrentExpiringSet<string>();
set.AddOrUpdate("testKey", DateTime.UtcNow + TimeSpan.FromSeconds(5));
set.Close();

Assert.Throws<ObjectDisposedException>(() => set.Contains("testKey"));
}

[Fact]
public void AddOrUpdate_throws_after_close()
{
var set = new ConcurrentExpiringSet<string>();
set.AddOrUpdate("testKey1", DateTime.UtcNow + TimeSpan.FromSeconds(5));
set.Close();

Assert.Throws<ObjectDisposedException>(() => set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5)));
}

[Fact]
public void Close_is_idempotent_and_thread_safe()
{
var set = new ConcurrentExpiringSet<string>();

var ex = Record.Exception(() =>
{
set.Close();
set.Close();
Parallel.Invoke(() => set.Close(), () => set.Close());
});

Assert.Null(ex);
}
}
}

0 comments on commit 412acbe

Please sign in to comment.