Skip to content

Commit

Permalink
add express sample for event hub (#12246)
Browse files Browse the repository at this point in the history
* add express sample

* update README.md

* update doc

* update doc

* add apiref

* update curl sample

* [event-hubs] express sample with sendBatch

* remove duplicated eventProducer

* update readme fix broken link

* Update sdk/eventhub/event-hubs/samples/expressSample/src/index.ts

Co-authored-by: chradek <[email protected]>

* Update sdk/eventhub/event-hubs/samples/expressSample/src/README.md

Co-authored-by: chradek <[email protected]>

* Update sdk/eventhub/event-hubs/samples/expressSample/src/README.md

Co-authored-by: chradek <[email protected]>

* Update sdk/eventhub/event-hubs/samples/expressSample/src/README.md

Co-authored-by: chradek <[email protected]>

* Update sdk/eventhub/event-hubs/samples/expressSample/src/README.md

Co-authored-by: chradek <[email protected]>

* Update sdk/eventhub/event-hubs/samples/expressSample/src/README.md

Co-authored-by: chradek <[email protected]>

* Update sdk/eventhub/event-hubs/samples/expressSample/src/asyncBatchingProducer.ts

Co-authored-by: chradek <[email protected]>

* Update sdk/eventhub/event-hubs/samples/expressSample/src/index.ts

Co-authored-by: chradek <[email protected]>

* sample code batchSendSize=20

* renaming

* remove metadata from readme

Co-authored-by: chradek <[email protected]>
Co-authored-by: chradek <[email protected]>
  • Loading branch information
3 people authored Dec 17, 2020
1 parent 048b8cc commit c968d82
Show file tree
Hide file tree
Showing 5 changed files with 381 additions and 0 deletions.
45 changes: 45 additions & 0 deletions sdk/eventhub/event-hubs/samples/expressSample/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"name": "azure-event-hubs-samples-express",
"private": true,
"version": "1.0.0",
"description": "Azure Event Hubs client library samples with Express",
"engine": {
"node": ">=8.0.0"
},
"scripts": {
"build": "tsc",
"prebuild": "rimraf dist/",
"start": "node ./dist/index.js"
},
"repository": {
"type": "git",
"url": "git+https://github.com/Azure/azure-sdk-for-js.git"
},
"keywords": [
"Azure",
"Event Hubs",
"Node.js",
"TypeScript",
"Express"
],
"author": "Microsoft Corporation",
"license": "MIT",
"bugs": {
"url": "https://github.com/Azure/azure-sdk-for-js/issues"
},
"homepage": "https://github.com/Azure/azure-sdk-for-js#readme",
"sideEffects": false,
"dependencies": {
"@azure/event-hubs": "^5.3.0",
"body-parser": "^1.19.0",
"express": "^4.17.1",
"uuid": "^8.3.1"
},
"devDependencies": {
"@types/body-parser": "^1.19.0",
"@types/express": "^4.17.9",
"@types/node": "^12.12.17",
"rimraf": "^3.0.0",
"typescript": "^3.7.2"
}
}
54 changes: 54 additions & 0 deletions sdk/eventhub/event-hubs/samples/expressSample/src/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Azure Event Hubs client library express samples for Typescript

This sample programs show how to use the JavaScript client libraries for Azure Event Hubs to send events in the node express framework.
One scenario is building an HTTP-based service that accepts events as part of an HTTP request, then transforms and sends those events into a downstream Event Hub.

| **File Name** | **Description** |
| --------------------------------- | ------------------------------------------------------------------------------------------------------------------------------ |
| [asyncBatchingProducer.ts][eventproducer] | Demonstrates how the send() function can be used to send events to an Event Hub instance. Support batch send and time trigger. |
| [index.ts][index] | Express Http server entry point. Receive http payload and use AsyncBatchingProducer to ingest payload to eventHub. |

## Prerequisites

The samples are compatible with Node.js >= 8.0.0 and run in express.

You need [an Azure subscription][freesub] and [an Azure Event Hub resource][azhubacct] to run these sample programs.

## Setup

To run the samples using the published version of the package:

1. Install the dependencies using `npm`:

```bash
npm install
```

2. Compile the sample to JavaScript by running the following command:

```bash
npm run build
```

3. Start the node service on http://localhost:8080:

```bash
npm start
```

4. Call local http server

```bash
curl --header "Content-Type: application/json" \
--request POST \
--data '{"hello":"world"}' \
http://localhost:8080/ingest
```

## Next Steps

