Skip to content

Commit

Permalink
[EPH] Call methods on PartitionProcessor while processing single part…
Browse files Browse the repository at this point in the history
…ition (#4467)

* [EPH] Call methods on PartitionProcessor while processing single partition
  • Loading branch information
ShivangiReja authored Jul 30, 2019
1 parent 7d9c275 commit 3d1475c
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 4 deletions.
20 changes: 20 additions & 0 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ export class EventPosition {
sequenceNumber?: number;
}

// @public
export class EventProcessor {
// Warning: (ae-forgotten-export) The symbol "PartitionProcessorFactory" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "PartitionManager" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "EventProcessorOptions" needs to be exported by the entry point index.d.ts
constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions);
start(): Promise<void>;
stop(): Promise<void>;
}

export { MessagingError }

// @public
Expand All @@ -154,6 +164,16 @@ export type OnError = (error: MessagingError | Error) => void;
// @public
export type OnMessage = (eventData: ReceivedEventData) => void;

// @public
export interface PartitionContext {
// (undocumented)
readonly consumerGroupName: string;
// (undocumented)
readonly eventHubName: string;
// (undocumented)
readonly partitionId: string;
}

// @public
export interface PartitionProperties {
beginningSequenceNumber: number;
Expand Down
71 changes: 71 additions & 0 deletions sdk/eventhub/event-hubs/samples/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import {
EventHubClient,
EventData,
EventPosition,
delay,
EventProcessor,
PartitionContext
} from "@azure/event-hubs";

class SimplePartitionProcessor {
private _context: PartitionContext;
constructor(context: PartitionContext) {
this._context = context;
}
async processEvents(events: EventData[]) {
for (const event of events) {
console.log(
"Received event: '%s' from partition: '%s' and consumer group: '%s'",
event.body,
this._context.partitionId,
this._context.consumerGroupName
);
}
}

async processError(error: Error) {
console.log(`Encountered an error: ${error.message}`);
}

async initialize() {
console.log(`Started processing`);
}

async close() {
console.log(`Stopped processing`);
}
}

// Define connection string and related Event Hubs entity name here
const connectionString = "";
const eventHubName = "";

async function main() {
const client = new EventHubClient(connectionString, eventHubName);

const eventProcessorFactory = (context: PartitionContext) => {
return new SimplePartitionProcessor(context);
};

const processor = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
eventProcessorFactory,
"partitionManager" as any,
{
initialEventPosition: EventPosition.earliest(),
maxBatchSize: 10,
maxWaitTimeInSeconds: 20
}
);
await processor.start();
// after 2 seconds, stop processing
await delay(2000);

await processor.stop();
await client.close();
}

main().catch((err) => {
console.log("Error occurred: ", err);
});
57 changes: 53 additions & 4 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { EventPosition } from "./eventPosition";
import { PartitionContext } from "./partitionContext";
import { CheckpointManager, Checkpoint } from "./checkpointManager";
import { EventData } from "./eventData";
import { PartitionPump } from "./partitionPump";

