diff --git a/sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts b/sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts index 2533ba2fe0af..8d304ad67e41 100644 --- a/sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts +++ b/sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts @@ -1,14 +1,20 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT Licence. + import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs"; -import AsyncLock from "async-lock"; -import { v4 as uuid } from "uuid"; +const BatchSendNumber = 100; +type Task = { + event: EventData; + callback: () => void; +}; export class EventProducer { private producerClient: EventHubProducerClient; - private batch: EventDataBatch; + private awaitableBatch: Promise; private count: number; - private lock: AsyncLock; - private lockKey: string; private lastBatchCreatedTime: number; + private tasks: Task[]; + private isSending: boolean; constructor( eventHubConnectString: string, @@ -18,9 +24,10 @@ export class EventProducer { ) { this.producerClient = new EventHubProducerClient(eventHubConnectString, eventHubName); this.count = 0; - this.lock = new AsyncLock(); - this.lockKey = `createBatch-${uuid()}`; this.lastBatchCreatedTime = Date.now(); + this.tasks = []; + this.isSending = false; + this.awaitableBatch = this.producerClient.createBatch(); } private constructEventData(requestId: string, payload: any): EventData { @@ -33,30 +40,51 @@ export class EventProducer { } public async send(requestId: string, payload: any) { - if (this.batch === undefined) { - this.batch = await this.lock.acquire(this.lockKey, () => { - if (this.batch === undefined) { - console.log(`Acquire lock ${this.lockKey} and create a new batch`); - this.lastBatchCreatedTime = Date.now(); - return this.producerClient.createBatch(); + const eventData = this.constructEventData(requestId, payload); + let callback: () => void; + const promise = new Promise((resolve) => (callback = resolve)); + this.tasks.push({ event: eventData, callback }); + await this.sendBatchLoop(); + await promise; + } + + private async sendBatchLoop(): Promise { + if (this.isSending) { + return; + } + this.isSending = true; + while (this.tasks.length > 0) { + let batch = await this.getOrCreateBatch(); + const tasks: Task[] = []; + this.awaitableBatch = undefined; + while (this.tasks.length > 0 && batch.count < BatchSendNumber) { + const task = this.tasks[0]; + if (batch.tryAdd(task.event)) { + tasks.push(task); + this.tasks.shift(); } else { - console.log( - `Acquire lock ${this.lockKey} and skip create. this.batch count: ${this.batch.count}` - ); - return this.batch; + break; } - }); + } + await this.sendBatch(batch); + for (const it of tasks) { + it.callback(); + } } - const isAdded = this.batch.tryAdd(this.constructEventData(requestId, payload)); + this.awaitableBatch = this.producerClient.createBatch(); + this.isSending = false; + } - if (!isAdded || this.batch.count >= this.batchSendNumber) { - const curBatch = this.batch; - this.batch = undefined; - await this.sendBatch(curBatch); - if (!isAdded) { - console.log(`Up to batch size limit, add event failed. requestId: ${requestId}`); - } + private async getOrCreateBatch(): Promise { + // Check if there is an existing promise for a batch. + if (this.awaitableBatch) { + return this.awaitableBatch; } + + console.log(`Creating a new batch.`); + this.lastBatchCreatedTime = Date.now(); + this.awaitableBatch = this.producerClient.createBatch(); + return this.awaitableBatch; } private async sendBatch(curBatch: EventDataBatch) { @@ -69,19 +97,6 @@ export class EventProducer { } } - public timeScan() { - setInterval(async () => { - if (Date.now() - this.lastBatchCreatedTime >= this.sendBatchTimeIntervalSeconds * 1000) { - if (this.batch !== undefined) { - const curBatch = this.batch; - this.batch = undefined; - console.log(`time trigger send batch. count: ${curBatch.count}`); - await this.sendBatch(curBatch); - } - } - }, this.sendBatchTimeIntervalSeconds * 1000); - } - public stop() { this.producerClient.close(); } diff --git a/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts b/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts index 4b8f4f75ac97..1c49112a14bc 100644 --- a/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts +++ b/sdk/eventhub/event-hubs/samples/expressSample/src/index.ts @@ -27,7 +27,6 @@ app.post("/ingest", async (req, res) => { }); // enable timeTrigger for eventHub. -eventProducer.timeScan(); app.listen(port, () => { console.log(`Example app listening at http://localhost:${port}`);