-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Event Hubs] Updates APIs surrounding event processing and checkpointing #4994
Conversation
/azp run js - eventhubs-client - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
/azp run js - eventhubs-client - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
); | ||
try { | ||
// checkpoint using the last event in the batch | ||
await this._checkpointManager.updateCheckpoint(events[events.length - 1]); | ||
await partitionContext.updateCheckpoint(events[events.length - 1]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we checkpointing inside the for loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I only looked at changing the API to use the new version, missed the fact that it was in the for loop this whole time. I'll update it.
async close() { | ||
console.log(`Stopped processing`); | ||
async close(reason: CloseReason, partitionContext: PartitionContext) { | ||
console.log(`Stopped processing for reason ${reason}`); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this print out the reason in string or number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This prints out the reason as a string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That said, I think we should probably change the CloseReason
enum to a union of string literals. Then it's easy to get auto-completion, and clear that it's a string.
@@ -266,18 +193,18 @@ export class EventProcessor { | |||
constructor( | |||
consumerGroupName: string, | |||
eventHubClient: EventHubClient, | |||
partitionProcessorFactory: PartitionProcessorFactory, | |||
PartitionProcessorClass: typeof PartitionProcessor, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make this stand out more, do you think PartitionProcessorClassName
would be better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As long as no one tries to put in the class name as a string :) What about PartitionProcessorConstructor
? Or is that as confusing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Meh, lets keep what you have for now.
* A checkpoint is meant to represent the last successfully processed event by the user from a particular | ||
* partition of a consumer group in an Event Hub instance. | ||
* | ||
* When the `updateCheckpoint()` method on the `CheckpointManager` class is called by the user, a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* When the `updateCheckpoint()` method on the `CheckpointManager` class is called by the user, a | |
* When the `updateCheckpoint()` method on the `PartitionContext` class is called by the user, a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
} | ||
|
||
/** | ||
* `PartitionContext` holds information on the partition, consumer group and event hub | ||
* being processed by the `EventProcessor`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* being processed by the `EventProcessor`. | |
* being processed by the `EventProcessor`. It also allows users to update checkpoints via the `updateCheckpoint` method |
? eventDataOrSequenceNumber | ||
: eventDataOrSequenceNumber.sequenceNumber, | ||
offset: | ||
typeof eventDataOrSequenceNumber === "number" ? offset! : eventDataOrSequenceNumber.offset, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typeof eventDataOrSequenceNumber === "number" ? offset! : eventDataOrSequenceNumber.offset, | |
typeof offset=== "number" ? offset : eventDataOrSequenceNumber.offset, |
This change:
PartitionProcessor
to be a class instead of an interface. This allows users to subclassPartitionProcessor
so they only need to implement the methods they need.EventProcessor
to accept aPartitionProcessor
class, instead of a factory that returns aPartitionProcessor
.PartitionProcessor
methods to each accept aPartitionContext
. This way no parameters need to be passed to thePartitionProcessor
constructor when the SDK instantiates it.CheckpointManager
and movesupdateCheckpoint
intoPartitionContext
.initialOffsetPosition
from theEventProcessorOptions
.