[EventHub] Race condition when buffered mode is enabled drops message #34711

Closed
falcaopetri opened this issue Mar 9, 2024 · 3 comments

Comments

@falcaopetri
Contributor

falcaopetri commented Mar 9, 2024

  • Package Name: azure-eventhub
  • Package Version: 5.11.6
  • Operating System: Linux
  • Python Version: Python 3.10

Describe the bug

An event passed to the async EventHubProducerClient with buffered_mode=True may sometimes never get flushed (i.e., actually sent to the event hub).

I've tracked the issue down to a race condition in the internal implementation of BufferedProducer; see the sections below.

This has actively impacted one use case I know of, resulting in messages being lost at random: some "sent" messages never arrived at the other end.

EventHubProducerClient's max_wait_time setting directly affects how often a message is dropped. Given the race-condition nature of the problem, factors such as network latency and coroutine/thread scheduling order can also affect how frequently this occurs.

In this issue I will mainly discuss the async EventHubProducerClient, though the same should apply to the sync, thread-based version.
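
For reference, here is a minimal sketch of the kind of buffered-mode setup being discussed (the connection string, event hub name, and payloads are placeholders; the callbacks follow the documented on_success/on_error shape):

    import asyncio

    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient

    async def on_success(events, partition_id):
        # Called by the buffered producer after a batch is successfully sent.
        print(f"sent {len(events)} events to partition {partition_id}")

    async def on_error(events, partition_id, error):
        print(f"failed to send {len(events)} events to partition {partition_id}: {error}")

    async def main():
        producer = EventHubProducerClient.from_connection_string(
            "<CONNECTION_STRING>",      # placeholder
            eventhub_name="<EVENT_HUB_NAME>",  # placeholder
            buffered_mode=True,
            on_success=on_success,
            on_error=on_error,
            max_wait_time=5,  # seconds; per this issue, this setting affects how often the race is hit
        )
        async with producer:
            for i in range(10):
                # send_event only enqueues; a background worker flushes the buffer.
                await producer.send_event(EventData(f"event {i}"))

    asyncio.run(main())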

To Reproduce

Since the root cause is a race condition, the problem only occurs under specific timing conditions, which can be hard to reproduce.

Even so, I was able to trigger the critical path fairly often using a specific combination of thresholds. A real scenario is harder to reproduce, but the same idea applies.

You can see the code at https://gist.github.com/falcaopetri/18a65d316f0f2cc12ee65f3e6939976d. Here I will focus on explaining the log and the critical path.

The log at the end of the reproduction code is:

A couple of log lines...
DEBUG:race_scenario:cur_buffer_id='136368156972400', adding the event (EventData(body='{"seq_id": 1, "timestamp": "2024-03-09T15:44:40.766777"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), None) {} 
DEBUG:race_scenario:cur_buffer_id='136368156972400', added the event (EventData(body='{"seq_id": 1, "timestamp": "2024-03-09T15:44:40.766777"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), None) {} 
DEBUG:race_scenario:cur_buffer_id='136368156972400', adding the event (EventData(body='{"seq_id": 2, "timestamp": "2024-03-09T15:44:40.775864"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), None) {} 
DEBUG:race_scenario:cur_buffer_id='136368156972400', added the event (EventData(body='{"seq_id": 2, "timestamp": "2024-03-09T15:44:40.775864"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), None) {} 
DEBUG:race_scenario:cur_buffer_id='136368156972400', adding the event (EventData(body='{"seq_id": 3, "timestamp": "2024-03-09T15:44:40.780930"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), None) {} 
DEBUG:race_scenario:cur_buffer_id='136368156972400', added the event (EventData(body='{"seq_id": 3, "timestamp": "2024-03-09T15:44:40.780930"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), None) {} 
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' worker is checking max_wait_time.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition: '0' started flushing.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' is sending.
DEBUG:race_scenario:called _send (EventDataBatch(max_size_in_bytes=1048576, partition_id=None, partition_key=None, event_count=4),) {'timeout': None}
DEBUG:race_scenario:cur_buffer_id='136368156978736', adding the event (EventData(body='{"seq_id": 4, "timestamp": "2024-03-09T15:44:43.784397"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), None) {} 
DEBUG:race_scenario:_send finished (EventDataBatch(max_size_in_bytes=1048576, partition_id=None, partition_key=None, event_count=4),) {'timeout': None}
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' sending 4 events succeeded.
WARNING:race_scenario:on_success, partition_id: 0, events: [EventData(body='{"seq_id": 0, "timestamp": "2024-03-09T15:44:39.154016"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), EventData(body='{"seq_id": 1, "timestamp": "2024-03-09T15:44:40.766777"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), EventData(body='{"seq_id": 2, "timestamp": "2024-03-09T15:44:40.775864"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), EventData(body='{"seq_id": 3, "timestamp": "2024-03-09T15:44:40.780930"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None)]
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' finished flushing.
DEBUG:race_scenario:cur_buffer_id='136368156971104', added the event (EventData(body='{"seq_id": 4, "timestamp": "2024-03-09T15:44:43.784397"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), None) {} 
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' worker is checking max_wait_time.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition: '0' started flushing.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' finished flushing.
DEBUG:race_scenario:cur_buffer_id='136368156973792', adding the event (EventData(body='{"seq_id": 5, "timestamp": "2024-03-09T15:45:03.173829"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), None) {} 
DEBUG:race_scenario:cur_buffer_id='136368156973792', added the event (EventData(body='{"seq_id": 5, "timestamp": "2024-03-09T15:45:03.173829"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None), None) {} 
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' worker is checking max_wait_time.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition: '0' started flushing.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' is sending.
DEBUG:race_scenario:called _send (EventDataBatch(max_size_in_bytes=1048576, partition_id=None, partition_key=None, event_count=1),) {'timeout': None}
DEBUG:race_scenario:_send finished (EventDataBatch(max_size_in_bytes=1048576, partition_id=None, partition_key=None, event_count=1),) {'timeout': None}
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' sending 1 events succeeded.
WARNING:race_scenario:on_success, partition_id: 0, events: [EventData(body='{"seq_id": 5, "timestamp": "2024-03-09T15:45:03.173829"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None)]
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' finished flushing.

This can be simplified as:

# enqueue events with seq_id={1,2,3}
# (note that seq_id=0 was already enqueued earlier, and will be flushed in the same EventDataBatch)
# start flushing partition 0
# enqueue event with seq_id=4
# ended flushing events seq_id={0,1,2,3}
# enqueue event with seq_id=5
# start flushing partition 0
# ended flushing events seq_id={5}

The issue happens to seq_id=4, which gets "enqueued", but never flushed.

The critical path is a classic producer/consumer race condition between BufferedProducer._check_max_wait_time_worker and BufferedProducer.put_events, and looks like this (a standalone sketch reproducing this interleaving follows the list):

  1. The BufferedProducer._check_max_wait_time_worker loop gets executed by the event loop, decides it is time to flush, acquires the BufferedProducer object's lock, and calls BufferedProducer._flush:

    if (now_time - self._last_send_time > self._max_wait_time) or (
        self._cur_buffered_len >= self._max_buffer_len
    ):
        # in the worker, not raising error for flush, users can not handle this
        async with self._lock:
            await self._flush(raise_error=False)

  2. _flush starts by adding _cur_batch to the queue and "resetting" _cur_batch by pointing it to a newly created object:

    if self._cur_batch:  # if there is batch, enqueue it to the buffer first
        self._buffered_queue.put(self._cur_batch)
        self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)

  3. _flush then awaits _producer.send, which can take a few moments before returning:

    await self._producer.send(
        batch,
        timeout=timeout_time - time.time() if timeout_time else None,
    )

  4. A new event is requested to be added to the buffered batch (e.g. by user code):

    await producer.send_event(EventData('Single data {}'.format(i)))

  5. The call goes all the way down to await BufferedProducer.put_events (the event loop keeps running this path, since _producer.send has not completed yet).

  6. BufferedProducer.put_events gets called and adds the event to the object's internal buffer without taking any lock (some operations are protected by the lock introduced in [Eventhub] Fix Blocking Behavior of Buffered Producer Flush #25406, but not this one).

  7. BufferedProducer.put_events finishes its job, and so does BufferedProducer._flush, since _producer.send has completed. _flush does some cleanup before exiting: it assigns _cur_batch to a new object, dropping the previous reference, i.e. the very batch the user's event was just added to:

    self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
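
To make the interleaving concrete, here is a standalone toy model of the race (my own simplified sketch; the class and attribute names are illustrative and not taken from the azure-eventhub code base):

    import asyncio

    # Toy model of the buffered producer's state. Names are illustrative only.
    class ToyBufferedProducer:
        def __init__(self):
            self.cur_batch = []         # stands in for self._cur_batch
            self.lock = asyncio.Lock()  # stands in for self._lock
            self.sent = []              # everything that was actually "sent"

        async def put_event(self, event):
            # Mirrors step 6: append to the current batch without taking the lock.
            self.cur_batch.append(event)

        async def flush(self):
            async with self.lock:
                # Steps 1-2: take the current batch and swap in a fresh one.
                batch, self.cur_batch = self.cur_batch, []
                # Step 3: the slow, awaited network send.
                await asyncio.sleep(0.1)
                self.sent.extend(batch)
                # Step 7: the current batch is re-created at the end of the flush,
                # discarding anything appended to it while the send was in flight.
                self.cur_batch = []

    async def main():
        producer = ToyBufferedProducer()
        await producer.put_event("seq_id=0")

        flush_task = asyncio.create_task(producer.flush())
        await asyncio.sleep(0.05)             # let flush() reach the awaited send
        await producer.put_event("seq_id=4")  # lands in the batch that step 7 discards
        await flush_task

        print("sent:", producer.sent)           # ['seq_id=0']
        print("buffered:", producer.cur_batch)  # [] -- 'seq_id=4' is gone

    asyncio.run(main())

Running this prints sent: ['seq_id=0'] and an empty buffer: 'seq_id=4' is silently lost, mirroring the log above.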

Expected behavior
A message added to a buffered EventHubProducerClient should always get sent.

Additional context

I am also submitting a PR (#34712) with some more implementation-specific documentation.

This issue seems to have existed since buffered_mode was first released in 5.10.0 via #24653.
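
In the meantime, one possible application-level mitigation is to track which enqueued events are actually confirmed through on_success, so silent drops are at least detectable. This is my own sketch, not an SDK feature; the connection string, event hub name, and the seq_id payload format are placeholders:

    import asyncio
    import json

    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient

    pending = set()  # seq_ids handed to send_event but not yet confirmed

    async def on_success(events, partition_id):
        for event in events:
            pending.discard(json.loads(event.body_as_str())["seq_id"])

    async def on_error(events, partition_id, error):
        print(f"send failed on partition {partition_id}: {error}")

    async def main():
        producer = EventHubProducerClient.from_connection_string(
            "<CONNECTION_STRING>",             # placeholder
            eventhub_name="<EVENT_HUB_NAME>",  # placeholder
            buffered_mode=True,
            on_success=on_success,
            on_error=on_error,
            max_wait_time=5,
        )
        async with producer:
            for seq_id in range(100):
                pending.add(seq_id)
                await producer.send_event(EventData(json.dumps({"seq_id": seq_id})))
            await producer.flush()  # closing the client also flushes, but be explicit

        if pending:
            print(f"events never confirmed by on_success: {sorted(pending)}")

    asyncio.run(main())

Note that because the dropped batch reference is gone, an explicit flush() cannot recover a lost event; the tracking only surfaces the loss so the application can re-send.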

@github-actions github-actions bot added Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. Event Hubs needs-team-triage Workflow: This issue needs the team to triage. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Mar 9, 2024
@kashifkhan kashifkhan self-assigned this Mar 11, 2024
@kashifkhan kashifkhan added the Messaging Messaging crew label Mar 11, 2024
@kashifkhan
Member

Thank you for the detailed issue and PR @falcaopetri. I'll be taking a look at the PR and the issue; we will need to do some testing and checking on our side, so there will be some back and forth :)

@kashifkhan kashifkhan removed the needs-team-triage Workflow: This issue needs the team to triage. label Mar 11, 2024
@github-actions github-actions bot added the needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team label Mar 11, 2024
@kashifkhan
Member

kashifkhan commented Mar 21, 2024

Thank you for the fantastic PR @falcaopetri :) It's merged now and will be part of the next release.

@kashifkhan
Member

@falcaopetri the change is now included in the latest release on PyPI.

@github-actions github-actions bot locked and limited conversation to collaborators Jul 9, 2024