
Integrate pyiceberg with Dask #5800

Open
Fokko opened this issue Sep 20, 2022 · 8 comments

@Fokko
Contributor

Fokko commented Sep 20, 2022

Feature Request / Improvement

It would be awesome to integrate pyiceberg with Dask to allow reading data from Iceberg tables into Dask.

Query engine

Other

@Fokko Fokko added this to the Python 0.2.0 release milestone Sep 20, 2022
@martindurant

cc martindurant/daskberg#1

@TomAugspurger

TomAugspurger commented Feb 24, 2023

Here's a rough version of converting a DataScan to a Dask DataFrame.

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual
from pyiceberg.io.pyarrow import _file_to_table
from pyiceberg.io.pyarrow import (
    PyArrowFileIO, bind, extract_field_ids, schema_to_pyarrow, MapType, ListType,
)
import pyarrow as pa
import dask
import dask.dataframe as dd    


def _file_to_pandas(fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive):
    return _file_to_table(fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive).to_pandas()


def to_dask_dataframe(sc):
    """Convert a DataScan to a Dask DataFrame"""
    # arguments
    tasks = list(sc.plan_files())
    table = sc.table
    row_filter = sc.row_filter
    projected_schema = sc.projection()
    case_sensitive = sc.case_sensitive

    # stuff stolen from to_arrow()
    if isinstance(table.io, PyArrowFileIO):
        scheme, _ = PyArrowFileIO.parse_location(table.location())
        fs = table.io.get_fs(scheme)
    else:
        raise ValueError(f"Expected PyArrowFileIO, got: {table.io}")

    bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)

    projected_field_ids = {
        id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
    }.union(extract_field_ids(bound_row_filter))

    # build the Dask DataFrame
    schema = schema_to_pyarrow(projected_schema)
    names = [x.name for x in projected_schema.fields]
    meta = pa.table([[]] * len(schema.names), schema=schema).to_pandas()
    # TODO: ensure deterministic
    token = dask.base.tokenize(fs, bound_row_filter, projected_schema, projected_field_ids, case_sensitive)
    name = f'from-iceberg-{token}'

    dsk = {
        (name, i): (
            _file_to_pandas, fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive
        )
        for i, task in enumerate(tasks)
    }
    # divisions needs npartitions + 1 entries; None marks them as unknown
    divisions = [None] * (len(dsk) + 1)
    df = dd.DataFrame(dsk, name, meta, divisions)

    return df

It seems to work, but I haven't tested it beyond a basic df.head() (a rough usage sketch follows the notes below). A couple of notes:

  1. This returns a Dask DataFrame with a single Dask partition per "scan file" from .plan_files(), which I assume equals the number of Parquet files?
  2. Currently, divisions is set to None, which is sub-optimal (xref https://docs.dask.org/en/stable/dataframe-design.html?highlight=divisions#partitions, https://docs.dask.org/en/stable/dataframe-parquet.html?highlight=divisions#calculating-divisions). I'm seeing some fields on DataFile that might help with setting those properly.
  3. I've never used (py)iceberg before, but I was pleasantly surprised by how straightforward this was. It's mostly copy-pasting bits and pieces out of pyiceberg.io.pyarrow; with a proper refactor, it would be even fewer net new lines of code.
  4. On the Dask side, we would be interested in making this more sophisticated, so that operations on the Dask DataFrame can affect the original scan (see Expressions and Query Optimization (again) dask/dask#9970 and linked issues).
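
For reference, here's a minimal usage sketch of the to_dask_dataframe above. The catalog name, table identifier, and column names are made up, so treat it purely as an illustration of how it would be called:

# hypothetical catalog/table/column names
catalog = load_catalog("default")
tbl = catalog.load_table("nyc.taxis")
scan = tbl.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "trip_distance"),
)
df = to_dask_dataframe(scan)
df.head()                               # materialises only the first partition
df["trip_distance"].mean().compute()    # runs the scan across all partitions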

@grobgl
Contributor

grobgl commented May 18, 2023

I'm keen to push this forward. @TomAugspurger's implementation works in single-threaded mode but fails in a distributed scenario due to the current lack of pickle support (I raised a separate issue, #7644).

Extending Tom's approach, here is a solution that utilises Dask's from_map and DataFrameIOFunction, which allows us to pass the projected columns down to the Parquet reader:

# assumed imports (module paths as of pyiceberg ~0.4 / dask 2023.x; adjust for your versions)
from typing import List, Sequence

import dask.dataframe as dd
import pandas as pd
from dask.dataframe.io.utils import DataFrameIOFunction
from pyarrow.fs import FileSystem, FSSpecHandler, PyFileSystem
from pyiceberg.expressions import BooleanExpression
from pyiceberg.io.pyarrow import (
    PyArrowFileIO, _file_to_table, bind, extract_field_ids, schema_to_pyarrow, MapType, ListType,
)
from pyiceberg.schema import Schema
from pyiceberg.table import DataScan, FileScanTask


class IcebergFunctionWrapper(DataFrameIOFunction):
    def __init__(
        self,
        fs: FileSystem,
        bound_row_filter: BooleanExpression,
        projected_schema: Schema,
        case_sensitive: bool,
    ):
        self._fs = fs
        self._bound_row_filter = bound_row_filter
        self._projected_schema = projected_schema
        self._case_sensitive = case_sensitive
        self._projected_field_ids = {
            id for id in projected_schema.field_ids
            if not isinstance(projected_schema.find_type(id), (MapType, ListType))
        }.union(extract_field_ids(bound_row_filter))
        super().__init__()
    
    @property
    def columns(self) -> List[str]:
        return self._projected_schema.column_names

    @property
    def empty_table(self) -> pd.DataFrame:
        return schema_to_pyarrow(self._projected_schema).empty_table().to_pandas(date_as_object=False)

    def project_columns(self, columns: Sequence[str]) -> 'IcebergFunctionWrapper':
        if list(columns) == self.columns:
            return self

        return IcebergFunctionWrapper(
            self._fs,
            self._bound_row_filter,
            self._projected_schema.select(*columns),
            self._case_sensitive,
        )

    def __call__(self, task: FileScanTask) -> pd.DataFrame:
        table = _file_to_table(
            self._fs,
            task,
            self._bound_row_filter,
            self._projected_schema,
            self._projected_field_ids,
            self._case_sensitive,
            0,  # no limit support yet
        )

        if table is None:
            return self.empty_table

        return table.to_pandas(date_as_object=False)


def to_dask_dataframe(scan: DataScan) -> dd.DataFrame:
    tasks = scan.plan_files()
    table = scan.table
    row_filter = scan.row_filter
    projected_schema = scan.projection()
    case_sensitive = scan.case_sensitive

    scheme, _ = PyArrowFileIO.parse_location(table.location())
    if isinstance(table.io, PyArrowFileIO):
        fs = table.io.get_fs(scheme)
    else:
        try:
            from pyiceberg.io.fsspec import FsspecFileIO

            if isinstance(table.io, FsspecFileIO):
                fs = PyFileSystem(FSSpecHandler(table.io.get_fs(scheme)))
            else:
                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}")
        except ModuleNotFoundError as e:
            # When FsSpec is not installed
            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}") from e

    bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)

    io_func = IcebergFunctionWrapper(fs, bound_row_filter, projected_schema, case_sensitive)

    return dd.from_map(
        io_func,
        tasks,
        meta=io_func.empty_table,
        enforce_metadata=False,
    )
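
One rough usage note on the column projection (catalog, table, and column names are hypothetical): because the wrapper implements DataFrameIOFunction, Dask's graph optimisation can call project_columns when only a subset of columns is used downstream, so each task reads just those columns from Parquet:

# hypothetical names; load_catalog comes from the earlier snippet's imports
catalog = load_catalog("default")
ddf = to_dask_dataframe(catalog.load_table("nyc.taxis").scan())
# only these two columns end up being read from the underlying Parquet files
ddf[["trip_distance", "fare_amount"]].mean().compute()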

I'm also looking into adding divisions support and row-group-level parallelisation to this.

Generally, should this be part of the Dask library instead of PyIceberg?

@TomAugspurger

> Generally, should this be part of the Dask library instead of PyIceberg?

I was wondering that too. I think either would be sensible, but I'd lean slightly towards putting the implementation in Dask (and pyiceberg could add a .to_dask_dataframe to mirror to_pandas if desired).

That said, the current implementation does use a private _file_to_table. I think pyiceberg would need to add a public method that achieves that before Dask could use it. And given the similarity to pyiceberg.io.pyarrow, maybe we could start with some refactoring to add some methods used by both the pyarrow and dask.dataframe implementations?

@Fokko
Contributor Author

Fokko commented Jul 3, 2023

PyIceberg 0.4.0 has been released today and should fix the pickling issues! 👍🏻

@sam-goodwin

What's the latest on this issue? Also keen to write to Iceberg directly from Dask.

@TomAugspurger

I think the main decision point was around where this should live: dask or pyiceberg.

I don't see the private _file_to_table in https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py anymore. If we can write the snippets provided in this issue using just public pyiceberg APIs, I think a PR to Dask would be reasonable.
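
To make that concrete, here's a very rough sketch of what a public-API-only version might look like. The helper name and the catalog/table names are made up, and it ignores delete files and Iceberg schema evolution, so it only illustrates the shape rather than being a drop-in implementation:

import dask.dataframe as dd
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog

def scan_to_dask(scan):
    # one Dask partition per planned data file, read directly with pyarrow
    columns = scan.projection().column_names

    def read_task(task):
        # task.file.file_path is a URI (e.g. s3://...); pyarrow resolves the filesystem from it
        return pq.read_table(task.file.file_path, columns=columns).to_pandas()

    return dd.from_map(read_task, list(scan.plan_files()))

catalog = load_catalog("default")                            # hypothetical catalog name
ddf = scan_to_dask(catalog.load_table("db.events").scan())   # hypothetical table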

@theearthwanderer

Hello, is there any update on this?