export interface PartitionProcessor {
/**
Expand Down Expand Up @@ -66,21 +67,34 @@ export interface PartitionManager {
export interface EventProcessorOptions {
initialEventPosition?: EventPosition;
maxBatchSize?: number;
maxWaitTime?: number;
maxWaitTimeInSeconds?: number;
}

/**
* Describes the Event Processor Host to process events from an EventHub.
* @class EventProcessorHost
*/
export class EventProcessor {
private _consumerGroupName: string;
private _eventHubClient: EventHubClient;
private _partitionProcessorFactory: PartitionProcessorFactory;
private _processorOptions: EventProcessorOptions;
private _partitionPump?: PartitionPump;

constructor(
consumerGroupName: string,
eventHubClient: EventHubClient,
partitionProcessorFactory: PartitionProcessorFactory,
partitionManager: PartitionManager,
options?: EventProcessorOptions
) {}
) {
if (!options) options = {};

this._consumerGroupName = consumerGroupName;
this._eventHubClient = eventHubClient;
this._partitionProcessorFactory = partitionProcessorFactory;
this._processorOptions = options;
}

/**
* Starts the event processor, fetching the list of partitions, and attempting to grab leases
Expand All @@ -89,11 +103,46 @@ export class EventProcessor {
*
* @return {Promise<void>}
*/
async start(): Promise<void> {}
async start(): Promise<void> {
const partitionIds = await this._eventHubClient.getPartitionIds();
const partitionContext: PartitionContext = {
partitionId: partitionIds[0],
consumerGroupName: this._consumerGroupName,
eventHubName: this._eventHubClient.eventHubName
};
const partitionProcessor = this._partitionProcessorFactory(
partitionContext,
new CheckpointManager()
);
if (partitionProcessor.initialize && typeof partitionProcessor.initialize !== "function") {
throw new TypeError("'initialize' must be of type 'function'.");
}
if (typeof partitionProcessor.processEvents !== "function") {
throw new TypeError("'processEvents' is required and must be of type 'function'.");
}
if (typeof partitionProcessor.processError !== "function") {
throw new TypeError("'processError' is required and must be of type 'function'.");
}
if (partitionProcessor.close && typeof partitionProcessor.close !== "function") {
throw new TypeError("'close' must be of type 'function'.");
}

this._partitionPump = new PartitionPump(
this._eventHubClient,
partitionContext,
partitionProcessor,
this._processorOptions
);
await this._partitionPump.start(partitionIds[0]);
}

/**
* Stops the EventProcessor from processing messages.
* @return {Promise<void>}
*/
async stop(): Promise<void> {}
async stop(): Promise<void> {
if (this._partitionPump) {
await this._partitionPump.stop("Stopped processing");
}
}
}
2 changes: 2 additions & 0 deletions sdk/eventhub/event-hubs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export { PartitionProperties, EventHubProperties } from "./managementClient";
export { EventHubProducer } from "./sender";
export { EventHubConsumer, EventIteratorOptions } from "./receiver";
export { EventDataBatch } from "./eventDataBatch";
export { EventProcessor } from "./eventProcessor";
export { PartitionContext } from "./partitionContext";
export {
MessagingError,
DataTransformer,
Expand Down
5 changes: 5 additions & 0 deletions sdk/eventhub/event-hubs/src/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ export const client = debugModule("azure:event-hubs:client");
* log statements for iothub client
*/
export const iotClient = debugModule("azure:event-hubs:iothubClient");
/**
* @ignore
* log statements for partitionManager
*/
export const partitionPump = debugModule("azure:event-hubs:partitionPump");
89 changes: 89 additions & 0 deletions sdk/eventhub/event-hubs/src/partitionPump.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import * as log from "./log";
import { EventProcessorOptions, PartitionProcessor } from "./eventProcessor";
import { PartitionContext } from "./partitionContext";
import { EventHubClient } from "./eventHubClient";
import { EventPosition } from "./eventPosition";
import { EventHubConsumer } from "./receiver";
import { AbortController } from "@azure/abort-controller";

export class PartitionPump {
private _partitionContext: PartitionContext;
private _eventHubClient: EventHubClient;
private _partitionProcessor: PartitionProcessor;
private _processorOptions: EventProcessorOptions;
private _receiver: EventHubConsumer | undefined;
private _isReceiving: boolean = false;
private _abortController: AbortController;

constructor(
eventHubClient: EventHubClient,
partitionContext: PartitionContext,
partitionProcessor: PartitionProcessor,
options?: EventProcessorOptions
) {
if (!options) options = {};
this._eventHubClient = eventHubClient;
this._partitionContext = partitionContext;
this._partitionProcessor = partitionProcessor;
this._processorOptions = options;
this._abortController = new AbortController();
}

async start(partitionId: string): Promise<void> {
if (this._partitionProcessor.initialize) {
await this._partitionProcessor.initialize();
}
this._receiveEvents(partitionId);
log.partitionPump("Successfully started the receiver.");
}

private async _receiveEvents(partitionId: string): Promise<void> {
this._isReceiving = true;
try {
this._receiver = await this._eventHubClient.createConsumer(
this._partitionContext.consumerGroupName,
partitionId,
this._processorOptions.initialEventPosition || EventPosition.earliest()
);

while (this._isReceiving) {
const receivedEvents = await this._receiver.receiveBatch(
this._processorOptions.maxBatchSize || 1,
this._processorOptions.maxWaitTimeInSeconds,
this._abortController.signal
);
await this._partitionProcessor.processEvents(receivedEvents);
}
} catch (err) {
this._isReceiving = false;
try {
if (this._receiver) {
await this._receiver.close();
}
await this._partitionProcessor.processError(err);
log.error("An error occurred while receiving events.", err);
} catch (err) {
log.error("An error occurred while closing the receiver", err);
}
}
}

async stop(reason: string): Promise<void> {
this._isReceiving = false;
try {
if (this._receiver) {
await this._receiver.close();
}
this._abortController.abort();
if (this._partitionProcessor.close) {
await this._partitionProcessor.close(reason);
}
} catch (err) {
log.error("An error occurred while closing the receiver.", err);
throw err;
}
}
}
Loading

0 comments on commit 3d1475c

Please sign in to comment.