Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Enforce batch limits #44917

Merged
merged 2 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,33 @@

## 7.18.0-beta.2 (Unreleased)

### Acknowledgments
Thank you to our developer community members who helped to make the Service Bus client library better with their contributions to this release:

- Martin Costello _([GitHub](https://github.com/martincostello))_

### Features Added

### Breaking Changes

### Bugs Fixed

- Fixed an error that caused connection strings using host names without a scheme to fail parsing and be considered invalid.
- Fixed an issue that caused connection strings using host names without a scheme to fail parsing and be considered invalid.

- Fixed an issue where the scheduled enqueue time was not cleared when creating a new message from a received message.

- Fixed an error that prevented relative URIs from being used with [application properties](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties) in the `ServiceBusMessage.ApplicationProperties` and `ServiceBusReceivedMessage.ApplicationProperties` collections.
- Fixed an issue that prevented relative URIs from being used with [application properties](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties) in the `ServiceBusMessage.ApplicationProperties` and `ServiceBusReceivedMessage.ApplicationProperties` collections.

- Fixed an issue that caused `ServiceBusMessageBatch` to accept more than the allowed 1mb batch limit when sending to Service Bus entities with large message sizes enabled.

### Other Changes

- The client will now refresh the maximum message size each time a new AMQP link is opened; this is necessary for large message support, where the maximum message size for entities can be reconfigureed adjusted on the fly. Because the client had cached the value, it would not be aware of the change and would enforce the wrong size for batch creation.

- Updated the `Microsoft.Azure.Amqp` dependency to 2.6.7, which contains a fix for decoding messages with a null format code as the body.

- Instances of `ServiceBusSender` created with no explicit `ServiceBusSenderOptions` value allocate less memory.
- Improved efficiency of subclient creation, reducing allocations when no explicit options are passed. - Fixed deserialization of the lock token to take into account endianness. _(A community contribution, courtesy of [martincostello](https://github.com/martincostello))_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsquire might be an issue because I look at this on the phone but the fixed de serialization bullet looks misplaced

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dang it. That's a real thing! Thanks for the heads-up, Daniel.



## 7.18.0-beta.1 (2024-05-08)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ internal class AmqpMessageBatch : TransportMessageBatch
/// well as any overhead for the batch itself when sent to the Queue/Topic.
/// </summary>
///
public override long MaxSizeInBytes { get; }
public override long MaxSizeInBytes => Options.MaxSizeInBytes.Value;

/// <summary>
/// The size of the batch, in bytes, as it will be sent to the Queue/Topic
Expand Down Expand Up @@ -74,8 +74,7 @@ internal class AmqpMessageBatch : TransportMessageBatch
/// Initializes a new instance of the <see cref="AmqpMessageBatch"/> class.
/// </summary>
///
/// <param name="messageConverter">The converter to use for translating <see cref="ServiceBusMessage"/> data into
/// an AMQP-specific message.</param>
/// <param name="messageConverter">The converter to use for translating <see cref="ServiceBusMessage"/> data into an AMQP-specific message.</param>
/// <param name="options">The set of options to apply to the batch.</param>
///
public AmqpMessageBatch(AmqpMessageConverter messageConverter,
Expand All @@ -86,7 +85,6 @@ public AmqpMessageBatch(AmqpMessageConverter messageConverter,
Argument.AssertNotNull(messageConverter, nameof(AmqpMessageConverter));

Options = options;
MaxSizeInBytes = options.MaxSizeInBytes.Value;
_messageConverter = messageConverter;
}

Expand All @@ -104,6 +102,15 @@ public override bool TryAddMessage(ServiceBusMessage message)
Argument.AssertNotNull(message, nameof(message));
Argument.AssertNotDisposed(_disposed, nameof(ServiceBusMessageBatch));

// If the batch is full according to the message limit, then
// reject the message.

if ((Options.MaxMessageCount.HasValue)
&& (_batchMessages.Count >= Options.MaxMessageCount))
{
return false;
}

var amqpMessage = _messageConverter.SBMessageToAmqpMessage(message);
long size = 0;

Expand All @@ -112,17 +119,14 @@ public override bool TryAddMessage(ServiceBusMessage message)
// Initialize the size by reserving space for the batch envelope taking into account the properties from the first
// message which will be used to populate properties on the batch envelope.

var messageList = new List<AmqpMessage>();
messageList.Add(amqpMessage);
var reserveOverheadMessage = _messageConverter.BuildAmqpBatchFromMessages(messageList.AsReadOnly(), forceBatch: true);
using var reserveOverheadMessage =
_messageConverter.BuildAmqpBatchFromMessages(new[] { amqpMessage }, forceBatch: true);

size = _sizeBytes
+ reserveOverheadMessage.SerializedMessageSize
+ (amqpMessage.SerializedMessageSize <= MaximumBytesSmallMessage
? OverheadBytesSmallMessage
: OverheadBytesLargeMessage);

reserveOverheadMessage.Dispose();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ internal class AmqpSender : TransportSender
///
private long? MaxMessageSize { get; set; }

/// <summary>
/// The maximum size of an AMQP message batch allowed by the associated
/// sender link.
/// </summary>
///
/// <value>The maximum message batch size, in bytes.</value>
///
private long? MaxBatchSize { get; set; }

/// <summary>
/// The maximum number of messages to allow in a single batch.
/// </summary>
///
private int MaxMessageCount { get; set; }

/// <summary>
/// Initializes a new instance of the <see cref="AmqpSender"/> class.
/// </summary>
Expand Down Expand Up @@ -114,6 +129,18 @@ public AmqpSender(
Argument.AssertNotNull(connectionScope, nameof(connectionScope));
Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));

// NOTE:
// This is a temporary work-around until Service Bus exposes a link property for
// the maximum batch size. The limit for batches differs from the limit for individual
// messages. Tracked by: https://github.com/Azure/azure-sdk-for-net/issues/44914
MaxBatchSize = 1048576;

// NOTE:
// This is a temporary work-around until Service Bus exposes a link property for
// the maximum batch size. The limit for batches differs from the limit for individual
// messages. Tracked by: https://github.com/Azure/azure-sdk-for-net/issues/44916
MaxMessageCount = 4500;

_entityPath = entityPath;
Identifier = identifier;
_retryPolicy = retryPolicy;
Expand Down Expand Up @@ -180,17 +207,18 @@ internal async ValueTask<TransportMessageBatch> CreateMessageBatchInternalAsync(
// Ensure that maximum message size has been determined; this depends on the underlying
// AMQP link, so if not set, requesting the link will ensure that it is populated.

if (!MaxMessageSize.HasValue)
if ((!MaxMessageSize.HasValue) || (!MaxBatchSize.HasValue))
{
await _sendLink.GetOrCreateAsync(timeout).ConfigureAwait(false);
}

// Ensure that there was a maximum size populated; if none was provided,
// default to the maximum size allowed by the link.

options.MaxSizeInBytes ??= MaxMessageSize;
options.MaxSizeInBytes ??= MaxBatchSize;
options.MaxMessageCount ??= MaxMessageCount;

Argument.AssertInRange(options.MaxSizeInBytes.Value, ServiceBusSender.MinimumBatchSizeLimit, MaxMessageSize.Value, nameof(options.MaxSizeInBytes));
Argument.AssertInRange(options.MaxSizeInBytes.Value, ServiceBusSender.MinimumBatchSizeLimit, MaxBatchSize.Value, nameof(options.MaxSizeInBytes));
return new AmqpMessageBatch(_messageConverter, options);
}

Expand Down Expand Up @@ -616,6 +644,13 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureSenderStateAsyn
await Task.Delay(15, cancellationToken).ConfigureAwait(false);
MaxMessageSize = (long)link.Settings.MaxMessageSize;

// Update with service metadata when available:
// https://github.com/Azure/azure-sdk-for-net/issues/44914
// https://github.com/Azure/azure-sdk-for-net/issues/44916

MaxBatchSize = Math.Min(MaxMessageSize.Value, MaxBatchSize.Value);
MaxMessageSize = MaxMessageSize;

ServiceBusEventSource.Log.CreateSendLinkComplete(Identifier);
link.Closed += OnSenderLinkClosed;
return link;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public class ServiceBusReceiver : IAsyncDisposable
/// <summary>The maximum number of messages to delete in a single batch. This cap is established and enforced by the service.</summary>
internal const int MaxDeleteMessageCount = 4000;

/// <summary>The set of default options to use for initialization when no explicit options were provided.</summary>
private static ServiceBusReceiverOptions s_defaultOptions;

/// <summary>
/// The fully qualified Service Bus namespace that the receiver is associated with. This is likely
/// to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
Expand Down Expand Up @@ -177,7 +180,11 @@ internal ServiceBusReceiver(
Argument.AssertNotNullOrWhiteSpace(entityPath, nameof(entityPath));
connection.ThrowIfClosed();

options = options?.Clone() ?? new ServiceBusReceiverOptions();
// If no explicit options were provided, use the default set, creating them as needed. There is
// a benign race condition here where multiple sets of default options may be created when initializing.
// The cost of hitting the race is lower than the cost of synchronizing each access.
options ??= s_defaultOptions ??= new ServiceBusReceiverOptions();

Identifier = string.IsNullOrEmpty(options.Identifier) ? DiagnosticUtilities.GenerateIdentifier(entityPath) : options.Identifier;
_connection = connection;
_retryPolicy = connection.RetryOptions.ToRetryPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,5 @@ public int PrefetchCount
///
[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString() => base.ToString();

/// <summary>
/// Creates a new copy of the current <see cref="ServiceBusReceiverOptions" />, cloning its attributes into a new instance.
/// </summary>
///
/// <returns>A new copy of <see cref="ServiceBusReceiverOptions" />.</returns>
internal ServiceBusReceiverOptions Clone() =>
new ServiceBusReceiverOptions
{
ReceiveMode = ReceiveMode,
PrefetchCount = PrefetchCount,
SubQueue = SubQueue,
Identifier = Identifier
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ public class CreateMessageBatchOptions
/// <summary>The requested maximum size to allow for the batch, in bytes.</summary>
private long? _maxSizeInBytes;

/// <summary>
/// The maximum number of messages to allow in a single batch.
/// </summary>
///
internal int? MaxMessageCount { get; set; }
jsquire marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// The maximum size to allow for a single batch of messages, in bytes.
/// </summary>
Expand All @@ -25,9 +31,11 @@ public class CreateMessageBatchOptions
/// The desired limit, in bytes, for the size of the associated service bus message batch. If <c>null</c>,
/// the maximum size allowed by the active transport will be used.
/// </value>
///
/// <exception cref="ArgumentOutOfRangeException">
/// A negative value is attempted to be set for the property.
/// </exception>
///
public long? MaxSizeInBytes
{
get => _maxSizeInBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,58 @@ public void TryAddAcceptMessagesUntilTheMaximumSizeIsReached()
}
}

/// <summary>
/// Verifies functionality of the <see cref="AmqpMessageBatch.TryAddMessage" />
/// method.
/// </summary>
///
[Test]
public void TryAdRespectsTheMaximumMessageCount()
{
var maximumCount = 5;
var currentIndex = -1;
var messages = new AmqpMessage[maximumCount + 1];
var mockMessage = new Mock<AmqpMessage>();

var mockMessageConverter = new InjectableMockConverter
{
BuildBatchFromAmqpMessagesHandler = (_s) => mockMessage.Object,
BuildAmqpMessageFromSBMessageHandler = (_s) => messages[++currentIndex]
};

mockMessage
.Setup(message => message.SerializedMessageSize)
.Returns(40);

var options = new CreateMessageBatchOptions
{
MaxSizeInBytes = 50000,
MaxMessageCount = maximumCount
};

for (var index = 0; index < messages.Length; ++index)
{
var size = 40;
var messageToAdd = new Mock<AmqpMessage>();
messageToAdd.Setup(messageToAdd => messageToAdd.SerializedMessageSize).Returns(size);
messages[index] = messageToAdd.Object;
}

var batch = new AmqpMessageBatch(mockMessageConverter, options);

for (var index = 0; index < messages.Length; ++index)
{
if (index == messages.Length - 1)
{
Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[10])), Is.False, "The final addition should not fit in the available space.");
}
else
{
Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[10])), Is.True, $"The addition for index: { index } should fit and be accepted.");
}
}
}

/// <summary>
/// Verifies functionality of the <see cref="AmqpMessageBatch.TryAddMessage" />
/// method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
using Azure.Core.Shared;
using Azure.Messaging.ServiceBus.Amqp;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Diagnostics;
using Moq;
using NUnit.Framework;

namespace Azure.Messaging.ServiceBus.Tests.Sender
Expand Down
Loading