From 586d02cd9484a9bca14f472bca3293fb3365e7c5 Mon Sep 17 00:00:00 2001 From: chradek Date: Mon, 30 Nov 2020 16:56:40 -0800 Subject: [PATCH] [event-hubs] express sample with sendBatch --- .../samples/expressSample/package.json | 39 +++- .../src/asyncBatchingProducer.ts | 198 ++++++++++++++++++ .../expressSample/src/eventProducer.ts | 59 +++--- .../samples/expressSample/src/index.ts | 36 ++-- 4 files changed, 281 insertions(+), 51 deletions(-) create mode 100644 sdk/eventhub/event-hubs/samples/expressSample/src/asyncBatchingProducer.ts diff --git a/sdk/eventhub/event-hubs/samples/expressSample/package.json b/sdk/eventhub/event-hubs/samples/expressSample/package.json index 82bbde77123a..97b788346f5c 100644 --- a/sdk/eventhub/event-hubs/samples/expressSample/package.json +++ b/sdk/eventhub/event-hubs/samples/expressSample/package.json @@ -1,20 +1,45 @@ { - "name": "expressSample", + "name": "azure-event-hubs-samples-express", + "private": true, "version": "1.0.0", - "description": "", - "main": "index.js", + "description": "Azure Event Hubs client library samples with Express", + "engine": { + "node": ">=8.0.0" + }, "scripts": { "build": "tsc", + "prebuild": "rimraf dist/", "start": "node ./dist/index.js" }, - "keywords": [], - "author": "", - "license": "ISC", + "repository": { + "type": "git", + "url": "git+https://github.com/Azure/azure-sdk-for-js.git" + }, + "keywords": [ + "Azure", + "Event Hubs", + "Node.js", + "TypeScript", + "Express" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "bugs": { + "url": "https://github.com/Azure/azure-sdk-for-js/issues" + }, + "homepage": "https://github.com/Azure/azure-sdk-for-js#readme", + "sideEffects": false, "dependencies": { "@azure/event-hubs": "^5.3.0", - "async-lock": "^1.2.4", "body-parser": "^1.19.0", "express": "^4.17.1", "uuid": "^8.3.1" + }, + "devDependencies": { + "@types/body-parser": "^1.19.0", + "@types/express": "^4.17.9", + "@types/node": "^12.12.17", + "rimraf": "^3.0.0", + "typescript": "^3.7.2" } } diff --git a/sdk/eventhub/event-hubs/samples/expressSample/src/asyncBatchingProducer.ts b/sdk/eventhub/event-hubs/samples/expressSample/src/asyncBatchingProducer.ts new file mode 100644 index 000000000000..83f494a2edcf --- /dev/null +++ b/sdk/eventhub/event-hubs/samples/expressSample/src/asyncBatchingProducer.ts @@ -0,0 +1,198 @@ +import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-controller"; +import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs"; + +export interface AsyncBatchingProducerOptions { + producer: EventHubProducerClient; + maxWaitTimeInSeconds: number; + maxBatchSize?: number; +} + +export class AsyncBatchingProducer { + private _abortController = new AbortController(); + private _lastBatchCreationTime: number = 0; + private _eventQueue: AwaitableQueue = new AwaitableQueue(); + private _maxBatchSize: number; + private _maxWaitTimeInMs: number; + private _producer: EventHubProducerClient; + + constructor(options: AsyncBatchingProducerOptions) { + this._maxBatchSize = options.maxBatchSize ?? Infinity; + this._maxWaitTimeInMs = options.maxWaitTimeInSeconds * 1000; + this._producer = options.producer; + } + + /** + * Queues up the eventData so it can be sent to Event Hubs. + * @param eventData + */ + public send(eventData: EventData) { + this._eventQueue.push(eventData); + } + + /** + * Stops the `AsyncBatchingProducer` from sending anymore events to Event Hubs. + */ + public stop() { + this._abortController.abort(); + return this._producer.close(); + } + + /** + * Starts sending events to Event Hubs in the order they were received via `send()` calls. + * This method will run continuously until `stop()` is called. + */ + async start() { + const abortSignal = this._abortController.signal; + let batch = await this._createBatch(); + let futureEvent = this._eventQueue.shift(); + while (!abortSignal.aborted) { + try { + const timeSinceLastBatchCreation = Date.now() - this._lastBatchCreationTime; + // If there aren't any events in the batch, wait the maximum amount of time for an event. + const maximumTimeToWaitForEvent = batch.count + ? Math.max(this._maxWaitTimeInMs - timeSinceLastBatchCreation, 0) + : this._maxWaitTimeInMs; + + // Wait for either the next event, or for the allotted time to pass. + const event = await Promise.race([ + futureEvent, + wait(maximumTimeToWaitForEvent, abortSignal) + ]); + + if (!event) { + // We didn't receive an event within the allotted time. + // Send the existing batch if it has events in it. + if (batch.count) { + await this._producer.sendBatch(batch, { abortSignal }); + batch = await this._createBatch(); + } + continue; + } else { + // We received an event, so get a promise for the next one. + futureEvent = this._eventQueue.shift(); + } + + // Attempt to add the event to the existing batch. + const didAdd = batch.tryAdd(event); + + // If the event was added to the batch and we're now + // at the max batch size, send the batch. + if (didAdd && batch.count >= this._maxBatchSize) { + await this._producer.sendBatch(batch, { abortSignal }); + batch = await this._createBatch(); + } else if (!didAdd && batch.count) { + // If the event wasn't able to be added and the current + // batch isn't empty, attempt to send the current batch + // and add the event to a new batch. + await this._producer.sendBatch(batch, { abortSignal }); + batch = await this._createBatch(); + // If the event still can't be added to an empty batch, just ignore it. + batch.tryAdd(event); + } + } catch (err) { + // Ignore `AbortError` since that gets thrown when `stop()` is called. + if (err.name !== "AbortError") { + console.error(`Encountered error: ${err}`); + } + } + } + } + + /** + * Helper method that sets the lastBatchCreationTime and returns a new batch. + */ + private _createBatch(): Promise { + this._lastBatchCreationTime = Date.now(); + return this._producer.createBatch(); + } +} + +/** + * This function returns a promise that resolves after the specified amount of time. + * It also supports cancellation via passing in an `abortSignal`. + * @param timeInMs The amount of time in milliseconds the function should wait before resolving. + * @param abortSignal Used to support rejecting the promise immediately. + */ +function wait(timeInMs: number, abortSignal: AbortSignalLike): Promise { + return new Promise((resolve, reject) => { + // Cancel quickly if the provided abortSignal has already been aborted. + if (abortSignal.aborted) { + return reject(new AbortError("The operation was cancelled.")); + } + // Create an abort event listener that rejects the promise with an AbortError. + // It also clears the existing setTimeout and removes itself from the abortSignal. + const abortListener = () => { + clearTimeout(tid); + reject(new AbortError("This operation was cancelled.")); + abortSignal.removeEventListener("abort", abortListener); + }; + // Create the timer that will resolve the promise. + // It also ensures that abort event listener is removed from the abortSignal. + const tid = setTimeout(() => { + abortSignal.removeEventListener("abort", abortListener); + resolve(); + }, timeInMs); + // Add an abort listener so that the promise can be rejected if the user cancels their operation. + abortSignal.addEventListener("abort", abortListener); + }); +} + +/** + * `AwaitableQueue` stores items in the order that they are received. + * + * This differs from ordinary Queues in that `shift` returns a Promise for a value. + * This allows a consumer of the queue to request an item that the queue does not yet have. + */ +class AwaitableQueue { + private readonly _items: T[]; + + private _nextItemResolve?: (item: T) => void; + private _nextItemPromise?: Promise; + + constructor(items?: T[]) { + this._items = items ?? []; + } + + public size(): number { + return this._items.length; + } + + /** + * Returns a Promise that will resolve with the first item in the queue. + */ + public shift(): Promise { + if (this._nextItemPromise) { + return this._nextItemPromise; + } + + const item = this._items.shift(); + if (typeof item !== "undefined") { + return Promise.resolve(item); + } + + this._nextItemPromise = new Promise((resolve) => (this._nextItemResolve = resolve)); + + return this._nextItemPromise; + } + + /** + * Appends new item to the queue. + * @param item + */ + public push(item: T): void { + if (!this._resolveNextItem(item)) { + this._items.push(item); + } + } + + private _resolveNextItem(item: T) { + if (!this._nextItemResolve) { + return false; + } + const resolve = this._nextItemResolve; + this._nextItemResolve = undefined; + this._nextItemPromise = undefined; + resolve(item); + return true; + } +} diff --git a/sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts b/sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts index 2533ba2fe0af..ddf4f2cae114 100644 --- a/sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts +++ b/sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts @@ -1,13 +1,12 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT Licence. + import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs"; -import AsyncLock from "async-lock"; -import { v4 as uuid } from "uuid"; export class EventProducer { private producerClient: EventHubProducerClient; - private batch: EventDataBatch; + private awaitableBatch: Promise; private count: number; - private lock: AsyncLock; - private lockKey: string; private lastBatchCreatedTime: number; constructor( @@ -18,8 +17,6 @@ export class EventProducer { ) { this.producerClient = new EventHubProducerClient(eventHubConnectString, eventHubName); this.count = 0; - this.lock = new AsyncLock(); - this.lockKey = `createBatch-${uuid()}`; this.lastBatchCreatedTime = Date.now(); } @@ -33,32 +30,34 @@ export class EventProducer { } public async send(requestId: string, payload: any) { - if (this.batch === undefined) { - this.batch = await this.lock.acquire(this.lockKey, () => { - if (this.batch === undefined) { - console.log(`Acquire lock ${this.lockKey} and create a new batch`); - this.lastBatchCreatedTime = Date.now(); - return this.producerClient.createBatch(); - } else { - console.log( - `Acquire lock ${this.lockKey} and skip create. this.batch count: ${this.batch.count}` - ); - return this.batch; - } - }); - } - const isAdded = this.batch.tryAdd(this.constructEventData(requestId, payload)); + const batch = await this.getOrCreateBatch(); + const eventData = this.constructEventData(requestId, payload) + const isAdded = batch.tryAdd(eventData); - if (!isAdded || this.batch.count >= this.batchSendNumber) { - const curBatch = this.batch; - this.batch = undefined; - await this.sendBatch(curBatch); + if (!isAdded || batch.count >= this.batchSendNumber) { + this.awaitableBatch = undefined; + await this.sendBatch(batch); if (!isAdded) { - console.log(`Up to batch size limit, add event failed. requestId: ${requestId}`); + const batch = await this.getOrCreateBatch(); + if (!batch.tryAdd(eventData)) { + console.log(`Up to batch size limit, add event failed. requestId: ${requestId}`); + } } } } + private async getOrCreateBatch(): Promise { + // Check if there is an existing promise for a batch. + if (this.awaitableBatch) { + return this.awaitableBatch; + } + + console.log(`Creating a new batch.`); + this.lastBatchCreatedTime = Date.now(); + this.awaitableBatch = this.producerClient.createBatch(); + return this.awaitableBatch; + } + private async sendBatch(curBatch: EventDataBatch) { try { await this.producerClient.sendBatch(curBatch); @@ -72,9 +71,9 @@ export class EventProducer { public timeScan() { setInterval(async () => { if (Date.now() - this.lastBatchCreatedTime >= this.sendBatchTimeIntervalSeconds * 1000) { - if (this.batch !== undefined) { - const curBatch = this.batch; - this.batch = undefined; + if (this.awaitableBatch !== undefined) { + const curBatch = await this.awaitableBatch; + this.awaitableBatch = undefined; console.log(`time trigger send batch. count: ${curBatch.count}`); await this.sendBatch(curBatch); } diff --git a/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts b/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts index 4b8f4f75ac97..b4f84e27ebf6 100644 --- a/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts +++ b/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts @@ -1,20 +1,23 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT Licence. + import { v4 as uuid } from "uuid"; -import { EventProducer } from "./eventProducer"; +import { AsyncBatchingProducer } from "./asyncBatchingProducer"; import bodyParser from "body-parser"; -var express = require("express"); -var app = express(); +import express from "express"; +import { EventHubProducerClient } from "@azure/event-hubs"; +const app = express(); -const eventHubConnectionString = "your eventHub connection string"; -const eventHubName = "your eventHub name"; -const batchSendSize = 20; +const eventHubConnectionString = "my connection string"; +const eventHubName = "my event hub name"; +const batchSendSize = 3; const timeIntervalSeconds = 10; -const eventProducer = new EventProducer( - eventHubConnectionString, - eventHubName, - batchSendSize, - timeIntervalSeconds -); +const eventProducer = new AsyncBatchingProducer({ + producer: new EventHubProducerClient(eventHubConnectionString, eventHubName), + maxWaitTimeInSeconds: timeIntervalSeconds, + maxBatchSize: batchSendSize +}); const port = 8080; app.use(bodyParser.urlencoded({ extended: false })); @@ -22,12 +25,17 @@ app.use(bodyParser.json()); // respond with requestId app.post("/ingest", async (req, res) => { const requestId = uuid(); - await eventProducer.send(requestId, req.body); + await eventProducer.send({ + properties: { + request_id: requestId + }, + body: req.body + }); res.send(`ingested event. requestId: ${requestId}`); }); // enable timeTrigger for eventHub. -eventProducer.timeScan(); +eventProducer.start(); app.listen(port, () => { console.log(`Example app listening at http://localhost:${port}`);