From c7b7681b107ef28240dbae93ce4050f4c3d603de Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Tue, 9 Jul 2024 13:32:25 -0400 Subject: [PATCH 1/2] [Service Bus] Enforce batch limits The focus of these changes is to add client-side logic to force the maximum size for batches when large message support is enabled and to enforce the undocumented 4,500 message limit for a single batch. --- .../Azure.Messaging.ServiceBus/CHANGELOG.md | 15 ++++-- .../src/Amqp/AmqpMessageBatch.cs | 22 ++++---- .../src/Amqp/AmqpSender.cs | 41 +++++++++++++-- .../src/Receiver/ServiceBusReceiver.cs | 9 +++- .../src/Receiver/ServiceBusReceiverOptions.cs | 14 ----- .../src/Sender/CreateMessageBatchOptions.cs | 8 +++ .../tests/Amqp/AmqpMessageBatchTests.cs | 52 +++++++++++++++++++ .../Sender/ServiceBusMessageBatchTests.cs | 2 - 8 files changed, 131 insertions(+), 32 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md b/sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md index fb55ddac919fa..acde0cbc240fe 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md @@ -2,16 +2,24 @@ ## 7.18.0-beta.2 (Unreleased) +### Acknowledgments +Thank you to our developer community members who helped to make the Service Bus client library better with their contributions to this release: + +- Martin Costello _([GitHub](https://github.com/martincostello))_ + ### Features Added ### Breaking Changes ### Bugs Fixed -- Fixed an error that caused connection strings using host names without a scheme to fail parsing and be considered invalid. +- Fixed an issue that caused connection strings using host names without a scheme to fail parsing and be considered invalid. + - Fixed an issue where the scheduled enqueue time was not cleared when creating a new message from a received message. -- Fixed an error that prevented relative URIs from being used with [application properties](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties) in the `ServiceBusMessage.ApplicationProperties` and `ServiceBusReceivedMessage.ApplicationProperties` collections. +- Fixed an issue that prevented relative URIs from being used with [application properties](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties) in the `ServiceBusMessage.ApplicationProperties` and `ServiceBusReceivedMessage.ApplicationProperties` collections. + +- Fixed an issue that caused `ServiceBusMessageBatch` to accept more than the allowed 1mb batch limit when sending to Service Bus entities with large message sizes enabled. ### Other Changes @@ -19,7 +27,8 @@ - Updated the `Microsoft.Azure.Amqp` dependency to 2.6.7, which contains a fix for decoding messages with a null format code as the body. -- Instances of `ServiceBusSender` created with no explicit `ServiceBusSenderOptions` value allocate less memory. +- Improved efficiency of subclient creation, reducing allocations when no explicit options are passed. - Fixed deserialization of the lock token to take into account endianness. _(A community contribution, courtesy of [martincostello](https://github.com/martincostello))_ + ## 7.18.0-beta.1 (2024-05-08) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs index f80d00b6b49f5..f8386b1a6fbb1 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs @@ -38,7 +38,7 @@ internal class AmqpMessageBatch : TransportMessageBatch /// well as any overhead for the batch itself when sent to the Queue/Topic. /// /// - public override long MaxSizeInBytes { get; } + public override long MaxSizeInBytes => Options.MaxSizeInBytes.Value; /// /// The size of the batch, in bytes, as it will be sent to the Queue/Topic @@ -74,8 +74,7 @@ internal class AmqpMessageBatch : TransportMessageBatch /// Initializes a new instance of the class. /// /// - /// The converter to use for translating data into - /// an AMQP-specific message. + /// The converter to use for translating data into an AMQP-specific message. /// The set of options to apply to the batch. /// public AmqpMessageBatch(AmqpMessageConverter messageConverter, @@ -86,7 +85,6 @@ public AmqpMessageBatch(AmqpMessageConverter messageConverter, Argument.AssertNotNull(messageConverter, nameof(AmqpMessageConverter)); Options = options; - MaxSizeInBytes = options.MaxSizeInBytes.Value; _messageConverter = messageConverter; } @@ -104,6 +102,15 @@ public override bool TryAddMessage(ServiceBusMessage message) Argument.AssertNotNull(message, nameof(message)); Argument.AssertNotDisposed(_disposed, nameof(ServiceBusMessageBatch)); + // If the batch is full according to the message limit, then + // reject the message. + + if ((Options.MaxMessageCount.HasValue) + && (_batchMessages.Count >= Options.MaxMessageCount)) + { + return false; + } + var amqpMessage = _messageConverter.SBMessageToAmqpMessage(message); long size = 0; @@ -112,17 +119,14 @@ public override bool TryAddMessage(ServiceBusMessage message) // Initialize the size by reserving space for the batch envelope taking into account the properties from the first // message which will be used to populate properties on the batch envelope. - var messageList = new List(); - messageList.Add(amqpMessage); - var reserveOverheadMessage = _messageConverter.BuildAmqpBatchFromMessages(messageList.AsReadOnly(), forceBatch: true); + using var reserveOverheadMessage = + _messageConverter.BuildAmqpBatchFromMessages(new[] { amqpMessage }, forceBatch: true); size = _sizeBytes + reserveOverheadMessage.SerializedMessageSize + (amqpMessage.SerializedMessageSize <= MaximumBytesSmallMessage ? OverheadBytesSmallMessage : OverheadBytesLargeMessage); - - reserveOverheadMessage.Dispose(); } else { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs index bec362352b3d0..e5adae73282fe 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs @@ -84,6 +84,21 @@ internal class AmqpSender : TransportSender /// private long? MaxMessageSize { get; set; } + /// + /// The maximum size of an AMQP message batch allowed by the associated + /// sender link. + /// + /// + /// The maximum message batch size, in bytes. + /// + private long? MaxBatchSize { get; set; } + + /// + /// The maximum number of messages to allow in a single batch. + /// + /// + internal int MaxMessageCount { get; set; } + /// /// Initializes a new instance of the class. /// @@ -114,6 +129,18 @@ public AmqpSender( Argument.AssertNotNull(connectionScope, nameof(connectionScope)); Argument.AssertNotNull(retryPolicy, nameof(retryPolicy)); + // NOTE: + // This is a temporary work-around until Service Bus exposes a link property for + // the maximum batch size. The limit for batches differs from the limit for individual + // messages. Tracked by: https://github.com/Azure/azure-sdk-for-net/issues/44914 + MaxBatchSize = 1048576; + + // NOTE: + // This is a temporary work-around until Service Bus exposes a link property for + // the maximum batch size. The limit for batches differs from the limit for individual + // messages. Tracked by: https://github.com/Azure/azure-sdk-for-net/issues/44916 + MaxMessageCount = 4500; + _entityPath = entityPath; Identifier = identifier; _retryPolicy = retryPolicy; @@ -180,7 +207,7 @@ internal async ValueTask CreateMessageBatchInternalAsync( // Ensure that maximum message size has been determined; this depends on the underlying // AMQP link, so if not set, requesting the link will ensure that it is populated. - if (!MaxMessageSize.HasValue) + if ((!MaxMessageSize.HasValue) || (!MaxBatchSize.HasValue)) { await _sendLink.GetOrCreateAsync(timeout).ConfigureAwait(false); } @@ -188,9 +215,10 @@ internal async ValueTask CreateMessageBatchInternalAsync( // Ensure that there was a maximum size populated; if none was provided, // default to the maximum size allowed by the link. - options.MaxSizeInBytes ??= MaxMessageSize; + options.MaxSizeInBytes ??= MaxBatchSize; + options.MaxMessageCount ??= MaxMessageCount; - Argument.AssertInRange(options.MaxSizeInBytes.Value, ServiceBusSender.MinimumBatchSizeLimit, MaxMessageSize.Value, nameof(options.MaxSizeInBytes)); + Argument.AssertInRange(options.MaxSizeInBytes.Value, ServiceBusSender.MinimumBatchSizeLimit, MaxBatchSize.Value, nameof(options.MaxSizeInBytes)); return new AmqpMessageBatch(_messageConverter, options); } @@ -616,6 +644,13 @@ protected virtual async Task CreateLinkAndEnsureSenderStateAsyn await Task.Delay(15, cancellationToken).ConfigureAwait(false); MaxMessageSize = (long)link.Settings.MaxMessageSize; + // Update with service metadata when available: + // https://github.com/Azure/azure-sdk-for-net/issues/44914 + // https://github.com/Azure/azure-sdk-for-net/issues/44916 + + MaxBatchSize = Math.Min(MaxMessageSize.Value, MaxBatchSize.Value); + MaxMessageSize = MaxMessageSize; + ServiceBusEventSource.Log.CreateSendLinkComplete(Identifier); link.Closed += OnSenderLinkClosed; return link; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs index 63b2351b29f6a..845365faccc6a 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs @@ -37,6 +37,9 @@ public class ServiceBusReceiver : IAsyncDisposable /// The maximum number of messages to delete in a single batch. This cap is established and enforced by the service. internal const int MaxDeleteMessageCount = 4000; + /// The set of default options to use for initialization when no explicit options were provided. + private static ServiceBusReceiverOptions s_defaultOptions; + /// /// The fully qualified Service Bus namespace that the receiver is associated with. This is likely /// to be similar to {yournamespace}.servicebus.windows.net. @@ -177,7 +180,11 @@ internal ServiceBusReceiver( Argument.AssertNotNullOrWhiteSpace(entityPath, nameof(entityPath)); connection.ThrowIfClosed(); - options = options?.Clone() ?? new ServiceBusReceiverOptions(); + // If no explicit options were provided, use the default set, creating them as needed. There is + // a benign race condition here where multiple sets of default options may be created when initializing. + // The cost of hitting the race is lower than the cost of synchronizing each access. + options ??= s_defaultOptions ??= new ServiceBusReceiverOptions(); + Identifier = string.IsNullOrEmpty(options.Identifier) ? DiagnosticUtilities.GenerateIdentifier(entityPath) : options.Identifier; _connection = connection; _retryPolicy = connection.RetryOptions.ToRetryPolicy(); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverOptions.cs index 418eac0e58451..4a9f4133ec3b4 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverOptions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverOptions.cs @@ -78,19 +78,5 @@ public int PrefetchCount /// [EditorBrowsable(EditorBrowsableState.Never)] public override string ToString() => base.ToString(); - - /// - /// Creates a new copy of the current , cloning its attributes into a new instance. - /// - /// - /// A new copy of . - internal ServiceBusReceiverOptions Clone() => - new ServiceBusReceiverOptions - { - ReceiveMode = ReceiveMode, - PrefetchCount = PrefetchCount, - SubQueue = SubQueue, - Identifier = Identifier - }; } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateMessageBatchOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateMessageBatchOptions.cs index 7b996e3004292..f0760013c46ad 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateMessageBatchOptions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/CreateMessageBatchOptions.cs @@ -17,6 +17,12 @@ public class CreateMessageBatchOptions /// The requested maximum size to allow for the batch, in bytes. private long? _maxSizeInBytes; + /// + /// The maximum number of messages to allow in a single batch. + /// + /// + internal int? MaxMessageCount { get; set; } + /// /// The maximum size to allow for a single batch of messages, in bytes. /// @@ -25,9 +31,11 @@ public class CreateMessageBatchOptions /// The desired limit, in bytes, for the size of the associated service bus message batch. If null, /// the maximum size allowed by the active transport will be used. /// + /// /// /// A negative value is attempted to be set for the property. /// + /// public long? MaxSizeInBytes { get => _maxSizeInBytes; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpMessageBatchTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpMessageBatchTests.cs index b589e588c4b1c..a4dddd7a542a6 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpMessageBatchTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpMessageBatchTests.cs @@ -199,6 +199,58 @@ public void TryAddAcceptMessagesUntilTheMaximumSizeIsReached() } } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void TryAdRespectsTheMaximumMessageCount() + { + var maximumCount = 5; + var currentIndex = -1; + var messages = new AmqpMessage[maximumCount + 1]; + var mockMessage = new Mock(); + + var mockMessageConverter = new InjectableMockConverter + { + BuildBatchFromAmqpMessagesHandler = (_s) => mockMessage.Object, + BuildAmqpMessageFromSBMessageHandler = (_s) => messages[++currentIndex] + }; + + mockMessage + .Setup(message => message.SerializedMessageSize) + .Returns(40); + + var options = new CreateMessageBatchOptions + { + MaxSizeInBytes = 50000, + MaxMessageCount = maximumCount + }; + + for (var index = 0; index < messages.Length; ++index) + { + var size = 40; + var messageToAdd = new Mock(); + messageToAdd.Setup(messageToAdd => messageToAdd.SerializedMessageSize).Returns(size); + messages[index] = messageToAdd.Object; + } + + var batch = new AmqpMessageBatch(mockMessageConverter, options); + + for (var index = 0; index < messages.Length; ++index) + { + if (index == messages.Length - 1) + { + Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[10])), Is.False, "The final addition should not fit in the available space."); + } + else + { + Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[10])), Is.True, $"The addition for index: { index } should fit and be accepted."); + } + } + } + /// /// Verifies functionality of the /// method. diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/ServiceBusMessageBatchTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/ServiceBusMessageBatchTests.cs index 06a46ca45437a..24bc4a6d3d5ba 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/ServiceBusMessageBatchTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/ServiceBusMessageBatchTests.cs @@ -7,8 +7,6 @@ using Azure.Core.Shared; using Azure.Messaging.ServiceBus.Amqp; using Azure.Messaging.ServiceBus.Core; -using Azure.Messaging.ServiceBus.Diagnostics; -using Moq; using NUnit.Framework; namespace Azure.Messaging.ServiceBus.Tests.Sender From f4d07e5d4866aed8d31d447d7ab9dbb732da74cd Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Wed, 10 Jul 2024 11:20:20 -0400 Subject: [PATCH 2/2] Moving sender message count limit to private --- .../Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs index e5adae73282fe..ec56ccb2fa0f1 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs @@ -97,7 +97,7 @@ internal class AmqpSender : TransportSender /// The maximum number of messages to allow in a single batch. /// /// - internal int MaxMessageCount { get; set; } + private int MaxMessageCount { get; set; } /// /// Initializes a new instance of the class.