Skip to content

Commit

Permalink
fix(ingest/airflow): simplify env configuration (datahub-project#11371)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and asikowitz committed Sep 17, 2024
1 parent 494d0ff commit 6e68a64
Show file tree
Hide file tree
Showing 15 changed files with 1,458 additions and 779 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811
"true",
"1",
)
_RUN_IN_THREAD_TIMEOUT = 30
_RUN_IN_THREAD_TIMEOUT = float(
os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 15)
)
_DATAHUB_CLEANUP_DAG = "Datahub_Cleanup"


Expand Down Expand Up @@ -129,12 +131,22 @@ def wrapper(*args, **kwargs):
)
thread.start()

thread.join(timeout=_RUN_IN_THREAD_TIMEOUT)
if thread.is_alive():
logger.warning(
f"Thread for {f.__name__} is still running after {_RUN_IN_THREAD_TIMEOUT} seconds. "
"Continuing without waiting for it to finish."
)
if _RUN_IN_THREAD_TIMEOUT > 0:
# If _RUN_IN_THREAD_TIMEOUT is 0, we just kick off the thread and move on.
# Because it's a daemon thread, it'll be automatically killed when the main
# thread exits.

start_time = time.time()
thread.join(timeout=_RUN_IN_THREAD_TIMEOUT)
if thread.is_alive():
logger.warning(
f"Thread for {f.__name__} is still running after {_RUN_IN_THREAD_TIMEOUT} seconds. "
"Continuing without waiting for it to finish."
)
else:
logger.debug(
f"Thread for {f.__name__} finished after {time.time() - start_time} seconds"
)
else:
f(*args, **kwargs)
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=basic_iolets",
"name": "basic_iolets"
"name": "basic_iolets",
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -92,7 +93,8 @@
"name": "run_data_task",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -244,34 +246,6 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_data_task'",
"trigger_rule": "'all_success'",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
"type": {
"string": "COMMAND"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
Expand Down Expand Up @@ -572,8 +546,8 @@
"json": {
"timestampMillis": 1717180290951,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 2
Expand All @@ -589,8 +563,8 @@
"json": {
"timestampMillis": 1717180291140,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
},
"externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag",
"name": "simple_dag",
"description": "A simple DAG that runs a few fake data tasks."
"description": "A simple DAG that runs a few fake data tasks.",
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -93,7 +94,8 @@
"name": "task_1",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -203,34 +205,6 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'task_1'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'task_1'",
"trigger_rule": "'all_success'",
"wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1",
"name": "task_1",
"type": {
"string": "COMMAND"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
Expand Down Expand Up @@ -447,8 +421,8 @@
"json": {
"timestampMillis": 1717180227827,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 2
Expand All @@ -464,8 +438,8 @@
"json": {
"timestampMillis": 1717180228022,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
Expand All @@ -475,30 +449,6 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"_access_control": "None",
"catchup": "False",
"description": "'A simple DAG that runs a few fake data tasks.'",
"doc_md": "None",
"fileloc": "<fileloc>",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "None",
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag",
"name": "simple_dag",
"description": "A simple DAG that runs a few fake data tasks."
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
Expand Down Expand Up @@ -558,7 +508,8 @@
"name": "run_another_data_task",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -624,34 +575,6 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_another_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_another_data_task'",
"trigger_rule": "'all_success'",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task",
"name": "run_another_data_task",
"type": {
"string": "COMMAND"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
Expand Down Expand Up @@ -758,8 +681,8 @@
"json": {
"timestampMillis": 1717180231676,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 2
Expand All @@ -775,8 +698,8 @@
"json": {
"timestampMillis": 1717180231824,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=basic_iolets",
"name": "basic_iolets"
"name": "basic_iolets",
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -113,13 +114,14 @@
"downstream_task_ids": "[]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"depends_on_past\": false, \"downstream_task_ids\": \"[]\", \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"is_setup\": false, \"is_teardown\": false, \"mapped\": false, \"operator_class\": \"airflow.operators.bash.BashOperator\", \"owner\": \"airflow\", \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_exponential_backoff\": false, \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": \"[]\", \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.18.0/integration/airflow\", \"_schemaURL\": \"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"depends_on_past\": false, \"downstream_task_ids\": \"[]\", \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"is_setup\": false, \"is_teardown\": false, \"mapped\": false, \"operator_class\": \"airflow.operators.bash.BashOperator\", \"owner\": \"airflow\", \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_exponential_backoff\": false, \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": \"[]\", \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -423,8 +425,8 @@
"json": {
"timestampMillis": 1717179624988,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
Expand All @@ -440,8 +442,8 @@
"json": {
"timestampMillis": 1717179625524,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
Expand All @@ -458,44 +460,15 @@
"json": {
"timestampMillis": 1717179625547,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1717179625547
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_data_task'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"depends_on_past\": false, \"downstream_task_ids\": \"[]\", \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"is_setup\": false, \"is_teardown\": false, \"mapped\": false, \"operator_class\": \"airflow.operators.bash.BashOperator\", \"owner\": \"airflow\", \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_exponential_backoff\": false, \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": \"[]\", \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
"type": {
"string": "COMMAND"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
Expand Down Expand Up @@ -642,8 +615,8 @@
"json": {
"timestampMillis": 1717179625632,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
Expand Down
Loading

0 comments on commit 6e68a64

Please sign in to comment.