Skip to content

Commit

Permalink
[Event Hubs] Use namespace in Partition Manager to guarantee uniquene…
Browse files Browse the repository at this point in the history
…ss (#5153)
  • Loading branch information
ShivangiReja authored and ramya-rao-a committed Sep 18, 2019
1 parent 60bcffe commit 6f5fd23
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 50 deletions.
5 changes: 5 additions & 0 deletions sdk/eventhub/event-hubs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
### (Date) 5.0.0-preview.4
- Current implementation of the Partition Manager takes the event hub name, consumer group name and partition id to ensure uniqueness for the checkpoint and ownership.
Since the same event hub name and consumer group name can exist in another namespace, we added `fullyQualifiedNamespace` as well to ensure uniqueness.
([PR #5153](https://github.com/Azure/azure-sdk-for-js/pull/5153))

### 2019-09-09 5.0.0-preview.3

#### Features
Expand Down
8 changes: 6 additions & 2 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export interface Checkpoint {
consumerGroupName: string;
eTag: string;
eventHubName: string;
fullyQualifiedNamespace: string;
offset: number;
ownerId: string;
partitionId: string;
Expand Down Expand Up @@ -84,6 +85,7 @@ export class EventHubClient {
createProducer(options?: EventHubProducerOptions): EventHubProducer;
static defaultConsumerGroupName: string;
readonly eventHubName: string;
readonly fullyQualifiedNamespace: string;
getPartitionIds(abortSignal?: AbortSignalLike): Promise<Array<string>>;
getPartitionProperties(partitionId: string, abortSignal?: AbortSignalLike): Promise<PartitionProperties>;
getProperties(abortSignal?: AbortSignalLike): Promise<EventHubProperties>;
Expand Down Expand Up @@ -184,7 +186,7 @@ export interface EventProcessorOptions {
// @public
export class InMemoryPartitionManager implements PartitionManager {
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnership(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<string>;
}

Expand All @@ -207,7 +209,7 @@ export type OnMessage = (eventData: ReceivedEventData) => void;
// @public
export interface PartitionManager {
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnership(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<string>;
}

Expand All @@ -216,6 +218,7 @@ export interface PartitionOwnership {
consumerGroupName: string;
eTag?: string;
eventHubName: string;
fullyQualifiedNamespace: string;
lastModifiedTimeInMS?: number;
offset?: number;
ownerId: string;
Expand All @@ -230,6 +233,7 @@ export class PartitionProcessor {
consumerGroupName: string;
eventHubName: string;
eventProcessorId: string;
fullyQualifiedNamespace: string;
initialize(): Promise<void>;
lastEnqueuedEventInfo: LastEnqueuedEventInfo;
partitionId: string;
Expand Down
28 changes: 19 additions & 9 deletions sdk/eventhub/event-hubs/src/eventHubClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import { throwTypeErrorIfParameterMissing, throwErrorIfConnectionClosed } from "
export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number {
const timeoutInMs =
retryOptions == undefined ||
typeof retryOptions.timeoutInMs !== "number" ||
!isFinite(retryOptions.timeoutInMs) ||
retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInMs
typeof retryOptions.timeoutInMs !== "number" ||
!isFinite(retryOptions.timeoutInMs) ||
retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInMs
? Constants.defaultOperationTimeoutInMs
: retryOptions.timeoutInMs;
return timeoutInMs;
Expand Down Expand Up @@ -64,7 +64,7 @@ export interface EventHubProducerOptions {
* The set of options to configure the `send` operation on the `EventHubProducer`.
* - `partitionKey` : A value that is hashed to produce a partition assignment.
* - `abortSignal` : A signal the request to cancel the send operation.
*
*
* Example usage:
* ```js
* {
Expand Down Expand Up @@ -94,7 +94,7 @@ export interface SendOptions {
* Not applicable if the `EventHubProducer` was created using a `partitionId`.
* - `maxSizeInBytes`: The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached.
* - `abortSignal` : A signal the request to cancel the send operation.
*
*
* Example usage:
* ```js
* {
Expand Down Expand Up @@ -131,7 +131,7 @@ export interface BatchOptions {
* consumers to fail if their `ownerLevel` is lower or doesn't exist.
* - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events.
* A simple usage can be `{ "maxRetries": 4 }`.
*
*
* Example usage:
* ```js
* {
Expand Down Expand Up @@ -184,7 +184,7 @@ export interface EventHubConsumerOptions {
* over a WebSocket.
* - `retryOptions` : The retry options for all the operations on the client/producer/consumer.
* A simple usage can be `{ "maxRetries": 4 }`.
*
*
* Example usage:
* ```js
* {
Expand Down Expand Up @@ -274,6 +274,16 @@ export class EventHubClient {
return this._context.config.entityPath;
}

/**
* @property
* @readonly
* The fully qualified Event Hubs namespace for which this client is created. This is likely to be similar to
* <yournamespace>.servicebus.windows.net.
*/
get fullyQualifiedNamespace(): string {
return this._context.config.host;
}

/**
* @constructor
* @param connectionString - The connection string to use for connecting to the Event Hubs namespace.
Expand Down Expand Up @@ -356,7 +366,7 @@ export class EventHubClient {
) {
throw new TypeError(
`Either provide "eventHubName" or the "connectionString": "${hostOrConnectionString}", ` +
`must contain "EntityPath=<your-event-hub-name>".`
`must contain "EntityPath=<your-event-hub-name>".`
);
}
if (
Expand All @@ -367,7 +377,7 @@ export class EventHubClient {
) {
throw new TypeError(
`The entity path "${parsedCS.EntityPath}" in connectionString: "${hostOrConnectionString}" ` +
`doesn't match with eventHubName: "${eventHubNameOrOptions}".`
`doesn't match with eventHubName: "${eventHubNameOrOptions}".`
);
}
connectionString = hostOrConnectionString;
Expand Down
16 changes: 15 additions & 1 deletion sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ export enum CloseReason {
* **Note**: This is used internally by the `EventProcessor` and user never has to create it directly.
*/
export interface PartitionOwnership {
/**
* @property The fully qualified Event Hubs namespace. This is likely to be similar to
* <yournamespace>.servicebus.windows.net
*/
fullyQualifiedNamespace: string;
/**
* @property The event hub name
*/
Expand Down Expand Up @@ -91,11 +96,17 @@ export interface PartitionManager {
* Called to get the list of all existing partition ownership from the underlying data store. Could return empty
* results if there are is no existing ownership information.
*
* @param fullyQualifiedNamespace The fully qualified Event Hubs namespace. This is likely to be similar to
* <yournamespace>.servicebus.windows.net.
* @param eventHubName The event hub name.
* @param consumerGroupName The consumer group name.
* @return A list of partition ownership details of all the partitions that have/had an owner.
*/
listOwnership(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
listOwnership(
fullyQualifiedNamespace: string,
eventHubName: string,
consumerGroupName: string
): Promise<PartitionOwnership[]>;
/**
* Called to claim ownership of a list of partitions. This will return the list of partitions that were owned
* successfully.
Expand Down Expand Up @@ -242,6 +253,7 @@ export class EventProcessor {
const partitionOwnership: PartitionOwnership = {
ownerId: this._id,
partitionId: partitionIdToClaim,
fullyQualifiedNamespace: this._eventHubClient.fullyQualifiedNamespace,
consumerGroupName: this._consumerGroupName,
eventHubName: this._eventHubClient.eventHubName,
sequenceNumber: previousPartitionOwnership
Expand Down Expand Up @@ -279,6 +291,7 @@ export class EventProcessor {
`[${this._id}] [${partitionIdToClaim}] Calling user-provided PartitionProcessorFactory.`
);
const partitionProcessor = new this._partitionProcessorClass();
partitionProcessor.fullyQualifiedNamespace = this._eventHubClient.fullyQualifiedNamespace;
partitionProcessor.eventHubName = this._eventHubClient.eventHubName;
partitionProcessor.consumerGroupName = this._consumerGroupName;
partitionProcessor.partitionId = ownershipRequest.partitionId;
Expand Down Expand Up @@ -317,6 +330,7 @@ export class EventProcessor {
const partitionOwnershipMap: Map<string, PartitionOwnership> = new Map();
// Retrieve current partition ownership details from the datastore.
const partitionOwnership = await this._partitionManager.listOwnership(
this._eventHubClient.fullyQualifiedNamespace,
this._eventHubClient.eventHubName,
this._consumerGroupName
);
Expand Down
3 changes: 3 additions & 0 deletions sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ export class InMemoryPartitionManager implements PartitionManager {
* Get the list of all existing partition ownership from the underlying data store. Could return empty
* results if there are is no existing ownership information.
*
* @param fullyQualifiedNamespace The fully qualified Event Hubs namespace. This is likely to be similar to
* <yournamespace>.servicebus.windows.net.
* @param eventHubName The event hub name.
* @param consumerGroupName The consumer group name.
* @return Partition ownership details of all the partitions that have/had an owner..
*/
async listOwnership(
fullyQualifiedNamespace: string,
eventHubName: string,
consumerGroupName: string
): Promise<PartitionOwnership[]> {
Expand Down
24 changes: 24 additions & 0 deletions sdk/eventhub/event-hubs/src/partitionProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import { LastEnqueuedEventInfo } from "./eventHubReceiver";
* internal workings of `EventProcessor` and `PartitionManager`.
**/
export interface Checkpoint {
/**
* @property The fully qualified Event Hubs namespace. This is likely to be similar to
* <yournamespace>.servicebus.windows.net
*/
fullyQualifiedNamespace: string;
/**
* @property The event hub name
*/
Expand Down Expand Up @@ -56,6 +61,7 @@ export interface Checkpoint {
export class PartitionProcessor {
private _partitionManager: PartitionManager | undefined;
private _consumerGroupName: string | undefined;
private _fullyQualifiedNamespace: string | undefined;
private _eventHubName: string | undefined;
private _eventProcessorId: string | undefined;
private _partitionId: string | undefined;
Expand All @@ -81,6 +87,23 @@ export class PartitionProcessor {
this._lastEnqueuedEventInfo = lastEnqueuedEventInfo;
}

/**
* @property The fully qualified namespace from where the current partition is being processed. It is set by the `EventProcessor`
* @readonly
*/
public get fullyQualifiedNamespace() {
return this._fullyQualifiedNamespace!;
}

/**
* @property The fully qualified namespace from where the current partition is being processed. It is set by the `EventProcessor`
*/
public set fullyQualifiedNamespace(fullyQualifiedNamespace: string) {
if (!this._fullyQualifiedNamespace) {
this._fullyQualifiedNamespace = fullyQualifiedNamespace;
}
}

/**
* @property The name of the consumer group from where the current partition is being processed. It is set by the `EventProcessor`
* @readonly
Expand Down Expand Up @@ -210,6 +233,7 @@ export class PartitionProcessor {
offset?: number
): Promise<void> {
const checkpoint: Checkpoint = {
fullyQualifiedNamespace: this._fullyQualifiedNamespace!,
eventHubName: this._eventHubName!,
consumerGroupName: this._consumerGroupName!,
ownerId: this._eventProcessorId!,
Expand Down
16 changes: 15 additions & 1 deletion sdk/eventhub/event-hubs/test/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ describe("Create EventHubClient #RunnableInBrowser", function(): void {
should.equal(client.eventHubName, "my-event-hub-name");
});

it("Verify fullyQualifiedNamespace creating an EventHubClient using a connection string", function(): void {
const client = new EventHubClient(
"Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=a;SharedAccessKey=b;EntityPath=my-event-hub-name"
);
should.equal(client.fullyQualifiedNamespace, "test.servicebus.windows.net");
});

it("creates an EventHubClient from a connection string and an Event Hub name", function(): void {
const client = new EventHubClient(
"Endpoint=sb://a;SharedAccessKeyName=b;SharedAccessKey=c",
Expand All @@ -75,7 +82,7 @@ describe("Create EventHubClient #RunnableInBrowser", function(): void {
};
}
};
const client = new EventHubClient("abc","my-event-hub-name", dummyCredential);
const client = new EventHubClient("abc", "my-event-hub-name", dummyCredential);
client.should.be.an.instanceof(EventHubClient);
should.equal(client.eventHubName, "my-event-hub-name");
});
Expand Down Expand Up @@ -111,6 +118,13 @@ describe("Create EventHubClient #RunnableInBrowser", function(): void {
should.equal(hubInfo.path, client.eventHubName);
await client.close();
});

it("Verify fullyQualifiedNamespace when creating an EventHubClient from an Azure.Identity credential", function(): void {
const endpoint = "test.servicebus.windows.net";
const credential = new EnvironmentCredential();
const client = new EventHubClient(endpoint, "my-event-hub-name", credential);
should.equal(client.fullyQualifiedNamespace, "test.servicebus.windows.net");
});
});

describe("ServiceCommunicationError for non existent namespace", function(): void {
Expand Down
Loading

0 comments on commit 6f5fd23

Please sign in to comment.