Skip to content

Commit

Permalink
[event-hubs] introducing the AsyncBatchingProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
chradek committed Dec 2, 2020
1 parent a66efb4 commit 51eda14
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-controller";
import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs";


export interface AsyncBatchingProducerOptions {
producer: EventHubProducerClient;
maxWaitTimeInSeconds: number;
Expand All @@ -15,7 +14,7 @@ export class AsyncBatchingProducer {
private _maxBatchSize: number;
private _maxWaitTimeInMs: number;
private _producer: EventHubProducerClient;

constructor(options: AsyncBatchingProducerOptions) {
this._maxBatchSize = options.maxBatchSize ?? Infinity;
this._maxWaitTimeInMs = options.maxWaitTimeInSeconds * 1000;
Expand All @@ -24,7 +23,7 @@ export class AsyncBatchingProducer {

/**
* Queues up the eventData so it can be sent to Event Hubs.
* @param eventData
* @param eventData
*/
public send(eventData: EventData) {
this._eventQueue.push(eventData);
Expand All @@ -50,10 +49,15 @@ export class AsyncBatchingProducer {
try {
const timeSinceLastBatchCreation = Date.now() - this._lastBatchCreationTime;
// If there aren't any events in the batch, wait the maximum amount of time for an event.
const maximumTimeToWaitForEvent = batch.count ? Math.max(this._maxWaitTimeInMs - timeSinceLastBatchCreation, 0) : this._maxWaitTimeInMs;
const maximumTimeToWaitForEvent = batch.count
? Math.max(this._maxWaitTimeInMs - timeSinceLastBatchCreation, 0)
: this._maxWaitTimeInMs;

// Wait for either the next event, or for the allotted time to pass.
const event = await Promise.race([futureEvent, wait(maximumTimeToWaitForEvent, abortSignal)]);
const event = await Promise.race([
futureEvent,
wait(maximumTimeToWaitForEvent, abortSignal)
]);

if (!event) {
// We didn't receive an event within the allotted time.
Expand All @@ -70,13 +74,13 @@ export class AsyncBatchingProducer {

// Attempt to add the event to the existing batch.
const didAdd = batch.tryAdd(event);

// If the event was added to the batch and we're now
// at the max batch size, send the batch.
if (didAdd && batch.count >= this._maxBatchSize) {
await this._producer.sendBatch(batch, { abortSignal })
await this._producer.sendBatch(batch, { abortSignal });
batch = await this._createBatch();
} else if ((!didAdd && batch.count)) {
} else if (!didAdd && batch.count) {
// If the event wasn't able to be added and the current
// batch isn't empty, attempt to send the current batch
// and add the event to a new batch.
Expand Down Expand Up @@ -113,23 +117,23 @@ function wait(timeInMs: number, abortSignal: AbortSignalLike): Promise<void> {
return new Promise((resolve, reject) => {
// Cancel quickly if the provided abortSignal has already been aborted.
if (abortSignal.aborted) {
return reject(new AbortError('The operation was cancelled.'));
return reject(new AbortError("The operation was cancelled."));
}
// Create an abort event listener that rejects the promise with an AbortError.
// It also clears the existing setTimeout and removes itself from the abortSignal.
const abortListener = () => {
clearTimeout(tid);
reject(new AbortError('This operation was cancelled.'));
abortSignal.removeEventListener('abort', abortListener);
}
reject(new AbortError("This operation was cancelled."));
abortSignal.removeEventListener("abort", abortListener);
};
// Create the timer that will resolve the promise.
// It also ensures that abort event listener is removed from the abortSignal.
const tid = setTimeout(() => {
abortSignal.removeEventListener('abort', abortListener);
abortSignal.removeEventListener("abort", abortListener);
resolve();
}, timeInMs);
// Add an abort listener so that the promise can be rejected if the user cancels their operation.
abortSignal.addEventListener('abort', abortListener);
abortSignal.addEventListener("abort", abortListener);
});
}

Expand Down Expand Up @@ -191,4 +195,4 @@ class AwaitableQueue<T> {
resolve(item);
return true;
}
}
}
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/samples/expressSample/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import express from "express";
import { EventHubProducerClient } from "@azure/event-hubs";
const app = express();

const eventHubConnectionString = "Endpoint=sb://chradek-ehub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Wy4gW3IYq8j/MD18NUUUqUGn3cspNIfHdqwZhuh+9pI=";
const eventHubConnectionString =
"Endpoint=sb://chradek-ehub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Wy4gW3IYq8j/MD18NUUUqUGn3cspNIfHdqwZhuh+9pI=";
const eventHubName = "sdk-issue";
const batchSendSize = 3;
const timeIntervalSeconds = 10;
Expand Down

0 comments on commit 51eda14

Please sign in to comment.