Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Add a helper function to get a Message from a ReceivedMessage #21472

Merged
merged 5 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- `ReceivedMessage` can be converted to a `Message` for easier re-sending, using `ReceivedMessage.Message()`. PR#21472

### Breaking Changes

### Bugs Fixed
Expand Down
42 changes: 20 additions & 22 deletions sdk/messaging/azservicebus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,26 @@ type ReceivedMessage struct {
deferred bool
}

// Message creates a shallow copy of the fields from this message to an instance of
// [Message].
func (rm *ReceivedMessage) Message() *Message {
return &Message{
ApplicationProperties: rm.ApplicationProperties,
Body: rm.Body,
ContentType: rm.ContentType,
CorrelationID: rm.CorrelationID,
MessageID: &rm.MessageID,
PartitionKey: rm.PartitionKey,
ReplyTo: rm.ReplyTo,
ReplyToSessionID: rm.ReplyToSessionID,
ScheduledEnqueueTime: rm.ScheduledEnqueueTime,
SessionID: rm.SessionID,
Subject: rm.Subject,
TimeToLive: rm.TimeToLive,
To: rm.To,
}
}

// MessageState represents the current state of a message (Active, Scheduled, Deferred).
type MessageState int32

Expand Down Expand Up @@ -284,28 +304,6 @@ func (m *Message) toAMQPMessage() *amqp.Message {
amqpMsg.Annotations[scheduledEnqueuedTimeAnnotation] = *m.ScheduledEnqueueTime
}

// TODO: These are 'received' message properties so I believe their inclusion here was just an artifact of only
// having one message type.

// if m.SystemProperties != nil {
// // Set the raw annotations first (they may be nil) and add the explicit
// // system properties second to ensure they're set properly.
// amqpMsg.Annotations = addMapToAnnotations(amqpMsg.Annotations, m.SystemProperties.Annotations)

// sysPropMap, err := encodeStructureToMap(m.SystemProperties)
// if err != nil {
// return nil, err
// }
// amqpMsg.Annotations = addMapToAnnotations(amqpMsg.Annotations, sysPropMap)
// }

// if m.LockToken != nil {
// if amqpMsg.DeliveryAnnotations == nil {
// amqpMsg.DeliveryAnnotations = make(amqp.Annotations)
// }
// amqpMsg.DeliveryAnnotations[lockTokenName] = *m.LockToken
// }

return amqpMsg
}

Expand Down
64 changes: 64 additions & 0 deletions sdk/messaging/azservicebus/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,67 @@ func TestMessageWithIncorrectBody(t *testing.T) {
}, "receiving_link")
require.Nil(t, message.Body)
}

func TestReceivedMessageToMessage(t *testing.T) {
rm := &ReceivedMessage{
ApplicationProperties: map[string]any{
"hello": "world",
},
Body: []byte("body content"),
ContentType: to.Ptr("content type"),
CorrelationID: to.Ptr("correlation ID"),
DeadLetterErrorDescription: to.Ptr("dead letter error description"),
DeadLetterReason: to.Ptr("dead letter reason"),
DeadLetterSource: to.Ptr("dead letter source"),
DeliveryCount: 9,
EnqueuedSequenceNumber: to.Ptr[int64](101),
EnqueuedTime: mustParseTime("2023-01-01T01:02:03Z"),
ExpiresAt: mustParseTime("2023-01-02T01:02:03Z"),
LockedUntil: mustParseTime("2023-01-03T01:02:03Z"),
LockToken: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
MessageID: "message ID",
PartitionKey: to.Ptr("partition key"),
ReplyTo: to.Ptr("reply to"),
ReplyToSessionID: to.Ptr("reply to session id"),
ScheduledEnqueueTime: mustParseTime("2023-01-04T01:02:03Z"),
SequenceNumber: to.Ptr[int64](102),
SessionID: to.Ptr("session id"),
State: 10,
Subject: to.Ptr("subject"),
TimeToLive: to.Ptr(time.Second),
To: to.Ptr("to"),
RawAMQPMessage: &AMQPAnnotatedMessage{}, // doesn't exist on `Message`, ignored.
}

msg := rm.Message()

expectedMsg := &Message{
ApplicationProperties: map[string]any{
"hello": "world",
},
Body: []byte("body content"),
ContentType: to.Ptr("content type"),
CorrelationID: to.Ptr("correlation ID"),
MessageID: to.Ptr("message ID"),
PartitionKey: to.Ptr("partition key"),
ReplyTo: to.Ptr("reply to"),
ReplyToSessionID: to.Ptr("reply to session id"),
ScheduledEnqueueTime: mustParseTime("2023-01-04T01:02:03Z"),
SessionID: to.Ptr("session id"),
Subject: to.Ptr("subject"),
TimeToLive: to.Ptr(time.Second),
To: to.Ptr("to"),
}

require.Equal(t, msg, expectedMsg)
}

func mustParseTime(str string) *time.Time {
tm, err := time.Parse(time.RFC3339, str)

if err != nil {
panic(err)
}

return &tm
}
49 changes: 49 additions & 0 deletions sdk/messaging/azservicebus/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,55 @@ func TestReceiverUnauthorizedCreds(t *testing.T) {
})
}

func TestReceiveAndSendAndReceive(t *testing.T) {
serviceBusClient, cleanup, queueName := setupLiveTest(t, nil)
defer cleanup()

sender, err := serviceBusClient.NewSender(queueName, nil)
require.NoError(t, err)
defer sender.Close(context.Background())

scheduledEnqueuedTime := time.Now()

err = sender.SendMessage(context.Background(), &Message{
Body: []byte("body text"),
ApplicationProperties: map[string]any{
"hello": "world",
},
ContentType: to.Ptr("application/text"),
CorrelationID: to.Ptr("correlation ID"),
MessageID: to.Ptr("message id"),
PartitionKey: to.Ptr("session id"),
ReplyTo: to.Ptr("reply to"),
ReplyToSessionID: to.Ptr("reply to session id"),
ScheduledEnqueueTime: &scheduledEnqueuedTime,
SessionID: to.Ptr("session id"),
Subject: to.Ptr("subject"),
TimeToLive: to.Ptr(time.Minute),
To: to.Ptr("to"),
}, nil)
require.NoError(t, err)

receiver, err := serviceBusClient.NewReceiverForQueue(queueName, &ReceiverOptions{
ReceiveMode: ReceiveModeReceiveAndDelete,
})
require.NoError(t, err)

msgs, err := receiver.ReceiveMessages(context.Background(), 1, nil)
require.NoError(t, err)
require.Equal(t, "body text", string(msgs[0].Body))

// re-send
err = sender.SendMessage(context.Background(), msgs[0].Message(), nil)
require.NoError(t, err)

// re-receive
rereceivedMsgs, err := receiver.ReceiveMessages(context.Background(), 1, nil)
require.NoError(t, err)

require.Equal(t, msgs[0].Message(), rereceivedMsgs[0].Message(), "all sendable fields are preserved when resending")
}

type receivedMessageSlice []*ReceivedMessage

func (messages receivedMessageSlice) Len() int {
Expand Down