diff --git a/sdk/eventhub/event-hubs/CHANGELOG.md b/sdk/eventhub/event-hubs/CHANGELOG.md index 9b800f2596c0..7ca515b854e4 100644 --- a/sdk/eventhub/event-hubs/CHANGELOG.md +++ b/sdk/eventhub/event-hubs/CHANGELOG.md @@ -3,13 +3,19 @@ ## 5.6.0 (Unreleased) ### Features Added + - With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details. +- Adds the `contentType`, `correlationId`, and `messageId` AMQP properties as top-level fields on `EventData` and `ReceivedEventData`. + +- Enable encoding the body of a message to the 'value' or 'sequence' sections (via AmqpAnnotatedMessage.bodyType). Using this encoding is not required but does allow you to take advantage of native AMQP serialization for supported primitives or sequences. + + More information about the AMQP message body type can be found in the AMQP specification: [link](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format) + ### Breaking Changes ### Key Bugs Fixed - ## 5.5.2 (2021-06-10) ### Bug fixes diff --git a/sdk/eventhub/event-hubs/package.json b/sdk/eventhub/event-hubs/package.json index f32080899c6f..953a510a95a0 100644 --- a/sdk/eventhub/event-hubs/package.json +++ b/sdk/eventhub/event-hubs/package.json @@ -145,6 +145,7 @@ "assert": "^1.4.1", "chai": "^4.2.0", "chai-as-promised": "^7.1.1", + "chai-exclude": "^2.0.2", "chai-string": "^1.5.0", "cross-env": "^7.0.2", "debug": "^4.1.1", diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 9f25e798454b..45d801b43ce8 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -5,6 +5,7 @@ ```ts import { AbortSignalLike } from '@azure/abort-controller'; +import { AmqpAnnotatedMessage } from '@azure/core-amqp'; import { MessagingError } from '@azure/core-amqp'; import { NamedKeyCredential } from '@azure/core-auth'; import { OperationTracingOptions } from '@azure/core-tracing'; @@ -54,6 +55,9 @@ export const earliestEventPosition: EventPosition; // @public export interface EventData { body: any; + contentType?: string; + correlationId?: string | number | Buffer; + messageId?: string | number | Buffer; properties?: { [key: string]: any; }; @@ -72,7 +76,7 @@ export interface EventDataBatch { // @internal readonly partitionKey?: string; readonly sizeInBytes: number; - tryAdd(eventData: EventData, options?: TryAddOptions): boolean; + tryAdd(eventData: EventData | AmqpAnnotatedMessage, options?: TryAddOptions): boolean; } // @public @@ -129,7 +133,7 @@ export class EventHubProducerClient { getEventHubProperties(options?: GetEventHubPropertiesOptions): Promise; getPartitionIds(options?: GetPartitionIdsOptions): Promise>; getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise; - sendBatch(batch: EventData[], options?: SendBatchOptions): Promise; + sendBatch(batch: EventData[] | AmqpAnnotatedMessage[], options?: SendBatchOptions): Promise; sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise; } @@ -239,7 +243,11 @@ export type ProcessInitializeHandler = (context: PartitionContext) => Promise Constants.maxMessageIdLength + ) { + throw new Error( + `Length of 'messageId' property on the event cannot be greater than ${Constants.maxMessageIdLength} characters.` + ); + } + rheaMessage.message_id = data.messageId; + } } - return msg; + return rheaMessage; } /** @@ -240,6 +323,29 @@ export interface EventData { * cross-language compatibility. */ body: any; + + /** + * The content type of the message. Optionally describes + * the payload of the message, with a descriptor following the format of RFC2045, Section 5, for + * example "application/json". + */ + contentType?: string; + + /** + * The correlation identifier that allows an + * application to specify a context for the message for the purposes of correlation, for example + * reflecting the MessageId of a message that is being replied to. + */ + correlationId?: string | number | Buffer; + + /** + * The message identifier is an + * application-defined value that uniquely identifies the message and its payload. + * + * Note: Numbers that are not whole integers are not allowed. + */ + messageId?: string | number | Buffer; + /** * Set of key value pairs that can be used to set properties specific to user application. */ @@ -287,6 +393,41 @@ export interface ReceivedEventData { systemProperties?: { [key: string]: any; }; + + /** + * The content type of the message. Optionally describes + * the payload of the message, with a descriptor following the format of RFC2045, Section 5, for + * example "application/json". + */ + contentType?: string; + + /** + * The correlation identifier that allows an + * application to specify a context for the message for the purposes of correlation, for example + * reflecting the MessageId of a message that is being replied to. + */ + correlationId?: string | number | Buffer; + + /** + * The message identifier is an + * application-defined value that uniquely identifies the message and its payload. + */ + messageId?: string | number | Buffer; + + /** + * Returns the underlying raw amqp message. + */ + getRawAmqpMessage(): AmqpAnnotatedMessage; +} + +/** + * @internal + */ +export function isAmqpAnnotatedMessage(possible: unknown): possible is AmqpAnnotatedMessage { + return ( + isObjectWithProperties(possible, ["body", "bodyType"]) && + !objectHasProperty(possible, "getRawAmqpMessage") + ); } /** diff --git a/sdk/eventhub/event-hubs/src/eventDataBatch.ts b/sdk/eventhub/event-hubs/src/eventDataBatch.ts index 9d1e17821126..dcd16a6c97b1 100644 --- a/sdk/eventhub/event-hubs/src/eventDataBatch.ts +++ b/sdk/eventhub/event-hubs/src/eventDataBatch.ts @@ -1,14 +1,14 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { EventData, toRheaMessage } from "./eventData"; +import { EventData, isAmqpAnnotatedMessage, toRheaMessage } from "./eventData"; import { ConnectionContext } from "./connectionContext"; import { MessageAnnotations, message, Message as RheaMessage } from "rhea-promise"; import { throwTypeErrorIfParameterMissing } from "./util/error"; +import { AmqpAnnotatedMessage } from "@azure/core-amqp"; import { Span, SpanContext } from "@azure/core-tracing"; import { TRACEPARENT_PROPERTY, instrumentEventData } from "./diagnostics/instrumentEventData"; import { convertTryAddOptionsForCompatibility, createMessageSpan } from "./diagnostics/tracing"; -import { defaultDataTransformer } from "./dataTransformer"; import { isDefined, isObjectWithProperties } from "./util/typeGuards"; import { OperationTracingOptions } from "@azure/core-tracing"; @@ -106,10 +106,10 @@ export interface EventDataBatch { * **NOTE**: Always remember to check the return value of this method, before calling it again * for the next event. * - * @param eventData - An individual event data object. + * @param eventData - An individual event data object or AmqpAnnotatedMessage. * @returns A boolean value indicating if the event data has been added to the batch or not. */ - tryAdd(eventData: EventData, options?: TryAddOptions): boolean; + tryAdd(eventData: EventData | AmqpAnnotatedMessage, options?: TryAddOptions): boolean; /** * The AMQP message containing encoded events that were added to the batch. @@ -284,13 +284,15 @@ export class EventDataBatchImpl implements EventDataBatch { * @param eventData - An individual event data object. * @returns A boolean value indicating if the event data has been added to the batch or not. */ - public tryAdd(eventData: EventData, options: TryAddOptions = {}): boolean { + public tryAdd(eventData: EventData | AmqpAnnotatedMessage, options: TryAddOptions = {}): boolean { throwTypeErrorIfParameterMissing(this._context.connectionId, "tryAdd", "eventData", eventData); options = convertTryAddOptionsForCompatibility(options); // check if the event has already been instrumented const previouslyInstrumented = Boolean( - eventData.properties && eventData.properties[TRACEPARENT_PROPERTY] + (isAmqpAnnotatedMessage(eventData) + ? eventData.applicationProperties + : eventData.properties)?.[TRACEPARENT_PROPERTY] // Event Data maps properties to applicationProperties. ); let spanContext: SpanContext | undefined; if (!previouslyInstrumented) { @@ -302,7 +304,6 @@ export class EventDataBatchImpl implements EventDataBatch { // Convert EventData to RheaMessage. const amqpMessage = toRheaMessage(eventData, this._partitionKey); - amqpMessage.body = defaultDataTransformer.encode(eventData.body); const encodedMessage = message.encode(amqpMessage); let currentSize = this._sizeInBytes; diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index 78ade573fdc1..72bd6d1c9c72 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +import { AmqpAnnotatedMessage } from "@azure/core-amqp"; import { NamedKeyCredential, SASCredential, TokenCredential } from "@azure/core-auth"; import { SpanStatusCode, Link, Span, SpanContext, SpanKind } from "@azure/core-tracing"; import { ConnectionContext, createConnectionContext } from "./connectionContext"; @@ -226,7 +227,7 @@ export class EventHubProducerClient { * await client.sendBatch(messages); * ``` * - * @param batch - An array of {@link EventData}. + * @param batch - An array of {@link EventData} or `AmqpAnnotatedMessage`. * @param options - A set of options that can be specified to influence the way in which * events are sent to the associated Event Hub. * - `abortSignal` : A signal the request to cancel the send operation. @@ -238,7 +239,10 @@ export class EventHubProducerClient { * @throws MessagingError if an error is encountered while sending a message. * @throws Error if the underlying connection or sender has been closed. */ - async sendBatch(batch: EventData[], options?: SendBatchOptions): Promise; + async sendBatch( + batch: EventData[] | AmqpAnnotatedMessage[], + options?: SendBatchOptions + ): Promise; /** * Sends a batch of events to the associated Event Hub. * diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index 7b3814280942..b0d97fe83b6b 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -25,7 +25,6 @@ import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; import { EventPosition, getEventPositionFilter } from "./eventPosition"; import { AbortError, AbortSignalLike } from "@azure/abort-controller"; -import { defaultDataTransformer } from "./dataTransformer"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; /** @@ -224,15 +223,28 @@ export class EventHubReceiver extends LinkEntity { } const data: EventDataInternal = fromRheaMessage(context.message); + const rawMessage = data.getRawAmqpMessage(); const receivedEventData: ReceivedEventData = { - body: defaultDataTransformer.decode(context.message.body), + body: data.body, properties: data.properties, offset: data.offset!, sequenceNumber: data.sequenceNumber!, enqueuedTimeUtc: data.enqueuedTimeUtc!, partitionKey: data.partitionKey!, - systemProperties: data.systemProperties + systemProperties: data.systemProperties, + getRawAmqpMessage() { + return rawMessage; + } }; + if (data.correlationId != null) { + receivedEventData.correlationId = data.correlationId; + } + if (data.contentType != null) { + receivedEventData.contentType = data.contentType; + } + if (data.messageId != null) { + receivedEventData.messageId = data.messageId; + } this._checkpoint = receivedEventData.sequenceNumber; diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index e79ec67200bc..441336530f99 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -30,7 +30,6 @@ import { SendOptions } from "./models/public"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; import { AbortSignalLike } from "@azure/abort-controller"; import { EventDataBatch, isEventDataBatch } from "./eventDataBatch"; -import { defaultDataTransformer } from "./dataTransformer"; /** * Describes the EventHubSender that will send event data to EventHub. @@ -259,7 +258,6 @@ export class EventHubSender extends LinkEntity { // Convert EventData to RheaMessage. for (let i = 0; i < events.length; i++) { const rheaMessage = toRheaMessage(events[i], partitionKey); - rheaMessage.body = defaultDataTransformer.encode(events[i].body); messages[i] = rheaMessage; } // Encode every amqp message and then convert every encoded message to amqp data section diff --git a/sdk/eventhub/event-hubs/test/internal/amqp.spec.ts b/sdk/eventhub/event-hubs/test/internal/amqp.spec.ts new file mode 100644 index 000000000000..24e9c33c4512 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/internal/amqp.spec.ts @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import { Constants } from "@azure/core-amqp"; +import { fromRheaMessage, isAmqpAnnotatedMessage } from "../../src/eventData"; +const assert = chai.assert; + +describe("AMQP message encoding", () => { + it("isAmqpAnnotatedMessage", () => { + assert.isFalse(isAmqpAnnotatedMessage({})); + assert.isFalse(isAmqpAnnotatedMessage({ body: "hello world" })); + assert.isFalse( + isAmqpAnnotatedMessage( + fromRheaMessage({ + message_annotations: { + [Constants.enqueuedTime]: Date.now() + }, + body: undefined + }) + ) + ); + + assert.isTrue( + isAmqpAnnotatedMessage( + fromRheaMessage({ + message_annotations: { + [Constants.enqueuedTime]: Date.now() + }, + body: undefined + }).getRawAmqpMessage() + ) + ); + + assert.isTrue( + isAmqpAnnotatedMessage({ + body: "hello world", + bodyType: "sequence" + }) + ); + assert.isTrue( + isAmqpAnnotatedMessage({ + body: "hello world", + bodyType: "value" + }) + ); + assert.isTrue( + isAmqpAnnotatedMessage({ + body: "hello world", + bodyType: "data" + }) + ); + + assert.isTrue( + isAmqpAnnotatedMessage({ + body: "hello world", + bodyType: undefined // the property _must_ exist, but undefined is fine. We'll default to 'data' + }) + ); + }); +}); diff --git a/sdk/eventhub/event-hubs/test/internal/dataTransformer.spec.ts b/sdk/eventhub/event-hubs/test/internal/dataTransformer.spec.ts index 45df00a6fe1e..14257983e175 100644 --- a/sdk/eventhub/event-hubs/test/internal/dataTransformer.spec.ts +++ b/sdk/eventhub/event-hubs/test/internal/dataTransformer.spec.ts @@ -41,155 +41,317 @@ describe("DataTransformer", function() { const hexBufferBody: Buffer = Buffer.from("7468697320697320612074c3a97374", "hex"); const transformer = defaultDataTransformer; - it("should correctly encode/decode a string message body", function(done) { - const encoded: any = transformer.encode(stringBody); - encoded.typecode.should.equal(117); - isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); - decoded.should.equal(stringBody); - done(); - }); + describe(`encoded bodyType: "data"`, () => { + const bodyType = "data"; - it("should correctly encode/decode a number message body", function(done) { - const encoded: any = transformer.encode(numberBody); - encoded.typecode.should.equal(117); - isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); - decoded.should.equal(numberBody); - done(); - }); + it("should correctly encode/decode a string message body", function(done) { + const encoded: any = transformer.encode(stringBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(true); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + (decoded as any).should.equal(stringBody); + done(); + }); - it("should correctly encode/decode a boolean message body", function(done) { - const encoded: any = transformer.encode(booleanBody); - encoded.typecode.should.equal(117); - isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); - decoded.should.equal(booleanBody); - done(); - }); + it("should correctly encode/decode a number message body", function(done) { + const encoded: any = transformer.encode(numberBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(true); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + (decoded as any).should.equal(numberBody); + done(); + }); - it("should correctly encode/decode a null message body", function(done) { - const encoded: any = transformer.encode(nullBody); - encoded.typecode.should.equal(117); - isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); - should.equal(decoded, nullBody); - done(); - }); + it("should correctly encode/decode a boolean message body", function(done) { + const encoded: any = transformer.encode(booleanBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(true); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + (decoded as any).should.equal(booleanBody); + done(); + }); - it("should correctly encode/decode an undefined message body", function(done) { - const encoded: any = transformer.encode(undefinedBody); - encoded.typecode.should.equal(117); - isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); - should.equal(decoded, nullBody); - done(); - }); + it("should correctly encode/decode a null message body", function(done) { + const encoded: any = transformer.encode(nullBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(false); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + should.equal(decoded, nullBody); + done(); + }); - it("should correctly encode/decode an empty string message body", function(done) { - const encoded: any = transformer.encode(emptyStringBody); - encoded.typecode.should.equal(117); - isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); - decoded.should.equal(emptyStringBody); - done(); - }); + it("should correctly encode/decode an undefined message body", function(done) { + const encoded: any = transformer.encode(undefinedBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(false); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + should.equal(decoded, nullBody); + done(); + }); - it("should correctly encode/decode an array message body", function(done) { - const encoded: any = transformer.encode(arrayBody); - encoded.typecode.should.equal(117); - isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); - assert.deepEqual(decoded, arrayBody); - done(); - }); + it("should correctly encode/decode an empty string message body", function(done) { + const encoded: any = transformer.encode(emptyStringBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(true); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + (decoded as any).should.equal(emptyStringBody); + done(); + }); + + it("should correctly encode/decode an array message body", function(done) { + const encoded: any = transformer.encode(arrayBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(true); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + assert.deepEqual(decoded, arrayBody); + done(); + }); + + it("should correctly encode/decode an object message body", function(done) { + const encoded: any = transformer.encode(objectBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(true); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + assert.deepEqual(decoded, objectBody); + done(); + }); + + it("should correctly encode/decode a buffer message body", function(done) { + const encoded: any = transformer.encode(bufferBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(true); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + assert.deepEqual(decoded, bufferBody); + done(); + }); - it("should correctly encode/decode an object message body", function(done) { - const encoded: any = transformer.encode(objectBody); - encoded.typecode.should.equal(117); - isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); - assert.deepEqual(decoded, objectBody); - done(); + it("should correctly encode/decode a hex buffer message body", function(done) { + const encoded: any = transformer.encode(hexBufferBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(true); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + assert.deepEqual(decoded, hexBufferBody); + done(); + }); }); - it("should correctly encode/decode a buffer message body", function(done) { - const encoded: any = transformer.encode(bufferBody); - encoded.typecode.should.equal(117); - isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); - assert.deepEqual(decoded, bufferBody); - done(); + describe(`encoded bodyType: "value"`, () => { + const expectedTypeCode = 0x77; + const bodyType = "value"; + + it("should correctly encode/decode a string message body", function(done) { + const encoded: any = transformer.encode(stringBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + (decoded as any).should.equal(stringBody); + done(); + }); + + it("should correctly encode/decode a number message body", function(done) { + const encoded: any = transformer.encode(numberBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + (decoded as any).should.equal(numberBody); + done(); + }); + + it("should correctly encode/decode a boolean message body", function(done) { + const encoded: any = transformer.encode(booleanBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + (decoded as any).should.equal(booleanBody); + done(); + }); + + it("should correctly encode/decode a null message body", function(done) { + const encoded: any = transformer.encode(nullBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + should.equal(decoded, nullBody); + done(); + }); + + it("should correctly encode/decode an undefined message body", function(done) { + const encoded: any = transformer.encode(undefinedBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + should.equal(decoded, nullBody); + done(); + }); + + it("should correctly encode/decode an empty string message body", function(done) { + const encoded: any = transformer.encode(emptyStringBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + (decoded as any).should.equal(emptyStringBody); + done(); + }); + + it("should correctly encode/decode an array message body", function(done) { + const encoded: any = transformer.encode(arrayBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + assert.deepEqual(decoded, arrayBody); + done(); + }); + + it("should correctly encode/decode an object message body", function(done) { + const encoded: any = transformer.encode(objectBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + assert.deepEqual(decoded, objectBody); + done(); + }); + + it("should correctly encode/decode a buffer message body", function(done) { + const encoded: any = transformer.encode(bufferBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + isBuffer(encoded.content).should.equal(true); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + assert.deepEqual(decoded, bufferBody); + done(); + }); + + it("should correctly encode/decode a hex buffer message body", function(done) { + const encoded: any = transformer.encode(hexBufferBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + isBuffer(encoded.content).should.equal(true); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + assert.deepEqual(decoded, hexBufferBody); + done(); + }); }); - it("should correctly encode/decode a hex buffer message body", function(done) { - const encoded: any = transformer.encode(hexBufferBody); - encoded.typecode.should.equal(117); - isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); - assert.deepEqual(decoded, hexBufferBody); - done(); + describe(`encoded bodyType: "sequence"`, () => { + const expectedTypeCode = 0x76; + const bodyType = "sequence"; + + it("should correctly encode/decode a null message body", function(done) { + const encoded: any = transformer.encode(nullBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + should.equal(decoded, nullBody); + done(); + }); + + it("should correctly encode/decode an undefined message body", function(done) { + const encoded: any = transformer.encode(undefinedBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + should.equal(decoded, nullBody); + done(); + }); + + it("should correctly encode/decode an array message body", function(done) { + const encoded: any = transformer.encode(arrayBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + assert.deepEqual(decoded, arrayBody); + done(); + }); + + it("should correctly encode/decode an object message body", function(done) { + const encoded: any = transformer.encode(objectBody, bodyType); + encoded.typecode.should.equal(expectedTypeCode); + const { body: decoded, bodyType: decodedType } = transformer.decode(encoded); + should.equal(decodedType, bodyType); + assert.deepEqual(decoded, objectBody); + done(); + }); }); describe("decode", function() { // It is possible that we receive an AMQP value type from the messages that were sent with // previously shipped version of the sdk. If so then we should be able to handle those scenarios. it("should correctly decode a string message body", function(done) { - const decoded: any = transformer.decode(stringBody); - decoded.should.equal(stringBody); + const { body: decoded, bodyType: decodedType } = transformer.decode(stringBody); + should.equal(decodedType, "value"); + (decoded as any).should.equal(stringBody); done(); }); it("should correctly decode a number message body", function(done) { - const decoded: any = transformer.decode(numberBody); - decoded.should.equal(numberBody); + const { body: decoded, bodyType: decodedType } = transformer.decode(numberBody); + should.equal(decodedType, "value"); + (decoded as any).should.equal(numberBody); done(); }); it("should correctly decode a boolean message body", function(done) { - const decoded: any = transformer.decode(booleanBody); - decoded.should.equal(booleanBody); + const { body: decoded, bodyType: decodedType } = transformer.decode(booleanBody); + should.equal(decodedType, "value"); + (decoded as any).should.equal(booleanBody); done(); }); it("should correctly decode a null message body", function(done) { - const decoded: any = transformer.decode(nullBody); + const { body: decoded, bodyType: decodedType } = transformer.decode(nullBody); + should.equal(decodedType, "value"); should.equal(decoded, nullBody); done(); }); it("should correctly decode an undefined message body", function(done) { - const decoded: any = transformer.decode(undefinedBody); + const { body: decoded, bodyType: decodedType } = transformer.decode(undefinedBody); + should.equal(decodedType, "value"); should.equal(decoded, undefined); done(); }); it("should correctly decode an empty string message body", function(done) { - const decoded: any = transformer.decode(emptyStringBody); - decoded.should.equal(emptyStringBody); + const { body: decoded, bodyType: decodedType } = transformer.decode(emptyStringBody); + should.equal(decodedType, "value"); + (decoded as any).should.equal(emptyStringBody); done(); }); it("should correctly decode an array message body", function(done) { - const decoded: any = transformer.decode(arrayBody); + const { body: decoded, bodyType: decodedType } = transformer.decode(arrayBody); + should.equal(decodedType, "value"); assert.deepEqual(decoded, arrayBody); done(); }); it("should correctly decode an object message body", function(done) { - const decoded: any = transformer.decode(objectBody); + const { body: decoded, bodyType: decodedType } = transformer.decode(objectBody); + should.equal(decodedType, "value"); assert.deepEqual(decoded, objectBody); done(); }); it("should correctly decode a buffer message body", function(done) { - const decoded: any = transformer.decode(bufferBody); + const { body: decoded, bodyType: decodedType } = transformer.decode(bufferBody); + should.equal(decodedType, "data"); assert.deepEqual(decoded, bufferBody); done(); }); it("should correctly decode a hex buffer message body", function(done) { - const decoded: any = transformer.decode(hexBufferBody); + const { body: decoded, bodyType: decodedType } = transformer.decode(hexBufferBody); + should.equal(decodedType, "data"); assert.deepEqual(decoded, hexBufferBody); done(); }); diff --git a/sdk/eventhub/event-hubs/test/internal/eventdata.spec.ts b/sdk/eventhub/event-hubs/test/internal/eventdata.spec.ts index feef0349b359..a8b1bd7a76b8 100644 --- a/sdk/eventhub/event-hubs/test/internal/eventdata.spec.ts +++ b/sdk/eventhub/event-hubs/test/internal/eventdata.spec.ts @@ -1,11 +1,17 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import chai from "chai"; +import chai, { assert, should } from "chai"; chai.should(); -import { EventData, fromRheaMessage, toRheaMessage } from "../../src/eventData"; +import { EventData, fromRheaMessage, ReceivedEventData, toRheaMessage } from "../../src/eventData"; import { Message } from "rhea-promise"; +import { + dataSectionTypeCode, + sequenceSectionTypeCode, + valueSectionTypeCode +} from "../../src/dataTransformer"; +import { AmqpAnnotatedMessage } from "@azure/core-amqp"; const testAnnotations = { "x-opt-enqueued-time": Date.now(), @@ -44,6 +50,20 @@ describe("EventData", function(): void { testEventData.body.should.equal(testBody); }); + it("populates top-level fields", () => { + const testEventData = fromRheaMessage({ + ...testMessage, + ...{ content_type: "application/json", correlation_id: "cid", message_id: 1 } + }); + should().equal(testEventData.messageId, 1, "Unexpected messageId found."); + should().equal( + testEventData.contentType, + "application/json", + "Unexpected contentType found." + ); + should().equal(testEventData.correlationId, "cid", "Unexpected correlationId found."); + }); + describe("properties", function(): void { it("enqueuedTimeUtc gets the enqueued time from system properties", function(): void { const testEventData = fromRheaMessage(testMessage); @@ -171,14 +191,125 @@ describe("EventData", function(): void { }); }); }); - describe("toAmqpMessage", function(): void { - it("populates body with the message body", function(): void { - messageFromED.body.should.equal(testBody); + it("populates body with the message body encoded", function(): void { + const expectedTestBodyContents = Buffer.from(JSON.stringify(testBody)); + should().equal( + expectedTestBodyContents.equals(messageFromED.body.content), + true, + "Encoded body does not match expected result." + ); + should().equal( + messageFromED.body.typecode, + dataSectionTypeCode, + "Unexpected typecode encountered on body." + ); + }); + + it("populates top-level fields", () => { + const message = toRheaMessage({ + ...testSourceEventData, + ...{ contentType: "application/json", correlationId: "cid", messageId: 1 } + }); + should().equal(message.message_id, 1, "Unexpected message_id found."); + should().equal(message.content_type, "application/json", "Unexpected content_type found."); + should().equal(message.correlation_id, "cid", "Unexpected correlation_id found."); }); it("populates application_properties of the message", function(): void { messageFromED.application_properties!.should.equal(properties); }); + + it("AmqpAnnotatedMessage (explicit type)", () => { + const amqpAnnotatedMessage: AmqpAnnotatedMessage = { + body: "hello", + bodyType: "value" + }; + + const rheaMessage = toRheaMessage(amqpAnnotatedMessage); + + assert.equal(rheaMessage.body.typecode, valueSectionTypeCode); + }); + + it("AmqpAnnotatedMessage (implicit type)", () => { + const amqpAnnotatedMessage: AmqpAnnotatedMessage = { + body: "hello", + bodyType: undefined + }; + + const rheaMessage = toRheaMessage(amqpAnnotatedMessage); + + assert.equal(rheaMessage.body.typecode, dataSectionTypeCode); + }); + + it("EventData", () => { + const event: EventData = { + body: "hello" + }; + + const rheaMessage = toRheaMessage(event); + + assert.equal(rheaMessage.body.typecode, dataSectionTypeCode); + }); + + it("ReceivedEventData (sequence)", () => { + const event: ReceivedEventData = { + enqueuedTimeUtc: new Date(), + offset: 100, + partitionKey: null, + sequenceNumber: 1, + body: ["foo", "bar"], + getRawAmqpMessage() { + return { + body: this.body, + bodyType: "sequence" + }; + } + }; + + const rheaMessage = toRheaMessage(event); + + assert.equal(rheaMessage.body.typecode, sequenceSectionTypeCode); + }); + + it("ReceivedEventData (data)", () => { + const event: ReceivedEventData = { + enqueuedTimeUtc: new Date(), + offset: 100, + partitionKey: null, + sequenceNumber: 1, + body: ["foo", "bar"], + getRawAmqpMessage() { + return { + body: this.body, + bodyType: "data" + }; + } + }; + + const rheaMessage = toRheaMessage(event); + + assert.equal(rheaMessage.body.typecode, dataSectionTypeCode); + }); + + it("ReceivedEventData (value)", () => { + const event: ReceivedEventData = { + enqueuedTimeUtc: new Date(), + offset: 100, + partitionKey: null, + sequenceNumber: 1, + body: ["foo", "bar"], + getRawAmqpMessage() { + return { + body: this.body, + bodyType: "value" + }; + } + }; + + const rheaMessage = toRheaMessage(event); + + assert.equal(rheaMessage.body.typecode, valueSectionTypeCode); + }); }); }); diff --git a/sdk/eventhub/event-hubs/test/internal/misc.spec.ts b/sdk/eventhub/event-hubs/test/internal/misc.spec.ts index 7763ba6ffd50..4b82855c07c6 100644 --- a/sdk/eventhub/event-hubs/test/internal/misc.spec.ts +++ b/sdk/eventhub/event-hubs/test/internal/misc.spec.ts @@ -410,6 +410,9 @@ describe("extractSpanContextFromEventData", function() { partitionKey: null, properties: { [TRACEPARENT_PROPERTY]: `00-${traceId}-${spanId}-${flags}` + }, + getRawAmqpMessage() { + return {} as any; } }; @@ -437,6 +440,9 @@ describe("extractSpanContextFromEventData", function() { partitionKey: null, properties: { [TRACEPARENT_PROPERTY]: `99-${traceId}-${spanId}-${flags}` + }, + getRawAmqpMessage() { + return {} as any; } }; @@ -454,7 +460,10 @@ describe("extractSpanContextFromEventData", function() { enqueuedTimeUtc: new Date(), offset: 0, sequenceNumber: 0, - partitionKey: null + partitionKey: null, + getRawAmqpMessage() { + return {} as any; + } }; const spanContext = extractSpanContextFromEventData(eventData); diff --git a/sdk/eventhub/event-hubs/test/internal/partitionPump.spec.ts b/sdk/eventhub/event-hubs/test/internal/partitionPump.spec.ts index e3b0d5a02344..9538dd8f1827 100644 --- a/sdk/eventhub/event-hubs/test/internal/partitionPump.spec.ts +++ b/sdk/eventhub/event-hubs/test/internal/partitionPump.spec.ts @@ -73,7 +73,10 @@ describe("PartitionPump", () => { enqueuedTimeUtc: new Date(), offset: 0, partitionKey: null, - sequenceNumber: 0 + sequenceNumber: 0, + getRawAmqpMessage() { + return {} as any; + } }; const { tracer, resetTracer } = setTracerForTest(new TestTracer2()); diff --git a/sdk/eventhub/event-hubs/test/public/amqpAnnotatedMessage.spec.ts b/sdk/eventhub/event-hubs/test/public/amqpAnnotatedMessage.spec.ts new file mode 100644 index 000000000000..72c39e50b247 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/public/amqpAnnotatedMessage.spec.ts @@ -0,0 +1,296 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import chaiAsPromised from "chai-as-promised"; +import chaiExclude from "chai-exclude"; +import { Buffer } from "buffer"; +import { AmqpAnnotatedMessage } from "@azure/core-amqp"; +import { v4 } from "uuid"; +import { EnvVarKeys, getEnvVars, getStartingPositionsForTests } from "./utils/testUtils"; +import { + EventHubConsumerClient, + EventHubProducerClient, + EventPosition, + ReceivedEventData, + Subscription +} from "../../src"; +import { BodyTypes } from "../../src/dataTransformer"; + +const should = chai.should(); +chai.use(chaiAsPromised); +chai.use(chaiExclude); +const assert = chai.assert; + +const env = getEnvVars(); + +describe("AmqpAnnotatedMessage", function(): void { + let producerClient: EventHubProducerClient; + let consumerClient: EventHubConsumerClient; + const service = { + connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + path: env[EnvVarKeys.EVENTHUB_NAME] + }; + + before("validate environment", function(): void { + should.exist( + env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + "define EVENTHUB_CONNECTION_STRING in your environment before running integration tests." + ); + should.exist( + env[EnvVarKeys.EVENTHUB_NAME], + "define EVENTHUB_NAME in your environment before running integration tests." + ); + }); + + beforeEach(async () => { + producerClient = new EventHubProducerClient(service.connectionString, service.path); + consumerClient = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString, + service.path + ); + }); + + afterEach("close the connection", async function(): Promise { + await producerClient.close(); + await consumerClient.close(); + }); + + function getSampleAmqpAnnotatedMessage(): AmqpAnnotatedMessage { + const randomTag = Math.random().toString(); + + return { + body: `message body ${randomTag}`, + bodyType: "data", + applicationProperties: { + propOne: 1, + propTwo: "two", + propThree: true, + propFour: Date() + }, + footer: { + propFooter: "foot" + }, + messageAnnotations: { propMsgAnnotate: "annotation" }, + properties: { + contentEncoding: "application/json; charset=utf-8", + correlationId: randomTag, + messageId: v4() + } + } as AmqpAnnotatedMessage; + } + + /** + * Helper function that will receive a single event that comes after the starting positions. + * + * Note: Call this after sending a single event to Event Hubs to validate + * @internal + */ + async function receiveEvent(startingPositions: { + [partitionId: string]: EventPosition; + }): Promise { + return new Promise((resolve, reject) => { + const subscription: Subscription = consumerClient.subscribe( + { + async processError(err) { + reject(err); + return subscription.close(); + }, + async processEvents(events) { + if (events.length) { + resolve(events[0]); + return subscription.close(); + } + } + }, + { + startPosition: startingPositions + } + ); + }); + } + + async function sendEvents(messages: AmqpAnnotatedMessage[], { useBatch }: { useBatch: boolean }) { + if (!useBatch) { + return producerClient.sendBatch(messages); + } + + const batch = await producerClient.createBatch(); + for (const message of messages) { + assert.isTrue(batch.tryAdd(message)); + } + + return producerClient.sendBatch(batch); + } + + describe("round-tripping AMQP encoding/decoding", () => { + [{ useBatch: true }, { useBatch: false }].forEach(({ useBatch }) => { + it(`props (useBatch: ${useBatch})`, async () => { + const startingPositions = await getStartingPositionsForTests(consumerClient); + const testMessage = getSampleAmqpAnnotatedMessage(); + await sendEvents([testMessage], { useBatch }); + + const event = await receiveEvent(startingPositions); + should.equal(event.body, testMessage.body, "Unexpected body on the received event."); + should.equal( + event.getRawAmqpMessage().messageAnnotations!["propMsgAnnotate"], + testMessage.messageAnnotations!["propMsgAnnotate"], + "Unexpected messageAnnotations on the received event." + ); + assert.deepEqualExcluding( + event.getRawAmqpMessage(), + testMessage, + ["deliveryAnnotations", "body", "messageAnnotations", "header", "properties"], + "Unexpected on the AmqpAnnotatedMessage" + ); + assert.deepEqualExcluding( + event.getRawAmqpMessage().footer!, + testMessage.footer!, + ["deliveryCount"], + "Unexpected header on the AmqpAnnotatedMessage" + ); + assert.deepEqualExcluding( + event.getRawAmqpMessage().properties!, + testMessage.properties!, + ["creationTime", "absoluteExpiryTime", "groupId"], + "Unexpected properties on the AmqpAnnotatedMessage" + ); + assert.equal( + event.getRawAmqpMessage().properties!.groupId, + testMessage.properties!.groupId, + "Unexpected session-id on the AmqpAnnotatedMessage" + ); + }); + + it(`values (useBatch: ${useBatch})`, async () => { + const valueTypes = [[1, 2, 3], 1, 1.5, "hello", { hello: "world" }]; + for (const valueType of valueTypes) { + const startingPositions = await getStartingPositionsForTests(consumerClient); + await sendEvents( + [ + { + body: valueType, + bodyType: "value" + } + ], + { useBatch } + ); + + const event = await receiveEvent(startingPositions); + assert.deepEqual( + event.getRawAmqpMessage().bodyType, + "value", + `Should be identified as a value: ${valueType.toString()}` + ); + + assert.deepEqual( + event.body, + valueType, + `Deserialized body should be equal: ${valueType.toString()}` + ); + } + }); + + it(`sequences (useBatch: ${useBatch})`, async () => { + const sequenceTypes = [ + [[1], [2], [3]], + [1, 2, 3] + ]; + + for (const sequenceType of sequenceTypes) { + const startingPositions = await getStartingPositionsForTests(consumerClient); + await sendEvents( + [ + { + body: sequenceType, + bodyType: "sequence" + } + ], + { useBatch } + ); + + const event = await receiveEvent(startingPositions); + assert.deepEqual( + event.getRawAmqpMessage().bodyType, + "sequence", + `Should be identified as a value: ${sequenceType.toString()}` + ); + + assert.deepEqual( + event.body, + sequenceType, + `Deserialized body should be equal: ${sequenceType.toString()}` + ); + } + }); + + it(`data (useBatch: ${useBatch})`, async () => { + const buff = Buffer.from("hello", "utf8"); + + const dataTypes = [1, 1.5, "hello", { hello: "world" }, buff, [1, 2, 3]]; + + for (const dataType of dataTypes) { + const startingPositions = await getStartingPositionsForTests(consumerClient); + await sendEvents( + [ + { + body: dataType, + bodyType: "data" + } + ], + { useBatch } + ); + + const event = await receiveEvent(startingPositions); + + assert.deepEqual( + event.getRawAmqpMessage().bodyType, + "data", + `Should be identified as data: ${dataType.toString()}` + ); + assert.deepEqual( + event.body, + dataType, + `Deserialized body should be equal: : ${dataType.toString()}` + ); + } + }); + + ([ + ["sequence", [1, 2, 3]], + ["value", "hello"], + ["data", "hello"] + ] as [BodyTypes, any][]).forEach(([expectedBodyType, expectedBody]) => { + it(`receive EventData and resend (useBatch: ${useBatch})`, async () => { + let startingPositions = await getStartingPositionsForTests(consumerClient); + // if we receive an event that was encoded to a non-data section + // and then re-send it (again, as an EventData) we should + // respect it. + await sendEvents( + [ + { + body: expectedBody, + bodyType: expectedBodyType + } + ], + { useBatch } + ); + + const event = await receiveEvent(startingPositions); + + assert.equal(event.getRawAmqpMessage().bodyType, expectedBodyType); + + startingPositions = await getStartingPositionsForTests(consumerClient); + // now let's just resend it, unaltered + await sendEvents([event], { useBatch }); + + const reencodedEvent = await receiveEvent(startingPositions); + + assert.equal(reencodedEvent.getRawAmqpMessage().bodyType, expectedBodyType); + assert.deepEqual(reencodedEvent.body, expectedBody); + }); + }); + }); + }); +}); diff --git a/sdk/eventhub/event-hubs/test/public/eventData.spec.ts b/sdk/eventhub/event-hubs/test/public/eventData.spec.ts new file mode 100644 index 000000000000..16105d656921 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/public/eventData.spec.ts @@ -0,0 +1,132 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import chaiAsPromised from "chai-as-promised"; +import chaiExclude from "chai-exclude"; +import { v4 } from "uuid"; +import { EnvVarKeys, getEnvVars, getStartingPositionsForTests } from "./utils/testUtils"; +import { + EventData, + EventHubConsumerClient, + EventHubProducerClient, + EventPosition, + ReceivedEventData, + Subscription +} from "../../src"; + +const should = chai.should(); +chai.use(chaiAsPromised); +chai.use(chaiExclude); + +const env = getEnvVars(); + +describe("EventData", function(): void { + let producerClient: EventHubProducerClient; + let consumerClient: EventHubConsumerClient; + const service = { + connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + path: env[EnvVarKeys.EVENTHUB_NAME] + }; + + before("validate environment", function(): void { + should.exist( + env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + "define EVENTHUB_CONNECTION_STRING in your environment before running integration tests." + ); + should.exist( + env[EnvVarKeys.EVENTHUB_NAME], + "define EVENTHUB_NAME in your environment before running integration tests." + ); + }); + + beforeEach(async () => { + producerClient = new EventHubProducerClient(service.connectionString, service.path); + consumerClient = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString, + service.path + ); + }); + + afterEach("close the connection", async function(): Promise { + await producerClient.close(); + await consumerClient.close(); + }); + + function getSampleEventData(): EventData { + const randomTag = Math.random().toString(); + + return { + body: `message body ${randomTag}`, + contentEncoding: "application/json; charset=utf-8", + correlationId: randomTag, + messageId: v4() + } as EventData; + } + + /** + * Helper function that will receive a single event that comes after the starting positions. + * + * Note: Call this after sending a single event to Event Hubs to validate + * @internal + */ + async function receiveEvent(startingPositions: { + [partitionId: string]: EventPosition; + }): Promise { + return new Promise((resolve, reject) => { + const subscription: Subscription = consumerClient.subscribe( + { + async processError(err) { + reject(err); + return subscription.close(); + }, + async processEvents(events) { + if (events.length) { + resolve(events[0]); + return subscription.close(); + } + } + }, + { + startPosition: startingPositions + } + ); + }); + } + + describe("round-tripping AMQP encoding/decoding", () => { + it(`props`, async () => { + const startingPositions = await getStartingPositionsForTests(consumerClient); + const testEvent = getSampleEventData(); + await producerClient.sendBatch([testEvent]); + + const event = await receiveEvent(startingPositions); + should.equal(event.body, testEvent.body, "Unexpected body on the received event."); + should.equal( + event.contentType, + testEvent.contentType, + "Unexpected contentType on the received event." + ); + should.equal( + event.correlationId, + testEvent.correlationId, + "Unexpected correlationId on the received event." + ); + should.equal( + event.messageId, + testEvent.messageId, + "Unexpected messageId on the received event." + ); + }); + + it(`null body`, async () => { + const startingPositions = await getStartingPositionsForTests(consumerClient); + const testEvent: EventData = { body: null }; + await producerClient.sendBatch([testEvent]); + + const event = await receiveEvent(startingPositions); + should.equal(event.body, testEvent.body, "Unexpected body on the received event."); + }); + }); +});