Take a look at our [API Documentation][apiref] for more information about the APIs that are available in the clients.

[eventproducer]: https://github.com/Azure/azure-sdk-for-js/tree/master/sdk/eventhub/event-hubs/samples/expressSample/src/asyncBatchingProducer.ts
[index]: https://github.com/Azure/azure-sdk-for-js/tree/master/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts
[apiref]: https://docs.microsoft.com/javascript/api/@azure/event-hubs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/**
Copyright (c) Microsoft Corporation.
Licensed under the MIT Licence.
This sample demonstrates a strategy for creating and sending
batches of events to Event Hubs.
The AsyncBatchingProducer optimizes for creating the fewest
number of batches possible while sending events.
It supports setting thresholds for both the maximum number of
events allowed per batch, and the maximum amount of time
between sending batches.
*/

import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-controller";
import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs";

export interface AsyncBatchingProducerOptions {
producer: EventHubProducerClient;
maxWaitTimeInSeconds: number;
maxBatchSize?: number;
}

export class AsyncBatchingProducer {
private _abortController = new AbortController();
private _lastBatchCreationTime: number = 0;
private _eventQueue: AwaitableQueue<EventData> = new AwaitableQueue();
private _maxBatchSize: number;
private _maxWaitTimeInMs: number;
private _producer: EventHubProducerClient;

constructor(options: AsyncBatchingProducerOptions) {
this._maxBatchSize = options.maxBatchSize ?? Infinity;
this._maxWaitTimeInMs = options.maxWaitTimeInSeconds * 1000;
this._producer = options.producer;
}

/**
* Queues up the eventData so it can be sent to Event Hubs.
* @param eventData
*/
public send(eventData: EventData) {
this._eventQueue.push(eventData);
}

/**
* Stops the `AsyncBatchingProducer` from sending anymore events to Event Hubs.
*/
public stop() {
this._abortController.abort();
return this._producer.close();
}

/**
* Starts sending events to Event Hubs in the order they were received via `send()` calls.
* This method will run continuously until `stop()` is called.
*/
async start() {
const abortSignal = this._abortController.signal;
let batch = await this._createBatch();
let futureEvent = this._eventQueue.shift();
while (!abortSignal.aborted) {
try {
const timeSinceLastBatchCreation = Date.now() - this._lastBatchCreationTime;
// If there aren't any events in the batch, wait the maximum amount of time for an event.
const maximumTimeToWaitForEvent = batch.count
? Math.max(this._maxWaitTimeInMs - timeSinceLastBatchCreation, 0)
: this._maxWaitTimeInMs;

// Wait for either the next event, or for the allotted time to pass.
const event = await Promise.race([
futureEvent,
wait(maximumTimeToWaitForEvent, abortSignal)
]);

if (!event) {
// We didn't receive an event within the allotted time.
// Send the existing batch if it has events in it.
if (batch.count) {
await this._producer.sendBatch(batch, { abortSignal });
batch = await this._createBatch();
}
continue;
} else {
// We received an event, so get a promise for the next one.
futureEvent = this._eventQueue.shift();
}

// Attempt to add the event to the existing batch.
const didAdd = batch.tryAdd(event);

// If the event was added to the batch and we're now
// at the max batch size, send the batch.
if (didAdd && batch.count >= this._maxBatchSize) {
await this._producer.sendBatch(batch, { abortSignal });
batch = await this._createBatch();
} else if (!didAdd && batch.count) {
// If the event wasn't able to be added and the current
// batch isn't empty, attempt to send the current batch
// and add the event to a new batch.
await this._producer.sendBatch(batch, { abortSignal });
batch = await this._createBatch();
// If the event still can't be added to an empty batch, just ignore it.
batch.tryAdd(event);
}
} catch (err) {
// Ignore `AbortError` since that gets thrown when `stop()` is called.
if (err.name !== "AbortError") {
console.error(`Encountered error: ${err}`);
}
}
}
}

/**
* Helper method that sets the lastBatchCreationTime and returns a new batch.
*/
private _createBatch(): Promise<EventDataBatch> {
this._lastBatchCreationTime = Date.now();
return this._producer.createBatch();
}
}

