Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispose clients on host shutdown rather than listener dispose #39225

Merged
merged 4 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action

builder.Services.AddAzureClientsCore();
builder.Services.TryAddSingleton<MessagingProvider>();
builder.Services.AddHostedService<CachedClientCleanupService>();
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
builder.Services.AddSingleton<ServiceBusClientFactory>();
#if NET6_0_OR_GREATER
builder.Services.AddSingleton<SettlementService>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,8 @@ public void Dispose()
_stoppingCancellationTokenSource.Cancel();
_functionExecutionCancellationTokenSource.Cancel();

if (_batchReceiver.IsValueCreated)
{
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult().
_batchReceiver.Value.CloseAsync(CancellationToken.None).GetAwaiter().GetResult();
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult().
}
// ServiceBusClient and receivers will be disposed in CachedClientCleanupService, so as not to dispose it while it's still in
// use by other listeners. Processors are disposed here as they cannot be shared amongst listeners.

if (_messageProcessor.IsValueCreated)
{
Expand All @@ -326,13 +322,6 @@ public void Dispose()
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult().
}

if (_client.IsValueCreated)
{
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult().
_client.Value.DisposeAsync().AsTask().GetAwaiter().GetResult();
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult().
}

_stopAsyncSemaphore.Dispose();
_stoppingCancellationTokenSource.Dispose();
_concurrencyUpdateManager?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

namespace Microsoft.Azure.WebJobs.ServiceBus
{
internal class CachedClientCleanupService : IHostedService
{
private readonly MessagingProvider _provider;

public CachedClientCleanupService(MessagingProvider provider)
{
_provider = provider;
}

public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
foreach (var receiver in _provider.MessageReceiverCache.Values)
{
await receiver.DisposeAsync().ConfigureAwait(false);
}
_provider.MessageReceiverCache.Clear();

foreach (var sender in _provider.MessageSenderCache.Values)
{
await sender.DisposeAsync().ConfigureAwait(false);
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
}
_provider.MessageSenderCache.Clear();

foreach (var client in _provider.ClientCache.Values)
{
await client.DisposeAsync().ConfigureAwait(false);
}
_provider.ClientCache.Clear();
_provider.ActionsCache.Clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ public class MessagingProvider
{
internal ServiceBusOptions Options { get; }

private readonly ConcurrentDictionary<string, ServiceBusSender> _messageSenderCache = new();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _messageReceiverCache = new();
private readonly ConcurrentDictionary<string, ServiceBusClient> _clientCache = new();
internal ConcurrentDictionary<string, ServiceBusSender> MessageSenderCache { get; } = new();
internal ConcurrentDictionary<string, ServiceBusReceiver> MessageReceiverCache { get; } = new();
internal ConcurrentDictionary<string, ServiceBusClient> ClientCache { get; } = new();
internal ConcurrentDictionary<string, (ServiceBusReceivedMessage Message, ServiceBusMessageActions Actions)> ActionsCache { get; } = new();

/// <summary>
Expand Down Expand Up @@ -49,7 +49,7 @@ protected internal virtual ServiceBusClient CreateClient(string connectionString
Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString));
Argument.AssertNotNull(options, nameof(options));

return _clientCache.GetOrAdd(
return ClientCache.GetOrAdd(
connectionString,
(_) => new ServiceBusClient(connectionString, options));
}
Expand All @@ -71,7 +71,7 @@ protected internal virtual ServiceBusClient CreateClient(string fullyQualifiedNa
Argument.AssertNotNull(credential, nameof(credential));
Argument.AssertNotNull(options, nameof(options));

return _clientCache.GetOrAdd(
return ClientCache.GetOrAdd(
fullyQualifiedNamespace,
(_) => new ServiceBusClient(fullyQualifiedNamespace, credential, options));
}
Expand Down Expand Up @@ -140,7 +140,7 @@ protected internal virtual ServiceBusSender CreateMessageSender(ServiceBusClient
Argument.AssertNotNull(client, nameof(client));
Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));

return _messageSenderCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), client.CreateSender(entityPath));
return MessageSenderCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), client.CreateSender(entityPath));
}

/// <summary>
Expand All @@ -159,7 +159,7 @@ protected internal virtual ServiceBusReceiver CreateBatchMessageReceiver(Service
Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));
Argument.AssertNotNull(options, nameof(options));

return _messageReceiverCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), (_) => client.CreateReceiver(entityPath, options));
return MessageReceiverCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), (_) => client.CreateReceiver(entityPath, options));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Azure.Messaging.ServiceBus.Administration;
using Azure.Messaging.ServiceBus.Tests;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -335,6 +336,12 @@ public async Task StopAsync(CancellationToken cancellationToken)

QueueRuntimeProperties properties = await client.GetQueueRuntimePropertiesAsync(FirstQueueScope.QueueName, CancellationToken.None);
Assert.AreEqual(ExpectedRemainingMessages, properties.ActiveMessageCount);

var provider = _host.Services.GetService<MessagingProvider>();
Assert.AreEqual(0, provider.ClientCache.Count);
Assert.AreEqual(0, provider.MessageReceiverCache.Count);
Assert.AreEqual(0, provider.MessageSenderCache.Count);
Assert.AreEqual(0, provider.ActionsCache.Count);
}

private static bool IsError(LogMessage logMessage)
Expand Down
Loading