From bc9906a34202cb3ff9c118c8ab388a918c93e179 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Fri, 26 Jul 2019 16:18:05 -0700 Subject: [PATCH 01/13] [EPH] Call methods on PartitionProcessor while processing single partition --- .../event-hubs/samples/testEPHSample.ts | 74 +++++++++++++++++++ sdk/eventhub/event-hubs/src/eventProcessor.ts | 42 ++++++++++- sdk/eventhub/event-hubs/src/index.ts | 2 + sdk/eventhub/event-hubs/src/log.ts | 5 ++ sdk/eventhub/event-hubs/src/partitionPump.ts | 71 ++++++++++++++++++ 5 files changed, 191 insertions(+), 3 deletions(-) create mode 100644 sdk/eventhub/event-hubs/samples/testEPHSample.ts create mode 100644 sdk/eventhub/event-hubs/src/partitionPump.ts diff --git a/sdk/eventhub/event-hubs/samples/testEPHSample.ts b/sdk/eventhub/event-hubs/samples/testEPHSample.ts new file mode 100644 index 000000000000..7658d163d86f --- /dev/null +++ b/sdk/eventhub/event-hubs/samples/testEPHSample.ts @@ -0,0 +1,74 @@ +import { + EventHubClient, + EventData, + EventPosition, + delay, + EventProcessor, + PartitionContext +} from "../src"; + +class TestEventProcessor { + constructor(partitionId: string) {} + + async processEvents(events: EventData[]) { + for (const event of events) { + console.log("Receive", event.body); + } + // try { + // // checkpoint using the last event in the batch + // await checkpointContext.checkpoint(events[events.length - 1]); + // } catch (err) { + // console.error(`Encountered an error while checkpointing on: ${err.message}`); + // } + } + + async processError(error: Error) { + console.log(`Encountered an error: ${error.message}`); + } + + async initialize() { + console.log(`Started processing`); + } + + async close() { + console.log(`Stopped processing`); + } +} + +async function main() { + const client = new EventHubClient("connectionString", "eventHubName"); + + const eventProcessorFactory = (context: PartitionContext) => { + return new TestEventProcessor(context.partitionId); + }; + + const eph = new EventProcessor( + "$Default", + client, + eventProcessorFactory, + "partitionManager" as any, + { + initialEventPosition: EventPosition.earliest(), + maxBatchSize: 10, + maxWaitTime: 20 + } + ); + try { + await eph.start(); + } catch (err) { + console.log(`Error encountered while starting EPH: ${err.message}`); + } + // after 10 seconds, stop processing + await delay(2000); + + try { + await eph.stop(); + await client.close(); + } catch (err) { + console.log("Error occurred", err); + } +} + +main().catch((err) => { + console.log("Error occurred: ", err); +}); diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 8a9ebc04ca6b..0b34acc52033 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -6,6 +6,7 @@ import { EventPosition } from "./eventPosition"; import { PartitionContext } from "./partitionContext"; import { CheckpointManager, Checkpoint } from "./checkpointManager"; import { EventData } from "./eventData"; +import { PartitionPump } from "./partitionPump"; export interface PartitionProcessor { /** @@ -74,13 +75,26 @@ export interface EventProcessorOptions { * @class EventProcessorHost */ export class EventProcessor { + private _consumerGroupName: string; + private _eventHubClient: EventHubClient; + private _partitionProcessorFactory: PartitionProcessorFactory; + private _processorOptions: EventProcessorOptions; + private _partitionPump: PartitionPump | undefined; + constructor( consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions - ) {} + ) { + if (!options) options = {}; + + this._consumerGroupName = consumerGroupName; + this._eventHubClient = eventHubClient; + this._partitionProcessorFactory = partitionProcessorFactory; + this._processorOptions = options; + } /** * Starts the event processor, fetching the list of partitions, and attempting to grab leases @@ -89,11 +103,33 @@ export class EventProcessor { * * @return {Promise} */ - async start(): Promise {} + async start(): Promise { + const partitionIds = await this._eventHubClient.getPartitionIds(); + const partitionContext: PartitionContext = { + partitionId: partitionIds[0], + consumerGroupName: this._consumerGroupName, + eventHubName: this._eventHubClient.eventHubName + }; + const partitionProcessor = this._partitionProcessorFactory( + partitionContext, + "checkpointManager" as any + ); + this._partitionPump = new PartitionPump( + this._eventHubClient, + partitionContext, + partitionProcessor, + this._processorOptions + ); + this._partitionPump.start(partitionIds[0]); + } /** * Stops the EventProcessor from processing messages. * @return {Promise} */ - async stop(): Promise {} + async stop(): Promise { + if (this._partitionPump) { + this._partitionPump.stop(); + } + } } diff --git a/sdk/eventhub/event-hubs/src/index.ts b/sdk/eventhub/event-hubs/src/index.ts index fed523ce7598..e00a632b0fd2 100644 --- a/sdk/eventhub/event-hubs/src/index.ts +++ b/sdk/eventhub/event-hubs/src/index.ts @@ -21,6 +21,8 @@ export { PartitionProperties, EventHubProperties } from "./managementClient"; export { EventHubProducer } from "./sender"; export { EventHubConsumer, EventIteratorOptions } from "./receiver"; export { EventDataBatch } from "./eventDataBatch"; +export { EventProcessor } from "./eventProcessor"; +export { PartitionContext } from "./partitionContext"; export { MessagingError, DataTransformer, diff --git a/sdk/eventhub/event-hubs/src/log.ts b/sdk/eventhub/event-hubs/src/log.ts index 3c0b0d91fc61..6f8610aeceb0 100644 --- a/sdk/eventhub/event-hubs/src/log.ts +++ b/sdk/eventhub/event-hubs/src/log.ts @@ -53,3 +53,8 @@ export const client = debugModule("azure:event-hubs:client"); * log statements for iothub client */ export const iotClient = debugModule("azure:event-hubs:iothubClient"); +/** + * @ignore + * log statements for partitionManager + */ +export const partitionPump = debugModule("azure:event-hubs:partitionPump"); diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts new file mode 100644 index 000000000000..de4615cf350d --- /dev/null +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import * as log from "./log"; +import { EventProcessorOptions, PartitionProcessor } from "./eventProcessor"; +import { PartitionContext } from "./partitionContext"; +import { EventHubClient } from "./eventHubClient"; +import { EventPosition } from "./eventPosition"; +import { EventHubConsumer } from "./receiver"; + +export class PartitionPump { + private _partitionContext: PartitionContext; + private _eventHubClient: EventHubClient; + private _partitionProcessor: PartitionProcessor; + private _processorOptions: EventProcessorOptions; + private _receiver: EventHubConsumer | undefined; + private _isReceiving: boolean = false; + + constructor( + eventHubClient: EventHubClient, + partitionContext: PartitionContext, + partitionProcessor: PartitionProcessor, + options?: EventProcessorOptions + ) { + if (!options) options = {}; + this._eventHubClient = eventHubClient; + this._partitionContext = partitionContext; + this._partitionProcessor = partitionProcessor; + this._processorOptions = options; + } + + async start(partitionId: string): Promise { + await this._partitionProcessor.initialize!(); + await this._receiveEvents(partitionId); + log.partitionPump("Successfully started the receiver."); + } + + private async _receiveEvents(partitionId: string): Promise { + this._isReceiving = true; + this._receiver = await this._eventHubClient.createConsumer( + this._partitionContext.consumerGroupName, + partitionId, + this._processorOptions.initialEventPosition || EventPosition.earliest() + ); + try { + while (this._isReceiving) { + const receivedEvents = await this._receiver.receiveBatch( + this._processorOptions.maxBatchSize!, + this._processorOptions.maxWaitTime + ); + await this._partitionProcessor.processEvents(receivedEvents); + } + } catch (err) { + await this._partitionProcessor.processError(err); + log.partitionPump("An error occurred while receiving events.", err); + } + } + + async stop(): Promise { + if (this._receiver && this._isReceiving) { + this._isReceiving = false; + try { + this._receiver.close(); + await this._partitionProcessor.close!("Stopped processing"); + } catch (err) { + log.partitionPump("An error occurred while closing the receiver.", err); + throw err; + } + } + } +} From e93041b8f5dcfb402b78f31d6a8ceef500de2265 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Fri, 26 Jul 2019 16:28:19 -0700 Subject: [PATCH 02/13] Set default for maxBatchSize --- sdk/eventhub/event-hubs/src/partitionPump.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index de4615cf350d..d2d4fb95c74c 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -45,7 +45,7 @@ export class PartitionPump { try { while (this._isReceiving) { const receivedEvents = await this._receiver.receiveBatch( - this._processorOptions.maxBatchSize!, + this._processorOptions.maxBatchSize || 1, this._processorOptions.maxWaitTime ); await this._partitionProcessor.processEvents(receivedEvents); From 9c93fdd6af8b9277c1fa81a6367c6f8a425c9ce7 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 10:44:30 -0700 Subject: [PATCH 03/13] Close the receiver when error occurs while receiving --- .../event-hubs/review/event-hubs.api.md | 20 +++++++++++++++++++ .../event-hubs/samples/testEPHSample.ts | 14 +++---------- sdk/eventhub/event-hubs/src/partitionPump.ts | 16 ++++++++------- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 2df4c943a29e..0423821806b9 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -146,6 +146,16 @@ export class EventPosition { sequenceNumber?: number; } +// @public +export class EventProcessor { + // Warning: (ae-forgotten-export) The symbol "PartitionProcessorFactory" needs to be exported by the entry point index.d.ts + // Warning: (ae-forgotten-export) The symbol "PartitionManager" needs to be exported by the entry point index.d.ts + // Warning: (ae-forgotten-export) The symbol "EventProcessorOptions" needs to be exported by the entry point index.d.ts + constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions); + start(): Promise; + stop(): Promise; +} + export { MessagingError } // @public @@ -154,6 +164,16 @@ export type OnError = (error: MessagingError | Error) => void; // @public export type OnMessage = (eventData: ReceivedEventData) => void; +// @public +export interface PartitionContext { + // (undocumented) + readonly consumerGroupName: string; + // (undocumented) + readonly eventHubName: string; + // (undocumented) + readonly partitionId: string; +} + // @public export interface PartitionProperties { beginningSequenceNumber: number; diff --git a/sdk/eventhub/event-hubs/samples/testEPHSample.ts b/sdk/eventhub/event-hubs/samples/testEPHSample.ts index 7658d163d86f..6020c074f5c1 100644 --- a/sdk/eventhub/event-hubs/samples/testEPHSample.ts +++ b/sdk/eventhub/event-hubs/samples/testEPHSample.ts @@ -53,20 +53,12 @@ async function main() { maxWaitTime: 20 } ); - try { - await eph.start(); - } catch (err) { - console.log(`Error encountered while starting EPH: ${err.message}`); - } + await eph.start(); // after 10 seconds, stop processing await delay(2000); - try { - await eph.stop(); - await client.close(); - } catch (err) { - console.log("Error occurred", err); - } + await eph.stop(); + await client.close(); } main().catch((err) => { diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index d2d4fb95c74c..425fa06062a7 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -51,21 +51,23 @@ export class PartitionPump { await this._partitionProcessor.processEvents(receivedEvents); } } catch (err) { + this._isReceiving = false; + this._receiver.close(); await this._partitionProcessor.processError(err); log.partitionPump("An error occurred while receiving events.", err); } } async stop(): Promise { - if (this._receiver && this._isReceiving) { - this._isReceiving = false; - try { + this._isReceiving = false; + try { + if (this._receiver) { this._receiver.close(); - await this._partitionProcessor.close!("Stopped processing"); - } catch (err) { - log.partitionPump("An error occurred while closing the receiver.", err); - throw err; } + await this._partitionProcessor.close!("Stopped processing"); + } catch (err) { + log.partitionPump("An error occurred while closing the receiver.", err); + throw err; } } } From bc8b013eb71e6253f2966951e887c67731970cdc Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 11:45:10 -0700 Subject: [PATCH 04/13] Add tests and update sample --- ...testEPHSample.ts => eventProcessorHost.ts} | 29 +++-- .../test/eventProcessorHost.spec.ts | 100 ++++++++++++++++++ 2 files changed, 122 insertions(+), 7 deletions(-) rename sdk/eventhub/event-hubs/samples/{testEPHSample.ts => eventProcessorHost.ts} (57%) create mode 100644 sdk/eventhub/event-hubs/test/eventProcessorHost.spec.ts diff --git a/sdk/eventhub/event-hubs/samples/testEPHSample.ts b/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts similarity index 57% rename from sdk/eventhub/event-hubs/samples/testEPHSample.ts rename to sdk/eventhub/event-hubs/samples/eventProcessorHost.ts index 6020c074f5c1..950e5dfd0253 100644 --- a/sdk/eventhub/event-hubs/samples/testEPHSample.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts @@ -1,3 +1,16 @@ +/* + Copyright (c) Microsoft Corporation. All rights reserved. + Licensed under the MIT Licence. + + This sample demonstrates how to receive events from multiple partitions. + + If your Event Hubs instance doesn't have any events, then please run "sendEvents.ts" sample + to populate Event Hubs before running this sample. + + Note: If you are using version 2.1.0 or lower of @azure/event-hubs library, then please use the samples at + https://github.com/Azure/azure-sdk-for-js/tree/%40azure/event-hubs_2.1.0/sdk/eventhub/event-hubs/samples instead. +*/ + import { EventHubClient, EventData, @@ -5,14 +18,12 @@ import { delay, EventProcessor, PartitionContext -} from "../src"; +} from "@azure/event-hubs"; class TestEventProcessor { - constructor(partitionId: string) {} - async processEvents(events: EventData[]) { for (const event of events) { - console.log("Receive", event.body); + console.log("Received event", event.body); } // try { // // checkpoint using the last event in the batch @@ -35,11 +46,15 @@ class TestEventProcessor { } } +// Define connection string and related Event Hubs entity name here +const connectionString = ""; +const eventHubName = ""; + async function main() { - const client = new EventHubClient("connectionString", "eventHubName"); + const client = new EventHubClient(connectionString, eventHubName); const eventProcessorFactory = (context: PartitionContext) => { - return new TestEventProcessor(context.partitionId); + return new TestEventProcessor(); }; const eph = new EventProcessor( @@ -54,7 +69,7 @@ async function main() { } ); await eph.start(); - // after 10 seconds, stop processing + // after 20 seconds, stop processing await delay(2000); await eph.stop(); diff --git a/sdk/eventhub/event-hubs/test/eventProcessorHost.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessorHost.spec.ts new file mode 100644 index 000000000000..700b4026b2f7 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/eventProcessorHost.spec.ts @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import chai from "chai"; +const should = chai.should(); +import chaiAsPromised from "chai-as-promised"; +chai.use(chaiAsPromised); +import debugModule from "debug"; +const debug = debugModule("azure:event-hubs:partitionPump"); +import { + EventPosition, + EventHubClient, + EventData, + EventProcessor, + PartitionContext, + delay +} from "../src"; +import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; +const env = getEnvVars(); + +describe("Event processor Host", function(): void { + const service = { + connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + path: env[EnvVarKeys.EVENTHUB_NAME] + }; + const client: EventHubClient = new EventHubClient(service.connectionString, service.path); + before("validate environment", async function(): Promise { + should.exist( + env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + "define EVENTHUB_CONNECTION_STRING in your environment before running integration tests." + ); + should.exist( + env[EnvVarKeys.EVENTHUB_NAME], + "define EVENTHUB_NAME in your environment before running integration tests." + ); + }); + + after("close the connection", async function(): Promise { + await client.close(); + }); + + describe("Partition processor", function(): void { + const receivedEvents: EventData[] = []; + let isinitializeCalled = false; + let isCloseCalled = false; + class TestEventProcessor { + async initialize() { + isinitializeCalled = true; + debug(`Started processing`); + } + async processEvents(events: EventData[]) { + for (const event of events) { + receivedEvents.push(event); + debug("Received event", event.body); + } + } + + async processError(error: Error) { + debug(`Encountered an error: ${error.message}`); + } + + async close() { + isCloseCalled = true; + debug(`Stopped processing`); + } + } + const eventProcessorFactory = (context: PartitionContext) => { + return new TestEventProcessor(); + }; + + it("should call methods on a PartitionProcessor ", async function(): Promise { + const partitionInfo = await client.getPartitionProperties("0"); + const eph = new EventProcessor( + "$Default", + client, + eventProcessorFactory, + "partitionManager" as any, + { + initialEventPosition: EventPosition.fromSequenceNumber( + partitionInfo.lastEnqueuedSequenceNumber + ), + maxBatchSize: 1, + maxWaitTime: 5 + } + ); + const producer = client.createProducer({ partitionId: "0" }); + await producer.send({ body: "Hello world!!!" }); + + await eph.start(); + // after 10 seconds, stop processing + await delay(1000); + await eph.stop(); + await producer.close(); + isinitializeCalled.should.equal(true); + receivedEvents.length.should.equal(1); + receivedEvents[0].body.should.equal("Hello world!!!"); + isCloseCalled.should.equal(true); + }); + }); +}).timeout(90000); From 3224994cbe1305ab0fb89c0b00a025cad57cec33 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 11:48:09 -0700 Subject: [PATCH 05/13] Update delay time --- .../event-hubs/samples/eventProcessorHost.ts | 31 +++++-------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts b/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts index 950e5dfd0253..351804ea9b0c 100644 --- a/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts @@ -1,16 +1,3 @@ -/* - Copyright (c) Microsoft Corporation. All rights reserved. - Licensed under the MIT Licence. - - This sample demonstrates how to receive events from multiple partitions. - - If your Event Hubs instance doesn't have any events, then please run "sendEvents.ts" sample - to populate Event Hubs before running this sample. - - Note: If you are using version 2.1.0 or lower of @azure/event-hubs library, then please use the samples at - https://github.com/Azure/azure-sdk-for-js/tree/%40azure/event-hubs_2.1.0/sdk/eventhub/event-hubs/samples instead. -*/ - import { EventHubClient, EventData, @@ -18,12 +5,14 @@ import { delay, EventProcessor, PartitionContext -} from "@azure/event-hubs"; +} from "../src"; class TestEventProcessor { + constructor(partitionId: string) {} + async processEvents(events: EventData[]) { for (const event of events) { - console.log("Received event", event.body); + console.log("Receive", event.body); } // try { // // checkpoint using the last event in the batch @@ -46,15 +35,11 @@ class TestEventProcessor { } } -// Define connection string and related Event Hubs entity name here -const connectionString = ""; -const eventHubName = ""; - async function main() { - const client = new EventHubClient(connectionString, eventHubName); + const client = new EventHubClient("connectionString", "eventHubName"); const eventProcessorFactory = (context: PartitionContext) => { - return new TestEventProcessor(); + return new TestEventProcessor(context.partitionId); }; const eph = new EventProcessor( @@ -69,8 +54,8 @@ async function main() { } ); await eph.start(); - // after 20 seconds, stop processing - await delay(2000); + // after 10 seconds, stop processing + await delay(10000); await eph.stop(); await client.close(); From e8c0eceb8477021827cbe046671edae73331e131 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 11:55:51 -0700 Subject: [PATCH 06/13] Update sample --- .../event-hubs/samples/eventProcessorHost.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts b/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts index 351804ea9b0c..96991266580a 100644 --- a/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts @@ -5,11 +5,9 @@ import { delay, EventProcessor, PartitionContext -} from "../src"; - -class TestEventProcessor { - constructor(partitionId: string) {} +} from "@azure/event-hubs"; +class EventProcessorHost { async processEvents(events: EventData[]) { for (const event of events) { console.log("Receive", event.body); @@ -35,11 +33,15 @@ class TestEventProcessor { } } +// Define connection string and related Event Hubs entity name here +const connectionString = ""; +const eventHubName = ""; + async function main() { - const client = new EventHubClient("connectionString", "eventHubName"); + const client = new EventHubClient(connectionString, eventHubName); const eventProcessorFactory = (context: PartitionContext) => { - return new TestEventProcessor(context.partitionId); + return new EventProcessorHost(); }; const eph = new EventProcessor( @@ -54,8 +56,8 @@ async function main() { } ); await eph.start(); - // after 10 seconds, stop processing - await delay(10000); + // after 2 seconds, stop processing + await delay(2000); await eph.stop(); await client.close(); From 711c871e092edb815f8bf9e6f45c331c45f7ae8a Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 11:59:38 -0700 Subject: [PATCH 07/13] Update sample --- sdk/eventhub/event-hubs/samples/eventProcessorHost.ts | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts b/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts index 96991266580a..3201496112a4 100644 --- a/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts @@ -10,14 +10,8 @@ import { class EventProcessorHost { async processEvents(events: EventData[]) { for (const event of events) { - console.log("Receive", event.body); + console.log("Received event", event.body); } - // try { - // // checkpoint using the last event in the batch - // await checkpointContext.checkpoint(events[events.length - 1]); - // } catch (err) { - // console.error(`Encountered an error while checkpointing on: ${err.message}`); - // } } async processError(error: Error) { From 8b83f64f0942189bec3f05597b32981efdd99bb5 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 13:58:50 -0700 Subject: [PATCH 08/13] Address comments --- ...ventProcessorHost.ts => eventProcessor.ts} | 4 +- sdk/eventhub/event-hubs/src/eventProcessor.ts | 10 ++-- sdk/eventhub/event-hubs/src/partitionPump.ts | 48 +++++++++++----- ...sorHost.spec.ts => eventProcessor.spec.ts} | 57 +++++++++---------- 4 files changed, 69 insertions(+), 50 deletions(-) rename sdk/eventhub/event-hubs/samples/{eventProcessorHost.ts => eventProcessor.ts} (94%) rename sdk/eventhub/event-hubs/test/{eventProcessorHost.spec.ts => eventProcessor.spec.ts} (70%) diff --git a/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts b/sdk/eventhub/event-hubs/samples/eventProcessor.ts similarity index 94% rename from sdk/eventhub/event-hubs/samples/eventProcessorHost.ts rename to sdk/eventhub/event-hubs/samples/eventProcessor.ts index 3201496112a4..906125e59f86 100644 --- a/sdk/eventhub/event-hubs/samples/eventProcessorHost.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessor.ts @@ -39,14 +39,14 @@ async function main() { }; const eph = new EventProcessor( - "$Default", + EventHubClient.defaultConsumerGroupName, client, eventProcessorFactory, "partitionManager" as any, { initialEventPosition: EventPosition.earliest(), maxBatchSize: 10, - maxWaitTime: 20 + maxWaitTimeInSeconds: 20 } ); await eph.start(); diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 0b34acc52033..04f3743f1439 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -67,7 +67,7 @@ export interface PartitionManager { export interface EventProcessorOptions { initialEventPosition?: EventPosition; maxBatchSize?: number; - maxWaitTime?: number; + maxWaitTimeInSeconds?: number; } /** @@ -79,7 +79,7 @@ export class EventProcessor { private _eventHubClient: EventHubClient; private _partitionProcessorFactory: PartitionProcessorFactory; private _processorOptions: EventProcessorOptions; - private _partitionPump: PartitionPump | undefined; + private _partitionPump?: PartitionPump; constructor( consumerGroupName: string, @@ -112,7 +112,7 @@ export class EventProcessor { }; const partitionProcessor = this._partitionProcessorFactory( partitionContext, - "checkpointManager" as any + new CheckpointManager() ); this._partitionPump = new PartitionPump( this._eventHubClient, @@ -120,7 +120,7 @@ export class EventProcessor { partitionProcessor, this._processorOptions ); - this._partitionPump.start(partitionIds[0]); + await this._partitionPump.start(partitionIds[0]); } /** @@ -129,7 +129,7 @@ export class EventProcessor { */ async stop(): Promise { if (this._partitionPump) { - this._partitionPump.stop(); + await this._partitionPump.stop("Stopped processing"); } } } diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index 425fa06062a7..e4534e2cc555 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -30,43 +30,63 @@ export class PartitionPump { } async start(partitionId: string): Promise { - await this._partitionProcessor.initialize!(); - await this._receiveEvents(partitionId); + if (typeof this._partitionProcessor.initialize !== "function") { + throw new TypeError("'initialize' must be of type 'function'."); + } + if (this._partitionProcessor.initialize) { + await this._partitionProcessor.initialize(); + } + this._receiveEvents(partitionId); log.partitionPump("Successfully started the receiver."); } private async _receiveEvents(partitionId: string): Promise { this._isReceiving = true; - this._receiver = await this._eventHubClient.createConsumer( - this._partitionContext.consumerGroupName, - partitionId, - this._processorOptions.initialEventPosition || EventPosition.earliest() - ); try { + this._receiver = await this._eventHubClient.createConsumer( + this._partitionContext.consumerGroupName, + partitionId, + this._processorOptions.initialEventPosition || EventPosition.earliest() + ); + while (this._isReceiving) { const receivedEvents = await this._receiver.receiveBatch( this._processorOptions.maxBatchSize || 1, - this._processorOptions.maxWaitTime + this._processorOptions.maxWaitTimeInSeconds ); + if (typeof this._partitionProcessor.processEvents !== "function") { + throw new TypeError("The parameter 'onMessage' must be of type 'function'."); + } await this._partitionProcessor.processEvents(receivedEvents); } } catch (err) { this._isReceiving = false; - this._receiver.close(); - await this._partitionProcessor.processError(err); - log.partitionPump("An error occurred while receiving events.", err); + try { + if (this._receiver) { + this._receiver.close(); + } + await this._partitionProcessor.processError(err); + log.error("An error occurred while receiving events.", err); + } catch (err) { + log.error("An error occurred while closing the receiver", err); + } } } - async stop(): Promise { + async stop(reason: string): Promise { this._isReceiving = false; try { if (this._receiver) { this._receiver.close(); } - await this._partitionProcessor.close!("Stopped processing"); + if (typeof this._partitionProcessor.close !== "function") { + throw new TypeError("'close' must be of type 'function'."); + } + if (this._partitionProcessor.close) { + await this._partitionProcessor.close(reason); + } } catch (err) { - log.partitionPump("An error occurred while closing the receiver.", err); + log.error("An error occurred while closing the receiver.", err); throw err; } } diff --git a/sdk/eventhub/event-hubs/test/eventProcessorHost.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts similarity index 70% rename from sdk/eventhub/event-hubs/test/eventProcessorHost.spec.ts rename to sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 700b4026b2f7..b57d5159872c 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessorHost.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -18,7 +18,7 @@ import { import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; const env = getEnvVars(); -describe("Event processor Host", function(): void { +describe("Event Processor", function(): void { const service = { connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], path: env[EnvVarKeys.EVENTHUB_NAME] @@ -40,35 +40,34 @@ describe("Event processor Host", function(): void { }); describe("Partition processor", function(): void { - const receivedEvents: EventData[] = []; - let isinitializeCalled = false; - let isCloseCalled = false; - class TestEventProcessor { - async initialize() { - isinitializeCalled = true; - debug(`Started processing`); - } - async processEvents(events: EventData[]) { - for (const event of events) { - receivedEvents.push(event); - debug("Received event", event.body); + it("should call methods on a PartitionProcessor ", async function(): Promise { + const receivedEvents: EventData[] = []; + let isinitializeCalled = false; + let isCloseCalled = false; + class TestEventProcessor { + async initialize() { + isinitializeCalled = true; + debug(`Started processing`); + } + async processEvents(events: EventData[]) { + for (const event of events) { + receivedEvents.push(event); + debug("Received event", event.body); + } } - } - async processError(error: Error) { - debug(`Encountered an error: ${error.message}`); - } + async processError(error: Error) { + debug(`Encountered an error: ${error.message}`); + } - async close() { - isCloseCalled = true; - debug(`Stopped processing`); + async close() { + isCloseCalled = true; + debug(`Stopped processing`); + } } - } - const eventProcessorFactory = (context: PartitionContext) => { - return new TestEventProcessor(); - }; - - it("should call methods on a PartitionProcessor ", async function(): Promise { + const eventProcessorFactory = (context: PartitionContext) => { + return new TestEventProcessor(); + }; const partitionInfo = await client.getPartitionProperties("0"); const eph = new EventProcessor( "$Default", @@ -80,15 +79,15 @@ describe("Event processor Host", function(): void { partitionInfo.lastEnqueuedSequenceNumber ), maxBatchSize: 1, - maxWaitTime: 5 + maxWaitTimeInSeconds: 5 } ); const producer = client.createProducer({ partitionId: "0" }); await producer.send({ body: "Hello world!!!" }); await eph.start(); - // after 10 seconds, stop processing - await delay(1000); + // after 2 seconds, stop processing + await delay(2000); await eph.stop(); await producer.close(); isinitializeCalled.should.equal(true); From fe5c3e7176ff8830f7afcc13c3b74e492acbaa36 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 14:05:47 -0700 Subject: [PATCH 09/13] Remove check --- sdk/eventhub/event-hubs/src/partitionPump.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index e4534e2cc555..3b801de5921c 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -54,9 +54,6 @@ export class PartitionPump { this._processorOptions.maxBatchSize || 1, this._processorOptions.maxWaitTimeInSeconds ); - if (typeof this._partitionProcessor.processEvents !== "function") { - throw new TypeError("The parameter 'onMessage' must be of type 'function'."); - } await this._partitionProcessor.processEvents(receivedEvents); } } catch (err) { From 37baa5ce6c589f1be3f734fae1519343ccb82208 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 15:43:16 -0700 Subject: [PATCH 10/13] Add abort signal to make sure that the link is closed. --- sdk/eventhub/event-hubs/src/eventProcessor.ts | 13 +++++++++++++ sdk/eventhub/event-hubs/src/partitionPump.ts | 13 ++++++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 04f3743f1439..285fb43b1043 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -114,6 +114,19 @@ export class EventProcessor { partitionContext, new CheckpointManager() ); + if (partitionProcessor.initialize && typeof partitionProcessor.initialize !== "function") { + throw new TypeError("'initialize' must be of type 'function'."); + } + if (typeof partitionProcessor.processEvents !== "function") { + throw new TypeError("'processEvents' is required and must be of type 'function'."); + } + if (typeof partitionProcessor.processError !== "function") { + throw new TypeError("'processError' is required and must be of type 'function'."); + } + if (partitionProcessor.close && typeof partitionProcessor.close !== "function") { + throw new TypeError("'close' must be of type 'function'."); + } + this._partitionPump = new PartitionPump( this._eventHubClient, partitionContext, diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index 3b801de5921c..04e1b7007b9a 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -7,6 +7,7 @@ import { PartitionContext } from "./partitionContext"; import { EventHubClient } from "./eventHubClient"; import { EventPosition } from "./eventPosition"; import { EventHubConsumer } from "./receiver"; +import { AbortController } from "@azure/abort-controller"; export class PartitionPump { private _partitionContext: PartitionContext; @@ -15,6 +16,7 @@ export class PartitionPump { private _processorOptions: EventProcessorOptions; private _receiver: EventHubConsumer | undefined; private _isReceiving: boolean = false; + private _abortController: AbortController; constructor( eventHubClient: EventHubClient, @@ -27,12 +29,10 @@ export class PartitionPump { this._partitionContext = partitionContext; this._partitionProcessor = partitionProcessor; this._processorOptions = options; + this._abortController = new AbortController(); } async start(partitionId: string): Promise { - if (typeof this._partitionProcessor.initialize !== "function") { - throw new TypeError("'initialize' must be of type 'function'."); - } if (this._partitionProcessor.initialize) { await this._partitionProcessor.initialize(); } @@ -52,7 +52,8 @@ export class PartitionPump { while (this._isReceiving) { const receivedEvents = await this._receiver.receiveBatch( this._processorOptions.maxBatchSize || 1, - this._processorOptions.maxWaitTimeInSeconds + this._processorOptions.maxWaitTimeInSeconds, + this._abortController.signal ); await this._partitionProcessor.processEvents(receivedEvents); } @@ -76,9 +77,7 @@ export class PartitionPump { if (this._receiver) { this._receiver.close(); } - if (typeof this._partitionProcessor.close !== "function") { - throw new TypeError("'close' must be of type 'function'."); - } + this._abortController.abort(); if (this._partitionProcessor.close) { await this._partitionProcessor.close(reason); } From 84c8bcdc42f7b176bef1fba00bb7d1589c87a148 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 16:41:35 -0700 Subject: [PATCH 11/13] Add await when close the receiver --- sdk/eventhub/event-hubs/src/partitionPump.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index 04e1b7007b9a..cd55e47949e6 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -61,7 +61,7 @@ export class PartitionPump { this._isReceiving = false; try { if (this._receiver) { - this._receiver.close(); + await this._receiver.close(); } await this._partitionProcessor.processError(err); log.error("An error occurred while receiving events.", err); @@ -75,7 +75,7 @@ export class PartitionPump { this._isReceiving = false; try { if (this._receiver) { - this._receiver.close(); + await this._receiver.close(); } this._abortController.abort(); if (this._partitionProcessor.close) { From 315bbd16734d8f8f4a9aae2d607cf91a801c2b9c Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 17:04:25 -0700 Subject: [PATCH 12/13] Update sample --- .../event-hubs/samples/eventProcessor.ts | 20 +++++++++++++------ .../event-hubs/test/eventProcessor.spec.ts | 12 +++++------ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/sdk/eventhub/event-hubs/samples/eventProcessor.ts b/sdk/eventhub/event-hubs/samples/eventProcessor.ts index 906125e59f86..27b36a3109c9 100644 --- a/sdk/eventhub/event-hubs/samples/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessor.ts @@ -7,10 +7,18 @@ import { PartitionContext } from "@azure/event-hubs"; -class EventProcessorHost { +class SimplePartitionProcessor { + private _context: PartitionContext; + constructor(context: PartitionContext) { + this._context = context; + } async processEvents(events: EventData[]) { for (const event of events) { - console.log("Received event", event.body); + console.log( + "Received event: '%s' from partition: '%s'", + event.body, + this._context.partitionId + ); } } @@ -35,10 +43,10 @@ async function main() { const client = new EventHubClient(connectionString, eventHubName); const eventProcessorFactory = (context: PartitionContext) => { - return new EventProcessorHost(); + return new SimplePartitionProcessor(context); }; - const eph = new EventProcessor( + const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, eventProcessorFactory, @@ -49,11 +57,11 @@ async function main() { maxWaitTimeInSeconds: 20 } ); - await eph.start(); + await processor.start(); // after 2 seconds, stop processing await delay(2000); - await eph.stop(); + await processor.stop(); await client.close(); } diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index b57d5159872c..2fdb84605c45 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -44,7 +44,7 @@ describe("Event Processor", function(): void { const receivedEvents: EventData[] = []; let isinitializeCalled = false; let isCloseCalled = false; - class TestEventProcessor { + class SimpleEventProcessor { async initialize() { isinitializeCalled = true; debug(`Started processing`); @@ -66,11 +66,11 @@ describe("Event Processor", function(): void { } } const eventProcessorFactory = (context: PartitionContext) => { - return new TestEventProcessor(); + return new SimpleEventProcessor(); }; const partitionInfo = await client.getPartitionProperties("0"); - const eph = new EventProcessor( - "$Default", + const processor = new EventProcessor( + EventHubClient.defaultConsumerGroupName, client, eventProcessorFactory, "partitionManager" as any, @@ -85,10 +85,10 @@ describe("Event Processor", function(): void { const producer = client.createProducer({ partitionId: "0" }); await producer.send({ body: "Hello world!!!" }); - await eph.start(); + await processor.start(); // after 2 seconds, stop processing await delay(2000); - await eph.stop(); + await processor.stop(); await producer.close(); isinitializeCalled.should.equal(true); receivedEvents.length.should.equal(1); From 157a01a43c67105da26e3b09e0a9eebd691ebe25 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Mon, 29 Jul 2019 17:09:14 -0700 Subject: [PATCH 13/13] Add consumer group in console log --- sdk/eventhub/event-hubs/samples/eventProcessor.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/event-hubs/samples/eventProcessor.ts b/sdk/eventhub/event-hubs/samples/eventProcessor.ts index 27b36a3109c9..9015fe7cf592 100644 --- a/sdk/eventhub/event-hubs/samples/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessor.ts @@ -15,9 +15,10 @@ class SimplePartitionProcessor { async processEvents(events: EventData[]) { for (const event of events) { console.log( - "Received event: '%s' from partition: '%s'", + "Received event: '%s' from partition: '%s' and consumer group: '%s'", event.body, - this._context.partitionId + this._context.partitionId, + this._context.consumerGroupName ); } }