-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Eventhub] Fix Blocking Behavior of Buffered Producer Flush #25406
Conversation
API change check APIView has identified API level changes in this PR and created following API reviews. |
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
sdk/eventhub/azure-eventhub/samples/async_samples/send_buffered_mode_async.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just had some questions about the locking -- wasn't sure how it works
sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py
Outdated
Show resolved
Hide resolved
sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py
Outdated
Show resolved
Hide resolved
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py
Outdated
Show resolved
Hide resolved
sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py
Outdated
Show resolved
Hide resolved
sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_buffered_producer_async.py
Show resolved
Hide resolved
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
/check-enforcer override |
sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py
Outdated
Show resolved
Hide resolved
sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py
Outdated
Show resolved
Hide resolved
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
/check-enforcer override |
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
) * changes to make get non blocking async * make emptying a queue non blocking * test * reset sample async * test * new line * remove second lock * sum over all the partitions * fix async tests * remove debug print * lock around queue * fix pylint * changelog * remove print * reset cur_buffer_length outside while loop * increase waiting time to receive events * remove sleep on add
) * changes to make get non blocking async * make emptying a queue non blocking * test * reset sample async * test * new line * remove second lock * sum over all the partitions * fix async tests * remove debug print * lock around queue * fix pylint * changelog * remove print * reset cur_buffer_length outside while loop * increase waiting time to receive events * remove sleep on add
update typescript.md in resource (Azure#25406) * update typescript.md in resource * update
This is in reference to the issue #23510
Thanks to @annatisch for helping with the bug :)
queue.get
was not needed for both sync & async. Flush should simply empty the queue, reset the buffer length and move on. This is whyblock=False
is passed in andcur_buffered_len
is reset on Empty exceptionput_event
as it is modifying some shared variables such ascurr_batch
,cur_buffered_len
as well as attempts aflush
when the queue is full. This way we won't be adding things into the queue while flushing, leading to an infinite loop.