Skip to content

Commit

Permalink
Merge string-view2 branch: reading from parquet up to 2x faster for…
Browse files Browse the repository at this point in the history
… some ClickBench queries (not on by default) (apache#11667)

* Pin to pre-release version of arrow 52.2.0

* Update for deprecated method

* Add a config to force using string view in benchmark (apache#11514)

* add a knob to force string view in benchmark

* fix sql logic test

* update doc

* fix ci

* fix ci only test

* Update benchmarks/src/util/options.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Update datafusion/common/src/config.rs

Co-authored-by: Andrew Lamb <[email protected]>

* update tests

---------

Co-authored-by: Andrew Lamb <[email protected]>

* Add String view helper functions (apache#11517)

* add functions

* add tests for hash util

* Add ArrowBytesViewMap and ArrowBytesViewSet (apache#11515)

* Update `string-view` branch to arrow-rs main (apache#10966)

* Pin to arrow main

* Fix clippy with latest arrow

* Uncomment test that needs new arrow-rs to work

* Update datafusion-cli Cargo.lock

* Update Cargo.lock

* tapelo

* merge

* update cast

* consistent dep

* fix ci

* add more tests

* make doc happy

* update new implementation

* fix bug

* avoid unused dep

* update dep

* update

* fix cargo check

* update doc

* pick up the comments change again

---------

Co-authored-by: Andrew Lamb <[email protected]>

* Enable `GroupValueBytesView` for aggregation with StringView types (apache#11519)

* add functions

* Update `string-view` branch to arrow-rs main (apache#10966)

* Pin to arrow main

* Fix clippy with latest arrow

* Uncomment test that needs new arrow-rs to work

* Update datafusion-cli Cargo.lock

* Update Cargo.lock

* tapelo

* merge

* update cast

* consistent dep

* fix ci

* avoid unused dep

* update dep

* update

* fix cargo check

* better group value view aggregation

* update

---------

Co-authored-by: Andrew Lamb <[email protected]>

* Initial support for regex_replace on `StringViewArray` (apache#11556)

* initial support for string view regex

* update tests

* Add support for Utf8View for date/temporal codepaths (apache#11518)

* Add StringView support for date_part and make_date funcs

* run cargo update in datafusion-cli

* cargo fmt

---------

Co-authored-by: Andrew Lamb <[email protected]>

* GC `StringViewArray` in `CoalesceBatchesStream` (apache#11587)

* gc string view when appropriate

* make clippy happy

* address comments

* make doc happy

* update style

* Add comments and tests for gc_string_view_batch

* better herustic

* update test

* Update datafusion/physical-plan/src/coalesce_batches.rs

Co-authored-by: Andrew Lamb <[email protected]>

---------

Co-authored-by: Andrew Lamb <[email protected]>

* [Bug] fix bug in return type inference of `utf8_to_int_type` (apache#11662)

* fix bug in return type inference

* update doc

* add tests

---------

Co-authored-by: Andrew Lamb <[email protected]>

* Fix clippy

* Increase ByteViewMap block size to 2MB (apache#11674)

* better default block size

* fix related test

* Change `--string-view` to only apply to parquet formats (apache#11663)

* use inferenced schema, don't load schema again

* move config to parquet-only

* update

* update

* better format

* format

* update

* Implement native support StringView for character length (apache#11676)

* native support for character length

* Update datafusion/functions/src/unicode/character_length.rs

---------

Co-authored-by: Andrew Lamb <[email protected]>

* Remove uneeded patches

* cargo fmt

---------

Co-authored-by: Xiangpeng Hao <[email protected]>
Co-authored-by: Xiangpeng Hao <[email protected]>
Co-authored-by: Andrew Duffy <[email protected]>
  • Loading branch information
4 people authored Jul 29, 2024
1 parent ea8c287 commit a591301
Show file tree
Hide file tree
Showing 40 changed files with 1,714 additions and 192 deletions.
8 changes: 7 additions & 1 deletion benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,13 @@ impl RunOpt {
None => queries.min_query_id()..=queries.max_query_id(),
};

let config = self.common.config();
let mut config = self.common.config();
config
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;

let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;

Expand Down
7 changes: 7 additions & 0 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ impl RunOpt {
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;
let ctx = SessionContext::new_with_config(config);

// register tables
Expand Down Expand Up @@ -339,6 +344,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down Expand Up @@ -372,6 +378,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down
5 changes: 5 additions & 0 deletions benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ pub struct CommonOpt {
/// Activate debug mode to see more details
#[structopt(short, long)]
pub debug: bool,

/// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
/// when reading ParquetFiles
#[structopt(long)]
pub string_view: bool,
}

impl CommonOpt {
Expand Down
5 changes: 3 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions datafusion/common/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use arrow::{
},
datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType},
};
use arrow_array::{BinaryViewArray, StringViewArray};

// Downcast ArrayRef to Date32Array
pub fn as_date32_array(array: &dyn Array) -> Result<&Date32Array> {
Expand Down Expand Up @@ -87,6 +88,11 @@ pub fn as_string_array(array: &dyn Array) -> Result<&StringArray> {
Ok(downcast_value!(array, StringArray))
}

// Downcast ArrayRef to StringViewArray
pub fn as_string_view_array(array: &dyn Array) -> Result<&StringViewArray> {
Ok(downcast_value!(array, StringViewArray))
}

// Downcast ArrayRef to UInt32Array
pub fn as_uint32_array(array: &dyn Array) -> Result<&UInt32Array> {
Ok(downcast_value!(array, UInt32Array))
Expand Down Expand Up @@ -221,6 +227,11 @@ pub fn as_binary_array(array: &dyn Array) -> Result<&BinaryArray> {
Ok(downcast_value!(array, BinaryArray))
}

// Downcast ArrayRef to BinaryViewArray
pub fn as_binary_view_array(array: &dyn Array) -> Result<&BinaryViewArray> {
Ok(downcast_value!(array, BinaryViewArray))
}

// Downcast ArrayRef to FixedSizeListArray
pub fn as_fixed_size_list_array(array: &dyn Array) -> Result<&FixedSizeListArray> {
Ok(downcast_value!(array, FixedSizeListArray))
Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_string_view: bool, default = false
}
}

Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl ParquetOptions {
maximum_parallel_row_group_writers: _,
maximum_buffered_record_batches_per_stream: _,
bloom_filter_on_read: _, // reads not used for writer props
schema_force_string_view: _,
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -440,6 +441,7 @@ mod tests {
maximum_buffered_record_batches_per_stream: defaults
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: defaults.bloom_filter_on_read,
schema_force_string_view: defaults.schema_force_string_view,
}
}

Expand Down Expand Up @@ -540,6 +542,8 @@ mod tests {
maximum_buffered_record_batches_per_stream: global_options_defaults
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
schema_force_string_view: global_options_defaults
.schema_force_string_view,
},
column_specific_options,
key_value_metadata,
Expand Down
125 changes: 110 additions & 15 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use arrow_buffer::IntervalMonthDayNano;

#[cfg(not(feature = "force_hash_collisions"))]
use crate::cast::{
as_boolean_array, as_fixed_size_list_array, as_generic_binary_array,
as_large_list_array, as_list_array, as_map_array, as_primitive_array,
as_string_array, as_struct_array,
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
as_primitive_array, as_string_array, as_string_view_array, as_struct_array,
};
use crate::error::Result;
#[cfg(not(feature = "force_hash_collisions"))]
Expand Down Expand Up @@ -415,8 +415,10 @@ pub fn create_hashes<'a>(
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
DataType::FixedSizeBinary(_) => {
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
Expand Down Expand Up @@ -540,22 +542,57 @@ mod tests {
Ok(())
}

#[test]
fn create_hashes_binary() -> Result<()> {
let byte_array = Arc::new(BinaryArray::from_vec(vec![
&[4, 3, 2],
&[4, 3, 2],
&[1, 2, 3],
]));
macro_rules! create_hash_binary {
($NAME:ident, $ARRAY:ty) => {
#[cfg(not(feature = "force_hash_collisions"))]
#[test]
fn $NAME() {
let binary = [
Some(b"short".to_byte_slice()),
None,
Some(b"long but different 12 bytes string"),
Some(b"short2"),
Some(b"Longer than 12 bytes string"),
Some(b"short"),
Some(b"Longer than 12 bytes string"),
];

let binary_array = Arc::new(binary.iter().cloned().collect::<$ARRAY>());
let ref_array = Arc::new(binary.iter().cloned().collect::<BinaryArray>());

let random_state = RandomState::with_seeds(0, 0, 0, 0);

let mut binary_hashes = vec![0; binary.len()];
create_hashes(&[binary_array], &random_state, &mut binary_hashes)
.unwrap();

let mut ref_hashes = vec![0; binary.len()];
create_hashes(&[ref_array], &random_state, &mut ref_hashes).unwrap();

// Null values result in a zero hash,
for (val, hash) in binary.iter().zip(binary_hashes.iter()) {
match val {
Some(_) => assert_ne!(*hash, 0),
None => assert_eq!(*hash, 0),
}
}

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; byte_array.len()];
let hashes = create_hashes(&[byte_array], &random_state, hashes_buff)?;
assert_eq!(hashes.len(), 3,);
// same logical values should hash to the same hash value
assert_eq!(binary_hashes, ref_hashes);

Ok(())
// Same values should map to same hash values
assert_eq!(binary[0], binary[5]);
assert_eq!(binary[4], binary[6]);

// different binary should map to different hash values
assert_ne!(binary[0], binary[2]);
}
};
}

create_hash_binary!(binary_array, BinaryArray);
create_hash_binary!(binary_view_array, BinaryViewArray);

#[test]
fn create_hashes_fixed_size_binary() -> Result<()> {
let input_arg = vec![vec![1, 2], vec![5, 6], vec![5, 6]];
Expand All @@ -571,6 +608,64 @@ mod tests {
Ok(())
}

macro_rules! create_hash_string {
($NAME:ident, $ARRAY:ty) => {
#[cfg(not(feature = "force_hash_collisions"))]
#[test]
fn $NAME() {
let strings = [
Some("short"),
None,
Some("long but different 12 bytes string"),
Some("short2"),
Some("Longer than 12 bytes string"),
Some("short"),
Some("Longer than 12 bytes string"),
];

let string_array = Arc::new(strings.iter().cloned().collect::<$ARRAY>());
let dict_array = Arc::new(
strings
.iter()
.cloned()
.collect::<DictionaryArray<Int8Type>>(),
);

let random_state = RandomState::with_seeds(0, 0, 0, 0);

let mut string_hashes = vec![0; strings.len()];
create_hashes(&[string_array], &random_state, &mut string_hashes)
.unwrap();

let mut dict_hashes = vec![0; strings.len()];
create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap();

// Null values result in a zero hash,
for (val, hash) in strings.iter().zip(string_hashes.iter()) {
match val {
Some(_) => assert_ne!(*hash, 0),
None => assert_eq!(*hash, 0),
}
}

// same logical values should hash to the same hash value
assert_eq!(string_hashes, dict_hashes);

// Same values should map to same hash values
assert_eq!(strings[0], strings[5]);
assert_eq!(strings[4], strings[6]);

// different strings should map to different hash values
assert_ne!(strings[0], strings[2]);
}
};
}

create_hash_string!(string_array, StringArray);
create_hash_string!(large_string_array, LargeStringArray);
create_hash_string!(string_view_array, StringArray);
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);

#[test]
// Tests actual values of hashes, which are different if forcing collisions
#[cfg(not(feature = "force_hash_collisions"))]
Expand Down
23 changes: 23 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
Expand Down Expand Up @@ -204,6 +205,28 @@ pub fn file_type_to_format(
}
}

/// Transform a schema to use view types for Utf8 and Binary
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
field.name(),
DataType::Utf8View,
field.is_nullable(),
)),
DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
field.name(),
DataType::BinaryView,
field.is_nullable(),
)),
_ => field.clone(),
})
.collect();
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
Expand Down
13 changes: 12 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;

use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileFormatFactory, FileScanConfig};
use super::{transform_schema_to_view, FileFormat, FileFormatFactory, FileScanConfig};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
Expand Down Expand Up @@ -316,6 +316,17 @@ impl FileFormat for ParquetFormat {
Schema::try_merge(schemas)
}?;

let schema = if state
.config_options()
.execution
.parquet
.schema_force_string_view
{
transform_schema_to_view(&schema)
} else {
schema
};

Ok(Arc::new(schema))
}

Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,9 @@ impl ListingOptions {
.try_collect()
.await?;

self.format.infer_schema(state, &store, &files).await
let schema = self.format.infer_schema(state, &store, &files).await?;

Ok(schema)
}

/// Infers the partition columns stored in `LOCATION` and compares
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,10 @@ impl ExecutionPlan for ParquetExec {
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
schema_adapter_factory,
schema_force_string_view: self
.table_parquet_options
.global
.schema_force_string_view,
};

let stream =
Expand Down
Loading

0 comments on commit a591301

Please sign in to comment.