diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 66e2d5741adc..b3d0bf75441d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -107,22 +107,31 @@ jobs: run: | export CARGO_HOME="/github/home/.cargo" export CARGO_TARGET_DIR="/github/home/target" + # run tests on all workspace members with default feature list cargo test + + # Switch to arrow crate cd arrow - # re-run tests on arrow workspace with additional features + # re-run tests on arrow crate with additional features cargo test --features=prettyprint - # run test on arrow with minimal set of features + # run test on arrow crate with minimal set of features cargo test --no-default-features cargo run --example builders cargo run --example dynamic_types cargo run --example read_csv cargo run --example read_csv_infer_schema - # Exit arrow directory - cd .. - (cd parquet && cargo check --no-default-features) - (cd arrow && cargo check --no-default-features) - (cd arrow-flight && cargo check --no-default-features) + cargo check --no-default-features + + # Switch to parquet crate + cd ../parquet + # re-run tests on parquet crate with async feature enabled + cargo test --features=async + cargo check --no-default-features + + # Switch to arrow-flight + cd ../arrow-flight + cargo check --no-default-features # test the --features "simd" of the arrow crate. This requires nightly. linux-test-simd: @@ -238,7 +247,7 @@ jobs: run: | export CARGO_HOME="/github/home/.cargo" export CARGO_TARGET_DIR="/github/home/target" - cargo clippy --features test_common --all-targets --workspace -- -D warnings -A clippy::redundant_field_names + cargo clippy --features test_common --features prettyprint --features=async --all-targets --workspace -- -D warnings -A clippy::redundant_field_names check_benches: name: Check Benchmarks (but don't run them) diff --git a/arrow/src/util/pretty.rs b/arrow/src/util/pretty.rs index 91343ec0f161..4b67f3d376ed 100644 --- a/arrow/src/util/pretty.rs +++ b/arrow/src/util/pretty.rs @@ -74,7 +74,7 @@ fn create_table(results: &[RecordBatch]) -> Result { let mut cells = Vec::new(); for col in 0..batch.num_columns() { let column = batch.column(col); - cells.push(Cell::new(&array_value_to_string(&column, row)?)); + cells.push(Cell::new(&array_value_to_string(column, row)?)); } table.add_row(cells); } @@ -96,7 +96,7 @@ fn create_column(field: &str, columns: &[ArrayRef]) -> Result
{ for col in columns { for row in 0..col.len() { - let cells = vec![Cell::new(&array_value_to_string(&col, row)?)]; + let cells = vec![Cell::new(&array_value_to_string(col, row)?)]; table.add_row(cells); } } @@ -320,7 +320,7 @@ mod tests { let mut builder = FixedSizeBinaryBuilder::new(3, 3); builder.append_value(&[1, 2, 3]).unwrap(); - builder.append_null(); + builder.append_null().unwrap(); builder.append_value(&[7, 8, 9]).unwrap(); let array = Arc::new(builder.finish()); @@ -677,7 +677,7 @@ mod tests { )?; let mut buf = String::new(); - write!(&mut buf, "{}", pretty_format_batches(&[batch])?.to_string()).unwrap(); + write!(&mut buf, "{}", pretty_format_batches(&[batch])?).unwrap(); let s = vec![ "+---+-----+", @@ -689,7 +689,7 @@ mod tests { "| d | 100 |", "+---+-----+", ]; - let expected = String::from(s.join("\n")); + let expected = s.join("\n"); assert_eq!(expected, buf); Ok(()) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 0765da13c586..08d88d1d2c76 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -23,7 +23,7 @@ description = "Apache Parquet implementation in Rust" homepage = "https://github.com/apache/arrow-rs" repository = "https://github.com/apache/arrow-rs" authors = ["Apache Arrow "] -keywords = [ "arrow", "parquet", "hadoop" ] +keywords = ["arrow", "parquet", "hadoop"] readme = "README.md" build = "build.rs" edition = "2021" @@ -45,6 +45,8 @@ base64 = { version = "0.13", optional = true } clap = { version = "2.33.3", optional = true } serde_json = { version = "1.0", features = ["preserve_order"], optional = true } rand = "0.8" +futures = { version = "0.3", optional = true } +tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] } [dev-dependencies] criterion = "0.3" @@ -55,7 +57,7 @@ brotli = "3.3" flate2 = "1.0" lz4 = "1.23" serde_json = { version = "1.0", features = ["preserve_order"] } -arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils"] } +arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils", "prettyprint"] } [features] default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] @@ -63,16 +65,18 @@ cli = ["serde_json", "base64", "clap"] test_common = [] # Experimental, unstable functionality primarily used for testing experimental = [] +# Enable async API +async = ["futures", "tokio"] -[[ bin ]] +[[bin]] name = "parquet-read" required-features = ["cli"] -[[ bin ]] +[[bin]] name = "parquet-schema" required-features = ["cli"] -[[ bin ]] +[[bin]] name = "parquet-rowcount" required-features = ["cli"] diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index 01e54f67fa6b..9fe5181d5b7e 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -42,7 +42,7 @@ use arrow::datatypes::{ Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type, Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema, - Time32MillisecondType as ArrowTime32MillisecondType, + SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType, Time32SecondType as ArrowTime32SecondType, Time64MicrosecondType as ArrowTime64MicrosecondType, Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit, @@ -91,7 +91,7 @@ pub use byte_array::make_byte_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; /// Array reader 
reads parquet data into arrow array. -pub trait ArrayReader { +pub trait ArrayReader: Send { fn as_any(&self) -> &dyn Any; /// Returns the arrow type of this array reader. @@ -117,6 +117,26 @@ pub trait ArrayReader { fn get_rep_levels(&self) -> Option<&[i16]>; } +/// A collection of row groups +pub trait RowGroupCollection { + /// Get schema of parquet file. + fn schema(&self) -> Result; + + /// Returns an iterator over the column chunks for particular column + fn column_chunks(&self, i: usize) -> Result>; +} + +impl RowGroupCollection for Arc { + fn schema(&self) -> Result { + Ok(self.metadata().file_metadata().schema_descr_ptr()) + } + + fn column_chunks(&self, column_index: usize) -> Result> { + let iterator = FilePageIterator::new(column_index, Arc::clone(self))?; + Ok(Box::new(iterator)) + } +} + /// Uses `record_reader` to read up to `batch_size` records from `pages` /// /// Returns the number of records read, which can be less than batch_size if @@ -478,7 +498,7 @@ where impl ArrayReader for ComplexObjectArrayReader where T: DataType, - C: Converter>, ArrayRef> + 'static, + C: Converter>, ArrayRef> + Send + 'static, { fn as_any(&self) -> &dyn Any { self @@ -1311,9 +1331,9 @@ impl ArrayReader for StructArrayReader { /// Create array reader from parquet schema, column indices, and parquet file reader. pub fn build_array_reader( parquet_schema: SchemaDescPtr, - arrow_schema: Schema, + arrow_schema: SchemaRef, column_indices: T, - file_reader: Arc, + row_groups: Box, ) -> Result> where T: IntoIterator, @@ -1351,13 +1371,8 @@ where fields: filtered_root_fields, }; - ArrayReaderBuilder::new( - Arc::new(proj), - Arc::new(arrow_schema), - Arc::new(leaves), - file_reader, - ) - .build_array_reader() + ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups) + .build_array_reader() } /// Used to build array reader. @@ -1367,7 +1382,7 @@ struct ArrayReaderBuilder { // Key: columns that need to be included in final array builder // Value: column index in schema columns_included: Arc>, - file_reader: Arc, + row_groups: Box, } /// Used in type visitor. 
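// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this diff: how a caller adapts to the new
// `build_array_reader` signature introduced above, which now takes the arrow
// schema as a `SchemaRef` and the data source as a `Box<dyn RowGroupCollection>`
// instead of an `Arc<dyn FileReader>`. It is modelled on the
// `test_create_array_reader` test further down; the file path is hypothetical,
// and the `array_reader` module is only visible with the crate's
// `experimental` feature enabled.

use std::sync::Arc;

use parquet::arrow::array_reader::build_array_reader;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn array_reader_for_first_column() -> Result<(), Box<dyn std::error::Error>> {
    let file = std::fs::File::open("data/nulls.snappy.parquet")?;

    // `Arc<dyn FileReader>` implements the new `RowGroupCollection` trait, so
    // the same reader can be boxed straight into `build_array_reader`.
    let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file)?);

    let file_metadata = file_reader.metadata().file_metadata();
    let arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;

    let _array_reader = build_array_reader(
        file_reader.metadata().file_metadata().schema_descr_ptr(),
        Arc::new(arrow_schema),   // arrow schema is now passed as a SchemaRef
        vec![0usize].into_iter(), // column indices to read
        Box::new(file_reader),    // any RowGroupCollection, not just a file
    )?;

    Ok(())
}
// ---------------------------------------------------------------------------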
@@ -1667,13 +1682,13 @@ impl<'a> ArrayReaderBuilder { root_schema: TypePtr, arrow_schema: Arc, columns_included: Arc>, - file_reader: Arc, + file_reader: Box, ) -> Self { Self { root_schema, arrow_schema, columns_included, - file_reader, + row_groups: file_reader, } } @@ -1707,10 +1722,10 @@ impl<'a> ArrayReaderBuilder { context.rep_level, context.path.clone(), )); - let page_iterator = Box::new(FilePageIterator::new( - self.columns_included[&(cur_type.as_ref() as *const Type)], - self.file_reader.clone(), - )?); + + let page_iterator = self + .row_groups + .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?; let arrow_type: Option = self .get_arrow_field(&cur_type, context) @@ -2823,7 +2838,8 @@ mod tests { #[test] fn test_create_array_reader() { let file = get_test_file("nulls.snappy.parquet"); - let file_reader = Arc::new(SerializedFileReader::new(file).unwrap()); + let file_reader: Arc = + Arc::new(SerializedFileReader::new(file).unwrap()); let file_metadata = file_reader.metadata().file_metadata(); let arrow_schema = parquet_to_arrow_schema( @@ -2834,9 +2850,9 @@ mod tests { let array_reader = build_array_reader( file_reader.metadata().file_metadata().schema_descr_ptr(), - arrow_schema, + Arc::new(arrow_schema), vec![0usize].into_iter(), - file_reader, + Box::new(file_reader), ) .unwrap(); diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 259a3c08e586..32003d2724b7 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -144,9 +144,9 @@ impl ArrowReader for ParquetFileArrowReader { .metadata() .file_metadata() .schema_descr_ptr(), - self.get_schema()?, + Arc::new(self.get_schema()?), column_indices, - self.file_reader.clone(), + Box::new(self.file_reader.clone()), )?; ParquetRecordBatchReader::try_new(batch_size, array_reader) diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs new file mode 100644 index 000000000000..c7f1b9c0fc2c --- /dev/null +++ b/parquet/src/arrow/async_reader.rs @@ -0,0 +1,482 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Contains asynchronous APIs for reading parquet files into +//! 
arrow [`RecordBatch`] + +use std::collections::VecDeque; +use std::fmt::Formatter; +use std::io::{Cursor, SeekFrom}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use byteorder::{ByteOrder, LittleEndian}; +use futures::future::{BoxFuture, FutureExt}; +use futures::stream::Stream; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; + +use crate::arrow::array_reader::{build_array_reader, RowGroupCollection}; +use crate::arrow::arrow_reader::ParquetRecordBatchReader; +use crate::arrow::schema::parquet_to_arrow_schema; +use crate::basic::Compression; +use crate::column::page::{PageIterator, PageReader}; +use crate::errors::{ParquetError, Result}; +use crate::file::footer::parse_metadata_buffer; +use crate::file::metadata::ParquetMetaData; +use crate::file::reader::SerializedPageReader; +use crate::file::PARQUET_MAGIC; +use crate::schema::types::{ColumnDescPtr, SchemaDescPtr}; +use crate::util::memory::ByteBufferPtr; + +/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file +/// +/// In particular, this handles reading the parquet file metadata, allowing consumers +/// to use this information to select what specific columns, row groups, etc... +/// they wish to be read by the resulting stream +/// +pub struct ParquetRecordBatchStreamBuilder { + input: T, + + metadata: Arc, + + schema: SchemaRef, + + batch_size: usize, + + row_groups: Option>, + + projection: Option>, +} + +impl ParquetRecordBatchStreamBuilder { + /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file + pub async fn new(mut input: T) -> Result { + let metadata = Arc::new(read_footer(&mut input).await?); + + let schema = Arc::new(parquet_to_arrow_schema( + metadata.file_metadata().schema_descr(), + metadata.file_metadata().key_value_metadata(), + )?); + + Ok(Self { + input, + metadata, + schema, + batch_size: 1024, + row_groups: None, + projection: None, + }) + } + + /// Returns a reference to the [`ParquetMetaData`] for this parquet file + pub fn metadata(&self) -> &Arc { + &self.metadata + } + + /// Returns the arrow [`SchemaRef`] for this parquet file + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + /// Set the size of [`RecordBatch`] to produce + pub fn with_batch_size(self, batch_size: usize) -> Self { + Self { batch_size, ..self } + } + + /// Only read data from the provided row group indexes + pub fn with_row_groups(self, row_groups: Vec) -> Self { + Self { + row_groups: Some(row_groups), + ..self + } + } + + /// Only read data from the provided column indexes + pub fn with_projection(self, projection: Vec) -> Self { + Self { + projection: Some(projection), + ..self + } + } + + /// Build a new [`ParquetRecordBatchStream`] + pub fn build(self) -> Result> { + let num_columns = self.schema.fields().len(); + let num_row_groups = self.metadata.row_groups().len(); + + let columns = match self.projection { + Some(projection) => { + if let Some(col) = projection.iter().find(|x| **x >= num_columns) { + return Err(general_err!( + "column projection {} outside bounds of schema 0..{}", + col, + num_columns + )); + } + projection + } + None => (0..num_columns).collect::>(), + }; + + let row_groups = match self.row_groups { + Some(row_groups) => { + if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) { + return Err(general_err!( + "row group {} out of bounds 0..{}", + col, + num_row_groups + )); + } + row_groups.into() + } + None => 
(0..self.metadata.row_groups().len()).collect(), + }; + + Ok(ParquetRecordBatchStream { + row_groups, + columns: columns.into(), + batch_size: self.batch_size, + metadata: self.metadata, + schema: self.schema, + input: Some(self.input), + state: StreamState::Init, + }) + } +} + +enum StreamState { + /// At the start of a new row group, or the end of the parquet stream + Init, + /// Decoding a batch + Decoding(ParquetRecordBatchReader), + /// Reading data from input + Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>), + /// Error + Error, +} + +impl std::fmt::Debug for StreamState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + StreamState::Init => write!(f, "StreamState::Init"), + StreamState::Decoding(_) => write!(f, "StreamState::Decoding"), + StreamState::Reading(_) => write!(f, "StreamState::Reading"), + StreamState::Error => write!(f, "StreamState::Error"), + } + } +} + +/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file +pub struct ParquetRecordBatchStream { + metadata: Arc, + + schema: SchemaRef, + + batch_size: usize, + + columns: Arc<[usize]>, + + row_groups: VecDeque, + + /// This is an option so it can be moved into a future + input: Option, + + state: StreamState, +} + +impl std::fmt::Debug for ParquetRecordBatchStream { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ParquetRecordBatchStream") + .field("metadata", &self.metadata) + .field("schema", &self.schema) + .field("batch_size", &self.batch_size) + .field("columns", &self.columns) + .field("state", &self.state) + .finish() + } +} + +impl ParquetRecordBatchStream { + /// Returns the [`SchemaRef`] for this parquet file + pub fn schema(&self) -> &SchemaRef { + &self.schema + } +} + +impl Stream + for ParquetRecordBatchStream +{ + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match &mut self.state { + StreamState::Decoding(batch_reader) => match batch_reader.next() { + Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))), + Some(Err(e)) => { + self.state = StreamState::Error; + return Poll::Ready(Some(Err(ParquetError::ArrowError( + e.to_string(), + )))); + } + None => self.state = StreamState::Init, + }, + StreamState::Init => { + let row_group_idx = match self.row_groups.pop_front() { + Some(idx) => idx, + None => return Poll::Ready(None), + }; + + let metadata = self.metadata.clone(); + let mut input = match self.input.take() { + Some(input) => input, + None => { + self.state = StreamState::Error; + return Poll::Ready(Some(Err(general_err!( + "input stream lost" + )))); + } + }; + + let columns = Arc::clone(&self.columns); + + self.state = StreamState::Reading( + async move { + let row_group_metadata = metadata.row_group(row_group_idx); + let mut column_chunks = + vec![None; row_group_metadata.columns().len()]; + + for column_idx in columns.iter() { + let column = row_group_metadata.column(*column_idx); + let (start, length) = column.byte_range(); + let end = start + length; + + input.seek(SeekFrom::Start(start)).await?; + + let mut buffer = vec![0_u8; (end - start) as usize]; + input.read_exact(buffer.as_mut_slice()).await?; + + column_chunks[*column_idx] = Some(InMemoryColumnChunk { + num_values: column.num_values(), + compression: column.compression(), + physical_type: column.column_type(), + data: ByteBufferPtr::new(buffer), + }); + } + + Ok(( + input, + InMemoryRowGroup { + schema: metadata.file_metadata().schema_descr_ptr(), + column_chunks, + }, + )) + } + 
.boxed(), + ) + } + StreamState::Reading(f) => { + let result = futures::ready!(f.poll_unpin(cx)); + self.state = StreamState::Init; + + let row_group: Box = match result { + Ok((input, row_group)) => { + self.input = Some(input); + Box::new(row_group) + } + Err(e) => { + self.state = StreamState::Error; + return Poll::Ready(Some(Err(e))); + } + }; + + let parquet_schema = self.metadata.file_metadata().schema_descr_ptr(); + + let array_reader = build_array_reader( + parquet_schema, + self.schema.clone(), + self.columns.iter().cloned(), + row_group, + )?; + + let batch_reader = + ParquetRecordBatchReader::try_new(self.batch_size, array_reader) + .expect("reader"); + + self.state = StreamState::Decoding(batch_reader) + } + StreamState::Error => return Poll::Pending, + } + } + } +} + +async fn read_footer( + input: &mut T, +) -> Result { + input.seek(SeekFrom::End(-8)).await?; + + let mut buf = [0_u8; 8]; + input.read_exact(&mut buf).await?; + + if buf[4..] != PARQUET_MAGIC { + return Err(general_err!("Invalid Parquet file. Corrupt footer")); + } + + let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64; + if metadata_len < 0 { + return Err(general_err!( + "Invalid Parquet file. Metadata length is less than zero ({})", + metadata_len + )); + } + + input.seek(SeekFrom::End(-8 - metadata_len)).await?; + + let mut buf = Vec::with_capacity(metadata_len as usize + 8); + input.read_to_end(&mut buf).await?; + + parse_metadata_buffer(&mut Cursor::new(buf)) +} + +struct InMemoryRowGroup { + schema: SchemaDescPtr, + column_chunks: Vec>, +} + +impl RowGroupCollection for InMemoryRowGroup { + fn schema(&self) -> Result { + Ok(self.schema.clone()) + } + + fn column_chunks(&self, i: usize) -> Result> { + let page_reader = self.column_chunks[i].as_ref().unwrap().pages(); + + Ok(Box::new(ColumnChunkIterator { + schema: self.schema.clone(), + column_schema: self.schema.columns()[i].clone(), + reader: Some(page_reader), + })) + } +} + +#[derive(Clone)] +struct InMemoryColumnChunk { + num_values: i64, + compression: Compression, + physical_type: crate::basic::Type, + data: ByteBufferPtr, +} + +impl InMemoryColumnChunk { + fn pages(&self) -> Result> { + let page_reader = SerializedPageReader::new( + Cursor::new(self.data.clone()), + self.num_values, + self.compression, + self.physical_type, + )?; + + Ok(Box::new(page_reader)) + } +} + +struct ColumnChunkIterator { + schema: SchemaDescPtr, + column_schema: ColumnDescPtr, + reader: Option>>, +} + +impl Iterator for ColumnChunkIterator { + type Item = Result>; + + fn next(&mut self) -> Option { + self.reader.take() + } +} + +impl PageIterator for ColumnChunkIterator { + fn schema(&mut self) -> Result { + Ok(self.schema.clone()) + } + + fn column_schema(&mut self) -> Result { + Ok(self.column_schema.clone()) + } +} + +#[cfg(test)] +mod tests { + use arrow::util::pretty::pretty_format_batches; + use futures::TryStreamExt; + use tokio::fs::File; + + use super::*; + + fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) { + let formatted = pretty_format_batches(batches).unwrap().to_string(); + let actual_lines: Vec<_> = formatted.trim().lines().collect(); + assert_eq!( + &actual_lines, expected_lines, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected_lines, actual_lines + ); + } + + #[tokio::test] + async fn test_parquet_stream() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{}/alltypes_plain.parquet", testdata); + let file = File::open(path).await.unwrap(); + + let builder = 
ParquetRecordBatchStreamBuilder::new(file) + .await + .unwrap() + .with_projection(vec![1, 2, 6]) + .with_batch_size(3); + + let stream = builder.build().unwrap(); + + let results = stream.try_collect::>().await.unwrap(); + assert_eq!(results.len(), 3); + + assert_batches_eq( + &results, + &[ + "+----------+-------------+-----------+", + "| bool_col | tinyint_col | float_col |", + "+----------+-------------+-----------+", + "| true | 0 | 0 |", + "| false | 1 | 1.1 |", + "| true | 0 | 0 |", + "| false | 1 | 1.1 |", + "| true | 0 | 0 |", + "| false | 1 | 1.1 |", + "| true | 0 | 0 |", + "| false | 1 | 1.1 |", + "+----------+-------------+-----------+", + ], + ); + } +} diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index a2f885a8d116..6dacfdcba98d 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -122,6 +122,10 @@ experimental_mod!(array_reader); pub mod arrow_reader; pub mod arrow_writer; mod bit_util; + +#[cfg(feature = "async")] +pub mod async_reader; + experimental_mod!(converter); pub(in crate::arrow) mod levels; pub(in crate::arrow) mod record_reader; diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 31138c156919..2782adf72733 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -370,7 +370,7 @@ mod tests { use super::RecordReader; struct TestPageReader { - pages: Box>, + pages: Box + Send>, } impl TestPageReader { diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 3460d11d047c..40d8c148930c 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -95,7 +95,8 @@ pub struct ScalarBuffer { len: usize, /// Placeholder to allow `T` as an invariant generic parameter - _phantom: PhantomData<*mut T>, + /// without making it !Send + _phantom: PhantomData T>, } impl Default for ScalarBuffer { diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index acbf3eba410e..8c3a31d2f356 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -189,7 +189,7 @@ impl PageWriteSpec { /// API for reading pages from a column chunk. /// This offers a iterator like API to get the next page. -pub trait PageReader: Iterator> { +pub trait PageReader: Iterator> + Send { /// Gets the next page in the column chunk associated with this reader. /// Returns `None` if there are no pages left. fn get_next_page(&mut self) -> Result>; @@ -220,7 +220,7 @@ pub trait PageWriter { } /// An iterator over pages of some specific column in a parquet file. -pub trait PageIterator: Iterator>> { +pub trait PageIterator: Iterator>> + Send { /// Get schema of parquet file. fn schema(&mut self) -> Result; diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index 5d6554d92da6..f4aecbf4e86f 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -48,7 +48,7 @@ use crate::basic::Compression as CodecType; use crate::errors::{ParquetError, Result}; /// Parquet compression codec interface. -pub trait Codec { +pub trait Codec: Send { /// Compresses data stored in slice `input_buf` and writes the compressed result /// to `output_buf`. 
/// Note that you'll need to call `clear()` before reusing the same `output_buf` diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index 6f3468af8381..1a67c9de687b 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -598,6 +598,7 @@ pub(crate) mod private { + super::FromBytes + super::SliceAsBytes + PartialOrd + + Send { /// Encode the value directly from a higher level encoder fn encode( @@ -1036,7 +1037,7 @@ pub(crate) mod private { /// Contains the Parquet physical type information as well as the Rust primitive type /// presentation. -pub trait DataType: 'static { +pub trait DataType: 'static + Send { type T: private::ParquetValueType; /// Returns Parquet physical type. diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index 48fc108840e9..b3f98d12b753 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -35,7 +35,7 @@ use crate::util::{ // Decoders /// A Parquet decoder for the data type `T`. -pub trait Decoder { +pub trait Decoder: Send { /// Sets the data to decode to be `data`, which should contain `num_values` of values /// to decode. fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()>; diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 48b013f478e7..a7a054c0e947 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -78,7 +78,6 @@ pub fn parse_metadata(chunk_reader: &R) -> Result; if footer_metadata_len > file_size as usize { return Err(general_err!( "Invalid Parquet file. Metadata start is less than zero ({})", @@ -87,16 +86,21 @@ pub fn parse_metadata(chunk_reader: &R) -> Result( + metadata_read: &mut T, +) -> Result { // TODO: row group filtering let mut prot = TCompactInputProtocol::new(metadata_read); let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs index 47d8258694d7..abd6ac62af13 100644 --- a/parquet/src/file/mod.rs +++ b/parquet/src/file/mod.rs @@ -104,7 +104,7 @@ pub mod statistics; pub mod writer; const FOOTER_SIZE: usize = 8; -const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; +pub(crate) const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; /// The number of bytes read at the end of the parquet file on first read const DEFAULT_FOOTER_READ_SIZE: usize = 64 * 1024; diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs index aa8ba83a6c0e..d752273655c5 100644 --- a/parquet/src/file/reader.rs +++ b/parquet/src/file/reader.rs @@ -43,8 +43,8 @@ pub trait Length { /// The ChunkReader trait generates readers of chunks of a source. /// For a file system reader, each chunk might contain a clone of File bounded on a given range. /// For an object store reader, each read can be mapped to a range request. -pub trait ChunkReader: Length { - type T: Read; +pub trait ChunkReader: Length + Send + Sync { + type T: Read + Send; /// get a serialy readeable slice of the current reader /// This should fail if the slice exceeds the current bounds fn get_read(&self, start: u64, length: usize) -> Result; @@ -55,7 +55,7 @@ pub trait ChunkReader: Length { /// Parquet file reader API. With this, user can get metadata information about the /// Parquet file, can get reader for each row group, and access record iterator. -pub trait FileReader { +pub trait FileReader: Send + Sync { /// Get metadata information about this file. 
     fn metadata(&self) -> &ParquetMetaData;
 
@@ -76,7 +76,7 @@ pub trait FileReader {
 
 /// Parquet row group reader API. With this, user can get metadata information about the
 /// row group, as well as readers for each individual column chunk.
-pub trait RowGroupReader {
+pub trait RowGroupReader: Send + Sync {
     /// Get metadata information about this row group.
     fn metadata(&self) -> &RowGroupMetaData;
 
@@ -139,7 +139,7 @@ pub trait RowGroupReader {
 /// Implementation of page iterator for parquet file.
 pub struct FilePageIterator {
     column_index: usize,
-    row_group_indices: Box<dyn Iterator<Item = usize>>,
+    row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
     file_reader: Arc<dyn FileReader>,
 }
 
@@ -156,7 +156,7 @@ impl FilePageIterator {
     /// Create page iterator from parquet file reader with only some row groups.
     pub fn with_row_groups(
         column_index: usize,
-        row_group_indices: Box<dyn Iterator<Item = usize>>,
+        row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
         file_reader: Arc<dyn FileReader>,
     ) -> Result<Self> {
         // Check that column_index is valid
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 9d6fb52491fb..420d2e22645f 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -271,7 +271,7 @@ impl<T: Read> SerializedPageReader<T> {
     }
 }
 
-impl<T: Read> Iterator for SerializedPageReader<T> {
+impl<T: Read + Send> Iterator for SerializedPageReader<T> {
     type Item = Result<Page>;
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -279,7 +279,7 @@ impl<T: Read> Iterator for SerializedPageReader<T> {
     }
 }
 
-impl<T: Read> PageReader for SerializedPageReader<T> {
+impl<T: Read + Send> PageReader for SerializedPageReader<T> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
         while self.seen_num_values < self.total_num_values {
             let page_header = self.read_page_header()?;
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 581845a3c1cf..1c0fd6283cde 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -177,13 +177,13 @@ impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
     }
 }
 
-impl<P: Iterator<Item = Page>> PageReader for InMemoryPageReader<P> {
+impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
         Ok(self.page_iter.next())
     }
 }
 
-impl<P: Iterator<Item = Page>> Iterator for InMemoryPageReader<P> {
+impl<P: Iterator<Item = Page> + Send> Iterator for InMemoryPageReader<P> {
     type Item = Result<Page>;
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -223,7 +223,7 @@ impl<I: Iterator<Item = Vec<Page>>> Iterator for InMemoryPageIterator<I> {
     }
 }
 
-impl<I: Iterator<Item = Vec<Page>>> PageIterator for InMemoryPageIterator<I> {
+impl<I: Iterator<Item = Vec<Page>> + Send> PageIterator for InMemoryPageIterator<I> {
     fn schema(&mut self) -> Result<SchemaDescPtr> {
         Ok(self.schema.clone())
     }
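// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this diff: end-to-end use of the new
// `async` feature, modelled on the `test_parquet_stream` test above. The file
// path is hypothetical, and it assumes `parquet` is built with
// `--features async` plus a `tokio` runtime and the `futures` crate.

use futures::TryStreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data/alltypes_plain.parquet").await?;

    // Reading the footer is the only I/O performed eagerly; column chunks are
    // fetched lazily, one row group at a time, as the stream is polled.
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    println!("file has {} row groups", builder.metadata().row_groups().len());

    // Project columns 1, 2 and 6 and emit RecordBatches of at most 3 rows.
    let stream = builder
        .with_projection(vec![1, 2, 6])
        .with_batch_size(3)
        .build()?;

    let batches = stream.try_collect::<Vec<_>>().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
// ---------------------------------------------------------------------------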