Skip to content

Commit

Permalink
[event-hubs] update sendBatch to accept lists of events (#8622)
Browse files Browse the repository at this point in the history
* [event-hubs] update sendBatch to accept lists of events

* improve comments
  • Loading branch information
chradek authored Apr 30, 2020
1 parent 9ec9f06 commit 9afbdaa
Show file tree
Hide file tree
Showing 5 changed files with 445 additions and 12 deletions.
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 5.1.1 (Unreleased)

- Updates the `EventHubProducerClient.sendBatch` API to accept an array of events.

## 5.1.0 (2020-04-07)

Expand Down
5 changes: 4 additions & 1 deletion sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ export class EventHubProducerClient {
getEventHubProperties(options?: GetEventHubPropertiesOptions): Promise<EventHubProperties>;
getPartitionIds(options?: GetPartitionIdsOptions): Promise<Array<string>>;
getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise<PartitionProperties>;
sendBatch(batch: EventDataBatch, options?: SendBatchOptions): Promise<void>;
sendBatch(batch: EventData[], options?: SendBatchOptions): Promise<void>;
sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise<void>;
}

// @public
Expand Down Expand Up @@ -225,6 +226,8 @@ export { RetryOptions }

// @public
export interface SendBatchOptions extends OperationOptions {
partitionId?: string;
partitionKey?: string;
}

// @public
Expand Down
60 changes: 55 additions & 5 deletions sdk/eventhub/event-hubs/src/eventHubProducerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License.

import { isTokenCredential, TokenCredential } from "@azure/core-amqp";
import { EventDataBatch } from "./eventDataBatch";
import { EventDataBatch, isEventDataBatch } from "./eventDataBatch";
import { EventHubClient } from "./impl/eventHubClient";
import { EventHubProperties, PartitionProperties } from "./managementClient";
import { EventHubProducer } from "./sender";
Expand All @@ -14,6 +14,8 @@ import {
EventHubClientOptions,
CreateBatchOptions
} from "./models/public";
import { EventData } from "./eventData";
import { OperationOptions } from "./util/operationOptions";

/**
* The `EventHubProducerClient` class is used to send events to an Event Hub.
Expand Down Expand Up @@ -159,6 +161,22 @@ export class EventHubProducerClient {
return producer.createBatch(options);
}

/**
* Sends an array of events to the associated Event Hub.
*
* @param batch An array of {@link EventData}.
* @param options A set of options that can be specified to influence the way in which
* events are sent to the associated Event Hub.
* - `abortSignal` : A signal the request to cancel the send operation.
* - `partitionId` : The partition this batch will be sent to. If set, `partitionKey` can not be set.
* - `partitionKey` : A value that is hashed to produce a partition assignment. If set, `partitionId` can not be set.
*
* @returns Promise<void>
* @throws AbortError if the operation is cancelled via the abortSignal.
* @throws MessagingError if an error is encountered while sending a message.
* @throws Error if the underlying connection or sender has been closed.
*/
async sendBatch(batch: EventData[], options?: SendBatchOptions): Promise<void>;
/**
* Sends a batch of events to the associated Event Hub.
*
Expand All @@ -172,11 +190,43 @@ export class EventHubProducerClient {
* @throws MessagingError if an error is encountered while sending a message.
* @throws Error if the underlying connection or sender has been closed.
*/
async sendBatch(batch: EventDataBatch, options?: SendBatchOptions): Promise<void> {
let partitionId = "";

if (batch.partitionId) {
async sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise<void>;
async sendBatch(
batch: EventDataBatch | EventData[],
options: SendBatchOptions | OperationOptions = {}
): Promise<void> {
let partitionId: string | undefined;
let partitionKey: string | undefined;
if (isEventDataBatch(batch)) {
// For batches, partitionId and partitionKey would be set on the batch.
partitionId = batch.partitionId;
partitionKey = batch.partitionKey;
const unexpectedOptions = options as SendBatchOptions;
if (unexpectedOptions.partitionKey && partitionKey !== unexpectedOptions.partitionKey) {
throw new Error(
`The partitionKey (${unexpectedOptions.partitionKey}) set on sendBatch does not match the partitionKey (${partitionKey}) set when creating the batch.`
);
}
if (unexpectedOptions.partitionId && unexpectedOptions.partitionId !== partitionId) {
throw new Error(
`The partitionId (${unexpectedOptions.partitionId}) set on sendBatch does not match the partitionId (${partitionId}) set when creating the batch.`
);
}
} else {
// For arrays of events, partitionId and partitionKey would be set in the options.
const expectedOptions = options as SendBatchOptions;
partitionId = expectedOptions.partitionId;
partitionKey = expectedOptions.partitionKey;
}
if (partitionId && partitionKey) {
throw new Error(
`The partitionId (${partitionId}) and partitionKey (${partitionKey}) cannot both be specified.`
);
}

if (!partitionId) {
// The producer map requires that partitionId be a string.
partitionId = "";
}

let producer = this._producersMap.get(partitionId);
Expand Down
25 changes: 21 additions & 4 deletions sdk/eventhub/event-hubs/src/models/public.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,27 @@ export interface GetPartitionPropertiesOptions extends OperationOptions {}
export interface GetPartitionIdsOptions extends OperationOptions {}

/**
* Options to configure the `sendBatch` method on the `EventHubProducerClient`.
* Options to configure the `sendBatch` method on the `EventHubProducerClient`
* when sending an array of events.
* If `partitionId` is set, `partitionKey` must not be set and vice versa.
*
* - `partitionId` : The partition this batch will be sent to.
* - `partitionKey` : A value that is hashed to produce a partition assignment.
* - `abortSignal` : A signal used to cancel the send operation.
*/
export interface SendBatchOptions extends OperationOptions {}
export interface SendBatchOptions extends OperationOptions {
/**
* The partition this batch will be sent to.
* If this value is set then partitionKey can not be set.
*/
partitionId?: string;
/**
* A value that is hashed to produce a partition assignment.
* It guarantees that messages with the same partitionKey end up in the same partition.
* Specifying this will throw an error if the producer was created using a `paritionId`.
*/
partitionKey?: string;
}

/**
* The set of options to configure the `send` operation on the `EventHubProducer`.
Expand All @@ -46,14 +63,14 @@ export interface SendBatchOptions extends OperationOptions {}
* @internal
* @ignore
*/
export interface SendOptions extends SendBatchOptions {
export interface SendOptions extends OperationOptions {
/**
* @property
* A value that is hashed to produce a partition assignment.
* It guarantees that messages with the same partitionKey end up in the same partition.
* Specifying this will throw an error if the producer was created using a `paritionId`.
*/
partitionKey?: string | null;
partitionKey?: string;
}

/**
Expand Down
Loading

0 comments on commit 9afbdaa

Please sign in to comment.