Skip to content

Commit

Permalink
[core-amqp][event-hubs][service-bus] Fixes _process of undefined Type…
Browse files Browse the repository at this point in the history
…Error (#15597)

This PR fixes #13500.

`rhea` 2.0.1 contains the fix to this specific error. We currently use `rhea` 1.x, so there's additional work in this PR to workaround the single breaking change in `rhea`, and the breaking changes in `rhea-promise`.

### rhea breaking change

`rhea` contains 1 breaking change between versions 1.x and 2.x: timestamp types are now deserialized as Date objects instead of numbers. Unfortunately since this changes the way users' data might be deserialized in their service bus messages or event hubs events, we have to convert Date objects back to numbers in our client libraries until we do a major version bump. (Shorter term we can look at using rhea's default behavior behind a flag.)

### rhea-promise breaking changes

Some of the `rhea-promise` APIs that accepted multiple optional positional arguments have been updated to take a single options bag parameter at the end of their method parameter list.

AwaitableSender was also updated so that a timeout is no provided at instantiation. Instead, it must be provided per each `send()` call.

### core-amqp v3
Since core-amqp is being updated to depend on rhea 2.x, core-amqp dependencies will also pull in rhea 2.x transitively.
To ensure that existing versions of event hubs and service bus don't break by deserializing timestamps as Date objects, core-amqp is updated to a new major version: v3. 

Once #15349 is merged, we can also remove `AsyncLock` completely, so I'd like to merge that PR in before releasing the changes in this PR.
  • Loading branch information
chradek authored Jun 8, 2021
1 parent 8c726b8 commit dec897a
Show file tree
Hide file tree
Showing 17 changed files with 1,556 additions and 1,222 deletions.
2,493 changes: 1,332 additions & 1,161 deletions common/config/rush/pnpm-lock.yaml

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion sdk/core/core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Release History

## 2.3.1 (Unreleased)
## 3.0.0 (Unreleased)

### Breaking changes

- Updates the `rhea-promise` and `rhea` dependencies to version 2.x. `rhea` contains a breaking change that changes deserialization of timestamps from numbers to Date objects.

## 2.3.0 (2021-04-29)