/**
* This function returns a promise that resolves after the specified amount of time.
* It also supports cancellation via passing in an `abortSignal`.
* @param timeInMs The amount of time in milliseconds the function should wait before resolving.
* @param abortSignal Used to support rejecting the promise immediately.
*/
function wait(timeInMs: number, abortSignal: AbortSignalLike): Promise<void> {
return new Promise((resolve, reject) => {
// Cancel quickly if the provided abortSignal has already been aborted.
if (abortSignal.aborted) {
return reject(new AbortError("The operation was cancelled."));
}
// Create an abort event listener that rejects the promise with an AbortError.
// It also clears the existing setTimeout and removes itself from the abortSignal.
const abortListener = () => {
clearTimeout(tid);
reject(new AbortError("This operation was cancelled."));
abortSignal.removeEventListener("abort", abortListener);
};
// Create the timer that will resolve the promise.
// It also ensures that abort event listener is removed from the abortSignal.
const tid = setTimeout(() => {
abortSignal.removeEventListener("abort", abortListener);
resolve();
}, timeInMs);
// Add an abort listener so that the promise can be rejected if the user cancels their operation.
abortSignal.addEventListener("abort", abortListener);
});
}

/**
* `AwaitableQueue` stores items in the order that they are received.
*
* This differs from ordinary Queues in that `shift` returns a Promise for a value.
* This allows a consumer of the queue to request an item that the queue does not yet have.
*/
class AwaitableQueue<T> {
private readonly _items: T[];

private _nextItemResolve?: (item: T) => void;
private _nextItemPromise?: Promise<T>;

constructor(items?: T[]) {
this._items = items ?? [];
}

public size(): number {
return this._items.length;
}

/**
* Returns a Promise that will resolve with the first item in the queue.
*/
public shift(): Promise<T> {
if (this._nextItemPromise) {
return this._nextItemPromise;
}

const item = this._items.shift();
if (typeof item !== "undefined") {
return Promise.resolve(item);
}

this._nextItemPromise = new Promise<T>((resolve) => (this._nextItemResolve = resolve));

return this._nextItemPromise;
}

/**
* Appends new item to the queue.
* @param item
*/
public push(item: T): void {
if (!this._resolveNextItem(item)) {
this._items.push(item);
}
}

private _resolveNextItem(item: T) {
if (!this._nextItemResolve) {
return false;
}
const resolve = this._nextItemResolve;
this._nextItemResolve = undefined;
this._nextItemPromise = undefined;
resolve(item);
return true;
}
}
55 changes: 55 additions & 0 deletions sdk/eventhub/event-hubs/samples/expressSample/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT Licence.
This sample demonstrates how to send events to Event Hubs
from an express service. The service will take the HTTP body of
any request sent to `POST /ingest` and transform it before sending
it to Event Hubs.
As events are handed to the `AsyncBatchingProducer` via the `send()` call,
the producer will ensure that events are sent in the same batch so long as:
1. The batch has enough space for additional events.
2. the maxBatchSize is not exceeded by adding an event.
3. The elapsed time since the last batch was sent does not exceed the maxWaitTimeInSeconds.
Once any of these conditions are met, a new batch is created and the cycle continues.
*/

import { v4 as uuid } from "uuid";
import { AsyncBatchingProducer } from "./asyncBatchingProducer";
import bodyParser from "body-parser";
import express from "express";
import { EventHubProducerClient } from "@azure/event-hubs";
const app = express();

const eventHubConnectionString = "my connection string";
const eventHubName = "my event hub name";
const maxBatchSendSize = 20;
const maxWaitTimeInSeconds = 10;
const eventProducer = new AsyncBatchingProducer({
producer: new EventHubProducerClient(eventHubConnectionString, eventHubName),
maxWaitTimeInSeconds: maxWaitTimeInSeconds,
maxBatchSize: maxBatchSendSize
});
const port = 8080;

app.use(bodyParser.urlencoded({ extended: false }));
app.use(bodyParser.json());
// respond with requestId
app.post("/ingest", async (req, res) => {
const requestId = uuid();
await eventProducer.send({
properties: {
request_id: requestId
},
body: req.body
});
res.send(`ingested event. requestId: ${requestId}`);
});

// Enable sending events to an Event Hub based on the maxWaitTimeInSeconds and maxBatchSize.
eventProducer.start();

app.listen(port, () => {
console.log(`Example app listening at http://localhost:${port}`);
});
15 changes: 15 additions & 0 deletions sdk/eventhub/event-hubs/samples/expressSample/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"compilerOptions": {
"module": "commonjs",
"moduleResolution": "node",

"allowSyntheticDefaultImports": true,
"esModuleInterop": true,

"outDir": "dist",
"rootDir": "src"
},
"include": ["src/**.ts"],
"exclude": ["node_modules"]
}

0 comments on commit c968d82

Please sign in to comment.