Async EventHubProducerClient + AzureFunctions #21849
hey @Stael , thanks for reaching out, we'll investigate the issue ASAP.
Hello @yunhaoling, perfect, thanks!
hey @Stael , I got some time to read through the issue. First of all, it is expected that the EventHubProducerClient takes some time to establish the connection to the service: multiple AMQP layers need to be initialized, and authentication has to be performed as per the AMQP spec. In general, I would recommend reusing the same client if your application is time sensitive. To answer your questions:
I would say yes, it would be helpful for narrowing down whether the problem is within Event Hub or Azure Functions.
I have read your original issue; it seems that you want to reduce the reconnection time. Async/await is helpful when you're working with I/O-bound tasks (say you have 10 coroutine tasks that each perform a send, as sketched below). But yes, you can keep the async client, as it does no harm.
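As a purely illustrative sketch of that point (the producer and payloads arguments are placeholders, not code from this thread), several I/O-bound sends can be scheduled concurrently on the event loop:

```python
import asyncio

from azure.eventhub import EventData


async def send_concurrently(producer, payloads):
    # Each send is I/O bound, so scheduling them as coroutines lets the event
    # loop overlap the network waits instead of awaiting each send in turn.
    # Whether concurrent sends on one producer suit your workload is a separate
    # question; this only illustrates the asyncio pattern.
    await asyncio.gather(
        *(producer.send_batch([EventData(p)]) for p in payloads)
    )
```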
I'm sorry, this is actually a bug in our SDK: reusing (open-close-open) the same client to send events without specifying a partition id leads to failure. (I'll make a PR to fix it.) But I think it's better to avoid the close/re-open pattern; I'll probably just go with:

```python
import base64

import bson  # from pymongo
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient


class EventProducer:
    def __init__(self, password: str):
        self._password: str = password
        self.client = EventHubProducerClient.from_connection_string(self._password)

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        # send through the long-lived client created in __init__
        await self.client.send_batch([EventData(b64_encoded_data)])
```

For more context, the service side enforces a 10-minute idle timeout, which means that if there is no activity for 10 minutes, the service will force-close the connection between the service and the client. Please let me know if you have any questions!
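For illustration only, here is a minimal sketch (not from the thread) of how such a long-lived producer might be reused from an async HTTP-triggered Azure Function; the event_producer module name and the EVENTHUB_CONNECTION_STR app setting are assumptions:

```python
import os

import azure.functions as func

# "event_producer" is a hypothetical module holding the EventProducer class above.
from event_producer import EventProducer

# Created once per worker process, so the AMQP connection can be reused across
# invocations instead of being re-established (and re-authenticated) per request.
producer = EventProducer(os.environ["EVENTHUB_CONNECTION_STR"])


async def main(req: func.HttpRequest) -> func.HttpResponse:
    await producer.produce({"payload": req.get_body().decode("utf-8")})
    return func.HttpResponse("queued", status_code=200)
```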
Hello @yunhaoling,
Ok, got it. This will be doable once your PR fixing the reuse of the client is merged.
To be clear, the timeout is adjustable on our side.
From my understanding (I asked the question previously here):
However, from time to time the function needs more than 120 sec to execute, which triggers a timeout. IMHO there are 3 possibilities:
Ok. I will implement a version with a timeout and a retry mechanism.
Alright, once the bug is fixed on your side, I will implement a version which reuses the client.
I have tried to use your simplified version (which is different from the one in the docs). Thanks again.
Update on the async version:

```python
async def produce(self, data: dict):
    logging.info("Start BSON encode")
    bson_encoded_data = bson.BSON.encode(data)
    logging.info("Done BSON encode")
    logging.info("Start B64 encode")
    b64_encoded_data = base64.b64encode(bson_encoded_data)
    logging.info("Done B64 encode")
    logging.info("Start creating client from connection string")
    client = EventHubProducerClient.from_connection_string(self._password)
    logging.info("Done creating client from connection string")
    for i in range(1, 4):
        try:
            logging.info("Start async with")
            async with client:
                logging.info("In async with")
                logging.info("Start send batch")
                await client.send_batch([EventData(b64_encoded_data)], timeout=2)
                logging.info("Done send batch")
            logging.info("Done async with")
            return
        except Exception as e:
            logging.error(f"Try: {i}: failed to send batch")
            logging.exception(e)
```

This code still triggered a timeout. See logs: example 1 | example 2 (with debug logs).
Hello @yunhaoling, any update on this?
hey @Stael , apologies for the late response, and I appreciate you trying out different approaches and sharing your latest debugging information. I think there are two issues here:
How about we solve one issue at a time? I would like to work with you on getting a workaround first. Let's focus on reusing a single async client, because I feel this is the ultimate approach we want to go with: just one client, no new client/new connection each time the Function is triggered. Let's see why the async client would crash with exit code 139. Are you aware of any pattern that your application (Azure Function) is undergoing when the single async client program crashes, like a burst of HTTP requests coming in or a long period of inactivity? Usually I see crashes like 139 (SIGSEGV) when our underlying networking library, uAMQP, crashes, as it's not thread safe. A burst of messages plus long inactivity would lead to the client shutting down and recreating the underlying connection, which is error prone. I would go with adding a lock and see if it helps mitigate the crash issue. Also, one more thing: could you help turn on the networking logging by setting logging_enable=True?

```python
import asyncio


class EventProducer:
    def __init__(self, password: str):
        self._password: str = password
        self.client = EventHubProducerClient.from_connection_string(self._password, logging_enable=True)
        self.lock = asyncio.Lock()

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        async with self.lock:
            # send through self.client, the long-lived client created above
            await self.client.send_batch([EventData(b64_encoded_data)])
```
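One practical note (not from the thread): logging_enable=True only routes the frame-level output through Python's standard logging module, so a DEBUG-level handler still has to be attached for those logs to show up anywhere. A minimal sketch, assuming the azure.eventhub and uamqp logger names used by this generation of the SDK:

```python
import logging
import sys

# logging_enable=True emits AMQP frame logging through these loggers;
# without a DEBUG-level handler the output is silently dropped.
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))

for name in ("azure.eventhub", "uamqp"):
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
```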
Hello @yunhaoling. Yes, I agree with you, it looks like there are several issues mixed up. I have tried your snippet on our staging environment with success (no 139 crash / timeout), so it looks like the lock does indeed solve the issue (or part of it). However, I am unsure why. Moreover, it no longer behaves like before, when the logs would abruptly stop right after entering the async with block.
Unfortunately no, I have tried several times to find a pattern, but I did not find anything conclusive. If it helps, I can re-deploy the previous version of the code on our staging environment. I will now open a ticket on the Azure Functions for Python repo; I want them to have a look at what can freeze the Function App like that.
Hello @yunhaoling, do you have any update on this?
hey @Stael , apologies for not getting back to you sooner, and thanks for trying out the snippet. I do have an update for you: the issue you posed earlier, where the producer client was unable to reopen after close when no partition id was specified, has been fixed in the latest stable release.
This is interesting. I'm going to verify that when the send timeout is hit, the send_batch method raises the error properly and logs the necessary error information. Yes, I would also like to know why the logs freeze. I would appreciate it if you could help turn on the logging by setting logging_enable=True.
Hi, we're sending this friendly reminder because we haven't heard back from you in a while. We need more information about this issue to help address it. Please be sure to give us your input within the next 7 days. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!
Hello @yunhaoling. Thank you for your reply. I have noted that the issue with the producer client has been fixed, thank you. I will then switch the code from:

```python
import asyncio


class EventProducer:
    def __init__(self, password: str):
        self._password: str = password
        self.client = EventHubProducerClient.from_connection_string(self._password, logging_enable=True)
        self.lock = asyncio.Lock()

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        async with self.lock:
            await self.client.send_batch([EventData(b64_encoded_data)])
```

to:

```python
class EventProducer:
    def __init__(self, password: str):
        self._password: str = password
        self.client = EventHubProducerClient.from_connection_string(self._password)

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        await self.client.send_batch([EventData(b64_encoded_data)])
```

Have you been able to look at why the logs freeze? I had turned on the logging using logging_enable=True. I am also waiting for an investigation on the other issue that I opened: Azure/azure-functions-python-worker#931
Hello again @yunhaoling! Out of sheer luck ... I managed to trigger the bug and capture the logs, so it is definitely related to the Event Hub client. Could you please have a look at it and get back as soon as possible?
hey @Stael , apologies for the delayed response and thanks for sharing the logs! From the logs we can conclude that:
This gives me a lead for debugging the issue: the next thing to expect in the flow is creating a new uamqp sender and getting the sender ready. Since that is not happening, the uamqp sender preparation process probably got stuck, likely due to an inconsistent internal state. I will dig deeper into that part. One question for you to confirm: does the error occur with the asyncio lock, without it, or do both trigger this hanging issue?
Hello @yunhaoling,
I don't think that it is during "the first time".
I am currently using the code that you suggested (with the asyncio lock) in production. Thanks,
Apologies for not making it clear: it's the "first try to send the message". Thanks for the extra info on which scripts you're using, I will dig deep into it! By the way, I'm thinking of a workaround for you to unblock your deployment/application, in case fixing/locating the issue takes longer than I expect. I think wrapping the send in asyncio.wait_for could help:

```python
async def produce(self, data: dict):
    bson_encoded_data = bson.BSON.encode(data)
    b64_encoded_data = base64.b64encode(bson_encoded_data)
    retry = 0
    while retry < 4:
        try:
            async with self.lock:
                # if the coroutine doesn't finish in 30s, asyncio.TimeoutError is raised
                await asyncio.wait_for(
                    self.client.send_batch([EventData(b64_encoded_data)]),
                    timeout=30,
                )
            return
        except asyncio.TimeoutError:
            retry += 1
    logging.warning("retry exhausted")
```
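For context on what that workaround relies on, a tiny self-contained illustration (slow_send is just a stand-in coroutine, not SDK code): asyncio.wait_for cancels a coroutine that overruns its deadline and raises asyncio.TimeoutError, which the retry loop above catches.

```python
import asyncio


async def slow_send():
    # stand-in for a send_batch call that never completes
    await asyncio.sleep(60)


async def main():
    try:
        await asyncio.wait_for(slow_send(), timeout=2)
    except asyncio.TimeoutError:
        print("send timed out; the pending coroutine was cancelled")


asyncio.run(main())
```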
Hello @yunhaoling. Thank you for this snippet, I will try to implement and test it ASAP.
Hello @yunhaoling. Here is the implementation I went for:

```python
class EventProducer:
    _MAX_NUM_RETRIES: int = 4
    _TIMEOUT_SEC: int = 5

    def __init__(self, password: str):
        self.client = EventHubProducerClient.from_connection_string(
            password, logging_enable=True, retry_total=0, auth_timeout=EventProducer._TIMEOUT_SEC
        )
        self.lock = asyncio.Lock()

    async def produce(self, data: dict):
        bson_encoded_data = bson.BSON.encode(data)
        b64_encoded_data = base64.b64encode(bson_encoded_data)
        logging.info('Done encoding data')
        for i in range(EventProducer._MAX_NUM_RETRIES):
            try:
                async with self.lock:
                    logging.info(f'Start sending batch, try : {i + 1}')
                    await asyncio.wait_for(
                        self.client.send_batch([EventData(b64_encoded_data)], timeout=EventProducer._TIMEOUT_SEC),
                        timeout=EventProducer._TIMEOUT_SEC
                    )
                    logging.info(f'Done sending batch, try : {i + 1}')
                    return
            except asyncio.TimeoutError:
                logging.error(
                    f'Timeout while sending data, try : {i + 1} / {EventProducer._MAX_NUM_RETRIES}'
                )
            except Exception as e:
                logging.error(f'Unexpected exception while sending data: {e}')
                logging.exception(e)
        logging.error('Unable to send the data: retries exhausted')
```

Unfortunately, no impact. Here are two examples:
To be honest with you, at this point, seeing how many issues we are facing, we will probably stop using EventHub/AzureFunctions/BlobStorage in the next few months.
hey @Stael , thanks for the updates and for sharing your complete code with me. I sincerely feel sorry about your bad experience, and I admit it's our fault that we didn't run enough tests against the scenario where the Event Hub SDK is integrated with Azure Functions. I have prioritized this issue and am working on it now, making changes to the underlying dependency library, uamqp, where I believe the hang takes place. I will probably give you a manually built patch first to try out the fix, for the sake of saving time. Again, I understand the pain here, and I really appreciate your collaboration and patience with us. I will do my best to support you.
Hello @yunhaoling. Thank you for the update. Alright, you can send me a patch and I will try my best to test it ASAP.
hey @Stael , thanks for your patience. I have some updates and potential workarounds for you.
Background knowledge: connection idle timeout
The two issues related to the idle timeout:
Attempts:
- Approach 1: try manually keeping the connection alive. The workaround that jumps right to my mind is whether we could keep the connection alive to avoid the idle timeout, so I tried using a background thread that keeps sending heartbeats to keep the connection alive (a feature supported by the dependency uamqp library). However, this doesn't work out nicely because the Azure Function is triggered by incoming requests (event driven): the program gets suspended once the main function returns, and I also see Azure Functions switching the workers that execute the function. Then I realized that what I'm trying to achieve is to turn the Azure Function into a stateful/long-running service (with the long-lived AMQP connection as the state), while Azure Functions itself is designed to be stateless. I did some searching on Stack Overflow and found this: https://stackoverflow.com/questions/63490224/is-azure-functions-a-right-fit-for-long-running-tasks. Maybe Azure Functions is not the best fit for this scenario, or you could alternatively use Event Grid instead of Event Hubs, which is an HTTP-based service.
- Approach 2: use the Event Hubs output binding integration provided by Azure Functions. I was wondering, instead of doing it myself, whether there is a feature Azure Functions already provides. I tried it and it feels promising; I didn't see any issue so far. The documentation can be found here:
My code:
I have put all my code in the following repo. Approach 2 is fairly straightforward; it's more about configuration. I feel approach 2 is the right approach here. Could you help check whether approach 2 satisfies your requirements? I would be glad to help you out!
Hello @yunhaoling,
Thank you for the update. I looked at the two approaches that you offer:
I will let it run for a while and do some more tests before releasing it to our production env. Thank you for your help & the code that you provided.

function.json:

```json
...
{
  "authLevel": "anonymous",
  "type": "httpTrigger",
  "direction": "in",
  "name": "req",
  "methods": ["get"],
  "route": "my-route"
},
{
  "type": "http",
  "direction": "out",
  "name": "$return"
},
{
  "type": "eventHub",
  "name": "queue",
  "eventHubName": "__MUST_EXISTS_BUT_WILL_BE_OVERWRITTEN_BY_CONNECTION__",
  "connection": "<connection>",
  "direction": "out"
}
```

controller:

```python
def main(req: func.HttpRequest, queue: func.Out[bytes]) -> func.HttpResponse:
    response = ...
    encoded_data = ...
    queue.set(encoded_data)
    return response
```
Awesome, looking forward to your good news. The Azure Functions Event Hubs output binding is internally integrated with the .NET SDK EventHubProducerClient, and I assume it's kept alive by the Azure Functions host for sending events.
Hi @Stael. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.
Hi @Stael, since you haven't asked that we "/unresolve" the issue, we'll close this out. If you believe further discussion is needed, please add a comment "/unresolve" to reopen the issue.
Hello @yunhaoling, I am happy to report that after a full week using the solution mentioned previously ... I encountered 0 issues 👍 I will also update my other issue to let them know. Indeed, I was advised to use this combo (httpTrigger in, eventHub output binding out), and it is actually described and covered by tests on their side. Thank you for your help.
Hello!
I am posting this ticket here, but please let me know if you think I should close it and post it on Azure/azure-functions-python-worker instead.
We are currently using several AzureFunctions in production:
Use case: the AzureFunction receives HTTP requests and publishes the data to Event Hub using the azure-eventhub package. The AzureFunction has a timeout of X seconds, meaning that after X seconds, the function is "aborted" (we have been experimenting with 15/30/60/120 sec).
We have 2 versions of the code:
"Threaded Version": the one currently in production, which doesn't use async/await but uses threading (see here). This version was created to mitigate issue 2 (see below).
The function code looks like this:
"Async/Await Version": another one, which we created and released in "staging" because we were unable to debug issue 1 while using the "Threaded Version" of the code.
Our producer code looks like this:
The function code looks like this:
We face two issues:
1. With the "Threaded Version" of the code, several times per day we experience timeouts. We were unable to pinpoint the issue, so we developed the "Async/Await Version" of the code and managed to trace the issue back to (from what I understand) send_batch, which requires more than X seconds to execute. See here an example with a 120sec timeout. Should we add a timeout to send_batch with a retry mechanism? (but there is already a retry mechanism in the library, right?)
2. As you can see, on each HTTP Request, we create a new EventHubProducerClient. But this process takes more than 500ms each time (see here and here) and sometimes even more, which is a big deal for us. We tried to reuse the same EventHubProducerClient (by instantiating the EventHubProducerClient in the EventProducer constructor, basically), but unfortunately this approach doesn't seem to work (see here).
Thanks!