From 412acbecfb4ac5f6f1d131b5c88ef4c250d8489b Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 30 Jul 2019 01:46:43 +0200 Subject: [PATCH] Expiring set lock free (#6577) ConcurrentExpiringSet -> Lock-free solution to make ConcurrentExpiringSet not leak the cleanup task for the period of delayBetweenCleanups Properly cancels the `Task.Delay` if cleanup was scheduled to not leak the task for the duration of the timeout Removes the `.Keys` access that locks the whole concurrent dictionary Makes sure to remove the key only if the snapshot matches by using collection remove --- .../src/Core/MessageReceiver.cs | 1 + .../src/Primitives/ConcurrentExpiringSet.cs | 81 +++++++++++++------ .../Primitives/ConcurrentExpiringSetTests.cs | 36 +++++++++ 3 files changed, 94 insertions(+), 24 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 3a0600888e337..a55266d092c40 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -1008,6 +1008,7 @@ protected override async Task OnClosingAsync() } await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false); await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false); + this.requestResponseLockedMessages.Close(); } protected virtual async Task> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 8d5263e977cb0..16da8f9c46078 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -4,64 +4,97 @@ namespace Microsoft.Azure.ServiceBus.Primitives { using System; + using System.Collections.Generic; using System.Collections.Concurrent; + using System.Threading; using System.Threading.Tasks; sealed class ConcurrentExpiringSet { readonly ConcurrentDictionary dictionary; - readonly object cleanupSynObject = new object(); - bool cleanupScheduled; - static TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); + readonly ICollection> dictionaryAsCollection; + readonly CancellationTokenSource tokenSource = new CancellationTokenSource(); + volatile TaskCompletionSource cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + int closeSignaled; + bool closed; + static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); public ConcurrentExpiringSet() { this.dictionary = new ConcurrentDictionary(); + this.dictionaryAsCollection = dictionary; + _ = CollectExpiredEntriesAsync(tokenSource.Token); } public void AddOrUpdate(TKey key, DateTime expiration) { + this.ThrowIfClosed(); + this.dictionary[key] = expiration; - this.ScheduleCleanup(); + this.cleanupTaskCompletionSource.TrySetResult(true); } public bool Contains(TKey key) { + this.ThrowIfClosed(); + return this.dictionary.TryGetValue(key, out var expiration) && expiration > DateTime.UtcNow; } - void ScheduleCleanup() + public void Close() { - lock (this.cleanupSynObject) + if (Interlocked.Exchange(ref this.closeSignaled, 1) != 0) { - if (this.cleanupScheduled || this.dictionary.Count <= 0) - { - return; - } - - this.cleanupScheduled = true; - Task.Run(async () => await this.CollectExpiredEntriesAsync().ConfigureAwait(false)); + return; } + + this.closed = true; + + this.tokenSource.Cancel(); + this.cleanupTaskCompletionSource.TrySetCanceled(); + this.dictionary.Clear(); + this.tokenSource.Dispose(); } - async Task CollectExpiredEntriesAsync() + async Task CollectExpiredEntriesAsync(CancellationToken token) { - await Task.Delay(delayBetweenCleanups); - - lock (this.cleanupSynObject) + while (!token.IsCancellationRequested) { - this.cleanupScheduled = false; - } + try + { + await this.cleanupTaskCompletionSource.Task.ConfigureAwait(false); + await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; + } - foreach (var key in this.dictionary.Keys) - { - if (DateTime.UtcNow > this.dictionary[key]) + var isEmpty = true; + var utcNow = DateTime.UtcNow; + foreach (var kvp in this.dictionary) { - this.dictionary.TryRemove(key, out _); + isEmpty = false; + var expiration = kvp.Value; + if (utcNow > expiration) + { + this.dictionaryAsCollection.Remove(kvp); + } + } + + if (isEmpty) + { + this.cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } } + } - this.ScheduleCleanup(); + void ThrowIfClosed() + { + if (closed) + { + throw new ObjectDisposedException($"ConcurrentExpiringSet has already been closed. Please create a new set instead."); + } } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs index 508f7a87660c3..e4aad25f73abe 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives { using Microsoft.Azure.ServiceBus.Primitives; using System; + using System.Threading.Tasks; using Xunit; public class ConcurrentExpiringSetTests @@ -27,5 +28,40 @@ public void Contains_returns_false_for_expired_entry() set.AddOrUpdate("testKey", DateTime.UtcNow - TimeSpan.FromSeconds(5)); Assert.False(set.Contains("testKey"), "The set should return false for an expired entry."); } + + [Fact] + public void Contains_throws_after_close() + { + var set = new ConcurrentExpiringSet(); + set.AddOrUpdate("testKey", DateTime.UtcNow + TimeSpan.FromSeconds(5)); + set.Close(); + + Assert.Throws(() => set.Contains("testKey")); + } + + [Fact] + public void AddOrUpdate_throws_after_close() + { + var set = new ConcurrentExpiringSet(); + set.AddOrUpdate("testKey1", DateTime.UtcNow + TimeSpan.FromSeconds(5)); + set.Close(); + + Assert.Throws(() => set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5))); + } + + [Fact] + public void Close_is_idempotent_and_thread_safe() + { + var set = new ConcurrentExpiringSet(); + + var ex = Record.Exception(() => + { + set.Close(); + set.Close(); + Parallel.Invoke(() => set.Close(), () => set.Close()); + }); + + Assert.Null(ex); + } } } \ No newline at end of file