Skip to content

Commit

Permalink
Adding in raw AMQP message support (#19156)
Browse files Browse the repository at this point in the history
Enabling raw AMQP message support with two pieces:

* ReceivedEventData now has a RawAMQPMessage, of type AMQPAnnotatedMessage. The AMQPAnnotatedMessage is the same as what we have in azservicebus except it doesn't need some unexported fields since we don't do message settlement.
* EventDataBatch has a new function (AddAMQPAnnotatedMessage) which takes an AMQPAnnotatedMessage as a parameter. It functions exactly the same as it's EventData counterpart (AddEventData).
There are a few new .go files for AMQP support, but they are largely duplicates of what we have in azservicebus.
  • Loading branch information
richardpark-msft authored Sep 21, 2022
1 parent 7e8768a commit 6b6811d
Show file tree
Hide file tree
Showing 10 changed files with 593 additions and 46 deletions.
2 changes: 2 additions & 0 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Raw AMQP message support, including full support for encoding Body (Value, Sequence and also multiple byte slices for Data). See ExampleEventDataBatch_AddEventData_rawAMQPMessages for some concrete examples.

### Breaking Changes

- EventDataBatch.NumMessages() renamed to EventDataBatch.NumEvents()
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Examples for various scenarios can be found on [pkg.go.dev](https://pkg.go.dev/g

This module uses the classification-based logging implementation in `azcore`. To enable console logging for all SDK modules, set the environment variable `AZURE_SDK_GO_LOGGING` to `all`.

Use the `azcore/log` package to control log event output or to enable logs for `azservicebus` only. For example:
Use the `azcore/log` package to control log event output or to enable logs for `azeventhubs` only. For example:

```go
import (
Expand Down
271 changes: 271 additions & 0 deletions sdk/messaging/azeventhubs/amqp_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azeventhubs

import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
)

// AMQPAnnotatedMessage represents the AMQP message, as received from Event Hubs.
// For details about these properties, refer to the AMQP specification:
//
// https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
//
// Some fields in this struct are typed 'any', which means they will accept AMQP primitives, or in some
// cases slices and maps.
//
// AMQP simple types include:
// - int (any size), uint (any size)
// - float (any size)
// - string
// - bool
// - time.Time
type AMQPAnnotatedMessage struct {
// ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
ApplicationProperties map[string]any

// Body represents the body of an AMQP message.
Body AMQPAnnotatedMessageBody

// DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
DeliveryAnnotations map[any]any

// DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
// for this message.
DeliveryTag []byte

// Footer is the transport footers for this AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
Footer map[any]any

// Header is the transport headers for this AMQP message.
Header *AMQPAnnotatedMessageHeader

// MessageAnnotations corresponds to the message-annotations section of an AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
MessageAnnotations map[any]any

// Properties corresponds to the properties section of an AMQP message.
Properties *AMQPAnnotatedMessageProperties
}

// AMQPAnnotatedMessageProperties represents the properties of an AMQP message.
// See here for more details:
// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties
type AMQPAnnotatedMessageProperties struct {
// AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
AbsoluteExpiryTime *time.Time

// ContentEncoding corresponds to the 'content-encoding' property.
ContentEncoding *string

// ContentType corresponds to the 'content-type' property
ContentType *string

// CorrelationID corresponds to the 'correlation-id' property.
// The type of CorrelationID can be a uint64, UUID, []byte, or a string
CorrelationID any

// CreationTime corresponds to the 'creation-time' property.
CreationTime *time.Time

// GroupID corresponds to the 'group-id' property.
GroupID *string

// GroupSequence corresponds to the 'group-sequence' property.
GroupSequence *uint32

// MessageID corresponds to the 'message-id' property.
// The type of MessageID can be a uint64, UUID, []byte, or string
MessageID any

// ReplyTo corresponds to the 'reply-to' property.
ReplyTo *string

// ReplyToGroupID corresponds to the 'reply-to-group-id' property.
ReplyToGroupID *string

// Subject corresponds to the 'subject' property.
Subject *string

// To corresponds to the 'to' property.
To *string

// UserID corresponds to the 'user-id' property.
UserID []byte
}

// AMQPAnnotatedMessageBody represents the body of an AMQP message.
// Only one of these fields can be used a a time. They are mutually exclusive.
type AMQPAnnotatedMessageBody struct {
// Data is encoded/decoded as multiple data sections in the body.
Data [][]byte

// Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
//
// The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
Sequence [][]any

// Value is encoded/decoded as the amqp-value section in the body.
//
// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage,
// as well as slices or maps of AMQP simple types.
Value any
}

// AMQPAnnotatedMessageHeader carries standard delivery details about the transfer
// of a message.
// See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header
// for more details.
type AMQPAnnotatedMessageHeader struct {
// DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
// It corresponds to the 'delivery-count' property.
DeliveryCount uint32

// Durable corresponds to the 'durable' property.
Durable bool

// FirstAcquirer corresponds to the 'first-acquirer' property.
FirstAcquirer bool

// Priority corresponds to the 'priority' property.
Priority uint8

// TTL corresponds to the 'ttl' property.
TTL time.Duration
}

// toAMQPMessage converts between our (azeventhubs) AMQP message
// to the underlying message used by go-amqp.
func (am *AMQPAnnotatedMessage) toAMQPMessage() *amqp.Message {
var header *amqp.MessageHeader

if am.Header != nil {
header = &amqp.MessageHeader{
DeliveryCount: am.Header.DeliveryCount,
Durable: am.Header.Durable,
FirstAcquirer: am.Header.FirstAcquirer,
Priority: am.Header.Priority,
TTL: am.Header.TTL,
}
}

var properties *amqp.MessageProperties

if am.Properties != nil {
properties = &amqp.MessageProperties{
AbsoluteExpiryTime: am.Properties.AbsoluteExpiryTime,
ContentEncoding: am.Properties.ContentEncoding,
ContentType: am.Properties.ContentType,
CorrelationID: am.Properties.CorrelationID,
CreationTime: am.Properties.CreationTime,
GroupID: am.Properties.GroupID,
GroupSequence: am.Properties.GroupSequence,
MessageID: am.Properties.MessageID,
ReplyTo: am.Properties.ReplyTo,
ReplyToGroupID: am.Properties.ReplyToGroupID,
Subject: am.Properties.Subject,
To: am.Properties.To,
UserID: am.Properties.UserID,
}
} else {
properties = &amqp.MessageProperties{}
}

var footer amqp.Annotations

if am.Footer != nil {
footer = (amqp.Annotations)(am.Footer)
}

return &amqp.Message{
Annotations: copyAnnotations(am.MessageAnnotations),
ApplicationProperties: am.ApplicationProperties,
Data: am.Body.Data,
DeliveryAnnotations: amqp.Annotations(am.DeliveryAnnotations),
DeliveryTag: am.DeliveryTag,
Footer: footer,
Header: header,
Properties: properties,
Sequence: am.Body.Sequence,
Value: am.Body.Value,
}
}

func copyAnnotations(src map[any]any) amqp.Annotations {
if src == nil {
return amqp.Annotations{}
}

dest := amqp.Annotations{}

for k, v := range src {
dest[k] = v
}

return dest
}

func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage {
var header *AMQPAnnotatedMessageHeader

if goAMQPMessage.Header != nil {
header = &AMQPAnnotatedMessageHeader{
DeliveryCount: goAMQPMessage.Header.DeliveryCount,
Durable: goAMQPMessage.Header.Durable,
FirstAcquirer: goAMQPMessage.Header.FirstAcquirer,
Priority: goAMQPMessage.Header.Priority,
TTL: goAMQPMessage.Header.TTL,
}
}

var properties *AMQPAnnotatedMessageProperties

if goAMQPMessage.Properties != nil {
properties = &AMQPAnnotatedMessageProperties{
AbsoluteExpiryTime: goAMQPMessage.Properties.AbsoluteExpiryTime,
ContentEncoding: goAMQPMessage.Properties.ContentEncoding,
ContentType: goAMQPMessage.Properties.ContentType,
CorrelationID: goAMQPMessage.Properties.CorrelationID,
CreationTime: goAMQPMessage.Properties.CreationTime,
GroupID: goAMQPMessage.Properties.GroupID,
GroupSequence: goAMQPMessage.Properties.GroupSequence,
MessageID: goAMQPMessage.Properties.MessageID,
ReplyTo: goAMQPMessage.Properties.ReplyTo,
ReplyToGroupID: goAMQPMessage.Properties.ReplyToGroupID,
Subject: goAMQPMessage.Properties.Subject,
To: goAMQPMessage.Properties.To,
UserID: goAMQPMessage.Properties.UserID,
}
}

var footer map[any]any

if goAMQPMessage.Footer != nil {
footer = (map[any]any)(goAMQPMessage.Footer)
}

return &AMQPAnnotatedMessage{
MessageAnnotations: map[any]any(goAMQPMessage.Annotations),
ApplicationProperties: goAMQPMessage.ApplicationProperties,
Body: AMQPAnnotatedMessageBody{
Data: goAMQPMessage.Data,
Sequence: goAMQPMessage.Sequence,
Value: goAMQPMessage.Value,
},
DeliveryAnnotations: map[any]any(goAMQPMessage.DeliveryAnnotations),
DeliveryTag: goAMQPMessage.DeliveryTag,
Footer: footer,
Header: header,
Properties: properties,
}
}
22 changes: 22 additions & 0 deletions sdk/messaging/azeventhubs/amqp_message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azeventhubs

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestAMQPAnnotatedMessageUnitTest(t *testing.T) {
t.Run("Default", func(t *testing.T) {
msg := &AMQPAnnotatedMessage{}
amqpMessage := msg.toAMQPMessage()

// we duplicate/inflate these since we modify them
// in various parts of the API.
require.NotNil(t, amqpMessage.Properties)
require.NotNil(t, amqpMessage.Annotations)
})
}
10 changes: 9 additions & 1 deletion sdk/messaging/azeventhubs/event_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type ReceivedEventData struct {
// Offset is the offset of the event.
Offset *int64

// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
// to properties that are not exposed by ReceivedEventData such as payloads encoded into the
// Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
// and Header fields.
RawAMQPMessage *AMQPAnnotatedMessage

// SequenceNumber is a unique number assigned to a message by Event Hubs.
SequenceNumber int64

Expand Down Expand Up @@ -100,7 +106,9 @@ func (e *EventData) toAMQPMessage() *amqp.Message {
// NOTE: this converter assumes that the Body of this message will be the first
// serialized byte array in the Data section of the messsage.
func newReceivedEventData(amqpMsg *amqp.Message) (*ReceivedEventData, error) {
re := &ReceivedEventData{}
re := &ReceivedEventData{
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg),
}

if len(amqpMsg.Data) == 1 {
re.Body = amqpMsg.Data[0]
Expand Down
Loading

0 comments on commit 6b6811d

Please sign in to comment.