Skip to content

Commit

Permalink
[event-hubs] fix subscription deadlock (#9491)
Browse files Browse the repository at this point in the history
  • Loading branch information
chradek authored Jun 11, 2020
1 parent 1bb378e commit b425f64
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
3 changes: 3 additions & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 5.2.2 (Unreleased)

- Fixes issue [#9289](https://github.com/Azure/azure-sdk-for-js/issues/9289)
where calling `await subscription.close()` inside of a subscription's `processError`
handler would cause the subscription to deadlock.

## 5.2.1 (2020-06-08)

Expand Down
19 changes: 18 additions & 1 deletion sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,18 @@ export class EventProcessor {
loadBalancer: PartitionLoadBalancer,
abortSignal: AbortSignalLike
): Promise<void> {
let cancelLoopResolver;
// This provides a mechanism for exiting the loop early
// if the subscription has had `close` called.
const cancelLoopPromise = new Promise((resolve) => {
cancelLoopResolver = resolve;
if (abortSignal.aborted) {
return resolve();
}

abortSignal.addEventListener("abort", resolve);
});

// periodically check if there is any partition not being processed and process it
while (!abortSignal.aborted) {
try {
Expand Down Expand Up @@ -444,7 +456,8 @@ export class EventProcessor {
} catch (err) {
logger.warning(`[${this._id}] An error occured within the EventProcessor loop: ${err}`);
logErrorStackTrace(err);
await this._handleSubscriptionError(err);
// Protect against the scenario where the user awaits on subscription.close() from inside processError.
await Promise.race([this._handleSubscriptionError(err), cancelLoopPromise]);
} finally {
// sleep for some time, then continue the loop again.
logger.verbose(
Expand All @@ -454,6 +467,10 @@ export class EventProcessor {
await delayWithoutThrow(this._loopIntervalInMs, abortSignal);
}
}

if (cancelLoopResolver) {
abortSignal.removeEventListener("abort", cancelLoopResolver);
}
this._isRunning = false;
}

Expand Down
59 changes: 59 additions & 0 deletions sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -681,5 +681,64 @@ describe("EventHubConsumerClient", () => {
"processClose was not called the same number of times as processInitialize."
);
});

describe("processError", function(): void {
it("supports awaiting subscription.close on non partition-specific errors", async function(): Promise<
void
> {
// Use an invalid Event Hub name to trigger a non partition-specific error.
const client = new EventHubConsumerClient(
EventHubConsumerClient.defaultConsumerGroupName,
service.connectionString,
"Fake-Hub"
);

let subscription: Subscription;
const caughtErr: Error = await new Promise((resolve) => {
subscription = client.subscribe({
processEvents: async () => {},
processError: async (err, context) => {
if (!context.partitionId) {
await subscription.close();
resolve(err);
}
}
});
});

should.exist(caughtErr);

await client.close();
});

it("supports awaiting subscription.close on partition-specific errors", async function(): Promise<
void
> {
// Use an invalid Event Hub name to trigger a non partition-specific error.
const client = new EventHubConsumerClient(
EventHubConsumerClient.defaultConsumerGroupName,
service.connectionString,
service.path
);

let subscription: Subscription;
const caughtErr: Error = await new Promise((resolve) => {
// Subscribe to an invalid partition id to trigger a partition-specific error.
subscription = client.subscribe("-1", {
processEvents: async () => {},
processError: async (err, context) => {
if (context.partitionId) {
await subscription.close();
resolve(err);
}
}
});
});

should.exist(caughtErr);

await client.close();
});
});
});
});

0 comments on commit b425f64

Please sign in to comment.