Skip to content

Commit

Permalink
fix(ingest/airflow): simplify env configuration (#11371)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Sep 17, 2024
1 parent da958b7 commit 1d83131
Show file tree
Hide file tree
Showing 15 changed files with 1,458 additions and 779 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811
"true",
"1",
)
_RUN_IN_THREAD_TIMEOUT = 30
_RUN_IN_THREAD_TIMEOUT = float(
os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 15)
)
_DATAHUB_CLEANUP_DAG = "Datahub_Cleanup"


Expand Down Expand Up @@ -129,12 +131,22 @@ def wrapper(*args, **kwargs):
)
thread.start()

thread.join(timeout=_RUN_IN_THREAD_TIMEOUT)
if thread.is_alive():
logger.warning(
f"Thread for {f.__name__} is still running after {_RUN_IN_THREAD_TIMEOUT} seconds. "
"Continuing without waiting for it to finish."
)
if _RUN_IN_THREAD_TIMEOUT > 0:
# If _RUN_IN_THREAD_TIMEOUT is 0, we just kick off the thread and move on.
# Because it's a daemon thread, it'll be automatically killed when the main
# thread exists.

start_time = time.time()
thread.join(timeout=_RUN_IN_THREAD_TIMEOUT)
if thread.is_alive():
logger.warning(
f"Thread for {f.__name__} is still running after {_RUN_IN_THREAD_TIMEOUT} seconds. "
"Continuing without waiting for it to finish."
)
else:
logger.debug(
f"Thread for {f.__name__} finished after {time.time() - start_time} seconds"
)
else:
f(*args, **kwargs)
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=basic_iolets",
"name": "basic_iolets"
"name": "basic_iolets",
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -92,7 +93,8 @@
"name": "run_data_task",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -244,34 +246,6 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_data_task'",
"trigger_rule": "'all_success'",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
"type": {
"string": "COMMAND"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
Expand Down Expand Up @@ -572,8 +546,8 @@
"json": {
"timestampMillis": 1717180290951,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 2
Expand All @@ -589,8 +563,8 @@
"json": {
"timestampMillis": 1717180291140,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
},
"externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag",
"name": "simple_dag",
"description": "A simple DAG that runs a few fake data tasks."
"description": "A simple DAG that runs a few fake data tasks.",
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -93,7 +94,8 @@
"name": "task_1",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -203,34 +205,6 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'task_1'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'task_1'",
"trigger_rule": "'all_success'",
"wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1",
"name": "task_1",
"type": {
"string": "COMMAND"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
Expand Down Expand Up @@ -447,8 +421,8 @@
"json": {
"timestampMillis": 1717180227827,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 2
Expand All @@ -464,8 +438,8 @@
"json": {
"timestampMillis": 1717180228022,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
Expand All @@ -475,30 +449,6 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"_access_control": "None",
"catchup": "False",
"description": "'A simple DAG that runs a few fake data tasks.'",
"doc_md": "None",
"fileloc": "<fileloc>",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "None",
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag",
"name": "simple_dag",
"description": "A simple DAG that runs a few fake data tasks."
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
Expand Down Expand Up @@ -558,7 +508,8 @@
"name": "run_another_data_task",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -624,34 +575,6 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_another_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_another_data_task'",
"trigger_rule": "'all_success'",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task",
"name": "run_another_data_task",
"type": {
"string": "COMMAND"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
Expand Down Expand Up @@ -758,8 +681,8 @@
"json": {
"timestampMillis": 1717180231676,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 2
Expand All @@ -775,8 +698,8 @@
"json": {
"timestampMillis": 1717180231824,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=basic_iolets",
"name": "basic_iolets"
"name": "basic_iolets",
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -113,13 +114,14 @@
"downstream_task_ids": "[]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"depends_on_past\": false, \"downstream_task_ids\": \"[]\", \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"is_setup\": false, \"is_teardown\": false, \"mapped\": false, \"operator_class\": \"airflow.operators.bash.BashOperator\", \"owner\": \"airflow\", \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_exponential_backoff\": false, \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": \"[]\", \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.18.0/integration/airflow\", \"_schemaURL\": \"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"depends_on_past\": false, \"downstream_task_ids\": \"[]\", \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"is_setup\": false, \"is_teardown\": false, \"mapped\": false, \"operator_class\": \"airflow.operators.bash.BashOperator\", \"owner\": \"airflow\", \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_exponential_backoff\": false, \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": \"[]\", \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
}
},
Expand Down Expand Up @@ -423,8 +425,8 @@
"json": {
"timestampMillis": 1717179624988,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
Expand All @@ -440,8 +442,8 @@
"json": {
"timestampMillis": 1717179625524,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
Expand All @@ -458,44 +460,15 @@
"json": {
"timestampMillis": 1717179625547,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1717179625547
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_data_task'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"depends_on_past\": false, \"downstream_task_ids\": \"[]\", \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"is_setup\": false, \"is_teardown\": false, \"mapped\": false, \"operator_class\": \"airflow.operators.bash.BashOperator\", \"owner\": \"airflow\", \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_exponential_backoff\": false, \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": \"[]\", \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
"type": {
"string": "COMMAND"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
Expand Down Expand Up @@ -642,8 +615,8 @@
"json": {
"timestampMillis": 1717179625632,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
Expand Down
Loading

0 comments on commit 1d83131

Please sign in to comment.