diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index 2f5031ff8fbb..97a475a2eb44 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features Added +- `ReceivedMessage` can be converted to a `Message` for easier re-sending, using `ReceivedMessage.Message()`. PR#21472 + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/messaging/azservicebus/message.go b/sdk/messaging/azservicebus/message.go index 40b690b0f548..90c22a446545 100644 --- a/sdk/messaging/azservicebus/message.go +++ b/sdk/messaging/azservicebus/message.go @@ -132,6 +132,26 @@ type ReceivedMessage struct { deferred bool } +// Message creates a shallow copy of the fields from this message to an instance of +// [Message]. +func (rm *ReceivedMessage) Message() *Message { + return &Message{ + ApplicationProperties: rm.ApplicationProperties, + Body: rm.Body, + ContentType: rm.ContentType, + CorrelationID: rm.CorrelationID, + MessageID: &rm.MessageID, + PartitionKey: rm.PartitionKey, + ReplyTo: rm.ReplyTo, + ReplyToSessionID: rm.ReplyToSessionID, + ScheduledEnqueueTime: rm.ScheduledEnqueueTime, + SessionID: rm.SessionID, + Subject: rm.Subject, + TimeToLive: rm.TimeToLive, + To: rm.To, + } +} + // MessageState represents the current state of a message (Active, Scheduled, Deferred). type MessageState int32 @@ -284,28 +304,6 @@ func (m *Message) toAMQPMessage() *amqp.Message { amqpMsg.Annotations[scheduledEnqueuedTimeAnnotation] = *m.ScheduledEnqueueTime } - // TODO: These are 'received' message properties so I believe their inclusion here was just an artifact of only - // having one message type. - - // if m.SystemProperties != nil { - // // Set the raw annotations first (they may be nil) and add the explicit - // // system properties second to ensure they're set properly. - // amqpMsg.Annotations = addMapToAnnotations(amqpMsg.Annotations, m.SystemProperties.Annotations) - - // sysPropMap, err := encodeStructureToMap(m.SystemProperties) - // if err != nil { - // return nil, err - // } - // amqpMsg.Annotations = addMapToAnnotations(amqpMsg.Annotations, sysPropMap) - // } - - // if m.LockToken != nil { - // if amqpMsg.DeliveryAnnotations == nil { - // amqpMsg.DeliveryAnnotations = make(amqp.Annotations) - // } - // amqpMsg.DeliveryAnnotations[lockTokenName] = *m.LockToken - // } - return amqpMsg } diff --git a/sdk/messaging/azservicebus/message_test.go b/sdk/messaging/azservicebus/message_test.go index 3704b7591f35..5140a39a712b 100644 --- a/sdk/messaging/azservicebus/message_test.go +++ b/sdk/messaging/azservicebus/message_test.go @@ -216,3 +216,67 @@ func TestMessageWithIncorrectBody(t *testing.T) { }, "receiving_link") require.Nil(t, message.Body) } + +func TestReceivedMessageToMessage(t *testing.T) { + rm := &ReceivedMessage{ + ApplicationProperties: map[string]any{ + "hello": "world", + }, + Body: []byte("body content"), + ContentType: to.Ptr("content type"), + CorrelationID: to.Ptr("correlation ID"), + DeadLetterErrorDescription: to.Ptr("dead letter error description"), + DeadLetterReason: to.Ptr("dead letter reason"), + DeadLetterSource: to.Ptr("dead letter source"), + DeliveryCount: 9, + EnqueuedSequenceNumber: to.Ptr[int64](101), + EnqueuedTime: mustParseTime("2023-01-01T01:02:03Z"), + ExpiresAt: mustParseTime("2023-01-02T01:02:03Z"), + LockedUntil: mustParseTime("2023-01-03T01:02:03Z"), + LockToken: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + MessageID: "message ID", + PartitionKey: to.Ptr("partition key"), + ReplyTo: to.Ptr("reply to"), + ReplyToSessionID: to.Ptr("reply to session id"), + ScheduledEnqueueTime: mustParseTime("2023-01-04T01:02:03Z"), + SequenceNumber: to.Ptr[int64](102), + SessionID: to.Ptr("session id"), + State: 10, + Subject: to.Ptr("subject"), + TimeToLive: to.Ptr(time.Second), + To: to.Ptr("to"), + RawAMQPMessage: &AMQPAnnotatedMessage{}, // doesn't exist on `Message`, ignored. + } + + msg := rm.Message() + + expectedMsg := &Message{ + ApplicationProperties: map[string]any{ + "hello": "world", + }, + Body: []byte("body content"), + ContentType: to.Ptr("content type"), + CorrelationID: to.Ptr("correlation ID"), + MessageID: to.Ptr("message ID"), + PartitionKey: to.Ptr("partition key"), + ReplyTo: to.Ptr("reply to"), + ReplyToSessionID: to.Ptr("reply to session id"), + ScheduledEnqueueTime: mustParseTime("2023-01-04T01:02:03Z"), + SessionID: to.Ptr("session id"), + Subject: to.Ptr("subject"), + TimeToLive: to.Ptr(time.Second), + To: to.Ptr("to"), + } + + require.Equal(t, msg, expectedMsg) +} + +func mustParseTime(str string) *time.Time { + tm, err := time.Parse(time.RFC3339, str) + + if err != nil { + panic(err) + } + + return &tm +} diff --git a/sdk/messaging/azservicebus/receiver_test.go b/sdk/messaging/azservicebus/receiver_test.go index d1617f77526a..a51b410b1d9a 100644 --- a/sdk/messaging/azservicebus/receiver_test.go +++ b/sdk/messaging/azservicebus/receiver_test.go @@ -892,6 +892,55 @@ func TestReceiverUnauthorizedCreds(t *testing.T) { }) } +func TestReceiveAndSendAndReceive(t *testing.T) { + serviceBusClient, cleanup, queueName := setupLiveTest(t, nil) + defer cleanup() + + sender, err := serviceBusClient.NewSender(queueName, nil) + require.NoError(t, err) + defer sender.Close(context.Background()) + + scheduledEnqueuedTime := time.Now() + + err = sender.SendMessage(context.Background(), &Message{ + Body: []byte("body text"), + ApplicationProperties: map[string]any{ + "hello": "world", + }, + ContentType: to.Ptr("application/text"), + CorrelationID: to.Ptr("correlation ID"), + MessageID: to.Ptr("message id"), + PartitionKey: to.Ptr("session id"), + ReplyTo: to.Ptr("reply to"), + ReplyToSessionID: to.Ptr("reply to session id"), + ScheduledEnqueueTime: &scheduledEnqueuedTime, + SessionID: to.Ptr("session id"), + Subject: to.Ptr("subject"), + TimeToLive: to.Ptr(time.Minute), + To: to.Ptr("to"), + }, nil) + require.NoError(t, err) + + receiver, err := serviceBusClient.NewReceiverForQueue(queueName, &ReceiverOptions{ + ReceiveMode: ReceiveModeReceiveAndDelete, + }) + require.NoError(t, err) + + msgs, err := receiver.ReceiveMessages(context.Background(), 1, nil) + require.NoError(t, err) + require.Equal(t, "body text", string(msgs[0].Body)) + + // re-send + err = sender.SendMessage(context.Background(), msgs[0].Message(), nil) + require.NoError(t, err) + + // re-receive + rereceivedMsgs, err := receiver.ReceiveMessages(context.Background(), 1, nil) + require.NoError(t, err) + + require.Equal(t, msgs[0].Message(), rereceivedMsgs[0].Message(), "all sendable fields are preserved when resending") +} + type receivedMessageSlice []*ReceivedMessage func (messages receivedMessageSlice) Len() int {