From b1d5935d8775a8f6d084629385af8c5c4de7433d Mon Sep 17 00:00:00 2001
From: Kacper Muda
Date: Mon, 2 Dec 2024 11:28:38 +0100
Subject: [PATCH] docs: Refresh docs about scheduling Spark from Airflow (#3293)

Signed-off-by: Kacper Muda
---
 .../spark/configuration/airflow.md | 90 ++++++++++++++-----
 1 file changed, 66 insertions(+), 24 deletions(-)

diff --git a/website/docs/integrations/spark/configuration/airflow.md b/website/docs/integrations/spark/configuration/airflow.md
index 8adba52045..086e15e36b 100644
--- a/website/docs/integrations/spark/configuration/airflow.md
+++ b/website/docs/integrations/spark/configuration/airflow.md
@@ -4,30 +4,72 @@ title: Scheduling from Airflow
 ---
 
-The same parameters passed to `spark-submit` can be supplied from Airflow and other schedulers. If
-using the [openlineage-airflow](../../airflow/airflow.md) integration, each task in the DAG has its own Run id
-which can be connected to the Spark job run via the `spark.openlineage.parentRunId` parameter. For example,
-here is an example of a `DataProcPySparkOperator` that submits a Pyspark application on Dataproc:
+The same parameters that are passed to `spark-submit` can also be supplied directly from **Airflow**
+and other schedulers, allowing Spark jobs to be configured and launched from one place.
+
+When using the [`OpenLineage Airflow`](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/index.html)
+integration with operators that submit Spark jobs, the entire Spark OpenLineage integration can be configured
+directly within Airflow.
+
+### Preserving Job Hierarchy
+
+To establish a correct job hierarchy in lineage tracking, the Spark application and the lineage backend require
+identifiers of the parent job that triggered the Spark job. These identifiers allow the Spark integration
+to automatically add a `parentRunFacet` to the application-level OpenLineage event, linking
+the Spark job to its originating (Airflow) job in the lineage graph.
+
+The following properties are necessary for the automatic creation of the `parentRunFacet`:
+
+- `spark.openlineage.parentJobNamespace`
+- `spark.openlineage.parentJobName`
+- `spark.openlineage.parentRunId`
+
+Refer to the [Spark Configuration](spark_conf.md) documentation for more information on these properties.
+
+The OpenLineage Airflow integration provides [macros](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/macros.html)
+that can be used to dynamically generate these identifiers.
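+
+For example, assuming a hypothetical DAG `my_dag` with a task `my_task` and the default
+Airflow namespace, the macros would render to values roughly like the ones shown in the
+trailing comments below (a sketch for illustration; names and values are not prescriptive):
+
+```python
+# Illustrative only: Jinja templates as written in a DAG file, with example
+# rendered values (computed at task runtime) shown in the trailing comments.
+parent_identifiers = {  # arbitrary variable name, for illustration
+    "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",  # -> "default"
+    "spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",  # -> "my_dag.my_task"
+    "spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",  # -> the task instance's OpenLineage run UUID
+}
+```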
+
+### Example
+
+Below is an example of a `DataprocSubmitJobOperator` that submits a PySpark application to a Dataproc cluster:
 
 ```python
-t1 = DataProcPySparkOperator(
-    task_id=job_name,
-    gcp_conn_id='google_cloud_default',
-    project_id='project_id',
-    cluster_name='cluster-name',
-    region='us-west1',
-    main='gs://bucket/your-prog.py',
-    job_name=job_name,
-    dataproc_pyspark_properties={
-        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
-        "spark.jars.packages": "io.openlineage:openlineage-spark:1.0.0+",
-        "spark.openlineage.transport.url": openlineage_url,
-        "spark.openlineage.transport.auth.apiKey": api_key,
-        "spark.openlineage.transport.auth.type": api_key,
-        "spark.openlineage.namespace": openlineage_spark_namespace,
-        "spark.openlineage.parentJobNamespace": openlineage_airflow_namespace,
-        "spark.openlineage.parentJobName": job_name,
-        "spark.openlineage.parentRunId": "{{ lineage_parent_id(run_id, task, task_instance) }}
+t1 = DataprocSubmitJobOperator(
+    task_id="task_id",
+    project_id="project_id",
+    region="europe-central2",
+    job={
+        "reference": {"project_id": "project_id"},
+        "placement": {"cluster_name": "cluster_name"},
+        "pyspark_job": {
+            "main_python_file_uri": "gs://bucket/your-prog.py",
+            "properties": {
+                "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
+                "spark.jars.packages": "io.openlineage:openlineage-spark_${SCALA_BINARY_VERSION}:{{PREPROCESSOR:OPENLINEAGE_VERSION}}",
+                "spark.openlineage.transport.url": openlineage_url,
+                "spark.openlineage.transport.auth.apiKey": api_key,
+                "spark.openlineage.transport.auth.type": "apiKey",
+                "spark.openlineage.namespace": openlineage_spark_namespace,
+                "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
+                "spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
+                "spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
+            }
+        },
     },
-    dag=dag)
-```
+    dag=dag
+)
+```
\ No newline at end of file