diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index daf38f2523fc..0b90a78577e5 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -146,11 +146,11 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - target: wasm32-unknown-unknown,wasm32-wasi + target: wasm32-unknown-unknown,wasm32-wasip1 - name: Build wasm32-unknown-unknown run: cargo build -p arrow --no-default-features --features=json,csv,ipc,ffi --target wasm32-unknown-unknown - - name: Build wasm32-wasi - run: cargo build -p arrow --no-default-features --features=json,csv,ipc,ffi --target wasm32-wasi + - name: Build wasm32-wasip1 + run: cargo build -p arrow --no-default-features --features=json,csv,ipc,ffi --target wasm32-wasip1 clippy: name: Clippy diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 899318f01324..1639b031ebfc 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -204,11 +204,11 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - target: wasm32-unknown-unknown,wasm32-wasi + target: wasm32-unknown-unknown,wasm32-wasip1 - name: Build wasm32-unknown-unknown run: cargo build --target wasm32-unknown-unknown - - name: Build wasm32-wasi - run: cargo build --target wasm32-wasi + - name: Build wasm32-wasip1 + run: cargo build --target wasm32-wasip1 windows: name: cargo test LocalFileSystem (win64) diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml index 2269950fd235..19503fde7991 100644 --- a/.github/workflows/parquet.yml +++ b/.github/workflows/parquet.yml @@ -123,13 +123,13 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder with: - target: wasm32-unknown-unknown,wasm32-wasi + target: wasm32-unknown-unknown,wasm32-wasip1 - name: Install clang # Needed for zlib compilation run: apt-get update && apt-get install -y clang gcc-multilib - name: Build wasm32-unknown-unknown run: cargo build -p parquet --target wasm32-unknown-unknown - - name: Build wasm32-wasi - run: cargo build -p parquet --target wasm32-wasi + - name: Build wasm32-wasip1 + run: cargo build -p parquet --target wasm32-wasip1 pyspark-integration-test: name: PySpark Integration Test diff --git a/README.md b/README.md index ed42f630514b..7a3dc1dd2458 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,16 @@ Planned Release Schedule | Dec 2024 | `0.11.2` | Minor, NO breaking API changes | | Feb 2025 | `0.12.0` | Major, potentially breaking API changes | +### Guidelines for `panic` vs `Result` + +In general, use panics for bad states that are unreachable, unrecoverable or harmful. +For those caused by invalid user input, however, we prefer to report that invalidity +gracefully as an error result instead of panicking. In general, invalid input should result +in an `Error` as soon as possible. It _is_ ok for code paths after validation to assume +validation has already occurred and panic if not. See [this ticket] for more nuances. + +[this ticket]: https://github.com/apache/arrow-rs/issues/6737 + ### Deprecation Guidelines Minor releases may deprecate, but not remove APIs. 
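An aside on the `panic` vs `Result` guideline added above (an illustrative sketch only, not part of this patch; the helper functions are hypothetical, not arrow-rs APIs): invalid user input surfaces as an `Err` at the boundary, while post-validation internals may assume validity and panic on impossible states.

```rust
/// Invalid *user input* is reported as an `Err` as early as possible.
fn parse_batch_size(input: &str) -> Result<usize, String> {
    let n: usize = input
        .parse()
        .map_err(|e| format!("invalid batch size {input:?}: {e}"))?;
    if n == 0 {
        return Err("batch size must be non-zero".to_string());
    }
    Ok(n)
}

/// Code that runs *after* validation may assume it held; a zero here is a bug
/// in the caller, so panicking is acceptable.
fn chunks_needed(total_rows: usize, batch_size: usize) -> usize {
    assert_ne!(batch_size, 0, "batch_size validated upstream");
    total_rows.div_ceil(batch_size)
}
```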
Deprecating APIs allows diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index a35b5e8629e9..be7e5f86a04d 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -264,8 +264,12 @@ impl ArrayData { offset, buffers, child_data, + align_buffers: false, + // SAFETY: caller responsible for ensuring data is valid + skip_validation: true, } - .build_unchecked() + .build() + .unwrap() } /// Create a new ArrayData, validating that the provided buffers form a valid @@ -1775,7 +1779,7 @@ impl PartialEq for ArrayData { } } -/// Builder for `ArrayData` type +/// Builder for [`ArrayData`] type #[derive(Debug)] pub struct ArrayDataBuilder { data_type: DataType, @@ -1786,6 +1790,20 @@ pub struct ArrayDataBuilder { offset: usize, buffers: Vec, child_data: Vec, + /// Should buffers be realigned (copying if necessary)? + /// + /// Defaults to false. + align_buffers: bool, + /// Should data validation be skipped for this [`ArrayData`]? + /// + /// Defaults to false. + /// + /// # Safety + /// + /// This flag can only be set to true using `unsafe` APIs. However, once true + /// subsequent calls to `build()` may result in undefined behavior if the data + /// is not valid. + skip_validation: bool, } impl ArrayDataBuilder { @@ -1801,6 +1819,8 @@ impl ArrayDataBuilder { offset: 0, buffers: vec![], child_data: vec![], + align_buffers: false, + skip_validation: false, } } @@ -1877,51 +1897,79 @@ impl ArrayDataBuilder { /// Creates an array data, without any validation /// + /// Note: This is shorthand for `self.with_skip_validation(true).build()` + /// /// # Safety /// /// The same caveats as [`ArrayData::new_unchecked`] /// apply. - #[allow(clippy::let_and_return)] pub unsafe fn build_unchecked(self) -> ArrayData { - let data = self.build_impl(); - // Provide a force_validate mode - #[cfg(feature = "force_validate")] - data.validate_data().unwrap(); - data + self.skip_validation(true).build().unwrap() } - /// Same as [`Self::build_unchecked`] but ignoring `force_validate` feature flag - unsafe fn build_impl(self) -> ArrayData { - let nulls = self - .nulls + /// Creates an `ArrayData`, consuming `self` + /// + /// # Safety + /// + /// By default the underlying buffers are checked to ensure they are valid + /// Arrow data. However, if the [`Self::skip_validation`] flag has been set + /// to true (by the `unsafe` API) this validation is skipped. If the data is + /// not valid, undefined behavior will result. 
+    pub fn build(self) -> Result<ArrayData, ArrowError> {
+        let Self {
+            data_type,
+            len,
+            null_count,
+            null_bit_buffer,
+            nulls,
+            offset,
+            buffers,
+            child_data,
+            align_buffers,
+            skip_validation,
+        } = self;
+
+        let nulls = nulls
             .or_else(|| {
-                let buffer = self.null_bit_buffer?;
-                let buffer = BooleanBuffer::new(buffer, self.offset, self.len);
-                Some(match self.null_count {
-                    Some(n) => NullBuffer::new_unchecked(buffer, n),
+                let buffer = null_bit_buffer?;
+                let buffer = BooleanBuffer::new(buffer, offset, len);
+                Some(match null_count {
+                    Some(n) => {
+                        // SAFETY: call to `data.validate_data()` below validates the null buffer is valid
+                        unsafe { NullBuffer::new_unchecked(buffer, n) }
+                    }
                     None => NullBuffer::new(buffer),
                 })
             })
             .filter(|b| b.null_count() != 0);
 
-        ArrayData {
-            data_type: self.data_type,
-            len: self.len,
-            offset: self.offset,
-            buffers: self.buffers,
-            child_data: self.child_data,
+        let mut data = ArrayData {
+            data_type,
+            len,
+            offset,
+            buffers,
+            child_data,
             nulls,
+        };
+
+        if align_buffers {
+            data.align_buffers();
         }
-    }
 
-    /// Creates an array data, validating all inputs
-    pub fn build(self) -> Result<ArrayData, ArrowError> {
-        let data = unsafe { self.build_impl() };
-        data.validate_data()?;
+        // SAFETY: `skip_validation` is only set to true using `unsafe` APIs
+        if !skip_validation || cfg!(feature = "force_validate") {
+            data.validate_data()?;
+        }
         Ok(data)
     }
 
     /// Creates an array data, validating all inputs, and aligning any buffers
+    #[deprecated(since = "54.1.0", note = "Use ArrayData::align_buffers instead")]
+    pub fn build_aligned(self) -> Result<ArrayData, ArrowError> {
+        self.align_buffers(true).build()
+    }
+
+    /// Ensure that all buffers are aligned, copying data if necessary
     ///
     /// Rust requires that arrays are aligned to their corresponding primitive,
     /// see [`Layout::array`](std::alloc::Layout::array) and [`std::mem::align_of`].
@@ -1930,17 +1978,33 @@ impl ArrayDataBuilder {
     /// to allow for [slice](std::slice) based APIs. See [`BufferSpec::FixedWidth`].
     ///
     /// As this alignment is architecture specific, and not guaranteed by all arrow implementations,
-    /// this method is provided to automatically copy buffers to a new correctly aligned allocation
+    /// this flag is provided to automatically copy buffers to a new correctly aligned allocation
     /// when necessary, making it useful when interacting with buffers produced by other systems,
     /// e.g. IPC or FFI.
     ///
-    /// This is unlike `[Self::build`] which will instead return an error on encountering
+    /// If this flag is not enabled, [`Self::build`] will return an error on encountering
     /// insufficiently aligned buffers.
-    pub fn build_aligned(self) -> Result<ArrayData, ArrowError> {
-        let mut data = unsafe { self.build_impl() };
-        data.align_buffers();
-        data.validate_data()?;
-        Ok(data)
+    pub fn align_buffers(mut self, align_buffers: bool) -> Self {
+        self.align_buffers = align_buffers;
+        self
+    }
+
+    /// Skips validation of the data.
+    ///
+    /// If this flag is enabled, [`Self::build`] will skip validation of the
+    /// data.
+    ///
+    /// If this flag is not enabled, [`Self::build`] will validate that all
+    /// buffers are valid and will return an error if any data is invalid.
+    /// Validation can be expensive.
+ /// + /// # Safety + /// + /// If validation is skipped, the buffers must form a valid Arrow array, + /// otherwise undefined behavior will result + pub unsafe fn skip_validation(mut self, skip_validation: bool) -> Self { + self.skip_validation = skip_validation; + self } } @@ -1955,6 +2019,8 @@ impl From for ArrayDataBuilder { nulls: d.nulls, null_bit_buffer: None, null_count: None, + align_buffers: false, + skip_validation: false, } } } diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 9ff4da30ed8c..5cf208c8eb5c 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -177,17 +177,13 @@ fn create_array( let values = create_array(reader, values_field, variadic_counts, require_alignment)?; let run_array_length = run_node.length() as usize; - let builder = ArrayData::builder(data_type.clone()) + let array_data = ArrayData::builder(data_type.clone()) .len(run_array_length) .offset(0) .add_child_data(run_ends.into_data()) - .add_child_data(values.into_data()); - - let array_data = if require_alignment { - builder.build()? - } else { - builder.build_aligned()? - }; + .add_child_data(values.into_data()) + .align_buffers(!require_alignment) + .build()?; Ok(make_array(array_data)) } @@ -257,15 +253,11 @@ fn create_array( ))); } - let builder = ArrayData::builder(data_type.clone()) + let array_data = ArrayData::builder(data_type.clone()) .len(length as usize) - .offset(0); - - let array_data = if require_alignment { - builder.build()? - } else { - builder.build_aligned()? - }; + .offset(0) + .align_buffers(!require_alignment) + .build()?; // no buffer increases Ok(Arc::new(NullArray::from(array_data))) @@ -311,11 +303,7 @@ fn create_primitive_array( t => unreachable!("Data type {:?} either unsupported or not primitive", t), }; - let array_data = if require_alignment { - builder.build()? - } else { - builder.build_aligned()? - }; + let array_data = builder.align_buffers(!require_alignment).build()?; Ok(make_array(array_data)) } @@ -347,11 +335,7 @@ fn create_list_array( _ => unreachable!("Cannot create list or map array from {:?}", data_type), }; - let array_data = if require_alignment { - builder.build()? - } else { - builder.build_aligned()? - }; + let array_data = builder.align_buffers(!require_alignment).build()?; Ok(make_array(array_data)) } @@ -367,17 +351,13 @@ fn create_dictionary_array( ) -> Result { if let Dictionary(_, _) = *data_type { let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone()); - let builder = ArrayData::builder(data_type.clone()) + let array_data = ArrayData::builder(data_type.clone()) .len(field_node.length() as usize) .add_buffer(buffers[1].clone()) .add_child_data(value_array.into_data()) - .null_bit_buffer(null_buffer); - - let array_data = if require_alignment { - builder.build()? - } else { - builder.build_aligned()? 
- }; + .null_bit_buffer(null_buffer) + .align_buffers(!require_alignment) + .build()?; Ok(make_array(array_data)) } else { diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index e4085472ea20..c14c0e1d34c4 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -69,6 +69,7 @@ paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.33.0", optional = true, default-features = false, features = ["system"] } crc32fast = { version = "1.4.2", optional = true, default-features = false } +simdutf8 = { version = "0.1.5"} [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 0e16642940d2..00627ad612ea 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -683,9 +683,12 @@ impl ByteViewArrayDecoderDelta { /// Check that `val` is a valid UTF-8 sequence pub fn check_valid_utf8(val: &[u8]) -> Result<()> { - match std::str::from_utf8(val) { + match simdutf8::basic::from_utf8(val) { Ok(_) => Ok(()), - Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), + Err(_) => { + let e = simdutf8::compat::from_utf8(val).unwrap_err(); + Err(general_err!("encountered non UTF-8 data: {}", e)) + } } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 5323251b07e7..2c8a59399de1 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -15,65 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! Provides `async` API for reading parquet files as +//! [`ParquetRecordBatchStreamBuilder`]: `async` API for reading Parquet files as //! [`RecordBatch`]es //! -//! ``` -//! # #[tokio::main(flavor="current_thread")] -//! # async fn main() { -//! # -//! # use arrow_array::RecordBatch; -//! # use arrow::util::pretty::pretty_format_batches; -//! # use futures::TryStreamExt; -//! # use tokio::fs::File; -//! # -//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; -//! # -//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) { -//! # let formatted = pretty_format_batches(batches).unwrap().to_string(); -//! # let actual_lines: Vec<_> = formatted.trim().lines().collect(); -//! # assert_eq!( -//! # &actual_lines, expected_lines, -//! # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", -//! # expected_lines, actual_lines -//! # ); -//! # } -//! # -//! let testdata = arrow::util::test_util::parquet_test_data(); -//! let path = format!("{}/alltypes_plain.parquet", testdata); -//! let file = File::open(path).await.unwrap(); +//! This can be used to decode a Parquet file in streaming fashion (without +//! downloading the whole file at once) from a remote source, such as an object store. //! -//! let builder = ParquetRecordBatchStreamBuilder::new(file) -//! .await -//! .unwrap() -//! .with_batch_size(3); -//! -//! let file_metadata = builder.metadata().file_metadata(); -//! let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]); -//! -//! let stream = builder.with_projection(mask).build().unwrap(); -//! let results = stream.try_collect::>().await.unwrap(); -//! assert_eq!(results.len(), 3); -//! -//! assert_batches_eq( -//! &results, -//! &[ -//! "+----------+-------------+-----------+", -//! 
"| bool_col | tinyint_col | float_col |", -//! "+----------+-------------+-----------+", -//! "| true | 0 | 0.0 |", -//! "| false | 1 | 1.1 |", -//! "| true | 0 | 0.0 |", -//! "| false | 1 | 1.1 |", -//! "| true | 0 | 0.0 |", -//! "| false | 1 | 1.1 |", -//! "| true | 0 | 0.0 |", -//! "| false | 1 | 1.1 |", -//! "+----------+-------------+-----------+", -//! ], -//! ); -//! # } -//! ``` +//! See example on [`ParquetRecordBatchStreamBuilder::new`] use std::collections::VecDeque; use std::fmt::Formatter; @@ -249,53 +197,153 @@ impl ArrowReaderMetadata { /// breaking the pre-existing ParquetRecordBatchStreamBuilder API pub struct AsyncReader(T); -/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file +/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`] /// -/// In particular, this handles reading the parquet file metadata, allowing consumers +/// This builder handles reading the parquet file metadata, allowing consumers /// to use this information to select what specific columns, row groups, etc... /// they wish to be read by the resulting stream /// +/// See examples on [`ParquetRecordBatchStreamBuilder::new`] +/// /// See [`ArrowReaderBuilder`] for additional member functions pub type ParquetRecordBatchStreamBuilder = ArrowReaderBuilder>; impl ParquetRecordBatchStreamBuilder { - /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file + /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the + /// specified source. /// /// # Example + /// ``` + /// # #[tokio::main(flavor="current_thread")] + /// # async fn main() { + /// # + /// # use arrow_array::RecordBatch; + /// # use arrow::util::pretty::pretty_format_batches; + /// # use futures::TryStreamExt; + /// # + /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; + /// # + /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) { + /// # let formatted = pretty_format_batches(batches).unwrap().to_string(); + /// # let actual_lines: Vec<_> = formatted.trim().lines().collect(); + /// # assert_eq!( + /// # &actual_lines, expected_lines, + /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + /// # expected_lines, actual_lines + /// # ); + /// # } + /// # + /// # let testdata = arrow::util::test_util::parquet_test_data(); + /// # let path = format!("{}/alltypes_plain.parquet", testdata); + /// // Use tokio::fs::File to read data using an async I/O. This can be replaced with + /// // another async I/O reader such as a reader from an object store. 
+ /// let file = tokio::fs::File::open(path).await.unwrap(); + /// + /// // Configure options for reading from the async souce + /// let builder = ParquetRecordBatchStreamBuilder::new(file) + /// .await + /// .unwrap(); + /// // Building the stream opens the parquet file (reads metadata, etc) and returns + /// // a stream that can be used to incrementally read the data in batches + /// let stream = builder.build().unwrap(); + /// // In this example, we collect the stream into a Vec + /// // but real applications would likely process the batches as they are read + /// let results = stream.try_collect::>().await.unwrap(); + /// // Demonstrate the results are as expected + /// assert_batches_eq( + /// &results, + /// &[ + /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + /// "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |", + /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + /// "| 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |", + /// "| 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01T00:01:00 |", + /// "| 6 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30342f30312f3039 | 30 | 2009-04-01T00:00:00 |", + /// "| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01T00:01:00 |", + /// "| 2 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30322f30312f3039 | 30 | 2009-02-01T00:00:00 |", + /// "| 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01T00:01:00 |", + /// "| 0 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30312f30312f3039 | 30 | 2009-01-01T00:00:00 |", + /// "| 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01T00:01:00 |", + /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + /// ], + /// ); + /// # } + /// ``` + /// + /// # Example configuring options and reading metadata + /// + /// There are many options that control the behavior of the reader, such as + /// `with_batch_size`, `with_projection`, `with_filter`, etc... 
/// /// ``` - /// # use std::fs::metadata; - /// # use std::sync::Arc; - /// # use bytes::Bytes; - /// # use arrow_array::{Int32Array, RecordBatch}; - /// # use arrow_schema::{DataType, Field, Schema}; - /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata; - /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder}; - /// # use tempfile::tempfile; - /// # use futures::StreamExt; /// # #[tokio::main(flavor="current_thread")] /// # async fn main() { /// # - /// # let mut file = tempfile().unwrap(); - /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)])); - /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap(); - /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap(); - /// # writer.write(&batch).unwrap(); - /// # writer.close().unwrap(); - /// // Open async file containing parquet data - /// let mut file = tokio::fs::File::from_std(file); - /// // construct the reader - /// let mut reader = ParquetRecordBatchStreamBuilder::new(file) - /// .await.unwrap().build().unwrap(); - /// // Read batche - /// let batch: RecordBatch = reader.next().await.unwrap().unwrap(); + /// # use arrow_array::RecordBatch; + /// # use arrow::util::pretty::pretty_format_batches; + /// # use futures::TryStreamExt; + /// # + /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; + /// # + /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) { + /// # let formatted = pretty_format_batches(batches).unwrap().to_string(); + /// # let actual_lines: Vec<_> = formatted.trim().lines().collect(); + /// # assert_eq!( + /// # &actual_lines, expected_lines, + /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + /// # expected_lines, actual_lines + /// # ); + /// # } + /// # + /// # let testdata = arrow::util::test_util::parquet_test_data(); + /// # let path = format!("{}/alltypes_plain.parquet", testdata); + /// // As before, use tokio::fs::File to read data using an async I/O. + /// let file = tokio::fs::File::open(path).await.unwrap(); + /// + /// // Configure options for reading from the async source, in this case we set the batch size + /// // to 3 which produces 3 rows at a time. + /// let builder = ParquetRecordBatchStreamBuilder::new(file) + /// .await + /// .unwrap() + /// .with_batch_size(3); + /// + /// // We can also read the metadata to inspect the schema and other metadata + /// // before actually reading the data + /// let file_metadata = builder.metadata().file_metadata(); + /// // Specify that we only want to read the 1st, 2nd, and 6th columns + /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]); + /// + /// let stream = builder.with_projection(mask).build().unwrap(); + /// let results = stream.try_collect::>().await.unwrap(); + /// // Print out the results + /// assert_batches_eq( + /// &results, + /// &[ + /// "+----------+-------------+-----------+", + /// "| bool_col | tinyint_col | float_col |", + /// "+----------+-------------+-----------+", + /// "| true | 0 | 0.0 |", + /// "| false | 1 | 1.1 |", + /// "| true | 0 | 0.0 |", + /// "| false | 1 | 1.1 |", + /// "| true | 0 | 0.0 |", + /// "| false | 1 | 1.1 |", + /// "| true | 0 | 0.0 |", + /// "| false | 1 | 1.1 |", + /// "+----------+-------------+-----------+", + /// ], + /// ); + /// + /// // The results has 8 rows, so since we set the batch size to 3, we expect + /// // 3 batches, two with 3 rows each and the last batch with 2 rows. 
+ /// assert_eq!(results.len(), 3); /// # } /// ``` pub async fn new(input: T) -> Result { Self::new_with_options(input, Default::default()).await } - /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file + /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source /// and [`ArrowReaderOptions`] pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result { let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?; @@ -352,6 +400,7 @@ impl ParquetRecordBatchStreamBuilder { } /// Read bloom filter for a column in a row group + /// /// Returns `None` if the column does not have a bloom filter /// /// We should call this function after other forms pruning, such as projection and predicate pushdown. @@ -415,6 +464,8 @@ impl ParquetRecordBatchStreamBuilder { } /// Build a new [`ParquetRecordBatchStream`] + /// + /// See examples on [`ParquetRecordBatchStreamBuilder::new`] pub fn build(self) -> Result> { let num_row_groups = self.metadata.row_groups().len(); diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index ce9eb1142a5b..8dfb859612cb 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -117,9 +117,13 @@ impl OffsetBuffer { /// /// [`Self::try_push`] can perform this validation check on insertion pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> { - match std::str::from_utf8(&self.values.as_slice()[start_offset..]) { + match simdutf8::basic::from_utf8(&self.values.as_slice()[start_offset..]) { Ok(_) => Ok(()), - Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), + Err(_) => { + let e = simdutf8::compat::from_utf8(&self.values.as_slice()[start_offset..]) + .unwrap_err(); + Err(general_err!("encountered non UTF-8 data: {}", e)) + } } } diff --git a/parquet/src/util/interner.rs b/parquet/src/util/interner.rs index 489d4d58122c..34c7d1390f7a 100644 --- a/parquet/src/util/interner.rs +++ b/parquet/src/util/interner.rs @@ -24,7 +24,7 @@ const DEFAULT_DEDUP_CAPACITY: usize = 4096; pub trait Storage { type Key: Copy; - type Value: AsBytes + PartialEq + ?Sized; + type Value: AsBytes + ?Sized; /// Gets an element by its key fn get(&self, idx: Self::Key) -> &Self::Value; @@ -66,7 +66,8 @@ impl Interner { .dedup .entry( hash, - |index| value == self.storage.get(*index), + // Compare bytes rather than directly comparing values so NaNs can be interned + |index| value.as_bytes() == self.storage.get(*index).as_bytes(), |key| self.state.hash_one(self.storage.get(*key).as_bytes()), ) .or_insert_with(|| self.storage.push(value))
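A closing sketch (not part of the patch) of how the new flag-based `ArrayDataBuilder` API introduced in `arrow-data/src/data.rs` is meant to compose: `align_buffers(true)` replaces the now-deprecated `build_aligned()`, and the `unsafe` `skip_validation(true)` replaces `build_unchecked()`. The concrete `Int32` data below is assumed purely for illustration.

```rust
use arrow_buffer::Buffer;
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType};

fn build_examples() -> Result<(), ArrowError> {
    let values = Buffer::from_slice_ref([1i32, 2, 3]);

    // Safe path: validate, and realign buffers (e.g. received over IPC/FFI)
    // instead of erroring on misalignment.
    let data = ArrayData::builder(DataType::Int32)
        .len(3)
        .add_buffer(values.clone())
        .align_buffers(true)
        .build()?;

    // Unsafe path: the caller vouches that the buffers form valid Arrow data,
    // so validation is skipped (unless the `force_validate` feature is enabled).
    let trusted = unsafe {
        ArrayData::builder(DataType::Int32)
            .len(3)
            .add_buffer(values)
            .skip_validation(true)
    }
    .build()?;

    assert_eq!(data.len(), trusted.len());
    Ok(())
}
```

The flags fold the aligned and unchecked behaviors into the single `build()` path instead of three separate build methods, which is what lets the IPC reader above replace its `if require_alignment { builder.build()? } else { builder.build_aligned()? }` branches with `builder.align_buffers(!require_alignment).build()?`.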