Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding in raw AMQP message support #19156

Merged
merged 4 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Raw AMQP message support, including full support for encoding Body (Value, Sequence and also multiple byte slices for Data). See ExampleEventDataBatch_AddEventData_rawAMQPMessages for some concrete examples.

### Breaking Changes

- EventDataBatch.NumMessages() renamed to EventDataBatch.NumEvents()
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Examples for various scenarios can be found on [pkg.go.dev](https://pkg.go.dev/g

This module uses the classification-based logging implementation in `azcore`. To enable console logging for all SDK modules, set the environment variable `AZURE_SDK_GO_LOGGING` to `all`.

Use the `azcore/log` package to control log event output or to enable logs for `azservicebus` only. For example:
Use the `azcore/log` package to control log event output or to enable logs for `azeventhubs` only. For example:

```go
import (
Expand Down
271 changes: 271 additions & 0 deletions sdk/messaging/azeventhubs/amqp_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azeventhubs

import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
)

// AMQPAnnotatedMessage represents the AMQP message, as received from Event Hubs.
// For details about these properties, refer to the AMQP specification:
//
// https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
//
// Some fields in this struct are typed 'any', which means they will accept AMQP primitives, or in some
// cases slices and maps.
//
// AMQP simple types include:
// - int (any size), uint (any size)
// - float (any size)
// - string
// - bool
// - time.Time
type AMQPAnnotatedMessage struct {
// ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
ApplicationProperties map[string]any

// Body represents the body of an AMQP message.
Body AMQPAnnotatedMessageBody

// DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
DeliveryAnnotations map[any]any

// DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
// for this message.
DeliveryTag []byte

// Footer is the transport footers for this AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
Footer map[any]any

// Header is the transport headers for this AMQP message.
Header *AMQPAnnotatedMessageHeader

// MessageAnnotations corresponds to the message-annotations section of an AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
MessageAnnotations map[any]any

// Properties corresponds to the properties section of an AMQP message.
Properties *AMQPAnnotatedMessageProperties
}

// AMQPAnnotatedMessageProperties represents the properties of an AMQP message.
// See here for more details:
// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties
type AMQPAnnotatedMessageProperties struct {
// AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
AbsoluteExpiryTime *time.Time

// ContentEncoding corresponds to the 'content-encoding' property.
ContentEncoding *string

// ContentType corresponds to the 'content-type' property
ContentType *string

// CorrelationID corresponds to the 'correlation-id' property.
// The type of CorrelationID can be a uint64, UUID, []byte, or a string
CorrelationID any

// CreationTime corresponds to the 'creation-time' property.
CreationTime *time.Time

// GroupID corresponds to the 'group-id' property.
GroupID *string

// GroupSequence corresponds to the 'group-sequence' property.
GroupSequence *uint32

// MessageID corresponds to the 'message-id' property.
// The type of MessageID can be a uint64, UUID, []byte, or string
MessageID any

// ReplyTo corresponds to the 'reply-to' property.
ReplyTo *string

// ReplyToGroupID corresponds to the 'reply-to-group-id' property.
ReplyToGroupID *string

// Subject corresponds to the 'subject' property.
Subject *string

// To corresponds to the 'to' property.
To *string

// UserID corresponds to the 'user-id' property.
UserID []byte
}

// AMQPAnnotatedMessageBody represents the body of an AMQP message.
// Only one of these fields can be used a a time. They are mutually exclusive.
type AMQPAnnotatedMessageBody struct {
// Data is encoded/decoded as multiple data sections in the body.
Data [][]byte

// Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
//
// The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
Sequence [][]any

// Value is encoded/decoded as the amqp-value section in the body.
//
// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage,
// as well as slices or maps of AMQP simple types.
Value any
}

// AMQPAnnotatedMessageHeader carries standard delivery details about the transfer
// of a message.
// See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header
// for more details.
type AMQPAnnotatedMessageHeader struct {
// DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
// It corresponds to the 'delivery-count' property.
DeliveryCount uint32

// Durable corresponds to the 'durable' property.
Durable bool

// FirstAcquirer corresponds to the 'first-acquirer' property.
FirstAcquirer bool

// Priority corresponds to the 'priority' property.
Priority uint8

// TTL corresponds to the 'ttl' property.
TTL time.Duration
}

// toAMQPMessage converts between our (azeventhubs) AMQP message
// to the underlying message used by go-amqp.
func (am *AMQPAnnotatedMessage) toAMQPMessage() *amqp.Message {
var header *amqp.MessageHeader

if am.Header != nil {
header = &amqp.MessageHeader{
DeliveryCount: am.Header.DeliveryCount,
Durable: am.Header.Durable,
FirstAcquirer: am.Header.FirstAcquirer,
Priority: am.Header.Priority,
TTL: am.Header.TTL,
}
}

var properties *amqp.MessageProperties

if am.Properties != nil {
properties = &amqp.MessageProperties{
AbsoluteExpiryTime: am.Properties.AbsoluteExpiryTime,
ContentEncoding: am.Properties.ContentEncoding,
ContentType: am.Properties.ContentType,
CorrelationID: am.Properties.CorrelationID,
CreationTime: am.Properties.CreationTime,
GroupID: am.Properties.GroupID,
GroupSequence: am.Properties.GroupSequence,
MessageID: am.Properties.MessageID,
ReplyTo: am.Properties.ReplyTo,
ReplyToGroupID: am.Properties.ReplyToGroupID,
Subject: am.Properties.Subject,
To: am.Properties.To,
UserID: am.Properties.UserID,
}
} else {
properties = &amqp.MessageProperties{}
}

var footer amqp.Annotations

if am.Footer != nil {
footer = (amqp.Annotations)(am.Footer)
}

return &amqp.Message{
Annotations: copyAnnotations(am.MessageAnnotations),
ApplicationProperties: am.ApplicationProperties,
Data: am.Body.Data,
DeliveryAnnotations: amqp.Annotations(am.DeliveryAnnotations),
DeliveryTag: am.DeliveryTag,
Footer: footer,
Header: header,
Properties: properties,
Sequence: am.Body.Sequence,
Value: am.Body.Value,
}
}

func copyAnnotations(src map[any]any) amqp.Annotations {
if src == nil {
return amqp.Annotations{}
}

dest := amqp.Annotations{}

for k, v := range src {
dest[k] = v
}

return dest
}

func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage {
var header *AMQPAnnotatedMessageHeader

if goAMQPMessage.Header != nil {
header = &AMQPAnnotatedMessageHeader{
DeliveryCount: goAMQPMessage.Header.DeliveryCount,
Durable: goAMQPMessage.Header.Durable,
FirstAcquirer: goAMQPMessage.Header.FirstAcquirer,
Priority: goAMQPMessage.Header.Priority,
TTL: goAMQPMessage.Header.TTL,
}
}

var properties *AMQPAnnotatedMessageProperties

if goAMQPMessage.Properties != nil {
properties = &AMQPAnnotatedMessageProperties{
AbsoluteExpiryTime: goAMQPMessage.Properties.AbsoluteExpiryTime,
ContentEncoding: goAMQPMessage.Properties.ContentEncoding,
ContentType: goAMQPMessage.Properties.ContentType,
CorrelationID: goAMQPMessage.Properties.CorrelationID,
CreationTime: goAMQPMessage.Properties.CreationTime,
GroupID: goAMQPMessage.Properties.GroupID,
GroupSequence: goAMQPMessage.Properties.GroupSequence,
MessageID: goAMQPMessage.Properties.MessageID,
ReplyTo: goAMQPMessage.Properties.ReplyTo,
ReplyToGroupID: goAMQPMessage.Properties.ReplyToGroupID,
Subject: goAMQPMessage.Properties.Subject,
To: goAMQPMessage.Properties.To,
UserID: goAMQPMessage.Properties.UserID,
}
}

var footer map[any]any

if goAMQPMessage.Footer != nil {
footer = (map[any]any)(goAMQPMessage.Footer)
}

return &AMQPAnnotatedMessage{
MessageAnnotations: map[any]any(goAMQPMessage.Annotations),
ApplicationProperties: goAMQPMessage.ApplicationProperties,
Body: AMQPAnnotatedMessageBody{
Data: goAMQPMessage.Data,
Sequence: goAMQPMessage.Sequence,
Value: goAMQPMessage.Value,
},
DeliveryAnnotations: map[any]any(goAMQPMessage.DeliveryAnnotations),
DeliveryTag: goAMQPMessage.DeliveryTag,
Footer: footer,
Header: header,
Properties: properties,
}
}
22 changes: 22 additions & 0 deletions sdk/messaging/azeventhubs/amqp_message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azeventhubs

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestAMQPAnnotatedMessageUnitTest(t *testing.T) {
t.Run("Default", func(t *testing.T) {
msg := &AMQPAnnotatedMessage{}
amqpMessage := msg.toAMQPMessage()

// we duplicate/inflate these since we modify them
// in various parts of the API.
require.NotNil(t, amqpMessage.Properties)
require.NotNil(t, amqpMessage.Annotations)
})
}
10 changes: 9 additions & 1 deletion sdk/messaging/azeventhubs/event_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type ReceivedEventData struct {
// Offset is the offset of the event.
Offset *int64

// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
// to properties that are not exposed by ReceivedEventData such as payloads encoded into the
// Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
// and Header fields.
RawAMQPMessage *AMQPAnnotatedMessage

// SequenceNumber is a unique number assigned to a message by Event Hubs.
SequenceNumber int64

Expand Down Expand Up @@ -100,7 +106,9 @@ func (e *EventData) toAMQPMessage() *amqp.Message {
// NOTE: this converter assumes that the Body of this message will be the first
// serialized byte array in the Data section of the messsage.
func newReceivedEventData(amqpMsg *amqp.Message) (*ReceivedEventData, error) {
re := &ReceivedEventData{}
re := &ReceivedEventData{
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg),
}

if len(amqpMsg.Data) == 1 {
re.Body = amqpMsg.Data[0]
Expand Down
Loading