diff --git a/sdk/eventhub/event-hubs/samples/expressSample/package.json b/sdk/eventhub/event-hubs/samples/expressSample/package.json new file mode 100644 index 000000000000..97b788346f5c --- /dev/null +++ b/sdk/eventhub/event-hubs/samples/expressSample/package.json @@ -0,0 +1,45 @@ +{ + "name": "azure-event-hubs-samples-express", + "private": true, + "version": "1.0.0", + "description": "Azure Event Hubs client library samples with Express", + "engine": { + "node": ">=8.0.0" + }, + "scripts": { + "build": "tsc", + "prebuild": "rimraf dist/", + "start": "node ./dist/index.js" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/Azure/azure-sdk-for-js.git" + }, + "keywords": [ + "Azure", + "Event Hubs", + "Node.js", + "TypeScript", + "Express" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "bugs": { + "url": "https://github.com/Azure/azure-sdk-for-js/issues" + }, + "homepage": "https://github.com/Azure/azure-sdk-for-js#readme", + "sideEffects": false, + "dependencies": { + "@azure/event-hubs": "^5.3.0", + "body-parser": "^1.19.0", + "express": "^4.17.1", + "uuid": "^8.3.1" + }, + "devDependencies": { + "@types/body-parser": "^1.19.0", + "@types/express": "^4.17.9", + "@types/node": "^12.12.17", + "rimraf": "^3.0.0", + "typescript": "^3.7.2" + } +} diff --git a/sdk/eventhub/event-hubs/samples/expressSample/src/README.md b/sdk/eventhub/event-hubs/samples/expressSample/src/README.md new file mode 100644 index 000000000000..235af012b016 --- /dev/null +++ b/sdk/eventhub/event-hubs/samples/expressSample/src/README.md @@ -0,0 +1,54 @@ +# Azure Event Hubs client library express samples for Typescript + +This sample programs show how to use the JavaScript client libraries for Azure Event Hubs to send events in the node express framework. +One scenario is building an HTTP-based service that accepts events as part of an HTTP request, then transforms and sends those events into a downstream Event Hub. + +| **File Name** | **Description** | +| --------------------------------- | ------------------------------------------------------------------------------------------------------------------------------ | +| [asyncBatchingProducer.ts][eventproducer] | Demonstrates how the send() function can be used to send events to an Event Hub instance. Support batch send and time trigger. | +| [index.ts][index] | Express Http server entry point. Receive http payload and use AsyncBatchingProducer to ingest payload to eventHub. | + +## Prerequisites + +The samples are compatible with Node.js >= 8.0.0 and run in express. + +You need [an Azure subscription][freesub] and [an Azure Event Hub resource][azhubacct] to run these sample programs. + +## Setup + +To run the samples using the published version of the package: + +1. Install the dependencies using `npm`: + +```bash +npm install +``` + +2. Compile the sample to JavaScript by running the following command: + +```bash +npm run build +``` + +3. Start the node service on http://localhost:8080: + +```bash +npm start +``` + +4. Call local http server + +```bash +curl --header "Content-Type: application/json" \ + --request POST \ + --data '{"hello":"world"}' \ + http://localhost:8080/ingest +``` + +## Next Steps + +Take a look at our [API Documentation][apiref] for more information about the APIs that are available in the clients. + +[eventproducer]: https://github.com/Azure/azure-sdk-for-js/tree/master/sdk/eventhub/event-hubs/samples/expressSample/src/asyncBatchingProducer.ts +[index]: https://github.com/Azure/azure-sdk-for-js/tree/master/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts +[apiref]: https://docs.microsoft.com/javascript/api/@azure/event-hubs diff --git a/sdk/eventhub/event-hubs/samples/expressSample/src/asyncBatchingProducer.ts b/sdk/eventhub/event-hubs/samples/expressSample/src/asyncBatchingProducer.ts new file mode 100644 index 000000000000..904a51389fdd --- /dev/null +++ b/sdk/eventhub/event-hubs/samples/expressSample/src/asyncBatchingProducer.ts @@ -0,0 +1,212 @@ +/** + Copyright (c) Microsoft Corporation. + Licensed under the MIT Licence. + + This sample demonstrates a strategy for creating and sending + batches of events to Event Hubs. + + The AsyncBatchingProducer optimizes for creating the fewest + number of batches possible while sending events. + It supports setting thresholds for both the maximum number of + events allowed per batch, and the maximum amount of time + between sending batches. +*/ + +import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-controller"; +import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs"; + +export interface AsyncBatchingProducerOptions { + producer: EventHubProducerClient; + maxWaitTimeInSeconds: number; + maxBatchSize?: number; +} + +export class AsyncBatchingProducer { + private _abortController = new AbortController(); + private _lastBatchCreationTime: number = 0; + private _eventQueue: AwaitableQueue = new AwaitableQueue(); + private _maxBatchSize: number; + private _maxWaitTimeInMs: number; + private _producer: EventHubProducerClient; + + constructor(options: AsyncBatchingProducerOptions) { + this._maxBatchSize = options.maxBatchSize ?? Infinity; + this._maxWaitTimeInMs = options.maxWaitTimeInSeconds * 1000; + this._producer = options.producer; + } + + /** + * Queues up the eventData so it can be sent to Event Hubs. + * @param eventData + */ + public send(eventData: EventData) { + this._eventQueue.push(eventData); + } + + /** + * Stops the `AsyncBatchingProducer` from sending anymore events to Event Hubs. + */ + public stop() { + this._abortController.abort(); + return this._producer.close(); + } + + /** + * Starts sending events to Event Hubs in the order they were received via `send()` calls. + * This method will run continuously until `stop()` is called. + */ + async start() { + const abortSignal = this._abortController.signal; + let batch = await this._createBatch(); + let futureEvent = this._eventQueue.shift(); + while (!abortSignal.aborted) { + try { + const timeSinceLastBatchCreation = Date.now() - this._lastBatchCreationTime; + // If there aren't any events in the batch, wait the maximum amount of time for an event. + const maximumTimeToWaitForEvent = batch.count + ? Math.max(this._maxWaitTimeInMs - timeSinceLastBatchCreation, 0) + : this._maxWaitTimeInMs; + + // Wait for either the next event, or for the allotted time to pass. + const event = await Promise.race([ + futureEvent, + wait(maximumTimeToWaitForEvent, abortSignal) + ]); + + if (!event) { + // We didn't receive an event within the allotted time. + // Send the existing batch if it has events in it. + if (batch.count) { + await this._producer.sendBatch(batch, { abortSignal }); + batch = await this._createBatch(); + } + continue; + } else { + // We received an event, so get a promise for the next one. + futureEvent = this._eventQueue.shift(); + } + + // Attempt to add the event to the existing batch. + const didAdd = batch.tryAdd(event); + + // If the event was added to the batch and we're now + // at the max batch size, send the batch. + if (didAdd && batch.count >= this._maxBatchSize) { + await this._producer.sendBatch(batch, { abortSignal }); + batch = await this._createBatch(); + } else if (!didAdd && batch.count) { + // If the event wasn't able to be added and the current + // batch isn't empty, attempt to send the current batch + // and add the event to a new batch. + await this._producer.sendBatch(batch, { abortSignal }); + batch = await this._createBatch(); + // If the event still can't be added to an empty batch, just ignore it. + batch.tryAdd(event); + } + } catch (err) { + // Ignore `AbortError` since that gets thrown when `stop()` is called. + if (err.name !== "AbortError") { + console.error(`Encountered error: ${err}`); + } + } + } + } + + /** + * Helper method that sets the lastBatchCreationTime and returns a new batch. + */ + private _createBatch(): Promise { + this._lastBatchCreationTime = Date.now(); + return this._producer.createBatch(); + } +} + +/** + * This function returns a promise that resolves after the specified amount of time. + * It also supports cancellation via passing in an `abortSignal`. + * @param timeInMs The amount of time in milliseconds the function should wait before resolving. + * @param abortSignal Used to support rejecting the promise immediately. + */ +function wait(timeInMs: number, abortSignal: AbortSignalLike): Promise { + return new Promise((resolve, reject) => { + // Cancel quickly if the provided abortSignal has already been aborted. + if (abortSignal.aborted) { + return reject(new AbortError("The operation was cancelled.")); + } + // Create an abort event listener that rejects the promise with an AbortError. + // It also clears the existing setTimeout and removes itself from the abortSignal. + const abortListener = () => { + clearTimeout(tid); + reject(new AbortError("This operation was cancelled.")); + abortSignal.removeEventListener("abort", abortListener); + }; + // Create the timer that will resolve the promise. + // It also ensures that abort event listener is removed from the abortSignal. + const tid = setTimeout(() => { + abortSignal.removeEventListener("abort", abortListener); + resolve(); + }, timeInMs); + // Add an abort listener so that the promise can be rejected if the user cancels their operation. + abortSignal.addEventListener("abort", abortListener); + }); +} + +/** + * `AwaitableQueue` stores items in the order that they are received. + * + * This differs from ordinary Queues in that `shift` returns a Promise for a value. + * This allows a consumer of the queue to request an item that the queue does not yet have. + */ +class AwaitableQueue { + private readonly _items: T[]; + + private _nextItemResolve?: (item: T) => void; + private _nextItemPromise?: Promise; + + constructor(items?: T[]) { + this._items = items ?? []; + } + + public size(): number { + return this._items.length; + } + + /** + * Returns a Promise that will resolve with the first item in the queue. + */ + public shift(): Promise { + if (this._nextItemPromise) { + return this._nextItemPromise; + } + + const item = this._items.shift(); + if (typeof item !== "undefined") { + return Promise.resolve(item); + } + + this._nextItemPromise = new Promise((resolve) => (this._nextItemResolve = resolve)); + + return this._nextItemPromise; + } + + /** + * Appends new item to the queue. + * @param item + */ + public push(item: T): void { + if (!this._resolveNextItem(item)) { + this._items.push(item); + } + } + + private _resolveNextItem(item: T) { + if (!this._nextItemResolve) { + return false; + } + const resolve = this._nextItemResolve; + this._nextItemResolve = undefined; + this._nextItemPromise = undefined; + resolve(item); + return true; + } +} diff --git a/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts b/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts new file mode 100644 index 000000000000..5902e3701af1 --- /dev/null +++ b/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts @@ -0,0 +1,55 @@ +/* + Copyright (c) Microsoft Corporation. + Licensed under the MIT Licence. + + This sample demonstrates how to send events to Event Hubs + from an express service. The service will take the HTTP body of + any request sent to `POST /ingest` and transform it before sending + it to Event Hubs. + + As events are handed to the `AsyncBatchingProducer` via the `send()` call, + the producer will ensure that events are sent in the same batch so long as: + 1. The batch has enough space for additional events. + 2. the maxBatchSize is not exceeded by adding an event. + 3. The elapsed time since the last batch was sent does not exceed the maxWaitTimeInSeconds. + Once any of these conditions are met, a new batch is created and the cycle continues. +*/ + +import { v4 as uuid } from "uuid"; +import { AsyncBatchingProducer } from "./asyncBatchingProducer"; +import bodyParser from "body-parser"; +import express from "express"; +import { EventHubProducerClient } from "@azure/event-hubs"; +const app = express(); + +const eventHubConnectionString = "my connection string"; +const eventHubName = "my event hub name"; +const maxBatchSendSize = 20; +const maxWaitTimeInSeconds = 10; +const eventProducer = new AsyncBatchingProducer({ + producer: new EventHubProducerClient(eventHubConnectionString, eventHubName), + maxWaitTimeInSeconds: maxWaitTimeInSeconds, + maxBatchSize: maxBatchSendSize +}); +const port = 8080; + +app.use(bodyParser.urlencoded({ extended: false })); +app.use(bodyParser.json()); +// respond with requestId +app.post("/ingest", async (req, res) => { + const requestId = uuid(); + await eventProducer.send({ + properties: { + request_id: requestId + }, + body: req.body + }); + res.send(`ingested event. requestId: ${requestId}`); +}); + +// Enable sending events to an Event Hub based on the maxWaitTimeInSeconds and maxBatchSize. +eventProducer.start(); + +app.listen(port, () => { + console.log(`Example app listening at http://localhost:${port}`); +}); diff --git a/sdk/eventhub/event-hubs/samples/expressSample/tsconfig.json b/sdk/eventhub/event-hubs/samples/expressSample/tsconfig.json new file mode 100644 index 000000000000..a3ef806cfb5e --- /dev/null +++ b/sdk/eventhub/event-hubs/samples/expressSample/tsconfig.json @@ -0,0 +1,15 @@ +{ + "compilerOptions": { + "module": "commonjs", + "moduleResolution": "node", + + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src/**.ts"], + "exclude": ["node_modules"] +} +