add an integration with pytest against pyspark (#3176)
jimexist authored Nov 24, 2022
1 parent 22deab0 commit 007fb4c
Showing 7 changed files with 238 additions and 6 deletions.
34 changes: 32 additions & 2 deletions .github/workflows/parquet.yml
@@ -19,7 +19,6 @@
# tests for parquet crate
name: "parquet"

# trigger for all PRs that touch certain files and changes to master
on:
  push:
@@ -58,7 +57,6 @@ jobs:
      - name: Test --all-features
        run: cargo test -p parquet --all-features

  # test compilation
  linux-features:
    name: Check Compilation
@@ -120,6 +118,38 @@ jobs:
      - name: Build wasm32-wasi
        run: cargo build -p parquet --no-default-features --features cli,snap,flate2,brotli --target wasm32-wasi

  pyspark-integration-test:
    name: PySpark Integration Test
    runs-on: ubuntu-latest
    strategy:
      matrix:
        rust: [stable]
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"
      - name: Install Python dependencies
        run: |
          cd parquet/pytest
          pip install -r requirements.txt
      - name: Black check the test files
        run: |
          cd parquet/pytest
          black --check *.py --verbose
      - name: Setup Rust toolchain
        run: |
          rustup toolchain install ${{ matrix.rust }}
          rustup default ${{ matrix.rust }}
      - name: Install binary for checking
        run: cargo install --path parquet --bin parquet-show-bloom-filter --features=arrow,cli
      - name: Run pytest
        run: |
          cd parquet/pytest
          pytest -v
  clippy:
    name: Clippy
    runs-on: ubuntu-latest
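The new job can be approximated locally when debugging. A minimal sketch in Python (the `run` helper is hypothetical; assumes the repository root as the working directory, Python 3.10 with pip on PATH, and a stable Rust toolchain; `black --check .` stands in for the shell glob `*.py`, which `subprocess` does not expand):

```python
import subprocess


def run(cmd, cwd="."):
    """Echo one CI step, then run it, failing fast on a non-zero exit."""
    print("+", " ".join(cmd))
    subprocess.run(cmd, cwd=cwd, check=True)


run(["pip", "install", "-r", "requirements.txt"], cwd="parquet/pytest")
run(["black", "--check", ".", "--verbose"], cwd="parquet/pytest")
run(["cargo", "install", "--path", "parquet",
     "--bin", "parquet-show-bloom-filter", "--features=arrow,cli"])
run(["pytest", "-v"], cwd="parquet/pytest")
```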
3 changes: 3 additions & 0 deletions .gitignore
@@ -92,3 +92,6 @@ $RECYCLE.BIN/
# Windows shortcuts
*.lnk

# Python virtual env in parquet crate
parquet/pytest/venv/
__pycache__/
1 change: 0 additions & 1 deletion parquet/README.md
@@ -41,7 +41,6 @@ However, for historical reasons, this crate uses versions with major numbers gre
The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:

- `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
-- `bloom` (default) - support for [split block bloom filter](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) for reading from / writing to parquet
- `async` - support `async` APIs for reading parquet
- `json` - support for reading / writing `json` data to / from parquet
- `brotli` (default) - support for parquet using `brotli` compression
65 changes: 65 additions & 0 deletions parquet/pytest/pyspark_integration_test.py
@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pyspark.sql
import tempfile
import subprocess
import pathlib


def create_data_and_df():
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    # enable split-block bloom filters on write, sized for 10 distinct values
    spark.conf.set("parquet.bloom.filter.enabled", True)
    spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
    spark.conf.set("parquet.bloom.filter.max.bytes", 32)
    # 100 rows, but only 10 distinct (id, name) pairs
    data = [(f"id-{i % 10}", f"name-{i % 10}") for i in range(100)]
    # repartition(1) so exactly one parquet part file is written
    df = spark.createDataFrame(data, ["id", "name"]).repartition(1)
    return data, df


def get_expected_output(data):
    expected = ["Row group #0", "=" * 80]
    # every id value was written, so the id column's bloom filter reports it
    for v in data:
        expected.append(f"Value {v[0]} is present in bloom filter")
    # name values never appear in the id column, so they are reported absent
    for v in data:
        expected.append(f"Value {v[1]} is absent in bloom filter")
    expected = "\n".join(expected) + "\n"
    return expected.encode("utf-8")


def get_cli_output(output_dir, data, col_name="id"):
    # take the first (and only) parquet file
    parquet_file = sorted(pathlib.Path(output_dir).glob("*.parquet"))[0]
    args = [
        "parquet-show-bloom-filter",
        "--file-name",
        parquet_file,
        "--column",
        col_name,
    ]
    # query the ids (expected present), then the names (expected absent)
    for v in data:
        args.extend(["--values", v[0]])
    for v in data:
        args.extend(["--values", v[1]])
    return subprocess.check_output(args)


def test_pyspark_bloom_filter():
    data, df = create_data_and_df()
    with tempfile.TemporaryDirectory() as output_dir:
        df.write.parquet(output_dir, mode="overwrite")
        cli_output = get_cli_output(output_dir, data)
        assert cli_output == get_expected_output(data)
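The same check can be run by hand against a file the helpers above produce. A minimal sketch, assuming `parquet-show-bloom-filter` was installed as in the CI job and that a hypothetical `out/` directory holds the single part file written by `df.write.parquet("out")`:

```python
import pathlib
import subprocess

# locate the single part file PySpark wrote into out/
parquet_file = next(pathlib.Path("out").glob("*.parquet"))
output = subprocess.check_output(
    [
        "parquet-show-bloom-filter",
        "--file-name", str(parquet_file),
        "--column", "id",
        "--values", "id-0",    # written to `id`: expect "present in bloom filter"
        "--values", "name-0",  # never in `id`: expect "absent in bloom filter"
    ]
)
print(output.decode())
```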
20 changes: 20 additions & 0 deletions parquet/pytest/requirements.in
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
pytest
pyspark
black

102 changes: 102 additions & 0 deletions parquet/pytest/requirements.txt
@@ -0,0 +1,102 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This file is autogenerated by pip-compile with python 3.10
# To update, run:
#
# pip-compile --generate-hashes --resolver=backtracking
#
attrs==22.1.0 \
--hash=sha256:29adc2665447e5191d0e7c568fde78b21f9672d344281d0c6e1ab085429b22b6 \
--hash=sha256:86efa402f67bf2df34f51a335487cf46b1ec130d02b8d39fd248abfd30da551c
# via pytest
black==22.10.0 \
--hash=sha256:14ff67aec0a47c424bc99b71005202045dc09270da44a27848d534600ac64fc7 \
--hash=sha256:197df8509263b0b8614e1df1756b1dd41be6738eed2ba9e9769f3880c2b9d7b6 \
--hash=sha256:1e464456d24e23d11fced2bc8c47ef66d471f845c7b7a42f3bd77bf3d1789650 \
--hash=sha256:2039230db3c6c639bd84efe3292ec7b06e9214a2992cd9beb293d639c6402edb \
--hash=sha256:21199526696b8f09c3997e2b4db8d0b108d801a348414264d2eb8eb2532e540d \
--hash=sha256:2644b5d63633702bc2c5f3754b1b475378fbbfb481f62319388235d0cd104c2d \
--hash=sha256:432247333090c8c5366e69627ccb363bc58514ae3e63f7fc75c54b1ea80fa7de \
--hash=sha256:444ebfb4e441254e87bad00c661fe32df9969b2bf224373a448d8aca2132b395 \
--hash=sha256:5b9b29da4f564ba8787c119f37d174f2b69cdfdf9015b7d8c5c16121ddc054ae \
--hash=sha256:5cc42ca67989e9c3cf859e84c2bf014f6633db63d1cbdf8fdb666dcd9e77e3fa \
--hash=sha256:5d8f74030e67087b219b032aa33a919fae8806d49c867846bfacde57f43972ef \
--hash=sha256:72ef3925f30e12a184889aac03d77d031056860ccae8a1e519f6cbb742736383 \
--hash=sha256:819dc789f4498ecc91438a7de64427c73b45035e2e3680c92e18795a839ebb66 \
--hash=sha256:915ace4ff03fdfff953962fa672d44be269deb2eaf88499a0f8805221bc68c87 \
--hash=sha256:9311e99228ae10023300ecac05be5a296f60d2fd10fff31cf5c1fa4ca4b1988d \
--hash=sha256:974308c58d057a651d182208a484ce80a26dac0caef2895836a92dd6ebd725e0 \
--hash=sha256:b8b49776299fece66bffaafe357d929ca9451450f5466e997a7285ab0fe28e3b \
--hash=sha256:c957b2b4ea88587b46cf49d1dc17681c1e672864fd7af32fc1e9664d572b3458 \
--hash=sha256:e41a86c6c650bcecc6633ee3180d80a025db041a8e2398dcc059b3afa8382cd4 \
--hash=sha256:f513588da599943e0cde4e32cc9879e825d58720d6557062d1098c5ad80080e1 \
--hash=sha256:fba8a281e570adafb79f7755ac8721b6cf1bbf691186a287e990c7929c7692ff
# via -r requirements.in
click==8.1.3 \
--hash=sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e \
--hash=sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48
# via black
exceptiongroup==1.0.4 \
--hash=sha256:542adf9dea4055530d6e1279602fa5cb11dab2395fa650b8674eaec35fc4a828 \
--hash=sha256:bd14967b79cd9bdb54d97323216f8fdf533e278df937aa2a90089e7d6e06e5ec
# via pytest
iniconfig==1.1.1 \
--hash=sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3 \
--hash=sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32
# via pytest
mypy-extensions==0.4.3 \
--hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \
--hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8
# via black
packaging==21.3 \
--hash=sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb \
--hash=sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522
# via pytest
pathspec==0.10.2 \
--hash=sha256:88c2606f2c1e818b978540f73ecc908e13999c6c3a383daf3705652ae79807a5 \
--hash=sha256:8f6bf73e5758fd365ef5d58ce09ac7c27d2833a8d7da51712eac6e27e35141b0
# via black
platformdirs==2.5.4 \
--hash=sha256:1006647646d80f16130f052404c6b901e80ee4ed6bef6792e1f238a8969106f7 \
--hash=sha256:af0276409f9a02373d540bf8480021a048711d572745aef4b7842dad245eba10
# via black
pluggy==1.0.0 \
--hash=sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159 \
--hash=sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3
# via pytest
py4j==0.10.9.5 \
--hash=sha256:276a4a3c5a2154df1860ef3303a927460e02e97b047dc0a47c1c3fb8cce34db6 \
--hash=sha256:52d171a6a2b031d8a5d1de6efe451cf4f5baff1a2819aabc3741c8406539ba04
# via pyspark
pyparsing==3.0.9 \
--hash=sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb \
--hash=sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc
# via packaging
pyspark==3.3.1 \
--hash=sha256:e99fa7de92be406884bfd831c32b9306a3a99de44cfc39a2eefb6ed07445d5fa
# via -r requirements.in
pytest==7.2.0 \
--hash=sha256:892f933d339f068883b6fd5a459f03d85bfcb355e4981e146d2c7616c21fef71 \
--hash=sha256:c4014eb40e10f11f355ad4e3c2fb2c6c6d1919c73f3b5a433de4708202cade59
# via -r requirements.in
tomli==2.0.1 \
--hash=sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc \
--hash=sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f
# via
# black
# pytest
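Per the autogenerated header above, dependency changes go into `requirements.in` and the pinned file is regenerated with pip-compile. A sketch of that step, assuming `pip install pip-tools` has been run inside `parquet/pytest`:

```python
import subprocess

# regenerate the hash-pinned requirements.txt from requirements.in
subprocess.run(
    ["pip-compile", "--generate-hashes", "--resolver=backtracking"],
    cwd="parquet/pytest",
    check=True,
)
```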
19 changes: 16 additions & 3 deletions parquet/src/bin/parquet-show-bloom-filter.rs
@@ -34,7 +34,11 @@
//! ```
use clap::Parser;
-use parquet::file::reader::{FileReader, SerializedFileReader};
+use parquet::file::{
+    properties::ReaderProperties,
+    reader::{FileReader, SerializedFileReader},
+    serialized_reader::ReadOptionsBuilder,
+};
use std::{fs::File, path::Path};

#[derive(Debug, Parser)]
@@ -63,8 +67,17 @@ fn main() {
    let path = Path::new(&file_name);
    let file = File::open(path).expect("Unable to open file");

-    let file_reader =
-        SerializedFileReader::new(file).expect("Unable to open file as Parquet");
+    let file_reader = SerializedFileReader::new_with_options(
+        file,
+        ReadOptionsBuilder::new()
+            .with_reader_properties(
+                ReaderProperties::builder()
+                    .set_read_bloom_filter(true)
+                    .build(),
+            )
+            .build(),
+    )
+    .expect("Unable to open file as Parquet");
    let metadata = file_reader.metadata();
    for (ri, row_group) in metadata.row_groups().iter().enumerate() {
        println!("Row group #{}", ri);
