From c8afde10afc64b148e9a4f50b5aa9a4e44803e91 Mon Sep 17 00:00:00 2001 From: chradek <51000525+chradek@users.noreply.github.com> Date: Thu, 5 Sep 2019 16:49:43 -0700 Subject: [PATCH] [Event Hubs] Updates APIs surrounding event processing and checkpointing (#4994) --- sdk/eventhub/event-hubs/README.md | 14 +- .../event-hubs/review/event-hubs.api.md | 36 +- sdk/eventhub/event-hubs/samples/README.md | 4 +- .../event-hubs/samples/eventProcessor.ts | 68 ++- .../event-hubs/src/checkpointManager.ts | 126 ----- sdk/eventhub/event-hubs/src/eventProcessor.ts | 141 ++---- .../src/inMemoryPartitionManager.ts | 2 +- sdk/eventhub/event-hubs/src/index.ts | 7 +- .../event-hubs/src/partitionContext.ts | 143 +++++- .../event-hubs/src/partitionProcessor.ts | 65 +++ sdk/eventhub/event-hubs/src/partitionPump.ts | 26 +- sdk/eventhub/event-hubs/src/pumpManager.ts | 14 +- .../event-hubs/test/eventProcessor.spec.ts | 467 +++++++++--------- 13 files changed, 542 insertions(+), 571 deletions(-) delete mode 100644 sdk/eventhub/event-hubs/src/checkpointManager.ts create mode 100644 sdk/eventhub/event-hubs/src/partitionProcessor.ts diff --git a/sdk/eventhub/event-hubs/README.md b/sdk/eventhub/event-hubs/README.md index ed897484a2c8..5777a0e90a4a 100644 --- a/sdk/eventhub/event-hubs/README.md +++ b/sdk/eventhub/event-hubs/README.md @@ -214,25 +214,25 @@ While load balancing is a feature we will be adding in the next update, you can example, where we use an [InMemoryPartitionManager](https://azure.github.io/azure-sdk-for-js/event-hubs/classes/inmemorypartitionmanager.html) that does checkpointing in memory. ```javascript -class SimplePartitionProcessor { +class SamplePartitionProcessor extends PartitionProcessor { // Gets called once before the processing of events from current partition starts. - async initialize() { + async initialize(partitionContext) { /* your code here */ } // Gets called for each batch of events that are received. // You may choose to use the checkpoint manager to update checkpoints. - async processEvents(events) { + async processEvents(events, partitionContext) { /* your code here */ } // Gets called for any error when receiving events. - async processError(error) { + async processError(error, partitionContext) { /* your code here */ } // Gets called when Event Processor stops processing events for current partition. - async close(reason) { + async close(reason, partitionContext) { /* your code here */ } } @@ -241,12 +241,12 @@ const client = new EventHubClient("my-connection-string", "my-event-hub"); const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - (partitionContext, checkpointManager) => new SimplePartitionProcessor(), + SamplePartitionProcessor, new InMemoryPartitionManager() ); await processor.start(); // At this point, the processor is consuming events from each partition of the Event Hub and -// delegating them to the SimplePartitionProcessor instance created for that partition. This +// delegating them to the SamplePartitionProcessor instance created for that partition. This // processing takes place in the background and will not block. // // In this example, we'll stop processing after five seconds. diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index b7c177fedea5..14c1fe6b13c3 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -40,14 +40,6 @@ export interface Checkpoint { sequenceNumber: number; } -// @public -export class CheckpointManager { - // @internal - constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, eventProcessorId: string); - updateCheckpoint(eventData: ReceivedEventData): Promise; - updateCheckpoint(sequenceNumber: number, offset: number): Promise; -} - // @public export enum CloseReason { EventHubException = "EventHubException", @@ -175,19 +167,15 @@ export class EventPosition { // @public export class EventProcessor { - constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions); + constructor(consumerGroupName: string, eventHubClient: EventHubClient, PartitionProcessorClass: typeof PartitionProcessor, partitionManager: PartitionManager, options?: EventProcessorOptions); readonly id: string; start(): void; stop(): Promise; } -// @public (undocumented) +// @public export interface EventProcessorOptions { - // (undocumented) - initialEventPosition?: EventPosition; - // (undocumented) maxBatchSize?: number; - // (undocumented) maxWaitTimeInSeconds?: number; } @@ -207,10 +195,13 @@ export type OnError = (error: MessagingError | Error) => void; export type OnMessage = (eventData: ReceivedEventData) => void; // @public -export interface PartitionContext { +export class PartitionContext { + constructor(eventHubName: string, consumerGroupName: string, partitionId: string, partitionManager: PartitionManager, eventProcessorId: string); readonly consumerGroupName: string; readonly eventHubName: string; readonly partitionId: string; + updateCheckpoint(eventData: ReceivedEventData): Promise; + updateCheckpoint(sequenceNumber: number, offset: number): Promise; } // @public @@ -234,16 +225,11 @@ export interface PartitionOwnership { } // @public -export interface PartitionProcessor { - close?(reason: CloseReason): Promise; - initialize?(): Promise; - processError(error: Error): Promise; - processEvents(events: ReceivedEventData[]): Promise; -} - -// @public -export interface PartitionProcessorFactory { - (context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor; +export class PartitionProcessor { + close(reason: CloseReason, partitionContext: PartitionContext): Promise; + initialize(partitionContext: PartitionContext): Promise; + processError(error: Error, partitionContext: PartitionContext): Promise; + processEvents(events: ReceivedEventData[], partitionContext: PartitionContext): Promise; } // @public diff --git a/sdk/eventhub/event-hubs/samples/README.md b/sdk/eventhub/event-hubs/samples/README.md index dd938b3da4ae..f63b4f6282dd 100644 --- a/sdk/eventhub/event-hubs/samples/README.md +++ b/sdk/eventhub/event-hubs/samples/README.md @@ -1,12 +1,12 @@ ## Getting started with samples ## -The samples in this folder are for version 3.0.0 and above of this library. If you are using version 2.1.0 or lower, then please use [samples for v2.1.0](https://github.com/Azure/azure-sdk-for-js/tree/%40azure/event-hubs_2.1.0/sdk/eventhub/event-hubs/samples) instead +The samples in this folder are for version 5.0.0 and above of this library. If you are using version 2.1.0 or lower, then please use [samples for v2.1.0](https://github.com/Azure/azure-sdk-for-js/tree/%40azure/event-hubs_2.1.0/sdk/eventhub/event-hubs/samples) instead ## Install the library Run the below in your samples folder to install the npm package for Event Hubs library. ```bash -npm install @azure/event-hubs +npm install @azure/event-hubs@next ``` ## Get connection string & Event Hubs name diff --git a/sdk/eventhub/event-hubs/samples/eventProcessor.ts b/sdk/eventhub/event-hubs/samples/eventProcessor.ts index 5b1ae4c4c328..7f65b9fe04a5 100644 --- a/sdk/eventhub/event-hubs/samples/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessor.ts @@ -1,45 +1,41 @@ import { EventHubClient, ReceivedEventData, - EventPosition, delay, EventProcessor, PartitionContext, InMemoryPartitionManager, - CheckpointManager + PartitionProcessor, + CloseReason } from "@azure/event-hubs"; -class SimplePartitionProcessor { - private _context: PartitionContext; - private _checkpointManager: CheckpointManager; - constructor(context: PartitionContext, checkpointManager: CheckpointManager) { - this._context = context; - this._checkpointManager = checkpointManager; - } - async processEvents(events: ReceivedEventData[]) { - if(events.length === 0){ +class SamplePartitionProcessor extends PartitionProcessor { + private _messageCount = 0; + + async processEvents(events: ReceivedEventData[], partitionContext: PartitionContext) { + if (events.length === 0) { return; } for (const event of events) { console.log( - "Received event: '%s' from partition: '%s' and consumer group: '%s'", - event.body, - this._context.partitionId, - this._context.consumerGroupName + `Received event: '${event.body}' from partition: '${partitionContext.partitionId}' and consumer group: '${partitionContext.consumerGroupName}'`, + ); + this._messageCount++; + } + + try { + // checkpoint using the last event in the batch + await partitionContext.updateCheckpoint(events[events.length - 1]); + + console.log( + "Successfully checkpointed event: '%s' from partition: '%s'", + events[events.length - 1].body, + partitionContext.partitionId + ); + } catch (err) { + console.log( + `Encountered an error while checkpointing on ${partitionContext.partitionId}: ${err.message}` ); - try { - // checkpoint using the last event in the batch - await this._checkpointManager.updateCheckpoint(events[events.length - 1]); - console.log( - "Successfully checkpointed event: '%s' from partition: '%s'", - events[events.length - 1].body, - this._context.partitionId - ); - } catch (err) { - console.log( - `Encountered an error while checkpointing on ${this._context.partitionId}: ${err.message}` - ); - } } } @@ -47,12 +43,13 @@ class SimplePartitionProcessor { console.log(`Encountered an error: ${error.message}`); } - async initialize() { - console.log(`Started processing`); + async initialize(partitionContext: PartitionContext) { + console.log(`Started processing partition: ${partitionContext.partitionId}`); } - async close() { - console.log(`Stopped processing`); + async close(reason: CloseReason, partitionContext: PartitionContext) { + console.log(`Stopped processing for reason ${reason}`); + console.log(`Processed ${this._messageCount} from partition ${partitionContext.partitionId}.`); } } @@ -63,17 +60,12 @@ const eventHubName = ""; async function main() { const client = new EventHubClient(connectionString, eventHubName); - const eventProcessorFactory = (context: PartitionContext, checkpoint: CheckpointManager) => { - return new SimplePartitionProcessor(context, checkpoint); - }; - const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - eventProcessorFactory, + SamplePartitionProcessor, new InMemoryPartitionManager(), { - initialEventPosition: EventPosition.earliest(), maxBatchSize: 10, maxWaitTimeInSeconds: 20 } diff --git a/sdk/eventhub/event-hubs/src/checkpointManager.ts b/sdk/eventhub/event-hubs/src/checkpointManager.ts deleted file mode 100644 index 04183217aedd..000000000000 --- a/sdk/eventhub/event-hubs/src/checkpointManager.ts +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -import { PartitionContext } from "./partitionContext"; -import { ReceivedEventData } from "./eventData"; -import { PartitionManager } from "./eventProcessor"; - -/** - * A checkpoint is meant to represent the last successfully processed event by the user from a particular - * partition of a consumer group in an Event Hub instance. - * - * When the `updateCheckpoint()` method on the `CheckpointManager` class is called by the user, a - * `Checkpoint` is created internally. It is then stored in the storage solution implemented by the - * `PartitionManager` chosen by the user when creating an `EventProcessor`. - * - * Users are never expected to interact with `Checkpoint` directly. This interface exists to support the - * internal workings of `EventProcessor` and `PartitionManager`. - **/ -export interface Checkpoint { - /** - * @property The event hub name - */ - eventHubName: string; - /** - * @property The consumer group name - */ - consumerGroupName: string; - /** - * @property The unique identifier of the event processor. - */ - ownerId: string; - /** - * @property The identifier of the Event Hub partition - */ - partitionId: string; - /** - * @property The sequence number of the event. - */ - sequenceNumber: number; - /** - * @property The offset of the event. - */ - offset: number; - /** - * @property The unique identifier for the operation. - */ - eTag: string; -} - -/** - * `EventProcessor` class instantiates this class for each partition it is processing and passes it to - * the user code. The user never has to instantiate this class directly, but is responsible to call the - * `updateCheckpoint()` method on it when required to update a checkpoint. - * - * A checkpoint is meant to represent the last successfully processed event by the user from a particular - * partition of a consumer group in an Event Hub instance. - */ -export class CheckpointManager { - private _partitionContext: PartitionContext; - private _partitionManager: PartitionManager; - private _eventProcessorId: string; - private _eTag: string; - - /** - * @ignore - * @internal - * - * Creates a new checkpoint manager which is passed to a `PartitionProcessor` to update checkpoints. - * @param partitionContext The partition context providing necessary partition and event hub information for updating - * checkpoints. - * @param partitionManager The `PartitionManager` implementation that will be used to store the checkpoint information. - * @param eventProcessorId The event processor identifier that is responsible for updating checkpoints. - */ - constructor( - partitionContext: PartitionContext, - partitionManager: PartitionManager, - eventProcessorId: string - ) { - this._partitionContext = partitionContext; - this._partitionManager = partitionManager; - this._eventProcessorId = eventProcessorId; - this._eTag = ""; - } - /** - * Updates the checkpoint for the partition associated with the current `CheckpointManager`. - * - * A checkpoint is meant to represent the last successfully processed event by the user from a particular - * partition of a consumer group in an Event Hub instance. - * - * @param eventData The event that you want to update the checkpoint with. - * @return Promise - */ - public async updateCheckpoint(eventData: ReceivedEventData): Promise; - /** - * Updates the checkpoint for the partition associated with the current `CheckpointManager`. - * - * A checkpoint is meant to represent the last successfully processed event by the user from a particular - * partition of a consumer group in an Event Hub instance. - * - * @param sequenceNumber The sequence number of the event that you want to update the checkpoint with. - * @param offset The offset of the event that you want to update the checkpoint with. - * @return Promise. - */ - public async updateCheckpoint(sequenceNumber: number, offset: number): Promise; - - public async updateCheckpoint( - eventDataOrSequenceNumber: ReceivedEventData | number, - offset?: number - ): Promise { - const checkpoint: Checkpoint = { - eventHubName: this._partitionContext.eventHubName, - consumerGroupName: this._partitionContext.consumerGroupName, - ownerId: this._eventProcessorId, - partitionId: this._partitionContext.partitionId, - sequenceNumber: - typeof eventDataOrSequenceNumber === "number" - ? eventDataOrSequenceNumber - : eventDataOrSequenceNumber.sequenceNumber, - offset: - typeof eventDataOrSequenceNumber === "number" ? offset! : eventDataOrSequenceNumber.offset, - eTag: this._eTag - }; - - this._eTag = await this._partitionManager.updateCheckpoint(checkpoint); - } -} diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 58c9c0ce0978..bd3178e10388 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -4,14 +4,13 @@ import uuid from "uuid/v4"; import { EventHubClient } from "./eventHubClient"; import { EventPosition } from "./eventPosition"; -import { PartitionContext } from "./partitionContext"; -import { CheckpointManager, Checkpoint } from "./checkpointManager"; -import { ReceivedEventData } from "./eventData"; +import { PartitionContext, Checkpoint } from "./partitionContext"; import { PumpManager } from "./pumpManager"; import { AbortController, AbortSignalLike } from "@azure/abort-controller"; import * as log from "./log"; import { PartitionLoadBalancer } from "./partitionLoadBalancer"; import { delay } from "@azure/core-amqp"; +import { PartitionProcessor } from "./partitionProcessor"; /** * An enum representing the different reasons for an `EventProcessor` to stop processing @@ -32,47 +31,6 @@ export enum CloseReason { Shutdown = "Shutdown" } -/** - * An interface to be implemented by the user in order to be used by the `EventProcessor` via - * the `PartitionProcessorFactory` to process events from a partition in a consumer group of an Event Hub instance. - * - * The interface supports methods that are called at various points of the lifecyle of processing events - * from a partition like initialize, error and close. - * - */ -export interface PartitionProcessor { - /** - * This method is called when the `EventProcessor` takes ownership of a new partition and before any - * events are received. - * - * @return {void} - */ - initialize?(): Promise; - /** - * This method is called before the partition processor is closed by the EventProcessor. - * - * @param closeReason The reason for closing this partition processor. - * @return {void} - */ - close?(reason: CloseReason): Promise; - /** - * This method is called when new events are received. - * - * This is also a good place to update checkpoints as appropriate. - * - * @param eventData The received events to be processed. - * @return {void} - */ - processEvents(events: ReceivedEventData[]): Promise; - /** - * This method is called when an error occurs while receiving events from Event Hub. - * - * @param error The error to be processed. - * @return {void} - */ - processError(error: Error): Promise; -} - /** * An interface representing the details on which instance of a `EventProcessor` owns processing * of a given partition from a consumer group of an Event Hub instance. @@ -119,28 +77,6 @@ export interface PartitionOwnership { eTag?: string; } -/** - * `PartitionProcessorFactory` is the interface for the function to be implemented by the user and passed - * to the constructor of `EventProcessor`. - * This function acts as a factory that should return an object that implements the `PartitionProcessor`. - * - * An instance of `EventProcessor` calls this factory each time it begins processing a new partition - * from a consumer group of an Event Hub instance. - */ -export interface PartitionProcessorFactory { - /** - * Factory method to create a new instance of `PartitionProcessor` for a partition. - * - * @param partitionContext The partition context containing partition and Event Hub information. The new instance of - * `PartitionProcessor` created by this method will be responsible for processing events only for this - * partition. - * @param checkpointManager The checkpoint manager for updating checkpoints when events are processed by `PartitionProcessor`. - * - * @return A new instance of `PartitionProcessor` responsible for processing events. - */ - (context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor; -} - /** * A Partition manager stores and retrieves partition ownership information and checkpoint details * for each partition in a given consumer group of an event hub instance. @@ -182,13 +118,13 @@ export interface PartitionManager { } /** - * A set of options to pass to the constructor of `EventProcessor` +* A set of options to pass to the constructor of `EventProcessor`. + * You can specify + * - `maxBatchSize`: The max size of the batch of events passed each time to user code for processing. + * - `maxWaitTimeInSeconds`: The maximum amount of time to wait to build up the requested message count before + * passing the data to user code for processing. If not provided, it defaults to 60 seconds. */ export interface EventProcessorOptions { - /** - * @property The position from where to start processing events. - */ - initialEventPosition?: EventPosition; /** * The max size of the batch of events passed each time to user code for processing. */ @@ -202,37 +138,28 @@ export interface EventProcessorOptions { /** * Event Processor based application consists of one or more instances of EventProcessor which have been - * configured to consume events from the same Event Hub and consumer group. Event Processors balance the - * workload across different instances and track progress when events are processed. - * - * `EventProcessor` is a high level construct that - * - uses an `EventHubClient` to receive events from multiple partitions in a consumer group of - * an Event Hub instance. - * - uses a factory method implemented by the user to create new processors for each partition. - * These processors hold user code to process events. - * - provides the ability to checkpoint and load balance across multiple instances of itself - * using the `PartitionManager`. + * configured to consume events from the same Event Hub and consumer group. They balance the + * workload across different instances by distributing the partitions to be processed among themselves. + * They also allow the user to track progress when events are processed using checkpoints. * * A checkpoint is meant to represent the last successfully processed event by the user from a particular * partition of a consumer group in an Event Hub instance. * - * By setting up multiple instances of the `EventProcessor` over different machines, the partitions will be distributed - * for processing among the different instances. This achieves load balancing. - * * You need the below to create an instance of `EventProcessor` * - The name of the consumer group from which you want to process events - * - An instance of `EventHubClient` that was created for the Event Hub instance. - * - A factory method that can return an object that implements the `PartitionProcessor` interface. - * This method should be implemented by the user. For example: - * (context, checkpointManager) => { - * return { - * processEvents: (events) => { - * // user code here - * // use the context to get information on the partition - * // use the checkpointManager to update checkpoints if needed - * } + * - An instance of `EventHubClient` class that was created for the Event Hub instance. + * - A user implemented class that extends the `PartitionProcessor` class. To get started, you can use the + * base class `PartitionProcessor` which simply logs the incoming events. To provide your code to process incoming + * events, extend this class and override the `processEvents()` method. For example: + * ``` + * class SamplePartitionProcessor extends PartitionProcessor { + * processEvents: (events, partitionContext) => { + * // user code to process events here + * // use `partitionContext` property to get information on the partition + * // use `partitionContext.updateCheckpoint()` method to update checkpoints as needed * } * } + * ``` * - An instance of `PartitionManager`. To get started, you can pass an instance of `InMemoryPartitionManager`. * For production, choose an implementation that will store checkpoints and partition ownership details to a durable store. * @@ -241,7 +168,7 @@ export interface EventProcessorOptions { export class EventProcessor { private _consumerGroupName: string; private _eventHubClient: EventHubClient; - private _partitionProcessorFactory: PartitionProcessorFactory; + private _partitionProcessorClass: typeof PartitionProcessor; private _processorOptions: EventProcessorOptions; private _pumpManager: PumpManager; private _id: string = uuid(); @@ -266,7 +193,7 @@ export class EventProcessor { constructor( consumerGroupName: string, eventHubClient: EventHubClient, - partitionProcessorFactory: PartitionProcessorFactory, + PartitionProcessorClass: typeof PartitionProcessor, partitionManager: PartitionManager, options?: EventProcessorOptions ) { @@ -274,10 +201,10 @@ export class EventProcessor { this._consumerGroupName = consumerGroupName; this._eventHubClient = eventHubClient; - this._partitionProcessorFactory = partitionProcessorFactory; + this._partitionProcessorClass = PartitionProcessorClass; this._partitionManager = partitionManager; this._processorOptions = options; - this._pumpManager = new PumpManager(this._id, options); + this._pumpManager = new PumpManager(this._id, this._processorOptions); const inactiveTimeLimitInMS = 60000; // ownership expiration time (1 mintue) this._partitionLoadBalancer = new PartitionLoadBalancer(this._id, inactiveTimeLimitInMS); } @@ -331,14 +258,11 @@ export class EventProcessor { log.partitionLoadBalancer( `[${this._id}] Successfully claimed ownership of partition ${partitionIdToClaim}.` ); - const partitionContext: PartitionContext = { - consumerGroupName: this._consumerGroupName, - eventHubName: this._eventHubClient.eventHubName, - partitionId: ownershipRequest.partitionId - }; - const checkpointManager = new CheckpointManager( - partitionContext, + const partitionContext = new PartitionContext( + this._eventHubClient.eventHubName, + this._consumerGroupName, + ownershipRequest.partitionId, this._partitionManager, this._id ); @@ -346,14 +270,11 @@ export class EventProcessor { log.partitionLoadBalancer( `[${this._id}] [${partitionIdToClaim}] Calling user-provided PartitionProcessorFactory.` ); - const partitionProcessor = this._partitionProcessorFactory( - partitionContext, - checkpointManager - ); + const partitionProcessor = new this._partitionProcessorClass(); const eventPosition = ownershipRequest.sequenceNumber ? EventPosition.fromSequenceNumber(ownershipRequest.sequenceNumber) - : this._processorOptions.initialEventPosition || EventPosition.earliest(); + : EventPosition.earliest(); await this._pumpManager.createPump( this._eventHubClient, diff --git a/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts b/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts index 1c01e3d4b25e..af5046fc7abf 100644 --- a/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts +++ b/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts @@ -2,7 +2,7 @@ // Licensed under the MIT License. import { PartitionManager, PartitionOwnership } from "./eventProcessor"; -import { Checkpoint } from "./checkpointManager"; +import { Checkpoint } from "./partitionContext"; import { generate_uuid } from "rhea-promise"; /** diff --git a/sdk/eventhub/event-hubs/src/index.ts b/sdk/eventhub/event-hubs/src/index.ts index d095a28defcb..c612b33c1a42 100644 --- a/sdk/eventhub/event-hubs/src/index.ts +++ b/sdk/eventhub/event-hubs/src/index.ts @@ -20,19 +20,16 @@ export { PartitionProperties, EventHubProperties } from "./managementClient"; export { EventHubProducer } from "./sender"; export { EventHubConsumer, EventIteratorOptions } from "./receiver"; export { EventDataBatch } from "./eventDataBatch"; -export { CheckpointManager } from "./checkpointManager"; export { EventProcessor, CloseReason, EventProcessorOptions, - PartitionProcessor, PartitionManager, - PartitionProcessorFactory, PartitionOwnership } from "./eventProcessor"; -export { PartitionContext } from "./partitionContext"; +export { PartitionContext, Checkpoint } from "./partitionContext"; export { InMemoryPartitionManager } from "./inMemoryPartitionManager"; -export { Checkpoint } from "./checkpointManager"; +export { PartitionProcessor } from "./partitionProcessor"; export { MessagingError, DataTransformer, diff --git a/sdk/eventhub/event-hubs/src/partitionContext.ts b/sdk/eventhub/event-hubs/src/partitionContext.ts index e862cea5d063..720f406fa58d 100644 --- a/sdk/eventhub/event-hubs/src/partitionContext.ts +++ b/sdk/eventhub/event-hubs/src/partitionContext.ts @@ -1,27 +1,144 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +import { ReceivedEventData } from "./eventData"; +import { PartitionManager } from "./eventProcessor"; + /** - * `PartitionContext` holds information on the partition, consumer group and event hub + * A checkpoint is meant to represent the last successfully processed event by the user from a particular + * partition of a consumer group in an Event Hub instance. + * + * When the `updateCheckpoint()` method on the `PartitionContext` class is called by the user, a + * `Checkpoint` is created internally. It is then stored in the storage solution implemented by the + * `PartitionManager` chosen by the user when creating an `EventProcessor`. + * + * Users are never expected to interact with `Checkpoint` directly. This interface exists to support the + * internal workings of `EventProcessor` and `PartitionManager`. + **/ +export interface Checkpoint { + /** + * @property The event hub name + */ + eventHubName: string; + /** + * @property The consumer group name + */ + consumerGroupName: string; + /** + * @property The unique identifier of the event processor. + */ + ownerId: string; + /** + * @property The identifier of the Event Hub partition + */ + partitionId: string; + /** + * @property The sequence number of the event. + */ + sequenceNumber: number; + /** + * @property The offset of the event. + */ + offset: number; + /** + * @property The unique identifier for the operation. + */ + eTag: string; +} + +/** + * `PartitionContext` holds information on the partition, consumer group and event hub * being processed by the `EventProcessor`. - * + * It also allows users to update checkpoints via the `updateCheckpoint` method. + * * User is never meant to create `PartitionContext` directly. It is only passed to user code * by the `EventProcessor`. */ -export interface PartitionContext { +export class PartitionContext { + private _partitionManager: PartitionManager; + private _consumerGroupName: string; + private _eventHubName: string; + private _eventProcessorId: string; + private _partitionId: string; + private _eTag: string = ""; + + constructor( + eventHubName: string, + consumerGroupName: string, + partitionId: string, + partitionManager: PartitionManager, + eventProcessorId: string + ) { + this._eventHubName = eventHubName; + this._consumerGroupName = consumerGroupName; + this._partitionId = partitionId; + this._partitionManager = partitionManager; + this._eventProcessorId = eventProcessorId; + } + /** - * @property The identifier of the Event Hub partition - * @readonly + * @property The consumer group name + * @readonly */ - readonly partitionId: string; - /** + get consumerGroupName() { + return this._consumerGroupName; + } + + /** * @property The event hub name - * @readonly + * @readonly */ - readonly eventHubName: string; - /** - * @property The consumer group name - * @readonly + get eventHubName() { + return this._eventHubName; + } + + /** + * @property The identifier of the Event Hub partition + * @readonly */ - readonly consumerGroupName: string; + get partitionId() { + return this._partitionId; + } + + /** + * Updates the checkpoint for the partition associated with the `PartitionContext`. + * + * A checkpoint is meant to represent the last successfully processed event by the user from a particular + * partition of a consumer group in an Event Hub instance. + * + * @param eventData The event that you want to update the checkpoint with. + * @return Promise + */ + public async updateCheckpoint(eventData: ReceivedEventData): Promise; + /** + * Updates the checkpoint for the partition associated with the `PartitionContext`. + * + * A checkpoint is meant to represent the last successfully processed event by the user from a particular + * partition of a consumer group in an Event Hub instance. + * + * @param sequenceNumber The sequence number of the event that you want to update the checkpoint with. + * @param offset The offset of the event that you want to update the checkpoint with. + * @return Promise. + */ + public async updateCheckpoint(sequenceNumber: number, offset: number): Promise; + public async updateCheckpoint( + eventDataOrSequenceNumber: ReceivedEventData | number, + offset?: number + ): Promise { + const checkpoint: Checkpoint = { + eventHubName: this._eventHubName, + consumerGroupName: this._consumerGroupName, + ownerId: this._eventProcessorId, + partitionId: this._partitionId, + sequenceNumber: + typeof eventDataOrSequenceNumber === "number" + ? eventDataOrSequenceNumber + : eventDataOrSequenceNumber.sequenceNumber, + offset: + typeof offset === "number" ? offset : (eventDataOrSequenceNumber as ReceivedEventData).offset, + eTag: this._eTag + }; + + this._eTag = await this._partitionManager.updateCheckpoint(checkpoint); + } } diff --git a/sdk/eventhub/event-hubs/src/partitionProcessor.ts b/sdk/eventhub/event-hubs/src/partitionProcessor.ts new file mode 100644 index 000000000000..9c3a71d01bf5 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/partitionProcessor.ts @@ -0,0 +1,65 @@ +import { PartitionContext } from "./partitionContext"; +import { CloseReason } from "./eventProcessor"; +import { ReceivedEventData } from "./eventData"; + +/** + * The `PartitionProcessor` is responsible for processing events received from Event Hubs when using `EventProcessor` + * + * The EventProcessor creates a new instance of the PartitionProcessor for each partition of the event hub it starts processing. When you extend the `PartitionProcessor` in order to customize it as you see fit, + * - Override the `processEvents()` method to add the code to process the received events. This is also a good place to update the checkpoints using the `updateCheckpoint()` method + * - Optionally override the `processError()` method to handle any error that might have occurred when processing the events. + * - Optionally override the `initialize()` method to implement any set up related tasks you would want to carry out before starting to receive events from the partition + * - Optionally override the `close()` method to implement any tear down or clean up tasks you would want to carry out. + */ +export class PartitionProcessor { + /** + * This method is called when the `EventProcessor` takes ownership of a new partition and before any + * events are received. + * + * @param partitionContext An object that provides information specific to the partition being processed. + * Call the `updateCheckpoint` method to update and store the checkpoint for this partition. + * This object will have properties like the `partitionId`, `eventHubName` and `consumerGroupName`. + * @return {Promise} + */ + async initialize(partitionContext: PartitionContext): Promise {} + + /** + * This method is called before the partition processor is closed by the EventProcessor. + * + * @param reason The reason for closing this partition processor. + * @param partitionContext An object that provides information specific to the partition being processed. + * Call the `updateCheckpoint` method to update and store the checkpoint for this partition. + * This object will have properties like the `partitionId`, `eventHubName` and `consumerGroupName`. + * @return {Promise} + */ + async close(reason: CloseReason, partitionContext: PartitionContext): Promise {} + + /** + * This method is called when new events are received. + * + * This is also a good place to update checkpoints as appropriate. + * + * @param events The received events to be processed. + * @param partitionContext An object that provides information specific to the partition being processed. + * Call the `updateCheckpoint` method to update and store the checkpoint for this partition. + * This object will have properties like the `partitionId`, `eventHubName` and `consumerGroupName`. + * @return {Promise} + */ + async processEvents( + events: ReceivedEventData[], + partitionContext: PartitionContext + ): Promise { + console.log(JSON.stringify(events)); + } + + /** + * This method is called when an error occurs while receiving events from Event Hubs. + * + * @param error The error to be processed. + * @param partitionContext An object that provides information specific to the partition being processed. + * Call the `updateCheckpoint` method to update and store the checkpoint for this partition. + * This object will have properties like the `partitionId`, `eventHubName` and `consumerGroupName`. + * @return {Promise} + */ + async processError(error: Error, partitionContext: PartitionContext): Promise {} +} diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index dcd7e83f3645..b9b8121e5004 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -2,10 +2,11 @@ // Licensed under the MIT License. import * as log from "./log"; -import { EventProcessorOptions, PartitionProcessor, CloseReason } from "./eventProcessor"; +import { EventProcessorOptions, CloseReason } from "./eventProcessor"; import { PartitionContext } from "./partitionContext"; import { EventHubClient } from "./eventHubClient"; import { EventPosition } from "./eventPosition"; +import { PartitionProcessor } from "./partitionProcessor"; import { EventHubConsumer } from "./receiver"; import { AbortController } from "@azure/abort-controller"; import { MessagingError } from "@azure/core-amqp"; @@ -16,6 +17,7 @@ export class PartitionPump { private _partitionProcessor: PartitionProcessor; private _processorOptions: EventProcessorOptions; private _receiver: EventHubConsumer | undefined; + private _initialEventPosition: EventPosition; private _isReceiving: boolean = false; private _abortController: AbortController; @@ -23,12 +25,14 @@ export class PartitionPump { eventHubClient: EventHubClient, partitionContext: PartitionContext, partitionProcessor: PartitionProcessor, + initialEventPosition: EventPosition, options?: EventProcessorOptions ) { if (!options) options = {}; this._eventHubClient = eventHubClient; this._partitionContext = partitionContext; this._partitionProcessor = partitionProcessor; + this._initialEventPosition = initialEventPosition; this._processorOptions = options; this._abortController = new AbortController(); } @@ -39,12 +43,10 @@ export class PartitionPump { async start(): Promise { this._isReceiving = true; - if (typeof this._partitionProcessor.initialize === "function") { - try { - await this._partitionProcessor.initialize(); - } catch { - // swallow the error from the user-defined code - } + try { + await this._partitionProcessor.initialize(this._partitionContext); + } catch { + // swallow the error from the user-defined code } this._receiveEvents(this._partitionContext.partitionId); log.partitionPump("Successfully started the receiver."); @@ -54,7 +56,7 @@ export class PartitionPump { this._receiver = this._eventHubClient.createConsumer( this._partitionContext.consumerGroupName, partitionId, - this._processorOptions.initialEventPosition || EventPosition.earliest(), + this._initialEventPosition, { ownerLevel: 0 } ); @@ -69,7 +71,7 @@ export class PartitionPump { if (!this._isReceiving) { return; } - await this._partitionProcessor.processEvents(receivedEvents); + await this._partitionProcessor.processEvents(receivedEvents, this._partitionContext); } catch (err) { // check if this pump is still receiving // it may not be if the EventProcessor was stopped during processEvents @@ -80,7 +82,7 @@ export class PartitionPump { // forward error to user's processError and swallow errors they may throw try { - await this._partitionProcessor.processError(err); + await this._partitionProcessor.processError(err, this._partitionContext); } catch (err) { log.error("An error was thrown by user's processError method: ", err); } @@ -113,9 +115,7 @@ export class PartitionPump { await this._receiver.close(); } this._abortController.abort(); - if (typeof this._partitionProcessor.close === "function") { - await this._partitionProcessor.close(reason); - } + await this._partitionProcessor.close(reason, this._partitionContext); } catch (err) { log.error("An error occurred while closing the receiver.", err); throw err; diff --git a/sdk/eventhub/event-hubs/src/pumpManager.ts b/sdk/eventhub/event-hubs/src/pumpManager.ts index cdcc9a7ebe10..ec6a41925784 100644 --- a/sdk/eventhub/event-hubs/src/pumpManager.ts +++ b/sdk/eventhub/event-hubs/src/pumpManager.ts @@ -4,7 +4,8 @@ import { EventHubClient } from "./eventHubClient"; import { PartitionContext } from "./partitionContext"; import { EventPosition } from "./eventPosition"; -import { PartitionProcessor, EventProcessorOptions, CloseReason } from "./eventProcessor"; +import { EventProcessorOptions, CloseReason } from "./eventProcessor"; +import { PartitionProcessor } from "./partitionProcessor"; import { PartitionPump } from "./partitionPump"; import * as log from "./log"; @@ -73,10 +74,13 @@ export class PumpManager { log.pumpManager(`[${this._eventProcessorName}] [${partitionId}] Creating a new pump.`); - const pump = new PartitionPump(eventHubClient, partitionContext, partitionProcessor, { - ...this._options, - initialEventPosition - }); + const pump = new PartitionPump( + eventHubClient, + partitionContext, + partitionProcessor, + initialEventPosition, + this._options + ); try { await pump.start(); diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 0c137c773b7a..4d93ea6d64f9 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -8,7 +8,6 @@ chai.use(chaiAsPromised); import debugModule from "debug"; const debug = debugModule("azure:event-hubs:partitionPump"); import { - EventPosition, EventHubClient, EventData, EventProcessor, @@ -17,9 +16,9 @@ import { InMemoryPartitionManager, PartitionOwnership, Checkpoint, - PartitionProcessorFactory, CloseReason, - ReceivedEventData + ReceivedEventData, + PartitionProcessor } from "../src"; import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; import { generate_uuid, Dictionary } from "rhea-promise"; @@ -30,7 +29,7 @@ describe("Event Processor", function(): void { connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], path: env[EnvVarKeys.EVENTHUB_NAME] }; - const client: EventHubClient = new EventHubClient(service.connectionString, service.path); + let client: EventHubClient; before("validate environment", async function(): Promise { should.exist( env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], @@ -42,26 +41,20 @@ describe("Event Processor", function(): void { ); }); - after("close the connection", async function(): Promise { + beforeEach("create the client", function() { + client = new EventHubClient(service.connectionString, service.path); + }); + + afterEach("close the connection", async function(): Promise { await client.close(); }); it("should expose an id", async function(): Promise { - const factory: PartitionProcessorFactory = (context) => { - return { - async processEvents() {}, - async processError() {} - }; - }; - const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, - new InMemoryPartitionManager(), - { - initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) - } + PartitionProcessor, + new InMemoryPartitionManager() ); const id = processor.id; @@ -83,33 +76,28 @@ describe("Event Processor", function(): void { const partitionOwnerShip = new Set(); // The partitionProcess will need to add events to the partitionResultsMap as they are received - const factory: PartitionProcessorFactory = (context) => { - return { - async initialize() { - partitionResultsMap.get(context.partitionId)!.initialized = true; - }, - async close(reason) { - partitionResultsMap.get(context.partitionId)!.closeReason = reason; - }, - async processEvents(events) { - partitionOwnerShip.add(context.partitionId); - const existingEvents = partitionResultsMap.get(context.partitionId)!.events; - events.forEach((event) => existingEvents.push(event.body)); - }, - async processError() { - didError = true; - } - }; - }; + class FooPartitionProcessor extends PartitionProcessor { + async initialize(partitionContext: PartitionContext) { + partitionResultsMap.get(partitionContext.partitionId)!.initialized = true; + } + async close(reason: CloseReason, partitionContext: PartitionContext) { + partitionResultsMap.get(partitionContext.partitionId)!.closeReason = reason; + } + async processEvents(events: ReceivedEventData[], partitionContext: PartitionContext) { + partitionOwnerShip.add(partitionContext.partitionId); + const existingEvents = partitionResultsMap.get(partitionContext.partitionId)!.events; + events.forEach((event) => existingEvents.push(event.body)); + } + async processError() { + didError = true; + } + } const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, - new InMemoryPartitionManager(), - { - initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) - } + FooPartitionProcessor, + new InMemoryPartitionManager() ); processor.start(); @@ -125,7 +113,22 @@ describe("Event Processor", function(): void { } while (partitionOwnerShip.size !== partitionIds.length) { - await delay(5000); + await delay(1000); + } + + // wait until all partitions have received at least 1 event + while (true) { + const emptyPartition = []; + for (const results of partitionResultsMap.values()) { + if (!results.events.length) { + emptyPartition.push(results); + } + } + if (emptyPartition.length) { + await delay(100); + } else { + break; + } } // shutdown the processor await processor.stop(); @@ -135,8 +138,7 @@ describe("Event Processor", function(): void { for (const partitionId of partitionIds) { const results = partitionResultsMap.get(partitionId)!; const events = results.events; - events.length.should.equal(1); - events[0].should.equal(expectedMessagePrefix + partitionId); + events.length.should.gte(1); results.initialized.should.be.true; (results.closeReason === CloseReason.Shutdown).should.be.true; } @@ -145,32 +147,26 @@ describe("Event Processor", function(): void { it("should not throw if stop is called without start", async function(): Promise { let didPartitionProcessorStart = false; - // The partitionProcess will need to add events to the partitionResultsMap as they are received - const factory: PartitionProcessorFactory = (context) => { - return { - async initialize() { - didPartitionProcessorStart = true; - }, - async close() { - didPartitionProcessorStart = true; - }, - async processEvents(events) { - didPartitionProcessorStart = true; - }, - async processError() { - didPartitionProcessorStart = true; - } - }; - }; + class FooPartitionProcessor extends PartitionProcessor { + async initialize() { + didPartitionProcessorStart = true; + } + async close() { + didPartitionProcessorStart = true; + } + async processEvents() { + didPartitionProcessorStart = true; + } + async processError() { + didPartitionProcessorStart = true; + } + } const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, - new InMemoryPartitionManager(), - { - initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) - } + FooPartitionProcessor, + new InMemoryPartitionManager() ); // shutdown the processor @@ -194,33 +190,28 @@ describe("Event Processor", function(): void { let didError = false; // The partitionProcess will need to add events to the partitionResultsMap as they are received - const factory: PartitionProcessorFactory = (context) => { - return { - async initialize() { - partitionResultsMap.get(context.partitionId)!.initialized = true; - }, - async close(reason) { - partitionResultsMap.get(context.partitionId)!.closeReason = reason; - }, - async processEvents(events) { - partitionOwnerShip.add(context.partitionId); - const existingEvents = partitionResultsMap.get(context.partitionId)!.events; - events.forEach((event) => existingEvents.push(event.body)); - }, - async processError() { - didError = true; - } - }; - }; + class FooPartitionProcessor extends PartitionProcessor { + async initialize(partitionContext: PartitionContext) { + partitionResultsMap.get(partitionContext.partitionId)!.initialized = true; + } + async close(reason: CloseReason, partitionContext: PartitionContext) { + partitionResultsMap.get(partitionContext.partitionId)!.closeReason = reason; + } + async processEvents(events: ReceivedEventData[], partitionContext: PartitionContext) { + partitionOwnerShip.add(partitionContext.partitionId); + const existingEvents = partitionResultsMap.get(partitionContext.partitionId)!.events; + events.forEach((event) => existingEvents.push(event.body)); + } + async processError() { + didError = true; + } + } const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, - new InMemoryPartitionManager(), - { - initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) - } + FooPartitionProcessor, + new InMemoryPartitionManager() ); processor.start(); @@ -235,7 +226,22 @@ describe("Event Processor", function(): void { // set a delay to give a consumers a chance to receive a message while (partitionOwnerShip.size !== partitionIds.length) { - await delay(5000); + await delay(1000); + } + + // wait until all partitions have received at least 1 event + while (true) { + const emptyPartition = []; + for (const results of partitionResultsMap.values()) { + if (!results.events.length) { + emptyPartition.push(results); + } + } + if (emptyPartition.length) { + await delay(100); + } else { + break; + } } // shutdown the processor @@ -246,8 +252,7 @@ describe("Event Processor", function(): void { for (const partitionId of partitionIds) { const results = partitionResultsMap.get(partitionId)!; const events = results.events; - events.length.should.equal(1); - events[0].should.equal(expectedMessagePrefix + partitionId); + events.length.should.gte(1); results.initialized.should.be.true; (results.closeReason === CloseReason.Shutdown).should.be.true; // reset fields @@ -264,7 +269,22 @@ describe("Event Processor", function(): void { // set a delay to give a consumers a chance to receive a message while (partitionOwnerShip.size !== partitionIds.length) { - await delay(5000); + await delay(1000); + } + + // wait until all partitions have received at least 1 event + while (true) { + const emptyPartition = []; + for (const results of partitionResultsMap.values()) { + if (!results.events.length) { + emptyPartition.push(results); + } + } + if (emptyPartition.length) { + await delay(100); + } else { + break; + } } await processor.stop(); @@ -296,33 +316,28 @@ describe("Event Processor", function(): void { let didError = false; // The partitionProcess will need to add events to the partitionResultsMap as they are received - const factory: PartitionProcessorFactory = (context) => { - return { - async initialize() { - partitionResultsMap.get(context.partitionId)!.initialized = true; - }, - async close(reason) { - partitionResultsMap.get(context.partitionId)!.closeReason = reason; - }, - async processEvents(events) { - partitionOwnerShip.add(context.partitionId); - const existingEvents = partitionResultsMap.get(context.partitionId)!.events; - events.forEach((event) => existingEvents.push(event.body)); - }, - async processError() { - didError = true; - } - }; - }; + class FooPartitionProcessor extends PartitionProcessor { + async initialize(partitionContext: PartitionContext) { + partitionResultsMap.get(partitionContext.partitionId)!.initialized = true; + } + async close(reason: CloseReason, partitionContext: PartitionContext) { + partitionResultsMap.get(partitionContext.partitionId)!.closeReason = reason; + } + async processEvents(events: ReceivedEventData[], partitionContext: PartitionContext) { + partitionOwnerShip.add(partitionContext.partitionId); + const existingEvents = partitionResultsMap.get(partitionContext.partitionId)!.events; + events.forEach((event) => existingEvents.push(event.body)); + } + async processError() { + didError = true; + } + } const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, - new InMemoryPartitionManager(), - { - initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) - } + FooPartitionProcessor, + new InMemoryPartitionManager() ); processor.start(); @@ -337,7 +352,22 @@ describe("Event Processor", function(): void { // set a delay to give a consumers a chance to receive a message while (partitionOwnerShip.size !== partitionIds.length) { - await delay(5000); + await delay(1000); + } + + // wait until all partitions have received at least 1 event + while (true) { + const emptyPartition = []; + for (const results of partitionResultsMap.values()) { + if (!results.events.length) { + emptyPartition.push(results); + } + } + if (emptyPartition.length) { + await delay(100); + } else { + break; + } } // shutdown the processor @@ -348,8 +378,7 @@ describe("Event Processor", function(): void { for (const partitionId of partitionIds) { const results = partitionResultsMap.get(partitionId)!; const events = results.events; - events.length.should.equal(1); - events[0].should.equal(expectedMessagePrefix + partitionId); + events.length.should.gte(1); results.initialized.should.be.true; (results.closeReason === CloseReason.Shutdown).should.be.true; } @@ -369,27 +398,22 @@ describe("Event Processor", function(): void { let didError = false; // The partitionProcess will need to add events to the partitionResultsMap as they are received - const factory: PartitionProcessorFactory = (context) => { - return { - async processEvents(events) { - partitionOwnerShip.add(context.partitionId); - const existingEvents = partitionResultsMap.get(context.partitionId)!; - events.forEach((event) => existingEvents.push(event.body)); - }, - async processError() { - didError = true; - } - }; - }; + class FooPartitionProcessor extends PartitionProcessor { + async processEvents(events: ReceivedEventData[], partitionContext: PartitionContext) { + partitionOwnerShip.add(partitionContext.partitionId); + const existingEvents = partitionResultsMap.get(partitionContext.partitionId)!; + events.forEach((event) => existingEvents.push(event.body)); + } + async processError() { + didError = true; + } + } const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, - new InMemoryPartitionManager(), - { - initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) - } + FooPartitionProcessor, + new InMemoryPartitionManager() ); processor.start(); @@ -404,7 +428,22 @@ describe("Event Processor", function(): void { // set a delay to give a consumers a chance to receive a message while (partitionOwnerShip.size !== partitionIds.length) { - await delay(5000); + await delay(1000); + } + + // wait until all partitions have received at least 1 event + while (true) { + const emptyPartition = []; + for (const results of partitionResultsMap.values()) { + if (!results.length) { + emptyPartition.push(results); + } + } + if (emptyPartition.length) { + await delay(100); + } else { + break; + } } // shutdown the processor @@ -414,8 +453,7 @@ describe("Event Processor", function(): void { // validate correct events captured for each partition for (const partitionId of partitionIds) { const events = partitionResultsMap.get(partitionId)!; - events.length.should.equal(1); - events[0].should.equal(expectedMessagePrefix + partitionId); + events.length.should.gte(1); } }); @@ -429,7 +467,7 @@ describe("Event Processor", function(): void { isinitializeCalled = true; debug(`Started processing`); } - async processEvents(events: EventData[]) { + async processEvents(events: ReceivedEventData[]) { for (const event of events) { receivedEvents.push(event); debug("Received event", event.body); @@ -446,18 +484,12 @@ describe("Event Processor", function(): void { debug(`Stopped processing`); } } - const eventProcessorFactory = (context: PartitionContext) => { - return new SimpleEventProcessor(); - }; const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - eventProcessorFactory, - new InMemoryPartitionManager(), - { - initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) - } + SimpleEventProcessor, + new InMemoryPartitionManager() ); processor.start(); @@ -466,14 +498,14 @@ describe("Event Processor", function(): void { await producer.close(); while (receivedEvents.length === 0) { - await delay(5000); + await delay(1000); } + await processor.stop(); didError.should.be.false; isinitializeCalled.should.equal(true); - receivedEvents.length.should.equal(1); - receivedEvents[0].body.should.equal("Hello world!!!"); + receivedEvents.length.should.gte(1); isCloseCalled.should.equal(true); }); }); @@ -540,37 +572,36 @@ describe("Event Processor", function(): void { let partitionOwnerShip = new Set(); let partionCount: { [x: string]: number } = {}; - const factory: PartitionProcessorFactory = (context, checkpointManager) => { - return { - async processEvents(events: ReceivedEventData[]) { - partitionOwnerShip.add(context.partitionId); - !partionCount[context.partitionId] - ? (partionCount[context.partitionId] = 1) - : partionCount[context.partitionId]++; - const existingEvents = checkpointMap.get(context.partitionId)!; - for (const event of events) { - debug("Received event: '%s' from partition: '%s'", event.body, context.partitionId); - if (partionCount[context.partitionId] <= 50) { - await checkpointManager.updateCheckpoint(event); - existingEvents.push(event); - } + class FooPartitionProcessor extends PartitionProcessor { + async processEvents(events: ReceivedEventData[], partitionContext: PartitionContext) { + partitionOwnerShip.add(partitionContext.partitionId); + !partionCount[partitionContext.partitionId] + ? (partionCount[partitionContext.partitionId] = 1) + : partionCount[partitionContext.partitionId]++; + const existingEvents = checkpointMap.get(partitionContext.partitionId)!; + for (const event of events) { + debug( + "Received event: '%s' from partition: '%s'", + event.body, + partitionContext.partitionId + ); + if (partionCount[partitionContext.partitionId] <= 50) { + await partitionContext.updateCheckpoint(event); + existingEvents.push(event); } - }, - async processError() { - didError = true; } - }; - }; + } + async processError() { + didError = true; + } + } const inMemoryPartitionManager = new InMemoryPartitionManager(); const processor1 = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, - inMemoryPartitionManager, - { - initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) - } + FooPartitionProcessor, + inMemoryPartitionManager ); // start first processor @@ -613,7 +644,7 @@ describe("Event Processor", function(): void { const processor2 = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, + FooPartitionProcessor, inMemoryPartitionManager ); // start second processor @@ -648,7 +679,7 @@ describe("Event Processor", function(): void { }); describe("Load balancing", function(): void { - before("validate partitions", async function(): Promise { + beforeEach("validate partitions", async function(): Promise { const partitionIds = await client.getPartitionIds(); // ensure we have at least 3 partitions partitionIds.length.should.gte( @@ -660,7 +691,6 @@ describe("Event Processor", function(): void { it("should 'steal' partitions until all the processors have reached a steady-state", async function(): Promise< void > { - const now = Date.now(); const processorByName: Dictionary = {}; const partitionManager = new InMemoryPartitionManager(); const partitionIds = await client.getPartitionIds(); @@ -675,27 +705,25 @@ describe("Event Processor", function(): void { let errorName = ""; // The partitionProcess will need to add events to the partitionResultsMap as they are received - const factory: PartitionProcessorFactory = (context) => { - return { - async initialize() { - partitionResultsMap.get(context.partitionId)!.initialized = true; - }, - async close(reason) { - partitionResultsMap.get(context.partitionId)!.closeReason = reason; - }, - async processEvents(events) { - partitionOwnershipArr.add(context.partitionId); - const existingEvents = partitionResultsMap.get(context.partitionId)!.events; - events.forEach((event) => { - existingEvents.push(event.body); - }); - }, - async processError(err) { - didError = true; - errorName = err.name; - } - }; - }; + class FooPartitionProcessor extends PartitionProcessor { + async initialize(partitionContext: PartitionContext) { + partitionResultsMap.get(partitionContext.partitionId)!.initialized = true; + } + async close(reason: CloseReason, partitionContext: PartitionContext) { + partitionResultsMap.get(partitionContext.partitionId)!.closeReason = reason; + } + async processEvents(events: ReceivedEventData[], partitionContext: PartitionContext) { + partitionOwnershipArr.add(partitionContext.partitionId); + const existingEvents = partitionResultsMap.get(partitionContext.partitionId)!.events; + events.forEach((event) => { + existingEvents.push(event.body); + }); + } + async processError(err: Error) { + didError = true; + errorName = err.name; + } + } // create messages const expectedMessagePrefix = "EventProcessor test - multiple partitions - "; @@ -708,11 +736,8 @@ describe("Event Processor", function(): void { processorByName[`processor-1`] = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, - partitionManager, - { - initialEventPosition: EventPosition.fromEnqueuedTime(now) - } + FooPartitionProcessor, + partitionManager ); processorByName[`processor-1`].start(); @@ -724,11 +749,8 @@ describe("Event Processor", function(): void { processorByName[`processor-2`] = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, - partitionManager, - { - initialEventPosition: EventPosition.fromEnqueuedTime(now) - } + FooPartitionProcessor, + partitionManager ); partitionOwnershipArr.size.should.equal(partitionIds.length); @@ -766,8 +788,7 @@ describe("Event Processor", function(): void { for (const partitionId of partitionIds) { const results = partitionResultsMap.get(partitionId)!; - const events = results.events; - events[0].should.equal(expectedMessagePrefix + partitionId); + results.events.length.should.be.gte(1); results.initialized.should.be.true; (results.closeReason === CloseReason.Shutdown).should.be.true; } @@ -776,7 +797,6 @@ describe("Event Processor", function(): void { it("should ensure that all the processors reach a steady-state where all partitions are being processed", async function(): Promise< void > { - const now = Date.now(); const processorByName: Dictionary = {}; const partitionIds = await client.getPartitionIds(); const partitionManager = new InMemoryPartitionManager(); @@ -784,16 +804,14 @@ describe("Event Processor", function(): void { let didError = false; // The partitionProcess will need to add events to the partitionResultsMap as they are received - const factory: PartitionProcessorFactory = (context) => { - return { - async processEvents(events) { - partitionOwnershipArr.add(context.partitionId); - }, - async processError() { - didError = true; - } - }; - }; + class FooPartitionProcessor extends PartitionProcessor { + async processEvents(events: ReceivedEventData[], partitionContext: PartitionContext) { + partitionOwnershipArr.add(partitionContext.partitionId); + } + async processError() { + didError = true; + } + } // create messages const expectedMessagePrefix = "EventProcessor test - multiple partitions - "; @@ -808,11 +826,8 @@ describe("Event Processor", function(): void { processorByName[processorName] = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, - factory, - partitionManager, - { - initialEventPosition: EventPosition.fromEnqueuedTime(now) - } + FooPartitionProcessor, + partitionManager ); processorByName[processorName].start(); await delay(12000);