Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Define classes and interfaces for EPH as per API design #4416

Merged
merged 5 commits into from
Jul 26, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions sdk/eventhub/event-hubs/src/checkpointManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

// import { PartitionContext } from "./partitionContext";
import { EventData } from "./eventData";
// import { PartitionManager } from "./eventProcessor";

/**
* Used by createCheckpoint in PartitionManager
**/
export interface Checkpoint {
eventHubName: string;
consumerGroupName: string;
instanceId: string;
partitionId: string;
sequenceNumber: number;
offset: number;
}

/**
* CheckPointManager is created by the library & passed to user's code to let them create a checkpoint
*/
export class CheckpointManager {
// private _partitionContext: PartitionContext; // for internal use by createCheckpoint
// private _partitionManager: PartitionManager; // for internal use by createCheckpoint

// constructor(partitionContext: PartitionContext, partitionManager: PartitionManager) {
// this._partitionContext = partitionContext;
// this._partitionManager = partitionManager;
// }

public async createCheckpoint(eventData: EventData): Promise<void>;

public async createCheckpoint(offset: string, sequenceNumber: number): Promise<void>;

public async createCheckpoint(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be updateCheckpoint after the latest round of updates

eventDataOrOffset: EventData | string,
sequenceNumber?: number
): Promise<void> {}
}
99 changes: 99 additions & 0 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { EventHubClient } from "./eventHubClient";
import { EventPosition } from "./eventPosition";
import { PartitionContext } from "./partitionContext";
import { CheckpointManager, Checkpoint } from "./checkpointManager";
import { EventData } from "./eventData";

export interface PartitionProcessor {
/**
* Optional. Called when EPH begins processing a partition.
*/
initialize?(): Promise<void>;
/**
* Optional. Called when EPH stops processing a partition.
* This may occur when control of the partition switches to another EPH or when user stops EPH
* TODO: update string -> CloseReason
*/
close?(reason: string): Promise<void>;
/**
* Called when a batch of events have been received.
*/
processEvents(events: EventData[]): Promise<void>;
/**
* Called when the underlying client experiences an error while receiving.
*/
processError(error: Error): Promise<void>;
}

/**
* used by PartitionManager to claim ownership.
* returned by listOwnerships
*/
export interface PartitionOwnership {
eventHubName: string;
consumerGroupName: string;
instanceId: string;
partitionId: string;
ownerLevel: number;
offset?: number;
sequenceNumber?: number;
lastModifiedTime?: number;
ETag?: string;
}

/**
* The PartitionProcessorFactory is called by EPH whenever a new partition is about to be processed.
*/
export interface PartitionProcessorFactory {
(context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor;
}

/**
* Interface for the plugin to be passed when creating the EventProcessorHost
* to manage partition ownership and checkpoint creation.
* Deals mainly with read/write to the chosen storage service
*/
export interface PartitionManager {
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
createCheckpoint(checkpoint: Checkpoint): Promise<void>;
}

// Options passed when creating EventProcessor, everything is optional
export interface EventProcessorOptions {
initialEventPosition?: EventPosition;
maxBatchSize?: number;
maxWaitTime?: number;
}

/**
* Describes the Event Processor Host to process events from an EventHub.
* @class EventProcessorHost
*/
export class EventProcessor {
constructor(
consumerGroupName: string,
eventHubClient: EventHubClient,
partitionProcessorFactory: PartitionProcessorFactory,
partitionManager: PartitionManager,
options?: EventProcessorOptions
) {}

/**
* Starts the event processor, fetching the list of partitions, and attempting to grab leases
* For each successful lease, it will get the details from the blob and start a receiver at the
* point where it left off previously.
*
* @return {Promise<void>}
*/
async start(): Promise<void> {}

/**
* Stops the EventProcessor from processing messages.
* @return {Promise<void>}
*/
async stop(): Promise<void> {}
}
18 changes: 18 additions & 0 deletions sdk/eventhub/event-hubs/src/partitionContext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* PartitionContext is passed into an EventProrcessor's initialization handler and contains information
* about the partition, the EventProcessor will be processing events from.
*/
export class PartitionContext {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chradek Do we need this to be a class?
Or are we making it a class to ensure that the properties are readonly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be safe to make this an interface now. If we want to ensure the properties are readonly at runtime we can either freeze the object that implements the interface Object.freeze or create the object using Object.defineProperties

public readonly partitionId: string;
public readonly eventHubName: string;
public readonly consumerGroupName: string;

constructor(partitionId: string, eventHubName: string, consumerGroupName: string) {
this.partitionId = partitionId;
this.eventHubName = eventHubName;
this.consumerGroupName = consumerGroupName;
}
}