Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[event-hubs] express sample with sendBatch #1

Merged
merged 1 commit into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions sdk/eventhub/event-hubs/samples/expressSample/package.json
Original file line number Diff line number Diff line change
@@ -1,20 +1,45 @@
{
"name": "expressSample",
"name": "azure-event-hubs-samples-express",
"private": true,
"version": "1.0.0",
"description": "",
"main": "index.js",
"description": "Azure Event Hubs client library samples with Express",
"engine": {
"node": ">=8.0.0"
},
"scripts": {
"build": "tsc",
"prebuild": "rimraf dist/",
"start": "node ./dist/index.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"repository": {
"type": "git",
"url": "git+https://github.com/Azure/azure-sdk-for-js.git"
},
"keywords": [
"Azure",
"Event Hubs",
"Node.js",
"TypeScript",
"Express"
],
"author": "Microsoft Corporation",
"license": "MIT",
"bugs": {
"url": "https://github.com/Azure/azure-sdk-for-js/issues"
},
"homepage": "https://github.com/Azure/azure-sdk-for-js#readme",
"sideEffects": false,
"dependencies": {
"@azure/event-hubs": "^5.3.0",
"async-lock": "^1.2.4",
"body-parser": "^1.19.0",
"express": "^4.17.1",
"uuid": "^8.3.1"
},
"devDependencies": {
"@types/body-parser": "^1.19.0",
"@types/express": "^4.17.9",
"@types/node": "^12.12.17",
"rimraf": "^3.0.0",
"typescript": "^3.7.2"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-controller";
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty solid, but we need a sample sample so we know how to use this sample 😁

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @bterlson I found your blog https://devblogs.microsoft.com/azure-sdk/how-to-use-abort-signals-to-cancel-operations-in-the-azure-sdk-for-javascript-typescript/. It's quite useful for me to understand abortController. Thanks~ 👍

import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs";

export interface AsyncBatchingProducerOptions {
producer: EventHubProducerClient;
maxWaitTimeInSeconds: number;
maxBatchSize?: number;
}

export class AsyncBatchingProducer {
private _abortController = new AbortController();
private _lastBatchCreationTime: number = 0;
private _eventQueue: AwaitableQueue<EventData> = new AwaitableQueue();
private _maxBatchSize: number;
private _maxWaitTimeInMs: number;
private _producer: EventHubProducerClient;

constructor(options: AsyncBatchingProducerOptions) {
this._maxBatchSize = options.maxBatchSize ?? Infinity;
this._maxWaitTimeInMs = options.maxWaitTimeInSeconds * 1000;
this._producer = options.producer;
}

/**
* Queues up the eventData so it can be sent to Event Hubs.
* @param eventData
*/
public send(eventData: EventData) {
this._eventQueue.push(eventData);
}

/**
* Stops the `AsyncBatchingProducer` from sending anymore events to Event Hubs.
*/
public stop() {
this._abortController.abort();
return this._producer.close();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you flush and send anything still held in _eventQueue? Or is it guaranteed to be empty at this point?

Copy link
Author

@chradek chradek Dec 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be items in the queue still, that's not a bad idea to flush it at this point.
Edit: But I'd just empty the queue, not send events, since this method is supposed to shut everything down asap 😄

}

/**
* Starts sending events to Event Hubs in the order they were received via `send()` calls.
* This method will run continuously until `stop()` is called.
*/
async start() {
const abortSignal = this._abortController.signal;
let batch = await this._createBatch();
let futureEvent = this._eventQueue.shift();
while (!abortSignal.aborted) {
try {
const timeSinceLastBatchCreation = Date.now() - this._lastBatchCreationTime;
// If there aren't any events in the batch, wait the maximum amount of time for an event.
const maximumTimeToWaitForEvent = batch.count
? Math.max(this._maxWaitTimeInMs - timeSinceLastBatchCreation, 0)
: this._maxWaitTimeInMs;

// Wait for either the next event, or for the allotted time to pass.
const event = await Promise.race([
futureEvent,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does futureEvent promise resolve?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It gets fulfilled when the queue has an event available, and we get the value from the Promise.race call. We have to hold onto it in case the wait function resolves first in the Promise.race.

wait(maximumTimeToWaitForEvent, abortSignal)
]);

if (!event) {
// We didn't receive an event within the allotted time.
// Send the existing batch if it has events in it.
if (batch.count) {
await this._producer.sendBatch(batch, { abortSignal });
batch = await this._createBatch();
}
continue;
} else {
// We received an event, so get a promise for the next one.
futureEvent = this._eventQueue.shift();
}

// Attempt to add the event to the existing batch.
const didAdd = batch.tryAdd(event);

// If the event was added to the batch and we're now
// at the max batch size, send the batch.
if (didAdd && batch.count >= this._maxBatchSize) {
await this._producer.sendBatch(batch, { abortSignal });
batch = await this._createBatch();
} else if (!didAdd && batch.count) {
// If the event wasn't able to be added and the current
// batch isn't empty, attempt to send the current batch
// and add the event to a new batch.
await this._producer.sendBatch(batch, { abortSignal });
batch = await this._createBatch();
// If the event still can't be added to an empty batch, just ignore it.
batch.tryAdd(event);
}
} catch (err) {
// Ignore `AbortError` since that gets thrown when `stop()` is called.
if (err.name !== "AbortError") {
console.error(`Encountered error: ${err}`);
}
}
}
}

/**
* Helper method that sets the lastBatchCreationTime and returns a new batch.
*/
private _createBatch(): Promise<EventDataBatch> {
this._lastBatchCreationTime = Date.now();
return this._producer.createBatch();
}
}

/**
* This function returns a promise that resolves after the specified amount of time.
* It also supports cancellation via passing in an `abortSignal`.
* @param timeInMs The amount of time in milliseconds the function should wait before resolving.
* @param abortSignal Used to support rejecting the promise immediately.
*/
function wait(timeInMs: number, abortSignal: AbortSignalLike): Promise<void> {
return new Promise((resolve, reject) => {
// Cancel quickly if the provided abortSignal has already been aborted.
if (abortSignal.aborted) {
return reject(new AbortError("The operation was cancelled."));
}
// Create an abort event listener that rejects the promise with an AbortError.
// It also clears the existing setTimeout and removes itself from the abortSignal.
const abortListener = () => {
clearTimeout(tid);
reject(new AbortError("This operation was cancelled."));
abortSignal.removeEventListener("abort", abortListener);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 😁

};
// Create the timer that will resolve the promise.
// It also ensures that abort event listener is removed from the abortSignal.
const tid = setTimeout(() => {
abortSignal.removeEventListener("abort", abortListener);
resolve();
}, timeInMs);
// Add an abort listener so that the promise can be rejected if the user cancels their operation.
abortSignal.addEventListener("abort", abortListener);
});
}

/**
* `AwaitableQueue` stores items in the order that they are received.
*
* This differs from ordinary Queues in that `shift` returns a Promise for a value.
* This allows a consumer of the queue to request an item that the queue does not yet have.
*/
class AwaitableQueue<T> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've written this millions of times, and this is a pretty clean implementation I have to say.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you sir.

private readonly _items: T[];

private _nextItemResolve?: (item: T) => void;
private _nextItemPromise?: Promise<T>;

constructor(items?: T[]) {
this._items = items ?? [];
}

public size(): number {
return this._items.length;
}

/**
* Returns a Promise that will resolve with the first item in the queue.
*/
public shift(): Promise<T> {
if (this._nextItemPromise) {
return this._nextItemPromise;
}

const item = this._items.shift();
if (typeof item !== "undefined") {
return Promise.resolve(item);
}

this._nextItemPromise = new Promise<T>((resolve) => (this._nextItemResolve = resolve));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this line not executed. this._nextItemPromise is an unused parameter

Copy link
Owner

@ruowan ruowan Dec 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. Does this line is to return a pending Promise, when there is no item in the queue? And the pending promise is used in Promise.race() so it supports timeout functionality for the awaitable queue. If my understanding wrong, pls correct me.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right!


return this._nextItemPromise;
}

/**
* Appends new item to the queue.
* @param item
*/
public push(item: T): void {
if (!this._resolveNextItem(item)) {
this._items.push(item);
}
}

private _resolveNextItem(item: T) {
if (!this._nextItemResolve) {
return false;
}
const resolve = this._nextItemResolve;
this._nextItemResolve = undefined;
this._nextItemPromise = undefined;
resolve(item);
return true;
}
}
59 changes: 29 additions & 30 deletions sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT Licence.

import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs";
import AsyncLock from "async-lock";
import { v4 as uuid } from "uuid";

export class EventProducer {
private producerClient: EventHubProducerClient;
private batch: EventDataBatch;
private awaitableBatch: Promise<EventDataBatch>;
private count: number;
private lock: AsyncLock;
private lockKey: string;
private lastBatchCreatedTime: number;

constructor(
Expand All @@ -18,8 +17,6 @@ export class EventProducer {
) {
this.producerClient = new EventHubProducerClient(eventHubConnectString, eventHubName);
this.count = 0;
this.lock = new AsyncLock();
this.lockKey = `createBatch-${uuid()}`;
this.lastBatchCreatedTime = Date.now();
}

Expand All @@ -33,32 +30,34 @@ export class EventProducer {
}

public async send(requestId: string, payload: any) {
if (this.batch === undefined) {
this.batch = await this.lock.acquire(this.lockKey, () => {
if (this.batch === undefined) {
console.log(`Acquire lock ${this.lockKey} and create a new batch`);
this.lastBatchCreatedTime = Date.now();
return this.producerClient.createBatch();
} else {
console.log(
`Acquire lock ${this.lockKey} and skip create. this.batch count: ${this.batch.count}`
);
return this.batch;
}
});
}
const isAdded = this.batch.tryAdd(this.constructEventData(requestId, payload));
const batch = await this.getOrCreateBatch();
const eventData = this.constructEventData(requestId, payload)
const isAdded = batch.tryAdd(eventData);

if (!isAdded || this.batch.count >= this.batchSendNumber) {
const curBatch = this.batch;
this.batch = undefined;
await this.sendBatch(curBatch);
if (!isAdded || batch.count >= this.batchSendNumber) {
this.awaitableBatch = undefined;
await this.sendBatch(batch);
if (!isAdded) {
console.log(`Up to batch size limit, add event failed. requestId: ${requestId}`);
const batch = await this.getOrCreateBatch();
if (!batch.tryAdd(eventData)) {
console.log(`Up to batch size limit, add event failed. requestId: ${requestId}`);
}
}
}
}

private async getOrCreateBatch(): Promise<EventDataBatch> {
// Check if there is an existing promise for a batch.
if (this.awaitableBatch) {
return this.awaitableBatch;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a great implement! You don't use any await in this function. All codes in this function are executed sequentially. event loop will not switch and avoid risk conditions. 👍

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No await suggests the async keyword could be dropped here for smaller codegen.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does smaller codegen mean?

}

console.log(`Creating a new batch.`);
this.lastBatchCreatedTime = Date.now();
this.awaitableBatch = this.producerClient.createBatch();
return this.awaitableBatch;
}

private async sendBatch(curBatch: EventDataBatch) {
try {
await this.producerClient.sendBatch(curBatch);
Expand All @@ -72,9 +71,9 @@ export class EventProducer {
public timeScan() {
setInterval(async () => {
if (Date.now() - this.lastBatchCreatedTime >= this.sendBatchTimeIntervalSeconds * 1000) {
if (this.batch !== undefined) {
const curBatch = this.batch;
this.batch = undefined;
if (this.awaitableBatch !== undefined) {
const curBatch = await this.awaitableBatch;
this.awaitableBatch = undefined;
console.log(`time trigger send batch. count: ${curBatch.count}`);
await this.sendBatch(curBatch);
}
Expand Down
Loading