Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve README and add more examples #137

Merged
merged 1 commit into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 40 additions & 101 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,139 +24,79 @@

This is a Python library that binds to [Apache Arrow](https://arrow.apache.org/) in-memory query engine [DataFusion](https://github.com/apache/arrow-datafusion).

Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV files, run it in a multi-threaded environment, and obtain the result back in Python.
Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV
files, run it in a multi-threaded environment, and obtain the result back in Python.

It also allows you to use UDFs and UDAFs for complex operations.

The major advantage of this library over other execution engines is that this library achieves zero-copy between Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart from having to lock the GIL when running those operations.
The major advantage of this library over other execution engines is that this library achieves zero-copy between
Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart
from having to lock the GIL when running those operations.

Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions about thread safety and lack of memory leaks.
Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions
about thread safety and lack of memory leaks.

Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html).

## How to use it
## Example Usage

Simple usage:
The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results
in a Pandas DataFrame, and then plotting a chart.

```python
import datafusion
from datafusion import col
import pyarrow

# create a context
ctx = datafusion.SessionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

# create a new statement
df = df.select(
col("a") + col("b"),
col("a") - col("b"),
)

# execute and collect the first (and only) batch
result = df.collect()[0]

assert result.column(0) == pyarrow.array([5, 7, 9])
assert result.column(1) == pyarrow.array([-3, -3, -3])
```

### UDFs

```python
import pyarrow
from datafusion import udf

def is_null(array: pyarrow.Array) -> pyarrow.Array:
return array.is_null()

is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')

# create a context
ctx = datafusion.SessionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

df = df.select(is_null_arr(col("a")))
The Parquet file used in this example can be downloaded from the following page:

result = df.collect()[0]
- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

assert result.column(0) == pyarrow.array([False] * 3)
```

### UDAF
See the [examples](examples) directory for more examples.

```python
import pyarrow
import pyarrow.compute
import datafusion
from datafusion import udaf, Accumulator
from datafusion import col


class MyAccumulator(Accumulator):
"""
Interface of a user-defined accumulation.
"""
def __init__(self):
self._sum = pyarrow.scalar(0.0)

def update(self, values: pyarrow.Array) -> None:
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())
from datafusion import SessionContext
import pandas as pd
import pyarrow as pa

def merge(self, states: pyarrow.Array) -> None:
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py())
# Create a DataFusion context
ctx = SessionContext()

def state(self) -> pyarrow.Array:
return pyarrow.array([self._sum.as_py()])
# Register table with context
ctx.register_parquet('taxi', 'yellow_tripdata_2021-01.parquet')

def evaluate(self) -> pyarrow.Scalar:
return self._sum
# Execute SQL
df = ctx.sql("select passenger_count, count(*) "
"from taxi "
"where passenger_count is not null "
"group by passenger_count "
"order by passenger_count")

# create a context
ctx = datafusion.SessionContext()
# collect as list of pyarrow.RecordBatch
results = df.collect()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])
# get first batch
batch = results[0]

my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable')
# convert to Pandas
df = batch.to_pandas()

df = df.aggregate(
[],
[my_udaf(col("a"))]
)
# create a chart
fig = df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
fig.savefig('chart.png')
```

result = df.collect()[0]
This produces the following chart:

assert result.column(0) == pyarrow.array([6.0])
```
![Chart](examples/chart.png)

## How to install (from pip)

### Pip

```bash
pip install datafusion
# or
python -m pip install datafusion
```

### Conda

```bash
conda install -c conda-forge datafusion
```
Expand All @@ -169,7 +109,6 @@ You can verify the installation by running:
'0.6.0'
```


## How to develop

This assumes that you have rust and cargo installed. We use the workflow recommended by [pyo3](https://github.com/PyO3/pyo3) and [maturin](https://github.com/PyO3/maturin).
Expand Down
27 changes: 27 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DataFusion Python Examples

- [Query a Parquet file using SQL](./sql-parquet.py)
- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
- [Query PyArrow Data](./query-pyarrow-data.py)
- [Register a Python UDF with DataFusion](./python-udf.py)
- [Register a Python UDAF with DataFusion](./python-udaf.py)
Binary file added examples/chart.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
25 changes: 25 additions & 0 deletions examples/dataframe-parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datafusion import SessionContext
from datafusion import functions as f

ctx = SessionContext()
df = ctx.read_parquet(
"/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
).aggregate([f.col("passenger_count")], [f.count_star()])
df.show()
74 changes: 74 additions & 0 deletions examples/python-udaf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pyarrow
import pyarrow.compute
import datafusion
from datafusion import udaf, Accumulator
from datafusion import col


class MyAccumulator(Accumulator):
"""
Interface of a user-defined accumulation.
"""

def __init__(self):
self._sum = pyarrow.scalar(0.0)

def update(self, values: pyarrow.Array) -> None:
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
self._sum = pyarrow.scalar(
self._sum.as_py() + pyarrow.compute.sum(values).as_py()
)

def merge(self, states: pyarrow.Array) -> None:
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
self._sum = pyarrow.scalar(
self._sum.as_py() + pyarrow.compute.sum(states).as_py()
)

def state(self) -> pyarrow.Array:
return pyarrow.array([self._sum.as_py()])

def evaluate(self) -> pyarrow.Scalar:
return self._sum


# create a context
ctx = datafusion.SessionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

my_udaf = udaf(
MyAccumulator,
pyarrow.float64(),
pyarrow.float64(),
[pyarrow.float64()],
"stable",
)

df = df.aggregate([], [my_udaf(col("a"))])

result = df.collect()[0]

assert result.column(0) == pyarrow.array([6.0])
42 changes: 42 additions & 0 deletions examples/python-udf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pyarrow
from datafusion import udf, SessionContext, functions as f


def is_null(array: pyarrow.Array) -> pyarrow.Array:
return array.is_null()


is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), "stable")

# create a context
ctx = SessionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

df = df.select(is_null_arr(f.col("a")))

result = df.collect()[0]

assert result.column(0) == pyarrow.array([False] * 3)
Loading