Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EPH] Call methods on PartitionProcessor while processing single partition #4467

Merged
merged 14 commits into from
Jul 30, 2019

Conversation

ShivangiReja
Copy link
Member

@ShivangiReja ShivangiReja commented Jul 26, 2019

This change includes a basic working event processor with no load balancing capabilities. It brings up the event processor instance and starts consuming events from single partition. A separate issue will be created to processing multiple partitions from a single EPH instance.

This PR is responsible for:

  • Create a class called PartitionPump which is responsible for

    • creating and maintaining an EventHubConsumer
    • calling methods on a PartitionProcessor as appropriate
      • start(): creates the receiver and begins receiving events in batches for the PartitionProcessor.
      • stop(): closes the receiver and signals to the PartitionProcessor that processing has completed.
  • Added one working sample.

@ShivangiReja ShivangiReja marked this pull request as ready for review July 29, 2019 18:46
@ShivangiReja ShivangiReja requested a review from chradek as a code owner July 29, 2019 18:46
@ShivangiReja ShivangiReja requested a review from ramya-rao-a July 29, 2019 18:57
sdk/eventhub/event-hubs/samples/eventProcessorHost.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/samples/eventProcessorHost.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/eventProcessor.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/eventProcessor.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/partitionPump.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/test/eventProcessorHost.spec.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/eventProcessor.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/partitionPump.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/partitionPump.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/eventProcessor.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/eventProcessor.ts Show resolved Hide resolved
* @ignore
* log statements for partitionManager
*/
export const partitionPump = debugModule("azure:event-hubs:partitionPump");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

async start(partitionId: string): Promise<void> {
if (this._partitionProcessor.initialize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it would be safer to replace this check with:

Suggested change
if (this._partitionProcessor.initialize) {
if (typeof this._partitionProcessor.initialize === "function") {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the same check when we create partitionProcessor.

this._isReceiving = false;
try {
if (this._receiver) {
this._receiver.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May still need a .catch on this close since it's an async method. It won't get caught in this try/catch unless you await it.

this._receiver.close();
}
this._abortController.abort();
if (this._partitionProcessor.close) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here with typeof check:

Suggested change
if (this._partitionProcessor.close) {
if (typeof this._partitionProcessor.close === "function") {

};
const partitionProcessor = this._partitionProcessorFactory(
partitionContext,
new CheckpointManager()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldnt checkpoint manager take partition context and partition manager in its constructor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't implemented CheckpointManager yet. After the implementation I'll update the sample.

PartitionContext
} from "@azure/event-hubs";

class EventProcessorHost {
Copy link
Contributor

@ramya-rao-a ramya-rao-a Jul 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are doing quite some naming changes, we should be careful of the terms we use in the samples. Here, I would suggest SimplePartitionProcessor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we should add the constructor that stores the partition context and then in processEvents use the partitionId and consumer group name in the console.log()

This way, the user will know how to get the "partition" related info when they process events

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants