From 6396d4e90f8ac225fc53f5b41010f772d79b35e4 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Sun, 6 Mar 2022 13:43:15 -0800
Subject: [PATCH] Add Pandas support

---
 python/deltalake/writer.py  | 12 +++++++++++-
 python/tests/test_writer.py | 12 ++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 4ee89fdbcd..575c7e77b5 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -4,6 +4,7 @@
 from datetime import date, datetime
 from typing import Any, Dict, Iterable, Iterator, List, Literal, Optional, Union
 
+import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.fs as pa_fs
@@ -26,7 +27,13 @@ class AddAction:
 
 def write_deltalake(
     table_or_uri: Union[str, DeltaTable],
-    data: Union[pa.Table, pa.RecordBatch, Iterable[pa.RecordBatch], RecordBatchReader],
+    data: Union[
+        pd.DataFrame,
+        pa.Table,
+        pa.RecordBatch,
+        Iterable[pa.RecordBatch],
+        RecordBatchReader,
+    ],
     schema: Optional[pa.Schema] = None,
     partition_by: Optional[Iterable[str]] = None,
     filesystem: Optional[pa_fs.FileSystem] = None,
@@ -48,6 +55,9 @@
         replace table with new data. If 'ignore', will not write anything
         if table already exists.
     """
+    if isinstance(data, pd.DataFrame):
+        data = pa.Table.from_pandas(data)
+
     if schema is None:
         if isinstance(data, RecordBatchReader):
             schema = data.schema
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index 53206de227..2cbc3142ee 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -6,6 +6,7 @@
 import pyarrow as pa
 import pyarrow.compute as pc
 import pytest
+from pandas.testing import assert_frame_equal
 from pyarrow.lib import RecordBatchReader
 
 from deltalake import DeltaTable, write_deltalake
@@ -146,6 +147,17 @@ def test_fails_wrong_partitioning(existing_table: DeltaTable, sample_data: pa.Ta
     )
 
 
+def test_write_pandas(tmp_path: pathlib.Path, sample_data: pa.Table):
+    # When timestamp is converted to Pandas, it gets cast to ns resolution,
+    # but Delta Lake schemas only support us resolution.
+    sample_pandas = sample_data.to_pandas().drop(["timestamp"], axis=1)
+    write_deltalake(str(tmp_path), sample_pandas)
+
+    delta_table = DeltaTable(str(tmp_path))
+    df = delta_table.to_pandas()
+    assert_frame_equal(df, sample_pandas)
+
+
 def test_write_iterator(
     tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table
 ):
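
For context, a minimal usage sketch of the new pandas path (not part of the patch; the table path "tmp/delta-table" and the example columns are made up for illustration):

    import pandas as pd

    from deltalake import DeltaTable, write_deltalake

    # With this patch, write_deltalake accepts a DataFrame directly and
    # converts it internally with pa.Table.from_pandas before writing.
    df = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
    write_deltalake("tmp/delta-table", df)

    # Reading back as pandas uses the existing DeltaTable.to_pandas() API.
    round_tripped = DeltaTable("tmp/delta-table").to_pandas()
    print(round_tripped)

As the new test notes, pandas represents timestamps at ns resolution while Delta Lake schemas only support us, so exact round-trip equality may require dropping or truncating timestamp columns.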