
[event-hubs] express sample with sendBatch #1

Merged
1 commit merged into ruowan:ruowan/expressSample on Dec 8, 2020

Conversation

@chradek chradek commented Dec 1, 2020

@ruowan
I've had time to review this sample and noticed one scenario that isn't working as I'd expect.

If I start the server and run 2 curl requests in parallel, I see 2 batches created:

curl --header "Content-Type: application/json" --request POST --data '{"hello":"world"}' http://localhost:8080/ingest \
& curl --header "Content-Type: application/json" --request POST --data '{"hello":"world"}' http://localhost:8080/ingest

Unfortunately one overwrites the other, so one of the events in this case is lost.

The reason is that awaiting this.lock.acquire doesn't yield before the lock from the 1st acquire call is given up to the 2nd acquire call. This issue will happen any time multiple parallel requests come in at the same time while batch is undefined.

I've fixed this by updating the sample to get rid of AsyncLock altogether and instead treat the promise returned from createBatch() as a lock.
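
For reference, a minimal sketch of the promise-as-lock idea, assuming the sample's EventHubProducer class wraps an EventHubProducerClient (field names here are illustrative, not necessarily the ones in the sample):

import { EventDataBatch, EventHubProducerClient } from "@azure/event-hubs";

class EventHubProducer {
  private awaitableBatch: Promise<EventDataBatch> | undefined;

  constructor(private _producer: EventHubProducerClient) {}

  private async getOrCreateBatch(): Promise<EventDataBatch> {
    // No `await` happens between this check and the assignment, so the event
    // loop cannot switch to another caller in between: the first caller kicks
    // off createBatch() and every concurrent caller receives the same promise.
    if (!this.awaitableBatch) {
      this.awaitableBatch = this._producer.createBatch();
    }
    return this.awaitableBatch;
  }
}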

Other than that, I've also made some changes to the package.json to match our other samples. We do still need to add comments around all the methods in EventHubProducer and a summary of what this sample is meant to showcase in the index.ts file as well.

ruowan commented Dec 1, 2020

Thanks a lot for your review and contribution. I reproduced my error and tested your code; it works as expected!

I do not quite understand, though, why you got rid of the async lock by using a promise?

Btw, I approved your PR. If there are no further updates, I will merge it. You could also review the PR in the Azure/azure-sdk-for-js repo.

private async getOrCreateBatch(): Promise<EventDataBatch> {
  // Check if there is an existing promise for a batch.
  if (this.awaitableBatch) {
    return this.awaitableBatch;

ruowan (Owner)

It's a great implementation! You don't use any await in this function, so all the code in it executes sequentially; the event loop will not switch, which avoids race conditions. 👍

Reviewer

No await suggests the async keyword could be dropped here for smaller codegen.

ruowan (Owner)

What does smaller codegen mean?
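
For context: "smaller codegen" most likely refers to the extra JavaScript the TypeScript compiler emits for async functions when down-leveling to older targets such as ES5 (the __awaiter/__generator helpers). A function that never awaits can return the promise directly; the pair below is illustrative only, not from the sample:

import { EventDataBatch, EventHubProducerClient } from "@azure/event-hubs";

// With `async`, an ES5 target wraps the body in __awaiter/__generator helpers.
async function getBatchAsync(producer: EventHubProducerClient): Promise<EventDataBatch> {
  return producer.createBatch();
}

// Without `async`, the same promise is returned but the output is a plain function.
function getBatch(producer: EventHubProducerClient): Promise<EventDataBatch> {
  return producer.createBatch();
}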

chradek commented Dec 2, 2020

So I found one more issue with the sample.

In the send method, when we decide that the existing batch should be sent (because isAdded === false or batch.count >= this.batchSendNumber), we set this.awaitableBatch to undefined, then send the current batch. There is a race condition that can occur if multiple events are being ingested at the same time.

Let's say 2 send calls happen at the same time:

1. Both calls await this.getOrCreateBatch().
2. The 1st one regains control of the event loop after awaiting the batch. Now assume the event is too large to fit in the batch: it sets this.awaitableBatch to undefined and sends the existing batch.
3. The 2nd call regains control while the 1st call is sending, and gets a reference to the batch that the 1st call is currently sending.
4. The 2nd call might also fail to add to the batch, in which case we'll attempt to send this batch a 2nd time.

So, I'm beginning to think this is more complicated to get right. I've come up with another solution that doesn't have either of these race conditions. Instead of having multiple invocations of a method attempt to correctly create/share batches, what if we have a method we only call once (in this case, I called it start()) and it is in charge of creating batches and sending them?

Since this is a large change I put this implementation in a separate file, asyncBatchingProducer.ts. Please let me know what you think. Is it clear how it works? Did I miss any other race conditions? My hope is that by having all the logic for creating and sending batches in a single method that's only invoked once, we'll get rid of the race conditions that were present in both our samples :)
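
Roughly, the idea looks like this. This is only a sketch of the shape, not the actual asyncBatchingProducer.ts code; AwaitableQueue (sketched further down in this thread), wait(), and the field names are assumptions based on the discussion:

import { EventData, EventDataBatch, EventHubProducerClient } from "@azure/event-hubs";

class AsyncBatchingProducer {
  private _eventQueue = new AwaitableQueue<EventData>();
  private _isStopped = false;

  constructor(
    private _producer: EventHubProducerClient,
    private _batchSendNumber: number,
    private _maxWaitTimeInMs: number
  ) {}

  // Callers only enqueue events; they never touch a batch directly.
  send(event: EventData): void {
    this._eventQueue.push(event);
  }

  // The only place batches are created and sent, so no two callers can ever
  // share (or double-send) the same batch.
  async start(): Promise<void> {
    let batch = await this._producer.createBatch();
    // Held across iterations so an event is never lost when the timeout wins
    // the race (see the futureEvent discussion further down).
    let futureEvent: Promise<EventData> | undefined;
    while (!this._isStopped) {
      futureEvent = futureEvent ?? this._eventQueue.shift();
      const event = await Promise.race([futureEvent, wait(this._maxWaitTimeInMs)]);
      if (event) {
        futureEvent = undefined;
        if (!batch.tryAdd(event)) {
          // The event didn't fit: flush the current batch and retry in a new one.
          await this._producer.sendBatch(batch);
          batch = await this._producer.createBatch();
          batch.tryAdd(event);
        }
      }
      // Flush when the batch is full enough, or when the wait timed out with
      // events already buffered.
      if (batch.count >= this._batchSendNumber || (!event && batch.count > 0)) {
        await this._producer.sendBatch(batch);
        batch = await this._producer.createBatch();
      }
    }
  }
}

// Assumed helper: resolves after `ms` milliseconds (the real sample also wires
// an abort signal into this wait).
function wait(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}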

 */
public stop() {
  this._abortController.abort();
  return this._producer.close();

Reviewer

Should you flush and send anything still held in _eventQueue? Or is it guaranteed to be empty at this point?

chradek (Author) Dec 2, 2020

There could be items in the queue still, that's not a bad idea to flush it at this point.
Edit: But I'd just empty the queue, not send events, since this method is supposed to shut everything down asap 😄
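
A sketch of what that could look like (the clear() helper on the queue is hypothetical, and this is not the final code in the PR):

public stop() {
  // Cancel any pending wait/shift, drop whatever is still queued, and close.
  this._abortController.abort();
  this._eventQueue.clear();
  return this._producer.close();
}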

ruowan commented Dec 2, 2020

Hi @chradek, I think there are no other race conditions now. The code has a lot of comments; I understand your code and it works well. But I am a little bit confused about abortController. Could you please explain more about it? Why do we need abortController, and in which scenarios? I just want to understand the reasoning behind the design.

Here is another implementation PR that looks simpler: #2, inspired by @PhoenixHe-msft. Could you please take a look?

Btw, I think it's a little hard for beginners to understand your sample code. Could you please share how you approached it and why you implemented the code this way?


// Wait for either the next event, or for the allotted time to pass.
const event = await Promise.race([
  futureEvent,

ruowan (Owner)

When does futureEvent promise resolve?

chradek (Author)

It gets fulfilled when the queue has an event available, and we get the value from the Promise.race call. We have to hold onto it in case the wait function resolves first in the Promise.race.

  return Promise.resolve(item);
}

this._nextItemPromise = new Promise<T>((resolve) => (this._nextItemResolve = resolve));

ruowan (Owner)

It seems this line is never executed; this._nextItemPromise looks like an unused field.

ruowan (Owner) Dec 3, 2020

Oh, is this line meant to return a pending Promise when there is no item in the queue? And the pending promise is used in Promise.race(), so it provides the timeout functionality for the awaitable queue. If my understanding is wrong, please correct me.

chradek (Author)

That's right!
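
To make the mechanism concrete, here is a stripped-down sketch of the AwaitableQueue idea; the real file includes more (for example abort handling), and details may differ:

class AwaitableQueue<T> {
  private _items: T[] = [];
  private _nextItemPromise: Promise<T> | undefined;
  private _nextItemResolve: ((item: T) => void) | undefined;

  push(item: T): void {
    // If a consumer is already waiting on shift(), hand the item to it directly.
    if (this._nextItemResolve) {
      this._nextItemResolve(item);
      this._nextItemResolve = undefined;
      this._nextItemPromise = undefined;
      return;
    }
    this._items.push(item);
  }

  shift(): Promise<T> {
    const item = this._items.shift();
    if (item !== undefined) {
      return Promise.resolve(item);
    }
    // Nothing queued yet: return a pending promise that push() will resolve.
    if (!this._nextItemPromise) {
      this._nextItemPromise = new Promise<T>((resolve) => (this._nextItemResolve = resolve));
    }
    return this._nextItemPromise;
  }
}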

const abortListener = () => {
  clearTimeout(tid);
  reject(new AbortError("This operation was cancelled."));
  abortSignal.removeEventListener("abort", abortListener);

Reviewer

Nice 😁

 * This differs from ordinary Queues in that `shift` returns a Promise for a value.
 * This allows a consumer of the queue to request an item that the queue does not yet have.
 */
class AwaitableQueue<T> {

Reviewer

I've written this millions of times, and this is a pretty clean implementation I have to say.

chradek (Author)

Thank you sir.

@@ -0,0 +1,198 @@
import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-controller";

Reviewer

Looks pretty solid, but we need a sample sample so we know how to use this sample 😁

ruowan (Owner)

Hi @bterlson, I found your blog post https://devblogs.microsoft.com/azure-sdk/how-to-use-abort-signals-to-cancel-operations-in-the-azure-sdk-for-javascript-typescript/. It was quite useful for helping me understand abortController. Thanks~ 👍

ruowan commented Dec 3, 2020

LGTM. Ready to merge?

chradek commented Dec 7, 2020

@ruowan
I'm fine with having this merged. We'll still want to review the sample docs in the PR you submitted once this is merged, but I think we're almost there.

@ruowan ruowan merged commit 0e64fde into ruowan:ruowan/expressSample Dec 8, 2020