Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Updates APIs surrounding event processing and checkpointing #4994

Merged
merged 6 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions sdk/eventhub/event-hubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,25 +214,25 @@ While load balancing is a feature we will be adding in the next update, you can
example, where we use an [InMemoryPartitionManager](https://azure.github.io/azure-sdk-for-js/event-hubs/classes/inmemorypartitionmanager.html) that does checkpointing in memory.

```javascript
class SimplePartitionProcessor {
class SamplePartitionProcessor extends PartitionProcessor {
// Gets called once before the processing of events from current partition starts.
async initialize() {
async initialize(partitionContext) {
/* your code here */
}

// Gets called for each batch of events that are received.
// You may choose to use the checkpoint manager to update checkpoints.
async processEvents(events) {
async processEvents(events, partitionContext) {
/* your code here */
}

// Gets called for any error when receiving events.
async processError(error) {
async processError(error, partitionContext) {
/* your code here */
}

// Gets called when Event Processor stops processing events for current partition.
async close(reason) {
async close(reason, partitionContext) {
/* your code here */
}
}
Expand All @@ -241,12 +241,12 @@ const client = new EventHubClient("my-connection-string", "my-event-hub");
const processor = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
(partitionContext, checkpointManager) => new SimplePartitionProcessor(),
SamplePartitionProcessor,
new InMemoryPartitionManager()
);
await processor.start();
// At this point, the processor is consuming events from each partition of the Event Hub and
// delegating them to the SimplePartitionProcessor instance created for that partition. This
// delegating them to the SamplePartitionProcessor instance created for that partition. This
// processing takes place in the background and will not block.
//
// In this example, we'll stop processing after five seconds.
Expand Down
36 changes: 11 additions & 25 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ export interface Checkpoint {
sequenceNumber: number;
}

// @public
export class CheckpointManager {
// @internal
constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, eventProcessorId: string);
updateCheckpoint(eventData: ReceivedEventData): Promise<void>;
updateCheckpoint(sequenceNumber: number, offset: number): Promise<void>;
}

// @public
export enum CloseReason {
EventHubException = "EventHubException",
Expand Down Expand Up @@ -175,19 +167,15 @@ export class EventPosition {

// @public
export class EventProcessor {
constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions);
constructor(consumerGroupName: string, eventHubClient: EventHubClient, PartitionProcessorClass: typeof PartitionProcessor, partitionManager: PartitionManager, options?: EventProcessorOptions);
readonly id: string;
start(): void;
stop(): Promise<void>;
}

// @public (undocumented)
// @public
export interface EventProcessorOptions {
// (undocumented)
initialEventPosition?: EventPosition;
// (undocumented)
maxBatchSize?: number;
// (undocumented)
maxWaitTimeInSeconds?: number;
}

Expand All @@ -207,10 +195,13 @@ export type OnError = (error: MessagingError | Error) => void;
export type OnMessage = (eventData: ReceivedEventData) => void;

// @public
export interface PartitionContext {
export class PartitionContext {
constructor(eventHubName: string, consumerGroupName: string, partitionId: string, partitionManager: PartitionManager, eventProcessorId: string);
readonly consumerGroupName: string;
readonly eventHubName: string;
readonly partitionId: string;
updateCheckpoint(eventData: ReceivedEventData): Promise<void>;
updateCheckpoint(sequenceNumber: number, offset: number): Promise<void>;
}

// @public
Expand All @@ -234,16 +225,11 @@ export interface PartitionOwnership {
}

// @public
export interface PartitionProcessor {
close?(reason: CloseReason): Promise<void>;
initialize?(): Promise<void>;
processError(error: Error): Promise<void>;
processEvents(events: ReceivedEventData[]): Promise<void>;
}

// @public
export interface PartitionProcessorFactory {
(context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor;
export class PartitionProcessor {
close(reason: CloseReason, partitionContext: PartitionContext): Promise<void>;
initialize(partitionContext: PartitionContext): Promise<void>;
processError(error: Error, partitionContext: PartitionContext): Promise<void>;
processEvents(events: ReceivedEventData[], partitionContext: PartitionContext): Promise<void>;
}

// @public
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/samples/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
## Getting started with samples ##

The samples in this folder are for version 3.0.0 and above of this library. If you are using version 2.1.0 or lower, then please use [samples for v2.1.0](https://github.com/Azure/azure-sdk-for-js/tree/%40azure/event-hubs_2.1.0/sdk/eventhub/event-hubs/samples) instead
The samples in this folder are for version 5.0.0 and above of this library. If you are using version 2.1.0 or lower, then please use [samples for v2.1.0](https://github.com/Azure/azure-sdk-for-js/tree/%40azure/event-hubs_2.1.0/sdk/eventhub/event-hubs/samples) instead

## Install the library

Run the below in your samples folder to install the npm package for Event Hubs library.
```bash
npm install @azure/event-hubs
npm install @azure/event-hubs@next
```

## Get connection string & Event Hubs name
Expand Down
46 changes: 18 additions & 28 deletions sdk/eventhub/event-hubs/samples/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,37 @@
import {
EventHubClient,
ReceivedEventData,
EventPosition,
delay,
EventProcessor,
PartitionContext,
InMemoryPartitionManager,
CheckpointManager
PartitionProcessor,
CloseReason
} from "@azure/event-hubs";

class SimplePartitionProcessor {
private _context: PartitionContext;
private _checkpointManager: CheckpointManager;
constructor(context: PartitionContext, checkpointManager: CheckpointManager) {
this._context = context;
this._checkpointManager = checkpointManager;
}
async processEvents(events: ReceivedEventData[]) {
if(events.length === 0){
class SamplePartitionProcessor extends PartitionProcessor {
private _messageCount = 0;

async processEvents(events: ReceivedEventData[], partitionContext: PartitionContext) {
if (events.length === 0) {
return;
}
for (const event of events) {
console.log(
"Received event: '%s' from partition: '%s' and consumer group: '%s'",
event.body,
this._context.partitionId,
this._context.consumerGroupName
`Received event: '${event.body}' from partition: '${partitionContext.partitionId}' and consumer group: '${partitionContext.consumerGroupName}'`,
);
try {
// checkpoint using the last event in the batch
await this._checkpointManager.updateCheckpoint(events[events.length - 1]);
await partitionContext.updateCheckpoint(events[events.length - 1]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we checkpointing inside the for loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I only looked at changing the API to use the new version, missed the fact that it was in the for loop this whole time. I'll update it.

this._messageCount++;
console.log(
"Successfully checkpointed event: '%s' from partition: '%s'",
events[events.length - 1].body,
this._context.partitionId
partitionContext.partitionId
);
} catch (err) {
console.log(
`Encountered an error while checkpointing on ${this._context.partitionId}: ${err.message}`
`Encountered an error while checkpointing on ${partitionContext.partitionId}: ${err.message}`
);
}
}
Expand All @@ -47,12 +41,13 @@ class SimplePartitionProcessor {
console.log(`Encountered an error: ${error.message}`);
}

async initialize() {
console.log(`Started processing`);
async initialize(partitionContext: PartitionContext) {
console.log(`Started processing partition: ${partitionContext.partitionId}`);
}

async close() {
console.log(`Stopped processing`);
async close(reason: CloseReason, partitionContext: PartitionContext) {
console.log(`Stopped processing for reason ${reason}`);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this print out the reason in string or number?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prints out the reason as a string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, I think we should probably change the CloseReason enum to a union of string literals. Then it's easy to get auto-completion, and clear that it's a string.

console.log(`Processed ${this._messageCount} from partition ${partitionContext.partitionId}.`);
}
}

Expand All @@ -63,17 +58,12 @@ const eventHubName = "";
async function main() {
const client = new EventHubClient(connectionString, eventHubName);

const eventProcessorFactory = (context: PartitionContext, checkpoint: CheckpointManager) => {
return new SimplePartitionProcessor(context, checkpoint);
};

const processor = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
eventProcessorFactory,
SamplePartitionProcessor,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.earliest(),
maxBatchSize: 10,
maxWaitTimeInSeconds: 20
}
Expand Down
126 changes: 0 additions & 126 deletions sdk/eventhub/event-hubs/src/checkpointManager.ts

This file was deleted.

Loading