Skip to content

Commit

Permalink
Prepare for ObjectStore 0.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 14, 2023
1 parent a5e86fa commit 84655f0
Show file tree
Hide file tree
Showing 15 changed files with 141 additions and 421 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,6 @@ opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false

[patch.crates-io]
object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "979a070dc82eeb26b38a8651cac879b2c276c0ed" }
3 changes: 1 addition & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@ assert_cmd = "2.0"
ctor = "0.2.0"
predicates = "3.0"
rstest = "0.17"

[patch.crates-io]
object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "979a070dc82eeb26b38a8651cac879b2c276c0ed" }
11 changes: 7 additions & 4 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::Statistics;
use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResult, ObjectMeta, ObjectStore};
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
use std::any::Any;
use std::io::{Read, Seek};
use std::sync::Arc;
Expand All @@ -54,9 +54,12 @@ impl FileFormat for ArrowFormat {
) -> Result<SchemaRef> {
let mut schemas = vec![];
for object in objects {
let schema = match store.get(&object.location).await? {
GetResult::File(mut file, _) => read_arrow_schema_from_reader(&mut file)?,
r @ GetResult::Stream(_) => {
let r = store.as_ref().get(&object.location).await?;
let schema = match r.payload {
GetResultPayload::File(mut file, _) => {
read_arrow_schema_from_reader(&mut file)?
}
GetResultPayload::Stream(_) => {
// TODO: Fetching entire file to get schema is potentially wasteful
let data = r.bytes().await?;
let mut cursor = std::io::Cursor::new(&data);
Expand Down
11 changes: 7 additions & 4 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResult, ObjectMeta, ObjectStore};
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

use super::FileFormat;
use crate::datasource::avro_to_arrow::read_avro_schema_from_reader;
Expand Down Expand Up @@ -54,9 +54,12 @@ impl FileFormat for AvroFormat {
) -> Result<SchemaRef> {
let mut schemas = vec![];
for object in objects {
let schema = match store.get(&object.location).await? {
GetResult::File(mut file, _) => read_avro_schema_from_reader(&mut file)?,
r @ GetResult::Stream(_) => {
let r = store.as_ref().get(&object.location).await?;
let schema = match r.payload {
GetResultPayload::File(mut file, _) => {
read_avro_schema_from_reader(&mut file)?
}
GetResultPayload::Stream(_) => {
// TODO: Fetching entire file to get schema is potentially wasteful
let data = r.bytes().await?;
read_avro_schema_from_reader(&mut data.as_ref())?
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use async_trait::async_trait;
use bytes::Buf;

use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResult, ObjectMeta, ObjectStore};
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

use crate::datasource::physical_plan::FileGroupDisplay;
use crate::physical_plan::insert::DataSink;
Expand Down Expand Up @@ -121,14 +121,15 @@ impl FileFormat for JsonFormat {
should_take
};

let schema = match store.get(&object.location).await? {
GetResult::File(file, _) => {
let r = store.as_ref().get(&object.location).await?;
let schema = match r.payload {
GetResultPayload::File(file, _) => {
let decoder = file_compression_type.convert_read(file)?;
let mut reader = BufReader::new(decoder);
let iter = ValueIter::new(&mut reader, None);
infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
}
r @ GetResult::Stream(_) => {
GetResultPayload::Stream(_) => {
let data = r.bytes().await?;
let decoder = file_compression_type.convert_read(data.reader())?;
let mut reader = BufReader::new(decoder);
Expand Down
34 changes: 23 additions & 11 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ pub(crate) mod test_util {
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{GetOptions, GetResult, ListResult, MultipartId};
use object_store::{
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId,
};
use tokio::io::AsyncWrite;

pub async fn scan_format(
Expand Down Expand Up @@ -203,18 +205,28 @@ pub(crate) mod test_util {
unimplemented!()
}

async fn get(&self, _location: &Path) -> object_store::Result<GetResult> {
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
let bytes = self.bytes_to_repeat.clone();
let range = 0..bytes.len() * self.max_iterations;
let arc = self.iterations_detected.clone();
Ok(GetResult::Stream(
futures::stream::repeat_with(move || {
let arc_inner = arc.clone();
*arc_inner.lock().unwrap() += 1;
Ok(bytes.clone())
})
.take(self.max_iterations)
.boxed(),
))
let stream = futures::stream::repeat_with(move || {
let arc_inner = arc.clone();
*arc_inner.lock().unwrap() += 1;
Ok(bytes.clone())
})
.take(self.max_iterations)
.boxed();

Ok(GetResult {
payload: GetResultPayload::Stream(stream),
meta: ObjectMeta {
location: location.clone(),
last_modified: Default::default(),
size: range.end,
e_tag: None,
},
range: Default::default(),
})
}

async fn get_opts(
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use datafusion_physical_expr::{
PhysicalSortExpr,
};
use futures::StreamExt;
use object_store::{GetResult, ObjectStore};
use object_store::{GetResultPayload, ObjectStore};
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -158,13 +158,14 @@ impl FileOpener for ArrowOpener {
let object_store = self.object_store.clone();
let projection = self.projection.clone();
Ok(Box::pin(async move {
match object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let r = object_store.get(file_meta.location()).await?;
match r.payload {
GetResultPayload::File(file, _) => {
let arrow_reader =
arrow::ipc::reader::FileReader::try_new(file, projection)?;
Ok(futures::stream::iter(arrow_reader).boxed())
}
r @ GetResult::Stream(_) => {
GetResultPayload::Stream(_) => {
let bytes = r.bytes().await?;
let cursor = std::io::Cursor::new(bytes);
let arrow_reader =
Expand Down
11 changes: 6 additions & 5 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ mod private {
use crate::datasource::physical_plan::FileMeta;
use bytes::Buf;
use futures::StreamExt;
use object_store::{GetResult, ObjectStore};
use object_store::{GetResultPayload, ObjectStore};

pub struct AvroConfig {
pub schema: SchemaRef,
Expand Down Expand Up @@ -203,12 +203,13 @@ mod private {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let config = self.config.clone();
Ok(Box::pin(async move {
match config.object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let r = config.object_store.get(file_meta.location()).await?;
match r.payload {
GetResultPayload::File(file, _) => {
let reader = config.open(file)?;
Ok(futures::stream::iter(reader).boxed())
}
r @ GetResult::Stream(_) => {
GetResultPayload::Stream(_) => {
let bytes = r.bytes().await?;
let reader = config.open(bytes.reader())?;
Ok(futures::stream::iter(reader).boxed())
Expand All @@ -225,14 +226,14 @@ mod tests {
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::chunked_store::ChunkedStore;
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use crate::test::object_store::local_unpartitioned_file;
use arrow::datatypes::{DataType, Field, SchemaBuilder};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use object_store::chunked::ChunkedStore;
use rstest::*;
use url::Url;

Expand Down
Loading

0 comments on commit 84655f0

Please sign in to comment.