Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented MSGraphSensor as a deferrable sensor #39304

Merged
merged 30 commits into from
May 5, 2024

Conversation

dabla
Copy link
Contributor

@dabla dabla commented Apr 29, 2024

Implemented a default response handler which suppresses JSONDecodeError when response is empty or isn't of type json but the default response type json was specified, otherwise will return the contents and if that's empty the response headers. This can for example occur when triggering PowerBI dataset refreshes. Also added a test in TestResponseHandler and added example on how to use the operator to refresh a PowerBI dataset. This PR also removed the response_handler param as lambda expressions can't be serialized and triggers always get serialized to the database when deferred before getting executed. The most import change in thi PR is that the MSGraphSensor now also uses the deferable task mechanism like the MSGraphAsyncOperator, so it doesn't rely on the classic blocking poke method but defers the tasks in a 2 step. First it will execute the call in a deferred way and check the event, if the condition is met the sensor will stop. If not then the sensor will first defer a TimeDeltaTrigger with the specified retry_delay (default is 60 for MSGraphSensor ) and once the task completed it will redefer the poll task, that way the sensor doesn't block the workers.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@dabla dabla marked this pull request as draft April 29, 2024 12:38
@dabla
Copy link
Contributor Author

dabla commented Apr 29, 2024

FYI: I've seen accross different PR that following test is sometimes randomly failing:

=================================== FAILURES ===================================
_______ TestCeleryExecutor.test_celery_integration[redis://redis:6379/0] _______

self = <tests.integration.executors.test_celery_executor.TestCeleryExecutor object at 0x7f26999c6be0>
broker_url = 'redis://redis:6379/0'

    @pytest.mark.flaky(reruns=3)
    @pytest.mark.parametrize("broker_url", _prepare_test_bodies())
    def test_celery_integration(self, broker_url):
        from airflow.providers.celery.executors import celery_executor, celery_executor_utils
    
        success_command = ["airflow", "tasks", "run", "true", "some_parameter"]
        fail_command = ["airflow", "version"]
    
        def fake_execute_command(command):
            if command != success_command:
                raise AirflowException("fail")
    
        with _prepare_app(broker_url, execute=fake_execute_command) as app:
            executor = celery_executor.CeleryExecutor()
            assert executor.tasks == {}
            executor.start()
    
            with start_worker(app=app, logfile=sys.stdout, loglevel="info"):
                execute_date = datetime.now()
    
                task_tuples_to_send = [
                    (
                        ("success", "fake_simple_ti", execute_date, 0),
                        success_command,
                        celery_executor_utils.celery_configuration["task_default_queue"],
                        celery_executor_utils.execute_command,
                    ),
                    (
                        ("fail", "fake_simple_ti", execute_date, 0),
                        fail_command,
                        celery_executor_utils.celery_configuration["task_default_queue"],
                        celery_executor_utils.execute_command,
                    ),
                ]
    
                # "Enqueue" them. We don't have a real SimpleTaskInstance, so directly edit the dict
                for key, command, queue, _ in task_tuples_to_send:
                    executor.queued_tasks[key] = (command, 1, queue, None)
                    executor.task_publish_retries[key] = 1
    
                executor._process_tasks(task_tuples_to_send)
    
>               assert list(executor.tasks.keys()) == [
                    ("success", "fake_simple_ti", execute_date, 0),
                    ("fail", "fake_simple_ti", execute_date, 0),
                ]
E               AssertionError: assert equals failed
E                 [                                [                               
E                                                    (                             
E                                                      'success',                  
E                                                      'fake_simple_ti',           
E                                                      datetime.datetime(2024, 4,  
E                                                  29, 13, 8, 45, 559826),         
E                                                      0,                          
E                                                    ),                            
E                   (                                (                             
E                     'fail',                          'fail',                     
E                     'fake_simple_ti',                'fake_simple_ti',           
E                     datetime.datetime(2024, 4,       datetime.datetime(2024, 4,  
E                 29, 13, 8, 45, 559826),          29, 13, 8, 45, 559826),         
E                     0,                               0,                          
E                   ),                               ),                            
E                 ]                                ]

tests/integration/executors/test_celery_executor.py:149: AssertionError

@dabla dabla changed the title Implement default response handler method and suppress JSONDecodeError Implement default response handler and use deferable mechanism in MSGraphSensor Apr 30, 2024
@dabla dabla changed the title Implement default response handler and use deferable mechanism in MSGraphSensor Implemented MSGraphSensor as a deferrable sensor Apr 30, 2024
@dabla dabla marked this pull request as ready for review May 1, 2024 18:00
@eladkal eladkal merged commit a61f393 into apache:main May 5, 2024
40 checks passed
@dabla dabla deleted the feature/default_response_handler branch May 6, 2024 06:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants