[EventHub] Fix race condition when buffered mode is enabled #34712
Description
This PR implements a fix to the race condition described in #34711.
The approach here is twofold:
1. Use a `lock` while trying to add new events to the internal `EventBatch` buffer.
2. Make the handling of `self._cur_batch` and `self._cur_buffered_len` consistent.

Implementing either of these would be enough to fix the related issue, but I would argue that both are necessary to make things consistent:
1. The lock protects `self._cur_batch`. It is especially important to the synchronous `BufferedProducer`, since it relies on multi-threading, which can be even more unpredictable than an event loop.
2. `self._cur_batch` and `self._cur_buffered_len` follow clear rules:
   - `_cur_batch` is "reset" (i.e., assigned to a new `EventBatch` object) when and only when it gets enqueued to `self._buffered_queue`.
   - `_cur_buffered_len` is decreased solely as a consequence of a `self._producer.send`. PR "[Eventhub] Fix Blocking Behavior of Buffered Producer Flush" #25406 previously introduced a manual reset to this number, but it did not consider the timeout-based exit condition: `azure-sdk-for-python/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py`, lines 190 to 200 in 23121a5.
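To make the two rules above concrete, here is a minimal sketch of the idea, not the SDK implementation. `SketchBufferedProducer`, `put_event`, `_enqueue_cur_batch`, `_send` and the list-based batch are hypothetical stand-ins; only the attribute names `_cur_batch`, `_cur_buffered_len` and `_buffered_queue` come from the description. For simplicity the sketch also holds the lock during `flush`, which is an assumption and may not match what this PR does.

```python
import asyncio
from collections import deque


class SketchBufferedProducer:
    """Hypothetical stand-in for a buffered producer; not the SDK class."""

    def __init__(self, max_batch_len: int = 3) -> None:
        self._lock = asyncio.Lock()
        self._max_batch_len = max_batch_len
        self._cur_batch: list = []  # stands in for an EventBatch
        self._buffered_queue: deque = deque()
        self._cur_buffered_len = 0
        self.sent: list = []

    async def put_event(self, event) -> None:
        # Change 1: only one coroutine at a time may touch _cur_batch.
        async with self._lock:
            if len(self._cur_batch) >= self._max_batch_len:
                self._enqueue_cur_batch()
            self._cur_batch.append(event)
            self._cur_buffered_len += 1

    def _enqueue_cur_batch(self) -> None:
        # Rule: _cur_batch is replaced when, and only when, it is enqueued.
        self._buffered_queue.append(self._cur_batch)
        self._cur_batch = []

    async def flush(self) -> None:
        async with self._lock:
            if self._cur_batch:
                self._enqueue_cur_batch()
            while self._buffered_queue:
                batch = self._buffered_queue.popleft()
                await self._send(batch)
                # Rule: the counter only drops as a consequence of a send.
                self._cur_buffered_len -= len(batch)

    async def _send(self, batch) -> None:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        self.sent.extend(batch)


async def main() -> SketchBufferedProducer:
    producer = SketchBufferedProducer()
    # Ten concurrent producers compete for the same internal batch.
    await asyncio.gather(*(producer.put_event(i) for i in range(10)))
    await producer.flush()
    return producer


if __name__ == "__main__":
    p = asyncio.run(main())
    print(sorted(p.sent), p._cur_buffered_len)
```

Because the counter is only ever decremented next to a send, it cannot drift from the number of events that are actually still buffered, regardless of which exit path `flush` takes.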
A last note regarding Change 1: `azure-sdk-for-net` also updates the `self._cur_batch` with a synchronization tool, see `RunPublishingAsync`.
I also have another small set of changes that could make the sync and async `BufferedProducer` codebases more consistent. Since they do not have an impact on functionality, I thought it would be better to submit them in a follow-up PR.

All SDK Contribution checklist:
General Guidelines and Best Practices
Testing Guidelines
It would indeed be interesting to have some test code here, though I think it's hard to write deterministic tests that check for this potential race condition. We could think of something based on this test, though it is currently skipped: `azure-sdk-for-python/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_buffered_producer_async.py`, lines 529 to 532 in 23121a5.
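One possible direction, sketched here under heavy assumptions, is a stress-style check for the synchronous case rather than a deterministic reproduction: several threads add events concurrently, and the test then verifies that no event was lost and that the counter matches what is stored. `SketchSyncBuffer` and `run_stress` are hypothetical names for illustration and do not exist in the SDK or in the skipped test referenced above.

```python
import threading


class SketchSyncBuffer:
    """Hypothetical stand-in for the sync producer's internal buffer."""

    def __init__(self, max_batch_len: int = 5) -> None:
        self._lock = threading.Lock()
        self._max_batch_len = max_batch_len
        self._cur_batch: list = []  # stands in for an EventBatch
        self._buffered_queue: list = []
        self._cur_buffered_len = 0

    def put_event(self, event) -> None:
        # The lock makes "check size, enqueue, append, count" one atomic step.
        with self._lock:
            if len(self._cur_batch) >= self._max_batch_len:
                self._buffered_queue.append(self._cur_batch)
                self._cur_batch = []
            self._cur_batch.append(event)
            self._cur_buffered_len += 1


def run_stress(num_threads: int = 8, events_per_thread: int = 1000):
    """Hammer the buffer from several threads and count what was stored."""
    buf = SketchSyncBuffer()

    def worker(thread_id: int) -> None:
        for i in range(events_per_thread):
            buf.put_event((thread_id, i))

    threads = [
        threading.Thread(target=worker, args=(t,)) for t in range(num_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    stored = sum(len(b) for b in buf._buffered_queue) + len(buf._cur_batch)
    return buf, stored


if __name__ == "__main__":
    buf, stored = run_stress()
    print(stored, buf._cur_buffered_len)
```

With the lock in place the totals always agree. Without it, a failure is possible but not guaranteed on any given run, which is why this kind of test can only raise confidence and cannot prove the absence of the race.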