From 762a7b2c21d486bfa9853af07586a5621b689975 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Mon, 5 Apr 2021 20:08:28 -0700 Subject: [PATCH] Support AAD auth (#19892) * Support AAD auth * Fix test * Add back provider * Fix processor binding * API and change log * Fix test * Undo test change * stop the hosts * try increasing timeout * volatile * unused code * debug * fix * Add debug statements * debug * semaphore * Fix for real * add back debug info * Add missing StopAsync call * remove logging --- .../CHANGELOG.md | 9 +- .../README.md | 19 ++ ...bs.Extensions.ServiceBus.netstandard2.0.cs | 24 +- .../ServiceBusAttributeBindingProvider.cs | 51 +--- .../src/Bindings/ServiceBusBinding.cs | 25 +- .../StringToServiceBusEntityConverter.cs | 16 +- .../src/Config/ConfigurationExtensions.cs | 29 +++ .../src/Config/ServiceBusClientFactory.cs | 101 ++++++++ .../ServiceBusExtensionConfigProvider.cs | 17 +- .../Config/ServiceBusHostBuilderExtensions.cs | 10 +- .../src/Config/ServiceBusOptions.cs | 5 - .../src/Listeners/ServiceBusListener.cs | 104 ++++---- .../Listeners/ServiceBusListenerFactory.cs | 47 ---- .../src/Listeners/ServiceBusScaleMonitor.cs | 9 +- ...Azure.WebJobs.Extensions.ServiceBus.csproj | 2 + .../src/Primitives/MessageProcessor.cs | 13 +- .../src/Primitives/MessagingProvider.cs | 237 ++++++------------ .../src/Primitives/SessionMessageProcessor.cs | 13 +- .../src/ServiceBusAccount.cs | 53 ---- ...rviceBusTriggerAttributeBindingProvider.cs | 46 ++-- ...ServiceBusAttributeBindingProviderTests.cs | 14 +- ...BusTriggerAttributeBindingProviderTests.cs | 26 +- .../Config/ServiceBusClientFactoryTests.cs | 51 ++++ .../ServiceBusHostBuilderExtensionsTests.cs | 43 +--- .../tests/ConfigurationUtilities.cs | 16 ++ .../Listeners/ServiceBusListenerTests.cs | 34 ++- .../Listeners/ServiceBusScaleMonitorTests.cs | 83 ++++-- .../tests/MessageProcessorTests.cs | 2 +- .../tests/MessagingProviderTests.cs | 77 +++--- .../tests/ServiceBusAccountTests.cs | 59 ----- .../tests/ServiceBusEndToEndTests.cs | 98 +++++--- .../tests/ServiceBusSessionsEndToEndTests.cs | 104 ++++---- .../tests/WebJobsServiceBusTestBase.cs | 55 ++-- 33 files changed, 769 insertions(+), 723 deletions(-) create mode 100644 sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ConfigurationExtensions.cs create mode 100644 sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusClientFactory.cs delete mode 100644 sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListenerFactory.cs delete mode 100644 sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/ServiceBusAccount.cs create mode 100644 sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusClientFactoryTests.cs create mode 100644 sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ConfigurationUtilities.cs delete mode 100644 sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusAccountTests.cs diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md index d729562a4a464..ffdc45ea4f2f0 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md @@ -1,7 +1,14 @@ # Release History -## 5.0.0-beta.2 (Unreleased) +## 5.0.0-beta.2 (2021-04-07) +### Added +- Add AAD support + +### Breaking Changes +- Changed the API signatures for the methods in `MessagingProvider`. +- Added `receiver` parameter to `MessageProcessor` constructor. +- Added `client` parameter to `SessionMessageProcessor` constructor. ## 5.0.0-beta.1 (2021-03-23) diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/README.md b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/README.md index 1b93f1b582ff9..849e6bff4a273 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/README.md +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/README.md @@ -41,6 +41,25 @@ For local development, use the `local.settings.json` file to store the connectio When deployed, use the [application settings](https://docs.microsoft.com/azure/azure-functions/functions-how-to-use-azure-function-app-settings) to set the connection string. +#### Managed identity authentication + +If your environment has [managed identity](https://docs.microsoft.com/azure/app-service/overview-managed-identity?tabs=dotnet) enabled you can use it to authenticate the Service Bus extension. +To use managed identity provide the `__fullyQualifiedNamespace` configuration setting. + +```json +{ + "Values": { + "__fullyQualifiedNamespace": ".servicebus.windows.net" + } +} +``` + +Or in the case of deployed app set the same setting in [application settings](https://docs.microsoft.com/azure/azure-functions/functions-how-to-use-azure-function-app-settings): + +``` +__fullyQualifiedNamespace=.servicebus.windows.net +``` + ## Key concepts diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs index 95daadf1fdcb3..e96e56ec06212 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/api/Microsoft.Azure.WebJobs.Extensions.ServiceBus.netstandard2.0.cs @@ -51,19 +51,24 @@ public enum EntityType } public partial class MessageProcessor { - public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor) { } + public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor, Azure.Messaging.ServiceBus.ServiceBusReceiver receiver) { } + protected internal Azure.Messaging.ServiceBus.ServiceBusProcessor Processor { get { throw null; } set { } } + protected internal Azure.Messaging.ServiceBus.ServiceBusReceiver Receiver { get { throw null; } set { } } public virtual System.Threading.Tasks.Task BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; } public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; } } public partial class MessagingProvider { - public MessagingProvider(Microsoft.Extensions.Options.IOptions serviceBusOptions) { } - public virtual Azure.Messaging.ServiceBus.ServiceBusReceiver CreateBatchMessageReceiver(string entityPath, string connectionString) { throw null; } + protected MessagingProvider() { } + public MessagingProvider(Microsoft.Extensions.Options.IOptions options) { } + public virtual Azure.Messaging.ServiceBus.ServiceBusReceiver CreateBatchMessageReceiver(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; } public virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string connectionString) { throw null; } - public virtual Microsoft.Azure.WebJobs.ServiceBus.MessageProcessor CreateMessageProcessor(string entityPath, string connectionString) { throw null; } - public virtual Azure.Messaging.ServiceBus.ServiceBusSender CreateMessageSender(string entityPath, string connectionString) { throw null; } - public virtual Azure.Messaging.ServiceBus.ServiceBusProcessor CreateProcessor(string entityPath, string connectionString) { throw null; } - public virtual Microsoft.Azure.WebJobs.ServiceBus.SessionMessageProcessor CreateSessionMessageProcessor(string entityPath, string connectionString) { throw null; } + public virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string fullyQualifiedNamespace, Azure.Core.TokenCredential credential) { throw null; } + public virtual Microsoft.Azure.WebJobs.ServiceBus.MessageProcessor CreateMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; } + public virtual Azure.Messaging.ServiceBus.ServiceBusSender CreateMessageSender(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; } + public virtual Azure.Messaging.ServiceBus.ServiceBusProcessor CreateProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; } + public virtual Microsoft.Azure.WebJobs.ServiceBus.SessionMessageProcessor CreateSessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; } + public virtual Azure.Messaging.ServiceBus.ServiceBusSessionProcessor CreateSessionProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; } } public partial class ServiceBusMessageActions { @@ -78,7 +83,6 @@ public partial class ServiceBusOptions : Microsoft.Azure.WebJobs.Hosting.IOption { public ServiceBusOptions() { } public bool AutoCompleteMessages { get { throw null; } set { } } - public string ConnectionString { get { throw null; } set { } } public System.Func ExceptionHandler { get { throw null; } set { } } public System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } set { } } public int MaxConcurrentCalls { get { throw null; } set { } } @@ -104,7 +108,9 @@ public void Configure(Microsoft.Azure.WebJobs.IWebJobsBuilder builder) { } } public partial class SessionMessageProcessor { - public SessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusSessionProcessor processor) { } + public SessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, Azure.Messaging.ServiceBus.ServiceBusSessionProcessor processor) { } + protected internal Azure.Messaging.ServiceBus.ServiceBusClient Client { get { throw null; } set { } } + protected internal Azure.Messaging.ServiceBus.ServiceBusSessionProcessor Processor { get { throw null; } set { } } public virtual System.Threading.Tasks.Task BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions sessionActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; } public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusSessionMessageActions sessionActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/ServiceBusAttributeBindingProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/ServiceBusAttributeBindingProvider.cs index 8e41243512e05..d55db4da7523c 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/ServiceBusAttributeBindingProvider.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/ServiceBusAttributeBindingProvider.cs @@ -6,8 +6,10 @@ using System.Globalization; using System.Reflection; using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Bindings; +using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; namespace Microsoft.Azure.WebJobs.ServiceBus.Bindings @@ -25,33 +27,17 @@ internal class ServiceBusAttributeBindingProvider : IBindingProvider new AsyncCollectorArgumentBindingProvider()); private readonly INameResolver _nameResolver; - private readonly ServiceBusOptions _options; - private readonly IConfiguration _configuration; private readonly MessagingProvider _messagingProvider; + private readonly ServiceBusClientFactory _clientFactory; - public ServiceBusAttributeBindingProvider(INameResolver nameResolver, ServiceBusOptions options, IConfiguration configuration, MessagingProvider messagingProvider) + public ServiceBusAttributeBindingProvider( + INameResolver nameResolver, + MessagingProvider messagingProvider, + ServiceBusClientFactory clientFactory) { - if (nameResolver == null) - { - throw new ArgumentNullException(nameof(nameResolver)); - } - if (configuration == null) - { - throw new ArgumentNullException(nameof(configuration)); - } - if (options == null) - { - throw new ArgumentNullException(nameof(options)); - } - if (messagingProvider == null) - { - throw new ArgumentNullException(nameof(messagingProvider)); - } - - _nameResolver = nameResolver; - _options = options; - _configuration = configuration; - _messagingProvider = messagingProvider; + _nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver)); + _messagingProvider = messagingProvider ?? throw new ArgumentNullException(nameof(messagingProvider)); + _clientFactory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory)); } public Task TryCreateAsync(BindingProviderContext context) @@ -69,7 +55,7 @@ public Task TryCreateAsync(BindingProviderContext context) return Task.FromResult(null); } - string queueOrTopicName = Resolve(attribute.QueueOrTopicName); + string queueOrTopicName = _nameResolver.ResolveWholeString(attribute.QueueOrTopicName); IBindableServiceBusPath path = BindableServiceBusPath.Create(queueOrTopicName); ValidateContractCompatibility(path, context.BindingDataContract); @@ -79,10 +65,9 @@ public Task TryCreateAsync(BindingProviderContext context) throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, "Can't bind ServiceBus to type '{0}'.", parameter.ParameterType)); } - attribute.Connection = Resolve(attribute.Connection); - ServiceBusAccount account = new ServiceBusAccount(_options, _configuration, attribute); + attribute.Connection = _nameResolver.ResolveWholeString(attribute.Connection); - IBinding binding = new ServiceBusBinding(parameter.Name, argumentBinding, account, path, attribute, _messagingProvider); + IBinding binding = new ServiceBusBinding(parameter.Name, argumentBinding, path, attribute, _messagingProvider, _clientFactory); return Task.FromResult(binding); } @@ -105,15 +90,5 @@ private static void ValidateContractCompatibility(IBindableServiceBusPath path, } } } - - private string Resolve(string queueName) - { - if (_nameResolver == null) - { - return queueName; - } - - return _nameResolver.ResolveWholeString(queueName); - } } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/ServiceBusBinding.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/ServiceBusBinding.cs index ba3b14e3e4da0..f73ba42fe10b9 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/ServiceBusBinding.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/ServiceBusBinding.cs @@ -4,6 +4,7 @@ using System; using System.Globalization; using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; using Microsoft.Azure.WebJobs.Host.Bindings; using Microsoft.Azure.WebJobs.Host.Converters; using Microsoft.Azure.WebJobs.Host.Protocols; @@ -14,21 +15,27 @@ internal class ServiceBusBinding : IBinding { private readonly string _parameterName; private readonly IArgumentBinding _argumentBinding; - private readonly ServiceBusAccount _account; private readonly IBindableServiceBusPath _path; private readonly IAsyncObjectToTypeConverter _converter; - private readonly EntityType _entityType; private readonly MessagingProvider _messagingProvider; - - public ServiceBusBinding(string parameterName, IArgumentBinding argumentBinding, ServiceBusAccount account, IBindableServiceBusPath path, ServiceBusAttribute attr, MessagingProvider messagingProvider) + private readonly ServiceBusClientFactory _clientFactory; + private readonly ServiceBusAttribute _attribute; + + public ServiceBusBinding( + string parameterName, + IArgumentBinding argumentBinding, + IBindableServiceBusPath path, + ServiceBusAttribute attribute, + MessagingProvider messagingProvider, + ServiceBusClientFactory clientFactory) { _parameterName = parameterName; _argumentBinding = argumentBinding; - _account = account; _path = path; - _entityType = attr.EntityType; _messagingProvider = messagingProvider; - _converter = new OutputConverter(new StringToServiceBusEntityConverter(account, _path, _entityType, _messagingProvider)); + _clientFactory = clientFactory; + _attribute = attribute; + _converter = new OutputConverter(new StringToServiceBusEntityConverter(_attribute, _path, _messagingProvider, _clientFactory)); } public bool FromAttribute @@ -41,12 +48,12 @@ public async Task BindAsync(BindingContext context) context.CancellationToken.ThrowIfCancellationRequested(); string boundQueueName = _path.Bind(context.BindingData); - var messageSender = _messagingProvider.CreateMessageSender(boundQueueName, _account.ConnectionString); + var messageSender = _messagingProvider.CreateMessageSender(_clientFactory.CreateClientFromSetting(_attribute.Connection), boundQueueName); var entity = new ServiceBusEntity { MessageSender = messageSender, - EntityType = _entityType + EntityType = _attribute.EntityType }; return await BindAsync(entity, context.ValueContext).ConfigureAwait(false); diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/StringToServiceBusEntityConverter.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/StringToServiceBusEntityConverter.cs index 284e3da42fcd4..3774492ca3f70 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/StringToServiceBusEntityConverter.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Bindings/StringToServiceBusEntityConverter.cs @@ -1,7 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using System; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; using System.Threading; using System.Threading.Tasks; @@ -9,17 +9,19 @@ namespace Microsoft.Azure.WebJobs.ServiceBus.Bindings { internal class StringToServiceBusEntityConverter : IAsyncConverter { - private readonly ServiceBusAccount _account; + private readonly ServiceBusAttribute _attribute; private readonly IBindableServiceBusPath _defaultPath; private readonly EntityType _entityType; private readonly MessagingProvider _messagingProvider; + private readonly ServiceBusClientFactory _clientFactory; - public StringToServiceBusEntityConverter(ServiceBusAccount account, IBindableServiceBusPath defaultPath, EntityType entityType, MessagingProvider messagingProvider) + public StringToServiceBusEntityConverter(ServiceBusAttribute attribute, IBindableServiceBusPath defaultPath, MessagingProvider messagingProvider, ServiceBusClientFactory clientFactory) { - _account = account; + _attribute = attribute; _defaultPath = defaultPath; - _entityType = entityType; + _entityType = _attribute.EntityType; _messagingProvider = messagingProvider; + _clientFactory = clientFactory; } public Task ConvertAsync(string input, CancellationToken cancellationToken) @@ -27,7 +29,7 @@ public Task ConvertAsync(string input, CancellationToken cance string queueOrTopicName; // For convenience, treat an an empty string as a request for the default value. - if (String.IsNullOrEmpty(input) && _defaultPath.IsBound) + if (string.IsNullOrEmpty(input) && _defaultPath.IsBound) { queueOrTopicName = _defaultPath.Bind(null); } @@ -37,7 +39,7 @@ public Task ConvertAsync(string input, CancellationToken cance } cancellationToken.ThrowIfCancellationRequested(); - var messageSender = _messagingProvider.CreateMessageSender(queueOrTopicName, _account.ConnectionString); + var messageSender = _messagingProvider.CreateMessageSender(_clientFactory.CreateClientFromSetting(_attribute.Connection), queueOrTopicName); var entity = new ServiceBusEntity { diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ConfigurationExtensions.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ConfigurationExtensions.cs new file mode 100644 index 0000000000000..53b8ea3afcabf --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ConfigurationExtensions.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using Microsoft.Azure.WebJobs.Extensions.Clients.Shared; +using Microsoft.Extensions.Configuration; + +namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config +{ + internal static class ConfigurationExtensions + { + // The order of priority is intentionally flipped from what is defined in + // WebJobsConfigurationExtensions.GetWebJobsConnectionStringSection for back compat with the Track 1 + // Service Bus extensions. + public static IConfigurationSection GetWebJobsConnectionStringSectionServiceBus(this IConfiguration configuration, string connectionStringName) + { + // first try a direct unprefixed lookup + IConfigurationSection section = WebJobsConfigurationExtensions.GetConnectionStringOrSetting(configuration, connectionStringName); + + if (!section.Exists()) + { + // next try prefixing + string prefixedConnectionStringName = WebJobsConfigurationExtensions.GetPrefixedConnectionStringName(connectionStringName); + section = WebJobsConfigurationExtensions.GetConnectionStringOrSetting(configuration, prefixedConnectionStringName); + } + + return section; + } + } +} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusClientFactory.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusClientFactory.cs new file mode 100644 index 0000000000000..fad057852d653 --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusClientFactory.cs @@ -0,0 +1,101 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using Azure.Core; +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; +using Microsoft.Azure.WebJobs.Extensions.Clients.Shared; +using Microsoft.Azure.WebJobs.ServiceBus; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using System; +using Constants = Microsoft.Azure.WebJobs.ServiceBus.Constants; + +namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config +{ + internal class ServiceBusClientFactory + { + private readonly IConfiguration _configuration; + private readonly AzureComponentFactory _componentFactory; + private readonly MessagingProvider _messagingProvider; + + public ServiceBusClientFactory( + IConfiguration configuration, + AzureComponentFactory componentFactory, + MessagingProvider messagingProvider) + { + _configuration = configuration; + _componentFactory = componentFactory; + _messagingProvider = messagingProvider; + } + + internal ServiceBusClient CreateClientFromSetting(string connection) + { + var connectionInfo = ResolveConnectionInformation(connection); + + return connectionInfo.ConnectionString != null ? _messagingProvider.CreateClient(connectionInfo.ConnectionString) + : _messagingProvider.CreateClient(connectionInfo.FullyQualifiedNamespace, connectionInfo.Credential); + } + + internal ServiceBusAdministrationClient CreateAdministrationClient(string connection) + { + var connectionInfo = ResolveConnectionInformation(connection); + if (connectionInfo.ConnectionString != null) + { + return new ServiceBusAdministrationClient(connectionInfo.ConnectionString); + } + else + { + return new ServiceBusAdministrationClient(connectionInfo.FullyQualifiedNamespace, connectionInfo.Credential); + } + } + + private ServiceBusConnectionInformation ResolveConnectionInformation(string connection) + { + var connectionSetting = connection ?? Constants.DefaultConnectionStringName; + IConfigurationSection connectionSection = _configuration.GetWebJobsConnectionStringSectionServiceBus(connectionSetting); + if (!connectionSection.Exists()) + { + // Not found + throw new InvalidOperationException($"Service Bus account connection string '{connectionSetting}' does not exist. " + + $"Make sure that it is a defined App Setting."); + } + + if (!string.IsNullOrWhiteSpace(connectionSection.Value)) + { + return new ServiceBusConnectionInformation(connectionSection.Value); + } + else + { + string fullyQualifiedNamespace = connectionSection["fullyQualifiedNamespace"]; + if (string.IsNullOrWhiteSpace(fullyQualifiedNamespace)) + { + // Not found + throw new InvalidOperationException($"Connection should have an 'fullyQualifiedNamespace' property or be a " + + $"string representing a connection string."); + } + + TokenCredential credential = _componentFactory.CreateTokenCredential(connectionSection); + return new ServiceBusConnectionInformation(fullyQualifiedNamespace, credential); + } + } + + private record ServiceBusConnectionInformation + { + public ServiceBusConnectionInformation(string connectionString) + { + ConnectionString = connectionString; + } + + public ServiceBusConnectionInformation(string fullyQualifiedNamespace, TokenCredential tokenCredential) + { + FullyQualifiedNamespace = fullyQualifiedNamespace; + Credential = tokenCredential; + } + + public string ConnectionString { get; } + public string FullyQualifiedNamespace { get; } + public TokenCredential Credential { get; } + } + } +} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusExtensionConfigProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusExtensionConfigProvider.cs index 56f887b422f0d..7daa5b19b24eb 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusExtensionConfigProvider.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusExtensionConfigProvider.cs @@ -5,11 +5,13 @@ using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using Microsoft.Azure.WebJobs.Description; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; using Microsoft.Azure.WebJobs.Host.Bindings; using Microsoft.Azure.WebJobs.Host.Config; using Microsoft.Azure.WebJobs.Logging; using Microsoft.Azure.WebJobs.ServiceBus.Bindings; using Microsoft.Azure.WebJobs.ServiceBus.Triggers; +using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -24,29 +26,30 @@ namespace Microsoft.Azure.WebJobs.ServiceBus.Config internal class ServiceBusExtensionConfigProvider : IExtensionConfigProvider { private readonly INameResolver _nameResolver; - private readonly IConfiguration _configuration; private readonly ILoggerFactory _loggerFactory; private readonly ServiceBusOptions _options; private readonly MessagingProvider _messagingProvider; private readonly IConverterManager _converterManager; + private readonly ServiceBusClientFactory _clientFactory; /// /// Creates a new instance. /// ///// The to use./> - public ServiceBusExtensionConfigProvider(IOptions options, + public ServiceBusExtensionConfigProvider( + IOptions options, MessagingProvider messagingProvider, INameResolver nameResolver, - IConfiguration configuration, ILoggerFactory loggerFactory, - IConverterManager converterManager) + IConverterManager converterManager, + ServiceBusClientFactory clientFactory) { _options = options.Value; _messagingProvider = messagingProvider; _nameResolver = nameResolver; - _configuration = configuration; _loggerFactory = loggerFactory ?? NullLoggerFactory.Instance; _converterManager = converterManager; + _clientFactory = clientFactory; } /// @@ -82,12 +85,12 @@ public void Initialize(ExtensionConfigContext context) .AddOpenConverter(typeof(MessageToPocoConverter<>)); // register our trigger binding provider - ServiceBusTriggerAttributeBindingProvider triggerBindingProvider = new ServiceBusTriggerAttributeBindingProvider(_nameResolver, _options, _messagingProvider, _configuration, _loggerFactory, _converterManager); + ServiceBusTriggerAttributeBindingProvider triggerBindingProvider = new ServiceBusTriggerAttributeBindingProvider(_nameResolver, _options, _messagingProvider, _loggerFactory, _converterManager, _clientFactory); context.AddBindingRule() .BindToTrigger(triggerBindingProvider); // register our binding provider - ServiceBusAttributeBindingProvider bindingProvider = new ServiceBusAttributeBindingProvider(_nameResolver, _options, _configuration, _messagingProvider); + ServiceBusAttributeBindingProvider bindingProvider = new ServiceBusAttributeBindingProvider(_nameResolver, _messagingProvider, _clientFactory); context.AddBindingRule().Bind(bindingProvider); } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs index 47fbe890fb5ca..6d9a184d8393c 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs @@ -3,8 +3,10 @@ using System; using Microsoft.Azure.WebJobs; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; using Microsoft.Azure.WebJobs.ServiceBus; using Microsoft.Azure.WebJobs.ServiceBus.Config; +using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -40,9 +42,6 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action builder.AddExtension() .ConfigureOptions((config, path, options) => { - options.ConnectionString = config.GetConnectionString(Constants.DefaultConnectionStringName) ?? - config[Constants.DefaultConnectionSettingStringName]; - IConfigurationSection section = config.GetSection(path); bool? autoCompleteMessages = section.GetValue( @@ -82,8 +81,9 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action configure(options); }); - builder.Services.TryAddSingleton(); - + builder.Services.AddAzureClientsCore(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); return builder; } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusOptions.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusOptions.cs index 0c39a4a4fc7ee..d3ac2f9e5ac78 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusOptions.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusOptions.cs @@ -24,11 +24,6 @@ public ServiceBusOptions() { } - /// - /// Gets or sets the Azure ServiceBus connection string. - /// - public string ConnectionString { get; set; } - /// /// Gets or sets the PrefetchCount that will be used when receiving messages. The default value is 0. /// diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs index d51b7e0aa96b9..c8d4407291f59 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs @@ -7,11 +7,17 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Azure.Core; using Azure.Core.Pipeline; using Azure.Messaging.ServiceBus; +using Microsoft.Azure.WebJobs.Extensions.Clients.Shared; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; +using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; namespace Microsoft.Azure.WebJobs.ServiceBus.Listeners @@ -25,26 +31,34 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider private readonly string _entityPath; private readonly bool _isSessionsEnabled; private readonly CancellationTokenSource _cancellationTokenSource; - private readonly MessageProcessor _messageProcessor; - private readonly ServiceBusAccount _serviceBusAccount; private readonly ServiceBusOptions _serviceBusOptions; private readonly ILoggerFactory _loggerFactory; private readonly bool _singleDispatch; private readonly ILogger _logger; + private readonly Lazy _messageProcessor; private Lazy _batchReceiver; - private Lazy _sessionClient; + private Lazy _client; + private Lazy _sessionMessageProcessor; + private Lazy _scaleMonitor; + private bool _disposed; private bool _started; // Serialize execution of StopAsync to avoid calling Unregister* concurrently private readonly SemaphoreSlim _stopAsyncSemaphore = new SemaphoreSlim(1, 1); - private SessionMessageProcessor _sessionMessageProcessor; - - private Lazy _scaleMonitor; - - public ServiceBusListener(string functionId, EntityType entityType, string entityPath, bool isSessionsEnabled, ITriggeredFunctionExecutor triggerExecutor, - ServiceBusOptions config, ServiceBusAccount serviceBusAccount, MessagingProvider messagingProvider, ILoggerFactory loggerFactory, bool singleDispatch) + public ServiceBusListener( + string functionId, + EntityType entityType, + string entityPath, + bool isSessionsEnabled, + ITriggeredFunctionExecutor triggerExecutor, + ServiceBusOptions options, + string connection, + MessagingProvider messagingProvider, + ILoggerFactory loggerFactory, + bool singleDispatch, + ServiceBusClientFactory clientFactory) { _functionId = functionId; _entityType = entityType; @@ -53,26 +67,18 @@ public ServiceBusListener(string functionId, EntityType entityType, string entit _triggerExecutor = triggerExecutor; _cancellationTokenSource = new CancellationTokenSource(); _messagingProvider = messagingProvider; - _serviceBusAccount = serviceBusAccount; _loggerFactory = loggerFactory; _logger = loggerFactory.CreateLogger(); - _batchReceiver = CreateMessageReceiver(); - _sessionClient = CreateSessionClient(); - _scaleMonitor = new Lazy(() => new ServiceBusScaleMonitor(_functionId, _entityType, _entityPath, _serviceBusAccount.ConnectionString, _batchReceiver, _loggerFactory)); - _singleDispatch = singleDispatch; - if (_isSessionsEnabled) - { - _sessionMessageProcessor = _messagingProvider.CreateSessionMessageProcessor(_entityPath, _serviceBusAccount.ConnectionString); - } - else - { - _messageProcessor = _messagingProvider.CreateMessageProcessor(entityPath, _serviceBusAccount.ConnectionString); - } - _serviceBusOptions = config; - } + _client = new Lazy(() => clientFactory.CreateClientFromSetting(connection)); + _batchReceiver = new Lazy(() => _messagingProvider.CreateBatchMessageReceiver(_client.Value, _entityPath)); + _messageProcessor = new Lazy(() => _messagingProvider.CreateMessageProcessor(_client.Value, _entityPath)); + _sessionMessageProcessor = new Lazy(() => _messagingProvider.CreateSessionMessageProcessor(_client.Value, _entityPath)); - internal ServiceBusReceiver BatchReceiver => _batchReceiver.Value; + _scaleMonitor = new Lazy(() => new ServiceBusScaleMonitor(_functionId, _entityType, _entityPath, connection, _batchReceiver, _loggerFactory, clientFactory)); + _singleDispatch = singleDispatch; + _serviceBusOptions = options; + } public async Task StartAsync(CancellationToken cancellationToken) { @@ -87,13 +93,13 @@ public async Task StartAsync(CancellationToken cancellationToken) { if (_isSessionsEnabled) { - _sessionMessageProcessor.Processor.ProcessMessageAsync += ProcessSessionMessageAsync; - await _sessionMessageProcessor.Processor.StartProcessingAsync(cancellationToken).ConfigureAwait(false); + _sessionMessageProcessor.Value.Processor.ProcessMessageAsync += ProcessSessionMessageAsync; + await _sessionMessageProcessor.Value.Processor.StartProcessingAsync(cancellationToken).ConfigureAwait(false); } else { - _messageProcessor.Processor.ProcessMessageAsync += ProcessMessageAsync; - await _messageProcessor.Processor.StartProcessingAsync(cancellationToken).ConfigureAwait(false); + _messageProcessor.Value.Processor.ProcessMessageAsync += ProcessMessageAsync; + await _messageProcessor.Value.Processor.StartProcessingAsync(cancellationToken).ConfigureAwait(false); } } else @@ -115,7 +121,7 @@ public async Task StopAsync(CancellationToken cancellationToken) throw new InvalidOperationException("The listener has not yet been started or has already been stopped."); } - // Unregister* methods stop new messages from being processed while allowing in-flight messages to complete. + // StopProcessingAsync method stop new messages from being processed while allowing in-flight messages to complete. // As the amount of time functions are allowed to complete processing varies by SKU, we specify max timespan // as the amount of time Service Bus SDK should wait for in-flight messages to complete procesing after // unregistering the message handler so that functions have as long as the host continues to run time to complete. @@ -123,11 +129,11 @@ public async Task StopAsync(CancellationToken cancellationToken) { if (_isSessionsEnabled) { - await _sessionMessageProcessor.Processor.StopProcessingAsync(cancellationToken).ConfigureAwait(false); + await _sessionMessageProcessor.Value.Processor.StopProcessingAsync(cancellationToken).ConfigureAwait(false); } else { - await _messageProcessor.Processor.StopProcessingAsync(cancellationToken).ConfigureAwait(false); + await _messageProcessor.Value.Processor.StopProcessingAsync(cancellationToken).ConfigureAwait(false); } } // Batch processing will be stopped via the _started flag on its next iteration @@ -160,16 +166,16 @@ public void Dispose() if (_batchReceiver != null && _batchReceiver.IsValueCreated) { - BatchReceiver.CloseAsync().Wait(); + _batchReceiver.Value.CloseAsync().Wait(); _batchReceiver = null; } - if (_sessionClient != null && _sessionClient.IsValueCreated) + if (_client != null && _client.IsValueCreated) { #pragma warning disable AZC0107 // DO NOT call public asynchronous method in synchronous scope. - _sessionClient.Value.DisposeAsync().EnsureCompleted(); + _client.Value.DisposeAsync().EnsureCompleted(); #pragma warning restore AZC0107 // DO NOT call public asynchronous method in synchronous scope. - _sessionClient = null; + _client = null; } _stopAsyncSemaphore.Dispose(); @@ -179,22 +185,12 @@ public void Dispose() } } - private Lazy CreateMessageReceiver() - { - return new Lazy(() => _messagingProvider.CreateBatchMessageReceiver(_entityPath, _serviceBusAccount.ConnectionString)); - } - - private Lazy CreateSessionClient() - { - return new Lazy(() => _messagingProvider.CreateSessionClient(_serviceBusAccount.ConnectionString)); - } - internal async Task ProcessMessageAsync(ProcessMessageEventArgs args) { using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(args.CancellationToken, _cancellationTokenSource.Token)) { var actions = new ServiceBusMessageActions(args); - if (!await _messageProcessor.BeginProcessingMessageAsync(actions, args.Message, linkedCts.Token).ConfigureAwait(false)) + if (!await _messageProcessor.Value.BeginProcessingMessageAsync(actions, args.Message, linkedCts.Token).ConfigureAwait(false)) { return; } @@ -204,7 +200,7 @@ internal async Task ProcessMessageAsync(ProcessMessageEventArgs args) TriggeredFunctionData data = input.GetTriggerFunctionData(); FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token).ConfigureAwait(false); - await _messageProcessor.CompleteProcessingMessageAsync(actions, args.Message, result, linkedCts.Token).ConfigureAwait(false); + await _messageProcessor.Value.CompleteProcessingMessageAsync(actions, args.Message, result, linkedCts.Token).ConfigureAwait(false); } } @@ -213,7 +209,7 @@ internal async Task ProcessSessionMessageAsync(ProcessSessionMessageEventArgs ar using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(args.CancellationToken, _cancellationTokenSource.Token)) { var actions = new ServiceBusSessionMessageActions(args); - if (!await _sessionMessageProcessor.BeginProcessingMessageAsync(actions, args.Message, linkedCts.Token).ConfigureAwait(false)) + if (!await _sessionMessageProcessor.Value.BeginProcessingMessageAsync(actions, args.Message, linkedCts.Token).ConfigureAwait(false)) { return; } @@ -223,7 +219,7 @@ internal async Task ProcessSessionMessageAsync(ProcessSessionMessageEventArgs ar TriggeredFunctionData data = input.GetTriggerFunctionData(); FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token).ConfigureAwait(false); - await _sessionMessageProcessor.CompleteProcessingMessageAsync(actions, args.Message, result, linkedCts.Token).ConfigureAwait(false); + await _sessionMessageProcessor.Value.CompleteProcessingMessageAsync(actions, args.Message, result, linkedCts.Token).ConfigureAwait(false); } } @@ -233,11 +229,11 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken) ServiceBusReceiver receiver = null; if (_isSessionsEnabled) { - sessionClient = _sessionClient.Value; + sessionClient = _client.Value; } else { - receiver = BatchReceiver; + receiver = _batchReceiver.Value; } Task.Run(async () => @@ -252,7 +248,7 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken) return; } - if (_isSessionsEnabled && ( receiver == null || receiver.IsClosed)) + if (_isSessionsEnabled && (receiver == null || receiver.IsClosed)) { try { @@ -278,7 +274,7 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken) ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateBatch(messagesArray); if (_isSessionsEnabled) { - input.MessageActions = new ServiceBusSessionMessageActions((ServiceBusSessionReceiver) receiver); + input.MessageActions = new ServiceBusSessionMessageActions((ServiceBusSessionReceiver)receiver); } else { diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListenerFactory.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListenerFactory.cs deleted file mode 100644 index df924835ef9bf..0000000000000 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListenerFactory.cs +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using Microsoft.Azure.WebJobs.Host.Executors; -using Microsoft.Azure.WebJobs.Host.Listeners; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -using Microsoft.Azure.WebJobs.Host.Protocols; - -namespace Microsoft.Azure.WebJobs.ServiceBus.Listeners -{ - internal class ServiceBusListenerFactory : IListenerFactory - { - private readonly ServiceBusAccount _account; - private readonly EntityType _entityType; - private readonly string _entityPath; - private readonly bool _isSessionsEnabled; - private readonly ITriggeredFunctionExecutor _executor; - private readonly FunctionDescriptor _descriptor; - private readonly ServiceBusOptions _options; - private readonly MessagingProvider _messagingProvider; - private readonly ILoggerFactory _loggerFactory; - private readonly bool _singleDispatch; - - public ServiceBusListenerFactory(ServiceBusAccount account, EntityType entityType, string entityPath, bool isSessionsEnabled, ITriggeredFunctionExecutor executor, - FunctionDescriptor descriptor, ServiceBusOptions options, MessagingProvider messagingProvider, ILoggerFactory loggerFactory, bool singleDispatch) - { - _account = account; - _entityType = entityType; - _entityPath = entityPath; - _isSessionsEnabled = isSessionsEnabled; - _executor = executor; - _descriptor = descriptor; - _options = options; - _messagingProvider = messagingProvider; - _loggerFactory = loggerFactory; - _singleDispatch = singleDispatch; - } - - public Task CreateAsync(CancellationToken cancellationToken) - { - var listener = new ServiceBusListener(_descriptor.Id, _entityType, _entityPath, _isSessionsEnabled, _executor, _options, _account, _messagingProvider, _loggerFactory, _singleDispatch); - return Task.FromResult(listener); - } - } -} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs index 750d64a3cb41a..e7d3ec80d833f 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs @@ -3,15 +3,14 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Logging; using System.Globalization; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; namespace Microsoft.Azure.WebJobs.ServiceBus.Listeners { @@ -22,7 +21,6 @@ internal class ServiceBusScaleMonitor : IScaleMonitor private readonly string _functionId; private readonly EntityType _entityType; private readonly string _entityPath; - private readonly string _connectionString; private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; private readonly bool _isListeningOnDeadLetterQueue; private readonly Lazy _receiver; @@ -31,16 +29,15 @@ internal class ServiceBusScaleMonitor : IScaleMonitor private DateTime _nextWarningTime; - public ServiceBusScaleMonitor(string functionId, EntityType entityType, string entityPath, string connectionString, Lazy receiver, ILoggerFactory loggerFactory) + public ServiceBusScaleMonitor(string functionId, EntityType entityType, string entityPath, string connection, Lazy receiver, ILoggerFactory loggerFactory, ServiceBusClientFactory clientFactory) { _functionId = functionId; _entityType = entityType; _entityPath = entityPath; - _connectionString = connectionString; _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-ServiceBusTrigger-{_entityPath}".ToLower(CultureInfo.InvariantCulture)); _isListeningOnDeadLetterQueue = entityPath.EndsWith(DeadLetterQueuePath, StringComparison.OrdinalIgnoreCase); _receiver = receiver; - _administrationClient = new Lazy(() => new ServiceBusAdministrationClient(_connectionString)); + _administrationClient = new Lazy(() => clientFactory.CreateAdministrationClient(connection)); _logger = loggerFactory.CreateLogger(); _nextWarningTime = DateTime.UtcNow; } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj index f9e3cd0773e1b..19b081f1d1e71 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj @@ -25,5 +25,7 @@ + + diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessageProcessor.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessageProcessor.cs index f6418ba2e2a02..11eb600043322 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessageProcessor.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessageProcessor.cs @@ -17,16 +17,23 @@ public class MessageProcessor /// /// Initializes a new instance of . /// - /// The to use. - public MessageProcessor(ServiceBusProcessor processor) + /// The to use for single dispatch functions. + /// The to use for multiple dispatch functions. + public MessageProcessor(ServiceBusProcessor processor, ServiceBusReceiver receiver) { Processor = processor ?? throw new ArgumentNullException(nameof(processor)); + Receiver = receiver ?? throw new ArgumentNullException(nameof(processor)); } /// /// Gets or sets the that will be used by the . /// - internal ServiceBusProcessor Processor { get; set; } + protected internal ServiceBusProcessor Processor { get; set; } + + /// + /// Gets or sets the that will be used by the . + /// + protected internal ServiceBusReceiver Receiver { get; set; } /// /// This method is called when there is a new message to process, before the job function is invoked. diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs index 541893af04d81..1988614e3f0fc 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs @@ -3,8 +3,10 @@ using System; using System.Collections.Concurrent; +using Azure.Core; using Azure.Messaging.ServiceBus; using Microsoft.Azure.WebJobs.ServiceBus.Listeners; +using Microsoft.Extensions.Azure; using Microsoft.Extensions.Options; namespace Microsoft.Azure.WebJobs.ServiceBus @@ -16,210 +18,119 @@ namespace Microsoft.Azure.WebJobs.ServiceBus public class MessagingProvider { private readonly ServiceBusOptions _options; - private readonly ConcurrentDictionary _messageSenderCache = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _messageReceiverCache = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _clientCache = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _processorCache = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _sessionProcessorCache = new ConcurrentDictionary(); - - /// - /// Constructs a new instance. - /// - /// The . - public MessagingProvider(IOptions serviceBusOptions) - { - _options = serviceBusOptions?.Value ?? throw new ArgumentNullException(nameof(serviceBusOptions)); - } - /// - /// Creates a for the specified ServiceBus entity. - /// - /// The ServiceBus entity to create a for. - /// The ServiceBus connection string. - /// The . - public virtual MessageProcessor CreateMessageProcessor(string entityPath, string connectionString) - { - if (string.IsNullOrEmpty(entityPath)) - { - throw new ArgumentNullException(nameof(entityPath)); - } - if (string.IsNullOrEmpty(connectionString)) - { - throw new ArgumentNullException(nameof(connectionString)); - } + private readonly ConcurrentDictionary _messageSenderCache = new(); + private readonly ConcurrentDictionary _messageReceiverCache = new(); + private readonly ConcurrentDictionary _clientCache = new(); - return new MessageProcessor(GetOrAddProcessor(entityPath, connectionString)); + protected MessagingProvider() + { } - /// - /// Creates a for the specified ServiceBus entity. - /// - /// - /// You can override this method to customize the . - /// - /// The ServiceBus entity to create a for. - /// The ServiceBus connection string. - /// - public virtual ServiceBusProcessor CreateProcessor(string entityPath, string connectionString) + public MessagingProvider(IOptions options) { - if (string.IsNullOrEmpty(entityPath)) - { - throw new ArgumentNullException(nameof(entityPath)); - } - if (string.IsNullOrEmpty(connectionString)) - { - throw new ArgumentNullException(nameof(connectionString)); - } - - return GetOrAddProcessor(entityPath, connectionString); + _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); } - /// - /// Creates a for the specified ServiceBus entity. - /// - /// - /// You can override this method to customize the . - /// - /// The ServiceBus entity to create a for. - /// The ServiceBus connection string. - /// - public virtual ServiceBusSender CreateMessageSender(string entityPath, string connectionString) + public virtual ServiceBusClient CreateClient(string connectionString) { - if (string.IsNullOrEmpty(entityPath)) - { - throw new ArgumentNullException(nameof(entityPath)); - } - if (string.IsNullOrEmpty(connectionString)) - { - throw new ArgumentNullException(nameof(connectionString)); - } + Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString)); - return GetOrAddMessageSender(entityPath, connectionString); + return _clientCache.GetOrAdd( + connectionString, + (_) => new ServiceBusClient(connectionString, _options.ToClientOptions())); } - /// - /// Creates a for the specified ServiceBus entity. - /// - /// - /// You can override this method to customize the . - /// - /// The ServiceBus entity to create a for. - /// The ServiceBus connection string. - /// - public virtual ServiceBusReceiver CreateBatchMessageReceiver(string entityPath, string connectionString) + public virtual ServiceBusClient CreateClient(string fullyQualifiedNamespace, TokenCredential credential) { - if (string.IsNullOrEmpty(entityPath)) - { - throw new ArgumentNullException(nameof(entityPath)); - } - if (string.IsNullOrEmpty(connectionString)) - { - throw new ArgumentNullException(nameof(connectionString)); - } + Argument.AssertNotNullOrEmpty(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace)); + Argument.AssertNotNull(credential, nameof(credential)); - return GetOrAddMessageReceiver(entityPath, connectionString); + return _clientCache.GetOrAdd( + fullyQualifiedNamespace, + (_) => new ServiceBusClient(fullyQualifiedNamespace, credential, _options.ToClientOptions())); } - /// - /// Creates a for the specified ServiceBus connection. - /// - /// - /// You can override this method to customize the . - /// - /// The ServiceBus connection string. - /// - public virtual ServiceBusClient CreateClient(string connectionString) + public virtual MessageProcessor CreateMessageProcessor(ServiceBusClient client, string entityPath) { - if (string.IsNullOrEmpty(connectionString)) - { - throw new ArgumentNullException(nameof(connectionString)); - } + Argument.AssertNotNull(client, nameof(client)); + Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); - return new ServiceBusClient(connectionString, _options.ToClientOptions()); + return new MessageProcessor(CreateProcessor(client, entityPath), GetOrAddMessageReceiver(client, entityPath)); } - /// - /// Creates a for the specified ServiceBus entity. - /// - /// The ServiceBus entity to create a for. - /// The ServiceBus connection string. - /// - public virtual SessionMessageProcessor CreateSessionMessageProcessor(string entityPath, string connectionString) + public virtual ServiceBusProcessor CreateProcessor(ServiceBusClient client, string entityPath) { - if (string.IsNullOrEmpty(entityPath)) + // processors cannot be shared across listeners since there is a limit of 1 event handler in the Service Bus SDK. + + ServiceBusProcessor processor; + if (ServiceBusEntityPathHelper.ParseEntityType(entityPath) == EntityType.Topic) { - throw new ArgumentNullException(nameof(entityPath)); + // entityPath for a subscription is "{TopicName}/Subscriptions/{SubscriptionName}" + ServiceBusEntityPathHelper.ParseTopicAndSubscription(entityPath, out string topic, out string subscription); + processor = client.CreateProcessor(topic, subscription, _options.ToProcessorOptions()); } - if (string.IsNullOrEmpty(connectionString)) + else { - throw new ArgumentNullException(nameof(connectionString)); + // entityPath for a queue is "{QueueName}" + processor = client.CreateProcessor(entityPath, _options.ToProcessorOptions()); } - - return new SessionMessageProcessor(GetOrAddSessionProcessor(entityPath, connectionString)); + processor.ProcessErrorAsync += _options.ExceptionReceivedHandler; + return processor; } - private ServiceBusSender GetOrAddMessageSender(string entityPath, string connectionString) + public virtual ServiceBusSender CreateMessageSender(ServiceBusClient client, string entityPath) { - ServiceBusClient client = _clientCache.GetOrAdd(connectionString, CreateClient(connectionString)); + Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); + return _messageSenderCache.GetOrAdd(entityPath, client.CreateSender(entityPath)); } - private ServiceBusReceiver GetOrAddMessageReceiver(string entityPath, string connectionString) + public virtual ServiceBusReceiver CreateBatchMessageReceiver(ServiceBusClient client, string entityPath) { - ServiceBusClient client = _clientCache.GetOrAdd(connectionString, (_) => CreateClient(connectionString)); - return _messageReceiverCache.GetOrAdd(entityPath, (_) => client.CreateReceiver(entityPath, new ServiceBusReceiverOptions - { - PrefetchCount = _options.PrefetchCount - })); + Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); + + return _messageReceiverCache.GetOrAdd(entityPath, (_) => client.CreateReceiver( + entityPath, + new ServiceBusReceiverOptions + { + PrefetchCount = _options.PrefetchCount + })); } - private ServiceBusProcessor GetOrAddProcessor(string entityPath, string connectionString) + public virtual SessionMessageProcessor CreateSessionMessageProcessor(ServiceBusClient client, string entityPath) { - ServiceBusClient client = _clientCache.GetOrAdd(connectionString, (_) => CreateClient(connectionString)); - return _processorCache.GetOrAdd(entityPath, (_) => - { - ServiceBusProcessor processor; - if (ServiceBusEntityPathHelper.ParseEntityType(entityPath) == EntityType.Topic) - { - // entityPath for a subscription is "{TopicName}/Subscriptions/{SubscriptionName}" - ServiceBusEntityPathHelper.ParseTopicAndSubscription(entityPath, out string topic, out string subscription); - processor = client.CreateProcessor(topic, subscription, _options.ToProcessorOptions()); - } - else - { - // entityPath for a queue is "{QueueName}" - processor = client.CreateProcessor(entityPath, _options.ToProcessorOptions()); - } - processor.ProcessErrorAsync += _options.ExceptionReceivedHandler; - return processor; - }); + Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); + + return new SessionMessageProcessor(client, CreateSessionProcessor(client, entityPath)); } - private ServiceBusSessionProcessor GetOrAddSessionProcessor(string entityPath, string connectionString) + public virtual ServiceBusSessionProcessor CreateSessionProcessor(ServiceBusClient client, string entityPath) { - ServiceBusClient client = _clientCache.GetOrAdd(connectionString, (_) => CreateClient(connectionString)); - return _sessionProcessorCache.GetOrAdd(entityPath, (_) => + ServiceBusSessionProcessor processor; + if (ServiceBusEntityPathHelper.ParseEntityType(entityPath) == EntityType.Topic) { - ServiceBusSessionProcessor processor; - if (ServiceBusEntityPathHelper.ParseEntityType(entityPath) == EntityType.Topic) - { - // entityPath for a subscription is "{TopicName}/Subscriptions/{SubscriptionName}" - ServiceBusEntityPathHelper.ParseTopicAndSubscription(entityPath, out string topic, out string subscription); - processor = client.CreateSessionProcessor(topic, subscription, _options.ToSessionProcessorOptions()); - } - else - { - // entityPath for a queue is "{QueueName}" - processor = client.CreateSessionProcessor(entityPath, _options.ToSessionProcessorOptions()); - } - processor.ProcessErrorAsync += _options.ExceptionReceivedHandler; - return processor; - }); + // entityPath for a subscription is "{TopicName}/Subscriptions/{SubscriptionName}" + ServiceBusEntityPathHelper.ParseTopicAndSubscription(entityPath, out string topic, out string subscription); + processor = client.CreateSessionProcessor(topic, subscription, _options.ToSessionProcessorOptions()); + } + else + { + // entityPath for a queue is "{QueueName}" + processor = client.CreateSessionProcessor(entityPath, _options.ToSessionProcessorOptions()); + } + processor.ProcessErrorAsync += _options.ExceptionReceivedHandler; + return processor; } - internal ServiceBusClient CreateSessionClient(string connectionString) + private ServiceBusReceiver GetOrAddMessageReceiver(ServiceBusClient client, string entityPath) { - return _clientCache.GetOrAdd(connectionString, (_) => CreateClient(connectionString)); + return _messageReceiverCache.GetOrAdd(entityPath, (_) => client.CreateReceiver( + entityPath, + new ServiceBusReceiverOptions + { + PrefetchCount = _options.PrefetchCount + })); } } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/SessionMessageProcessor.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/SessionMessageProcessor.cs index bf09312609e38..af0a61e0cc3a0 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/SessionMessageProcessor.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/SessionMessageProcessor.cs @@ -11,15 +11,22 @@ namespace Microsoft.Azure.WebJobs.ServiceBus { public class SessionMessageProcessor { - public SessionMessageProcessor(ServiceBusSessionProcessor processor) + public SessionMessageProcessor(ServiceBusClient client, ServiceBusSessionProcessor processor) { Processor = processor ?? throw new ArgumentNullException(nameof(processor)); + Client = client ?? throw new ArgumentNullException(nameof(client)); } /// - /// Gets or sets the that will be used by the . + /// Gets or sets the that will be used by the . /// - internal ServiceBusSessionProcessor Processor { get; set; } + protected internal ServiceBusSessionProcessor Processor { get; set; } + + /// + /// Gets or sets the that will be used by the to + /// accept new sessions for multiple dispatch functions.. + /// + protected internal ServiceBusClient Client { get; set; } /// /// This method is called when there is a new message to process, before the job function is invoked. diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/ServiceBusAccount.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/ServiceBusAccount.cs deleted file mode 100644 index dca0b479b39a5..0000000000000 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/ServiceBusAccount.cs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Globalization; -using Microsoft.Azure.WebJobs.Logging; -using Microsoft.Extensions.Configuration; - -namespace Microsoft.Azure.WebJobs.ServiceBus -{ - internal class ServiceBusAccount - { - private readonly ServiceBusOptions _options; - private readonly IConnectionProvider _connectionProvider; - private readonly IConfiguration _configuration; - private string _connectionString; - - public ServiceBusAccount(ServiceBusOptions options, IConfiguration configuration, IConnectionProvider connectionProvider = null) - { - _options = options ?? throw new ArgumentNullException(nameof(options)); - _configuration = configuration; - _connectionProvider = connectionProvider; - } - - internal ServiceBusAccount() - { - } - - public virtual string ConnectionString - { - get - { - if (string.IsNullOrEmpty(_connectionString)) - { - _connectionString = _options.ConnectionString; - if (_connectionProvider != null && !string.IsNullOrEmpty(_connectionProvider.Connection)) - { - _connectionString = _configuration.GetWebJobsConnectionString(_connectionProvider.Connection); - } - - if (string.IsNullOrEmpty(_connectionString)) - { - throw new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, "Microsoft Azure WebJobs SDK ServiceBus connection string '{0}' is missing or empty.", - Sanitizer.Sanitize(_connectionProvider.Connection) ?? Constants.DefaultConnectionSettingStringName)); - } - } - - return _connectionString; - } - } - } -} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Triggers/ServiceBusTriggerAttributeBindingProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Triggers/ServiceBusTriggerAttributeBindingProvider.cs index b40dd7d9773e3..3828725c1a89f 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Triggers/ServiceBusTriggerAttributeBindingProvider.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Triggers/ServiceBusTriggerAttributeBindingProvider.cs @@ -2,19 +2,18 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Globalization; using System.Reflection; using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Bindings; using Microsoft.Azure.WebJobs.Host.Listeners; -using Microsoft.Azure.WebJobs.Host.Protocols; using Microsoft.Azure.WebJobs.Host.Triggers; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Azure.WebJobs.ServiceBus.Listeners; -using Microsoft.Azure.WebJobs.Host.Config; +using Microsoft.Extensions.Azure; using Azure.Messaging.ServiceBus; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; namespace Microsoft.Azure.WebJobs.ServiceBus.Triggers { @@ -23,19 +22,24 @@ internal class ServiceBusTriggerAttributeBindingProvider : ITriggerBindingProvid private readonly INameResolver _nameResolver; private readonly ServiceBusOptions _options; private readonly MessagingProvider _messagingProvider; - private readonly IConfiguration _configuration; private readonly ILoggerFactory _loggerFactory; private readonly IConverterManager _converterManager; + private readonly ServiceBusClientFactory _clientFactory; - public ServiceBusTriggerAttributeBindingProvider(INameResolver nameResolver, ServiceBusOptions options, MessagingProvider messagingProvider, IConfiguration configuration, - ILoggerFactory loggerFactory, IConverterManager converterManager) + public ServiceBusTriggerAttributeBindingProvider( + INameResolver nameResolver, + ServiceBusOptions options, + MessagingProvider messagingProvider, + ILoggerFactory loggerFactory, + IConverterManager converterManager, + ServiceBusClientFactory clientFactory) { _nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver)); _options = options ?? throw new ArgumentNullException(nameof(options)); _messagingProvider = messagingProvider ?? throw new ArgumentNullException(nameof(messagingProvider)); - _configuration = configuration; _loggerFactory = loggerFactory; _converterManager = converterManager; + _clientFactory = clientFactory; } public Task TryCreateAsync(TriggerBindingProviderContext context) @@ -53,33 +57,27 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex return Task.FromResult(null); } - string queueName = null; - string topicName = null; - string subscriptionName = null; - string entityPath = null; + attribute.Connection = _nameResolver.ResolveWholeString(attribute.Connection); + string entityPath; EntityType entityType; - if (attribute.QueueName != null) { - queueName = Resolve(attribute.QueueName); + var queueName = _nameResolver.ResolveWholeString(attribute.QueueName); entityPath = queueName; entityType = EntityType.Queue; } else { - topicName = Resolve(attribute.TopicName); - subscriptionName = Resolve(attribute.SubscriptionName); + var topicName = _nameResolver.ResolveWholeString(attribute.TopicName); + var subscriptionName = _nameResolver.ResolveWholeString(attribute.SubscriptionName); entityPath = EntityNameFormatter.FormatSubscriptionPath(topicName, subscriptionName); entityType = EntityType.Topic; } - attribute.Connection = Resolve(attribute.Connection); - ServiceBusAccount account = new ServiceBusAccount(_options, _configuration, attribute); - Func> createListener = (factoryContext, singleDispatch) => { - IListener listener = new ServiceBusListener(factoryContext.Descriptor.Id, entityType, entityPath, attribute.IsSessionsEnabled, factoryContext.Executor, _options, account, _messagingProvider, _loggerFactory, singleDispatch); + IListener listener = new ServiceBusListener(factoryContext.Descriptor.Id, entityType, entityPath, attribute.IsSessionsEnabled, factoryContext.Executor, _options, attribute.Connection, _messagingProvider, _loggerFactory, singleDispatch, _clientFactory); return Task.FromResult(listener); }; @@ -89,15 +87,5 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex return Task.FromResult(binding); } - - private string Resolve(string queueName) - { - if (_nameResolver == null) - { - return queueName; - } - - return _nameResolver.ResolveWholeString(queueName); - } } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Bindings/ServiceBusAttributeBindingProviderTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Bindings/ServiceBusAttributeBindingProviderTests.cs index c08d53ef42b08..84fac4403d2e9 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Bindings/ServiceBusAttributeBindingProviderTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Bindings/ServiceBusAttributeBindingProviderTests.cs @@ -7,8 +7,10 @@ using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; using Microsoft.Azure.WebJobs.Host.Bindings; using Microsoft.Azure.WebJobs.ServiceBus.Bindings; +using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Options; using Moq; @@ -23,12 +25,13 @@ public class ServiceBusAttributeBindingProviderTests public ServiceBusAttributeBindingProviderTests() { - _configuration = new ConfigurationBuilder() - .AddEnvironmentVariables() - .Build(); + _configuration = new ConfigurationBuilder().AddInMemoryCollection(new KeyValuePair[] { new("connection", "connectionString") }).Build(); + Mock mockResolver = new Mock(MockBehavior.Strict); ServiceBusOptions config = new ServiceBusOptions(); - _provider = new ServiceBusAttributeBindingProvider(mockResolver.Object, config, _configuration, new MessagingProvider(new OptionsWrapper(config))); + var messagingProvider = new MessagingProvider(new OptionsWrapper(config)); + var factory = new ServiceBusClientFactory(_configuration, new Mock().Object, messagingProvider); + _provider = new ServiceBusAttributeBindingProvider(mockResolver.Object, messagingProvider, factory); } [Test] @@ -61,7 +64,8 @@ internal static void TestJob_AccountOverride( } internal static void TestJob( - [ServiceBusAttribute("test", Connection = Constants.DefaultConnectionStringName)] out ServiceBusMessage message) + [ServiceBusAttribute("test", Connection = Constants.DefaultConnectionStringName)] + out ServiceBusMessage message) { message = new ServiceBusMessage(); } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Bindings/ServiceBusTriggerAttributeBindingProviderTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Bindings/ServiceBusTriggerAttributeBindingProviderTests.cs index a4243e4c54c5c..229cec71aa4f0 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Bindings/ServiceBusTriggerAttributeBindingProviderTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Bindings/ServiceBusTriggerAttributeBindingProviderTests.cs @@ -12,28 +12,30 @@ using Moq; using NUnit.Framework; using Azure.Messaging.ServiceBus; +using Microsoft.Extensions.Azure; +using Azure.Messaging.ServiceBus.Tests; +using System.Collections.Generic; +using Microsoft.Azure.WebJobs.ServiceBus.Tests; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; namespace Microsoft.Azure.WebJobs.ServiceBus.UnitTests.Bindings { public class ServiceBusTriggerAttributeBindingProviderTests { - private readonly Mock _mockMessagingProvider; private readonly ServiceBusTriggerAttributeBindingProvider _provider; - private readonly IConfiguration _configuration; public ServiceBusTriggerAttributeBindingProviderTests() { - _configuration = new ConfigurationBuilder() - .AddEnvironmentVariables() - .Build(); + var configuration = ConfigurationUtilities.CreateConfiguration(new KeyValuePair(Constants.DefaultConnectionStringName, "defaultConnection")); + Mock mockResolver = new Mock(MockBehavior.Strict); - ServiceBusOptions config = new ServiceBusOptions(); - _mockMessagingProvider = new Mock(MockBehavior.Strict, new OptionsWrapper(config)); + ServiceBusOptions options = new ServiceBusOptions(); Mock convertManager = new Mock(MockBehavior.Default); - - _provider = new ServiceBusTriggerAttributeBindingProvider(mockResolver.Object, config, _mockMessagingProvider.Object, _configuration, NullLoggerFactory.Instance, convertManager.Object); + var provider = new MessagingProvider(new OptionsWrapper(options)); + var factory = new ServiceBusClientFactory(configuration, new Mock().Object, provider); + _provider = new ServiceBusTriggerAttributeBindingProvider(mockResolver.Object, options, provider, NullLoggerFactory.Instance, convertManager.Object, factory); } [Test] @@ -60,13 +62,15 @@ public async Task TryCreateAsync_DefaultAccount() internal static void TestJob_AccountOverride( [ServiceBusTriggerAttribute("test"), - ServiceBusAccount(Constants.DefaultConnectionStringName)] ServiceBusMessage message) + ServiceBusAccount(Constants.DefaultConnectionStringName)] + ServiceBusMessage message) { message = new ServiceBusMessage(); } internal static void TestJob( - [ServiceBusTriggerAttribute("test", Connection = Constants.DefaultConnectionStringName)] ServiceBusMessage message) + [ServiceBusTriggerAttribute("test", Connection = Constants.DefaultConnectionStringName)] + ServiceBusMessage message) { message = new ServiceBusMessage(); } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusClientFactoryTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusClientFactoryTests.cs new file mode 100644 index 0000000000000..e03378cbc3077 --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusClientFactoryTests.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using Azure.Messaging.ServiceBus; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; +using Microsoft.Azure.WebJobs.ServiceBus.Tests; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Moq; +using NUnit.Framework; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.ServiceBus.Tests.Config +{ + public class ServiceBusClientFactoryTests + { + [Test] + [TestCase("DefaultConnectionString", "DefaultConectionSettingString", "DefaultConnectionString")] + [TestCase("DefaultConnectionString", null, "DefaultConnectionString")] + [TestCase(null, "DefaultConectionSettingString", "DefaultConectionSettingString")] + [TestCase(null, null, null)] + public void ReadDefaultConnectionString(string defaultConnectionString, string defaultConnectionSettingString, string expectedValue) + { + var configuration = ConfigurationUtilities.CreateConfiguration( + new KeyValuePair("ConnectionStrings:" + Constants.DefaultConnectionStringName, defaultConnectionString), + new KeyValuePair(Constants.DefaultConnectionSettingStringName, defaultConnectionSettingString)); + + var mockProvider = new Mock(); + mockProvider.Setup( + p => p.CreateClient(expectedValue)) + .Returns(Mock.Of()); + + var factory = new ServiceBusClientFactory(configuration, Mock.Of(), mockProvider.Object); + if (expectedValue == null) + { + Assert.That( + () => factory.CreateClientFromSetting(null), + Throws.InstanceOf()); + } + else + { + factory.CreateClientFromSetting(null); + mockProvider.VerifyAll(); + } + } + } +} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusHostBuilderExtensionsTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusHostBuilderExtensionsTests.cs index c6db9a1aca058..7180c9ef7218c 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusHostBuilderExtensionsTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusHostBuilderExtensionsTests.cs @@ -26,7 +26,6 @@ public void ConfigureOptions_AppliesValuesCorrectly_BackCompat() ServiceBusOptions options = CreateOptionsFromConfigBackCompat(); Assert.AreEqual(123, options.PrefetchCount); - Assert.AreEqual("TestConnectionString", options.ConnectionString); Assert.AreEqual(123, options.MaxConcurrentCalls); Assert.False(options.AutoCompleteMessages); Assert.AreEqual(TimeSpan.FromSeconds(15), options.MaxAutoLockRenewalDuration); @@ -42,8 +41,7 @@ public void ConfigureOptions_Format_Returns_Expected_BackCompat() ServiceBusOptions result = iObj.ToObject(); Assert.AreEqual(123, result.PrefetchCount); - // can't round trip the connection string - Assert.IsNull(result.ConnectionString); + Assert.AreEqual(123, result.MaxConcurrentCalls); Assert.False(result.AutoCompleteMessages); Assert.AreEqual(TimeSpan.FromSeconds(15), result.MaxAutoLockRenewalDuration); @@ -55,7 +53,6 @@ public void ConfigureOptions_AppliesValuesCorrectly() ServiceBusOptions options = CreateOptionsFromConfig(); Assert.AreEqual(123, options.PrefetchCount); - Assert.AreEqual("TestConnectionString", options.ConnectionString); Assert.AreEqual(123, options.MaxConcurrentCalls); Assert.False(options.AutoCompleteMessages); Assert.AreEqual(TimeSpan.FromSeconds(15), options.MaxAutoLockRenewalDuration); @@ -71,8 +68,7 @@ public void ConfigureOptions_Format_Returns_Expected() ServiceBusOptions result = iObj.ToObject(); Assert.AreEqual(123, result.PrefetchCount); - // can't round trip the connection string - Assert.IsNull(result.ConnectionString); + Assert.AreEqual(123, result.MaxConcurrentCalls); Assert.False(result.AutoCompleteMessages); Assert.AreEqual(TimeSpan.FromSeconds(15), result.MaxAutoLockRenewalDuration); @@ -134,7 +130,7 @@ public void AddServiceBus_ThrowsArgumentNull_WhenServiceBusOptionsIsNull() var exception = Assert.Throws(() => host.Services.GetServices()); - Assert.AreEqual("serviceBusOptions", exception.ParamName); + Assert.AreEqual("options", exception.ParamName); } [Test] @@ -157,20 +153,11 @@ public void AddServiceBus_NoServiceBusOptions_PerformsExpectedRegistration() [Test] public void AddServiceBus_ServiceBusOptionsProvided_PerformsExpectedRegistration() { - string fakeConnStr = "test service bus connection"; - IHost host = new HostBuilder() .ConfigureDefaultTestHost(b => { b.AddServiceBus(); }) - .ConfigureServices(s => - { - s.Configure(o => - { - o.ConnectionString = fakeConnStr; - }); - }) .Build(); // verify that the service bus config provider was registered @@ -179,30 +166,6 @@ public void AddServiceBus_ServiceBusOptionsProvided_PerformsExpectedRegistration // verify that the service bus config provider was registered var serviceBusExtensionConfig = configProviders.OfType().Single(); - - Assert.AreEqual(fakeConnStr, serviceBusExtensionConfig.Options.ConnectionString); - } - - [Test] - [TestCase("DefaultConnectionString", "DefaultConectionSettingString", "DefaultConnectionString")] - [TestCase("DefaultConnectionString", null, "DefaultConnectionString")] - [TestCase(null, "DefaultConectionSettingString", "DefaultConectionSettingString")] - [TestCase(null, null, null)] - public void ReadDefaultConnectionString(string defaultConnectionString, string sefaultConectionSettingString, string expectedValue) - { - ServiceBusOptions options = TestHelpers.GetConfiguredOptions(b => - { - var test = b.Services.Single(x => x.ServiceType == typeof(IConfiguration)); - - var envPrpvider = (test.ImplementationInstance as ConfigurationRoot).Providers - .Single(x => x.GetType() == typeof(EnvironmentVariablesConfigurationProvider)); - envPrpvider.Set("ConnectionStrings:" + Constants.DefaultConnectionStringName, defaultConnectionString); - envPrpvider.Set(Constants.DefaultConnectionSettingStringName, sefaultConectionSettingString); - - b.AddServiceBus(); - }, new Dictionary()); - - Assert.AreEqual(options.ConnectionString, expectedValue); } } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ConfigurationUtilities.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ConfigurationUtilities.cs new file mode 100644 index 0000000000000..730b550b6e5d8 --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ConfigurationUtilities.cs @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using Microsoft.Extensions.Configuration; +using System.Collections.Generic; + +namespace Microsoft.Azure.WebJobs.ServiceBus.Tests +{ + public static class ConfigurationUtilities + { + public static IConfiguration CreateConfiguration(params KeyValuePair[] data) + { + return new ConfigurationBuilder().AddInMemoryCollection(data).Build(); + } + } +} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusListenerTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusListenerTests.cs index 01781b4443e90..4deb70451ae91 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusListenerTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusListenerTests.cs @@ -2,14 +2,19 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Azure.WebJobs.ServiceBus.Listeners; +using Microsoft.Azure.WebJobs.ServiceBus.Tests; +using Microsoft.Extensions.Azure; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Moq; using NUnit.Framework; @@ -21,6 +26,7 @@ public class ServiceBusListenerTests private readonly ServiceBusListener _listener; private readonly Mock _mockExecutor; private readonly Mock _mockMessagingProvider; + private readonly Mock _mockClientFactory; private readonly Mock _mockMessageProcessor; private readonly TestLoggerProvider _loggerProvider; private readonly LoggerFactory _loggerFactory; @@ -35,32 +41,40 @@ public ServiceBusListenerTests() var client = new ServiceBusClient(_testConnection); ServiceBusProcessor processor = client.CreateProcessor(_entityPath); ServiceBusReceiver receiver = client.CreateReceiver(_entityPath); + var configuration = ConfigurationUtilities.CreateConfiguration(new KeyValuePair("connection", _testConnection)); ServiceBusOptions config = new ServiceBusOptions { ExceptionHandler = ExceptionReceivedHandler }; - _mockMessageProcessor = new Mock(MockBehavior.Strict, processor); - - _mockMessagingProvider = new Mock(MockBehavior.Strict, new OptionsWrapper(config)); + _mockMessageProcessor = new Mock(MockBehavior.Strict, processor, receiver); + _mockMessagingProvider = new Mock(new OptionsWrapper(config)); + _mockClientFactory = new Mock(configuration, Mock.Of(), _mockMessagingProvider.Object); _mockMessagingProvider - .Setup(p => p.CreateMessageProcessor(_entityPath, _testConnection)) + .Setup(p => p.CreateMessageProcessor(It.IsAny(), _entityPath)) .Returns(_mockMessageProcessor.Object); _mockMessagingProvider - .Setup(p => p.CreateBatchMessageReceiver(_entityPath, _testConnection)) + .Setup(p => p.CreateBatchMessageReceiver(It.IsAny(), _entityPath)) .Returns(receiver); - Mock mockServiceBusAccount = new Mock(MockBehavior.Strict); - mockServiceBusAccount.Setup(a => a.ConnectionString).Returns(_testConnection); - _loggerFactory = new LoggerFactory(); _loggerProvider = new TestLoggerProvider(); _loggerFactory.AddProvider(_loggerProvider); - _listener = new ServiceBusListener(_functionId, EntityType.Queue, _entityPath, false, _mockExecutor.Object, config, mockServiceBusAccount.Object, - _mockMessagingProvider.Object, _loggerFactory, false); + _listener = new ServiceBusListener( + _functionId, + EntityType.Queue, + _entityPath, + false, + _mockExecutor.Object, + config, + "connection", + _mockMessagingProvider.Object, + _loggerFactory, + false, + _mockClientFactory.Object); } [Test] diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs index 39dca96e751e5..edb1daf16eab5 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs @@ -6,11 +6,15 @@ using System.Linq; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Azure.WebJobs.ServiceBus.Listeners; +using Microsoft.Azure.WebJobs.ServiceBus.Tests; +using Microsoft.Extensions.Azure; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Moq; using NUnit.Framework; @@ -23,41 +27,58 @@ public class ServiceBusScaleMonitorTests private ServiceBusListener _listener; private ServiceBusScaleMonitor _scaleMonitor; private ServiceBusOptions _serviceBusOptions; - private Mock _mockServiceBusAccount; private Mock _mockExecutor; - private Mock _mockMessagingProvider; + private Mock _mockProvider; + private Mock _mockClientFactory; private Mock _mockMessageProcessor; private TestLoggerProvider _loggerProvider; private LoggerFactory _loggerFactory; private string _functionId = "test-functionid"; private string _entityPath = "test-entity-path"; private string _testConnection = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="; + private string _connection = "connection"; + private ServiceBusClient _client; [SetUp] public void Setup() { _mockExecutor = new Mock(MockBehavior.Strict); - var client = new ServiceBusClient(_testConnection); + _client = new ServiceBusClient(_testConnection); ServiceBusProcessorOptions processorOptions = new ServiceBusProcessorOptions(); - ServiceBusProcessor messageProcessor = client.CreateProcessor(_entityPath); - _mockMessageProcessor = new Mock(MockBehavior.Strict, messageProcessor); + ServiceBusProcessor messageProcessor = _client.CreateProcessor(_entityPath); + ServiceBusReceiver receiver = _client.CreateReceiver(_entityPath); + _mockMessageProcessor = new Mock(MockBehavior.Strict, messageProcessor, receiver); + var configuration = ConfigurationUtilities.CreateConfiguration(new KeyValuePair(_connection, _testConnection)); _serviceBusOptions = new ServiceBusOptions(); - _mockMessagingProvider = new Mock(MockBehavior.Strict, new OptionsWrapper(_serviceBusOptions)); + _mockProvider = new Mock(new OptionsWrapper(new ServiceBusOptions())); + _mockClientFactory = new Mock(configuration, Mock.Of(), _mockProvider.Object); - _mockMessagingProvider - .Setup(p => p.CreateMessageProcessor(_entityPath, _testConnection)) + _mockProvider + .Setup(p => p.CreateMessageProcessor(_client, _entityPath)) .Returns(_mockMessageProcessor.Object); - _mockServiceBusAccount = new Mock(MockBehavior.Strict); - _mockServiceBusAccount.Setup(a => a.ConnectionString).Returns(_testConnection); + _mockProvider + .Setup(p => p.CreateClient(_testConnection)) + .Returns(_client); _loggerFactory = new LoggerFactory(); _loggerProvider = new TestLoggerProvider(); _loggerFactory.AddProvider(_loggerProvider); - _listener = new ServiceBusListener(_functionId, EntityType.Queue, _entityPath, false, _mockExecutor.Object, _serviceBusOptions, _mockServiceBusAccount.Object, - _mockMessagingProvider.Object, _loggerFactory, false); + _listener = new ServiceBusListener( + _functionId, + EntityType.Queue, + _entityPath, + false, + _mockExecutor.Object, + _serviceBusOptions, + _connection, + _mockProvider.Object, + _loggerFactory, + false, + _mockClientFactory.Object); + _scaleMonitor = (ServiceBusScaleMonitor)_listener.GetMonitor(); } @@ -109,11 +130,11 @@ public void GetMetrics_ReturnsExpectedResult() public async Task GetMetrics_HandlesExceptions() { // MessagingEntityNotFoundException - _mockMessagingProvider - .Setup(p => p.CreateBatchMessageReceiver(_entityPath, _testConnection)) + _mockProvider + .Setup(p => p.CreateBatchMessageReceiver(_client, _entityPath)) .Throws(new ServiceBusException("", reason: ServiceBusFailureReason.MessagingEntityNotFound)); - ServiceBusListener listener = new ServiceBusListener(_functionId, EntityType.Queue, _entityPath, false, _mockExecutor.Object, _serviceBusOptions, - _mockServiceBusAccount.Object, _mockMessagingProvider.Object, _loggerFactory, false); + + ServiceBusListener listener = CreateListener(); var metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync(); @@ -127,11 +148,10 @@ public async Task GetMetrics_HandlesExceptions() _loggerProvider.ClearAllLogMessages(); // UnauthorizedAccessException - _mockMessagingProvider - .Setup(p => p.CreateBatchMessageReceiver(_entityPath, _testConnection)) + _mockProvider + .Setup(p => p.CreateBatchMessageReceiver(_client, _entityPath)) .Throws(new UnauthorizedAccessException("")); - listener = new ServiceBusListener(_functionId, EntityType.Queue, _entityPath, false, _mockExecutor.Object, _serviceBusOptions, - _mockServiceBusAccount.Object, _mockMessagingProvider.Object, _loggerFactory, false); + listener = CreateListener(); metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync(); @@ -147,11 +167,10 @@ public async Task GetMetrics_HandlesExceptions() _loggerProvider.ClearAllLogMessages(); // Generic Exception - _mockMessagingProvider - .Setup(p => p.CreateBatchMessageReceiver(_entityPath, _testConnection)) + _mockProvider + .Setup(p => p.CreateBatchMessageReceiver(_client, _entityPath)) .Throws(new Exception("Uh oh")); - listener = new ServiceBusListener(_functionId, EntityType.Queue, _entityPath, false, _mockExecutor.Object, _serviceBusOptions, - _mockServiceBusAccount.Object, _mockMessagingProvider.Object, _loggerFactory, false); + listener = CreateListener(); metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync(); @@ -164,6 +183,22 @@ public async Task GetMetrics_HandlesExceptions() Assert.AreEqual($"Error querying for Service Bus queue scale status: Uh oh", warning.FormattedMessage); } + private ServiceBusListener CreateListener() + { + return new ServiceBusListener( + _functionId, + EntityType.Queue, + _entityPath, + false, + _mockExecutor.Object, + _serviceBusOptions, + _connection, + _mockProvider.Object, + _loggerFactory, + false, + _mockClientFactory.Object); + } + [Test] public void GetScaleStatus_NoMetrics_ReturnsVote_None() { diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageProcessorTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageProcessorTests.cs index e54ecc9b543eb..4d7a0b549a74a 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageProcessorTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessageProcessorTests.cs @@ -19,7 +19,7 @@ public MessageProcessorTests() var client = new ServiceBusClient("Endpoint = sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="); var processor = client.CreateProcessor("test-entity"); processor.ProcessErrorAsync += ExceptionReceivedHandler; - _processor = new MessageProcessor(processor); + _processor = new MessageProcessor(processor, client.CreateReceiver("test-entity")); } [Test] diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessagingProviderTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessagingProviderTests.cs index 760e50c8cf407..21baa10580dc6 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessagingProviderTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/MessagingProviderTests.cs @@ -1,72 +1,83 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using Azure.Messaging.ServiceBus; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; +using Moq; using NUnit.Framework; using System; +using System.Collections.Generic; namespace Microsoft.Azure.WebJobs.ServiceBus.UnitTests { public class MessagingProviderTests { + private static string _defaultConnection = "Endpoint=sb://default.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="; + private static ServiceBusClient _client = new(_defaultConnection); + [Test] public void CreateMessageReceiver_ReturnsExpectedReceiver() { - string defaultConnection = "Endpoint=sb://default.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="; - var config = new ServiceBusOptions - { - ConnectionString = defaultConnection - }; - var provider = new MessagingProvider(new OptionsWrapper(config)); - var receiver = provider.CreateBatchMessageReceiver("entityPath", defaultConnection); + var options = new ServiceBusOptions(); + var configuration = CreateConfiguration(new KeyValuePair("connection", _defaultConnection)); + + var provider = new MessagingProvider(new OptionsWrapper(options)); + + var receiver = provider.CreateBatchMessageReceiver(_client, "entityPath"); Assert.AreEqual("entityPath", receiver.EntityPath); - var receiver2 = provider.CreateBatchMessageReceiver("entityPath", defaultConnection); + var receiver2 = provider.CreateBatchMessageReceiver(_client, "entityPath"); Assert.AreSame(receiver, receiver2); - config.PrefetchCount = 100; - receiver = provider.CreateBatchMessageReceiver("entityPath1", defaultConnection); + options.PrefetchCount = 100; + receiver = provider.CreateBatchMessageReceiver(_client, "entityPath1"); Assert.AreEqual(100, receiver.PrefetchCount); } [Test] public void CreateProcessor_ReturnsExpectedProcessor() { - string defaultConnection = "Endpoint=sb://default.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="; - var config = new ServiceBusOptions - { - ConnectionString = defaultConnection - }; - var provider = new MessagingProvider(new OptionsWrapper(config)); - var processor = provider.CreateProcessor("entityPath", defaultConnection); + var options = new ServiceBusOptions(); + var configuration = CreateConfiguration(new KeyValuePair("connection", _defaultConnection)); + var provider = new MessagingProvider(new OptionsWrapper(options)); + + var processor = provider.CreateProcessor(_client, "entityPath"); Assert.AreEqual("entityPath", processor.EntityPath); - var processor2 = provider.CreateProcessor("entityPath", defaultConnection); - Assert.AreSame(processor, processor2); + var processor2 = provider.CreateProcessor(_client, "entityPath"); + Assert.AreNotSame(processor, processor2); - config.PrefetchCount = 100; - config.MaxConcurrentCalls = 5; - config.MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(30); - processor = provider.CreateProcessor("entityPath1", defaultConnection); - Assert.AreEqual(config.PrefetchCount, processor.PrefetchCount); - Assert.AreEqual(config.MaxConcurrentCalls, processor.MaxConcurrentCalls); - Assert.AreEqual(config.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration); + options.PrefetchCount = 100; + options.MaxConcurrentCalls = 5; + options.MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(30); + processor = provider.CreateProcessor(_client, "entityPath1"); + Assert.AreEqual(options.PrefetchCount, processor.PrefetchCount); + Assert.AreEqual(options.MaxConcurrentCalls, processor.MaxConcurrentCalls); + Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration); } [Test] public void CreateMessageSender_ReturnsExpectedSender() { string defaultConnection = "Endpoint=sb://default.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="; - var config = new ServiceBusOptions - { - ConnectionString = defaultConnection - }; - var provider = new MessagingProvider(new OptionsWrapper(config)); - var sender = provider.CreateMessageSender("entityPath", defaultConnection); + var options = new ServiceBusOptions(); + + var configuration = CreateConfiguration(new KeyValuePair("connection", defaultConnection)); + var provider = new MessagingProvider(new OptionsWrapper(options)); + + var sender = provider.CreateMessageSender(_client, "entityPath"); Assert.AreEqual("entityPath", sender.EntityPath); - var sender2 = provider.CreateMessageSender("entityPath", defaultConnection); + var sender2 = provider.CreateMessageSender(_client, "entityPath"); Assert.AreSame(sender, sender2); } + + private IConfiguration CreateConfiguration(params KeyValuePair[] data) + { + return new ConfigurationBuilder().AddInMemoryCollection(data).Build(); + } } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusAccountTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusAccountTests.cs deleted file mode 100644 index 11eac79b5b643..0000000000000 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusAccountTests.cs +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using Microsoft.Extensions.Configuration; -using NUnit.Framework; - -namespace Microsoft.Azure.WebJobs.ServiceBus.UnitTests -{ - public class ServiceBusAccountTests - { - private readonly IConfiguration _configuration; - - public ServiceBusAccountTests() - { - _configuration = new ConfigurationBuilder() - .AddEnvironmentVariables() - .Build(); - } - - [Test] - public void GetConnectionString_ReturnsExpectedConnectionString() - { - string defaultConnection = "Endpoint=sb://default.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="; - var options = new ServiceBusOptions() - { - ConnectionString = defaultConnection - }; - var attribute = new ServiceBusTriggerAttribute("entity-name"); - var account = new ServiceBusAccount(options, _configuration, attribute); - - Assert.True(defaultConnection == account.ConnectionString); - } - - [Test] - public void GetConnectionString_ThrowsIfConnectionStringNullOrEmpty() - { - var config = new ServiceBusOptions(); - var attribute = new ServiceBusTriggerAttribute("testqueue"); - attribute.Connection = "MissingConnection"; - - var ex = Assert.Throws(() => - { - var account = new ServiceBusAccount(config, _configuration, attribute); - var cs = account.ConnectionString; - }); - Assert.AreEqual("Microsoft Azure WebJobs SDK ServiceBus connection string 'MissingConnection' is missing or empty.", ex.Message); - - attribute.Connection = null; - config.ConnectionString = null; - ex = Assert.Throws(() => - { - var account = new ServiceBusAccount(config, _configuration, attribute); - var cs = account.ConnectionString; - }); - Assert.AreEqual("Microsoft Azure WebJobs SDK ServiceBus connection string 'AzureWebJobsServiceBus' is missing or empty.", ex.Message); - } - } -} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs index 84d7a70ce2718..6bbcc47221356 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs @@ -13,6 +13,8 @@ using Azure.Messaging.ServiceBus.Tests; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Azure.WebJobs.ServiceBus; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -27,11 +29,7 @@ public class ServiceBusEndToEndTests : WebJobsServiceBusTestBase private const string DrainingQueueMessageBody = "queue-message-draining-no-sessions-1"; private const string DrainingTopicMessageBody = "topic-message-draining-no-sessions-1"; - private static EventWaitHandle _topicSubscriptionCalled1; - private static EventWaitHandle _topicSubscriptionCalled2; private static EventWaitHandle _eventWait; - private static EventWaitHandle _drainValidationPreDelay; - private static EventWaitHandle _drainValidationPostDelay; // These two variables will be checked at the end of the test private static string _resultMessage1; @@ -52,6 +50,16 @@ public async Task ServiceBusEndToEnd() } } + [Test] + public async Task ServiceBusEndToEndTokenCredential() + { + var (jobHost, host) = BuildHost(startHost: false, useTokenCredential: true); + using (jobHost) + { + await ServiceBusEndToEndInternal(host); + } + } + [Test] public async Task ServiceBusBinderTest() { @@ -67,6 +75,7 @@ public async Task ServiceBusBinderTest() var count = await CleanUpEntity(_firstQueueScope.QueueName); Assert.AreEqual(numMessages * 3, count); + await jobHost.StopAsync(); } } @@ -116,6 +125,7 @@ public async Task CustomMessageProcessorTest() IEnumerable messages = loggerProvider.GetAllLogMessages().Where(m => m.Category == CustomMessagingProvider.CustomMessagingCategory); Assert.AreEqual(4, messages.Count(p => p.FormattedMessage.Contains("Custom processor Begin called!"))); Assert.AreEqual(4, messages.Count(p => p.FormattedMessage.Contains("Custom processor End called!"))); + await jobHost.StopAsync(); } } @@ -134,14 +144,12 @@ await WriteQueueMessage( connectionString: ServiceBusTestEnvironment.Instance.ServiceBusSecondaryNamespaceConnectionString, queueName: _secondaryNamespaceQueueScope.QueueName); - _topicSubscriptionCalled1 = new ManualResetEvent(initialState: false); - _topicSubscriptionCalled2 = new ManualResetEvent(initialState: false); - _topicSubscriptionCalled1.WaitOne(SBTimeoutMills); _topicSubscriptionCalled2.WaitOne(SBTimeoutMills); // ensure all logs have had a chance to flush await Task.Delay(3000); + await jobHost.StopAsync(); } Assert.AreEqual("Test-topic-1", _resultMessage1); @@ -185,6 +193,7 @@ public async Task BindToPoco() var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage).ToList(); Assert.Contains("PocoValues(foo,bar)", logs); + await jobHost.StopAsync(); } } @@ -202,6 +211,7 @@ public async Task BindToString() var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage).ToList(); Assert.Contains("Input(foobar)", logs); + await jobHost.StopAsync(); } } @@ -229,6 +239,12 @@ public async Task MessageDrainingTopicBatch() await TestMultipleDrainMode(false); } + [Test] + public async Task MultipleFunctionsBindingToSameEntity() + { + await TestMultiple(); + } + /* * Helper functions */ @@ -239,9 +255,6 @@ private async Task TestSingleDrainMode(bool sendToQueue) using (jobHost) { - _drainValidationPreDelay = new ManualResetEvent(initialState: false); - _drainValidationPostDelay = new ManualResetEvent(initialState: false); - if (sendToQueue) { await WriteQueueMessage(DrainingQueueMessageBody); @@ -260,6 +273,7 @@ private async Task TestSingleDrainMode(bool sendToQueue) // Validate that function execution was allowed to complete Assert.True(_drainValidationPostDelay.WaitOne(DrainWaitTimeoutMills + SBTimeoutMills)); + await jobHost.StopAsync(); } } @@ -278,7 +292,7 @@ private static Action BuildDrainHost() private async Task TestMultiple(bool isXml = false) { - var (jobHost, _) = BuildHost(); + var (jobHost, host) = BuildHost(); using (jobHost) { if (isXml) @@ -292,10 +306,9 @@ private async Task TestMultiple(bool isXml = false) await WriteQueueMessage("{'Name': 'Test2', 'Value': 'Value'}"); } - _topicSubscriptionCalled1 = new ManualResetEvent(initialState: false); - bool result = _topicSubscriptionCalled1.WaitOne(SBTimeoutMills); Assert.True(result); + await jobHost.StopAsync(); } } @@ -304,9 +317,6 @@ private async Task TestMultipleDrainMode(bool sendToQueue) var (jobHost, host) = BuildHost(BuildDrainHost()); using (jobHost) { - _drainValidationPreDelay = new ManualResetEvent(initialState: false); - _drainValidationPostDelay = new ManualResetEvent(initialState: false); - if (sendToQueue) { await WriteQueueMessage(DrainingQueueMessageBody); @@ -325,6 +335,7 @@ private async Task TestMultipleDrainMode(bool sendToQueue) // Validate that function execution was allowed to complete Assert.True(_drainValidationPostDelay.WaitOne(DrainWaitTimeoutMills + SBTimeoutMills)); + await jobHost.StopAsync(); } } @@ -334,9 +345,6 @@ private async Task ServiceBusEndToEndInternal(IHost host) await WriteQueueMessage("E2E"); - _topicSubscriptionCalled1 = new ManualResetEvent(initialState: false); - _topicSubscriptionCalled2 = new ManualResetEvent(initialState: false); - await host.StartAsync(); _topicSubscriptionCalled1.WaitOne(SBTimeoutMills); @@ -354,11 +362,11 @@ private async Task ServiceBusEndToEndInternal(IHost host) IEnumerable logMessages = host.GetTestLoggerProvider() .GetAllLogMessages(); + Assert.False(logMessages.Where(p => p.Level == LogLevel.Error).Any()); + // filter out anything from the custom processor for easier validation. IEnumerable consoleOutput = logMessages - .Where(m => m.Category != CustomMessagingProvider.CustomMessagingCategory); - - Assert.False(consoleOutput.Where(p => p.Level == LogLevel.Error).Any()); + .Where(m => m.Category != CustomMessagingProvider.CustomMessagingCategory); string[] consoleOutputLines = consoleOutput .Where(p => p.FormattedMessage != null) @@ -577,8 +585,8 @@ public static async Task ServiceBusBinderTest( public class ServiceBusMultipleTestJobsBase { - protected static bool firstReceived = false; - protected static bool secondReceived = false; + protected static volatile bool firstReceived = false; + protected static volatile bool secondReceived = false; public static void ProcessMessages(string[] messages) { @@ -593,6 +601,9 @@ public static void ProcessMessages(string[] messages) if (firstReceived && secondReceived) { + // reset for the next test + firstReceived = false; + secondReceived = false; _topicSubscriptionCalled1.Set(); } } @@ -740,6 +751,21 @@ public static async Task TopicNoSessionsBatch( } } + public class ServiceBusSingleMessageTestJob_BindMultipleFunctionsToSameEntity + { + public static void SBQueueFunction( + [ServiceBusTrigger(FirstQueueNameKey)] string message) + { + ServiceBusMultipleTestJobsBase.ProcessMessages(new string[] { message }); + } + + public static void SBQueueFunction2( + [ServiceBusTrigger(FirstQueueNameKey)] string message) + { + ServiceBusMultipleTestJobsBase.ProcessMessages(new string[] { message }); + } + } + public class DrainModeHelper { public static async Task WaitForCancellation(CancellationToken cancellationToken) @@ -761,14 +787,16 @@ private class CustomMessagingProvider : MessagingProvider private readonly ILogger _logger; private readonly ServiceBusOptions _options; - public CustomMessagingProvider(IOptions serviceBusOptions, ILoggerFactory loggerFactory) + public CustomMessagingProvider( + IOptions serviceBusOptions, + ILoggerFactory loggerFactory) : base(serviceBusOptions) { _options = serviceBusOptions.Value; _logger = loggerFactory?.CreateLogger(CustomMessagingCategory); } - public override MessageProcessor CreateMessageProcessor(string entityPath, string connectionString) + public override MessageProcessor CreateMessageProcessor(ServiceBusClient client, string entityPath) { var options = new ServiceBusProcessorOptions() { @@ -776,20 +804,20 @@ public override MessageProcessor CreateMessageProcessor(string entityPath, strin MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(MaxAutoRenewDurationMin) }; - var client = base.CreateClient(connectionString); var processor = client.CreateProcessor(entityPath, options); + var receiver = client.CreateReceiver(entityPath); // TODO decide whether it makes sense to still default error handler when there is a custom provider // currently user needs to set it. processor.ProcessErrorAsync += args => Task.CompletedTask; - return new CustomMessageProcessor(processor, _logger); + return new CustomMessageProcessor(processor, receiver, _logger); } private class CustomMessageProcessor : MessageProcessor { private readonly ILogger _logger; - public CustomMessageProcessor(ServiceBusProcessor processor, ILogger logger) - : base(processor) + public CustomMessageProcessor(ServiceBusProcessor processor, ServiceBusReceiver receiver, ILogger logger) + : base(processor, receiver) { _logger = logger; } @@ -808,12 +836,12 @@ public override async Task CompleteProcessingMessageAsync(ServiceBusMessageActio } } } +} #pragma warning disable SA1402 // File may only contain a single type - public class TestPoco +public class TestPoco #pragma warning restore SA1402 // File may only contain a single type - { - public string Name { get; set; } - public string Value { get; set; } - } +{ + public string Name { get; set; } + public string Value { get; set; } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs index b973d9156c27a..4a184035c9236 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusSessionsEndToEndTests.cs @@ -9,6 +9,7 @@ using Azure.Messaging.ServiceBus; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Azure.WebJobs.ServiceBus; +using Microsoft.Extensions.Azure; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -22,10 +23,6 @@ public class ServiceBusSessionsEndToEndTests : WebJobsServiceBusTestBase private const string _drainModeSessionId = "drain-session"; private const string DrainingQueueMessageBody = "queue-message-draining-with-sessions-1"; private const string DrainingTopicMessageBody = "topic-message-draining-with-sessions-1"; - private static EventWaitHandle _waitHandle1; - private static EventWaitHandle _waitHandle2; - private static EventWaitHandle _drainValidationPreDelay; - private static EventWaitHandle _drainValidationPostDelay; public ServiceBusSessionsEndToEndTests() : base(isSession: true) { @@ -37,8 +34,6 @@ public async Task ServiceBusSessionQueue_OrderGuaranteed() var (jobHost, host) = BuildSessionHost(); using (jobHost) { - _waitHandle1 = new ManualResetEvent(initialState: false); - await WriteQueueMessage("message1", "test-session1"); await WriteQueueMessage("message2", "test-session1"); await WriteQueueMessage("message3", "test-session1"); @@ -48,6 +43,7 @@ public async Task ServiceBusSessionQueue_OrderGuaranteed() Assert.True(_waitHandle1.WaitOne(SBTimeoutMills)); IEnumerable logMessages = host.GetTestLoggerProvider().GetAllLogMessages(); + Assert.False(logMessages.Where(p => p.Level == LogLevel.Error).Any()); // filter out anything from the custom processor for easier validation. List consoleOutput = logMessages.Where(m => m.Category == "Function.SBQueue1Trigger.User").ToList(); @@ -59,6 +55,7 @@ public async Task ServiceBusSessionQueue_OrderGuaranteed() { StringAssert.StartsWith("message" + i++, logMessage.FormattedMessage); } + await jobHost.StopAsync(); } } @@ -68,8 +65,6 @@ public async Task ServiceBusSessionTopicSubscription_OrderGuaranteed() var (jobHost, host) = BuildSessionHost(); using (jobHost) { - _waitHandle1 = new ManualResetEvent(initialState: false); - await WriteTopicMessage("message1", "test-session1"); await WriteTopicMessage("message2", "test-session1"); await WriteTopicMessage("message3", "test-session1"); @@ -79,6 +74,7 @@ public async Task ServiceBusSessionTopicSubscription_OrderGuaranteed() Assert.True(_waitHandle1.WaitOne(SBTimeoutMills)); IEnumerable logMessages = host.GetTestLoggerProvider().GetAllLogMessages(); + Assert.False(logMessages.Where(p => p.Level == LogLevel.Error).Any()); // filter out anything from the custom processor for easier validation. List consoleOutput = logMessages.Where(m => m.Category == "Function.SBSub1Trigger.User").ToList(); @@ -90,6 +86,7 @@ public async Task ServiceBusSessionTopicSubscription_OrderGuaranteed() { StringAssert.StartsWith("message" + i++, logMessage.FormattedMessage); } + await jobHost.StopAsync(); } } @@ -101,9 +98,6 @@ public async Task ServiceBusSessionQueue_DifferentHosts_DifferentSessions() using (jobHost1) using (jobHost2) { - _waitHandle1 = new ManualResetEvent(initialState: false); - _waitHandle2 = new ManualResetEvent(initialState: false); - await WriteQueueMessage("message1", "test-session1"); await WriteQueueMessage("message1", "test-session2"); @@ -141,6 +135,8 @@ public async Task ServiceBusSessionQueue_DifferentHosts_DifferentSessions() { Assert.AreEqual(sessionId2, m.FormattedMessage[m.FormattedMessage.Length - 1]); } + await jobHost1.StopAsync(); + await jobHost2.StopAsync(); } } @@ -152,9 +148,6 @@ public async Task ServiceBusSessionSub_DifferentHosts_DifferentSessions() using (jobHost1) using (jobHost2) { - _waitHandle1 = new ManualResetEvent(initialState: false); - _waitHandle2 = new ManualResetEvent(initialState: false); - await WriteTopicMessage("message1", "test-session1"); await WriteTopicMessage("message1", "test-session2"); @@ -193,6 +186,8 @@ public async Task ServiceBusSessionSub_DifferentHosts_DifferentSessions() { Assert.AreEqual(sessionId2, m.FormattedMessage[m.FormattedMessage.Length - 1]); } + await jobHost1.StopAsync(); + await jobHost2.StopAsync(); } } @@ -202,9 +197,6 @@ public async Task ServiceBusSessionQueue_SessionLocks() var (jobHost, host) = BuildSessionHost(addCustomProvider: true); using (jobHost) { - _waitHandle1 = new ManualResetEvent(initialState: false); - _waitHandle2 = new ManualResetEvent(initialState: false); - await WriteQueueMessage("message1", "test-session1"); await WriteQueueMessage("message1", "test-session2"); @@ -224,6 +216,7 @@ public async Task ServiceBusSessionQueue_SessionLocks() Assert.True(_waitHandle2.WaitOne(SBTimeoutMills)); IEnumerable logMessages1 = host.GetTestLoggerProvider().GetAllLogMessages(); + Assert.False(logMessages1.Where(p => p.Level == LogLevel.Error).Any()); // filter out anything from the custom processor for easier validation. List consoleOutput1 = logMessages1.Where(m => m.Category == "Function.SBQueue1Trigger.User").ToList(); @@ -243,6 +236,7 @@ public async Task ServiceBusSessionQueue_SessionLocks() consoleOutput1[5].FormattedMessage[consoleOutput1[0].FormattedMessage.Length - 1]); } } + await jobHost.StopAsync(); } } @@ -252,9 +246,6 @@ public async Task ServiceBusSessionSub_SessionLocks() var (jobHost, host) = BuildSessionHost(addCustomProvider: true); using (jobHost) { - _waitHandle1 = new ManualResetEvent(initialState: false); - _waitHandle2 = new ManualResetEvent(initialState: false); - await WriteTopicMessage("message1", "test-session1"); await WriteTopicMessage("message1", "test-session2"); @@ -274,6 +265,7 @@ public async Task ServiceBusSessionSub_SessionLocks() Assert.True(_waitHandle2.WaitOne(SBTimeoutMills)); IEnumerable logMessages1 = host.GetTestLoggerProvider().GetAllLogMessages(); + Assert.False(logMessages1.Where(p => p.Level == LogLevel.Error).Any()); // filter out anything from the custom processor for easier validation. List consoleOutput1 = logMessages1.Where(m => m.Category == "Function.SBSub1Trigger.User").ToList(); @@ -293,6 +285,7 @@ public async Task ServiceBusSessionSub_SessionLocks() consoleOutput1[5].FormattedMessage[consoleOutput1[0].FormattedMessage.Length - 1]); } } + await jobHost.StopAsync(); } } @@ -344,6 +337,12 @@ public async Task MessageDraining_TopicWithSessions_Batch() await TestMultipleDrainMode(false); } + [Test] + public async Task MultipleFunctionsBindingToSameEntity() + { + await TestMultiple(); + } + /* * Helper functions */ @@ -371,9 +370,6 @@ public async Task MessageDraining_TopicWithSessions_Batch() private async Task TestSingleDrainMode(bool sendToQueue) { - _drainValidationPreDelay = new ManualResetEvent(initialState: false); - _drainValidationPostDelay = new ManualResetEvent(initialState: false); - if (sendToQueue) { await WriteQueueMessage(DrainingQueueMessageBody, _drainModeSessionId); @@ -394,13 +390,12 @@ private async Task TestSingleDrainMode(bool sendToQueue) // Validate that function execution was allowed to complete Assert.True(_drainValidationPostDelay.WaitOne(DrainWaitTimeoutMills + SBTimeoutMills)); + await jobHost.StopAsync(); } } private async Task TestMultiple(bool isXml = false) { - _waitHandle1 = new ManualResetEvent(initialState: false); - if (isXml) { await WriteQueueMessage(new TestPoco() { Name = "Test1" }, "sessionId"); @@ -416,14 +411,12 @@ private async Task TestMultiple(bool isXml = false) { bool result = _waitHandle1.WaitOne(SBTimeoutMills); Assert.True(result); + await jobHost.StopAsync(); } } private async Task TestMultipleDrainMode(bool sendToQueue) { - _drainValidationPreDelay = new ManualResetEvent(initialState: false); - _drainValidationPostDelay = new ManualResetEvent(initialState: false); - if (sendToQueue) { await WriteQueueMessage(DrainingQueueMessageBody, _drainModeSessionId); @@ -444,6 +437,7 @@ private async Task TestMultipleDrainMode(bool sendToQueue) // Validate that function execution was allowed to complete Assert.True(_drainValidationPostDelay.WaitOne(DrainWaitTimeoutMills + SBTimeoutMills)); + await jobHost.StopAsync(); } } @@ -588,10 +582,10 @@ public static async Task WaitForCancellation(CancellationToken cancellationToken public class ServiceBusMultipleTestJobsBase { - protected static bool firstReceived = false; - protected static bool secondReceived = false; + protected static volatile bool firstReceived = false; + protected static volatile bool secondReceived = false; - public static void ProcessMessages(string[] messages, EventWaitHandle waitHandle = null) + public static void ProcessMessages(string[] messages) { if (messages.Contains("{'Name': 'Test1', 'Value': 'Value'}")) { @@ -604,7 +598,10 @@ public static void ProcessMessages(string[] messages, EventWaitHandle waitHandle if (firstReceived && secondReceived) { - bool b = (waitHandle != null) ? waitHandle.Set() : _waitHandle1.Set(); + // reset for the next test + firstReceived = false; + secondReceived = false; + _waitHandle1.Set(); } } } @@ -648,25 +645,40 @@ public static void SBQueue2SBQueue( } } + public class ServiceBusSingleMessageTestJob_BindMultipleFunctionsToSameEntity + { + public static void SBQueueFunction( + [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] string message) + { + ServiceBusMultipleTestJobsBase.ProcessMessages(new string[] { message }); + } + + public static void SBQueueFunction2( + [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] string message) + { + ServiceBusMultipleTestJobsBase.ProcessMessages(new string[] { message }); + } + } + public class CustomMessagingProvider : MessagingProvider { public const string CustomMessagingCategory = "CustomMessagingProvider"; private readonly ILogger _logger; private readonly ServiceBusOptions _options; - public CustomMessagingProvider(IOptions serviceBusOptions, ILoggerFactory loggerFactory) + public CustomMessagingProvider( + IOptions serviceBusOptions, + ILoggerFactory loggerFactory) : base(serviceBusOptions) { _options = serviceBusOptions.Value; - //_options.SessionProcessorOptions.MessageWaitTimeout = TimeSpan.FromSeconds(90); - _options.RetryOptions.TryTimeout = TimeSpan.FromSeconds(90); + _options.SessionIdleTimeout = TimeSpan.FromSeconds(90); _options.MaxConcurrentSessions = 1; _logger = loggerFactory?.CreateLogger(CustomMessagingCategory); } - public override SessionMessageProcessor CreateSessionMessageProcessor(string entityPath, string connectionString) + public override SessionMessageProcessor CreateSessionMessageProcessor(ServiceBusClient client, string entityPath) { - var client = new ServiceBusClient(connectionString, _options.ToClientOptions()); ServiceBusSessionProcessor processor; if (entityPath == _firstQueueScope.QueueName) { @@ -678,15 +690,15 @@ public override SessionMessageProcessor CreateSessionMessageProcessor(string ent processor = client.CreateSessionProcessor(arr[0], arr[2], _options.ToSessionProcessorOptions()); } processor.ProcessErrorAsync += args => Task.CompletedTask; - return new CustomSessionMessageProcessor(processor, _logger); + return new CustomSessionMessageProcessor(client, processor, _logger); } private class CustomSessionMessageProcessor : SessionMessageProcessor { private readonly ILogger _logger; - public CustomSessionMessageProcessor(ServiceBusSessionProcessor sessionProcessor, ILogger logger) - : base(sessionProcessor) + public CustomSessionMessageProcessor(ServiceBusClient client, ServiceBusSessionProcessor sessionProcessor, ILogger logger) + : base(client, sessionProcessor) { _logger = logger; } @@ -704,18 +716,6 @@ public override async Task CompleteProcessingMessageAsync(ServiceBusSessionMessa } } } - - public void Dispose() - { - if (_waitHandle1 != null) - { - _waitHandle1.Dispose(); - } - if (_waitHandle2 != null) - { - _waitHandle2.Dispose(); - } - } } #pragma warning disable SA1402 // File may only contain a single type diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs index 53ca3127ae21e..044f0d64b3071 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.IO; using System.Runtime.Serialization; +using System.Threading; using System.Threading.Tasks; using System.Xml; using Azure.Core.TestFramework; @@ -41,7 +42,7 @@ public class WebJobsServiceBusTestBase protected const string FirstSubscriptionNameKey = "%" + _firstSubscriptionNameKey + "%"; private const string _secondSubscriptionNameKey = "webjobstestsubscription2"; - protected const string SecondSubscriptionNameKey = "%" + _secondSubscriptionNameKey + "%"; + protected const string SecondSubscriptionNameKey = "%" + _secondSubscriptionNameKey + "%"; private const string _secondaryNamespaceQueueKey = "webjobtestsecondarynamespacequeue"; protected const string SecondaryNamespaceQueueNameKey = "%" + _secondaryNamespaceQueueKey + "%"; @@ -62,6 +63,12 @@ public class WebJobsServiceBusTestBase protected static TopicScope _topicScope; private readonly bool _isSession; + protected static EventWaitHandle _topicSubscriptionCalled1; + protected static EventWaitHandle _topicSubscriptionCalled2; + protected static EventWaitHandle _waitHandle1; + protected static EventWaitHandle _waitHandle2; + protected static EventWaitHandle _drainValidationPreDelay; + protected static EventWaitHandle _drainValidationPostDelay; protected WebJobsServiceBusTestBase(bool isSession) { @@ -87,6 +94,12 @@ public async Task FixtureSetUp() enablePartitioning: false, enableSession: _isSession, overrideNamespace: ServiceBusTestEnvironment.Instance.ServiceBusSecondaryNamespace); + _topicSubscriptionCalled1 = new ManualResetEvent(initialState: false); + _topicSubscriptionCalled2 = new ManualResetEvent(initialState: false); + _waitHandle1 = new ManualResetEvent(initialState: false); + _waitHandle2 = new ManualResetEvent(initialState: false); + _drainValidationPreDelay = new ManualResetEvent(initialState: false); + _drainValidationPostDelay = new ManualResetEvent(initialState: false); } /// @@ -104,23 +117,37 @@ public async Task FixtureTearDown() await _topicScope.DisposeAsync(); } - protected (JobHost JobHost, IHost Host) BuildHost(Action configurationDelegate = null, bool startHost = true) + protected (JobHost JobHost, IHost Host) BuildHost( + Action configurationDelegate = null, + bool startHost = true, + bool useTokenCredential = false) { + var settings = new Dictionary + { + {_firstQueueNameKey, _firstQueueScope.QueueName}, + {_secondQueueNameKey, _secondQueueScope.QueueName}, + {_thirdQueueNameKey, _thirdQueueScope.QueueName}, + {_topicNameKey, _topicScope.TopicName}, + {_firstSubscriptionNameKey, _topicScope.SubscriptionNames[0]}, + {_secondSubscriptionNameKey, _topicScope.SubscriptionNames[1]}, + {_secondaryNamespaceQueueKey, _secondaryNamespaceQueueScope.QueueName}, + {SecondaryConnectionStringKey, ServiceBusTestEnvironment.Instance.ServiceBusSecondaryNamespaceConnectionString} + }; + if (useTokenCredential) + { + settings.Add("AzureWebJobsServiceBus:fullyQualifiedNamespace", ServiceBusTestEnvironment.Instance.FullyQualifiedNamespace); + settings.Add("AzureWebJobsServiceBus:clientId", ServiceBusTestEnvironment.Instance.ClientId); + settings.Add("AzureWebJobsServiceBus:clientSecret", ServiceBusTestEnvironment.Instance.ClientSecret); + settings.Add("AzureWebJobsServiceBus:tenantId", ServiceBusTestEnvironment.Instance.TenantId); + } + else + { + settings.Add("AzureWebJobsServiceBus", ServiceBusTestEnvironment.Instance.ServiceBusConnectionString); + } var hostBuilder = new HostBuilder() .ConfigureAppConfiguration(builder => { - builder.AddInMemoryCollection(new Dictionary - { - {"AzureWebJobsServiceBus", ServiceBusTestEnvironment.Instance.ServiceBusConnectionString}, - {_firstQueueNameKey, _firstQueueScope.QueueName}, - {_secondQueueNameKey, _secondQueueScope.QueueName}, - {_thirdQueueNameKey, _thirdQueueScope.QueueName}, - {_topicNameKey, _topicScope.TopicName}, - {_firstSubscriptionNameKey, _topicScope.SubscriptionNames[0]}, - {_secondSubscriptionNameKey, _topicScope.SubscriptionNames[1]}, - {_secondaryNamespaceQueueKey, _secondaryNamespaceQueueScope.QueueName}, - {SecondaryConnectionStringKey, ServiceBusTestEnvironment.Instance.ServiceBusSecondaryNamespaceConnectionString} - }); + builder.AddInMemoryCollection(settings); }) .ConfigureDefaultTestHost(b => {