Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Filling the gaps for AmqpAnnotatedMessage #14786

Merged
10 commits merged into from
Apr 15, 2021
1 change: 1 addition & 0 deletions sdk/core/core-amqp/review/core-amqp.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface AmqpAnnotatedMessage {
// @public
export const AmqpAnnotatedMessage: {
fromRheaMessage(msg: Message): AmqpAnnotatedMessage;
toRheaMessage(msg: AmqpAnnotatedMessage): Message;
};

// @public
Expand Down
15 changes: 15 additions & 0 deletions sdk/core/core-amqp/src/amqpAnnotatedMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,20 @@ export const AmqpAnnotatedMessage = {
properties: AmqpMessageProperties.fromRheaMessageProperties(msg),
body: msg.body
};
},
/**
* Takes AmqpAnnotatedMessage and returns it in the RheaMessage(`Message` type from "rhea") format.
*/
toRheaMessage(msg: AmqpAnnotatedMessage): RheaMessage {
const message = {
...AmqpMessageProperties.toRheaMessageProperties(msg.properties || {}),
...AmqpMessageHeader.toRheaMessageHeader(msg.header || {}),
body: msg.body,
message_annotations: msg.messageAnnotations,
delivery_annotations: msg.deliveryAnnotations,
application_properties: msg.applicationProperties,
footer: msg.footer
};
return message;
}
};
84 changes: 38 additions & 46 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,51 +278,52 @@ export function toRheaMessage(
msg: ServiceBusMessage | ServiceBusReceivedMessage | AmqpAnnotatedMessage,
encoder: Pick<typeof defaultDataTransformer, "encode">
): RheaMessage {
let amqpMsg: RheaMessage;
if (isAmqpAnnotatedMessage(msg)) {
const amqpMsg: RheaMessage = {
body: encoder.encode(msg.body, msg.bodyType ?? "data"),
message_annotations: {}
amqpMsg = {
...AmqpAnnotatedMessage.toRheaMessage(msg),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have custom logic for absolute_expiry_time for ServiceBusMessage
image

...we should do the same for AmqpAnnotatedMessage? (.NET has the same logic too!)

@JoshLove-msft

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pinging @yunhaoling @hemanttanwar to make sure the same logic is present in Java/Python too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't have this in python, for service bus message (and annotated message), we don't calculate absolute_expiry_time based on ttl.

two questions:

  1. during which process is your logic applied, setting the time_to_live property of a service bus message?
  2. absolute_expiry_time is in the amqp message property while ttl is in the amqp message header, I read the amqp spec but didn't find the spec saying those two are related to each other, so do you know why you have this logic?

ttl, Duration in milliseconds for which the message is to be considered "live". If this is set then a message expiration time will be computed based on the time of arrival at an intermediary. Messages that live longer than their expiration time will be discarded (or dead lettered). When a message is transmitted by an intermediary that was received with a ttl, the transmitted message's header SHOULD contain a ttl that is computed as the difference between the current time and the formerly computed message expiration time, i.e., the reduced ttl, so that messages will eventually die if they end up in a delivery loop.
absolute-expiry-time, An absolute time when this message is considered to be expired.

Copy link
Member Author

@HarshaNalluru HarshaNalluru Apr 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

during which process is your logic applied, setting the time_to_live property of a service bus message?

This gets applied to every message that is being sent(during the translation of SBMessage to the internal AMQP message).

absolute_expiry_time is in the amqp message property while ttl is in the amqp message header, I read the amqp spec but didn't find the spec saying those two are related to each other, so do you know why you have this logic?

Not from the spec, service-bus service has a bug that required us to add this(/cc @JoshLove-msft). Clemens is also involved here, it's better to reach out to him and get approval if we're changing anything here.

For JS especially,

  • SBMessage
    • doesn't expose the absolute_expiry_time on the message to be sent(and we use the custom logic to set the absolute_expiry_time property while translating it to the internal amqp message)
  • AmqpAnnotatedMessage
    • While sending, if we apply the same logic, we're overriding the property set by the user
    • Maybe we can consider removing CreationTime/AbsoluteExpiry from the AmqpAnnotatedMessage that is used for sending and go ahead with the custom logic of calculating it from ttl.

body: encoder.encode(msg.body, msg.bodyType ?? "data")
};

if (msg.applicationProperties != null) {
amqpMsg.application_properties = msg.applicationProperties;
} else {
let bodyType: "data" | "sequence" | "value" = "data";

if (isServiceBusReceivedMessage(msg)) {
/*
* TODO: this is a bit complicated.
*
* It seems reasonable to expect to be able to round-trip a message (ie,
* receive a message, and then send it again, possibly to another queue / topic).
* If the user does that we need to make sure to respect their original AMQP
* type so when the message is re - encoded we don't put 'body' into the wrong spot.
*
* The complication is that we need to decide if we're okay with respecting a field
* from the rawAmqpMessage, which up until now we've treated as just vestigial
* information on send. My hope is that the use case of "alter the sb message in some
* incompatible way with the underying _rawAmqpMessage.bodyType" is not common
* enough for us to try to do anything more than what I'm doing here.
*/
bodyType = msg._rawAmqpMessage.bodyType ?? "data";
}

if (msg.messageAnnotations != null) {
amqpMsg.message_annotations = msg.messageAnnotations;
}
amqpMsg = {
body: encoder.encode(msg.body, bodyType),
message_annotations: {}
};

if (msg.deliveryAnnotations != null) {
amqpMsg.delivery_annotations = msg.deliveryAnnotations;
}
amqpMsg.ttl = msg.timeToLive;
}

return amqpMsg;
if (amqpMsg.ttl != null && amqpMsg.ttl !== Constants.maxDurationValue) {
amqpMsg.creation_time = Date.now();
amqpMsg.absolute_expiry_time = Math.min(
Constants.maxAbsoluteExpiryTime,
amqpMsg.creation_time + amqpMsg.ttl
);
}

let bodyType: "data" | "sequence" | "value" = "data";

if (isServiceBusReceivedMessage(msg)) {
/*
* TODO: this is a bit complicated.
*
* It seems reasonable to expect to be able to round-trip a message (ie,
* receive a message, and then send it again, possibly to another queue / topic).
* If the user does that we need to make sure to respect their original AMQP
* type so when the message is re - encoded we don't put 'body' into the wrong spot.
*
* The complication is that we need to decide if we're okay with respecting a field
* from the rawAmqpMessage, which up until now we've treated as just vestigial
* information on send. My hope is that the use case of "alter the sb message in some
* incompatible way with the underying _rawAmqpMessage.bodyType" is not common
* enough for us to try to do anything more than what I'm doing here.
*/
bodyType = msg._rawAmqpMessage.bodyType ?? "data";
}

const amqpMsg: RheaMessage = {
body: encoder.encode(msg.body, bodyType),
message_annotations: {}
};
if (isAmqpAnnotatedMessage(msg)) {
return amqpMsg;
}

if (msg.applicationProperties != null) {
amqpMsg.application_properties = msg.applicationProperties;
Expand Down Expand Up @@ -362,15 +363,6 @@ export function toRheaMessage(
if (msg.replyToSessionId != null) {
amqpMsg.reply_to_group_id = msg.replyToSessionId;
}
if (msg.timeToLive != null && msg.timeToLive !== Constants.maxDurationValue) {
amqpMsg.ttl = msg.timeToLive;
amqpMsg.creation_time = Date.now();
if (Constants.maxAbsoluteExpiryTime - amqpMsg.creation_time > amqpMsg.ttl) {
amqpMsg.absolute_expiry_time = amqpMsg.creation_time + amqpMsg.ttl;
} else {
amqpMsg.absolute_expiry_time = Constants.maxAbsoluteExpiryTime;
}
}
if (msg.partitionKey != null) {
if (msg.partitionKey.length > Constants.maxPartitionKeyLength) {
throw new Error(
Expand Down
140 changes: 140 additions & 0 deletions sdk/servicebus/service-bus/test/public/amqpAnnotatedMessage.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import chai from "chai";
import chaiAsPromised from "chai-as-promised";
import { ServiceBusSender } from "../../src";
import { ServiceBusReceiver } from "../../src/receivers/receiver";
import {
ServiceBusClientForTests,
createServiceBusClientForTests,
testPeekMsgsLength,
EntityName,
getRandomTestClientType
} from "../public/utils/testutils2";
import { AmqpAnnotatedMessage } from "@azure/core-amqp";
import { generateUuid } from "@azure/core-http";

const should = chai.should();
chai.use(chaiAsPromised);
const assert = chai.assert;

const anyRandomTestClientType = getRandomTestClientType();

let serviceBusClient: ServiceBusClientForTests;
let entityNames: EntityName;
let sender: ServiceBusSender;
let receiver: ServiceBusReceiver;
const sessionId = "session-1";

function afterEachTest(): Promise<void> {
return serviceBusClient.test.afterEach();
}

function getSampleAmqpAnnotatedMessage(randomTag?: string): AmqpAnnotatedMessage {
if (randomTag == null) {
randomTag = Math.random().toString();
}

return {
body: `message body ${randomTag}`,
bodyType: "data",
header: {
deliveryCount: 10, // TODO: Doesn't make sense to set on the message to be sent, should this be removed for sending?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove deliveryCount from the AmqpAnnotatedMessage for send side?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm....this is an interesting problem we don't have with ServiceBusMessage, now that I think about it, since we make a distinction between a sendable message and a received message.

For now I'd say it's fine. A received AmqpAnnotatedMessage should also be immediately sendable so it should work, and we can just do a quick check to make sure that we aren't in some way dependent on those fields.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JoshLove-msft @yunhaoling @hemanttanwar what are you guys doing here?

durable: false,
firstAcquirer: false,
priority: 20,
timeToLive: 100000
},
applicationProperties: {
propOne: 1,
propTwo: "two",
propThree: true,
propFour: Date()
},
// deliveryAnnotations - TODO: should this be removed for sending?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove deliveryAnnotations from the AmqpAnnotatedMessage for send side?

footer: {
propFooter: "foot"
},
messageAnnotations: { propMsgAnnotate: "annotation" },
properties: {
contentEncoding: "application/json; charset=utf-8",
correlationId: randomTag,
messageId: generateUuid()
}
};
}

describe("AmqpAnnotatedMessage", function(): void {
before(() => {
serviceBusClient = createServiceBusClientForTests();
});

after(() => {
return serviceBusClient.test.after();
});

afterEach(async () => {
await afterEachTest();
});

async function receiveMsg(testMessage: AmqpAnnotatedMessage) {
receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver({
...entityNames,
sessionId
});
const msgs = await receiver.receiveMessages(1);

should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array");
should.equal(msgs.length, 1, "Unexpected number of messages");
should.equal(msgs[0].body, testMessage.body, "Unexpected body on the received message");
should.equal(
msgs[0]._rawAmqpMessage.messageAnnotations!["propMsgAnnotate"],
testMessage.messageAnnotations!["propMsgAnnotate"],
"Unexpected messageAnnotations on the received message"
);
assert.deepEqualExcluding(
msgs[0]._rawAmqpMessage,
testMessage,
["deliveryAnnotations", "body", "messageAnnotations", "header", "properties"],
"Unexpected on the AmqpAnnotatedMessage"
);
assert.deepEqualExcluding(
msgs[0]._rawAmqpMessage.header!,
testMessage.header!,
["deliveryCount"],
"Unexpected header on the AmqpAnnotatedMessage"
);
assert.deepEqualExcluding(
msgs[0]._rawAmqpMessage.properties!,
testMessage.properties!,
["creationTime", "absoluteExpiryTime", "groupId"],
"Unexpected properties on the AmqpAnnotatedMessage"
);
assert.equal(
msgs[0]._rawAmqpMessage.properties!.groupId,
testMessage.properties!.groupId,
"Unexpected session-id on the AmqpAnnotatedMessage"
);
}

it(
anyRandomTestClientType + ": send, receive, verify props, and complete()",
async function(): Promise<void> {
entityNames = await serviceBusClient.test.createTestEntities(anyRandomTestClientType);

const testMessage: AmqpAnnotatedMessage = getSampleAmqpAnnotatedMessage();
testMessage.properties = {
...testMessage.properties,
groupId: entityNames.usesSessions ? sessionId : undefined
};
sender = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!)
);
await sender.sendMessages(testMessage);
await receiveMsg(testMessage);

await testPeekMsgsLength(receiver, 0);
}
);
});
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/test/public/utils/testUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ export class TestMessage {
applicationProperties: {
propOne: 1,
propTwo: "two",
propThree: true
propThree: true,
propFour: Date()
},
sessionId: TestMessage.sessionId,
replyToSessionId: "some-other-session-id"
Expand Down