Skip to content

Commit

Permalink
Message binary data (#12358)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLove-msft authored May 30, 2020
1 parent 00a62c7 commit ddf2da6
Show file tree
Hide file tree
Showing 23 changed files with 146 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
README.md = README.md
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Core.TestFramework", "..\..\core\Azure.Core.TestFramework\src\Azure.Core.TestFramework.csproj", "{4A864BCB-6AD7-4F89-8778-C2953379EAAC}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.TestFramework", "..\..\core\Azure.Core.TestFramework\src\Azure.Core.TestFramework.csproj", "{4A864BCB-6AD7-4F89-8778-C2953379EAAC}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{8B8C7CA6-452C-4041-A66D-78DC6F762C62}"
ProjectSection(SolutionItems) = preProject
Expand All @@ -28,6 +28,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{8B8C
samples\Sample06_Transactions.md = samples\Sample06_Transactions.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.Experimental", "..\..\core\Azure.Core.Experimental\src\Azure.Core.Experimental.csproj", "{5871D9C6-F2DF-4F05-B29A-6C0D8709784A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -46,6 +48,10 @@ Global
{4A864BCB-6AD7-4F89-8778-C2953379EAAC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4A864BCB-6AD7-4F89-8778-C2953379EAAC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4A864BCB-6AD7-4F89-8778-C2953379EAAC}.Release|Any CPU.Build.0 = Release|Any CPU
{5871D9C6-F2DF-4F05-B29A-6C0D8709784A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5871D9C6-F2DF-4F05-B29A-6C0D8709784A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5871D9C6-F2DF-4F05-B29A-6C0D8709784A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5871D9C6-F2DF-4F05-B29A-6C0D8709784A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/Azure.Messaging.ServiceBus/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ ServiceBusReceiver receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveAsync();

// get the message body as a string
string body = Encoding.UTF8.GetString(receivedMessage.Body.ToArray());
string body = receivedMessage.Body.AsString();
Console.WriteLine(body);
```

Expand Down Expand Up @@ -248,7 +248,7 @@ IList<ServiceBusReceivedMessage> receivedMessages = await receiver.ReceiveBatchA
foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages)
{
// get the message body as a string
string body = Encoding.UTF8.GetString(receivedMessage.Body.ToArray());
string body = receivedMessage.Body.AsString();
Console.WriteLine(body);
}
```
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/Azure.Messaging.ServiceBus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ ServiceBusReceiver receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveAsync();

// get the message body as a string
string body = Encoding.UTF8.GetString(receivedMessage.Body.ToArray());
string body = receivedMessage.Body.AsString();
Console.WriteLine(body);
```

Expand Down Expand Up @@ -279,7 +279,7 @@ processor.ProcessErrorAsync += ErrorHandler;

async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = Encoding.UTF8.GetString(args.Message.Body.ToArray());
string body = args.Message.Body.AsString();
Console.WriteLine(body);

// we can evaluate application logic and use that to determine how to settle the message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ ServiceBusReceiver receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveAsync();

// get the message body as a string
string body = Encoding.UTF8.GetString(receivedMessage.Body.ToArray());
string body = receivedMessage.Body.AsString();
Console.WriteLine(body);
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ processor.ProcessErrorAsync += ErrorHandler;

async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = Encoding.UTF8.GetString(args.Message.Body.ToArray());
string body = args.Message.Body.AsString();
Console.WriteLine(body);

// we can evaluate application logic and use that to determine how to settle the message.
Expand Down Expand Up @@ -125,8 +125,7 @@ processor.ProcessErrorAsync += ErrorHandler;

async Task MessageHandler(ProcessSessionMessageEventArgs args)
{
string body = Encoding.Default.GetString(args.Message.Body.ToArray());
Console.WriteLine(body);
var body = args.Message.Body.AsString();

// we can evaluate application logic and use that to determine how to settle the message.
await args.CompleteAsync(args.Message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ processor.ProcessErrorAsync += ErrorHandler;

async Task MessageHandler(ProcessSessionMessageEventArgs args)
{
string body = Encoding.Default.GetString(args.Message.Body.ToArray());
Console.WriteLine(body);
var body = args.Message.Body.AsString();

// we can evaluate application logic and use that to determine how to settle the message.
await args.CompleteAsync(args.Message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ private static ArraySegment<byte> ReadStreamToArraySegment(Stream stream)

public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage)
{
var body = new ArraySegment<byte>((sbMessage.Body.IsEmpty) ? Array.Empty<byte>() : sbMessage.Body.ToArray());
ReadOnlyMemory<byte> bodyBytes = sbMessage.Body.AsBytes();
var body = new ArraySegment<byte>((bodyBytes.IsEmpty) ? Array.Empty<byte>() : bodyBytes.ToArray());
var amqpMessage = AmqpMessage.Create(new Data { Value = body });
amqpMessage.Properties.MessageId = sbMessage.MessageId;
amqpMessage.Properties.CorrelationId = sbMessage.CorrelationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
<Compile Include="$(AzureCoreSharedSources)ValueStopwatch.cs" Link="SharedSource\Azure.Core\ValueStopwatch.cs" />

</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\core\Azure.Core.Experimental\src\Azure.Core.Experimental.csproj" />
</ItemGroup>
<ItemGroup>
<Compile Update="Resources.Designer.cs">
<DesignTime>True</DesignTime>
Expand Down
44 changes: 35 additions & 9 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using Azure.Core;

namespace Azure.Messaging.ServiceBus
Expand All @@ -24,18 +25,49 @@ public class ServiceBusMessage
private TimeSpan _timeToLive;

/// <summary>
/// Creates a new Message
/// Creates a new message.
/// </summary>
public ServiceBusMessage()
: this(default(ReadOnlyMemory<byte>))
{
}

/// <summary>
/// Creates a new message from the specified string, using UTF-8 encoding.
/// </summary>
/// <param name="body">The payload of the message as a string.</param>
public ServiceBusMessage(string body) :
this(body, Encoding.UTF8)
{
}

/// <summary>
/// Creates a new message from the specified string, using the specified encoding.
/// </summary>
/// <param name="body">The payload of the message as a string.</param>
/// <param name="encoding">The encoding to use for the body.</param>
public ServiceBusMessage(string body, Encoding encoding)
{
Argument.AssertNotNull(encoding, nameof(encoding));
Body = BinaryData.Create(body, encoding);
Properties = new Dictionary<string, object>();
}

/// <summary>
/// Creates a new message from the specified payload.
/// </summary>
/// <param name="body">The payload of the message in bytes</param>
/// <param name="body">The payload of the message in bytes.</param>
public ServiceBusMessage(ReadOnlyMemory<byte> body)
{
Body = new BinaryData(body);
Properties = new Dictionary<string, object>();
}

/// <summary>
/// Creates a new message from the specified <see cref="BinaryData"/> instance.
/// </summary>
/// <param name="body">The payload of the message.</param>
public ServiceBusMessage(BinaryData body)
{
Body = body;
Properties = new Dictionary<string, object>();
Expand Down Expand Up @@ -74,7 +106,7 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage)
/// message.Body = System.Text.Encoding.UTF8.GetBytes("Message1");
/// </code>
/// </remarks>
public ReadOnlyMemory<byte> Body { get; set; }
public BinaryData Body { get; set; }

/// <summary>
/// Gets or sets the MessageId to identify the message.
Expand Down Expand Up @@ -256,12 +288,6 @@ public TimeSpan TimeToLive
/// depends on the queue's workload and its state.</remarks>
public DateTimeOffset ScheduledEnqueueTime { get; set; }

// TODO: Calculate the size of the properties and body
/// <summary>
/// Gets the total size of the message body in bytes.
/// </summary>
public long Size => !Body.IsEmpty ? Body.Length : 0;

/// <summary>
/// Gets the "user properties" bag, which can be used for custom message metadata.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Globalization;
using Azure.Core;

namespace Azure.Messaging.ServiceBus
{
Expand All @@ -24,7 +25,7 @@ public class ServiceBusReceivedMessage
/// <summary>
/// Gets the body of the message.
/// </summary>
public ReadOnlyMemory<byte> Body => SentMessage.Body;
public BinaryData Body => SentMessage.Body;

/// <summary>
/// Gets or sets the MessageId to identify the message.
Expand Down Expand Up @@ -145,12 +146,6 @@ public class ServiceBusReceivedMessage
/// depends on the queue's workload and its state.</remarks>
public DateTimeOffset ScheduledEnqueueTime => SentMessage.ScheduledEnqueueTime;

// TODO: Calculate the size of the properties and body
/// <summary>
/// Gets the total size of the message body in bytes.
/// </summary>
public long Size => SentMessage.Size;

/// <summary>
/// Gets the "user properties" bag, which can be used for custom message metadata.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Text;
using Azure.Core;
using Azure.Messaging.ServiceBus.Amqp;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;
Expand All @@ -22,7 +23,8 @@ public void ConvertAmqpMessageToSBMessage()
var amqpMessage = AmqpMessage.Create(data);

var sbMessage = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage);
Assert.AreEqual(messageBody, sbMessage.Body.ToArray());
ReadOnlyMemory<byte> sbBody = sbMessage.Body;
Assert.AreEqual(messageBody, sbBody.ToArray());
}

[Test]
Expand Down Expand Up @@ -61,7 +63,7 @@ public void ConvertSBMessageToAmqpMessageAndBack()
var convertedSbMessage = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage);

Assert.AreEqual("SomeUserProperty", convertedSbMessage.Properties["UserProperty"]);
Assert.AreEqual(messageBody, convertedSbMessage.Body.ToArray());
Assert.AreEqual(messageBody, convertedSbMessage.Body.AsBytes().ToArray());
Assert.AreEqual(messageId, convertedSbMessage.MessageId);
Assert.AreEqual(partitionKey, convertedSbMessage.PartitionKey);
Assert.AreEqual(viaPartitionKey, convertedSbMessage.ViaPartitionKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<ItemGroup>
<ProjectReference Include="$(AzureCoreTestFramework)" />
<ProjectReference Include="..\src\Azure.Messaging.ServiceBus.csproj" />
<ProjectReference Include="..\..\..\core\Azure.Core.Experimental\src\Azure.Core.Experimental.csproj" />
</ItemGroup>

<ItemGroup>
Expand All @@ -27,7 +28,7 @@
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
<PackageReference Include="System.Threading.Tasks.Extensions" />
</ItemGroup>

<ItemGroup Condition="'$(IsTargetingNetFx)' == 'true'">
<Reference Include="System.Transactions" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Azure.Core;
using NUnit.Framework;

namespace Azure.Messaging.ServiceBus.Tests.Client
Expand Down Expand Up @@ -34,7 +35,7 @@ public async Task GetChildClientFromClosedParentClientThrows(bool useSessions)
receiver = await client.CreateSessionReceiverAsync(scope.QueueName);
}
var receivedMessage = await receiver.ReceiveAsync().ConfigureAwait(false);
Assert.True(Encoding.UTF8.GetString(receivedMessage.Body.ToArray()) == Encoding.UTF8.GetString(message.Body.ToArray()));
Assert.AreEqual(message.Body.AsString(), receivedMessage.Body.AsString());

await client.DisposeAsync();
Assert.IsTrue(client.IsDisposed);
Expand Down Expand Up @@ -76,7 +77,7 @@ public async Task GetChildClientFromParentSucceedsOnOpenConnection(bool useSessi
receiver = await client.CreateSessionReceiverAsync(scope.QueueName);
}
var receivedMessage = await receiver.ReceiveAsync().ConfigureAwait(false);
Assert.True(Encoding.UTF8.GetString(receivedMessage.Body.ToArray()) == Encoding.UTF8.GetString(message.Body.ToArray()));
Assert.AreEqual(message.Body.AsString(), receivedMessage.Body.AsString());

if (!useSessions)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp.Framing;
using Azure.Core;
using NUnit.Framework;

namespace Azure.Messaging.ServiceBus.Tests.Message
Expand Down Expand Up @@ -86,7 +84,7 @@ public async Task CanSendMessageWithMaxSize()
var receiver = client.CreateReceiver(scope.QueueName);
var receivedMaxSizeMessage = await receiver.ReceiveAsync();
await receiver.CompleteAsync(receivedMaxSizeMessage.LockToken);
Assert.AreEqual(maxPayload, receivedMaxSizeMessage.Body.ToArray());
Assert.AreEqual(maxPayload, receivedMaxSizeMessage.Body.AsBytes().ToArray());
}
}

Expand All @@ -98,7 +96,7 @@ public async Task CreateFromReceivedMessageCopiesProperties()
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
var msg = new ServiceBusMessage();
msg.Body = GetRandomBuffer(100);
msg.Body = new BinaryData(GetRandomBuffer(100));
msg.ContentType = "contenttype";
msg.CorrelationId = "correlationid";
msg.Label = "label";
Expand Down Expand Up @@ -126,7 +124,7 @@ public async Task CreateFromReceivedMessageCopiesProperties()

void AssertMessagesEqual(ServiceBusMessage sentMessage, ServiceBusReceivedMessage received)
{
Assert.IsTrue(received.Body.ToArray().SequenceEqual(sentMessage.Body.ToArray()));
Assert.IsTrue(received.Body.AsBytes().ToArray().SequenceEqual(sentMessage.Body.AsBytes().ToArray()));
Assert.AreEqual(received.ContentType, sentMessage.ContentType);
Assert.AreEqual(received.CorrelationId, sentMessage.CorrelationId);
Assert.AreEqual(received.Label, sentMessage.Label);
Expand All @@ -145,5 +143,40 @@ void AssertMessagesEqual(ServiceBusMessage sentMessage, ServiceBusReceivedMessag
}
}
}

[Test]
public async Task SendJsonBodyMessage()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
var serializer = new JsonObjectSerializer();
var testBody = new TestBody
{
A = "text",
B = 5,
C = false
};
var body = BinaryData.Create(testBody, serializer);
var msg = new ServiceBusMessage(body);

await sender.SendAsync(msg);

var receiver = client.CreateReceiver(scope.QueueName);
var received = await receiver.ReceiveAsync();
var receivedBody = received.Body.As<TestBody>(serializer);
Assert.AreEqual(testBody.A, receivedBody.A);
Assert.AreEqual(testBody.B, receivedBody.B);
Assert.AreEqual(testBody.C, receivedBody.C);
}
}

private class TestBody
{
public string A { get; set; }
public int B { get; set; }
public bool C { get; set; }
}
}
}
Loading

0 comments on commit ddf2da6

Please sign in to comment.