diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 2df4c943a29e..0423821806b9 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -146,6 +146,16 @@ export class EventPosition { sequenceNumber?: number; } +// @public +export class EventProcessor { + // Warning: (ae-forgotten-export) The symbol "PartitionProcessorFactory" needs to be exported by the entry point index.d.ts + // Warning: (ae-forgotten-export) The symbol "PartitionManager" needs to be exported by the entry point index.d.ts + // Warning: (ae-forgotten-export) The symbol "EventProcessorOptions" needs to be exported by the entry point index.d.ts + constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions); + start(): Promise; + stop(): Promise; +} + export { MessagingError } // @public @@ -154,6 +164,16 @@ export type OnError = (error: MessagingError | Error) => void; // @public export type OnMessage = (eventData: ReceivedEventData) => void; +// @public +export interface PartitionContext { + // (undocumented) + readonly consumerGroupName: string; + // (undocumented) + readonly eventHubName: string; + // (undocumented) + readonly partitionId: string; +} + // @public export interface PartitionProperties { beginningSequenceNumber: number; diff --git a/sdk/eventhub/event-hubs/samples/eventProcessor.ts b/sdk/eventhub/event-hubs/samples/eventProcessor.ts new file mode 100644 index 000000000000..9015fe7cf592 --- /dev/null +++ b/sdk/eventhub/event-hubs/samples/eventProcessor.ts @@ -0,0 +1,71 @@ +import { + EventHubClient, + EventData, + EventPosition, + delay, + EventProcessor, + PartitionContext +} from "@azure/event-hubs"; + +class SimplePartitionProcessor { + private _context: PartitionContext; + constructor(context: PartitionContext) { + this._context = context; + } + async processEvents(events: EventData[]) { + for (const event of events) { + console.log( + "Received event: '%s' from partition: '%s' and consumer group: '%s'", + event.body, + this._context.partitionId, + this._context.consumerGroupName + ); + } + } + + async processError(error: Error) { + console.log(`Encountered an error: ${error.message}`); + } + + async initialize() { + console.log(`Started processing`); + } + + async close() { + console.log(`Stopped processing`); + } +} + +// Define connection string and related Event Hubs entity name here +const connectionString = ""; +const eventHubName = ""; + +async function main() { + const client = new EventHubClient(connectionString, eventHubName); + + const eventProcessorFactory = (context: PartitionContext) => { + return new SimplePartitionProcessor(context); + }; + + const processor = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + eventProcessorFactory, + "partitionManager" as any, + { + initialEventPosition: EventPosition.earliest(), + maxBatchSize: 10, + maxWaitTimeInSeconds: 20 + } + ); + await processor.start(); + // after 2 seconds, stop processing + await delay(2000); + + await processor.stop(); + await client.close(); +} + +main().catch((err) => { + console.log("Error occurred: ", err); +}); diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 8a9ebc04ca6b..285fb43b1043 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -6,6 +6,7 @@ import { EventPosition } from "./eventPosition"; import { PartitionContext } from "./partitionContext"; import { CheckpointManager, Checkpoint } from "./checkpointManager"; import { EventData } from "./eventData"; +import { PartitionPump } from "./partitionPump"; export interface PartitionProcessor { /** @@ -66,7 +67,7 @@ export interface PartitionManager { export interface EventProcessorOptions { initialEventPosition?: EventPosition; maxBatchSize?: number; - maxWaitTime?: number; + maxWaitTimeInSeconds?: number; } /** @@ -74,13 +75,26 @@ export interface EventProcessorOptions { * @class EventProcessorHost */ export class EventProcessor { + private _consumerGroupName: string; + private _eventHubClient: EventHubClient; + private _partitionProcessorFactory: PartitionProcessorFactory; + private _processorOptions: EventProcessorOptions; + private _partitionPump?: PartitionPump; + constructor( consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions - ) {} + ) { + if (!options) options = {}; + + this._consumerGroupName = consumerGroupName; + this._eventHubClient = eventHubClient; + this._partitionProcessorFactory = partitionProcessorFactory; + this._processorOptions = options; + } /** * Starts the event processor, fetching the list of partitions, and attempting to grab leases @@ -89,11 +103,46 @@ export class EventProcessor { * * @return {Promise} */ - async start(): Promise {} + async start(): Promise { + const partitionIds = await this._eventHubClient.getPartitionIds(); + const partitionContext: PartitionContext = { + partitionId: partitionIds[0], + consumerGroupName: this._consumerGroupName, + eventHubName: this._eventHubClient.eventHubName + }; + const partitionProcessor = this._partitionProcessorFactory( + partitionContext, + new CheckpointManager() + ); + if (partitionProcessor.initialize && typeof partitionProcessor.initialize !== "function") { + throw new TypeError("'initialize' must be of type 'function'."); + } + if (typeof partitionProcessor.processEvents !== "function") { + throw new TypeError("'processEvents' is required and must be of type 'function'."); + } + if (typeof partitionProcessor.processError !== "function") { + throw new TypeError("'processError' is required and must be of type 'function'."); + } + if (partitionProcessor.close && typeof partitionProcessor.close !== "function") { + throw new TypeError("'close' must be of type 'function'."); + } + + this._partitionPump = new PartitionPump( + this._eventHubClient, + partitionContext, + partitionProcessor, + this._processorOptions + ); + await this._partitionPump.start(partitionIds[0]); + } /** * Stops the EventProcessor from processing messages. * @return {Promise} */ - async stop(): Promise {} + async stop(): Promise { + if (this._partitionPump) { + await this._partitionPump.stop("Stopped processing"); + } + } } diff --git a/sdk/eventhub/event-hubs/src/index.ts b/sdk/eventhub/event-hubs/src/index.ts index fed523ce7598..e00a632b0fd2 100644 --- a/sdk/eventhub/event-hubs/src/index.ts +++ b/sdk/eventhub/event-hubs/src/index.ts @@ -21,6 +21,8 @@ export { PartitionProperties, EventHubProperties } from "./managementClient"; export { EventHubProducer } from "./sender"; export { EventHubConsumer, EventIteratorOptions } from "./receiver"; export { EventDataBatch } from "./eventDataBatch"; +export { EventProcessor } from "./eventProcessor"; +export { PartitionContext } from "./partitionContext"; export { MessagingError, DataTransformer, diff --git a/sdk/eventhub/event-hubs/src/log.ts b/sdk/eventhub/event-hubs/src/log.ts index 3c0b0d91fc61..6f8610aeceb0 100644 --- a/sdk/eventhub/event-hubs/src/log.ts +++ b/sdk/eventhub/event-hubs/src/log.ts @@ -53,3 +53,8 @@ export const client = debugModule("azure:event-hubs:client"); * log statements for iothub client */ export const iotClient = debugModule("azure:event-hubs:iothubClient"); +/** + * @ignore + * log statements for partitionManager + */ +export const partitionPump = debugModule("azure:event-hubs:partitionPump"); diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts new file mode 100644 index 000000000000..cd55e47949e6 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import * as log from "./log"; +import { EventProcessorOptions, PartitionProcessor } from "./eventProcessor"; +import { PartitionContext } from "./partitionContext"; +import { EventHubClient } from "./eventHubClient"; +import { EventPosition } from "./eventPosition"; +import { EventHubConsumer } from "./receiver"; +import { AbortController } from "@azure/abort-controller"; + +export class PartitionPump { + private _partitionContext: PartitionContext; + private _eventHubClient: EventHubClient; + private _partitionProcessor: PartitionProcessor; + private _processorOptions: EventProcessorOptions; + private _receiver: EventHubConsumer | undefined; + private _isReceiving: boolean = false; + private _abortController: AbortController; + + constructor( + eventHubClient: EventHubClient, + partitionContext: PartitionContext, + partitionProcessor: PartitionProcessor, + options?: EventProcessorOptions + ) { + if (!options) options = {}; + this._eventHubClient = eventHubClient; + this._partitionContext = partitionContext; + this._partitionProcessor = partitionProcessor; + this._processorOptions = options; + this._abortController = new AbortController(); + } + + async start(partitionId: string): Promise { + if (this._partitionProcessor.initialize) { + await this._partitionProcessor.initialize(); + } + this._receiveEvents(partitionId); + log.partitionPump("Successfully started the receiver."); + } + + private async _receiveEvents(partitionId: string): Promise { + this._isReceiving = true; + try { + this._receiver = await this._eventHubClient.createConsumer( + this._partitionContext.consumerGroupName, + partitionId, + this._processorOptions.initialEventPosition || EventPosition.earliest() + ); + + while (this._isReceiving) { + const receivedEvents = await this._receiver.receiveBatch( + this._processorOptions.maxBatchSize || 1, + this._processorOptions.maxWaitTimeInSeconds, + this._abortController.signal + ); + await this._partitionProcessor.processEvents(receivedEvents); + } + } catch (err) { + this._isReceiving = false; + try { + if (this._receiver) { + await this._receiver.close(); + } + await this._partitionProcessor.processError(err); + log.error("An error occurred while receiving events.", err); + } catch (err) { + log.error("An error occurred while closing the receiver", err); + } + } + } + + async stop(reason: string): Promise { + this._isReceiving = false; + try { + if (this._receiver) { + await this._receiver.close(); + } + this._abortController.abort(); + if (this._partitionProcessor.close) { + await this._partitionProcessor.close(reason); + } + } catch (err) { + log.error("An error occurred while closing the receiver.", err); + throw err; + } + } +} diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts new file mode 100644 index 000000000000..2fdb84605c45 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import chai from "chai"; +const should = chai.should(); +import chaiAsPromised from "chai-as-promised"; +chai.use(chaiAsPromised); +import debugModule from "debug"; +const debug = debugModule("azure:event-hubs:partitionPump"); +import { + EventPosition, + EventHubClient, + EventData, + EventProcessor, + PartitionContext, + delay +} from "../src"; +import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; +const env = getEnvVars(); + +describe("Event Processor", function(): void { + const service = { + connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + path: env[EnvVarKeys.EVENTHUB_NAME] + }; + const client: EventHubClient = new EventHubClient(service.connectionString, service.path); + before("validate environment", async function(): Promise { + should.exist( + env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + "define EVENTHUB_CONNECTION_STRING in your environment before running integration tests." + ); + should.exist( + env[EnvVarKeys.EVENTHUB_NAME], + "define EVENTHUB_NAME in your environment before running integration tests." + ); + }); + + after("close the connection", async function(): Promise { + await client.close(); + }); + + describe("Partition processor", function(): void { + it("should call methods on a PartitionProcessor ", async function(): Promise { + const receivedEvents: EventData[] = []; + let isinitializeCalled = false; + let isCloseCalled = false; + class SimpleEventProcessor { + async initialize() { + isinitializeCalled = true; + debug(`Started processing`); + } + async processEvents(events: EventData[]) { + for (const event of events) { + receivedEvents.push(event); + debug("Received event", event.body); + } + } + + async processError(error: Error) { + debug(`Encountered an error: ${error.message}`); + } + + async close() { + isCloseCalled = true; + debug(`Stopped processing`); + } + } + const eventProcessorFactory = (context: PartitionContext) => { + return new SimpleEventProcessor(); + }; + const partitionInfo = await client.getPartitionProperties("0"); + const processor = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + eventProcessorFactory, + "partitionManager" as any, + { + initialEventPosition: EventPosition.fromSequenceNumber( + partitionInfo.lastEnqueuedSequenceNumber + ), + maxBatchSize: 1, + maxWaitTimeInSeconds: 5 + } + ); + const producer = client.createProducer({ partitionId: "0" }); + await producer.send({ body: "Hello world!!!" }); + + await processor.start(); + // after 2 seconds, stop processing + await delay(2000); + await processor.stop(); + await producer.close(); + isinitializeCalled.should.equal(true); + receivedEvents.length.should.equal(1); + receivedEvents[0].body.should.equal("Hello world!!!"); + isCloseCalled.should.equal(true); + }); + }); +}).timeout(90000);