From 9afbdaa4d873ef1081f599831e2595cbf6bbb41f Mon Sep 17 00:00:00 2001 From: chradek <51000525+chradek@users.noreply.github.com> Date: Thu, 30 Apr 2020 09:33:02 -0700 Subject: [PATCH] [event-hubs] update sendBatch to accept lists of events (#8622) * [event-hubs] update sendBatch to accept lists of events * improve comments --- sdk/eventhub/event-hubs/CHANGELOG.md | 1 + .../event-hubs/review/event-hubs.api.md | 5 +- .../event-hubs/src/eventHubProducerClient.ts | 60 ++- sdk/eventhub/event-hubs/src/models/public.ts | 25 +- sdk/eventhub/event-hubs/test/sender.spec.ts | 366 +++++++++++++++++- 5 files changed, 445 insertions(+), 12 deletions(-) diff --git a/sdk/eventhub/event-hubs/CHANGELOG.md b/sdk/eventhub/event-hubs/CHANGELOG.md index bc76c35f0bdb..8238716c606a 100644 --- a/sdk/eventhub/event-hubs/CHANGELOG.md +++ b/sdk/eventhub/event-hubs/CHANGELOG.md @@ -2,6 +2,7 @@ ## 5.1.1 (Unreleased) +- Updates the `EventHubProducerClient.sendBatch` API to accept an array of events. ## 5.1.0 (2020-04-07) diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index d61254936144..7e49ae7e2916 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -110,7 +110,8 @@ export class EventHubProducerClient { getEventHubProperties(options?: GetEventHubPropertiesOptions): Promise; getPartitionIds(options?: GetPartitionIdsOptions): Promise>; getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise; - sendBatch(batch: EventDataBatch, options?: SendBatchOptions): Promise; + sendBatch(batch: EventData[], options?: SendBatchOptions): Promise; + sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise; } // @public @@ -225,6 +226,8 @@ export { RetryOptions } // @public export interface SendBatchOptions extends OperationOptions { + partitionId?: string; + partitionKey?: string; } // @public diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index 9725dc474ec0..84f3443d67d8 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -2,7 +2,7 @@ // Licensed under the MIT License. import { isTokenCredential, TokenCredential } from "@azure/core-amqp"; -import { EventDataBatch } from "./eventDataBatch"; +import { EventDataBatch, isEventDataBatch } from "./eventDataBatch"; import { EventHubClient } from "./impl/eventHubClient"; import { EventHubProperties, PartitionProperties } from "./managementClient"; import { EventHubProducer } from "./sender"; @@ -14,6 +14,8 @@ import { EventHubClientOptions, CreateBatchOptions } from "./models/public"; +import { EventData } from "./eventData"; +import { OperationOptions } from "./util/operationOptions"; /** * The `EventHubProducerClient` class is used to send events to an Event Hub. @@ -159,6 +161,22 @@ export class EventHubProducerClient { return producer.createBatch(options); } + /** + * Sends an array of events to the associated Event Hub. + * + * @param batch An array of {@link EventData}. + * @param options A set of options that can be specified to influence the way in which + * events are sent to the associated Event Hub. + * - `abortSignal` : A signal the request to cancel the send operation. + * - `partitionId` : The partition this batch will be sent to. If set, `partitionKey` can not be set. + * - `partitionKey` : A value that is hashed to produce a partition assignment. If set, `partitionId` can not be set. + * + * @returns Promise + * @throws AbortError if the operation is cancelled via the abortSignal. + * @throws MessagingError if an error is encountered while sending a message. + * @throws Error if the underlying connection or sender has been closed. + */ + async sendBatch(batch: EventData[], options?: SendBatchOptions): Promise; /** * Sends a batch of events to the associated Event Hub. * @@ -172,11 +190,43 @@ export class EventHubProducerClient { * @throws MessagingError if an error is encountered while sending a message. * @throws Error if the underlying connection or sender has been closed. */ - async sendBatch(batch: EventDataBatch, options?: SendBatchOptions): Promise { - let partitionId = ""; - - if (batch.partitionId) { + async sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise; + async sendBatch( + batch: EventDataBatch | EventData[], + options: SendBatchOptions | OperationOptions = {} + ): Promise { + let partitionId: string | undefined; + let partitionKey: string | undefined; + if (isEventDataBatch(batch)) { + // For batches, partitionId and partitionKey would be set on the batch. partitionId = batch.partitionId; + partitionKey = batch.partitionKey; + const unexpectedOptions = options as SendBatchOptions; + if (unexpectedOptions.partitionKey && partitionKey !== unexpectedOptions.partitionKey) { + throw new Error( + `The partitionKey (${unexpectedOptions.partitionKey}) set on sendBatch does not match the partitionKey (${partitionKey}) set when creating the batch.` + ); + } + if (unexpectedOptions.partitionId && unexpectedOptions.partitionId !== partitionId) { + throw new Error( + `The partitionId (${unexpectedOptions.partitionId}) set on sendBatch does not match the partitionId (${partitionId}) set when creating the batch.` + ); + } + } else { + // For arrays of events, partitionId and partitionKey would be set in the options. + const expectedOptions = options as SendBatchOptions; + partitionId = expectedOptions.partitionId; + partitionKey = expectedOptions.partitionKey; + } + if (partitionId && partitionKey) { + throw new Error( + `The partitionId (${partitionId}) and partitionKey (${partitionKey}) cannot both be specified.` + ); + } + + if (!partitionId) { + // The producer map requires that partitionId be a string. + partitionId = ""; } let producer = this._producersMap.get(partitionId); diff --git a/sdk/eventhub/event-hubs/src/models/public.ts b/sdk/eventhub/event-hubs/src/models/public.ts index 128f1f638d65..35c8659fa6cc 100644 --- a/sdk/eventhub/event-hubs/src/models/public.ts +++ b/sdk/eventhub/event-hubs/src/models/public.ts @@ -26,10 +26,27 @@ export interface GetPartitionPropertiesOptions extends OperationOptions {} export interface GetPartitionIdsOptions extends OperationOptions {} /** - * Options to configure the `sendBatch` method on the `EventHubProducerClient`. + * Options to configure the `sendBatch` method on the `EventHubProducerClient` + * when sending an array of events. + * If `partitionId` is set, `partitionKey` must not be set and vice versa. + * + * - `partitionId` : The partition this batch will be sent to. + * - `partitionKey` : A value that is hashed to produce a partition assignment. * - `abortSignal` : A signal used to cancel the send operation. */ -export interface SendBatchOptions extends OperationOptions {} +export interface SendBatchOptions extends OperationOptions { + /** + * The partition this batch will be sent to. + * If this value is set then partitionKey can not be set. + */ + partitionId?: string; + /** + * A value that is hashed to produce a partition assignment. + * It guarantees that messages with the same partitionKey end up in the same partition. + * Specifying this will throw an error if the producer was created using a `paritionId`. + */ + partitionKey?: string; +} /** * The set of options to configure the `send` operation on the `EventHubProducer`. @@ -46,14 +63,14 @@ export interface SendBatchOptions extends OperationOptions {} * @internal * @ignore */ -export interface SendOptions extends SendBatchOptions { +export interface SendOptions extends OperationOptions { /** * @property * A value that is hashed to produce a partition assignment. * It guarantees that messages with the same partitionKey end up in the same partition. * Specifying this will throw an error if the producer was created using a `paritionId`. */ - partitionKey?: string | null; + partitionKey?: string; } /** diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 701c1de5b9eb..5aeee9eb09cf 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -7,9 +7,14 @@ import chaiAsPromised from "chai-as-promised"; chai.use(chaiAsPromised); import debugModule from "debug"; const debug = debugModule("azure:event-hubs:sender-spec"); -import { EventData, EventHubProducerClient, EventHubConsumerClient } from "../src"; +import { + EventData, + EventHubProducerClient, + EventHubConsumerClient, + ReceivedEventData +} from "../src"; import { EventHubClient } from "../src/impl/eventHubClient"; -import { SendOptions } from "../src/models/public"; +import { SendOptions, SendBatchOptions } from "../src/models/public"; import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; import { AbortController } from "@azure/abort-controller"; import { TestTracer, setTracer, SpanGraph } from "@azure/core-tracing"; @@ -825,6 +830,363 @@ describe("EventHub Sender", function(): void { }); }); + describe("Array of events", function() { + let consumerClient: EventHubConsumerClient; + beforeEach(() => { + consumerClient = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString, + service.path + ); + }); + + afterEach(() => { + return consumerClient.close(); + }); + + it("should be sent successfully", async () => { + const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; + const receivedEvents: ReceivedEventData[] = []; + let receivingResolver: Function; + const receivingPromise = new Promise((r) => (receivingResolver = r)); + const subscription = consumerClient.subscribe( + { + async processError() {}, + async processEvents(events) { + receivedEvents.push(...events); + receivingResolver(); + } + }, + { + startPosition: { enqueuedOn: new Date(), isInclusive: true }, + maxBatchSize: data.length + } + ); + + await producerClient.sendBatch(data); + + await receivingPromise; + await subscription.close(); + + receivedEvents.length.should.equal(data.length); + receivedEvents.map((e) => e.body).should.eql(data.map((d) => d.body)); + }); + + it("should be sent successfully with partitionKey", async () => { + const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; + const receivedEvents: ReceivedEventData[] = []; + let receivingResolver: Function; + const receivingPromise = new Promise((r) => (receivingResolver = r)); + const subscription = consumerClient.subscribe( + { + async processError() {}, + async processEvents(events) { + receivedEvents.push(...events); + receivingResolver(); + } + }, + { + startPosition: { enqueuedOn: new Date(), isInclusive: true }, + maxBatchSize: data.length + } + ); + + await producerClient.sendBatch(data, { partitionKey: "foo" }); + + await receivingPromise; + await subscription.close(); + + receivedEvents.length.should.equal(data.length); + receivedEvents.map((e) => e.body).should.eql(data.map((d) => d.body)); + for (let i = 0; i < receivedEvents.length; i++) { + receivedEvents[i].body.should.equal(data[i].body); + } + }); + + it("should be sent successfully with partitionId", async () => { + const partitionId = "0"; + const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; + const receivedEvents: ReceivedEventData[] = []; + let receivingResolver: Function; + const receivingPromise = new Promise((r) => (receivingResolver = r)); + const subscription = consumerClient.subscribe( + partitionId, + { + async processError() {}, + async processEvents(events) { + receivedEvents.push(...events); + receivingResolver(); + } + }, + { + startPosition: { enqueuedOn: new Date(), isInclusive: true }, + maxBatchSize: data.length + } + ); + + await producerClient.sendBatch(data, { partitionId }); + + await receivingPromise; + await subscription.close(); + + receivedEvents.length.should.equal(data.length); + receivedEvents.map((e) => e.body).should.eql(data.map((d) => d.body)); + for (let i = 0; i < receivedEvents.length; i++) { + receivedEvents[i].body.should.equal(data[i].body); + } + }); + + it("can be manually traced", async function(): Promise { + const tracer = new TestTracer(); + setTracer(tracer); + + const rootSpan = tracer.startSpan("root"); + + const events = []; + for (let i = 0; i < 5; i++) { + events.push({ body: `multiple messages - manual trace propgation: ${i}` }); + } + await producerClient.sendBatch(events, { + tracingOptions: { + spanOptions: { + parent: rootSpan.context() + } + } + }); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root spans."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.EventHubs.message", + children: [] + }, + { + name: "Azure.EventHubs.message", + children: [] + }, + { + name: "Azure.EventHubs.message", + children: [] + }, + { + name: "Azure.EventHubs.message", + children: [] + }, + { + name: "Azure.EventHubs.message", + children: [] + }, + { + name: "Azure.EventHubs.send", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.context().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + }); + + it("skips already instrumented events when manually traced", async function(): Promise { + const tracer = new TestTracer(); + setTracer(tracer); + + const rootSpan = tracer.startSpan("root"); + + const events: EventData[] = []; + for (let i = 0; i < 5; i++) { + events.push({ body: `multiple messages - manual trace propgation: ${i}` }); + } + events[0].properties = { [TRACEPARENT_PROPERTY]: "foo" }; + await producerClient.sendBatch(events, { + tracingOptions: { + spanOptions: { + parent: rootSpan.context() + } + } + }); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root spans."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.EventHubs.message", + children: [] + }, + { + name: "Azure.EventHubs.message", + children: [] + }, + { + name: "Azure.EventHubs.message", + children: [] + }, + { + name: "Azure.EventHubs.message", + children: [] + }, + { + name: "Azure.EventHubs.send", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.context().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + }); + }); + + describe("Validation", function() { + describe("sendBatch", function() { + describe("with EventDataBatch", function() { + it("works if partitionKeys match", async () => { + const misconfiguredOptions: SendBatchOptions = { + partitionKey: "foo" + }; + const batch = await producerClient.createBatch({ partitionKey: "foo" }); + await producerClient.sendBatch(batch, misconfiguredOptions); + }); + it("works if partitionIds match", async () => { + const misconfiguredOptions: SendBatchOptions = { + partitionId: "0" + }; + const batch = await producerClient.createBatch({ partitionId: "0" }); + await producerClient.sendBatch(batch, misconfiguredOptions); + }); + it("throws an error if partitionKeys don't match", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "bar" + }; + const batch = await producerClient.createBatch({ partitionKey: "foo" }); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionKey (bar) set on sendBatch does not match the partitionKey (foo) set when creating the batch." + ); + } + }); + it("throws an error if partitionKeys don't match (undefined)", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "bar" + }; + const batch = await producerClient.createBatch(); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionKey (bar) set on sendBatch does not match the partitionKey (undefined) set when creating the batch." + ); + } + }); + it("throws an error if partitionIds don't match", async () => { + const badOptions: SendBatchOptions = { + partitionId: "0" + }; + const batch = await producerClient.createBatch({ partitionId: "1" }); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionId (0) set on sendBatch does not match the partitionId (1) set when creating the batch." + ); + } + }); + it("throws an error if partitionIds don't match (undefined)", async () => { + const badOptions: SendBatchOptions = { + partitionId: "0" + }; + const batch = await producerClient.createBatch(); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionId (0) set on sendBatch does not match the partitionId (undefined) set when creating the batch." + ); + } + }); + it("throws an error if partitionId and partitionKey are set (create, send)", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "foo" + }; + const batch = await producerClient.createBatch({ partitionId: "0" }); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.not.equal("Test failure"); + } + }); + it("throws an error if partitionId and partitionKey are set (send, create)", async () => { + const badOptions: SendBatchOptions = { + partitionId: "0" + }; + const batch = await producerClient.createBatch({ partitionKey: "foo" }); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.not.equal("Test failure"); + } + }); + it("throws an error if partitionId and partitionKey are set (send, send)", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "foo", + partitionId: "0" + }; + const batch = await producerClient.createBatch(); + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.not.equal("Test failure"); + } + }); + }); + describe("with events array", function() { + it("throws an error if partitionId and partitionKey are set", async () => { + const badOptions: SendBatchOptions = { + partitionKey: "foo", + partitionId: "0" + }; + const batch = [{ body: "Hello 1" }, { body: "Hello 2" }]; + try { + await producerClient.sendBatch(batch, badOptions); + throw new Error("Test failure"); + } catch (err) { + err.message.should.equal( + "The partitionId (0) and partitionKey (foo) cannot both be specified." + ); + } + }); + }); + }); + }); + describe("Negative scenarios", function(): void { it("a message greater than 1 MB should fail.", async function(): Promise { const data: EventData = {