Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[event-hubs] update sendBatch to accept lists of events #8622

Merged
merged 2 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 5.1.1 (Unreleased)

- Updates the `EventHubProducerClient.sendBatch` API to accept an array of events.

## 5.1.0 (2020-04-07)

Expand Down
5 changes: 4 additions & 1 deletion sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ export class EventHubProducerClient {
getEventHubProperties(options?: GetEventHubPropertiesOptions): Promise<EventHubProperties>;
getPartitionIds(options?: GetPartitionIdsOptions): Promise<Array<string>>;
getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise<PartitionProperties>;
sendBatch(batch: EventDataBatch, options?: SendBatchOptions): Promise<void>;
sendBatch(batch: EventData[], options?: SendBatchOptions): Promise<void>;
sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise<void>;
}

// @public
Expand Down Expand Up @@ -225,6 +226,8 @@ export { RetryOptions }

// @public
export interface SendBatchOptions extends OperationOptions {
partitionId?: string;
partitionKey?: string;
}

// @public
Expand Down
60 changes: 55 additions & 5 deletions sdk/eventhub/event-hubs/src/eventHubProducerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License.

import { isTokenCredential, TokenCredential } from "@azure/core-amqp";
import { EventDataBatch } from "./eventDataBatch";
import { EventDataBatch, isEventDataBatch } from "./eventDataBatch";
import { EventHubClient } from "./impl/eventHubClient";
import { EventHubProperties, PartitionProperties } from "./managementClient";
import { EventHubProducer } from "./sender";
Expand All @@ -14,6 +14,8 @@ import {
EventHubClientOptions,
CreateBatchOptions
} from "./models/public";
import { EventData } from "./eventData";
import { OperationOptions } from "./util/operationOptions";

/**
* The `EventHubProducerClient` class is used to send events to an Event Hub.
Expand Down Expand Up @@ -159,6 +161,22 @@ export class EventHubProducerClient {
return producer.createBatch(options);
}

/**
* Sends an array of events to the associated Event Hub.
*
* @param batch An array of {@link EventData}.
* @param options A set of options that can be specified to influence the way in which
* events are sent to the associated Event Hub.
* - `abortSignal` : A signal the request to cancel the send operation.
* - `partitionId` : The partition this batch will be sent to. If set, `partitionKey` can not be set.
* - `partitionKey` : A value that is hashed to produce a partition assignment. If set, `partitionId` can not be set.
*
* @returns Promise<void>
* @throws AbortError if the operation is cancelled via the abortSignal.
* @throws MessagingError if an error is encountered while sending a message.
* @throws Error if the underlying connection or sender has been closed.
*/
async sendBatch(batch: EventData[], options?: SendBatchOptions): Promise<void>;
/**
* Sends a batch of events to the associated Event Hub.
*
Expand All @@ -172,11 +190,43 @@ export class EventHubProducerClient {
* @throws MessagingError if an error is encountered while sending a message.
* @throws Error if the underlying connection or sender has been closed.
*/
async sendBatch(batch: EventDataBatch, options?: SendBatchOptions): Promise<void> {
let partitionId = "";

if (batch.partitionId) {
async sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise<void>;
async sendBatch(
batch: EventDataBatch | EventData[],
options: SendBatchOptions | OperationOptions = {}
): Promise<void> {
let partitionId: string | undefined;
let partitionKey: string | undefined;
if (isEventDataBatch(batch)) {
// For batches, partitionId and partitionKey would be set on the batch.
partitionId = batch.partitionId;
partitionKey = batch.partitionKey;
const unexpectedOptions = options as SendBatchOptions;
if (unexpectedOptions.partitionKey && partitionKey !== unexpectedOptions.partitionKey) {
throw new Error(
`The partitionKey (${unexpectedOptions.partitionKey}) set on sendBatch does not match the partitionKey (${partitionKey}) set when creating the batch.`
);
}
if (unexpectedOptions.partitionId && unexpectedOptions.partitionId !== partitionId) {
throw new Error(
`The partitionId (${unexpectedOptions.partitionId}) set on sendBatch does not match the partitionId (${partitionId}) set when creating the batch.`
);
}
} else {
// For arrays of events, partitionId and partitionKey would be set in the options.
const expectedOptions = options as SendBatchOptions;
partitionId = expectedOptions.partitionId;
partitionKey = expectedOptions.partitionKey;
}
if (partitionId && partitionKey) {
throw new Error(
`The partitionId (${partitionId}) and partitionKey (${partitionKey}) cannot both be specified.`
);
}

if (!partitionId) {
// The producer map requires that partitionId be a string.
partitionId = "";
}

let producer = this._producersMap.get(partitionId);
Expand Down
25 changes: 21 additions & 4 deletions sdk/eventhub/event-hubs/src/models/public.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,27 @@ export interface GetPartitionPropertiesOptions extends OperationOptions {}
export interface GetPartitionIdsOptions extends OperationOptions {}

/**
* Options to configure the `sendBatch` method on the `EventHubProducerClient`.
* Options to configure the `sendBatch` method on the `EventHubProducerClient`
* when sending an array of events.
* If `partitionId` is set, `partitionKey` must not be set and vice versa.
*
* - `partitionId` : The partition this batch will be sent to.
* - `partitionKey` : A value that is hashed to produce a partition assignment.
* - `abortSignal` : A signal used to cancel the send operation.
*/
export interface SendBatchOptions extends OperationOptions {}
export interface SendBatchOptions extends OperationOptions {
/**
* The partition this batch will be sent to.
* If this value is set then partitionKey can not be set.
*/
partitionId?: string;
/**
* A value that is hashed to produce a partition assignment.
* It guarantees that messages with the same partitionKey end up in the same partition.
* Specifying this will throw an error if the producer was created using a `paritionId`.
*/
partitionKey?: string;
}

/**
* The set of options to configure the `send` operation on the `EventHubProducer`.
Expand All @@ -46,14 +63,14 @@ export interface SendBatchOptions extends OperationOptions {}
* @internal
* @ignore
*/
export interface SendOptions extends SendBatchOptions {
export interface SendOptions extends OperationOptions {
/**
* @property
* A value that is hashed to produce a partition assignment.
* It guarantees that messages with the same partitionKey end up in the same partition.
* Specifying this will throw an error if the producer was created using a `paritionId`.
*/
partitionKey?: string | null;
partitionKey?: string;
}

/**
Expand Down
Loading