
Send EventDataBatch with per-message partition_key #23510

Closed
weltonrodrigo opened this issue Mar 15, 2022 · 12 comments
Assignees
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. Event Hubs feature-request This issue requires a new behavior in the product in order be resolved. Messaging Messaging crew needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team question The issue doesn't require a change to the product in order to be resolved. Most issues start as that

Comments

@weltonrodrigo

weltonrodrigo commented Mar 15, 2022

It would be nice to be able to send a batch where every message has its own partition_key.

The use case: I receive a batch from an external client, but I need to repartition it. Sending every message individually is not ideal, so using a batch is essential.

But there is no public interface to set the partition key for a single EventData, only for an EventDataBatch. It used to exist, but was removed some time ago, judging from the available examples.

Why was it removed? Can we get it back?

Currently, I'm having to do:

for message in messages:
  ev = EventData(json.dumps(message))
  ev._raw_amqp_message.annotations[b"x-opt-partition-key"] = self._custom_partitioner(message)  # hack
  event_data_batch.add(ev)

where _custom_partitioner is just a business-aware partitioner.
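For illustration, a business-aware partitioner of the kind referenced above might derive a stable key from a message field. This is a hypothetical sketch: the `camera.numero` field is borrowed from the sample payload posted later in this thread, and the function name and fallback value are invented.

```python
# Hypothetical business-aware partitioner: route all events from the same
# camera to the same partition by deriving a stable per-camera key.
def custom_partitioner(message: dict) -> str:
    # Fall back to a fixed bucket when the routing field is missing.
    return str(message.get("camera", {}).get("numero", "default"))

print(custom_partitioner({"camera": {"numero": "CAM-42"}}))  # CAM-42
```

Any deterministic function of the message works here; the only requirement is that the same business entity always maps to the same key.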

@ghost ghost added needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Mar 15, 2022
@azure-sdk azure-sdk added Client This issue points to a problem in the data-plane of the library. Event Hubs needs-team-triage Workflow: This issue needs the team to triage. labels Mar 15, 2022
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Mar 15, 2022
@yunhaoling yunhaoling added feature-request This issue requires a new behavior in the product in order be resolved. Messaging Messaging crew and removed needs-team-triage Workflow: This issue needs the team to triage. labels Mar 15, 2022
@yunhaoling yunhaoling self-assigned this Mar 15, 2022
@yunhaoling
Contributor

hey @weltonrodrigo , thanks for your feedback!

The EventDataBatch is designed to hold a batch of events sharing the same partition_key or partition_id, for high sending performance. Within the EventHubProducerClient there is a producer for each partition id; events with no partition_id but with a partition_key are sent by the default producer.

I first need to check whether setting different partition keys on events within a batch would get those events sent to the right partitions.
If the service works as we expect, then yes, we could consider adding the support.

@weltonrodrigo
Author

I need to first check whether setting different partition keys of events within a batch would get events sent to the right partition.

Confirmed just now that it's not working…

Do I need to create an EventDataBatch for each message, or is there a way to send a single message?

@yunhaoling
Contributor

thanks for the confirmation.

This is actually what I expected: for a batch event, it's the partition id/partition key on the batch itself that dominates.

Yes, for now you would need to create a batch for each message (partition).

But we're working on a new feature called the buffered producer, with which you don't have to manage batches yourself; it does the batching and sending automatically in the background.

I think this coming feature should help resolve your issue!
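In the meantime, the batch-per-partition-key workaround can be sketched as follows. The grouping step is plain Python; the commented lines only indicate where the azure-eventhub `create_batch(partition_key=...)` / `send_batch` calls would go — a sketch, not an official SDK pattern.

```python
# Group messages by their computed partition key, then send one batch
# per group, since a single EventDataBatch carries a single partition key.
from collections import defaultdict

def group_by_partition_key(messages, key_fn):
    """Return {partition_key: [messages...]} preserving arrival order."""
    groups = defaultdict(list)
    for m in messages:
        groups[key_fn(m)].append(m)
    return dict(groups)

# for key, msgs in group_by_partition_key(messages, my_key_fn).items():
#     batch = producer.create_batch(partition_key=key)
#     for m in msgs:
#         batch.add(EventData(json.dumps(m)))
#     producer.send_batch(batch)

groups = group_by_partition_key(
    [{"k": "a"}, {"k": "b"}, {"k": "a"}], lambda m: m["k"])
print(sorted(groups))  # ['a', 'b']
```

This trades one network round trip per distinct key for correctness; the buffered producer discussed below does essentially this bookkeeping automatically.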

@swathipil
Member

Hi @weltonrodrigo - azure-eventhub v5.10.0 has been released with the buffered producer feature. The sample for buffered sending is [here].

Would you be able to test this to see if it resolves your issue? You can upgrade with pip install azure-eventhub --upgrade.

@swathipil swathipil added the needs-author-feedback Workflow: More information is needed from author to address the issue. label Jul 8, 2022
@jsquire
Member

jsquire commented Jul 14, 2022

It would be nice to be able to send a batch where every message has it's own partition_key.

For context, this is a service limitation. There is no way to send a single batch of messages with heterogeneous partition keys.

The Event Hubs service limits each message received by the gateway to a single partition key. In the case of an event batch, its physical form is an AMQP message that contains other messages as its body; the Event Hubs gateway associates the partition key from the batch envelope with all events that it contains. If any child message in the batch has a different partition key annotation, the gateway will either ignore that annotation or reject the entire batch.

As Swathi mentions, the buffered producer can work around this because it manages batches implicitly. When you enqueue an event using a partition key, the buffered producer hashes that to a specific partition and puts it into a batch that will be sent to that partition. This approach allows the buffered producer to accept events enqueued with different routing methods and send them together in a single batch.
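The property the buffered producer relies on — the same partition key always lands on the same partition — can be illustrated with a stand-in hash. The real Event Hubs gateway uses its own internal hash, so `partition_for_key` below is purely illustrative, not the service's algorithm.

```python
# Illustrative only: any stable hash gives the same-key -> same-partition
# property that lets a buffered producer group keyed events into
# per-partition batches.
import hashlib

def partition_for_key(partition_key: str, partition_count: int) -> int:
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count

p = partition_for_key("device-1", 4)
print(p == partition_for_key("device-1", 4))  # True: deterministic routing
```

Because the mapping is deterministic, every event enqueued with `partition_key="device-1"` joins the same per-partition buffer and can be sent in one batch.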

@weltonrodrigo
Author

Are the other types of message also buffered?

I have this application where I use a call to async_producer.get_eventhub_properties() as healthcheck.

This is a moderate-volume application (100 msg/second), and with the new buffered producer this call started taking a long time.

Is that a good approach to detecting producer connection problems? Can your test harness check whether that call becomes too slow under load?

@ghost ghost added needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team and removed needs-author-feedback Workflow: More information is needed from author to address the issue. labels Jul 14, 2022
@weltonrodrigo
Author

weltonrodrigo commented Jul 25, 2022

I'm not sure if this is the correct place to provide feedback about this, but I've made a little more progress on this problem.

I'm using Python 3.10.5 (older versions also show the problem) with gunicorn and aiohttp (gunicorn --worker-class=aiohttp.GunicornUVLoopWebWorker), and I get this strange problem where, after one of the three partitions flushes, the worker freezes.

It stops responding to requests and soon gets restarted by gunicorn.

Some combinations of client requests per second, max_wait_time, and max_buffer_length keep it working for a while, but others make it freeze in the first 30 seconds or so.

This is why my healthchecks were failing. The app starts, responds for 30 seconds or so, freezes, and then stops responding to any requests (even /healthcheck).

Do you have any version of aiohttp and gunicorn known to work with this buffered producer?

My environment has those versions:

aiohttp                     3.8.1
aiohttp-prometheus-exporter 0.2.4
aiosignal                   1.2.0
aiotools                    1.5.9
async-timeout               4.0.2
attrs                       21.4.0
azure-core                  1.24.2
azure-eventhub              5.10.0
certifi                     2022.6.15
charset-normalizer          2.1.0
click                       8.1.3
Deprecated                  1.2.13
frozenlist                  1.3.0
gunicorn                    20.1.0
h11                         0.13.0
idna                        3.3
limits                      2.7.0
multidict                   6.0.2
orjson                      3.6.7
packaging                   21.3
pip                         22.1.2
prometheus-client           0.14.1
pylogbeat                   2.0.0
pyparsing                   3.0.9
python-logstash-async       2.5.0
pytz                        2022.1
requests                    2.28.1
setuptools                  62.6.0
six                         1.16.0
typing_extensions           4.3.0
uamqp                       1.5.3
urllib3                     1.26.10
uvloop                      0.16.0
wheel                       0.37.1
wrapt                       1.14.1
yarl                        1.7.2

UPDATE: it gets stuck at

. Only SIGKILL can interrupt the process.

@kashifkhan
Member

Hi @weltonrodrigo, thank you for your feedback. Would you be able to provide some code that I can use to reproduce the error please?

@weltonrodrigo
Author

Hi @weltonrodrigo, thank you for your feedback. Would you be able to provide some code that I can use to reproduce the error please?

Trying to reproduce with a smaller piece of code.

You can try enqueuing 10 messages per second, with max_wait_time=100 and a max_buffer_length large enough that new messages will arrive while a send is in progress.

@weltonrodrigo
Author

weltonrodrigo commented Jul 25, 2022

This code triggers the problem.

Run it until it gets stuck; then a CTRL-C will exit.

import asyncio
import logging
import sys

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubSharedKeyCredential, \
    EventHubProducerClient

logging.basicConfig(format=logging.BASIC_FORMAT)
log = logging.getLogger("azure")
log.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
log.addHandler(handler)

async def on_buffer_send_success(*args):
    log.info("flushed")

async def on_buffer_send_error(*args):
    log.error("error flushing")


producer = EventHubProducerClient(
    fully_qualified_namespace="dev-eventhub-spia.servicebus.windows.net",
    eventhub_name="testetorres",
    credential=EventHubSharedKeyCredential("leituraescrita", "A9rQFR3qthjTushFXm/246/t6hsKE1civ8lwvYnKYus="),
    auth_timeout=3, retry_total=3, retry_mode='fixed',
    retry_backoff_factor=0.01,
    buffered_mode=True,
    on_success=on_buffer_send_success,
    on_error=on_buffer_send_error,
    max_wait_time=10,
    max_buffer_length=100,
    logging_enable=True
)

payload = """
{
  "idImagem": "02032asdfasdfasdfasdfa1090-F01.jpg",
  "dataHoraTz": "2022-07-21T16:50:25-03:00",
  "camera": {
    "numero": "Lasdfasdfasdf325"
  },
  "empresa": "AasdfasdfsRIS",
  "dataHora": "2022-07-21 16:50:25",
  "key": "4asdfasdfasasdfasdfas4BD7Fasdfasdfasdfas",
  "placa": "Rasdfasdfasdfasdf3",
  "dataRecebimento": "2022-07-21T16:50:45.499461-03:00",
  "codigoLog": "2022072asdfasdfasdfasdfasdfas20325"
}
"""


async def publish(timeout):
    while True:
        await producer.send_event(EventData(payload))
        log.info("Enqueued message")
        await asyncio.sleep(timeout)


if __name__ == '__main__':
    try:
        asyncio.get_event_loop().run_until_complete(publish(.1))
    except Exception as err:
        log.exception(err)

The output:

foo@bar:~$ /usr/local/opt/[email protected]/bin/python3 -m venv 39env
foo@bar:~$ source 39env/bin/activate
foo@bar:~$ python test.py
Enqueued message
INFO:azure:Enqueued message
Partition '0' worker is checking max_wait_time.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' worker is checking max_wait_time.
Enqueued message
INFO:azure:Enqueued message
Partition '1' worker is checking max_wait_time.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '1' worker is checking max_wait_time.
Enqueued message
INFO:azure:Enqueued message
[… this pair of log lines repeats ~45 more times …]
Partition '0' worker is checking max_wait_time.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' worker is checking max_wait_time.
Enqueued message
INFO:azure:Enqueued message
Partition '1' worker is checking max_wait_time.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '1' worker is checking max_wait_time.
Enqueued message
INFO:azure:Enqueued message
[… this pair of log lines repeats ~45 more times …]
Partition '0' worker is checking max_wait_time.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' worker is checking max_wait_time.
Partition: '0' started flushing.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition: '0' started flushing.
Partition '0' is sending.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' is sending.
Enqueued message
INFO:azure:Enqueued message
Partition '1' worker is checking max_wait_time.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '1' worker is checking max_wait_time.
Partition: '1' started flushing.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition: '1' started flushing.
Partition '1' is sending.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '1' is sending.
Enqueued message
INFO:azure:Enqueued message
[… this pair of log lines repeats ~11 more times …]
Partition '0' sending 49 events succeeded.
INFO:azure.eventhub.aio._buffered_producer._buffered_producer_async:Partition '0' sending 49 events succeeded.
flushed
INFO:azure:flushed
^CTraceback (most recent call last):
  File "test.py", line 62, in <module>
    asyncio.get_event_loop().run_until_complete(publish(.1))
  File "/usr/local/Cellar/[email protected]/3.8.13_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/usr/local/Cellar/[email protected]/3.8.13_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/usr/local/Cellar/[email protected]/3.8.13_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
    handle._run()
  File "/usr/local/Cellar/[email protected]/3.8.13_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/torres/firmacode/spia/py-apps/wsocrspia/38env/lib/python3.8/site-packages/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py", line 206, in check_max_wait_time_worker
    await self._flush(raise_error=False)
  File "/Users/torres/firmacode/spia/py-apps/wsocrspia/38env/lib/python3.8/site-packages/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py", line 152, in _flush
    batch = self._buffered_queue.get()
  File "/usr/local/Cellar/[email protected]/3.8.13_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/queue.py", line 170, in get
    self.not_empty.wait()
  File "/usr/local/Cellar/[email protected]/3.8.13_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()
KeyboardInterrupt

Requirements are:

aiohttp==3.8.1
aiohttp-prometheus-exporter==0.2.4
aiosignal==1.2.0
aiotools==1.5.9
async-timeout==4.0.2
attrs==21.4.0
azure-core==1.24.2
azure-eventhub==5.10.0
certifi==2022.6.15
charset-normalizer==2.1.0
frozenlist==1.3.0
gunicorn==20.1.0
idna==3.3
multidict==6.0.2
orjson==3.6.7
prometheus-client==0.14.1
pytz==2022.1
requests==2.28.1
six==1.16.0
typing_extensions==4.3.0
uamqp==1.5.3
urllib3==1.26.11
yarl==1.7.2

Python is

foo@bar:~$ python -VV
Python 3.9.13 (main, May 24 2022, 21:28:44)
[Clang 13.0.0 (clang-1300.0.29.30)]

Python was installed with brew install [email protected].

Note that those credentials and that eventhub don't exist anymore (deleted after I posted).

@swathipil
Member

swathipil commented Aug 17, 2022

Hi @weltonrodrigo - Thanks for your patience on this!
@kashifkhan was able to reproduce and fix this issue. This bug fix will be out in our next azure-eventhub release. I will update this issue as soon as it has been released.

More context on the bug from @kashifkhan: In our async producer, internally, we are using a sync queue to hold events. There is also an async queue implementation, but it's not thread-safe so we opted for the sync queue. This queue turned out to be blocking, which is not async-friendly, and we had to make a few changes to handle this behavior correctly.
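The failure mode, and the shape of a non-blocking alternative, can be sketched independently of the SDK (this is an illustration, not the actual fix that shipped): a bare `queue.Queue.get()` parks the event-loop thread on a threading condition variable — exactly what the traceback above shows at `self.not_empty.wait()` — while `get_nowait()` never blocks.

```python
# Sketch: draining a thread-safe queue.Queue from a coroutine without
# blocking the event loop. A bare q.get() on an empty queue would park
# the whole loop on a threading condition variable.
import asyncio
import queue

def drain_nowait(q: "queue.Queue") -> list:
    """Take everything currently in the queue without ever blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items

async def flush_worker(q: "queue.Queue") -> list:
    batch = drain_nowait(q)      # never blocks, even on an empty queue
    await asyncio.sleep(0)       # yield control back to the event loop
    return batch

q = queue.Queue()
for i in range(3):
    q.put(i)
print(asyncio.run(flush_worker(q)))  # [0, 1, 2]
```

Another common option is to push the blocking `get()` onto a thread via `loop.run_in_executor`, which keeps the event loop free at the cost of a worker thread.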

@weltonrodrigo
Author

We've been using it since it was merged and it seems fine. Thanks.

@github-actions github-actions bot locked and limited conversation to collaborators Apr 11, 2023

6 participants