Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split body memory into distinct parts for better clarity and avoiding to copy for the send path when a single ROM is passed #20098

Merged
merged 6 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public static ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpM

if ((amqpMessage.BodyType & SectionFlag.Data) != 0 && amqpMessage.DataBody != null)
{
annotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.FromData(BodyMemory.FromAmqpData(amqpMessage.DataBody)));
annotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.FromData(Body.FromDataSegments(amqpMessage.DataBody)));
}
else if ((amqpMessage.BodyType & SectionFlag.AmqpValue) != 0 && amqpMessage.ValueBody?.Value != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using Azure.Core.Amqp;
using Azure.Messaging.ServiceBus.Primitives;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;
using IList = System.Collections.IList;

Expand Down Expand Up @@ -104,7 +103,7 @@ public static BinaryData GetBody(this AmqpAnnotatedMessage message)
{
if (message.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> dataBody))
{
return BinaryData.FromBytes(BodyMemory.FromReadOnlyMemory(dataBody));
return BinaryData.FromBytes(Body.FromReadOnlyMemorySegments(dataBody));
}
throw new NotSupportedException($"{message.Body.BodyType} cannot be retrieved using the {nameof(ServiceBusMessage.Body)} property." +
$"Use {nameof(ServiceBusMessage.GetRawAmqpMessage)} to access the underlying Amqp Message object.");
Expand Down
166 changes: 166 additions & 0 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/Body.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using Azure.Core;
using Microsoft.Azure.Amqp.Framing;

namespace Azure.Messaging.ServiceBus.Amqp
{
/// <summary>
/// The body abstractions allow to optimize several use cases of <see cref="ServiceBusMessage"/> and
/// <see cref="ServiceBusReceivedMessage"/> to make sure body memory is only converted when needed and as little as possible.
/// </summary>
internal abstract class Body : IEnumerable<ReadOnlyMemory<byte>>
{
public static Body FromReadOnlyMemorySegments(IEnumerable<ReadOnlyMemory<byte>> segments)
{
return segments switch
{
Body bodyMemory => bodyMemory,
_ => new CopyingOnConversionBody(segments)
};
}

public static Body FromReadOnlyMemorySegment(ReadOnlyMemory<byte> segment)
{
return new NonCopyingSingleSegmentBody(segment);
}

public static Body FromDataSegments(IEnumerable<Data> segments)
{
return new EagerCopyingBody(segments ?? Enumerable.Empty<Data>());
}

protected abstract ReadOnlyMemory<byte> WrittenMemory { get; }

public abstract IEnumerator<ReadOnlyMemory<byte>> GetEnumerator();

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public static implicit operator ReadOnlyMemory<byte>(Body memory)
{
return memory.WrittenMemory;
}

/// <summary>
/// Wraps a single data segment into an enumerable like type without copying to optimize for the most commonly used
/// path by always bridging into enumerable like behavior without the additional overhead of underlying lists and copying.
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
/// This is the common path for ServiceBusMessage when the Body is set via constructor or the Body property.
/// </summary>
private sealed class NonCopyingSingleSegmentBody : Body
{
public NonCopyingSingleSegmentBody(ReadOnlyMemory<byte> dataSegment)
{
WrittenMemory = dataSegment;
}

protected override ReadOnlyMemory<byte> WrittenMemory { get; }

public override IEnumerator<ReadOnlyMemory<byte>> GetEnumerator()
{
yield return WrittenMemory;
}
}

/// <summary>
/// Copies the provided segments into a single continuous buffer on demand while still keeping around a list of the individual copied segments.
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
/// This path is hit when users modify the body via the underlying AmqpAnnotatedMessage.
/// </summary>
private sealed class CopyingOnConversionBody : Body
{
private ArrayBufferWriter<byte> _writer;
private IList<ReadOnlyMemory<byte>> _segments;
private IEnumerable<ReadOnlyMemory<byte>> _lazySegments;

protected override ReadOnlyMemory<byte> WrittenMemory
{
get
{
if (_lazySegments != null)
{
foreach (var segment in _lazySegments)
{
Append(segment);
}

_lazySegments = null;
}

return _writer?.WrittenMemory ?? ReadOnlyMemory<byte>.Empty;
}
}

internal CopyingOnConversionBody(IEnumerable<ReadOnlyMemory<byte>> dataSegments)
{
_lazySegments = dataSegments;
}

private void Append(ReadOnlyMemory<byte> segment)
{
_writer ??= new ArrayBufferWriter<byte>();
_segments ??= new List<ReadOnlyMemory<byte>>();

var memory = _writer.GetMemory(segment.Length);
segment.CopyTo(memory);
_writer.Advance(segment.Length);
_segments.Add(memory.Slice(0, segment.Length));
}

public override IEnumerator<ReadOnlyMemory<byte>> GetEnumerator()
{
return _segments?.GetEnumerator() ?? _lazySegments.GetEnumerator();
}
}

/// <summary>
/// Eagerly copies the provided data segments into a single continuous buffer while still keeping around a list of the individual copied segments.
/// Important for the receive path in order to make sure the buffers managed by the underlying AMQP library can be released on dispose.
/// </summary>
private sealed class EagerCopyingBody : Body
{
private ArrayBufferWriter<byte> _writer;
private IList<ReadOnlyMemory<byte>> _segments;

internal EagerCopyingBody(IEnumerable<Data> dataSegments)
{
foreach (var segment in dataSegments)
{
Append(segment);
}
}

protected override ReadOnlyMemory<byte> WrittenMemory => _writer?.WrittenMemory ?? ReadOnlyMemory<byte>.Empty;

public override IEnumerator<ReadOnlyMemory<byte>> GetEnumerator()
{
return _segments.GetEnumerator();
}

private void Append(Data segment)
{
// fields are lazy initialized to not occupy unnecessary memory when there are no data segments
_writer ??= new ArrayBufferWriter<byte>();
_segments ??= new List<ReadOnlyMemory<byte>>();

ReadOnlyMemory<byte> dataToAppend = segment.Value switch
{
byte[] byteArray => byteArray,
ArraySegment<byte> arraySegment => arraySegment,
_ => ReadOnlyMemory<byte>.Empty
};

var memory = _writer.GetMemory(dataToAppend.Length);
dataToAppend.CopyTo(memory);
_writer.Advance(dataToAppend.Length);
_segments.Add(memory.Slice(0, dataToAppend.Length));
}
}
}
}
101 changes: 0 additions & 101 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/BodyMemory.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ServiceBusMessage(string body) :
/// <param name="body">The payload of the message in bytes.</param>
public ServiceBusMessage(ReadOnlyMemory<byte> body)
{
AmqpMessageBody amqpBody = new AmqpMessageBody(BodyMemory.FromReadOnlyMemory(new ReadOnlyMemory<byte>[] { body }));
AmqpMessageBody amqpBody = new AmqpMessageBody(Amqp.Body.FromReadOnlyMemorySegment(body));
AmqpMessage = new AmqpAnnotatedMessage(amqpBody);
}

Expand All @@ -68,7 +68,7 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage)
throw new NotSupportedException($"{receivedMessage.AmqpMessage.Body.BodyType} is not a supported message body type.");
}

AmqpMessageBody body = new AmqpMessageBody(BodyMemory.FromReadOnlyMemory(dataBody));
AmqpMessageBody body = new AmqpMessageBody(Amqp.Body.FromReadOnlyMemorySegments(dataBody));
AmqpMessage = new AmqpAnnotatedMessage(body);

// copy properties
Expand Down Expand Up @@ -140,7 +140,7 @@ public BinaryData Body
get => AmqpMessage.GetBody();
set
{
AmqpMessage.Body = new AmqpMessageBody(BodyMemory.FromReadOnlyMemory(new ReadOnlyMemory<byte>[] { value }));
AmqpMessage.Body = new AmqpMessageBody(Amqp.Body.FromReadOnlyMemorySegment(value));
}
}

Expand Down
Loading