From c7e88a257ca53d6bdbfd839b1051cbec717b9b88 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Tue, 18 Jun 2024 04:37:29 +0800 Subject: [PATCH 01/16] fix: prevent potential out-of-range access in FixedSizeListArray (#5902) * fix: prevent potential out-of-range access in FixedSizeListArray Signed-off-by: BubbleCal * add benchmark & format Signed-off-by: BubbleCal * format Cargo.toml Signed-off-by: BubbleCal --------- Signed-off-by: BubbleCal --- arrow-array/Cargo.toml | 4 ++ arrow-array/benches/fixed_size_list_array.rs | 51 +++++++++++++++++++ .../src/array/fixed_size_list_array.rs | 8 +-- 3 files changed, 59 insertions(+), 4 deletions(-) create mode 100644 arrow-array/benches/fixed_size_list_array.rs diff --git a/arrow-array/Cargo.toml b/arrow-array/Cargo.toml index a8dbeded9ce7..bf6e27f6b232 100644 --- a/arrow-array/Cargo.toml +++ b/arrow-array/Cargo.toml @@ -66,3 +66,7 @@ harness = false [[bench]] name = "gc_view_types" harness = false + +[[bench]] +name = "fixed_size_list_array" +harness = false diff --git a/arrow-array/benches/fixed_size_list_array.rs b/arrow-array/benches/fixed_size_list_array.rs new file mode 100644 index 000000000000..5f001a4f3d3a --- /dev/null +++ b/arrow-array/benches/fixed_size_list_array.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{Array, FixedSizeListArray, Int32Array}; +use arrow_schema::Field; +use criterion::*; +use rand::{thread_rng, Rng}; +use std::sync::Arc; + +fn gen_fsl(len: usize, value_len: usize) -> FixedSizeListArray { + let mut rng = thread_rng(); + let values = Arc::new(Int32Array::from( + (0..len).map(|_| rng.gen::()).collect::>(), + )); + let field = Arc::new(Field::new("item", values.data_type().clone(), true)); + FixedSizeListArray::new(field, value_len as i32, values, None) +} + +fn criterion_benchmark(c: &mut Criterion) { + let len = 4096; + for value_len in [1, 32, 1024] { + let fsl = gen_fsl(len, value_len); + c.bench_function( + &format!("fixed_size_list_array(len: {len}, value_len: {value_len})"), + |b| { + b.iter(|| { + for i in 0..len / value_len { + black_box(fsl.value(i)); + } + }); + }, + ); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/arrow-array/src/array/fixed_size_list_array.rs b/arrow-array/src/array/fixed_size_list_array.rs index 7bb4e0c5ee5a..6f3a76908723 100644 --- a/arrow-array/src/array/fixed_size_list_array.rs +++ b/arrow-array/src/array/fixed_size_list_array.rs @@ -245,7 +245,7 @@ impl FixedSizeListArray { /// Returns ith value of this list array. 
pub fn value(&self, i: usize) -> ArrayRef { self.values - .slice(self.value_offset(i) as usize, self.value_length() as usize) + .slice(self.value_offset_at(i), self.value_length() as usize) } /// Returns the offset for value at index `i`. @@ -253,7 +253,7 @@ impl FixedSizeListArray { /// Note this doesn't do any bound checking, for performance reason. #[inline] pub fn value_offset(&self, i: usize) -> i32 { - self.value_offset_at(i) + self.value_offset_at(i) as i32 } /// Returns the length for an element. @@ -265,8 +265,8 @@ impl FixedSizeListArray { } #[inline] - const fn value_offset_at(&self, i: usize) -> i32 { - i as i32 * self.value_length + const fn value_offset_at(&self, i: usize) -> usize { + i * self.value_length as usize } /// Returns a zero-copy slice of this array with the indicated offset and length. From 20a569a733f450e463eea2d635d052958a71f750 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 19 Jun 2024 18:43:05 -0700 Subject: [PATCH 02/16] fix: Adjust FFI_ArrowArray offset based on the offset of offset buffer (#5895) * fix: Adjust FFI_ArrowArray offset based on the offset of offset buffer * For review * Fix clippy * For review --- arrow-array/src/ffi.rs | 56 ++++++++++++++++++++++++++++ arrow-buffer/src/buffer/immutable.rs | 13 +++++++ arrow-data/src/ffi.rs | 55 +++++++++++++++++++++++++-- 3 files changed, 120 insertions(+), 4 deletions(-) diff --git a/arrow-array/src/ffi.rs b/arrow-array/src/ffi.rs index 088a0a6ab58a..fe755b91f765 100644 --- a/arrow-array/src/ffi.rs +++ b/arrow-array/src/ffi.rs @@ -1225,6 +1225,7 @@ mod tests_from_ffi { use std::sync::Arc; use arrow_buffer::{bit_util, buffer::Buffer, MutableBuffer, OffsetBuffer}; + use arrow_data::transform::MutableArrayData; use arrow_data::ArrayData; use arrow_schema::{DataType, Field}; @@ -1234,6 +1235,7 @@ mod tests_from_ffi { Int32Array, Int64Array, StringArray, StructArray, UInt32Array, UInt64Array, }, ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema}, + make_array, ArrayRef, }; use super::{ImportedArrowArray, Result}; @@ -1458,4 +1460,58 @@ mod tests_from_ffi { test_round_trip(&imported_array.consume()?) 
} + + fn roundtrip_string_array(array: StringArray) -> StringArray { + let data = array.into_data(); + + let array = FFI_ArrowArray::new(&data); + let schema = FFI_ArrowSchema::try_from(data.data_type()).unwrap(); + + let array = unsafe { from_ffi(array, &schema) }.unwrap(); + StringArray::from(array) + } + + fn extend_array(array: &dyn Array) -> ArrayRef { + let len = array.len(); + let data = array.to_data(); + + let mut mutable = MutableArrayData::new(vec![&data], false, len); + mutable.extend(0, 0, len); + make_array(mutable.freeze()) + } + + #[test] + fn test_extend_imported_string_slice() { + let mut strings = vec![]; + + for i in 0..1000 { + strings.push(format!("string: {}", i)); + } + + let string_array = StringArray::from(strings); + + let imported = roundtrip_string_array(string_array.clone()); + assert_eq!(imported.len(), 1000); + assert_eq!(imported.value(0), "string: 0"); + assert_eq!(imported.value(499), "string: 499"); + + let copied = extend_array(&imported); + assert_eq!( + copied.as_any().downcast_ref::().unwrap(), + &imported + ); + + let slice = string_array.slice(500, 500); + + let imported = roundtrip_string_array(slice); + assert_eq!(imported.len(), 500); + assert_eq!(imported.value(0), "string: 500"); + assert_eq!(imported.value(499), "string: 999"); + + let copied = extend_array(&imported); + assert_eq!( + copied.as_any().downcast_ref::().unwrap(), + &imported + ); + } } diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index f26cde05b7ab..2c743842fb05 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -71,6 +71,19 @@ impl Buffer { } } + /// Returns the offset, in bytes, of `Self::ptr` to `Self::data` + /// + /// self.ptr and self.data can be different after slicing or advancing the buffer. + pub fn ptr_offset(&self) -> usize { + // Safety: `ptr` is always in bounds of `data`. + unsafe { self.ptr.offset_from(self.data.ptr().as_ptr()) as usize } + } + + /// Returns the pointer to the start of the buffer without the offset. + pub fn data_ptr(&self) -> NonNull { + self.data.ptr() + } + /// Create a [`Buffer`] from the provided [`Vec`] without copying #[inline] pub fn from_vec(vec: Vec) -> Self { diff --git a/arrow-data/src/ffi.rs b/arrow-data/src/ffi.rs index 589f7dac6d19..7fe87d4ae107 100644 --- a/arrow-data/src/ffi.rs +++ b/arrow-data/src/ffi.rs @@ -131,6 +131,37 @@ impl FFI_ArrowArray { data.buffers().iter().map(|b| Some(b.clone())).collect() }; + // Handle buffer offset for offset buffer. + let offset_offset = match data.data_type() { + DataType::Utf8 | DataType::Binary => { + // Offset buffer is possible a slice of the buffer. + // If we use slice pointer as exported buffer pointer, it will cause + // the consumer to calculate incorrect length of data buffer (buffer 1). + // We need to get the offset of the offset buffer and fill it in + // the `FFI_ArrowArray` offset field. + Some(data.buffers()[0].ptr_offset() / std::mem::size_of::()) + } + DataType::LargeUtf8 | DataType::LargeBinary => { + // Offset buffer is possible a slice of the buffer. + // If we use slice pointer as exported buffer pointer, it will cause + // the consumer to calculate incorrect length of data buffer (buffer 1). + // We need to get the offset of the offset buffer and fill it in + // the `FFI_ArrowArray` offset field. 
+ Some(data.buffers()[0].ptr_offset() / std::mem::size_of::()) + } + _ => None, + }; + + let offset = if let Some(offset) = offset_offset { + if data.offset() != 0 { + // TODO: Adjust for data offset + panic!("The ArrayData of a slice offset buffer should not have offset"); + } + offset + } else { + data.offset() + }; + // `n_buffers` is the number of buffers by the spec. let n_buffers = { data_layout.buffers.len() + { @@ -143,9 +174,25 @@ impl FFI_ArrowArray { let buffers_ptr = buffers .iter() - .flat_map(|maybe_buffer| match maybe_buffer { - // note that `raw_data` takes into account the buffer's offset - Some(b) => Some(b.as_ptr() as *const c_void), + .enumerate() + .flat_map(|(buffer_idx, maybe_buffer)| match maybe_buffer { + Some(b) => { + match (data.data_type(), buffer_idx) { + ( + DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Binary + | DataType::LargeBinary, + 1, + ) => { + // For offset buffer, take original pointer without offset. + // Buffer offset should be handled by `FFI_ArrowArray` offset field. + Some(b.data_ptr().as_ptr() as *const c_void) + } + // For other buffers, note that `raw_data` takes into account the buffer's offset + _ => Some(b.as_ptr() as *const c_void), + } + } // This is for null buffer. We only put a null pointer for // null buffer if by spec it can contain null mask. None if data_layout.can_contain_null_mask => Some(std::ptr::null()), @@ -186,7 +233,7 @@ impl FFI_ArrowArray { Self { length: data.len() as i64, null_count: null_count as i64, - offset: data.offset() as i64, + offset: offset as i64, n_buffers, n_children, buffers: private_data.buffers_ptr.as_mut_ptr(), From 2f1c0147b29d32cf119075fad45ea419bddfca5f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 21 Jun 2024 10:09:11 -0400 Subject: [PATCH 03/16] Update proc-macro2 requirement from =1.0.85 to =1.0.86 (#5927) Updates the requirements on [proc-macro2](https://github.com/dtolnay/proc-macro2) to permit the latest version. - [Release notes](https://github.com/dtolnay/proc-macro2/releases) - [Commits](https://github.com/dtolnay/proc-macro2/compare/1.0.85...1.0.86) --- updated-dependencies: - dependency-name: proc-macro2 dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- arrow-flight/gen/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-flight/gen/Cargo.toml b/arrow-flight/gen/Cargo.toml index 8c4587cec845..7264a527ca8d 100644 --- a/arrow-flight/gen/Cargo.toml +++ b/arrow-flight/gen/Cargo.toml @@ -32,6 +32,6 @@ publish = false [dependencies] # Pin specific version of the tonic-build dependencies to avoid auto-generated # (and checked in) arrow.flight.protocol.rs from changing -proc-macro2 = { version = "=1.0.85", default-features = false } +proc-macro2 = { version = "=1.0.86", default-features = false } prost-build = { version = "=0.12.6", default-features = false } tonic-build = { version = "=0.11.0", default-features = false, features = ["transport", "prost"] } From 29c07d00ad2727a00497e10af1ff6992c2290e5c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 21 Jun 2024 22:11:05 +0800 Subject: [PATCH 04/16] docs: Add object_store_opendal as related projects (#5926) * docs: Add object_store_opendal as related projects Signed-off-by: Xuanwo * Update README.md --------- Signed-off-by: Xuanwo Co-authored-by: Andrew Lamb --- README.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index ac676f7df4d5..fdcd3a665e3b 100644 --- a/README.md +++ b/README.md @@ -79,15 +79,19 @@ versions approximately every 2 months. ## Related Projects -There are two related crates in different repositories +There are several related crates in different repositories -| Crate | Description | Documentation | -| -------------- | --------------------------------------- | ----------------------------- | -| [`datafusion`] | In-memory query engine with SQL support | [(README)][datafusion-readme] | -| [`ballista`] | Distributed query execution | [(README)][ballista-readme] | +| Crate | Description | Documentation | +| ------------------------ | ------------------------------------------- | --------------------------------------- | +| [`datafusion`] | In-memory query engine with SQL support | [(README)][datafusion-readme] | +| [`ballista`] | Distributed query execution | [(README)][ballista-readme] | +| [`object_store_opendal`] | Use [`opendal`] as [`object_store`] backend | [(README)][object_store_opendal-readme] | [`datafusion`]: https://crates.io/crates/datafusion [`ballista`]: https://crates.io/crates/ballista +[`object_store_opendal`]: https://crates.io/crates/object_store_opendal +[`opendal`]: https://crates.io/crates/opendal +[`object_store_opendal-readme`]: https://github.com/apache/opendal/blob/main/integrations/object_store/README.md Collectively, these crates support a wider array of functionality for analytic computations in Rust. 
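
[editor note] The README change above only links `object_store_opendal`; as a rough illustration of how the pieces fit together, here is a minimal sketch of exposing an OpenDAL backend through the `object_store` API. This is not part of any patch in this series: the `Memory` service builder, the `OpendalStore::new` constructor, and the tokio/bytes dev-dependencies are assumptions taken from the linked crates' own documentation, and exact signatures may differ between versions — check the crate READMEs referenced above for the current API.

```rust
// Hypothetical sketch (not from this repository): wrap an OpenDAL operator
// so it can be used anywhere a `dyn ObjectStore` is expected.
use std::sync::Arc;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};
use object_store_opendal::OpendalStore; // adapter crate linked in the README
use opendal::{services::Memory, Operator};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build an OpenDAL operator for any backend it supports (in-memory here,
    // but the same shape works for S3, GCS, etc. via other service builders).
    let operator = Operator::new(Memory::default())?.finish();

    // Wrap the operator in the object_store adapter.
    let store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(operator));

    // Round-trip a small object through the generic object_store API.
    let path = Path::from("example/data.txt");
    store.put(&path, Bytes::from_static(b"hello world").into()).await?;
    let bytes = store.get(&path).await?.bytes().await?;
    assert_eq!(bytes.as_ref(), b"hello world");
    Ok(())
}
```
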
From 492bba98fbae24837aaa75948f203012fd8ac595 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 21 Jun 2024 23:35:22 +0800 Subject: [PATCH 05/16] docs: Fix broken links of object_store_opendal README (#5929) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fdcd3a665e3b..335ae2415eb7 100644 --- a/README.md +++ b/README.md @@ -91,7 +91,7 @@ There are several related crates in different repositories [`ballista`]: https://crates.io/crates/ballista [`object_store_opendal`]: https://crates.io/crates/object_store_opendal [`opendal`]: https://crates.io/crates/opendal -[`object_store_opendal-readme`]: https://github.com/apache/opendal/blob/main/integrations/object_store/README.md +[object_store_opendal-readme]: https://github.com/apache/opendal/blob/main/integrations/object_store/README.md Collectively, these crates support a wider array of functionality for analytic computations in Rust. From 3930d5b056b8157e0b5c5eb01b1076e476acf99b Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Fri, 21 Jun 2024 17:56:25 +0200 Subject: [PATCH 06/16] Write Bloom filters between row groups instead of the end (#5860) * Add example script to write Parquet files with a Bloom filter * Write Bloom filters between row groups instead of the end This allows Bloom filters to not be saved in memory, which can save significant space when writing long files * Add WriterProperties::bloom_filter_position * Mutate the right row group metadata When using BloomFilterPosition::AfterRowGroup this was only writing the Bloom Filter offset to a temporary clone of the metadata, causing the Bloom Filter to never be seen by readers * Add a test for Bloom Filters written at the end * Update async writer accordingly * Undo accidental commit * Clippy * Apply suggestions from code review Improve documentation Co-authored-by: Andrew Lamb * Rewrite example with constants as parameters and fewer dependencies * rustfmt * Clippy * Fix MSRV --------- Co-authored-by: Andrew Lamb --- parquet/Cargo.toml | 8 ++ parquet/examples/write_parquet.rs | 131 ++++++++++++++++++++++++++ parquet/src/arrow/arrow_writer/mod.rs | 28 +++++- parquet/src/arrow/async_writer/mod.rs | 4 +- parquet/src/file/metadata.rs | 5 + parquet/src/file/properties.rs | 36 +++++++ parquet/src/file/writer.rs | 117 ++++++++++++++--------- 7 files changed, 277 insertions(+), 52 deletions(-) create mode 100644 parquet/examples/write_parquet.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 775ac825a2e4..eec7faf09d06 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -67,6 +67,7 @@ hashbrown = { version = "0.14", default-features = false } twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } +sysinfo = { version = "0.30.12", optional = true, default-features = false } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -114,12 +115,19 @@ async = ["futures", "tokio"] object_store = ["dep:object_store", "async"] # Group Zstd dependencies zstd = ["dep:zstd", "zstd-sys"] +# Display memory in example/write_parquet.rs +sysinfo = ["dep:sysinfo"] [[example]] name = "read_parquet" required-features = ["arrow"] path = "./examples/read_parquet.rs" +[[example]] +name = "write_parquet" +required-features = ["cli", "sysinfo"] +path = "./examples/write_parquet.rs" + [[example]] name = "async_read_parquet" required-features = ["arrow", "async"] diff --git 
a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs new file mode 100644 index 000000000000..d2ef550df840 --- /dev/null +++ b/parquet/examples/write_parquet.rs @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use arrow::array::{StructArray, UInt64Builder}; +use arrow::datatypes::DataType::UInt64; +use arrow::datatypes::{Field, Schema}; +use clap::{Parser, ValueEnum}; +use parquet::arrow::ArrowWriter as ParquetWriter; +use parquet::basic::Encoding; +use parquet::errors::Result; +use parquet::file::properties::{BloomFilterPosition, WriterProperties}; +use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System}; + +#[derive(ValueEnum, Clone)] +enum BloomFilterPositionArg { + End, + AfterRowGroup, +} + +#[derive(Parser)] +#[command(version)] +/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage. +struct Args { + #[arg(long, default_value_t = 1000)] + /// Number of batches to write + iterations: u64, + + #[arg(long, default_value_t = 1000000)] + /// Number of rows in each batch + batch: u64, + + #[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)] + /// Where to write Bloom Filters + bloom_filter_position: BloomFilterPositionArg, + + /// Path to the file to write + path: PathBuf, +} + +fn now() -> String { + chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string() +} + +fn mem(system: &mut System) -> String { + let pid = Pid::from(std::process::id() as usize); + system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory()); + system + .process(pid) + .map(|proc| format!("{}MB", proc.memory() / 1_000_000)) + .unwrap_or("N/A".to_string()) +} + +fn main() -> Result<()> { + let args = Args::parse(); + + let bloom_filter_position = match args.bloom_filter_position { + BloomFilterPositionArg::End => BloomFilterPosition::End, + BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup, + }; + + let properties = WriterProperties::builder() + .set_column_bloom_filter_enabled("id".into(), true) + .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED) + .set_bloom_filter_position(bloom_filter_position) + .build(); + let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)])); + // Create parquet file that will be read. + let file = File::create(args.path).unwrap(); + let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?; + + let mut system = + System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything())); + eprintln!( + "{} Writing {} batches of {} rows. 
RSS = {}", + now(), + args.iterations, + args.batch, + mem(&mut system) + ); + + let mut array_builder = UInt64Builder::new(); + let mut last_log = Instant::now(); + for i in 0..args.iterations { + if Instant::now() - last_log > Duration::new(10, 0) { + last_log = Instant::now(); + eprintln!( + "{} Iteration {}/{}. RSS = {}", + now(), + i + 1, + args.iterations, + mem(&mut system) + ); + } + for j in 0..args.batch { + array_builder.append_value(i + j); + } + writer.write( + &StructArray::new( + schema.fields().clone(), + vec![Arc::new(array_builder.finish())], + None, + ) + .into(), + )?; + } + writer.flush()?; + writer.close()?; + + eprintln!("{} Done. RSS = {}", now(), mem(&mut system)); + + Ok(()) +} diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 0beb93f80a5f..800751ff964b 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -43,7 +43,7 @@ use crate::column::writer::{ }; use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr}; +use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData}; use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; @@ -199,7 +199,7 @@ impl ArrowWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { self.writer.flushed_row_groups() } @@ -1053,7 +1053,9 @@ mod tests { use crate::file::metadata::ParquetMetaData; use crate::file::page_index::index::Index; use crate::file::page_index::index_reader::read_pages_locations; - use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion}; + use crate::file::properties::{ + BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion, + }; use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::{ reader::{FileReader, SerializedFileReader}, @@ -1701,6 +1703,7 @@ mod tests { values: ArrayRef, schema: SchemaRef, bloom_filter: bool, + bloom_filter_position: BloomFilterPosition, } impl RoundTripOptions { @@ -1711,6 +1714,7 @@ mod tests { values, schema: Arc::new(schema), bloom_filter: false, + bloom_filter_position: BloomFilterPosition::AfterRowGroup, } } } @@ -1730,6 +1734,7 @@ mod tests { values, schema, bloom_filter, + bloom_filter_position, } = options; let encodings = match values.data_type() { @@ -1770,6 +1775,7 @@ mod tests { .set_dictionary_page_size_limit(dictionary_size.max(1)) .set_encoding(*encoding) .set_bloom_filter_enabled(bloom_filter) + .set_bloom_filter_position(bloom_filter_position) .build(); files.push(roundtrip_opts(&expected_batch, props)) @@ -2127,6 +2133,22 @@ mod tests { values_required::(many_vecs_iter); } + #[test] + fn i32_column_bloom_filter_at_end() { + let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); + let mut options = RoundTripOptions::new(array, false); + options.bloom_filter = true; + options.bloom_filter_position = BloomFilterPosition::End; + + let files = one_column_roundtrip_with_options(options); + check_bloom_filter( + files, + "col".to_string(), + (0..SMALL_SIZE as i32).collect(), + (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(), + ); + } + #[test] fn i32_column_bloom_filter() { let array = 
Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 0bedf1fcb731..28efbdc7c66e 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -54,7 +54,7 @@ use crate::{ arrow::arrow_writer::ArrowWriterOptions, arrow::ArrowWriter, errors::{ParquetError, Result}, - file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties}, + file::{metadata::RowGroupMetaData, properties::WriterProperties}, format::{FileMetaData, KeyValue}, }; use arrow_array::RecordBatch; @@ -172,7 +172,7 @@ impl AsyncArrowWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { self.sync_writer.flushed_row_groups() } diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index fb8f798fd3ac..255fe1b7b253 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -333,6 +333,11 @@ impl RowGroupMetaData { &self.columns } + /// Returns mutable slice of column chunk metadata. + pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] { + &mut self.columns + } + /// Number of rows in this row group. pub fn num_rows(&self) -> i64 { self.num_rows diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 87d84cef80aa..7fc73bd56fe2 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -43,6 +43,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; /// Default value for [`WriterProperties::max_row_group_size`] pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; +/// Default value for [`WriterProperties::bloom_filter_position`] +pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup; /// Default value for [`WriterProperties::created_by`] pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION")); /// Default value for [`WriterProperties::column_index_truncate_length`] @@ -86,6 +88,24 @@ impl FromStr for WriterVersion { } } +/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should +/// write Bloom filters +/// +/// Basic constant, which is not part of the Thrift definition. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BloomFilterPosition { + /// Write Bloom Filters of each row group right after the row group + /// + /// This saves memory by writing it as soon as it is computed, at the cost + /// of data locality for readers + AfterRowGroup, + /// Write Bloom Filters at the end of the file + /// + /// This allows better data locality for readers, at the cost of memory usage + /// for writers. + End, +} + /// Reference counted writer properties. pub type WriterPropertiesPtr = Arc; @@ -130,6 +150,7 @@ pub struct WriterProperties { data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, + bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, pub(crate) key_value_metadata: Option>, @@ -217,6 +238,11 @@ impl WriterProperties { self.max_row_group_size } + /// Returns maximum number of rows in a row group. + pub fn bloom_filter_position(&self) -> BloomFilterPosition { + self.bloom_filter_position + } + /// Returns configured writer version. 
pub fn writer_version(&self) -> WriterVersion { self.writer_version @@ -338,6 +364,7 @@ pub struct WriterPropertiesBuilder { data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, + bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, key_value_metadata: Option>, @@ -357,6 +384,7 @@ impl WriterPropertiesBuilder { data_page_row_count_limit: usize::MAX, write_batch_size: DEFAULT_WRITE_BATCH_SIZE, max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE, + bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION, writer_version: DEFAULT_WRITER_VERSION, created_by: DEFAULT_CREATED_BY.to_string(), key_value_metadata: None, @@ -376,6 +404,7 @@ impl WriterPropertiesBuilder { data_page_row_count_limit: self.data_page_row_count_limit, write_batch_size: self.write_batch_size, max_row_group_size: self.max_row_group_size, + bloom_filter_position: self.bloom_filter_position, writer_version: self.writer_version, created_by: self.created_by, key_value_metadata: self.key_value_metadata, @@ -487,6 +516,12 @@ impl WriterPropertiesBuilder { self } + /// Sets where in the final file Bloom Filters are written (default `AfterRowGroup`) + pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self { + self.bloom_filter_position = value; + self + } + /// Sets "created by" property (defaults to `parquet-rs version `). pub fn set_created_by(mut self, value: String) -> Self { self.created_by = value; @@ -1052,6 +1087,7 @@ mod tests { ); assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE); assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE); + assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION); assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION); assert_eq!(props.created_by(), DEFAULT_CREATED_BY); assert_eq!(props.key_value_metadata(), None); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 7806384cdb52..eb633f31c477 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -34,8 +34,9 @@ use crate::column::{ }; use crate::data_type::DataType; use crate::errors::{ParquetError, Result}; +use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr}; use crate::file::reader::ChunkReader; -use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC}; +use crate::file::{metadata::*, PARQUET_MAGIC}; use crate::schema::types::{self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr}; /// A wrapper around a [`Write`] that keeps track of the number @@ -115,9 +116,10 @@ pub type OnCloseColumnChunk<'a> = Box Result<() /// - the row group metadata /// - the column index for each column chunk /// - the offset index for each column chunk -pub type OnCloseRowGroup<'a> = Box< +pub type OnCloseRowGroup<'a, W> = Box< dyn FnOnce( - RowGroupMetaDataPtr, + &'a mut TrackedWrite, + RowGroupMetaData, Vec>, Vec>, Vec>, @@ -143,7 +145,7 @@ pub struct SerializedFileWriter { schema: TypePtr, descr: SchemaDescPtr, props: WriterPropertiesPtr, - row_groups: Vec, + row_groups: Vec, bloom_filters: Vec>>, column_indexes: Vec>>, offset_indexes: Vec>>, @@ -197,18 +199,29 @@ impl SerializedFileWriter { self.row_group_index += 1; + let bloom_filter_position = self.properties().bloom_filter_position(); let row_groups = &mut self.row_groups; let row_bloom_filters = &mut self.bloom_filters; let row_column_indexes = &mut self.column_indexes; let row_offset_indexes = &mut self.offset_indexes; - let on_close = - |metadata, row_group_bloom_filter, 
row_group_column_index, row_group_offset_index| { - row_groups.push(metadata); - row_bloom_filters.push(row_group_bloom_filter); - row_column_indexes.push(row_group_column_index); - row_offset_indexes.push(row_group_offset_index); - Ok(()) + let on_close = move |buf, + mut metadata, + row_group_bloom_filter, + row_group_column_index, + row_group_offset_index| { + row_bloom_filters.push(row_group_bloom_filter); + row_column_indexes.push(row_group_column_index); + row_offset_indexes.push(row_group_offset_index); + // write bloom filters out immediately after the row group if requested + match bloom_filter_position { + BloomFilterPosition::AfterRowGroup => { + write_bloom_filters(buf, row_bloom_filters, &mut metadata)? + } + BloomFilterPosition::End => (), }; + row_groups.push(metadata); + Ok(()) + }; let row_group_writer = SerializedRowGroupWriter::new( self.descr.clone(), @@ -221,7 +234,7 @@ impl SerializedFileWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { &self.row_groups } @@ -273,34 +286,6 @@ impl SerializedFileWriter { Ok(()) } - /// Serialize all the bloom filter to the file - fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { - // iter row group - // iter each column - // write bloom filter to the file - for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() { - for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() { - match &self.bloom_filters[row_group_idx][column_idx] { - Some(bloom_filter) => { - let start_offset = self.buf.bytes_written(); - bloom_filter.write(&mut self.buf)?; - let end_offset = self.buf.bytes_written(); - // set offset and index for bloom filter - let column_chunk_meta = column_chunk - .meta_data - .as_mut() - .expect("can't have bloom filter without column metadata"); - column_chunk_meta.bloom_filter_offset = Some(start_offset as i64); - column_chunk_meta.bloom_filter_length = - Some((end_offset - start_offset) as i32); - } - None => {} - } - } - } - Ok(()) - } - /// Serialize all the column index to the file fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { // iter row group @@ -331,6 +316,11 @@ impl SerializedFileWriter { self.finished = true; let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum(); + // write out any remaining bloom filters after all row groups + for row_group in &mut self.row_groups { + write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?; + } + let mut row_groups = self .row_groups .as_slice() @@ -338,7 +328,6 @@ impl SerializedFileWriter { .map(|v| v.to_thrift()) .collect::>(); - self.write_bloom_filters(&mut row_groups)?; // Write column indexes and offset indexes self.write_column_indexes(&mut row_groups)?; self.write_offset_indexes(&mut row_groups)?; @@ -443,6 +432,40 @@ impl SerializedFileWriter { } } +/// Serialize all the bloom filters of the given row group to the given buffer, +/// and returns the updated row group metadata. 
+fn write_bloom_filters( + buf: &mut TrackedWrite, + bloom_filters: &mut [Vec>], + row_group: &mut RowGroupMetaData, +) -> Result<()> { + // iter row group + // iter each column + // write bloom filter to the file + + let row_group_idx: u16 = row_group + .ordinal() + .expect("Missing row group ordinal") + .try_into() + .expect("Negative row group ordinal"); + let row_group_idx = row_group_idx as usize; + for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() { + if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() { + let start_offset = buf.bytes_written(); + bloom_filter.write(&mut *buf)?; + let end_offset = buf.bytes_written(); + // set offset and index for bloom filter + *column_chunk = column_chunk + .clone() + .into_builder() + .set_bloom_filter_offset(Some(start_offset as i64)) + .set_bloom_filter_length(Some((end_offset - start_offset) as i32)) + .build()?; + } + } + Ok(()) +} + /// Parquet row group writer API. /// Provides methods to access column writers in an iterator-like fashion, order is /// guaranteed to match the order of schema leaves (column descriptors). @@ -468,7 +491,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { offset_indexes: Vec>, row_group_index: i16, file_offset: i64, - on_close: Option>, + on_close: Option>, } impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { @@ -485,7 +508,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { properties: WriterPropertiesPtr, buf: &'a mut TrackedWrite, row_group_index: i16, - on_close: Option>, + on_close: Option>, ) -> Self { let num_columns = schema_descr.num_columns(); let file_offset = buf.bytes_written() as i64; @@ -669,12 +692,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_file_offset(self.file_offset) .build()?; - let metadata = Arc::new(row_group_metadata); - self.row_group_metadata = Some(metadata.clone()); + self.row_group_metadata = Some(Arc::new(row_group_metadata.clone())); if let Some(on_close) = self.on_close.take() { on_close( - metadata, + self.buf, + row_group_metadata, self.bloom_filters, self.column_indexes, self.offset_indexes, @@ -1446,7 +1469,7 @@ mod tests { assert_eq!(flushed.len(), idx + 1); assert_eq!(Some(idx as i16), last_group.ordinal()); assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset()); - assert_eq!(flushed[idx].as_ref(), last_group.as_ref()); + assert_eq!(&flushed[idx], last_group.as_ref()); } let file_metadata = file_writer.close().unwrap(); From 22e0b4432c9838f2536284015271d3de9a165135 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 21 Jun 2024 11:59:05 -0400 Subject: [PATCH 07/16] Revert "Write Bloom filters between row groups instead of the end (#5860)" (#5932) This reverts commit 3930d5b056b8157e0b5c5eb01b1076e476acf99b. 
--- parquet/Cargo.toml | 8 -- parquet/examples/write_parquet.rs | 131 -------------------------- parquet/src/arrow/arrow_writer/mod.rs | 28 +----- parquet/src/arrow/async_writer/mod.rs | 4 +- parquet/src/file/metadata.rs | 5 - parquet/src/file/properties.rs | 36 ------- parquet/src/file/writer.rs | 117 +++++++++-------------- 7 files changed, 52 insertions(+), 277 deletions(-) delete mode 100644 parquet/examples/write_parquet.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index eec7faf09d06..775ac825a2e4 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -67,7 +67,6 @@ hashbrown = { version = "0.14", default-features = false } twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } -sysinfo = { version = "0.30.12", optional = true, default-features = false } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -115,19 +114,12 @@ async = ["futures", "tokio"] object_store = ["dep:object_store", "async"] # Group Zstd dependencies zstd = ["dep:zstd", "zstd-sys"] -# Display memory in example/write_parquet.rs -sysinfo = ["dep:sysinfo"] [[example]] name = "read_parquet" required-features = ["arrow"] path = "./examples/read_parquet.rs" -[[example]] -name = "write_parquet" -required-features = ["cli", "sysinfo"] -path = "./examples/write_parquet.rs" - [[example]] name = "async_read_parquet" required-features = ["arrow", "async"] diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs deleted file mode 100644 index d2ef550df840..000000000000 --- a/parquet/examples/write_parquet.rs +++ /dev/null @@ -1,131 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::fs::File; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::{Duration, Instant}; - -use arrow::array::{StructArray, UInt64Builder}; -use arrow::datatypes::DataType::UInt64; -use arrow::datatypes::{Field, Schema}; -use clap::{Parser, ValueEnum}; -use parquet::arrow::ArrowWriter as ParquetWriter; -use parquet::basic::Encoding; -use parquet::errors::Result; -use parquet::file::properties::{BloomFilterPosition, WriterProperties}; -use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System}; - -#[derive(ValueEnum, Clone)] -enum BloomFilterPositionArg { - End, - AfterRowGroup, -} - -#[derive(Parser)] -#[command(version)] -/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage. 
-struct Args { - #[arg(long, default_value_t = 1000)] - /// Number of batches to write - iterations: u64, - - #[arg(long, default_value_t = 1000000)] - /// Number of rows in each batch - batch: u64, - - #[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)] - /// Where to write Bloom Filters - bloom_filter_position: BloomFilterPositionArg, - - /// Path to the file to write - path: PathBuf, -} - -fn now() -> String { - chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string() -} - -fn mem(system: &mut System) -> String { - let pid = Pid::from(std::process::id() as usize); - system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory()); - system - .process(pid) - .map(|proc| format!("{}MB", proc.memory() / 1_000_000)) - .unwrap_or("N/A".to_string()) -} - -fn main() -> Result<()> { - let args = Args::parse(); - - let bloom_filter_position = match args.bloom_filter_position { - BloomFilterPositionArg::End => BloomFilterPosition::End, - BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup, - }; - - let properties = WriterProperties::builder() - .set_column_bloom_filter_enabled("id".into(), true) - .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED) - .set_bloom_filter_position(bloom_filter_position) - .build(); - let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)])); - // Create parquet file that will be read. - let file = File::create(args.path).unwrap(); - let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?; - - let mut system = - System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything())); - eprintln!( - "{} Writing {} batches of {} rows. RSS = {}", - now(), - args.iterations, - args.batch, - mem(&mut system) - ); - - let mut array_builder = UInt64Builder::new(); - let mut last_log = Instant::now(); - for i in 0..args.iterations { - if Instant::now() - last_log > Duration::new(10, 0) { - last_log = Instant::now(); - eprintln!( - "{} Iteration {}/{}. RSS = {}", - now(), - i + 1, - args.iterations, - mem(&mut system) - ); - } - for j in 0..args.batch { - array_builder.append_value(i + j); - } - writer.write( - &StructArray::new( - schema.fields().clone(), - vec![Arc::new(array_builder.finish())], - None, - ) - .into(), - )?; - } - writer.flush()?; - writer.close()?; - - eprintln!("{} Done. 
RSS = {}", now(), mem(&mut system)); - - Ok(()) -} diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 800751ff964b..0beb93f80a5f 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -43,7 +43,7 @@ use crate::column::writer::{ }; use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData}; +use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr}; use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; @@ -199,7 +199,7 @@ impl ArrowWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { self.writer.flushed_row_groups() } @@ -1053,9 +1053,7 @@ mod tests { use crate::file::metadata::ParquetMetaData; use crate::file::page_index::index::Index; use crate::file::page_index::index_reader::read_pages_locations; - use crate::file::properties::{ - BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion, - }; + use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion}; use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::{ reader::{FileReader, SerializedFileReader}, @@ -1703,7 +1701,6 @@ mod tests { values: ArrayRef, schema: SchemaRef, bloom_filter: bool, - bloom_filter_position: BloomFilterPosition, } impl RoundTripOptions { @@ -1714,7 +1711,6 @@ mod tests { values, schema: Arc::new(schema), bloom_filter: false, - bloom_filter_position: BloomFilterPosition::AfterRowGroup, } } } @@ -1734,7 +1730,6 @@ mod tests { values, schema, bloom_filter, - bloom_filter_position, } = options; let encodings = match values.data_type() { @@ -1775,7 +1770,6 @@ mod tests { .set_dictionary_page_size_limit(dictionary_size.max(1)) .set_encoding(*encoding) .set_bloom_filter_enabled(bloom_filter) - .set_bloom_filter_position(bloom_filter_position) .build(); files.push(roundtrip_opts(&expected_batch, props)) @@ -2133,22 +2127,6 @@ mod tests { values_required::(many_vecs_iter); } - #[test] - fn i32_column_bloom_filter_at_end() { - let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); - let mut options = RoundTripOptions::new(array, false); - options.bloom_filter = true; - options.bloom_filter_position = BloomFilterPosition::End; - - let files = one_column_roundtrip_with_options(options); - check_bloom_filter( - files, - "col".to_string(), - (0..SMALL_SIZE as i32).collect(), - (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(), - ); - } - #[test] fn i32_column_bloom_filter() { let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 28efbdc7c66e..0bedf1fcb731 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -54,7 +54,7 @@ use crate::{ arrow::arrow_writer::ArrowWriterOptions, arrow::ArrowWriter, errors::{ParquetError, Result}, - file::{metadata::RowGroupMetaData, properties::WriterProperties}, + file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties}, format::{FileMetaData, KeyValue}, }; use arrow_array::RecordBatch; @@ -172,7 +172,7 @@ impl AsyncArrowWriter { } /// Returns metadata for any flushed row 
groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { self.sync_writer.flushed_row_groups() } diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index 255fe1b7b253..fb8f798fd3ac 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -333,11 +333,6 @@ impl RowGroupMetaData { &self.columns } - /// Returns mutable slice of column chunk metadata. - pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] { - &mut self.columns - } - /// Number of rows in this row group. pub fn num_rows(&self) -> i64 { self.num_rows diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 7fc73bd56fe2..87d84cef80aa 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -43,8 +43,6 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; /// Default value for [`WriterProperties::max_row_group_size`] pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; -/// Default value for [`WriterProperties::bloom_filter_position`] -pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup; /// Default value for [`WriterProperties::created_by`] pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION")); /// Default value for [`WriterProperties::column_index_truncate_length`] @@ -88,24 +86,6 @@ impl FromStr for WriterVersion { } } -/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should -/// write Bloom filters -/// -/// Basic constant, which is not part of the Thrift definition. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum BloomFilterPosition { - /// Write Bloom Filters of each row group right after the row group - /// - /// This saves memory by writing it as soon as it is computed, at the cost - /// of data locality for readers - AfterRowGroup, - /// Write Bloom Filters at the end of the file - /// - /// This allows better data locality for readers, at the cost of memory usage - /// for writers. - End, -} - /// Reference counted writer properties. pub type WriterPropertiesPtr = Arc; @@ -150,7 +130,6 @@ pub struct WriterProperties { data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, - bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, pub(crate) key_value_metadata: Option>, @@ -238,11 +217,6 @@ impl WriterProperties { self.max_row_group_size } - /// Returns maximum number of rows in a row group. - pub fn bloom_filter_position(&self) -> BloomFilterPosition { - self.bloom_filter_position - } - /// Returns configured writer version. 
pub fn writer_version(&self) -> WriterVersion { self.writer_version @@ -364,7 +338,6 @@ pub struct WriterPropertiesBuilder { data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, - bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, key_value_metadata: Option>, @@ -384,7 +357,6 @@ impl WriterPropertiesBuilder { data_page_row_count_limit: usize::MAX, write_batch_size: DEFAULT_WRITE_BATCH_SIZE, max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE, - bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION, writer_version: DEFAULT_WRITER_VERSION, created_by: DEFAULT_CREATED_BY.to_string(), key_value_metadata: None, @@ -404,7 +376,6 @@ impl WriterPropertiesBuilder { data_page_row_count_limit: self.data_page_row_count_limit, write_batch_size: self.write_batch_size, max_row_group_size: self.max_row_group_size, - bloom_filter_position: self.bloom_filter_position, writer_version: self.writer_version, created_by: self.created_by, key_value_metadata: self.key_value_metadata, @@ -516,12 +487,6 @@ impl WriterPropertiesBuilder { self } - /// Sets where in the final file Bloom Filters are written (default `AfterRowGroup`) - pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self { - self.bloom_filter_position = value; - self - } - /// Sets "created by" property (defaults to `parquet-rs version `). pub fn set_created_by(mut self, value: String) -> Self { self.created_by = value; @@ -1087,7 +1052,6 @@ mod tests { ); assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE); assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE); - assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION); assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION); assert_eq!(props.created_by(), DEFAULT_CREATED_BY); assert_eq!(props.key_value_metadata(), None); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index eb633f31c477..7806384cdb52 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -34,9 +34,8 @@ use crate::column::{ }; use crate::data_type::DataType; use crate::errors::{ParquetError, Result}; -use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr}; use crate::file::reader::ChunkReader; -use crate::file::{metadata::*, PARQUET_MAGIC}; +use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC}; use crate::schema::types::{self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr}; /// A wrapper around a [`Write`] that keeps track of the number @@ -116,10 +115,9 @@ pub type OnCloseColumnChunk<'a> = Box Result<() /// - the row group metadata /// - the column index for each column chunk /// - the offset index for each column chunk -pub type OnCloseRowGroup<'a, W> = Box< +pub type OnCloseRowGroup<'a> = Box< dyn FnOnce( - &'a mut TrackedWrite, - RowGroupMetaData, + RowGroupMetaDataPtr, Vec>, Vec>, Vec>, @@ -145,7 +143,7 @@ pub struct SerializedFileWriter { schema: TypePtr, descr: SchemaDescPtr, props: WriterPropertiesPtr, - row_groups: Vec, + row_groups: Vec, bloom_filters: Vec>>, column_indexes: Vec>>, offset_indexes: Vec>>, @@ -199,29 +197,18 @@ impl SerializedFileWriter { self.row_group_index += 1; - let bloom_filter_position = self.properties().bloom_filter_position(); let row_groups = &mut self.row_groups; let row_bloom_filters = &mut self.bloom_filters; let row_column_indexes = &mut self.column_indexes; let row_offset_indexes = &mut self.offset_indexes; - let on_close = move |buf, - mut metadata, - 
row_group_bloom_filter, - row_group_column_index, - row_group_offset_index| { - row_bloom_filters.push(row_group_bloom_filter); - row_column_indexes.push(row_group_column_index); - row_offset_indexes.push(row_group_offset_index); - // write bloom filters out immediately after the row group if requested - match bloom_filter_position { - BloomFilterPosition::AfterRowGroup => { - write_bloom_filters(buf, row_bloom_filters, &mut metadata)? - } - BloomFilterPosition::End => (), + let on_close = + |metadata, row_group_bloom_filter, row_group_column_index, row_group_offset_index| { + row_groups.push(metadata); + row_bloom_filters.push(row_group_bloom_filter); + row_column_indexes.push(row_group_column_index); + row_offset_indexes.push(row_group_offset_index); + Ok(()) }; - row_groups.push(metadata); - Ok(()) - }; let row_group_writer = SerializedRowGroupWriter::new( self.descr.clone(), @@ -234,7 +221,7 @@ impl SerializedFileWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { &self.row_groups } @@ -286,6 +273,34 @@ impl SerializedFileWriter { Ok(()) } + /// Serialize all the bloom filter to the file + fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { + // iter row group + // iter each column + // write bloom filter to the file + for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() { + for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() { + match &self.bloom_filters[row_group_idx][column_idx] { + Some(bloom_filter) => { + let start_offset = self.buf.bytes_written(); + bloom_filter.write(&mut self.buf)?; + let end_offset = self.buf.bytes_written(); + // set offset and index for bloom filter + let column_chunk_meta = column_chunk + .meta_data + .as_mut() + .expect("can't have bloom filter without column metadata"); + column_chunk_meta.bloom_filter_offset = Some(start_offset as i64); + column_chunk_meta.bloom_filter_length = + Some((end_offset - start_offset) as i32); + } + None => {} + } + } + } + Ok(()) + } + /// Serialize all the column index to the file fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { // iter row group @@ -316,11 +331,6 @@ impl SerializedFileWriter { self.finished = true; let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum(); - // write out any remaining bloom filters after all row groups - for row_group in &mut self.row_groups { - write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?; - } - let mut row_groups = self .row_groups .as_slice() @@ -328,6 +338,7 @@ impl SerializedFileWriter { .map(|v| v.to_thrift()) .collect::>(); + self.write_bloom_filters(&mut row_groups)?; // Write column indexes and offset indexes self.write_column_indexes(&mut row_groups)?; self.write_offset_indexes(&mut row_groups)?; @@ -432,40 +443,6 @@ impl SerializedFileWriter { } } -/// Serialize all the bloom filters of the given row group to the given buffer, -/// and returns the updated row group metadata. 
-fn write_bloom_filters( - buf: &mut TrackedWrite, - bloom_filters: &mut [Vec>], - row_group: &mut RowGroupMetaData, -) -> Result<()> { - // iter row group - // iter each column - // write bloom filter to the file - - let row_group_idx: u16 = row_group - .ordinal() - .expect("Missing row group ordinal") - .try_into() - .expect("Negative row group ordinal"); - let row_group_idx = row_group_idx as usize; - for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() { - if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() { - let start_offset = buf.bytes_written(); - bloom_filter.write(&mut *buf)?; - let end_offset = buf.bytes_written(); - // set offset and index for bloom filter - *column_chunk = column_chunk - .clone() - .into_builder() - .set_bloom_filter_offset(Some(start_offset as i64)) - .set_bloom_filter_length(Some((end_offset - start_offset) as i32)) - .build()?; - } - } - Ok(()) -} - /// Parquet row group writer API. /// Provides methods to access column writers in an iterator-like fashion, order is /// guaranteed to match the order of schema leaves (column descriptors). @@ -491,7 +468,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { offset_indexes: Vec>, row_group_index: i16, file_offset: i64, - on_close: Option>, + on_close: Option>, } impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { @@ -508,7 +485,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { properties: WriterPropertiesPtr, buf: &'a mut TrackedWrite, row_group_index: i16, - on_close: Option>, + on_close: Option>, ) -> Self { let num_columns = schema_descr.num_columns(); let file_offset = buf.bytes_written() as i64; @@ -692,12 +669,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_file_offset(self.file_offset) .build()?; - self.row_group_metadata = Some(Arc::new(row_group_metadata.clone())); + let metadata = Arc::new(row_group_metadata); + self.row_group_metadata = Some(metadata.clone()); if let Some(on_close) = self.on_close.take() { on_close( - self.buf, - row_group_metadata, + metadata, self.bloom_filters, self.column_indexes, self.offset_indexes, @@ -1469,7 +1446,7 @@ mod tests { assert_eq!(flushed.len(), idx + 1); assert_eq!(Some(idx as i16), last_group.ordinal()); assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset()); - assert_eq!(&flushed[idx], last_group.as_ref()); + assert_eq!(flushed[idx].as_ref(), last_group.as_ref()); } let file_metadata = file_writer.close().unwrap(); From 7ef6be4cd951d41cf050f84c2d7e5c400395d020 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 21 Jun 2024 20:51:43 +0100 Subject: [PATCH 08/16] Preallocate for `FixedSizeList` in `concat` (#5862) * Add specific fixed size list concat test * Add fixed size list concat benchmark * Improve `FixedSizeList` concat performance for large list * `cargo fmt` * Increase size of `FixedSizeList` benchmark data * Get capacity recursively for `FixedSizeList` * Reuse `Capacities::List` to avoid breaking change * Use correct default capacities * Avoid a `Box::new()` when not needed * format --------- Co-authored-by: Will Jones --- arrow-data/src/transform/mod.rs | 20 ++++++- arrow-select/src/concat.rs | 89 ++++++++++++++++++++++++---- arrow/benches/concatenate_kernel.rs | 20 +++++++ arrow/tests/array_transform.rs | 92 +++++++++++++---------------- 4 files changed, 155 insertions(+), 66 deletions(-) diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs index 
b0d9475afcd6..1e43bf488cbe 100644 --- a/arrow-data/src/transform/mod.rs +++ b/arrow-data/src/transform/mod.rs @@ -385,7 +385,10 @@ impl<'a> MutableArrayData<'a> { array_capacity = *capacity; new_buffers(data_type, *capacity) } - (DataType::List(_) | DataType::LargeList(_), Capacities::List(capacity, _)) => { + ( + DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _), + Capacities::List(capacity, _), + ) => { array_capacity = *capacity; new_buffers(data_type, *capacity) } @@ -501,12 +504,23 @@ impl<'a> MutableArrayData<'a> { MutableArrayData::new(value_child, use_nulls, array_capacity), ] } - DataType::FixedSizeList(_, _) => { + DataType::FixedSizeList(_, size) => { let children = arrays .iter() .map(|array| &array.child_data()[0]) .collect::>(); - vec![MutableArrayData::new(children, use_nulls, array_capacity)] + let capacities = + if let Capacities::List(capacity, ref child_capacities) = capacities { + child_capacities + .clone() + .map(|c| *c) + .unwrap_or(Capacities::Array(capacity * *size as usize)) + } else { + Capacities::Array(array_capacity * *size as usize) + }; + vec![MutableArrayData::with_capacities( + children, use_nulls, capacities, + )] } DataType::Union(fields, _) => (0..fields.len()) .map(|i| { diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index f98e85475a25..5732d721b340 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -54,6 +54,34 @@ fn binary_capacity(arrays: &[&dyn Array]) -> Capacities { Capacities::Binary(item_capacity, Some(bytes_capacity)) } +fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities { + if let DataType::FixedSizeList(f, _) = data_type { + let item_capacity = arrays.iter().map(|a| a.len()).sum(); + let child_data_type = f.data_type(); + match child_data_type { + // These types should match the types that `get_capacity` + // has special handling for. + DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Binary + | DataType::LargeBinary + | DataType::FixedSizeList(_, _) => { + let values: Vec<&dyn arrow_array::Array> = arrays + .iter() + .map(|a| a.as_fixed_size_list().values().as_ref()) + .collect(); + Capacities::List( + item_capacity, + Some(Box::new(get_capacity(&values, child_data_type))), + ) + } + _ => Capacities::Array(item_capacity), + } + } else { + unreachable!("illegal data type for fixed size list") + } +} + fn concat_dictionaries( arrays: &[&dyn Array], ) -> Result { @@ -107,6 +135,17 @@ macro_rules! dict_helper { }; } +fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities { + match data_type { + DataType::Utf8 => binary_capacity::(arrays), + DataType::LargeUtf8 => binary_capacity::(arrays), + DataType::Binary => binary_capacity::(arrays), + DataType::LargeBinary => binary_capacity::(arrays), + DataType::FixedSizeList(_, _) => fixed_size_list_capacity(arrays, data_type), + _ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()), + } +} + /// Concatenate multiple [Array] of the same type into a single [ArrayRef]. 
pub fn concat(arrays: &[&dyn Array]) -> Result { if arrays.is_empty() { @@ -124,20 +163,15 @@ pub fn concat(arrays: &[&dyn Array]) -> Result { "It is not possible to concatenate arrays of different data types.".to_string(), )); } - - let capacity = match d { - DataType::Utf8 => binary_capacity::(arrays), - DataType::LargeUtf8 => binary_capacity::(arrays), - DataType::Binary => binary_capacity::(arrays), - DataType::LargeBinary => binary_capacity::(arrays), - DataType::Dictionary(k, _) => downcast_integer! { + if let DataType::Dictionary(k, _) = d { + downcast_integer! { k.as_ref() => (dict_helper, arrays), _ => unreachable!("illegal dictionary key type {k}") - }, - _ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()), - }; - - concat_fallback(arrays, capacity) + }; + } else { + let capacity = get_capacity(arrays, d); + concat_fallback(arrays, capacity) + } } /// Concatenates arrays using MutableArrayData @@ -373,6 +407,37 @@ mod tests { assert_eq!(array_result.as_ref(), &array_expected as &dyn Array); } + #[test] + fn test_concat_primitive_fixed_size_list_arrays() { + let list1 = vec![ + Some(vec![Some(-1), None]), + None, + Some(vec![Some(10), Some(20)]), + ]; + let list1_array = + FixedSizeListArray::from_iter_primitive::(list1.clone(), 2); + + let list2 = vec![ + None, + Some(vec![Some(100), None]), + Some(vec![Some(102), Some(103)]), + ]; + let list2_array = + FixedSizeListArray::from_iter_primitive::(list2.clone(), 2); + + let list3 = vec![Some(vec![Some(1000), Some(1001)])]; + let list3_array = + FixedSizeListArray::from_iter_primitive::(list3.clone(), 2); + + let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap(); + + let expected = list1.into_iter().chain(list2).chain(list3); + let array_expected = + FixedSizeListArray::from_iter_primitive::(expected, 2); + + assert_eq!(array_result.as_ref(), &array_expected as &dyn Array); + } + #[test] fn test_concat_struct_arrays() { let field = Arc::new(Field::new("field", DataType::Int64, true)); diff --git a/arrow/benches/concatenate_kernel.rs b/arrow/benches/concatenate_kernel.rs index 2f5b654394e4..0c553f8b3f3c 100644 --- a/arrow/benches/concatenate_kernel.rs +++ b/arrow/benches/concatenate_kernel.rs @@ -17,6 +17,8 @@ #[macro_use] extern crate criterion; +use std::sync::Arc; + use criterion::Criterion; extern crate arrow; @@ -82,6 +84,24 @@ fn add_benchmark(c: &mut Criterion) { c.bench_function("concat str nulls 1024", |b| { b.iter(|| bench_concat(&v1, &v2)) }); + + let v1 = FixedSizeListArray::try_new( + Arc::new(Field::new("item", DataType::Int32, true)), + 1024, + Arc::new(create_primitive_array::(1024 * 1024, 0.0)), + None, + ) + .unwrap(); + let v2 = FixedSizeListArray::try_new( + Arc::new(Field::new("item", DataType::Int32, true)), + 1024, + Arc::new(create_primitive_array::(1024 * 1024, 0.0)), + None, + ) + .unwrap(); + c.bench_function("concat fixed size lists", |b| { + b.iter(|| bench_concat(&v1, &v2)) + }); } criterion_group!(benches, add_benchmark); diff --git a/arrow/tests/array_transform.rs b/arrow/tests/array_transform.rs index 42e4da7c4b4e..08f23c200d52 100644 --- a/arrow/tests/array_transform.rs +++ b/arrow/tests/array_transform.rs @@ -17,9 +17,9 @@ use arrow::array::{ Array, ArrayRef, BooleanArray, Decimal128Array, DictionaryArray, FixedSizeBinaryArray, - Int16Array, Int32Array, Int64Array, Int64Builder, ListArray, ListBuilder, MapBuilder, - NullArray, StringArray, StringBuilder, StringDictionaryBuilder, StructArray, UInt8Array, - UnionArray, + FixedSizeListBuilder, Int16Array, Int32Array, 
Int64Array, Int64Builder, ListArray, ListBuilder, + MapBuilder, NullArray, StringArray, StringBuilder, StringDictionaryBuilder, StructArray, + UInt16Array, UInt16Builder, UInt8Array, UnionArray, }; use arrow::datatypes::Int16Type; use arrow_array::StringViewArray; @@ -1074,43 +1074,42 @@ fn test_mixed_types() { MutableArrayData::new(vec![&a, &b], false, 4); } -/* -// this is an old test used on a meanwhile removed dead code -// that is still useful when `MutableArrayData` supports fixed-size lists. #[test] -fn test_fixed_size_list_append() -> Result<()> { - let int_builder = UInt16Builder::new(64); +fn test_fixed_size_list_append() { + let int_builder = UInt16Builder::with_capacity(64); let mut builder = FixedSizeListBuilder::::new(int_builder, 2); - builder.values().append_slice(&[1, 2])?; - builder.append(true)?; - builder.values().append_slice(&[3, 4])?; - builder.append(false)?; - builder.values().append_slice(&[5, 6])?; - builder.append(true)?; - - let a_builder = UInt16Builder::new(64); + builder.values().append_slice(&[1, 2]); + builder.append(true); + builder.values().append_slice(&[3, 4]); + builder.append(false); + builder.values().append_slice(&[5, 6]); + builder.append(true); + let a = builder.finish().into_data(); + + let a_builder = UInt16Builder::with_capacity(64); let mut a_builder = FixedSizeListBuilder::::new(a_builder, 2); - a_builder.values().append_slice(&[7, 8])?; - a_builder.append(true)?; - a_builder.values().append_slice(&[9, 10])?; - a_builder.append(true)?; - a_builder.values().append_slice(&[11, 12])?; - a_builder.append(false)?; - a_builder.values().append_slice(&[13, 14])?; - a_builder.append(true)?; - a_builder.values().append_null()?; - a_builder.values().append_null()?; - a_builder.append(true)?; - let a = a_builder.finish(); + a_builder.values().append_slice(&[7, 8]); + a_builder.append(true); + a_builder.values().append_slice(&[9, 10]); + a_builder.append(true); + a_builder.values().append_slice(&[11, 12]); + a_builder.append(false); + a_builder.values().append_slice(&[13, 14]); + a_builder.append(true); + a_builder.values().append_null(); + a_builder.values().append_null(); + a_builder.append(true); + let b = a_builder.finish().into_data(); + + let mut mutable = MutableArrayData::new(vec![&a, &b], false, 10); + mutable.extend(0, 0, a.len()); + mutable.extend(1, 0, b.len()); // append array - builder.append_data(&[ - a.data(), - a.slice(1, 3).data(), - a.slice(2, 1).data(), - a.slice(5, 0).data(), - ])?; - let finished = builder.finish(); + mutable.extend(1, 1, 4); + mutable.extend(1, 2, 3); + + let finished = mutable.freeze(); let expected_int_array = UInt16Array::from(vec![ Some(1), @@ -1141,23 +1140,14 @@ fn test_fixed_size_list_append() -> Result<()> { Some(11), Some(12), ]); - let expected_list_data = ArrayData::new( - DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::UInt16, true)), - 2, - ), + let expected_fixed_size_list_data = ArrayData::try_new( + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt16, true)), 2), 12, - None, - None, + Some(Buffer::from(&[0b11011101, 0b101])), 0, vec![], - vec![expected_int_array.data()], - ); - let expected_list = - FixedSizeListArray::from(Arc::new(expected_list_data) as ArrayData); - assert_eq!(&expected_list.values(), &finished.values()); - assert_eq!(expected_list.len(), finished.len()); - - Ok(()) + vec![expected_int_array.to_data()], + ) + .unwrap(); + assert_eq!(finished, expected_fixed_size_list_data); } -*/ From 13c9e9083ef2ef2e24019cbbd04c00025c11e3d5 Mon Sep 17 00:00:00 
2001 From: Xiangpeng Hao Date: Fri, 21 Jun 2024 16:10:56 -0400 Subject: [PATCH 09/16] Add eq benchmark for StringArray/StringViewArray (#5924) * add neq/eq benchmark for String/ViewArray * move bench to comparsion kernel * clean unnecessary dep * make clippy happy --- arrow/benches/comparison_kernels.rs | 51 +++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/arrow/benches/comparison_kernels.rs b/arrow/benches/comparison_kernels.rs index f330e1386cc4..1e081d141a0a 100644 --- a/arrow/benches/comparison_kernels.rs +++ b/arrow/benches/comparison_kernels.rs @@ -17,6 +17,7 @@ #[macro_use] extern crate criterion; +use arrow::util::test_util::seedable_rng; use criterion::Criterion; extern crate arrow; @@ -27,6 +28,8 @@ use arrow::{array::*, datatypes::Float32Type, datatypes::Int32Type}; use arrow_buffer::IntervalMonthDayNano; use arrow_string::like::*; use arrow_string::regexp::regexp_is_match_utf8_scalar; +use rand::rngs::StdRng; +use rand::Rng; const SIZE: usize = 65536; @@ -55,6 +58,14 @@ fn bench_regexp_is_match_utf8_scalar(arr_a: &StringArray, value_b: &str) { .unwrap(); } +fn make_string_array(size: usize, rng: &mut StdRng) -> impl Iterator> + '_ { + (0..size).map(|_| { + let len = rng.gen_range(0..64); + let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect(); + Some(String::from_utf8(bytes).unwrap()) + }) +} + fn add_benchmark(c: &mut Criterion) { let arr_a = create_primitive_array_with_seed::(SIZE, 0.0, 42); let arr_b = create_primitive_array_with_seed::(SIZE, 0.0, 43); @@ -63,6 +74,7 @@ fn add_benchmark(c: &mut Criterion) { let arr_month_day_nano_b = create_month_day_nano_array_with_seed(SIZE, 0.0, 43); let arr_string = create_string_array::(SIZE, 0.0); + let scalar = Float32Array::from(vec![1.0]); c.bench_function("eq Float32", |b| b.iter(|| eq(&arr_a, &arr_b))); @@ -138,6 +150,45 @@ fn add_benchmark(c: &mut Criterion) { b.iter(|| eq(&arr_month_day_nano_b, &scalar).unwrap()) }); + let mut rng = seedable_rng(); + let mut array_gen = make_string_array(1024 * 1024 * 8, &mut rng); + let string_left = StringArray::from_iter(array_gen); + let string_view_left = StringViewArray::from_iter(string_left.iter()); + + // reference to the same rng to make sure we generate **different** array data, + // ow. the left and right will be identical + array_gen = make_string_array(1024 * 1024 * 8, &mut rng); + let string_right = StringArray::from_iter(array_gen); + let string_view_right = StringViewArray::from_iter(string_right.iter()); + + c.bench_function("eq scalar StringArray", |b| { + b.iter(|| { + eq( + &Scalar::new(StringArray::from_iter_values(["xxxx"])), + &string_left, + ) + .unwrap() + }) + }); + + c.bench_function("eq scalar StringViewArray", |b| { + b.iter(|| { + eq( + &Scalar::new(StringViewArray::from_iter_values(["xxxx"])), + &string_view_left, + ) + .unwrap() + }) + }); + + c.bench_function("eq StringArray StringArray", |b| { + b.iter(|| eq(&string_left, &string_right).unwrap()) + }); + + c.bench_function("eq StringViewArray StringViewArray", |b| { + b.iter(|| eq(&string_view_left, &string_view_right).unwrap()) + }); + c.bench_function("like_utf8 scalar equals", |b| { b.iter(|| bench_like_utf8_scalar(&arr_string, "xxxx")) }); From 9413cd3ffdccdc529e44b7aa9d77c9565f7ecaca Mon Sep 17 00:00:00 2001 From: Michael Maletich Date: Sat, 22 Jun 2024 06:19:52 -0500 Subject: [PATCH 10/16] Add the ability for Maps to cast to another case where the field names are different (#5703) * Add the ability for Maps to cast to another case where the field names are different. 
Arrow Maps have field names for the elements of the fields, the field names are allowed to be any value and do not affect the type of the data. This allows a Map where the field names are key_value, key, value to be mapped to a entries, keys, values. This can be helpful in merging record batches that may have come from different sources. This also makes maps behave similar to lists which also have a field to distinguish their elements. * Apply suggestions from code review Co-authored-by: Andrew Lamb * Feedback from code review - simplify map casting logic to reuse the entries - Added unit tests for negative cases - Use MapBuilder to make the intended type clearer. * fix formatting * Lint and format * correctly set the null fields --------- Co-authored-by: Andrew Lamb --- arrow-cast/src/cast/map.rs | 74 +++++++++++ arrow-cast/src/cast/mod.rs | 245 +++++++++++++++++++++++++++++++++++++ 2 files changed, 319 insertions(+) create mode 100644 arrow-cast/src/cast/map.rs diff --git a/arrow-cast/src/cast/map.rs b/arrow-cast/src/cast/map.rs new file mode 100644 index 000000000000..d62a9519b7b3 --- /dev/null +++ b/arrow-cast/src/cast/map.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::cast::*; + +/// Helper function that takes a map container and casts the inner datatype. +pub(crate) fn cast_map_values( + from: &MapArray, + to_data_type: &DataType, + cast_options: &CastOptions, + to_ordered: bool, +) -> Result { + let entries_field = if let DataType::Map(entries_field, _) = to_data_type { + entries_field + } else { + return Err(ArrowError::CastError( + "Internal Error: to_data_type is not a map type.".to_string(), + )); + }; + + let key_field = key_field(entries_field).ok_or(ArrowError::CastError( + "map is missing key field".to_string(), + ))?; + let value_field = value_field(entries_field).ok_or(ArrowError::CastError( + "map is missing value field".to_string(), + ))?; + + let key_array = cast_with_options(from.keys(), key_field.data_type(), cast_options)?; + let value_array = cast_with_options(from.values(), value_field.data_type(), cast_options)?; + + Ok(Arc::new(MapArray::new( + entries_field.clone(), + from.offsets().clone(), + StructArray::new( + Fields::from(vec![key_field, value_field]), + vec![key_array, value_array], + from.entries().nulls().cloned(), + ), + from.nulls().cloned(), + to_ordered, + ))) +} + +/// Gets the key field from the entries of a map. For all other types returns None. +pub(crate) fn key_field(entries_field: &FieldRef) -> Option { + if let DataType::Struct(fields) = entries_field.data_type() { + fields.first().cloned() + } else { + None + } +} + +/// Gets the value field from the entries of a map. For all other types returns None. 
+pub(crate) fn value_field(entries_field: &FieldRef) -> Option { + if let DataType::Struct(fields) = entries_field.data_type() { + fields.get(1).cloned() + } else { + None + } +} diff --git a/arrow-cast/src/cast/mod.rs b/arrow-cast/src/cast/mod.rs index 55f2ed72836b..7a6e1a31bb48 100644 --- a/arrow-cast/src/cast/mod.rs +++ b/arrow-cast/src/cast/mod.rs @@ -40,10 +40,12 @@ mod decimal; mod dictionary; mod list; +mod map; mod string; use crate::cast::decimal::*; use crate::cast::dictionary::*; use crate::cast::list::*; +use crate::cast::map::*; use crate::cast::string::*; use arrow_buffer::IntervalMonthDayNano; @@ -159,6 +161,12 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool { can_cast_types(from_type, list_to.data_type())}, (FixedSizeList(list_from,size), _) if *size == 1 => { can_cast_types(list_from.data_type(), to_type)}, + (Map(from_entries,ordered_from), Map(to_entries, ordered_to)) if ordered_from == ordered_to => + match (key_field(from_entries), key_field(to_entries), value_field(from_entries), value_field(to_entries)) { + (Some(from_key), Some(to_key), Some(from_value), Some(to_value)) => + can_cast_types(from_key.data_type(), to_key.data_type()) && can_cast_types(from_value.data_type(), to_value.data_type()), + _ => false + }, // cast one decimal type to another decimal type (Decimal128(_, _), Decimal128(_, _)) => true, (Decimal256(_, _), Decimal256(_, _)) => true, @@ -802,6 +810,9 @@ pub fn cast_with_options( (FixedSizeList(_, size), _) if *size == 1 => { cast_single_element_fixed_size_list_to_values(array, to_type, cast_options) } + (Map(_, ordered1), Map(_, ordered2)) if ordered1 == ordered2 => { + cast_map_values(array.as_map(), to_type, cast_options, ordered1.to_owned()) + } (Decimal128(_, s1), Decimal128(p2, s2)) => { cast_decimal_to_decimal_same_type::( array.as_primitive(), @@ -7361,6 +7372,240 @@ mod tests { FixedSizeListArray::from(list_data) } + #[test] + fn test_cast_map_dont_allow_change_of_order() { + let string_builder = StringBuilder::new(); + let value_builder = StringBuilder::new(); + let mut builder = MapBuilder::new( + Some(MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }), + string_builder, + value_builder, + ); + + builder.keys().append_value("0"); + builder.values().append_value("test_val_1"); + builder.append(true).unwrap(); + builder.keys().append_value("1"); + builder.values().append_value("test_val_2"); + builder.append(true).unwrap(); + + // map builder returns unsorted map by default + let array = builder.finish(); + + let new_ordered = true; + let new_type = DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, false), + ] + .into(), + ), + false, + )), + new_ordered, + ); + + let new_array_result = cast(&array, &new_type.clone()); + assert!(!can_cast_types(array.data_type(), &new_type)); + assert!( + matches!(new_array_result, Err(ArrowError::CastError(t)) if t == r#"Casting from Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) to Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }, Field { name: "value", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, true) not supported"#) + ); + } + + #[test] + fn test_cast_map_dont_allow_when_container_cant_cast() { + let string_builder = StringBuilder::new(); + let value_builder = IntervalDayTimeArray::builder(2); + let mut builder = MapBuilder::new( + Some(MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }), + string_builder, + value_builder, + ); + + builder.keys().append_value("0"); + builder.values().append_value(IntervalDayTime::new(1, 1)); + builder.append(true).unwrap(); + builder.keys().append_value("1"); + builder.values().append_value(IntervalDayTime::new(2, 2)); + builder.append(true).unwrap(); + + // map builder returns unsorted map by default + let array = builder.finish(); + + let new_ordered = true; + let new_type = DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Duration(TimeUnit::Second), false), + ] + .into(), + ), + false, + )), + new_ordered, + ); + + let new_array_result = cast(&array, &new_type.clone()); + assert!(!can_cast_types(array.data_type(), &new_type)); + assert!( + matches!(new_array_result, Err(ArrowError::CastError(t)) if t == r#"Casting from Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Interval(DayTime), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) to Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Duration(Second), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, true) not supported"#) + ); + } + + #[test] + fn test_cast_map_field_names() { + let string_builder = StringBuilder::new(); + let value_builder = StringBuilder::new(); + let mut builder = MapBuilder::new( + Some(MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }), + string_builder, + value_builder, + ); + + builder.keys().append_value("0"); + builder.values().append_value("test_val_1"); + builder.append(true).unwrap(); + builder.keys().append_value("1"); + builder.values().append_value("test_val_2"); + builder.append(true).unwrap(); + builder.append(false).unwrap(); + + let array = builder.finish(); + + let new_type = DataType::Map( + Arc::new(Field::new( + "entries_new", + DataType::Struct( + vec![ + Field::new("key_new", DataType::Utf8, false), + Field::new("value_values", DataType::Utf8, false), + ] + .into(), + ), + false, + )), + false, + ); + + assert_ne!(new_type, array.data_type().clone()); + + let new_array = cast(&array, &new_type.clone()).unwrap(); + assert_eq!(new_type, new_array.data_type().clone()); + let map_array = new_array.as_map(); + + assert_ne!(new_type, array.data_type().clone()); + assert_eq!(new_type, map_array.data_type().clone()); + + let key_string = map_array + .keys() + .as_any() + .downcast_ref::() + .unwrap() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(&key_string, &vec!["0", "1"]); + 
+ let values_string_array = cast(map_array.values(), &DataType::Utf8).unwrap(); + let values_string = values_string_array + .as_any() + .downcast_ref::() + .unwrap() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(&values_string, &vec!["test_val_1", "test_val_2"]); + + assert_eq!( + map_array.nulls(), + Some(&NullBuffer::from(vec![true, true, false])) + ); + } + + #[test] + fn test_cast_map_contained_values() { + let string_builder = StringBuilder::new(); + let value_builder = Int8Builder::new(); + let mut builder = MapBuilder::new( + Some(MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }), + string_builder, + value_builder, + ); + + builder.keys().append_value("0"); + builder.values().append_value(44); + builder.append(true).unwrap(); + builder.keys().append_value("1"); + builder.values().append_value(22); + builder.append(true).unwrap(); + + let array = builder.finish(); + + let new_type = DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, false), + ] + .into(), + ), + false, + )), + false, + ); + + let new_array = cast(&array, &new_type.clone()).unwrap(); + assert_eq!(new_type, new_array.data_type().clone()); + let map_array = new_array.as_map(); + + assert_ne!(new_type, array.data_type().clone()); + assert_eq!(new_type, map_array.data_type().clone()); + + let key_string = map_array + .keys() + .as_any() + .downcast_ref::() + .unwrap() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(&key_string, &vec!["0", "1"]); + + let values_string_array = cast(map_array.values(), &DataType::Utf8).unwrap(); + let values_string = values_string_array + .as_any() + .downcast_ref::() + .unwrap() + .into_iter() + .flatten() + .collect::>(); + assert_eq!(&values_string, &vec!["44", "22"]); + } + #[test] fn test_utf8_cast_offsets() { // test if offset of the array is taken into account during cast From 86eb191fb285206c2a3153675ababd179e0612c8 Mon Sep 17 00:00:00 2001 From: Tomoaki Kawada Date: Sun, 23 Jun 2024 16:37:29 +0900 Subject: [PATCH 11/16] fix(ipc): set correct row count when reading struct arrays with zero fields (#5918) --- arrow-ipc/src/reader.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 3c203a7f3654..3423d06b6fca 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -31,7 +31,7 @@ use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use arrow_array::*; -use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer}; +use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, ScalarBuffer}; use arrow_data::ArrayData; use arrow_schema::*; @@ -152,7 +152,15 @@ fn create_array( struct_arrays.push((struct_field.clone(), child)); } let null_count = struct_node.null_count() as usize; - let struct_array = if null_count > 0 { + let struct_array = if struct_arrays.is_empty() { + // `StructArray::from` can't infer the correct row count + // if we have zero fields + let len = struct_node.length() as usize; + StructArray::new_empty_fields( + len, + (null_count > 0).then(|| BooleanBuffer::new(null_buffer, 0, len).into()), + ) + } else if null_count > 0 { // create struct array from fields, arrays and null data StructArray::from((struct_arrays, null_buffer)) } else { @@ -1361,6 +1369,7 @@ mod tests { use crate::root_as_message; use 
arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder}; use arrow_array::types::*; + use arrow_buffer::NullBuffer; use arrow_data::ArrayDataBuilder; fn create_test_projection_schema() -> Schema { @@ -1699,6 +1708,18 @@ mod tests { check_union_with_builder(UnionBuilder::new_sparse()); } + #[test] + fn test_roundtrip_struct_empty_fields() { + let nulls = NullBuffer::from(&[true, true, false][..]); + let rb = RecordBatch::try_from_iter([( + "", + Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _, + )]) + .unwrap(); + let rb2 = roundtrip_ipc(&rb); + assert_eq!(rb, rb2); + } + #[test] fn test_roundtrip_stream_run_array_sliced() { let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"] From 02fb714142fe69d7c3fa3d6fc1612eb7161b8a95 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sun, 23 Jun 2024 08:37:49 +0100 Subject: [PATCH 12/16] Update zstd-sys requirement from >=2.0.0, <2.0.10 to >=2.0.0, <2.0.12 (#5913) Updates the requirements on [zstd-sys](https://github.com/gyscos/zstd-rs) to permit the latest version. - [Release notes](https://github.com/gyscos/zstd-rs/releases) - [Commits](https://github.com/gyscos/zstd-rs/compare/zstd-sys-2.0.7...zstd-sys-2.0.11) --- updated-dependencies: - dependency-name: zstd-sys dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- parquet/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 775ac825a2e4..7df978016501 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -86,10 +86,10 @@ object_store = { version = "0.10.0", default-features = false, features = ["azur # TODO: temporary to fix parquet wasm build # upstream issue: https://github.com/gyscos/zstd-rs/issues/269 [target.'cfg(target_family = "wasm")'.dependencies] -zstd-sys = { version = ">=2.0.0, <2.0.10", optional = true, default-features = false } +zstd-sys = { version = ">=2.0.0, <2.0.12", optional = true, default-features = false } [target.'cfg(target_family = "wasm")'.dev-dependencies] -zstd-sys = { version = ">=2.0.0, <2.0.10", default-features = false } +zstd-sys = { version = ">=2.0.0, <2.0.12", default-features = false } [package.metadata.docs.rs] all-features = true From 0ea074af77ef03371ec0c1b1ebf626390059baa2 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla <105630300+fsdvh@users.noreply.github.com> Date: Sun, 23 Jun 2024 09:40:35 +0200 Subject: [PATCH 13/16] Add `MultipartUpload` blanket implementation for `Box` (#5919) * add impl for box * update * another update * small fix --- object_store/src/upload.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index e5f683a034ac..dc499e2e0e5e 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -92,6 +92,21 @@ pub trait MultipartUpload: Send + std::fmt::Debug { async fn abort(&mut self) -> Result<()>; } +#[async_trait] +impl MultipartUpload for Box { + fn put_part(&mut self, data: PutPayload) -> UploadPart { + (**self).put_part(data) + } + + async fn complete(&mut self) -> Result { + (**self).complete().await + } + + async fn abort(&mut self) -> Result<()> { + (**self).abort().await + } +} + /// A synchronous write API for uploading data in parallel in fixed size chunks /// /// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload 
tasks in parallel From a35214f92ad7c3bce19875bb091cb776447aa49e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 23 Jun 2024 03:41:08 -0400 Subject: [PATCH 14/16] Fix typo in benchmarks (#5935) --- arrow/benches/comparison_kernels.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow/benches/comparison_kernels.rs b/arrow/benches/comparison_kernels.rs index 1e081d141a0a..360d4865924f 100644 --- a/arrow/benches/comparison_kernels.rs +++ b/arrow/benches/comparison_kernels.rs @@ -269,11 +269,11 @@ fn add_benchmark(c: &mut Criterion) { b.iter(|| bench_nilike_utf8_scalar(&arr_string, "%xx_xX%xXX")) }); - c.bench_function("egexp_matches_utf8 scalar starts with", |b| { + c.bench_function("regexp_matches_utf8 scalar starts with", |b| { b.iter(|| bench_regexp_is_match_utf8_scalar(&arr_string, "^xx")) }); - c.bench_function("egexp_matches_utf8 scalar ends with", |b| { + c.bench_function("regexp_matches_utf8 scalar ends with", |b| { b.iter(|| bench_regexp_is_match_utf8_scalar(&arr_string, "xx$")) }); From 063ac13af057db7a766338d07b865b2bd7e4761b Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sun, 23 Jun 2024 20:22:45 +0300 Subject: [PATCH 15/16] row format benches for bool & nullable int (#5943) --- arrow/benches/row_format.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/arrow/benches/row_format.rs b/arrow/benches/row_format.rs index cb7455939e0b..b5298cbe3679 100644 --- a/arrow/benches/row_format.rs +++ b/arrow/benches/row_format.rs @@ -23,8 +23,8 @@ use arrow::array::ArrayRef; use arrow::datatypes::{Int64Type, UInt64Type}; use arrow::row::{RowConverter, SortField}; use arrow::util::bench_util::{ - create_dict_from_values, create_primitive_array, create_string_array_with_len, - create_string_dict_array, + create_boolean_array, create_dict_from_values, create_primitive_array, + create_string_array_with_len, create_string_dict_array, }; use arrow_array::types::Int32Type; use arrow_array::Array; @@ -60,9 +60,21 @@ fn row_bench(c: &mut Criterion) { let cols = vec![Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef]; do_bench(c, "4096 u64(0)", cols); + let cols = vec![Arc::new(create_primitive_array::(4096, 0.3)) as ArrayRef]; + do_bench(c, "4096 u64(0.3)", cols); + let cols = vec![Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef]; do_bench(c, "4096 i64(0)", cols); + let cols = vec![Arc::new(create_primitive_array::(4096, 0.3)) as ArrayRef]; + do_bench(c, "4096 i64(0.3)", cols); + + let cols = vec![Arc::new(create_boolean_array(4096, 0., 0.5)) as ArrayRef]; + do_bench(c, "4096 bool(0, 0.5)", cols); + + let cols = vec![Arc::new(create_boolean_array(4096, 0.3, 0.5)) as ArrayRef]; + do_bench(c, "4096 bool(0.3, 0.5)", cols); + let cols = vec![Arc::new(create_string_array_with_len::(4096, 0., 10)) as ArrayRef]; do_bench(c, "4096 string(10, 0)", cols); From 0c3a24d2a42dfac7bf56bf2c87374463efee722e Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sun, 23 Jun 2024 19:30:59 -0700 Subject: [PATCH 16/16] Implement arrow-row encoding/decoding for view types (#5922) * implement arrow-row encoding/decoding for view types * add doc comments, better error msg, more test coverage * ensure no performance regression * update perf * fix bug * make fmt happy * Update arrow-array/src/array/byte_view_array.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> * update * update comments * move cmp around * move things around and remove inline hint * Update arrow-array/src/array/byte_view_array.rs 
Co-authored-by: Andrew Lamb * Update arrow-ord/src/cmp.rs Co-authored-by: Andrew Lamb * return error instead of panic * remove unnecessary func --------- Co-authored-by: Andrew Lamb Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- arrow-array/src/array/byte_view_array.rs | 54 +++++++- arrow-cast/src/cast/mod.rs | 40 +----- arrow-ord/src/cmp.rs | 158 ++++++++++++----------- arrow-ord/src/ord.rs | 17 +++ arrow-row/src/lib.rs | 54 +++++++- arrow-row/src/variable.rs | 27 ++++ arrow/benches/comparison_kernels.rs | 20 +++ 7 files changed, 250 insertions(+), 120 deletions(-) diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index 187f5b8e6f96..f31bc1c785b9 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -16,19 +16,22 @@ // under the License. use crate::array::print_long_array; -use crate::builder::GenericByteViewBuilder; +use crate::builder::{ArrayBuilder, GenericByteViewBuilder}; use crate::iterator::ArrayIter; use crate::types::bytes::ByteArrayNativeType; use crate::types::{BinaryViewType, ByteViewType, StringViewType}; -use crate::{Array, ArrayAccessor, ArrayRef, Scalar}; -use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer}; +use crate::{Array, ArrayAccessor, ArrayRef, GenericByteArray, OffsetSizeTrait, Scalar}; +use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer, ScalarBuffer}; use arrow_data::{ArrayData, ArrayDataBuilder, ByteView}; use arrow_schema::{ArrowError, DataType}; +use num::ToPrimitive; use std::any::Any; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; +use super::ByteArrayType; + /// [Variable-size Binary View Layout]: An array of variable length bytes view arrays. /// /// Different than [`crate::GenericByteArray`] as it stores both an offset and length @@ -429,6 +432,51 @@ impl From for GenericByteViewArray { } } +/// Convert a [`GenericByteArray`] to a [`GenericByteViewArray`] but in a smart way: +/// If the offsets are all less than u32::MAX, then we directly build the view array on top of existing buffer. +impl From<&GenericByteArray> for GenericByteViewArray +where + FROM: ByteArrayType, + FROM::Offset: OffsetSizeTrait + ToPrimitive, + V: ByteViewType, +{ + fn from(byte_array: &GenericByteArray) -> Self { + let offsets = byte_array.offsets(); + + let can_reuse_buffer = match offsets.last() { + Some(offset) => offset.as_usize() < u32::MAX as usize, + None => true, + }; + + if can_reuse_buffer { + let len = byte_array.len(); + let mut views_builder = GenericByteViewBuilder::::with_capacity(len); + let str_values_buf = byte_array.values().clone(); + let block = views_builder.append_block(str_values_buf); + for (i, w) in offsets.windows(2).enumerate() { + let offset = w[0].as_usize(); + let end = w[1].as_usize(); + let length = end - offset; + + if byte_array.is_null(i) { + views_builder.append_null(); + } else { + // Safety: the input was a valid array so it valid UTF8 (if string). 
And + // all offsets were valid + unsafe { + views_builder.append_view_unchecked(block, offset as u32, length as u32) + } + } + } + assert_eq!(views_builder.len(), len); + views_builder.finish() + } else { + // TODO: the first u32::MAX can still be reused + GenericByteViewArray::::from_iter(byte_array.iter()) + } + } +} + impl From> for ArrayData { fn from(mut array: GenericByteViewArray) -> Self { let len = array.len(); diff --git a/arrow-cast/src/cast/mod.rs b/arrow-cast/src/cast/mod.rs index 7a6e1a31bb48..e5ab304bb6fb 100644 --- a/arrow-cast/src/cast/mod.rs +++ b/arrow-cast/src/cast/mod.rs @@ -1230,7 +1230,7 @@ pub fn cast_with_options( let binary = BinaryArray::from(array.as_string::().clone()); cast_byte_container::(&binary) } - Utf8View => cast_byte_to_view::(array), + Utf8View => Ok(Arc::new(StringViewArray::from(array.as_string::()))), LargeUtf8 => cast_byte_container::(array), Time32(TimeUnit::Second) => parse_string::(array, cast_options), Time32(TimeUnit::Millisecond) => { @@ -1290,7 +1290,7 @@ pub fn cast_with_options( LargeBinary => Ok(Arc::new(LargeBinaryArray::from( array.as_string::().clone(), ))), - Utf8View => cast_byte_to_view::(array), + Utf8View => Ok(Arc::new(StringViewArray::from(array.as_string::()))), Time32(TimeUnit::Second) => parse_string::(array, cast_options), Time32(TimeUnit::Millisecond) => { parse_string::(array, cast_options) @@ -1338,7 +1338,7 @@ pub fn cast_with_options( FixedSizeBinary(size) => { cast_binary_to_fixed_size_binary::(array, *size, cast_options) } - BinaryView => cast_byte_to_view::(array), + BinaryView => Ok(Arc::new(BinaryViewArray::from(array.as_binary::()))), _ => Err(ArrowError::CastError(format!( "Casting from {from_type:?} to {to_type:?} not supported", ))), @@ -1353,7 +1353,7 @@ pub fn cast_with_options( FixedSizeBinary(size) => { cast_binary_to_fixed_size_binary::(array, *size, cast_options) } - BinaryView => cast_byte_to_view::(array), + BinaryView => Ok(Arc::new(BinaryViewArray::from(array.as_binary::()))), _ => Err(ArrowError::CastError(format!( "Casting from {from_type:?} to {to_type:?} not supported", ))), @@ -2345,38 +2345,6 @@ where Ok(Arc::new(GenericByteArray::::from(array_data))) } -/// Helper function to cast from one `ByteArrayType` array to `ByteViewType` array. -fn cast_byte_to_view(array: &dyn Array) -> Result -where - FROM: ByteArrayType, - FROM::Offset: OffsetSizeTrait + ToPrimitive, - V: ByteViewType, -{ - let byte_array: &GenericByteArray = array.as_bytes(); - let len = array.len(); - let str_values_buf = byte_array.values().clone(); - let offsets = byte_array.offsets(); - - let mut views_builder = GenericByteViewBuilder::::with_capacity(len); - let block = views_builder.append_block(str_values_buf); - for (i, w) in offsets.windows(2).enumerate() { - let offset = w[0].to_u32().unwrap(); - let end = w[1].to_u32().unwrap(); - let length = end - offset; - - if byte_array.is_null(i) { - views_builder.append_null(); - } else { - // Safety: the input was a valid array so it valid UTF8 (if string). And - // all offsets were valid and we created the views correctly - unsafe { views_builder.append_view_unchecked(block, offset, length) } - } - } - - assert_eq!(views_builder.len(), len); - Ok(Arc::new(views_builder.finish())) -} - /// Helper function to cast from one `ByteViewType` array to `ByteArrayType` array. 
fn cast_view_to_byte(array: &dyn Array) -> Result where diff --git a/arrow-ord/src/cmp.rs b/arrow-ord/src/cmp.rs index c300f995283b..18f77a9275ce 100644 --- a/arrow-ord/src/cmp.rs +++ b/arrow-ord/src/cmp.rs @@ -540,98 +540,32 @@ impl<'a, T: ByteArrayType> ArrayOrd for &'a GenericByteArray { } } -/// Comparing two ByteView types are non-trivial. -/// It takes a bit of patience to understand why we don't just compare two &[u8] directly. -/// -/// ByteView types give us the following two advantages, and we need to be careful not to lose them: -/// (1) For string/byte smaller than 12 bytes, the entire data is inlined in the view. -/// Meaning that reading one array element requires only one memory access -/// (two memory access required for StringArray, one for offset buffer, the other for value buffer). -/// -/// (2) For string/byte larger than 12 bytes, we can still be faster than (for certain operations) StringArray/ByteArray, -/// thanks to the inlined 4 bytes. -/// Consider equality check: -/// If the first four bytes of the two strings are different, we can return false immediately (with just one memory access). -/// If we are unlucky and the first four bytes are the same, we need to fallback to compare two full strings. impl<'a, T: ByteViewType> ArrayOrd for &'a GenericByteViewArray { - /// Item.0 is the array, Item.1 is the index into the array. - /// Why don't we just store Item.0[Item.1] as the item? - /// - Because if we do so, we materialize the entire string (i.e., make multiple memory accesses), which might be unnecessary. - /// - Most of the time (eq, ord), we only need to look at the first 4 bytes to know the answer, - /// e.g., if the inlined 4 bytes are different, we can directly return unequal without looking at the full string. + /// This is the item type for the GenericByteViewArray::compare + /// Item.0 is the array, Item.1 is the index type Item = (&'a GenericByteViewArray, usize); - /// # Equality check flow - /// (1) if both string are smaller than 12 bytes, we can directly compare the data inlined to the view. - /// (2) if any of the string is larger than 12 bytes, we need to compare the full string. - /// (2.1) if the inlined 4 bytes are different, we can return false immediately. - /// (2.2) o.w., we need to compare the full string. - /// - /// # Safety - /// (1) Indexing. The Self::Item.1 encodes the index value, which is already checked in `value` function, - /// so it is safe to index into the views. - /// (2) Slice data from view. We know the bytes 4-8 are inlined data (per spec), so it is safe to slice from the view. fn is_eq(l: Self::Item, r: Self::Item) -> bool { + // # Safety + // The index is within bounds as it is checked in value() let l_view = unsafe { l.0.views().get_unchecked(l.1) }; let l_len = *l_view as u32; let r_view = unsafe { r.0.views().get_unchecked(r.1) }; let r_len = *r_view as u32; - + // This is a fast path for equality check. + // We don't need to look at the actual bytes to determine if they are equal. 
if l_len != r_len { return false; } - if l_len <= 12 { - let l_data = unsafe { GenericByteViewArray::::inline_value(l_view, l_len as usize) }; - let r_data = unsafe { GenericByteViewArray::::inline_value(r_view, r_len as usize) }; - l_data == r_data - } else { - let l_inlined_data = unsafe { GenericByteViewArray::::inline_value(l_view, 4) }; - let r_inlined_data = unsafe { GenericByteViewArray::::inline_value(r_view, 4) }; - if l_inlined_data != r_inlined_data { - return false; - } - - let l_full_data: &[u8] = unsafe { l.0.value_unchecked(l.1).as_ref() }; - let r_full_data: &[u8] = unsafe { r.0.value_unchecked(r.1).as_ref() }; - l_full_data == r_full_data - } + unsafe { compare_byte_view_unchecked(l.0, l.1, r.0, r.1).is_eq() } } - /// # Ordering check flow - /// (1) if both string are smaller than 12 bytes, we can directly compare the data inlined to the view. - /// (2) if any of the string is larger than 12 bytes, we need to compare the full string. - /// (2.1) if the inlined 4 bytes are different, we can return the result immediately. - /// (2.2) o.w., we need to compare the full string. - /// - /// # Safety - /// (1) Indexing. The Self::Item.1 encodes the index value, which is already checked in `value` function, - /// so it is safe to index into the views. - /// (2) Slice data from view. We know the bytes 4-8 are inlined data (per spec), so it is safe to slice from the view. fn is_lt(l: Self::Item, r: Self::Item) -> bool { - let l_view = l.0.views().get(l.1).unwrap(); - let l_len = *l_view as u32; - - let r_view = r.0.views().get(r.1).unwrap(); - let r_len = *r_view as u32; - - if l_len <= 12 && r_len <= 12 { - let l_data = unsafe { GenericByteViewArray::::inline_value(l_view, l_len as usize) }; - let r_data = unsafe { GenericByteViewArray::::inline_value(r_view, r_len as usize) }; - return l_data < r_data; - } - // one of the string is larger than 12 bytes, - // we then try to compare the inlined data first - let l_inlined_data = unsafe { GenericByteViewArray::::inline_value(l_view, 4) }; - let r_inlined_data = unsafe { GenericByteViewArray::::inline_value(r_view, 4) }; - if r_inlined_data != l_inlined_data { - return l_inlined_data < r_inlined_data; - } - // unfortunately, we need to compare the full data - let l_full_data: &[u8] = unsafe { l.0.value_unchecked(l.1).as_ref() }; - let r_full_data: &[u8] = unsafe { r.0.value_unchecked(r.1).as_ref() }; - l_full_data < r_full_data + // # Safety + // The index is within bounds as it is checked in value() + unsafe { compare_byte_view_unchecked(l.0, l.1, r.0, r.1).is_lt() } } fn len(&self) -> usize { @@ -663,6 +597,78 @@ impl<'a> ArrayOrd for &'a FixedSizeBinaryArray { } } +/// Compares two [`GenericByteViewArray`] at index `left_idx` and `right_idx` +pub fn compare_byte_view( + left: &GenericByteViewArray, + left_idx: usize, + right: &GenericByteViewArray, + right_idx: usize, +) -> std::cmp::Ordering { + assert!(left_idx < left.len()); + assert!(right_idx < right.len()); + unsafe { compare_byte_view_unchecked(left, left_idx, right, right_idx) } +} + +/// Comparing two [`GenericByteViewArray`] at index `left_idx` and `right_idx` +/// +/// Comparing two ByteView types are non-trivial. +/// It takes a bit of patience to understand why we don't just compare two &[u8] directly. +/// +/// ByteView types give us the following two advantages, and we need to be careful not to lose them: +/// (1) For string/byte smaller than 12 bytes, the entire data is inlined in the view. 
+/// Meaning that reading one array element requires only one memory access +/// (two memory access required for StringArray, one for offset buffer, the other for value buffer). +/// +/// (2) For string/byte larger than 12 bytes, we can still be faster than (for certain operations) StringArray/ByteArray, +/// thanks to the inlined 4 bytes. +/// Consider equality check: +/// If the first four bytes of the two strings are different, we can return false immediately (with just one memory access). +/// +/// If we directly compare two &[u8], we materialize the entire string (i.e., make multiple memory accesses), which might be unnecessary. +/// - Most of the time (eq, ord), we only need to look at the first 4 bytes to know the answer, +/// e.g., if the inlined 4 bytes are different, we can directly return unequal without looking at the full string. +/// +/// # Order check flow +/// (1) if both string are smaller than 12 bytes, we can directly compare the data inlined to the view. +/// (2) if any of the string is larger than 12 bytes, we need to compare the full string. +/// (2.1) if the inlined 4 bytes are different, we can return the result immediately. +/// (2.2) o.w., we need to compare the full string. +/// +/// # Safety +/// The left/right_idx must within range of each array +pub unsafe fn compare_byte_view_unchecked( + left: &GenericByteViewArray, + left_idx: usize, + right: &GenericByteViewArray, + right_idx: usize, +) -> std::cmp::Ordering { + let l_view = left.views().get_unchecked(left_idx); + let l_len = *l_view as u32; + + let r_view = right.views().get_unchecked(right_idx); + let r_len = *r_view as u32; + + if l_len <= 12 && r_len <= 12 { + let l_data = unsafe { GenericByteViewArray::::inline_value(l_view, l_len as usize) }; + let r_data = unsafe { GenericByteViewArray::::inline_value(r_view, r_len as usize) }; + return l_data.cmp(r_data); + } + + // one of the string is larger than 12 bytes, + // we then try to compare the inlined data first + let l_inlined_data = unsafe { GenericByteViewArray::::inline_value(l_view, 4) }; + let r_inlined_data = unsafe { GenericByteViewArray::::inline_value(r_view, 4) }; + if r_inlined_data != l_inlined_data { + return l_inlined_data.cmp(r_inlined_data); + } + + // unfortunately, we need to compare the full data + let l_full_data: &[u8] = unsafe { left.value_unchecked(left_idx).as_ref() }; + let r_full_data: &[u8] = unsafe { right.value_unchecked(right_idx).as_ref() }; + + l_full_data.cmp(r_full_data) +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/arrow-ord/src/ord.rs b/arrow-ord/src/ord.rs index 3825e5ec66f4..6430c8f0e405 100644 --- a/arrow-ord/src/ord.rs +++ b/arrow-ord/src/ord.rs @@ -135,6 +135,21 @@ fn compare_bytes( }) } +fn compare_byte_view( + left: &dyn Array, + right: &dyn Array, + opts: SortOptions, +) -> DynComparator { + let left = left.as_byte_view::(); + let right = right.as_byte_view::(); + + let l = left.clone(); + let r = right.clone(); + compare(left, right, opts, move |i, j| { + crate::cmp::compare_byte_view(&l, i, &r, j) + }) +} + fn compare_dict( left: &dyn Array, right: &dyn Array, @@ -342,8 +357,10 @@ pub fn make_comparator( (Boolean, Boolean) => Ok(compare_boolean(left, right, opts)), (Utf8, Utf8) => Ok(compare_bytes::(left, right, opts)), (LargeUtf8, LargeUtf8) => Ok(compare_bytes::(left, right, opts)), + (Utf8View, Utf8View) => Ok(compare_byte_view::(left, right, opts)), (Binary, Binary) => Ok(compare_bytes::(left, right, opts)), (LargeBinary, LargeBinary) => Ok(compare_bytes::(left, right, opts)), + 
(BinaryView, BinaryView) => Ok(compare_byte_view::(left, right, opts)), (FixedSizeBinary(_), FixedSizeBinary(_)) => { let left = left.as_fixed_size_binary(); let right = right.as_fixed_size_binary(); diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 8e1285493b0b..5dce771c85a9 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -135,6 +135,7 @@ use arrow_array::*; use arrow_buffer::ArrowNativeType; use arrow_data::ArrayDataBuilder; use arrow_schema::*; +use variable::{decode_binary_view, decode_string_view}; use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive}; use crate::variable::{decode_binary, decode_string}; @@ -1079,6 +1080,9 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec { .iter() .zip(lengths.iter_mut()) .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + DataType::BinaryView => array.as_binary_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| { + *length += variable::encoded_len(slice) + }), DataType::Utf8 => array.as_string::() .iter() .zip(lengths.iter_mut()) @@ -1091,11 +1095,14 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec { .for_each(|(slice, length)| { *length += variable::encoded_len(slice.map(|x| x.as_bytes())) }), + DataType::Utf8View => array.as_string_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) + }), DataType::FixedSizeBinary(len) => { let len = len.to_usize().unwrap(); lengths.iter_mut().for_each(|x| *x += 1 + len) } - _ => unreachable!(), + _ => unimplemented!("unsupported data type: {}", array.data_type()), } } Encoder::Dictionary(values, null) => { @@ -1152,6 +1159,9 @@ fn encode_column( DataType::Binary => { variable::encode(data, offsets, as_generic_binary_array::(column).iter(), opts) } + DataType::BinaryView => { + variable::encode(data, offsets, column.as_binary_view().iter(), opts) + } DataType::LargeBinary => { variable::encode(data, offsets, as_generic_binary_array::(column).iter(), opts) } @@ -1167,11 +1177,16 @@ fn encode_column( .map(|x| x.map(|x| x.as_bytes())), opts, ), + DataType::Utf8View => variable::encode( + data, offsets, + column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())), + opts, + ), DataType::FixedSizeBinary(_) => { let array = column.as_any().downcast_ref().unwrap(); fixed::encode_fixed_size_binary(data, offsets, array, opts) } - _ => unreachable!(), + _ => unimplemented!("unsupported data type: {}", column.data_type()), } } Encoder::Dictionary(values, nulls) => { @@ -1255,11 +1270,12 @@ unsafe fn decode_column( DataType::Boolean => Arc::new(decode_bool(rows, options)), DataType::Binary => Arc::new(decode_binary::(rows, options)), DataType::LargeBinary => Arc::new(decode_binary::(rows, options)), + DataType::BinaryView => Arc::new(decode_binary_view(rows, options)), DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)), DataType::Utf8 => Arc::new(decode_string::(rows, options, validate_utf8)), DataType::LargeUtf8 => Arc::new(decode_string::(rows, options, validate_utf8)), - DataType::Dictionary(_, _) => todo!(), - _ => unreachable!() + DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)), + _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {}", data_type))) } } Codec::Dictionary(converter, _) => { @@ -2047,6 +2063,32 @@ mod tests { .collect() } + fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray { + let mut 
rng = thread_rng(); + (0..len) + .map(|_| { + rng.gen_bool(valid_percent).then(|| { + let len = rng.gen_range(0..100); + let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect(); + String::from_utf8(bytes).unwrap() + }) + }) + .collect() + } + + fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray { + let mut rng = thread_rng(); + (0..len) + .map(|_| { + rng.gen_bool(valid_percent).then(|| { + let len = rng.gen_range(0..100); + let bytes: Vec<_> = (0..len).map(|_| rng.gen_range(0..128)).collect(); + bytes + }) + }) + .collect() + } + fn generate_dictionary( values: ArrayRef, len: usize, @@ -2127,7 +2169,7 @@ mod tests { fn generate_column(len: usize) -> ArrayRef { let mut rng = thread_rng(); - match rng.gen_range(0..14) { + match rng.gen_range(0..16) { 0 => Arc::new(generate_primitive_array::(len, 0.8)), 1 => Arc::new(generate_primitive_array::(len, 0.8)), 2 => Arc::new(generate_primitive_array::(len, 0.8)), @@ -2161,6 +2203,8 @@ mod tests { 13 => Arc::new(generate_list(len, 0.8, |values_len| { Arc::new(generate_struct(values_len, 0.8)) })), + 14 => Arc::new(generate_string_view(len, 0.8)), + 15 => Arc::new(generate_byte_view(len, 0.8)), _ => unreachable!(), } } diff --git a/arrow-row/src/variable.rs b/arrow-row/src/variable.rs index 45068baf2a32..c5aa7d8ac323 100644 --- a/arrow-row/src/variable.rs +++ b/arrow-row/src/variable.rs @@ -268,3 +268,30 @@ pub unsafe fn decode_string( // Row data must have come from a valid UTF-8 array GenericStringArray::from(builder.build_unchecked()) } + +/// Decodes a binary view array from `rows` with the provided `options` +pub fn decode_binary_view(rows: &mut [&[u8]], options: SortOptions) -> BinaryViewArray { + let decoded: GenericBinaryArray = decode_binary(rows, options); + + // Better performance might be to directly build the binary view instead of building to BinaryArray and then casting + // I suspect that the overhead is not a big deal. + // If it is, we can reimplement the `decode_binary_view` function to directly build the StringViewArray + BinaryViewArray::from(&decoded) +} + +/// Decodes a string view array from `rows` with the provided `options` +/// +/// # Safety +/// +/// The row must contain valid UTF-8 data +pub unsafe fn decode_string_view( + rows: &mut [&[u8]], + options: SortOptions, + validate_utf8: bool, +) -> StringViewArray { + let decoded: GenericStringArray = decode_string(rows, options, validate_utf8); + // Better performance might be to directly build the string view instead of building to StringArray and then casting + // I suspect that the overhead is not a big deal. + // If it is, we can reimplement the `decode_string_view` function to directly build the StringViewArray + StringViewArray::from(&decoded) +} diff --git a/arrow/benches/comparison_kernels.rs b/arrow/benches/comparison_kernels.rs index 360d4865924f..e5432c70ee46 100644 --- a/arrow/benches/comparison_kernels.rs +++ b/arrow/benches/comparison_kernels.rs @@ -171,6 +171,26 @@ fn add_benchmark(c: &mut Criterion) { }) }); + c.bench_function("lt scalar StringViewArray", |b| { + b.iter(|| { + lt( + &Scalar::new(StringViewArray::from_iter_values(["xxxx"])), + &string_view_left, + ) + .unwrap() + }) + }); + + c.bench_function("lt scalar StringArray", |b| { + b.iter(|| { + lt( + &Scalar::new(StringArray::from_iter_values(["xxxx"])), + &string_left, + ) + .unwrap() + }) + }); + c.bench_function("eq scalar StringViewArray", |b| { b.iter(|| { eq(