Skip to content

Commit

Permalink
Merge pull request #1 from chradek/eh-express-sample
Browse files Browse the repository at this point in the history
[event-hubs] express sample with sendBatch
  • Loading branch information
ruowan authored Dec 8, 2020
2 parents a62412f + 586d02c commit 0e64fde
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 51 deletions.
39 changes: 32 additions & 7 deletions sdk/eventhub/event-hubs/samples/expressSample/package.json
Original file line number Diff line number Diff line change
@@ -1,20 +1,45 @@
{
"name": "expressSample",
"name": "azure-event-hubs-samples-express",
"private": true,
"version": "1.0.0",
"description": "",
"main": "index.js",
"description": "Azure Event Hubs client library samples with Express",
"engine": {
"node": ">=8.0.0"
},
"scripts": {
"build": "tsc",
"prebuild": "rimraf dist/",
"start": "node ./dist/index.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"repository": {
"type": "git",
"url": "git+https://github.com/Azure/azure-sdk-for-js.git"
},
"keywords": [
"Azure",
"Event Hubs",
"Node.js",
"TypeScript",
"Express"
],
"author": "Microsoft Corporation",
"license": "MIT",
"bugs": {
"url": "https://github.com/Azure/azure-sdk-for-js/issues"
},
"homepage": "https://github.com/Azure/azure-sdk-for-js#readme",
"sideEffects": false,
"dependencies": {
"@azure/event-hubs": "^5.3.0",
"async-lock": "^1.2.4",
"body-parser": "^1.19.0",
"express": "^4.17.1",
"uuid": "^8.3.1"
},
"devDependencies": {
"@types/body-parser": "^1.19.0",
"@types/express": "^4.17.9",
"@types/node": "^12.12.17",
"rimraf": "^3.0.0",
"typescript": "^3.7.2"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-controller";
import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs";

export interface AsyncBatchingProducerOptions {
producer: EventHubProducerClient;
maxWaitTimeInSeconds: number;
maxBatchSize?: number;
}

export class AsyncBatchingProducer {
private _abortController = new AbortController();
private _lastBatchCreationTime: number = 0;
private _eventQueue: AwaitableQueue<EventData> = new AwaitableQueue();
private _maxBatchSize: number;
private _maxWaitTimeInMs: number;
private _producer: EventHubProducerClient;

constructor(options: AsyncBatchingProducerOptions) {
this._maxBatchSize = options.maxBatchSize ?? Infinity;
this._maxWaitTimeInMs = options.maxWaitTimeInSeconds * 1000;
this._producer = options.producer;
}

/**
* Queues up the eventData so it can be sent to Event Hubs.
* @param eventData
*/
public send(eventData: EventData) {
this._eventQueue.push(eventData);
}

/**
* Stops the `AsyncBatchingProducer` from sending anymore events to Event Hubs.
*/
public stop() {
this._abortController.abort();
return this._producer.close();
}

/**
* Starts sending events to Event Hubs in the order they were received via `send()` calls.
* This method will run continuously until `stop()` is called.
*/
async start() {
const abortSignal = this._abortController.signal;
let batch = await this._createBatch();
let futureEvent = this._eventQueue.shift();
while (!abortSignal.aborted) {
try {
const timeSinceLastBatchCreation = Date.now() - this._lastBatchCreationTime;
// If there aren't any events in the batch, wait the maximum amount of time for an event.
const maximumTimeToWaitForEvent = batch.count
? Math.max(this._maxWaitTimeInMs - timeSinceLastBatchCreation, 0)
: this._maxWaitTimeInMs;

// Wait for either the next event, or for the allotted time to pass.
const event = await Promise.race([
futureEvent,
wait(maximumTimeToWaitForEvent, abortSignal)
]);

if (!event) {
// We didn't receive an event within the allotted time.
// Send the existing batch if it has events in it.
if (batch.count) {
await this._producer.sendBatch(batch, { abortSignal });
batch = await this._createBatch();
}
continue;
} else {
// We received an event, so get a promise for the next one.
futureEvent = this._eventQueue.shift();
}

// Attempt to add the event to the existing batch.
const didAdd = batch.tryAdd(event);

// If the event was added to the batch and we're now
// at the max batch size, send the batch.
if (didAdd && batch.count >= this._maxBatchSize) {
await this._producer.sendBatch(batch, { abortSignal });
batch = await this._createBatch();
} else if (!didAdd && batch.count) {
// If the event wasn't able to be added and the current
// batch isn't empty, attempt to send the current batch
// and add the event to a new batch.
await this._producer.sendBatch(batch, { abortSignal });
batch = await this._createBatch();
// If the event still can't be added to an empty batch, just ignore it.
batch.tryAdd(event);
}
} catch (err) {
// Ignore `AbortError` since that gets thrown when `stop()` is called.
if (err.name !== "AbortError") {
console.error(`Encountered error: ${err}`);
}
}
}
}

/**
* Helper method that sets the lastBatchCreationTime and returns a new batch.
*/
private _createBatch(): Promise<EventDataBatch> {
this._lastBatchCreationTime = Date.now();
return this._producer.createBatch();
}
}

/**
* This function returns a promise that resolves after the specified amount of time.
* It also supports cancellation via passing in an `abortSignal`.
* @param timeInMs The amount of time in milliseconds the function should wait before resolving.
* @param abortSignal Used to support rejecting the promise immediately.
*/
function wait(timeInMs: number, abortSignal: AbortSignalLike): Promise<void> {
return new Promise((resolve, reject) => {
// Cancel quickly if the provided abortSignal has already been aborted.
if (abortSignal.aborted) {
return reject(new AbortError("The operation was cancelled."));
}
// Create an abort event listener that rejects the promise with an AbortError.
// It also clears the existing setTimeout and removes itself from the abortSignal.
const abortListener = () => {
clearTimeout(tid);
reject(new AbortError("This operation was cancelled."));
abortSignal.removeEventListener("abort", abortListener);
};
// Create the timer that will resolve the promise.
// It also ensures that abort event listener is removed from the abortSignal.
const tid = setTimeout(() => {
abortSignal.removeEventListener("abort", abortListener);
resolve();
}, timeInMs);
// Add an abort listener so that the promise can be rejected if the user cancels their operation.
abortSignal.addEventListener("abort", abortListener);
});
}

/**
* `AwaitableQueue` stores items in the order that they are received.
*
* This differs from ordinary Queues in that `shift` returns a Promise for a value.
* This allows a consumer of the queue to request an item that the queue does not yet have.
*/
class AwaitableQueue<T> {
private readonly _items: T[];

private _nextItemResolve?: (item: T) => void;
private _nextItemPromise?: Promise<T>;

constructor(items?: T[]) {
this._items = items ?? [];
}

public size(): number {
return this._items.length;
}

/**
* Returns a Promise that will resolve with the first item in the queue.
*/
public shift(): Promise<T> {
if (this._nextItemPromise) {
return this._nextItemPromise;
}

const item = this._items.shift();
if (typeof item !== "undefined") {
return Promise.resolve(item);
}

this._nextItemPromise = new Promise<T>((resolve) => (this._nextItemResolve = resolve));

return this._nextItemPromise;
}

/**
* Appends new item to the queue.
* @param item
*/
public push(item: T): void {
if (!this._resolveNextItem(item)) {
this._items.push(item);
}
}

private _resolveNextItem(item: T) {
if (!this._nextItemResolve) {
return false;
}
const resolve = this._nextItemResolve;
this._nextItemResolve = undefined;
this._nextItemPromise = undefined;
resolve(item);
return true;
}
}
59 changes: 29 additions & 30 deletions sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT Licence.

import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs";
import AsyncLock from "async-lock";
import { v4 as uuid } from "uuid";

export class EventProducer {
private producerClient: EventHubProducerClient;
private batch: EventDataBatch;
private awaitableBatch: Promise<EventDataBatch>;
private count: number;
private lock: AsyncLock;
private lockKey: string;
private lastBatchCreatedTime: number;

constructor(
Expand All @@ -18,8 +17,6 @@ export class EventProducer {
) {
this.producerClient = new EventHubProducerClient(eventHubConnectString, eventHubName);
this.count = 0;
this.lock = new AsyncLock();
this.lockKey = `createBatch-${uuid()}`;
this.lastBatchCreatedTime = Date.now();
}

Expand All @@ -33,32 +30,34 @@ export class EventProducer {
}

public async send(requestId: string, payload: any) {
if (this.batch === undefined) {
this.batch = await this.lock.acquire(this.lockKey, () => {
if (this.batch === undefined) {
console.log(`Acquire lock ${this.lockKey} and create a new batch`);
this.lastBatchCreatedTime = Date.now();
return this.producerClient.createBatch();
} else {
console.log(
`Acquire lock ${this.lockKey} and skip create. this.batch count: ${this.batch.count}`
);
return this.batch;
}
});
}
const isAdded = this.batch.tryAdd(this.constructEventData(requestId, payload));
const batch = await this.getOrCreateBatch();
const eventData = this.constructEventData(requestId, payload)
const isAdded = batch.tryAdd(eventData);

if (!isAdded || this.batch.count >= this.batchSendNumber) {
const curBatch = this.batch;
this.batch = undefined;
await this.sendBatch(curBatch);
if (!isAdded || batch.count >= this.batchSendNumber) {
this.awaitableBatch = undefined;
await this.sendBatch(batch);
if (!isAdded) {
console.log(`Up to batch size limit, add event failed. requestId: ${requestId}`);
const batch = await this.getOrCreateBatch();
if (!batch.tryAdd(eventData)) {
console.log(`Up to batch size limit, add event failed. requestId: ${requestId}`);
}
}
}
}

private async getOrCreateBatch(): Promise<EventDataBatch> {
// Check if there is an existing promise for a batch.
if (this.awaitableBatch) {
return this.awaitableBatch;
}

console.log(`Creating a new batch.`);
this.lastBatchCreatedTime = Date.now();
this.awaitableBatch = this.producerClient.createBatch();
return this.awaitableBatch;
}

private async sendBatch(curBatch: EventDataBatch) {
try {
await this.producerClient.sendBatch(curBatch);
Expand All @@ -72,9 +71,9 @@ export class EventProducer {
public timeScan() {
setInterval(async () => {
if (Date.now() - this.lastBatchCreatedTime >= this.sendBatchTimeIntervalSeconds * 1000) {
if (this.batch !== undefined) {
const curBatch = this.batch;
this.batch = undefined;
if (this.awaitableBatch !== undefined) {
const curBatch = await this.awaitableBatch;
this.awaitableBatch = undefined;
console.log(`time trigger send batch. count: ${curBatch.count}`);
await this.sendBatch(curBatch);
}
Expand Down
Loading

0 comments on commit 0e64fde

Please sign in to comment.