Expand Down
6 changes: 3 additions & 3 deletions sdk/core/core-amqp/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/core-amqp",
"sdk-type": "client",
"version": "2.3.1",
"version": "3.0.0",
"description": "Common library for amqp based azure sdks like @azure/event-hubs.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down Expand Up @@ -78,8 +78,8 @@
"events": "^3.0.0",
"jssha": "^3.1.0",
"process": "^0.11.10",
"rhea": "^1.0.24",
"rhea-promise": "^1.2.1",
"rhea": "^2.0.2",
"rhea-promise": "^2.0.0",
"tslib": "^2.0.0",
"url": "^0.11.0",
"util": "^0.12.1"
Expand Down
8 changes: 4 additions & 4 deletions sdk/core/core-amqp/src/messageProperties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export const AmqpMessageProperties = {
toRheaMessageProperties(props: AmqpMessageProperties): RheaMessageProperties {
const amqpProperties: RheaMessageProperties = {};
if (props.absoluteExpiryTime != undefined) {
amqpProperties.absolute_expiry_time = props.absoluteExpiryTime;
amqpProperties.absolute_expiry_time = new Date(props.absoluteExpiryTime);
}
if (props.contentEncoding != undefined) {
amqpProperties.content_encoding = props.contentEncoding;
Expand All @@ -91,7 +91,7 @@ export const AmqpMessageProperties = {
amqpProperties.correlation_id = props.correlationId;
}
if (props.creationTime != undefined) {
amqpProperties.creation_time = props.creationTime;
amqpProperties.creation_time = new Date(props.creationTime);
}
if (props.groupId != undefined) {
amqpProperties.group_id = props.groupId;
Expand Down Expand Up @@ -130,7 +130,7 @@ export const AmqpMessageProperties = {
fromRheaMessageProperties(props: RheaMessageProperties): AmqpMessageProperties {
const msgProperties: AmqpMessageProperties = {};
if (props.absolute_expiry_time != undefined) {
msgProperties.absoluteExpiryTime = props.absolute_expiry_time;
msgProperties.absoluteExpiryTime = props.absolute_expiry_time.getTime();
}
if (props.content_encoding != undefined) {
msgProperties.contentEncoding = props.content_encoding;
Expand All @@ -142,7 +142,7 @@ export const AmqpMessageProperties = {
msgProperties.correlationId = props.correlation_id;
}
if (props.creation_time != undefined) {
msgProperties.creationTime = props.creation_time;
msgProperties.creationTime = props.creation_time.getTime();
}
if (props.group_id != undefined) {
msgProperties.groupId = props.group_id;
Expand Down
8 changes: 4 additions & 4 deletions sdk/core/core-amqp/test/message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ describe("message", function() {
// userId: ""
};
const amqpMsgPropertiesExpected: RheaMessageProperties = {
absolute_expiry_time: 0,
absolute_expiry_time: new Date(0),
content_encoding: "",
content_type: "",
correlation_id: 0,
creation_time: 0,
creation_time: new Date(0),
group_id: "",
group_sequence: 0,
message_id: "",
Expand Down Expand Up @@ -140,11 +140,11 @@ describe("message", function() {
// userId: ""
};
const amqpMsgProperties: RheaMessageProperties = {
absolute_expiry_time: 0,
absolute_expiry_time: new Date(0),
content_encoding: "",
content_type: "",
correlation_id: 0,
creation_time: 0,
creation_time: new Date(0),
group_id: "",
group_sequence: 0,
message_id: "",
Expand Down
3 changes: 3 additions & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 5.5.2 (Unreleased)

### Bug fixes

- Fixes issue [#13500](https://github.com/Azure/azure-sdk-for-js/issues/13500) where a `TypeError: Cannot read property '_process' of undefined` could be thrown in rare cases.

## 5.5.1 (2021-04-29)

Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
},
"dependencies": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-amqp": "^2.3.0",
"@azure/core-amqp": "^3.0.0",
"@azure/core-asynciterator-polyfill": "^1.0.0",
"@azure/core-auth": "^1.3.0",
"@azure/core-tracing": "1.0.0-preview.11",
Expand All @@ -99,7 +99,7 @@
"is-buffer": "^2.0.3",
"jssha": "^3.1.0",
"process": "^0.11.10",
"rhea-promise": "^1.2.1",
"rhea-promise": "^2.0.0",
"tslib": "^2.0.0",
"uuid": "^8.3.0"
},
Expand Down
52 changes: 48 additions & 4 deletions sdk/eventhub/event-hubs/src/eventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import { DeliveryAnnotations, Message as RheaMessage, MessageAnnotations } from "rhea-promise";
import { Constants } from "@azure/core-amqp";
import { isDefined } from "./util/typeGuards";
import { isDefined, objectHasProperty } from "./util/typeGuards";

/**
* Describes the delivery annotations.
Expand Down Expand Up @@ -156,13 +156,15 @@ export function fromRheaMessage(msg: RheaMessage): EventDataInternal {
if (!data.systemProperties) {
data.systemProperties = {};
}
data.systemProperties[annotationKey] = msg.message_annotations[annotationKey];
data.systemProperties[annotationKey] = convertDatesToNumbers(
msg.message_annotations[annotationKey]
);
break;
}
}
}
if (msg.application_properties) {
data.properties = msg.application_properties;
data.properties = convertDatesToNumbers(msg.application_properties);
}
if (msg.delivery_annotations) {
data.lastEnqueuedOffset = msg.delivery_annotations.last_enqueued_offset;
Expand All @@ -181,7 +183,9 @@ export function fromRheaMessage(msg: RheaMessage): EventDataInternal {
data.systemProperties = {};
}
if (msg[messageProperty] != null) {
data.systemProperties[messagePropertiesMap[messageProperty]] = msg[messageProperty];
data.systemProperties[messagePropertiesMap[messageProperty]] = convertDatesToNumbers(
msg[messageProperty]
);
}
}

Expand Down Expand Up @@ -284,3 +288,43 @@ export interface ReceivedEventData {
[key: string]: any;
};
}

/**
* Converts any Date objects into a number representing date.getTime().
* Recursively checks for any Date objects in arrays and objects.
* @internal
*/
function convertDatesToNumbers<T = unknown>(thing: T): T {
// fast exit
if (!isDefined(thing)) return thing;

// When 'thing' is a Date, return the number representation
if (
typeof thing === "object" &&
objectHasProperty(thing, "getTime") &&
typeof thing.getTime === "function"
) {
return thing.getTime();
}

/*
Examples:
[0, 'foo', new Date(), { nested: new Date()}]
*/
if (Array.isArray(thing)) {
return (thing.map(convertDatesToNumbers) as unknown) as T;
}

/*
Examples:
{ foo: new Date(), children: { nested: new Date() }}
*/
if (typeof thing === "object" && isDefined(thing)) {
thing = { ...thing };
for (const key of Object.keys(thing)) {
(thing as any)[key] = convertDatesToNumbers((thing as any)[key]);
}
}

return thing;
}
12 changes: 6 additions & 6 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ export class EventHubSender extends LinkEntity {
);
}

private _createSenderOptions(timeoutInMs: number, newName?: boolean): AwaitableSenderOptions {
private _createSenderOptions(newName?: boolean): AwaitableSenderOptions {
if (newName) this.name = `${uuid()}`;
const srOptions: AwaitableSenderOptions = {
name: this.name,
Expand All @@ -313,8 +313,7 @@ export class EventHubSender extends LinkEntity {
onError: this._onAmqpError,
onClose: this._onAmqpClose,
onSessionError: this._onSessionError,
onSessionClose: this._onSessionClose,
sendTimeoutInSeconds: timeoutInMs / 1000
onSessionClose: this._onSessionClose
};
logger.verbose("Creating sender with options: %O", srOptions);
return srOptions;
Expand Down Expand Up @@ -403,9 +402,10 @@ export class EventHubSender extends LinkEntity {
throw translate(e);
}

sender.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000;
try {
const delivery = await sender.send(rheaMessage, undefined, 0x80013700, {
const delivery = await sender.send(rheaMessage, {
format: 0x80013700,
timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000,
abortSignal
});
logger.info(
Expand Down Expand Up @@ -454,7 +454,7 @@ export class EventHubSender extends LinkEntity {
const retryOptions = options.retryOptions || {};
const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions);
retryOptions.timeoutInMs = timeoutInMs;
const senderOptions = this._createSenderOptions(timeoutInMs);
const senderOptions = this._createSenderOptions();

const startTime = Date.now();
const createLinkPromise = async (): Promise<AwaitableSender> => {
Expand Down
47 changes: 43 additions & 4 deletions sdk/eventhub/event-hubs/test/internal/eventdata.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ describe("EventData", function(): void {
testEventData.offset!.should.equal(testAnnotations["x-opt-offset"]);
testEventData.sequenceNumber!.should.equal(testAnnotations["x-opt-sequence-number"]);
testEventData.partitionKey!.should.equal(testAnnotations["x-opt-partition-key"]);
testEventData.systemProperties!["x-iot-foo-prop"] = extraAnnotations["x-iot-foo-prop"];
testEventData.systemProperties!["x-iot-bar-prop"] = extraAnnotations["x-iot-bar-prop"];
testEventData.systemProperties!["x-iot-foo-prop"].should.eql(
extraAnnotations["x-iot-foo-prop"]
);
testEventData.systemProperties!["x-iot-bar-prop"].should.eql(
extraAnnotations["x-iot-bar-prop"]
);
});

it("returns systemProperties for special known properties", function(): void {
Expand All @@ -104,8 +108,8 @@ describe("EventData", function(): void {
content_encoding: "utf-8",
content_type: "application/json",
correlation_id: "id2",
absolute_expiry_time: 0,
creation_time: 0,
absolute_expiry_time: new Date(0),
creation_time: new Date(0),
group_id: "groupId",
group_sequence: 1
});
Expand All @@ -131,6 +135,41 @@ describe("EventData", function(): void {
testEventData.systemProperties!["groupSequence"].should.equal(1);
});
});

it("deserializes Dates to numbers in properties and annotations", () => {
const timestamp = new Date();
const extraAnnotations = {
"x-date": timestamp,
"x-number": timestamp.getTime()
};
const testEventData = fromRheaMessage({
body: testBody,
application_properties: {
topLevelDate: timestamp,
child: {
nestedDate: timestamp,
children: [timestamp, { deepDate: timestamp }]
}
},
message_annotations: {
...testAnnotations,
...extraAnnotations
}
});
testEventData.enqueuedTimeUtc!.getTime().should.equal(testAnnotations["x-opt-enqueued-time"]);
testEventData.offset!.should.equal(testAnnotations["x-opt-offset"]);
testEventData.sequenceNumber!.should.equal(testAnnotations["x-opt-sequence-number"]);
testEventData.partitionKey!.should.equal(testAnnotations["x-opt-partition-key"]);
testEventData.systemProperties!["x-date"].should.eql(extraAnnotations["x-date"].getTime());
testEventData.systemProperties!["x-number"].should.eql(extraAnnotations["x-number"]);
testEventData.properties!.should.eql({
topLevelDate: timestamp.getTime(),
child: {
nestedDate: timestamp.getTime(),
children: [timestamp.getTime(), { deepDate: timestamp.getTime() }]
}
});
});
});

describe("toAmqpMessage", function(): void {
Expand Down
11 changes: 5 additions & 6 deletions sdk/eventhub/event-hubs/test/public/node/disconnects.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,18 @@ describe("disconnected", function() {
});

it("should not throw an uncaught exception", async () => {
const client = new EventHubProducerClient(service.connectionString, service.path);
const client = new EventHubProducerClient(service.connectionString, service.path, {
retryOptions: {
timeoutInMs: 0
}
});
const clientConnectionContext = client["_context"];

// Send an event to open the connection.
await client.sendBatch([{ body: "test" }]);
const originalConnectionId = clientConnectionContext.connectionId;

// We need to dig deep into the internals to get the awaitable sender so that .
const awaitableSender = client["_sendersMap"].get("")!["_sender"]!;

let thirdSend: Promise<void>;
// Change the timeout on the awaitableSender so it forces an OperationTimeoutError
awaitableSender.sendTimeoutInSeconds = 0;
// Ensure that the connection will disconnect, and another sendBatch occurs while a sendBatch is in-flight.
setTimeout(() => {
// Trigger a disconnect on the underlying connection while the `sendBatch` is in flight.
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/mock-hub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
"dependencies": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-asynciterator-polyfill": "^1.0.0",
"rhea": "^1.0.24",
"rhea": "^2.0.2",
"tslib": "^2.0.0"
},
"//sampleConfiguration": {
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- ServiceBusSender could throw an error (`TypeError: Cannot read property 'maxMessageSize' of undefined`) if a link was being restarted while calling sendMessages().
[PR#15409](https://github.com/Azure/azure-sdk-for-js/pull/15409)
- Fixes issue [#13500](https://github.com/Azure/azure-sdk-for-js/issues/13500) where a `TypeError: Cannot read property '_process' of undefined` could be thrown in rare cases.

## 7.2.0-beta.1 (2021-05-18)

Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
},
"dependencies": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-amqp": "^2.3.0",
"@azure/core-amqp": "^3.0.0",
"@azure/core-asynciterator-polyfill": "^1.0.0",
"@azure/core-http": "^1.2.0",
"@azure/core-tracing": "1.0.0-preview.11",
Expand All @@ -123,7 +123,7 @@
"long": "^4.0.0",
"process": "^0.11.10",
"tslib": "^2.0.0",
"rhea-promise": "^1.2.1"
"rhea-promise": "^2.0.0"
},
"devDependencies": {
"@azure/dev-tool": "^1.0.0",
Expand Down
Loading

0 comments on commit dec897a

Please sign in to comment.