forked from Azure/azure-event-hubs-node
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventData.ts
242 lines (233 loc) · 8.82 KB
/
eventData.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
import {
Message, MessageProperties, MessageHeader, Dictionary, messageHeader, messageProperties,
MessageAnnotations, DeliveryAnnotations
} from "rhea-promise";
import { Constants } from "@azure/amqp-common";
/**
* Describes the delivery annotations.
* @interface EventHubDeliveryAnnotations
*/
export interface EventHubDeliveryAnnotations extends DeliveryAnnotations {
/**
* @property {string} [last_enqueued_offset] The offset of the last event.
*/
last_enqueued_offset?: string;
/**
* @property {number} [last_enqueued_sequence_number] The sequence number of the last event.
*/
last_enqueued_sequence_number?: number;
/**
* @property {number} [last_enqueued_time_utc] The enqueued time of the last event.
*/
last_enqueued_time_utc?: number;
/**
* @property {number} [runtime_info_retrieval_time_utc] The retrieval time of the last event.
*/
runtime_info_retrieval_time_utc?: number;
/**
* @property {string} Any unknown delivery annotations.
*/
[x: string]: any;
}
/**
* Map containing message attributes that will be held in the message header.
* @interface EventHubMessageAnnotations
*/
export interface EventHubMessageAnnotations extends MessageAnnotations {
/**
* @property {string | null} [x-opt-partition-key] Annotation for the partition key set for the event.
*/
"x-opt-partition-key"?: string | null;
/**
* @property {number} [x-opt-sequence-number] Annontation for the sequence number of the event.
*/
"x-opt-sequence-number"?: number;
/**
* @property {number} [x-opt-enqueued-time] Annotation for the enqueued time of the event.
*/
"x-opt-enqueued-time"?: number;
/**
* @property {string} [x-opt-offset] Annotation for the offset of the event.
*/
"x-opt-offset"?: string;
/**
* @property {any} Any other annotation that can be added to the message.
*/
[x: string]: any;
}
/**
* Describes the structure of an event to be sent or received from the EventHub.
* @interface EventData
*/
export interface EventData {
/**
* @property {MessageHeader} [header] - The message headers.
*/
header?: MessageHeader;
/**
* @property {any} body - The message body that needs to be sent or is received.
*/
body: any;
/**
* @property {Date} [enqueuedTimeUtc] The enqueued time of the event.
*/
enqueuedTimeUtc?: Date;
/**
* @property {string | null} [partitionKey] If specified EventHub will hash this to a partitionId.
* It guarantees that messages end up in a specific partition on the event hub.
*/
partitionKey?: string | null;
/**
* @property {string} [offset] The offset of the event.
*/
offset?: string;
/**
* @property {number} [sequenceNumber] The sequence number of the event.
*/
sequenceNumber?: number;
/**
* @property {AmqpMessageAnnotations} [annotations] The amqp message attributes.
*/
annotations?: EventHubMessageAnnotations;
/**
* @property {AmqpMessageProperties} [properties] The predefined AMQP properties like message_id, correlation_id, reply_to, etc.
*/
properties?: MessageProperties;
/**
* @property {Dictionary<any>} [applicationProperties] The application specific properties.
*/
applicationProperties?: Dictionary<any>;
/**
* @property {number} [lastSequenceNumber] The last sequence number of the event within the partition stream of the Event Hub.
*/
lastSequenceNumber?: number;
/**
* @property {string} [lastEnqueuedOffset] The offset of the last enqueued event.
*/
lastEnqueuedOffset?: string;
/**
* @property {Date} [lastEnqueuedTime] The enqueued UTC time of the last event.
*/
lastEnqueuedTime?: Date;
/**
* @property {Date} [retrievalTime] The time when the runtime info was retrieved
*/
retrievalTime?: Date;
/**
* @property {AmqpMessage} _raw_amqp_mesage The underlying raw amqp message.
*/
_raw_amqp_mesage?: Message;
}
/**
* Describes the methods on the EventData interface.
* @module EventData
*/
export namespace EventData {
/**
* Converts the AMQP message to an EventData.
* @param {AmqpMessage} msg The AMQP message that needs to be converted to EventData.
*/
export function fromAmqpMessage(msg: Message): EventData {
const data: EventData = {
body: msg.body,
_raw_amqp_mesage: msg
};
if (msg.message_annotations) {
data.annotations = msg.message_annotations;
if (msg.message_annotations[Constants.partitionKey] != undefined) data.partitionKey = msg.message_annotations[Constants.partitionKey];
if (msg.message_annotations[Constants.sequenceNumber] != undefined) data.sequenceNumber = msg.message_annotations[Constants.sequenceNumber];
if (msg.message_annotations[Constants.enqueuedTime] != undefined) data.enqueuedTimeUtc = new Date(msg.message_annotations[Constants.enqueuedTime] as number);
if (msg.message_annotations[Constants.offset] != undefined) data.offset = msg.message_annotations[Constants.offset];
}
// Since rhea expects message properties as top level properties we will look for them and unflatten them inside properties.
for (const prop of messageProperties) {
if ((msg as any)[prop] != undefined) {
if (!data.properties) {
data.properties = {};
}
(data.properties as any)[prop] = (msg as any)[prop];
}
}
// Since rhea expects message headers as top level properties we will look for them and unflatten them inside header.
for (const prop of messageHeader) {
if ((msg as any)[prop] != undefined) {
if (!data.header) {
data.header = {};
}
(data.header as any)[prop] = (msg as any)[prop];
}
}
if (msg.application_properties) {
data.applicationProperties = msg.application_properties;
}
if (msg.delivery_annotations) {
data.lastEnqueuedOffset = msg.delivery_annotations.last_enqueued_offset;
data.lastSequenceNumber = msg.delivery_annotations.last_enqueued_sequence_number;
data.lastEnqueuedTime = new Date(msg.delivery_annotations.last_enqueued_time_utc as number);
data.retrievalTime = new Date(msg.delivery_annotations.runtime_info_retrieval_time_utc as number);
}
return data;
}
/**
* Converts an EventData object to an AMQP message.
* @param {EventData} data The EventData object that needs to be converted to an AMQP message.
*/
export function toAmqpMessage(data: EventData): Message {
const msg: Message = {
body: data.body,
};
// As per the AMQP 1.0 spec If the message-annotations or delivery-annotations section is omitted,
// it is equivalent to a message-annotations section containing anempty map of annotations.
msg.message_annotations = {};
msg.delivery_annotations = {};
if (data.annotations) {
msg.message_annotations = data.annotations;
}
if (data.properties) {
// Set amqp message properties as top level properties, since rhea sends them as top level properties.
for (const prop in data.properties) {
(msg as any)[prop] = (data.properties as any)[prop];
}
}
if (data.applicationProperties) {
msg.application_properties = data.applicationProperties;
}
if (data.partitionKey != undefined) {
msg.message_annotations[Constants.partitionKey] = data.partitionKey;
// Event Hub service cannot route messages to a specific partition based on the partition key
// if AMQP message header is an empty object. Hence we make sure that header is always present
// with atleast one property. Setting durable to true, helps us achieve that.
msg.durable = true;
}
if (data.sequenceNumber != undefined) {
msg.message_annotations[Constants.sequenceNumber] = data.sequenceNumber;
}
if (data.enqueuedTimeUtc != undefined) {
msg.message_annotations[Constants.enqueuedTime] = data.enqueuedTimeUtc.getTime();
}
if (data.offset != undefined) {
msg.message_annotations[Constants.offset] = data.offset;
}
if (data.lastEnqueuedOffset != undefined) {
msg.delivery_annotations.last_enqueued_offset = data.lastEnqueuedOffset;
}
if (data.lastSequenceNumber != undefined) {
msg.delivery_annotations.last_enqueued_sequence_number = data.lastSequenceNumber;
}
if (data.lastEnqueuedTime != undefined) {
msg.delivery_annotations.last_enqueued_time_utc = data.lastEnqueuedTime.getTime();
}
if (data.retrievalTime != undefined) {
msg.delivery_annotations.runtime_info_retrieval_time_utc = data.retrievalTime.getTime();
}
if (data.header) {
// Set amqp message header as top level properties, since rhea expects them as top level properties.
for (const prop in data.header) {
(msg as any)[prop] = (data.header as any)[prop];
}
}
return msg;
}
}