Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EPH] Call methods on PartitionProcessor while processing single partition #4467

Merged
merged 14 commits into from
Jul 30, 2019
Merged
20 changes: 20 additions & 0 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ export class EventPosition {
sequenceNumber?: number;
}

// @public
export class EventProcessor {
// Warning: (ae-forgotten-export) The symbol "PartitionProcessorFactory" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "PartitionManager" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "EventProcessorOptions" needs to be exported by the entry point index.d.ts
constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions);
start(): Promise<void>;
stop(): Promise<void>;
}

export { MessagingError }

// @public
Expand All @@ -154,6 +164,16 @@ export type OnError = (error: MessagingError | Error) => void;
// @public
export type OnMessage = (eventData: ReceivedEventData) => void;

// @public
export interface PartitionContext {
// (undocumented)
readonly consumerGroupName: string;
// (undocumented)
readonly eventHubName: string;
// (undocumented)
readonly partitionId: string;
}

// @public
export interface PartitionProperties {
beginningSequenceNumber: number;
Expand Down
62 changes: 62 additions & 0 deletions sdk/eventhub/event-hubs/samples/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import {
EventHubClient,
EventData,
EventPosition,
delay,
EventProcessor,
PartitionContext
} from "@azure/event-hubs";

class EventProcessorHost {
Copy link
Contributor

@ramya-rao-a ramya-rao-a Jul 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are doing quite some naming changes, we should be careful of the terms we use in the samples. Here, I would suggest SimplePartitionProcessor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we should add the constructor that stores the partition context and then in processEvents use the partitionId and consumer group name in the console.log()

This way, the user will know how to get the "partition" related info when they process events

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!!

async processEvents(events: EventData[]) {
for (const event of events) {
console.log("Received event", event.body);
}
}

async processError(error: Error) {
console.log(`Encountered an error: ${error.message}`);
}

async initialize() {
console.log(`Started processing`);
}

async close() {
console.log(`Stopped processing`);
}
}

// Define connection string and related Event Hubs entity name here
const connectionString = "";
const eventHubName = "";

async function main() {
const client = new EventHubClient(connectionString, eventHubName);

const eventProcessorFactory = (context: PartitionContext) => {
return new EventProcessorHost();
};

const eph = new EventProcessor(
ShivangiReja marked this conversation as resolved.
Show resolved Hide resolved
EventHubClient.defaultConsumerGroupName,
client,
eventProcessorFactory,
"partitionManager" as any,
{
initialEventPosition: EventPosition.earliest(),
maxBatchSize: 10,
maxWaitTimeInSeconds: 20
}
);
await eph.start();
// after 2 seconds, stop processing
await delay(2000);

await eph.stop();
await client.close();
}

main().catch((err) => {
console.log("Error occurred: ", err);
});
57 changes: 53 additions & 4 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { EventPosition } from "./eventPosition";
import { PartitionContext } from "./partitionContext";
import { CheckpointManager, Checkpoint } from "./checkpointManager";
import { EventData } from "./eventData";
import { PartitionPump } from "./partitionPump";

export interface PartitionProcessor {
/**
Expand Down Expand Up @@ -66,21 +67,34 @@ export interface PartitionManager {
export interface EventProcessorOptions {
initialEventPosition?: EventPosition;
maxBatchSize?: number;
maxWaitTime?: number;
maxWaitTimeInSeconds?: number;
}

/**
* Describes the Event Processor Host to process events from an EventHub.
* @class EventProcessorHost
*/
export class EventProcessor {
private _consumerGroupName: string;
private _eventHubClient: EventHubClient;
private _partitionProcessorFactory: PartitionProcessorFactory;
private _processorOptions: EventProcessorOptions;
private _partitionPump?: PartitionPump;

constructor(
consumerGroupName: string,
eventHubClient: EventHubClient,
partitionProcessorFactory: PartitionProcessorFactory,
partitionManager: PartitionManager,
options?: EventProcessorOptions
) {}
) {
if (!options) options = {};

this._consumerGroupName = consumerGroupName;
this._eventHubClient = eventHubClient;
this._partitionProcessorFactory = partitionProcessorFactory;
this._processorOptions = options;
}

/**
* Starts the event processor, fetching the list of partitions, and attempting to grab leases
Expand All @@ -89,11 +103,46 @@ export class EventProcessor {
*
* @return {Promise<void>}
*/
async start(): Promise<void> {}
async start(): Promise<void> {
const partitionIds = await this._eventHubClient.getPartitionIds();
const partitionContext: PartitionContext = {
partitionId: partitionIds[0],
consumerGroupName: this._consumerGroupName,
eventHubName: this._eventHubClient.eventHubName
};
const partitionProcessor = this._partitionProcessorFactory(
partitionContext,
new CheckpointManager()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldnt checkpoint manager take partition context and partition manager in its constructor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't implemented CheckpointManager yet. After the implementation I'll update the sample.

);
if (partitionProcessor.initialize && typeof partitionProcessor.initialize !== "function") {
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
throw new TypeError("'initialize' must be of type 'function'.");
}
if (typeof partitionProcessor.processEvents !== "function") {
throw new TypeError("'processEvents' is required and must be of type 'function'.");
}
if (typeof partitionProcessor.processError !== "function") {
throw new TypeError("'processError' is required and must be of type 'function'.");
}
if (partitionProcessor.close && typeof partitionProcessor.close !== "function") {
throw new TypeError("'close' must be of type 'function'.");
}

this._partitionPump = new PartitionPump(
this._eventHubClient,
partitionContext,
partitionProcessor,
this._processorOptions
);
await this._partitionPump.start(partitionIds[0]);
}

/**
* Stops the EventProcessor from processing messages.
* @return {Promise<void>}
*/
async stop(): Promise<void> {}
async stop(): Promise<void> {
if (this._partitionPump) {
await this._partitionPump.stop("Stopped processing");
}
}
}
2 changes: 2 additions & 0 deletions sdk/eventhub/event-hubs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export { PartitionProperties, EventHubProperties } from "./managementClient";
export { EventHubProducer } from "./sender";
export { EventHubConsumer, EventIteratorOptions } from "./receiver";
export { EventDataBatch } from "./eventDataBatch";
export { EventProcessor } from "./eventProcessor";
export { PartitionContext } from "./partitionContext";
export {
MessagingError,
DataTransformer,
Expand Down
5 changes: 5 additions & 0 deletions sdk/eventhub/event-hubs/src/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ export const client = debugModule("azure:event-hubs:client");
* log statements for iothub client
*/
export const iotClient = debugModule("azure:event-hubs:iothubClient");
/**
* @ignore
* log statements for partitionManager
*/
export const partitionPump = debugModule("azure:event-hubs:partitionPump");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

89 changes: 89 additions & 0 deletions sdk/eventhub/event-hubs/src/partitionPump.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import * as log from "./log";
import { EventProcessorOptions, PartitionProcessor } from "./eventProcessor";
import { PartitionContext } from "./partitionContext";
import { EventHubClient } from "./eventHubClient";
import { EventPosition } from "./eventPosition";
import { EventHubConsumer } from "./receiver";
import { AbortController } from "@azure/abort-controller";

export class PartitionPump {
private _partitionContext: PartitionContext;
private _eventHubClient: EventHubClient;
private _partitionProcessor: PartitionProcessor;
private _processorOptions: EventProcessorOptions;
private _receiver: EventHubConsumer | undefined;
private _isReceiving: boolean = false;
private _abortController: AbortController;

constructor(
eventHubClient: EventHubClient,
partitionContext: PartitionContext,
partitionProcessor: PartitionProcessor,
options?: EventProcessorOptions
) {
if (!options) options = {};
this._eventHubClient = eventHubClient;
this._partitionContext = partitionContext;
this._partitionProcessor = partitionProcessor;
this._processorOptions = options;
this._abortController = new AbortController();
}

async start(partitionId: string): Promise<void> {
if (this._partitionProcessor.initialize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it would be safer to replace this check with:

Suggested change
if (this._partitionProcessor.initialize) {
if (typeof this._partitionProcessor.initialize === "function") {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the same check when we create partitionProcessor.

await this._partitionProcessor.initialize();
}
this._receiveEvents(partitionId);
log.partitionPump("Successfully started the receiver.");
}

private async _receiveEvents(partitionId: string): Promise<void> {
this._isReceiving = true;
try {
this._receiver = await this._eventHubClient.createConsumer(
this._partitionContext.consumerGroupName,
partitionId,
this._processorOptions.initialEventPosition || EventPosition.earliest()
);

while (this._isReceiving) {
const receivedEvents = await this._receiver.receiveBatch(
this._processorOptions.maxBatchSize || 1,
this._processorOptions.maxWaitTimeInSeconds,
this._abortController.signal
);
await this._partitionProcessor.processEvents(receivedEvents);
}
} catch (err) {
this._isReceiving = false;
try {
if (this._receiver) {
await this._receiver.close();
}
await this._partitionProcessor.processError(err);
log.error("An error occurred while receiving events.", err);
} catch (err) {
log.error("An error occurred while closing the receiver", err);
}
}
}

async stop(reason: string): Promise<void> {
this._isReceiving = false;
try {
if (this._receiver) {
await this._receiver.close();
}
this._abortController.abort();
if (this._partitionProcessor.close) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here with typeof check:

Suggested change
if (this._partitionProcessor.close) {
if (typeof this._partitionProcessor.close === "function") {

await this._partitionProcessor.close(reason);
}
} catch (err) {
log.error("An error occurred while closing the receiver.", err);
throw err;
}
}
}
99 changes: 99 additions & 0 deletions sdk/eventhub/event-hubs/test/eventProcessor.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import chai from "chai";
const should = chai.should();
import chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
import debugModule from "debug";
const debug = debugModule("azure:event-hubs:partitionPump");
import {
EventPosition,
EventHubClient,
EventData,
EventProcessor,
PartitionContext,
delay
} from "../src";
import { EnvVarKeys, getEnvVars } from "./utils/testUtils";
const env = getEnvVars();

describe("Event Processor", function(): void {
const service = {
connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
path: env[EnvVarKeys.EVENTHUB_NAME]
};
const client: EventHubClient = new EventHubClient(service.connectionString, service.path);
before("validate environment", async function(): Promise<void> {
should.exist(
env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
"define EVENTHUB_CONNECTION_STRING in your environment before running integration tests."
);
should.exist(
env[EnvVarKeys.EVENTHUB_NAME],
"define EVENTHUB_NAME in your environment before running integration tests."
);
});

after("close the connection", async function(): Promise<void> {
await client.close();
});

describe("Partition processor", function(): void {
it("should call methods on a PartitionProcessor ", async function(): Promise<void> {
const receivedEvents: EventData[] = [];
let isinitializeCalled = false;
let isCloseCalled = false;
class TestEventProcessor {
async initialize() {
isinitializeCalled = true;
debug(`Started processing`);
}
async processEvents(events: EventData[]) {
for (const event of events) {
receivedEvents.push(event);
debug("Received event", event.body);
}
}

async processError(error: Error) {
debug(`Encountered an error: ${error.message}`);
}

async close() {
isCloseCalled = true;
debug(`Stopped processing`);
}
}
const eventProcessorFactory = (context: PartitionContext) => {
return new TestEventProcessor();
};
const partitionInfo = await client.getPartitionProperties("0");
const eph = new EventProcessor(
"$Default",
client,
eventProcessorFactory,
"partitionManager" as any,
{
initialEventPosition: EventPosition.fromSequenceNumber(
partitionInfo.lastEnqueuedSequenceNumber
),
maxBatchSize: 1,
maxWaitTimeInSeconds: 5
}
);
const producer = client.createProducer({ partitionId: "0" });
await producer.send({ body: "Hello world!!!" });

await eph.start();
// after 2 seconds, stop processing
await delay(2000);
await eph.stop();
await producer.close();
isinitializeCalled.should.equal(true);
receivedEvents.length.should.equal(1);
receivedEvents[0].body.should.equal("Hello world!!!");
isCloseCalled.should.equal(true);
});
});
}).timeout(90000);