From 85e56c1c83f6ed07768b3b58a992b28d09d63921 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Thu, 12 Oct 2023 10:29:36 -0700 Subject: [PATCH 1/4] Dispose clients on host shutdown rather than listener dispose --- .../Config/ServiceBusHostBuilderExtensions.cs | 1 + .../src/Listeners/ServiceBusListener.cs | 15 +----- .../Primitives/CachedClientCleanupService.cs | 46 +++++++++++++++++++ .../src/Primitives/MessagingProvider.cs | 14 +++--- .../tests/WebJobsServiceBusTestBase.cs | 7 +++ 5 files changed, 63 insertions(+), 20 deletions(-) create mode 100644 sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/CachedClientCleanupService.cs diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs index a36eb6143ce3e..7131723924936 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs @@ -110,6 +110,7 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action builder.Services.AddAzureClientsCore(); builder.Services.TryAddSingleton(); + builder.Services.AddHostedService(); builder.Services.AddSingleton(); #if NET6_0_OR_GREATER builder.Services.AddSingleton(); diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs index 51515f33a3c87..f191204c7f7b8 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs @@ -305,12 +305,8 @@ public void Dispose() _stoppingCancellationTokenSource.Cancel(); _functionExecutionCancellationTokenSource.Cancel(); - if (_batchReceiver.IsValueCreated) - { -#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). - _batchReceiver.Value.CloseAsync(CancellationToken.None).GetAwaiter().GetResult(); -#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). - } + // ServiceBusClient and receivers will be disposed in CachedClientCleanupService, so as not to dispose it while it's still in + // use by other listeners. Processors are disposed here as they cannot be shared amongst listeners. if (_messageProcessor.IsValueCreated) { @@ -326,13 +322,6 @@ public void Dispose() #pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). } - if (_client.IsValueCreated) - { -#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). - _client.Value.DisposeAsync().AsTask().GetAwaiter().GetResult(); -#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). - } - _stopAsyncSemaphore.Dispose(); _stoppingCancellationTokenSource.Dispose(); _concurrencyUpdateManager?.Dispose(); diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/CachedClientCleanupService.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/CachedClientCleanupService.cs new file mode 100644 index 0000000000000..635d2e390e7bb --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/CachedClientCleanupService.cs @@ -0,0 +1,46 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; + +namespace Microsoft.Azure.WebJobs.ServiceBus +{ + internal class CachedClientCleanupService : IHostedService + { + private readonly MessagingProvider _provider; + + public CachedClientCleanupService(MessagingProvider provider) + { + _provider = provider; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + foreach (var receiver in _provider.MessageReceiverCache.Values) + { + await receiver.DisposeAsync().ConfigureAwait(false); + } + _provider.MessageReceiverCache.Clear(); + + foreach (var sender in _provider.MessageSenderCache.Values) + { + await sender.DisposeAsync().ConfigureAwait(false); + } + _provider.MessageSenderCache.Clear(); + + foreach (var client in _provider.ClientCache.Values) + { + await client.DisposeAsync().ConfigureAwait(false); + } + _provider.ClientCache.Clear(); + _provider.ActionsCache.Clear(); + } + } +} \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs index 413be97668859..84af7e4585741 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs @@ -19,9 +19,9 @@ public class MessagingProvider { internal ServiceBusOptions Options { get; } - private readonly ConcurrentDictionary _messageSenderCache = new(); - private readonly ConcurrentDictionary _messageReceiverCache = new(); - private readonly ConcurrentDictionary _clientCache = new(); + internal ConcurrentDictionary MessageSenderCache { get; } = new(); + internal ConcurrentDictionary MessageReceiverCache { get; } = new(); + internal ConcurrentDictionary ClientCache { get; } = new(); internal ConcurrentDictionary ActionsCache { get; } = new(); /// @@ -49,7 +49,7 @@ protected internal virtual ServiceBusClient CreateClient(string connectionString Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString)); Argument.AssertNotNull(options, nameof(options)); - return _clientCache.GetOrAdd( + return ClientCache.GetOrAdd( connectionString, (_) => new ServiceBusClient(connectionString, options)); } @@ -71,7 +71,7 @@ protected internal virtual ServiceBusClient CreateClient(string fullyQualifiedNa Argument.AssertNotNull(credential, nameof(credential)); Argument.AssertNotNull(options, nameof(options)); - return _clientCache.GetOrAdd( + return ClientCache.GetOrAdd( fullyQualifiedNamespace, (_) => new ServiceBusClient(fullyQualifiedNamespace, credential, options)); } @@ -140,7 +140,7 @@ protected internal virtual ServiceBusSender CreateMessageSender(ServiceBusClient Argument.AssertNotNull(client, nameof(client)); Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); - return _messageSenderCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), client.CreateSender(entityPath)); + return MessageSenderCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), client.CreateSender(entityPath)); } /// @@ -159,7 +159,7 @@ protected internal virtual ServiceBusReceiver CreateBatchMessageReceiver(Service Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); Argument.AssertNotNull(options, nameof(options)); - return _messageReceiverCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), (_) => client.CreateReceiver(entityPath, options)); + return MessageReceiverCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), (_) => client.CreateReceiver(entityPath, options)); } /// diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs index 3b04778b1965e..5353e6ef5cd1a 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs @@ -14,6 +14,7 @@ using Azure.Messaging.ServiceBus.Administration; using Azure.Messaging.ServiceBus.Tests; using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Azure.WebJobs.ServiceBus; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -335,6 +336,12 @@ public async Task StopAsync(CancellationToken cancellationToken) QueueRuntimeProperties properties = await client.GetQueueRuntimePropertiesAsync(FirstQueueScope.QueueName, CancellationToken.None); Assert.AreEqual(ExpectedRemainingMessages, properties.ActiveMessageCount); + + var provider = _host.Services.GetService(); + Assert.AreEqual(0, provider.ClientCache.Count); + Assert.AreEqual(0, provider.MessageReceiverCache.Count); + Assert.AreEqual(0, provider.MessageSenderCache.Count); + Assert.AreEqual(0, provider.ActionsCache.Count); } private static bool IsError(LogMessage logMessage) From 21330b2f041fcdc526013820c530838dbf12b33e Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Thu, 12 Oct 2023 11:58:44 -0700 Subject: [PATCH 2/4] Remove project reference --- .../Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj index 64c5af298d6f3..b7a21a758c3fc 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj @@ -48,10 +48,4 @@ - - - - - - From 1dc4af3b08bf2394984062f930fa12172c669048 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Thu, 12 Oct 2023 12:01:51 -0700 Subject: [PATCH 3/4] Update version - for some reason the auto-update job failed --- .../src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj index b7a21a758c3fc..77ac3d72bb2b8 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj @@ -3,10 +3,10 @@ netstandard2.0;net6.0 Microsoft Azure WebJobs SDK ServiceBus Extension - 5.13.0 + 5.14.0-beta.1 - 5.12.0 + 5.13.0 $(NoWarn);AZC0001;CS1591;SA1636;AZC0007;AZC0015 true true From a85cef55c539379a3643bd4f0f8eb136bd801c49 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Thu, 12 Oct 2023 13:58:38 -0700 Subject: [PATCH 4/4] Update change log --- .../CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md index bfcdf8c9c88a6..29a120c5f6947 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md @@ -1,5 +1,15 @@ # Release History +## 5.14.0-beta.1 (Unreleased) + +### Features Added + +### Breaking Changes + +### Bugs Fixed + +### Other Changes + ## 5.13.0 (2023-10-11) ### Features Added