Skip to content

Commit

Permalink
Improving transcode dataset docs (#3833)
Browse files Browse the repository at this point in the history
* Updated section name

Signed-off-by: Elena Khaustova <[email protected]>

* Added incorrect use of transcoding section

Signed-off-by: Elena Khaustova <[email protected]>

* Added a clarification on how trancoding is used

Signed-off-by: Elena Khaustova <[email protected]>

* Added examples explanation

Signed-off-by: Elena Khaustova <[email protected]>

* Updated section reference

Signed-off-by: Elena Khaustova <[email protected]>

* Applied suggested change of section name

Signed-off-by: Elena Khaustova <[email protected]>

* Rephrased wordings to avoid github warnings

Signed-off-by: Elena Khaustova <[email protected]>

* Updated section reference

Signed-off-by: Elena Khaustova <[email protected]>

* Suggestions for the transcoding docs (#3837)

* Suggestions

Signed-off-by: Ahdra Merali <[email protected]>

* Apply suggestions from review

Signed-off-by: Ahdra Merali <[email protected]>

* Fix broken internal link

Signed-off-by: Ahdra Merali <[email protected]>

* Appease Vale (as much as possible)

Signed-off-by: Ahdra Merali <[email protected]>

---------

Signed-off-by: Ahdra Merali <[email protected]>

* Added missing quotes

Signed-off-by: Elena Khaustova <[email protected]>

* Applied suggested changes

Signed-off-by: Elena Khaustova <[email protected]>

* Added admonition

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed typo

Signed-off-by: Elena Khaustova <[email protected]>

---------

Signed-off-by: Elena Khaustova <[email protected]>
Signed-off-by: Ahdra Merali <[email protected]>
Co-authored-by: Ahdra Merali <[email protected]>
  • Loading branch information
ElenaKhaustova and AhdraMeraliQB authored Apr 29, 2024
1 parent a94d832 commit 9da8b37
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 7 deletions.
90 changes: 84 additions & 6 deletions docs/source/data/data_catalog_yaml_examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,15 @@ airplanes:
In this example, the default `csv` configuration is inserted into `airplanes` and then the `load_args` block is overridden. Normally, that would replace the whole dictionary. In order to extend `load_args`, the defaults for that block are then re-inserted.

## Read the same file using two different datasets
## Read the same file using different datasets with transcoding

You might come across a situation where you would like to read the same file using two different dataset implementations (known as transcoding). For example, Parquet files can not only be loaded via the `ParquetDataset` using `pandas`, but also directly by `SparkDataset`. This conversion is typical when coordinating a `Spark` to `pandas` workflow.
You might come across a situation where you would like to read the same file using two different `Dataset` implementations. You can achieve this by using transcoding to define separate `DataCatalog` entries that point to the same `filepath`.

Define two `DataCatalog` entries for the same dataset in a common format (for example, Parquet, JSON, CSV) in your `conf/base/catalog.yml`:
### How to use transcoding

Consider an example with Parquet files. Parquet files can be loaded with both the `pandas.ParquetDataset`, and the `spark.SparkDataset` directly. This conversion is typical when coordinating a `Spark` to `pandas` workflow.

To load the same file as both a `pandas.ParquetDataset` and a `spark.SparkDataset`, define two `DataCatalog` entries for the same dataset in your `conf/base/catalog.yml`:

```yaml
my_dataframe@spark:
Expand All @@ -382,13 +386,13 @@ my_dataframe@pandas:
filepath: data/02_intermediate/data.parquet
```

These entries are used in the pipeline like this:
When using transcoding you must ensure the filepaths defined for each catalog entry share the same format (for example: CSV, JSON, Parquet). These entries can then be used in the pipeline as follows:

```python
pipeline(
[
node(func=my_func1, inputs="spark_input", outputs="my_dataframe@spark"),
node(func=my_func2, inputs="my_dataframe@pandas", outputs="pipeline_output"),
node(name="my_func1_node", func=my_func1, inputs="spark_input", outputs="my_dataframe@spark"),
node(name="my_func2_node", func=my_func2, inputs="my_dataframe@pandas", outputs="pipeline_output"),
]
)
```
Expand All @@ -398,6 +402,80 @@ In this example, Kedro understands that `my_dataframe` is the same dataset in it
In the pipeline, Kedro uses the `spark.SparkDataset` implementation for saving and `pandas.ParquetDataset`
for loading, so the first node outputs a `pyspark.sql.DataFrame`, while the second node receives a `pandas.Dataframe`.

### How *not* to use transcoding

Kedro pipelines automatically resolve the node execution order and check to ensure there are no circular dependencies in the pipeline. It is during this process that the transcoded datasets are resolved and the transcoding notation `@...` is stripped. This means within the pipeline the datasets `my_dataframe@spark` and `my_dataframe@pandas` are considered to be one `my_dataframe` dataset. The `DataCatalog`, however, treats transcoded entries as separate datasets, as they are only resolved as part of the pipeline resolution process. This results in differences between your defined pipeline in `pipeline.py` and the resolved pipeline that is run by Kedro, and these differences may lead to unintended behaviours. Thus, it is important to be aware of this when using transcoding.

```{caution}
Below are some examples where transcoding may produce unwanted side effects and raise errors.
```

#### Defining a node with the same inputs and outputs

Consider the following pipeline:

```python
pipeline(
[
node(name="my_func1_node", func=my_func1, inputs="my_dataframe@pandas", outputs="my_dataframe@spark"),
]
)
```

During the pipeline resolution, the node above is defined as having the dataset `my_dataset` as both its input and output. As a node cannot have the same inputs and outputs, trying to run this pipeline will fail with the following error:

```console
ValueError: Failed to create node my_func1([my_dataframe@pandas]) -> [my_dataframe@spark].
A node cannot have the same inputs and outputs even if they are transcoded: {'my_dataframe'}
```

#### Defining several nodes that share the same output

Consider the following pipeline:

```python
pipeline(
[
node(name="my_func1_node", func=my_func1, inputs="spark_input", outputs="my_dataframe@spark"),
node(name="my_func2_node", func=my_func2, inputs="pandas_input", outputs="my_dataframe@pandas"),
]
)
```

When this pipeline is resolved, both nodes are defined as returning the same output `my_dataset`, which is not allowed. Running the pipeline will fail with the following error:

```console
kedro.pipeline.pipeline.OutputNotUniqueError: Output(s) ['my_dataframe'] are returned by more than one nodes. Node outputs must be unique.
```

#### Creating pipelines with hidden dependencies

Consider the following pipeline:

```python
pipeline(
[
node(name="my_func1_node", func=my_func1, inputs="my_dataframe@spark", outputs="spark_output"),
node(name="my_func2_node", func=my_func2, inputs="pandas_input", outputs="my_dataframe@pandas"),
node(name="my_func3_node", func=my_func3, inputs="my_dataframe@pandas", outputs="pandas_output"),
]
)
```

In this example, there is a single dependency between the nodes `my_func3_node` and `my_func2_node`. However, when this pipeline is resolved there are some hidden dependencies that will restrict the node execution order. We can expose them by removing the transcoding notation:

```python
resolved_pipeline(
[
node(name="my_func1_node", func=my_func1, inputs="my_dataframe", outputs="spark_output"),
node(name="my_func2_node", func=my_func2, inputs="pandas_input", outputs="my_dataframe"),
node(name="my_func3_node", func=my_func3, inputs="my_dataframe", outputs="pandas_output"),
]
)
```

When the node order is resolved, we can see that the node `my_func1_node` is treated as dependent on the node `my_func2_node`. This pipeline will still run without any errors, but one should be careful about creating hidden dependencies as they can decrease performance, for example, when using the `ParallelRunner`.

## Create a Data Catalog YAML configuration file via the CLI

You can use the [`kedro catalog create` command to create a Data Catalog YAML configuration](../development/commands_reference.md#create-a-data-catalog-yaml-configuration-file).
Expand Down
2 changes: 1 addition & 1 deletion docs/source/integrations/pyspark_integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ assert isinstance(df, pyspark.sql.DataFrame)
[Delta Lake](https://delta.io/) is an open-source project that enables building a Lakehouse architecture on top of data lakes. It provides ACID transactions and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS.
To setup PySpark with Delta Lake, have a look at [the recommendations in Delta Lake's documentation](https://docs.delta.io/latest/quick-start.html#python).

We recommend the following workflow, which makes use of the [transcoding feature in Kedro](../data/data_catalog_yaml_examples.md#read-the-same-file-using-two-different-datasets):
We recommend the following workflow, which makes use of the [transcoding feature in Kedro](../data/data_catalog_yaml_examples.md#read-the-same-file-using-different-datasets-with-transcoding):

* To create a Delta table, use a `SparkDataset` with `file_format="delta"`. You can also use this type of dataset to read from a Delta table or overwrite it.
* To perform [Delta table deletes, updates, and merges](https://docs.delta.io/latest/delta-update.html#language-python), load the data using a `DeltaTableDataset` and perform the write operations within the node function.
Expand Down

0 comments on commit 9da8b37

Please sign in to comment.