diff --git a/sdk/messaging/azeventhubs/CHANGELOG.md b/sdk/messaging/azeventhubs/CHANGELOG.md index 43df2bb9b33e..ed6387c8cbb1 100644 --- a/sdk/messaging/azeventhubs/CHANGELOG.md +++ b/sdk/messaging/azeventhubs/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features Added +- Raw AMQP message support, including full support for encoding Body (Value, Sequence and also multiple byte slices for Data). See ExampleEventDataBatch_AddEventData_rawAMQPMessages for some concrete examples. + ### Breaking Changes - EventDataBatch.NumMessages() renamed to EventDataBatch.NumEvents() diff --git a/sdk/messaging/azeventhubs/README.md b/sdk/messaging/azeventhubs/README.md index 9ffaac0f54ae..d1d64e939847 100644 --- a/sdk/messaging/azeventhubs/README.md +++ b/sdk/messaging/azeventhubs/README.md @@ -67,7 +67,7 @@ Examples for various scenarios can be found on [pkg.go.dev](https://pkg.go.dev/g This module uses the classification-based logging implementation in `azcore`. To enable console logging for all SDK modules, set the environment variable `AZURE_SDK_GO_LOGGING` to `all`. -Use the `azcore/log` package to control log event output or to enable logs for `azservicebus` only. For example: +Use the `azcore/log` package to control log event output or to enable logs for `azeventhubs` only. For example: ```go import ( diff --git a/sdk/messaging/azeventhubs/amqp_message.go b/sdk/messaging/azeventhubs/amqp_message.go new file mode 100644 index 000000000000..2b6ca2db8915 --- /dev/null +++ b/sdk/messaging/azeventhubs/amqp_message.go @@ -0,0 +1,271 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azeventhubs + +import ( + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp" +) + +// AMQPAnnotatedMessage represents the AMQP message, as received from Event Hubs. +// For details about these properties, refer to the AMQP specification: +// +// https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format +// +// Some fields in this struct are typed 'any', which means they will accept AMQP primitives, or in some +// cases slices and maps. +// +// AMQP simple types include: +// - int (any size), uint (any size) +// - float (any size) +// - string +// - bool +// - time.Time +type AMQPAnnotatedMessage struct { + // ApplicationProperties corresponds to the "application-properties" section of an AMQP message. + // + // The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage. + ApplicationProperties map[string]any + + // Body represents the body of an AMQP message. + Body AMQPAnnotatedMessageBody + + // DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message. + // + // The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage. + DeliveryAnnotations map[any]any + + // DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame + // for this message. + DeliveryTag []byte + + // Footer is the transport footers for this AMQP message. + // + // The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage. + Footer map[any]any + + // Header is the transport headers for this AMQP message. + Header *AMQPAnnotatedMessageHeader + + // MessageAnnotations corresponds to the message-annotations section of an AMQP message. + // + // The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage. + MessageAnnotations map[any]any + + // Properties corresponds to the properties section of an AMQP message. + Properties *AMQPAnnotatedMessageProperties +} + +// AMQPAnnotatedMessageProperties represents the properties of an AMQP message. +// See here for more details: +// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties +type AMQPAnnotatedMessageProperties struct { + // AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property. + AbsoluteExpiryTime *time.Time + + // ContentEncoding corresponds to the 'content-encoding' property. + ContentEncoding *string + + // ContentType corresponds to the 'content-type' property + ContentType *string + + // CorrelationID corresponds to the 'correlation-id' property. + // The type of CorrelationID can be a uint64, UUID, []byte, or a string + CorrelationID any + + // CreationTime corresponds to the 'creation-time' property. + CreationTime *time.Time + + // GroupID corresponds to the 'group-id' property. + GroupID *string + + // GroupSequence corresponds to the 'group-sequence' property. + GroupSequence *uint32 + + // MessageID corresponds to the 'message-id' property. + // The type of MessageID can be a uint64, UUID, []byte, or string + MessageID any + + // ReplyTo corresponds to the 'reply-to' property. + ReplyTo *string + + // ReplyToGroupID corresponds to the 'reply-to-group-id' property. + ReplyToGroupID *string + + // Subject corresponds to the 'subject' property. + Subject *string + + // To corresponds to the 'to' property. + To *string + + // UserID corresponds to the 'user-id' property. + UserID []byte +} + +// AMQPAnnotatedMessageBody represents the body of an AMQP message. +// Only one of these fields can be used a a time. They are mutually exclusive. +type AMQPAnnotatedMessageBody struct { + // Data is encoded/decoded as multiple data sections in the body. + Data [][]byte + + // Sequence is encoded/decoded as one or more amqp-sequence sections in the body. + // + // The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage. + Sequence [][]any + + // Value is encoded/decoded as the amqp-value section in the body. + // + // The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage, + // as well as slices or maps of AMQP simple types. + Value any +} + +// AMQPAnnotatedMessageHeader carries standard delivery details about the transfer +// of a message. +// See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header +// for more details. +type AMQPAnnotatedMessageHeader struct { + // DeliveryCount is the number of unsuccessful previous attempts to deliver this message. + // It corresponds to the 'delivery-count' property. + DeliveryCount uint32 + + // Durable corresponds to the 'durable' property. + Durable bool + + // FirstAcquirer corresponds to the 'first-acquirer' property. + FirstAcquirer bool + + // Priority corresponds to the 'priority' property. + Priority uint8 + + // TTL corresponds to the 'ttl' property. + TTL time.Duration +} + +// toAMQPMessage converts between our (azeventhubs) AMQP message +// to the underlying message used by go-amqp. +func (am *AMQPAnnotatedMessage) toAMQPMessage() *amqp.Message { + var header *amqp.MessageHeader + + if am.Header != nil { + header = &amqp.MessageHeader{ + DeliveryCount: am.Header.DeliveryCount, + Durable: am.Header.Durable, + FirstAcquirer: am.Header.FirstAcquirer, + Priority: am.Header.Priority, + TTL: am.Header.TTL, + } + } + + var properties *amqp.MessageProperties + + if am.Properties != nil { + properties = &amqp.MessageProperties{ + AbsoluteExpiryTime: am.Properties.AbsoluteExpiryTime, + ContentEncoding: am.Properties.ContentEncoding, + ContentType: am.Properties.ContentType, + CorrelationID: am.Properties.CorrelationID, + CreationTime: am.Properties.CreationTime, + GroupID: am.Properties.GroupID, + GroupSequence: am.Properties.GroupSequence, + MessageID: am.Properties.MessageID, + ReplyTo: am.Properties.ReplyTo, + ReplyToGroupID: am.Properties.ReplyToGroupID, + Subject: am.Properties.Subject, + To: am.Properties.To, + UserID: am.Properties.UserID, + } + } else { + properties = &amqp.MessageProperties{} + } + + var footer amqp.Annotations + + if am.Footer != nil { + footer = (amqp.Annotations)(am.Footer) + } + + return &amqp.Message{ + Annotations: copyAnnotations(am.MessageAnnotations), + ApplicationProperties: am.ApplicationProperties, + Data: am.Body.Data, + DeliveryAnnotations: amqp.Annotations(am.DeliveryAnnotations), + DeliveryTag: am.DeliveryTag, + Footer: footer, + Header: header, + Properties: properties, + Sequence: am.Body.Sequence, + Value: am.Body.Value, + } +} + +func copyAnnotations(src map[any]any) amqp.Annotations { + if src == nil { + return amqp.Annotations{} + } + + dest := amqp.Annotations{} + + for k, v := range src { + dest[k] = v + } + + return dest +} + +func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage { + var header *AMQPAnnotatedMessageHeader + + if goAMQPMessage.Header != nil { + header = &AMQPAnnotatedMessageHeader{ + DeliveryCount: goAMQPMessage.Header.DeliveryCount, + Durable: goAMQPMessage.Header.Durable, + FirstAcquirer: goAMQPMessage.Header.FirstAcquirer, + Priority: goAMQPMessage.Header.Priority, + TTL: goAMQPMessage.Header.TTL, + } + } + + var properties *AMQPAnnotatedMessageProperties + + if goAMQPMessage.Properties != nil { + properties = &AMQPAnnotatedMessageProperties{ + AbsoluteExpiryTime: goAMQPMessage.Properties.AbsoluteExpiryTime, + ContentEncoding: goAMQPMessage.Properties.ContentEncoding, + ContentType: goAMQPMessage.Properties.ContentType, + CorrelationID: goAMQPMessage.Properties.CorrelationID, + CreationTime: goAMQPMessage.Properties.CreationTime, + GroupID: goAMQPMessage.Properties.GroupID, + GroupSequence: goAMQPMessage.Properties.GroupSequence, + MessageID: goAMQPMessage.Properties.MessageID, + ReplyTo: goAMQPMessage.Properties.ReplyTo, + ReplyToGroupID: goAMQPMessage.Properties.ReplyToGroupID, + Subject: goAMQPMessage.Properties.Subject, + To: goAMQPMessage.Properties.To, + UserID: goAMQPMessage.Properties.UserID, + } + } + + var footer map[any]any + + if goAMQPMessage.Footer != nil { + footer = (map[any]any)(goAMQPMessage.Footer) + } + + return &AMQPAnnotatedMessage{ + MessageAnnotations: map[any]any(goAMQPMessage.Annotations), + ApplicationProperties: goAMQPMessage.ApplicationProperties, + Body: AMQPAnnotatedMessageBody{ + Data: goAMQPMessage.Data, + Sequence: goAMQPMessage.Sequence, + Value: goAMQPMessage.Value, + }, + DeliveryAnnotations: map[any]any(goAMQPMessage.DeliveryAnnotations), + DeliveryTag: goAMQPMessage.DeliveryTag, + Footer: footer, + Header: header, + Properties: properties, + } +} diff --git a/sdk/messaging/azeventhubs/amqp_message_test.go b/sdk/messaging/azeventhubs/amqp_message_test.go new file mode 100644 index 000000000000..d7af421f2034 --- /dev/null +++ b/sdk/messaging/azeventhubs/amqp_message_test.go @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azeventhubs + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAMQPAnnotatedMessageUnitTest(t *testing.T) { + t.Run("Default", func(t *testing.T) { + msg := &AMQPAnnotatedMessage{} + amqpMessage := msg.toAMQPMessage() + + // we duplicate/inflate these since we modify them + // in various parts of the API. + require.NotNil(t, amqpMessage.Properties) + require.NotNil(t, amqpMessage.Annotations) + }) +} diff --git a/sdk/messaging/azeventhubs/event_data.go b/sdk/messaging/azeventhubs/event_data.go index 425f9ab0c46d..9300aa51f7c6 100644 --- a/sdk/messaging/azeventhubs/event_data.go +++ b/sdk/messaging/azeventhubs/event_data.go @@ -54,6 +54,12 @@ type ReceivedEventData struct { // Offset is the offset of the event. Offset *int64 + // RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access + // to properties that are not exposed by ReceivedEventData such as payloads encoded into the + // Value or Sequence section, payloads sent as multiple Data sections, as well as Footer + // and Header fields. + RawAMQPMessage *AMQPAnnotatedMessage + // SequenceNumber is a unique number assigned to a message by Event Hubs. SequenceNumber int64 @@ -100,7 +106,9 @@ func (e *EventData) toAMQPMessage() *amqp.Message { // NOTE: this converter assumes that the Body of this message will be the first // serialized byte array in the Data section of the messsage. func newReceivedEventData(amqpMsg *amqp.Message) (*ReceivedEventData, error) { - re := &ReceivedEventData{} + re := &ReceivedEventData{ + RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg), + } if len(amqpMsg.Data) == 1 { re.Body = amqpMsg.Data[0] diff --git a/sdk/messaging/azeventhubs/event_data_batch.go b/sdk/messaging/azeventhubs/event_data_batch.go index 034db9ad272a..048e973e372c 100644 --- a/sdk/messaging/azeventhubs/event_data_batch.go +++ b/sdk/messaging/azeventhubs/event_data_batch.go @@ -52,50 +52,63 @@ type AddEventDataOptions struct { // [azeventhubs.ProducerClient.NewEventDataBatch], or (by default) from Event // Hubs itself. // +// Returns ErrMessageTooLarge if the event cannot fit, or a non-nil error for +// other failures. +func (b *EventDataBatch) AddEventData(ed *EventData, options *AddEventDataOptions) error { + return b.addAMQPMessage(ed.toAMQPMessage()) +} + +// AddAMQPAnnotatedMessage adds an AMQPAnnotatedMessage to the batch, failing +// if the AMQPAnnotatedMessage would cause the EventDataBatch to be too large to send. +// +// This size limit was set when the EventDataBatch was created, in options to +// [azeventhubs.ProducerClient.NewEventDataBatch], or (by default) from Event +// Hubs itself. +// // Returns ErrMessageTooLarge if the message cannot fit, or a non-nil error for // other failures. -func (mb *EventDataBatch) AddEventData(ed *EventData, options *AddEventDataOptions) error { - return mb.addAMQPMessage(ed.toAMQPMessage()) +func (b *EventDataBatch) AddAMQPAnnotatedMessage(annotatedMessage *AMQPAnnotatedMessage, options *AddEventDataOptions) error { + return b.addAMQPMessage(annotatedMessage.toAMQPMessage()) } // NumBytes is the number of bytes in the batch. -func (mb *EventDataBatch) NumBytes() uint64 { - mb.mu.RLock() - defer mb.mu.RUnlock() +func (b *EventDataBatch) NumBytes() uint64 { + b.mu.RLock() + defer b.mu.RUnlock() - return mb.currentSize + return b.currentSize } // NumEvents returns the number of events in the batch. -func (mb *EventDataBatch) NumEvents() int32 { - mb.mu.RLock() - defer mb.mu.RUnlock() +func (b *EventDataBatch) NumEvents() int32 { + b.mu.RLock() + defer b.mu.RUnlock() - return int32(len(mb.marshaledMessages)) + return int32(len(b.marshaledMessages)) } // toAMQPMessage converts this batch into a sendable *amqp.Message // NOTE: not idempotent! -func (mb *EventDataBatch) toAMQPMessage() *amqp.Message { - mb.mu.Lock() - defer mb.mu.Unlock() +func (b *EventDataBatch) toAMQPMessage() *amqp.Message { + b.mu.Lock() + defer b.mu.Unlock() - mb.batchEnvelope.Data = make([][]byte, len(mb.marshaledMessages)) - mb.batchEnvelope.Format = batchMessageFormat + b.batchEnvelope.Data = make([][]byte, len(b.marshaledMessages)) + b.batchEnvelope.Format = batchMessageFormat - if mb.partitionKey != nil { - if mb.batchEnvelope.Annotations == nil { - mb.batchEnvelope.Annotations = make(amqp.Annotations) + if b.partitionKey != nil { + if b.batchEnvelope.Annotations == nil { + b.batchEnvelope.Annotations = make(amqp.Annotations) } - mb.batchEnvelope.Annotations[partitionKeyAnnotation] = *mb.partitionKey + b.batchEnvelope.Annotations[partitionKeyAnnotation] = *b.partitionKey } - copy(mb.batchEnvelope.Data, mb.marshaledMessages) - return mb.batchEnvelope + copy(b.batchEnvelope.Data, b.marshaledMessages) + return b.batchEnvelope } -func (mb *EventDataBatch) addAMQPMessage(msg *amqp.Message) error { +func (b *EventDataBatch) addAMQPMessage(msg *amqp.Message) error { if msg.Properties.MessageID == nil || msg.Properties.MessageID == "" { uid, err := uuid.New() if err != nil { @@ -104,12 +117,12 @@ func (mb *EventDataBatch) addAMQPMessage(msg *amqp.Message) error { msg.Properties.MessageID = uid.String() } - if mb.partitionKey != nil { + if b.partitionKey != nil { if msg.Annotations == nil { msg.Annotations = make(amqp.Annotations) } - msg.Annotations[partitionKeyAnnotation] = *mb.partitionKey + msg.Annotations[partitionKeyAnnotation] = *b.partitionKey } bin, err := msg.MarshalBinary() @@ -117,10 +130,10 @@ func (mb *EventDataBatch) addAMQPMessage(msg *amqp.Message) error { return err } - mb.mu.Lock() - defer mb.mu.Unlock() + b.mu.Lock() + defer b.mu.Unlock() - if len(mb.marshaledMessages) == 0 { + if len(b.marshaledMessages) == 0 { // the first message is special - we use its properties and annotations as the // actual envelope for the batch message. batchEnv, batchEnvLen, err := createBatchEnvelope(msg) @@ -130,24 +143,24 @@ func (mb *EventDataBatch) addAMQPMessage(msg *amqp.Message) error { } // (we'll undo this if it turns out the message was too big) - mb.currentSize = uint64(batchEnvLen) - mb.batchEnvelope = batchEnv + b.currentSize = uint64(batchEnvLen) + b.batchEnvelope = batchEnv } actualPayloadSize := calcActualSizeForPayload(bin) - if mb.currentSize+actualPayloadSize > mb.maxBytes { - if len(mb.marshaledMessages) == 0 { + if b.currentSize+actualPayloadSize > b.maxBytes { + if len(b.marshaledMessages) == 0 { // reset our our properties, this didn't end up being our first message. - mb.currentSize = 0 - mb.batchEnvelope = nil + b.currentSize = 0 + b.batchEnvelope = nil } return ErrEventDataTooLarge } - mb.currentSize += actualPayloadSize - mb.marshaledMessages = append(mb.marshaledMessages, bin) + b.currentSize += actualPayloadSize + b.marshaledMessages = append(b.marshaledMessages, bin) return nil } diff --git a/sdk/messaging/azeventhubs/event_data_test.go b/sdk/messaging/azeventhubs/event_data_test.go index 2cf04314f463..bbc25e4cdcdd 100644 --- a/sdk/messaging/azeventhubs/event_data_test.go +++ b/sdk/messaging/azeventhubs/event_data_test.go @@ -86,7 +86,7 @@ func TestEventData_newReceivedEventData(t *testing.T) { "application property 1": "application property value 1", } - require.Equal(t, &ReceivedEventData{ + expectedEventData := &ReceivedEventData{ EventData: EventData{ Body: expectedBody[0], ContentType: to.Ptr("content type"), @@ -101,7 +101,30 @@ func TestEventData_newReceivedEventData(t *testing.T) { }, Offset: to.Ptr[int64](102), PartitionKey: to.Ptr("partition key"), - }, re) + RawAMQPMessage: &AMQPAnnotatedMessage{ + Properties: &AMQPAnnotatedMessageProperties{ + ContentType: to.Ptr("content type"), + MessageID: "message id", + CorrelationID: to.Ptr("correlation id"), + }, + Body: AMQPAnnotatedMessageBody{ + Data: [][]byte{[]byte("hello world")}, + }, + ApplicationProperties: map[string]any{ + "application property 1": "application property value 1", + }, + MessageAnnotations: map[any]any{ + "hello": "world", + 5: "ignored", + "x-opt-partition-key": "partition key", + "x-opt-sequence-number": int64(101), + "x-opt-offset": "102", + "x-opt-enqueued-time": now, + }, + }, + } + + require.Equal(t, expectedEventData, re) require.Equal(t, &amqp.Message{ Properties: &amqp.MessageProperties{ diff --git a/sdk/messaging/azeventhubs/example_producerclient_test.go b/sdk/messaging/azeventhubs/example_producerclient_test.go index e7301649ddf1..343650fb59c5 100644 --- a/sdk/messaging/azeventhubs/example_producerclient_test.go +++ b/sdk/messaging/azeventhubs/example_producerclient_test.go @@ -98,6 +98,64 @@ func ExampleEventDataBatch_AddEventData() { } } +func ExampleEventDataBatch_AddEventData_rawAMQPMessages() { + batch, err := producerClient.NewEventDataBatch(context.TODO(), nil) + + if err != nil { + panic(err) + } + + // This is functionally equivalent to EventDataBatch.AddEventData(), just with a more + // advanced message format. + // See ExampleEventDataBatch_AddEventData for more details. + + err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{ + Body: azeventhubs.AMQPAnnotatedMessageBody{ + Data: [][]byte{ + []byte("hello"), + []byte("world"), + }, + }, + }, nil) + + if err != nil { + panic(err) + } + + err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{ + Body: azeventhubs.AMQPAnnotatedMessageBody{ + Sequence: [][]any{ + // let the AMQP stack encode your strings (or other primitives) for you, no need + // to convert them to bytes manually. + {"hello", "world"}, + {"howdy", "world"}, + }, + }, + }, nil) + + if err != nil { + panic(err) + } + + err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{ + Body: azeventhubs.AMQPAnnotatedMessageBody{ + // let the AMQP stack encode your string (or other primitives) for you, no need + // to convert them to bytes manually. + Value: "hello world", + }, + }, nil) + + if err != nil { + panic(err) + } + + err = producerClient.SendEventBatch(context.TODO(), batch, nil) + + if err != nil { + panic(err) + } +} + func ExampleProducerClient_GetEventHubProperties() { eventHubProps, err := producerClient.GetEventHubProperties(context.TODO(), nil) diff --git a/sdk/messaging/azeventhubs/internal/test/test_helpers.go b/sdk/messaging/azeventhubs/internal/test/test_helpers.go index 5a7ea4d23eb6..1158cb1328f7 100644 --- a/sdk/messaging/azeventhubs/internal/test/test_helpers.go +++ b/sdk/messaging/azeventhubs/internal/test/test_helpers.go @@ -4,6 +4,7 @@ package test import ( + "context" "crypto/rand" "encoding/hex" "fmt" @@ -138,3 +139,9 @@ func mustGetEnvironmentVars(t *testing.T, names []string) map[string]string { return envVars } + +func RequireClose(t *testing.T, closeable interface { + Close(ctx context.Context) error +}) { + require.NoError(t, closeable.Close(context.Background())) +} diff --git a/sdk/messaging/azeventhubs/producer_client_test.go b/sdk/messaging/azeventhubs/producer_client_test.go index 61b00fe89634..9851d613b4e2 100644 --- a/sdk/messaging/azeventhubs/producer_client_test.go +++ b/sdk/messaging/azeventhubs/producer_client_test.go @@ -18,7 +18,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestNewProducerClient_GetHubAndPartitionProperties(t *testing.T) { +func TestProducerClient_GetHubAndPartitionProperties(t *testing.T) { testParams := test.GetConnectionParamsForTest(t) producer, err := azeventhubs.NewProducerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, nil) @@ -50,7 +50,7 @@ func TestNewProducerClient_GetHubAndPartitionProperties(t *testing.T) { wg.Wait() } -func TestNewProducerClient_GetEventHubsProperties(t *testing.T) { +func TestProducerClient_GetEventHubsProperties(t *testing.T) { testParams := test.GetConnectionParamsForTest(t) producer, err := azeventhubs.NewProducerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, nil) @@ -76,7 +76,7 @@ func TestNewProducerClient_GetEventHubsProperties(t *testing.T) { } } -func TestNewProducerClient_SendToAny(t *testing.T) { +func TestProducerClient_SendToAny(t *testing.T) { // there are two ways to "send to any" partition // 1. Don't specify a partition ID or a partition key when creating the batch // 2. Specify a partition key. This is useful if you want to send events and have them @@ -152,10 +152,147 @@ func TestNewProducerClient_SendToAny(t *testing.T) { } } -func makeByteSlice(index int, total int) []byte { - // ie: %0d, so it'll be zero padded up to the length we want - text := fmt.Sprintf("%0"+fmt.Sprintf("%d", total)+"d", index) - return []byte(text) +func TestProducerClient_AMQPAnnotatedMessages(t *testing.T) { + testParams := test.GetConnectionParamsForTest(t) + + producer, err := azeventhubs.NewProducerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, nil) + require.NoError(t, err) + + defer test.RequireClose(t, producer) + + beforeProps, err := producer.GetPartitionProperties(context.Background(), "0", nil) + require.NoError(t, err) + + numEvents := int64(0) + + // send the events we need, encoding several AMQP body types and exercising all the fields. + { + batch, err := producer.NewEventDataBatch(context.Background(), &azeventhubs.EventDataBatchOptions{ + PartitionID: to.Ptr("0"), + }) + require.NoError(t, err) + + // AMQP messages + + // sequence body + err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{ + Body: azeventhubs.AMQPAnnotatedMessageBody{ + Sequence: [][]any{ + {"hello", "world"}, + {"howdy", "world"}, + }, + }, + ApplicationProperties: map[string]any{ + "appProperty1": "appProperty1Value", + }, + // It doesn't appear that we can't round-trip these attributes: + // Issue: https://github.com/Azure/azure-sdk-for-go/issues/19154 + DeliveryAnnotations: map[any]any{ + "deliveryAnnotation1": "deliveryAnnotation1Value", + }, + Header: &azeventhubs.AMQPAnnotatedMessageHeader{ + DeliveryCount: 100, + }, + Footer: map[any]any{ + "footerField1": "footerValue1", + }, + MessageAnnotations: map[any]any{ + "messageAnnotation1": 101, + }, + Properties: &azeventhubs.AMQPAnnotatedMessageProperties{ + GroupID: to.Ptr("custom-group-id"), + }, + }, nil) + require.NoError(t, err) + + // value body + err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{ + Body: azeventhubs.AMQPAnnotatedMessageBody{ + Value: 999, + }, + }, nil) + require.NoError(t, err) + + // data body (multiple arrays, will be 'nil' in a normal ReceivedEventData) + err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{ + Body: azeventhubs.AMQPAnnotatedMessageBody{ + Data: [][]byte{ + []byte("hello"), + []byte("world"), + }, + }, + }, nil) + require.NoError(t, err) + + err = producer.SendEventBatch(context.Background(), batch, nil) + require.NoError(t, err) + + numEvents = int64(batch.NumEvents()) + } + + afterProps, err := producer.GetPartitionProperties(context.Background(), "0", nil) + require.NoError(t, err) + + require.Equal(t, numEvents, afterProps.LastEnqueuedSequenceNumber-beforeProps.LastEnqueuedSequenceNumber) + + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, nil) + require.NoError(t, err) + + defer test.RequireClose(t, consumerClient) + + partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{ + StartPosition: getStartPosition(beforeProps), + }) + require.NoError(t, err) + + defer test.RequireClose(t, partitionClient) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + receivedEvents, err := partitionClient.ReceiveEvents(ctx, int(numEvents), nil) + require.NoError(t, err) + + // all of these events have Body encodings that can't be represented in EventData, + // so it'll be 'nil'. Users can get it from the inner RawAMQPMessage instead. + for _, e := range receivedEvents { + require.Nil(t, e.Body) + } + + sequenceMessage, valueMessage, multiarrayDataMessage := receivedEvents[0].RawAMQPMessage, receivedEvents[1].RawAMQPMessage, receivedEvents[2].RawAMQPMessage + + require.Equal(t, [][]any{ + {"hello", "world"}, + {"howdy", "world"}, + }, sequenceMessage.Body.Sequence) + + require.Equal(t, map[string]any{ + "appProperty1": "appProperty1Value", + }, sequenceMessage.ApplicationProperties) + + // It doesn't appear that we can round-trip this attribute: + // https://github.com/Azure/azure-sdk-for-go/issues/19154 + // require.Equal(t, uint32(101), sequenceMessage.Header.DeliveryCount) + // require.Equal(t, map[any]any{ + // "deliveryAnnotation1": "deliveryAnnotation1Value", + // }, sequenceMessage.DeliveryAnnotations) + + require.Equal(t, map[any]any{ + "footerField1": "footerValue1", + }, sequenceMessage.Footer) + + require.Equal(t, int64(101), sequenceMessage.MessageAnnotations["messageAnnotation1"]) + require.Equal(t, "custom-group-id", *sequenceMessage.Properties.GroupID) + + require.Equal(t, int64(999), valueMessage.Body.Value) + + // data body (multiple arrays, will be 'nil' in a normal ReceivedEventData) + require.Equal(t, [][]byte{ + []byte("hello"), + []byte("world"), + }, multiarrayDataMessage.Body.Data) + + require.Equal(t, int(numEvents), len(receivedEvents)) } func TestProducerClient_SendBatchExample(t *testing.T) { @@ -274,6 +411,12 @@ func TestProducerClient_SendBatchExample(t *testing.T) { } } +func makeByteSlice(index int, total int) []byte { + // ie: %0d, so it'll be zero padded up to the length we want + text := fmt.Sprintf("%0"+fmt.Sprintf("%d", total)+"d", index) + return []byte(text) +} + // receiveEventFromAnyPartition returns when it receives an event from any partition. Useful for tests where you're // letting the service route the event and you're not sure where it'll end up. func receiveEventFromAnyPartition(ctx context.Context, t *testing.T, consumer *azeventhubs.ConsumerClient, allPartitions []azeventhubs.PartitionProperties) *azeventhubs.ReceivedEventData {