diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index a50996a209177..bd97f97841c7a 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -1008,6 +1008,7 @@ protected override async Task OnClosingAsync() } await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false); await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false); + this.requestResponseLockedMessages.Clear(); } protected virtual async Task> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs index 8d5263e977cb0..9f41f3f4220b0 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Primitives/ConcurrentExpiringSet.cs @@ -4,25 +4,30 @@ namespace Microsoft.Azure.ServiceBus.Primitives { using System; + using System.Collections.Generic; using System.Collections.Concurrent; + using System.Threading; using System.Threading.Tasks; sealed class ConcurrentExpiringSet { readonly ConcurrentDictionary dictionary; + readonly ICollection> dictionaryAsCollection; readonly object cleanupSynObject = new object(); + CancellationTokenSource tokenSource = new CancellationTokenSource(); // doesn't need to be disposed because it doesn't own a timer bool cleanupScheduled; - static TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); + static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30); public ConcurrentExpiringSet() { this.dictionary = new ConcurrentDictionary(); + this.dictionaryAsCollection = dictionary; } public void AddOrUpdate(TKey key, DateTime expiration) { this.dictionary[key] = expiration; - this.ScheduleCleanup(); + this.ScheduleCleanup(tokenSource.Token); } public bool Contains(TKey key) @@ -30,7 +35,14 @@ public bool Contains(TKey key) return this.dictionary.TryGetValue(key, out var expiration) && expiration > DateTime.UtcNow; } - void ScheduleCleanup() + public void Clear() + { + this.tokenSource.Cancel(); + this.dictionary.Clear(); + this.tokenSource = new CancellationTokenSource(); + } + + void ScheduleCleanup(CancellationToken token) { lock (this.cleanupSynObject) { @@ -40,28 +52,38 @@ void ScheduleCleanup() } this.cleanupScheduled = true; - Task.Run(async () => await this.CollectExpiredEntriesAsync().ConfigureAwait(false)); + _ = this.CollectExpiredEntriesAsync(token); } } - async Task CollectExpiredEntriesAsync() + async Task CollectExpiredEntriesAsync(CancellationToken token) { - await Task.Delay(delayBetweenCleanups); - - lock (this.cleanupSynObject) + try { - this.cleanupScheduled = false; + await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; + } + finally + { + lock (this.cleanupSynObject) + { + this.cleanupScheduled = false; + } } - foreach (var key in this.dictionary.Keys) + foreach (var kvp in this.dictionary) { - if (DateTime.UtcNow > this.dictionary[key]) + var expiration = kvp.Value; + if (DateTime.UtcNow > expiration) { - this.dictionary.TryRemove(key, out _); + this.dictionaryAsCollection.Remove(kvp); } } - this.ScheduleCleanup(); + this.ScheduleCleanup(token); } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs index 508f7a87660c3..d663bb76b7cce 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Primitives/ConcurrentExpiringSetTests.cs @@ -27,5 +27,27 @@ public void Contains_returns_false_for_expired_entry() set.AddOrUpdate("testKey", DateTime.UtcNow - TimeSpan.FromSeconds(5)); Assert.False(set.Contains("testKey"), "The set should return false for an expired entry."); } + + [Fact] + public void Contains_returns_false_after_clear() + { + var set = new ConcurrentExpiringSet(); + set.AddOrUpdate("testKey", DateTime.UtcNow + TimeSpan.FromSeconds(5)); + set.Clear(); + + Assert.False(set.Contains("testKey"), "The set should return false after clear."); + } + + [Fact] + public void Contains_returns_false_after_clear_for_added_expired_keys() + { + var set = new ConcurrentExpiringSet(); + set.AddOrUpdate("testKey1", DateTime.UtcNow + TimeSpan.FromSeconds(5)); + set.Clear(); + set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5)); + + Assert.False(set.Contains("testKey1"), "The set should return false after clear."); + Assert.False(set.Contains("testKey2"), "The set should return false for an expired entry."); + } } } \ No newline at end of file