Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] adds systemProperties to ReceivedEventData #5008

Merged
merged 4 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,10 @@ export class EventProcessor {
stop(): Promise<void>;
}

// @public (undocumented)
// @public
export interface EventProcessorOptions {
// (undocumented)
initialEventPosition?: EventPosition;
// (undocumented)
maxBatchSize?: number;
// (undocumented)
maxWaitTimeInSeconds?: number;
}

Expand Down Expand Up @@ -266,6 +263,9 @@ export interface ReceivedEventData {
[key: string]: any;
};
sequenceNumber: number;
systemProperties?: {
[key: string]: any;
};
}

// @public
Expand Down
43 changes: 32 additions & 11 deletions sdk/eventhub/event-hubs/src/eventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ export interface EventDataInternal {
* @property [retrievalTime] The time when the runtime info was retrieved
*/
retrievalTime?: Date;
/**
* @property [systemProperties] The properties set by the service.
*/
systemProperties?: Dictionary<any>;
}

/**
Expand All @@ -118,18 +122,29 @@ export function fromAmqpMessage(msg: Message): EventDataInternal {
const data: EventDataInternal = {
body: msg.body
};

if (msg.message_annotations) {
if (msg.message_annotations[Constants.partitionKey] != undefined) {
data.partitionKey = msg.message_annotations[Constants.partitionKey];
}
if (msg.message_annotations[Constants.sequenceNumber] != undefined) {
data.sequenceNumber = msg.message_annotations[Constants.sequenceNumber];
}
if (msg.message_annotations[Constants.enqueuedTime] != undefined) {
data.enqueuedTimeUtc = new Date(msg.message_annotations[Constants.enqueuedTime] as number);
}
if (msg.message_annotations[Constants.offset] != undefined) {
data.offset = msg.message_annotations[Constants.offset];
for (const annotationKey of Object.keys(msg.message_annotations)) {
switch (annotationKey) {
case Constants.partitionKey:
data.partitionKey = msg.message_annotations[annotationKey];
break;
case Constants.sequenceNumber:
data.sequenceNumber = msg.message_annotations[annotationKey];
break;
case Constants.enqueuedTime:
data.enqueuedTimeUtc = new Date(msg.message_annotations[annotationKey]);
break;
case Constants.offset:
data.offset = msg.message_annotations[annotationKey];
break;
default:
if (!data.systemProperties) {
data.systemProperties = {};
}
data.systemProperties[annotationKey] = msg.message_annotations[annotationKey];
break;
}
}
}
if (msg.application_properties) {
Expand Down Expand Up @@ -222,4 +237,10 @@ export interface ReceivedEventData {
* @property The sequence number of the event.
*/
sequenceNumber: number;
/**
* @property The properties set by the service.
*/
systemProperties?: {
[key: string]: any;
};
}
57 changes: 29 additions & 28 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ export class EventHubReceiver extends LinkEntity {
offset: data.offset!,
sequenceNumber: data.sequenceNumber!,
enqueuedTimeUtc: data.enqueuedTimeUtc!,
partitionKey: data.partitionKey!
partitionKey: data.partitionKey!,
systemProperties: data.systemProperties
};

