Skip to content

Commit

Permalink
feat: support to struct datatype
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 5, 2023
1 parent de88a24 commit 38d5d89
Show file tree
Hide file tree
Showing 19 changed files with 1,041 additions and 315 deletions.
763 changes: 589 additions & 174 deletions src/arrow_reader.rs

Large diffs are not rendered by default.

113 changes: 56 additions & 57 deletions src/arrow_reader/column.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow::datatypes::Field;
use bytes::Bytes;
use snafu::{OptionExt, ResultExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::proto::{ColumnEncoding, StripeFooter, StripeInformation};
use crate::reader::decompress::{Compression, Decompressor};
use crate::reader::schema::TypeDescription;

use crate::reader::schema::{create_field, TypeDescription};
use crate::reader::Reader;

pub mod binary;
Expand All @@ -19,19 +20,24 @@ pub mod float;
pub mod int;
pub mod present;
pub mod string;
pub mod struct_column;
pub mod timestamp;
pub mod tinyint;

#[derive(Debug)]
pub struct Column {
data: Bytes,
number_of_rows: u64,
compression: Option<Compression>,
footer: Arc<StripeFooter>,
name: String,
column: Arc<TypeDescription>,
}

impl From<Column> for Field {
fn from(value: Column) -> Self {
create_field((&value.name, &value.column))
}
}

macro_rules! impl_read_stream {
($reader:ident,$start:ident,$length:ident $($_await:tt)*) => {{
$reader
Expand Down Expand Up @@ -98,67 +104,18 @@ impl Column {
Ok((start, length))
}

pub fn new<R: Read + Seek>(
reader: &mut Reader<R>,
compression: Option<Compression>,
pub fn new(
name: &str,
column: &Arc<TypeDescription>,
footer: &Arc<StripeFooter>,
stripe: &StripeInformation,
) -> Result<Self> {
let (start, length) = Column::get_stream_info(name, column, footer, stripe)?;
let data = Column::read_stream(reader, start, length)?;

Ok(Self {
data,
) -> Self {
Self {
number_of_rows: stripe.number_of_rows(),
compression,
footer: footer.clone(),
column: column.clone(),
name: name.to_string(),
})
}

pub async fn new_async<R: AsyncRead + AsyncSeek + Unpin + Send>(
reader: &mut Reader<R>,
compression: Option<Compression>,
name: &str,
column: &Arc<TypeDescription>,
footer: &Arc<StripeFooter>,
stripe: &StripeInformation,
) -> Result<Self> {
let (start, length) = Column::get_stream_info(name, column, footer, stripe)?;
let data = Column::read_stream_async(reader, start, length).await?;

Ok(Self {
data,
number_of_rows: stripe.number_of_rows(),
compression,
footer: footer.clone(),
column: column.clone(),
name: name.to_string(),
})
}

pub fn stream(&self, kind: Kind) -> Option<Result<Decompressor>> {
let mut start = 0; // the start of the stream

let column_id = self.column.column_id() as u32;
self.footer
.streams
.iter()
.filter(|stream| stream.column() == column_id && stream.kind() != Kind::RowIndex)
.map(|stream| {
start += stream.length() as usize;
stream
})
.find(|stream| stream.kind() == kind)
.map(|stream| {
let length = stream.length() as usize;
let data = self.data.slice((start - length)..start);
Decompressor::new(data, self.compression, vec![])
})
.map(Ok)
}
}

pub fn dictionary_size(&self) -> usize {
Expand All @@ -184,6 +141,27 @@ impl Column {
pub fn name(&self) -> &str {
&self.name
}

Check warning on line 143 in src/arrow_reader/column.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_reader/column.rs#L141-L143

Added lines #L141 - L143 were not covered by tests

pub fn column_id(&self) -> u32 {
self.column.column_id() as u32
}

pub fn children(&self) -> Vec<Column> {
let children = self.column.children();

let mut columns = Vec::with_capacity(children.len());

for (name, column) in children {
columns.push(Column {
number_of_rows: self.number_of_rows,
footer: self.footer.clone(),
name,
column,
});
}

columns
}
}

pub struct NullableIterator<T> {
Expand Down Expand Up @@ -211,3 +189,24 @@ impl<T> Iterator for NullableIterator<T> {
}
}
}

impl<T> NullableIterator<T> {
pub fn collect_chunk(&mut self, chunk: usize) -> Option<Result<Vec<Option<T>>>> {
let mut buf = Vec::with_capacity(chunk);
for _ in 0..chunk {
match self.next() {
Some(Ok(value)) => {
buf.push(value);
}
Some(Err(err)) => return Some(Err(err)),
None => break,

Check warning on line 202 in src/arrow_reader/column.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_reader/column.rs#L201-L202

Added lines #L201 - L202 were not covered by tests
}
}

if buf.is_empty() {
return None;

Check warning on line 207 in src/arrow_reader/column.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_reader/column.rs#L207

Added line #L207 was not covered by tests
}

Some(Ok(buf))
}
}
21 changes: 13 additions & 8 deletions src/arrow_reader/column/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,28 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error;
use crate::proto::stream::Kind;
use crate::reader::decode::get_direct_unsigned_rle_reader;
use crate::reader::decode::variable_length::Values;
use crate::reader::decompress::Decompressor;

pub fn new_binary_iterator(column: &Column) -> error::Result<NullableIterator<Vec<u8>>> {
let null_mask = new_present_iter(column)?.collect::<error::Result<Vec<_>>>()?;
pub fn new_binary_iterator(
column: &Column,
stripe: &Stripe,
) -> error::Result<NullableIterator<Vec<u8>>> {
let null_mask = new_present_iter(column, stripe)?.collect::<error::Result<Vec<_>>>()?;

let values = column
.stream(Kind::Data)
.transpose()?
let values = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| Box::new(Values::new(reader, vec![])))
.context(error::InvalidColumnSnafu { name: &column.name })?;
let lengths = column
.stream(Kind::Length)
.transpose()?

let lengths = stripe
.stream_map
.get(column, Kind::Length)
.map(|reader| get_direct_unsigned_rle_reader(column, reader))
.context(error::InvalidColumnSnafu { name: &column.name })??;

Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::boolean_rle::BooleanIter;

pub fn new_boolean_iter(column: &Column) -> Result<NullableIterator<bool>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn new_boolean_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<bool>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();

let iter = column
.stream(Kind::Data)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| {
Box::new(BooleanIter::new(reader, rows))
as Box<dyn Iterator<Item = Result<bool>> + Send>
Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::get_direct_signed_rle_reader;
Expand Down Expand Up @@ -37,12 +38,12 @@ impl Iterator for DateIterator {
}
}

pub fn new_date_iter(column: &Column) -> Result<NullableIterator<NaiveDate>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn new_date_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<NaiveDate>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;

let data = column
.stream(Kind::Data)
.transpose()?
let data = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| get_direct_signed_rle_reader(column, reader))
.context(error::InvalidColumnSnafu { name: &column.name })??;

Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::float::FloatIter;

macro_rules! impl_float_iter {
($tp:ident) => {
paste::item! {
pub fn [<new_ $tp _iter>] (column: &Column) -> Result<NullableIterator<$tp>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn [<new_ $tp _iter>] (column: &Column, stripe: &Stripe) -> Result<NullableIterator<$tp>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();
let iter = column
.stream(Kind::Data)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| Box::new(FloatIter::<$tp, _>::new(reader, rows)))
.context(InvalidColumnSnafu { name: &column.name })?;

Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::get_direct_signed_rle_reader;

pub fn new_i64_iter(column: &Column) -> Result<NullableIterator<i64>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn new_i64_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<i64>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;

let iter = column
.stream(Kind::Data)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| get_direct_signed_rle_reader(column, reader))
.context(InvalidColumnSnafu { name: &column.name })??;

Expand Down
12 changes: 8 additions & 4 deletions src/arrow_reader/column/present.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use crate::arrow_reader::column::Column;
use crate::arrow_reader::Stripe;
use crate::error::Result;
use crate::proto::stream::Kind;
use crate::reader::decode::boolean_rle::BooleanIter;

pub fn new_present_iter(column: &Column) -> Result<Box<dyn Iterator<Item = Result<bool>>>> {
pub fn new_present_iter(
column: &Column,
stripe: &Stripe,
) -> Result<Box<dyn Iterator<Item = Result<bool>>>> {
let rows = column.number_of_rows as usize;
let iter = column
.stream(Kind::Present)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Present)
.map(|reader| {
Box::new(BooleanIter::new(reader, rows)) as Box<dyn Iterator<Item = Result<bool>>>
})
Expand Down
Loading

0 comments on commit 38d5d89

Please sign in to comment.