From 5f2822703c80538e406f21ece5bc3284683fd840 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Thu, 4 Apr 2024 12:22:20 -0700 Subject: [PATCH] [Messaging] Allow binary application properties (#43181) * [Messaging] Allow binary application properties The focus of these changes is to update AMQP conversion to remove the validation that disallows `byte[]` and `ArraySegment` values in application properties. These are confirmed to be encoded as `binary` by `Microsoft.Azure.Amqp`, making them allowable values according to section 3.2.5 of the AMQP specification. The Event Hubs documentation for `EventData` has been updated to reflect this change. Due to a known bug in the Service Bus service, the service will reject messages with `byte[]` application properties. Until this is confirmed fixed, the Service Bus documentation still reflects `byte[]` as an invalid type for the application properties. --- .../Shared/AmqpAnnotatedMessageConverter.cs | 9 +- .../AmqpAnnotatedMessageConverterTests.cs | 281 ++++++++++++- .../src/Testing/EventDataExtensions.cs | 30 +- .../tests/Testing/EventDataExtensionsTests.cs | 108 +++++ .../Azure.Messaging.EventHubs/CHANGELOG.md | 2 + .../src/Amqp/AmqpMessageConverter.cs | 372 +----------------- .../src/EventData.cs | 1 + .../tests/Amqp/AmqpMessageConverterTests.cs | 43 +- .../EventHubConsumerClientLiveTests.cs | 56 ++- .../EventHubProducerClientLiveTests.cs | 23 ++ .../src/Primitives/ServiceBusMessage.cs | 9 + 11 files changed, 550 insertions(+), 384 deletions(-) diff --git a/sdk/core/Azure.Core.Amqp/src/Shared/AmqpAnnotatedMessageConverter.cs b/sdk/core/Azure.Core.Amqp/src/Shared/AmqpAnnotatedMessageConverter.cs index ee2bdbc113573..a7de0625ea82a 100644 --- a/sdk/core/Azure.Core.Amqp/src/Shared/AmqpAnnotatedMessageConverter.cs +++ b/sdk/core/Azure.Core.Amqp/src/Shared/AmqpAnnotatedMessageConverter.cs @@ -522,10 +522,14 @@ public static bool TryCreateAmqpPropertyValueFromNetProperty( amqpPropertyValue = new DescribedType((AmqpSymbol)AmqpMessageConstants.TimeSpan, ((TimeSpan)propertyValue).Ticks); break; - case AmqpType.Unknown when allowBodyTypes && propertyValue is byte[] byteArray: + case AmqpType.Unknown when propertyValue is byte[] byteArray: amqpPropertyValue = new ArraySegment(byteArray); break; + case AmqpType.Unknown when propertyValue is ArraySegment byteSegment: + amqpPropertyValue = byteSegment; + break; + case AmqpType.Unknown when allowBodyTypes && propertyValue is IDictionary dict: amqpPropertyValue = new AmqpMap(dict); break; @@ -811,8 +815,7 @@ private static void ThrowSerializationFailed(string propertyName, KeyValuePairThe typed value of the symbol, if it belongs to the well-known set; otherwise, null. /// - private static object? TranslateSymbol(AmqpSymbol symbol, - object value) + private static object? TranslateSymbol(AmqpSymbol symbol, object value) { if (symbol.Equals(AmqpMessageConstants.Uri)) { diff --git a/sdk/core/Azure.Core.Amqp/tests/AmqpAnnotatedMessageConverterTests.cs b/sdk/core/Azure.Core.Amqp/tests/AmqpAnnotatedMessageConverterTests.cs index f2b2b337451f1..87a50cc34a406 100644 --- a/sdk/core/Azure.Core.Amqp/tests/AmqpAnnotatedMessageConverterTests.cs +++ b/sdk/core/Azure.Core.Amqp/tests/AmqpAnnotatedMessageConverterTests.cs @@ -3,11 +3,15 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Text; using Azure.Core.Amqp.Shared; using Microsoft.Azure.Amqp; +using Microsoft.Azure.Amqp.Encoding; +using Microsoft.Azure.Amqp.Framing; using NUnit.Framework; +using static Azure.Core.Amqp.Shared.AmqpAnnotatedMessageConverter; namespace Azure.Core.Amqp.Tests { @@ -68,6 +72,63 @@ public class AmqpAnnotatedMessageConverterTests new List> { new List { "first", 1}, new List { "second", 2 } } }; + public static readonly object[] s_simpleApplicationPropertyValues = + { + (byte)0x22, + (sbyte)0x11, + (short)5, + (int)27, + (long)1122334, + (ushort)12, + (uint)24, + (ulong)9955, + (float)4.3, + (double)3.4, + (decimal)7.893, + Guid.NewGuid(), + DateTime.Parse("2015-10-27T12:00:00Z"), + true, + 'x', + "hello" + }; + + public static IEnumerable DescribedTypePropertyTestCases() + { + Func TranslateValue = value => + { + return value switch + { + DateTimeOffset offset => offset.Ticks, + + TimeSpan timespan => timespan.Ticks, + + Uri uri => uri.AbsoluteUri, + + _ => value, + }; + }; + + yield return new object[] { (AmqpSymbol)AmqpMessageConstants.Uri, new Uri("https://www.cheetoes.zomg"), TranslateValue }; + yield return new object[] { (AmqpSymbol)AmqpMessageConstants.DateTimeOffset, DateTimeOffset.Parse("2015-10-27T12:00:00Z"), TranslateValue }; + yield return new object[] { (AmqpSymbol)AmqpMessageConstants.TimeSpan, TimeSpan.FromHours(6), TranslateValue }; + } + + public static IEnumerable StreamPropertyTestCases() + { + var contents = new byte[] { 0x55, 0x66, 0x99, 0xAA }; + + yield return new object[] { new MemoryStream(contents, false), contents }; + yield return new object[] { new BufferedStream(new MemoryStream(contents, false), 512), contents }; + } + + public static IEnumerable BinaryPropertyTestCases() + { + var contents = new byte[] { 0x55, 0x66, 0x99, 0xAA }; + + yield return new object[] { contents, contents }; + yield return new object[] { new ArraySegment(contents), contents }; + } + [Test] public void CanRoundTripDataMessage() { @@ -122,17 +183,215 @@ public void CanRoundTripSequenceBodyMessages(IEnumerable> sequence AssertCommonProperties(message); } + [Test] + public void ToAmqpMessagePopulatesSimpleApplicationProperties() + { + var annotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.FromData(new ReadOnlyMemory[] { new byte[] { 0x11, 0x22, 0x33 }})); + + foreach (var value in s_simpleApplicationPropertyValues) + { + annotatedMessage.ApplicationProperties.Add($"{value.GetType().Name }Property", value); + } + + using AmqpMessage message = ToAmqpMessage(annotatedMessage); + + Assert.IsNotNull(message, "The AMQP message should have been created."); + Assert.IsNotNull(message.ApplicationProperties, "The AMQP message should have a set of application properties."); + + // The collection comparisons built into the test assertions do not recognize + // the property sets as equivalent, but a manual inspection proves the properties exist + // in both. + + foreach (var property in annotatedMessage.ApplicationProperties.Keys) + { + var containsValue = message.ApplicationProperties.Map.TryGetValue(property, out object value); + + Assert.IsTrue(containsValue, $"The message properties did not contain: [{ property }]"); + Assert.That(value, Is.EqualTo(annotatedMessage.ApplicationProperties[property]), $"The property value did not match for: [{ property }]"); + } + } + + [Test] + public void FromAmqpMessagePopulatesSimpleApplicationProperties() + { + var applicationProperties = s_simpleApplicationPropertyValues.ToDictionary(value => $"{ value.GetType().Name }Property", value => value); + var dataBody = new Data { Value = new byte[] { 0x11, 0x22, 0x33 } }; + + using var message = AmqpMessage.Create(dataBody); + + foreach (KeyValuePair pair in applicationProperties) + { + message.ApplicationProperties.Map.Add(pair.Key, pair.Value); + } + + var annotatedMessage = FromAmqpMessage(message); + + Assert.NotNull(annotatedMessage, "The message should have been created."); + Assert.IsTrue(annotatedMessage.ApplicationProperties.Any(), "The message should have a set of application properties."); + + // The collection comparisons built into the test assertions do not recognize + // the property sets as equivalent, but a manual inspection proves the properties exist + // in both. + + foreach (var property in applicationProperties.Keys) + { + var containsValue = annotatedMessage.ApplicationProperties.TryGetValue(property, out object value); + + Assert.IsTrue(containsValue, $"The message properties did not contain: [{ property }]"); + Assert.AreEqual(value, applicationProperties[property], $"The property value did not match for: [{ property }]"); + } + } + + [Test] + [TestCaseSource(nameof(DescribedTypePropertyTestCases))] + public void ToAmqpMessageTranslatesDescribedApplicationProperties(object typeDescriptor, object propertyValueRaw, Func propertyValueAccessor) + { + var annotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.FromData(new ReadOnlyMemory[] { new byte[] { 0x11, 0x22, 0x33 }})); + annotatedMessage.ApplicationProperties.Add("TestProp", propertyValueRaw); + + using AmqpMessage message = ToAmqpMessage(annotatedMessage); + + Assert.IsNotNull(message, "The AMQP message should have been created."); + Assert.IsNotNull(message.ApplicationProperties, "The AMQP message should have a set of application properties."); + + var propertyKey = annotatedMessage.ApplicationProperties.Keys.First(); + var propertyValue = propertyValueAccessor(annotatedMessage.ApplicationProperties[propertyKey]); + var containsValue = message.ApplicationProperties.Map.TryGetValue(propertyKey, out DescribedType describedValue); + + Assert.True(containsValue, "The message properties did not contain the property."); + Assert.AreEqual(describedValue.Value, propertyValue, "The property value did not match."); + Assert.AreEqual(describedValue.Descriptor, typeDescriptor, "The message property descriptor was incorrect."); + } + + [TestCaseSource(nameof(DescribedTypePropertyTestCases))] + public void FromAmqpMessagePopulateDescribedApplicationProperties(object typeDescriptor, object propertyValueRaw, Func propertyValueAccessor) + { + var dataBody = new Data { Value = new byte[] { 0x11, 0x22, 0x33 } }; + using var message = AmqpMessage.Create(dataBody); + + var describedProperty = new DescribedType(typeDescriptor, propertyValueAccessor(propertyValueRaw)); + message.ApplicationProperties.Map.Add(typeDescriptor.ToString(), describedProperty); + + var annotatedMessage = FromAmqpMessage(message); + + Assert.NotNull(annotatedMessage, "The message should have been created."); + Assert.IsTrue(annotatedMessage.ApplicationProperties.Any(), "The message should have a set of application properties."); + + var containsValue = annotatedMessage.ApplicationProperties.TryGetValue(typeDescriptor.ToString(), out object value); + Assert.IsTrue(containsValue, $"The event properties did not contain the described property."); + Assert.AreEqual(value, propertyValueRaw, $"The property value did not match."); + } + + [Test] + [TestCaseSource(nameof(StreamPropertyTestCases))] + public void ToAmqpMessageTranslatesStreamApplicationProperties(object propertyStream, byte[] contents) + { + var annotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.FromData(new ReadOnlyMemory[] { new byte[] { 0x11, 0x22, 0x33 }})); + annotatedMessage.ApplicationProperties.Add("TestProp", propertyStream); + + using AmqpMessage message = ToAmqpMessage(annotatedMessage); + + Assert.IsNotNull(message, "The AMQP message should have been created."); + Assert.IsNotNull(message.ApplicationProperties, "The AMQP message should have a set of application properties."); + + var propertyKey = annotatedMessage.ApplicationProperties.Keys.First(); + var containsValue = message.ApplicationProperties.Map.TryGetValue(propertyKey, out object streamValue); + + Assert.IsTrue(containsValue, "The message properties did not contain the property."); + Assert.IsInstanceOf>(streamValue, "The message property stream was not read correctly."); + Assert.AreEqual(((ArraySegment)streamValue).ToArray(), contents, "The property value did not match."); + } + + [Test] + [TestCaseSource(nameof(BinaryPropertyTestCases))] + public void ToAmqpMessageTranslatesBinaryApplicationProperties(object property, object contents) + { + var annotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.FromData(new ReadOnlyMemory[] { new byte[] { 0x11, 0x22, 0x33 }})); + annotatedMessage.ApplicationProperties.Add("TestProp", property); + + using AmqpMessage message = ToAmqpMessage(annotatedMessage); + + Assert.IsNotNull(message, "The AMQP message should have been created."); + Assert.IsNotNull(message.ApplicationProperties, "The AMQP message should have a set of application properties."); + + var propertyKey = annotatedMessage.ApplicationProperties.Keys.First(); + var containsValue = message.ApplicationProperties.Map.TryGetValue(propertyKey, out object streamValue); + + Assert.IsTrue(containsValue, "The message properties did not contain the property."); + Assert.IsInstanceOf>(streamValue, "The message property stream was not read correctly."); + Assert.AreEqual(((ArraySegment)streamValue).ToArray(), contents, "The property value did not match."); + } + + [Test] + [TestCaseSource(nameof(BinaryPropertyTestCases))] + public void FromAmqpMessagePopulatesBinaryApplicationProperties(object property, object contents) + { + var dataBody = new Data { Value = new byte[] { 0x11, 0x22, 0x33 } }; + using var message = AmqpMessage.Create(dataBody); + + var propertyKey = "Test"; + message.ApplicationProperties.Map.Add(propertyKey, property); + + var annotatedMessage = FromAmqpMessage(message); + + Assert.NotNull(annotatedMessage, "The message should have been created."); + Assert.IsTrue(annotatedMessage.ApplicationProperties.Any(), "The message should have a set of application properties."); + + var containsValue = annotatedMessage.ApplicationProperties.TryGetValue(propertyKey, out var messageValue); + Assert.IsTrue(containsValue, $"The message properties should contain the property."); + Assert.AreEqual(messageValue, contents, "The property value was incorrect."); + } + + [Test] + public void FromAmqpMessagePopulatesPartialArraySegmentApplicationProperties() + { + var dataBody = new Data { Value = new byte[] { 0x11, 0x22, 0x33 } }; + using var message = AmqpMessage.Create(dataBody); + + var propertyKey = "Test"; + var propertyValue = new byte[] { 0x11, 0x15, 0xF8, 0x20 }; + message.ApplicationProperties.Map.Add(propertyKey, new ArraySegment(propertyValue, 1, 2)); + + var annotatedMessage = FromAmqpMessage(message); + + Assert.NotNull(annotatedMessage, "The message should have been created."); + Assert.IsTrue(annotatedMessage.ApplicationProperties.Any(), "The message should have a set of application properties."); + + var containsValue = annotatedMessage.ApplicationProperties.TryGetValue(propertyKey, out var messageValue); + Assert.IsTrue(containsValue, $"The message properties should contain the property."); + Assert.AreEqual(messageValue, propertyValue.Skip(1).Take(2), "The property value was incorrect."); + } + + [Test] + public void FromAmqpMessageDoesNotIncludeUnknownApplicationPropertyType() + { + var dataBody = new Data { Value = new byte[] { 0x11, 0x22, 0x33 } }; + using var message = AmqpMessage.Create(dataBody); + + var typeDescriptor = (AmqpSymbol)"INVALID"; + var describedProperty = new DescribedType(typeDescriptor, 1234); + message.ApplicationProperties.Map.Add(typeDescriptor.ToString(), describedProperty); + + var annotatedMessage = FromAmqpMessage(message); + + Assert.NotNull(annotatedMessage, "The message should have been created."); + Assert.IsFalse(annotatedMessage.ApplicationProperties.Any(), "The message should not have a set of application properties."); + + var containsValue = annotatedMessage.ApplicationProperties.TryGetValue(typeDescriptor.ToString(), out var messageValue); + Assert.IsFalse(containsValue, "The message properties should not contain the described property."); + } + [Test] public void TimeToLiveIsOverriddenOnReceivedMessageByAbsoluteExpiryTime() { var amqpMessage = - AmqpAnnotatedMessageConverter.ToAmqpMessage(new AmqpAnnotatedMessage(AmqpMessageBody.FromValue(5))); + ToAmqpMessage(new AmqpAnnotatedMessage(AmqpMessageBody.FromValue(5))); amqpMessage.Properties.CreationTime = DateTime.UtcNow; amqpMessage.Properties.AbsoluteExpiryTime = DateTime.MaxValue; amqpMessage.Header.Ttl = (uint) TimeSpan.FromDays(49).TotalMilliseconds; - var annotatedMessage = AmqpAnnotatedMessageConverter.FromAmqpMessage(amqpMessage); + var annotatedMessage = FromAmqpMessage(amqpMessage); // The expected TTL will disregard the TTL set on the header and instead calculate it based on expiry time and creation time. var expectedTtl = amqpMessage.Properties.AbsoluteExpiryTime - amqpMessage.Properties.CreationTime; @@ -140,15 +399,15 @@ public void TimeToLiveIsOverriddenOnReceivedMessageByAbsoluteExpiryTime() } [Test] - public void TimeToLiveIsNotOverridenWhenNoAbsoluteExpiryTimePresent() + public void TimeToLiveIsNotOverriddenWhenNoAbsoluteExpiryTimePresent() { var amqpMessage = - AmqpAnnotatedMessageConverter.ToAmqpMessage(new AmqpAnnotatedMessage(AmqpMessageBody.FromValue(5))); + ToAmqpMessage(new AmqpAnnotatedMessage(AmqpMessageBody.FromValue(5))); amqpMessage.Properties.CreationTime = DateTime.UtcNow; amqpMessage.Header.Ttl = (uint) TimeSpan.FromDays(49).TotalMilliseconds; - var annotatedMessage = AmqpAnnotatedMessageConverter.FromAmqpMessage(amqpMessage); + var annotatedMessage = FromAmqpMessage(amqpMessage); Assert.AreEqual(TimeSpan.FromDays(49), annotatedMessage.Header.TimeToLive); } @@ -163,12 +422,12 @@ public void TimeToLiveRoundTripsCorrectlyWithGreaterThanMaxInt() TimeToLive = TimeSpan.FromDays(100) } }; - var amqpMessage = AmqpAnnotatedMessageConverter.ToAmqpMessage(input); + var amqpMessage = ToAmqpMessage(input); Assert.AreEqual(uint.MaxValue, amqpMessage.Header.Ttl); Assert.AreEqual(amqpMessage.Properties.CreationTime + TimeSpan.FromDays(100), amqpMessage.Properties.AbsoluteExpiryTime); - var output = AmqpAnnotatedMessageConverter.FromAmqpMessage(amqpMessage); + var output = FromAmqpMessage(amqpMessage); Assert.AreEqual(TimeSpan.FromDays(100), output.Header.TimeToLive); Assert.AreEqual(amqpMessage.Properties.CreationTime, output.Properties.CreationTime!.Value.UtcDateTime); @@ -179,7 +438,7 @@ public void TimeToLiveRoundTripsCorrectlyWithGreaterThanMaxInt() public void AbsoluteExpiryTimeAndCreationTimeNotSetWhenNoTtl() { var input = new AmqpAnnotatedMessage(AmqpMessageBody.FromValue(5)); - var amqpMessage = AmqpAnnotatedMessageConverter.ToAmqpMessage(input); + var amqpMessage = ToAmqpMessage(input); Assert.IsNull(amqpMessage.Header.Ttl); Assert.IsNull(amqpMessage.Properties.CreationTime); @@ -193,7 +452,7 @@ public void AbsoluteExpiryTimeAndCreationTimeSetOnMessageWhenSetExplicitly() var now = DateTime.UtcNow; input.Properties.CreationTime = now; input.Properties.AbsoluteExpiryTime = now + TimeSpan.FromDays(1); - var amqpMessage = AmqpAnnotatedMessageConverter.ToAmqpMessage(input); + var amqpMessage = ToAmqpMessage(input); Assert.IsNull(amqpMessage.Header.Ttl); Assert.AreEqual(now, amqpMessage.Properties.CreationTime); @@ -201,14 +460,14 @@ public void AbsoluteExpiryTimeAndCreationTimeSetOnMessageWhenSetExplicitly() } [Test] - public void AbsoluteExpiryTimeAndCreationTimeAreOverridenBasedOnTtlWhenSending() + public void AbsoluteExpiryTimeAndCreationTimeAreOverriddenBasedOnTtlWhenSending() { var input = new AmqpAnnotatedMessage(AmqpMessageBody.FromValue(5)); var now = DateTimeOffset.UtcNow; input.Properties.CreationTime = now; input.Properties.AbsoluteExpiryTime = now + TimeSpan.FromDays(1); input.Header.TimeToLive = TimeSpan.FromDays(7); - var amqpMessage = AmqpAnnotatedMessageConverter.ToAmqpMessage(input); + var amqpMessage = ToAmqpMessage(input); Assert.AreEqual(TimeSpan.FromDays(7).TotalMilliseconds, amqpMessage.Header.Ttl); Assert.AreEqual(amqpMessage.Properties.CreationTime + TimeSpan.FromDays(7), amqpMessage.Properties.AbsoluteExpiryTime); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventDataExtensions.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventDataExtensions.cs index 12d1444c5af8d..a799711350cb8 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventDataExtensions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventDataExtensions.cs @@ -2,6 +2,8 @@ // Licensed under the MIT License. using System; +using System.Collections; +using System.Collections.Generic; using System.Linq; namespace Azure.Messaging.EventHubs.Tests @@ -115,7 +117,33 @@ public static bool IsEquivalentTo(this EventData instance, return false; } - return instance.Properties.OrderBy(kvp => kvp.Key).SequenceEqual(other.Properties.OrderBy(kvp => kvp.Key)); + foreach (var key in instance.Properties.Keys) + { + if (!other.Properties.TryGetValue(key, out object otherValue)) + { + return false; + } + + // Properties can contain byte[] or ArraySegment values, which need to be compared + // as a sequence rather than by strict equality. Both forms implement IList, so they + // can be normalized for comparison. + + if ((instance.Properties[key] is IList instanceList) && (otherValue is IList otherList)) + { + if (!instanceList.SequenceEqual(otherList)) + { + return false; + } + } + else if (!instance.Properties[key].Equals(otherValue)) + { + return false; + } + } + + // No inequalities were found, so the events are equal. + + return true; } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/EventDataExtensionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/EventDataExtensionsTests.cs index 0c2cd61cdc7f5..0efbbccdc3cec 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/EventDataExtensionsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/EventDataExtensionsTests.cs @@ -116,6 +116,114 @@ public void IsEquivalentToDetectsDifferentProperties() Assert.That(firstEvent.IsEquivalentTo(secondEvent), Is.False); } + /// + /// Verifies functionality of the test + /// helper. + /// + /// + [Test] + public void IsEquivalentToComparesByteArrayPropertiesBySequence() + { + var body = new byte[] { 0x22, 0x44, 0x88 }; + var firstEvent = new EventData((byte[])body.Clone()); + var secondEvent = new EventData((byte[])body.Clone()); + + firstEvent.Properties["test"] = new byte[] { 0x1, 0x2, 0x3 }; + secondEvent.Properties["test"] = new byte[] { 0x1, 0x2, 0x3 }; + + Assert.That(firstEvent.IsEquivalentTo(secondEvent), Is.True); + } + + /// + /// Verifies functionality of the test + /// helper. + /// + /// + [Test] + public void IsEquivalentToDetectsDifferentArrayProperties() + { + var body = new byte[] { 0x22, 0x44, 0x88 }; + var firstEvent = new EventData((byte[])body.Clone()); + var secondEvent = new EventData((byte[])body.Clone()); + + firstEvent.Properties["test"] = new byte[] { 0x1, 0x2, 0x3 }; + secondEvent.Properties["test"] = new byte[] { 0x2, 0x3, 0x4 }; + + Assert.That(firstEvent.IsEquivalentTo(secondEvent), Is.False); + } + + /// + /// Verifies functionality of the test + /// helper. + /// + /// + [Test] + public void IsEquivalentToComparesArraySegmentPropertiesBySequence() + { + var body = new byte[] { 0x22, 0x44, 0x88 }; + var firstEvent = new EventData((byte[])body.Clone()); + var secondEvent = new EventData((byte[])body.Clone()); + + firstEvent.Properties["test"] = new ArraySegment(new byte[] { 0x1, 0x2, 0x3 }); + secondEvent.Properties["test"] = new ArraySegment(new byte[] { 0x1, 0x2, 0x3 }); + + Assert.That(firstEvent.IsEquivalentTo(secondEvent), Is.True); + } + + /// + /// Verifies functionality of the test + /// helper. + /// + /// + [Test] + public void IsEquivalentToDetectsDifferentArraySegmentProperties() + { + var body = new byte[] { 0x22, 0x44, 0x88 }; + var firstEvent = new EventData((byte[])body.Clone()); + var secondEvent = new EventData((byte[])body.Clone()); + + firstEvent.Properties["test"] = new ArraySegment(new byte[] { 0x1, 0x2, 0x3 }); + secondEvent.Properties["test"] = new ArraySegment(new byte[] { 0x2, 0x3, 0x4 }); + + Assert.That(firstEvent.IsEquivalentTo(secondEvent), Is.False); + } + + /// + /// Verifies functionality of the test + /// helper. + /// + /// + [Test] + public void IsEquivalentToComparesMixedBinaryPropertiesBySequence() + { + var body = new byte[] { 0x22, 0x44, 0x88 }; + var firstEvent = new EventData((byte[])body.Clone()); + var secondEvent = new EventData((byte[])body.Clone()); + + firstEvent.Properties["test"] = new ArraySegment(new byte[] { 0x1, 0x2, 0x3 }); + secondEvent.Properties["test"] = new byte[] { 0x1, 0x2, 0x3 }; + + Assert.That(firstEvent.IsEquivalentTo(secondEvent), Is.True); + } + + /// + /// Verifies functionality of the test + /// helper. + /// + /// + [Test] + public void IsEquivalentToDetectsMixedBinaryProperties() + { + var body = new byte[] { 0x22, 0x44, 0x88 }; + var firstEvent = new EventData((byte[])body.Clone()); + var secondEvent = new EventData((byte[])body.Clone()); + + firstEvent.Properties["test"] = new byte[] { 0x1, 0x2, 0x3 }; + secondEvent.Properties["test"] = new ArraySegment(new byte[] { 0x2, 0x3, 0x4 }); + + Assert.That(firstEvent.IsEquivalentTo(secondEvent), Is.False); + } + /// /// Verifies functionality of the test /// helper. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md index 871d3b7478f14..809a46625801c 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md @@ -12,6 +12,8 @@ ### Other Changes +- It is now possible to set `byte[]` values as [application properties](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties) in the `EventData.Properties` collection. + ## 5.11.1 (2024-03-05) ### Other Changes diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs index db362ec3c0b33..c24eb13643348 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs @@ -437,7 +437,15 @@ private static EventData BuildEventFromAmqpMessage(AmqpMessage source) return new EventData(message); } - private static void NormalizeBrokerProperties(IDictionary properties, Annotations sourceProperties) + /// + /// Normalizes the broker-owned properties of an event. + /// + /// + /// The properties to normalize. + /// The source properties from the AMQP message. + /// + private static void NormalizeBrokerProperties(IDictionary properties, + Annotations sourceProperties) { foreach (var pair in sourceProperties) { @@ -497,368 +505,6 @@ private static void SetPublisherProperties(AmqpMessage message, } } - /// - /// Translates the data body segments into the corresponding set of - /// instances. - /// - /// - /// The data body to translate. - /// - /// The set of instances that represents the . - /// - private static IEnumerable TranslateDataBody(IEnumerable> dataBody) - { - foreach (var bodySegment in dataBody) - { - if (!MemoryMarshal.TryGetArray(bodySegment, out ArraySegment dataSegment)) - { - dataSegment = new ArraySegment(bodySegment.ToArray()); - } - - yield return new Data - { - Value = dataSegment - }; - } - } - - /// - /// Translates the data body elements into the corresponding set of - /// instances. - /// - /// - /// The sequence body to translate. - /// - /// The set of instances that represents the in AMQP format. - /// - private static IEnumerable TranslateSequenceBody(IEnumerable> sequenceBody) - { - foreach (var item in sequenceBody) - { - yield return new AmqpSequence((System.Collections.IList)item); - } - } - - /// - /// Translates the data body into the corresponding set of - /// instance. - /// - /// - /// The sequence body to translate. - /// - /// The instance that represents the in AMQP format. - /// - private static AmqpValue TranslateValueBody(object valueBody) - { - if (TryCreateAmqpPropertyValueForEventProperty(valueBody, out var amqpValue, allowBodyTypes: true)) - { - return new AmqpValue { Value = amqpValue }; - } - - throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidAmqpMessageValueBodyMask, valueBody.GetType().Name)); - } - - /// - /// Attempts to read the data body of an . - /// - /// - /// The to read from. - /// The value of the data body, if read. - /// - /// true if the body was successfully read; otherwise, false. - /// - private static bool TryGetDataBody(AmqpMessage source, out AmqpMessageBody dataBody) - { - if (((source.BodyType & SectionFlag.Data) == 0) || (source.DataBody == null)) - { - dataBody = null; - return false; - } - - dataBody = AmqpMessageBody.FromData(MessageBody.FromDataSegments(source.DataBody)); - return true; - } - - /// - /// Attempts to read the sequence body of an . - /// - /// - /// The to read from. - /// The value of the sequence body, if read. - /// - /// true if the body was successfully read; otherwise, false. - /// - private static bool TryGetSequenceBody(AmqpMessage source, out AmqpMessageBody sequenceBody) - { - if ((source.BodyType & SectionFlag.AmqpSequence) == 0) - { - sequenceBody = null; - return false; - } - - var bodyContent = new List>(); - - foreach (var item in source.SequenceBody) - { - bodyContent.Add((IList)item.List); - } - - sequenceBody = AmqpMessageBody.FromSequence(bodyContent); - return true; - } - - /// - /// Attempts to read the sequence body of an . - /// - /// - /// The to read from. - /// The value body, if read. - /// - /// true if the body was successfully read; otherwise, false. - /// - private static bool TryGetValueBody(AmqpMessage source, out AmqpMessageBody valueBody) - { - if (((source.BodyType & SectionFlag.AmqpValue) == 0) || (source.ValueBody?.Value == null)) - { - valueBody = null; - return false; - } - - if (TryCreateEventPropertyForAmqpProperty(source.ValueBody.Value, out var translatedValue, allowBodyTypes: true)) - { - valueBody = AmqpMessageBody.FromValue(translatedValue); - return true; - } - - throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidAmqpMessageValueBodyMask, source.ValueBody.Value.GetType().Name)); - } - - /// - /// Attempts to create an AMQP property value for a given event property. - /// - /// - /// The value of the event property to create an AMQP property value for. - /// The AMQP property value that was created. - /// true to allow an AMQP map to be translated to additional types supported only by a message body; otherwise, false. - /// - /// true if an AMQP property value was able to be created; otherwise, false. - /// - private static bool TryCreateAmqpPropertyValueForEventProperty(object eventPropertyValue, - out object amqpPropertyValue, - bool allowBodyTypes = false) - { - amqpPropertyValue = null; - - if (eventPropertyValue == null) - { - return true; - } - - switch (GetTypeIdentifier(eventPropertyValue)) - { - case AmqpProperty.Type.Byte: - case AmqpProperty.Type.SByte: - case AmqpProperty.Type.Int16: - case AmqpProperty.Type.Int32: - case AmqpProperty.Type.Int64: - case AmqpProperty.Type.UInt16: - case AmqpProperty.Type.UInt32: - case AmqpProperty.Type.UInt64: - case AmqpProperty.Type.Single: - case AmqpProperty.Type.Double: - case AmqpProperty.Type.Boolean: - case AmqpProperty.Type.Decimal: - case AmqpProperty.Type.Char: - case AmqpProperty.Type.Guid: - case AmqpProperty.Type.DateTime: - case AmqpProperty.Type.String: - amqpPropertyValue = eventPropertyValue; - break; - - case AmqpProperty.Type.Stream: - case AmqpProperty.Type.Unknown when eventPropertyValue is Stream: - amqpPropertyValue = ReadStreamToArraySegment((Stream)eventPropertyValue); - break; - - case AmqpProperty.Type.Uri: - amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.Uri, ((Uri)eventPropertyValue).AbsoluteUri); - break; - - case AmqpProperty.Type.DateTimeOffset: - amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.DateTimeOffset, ((DateTimeOffset)eventPropertyValue).UtcTicks); - break; - - case AmqpProperty.Type.TimeSpan: - amqpPropertyValue = new DescribedType(AmqpProperty.Descriptor.TimeSpan, ((TimeSpan)eventPropertyValue).Ticks); - break; - - case AmqpProperty.Type.Unknown when allowBodyTypes && eventPropertyValue is byte[] byteArray: - amqpPropertyValue = new ArraySegment(byteArray); - break; - - case AmqpProperty.Type.Unknown when allowBodyTypes && eventPropertyValue is System.Collections.IDictionary dict: - amqpPropertyValue = new AmqpMap(dict); - break; - - case AmqpProperty.Type.Unknown when allowBodyTypes && eventPropertyValue is System.Collections.IList: - amqpPropertyValue = eventPropertyValue; - break; - - case AmqpProperty.Type.Unknown: - var exception = new SerializationException(string.Format(CultureInfo.CurrentCulture, Resources.FailedToSerializeUnsupportedType, eventPropertyValue.GetType().FullName)); - EventHubsEventSource.Log.UnexpectedException(exception.Message); - throw exception; - } - - return (amqpPropertyValue != null); - } - - /// - /// Attempts to create an event property value for a given AMQP property. - /// - /// - /// The value of the AMQP property to create an event property value for. - /// The event property value that was created. - /// true to allow an AMQP map to be translated to additional types supported only by a message body; otherwise, false. - /// - /// true if an event property value was able to be created; otherwise, false. - /// - private static bool TryCreateEventPropertyForAmqpProperty(object amqpPropertyValue, - out object eventPropertyValue, - bool allowBodyTypes = false) - { - eventPropertyValue = null; - - if (amqpPropertyValue == null) - { - return true; - } - - // If the property is a simple type, then use it directly. - - switch (GetTypeIdentifier(amqpPropertyValue)) - { - case AmqpProperty.Type.Byte: - case AmqpProperty.Type.SByte: - case AmqpProperty.Type.Int16: - case AmqpProperty.Type.Int32: - case AmqpProperty.Type.Int64: - case AmqpProperty.Type.UInt16: - case AmqpProperty.Type.UInt32: - case AmqpProperty.Type.UInt64: - case AmqpProperty.Type.Single: - case AmqpProperty.Type.Double: - case AmqpProperty.Type.Boolean: - case AmqpProperty.Type.Decimal: - case AmqpProperty.Type.Char: - case AmqpProperty.Type.Guid: - case AmqpProperty.Type.DateTime: - case AmqpProperty.Type.String: - eventPropertyValue = amqpPropertyValue; - return true; - - case AmqpProperty.Type.Unknown: - // An explicitly unknown type will be considered for additional - // scenarios below. - break; - - default: - return false; - } - - // Attempt to parse the value against other well-known value scenarios. - - switch (amqpPropertyValue) - { - case AmqpSymbol symbol: - eventPropertyValue = symbol.Value; - break; - - case byte[] array: - eventPropertyValue = array; - break; - - case ArraySegment segment when segment.Count == segment.Array.Length: - eventPropertyValue = segment.Array; - break; - - case ArraySegment segment: - var buffer = new byte[segment.Count]; - Buffer.BlockCopy(segment.Array, segment.Offset, buffer, 0, segment.Count); - eventPropertyValue = buffer; - break; - - case DescribedType described when (described.Descriptor is AmqpSymbol): - eventPropertyValue = TranslateSymbol((AmqpSymbol)described.Descriptor, described.Value); - break; - - case AmqpMap map when allowBodyTypes: - { - var dict = new Dictionary(map.Count); - - foreach (var pair in map) - { - dict.Add(pair.Key.ToString(), pair.Value); - } - - eventPropertyValue = dict; - break; - } - - default: - var exception = new SerializationException(string.Format(CultureInfo.CurrentCulture, Resources.FailedToSerializeUnsupportedType, amqpPropertyValue.GetType().FullName)); - EventHubsEventSource.Log.UnexpectedException(exception.Message); - throw exception; - } - - return (eventPropertyValue != null); - } - - /// - /// Gets the AMQP property type identifier for a given - /// value. - /// - /// - /// The value to determine the type identifier for. - /// - /// The that was identified for the given . - /// - private static AmqpProperty.Type GetTypeIdentifier(object value) => - (value == null) - ? AmqpProperty.Type.Null - : value.GetType().ToAmqpPropertyType(); - - /// - /// Translates the AMQP symbol into its corresponding typed value, if it belongs to the - /// set of known types. - /// - /// - /// The symbol to consider. - /// The value of the symbol to translate. - /// - /// The typed value of the symbol, if it belongs to the well-known set; otherwise, null. - /// - private static object TranslateSymbol(AmqpSymbol symbol, - object value) - { - if (symbol.Equals(AmqpProperty.Descriptor.Uri)) - { - return new Uri((string)value); - } - - if (symbol.Equals(AmqpProperty.Descriptor.TimeSpan)) - { - return new TimeSpan((long)value); - } - - if (symbol.Equals(AmqpProperty.Descriptor.DateTimeOffset)) - { - return new DateTimeOffset((long)value, TimeSpan.Zero); - } - - return null; - } - /// /// Converts a stream to an representation. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs index 4ec0ccaa7740e..69bf941fca6cb 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs @@ -243,6 +243,7 @@ public string CorrelationId /// Stream /// Uri /// TimeSpan + /// byte[] /// /// /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpMessageConverterTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpMessageConverterTests.cs index 921e804b31f66..6acfccfec21f8 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpMessageConverterTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpMessageConverterTests.cs @@ -30,6 +30,7 @@ public class AmqpMessageConverterTests /// /// The set of test cases for known described type properties. /// + /// public static IEnumerable DescribedTypePropertyTestCases() { Func TranslateValue = value => @@ -63,6 +64,18 @@ public static IEnumerable StreamPropertyTestCases() yield return new object[] { new BufferedStream(new MemoryStream(contents, false), 512), contents }; } + /// + /// The set of test cases for known described type properties. + /// + /// + public static IEnumerable BinaryPropertyTestCases() + { + var contents = new byte[] { 0x55, 0x66, 0x99, 0xAA }; + + yield return new object[] { contents, contents }; + yield return new object[] { new ArraySegment(contents), contents }; + } + /// /// The set of test cases for optional publishing properties. /// @@ -317,6 +330,34 @@ public void CreateMessageFromEventTranslatesStreamApplicationProperties(object p Assert.That(((ArraySegment)streamValue).ToArray(), Is.EqualTo(contents), "The property value did not match."); } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + [TestCaseSource(nameof(BinaryPropertyTestCases))] + public void CreateMessageFromEventTranslatesBinaryApplicationProperties(object property, + object contents) + { + var eventData = new EventData( + eventBody: new BinaryData(new byte[] { 0x11, 0x22, 0x33 }), + properties: new Dictionary { { "TestProp", property } }); + + using AmqpMessage message = new AmqpMessageConverter().CreateMessageFromEvent(eventData); + + Assert.That(message, Is.Not.Null, "The AMQP message should have been created."); + Assert.That(message.DataBody, Is.Not.Null, "The AMQP message should a body."); + Assert.That(message.ApplicationProperties, Is.Not.Null, "The AMQP message should have a set of application properties."); + + var propertyKey = eventData.Properties.Keys.First(); + var containsValue = message.ApplicationProperties.Map.TryGetValue(propertyKey, out object streamValue); + + Assert.That(containsValue, Is.True, "The message properties did not contain the property."); + Assert.That(streamValue, Is.InstanceOf>(), "The message property stream was not read correctly."); + Assert.That(((ArraySegment)streamValue).ToArray(), Is.EqualTo(contents), "The property value did not match."); + } + /// /// Verifies functionality of the /// method. @@ -376,7 +417,7 @@ public void CreateMessageFromEventAllowsEmptyEventWithAPartitionKey() /// /// [Test] - public void CreateMessageFromEventDoesNotTriggerPropertiesInstantation() + public void CreateMessageFromEventDoesNotTriggerPropertiesInstantiation() { var eventData = new EventData(ReadOnlyMemory.Empty); using var message = new AmqpMessageConverter().CreateMessageFromEvent(eventData); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs index 0d3b8a45a937c..4188aa6467ab5 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs @@ -521,6 +521,52 @@ [Test]public async Task ConsumerCanReadEventsWithCustomProperties() } } + /// + /// Verifies that the is able to + /// connect to the Event Hubs service and perform operations. + /// + /// + [Test]public async Task ConsumerCanReadEventsWithBinaryProperties() + { + await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) + { + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); + + var sourceEvents = EventGenerator.CreateEvents(5) + .Select(current => + { + current.Properties["TestByteArray"] = new byte[] { 0x12, 0x34, 0x56, 0x78 }; + current.Properties["TestArraySegment"] = new ArraySegment(new byte[] { 0x23, 0x45, 0x67, 0x89 }); + + return current; + }) + .ToList(); + + await using (var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, connectionString)) + { + var partition = (await consumer.GetPartitionIdsAsync(cancellationSource.Token)).First(); + await SendEventsAsync(connectionString, sourceEvents, new CreateBatchOptions { PartitionId = partition }, cancellationSource.Token); + + // Read the events and validate the resulting state. + + var readState = await ReadEventsFromPartitionAsync(consumer, partition,sourceEvents.Select(evt => evt.MessageId), cancellationSource.Token); + Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); + + foreach (var sourceEvent in sourceEvents) + { + var sourceId = sourceEvent.MessageId; + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); + Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); + } + } + + cancellationSource.Cancel(); + } + } + /// /// Verifies that the is able to /// connect to the Event Hubs service and perform operations. @@ -1453,7 +1499,7 @@ public async Task ConsumerCanReadFromMultipleConsumerGroupsWithDifferentActiveOw /// /// [Test] - public async Task ExclusiveConsumerSupercedesNonExclusiveActiveReader() + public async Task ExclusiveConsumerSupersedesNonExclusiveActiveReader() { await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { @@ -1517,7 +1563,7 @@ public async Task ExclusiveConsumerSupercedesNonExclusiveActiveReader() /// /// [Test] - public async Task ConsumerWithHigherOwnerLevelSupercedesActiveReader() + public async Task ConsumerWithHigherOwnerLevelSupersedesActiveReader() { await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { @@ -1583,7 +1629,7 @@ public async Task ConsumerWithHigherOwnerLevelSupercedesActiveReader() /// /// [Test] - public async Task ExclusiveConsumerDoesNotSupercedNonExclusiveActiveReaderOnAnotherPartition() + public async Task ExclusiveConsumerDoesNotSupersedeNonExclusiveActiveReaderOnAnotherPartition() { await using (EventHubScope scope = await EventHubScope.CreateAsync(2)) { @@ -1648,7 +1694,7 @@ await Task.WhenAll /// /// [Test] - public async Task ExclusiveConsumerDoesNotSupercedNonExclusiveActiveReaderOnAnotherConsumerGroup() + public async Task ExclusiveConsumerDoesNotSupersedeNonExclusiveActiveReaderOnAnotherConsumerGroup() { var consumerGroups = new[] { "customGroup", "customTwo" }; @@ -1752,7 +1798,7 @@ public async Task ConsumerIsNotCompromisedByFailureToReadFromInvalidPartition() /// /// [Test] - public async Task ConsumerIsNotCompromisedByBeingSupercededByAnotherReaderWithHigherLevel() + public async Task ConsumerIsNotCompromisedByBeingSupersededByAnotherReaderWithHigherLevel() { await using (EventHubScope scope = await EventHubScope.CreateAsync(2)) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientLiveTests.cs index 299d56a2b5617..05cdb4dcf1f5d 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientLiveTests.cs @@ -14,6 +14,7 @@ using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Producer; +using Microsoft.Azure.Amqp.Framing; using NUnit.Framework; namespace Azure.Messaging.EventHubs.Tests @@ -707,6 +708,28 @@ public async Task ProducerCanSendLargeEventBatch() } } + /// + /// Verifies that the is able to + /// connect to the Event Hubs service and perform operations. + /// + /// + [Test] + public async Task ProducerCanSendWithBinaryApplicationProperties() + { + await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) + { + var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); + + await using var producer = new EventHubProducerClient(connectionString); + + var eventData = new EventData(Encoding.UTF8.GetBytes("AWord")); + eventData.Properties["TestByteArray"] = new byte[] { 0x12, 0x34, 0x56, 0x78 }; + eventData.Properties["TestArraySegment"] = new ArraySegment(new byte[] { 0x23, 0x45, 0x67, 0x89 }); + + Assert.That(async () => await producer.SendAsync(new[] { eventData }), Throws.Nothing); + } + } + /// /// Verifies that the is able to /// connect to the Event Hubs service and perform operations. diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs index da1aeebf583cb..0772aaf824819 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs @@ -458,6 +458,15 @@ public DateTimeOffset ScheduledEnqueueTime /// /// Occurs when the is serialized for transport when an unsupported type is used as a property. /// + /// + /// Occurs when the is serialized for transport when a value of type or + /// is used as a property. The will be set to + /// in this case. + /// + /// This is due to a known bug in the Service Bus service, where an application property encoded as binary cannot be + /// handled by the service and is incorrectly rejected for being too large. A fix is planned, but the time line is + /// currently unknown. The recommended workaround is to encode the binary data as a Base64 string. + /// public IDictionary ApplicationProperties { get