AirbyteTriggerSyncOperator does not work on deferrable mode #37449

Closed
1 of 2 tasks
henriquemeloo opened this issue Feb 15, 2024 · 3 comments · Fixed by #38860

Comments


henriquemeloo commented Feb 15, 2024

Apache Airflow Provider(s)

airbyte

Versions of Apache Airflow Providers

apache-airflow-providers-airbyte==3.6.0
apache-airflow-providers-http==4.5.1

Apache Airflow version

2.7.1

Operating System

Ubuntu 22.04.3 LTS

Deployment

Docker-Compose

Deployment details

No response

What happened

AirbyteTriggerSyncOperator does not work properly in deferrable mode.

What you think should happen instead

No response

How to reproduce

Create the following DAG, replacing "id_of_airbyte_connection_to_be_synced" with the corresponding value.

from datetime import datetime

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import \
    AirbyteTriggerSyncOperator


AIRFLOW_AIRBYTE_CONN_ID = "airbyte_default"
AIRBYTE_CONNECTION_ID = "id_of_airbyte_connection_to_be_synced"  # replace this


with DAG("test_dag", start_date=datetime.min, catchup=False) as dag:
    not_deferrable = AirbyteTriggerSyncOperator(
        task_id="not_deferrable",
        airbyte_conn_id=AIRFLOW_AIRBYTE_CONN_ID,
        connection_id=AIRBYTE_CONNECTION_ID,
        deferrable=False
    )

    deferrable = AirbyteTriggerSyncOperator(
        task_id="deferrable",
        airbyte_conn_id=AIRFLOW_AIRBYTE_CONN_ID,
        connection_id=AIRBYTE_CONNECTION_ID,
        deferrable=True
    )

The not_deferrable task works, while the deferrable task fails. The "airbyte_default" connection is set in all containers via the following environment variable:

AIRFLOW_CONN_AIRBYTE_DEFAULT='{
    "conn_type": "airbyte",
    "host": "airbyte-proxy",
    "port": 8000
}'

Anything else

The deferrable task fails with the following log:

deferrable.log
78f82177fe4f
*** Found local files:
***   * /opt/airflow/logs/dag_id=test_dag/run_id=scheduled__2024-02-14T15:14:37.364888+00:00/task_id=deferrable/attempt=1.log
***   * /opt/airflow/logs/dag_id=test_dag/run_id=scheduled__2024-02-14T15:14:37.364888+00:00/task_id=deferrable/attempt=1.log.trigger.1205.log
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1359} INFO - Starting attempt 1 of 1
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1380} INFO - Executing <Task(AirbyteTriggerSyncOperator): deferrable> on 2024-02-14 15:14:37.364888+00:00
[2024-02-15, 15:14:38 UTC] {standard_task_runner.py:57} INFO - Started process 7210 to run task
[2024-02-15, 15:14:38 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'deferrable', 'scheduled__2024-02-14T15:14:37.364888+00:00', '--job-id', '1213', '--raw', '--subdir', 'DAGS_FOLDER/test_dag.py', '--cfg-path', '/var/tmp/tmp7v9zx5oh']
[2024-02-15, 15:14:38 UTC] {standard_task_runner.py:85} INFO - Job 1213: Subtask deferrable
[2024-02-15, 15:14:38 UTC] {task_command.py:415} INFO - Running <TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [running]> on host 78f82177fe4f
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test_dag' AIRFLOW_CTX_TASK_ID='deferrable' AIRFLOW_CTX_EXECUTION_DATE='2024-02-14T15:14:37.364888+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-14T15:14:37.364888+00:00'
[2024-02-15, 15:14:38 UTC] {base.py:73} INFO - Using connection ID 'airbyte_default' for task execution.
[2024-02-15, 15:14:43 UTC] {airbyte.py:86} INFO - Job 224 was submitted to Airbyte Server
[2024-02-15, 15:14:43 UTC] {airbyte.py:88} INFO - Waiting for job 224 to complete
[2024-02-15, 15:14:43 UTC] {taskinstance.py:1524} INFO - Pausing task as DEFERRED. dag_id=test_dag, task_id=deferrable, execution_date=20240214T151437, start_date=20240215T151438
[2024-02-15, 15:14:44 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 100 (task deferral)
[2024-02-15, 15:14:44 UTC] {airbyte.py:104} INFO - Getting the status of job run 224.
[2024-02-15, 15:14:44 UTC] {base.py:73} INFO - Using connection ID 'airbyte_default' for task execution.
[2024-02-15, 15:14:44 UTC] {airbyte.py:88} INFO - URL for api request: airbyte-proxy/api/v1/jobs/get
[2024-02-15, 15:14:44 UTC] {triggerer_job_runner.py:599} INFO - Trigger test_dag/scheduled__2024-02-14T15:14:37.364888+00:00/deferrable/-1/1 (ID 17) fired: TriggerEvent<{'status': 'error', 'message': 'airbyte-proxy/api/v1/jobs/get', 'job_id': 224}>
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1357} INFO - Resuming after deferral
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1380} INFO - Executing <Task(AirbyteTriggerSyncOperator): deferrable> on 2024-02-14 15:14:37.364888+00:00
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:57} INFO - Started process 7238 to run task
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'deferrable', 'scheduled__2024-02-14T15:14:37.364888+00:00', '--job-id', '1214', '--raw', '--subdir', 'DAGS_FOLDER/test_dag.py', '--cfg-path', '/var/tmp/tmp7eh43099']
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:85} INFO - Job 1214: Subtask deferrable
[2024-02-15, 15:14:46 UTC] {task_command.py:415} INFO - Running <TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [running]> on host 78f82177fe4f
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1608, in resume_execution
    return execute_callable(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/airbyte/operators/airbyte.py", line 124, in execute_complete
    raise AirflowException(event["message"])
airflow.exceptions.AirflowException: airbyte-proxy/api/v1/jobs/get
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=test_dag, task_id=deferrable, execution_date=20240214T151437, start_date=20240215T151438, end_date=20240215T151446
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 1214 for task deferrable (airbyte-proxy/api/v1/jobs/get; 7238)
[2024-02-15, 15:14:46 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2024-02-15, 15:14:46 UTC] {taskinstance.py:2776} INFO - 0 downstream tasks scheduled from follow-on schedule check
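
The failure message in the log above is nothing but the request URL, which makes it hard to read. One plausible explanation, sketched below, is that the trigger catches the exception raised while polling, copies its string form into the event's "message" field, and the operator's execute_complete then re-raises that string. This is only an illustration under that assumption: `InvalidURL`, `poll_job_status`, and the use of `RuntimeError` in place of `AirflowException` are hypothetical stand-ins, not the provider's actual code.

```python
class InvalidURL(ValueError):
    """Stand-in for the URL error raised by the HTTP client (hypothetical class)."""


def poll_job_status(url: str, job_id: int) -> dict:
    """Hypothetical trigger body: turn any polling failure into an error event."""
    try:
        # The real HTTP request is replaced by the failure seen in this issue.
        raise InvalidURL(url)
    except Exception as exc:
        # str() of an exception built from a single argument is that argument,
        # so the event message ends up being just the URL.
        return {"status": "error", "message": str(exc), "job_id": job_id}


def execute_complete(event: dict) -> None:
    """Mirrors the `raise AirflowException(event["message"])` line in the traceback."""
    if event["status"] == "error":
        # RuntimeError stands in for AirflowException to keep the sketch self-contained.
        raise RuntimeError(event["message"])


event = poll_job_status("airbyte-proxy/api/v1/jobs/get", 224)
```

If this reading is right, the task error "airbyte-proxy/api/v1/jobs/get" is the text of an underlying URL exception rather than a message written by the operator.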

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

henriquemeloo added the area:providers, kind:bug, and needs-triage labels Feb 15, 2024

boring-cyborg bot commented Feb 15, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.

Contributor

josix commented Mar 8, 2024

I could help investigate it if there is no one working on this. Please feel free to assign me, thanks.

eladkal added the good first issue and provider:airbyte labels and removed the needs-triage label Mar 8, 2024
Contributor

josix commented Apr 8, 2024

I did some investigation. The root cause of this issue is that the URL format is invalid for aiohttp, so it raises the error below. Note that I verified this against a local Airbyte instance whose host is host.docker.internal. However, I think the error is identical to the case described in this issue, where the host is set to airbyte-proxy.

Traceback (most recent call last):
  File "/opt/***/***/jobs/triggerer_job_runner.py", line 531, in cleanup_finished_triggers
    result = details["task"].result()
  File "/opt/***/***/jobs/triggerer_job_runner.py", line 603, in run_trigger
    async for event in trigger.run():
  File "/opt/***/***/providers/airbyte/triggers/airbyte.py", line 68, in run
    while await self.is_still_running(hook):
  File "/opt/***/***/providers/airbyte/triggers/airbyte.py", line 111, in is_still_running
    job_run_status = await hook.get_job_status(self.job_id)
  File "/opt/***/***/providers/airbyte/hooks/airbyte.py", line 105, in get_job_status
    response = await self.get_job_details(job_id=job_id)
  File "/opt/***/***/providers/airbyte/hooks/airbyte.py", line 90, in get_job_details
    async with session.post(url=url, data=json.dumps({"id": job_id})) as response:
  File "/usr/local/lib/python3.8/site-packages/aiohttp/client.py", line 1194, in __aenter__
    self._resp = await self._coro
  File "/usr/local/lib/python3.8/site-packages/aiohttp/client.py", line 545, in _request
    req = self._request_class(
  File "/usr/local/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 326, in __init__
    self.update_host(url)
  File "/usr/local/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 400, in update_host
    raise InvalidURL(url)
aiohttp.client_exceptions.InvalidURL: host.docker.internal/api/v1/jobs/get
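
The URL in the traceback above (and in the reporter's log) has no scheme such as http://, which is a likely reason a client would reject it: without a scheme, a standard URL parser cannot find a host at all. The sketch below shows this with the standard library only and includes a hypothetical helper, `build_url`, that prepends a scheme and appends the port. The helper name, its parameters, and the "http" default are assumptions made for illustration; this is not the fix that was merged for this issue.

```python
from typing import Optional
from urllib.parse import urlsplit


def has_scheme_and_host(url: str) -> bool:
    """Return True only if the URL parses with both a scheme and a host part."""
    parts = urlsplit(url)
    return bool(parts.scheme) and bool(parts.netloc)


def build_url(
    host: str,
    port: Optional[int] = None,
    schema: str = "http",  # assumed default, not taken from the provider
    endpoint: str = "",
) -> str:
    """Hypothetical helper: build an absolute URL from connection fields."""
    # Keep an explicit scheme if the host already carries one.
    base = host if "://" in host else f"{schema}://{host}"
    base = base.rstrip("/")
    # Append the port only when the host does not already include one.
    if port is not None and urlsplit(base).port is None:
        base = f"{base}:{port}"
    return f"{base}/{endpoint.lstrip('/')}"


broken = "airbyte-proxy/api/v1/jobs/get"
fixed = build_url("airbyte-proxy", 8000, endpoint="api/v1/jobs/get")
```

Here `has_scheme_and_host(broken)` is False because the whole string is parsed as a path, while `fixed` is "http://airbyte-proxy:8000/api/v1/jobs/get" and parses with both a scheme and a host.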
