Skip to content

Commit

Permalink
update eventProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
ruowan committed Dec 2, 2020
1 parent a62412f commit e631cf8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 40 deletions.
93 changes: 54 additions & 39 deletions sdk/eventhub/event-hubs/samples/expressSample/src/eventProducer.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT Licence.

import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs";
import AsyncLock from "async-lock";
import { v4 as uuid } from "uuid";

const BatchSendNumber = 100;
type Task = {
event: EventData;
callback: () => void;
};
export class EventProducer {
private producerClient: EventHubProducerClient;
private batch: EventDataBatch;
private awaitableBatch: Promise<EventDataBatch>;
private count: number;
private lock: AsyncLock;
private lockKey: string;
private lastBatchCreatedTime: number;
private tasks: Task[];
private isSending: boolean;

constructor(
eventHubConnectString: string,
Expand All @@ -18,9 +24,10 @@ export class EventProducer {
) {
this.producerClient = new EventHubProducerClient(eventHubConnectString, eventHubName);
this.count = 0;
this.lock = new AsyncLock();
this.lockKey = `createBatch-${uuid()}`;
this.lastBatchCreatedTime = Date.now();
this.tasks = [];
this.isSending = false;
this.awaitableBatch = this.producerClient.createBatch();
}

private constructEventData(requestId: string, payload: any): EventData {
Expand All @@ -33,30 +40,51 @@ export class EventProducer {
}

public async send(requestId: string, payload: any) {
if (this.batch === undefined) {
this.batch = await this.lock.acquire(this.lockKey, () => {
if (this.batch === undefined) {
console.log(`Acquire lock ${this.lockKey} and create a new batch`);
this.lastBatchCreatedTime = Date.now();
return this.producerClient.createBatch();
const eventData = this.constructEventData(requestId, payload);
let callback: () => void;
const promise = new Promise((resolve) => (callback = resolve));
this.tasks.push({ event: eventData, callback });
await this.sendBatchLoop();
await promise;
}

private async sendBatchLoop(): Promise<void> {
if (this.isSending) {
return;
}
this.isSending = true;
while (this.tasks.length > 0) {
let batch = await this.getOrCreateBatch();
const tasks: Task[] = [];
this.awaitableBatch = undefined;
while (this.tasks.length > 0 && batch.count < BatchSendNumber) {
const task = this.tasks[0];
if (batch.tryAdd(task.event)) {
tasks.push(task);
this.tasks.shift();
} else {
console.log(
`Acquire lock ${this.lockKey} and skip create. this.batch count: ${this.batch.count}`
);
return this.batch;
break;
}
});
}
await this.sendBatch(batch);
for (const it of tasks) {
it.callback();
}
}
const isAdded = this.batch.tryAdd(this.constructEventData(requestId, payload));
this.awaitableBatch = this.producerClient.createBatch();
this.isSending = false;
}

if (!isAdded || this.batch.count >= this.batchSendNumber) {
const curBatch = this.batch;
this.batch = undefined;
await this.sendBatch(curBatch);
if (!isAdded) {
console.log(`Up to batch size limit, add event failed. requestId: ${requestId}`);
}
private async getOrCreateBatch(): Promise<EventDataBatch> {
// Check if there is an existing promise for a batch.
if (this.awaitableBatch) {
return this.awaitableBatch;
}

console.log(`Creating a new batch.`);
this.lastBatchCreatedTime = Date.now();
this.awaitableBatch = this.producerClient.createBatch();
return this.awaitableBatch;
}

private async sendBatch(curBatch: EventDataBatch) {
Expand All @@ -69,19 +97,6 @@ export class EventProducer {
}
}

public timeScan() {
setInterval(async () => {
if (Date.now() - this.lastBatchCreatedTime >= this.sendBatchTimeIntervalSeconds * 1000) {
if (this.batch !== undefined) {
const curBatch = this.batch;
this.batch = undefined;
console.log(`time trigger send batch. count: ${curBatch.count}`);
await this.sendBatch(curBatch);
}
}
}, this.sendBatchTimeIntervalSeconds * 1000);
}

public stop() {
this.producerClient.close();
}
Expand Down
1 change: 0 additions & 1 deletion sdk/eventhub/event-hubs/samples/expressSample/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ app.post("/ingest", async (req, res) => {
});

// enable timeTrigger for eventHub.
eventProducer.timeScan();

app.listen(port, () => {
console.log(`Example app listening at http://localhost:${port}`);
Expand Down

0 comments on commit e631cf8

Please sign in to comment.