Commit

docs: Refresh docs about scheduling Airflow from Spark (OpenLineage#3293)

Signed-off-by: Kacper Muda <[email protected]>
kacpermuda authored Dec 2, 2024
1 parent d3b50ee commit b1d5935
Showing 1 changed file with 52 additions and 24 deletions.
76 changes: 52 additions & 24 deletions website/docs/integrations/spark/configuration/airflow.md
@@ -4,30 +4,58 @@ title: Scheduling from Airflow
 ---


-The same parameters passed to `spark-submit` can be supplied from Airflow and other schedulers. If
-using the [openlineage-airflow](../../airflow/airflow.md) integration, each task in the DAG has its own Run id
-which can be connected to the Spark job run via the `spark.openlineage.parentRunId` parameter. For example,
-here is an example of a `DataProcPySparkOperator` that submits a Pyspark application on Dataproc:
+The same parameters that are passed to `spark-submit` can also be supplied directly from **Airflow**
+and other schedulers, allowing for seamless configuration and execution of Spark jobs.
+
+When using the [`OpenLineage Airflow`](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/index.html)
+integration with operators that submit Spark jobs, the entire Spark OpenLineage integration can be configured
+directly within Airflow.
+
+### Preserving Job Hierarchy
+
+To establish a correct job hierarchy in lineage tracking, the Spark application and lineage backend require
+identifiers of the parent job that triggered the Spark job. These identifiers allow the Spark integration
+to automatically add a `parentRunFacet` to the application-level OpenLineage event, facilitating the linkage
+of the Spark job to its originating (Airflow) job in the lineage graph.
+
+The following properties are necessary for the automatic creation of the `parentRunFacet`:
+
+- `spark.openlineage.parentJobNamespace`
+- `spark.openlineage.parentJobName`
+- `spark.openlineage.parentRunId`
+
+Refer to the [Spark Configuration](spark_conf.md) documentation for more information on these properties.
+
+OpenLineage Airflow integration provides powerful [macros](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/macros.html)
+that can be used to dynamically generate these identifiers.
+
+### Example
+
+Below is an example of a `DataprocSubmitJobOperator` that submits a PySpark application to Dataproc cluster:

 ```python
-t1 = DataProcPySparkOperator(
-    task_id=job_name,
-    gcp_conn_id='google_cloud_default',
-    project_id='project_id',
-    cluster_name='cluster-name',
-    region='us-west1',
-    main='gs://bucket/your-prog.py',
-    job_name=job_name,
-    dataproc_pyspark_properties={
-        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
-        "spark.jars.packages": "io.openlineage:openlineage-spark:1.0.0+",
-        "spark.openlineage.transport.url": openlineage_url,
-        "spark.openlineage.transport.auth.apiKey": api_key,
-        "spark.openlineage.transport.auth.type": api_key,
-        "spark.openlineage.namespace": openlineage_spark_namespace,
-        "spark.openlineage.parentJobNamespace": openlineage_airflow_namespace,
-        "spark.openlineage.parentJobName": job_name,
-        "spark.openlineage.parentRunId": "{{ lineage_parent_id(run_id, task, task_instance) }}
+t1 = DataprocSubmitJobOperator(
+    task_id="task_id",
+    project_id="project_id",
+    region='eu-central2',
+    job={
+        "reference": {"project_id": "project_id"},
+        "placement": {"cluster_name": "cluster_name"},
+        "pyspark_job": {
+            "main_python_file_uri": "gs://bucket/your-prog.py",
+            "properties": {
+                "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
+                "spark.jars.packages": "io.openlineage:openlineage-spark_${SCALA_BINARY_VERSION}:{{PREPROCESSOR:OPENLINEAGE_VERSION}}",
+                "spark.openlineage.transport.url": openlineage_url,
+                "spark.openlineage.transport.auth.apiKey": api_key,
+                "spark.openlineage.transport.auth.type": "apiKey",
+                "spark.openlineage.namespace": openlineage_spark_namespace,
+                "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
+                "spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
+                "spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
+            }
+        },
+    },
-    dag=dag)
-```
+    dag=dag
+)
+```
