Skip to content

Commit

Permalink
[event-hubs] adds loadBalancingOptions and greedy load balancing stra…
Browse files Browse the repository at this point in the history
…tegy (#9706)

* [event-hubs] adds EventHubConsumerClientOptions

* fix conflicts from rebase

* refactor load balancer to get all claimable partitions

* add balanced and greedy load balancer strategies

* add unbalancedLoadBalancingStrategy

* update pumpManager to expose receivingFromPartitions

* updates EventProcessor to use LoadBalancingStrategies

* add tests for the partitionOwnershipExpirationIntervalInMs

* add functional load balancing tests

* update docs

* update version to 5.3.0-preview.1

* add changelog

* update pnpm-lock

* address feedback

* add explicity existance check to partitionOwnership.lastModifiedTimeInMs

* trashing -> thrashing

* be smarter about else-if statements

* explain the magic number 6

* identifyPartitionsToClaim -> getPartitionsToClaim

* identifyClaimablePartitions -> listAvailablePartitions

* add better summary for EventHubConsumerClientOptions

* add better docs around greedy and balanced strategies

* remove superfluous doc from CommonEventProcessorOptions

* add comment around why we have abandoned partitions

* throw AbortError instead of silent return

* remove unneeded receivingFrom method

* update pnpm-lock.yaml

* rush update
  • Loading branch information
chradek authored Jul 2, 2020
1 parent b3989f1 commit 9b0d772
Show file tree
Hide file tree
Showing 19 changed files with 2,276 additions and 904 deletions.
43 changes: 42 additions & 1 deletion common/config/rush/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 5.3.0-preview.1 (Unreleased)

- Adds `loadBalancingOptions` to the `EventHubConsumerClient` to add control around
how aggressively the client claims partitions while load balancing.
([PR 9706](https://github.com/Azure/azure-sdk-for-js/pull/9706))

## 5.2.2 (2020-06-30)

- Fixes issue [#9289](https://github.com/Azure/azure-sdk-for-js/issues/9289)
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/event-hubs",
"sdk-type": "client",
"version": "5.2.2",
"version": "5.3.0-preview.1",
"description": "Azure Event Hubs SDK for JS.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down
24 changes: 18 additions & 6 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ export interface EventHubClientOptions {

// @public
export class EventHubConsumerClient {
constructor(consumerGroup: string, connectionString: string, options?: EventHubClientOptions);
constructor(consumerGroup: string, connectionString: string, checkpointStore: CheckpointStore, options?: EventHubClientOptions);
constructor(consumerGroup: string, connectionString: string, eventHubName: string, options?: EventHubClientOptions);
constructor(consumerGroup: string, connectionString: string, eventHubName: string, checkpointStore: CheckpointStore, options?: EventHubClientOptions);
constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, options?: EventHubClientOptions);
constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, checkpointStore: CheckpointStore, options?: EventHubClientOptions);
constructor(consumerGroup: string, connectionString: string, options?: EventHubConsumerClientOptions);
constructor(consumerGroup: string, connectionString: string, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions);
constructor(consumerGroup: string, connectionString: string, eventHubName: string, options?: EventHubConsumerClientOptions);
constructor(consumerGroup: string, connectionString: string, eventHubName: string, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions);
constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, options?: EventHubConsumerClientOptions);
constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions);
close(): Promise<void>;
static defaultConsumerGroupName: string;
get eventHubName(): string;
Expand All @@ -98,6 +98,11 @@ export class EventHubConsumerClient {
subscribe(partitionId: string, handlers: SubscriptionEventHandlers, options?: SubscribeOptions): Subscription;
}

// @public
export interface EventHubConsumerClientOptions extends EventHubClientOptions {
loadBalancingOptions?: LoadBalancingOptions;
}

// @public
export class EventHubProducerClient {
constructor(connectionString: string, options?: EventHubClientOptions);
Expand Down Expand Up @@ -152,6 +157,13 @@ export interface LastEnqueuedEventProperties {
// @public
export const latestEventPosition: EventPosition;

// @public
export interface LoadBalancingOptions {
partitionOwnershipExpirationIntervalInMs?: number;
strategy?: "balanced" | "greedy";
updateIntervalInMs?: number;
}

// @public
export const logger: import("@azure/logger").AzureLogger;

Expand Down
91 changes: 69 additions & 22 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

import { ConnectionContext, createConnectionContext } from "./connectionContext";
import {
EventHubClientOptions,
EventHubConsumerClientOptions,
GetEventHubPropertiesOptions,
GetPartitionIdsOptions,
GetPartitionPropertiesOptions
GetPartitionPropertiesOptions,
LoadBalancingOptions
} from "./models/public";
import { InMemoryCheckpointStore } from "./inMemoryCheckpointStore";
import { CheckpointStore, EventProcessor, FullEventProcessorOptions } from "./eventProcessor";
import { GreedyPartitionLoadBalancer } from "./partitionLoadBalancer";
import { Constants, TokenCredential } from "@azure/core-amqp";
import { logger } from "./log";

Expand All @@ -24,6 +24,10 @@ import { EventHubProperties, PartitionProperties } from "./managementClient";
import { PartitionGate } from "./impl/partitionGate";
import { v4 as uuid } from "uuid";
import { validateEventPositions } from "./eventPosition";
import { LoadBalancingStrategy } from "./loadBalancerStrategies/loadBalancingStrategy";
import { UnbalancedLoadBalancingStrategy } from "./loadBalancerStrategies/unbalancedStrategy";
import { GreedyLoadBalancingStrategy } from "./loadBalancerStrategies/greedyStrategy";
import { BalancedLoadBalancingStrategy } from "./loadBalancerStrategies/balancedStrategy";

const defaultConsumerClientOptions: Required<Pick<
FullEventProcessorOptions,
Expand Down Expand Up @@ -58,7 +62,7 @@ export class EventHubConsumerClient {
/**
* The options passed by the user when creating the EventHubClient instance.
*/
private _clientOptions: EventHubClientOptions;
private _clientOptions: EventHubConsumerClientOptions;
private _partitionGate = new PartitionGate();
private _id = uuid();

Expand All @@ -78,6 +82,11 @@ export class EventHubConsumerClient {
private _checkpointStore: CheckpointStore;
private _userChoseCheckpointStore: boolean;

/**
* Options for configuring load balancing.
*/
private readonly _loadBalancingOptions: Required<LoadBalancingOptions>;

/**
* @property
* @readonly
Expand Down Expand Up @@ -111,7 +120,11 @@ export class EventHubConsumerClient {
* - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets.
* - `userAgent` : A string to append to the built in user agent string that is passed to the service.
*/
constructor(consumerGroup: string, connectionString: string, options?: EventHubClientOptions); // #1
constructor(
consumerGroup: string,
connectionString: string,
options?: EventHubConsumerClientOptions
); // #1
/**
* @constructor
* The `EventHubConsumerClient` class is used to consume events from an Event Hub.
Expand All @@ -133,7 +146,7 @@ export class EventHubConsumerClient {
consumerGroup: string,
connectionString: string,
checkpointStore: CheckpointStore,
options?: EventHubClientOptions
options?: EventHubConsumerClientOptions
); // #1.1
/**
* @constructor
Expand All @@ -154,7 +167,7 @@ export class EventHubConsumerClient {
consumerGroup: string,
connectionString: string,
eventHubName: string,
options?: EventHubClientOptions
options?: EventHubConsumerClientOptions
); // #2
/**
* @constructor
Expand All @@ -179,7 +192,7 @@ export class EventHubConsumerClient {
connectionString: string,
eventHubName: string,
checkpointStore: CheckpointStore,
options?: EventHubClientOptions
options?: EventHubConsumerClientOptions
); // #2.1
/**
* @constructor
Expand All @@ -202,7 +215,7 @@ export class EventHubConsumerClient {
fullyQualifiedNamespace: string,
eventHubName: string,
credential: TokenCredential,
options?: EventHubClientOptions
options?: EventHubConsumerClientOptions
); // #3
/**
* @constructor
Expand All @@ -229,18 +242,21 @@ export class EventHubConsumerClient {
eventHubName: string,
credential: TokenCredential,
checkpointStore: CheckpointStore,
options?: EventHubClientOptions
options?: EventHubConsumerClientOptions
); // #3.1
constructor(
private _consumerGroup: string,
connectionStringOrFullyQualifiedNamespace2: string,
checkpointStoreOrEventHubNameOrOptions3?: CheckpointStore | EventHubClientOptions | string,
checkpointStoreOrEventHubNameOrOptions3?:
| CheckpointStore
| EventHubConsumerClientOptions
| string,
checkpointStoreOrCredentialOrOptions4?:
| CheckpointStore
| EventHubClientOptions
| EventHubConsumerClientOptions
| TokenCredential,
checkpointStoreOrOptions5?: CheckpointStore | EventHubClientOptions,
options6?: EventHubClientOptions
checkpointStoreOrOptions5?: CheckpointStore | EventHubConsumerClientOptions,
options6?: EventHubConsumerClientOptions
) {
if (isTokenCredential(checkpointStoreOrCredentialOrOptions4)) {
// #3 or 3.1
Expand Down Expand Up @@ -271,7 +287,7 @@ export class EventHubConsumerClient {
// 2.1
this._checkpointStore = checkpointStoreOrCredentialOrOptions4;
this._userChoseCheckpointStore = true;
this._clientOptions = (checkpointStoreOrOptions5 as EventHubClientOptions) || {};
this._clientOptions = (checkpointStoreOrOptions5 as EventHubConsumerClientOptions) || {};
} else {
// 2
this._checkpointStore = new InMemoryCheckpointStore();
Expand All @@ -293,20 +309,28 @@ export class EventHubConsumerClient {
this._checkpointStore = checkpointStoreOrEventHubNameOrOptions3;
this._userChoseCheckpointStore = true;
this._clientOptions =
(checkpointStoreOrCredentialOrOptions4 as EventHubClientOptions) || {};
(checkpointStoreOrCredentialOrOptions4 as EventHubConsumerClientOptions) || {};
} else {
// 1
this._checkpointStore = new InMemoryCheckpointStore();
this._userChoseCheckpointStore = false;
this._clientOptions =
(checkpointStoreOrEventHubNameOrOptions3 as EventHubClientOptions) || {};
(checkpointStoreOrEventHubNameOrOptions3 as EventHubConsumerClientOptions) || {};
}

this._context = createConnectionContext(
connectionStringOrFullyQualifiedNamespace2,
this._clientOptions
);
}
this._loadBalancingOptions = {
// default options
strategy: "balanced",
updateIntervalInMs: 10000,
partitionOwnershipExpirationIntervalInMs: 60000,
// options supplied by user
...this._clientOptions?.loadBalancingOptions
};
}

/**
Expand Down Expand Up @@ -466,6 +490,27 @@ export class EventHubConsumerClient {
return subscription;
}

/**
* Gets the LoadBalancing strategy that should be used based on what the user provided.
*/
private _getLoadBalancingStrategy(): LoadBalancingStrategy {
if (!this._userChoseCheckpointStore) {
// The default behavior when a checkpointstore isn't provided
// is to always grab all the partitions.
return new UnbalancedLoadBalancingStrategy();
}

const partitionOwnershipExpirationIntervalInMs = this._loadBalancingOptions
.partitionOwnershipExpirationIntervalInMs;
if (this._loadBalancingOptions?.strategy === "greedy") {
return new GreedyLoadBalancingStrategy(partitionOwnershipExpirationIntervalInMs);
}

// The default behavior when a checkpointstore is provided is
// to grab one partition at a time.
return new BalancedLoadBalancingStrategy(partitionOwnershipExpirationIntervalInMs);
}

private createEventProcessorForAllPartitions(
subscriptionEventHandlers: SubscriptionEventHandlers,
options?: SubscribeOptions
Expand All @@ -480,6 +525,7 @@ export class EventHubConsumerClient {
logger.verbose("EventHubConsumerClient subscribing to all partitions, no checkpoint store.");
}

const loadBalancingStrategy = this._getLoadBalancingStrategy();
const eventProcessor = this._createEventProcessor(
this._context,
subscriptionEventHandlers,
Expand All @@ -488,13 +534,12 @@ export class EventHubConsumerClient {
...defaultConsumerClientOptions,
...(options as SubscribeOptions),
ownerLevel: getOwnerLevel(options, this._userChoseCheckpointStore),
processingTarget: this._userChoseCheckpointStore
? undefined
: new GreedyPartitionLoadBalancer(),
// make it so all the event processors process work with the same overarching owner ID
// this allows the EventHubConsumer to unify all the work for any processors that it spawns
ownerId: this._id,
retryOptions: this._clientOptions.retryOptions
retryOptions: this._clientOptions.retryOptions,
loadBalancingStrategy,
loopIntervalInMs: this._loadBalancingOptions.updateIntervalInMs
}
);

Expand Down Expand Up @@ -529,7 +574,9 @@ export class EventHubConsumerClient {
...options,
processingTarget: partitionId,
ownerLevel: getOwnerLevel(subscribeOptions, this._userChoseCheckpointStore),
retryOptions: this._clientOptions.retryOptions
retryOptions: this._clientOptions.retryOptions,
loadBalancingStrategy: new UnbalancedLoadBalancingStrategy(),
loopIntervalInMs: this._loadBalancingOptions.updateIntervalInMs ?? 10000
}
);

Expand Down
Loading

0 comments on commit 9b0d772

Please sign in to comment.