diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 66e2d5741adc..4df72da57d30 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -111,7 +111,7 @@ jobs:
cargo test
cd arrow
# re-run tests on arrow workspace with additional features
- cargo test --features=prettyprint
+ cargo test --features=prettyprint --features=async
# run test on arrow with minimal set of features
cargo test --no-default-features
cargo run --example builders
@@ -238,7 +238,7 @@ jobs:
run: |
export CARGO_HOME="/github/home/.cargo"
export CARGO_TARGET_DIR="/github/home/target"
- cargo clippy --features test_common --all-targets --workspace -- -D warnings -A clippy::redundant_field_names
+ cargo clippy --features test_common --features prettyprint --features=async --all-targets --workspace -- -D warnings -A clippy::redundant_field_names
check_benches:
name: Check Benchmarks (but don't run them)
diff --git a/arrow/src/util/pretty.rs b/arrow/src/util/pretty.rs
index 91343ec0f161..4b67f3d376ed 100644
--- a/arrow/src/util/pretty.rs
+++ b/arrow/src/util/pretty.rs
@@ -74,7 +74,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
let mut cells = Vec::new();
for col in 0..batch.num_columns() {
let column = batch.column(col);
- cells.push(Cell::new(&array_value_to_string(&column, row)?));
+ cells.push(Cell::new(&array_value_to_string(column, row)?));
}
table.add_row(cells);
}
@@ -96,7 +96,7 @@ fn create_column(field: &str, columns: &[ArrayRef]) -> Result<Table> {
for col in columns {
for row in 0..col.len() {
- let cells = vec![Cell::new(&array_value_to_string(&col, row)?)];
+ let cells = vec![Cell::new(&array_value_to_string(col, row)?)];
table.add_row(cells);
}
}
@@ -320,7 +320,7 @@ mod tests {
let mut builder = FixedSizeBinaryBuilder::new(3, 3);
builder.append_value(&[1, 2, 3]).unwrap();
- builder.append_null();
+ builder.append_null().unwrap();
builder.append_value(&[7, 8, 9]).unwrap();
let array = Arc::new(builder.finish());
@@ -677,7 +677,7 @@ mod tests {
)?;
let mut buf = String::new();
- write!(&mut buf, "{}", pretty_format_batches(&[batch])?.to_string()).unwrap();
+ write!(&mut buf, "{}", pretty_format_batches(&[batch])?).unwrap();
let s = vec![
"+---+-----+",
@@ -689,7 +689,7 @@ mod tests {
"| d | 100 |",
"+---+-----+",
];
- let expected = String::from(s.join("\n"));
+ let expected = s.join("\n");
assert_eq!(expected, buf);
Ok(())
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 0765da13c586..08d88d1d2c76 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -23,7 +23,7 @@ description = "Apache Parquet implementation in Rust"
homepage = "https://github.com/apache/arrow-rs"
repository = "https://github.com/apache/arrow-rs"
authors = ["Apache Arrow "]
-keywords = [ "arrow", "parquet", "hadoop" ]
+keywords = ["arrow", "parquet", "hadoop"]
readme = "README.md"
build = "build.rs"
edition = "2021"
@@ -45,6 +45,8 @@ base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
rand = "0.8"
+futures = { version = "0.3", optional = true }
+tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] }
[dev-dependencies]
criterion = "0.3"
@@ -55,7 +57,7 @@ brotli = "3.3"
flate2 = "1.0"
lz4 = "1.23"
serde_json = { version = "1.0", features = ["preserve_order"] }
-arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils"] }
+arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils", "prettyprint"] }
[features]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
@@ -63,16 +65,18 @@ cli = ["serde_json", "base64", "clap"]
test_common = []
# Experimental, unstable functionality primarily used for testing
experimental = []
+# Enable async API
+async = ["futures", "tokio"]
-[[ bin ]]
+[[bin]]
name = "parquet-read"
required-features = ["cli"]
-[[ bin ]]
+[[bin]]
name = "parquet-schema"
required-features = ["cli"]
-[[ bin ]]
+[[bin]]
name = "parquet-rowcount"
required-features = ["cli"]
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index e8d3dffb6fec..347d0cfc9273 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -42,7 +42,7 @@ use arrow::datatypes::{
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema,
- Time32MillisecondType as ArrowTime32MillisecondType,
+ SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType,
Time32SecondType as ArrowTime32SecondType,
Time64MicrosecondType as ArrowTime64MicrosecondType,
Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
@@ -91,7 +91,7 @@ pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
/// Array reader reads parquet data into arrow array.
-pub trait ArrayReader {
+pub trait ArrayReader: Send {
fn as_any(&self) -> &dyn Any;
/// Returns the arrow type of this array reader.
@@ -117,6 +117,26 @@ pub trait ArrayReader {
fn get_rep_levels(&self) -> Option<&[i16]>;
}
+/// A collection of row groups
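+///
+/// This abstracts over the source of column chunk data, allowing
+/// `build_array_reader` to be driven either by a synchronous `FileReader`
+/// (see the implementation below) or by row groups that the async reader
+/// has buffered in memory.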
+pub trait RowGroupCollection {
+ /// Get schema of parquet file.
+ fn schema(&self) -> Result<SchemaDescPtr>;
+
+ /// Returns an iterator over the column chunks for particular column
+ fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
+}
+
+impl RowGroupCollection for Arc<dyn FileReader> {
+ fn schema(&self) -> Result<SchemaDescPtr> {
+ Ok(self.metadata().file_metadata().schema_descr_ptr())
+ }
+
+ fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
+ let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
+ Ok(Box::new(iterator))
+ }
+}
+
/// Uses `record_reader` to read up to `batch_size` records from `pages`
///
/// Returns the number of records read, which can be less than batch_size if
@@ -482,7 +502,7 @@ where
impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
where
T: DataType,
- C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
+ C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
{
fn as_any(&self) -> &dyn Any {
self
@@ -1315,9 +1335,9 @@ impl ArrayReader for StructArrayReader {
/// Create array reader from parquet schema, column indices, and parquet file reader.
pub fn build_array_reader<T>(
parquet_schema: SchemaDescPtr,
- arrow_schema: Schema,
+ arrow_schema: SchemaRef,
column_indices: T,
- file_reader: Arc<dyn FileReader>,
+ row_groups: Box<dyn RowGroupCollection>,
) -> Result<Box<dyn ArrayReader>>
where
T: IntoIterator<Item = usize>,
@@ -1355,13 +1375,8 @@ where
fields: filtered_root_fields,
};
- ArrayReaderBuilder::new(
- Arc::new(proj),
- Arc::new(arrow_schema),
- Arc::new(leaves),
- file_reader,
- )
- .build_array_reader()
+ ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
+ .build_array_reader()
}
/// Used to build array reader.
@@ -1371,7 +1386,7 @@ struct ArrayReaderBuilder {
// Key: columns that need to be included in final array builder
// Value: column index in schema
columns_included: Arc<HashMap<*const Type, usize>>,
- file_reader: Arc<dyn FileReader>,
+ row_groups: Box<dyn RowGroupCollection>,
}
/// Used in type visitor.
@@ -1671,13 +1686,13 @@ impl<'a> ArrayReaderBuilder {
root_schema: TypePtr,
arrow_schema: Arc<Schema>,
columns_included: Arc<HashMap<*const Type, usize>>,
- file_reader: Arc<dyn FileReader>,
+ file_reader: Box<dyn RowGroupCollection>,
) -> Self {
Self {
root_schema,
arrow_schema,
columns_included,
- file_reader,
+ row_groups: file_reader,
}
}
@@ -1711,10 +1726,10 @@ impl<'a> ArrayReaderBuilder {
context.rep_level,
context.path.clone(),
));
- let page_iterator = Box::new(FilePageIterator::new(
- self.columns_included[&(cur_type.as_ref() as *const Type)],
- self.file_reader.clone(),
- )?);
+
+ let page_iterator = self
+ .row_groups
+ .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
let arrow_type: Option<ArrowType> = self
.get_arrow_field(&cur_type, context)
@@ -2827,7 +2842,8 @@ mod tests {
#[test]
fn test_create_array_reader() {
let file = get_test_file("nulls.snappy.parquet");
- let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
+ let file_reader: Arc<dyn FileReader> =
+ Arc::new(SerializedFileReader::new(file).unwrap());
let file_metadata = file_reader.metadata().file_metadata();
let arrow_schema = parquet_to_arrow_schema(
@@ -2838,9 +2854,9 @@ mod tests {
let array_reader = build_array_reader(
file_reader.metadata().file_metadata().schema_descr_ptr(),
- arrow_schema,
+ Arc::new(arrow_schema),
vec![0usize].into_iter(),
- file_reader,
+ Box::new(file_reader),
)
.unwrap();
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 0dbc11853a30..fd2055b32782 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -144,9 +144,9 @@ impl ArrowReader for ParquetFileArrowReader {
.metadata()
.file_metadata()
.schema_descr_ptr(),
- self.get_schema()?,
+ Arc::new(self.get_schema()?),
column_indices,
- self.file_reader.clone(),
+ Box::new(self.file_reader.clone()),
)?;
ParquetRecordBatchReader::try_new(batch_size, array_reader)
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
new file mode 100644
index 000000000000..c7f1b9c0fc2c
--- /dev/null
+++ b/parquet/src/arrow/async_reader.rs
@@ -0,0 +1,482 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains asynchronous APIs for reading parquet files into
+//! arrow [`RecordBatch`]
+
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::io::{Cursor, SeekFrom};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use byteorder::{ByteOrder, LittleEndian};
+use futures::future::{BoxFuture, FutureExt};
+use futures::stream::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+
+use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::schema::parquet_to_arrow_schema;
+use crate::basic::Compression;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::parse_metadata_buffer;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::SerializedPageReader;
+use crate::file::PARQUET_MAGIC;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+use crate::util::memory::ByteBufferPtr;
+
+/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
+///
+/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// to use this information to select what specific columns, row groups, etc...
+/// they wish to be read by the resulting stream
+///
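+/// # Example
+///
+/// An illustrative sketch of the intended usage (the file path is
+/// hypothetical; requires the `async` feature and a tokio runtime):
+///
+/// ```ignore
+/// use futures::TryStreamExt;
+/// use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
+///
+/// # async fn example() -> parquet::errors::Result<()> {
+/// // Open the file asynchronously and read its footer metadata
+/// let file = tokio::fs::File::open("data.parquet").await?;
+/// let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
+///
+/// // Select the first column only and decode in batches of up to 1024 rows
+/// let stream = builder
+///     .with_projection(vec![0])
+///     .with_batch_size(1024)
+///     .build()?;
+///
+/// // Drive the stream to completion, collecting the record batches
+/// let batches: Vec<_> = stream.try_collect().await?;
+/// # Ok(())
+/// # }
+/// ```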
+pub struct ParquetRecordBatchStreamBuilder<T> {
+ input: T,
+
+ metadata: Arc<ParquetMetaData>,
+
+ schema: SchemaRef,
+
+ batch_size: usize,
+
+ row_groups: Option<Vec<usize>>,
+
+ projection: Option<Vec<usize>>,
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ pub async fn new(mut input: T) -> Result<Self> {
+ let metadata = Arc::new(read_footer(&mut input).await?);
+
+ let schema = Arc::new(parquet_to_arrow_schema(
+ metadata.file_metadata().schema_descr(),
+ metadata.file_metadata().key_value_metadata(),
+ )?);
+
+ Ok(Self {
+ input,
+ metadata,
+ schema,
+ batch_size: 1024,
+ row_groups: None,
+ projection: None,
+ })
+ }
+
+ /// Returns a reference to the [`ParquetMetaData`] for this parquet file
+ pub fn metadata(&self) -> &Arc<ParquetMetaData> {
+ &self.metadata
+ }
+
+ /// Returns the arrow [`SchemaRef`] for this parquet file
+ pub fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+
+ /// Set the size of [`RecordBatch`] to produce
+ pub fn with_batch_size(self, batch_size: usize) -> Self {
+ Self { batch_size, ..self }
+ }
+
+ /// Only read data from the provided row group indexes
+ pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
+ Self {
+ row_groups: Some(row_groups),
+ ..self
+ }
+ }
+
+ /// Only read data from the provided column indexes
+ pub fn with_projection(self, projection: Vec<usize>) -> Self {
+ Self {
+ projection: Some(projection),
+ ..self
+ }
+ }
+
+ /// Build a new [`ParquetRecordBatchStream`]
+ pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
+ let num_columns = self.schema.fields().len();
+ let num_row_groups = self.metadata.row_groups().len();
+
+ let columns = match self.projection {
+ Some(projection) => {
+ if let Some(col) = projection.iter().find(|x| **x >= num_columns) {
+ return Err(general_err!(
+ "column projection {} outside bounds of schema 0..{}",
+ col,
+ num_columns
+ ));
+ }
+ projection
+ }
+ None => (0..num_columns).collect::<Vec<_>>(),
+ };
+
+ let row_groups = match self.row_groups {
+ Some(row_groups) => {
+ if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
+ return Err(general_err!(
+ "row group {} out of bounds 0..{}",
+ col,
+ num_row_groups
+ ));
+ }
+ row_groups.into()
+ }
+ None => (0..self.metadata.row_groups().len()).collect(),
+ };
+
+ Ok(ParquetRecordBatchStream {
+ row_groups,
+ columns: columns.into(),
+ batch_size: self.batch_size,
+ metadata: self.metadata,
+ schema: self.schema,
+ input: Some(self.input),
+ state: StreamState::Init,
+ })
+ }
+}
+
+enum StreamState<T> {
+ /// At the start of a new row group, or the end of the parquet stream
+ Init,
+ /// Decoding a batch
+ Decoding(ParquetRecordBatchReader),
+ /// Reading data from input
+ Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+ /// Error
+ Error,
+}
+
+impl<T> std::fmt::Debug for StreamState<T> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ StreamState::Init => write!(f, "StreamState::Init"),
+ StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
+ StreamState::Reading(_) => write!(f, "StreamState::Reading"),
+ StreamState::Error => write!(f, "StreamState::Error"),
+ }
+ }
+}
+
+/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file
+pub struct ParquetRecordBatchStream<T> {
+ metadata: Arc<ParquetMetaData>,
+
+ schema: SchemaRef,
+
+ batch_size: usize,
+
+ columns: Arc<[usize]>,
+
+ row_groups: VecDeque<usize>,
+
+ /// This is an option so it can be moved into a future
+ input: Option<T>,
+
+ state: StreamState<T>,
+}
+
+impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ParquetRecordBatchStream")
+ .field("metadata", &self.metadata)
+ .field("schema", &self.schema)
+ .field("batch_size", &self.batch_size)
+ .field("columns", &self.columns)
+ .field("state", &self.state)
+ .finish()
+ }
+}
+
+impl<T> ParquetRecordBatchStream<T> {
+ /// Returns the [`SchemaRef`] for this parquet file
+ pub fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
+ for ParquetRecordBatchStream<T>
+{
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {