Add callback to process Azure Service Bus message contents #41601
This addresses #26446.
Any chance @Lee-W or @Taragolis can review?
It looks good at first glance, but I would like to know what the main use of this functionality would be.
For my particular use case, I want to grab a file-location URL out of the message body and pass it to the next task in the DAG. In general, it provides a way to receive data from the message and react to it. This data may be success or failure information, file locations, etc. Other people have looked for this functionality too: #26446. In the AWS provider, the SQS sensor provides a similar result by putting the messages into XCom under the key messages: https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/sensors/sqs.py#L46 I don't know that I always want the entire message shoved into XCom, though. Often, I might want to branch or store just part of the data, so I provide a callback function that lets the user of the operator choose what happens with the data in the message body.
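To make the use case concrete, here is a rough sketch of the kind of callback described above. It assumes the message body is UTF-8 JSON with a `file_url` field; both the field name and the `FakeMessage` stand-in are hypothetical, and the real SDK message type may expose its body differently (for example as chunks of bytes).

```python
import json
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a received Service Bus message (body only)."""

    body: bytes


# Collected values; in a DAG these might instead be pushed to XCom.
collected_urls = []


def extract_file_url(message) -> None:
    """Callback: keep one field from the JSON body, not the whole message."""
    payload = json.loads(message.body.decode("utf-8"))
    # "file_url" is an assumed field name for this sketch.
    url = payload.get("file_url")
    if url is not None:
        collected_urls.append(url)


extract_file_url(
    FakeMessage(body=b'{"file_url": "https://example.com/data.csv", "status": "ok"}')
)
```

The callback returns nothing, matching the ServiceBusMessage -> None shape, so anything it wants to keep has to be stored as a side effect.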
Sounds good 👍
This is ready to be reviewed again.
I might have messed this one up from a usability point of view. I was assuming that if the context (to access XComs) was needed in the callback, it could be captured in a lambda and passed down. However, I'm having trouble accomplishing that, and it looks like the old technique of calling an operator from within an "outer" task function is now highly discouraged. The obvious solution is to add a context parameter to the callback function. However, this would change the method signature and potentially break code somewhere "in the wild". I highly doubt anyone is using this yet, but that seems like the wrong thing to do. Another solution would be to have two possible signatures for the callback: one with the context parameter and one without. Then inspect the callback function and only pass the context if the callback takes two arguments. That seems messy. Any thoughts on this?
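For reference, the "inspect the callback" option could look roughly like the sketch below. The helper name `call_message_callback` is hypothetical and this is not what the PR implements; it only illustrates dispatching on the number of positional parameters using the standard `inspect` module.

```python
import inspect


def call_message_callback(callback, message, context):
    """Pass context only if the callback can take a second positional argument."""
    params = list(inspect.signature(callback).parameters.values())
    positional = [
        p
        for p in params
        if p.kind
        in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ]
    accepts_varargs = any(p.kind is inspect.Parameter.VAR_POSITIONAL for p in params)

    if accepts_varargs or len(positional) >= 2:
        # New-style callback: (message, context)
        return callback(message, context)
    # Old-style callback: (message)
    return callback(message)
```

This keeps existing one-argument callbacks working, at the cost of behavior that depends on the shape of the user's function, which is part of why it feels messy.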
I suppose for 1-argument callbacks we could catch the type error, print a warning to update the function, and then call it with just the message. Is generating the TypeError expensive?
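A minimal sketch of that fallback, assuming a hypothetical helper named `call_with_fallback` (nothing like it exists in the provider today):

```python
import warnings


def call_with_fallback(callback, message, context):
    """Try the two-argument form first; fall back to the one-argument form."""
    try:
        return callback(message, context)
    except TypeError:
        # Caveat: a TypeError raised *inside* a two-argument callback also
        # lands here, so a genuine bug in user code could be misread as an
        # old-style signature.
        warnings.warn(
            "message callbacks should accept (message, context)",
            DeprecationWarning,
            stacklevel=2,
        )
        return callback(message)
```

The caveat in the comment is the main weakness of this approach: the except clause cannot tell a signature mismatch apart from a TypeError thrown by the callback body.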
maybe bump a major version for this?
I personally don't like it 🤔 sounds hacky to me
I guess it's just as expensive as a regular exception?
Everything except releasing a major version sounds too hacky to me as well. I'll open a ticket for this.
Opened #43361.
This PR adds a callback to process messages from an Azure Service Bus.
Right now, on main, the Azure Service Bus message receiver simply logs the messages received and returns. This PR preserves that default, but adds the ability to pass in a callback function with the signature ServiceBusMessage -> None which is applied to each message received.
This change is made to both the general receiver and the subscription receiver.
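As a rough illustration of the behavior described above (not the actual operator code), the receive loop reduces to something like the following. The names `process_messages` and `message_callback` are placeholders chosen for this sketch.

```python
import logging
from typing import Callable, Iterable, Optional

log = logging.getLogger(__name__)


def process_messages(
    messages: Iterable[object],
    message_callback: Optional[Callable[[object], None]] = None,
) -> None:
    """Log each message by default, or hand it to the callback if one is given."""
    for msg in messages:
        if message_callback is None:
            # Default preserved from main: just log what was received.
            log.info("Received message: %s", msg)
        else:
            message_callback(msg)
```

With no callback the behavior is unchanged, so existing DAGs are unaffected; passing a callback is purely opt-in.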
I'd like someone from the Airflow community to weigh in on whether this is a good solution. To me, it seemed like the least intrusive way to add a hook to obtain the message contents. However, I'm not sure it plays well with the Airflow Operator standards, as I've barely been using Airflow for a month.
Please let me know if you see any errors, omissions, or if I'm headed in the wrong direction.
Thank you,
Tim