Skip to content

Commit

Permalink
[Event Hubs] Updates APIs surrounding event processing and checkpoint…
Browse files Browse the repository at this point in the history
…ing (#4994)
  • Loading branch information
chradek authored Sep 5, 2019
1 parent 19cbead commit c8afde1
Show file tree
Hide file tree
Showing 13 changed files with 542 additions and 571 deletions.
14 changes: 7 additions & 7 deletions sdk/eventhub/event-hubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,25 +214,25 @@ While load balancing is a feature we will be adding in the next update, you can
example, where we use an [InMemoryPartitionManager](https://azure.github.io/azure-sdk-for-js/event-hubs/classes/inmemorypartitionmanager.html) that does checkpointing in memory.

```javascript
class SimplePartitionProcessor {
class SamplePartitionProcessor extends PartitionProcessor {
// Gets called once before the processing of events from current partition starts.
async initialize() {
async initialize(partitionContext) {
/* your code here */
}

// Gets called for each batch of events that are received.
// You may choose to use the checkpoint manager to update checkpoints.
async processEvents(events) {
async processEvents(events, partitionContext) {
/* your code here */
}

// Gets called for any error when receiving events.
async processError(error) {
async processError(error, partitionContext) {
/* your code here */
}

// Gets called when Event Processor stops processing events for current partition.
async close(reason) {
async close(reason, partitionContext) {
/* your code here */
}
}
Expand All @@ -241,12 +241,12 @@ const client = new EventHubClient("my-connection-string", "my-event-hub");
const processor = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
(partitionContext, checkpointManager) => new SimplePartitionProcessor(),
SamplePartitionProcessor,
new InMemoryPartitionManager()
);
await processor.start();
// At this point, the processor is consuming events from each partition of the Event Hub and
// delegating them to the SimplePartitionProcessor instance created for that partition. This
// delegating them to the SamplePartitionProcessor instance created for that partition. This
// processing takes place in the background and will not block.
//
// In this example, we'll stop processing after five seconds.
Expand Down
36 changes: 11 additions & 25 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ export interface Checkpoint {
sequenceNumber: number;
}

// @public
export class CheckpointManager {
// @internal
constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, eventProcessorId: string);
updateCheckpoint(eventData: ReceivedEventData): Promise<void>;
updateCheckpoint(sequenceNumber: number, offset: number): Promise<void>;
}

// @public
export enum CloseReason {
EventHubException = "EventHubException",
Expand Down Expand Up @@ -175,19 +167,15 @@ export class EventPosition {

// @public
export class EventProcessor {
constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions);
constructor(consumerGroupName: string, eventHubClient: EventHubClient, PartitionProcessorClass: typeof PartitionProcessor, partitionManager: PartitionManager, options?: EventProcessorOptions);
readonly id: string;
start(): void;
stop(): Promise<void>;
}

// @public (undocumented)
// @public
export interface EventProcessorOptions {
// (undocumented)
initialEventPosition?: EventPosition;
// (undocumented)
maxBatchSize?: number;
// (undocumented)
maxWaitTimeInSeconds?: number;
}

Expand All @@ -207,10 +195,13 @@ export type OnError = (error: MessagingError | Error) => void;
export type OnMessage = (eventData: ReceivedEventData) => void;

// @public
export interface PartitionContext {
export class PartitionContext {
constructor(eventHubName: string, consumerGroupName: string, partitionId: string, partitionManager: PartitionManager, eventProcessorId: string);
readonly consumerGroupName: string;
readonly eventHubName: string;
readonly partitionId: string;
updateCheckpoint(eventData: ReceivedEventData): Promise<void>;
updateCheckpoint(sequenceNumber: number, offset: number): Promise<void>;
}

// @public
Expand All @@ -234,16 +225,11 @@ export interface PartitionOwnership {
}

// @public
export interface PartitionProcessor {
close?(reason: CloseReason): Promise<void>;
initialize?(): Promise<void>;
processError(error: Error): Promise<void>;
processEvents(events: ReceivedEventData[]): Promise<void>;
}

// @public
export interface PartitionProcessorFactory {
(context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor;
export class PartitionProcessor {
close(reason: CloseReason, partitionContext: PartitionContext): Promise<void>;
initialize(partitionContext: PartitionContext): Promise<void>;
processError(error: Error, partitionContext: PartitionContext): Promise<void>;
processEvents(events: ReceivedEventData[], partitionContext: PartitionContext): Promise<void>;
}

// @public
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/samples/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
## Getting started with samples ##

The samples in this folder are for version 3.0.0 and above of this library. If you are using version 2.1.0 or lower, then please use [samples for v2.1.0](https://github.com/Azure/azure-sdk-for-js/tree/%40azure/event-hubs_2.1.0/sdk/eventhub/event-hubs/samples) instead
The samples in this folder are for version 5.0.0 and above of this library. If you are using version 2.1.0 or lower, then please use [samples for v2.1.0](https://github.com/Azure/azure-sdk-for-js/tree/%40azure/event-hubs_2.1.0/sdk/eventhub/event-hubs/samples) instead

## Install the library

Run the below in your samples folder to install the npm package for Event Hubs library.
```bash
npm install @azure/event-hubs
npm install @azure/event-hubs@next
```

## Get connection string & Event Hubs name
Expand Down
68 changes: 30 additions & 38 deletions sdk/eventhub/event-hubs/samples/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,58 +1,55 @@
import {
EventHubClient,
ReceivedEventData,
EventPosition,
delay,
EventProcessor,
PartitionContext,
InMemoryPartitionManager,
CheckpointManager
PartitionProcessor,
CloseReason
} from "@azure/event-hubs";

class SimplePartitionProcessor {
private _context: PartitionContext;
private _checkpointManager: CheckpointManager;
constructor(context: PartitionContext, checkpointManager: CheckpointManager) {
this._context = context;
this._checkpointManager = checkpointManager;
}
async processEvents(events: ReceivedEventData[]) {
if(events.length === 0){
class SamplePartitionProcessor extends PartitionProcessor {
private _messageCount = 0;

async processEvents(events: ReceivedEventData[], partitionContext: PartitionContext) {
if (events.length === 0) {
return;
}
for (const event of events) {
console.log(
"Received event: '%s' from partition: '%s' and consumer group: '%s'",
event.body,
this._context.partitionId,
this._context.consumerGroupName
`Received event: '${event.body}' from partition: '${partitionContext.partitionId}' and consumer group: '${partitionContext.consumerGroupName}'`,
);
this._messageCount++;
}

try {
// checkpoint using the last event in the batch
await partitionContext.updateCheckpoint(events[events.length - 1]);

console.log(
"Successfully checkpointed event: '%s' from partition: '%s'",
events[events.length - 1].body,
partitionContext.partitionId
);
} catch (err) {
console.log(
`Encountered an error while checkpointing on ${partitionContext.partitionId}: ${err.message}`
);
try {
// checkpoint using the last event in the batch
await this._checkpointManager.updateCheckpoint(events[events.length - 1]);
console.log(
"Successfully checkpointed event: '%s' from partition: '%s'",
events[events.length - 1].body,
this._context.partitionId
);
} catch (err) {
console.log(
`Encountered an error while checkpointing on ${this._context.partitionId}: ${err.message}`
);
}
}
}

async processError(error: Error) {
console.log(`Encountered an error: ${error.message}`);
}

async initialize() {
console.log(`Started processing`);
async initialize(partitionContext: PartitionContext) {
console.log(`Started processing partition: ${partitionContext.partitionId}`);
}

async close() {
console.log(`Stopped processing`);
async close(reason: CloseReason, partitionContext: PartitionContext) {
console.log(`Stopped processing for reason ${reason}`);
console.log(`Processed ${this._messageCount} from partition ${partitionContext.partitionId}.`);
}
}

Expand All @@ -63,17 +60,12 @@ const eventHubName = "";
async function main() {
const client = new EventHubClient(connectionString, eventHubName);

const eventProcessorFactory = (context: PartitionContext, checkpoint: CheckpointManager) => {
return new SimplePartitionProcessor(context, checkpoint);
};

const processor = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
eventProcessorFactory,
SamplePartitionProcessor,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.earliest(),
maxBatchSize: 10,
maxWaitTimeInSeconds: 20
}
Expand Down
126 changes: 0 additions & 126 deletions sdk/eventhub/event-hubs/src/checkpointManager.ts

This file was deleted.

Loading

0 comments on commit c8afde1

Please sign in to comment.