this._checkpoint = receivedEventData.sequenceNumber;
Expand Down Expand Up @@ -270,7 +271,7 @@ export class EventHubReceiver extends LinkEntity {
if (rheaReceiver.isItselfClosed()) {
log.error(
"[%s] The receiver was closed by the user." +
"Hence not notifying the user's error handler.",
"Hence not notifying the user's error handler.",
this._context.connectionId
);
return;
Expand All @@ -286,7 +287,7 @@ export class EventHubReceiver extends LinkEntity {
);
log.error(
"[%s] Since the user did not close the receiver " +
"we let the user know about it by calling the user's error handler.",
"we let the user know about it by calling the user's error handler.",
this._context.connectionId
);
this._onError(error);
Expand All @@ -307,7 +308,7 @@ export class EventHubReceiver extends LinkEntity {
if (rheaReceiver.isSessionItselfClosed()) {
log.error(
"[%s] The receiver was closed by the user." +
"Hence not notifying the user's error handler.",
"Hence not notifying the user's error handler.",
this._context.connectionId
);
return;
Expand All @@ -324,7 +325,7 @@ export class EventHubReceiver extends LinkEntity {

log.error(
"[%s] Since the user did not close the receiver, " +
"we let the user know about it by calling the user's error handler.",
"we let the user know about it by calling the user's error handler.",
this._context.connectionId
);
this._onError(error);
Expand All @@ -336,8 +337,8 @@ export class EventHubReceiver extends LinkEntity {
if (!rheaReceiver || rheaReceiver.isItselfClosed()) {
log.error(
"[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " +
"because the sdk initiated it. Hence not calling detached from the _onAmqpClose" +
"() handler.",
"because the sdk initiated it. Hence not calling detached from the _onAmqpClose" +
"() handler.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -349,7 +350,7 @@ export class EventHubReceiver extends LinkEntity {
if (amqpError) {
log.error(
"[%s] 'receiver_close' event occurred for receiver '%s' with address '%s'. " +
"The associated error is: %O",
"The associated error is: %O",
this._context.connectionId,
this.name,
this.address,
Expand All @@ -360,8 +361,8 @@ export class EventHubReceiver extends LinkEntity {
if (!this.isConnecting) {
log.error(
"[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " +
"and the sdk did not initiate this. The receiver is not reconnecting. Hence, calling " +
"detached from the _onAmqpClose() handler.",
"and the sdk did not initiate this. The receiver is not reconnecting. Hence, calling " +
"detached from the _onAmqpClose() handler.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -370,8 +371,8 @@ export class EventHubReceiver extends LinkEntity {
} else {
log.error(
"[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " +
"and the sdk did not initate this. Moreover the receiver is already re-connecting. " +
"Hence not calling detached from the _onAmqpClose() handler.",
"and the sdk did not initate this. Moreover the receiver is already re-connecting. " +
"Hence not calling detached from the _onAmqpClose() handler.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -384,8 +385,8 @@ export class EventHubReceiver extends LinkEntity {
if (!rheaReceiver || rheaReceiver.isSessionItselfClosed()) {
log.error(
"[%s] 'session_close' event occurred on the session of receiver '%s' with " +
"address '%s' and the sdk did not initiate this. Moreover the receiver is already " +
"re-connecting. Hence not calling detached from the _onAmqpSessionClose() handler.",
"address '%s' and the sdk did not initiate this. Moreover the receiver is already " +
"re-connecting. Hence not calling detached from the _onAmqpSessionClose() handler.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -397,7 +398,7 @@ export class EventHubReceiver extends LinkEntity {
if (sessionError) {
log.error(
"[%s] 'session_close' event occurred for receiver '%s' with address '%s'. " +
"The associated error is: %O",
"The associated error is: %O",
this._context.connectionId,
this.name,
this.address,
Expand All @@ -408,8 +409,8 @@ export class EventHubReceiver extends LinkEntity {
if (!this.isConnecting) {
log.error(
"[%s] 'session_close' event occurred on the session of receiver '%s' with " +
"address '%s' and the sdk did not initiate this. Hence calling detached from the " +
"_onAmqpSessionClose() handler.",
"address '%s' and the sdk did not initiate this. Hence calling detached from the " +
"_onAmqpSessionClose() handler.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -418,8 +419,8 @@ export class EventHubReceiver extends LinkEntity {
} else {
log.error(
"[%s] 'session_close' event occurred on the session of receiver '%s' with " +
"address '%s' and the sdk did not initiate this. Moreover the receiver is already " +
"re-connecting. Hence not calling detached from the _onAmqpSessionClose() handler.",
"address '%s' and the sdk did not initiate this. Moreover the receiver is already " +
"re-connecting. Hence not calling detached from the _onAmqpSessionClose() handler.",
this._context.connectionId,
this.name,
this.address
Expand Down Expand Up @@ -462,17 +463,17 @@ export class EventHubReceiver extends LinkEntity {
shouldReopen = true;
log.error(
"[%s] close() method of Receiver '%s' with address '%s' was not called. There " +
"was an accompanying error and it is retryable. This is a candidate for re-establishing " +
"the receiver link.",
"was an accompanying error and it is retryable. This is a candidate for re-establishing " +
"the receiver link.",
this._context.connectionId,
this.name,
this.address
);
} else {
log.error(
"[%s] close() method of Receiver '%s' with address '%s' was not called. There " +
"was an accompanying error and it is NOT retryable. Hence NOT re-establishing " +
"the receiver link.",
"was an accompanying error and it is NOT retryable. Hence NOT re-establishing " +
"the receiver link.",
this._context.connectionId,
this.name,
this.address
Expand All @@ -483,8 +484,8 @@ export class EventHubReceiver extends LinkEntity {
shouldReopen = true;
log.error(
"[%s] close() method of Receiver '%s' with address '%s' was not called. " +
"There was no accompanying error as well. This is a candidate for re-establishing " +
"the receiver link.",
"There was no accompanying error as well. This is a candidate for re-establishing " +
"the receiver link.",
this._context.connectionId,
this.name,
this.address
Expand Down Expand Up @@ -545,7 +546,7 @@ export class EventHubReceiver extends LinkEntity {
} catch (err) {
log.error(
"[%s] An error occurred while processing onDetached() of Receiver '%s' with address " +
"'%s': %O",
"'%s': %O",
this._context.connectionId,
this.name,
this.address,
Expand Down Expand Up @@ -731,7 +732,7 @@ export class EventHubReceiver extends LinkEntity {
if (!this.isOpen() && !this.isConnecting) {
log.error(
"[%s] The receiver '%s' with address '%s' is not open and is not currently " +
"establishing itself. Hence let's try to connect.",
"establishing itself. Hence let's try to connect.",
this._context.connectionId,
this.name,
this.address
Expand Down Expand Up @@ -784,7 +785,7 @@ export class EventHubReceiver extends LinkEntity {
} else {
log.error(
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " +
"-> %s. Hence not reconnecting.",
"-> %s. Hence not reconnecting.",
this._context.connectionId,
this.name,
this.address,
Expand Down
23 changes: 23 additions & 0 deletions sdk/eventhub/event-hubs/test/eventdata.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,29 @@ describe("EventData #RunnableInBrowser", function(): void {
const testEventData = fromAmqpMessage(testMessage);
testEventData.partitionKey!.should.equal(testAnnotations["x-opt-partition-key"]);
});

it("returns systemProperties for unknown message annotations", function(): void {
const extraAnnotations = {
"x-iot-foo-prop": "just-a-foo",
"x-iot-bar-prop": "bar-above-the-rest"
};
const testEventData = fromAmqpMessage({
body: testBody,
application_properties: applicationProperties,
message_annotations: {
...testAnnotations,
...extraAnnotations
}
});
testEventData
.enqueuedTimeUtc!.getTime()
.should.equal(testAnnotations["x-opt-enqueued-time"]);
testEventData.offset!.should.equal(testAnnotations["x-opt-offset"]);
testEventData.sequenceNumber!.should.equal(testAnnotations["x-opt-sequence-number"]);
testEventData.partitionKey!.should.equal(testAnnotations["x-opt-partition-key"]);
testEventData.systemProperties!["x-iot-foo-prop"] = extraAnnotations["x-iot-foo-prop"];
testEventData.systemProperties!["x-iot-bar-prop"] = extraAnnotations["x-iot-bar-prop"];
});
});
});

Expand Down