From febefbacad2b4d15c61589d186bbdb6f050ec3b6 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 19 Jul 2019 10:27:17 +0200 Subject: [PATCH] Switch clear to close --- .../src/Core/MessageReceiver.cs | 2 +- .../src/Primitives/ConcurrentExpiringSet.cs | 31 +++++++++++++++---- .../Primitives/ConcurrentExpiringSetTests.cs | 30 +++++++++++++----- 3 files changed, 48 insertions(+), 15 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index d9f11aee16ca6..a55266d092c40 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -1008,7 +1008,7 @@ protected override async Task OnClosingAsync() } await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false); await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false); - this.requestResponseLockedMessages.Clear(); + this.requestResponseLockedMessages.Close(); } protected virtual async Task> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 5abdf018852dd..f54aa9f63d6d8 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -13,8 +13,10 @@ sealed class ConcurrentExpiringSet { readonly ConcurrentDictionary dictionary; readonly ICollection> dictionaryAsCollection; - CancellationTokenSource tokenSource = new CancellationTokenSource(); // doesn't need to be disposed because it doesn't own a timer + readonly CancellationTokenSource tokenSource = new CancellationTokenSource(); volatile TaskCompletionSource cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + volatile int closeSignaled; + bool closed; static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); public ConcurrentExpiringSet() @@ -26,23 +28,32 @@ public ConcurrentExpiringSet() public void AddOrUpdate(TKey key, DateTime expiration) { + this.ThrowIfClosed(); + this.dictionary[key] = expiration; this.cleanupTaskCompletionSource.TrySetResult(true); } public bool Contains(TKey key) { + this.ThrowIfClosed(); + return this.dictionary.TryGetValue(key, out var expiration) && expiration > DateTime.UtcNow; } - public void Clear() + public void Close() { + if (Interlocked.Exchange(ref this.closeSignaled, 1) != 0) + { + return; + } + + this.closed = true; + this.tokenSource.Cancel(); - this.dictionary.Clear(); this.cleanupTaskCompletionSource.TrySetCanceled(); - this.cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - this.tokenSource = new CancellationTokenSource(); - _ = CollectExpiredEntriesAsync(tokenSource.Token); + this.dictionary.Clear(); + this.tokenSource.Dispose(); } async Task CollectExpiredEntriesAsync(CancellationToken token) @@ -70,5 +81,13 @@ async Task CollectExpiredEntriesAsync(CancellationToken token) } } } + + void ThrowIfClosed() + { + if (closed) + { + throw new ObjectDisposedException($"ConcurrentExpiringSet has already been closed. Please create a new set instead."); + } + } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs index d663bb76b7cce..e4aad25f73abe 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives { using Microsoft.Azure.ServiceBus.Primitives; using System; + using System.Threading.Tasks; using Xunit; public class ConcurrentExpiringSetTests @@ -29,25 +30,38 @@ public void Contains_returns_false_for_expired_entry() } [Fact] - public void Contains_returns_false_after_clear() + public void Contains_throws_after_close() { var set = new ConcurrentExpiringSet(); set.AddOrUpdate("testKey", DateTime.UtcNow + TimeSpan.FromSeconds(5)); - set.Clear(); + set.Close(); - Assert.False(set.Contains("testKey"), "The set should return false after clear."); + Assert.Throws(() => set.Contains("testKey")); } [Fact] - public void Contains_returns_false_after_clear_for_added_expired_keys() + public void AddOrUpdate_throws_after_close() { var set = new ConcurrentExpiringSet(); set.AddOrUpdate("testKey1", DateTime.UtcNow + TimeSpan.FromSeconds(5)); - set.Clear(); - set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5)); + set.Close(); - Assert.False(set.Contains("testKey1"), "The set should return false after clear."); - Assert.False(set.Contains("testKey2"), "The set should return false for an expired entry."); + Assert.Throws(() => set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5))); + } + + [Fact] + public void Close_is_idempotent_and_thread_safe() + { + var set = new ConcurrentExpiringSet(); + + var ex = Record.Exception(() => + { + set.Close(); + set.Close(); + Parallel.Invoke(() => set.Close(), () => set.Close()); + }); + + Assert.Null(ex); } } } \ No newline at end of file