Skip to content

Commit

Permalink
Add Pandas support
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Mar 6, 2022
1 parent 0f884c5 commit 6396d4e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
12 changes: 11 additions & 1 deletion python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import date, datetime
from typing import Any, Dict, Iterable, Iterator, List, Literal, Optional, Union

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as pa_fs
Expand All @@ -26,7 +27,13 @@ class AddAction:

def write_deltalake(
table_or_uri: Union[str, DeltaTable],
data: Union[pa.Table, pa.RecordBatch, Iterable[pa.RecordBatch], RecordBatchReader],
data: Union[
pd.DataFrame,
pa.Table,
pa.RecordBatch,
Iterable[pa.RecordBatch],
RecordBatchReader,
],
schema: Optional[pa.Schema] = None,
partition_by: Optional[Iterable[str]] = None,
filesystem: Optional[pa_fs.FileSystem] = None,
Expand All @@ -48,6 +55,9 @@ def write_deltalake(
replace table with new data. If 'ignore', will not write anything if
table already exists.
"""
if isinstance(data, pd.DataFrame):
data = pa.Table.from_pandas(data)

if schema is None:
if isinstance(data, RecordBatchReader):
schema = data.schema
Expand Down
12 changes: 12 additions & 0 deletions python/tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pyarrow as pa
import pyarrow.compute as pc
import pytest
from pandas.testing import assert_frame_equal
from pyarrow.lib import RecordBatchReader

from deltalake import DeltaTable, write_deltalake
Expand Down Expand Up @@ -146,6 +147,17 @@ def test_fails_wrong_partitioning(existing_table: DeltaTable, sample_data: pa.Ta
)


def test_write_pandas(tmp_path: pathlib.Path, sample_data: pa.Table):
    """Write a Pandas DataFrame with write_deltalake and read it back unchanged."""
    # Converting the timestamp column to Pandas casts it to ns resolution,
    # whereas Delta Lake schemas only support us resolution, so the column
    # is dropped before writing.
    table_uri = str(tmp_path)
    expected = sample_data.to_pandas().drop(columns=["timestamp"])

    write_deltalake(table_uri, expected)

    actual = DeltaTable(table_uri).to_pandas()
    assert_frame_equal(actual, expected)


def test_write_iterator(
tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table
):
Expand Down

0 comments on commit 6396d4e

Please sign in to comment.