From c306e6e45d6cf56865a8cfdcb5a734129eb226d8 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 13 Jun 2019 12:41:27 +0200 Subject: [PATCH 01/12] Make ConcurrentExpiringSet not leak the cleanup task for the period of delayBetweenCleanups --- .../src/Core/MessageReceiver.cs | 1 + .../src/Primitives/ConcurrentExpiringSet.cs | 38 ++++++++++++++----- .../Primitives/ConcurrentExpiringSetTests.cs | 22 +++++++++++ 3 files changed, 51 insertions(+), 10 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 3a0600888e337..d9f11aee16ca6 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -1008,6 +1008,7 @@ protected override async Task OnClosingAsync() } await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false); await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false); + this.requestResponseLockedMessages.Clear(); } protected virtual async Task> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 8d5263e977cb0..2829be9e9349b 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -5,14 +5,16 @@ namespace Microsoft.Azure.ServiceBus.Primitives { using System; using System.Collections.Concurrent; + using System.Threading; using System.Threading.Tasks; sealed class ConcurrentExpiringSet { readonly ConcurrentDictionary dictionary; readonly object cleanupSynObject = new object(); + CancellationTokenSource tokenSource = new CancellationTokenSource(); // doesn't need to be disposed because it doesn't own a timer bool cleanupScheduled; - static TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); + static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); public ConcurrentExpiringSet() { @@ -22,7 +24,7 @@ public ConcurrentExpiringSet() public void AddOrUpdate(TKey key, DateTime expiration) { this.dictionary[key] = expiration; - this.ScheduleCleanup(); + this.ScheduleCleanup(tokenSource.Token); } public bool Contains(TKey key) @@ -30,7 +32,14 @@ public bool Contains(TKey key) return this.dictionary.TryGetValue(key, out var expiration) && expiration > DateTime.UtcNow; } - void ScheduleCleanup() + public void Clear() + { + tokenSource.Cancel(); + dictionary.Clear(); + tokenSource = new CancellationTokenSource(); + } + + void ScheduleCleanup(CancellationToken token) { lock (this.cleanupSynObject) { @@ -40,17 +49,26 @@ void ScheduleCleanup() } this.cleanupScheduled = true; - Task.Run(async () => await this.CollectExpiredEntriesAsync().ConfigureAwait(false)); + _ = this.CollectExpiredEntriesAsync(token); } } - async Task CollectExpiredEntriesAsync() + async Task CollectExpiredEntriesAsync(CancellationToken token) { - await Task.Delay(delayBetweenCleanups); - - lock (this.cleanupSynObject) + try { - this.cleanupScheduled = false; + await Task.Delay(delayBetweenCleanups, token); + } + catch (OperationCanceledException) + { + return; + } + finally + { + lock (this.cleanupSynObject) + { + this.cleanupScheduled = false; + } } foreach (var key in this.dictionary.Keys) @@ -61,7 +79,7 @@ async Task CollectExpiredEntriesAsync() } } - this.ScheduleCleanup(); + this.ScheduleCleanup(token); } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs index 508f7a87660c3..d663bb76b7cce 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs @@ -27,5 +27,27 @@ public void Contains_returns_false_for_expired_entry() set.AddOrUpdate("testKey", DateTime.UtcNow - TimeSpan.FromSeconds(5)); Assert.False(set.Contains("testKey"), "The set should return false for an expired entry."); } + + [Fact] + public void Contains_returns_false_after_clear() + { + var set = new ConcurrentExpiringSet(); + set.AddOrUpdate("testKey", DateTime.UtcNow + TimeSpan.FromSeconds(5)); + set.Clear(); + + Assert.False(set.Contains("testKey"), "The set should return false after clear."); + } + + [Fact] + public void Contains_returns_false_after_clear_for_added_expired_keys() + { + var set = new ConcurrentExpiringSet(); + set.AddOrUpdate("testKey1", DateTime.UtcNow + TimeSpan.FromSeconds(5)); + set.Clear(); + set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5)); + + Assert.False(set.Contains("testKey1"), "The set should return false after clear."); + Assert.False(set.Contains("testKey2"), "The set should return false for an expired entry."); + } } } \ No newline at end of file From f537ee7c8c6cd6473e4168b239c27bdee3393d30 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 13 Jun 2019 16:47:54 +0200 Subject: [PATCH 02/12] Remove .Keys access since it locks the whole dictionary --- .../src/Primitives/ConcurrentExpiringSet.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 2829be9e9349b..ce0acd3a68613 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -71,8 +71,9 @@ async Task CollectExpiredEntriesAsync(CancellationToken token) } } - foreach (var key in this.dictionary.Keys) + foreach (var kvp in this.dictionary) { + var key = kvp.Key; if (DateTime.UtcNow > this.dictionary[key]) { this.dictionary.TryRemove(key, out _); From 052215ffaebd58b9fa1ae03601a83adaf825d3e6 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 13 Jun 2019 16:55:13 +0200 Subject: [PATCH 03/12] Try to remove the snapshot of the value because it could have been updated in the meantime --- .../src/Primitives/ConcurrentExpiringSet.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index ce0acd3a68613..7a767fc2fb0a1 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -4,6 +4,7 @@ namespace Microsoft.Azure.ServiceBus.Primitives { using System; + using System.Collections.Generic; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; @@ -11,6 +12,7 @@ namespace Microsoft.Azure.ServiceBus.Primitives sealed class ConcurrentExpiringSet { readonly ConcurrentDictionary dictionary; + readonly ICollection> dictionaryAsCollection; readonly object cleanupSynObject = new object(); CancellationTokenSource tokenSource = new CancellationTokenSource(); // doesn't need to be disposed because it doesn't own a timer bool cleanupScheduled; @@ -19,6 +21,7 @@ sealed class ConcurrentExpiringSet public ConcurrentExpiringSet() { this.dictionary = new ConcurrentDictionary(); + this.dictionaryAsCollection = dictionary; } public void AddOrUpdate(TKey key, DateTime expiration) @@ -73,10 +76,10 @@ async Task CollectExpiredEntriesAsync(CancellationToken token) foreach (var kvp in this.dictionary) { - var key = kvp.Key; - if (DateTime.UtcNow > this.dictionary[key]) + var expiration = kvp.Value; + if (DateTime.UtcNow > expiration) { - this.dictionary.TryRemove(key, out _); + this.dictionaryAsCollection.Remove(kvp); } } From 4cd4d0d46e11c7e96ed0a61f0168d86fb96899cf Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 13 Jun 2019 17:19:08 +0200 Subject: [PATCH 04/12] Cleanup style --- .../src/Primitives/ConcurrentExpiringSet.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 7a767fc2fb0a1..a700dc70ed8a1 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -37,9 +37,9 @@ public bool Contains(TKey key) public void Clear() { - tokenSource.Cancel(); - dictionary.Clear(); - tokenSource = new CancellationTokenSource(); + this.tokenSource.Cancel(); + this.dictionary.Clear(); + this.tokenSource = new CancellationTokenSource(); } void ScheduleCleanup(CancellationToken token) From 4c123373d4bd1f10c22b1dc2fc694017a3cae074 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 13 Jun 2019 17:24:27 +0200 Subject: [PATCH 05/12] Missing ConfigureAwait --- .../src/Primitives/ConcurrentExpiringSet.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index a700dc70ed8a1..9f41f3f4220b0 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -60,7 +60,7 @@ async Task CollectExpiredEntriesAsync(CancellationToken token) { try { - await Task.Delay(delayBetweenCleanups, token); + await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false); } catch (OperationCanceledException) { From 438048ad620d89b55043b87f13546dbf60cb30a1 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 13 Jun 2019 17:35:30 +0200 Subject: [PATCH 06/12] Remove locks --- .../src/Primitives/ConcurrentExpiringSet.cs | 53 +++++++++---------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 9f41f3f4220b0..e55def58cfb93 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -13,21 +13,21 @@ sealed class ConcurrentExpiringSet { readonly ConcurrentDictionary dictionary; readonly ICollection> dictionaryAsCollection; - readonly object cleanupSynObject = new object(); CancellationTokenSource tokenSource = new CancellationTokenSource(); // doesn't need to be disposed because it doesn't own a timer - bool cleanupScheduled; + volatile TaskCompletionSource cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); public ConcurrentExpiringSet() { this.dictionary = new ConcurrentDictionary(); this.dictionaryAsCollection = dictionary; + _ = CollectExpiredEntriesAsync(tokenSource.Token); } public void AddOrUpdate(TKey key, DateTime expiration) { this.dictionary[key] = expiration; - this.ScheduleCleanup(tokenSource.Token); + this.cleanupTaskCompletionSource.TrySetResult(true); } public bool Contains(TKey key) @@ -39,51 +39,48 @@ public void Clear() { this.tokenSource.Cancel(); this.dictionary.Clear(); + this.cleanupTaskCompletionSource.TrySetCanceled(); + this.ResetTaskCompletionSource(); this.tokenSource = new CancellationTokenSource(); + _ = CollectExpiredEntriesAsync(tokenSource.Token); } - void ScheduleCleanup(CancellationToken token) + void ResetTaskCompletionSource() { - lock (this.cleanupSynObject) + while (true) { - if (this.cleanupScheduled || this.dictionary.Count <= 0) + var tcs = this.cleanupTaskCompletionSource; + if (!tcs.Task.IsCompleted || Interlocked.CompareExchange(ref this.cleanupTaskCompletionSource, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), tcs) == tcs) { return; } - - this.cleanupScheduled = true; - _ = this.CollectExpiredEntriesAsync(token); } } async Task CollectExpiredEntriesAsync(CancellationToken token) { - try - { - await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false); - } - catch (OperationCanceledException) + while (!token.IsCancellationRequested) { - return; - } - finally - { - lock (this.cleanupSynObject) + try { - this.cleanupScheduled = false; + await cleanupTaskCompletionSource.Task.ConfigureAwait(false); + ResetTaskCompletionSource(); + await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; } - } - foreach (var kvp in this.dictionary) - { - var expiration = kvp.Value; - if (DateTime.UtcNow > expiration) + foreach (var kvp in this.dictionary) { - this.dictionaryAsCollection.Remove(kvp); + var expiration = kvp.Value; + if (DateTime.UtcNow > expiration) + { + this.dictionaryAsCollection.Remove(kvp); + } } } - - this.ScheduleCleanup(token); } } } \ No newline at end of file From 7193e018b54adbd7879aabe735d471a90073b091 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 13 Jun 2019 20:02:24 +0200 Subject: [PATCH 07/12] Simplify and remove spinning because it is probably not necessary --- .../src/Primitives/ConcurrentExpiringSet.cs | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index e55def58cfb93..5abdf018852dd 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -40,23 +40,11 @@ public void Clear() this.tokenSource.Cancel(); this.dictionary.Clear(); this.cleanupTaskCompletionSource.TrySetCanceled(); - this.ResetTaskCompletionSource(); + this.cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); this.tokenSource = new CancellationTokenSource(); _ = CollectExpiredEntriesAsync(tokenSource.Token); } - void ResetTaskCompletionSource() - { - while (true) - { - var tcs = this.cleanupTaskCompletionSource; - if (!tcs.Task.IsCompleted || Interlocked.CompareExchange(ref this.cleanupTaskCompletionSource, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), tcs) == tcs) - { - return; - } - } - } - async Task CollectExpiredEntriesAsync(CancellationToken token) { while (!token.IsCancellationRequested) @@ -64,7 +52,7 @@ async Task CollectExpiredEntriesAsync(CancellationToken token) try { await cleanupTaskCompletionSource.Task.ConfigureAwait(false); - ResetTaskCompletionSource(); + this.cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false); } catch (OperationCanceledException) From febefbacad2b4d15c61589d186bbdb6f050ec3b6 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 19 Jul 2019 10:27:17 +0200 Subject: [PATCH 08/12] Switch clear to close --- .../src/Core/MessageReceiver.cs | 2 +- .../src/Primitives/ConcurrentExpiringSet.cs | 31 +++++++++++++++---- .../Primitives/ConcurrentExpiringSetTests.cs | 30 +++++++++++++----- 3 files changed, 48 insertions(+), 15 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index d9f11aee16ca6..a55266d092c40 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -1008,7 +1008,7 @@ protected override async Task OnClosingAsync() } await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false); await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false); - this.requestResponseLockedMessages.Clear(); + this.requestResponseLockedMessages.Close(); } protected virtual async Task> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 5abdf018852dd..f54aa9f63d6d8 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -13,8 +13,10 @@ sealed class ConcurrentExpiringSet { readonly ConcurrentDictionary dictionary; readonly ICollection> dictionaryAsCollection; - CancellationTokenSource tokenSource = new CancellationTokenSource(); // doesn't need to be disposed because it doesn't own a timer + readonly CancellationTokenSource tokenSource = new CancellationTokenSource(); volatile TaskCompletionSource cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + volatile int closeSignaled; + bool closed; static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); public ConcurrentExpiringSet() @@ -26,23 +28,32 @@ public ConcurrentExpiringSet() public void AddOrUpdate(TKey key, DateTime expiration) { + this.ThrowIfClosed(); + this.dictionary[key] = expiration; this.cleanupTaskCompletionSource.TrySetResult(true); } public bool Contains(TKey key) { + this.ThrowIfClosed(); + return this.dictionary.TryGetValue(key, out var expiration) && expiration > DateTime.UtcNow; } - public void Clear() + public void Close() { + if (Interlocked.Exchange(ref this.closeSignaled, 1) != 0) + { + return; + } + + this.closed = true; + this.tokenSource.Cancel(); - this.dictionary.Clear(); this.cleanupTaskCompletionSource.TrySetCanceled(); - this.cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - this.tokenSource = new CancellationTokenSource(); - _ = CollectExpiredEntriesAsync(tokenSource.Token); + this.dictionary.Clear(); + this.tokenSource.Dispose(); } async Task CollectExpiredEntriesAsync(CancellationToken token) @@ -70,5 +81,13 @@ async Task CollectExpiredEntriesAsync(CancellationToken token) } } } + + void ThrowIfClosed() + { + if (closed) + { + throw new ObjectDisposedException($"ConcurrentExpiringSet has already been closed. Please create a new set instead."); + } + } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs index d663bb76b7cce..e4aad25f73abe 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives { using Microsoft.Azure.ServiceBus.Primitives; using System; + using System.Threading.Tasks; using Xunit; public class ConcurrentExpiringSetTests @@ -29,25 +30,38 @@ public void Contains_returns_false_for_expired_entry() } [Fact] - public void Contains_returns_false_after_clear() + public void Contains_throws_after_close() { var set = new ConcurrentExpiringSet(); set.AddOrUpdate("testKey", DateTime.UtcNow + TimeSpan.FromSeconds(5)); - set.Clear(); + set.Close(); - Assert.False(set.Contains("testKey"), "The set should return false after clear."); + Assert.Throws(() => set.Contains("testKey")); } [Fact] - public void Contains_returns_false_after_clear_for_added_expired_keys() + public void AddOrUpdate_throws_after_close() { var set = new ConcurrentExpiringSet(); set.AddOrUpdate("testKey1", DateTime.UtcNow + TimeSpan.FromSeconds(5)); - set.Clear(); - set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5)); + set.Close(); - Assert.False(set.Contains("testKey1"), "The set should return false after clear."); - Assert.False(set.Contains("testKey2"), "The set should return false for an expired entry."); + Assert.Throws(() => set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5))); + } + + [Fact] + public void Close_is_idempotent_and_thread_safe() + { + var set = new ConcurrentExpiringSet(); + + var ex = Record.Exception(() => + { + set.Close(); + set.Close(); + Parallel.Invoke(() => set.Close(), () => set.Close()); + }); + + Assert.Null(ex); } } } \ No newline at end of file From 127002dc97f68e5e1c9764f7c876677735305004 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Sat, 20 Jul 2019 00:20:01 +0200 Subject: [PATCH 09/12] Remove volatile --- .../src/Primitives/ConcurrentExpiringSet.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index f54aa9f63d6d8..0ac8995c28076 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -15,7 +15,7 @@ sealed class ConcurrentExpiringSet readonly ICollection> dictionaryAsCollection; readonly CancellationTokenSource tokenSource = new CancellationTokenSource(); volatile TaskCompletionSource cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - volatile int closeSignaled; + int closeSignaled; bool closed; static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); From b00c8c719a4aa6ac2588237c97edb21a7e01153e Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Sat, 20 Jul 2019 00:20:18 +0200 Subject: [PATCH 10/12] Cleanup --- .../src/Primitives/ConcurrentExpiringSet.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 0ac8995c28076..0c5918317d9e1 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -62,7 +62,7 @@ async Task CollectExpiredEntriesAsync(CancellationToken token) { try { - await cleanupTaskCompletionSource.Task.ConfigureAwait(false); + await this.cleanupTaskCompletionSource.Task.ConfigureAwait(false); this.cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false); } From 340f08817bb510f81121b9c46f2f59393331cff3 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Sat, 20 Jul 2019 00:20:36 +0200 Subject: [PATCH 11/12] UtcNow to var --- .../src/Primitives/ConcurrentExpiringSet.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 0c5918317d9e1..88216848e8259 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -71,10 +71,11 @@ async Task CollectExpiredEntriesAsync(CancellationToken token) return; } + var utcNow = DateTime.UtcNow; foreach (var kvp in this.dictionary) { var expiration = kvp.Value; - if (DateTime.UtcNow > expiration) + if (utcNow > expiration) { this.dictionaryAsCollection.Remove(kvp); } From 89db3cef703e5ea6601d20a435666796d8364e3d Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Sat, 20 Jul 2019 00:28:58 +0200 Subject: [PATCH 12/12] Only create tcs when empty --- .../src/Primitives/ConcurrentExpiringSet.cs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 88216848e8259..16da8f9c46078 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -63,7 +63,6 @@ async Task CollectExpiredEntriesAsync(CancellationToken token) try { await this.cleanupTaskCompletionSource.Task.ConfigureAwait(false); - this.cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false); } catch (OperationCanceledException) @@ -71,15 +70,22 @@ async Task CollectExpiredEntriesAsync(CancellationToken token) return; } + var isEmpty = true; var utcNow = DateTime.UtcNow; foreach (var kvp in this.dictionary) { + isEmpty = false; var expiration = kvp.Value; if (utcNow > expiration) { this.dictionaryAsCollection.Remove(kvp); } } + + if (isEmpty) + { + this.cleanupTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } } }