Switch to object_store crate (#2489) (#2677)
* Switch to object_store crate (#2489)

* Test fixes

* Update to object_store 0.2.0

* More windows pacification

* Fix windows test

* Fix windows test_prefix_path

* More windows fixes

* Simplify ListingTableUrl::strip_prefix

* Review feedback

* Update to latest arrow-rs

* Use ParquetRecordBatchStream

* Simplify predicate pruning

* Add host to ObjectStoreRegistry

Co-authored-by: Andrew Lamb <[email protected]>
tustvold and alamb authored Jul 4, 2022
1 parent da392f4 commit bf7564f
Showing 28 changed files with 1,045 additions and 1,055 deletions.
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
@@ -41,6 +41,7 @@ pyarrow = ["pyo3"]
arrow = { version = "17.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.85.0", optional = true }
object_store = { version = "0.3", optional = true }
ordered-float = "3.0"
parquet = { version = "17.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
Expand Down
21 changes: 21 additions & 0 deletions datafusion/common/src/error.rs
@@ -49,6 +49,9 @@ pub enum DataFusionError {
/// Wraps an error from the Avro crate
#[cfg(feature = "avro")]
AvroError(AvroError),
/// Wraps an error from the object_store crate
#[cfg(feature = "object_store")]
ObjectStore(object_store::Error),
/// Error associated to I/O operations and associated traits.
IoError(io::Error),
/// Error returned when SQL is syntactically incorrect.
@@ -203,6 +206,20 @@ impl From<AvroError> for DataFusionError {
}
}

#[cfg(feature = "object_store")]
impl From<object_store::Error> for DataFusionError {
fn from(e: object_store::Error) -> Self {
DataFusionError::ObjectStore(e)
}
}

#[cfg(feature = "object_store")]
impl From<object_store::path::Error> for DataFusionError {
fn from(e: object_store::path::Error) -> Self {
DataFusionError::ObjectStore(e.into())
}
}

impl From<ParserError> for DataFusionError {
fn from(e: ParserError) -> Self {
DataFusionError::SQL(e)
@@ -264,6 +281,10 @@ impl Display for DataFusionError {
DataFusionError::JITError(ref desc) => {
write!(f, "JIT error: {}", desc)
}
#[cfg(feature = "object_store")]
DataFusionError::ObjectStore(ref desc) => {
write!(f, "Object Store error: {}", desc)
}
}
}
}
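
Taken together, the two `From` impls added above mean any object_store failure, including path parsing, can propagate through `?` in functions returning `Result<_, DataFusionError>`. A minimal sketch of the ergonomics (the function and path value are illustrative, not part of this change):

use datafusion_common::DataFusionError;
use object_store::path::Path;

// `?` lifts object_store::path::Error into DataFusionError via the
// conversions added above.
fn parse_location(raw: &str) -> Result<Path, DataFusionError> {
    let location = Path::parse(raw)?;
    Ok(location)
}
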
7 changes: 4 additions & 3 deletions datafusion/core/Cargo.toml
@@ -58,9 +58,9 @@ ahash = { version = "0.7", default-features = false }
arrow = { version = "17.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
bytes = "1.1"
chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../common", version = "9.0.0", features = ["parquet"] }
datafusion-data-access = { path = "../data-access", version = "9.0.0" }
datafusion-common = { path = "../common", version = "9.0.0", features = ["parquet", "object_store"] }
datafusion-expr = { path = "../expr", version = "9.0.0" }
datafusion-jit = { path = "../jit", version = "9.0.0", optional = true }
datafusion-optimizer = { path = "../optimizer", version = "9.0.0" }
@@ -75,9 +75,10 @@ lazy_static = { version = "^1.4.0" }
log = "^0.4"
num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
object_store = "0.3.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "17.0.0", features = ["arrow"] }
parquet = { version = "17.0.0", features = ["arrow", "async"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
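
The new "async" feature on the parquet dependency is what provides the async reader behind the "Use ParquetRecordBatchStream" commit bullet. A hedged sketch of what it unlocks, assuming parquet 17's async_reader API and a local tokio file; the path is illustrative:

use futures::TryStreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

// Stream record batches with non-blocking I/O instead of reading the whole
// file through the synchronous reader.
async fn read_all(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
    let file = File::open(path).await?;
    let stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    let batches: Vec<_> = stream.try_collect().await?;
    Ok(batches.len())
}
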
3 changes: 0 additions & 3 deletions datafusion/core/src/catalog/schema.rs
@@ -132,7 +132,6 @@ mod tests {
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion_data_access::object_store::local::LocalFileSystem;

use crate::assert_batches_eq;
use crate::catalog::catalog::{CatalogProvider, MemoryCatalogProvider};
@@ -170,8 +169,6 @@ mod tests {
let schema = MemorySchemaProvider::new();

let ctx = SessionContext::new();
let store = Arc::new(LocalFileSystem {});
ctx.runtime_env().register_object_store("file", store);

let config = ListingTableConfig::new(table_path)
.infer(&ctx.state())
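
The deleted registration reflects that local files no longer need a hand-wired store; the runtime provides one for local paths by default. Registration remains explicit for remote schemes. A hedged sketch, assuming the post-change register_object_store takes a scheme and a host (per the "Add host to ObjectStoreRegistry" commit bullet); the scheme, bucket name, and in-memory stand-in store are all illustrative:

use std::sync::Arc;
use datafusion::prelude::SessionContext;
use object_store::memory::InMemory;

fn register_example() {
    let ctx = SessionContext::new();
    // Assumed signature after this change: (scheme, host, store).
    ctx.runtime_env()
        .register_object_store("s3", "my-bucket", Arc::new(InMemory::new()));
}
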
19 changes: 12 additions & 7 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion_data_access::FileMeta;
use object_store::{GetResult, ObjectMeta, ObjectStore};

use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
@@ -32,7 +32,6 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
use datafusion_data_access::object_store::ObjectStore;

/// The default file extension of avro files
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
@@ -49,12 +48,18 @@ impl FileFormat for AvroFormat {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
files: &[FileMeta],
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let mut schemas = vec![];
for file in files {
let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
let schema = read_avro_schema_from_reader(&mut reader)?;
for object in objects {
let schema = match store.get(&object.location).await? {
GetResult::File(mut file, _) => read_avro_schema_from_reader(&mut file)?,
r @ GetResult::Stream(_) => {
// TODO: Fetching entire file to get schema is potentially wasteful
let data = r.bytes().await?;
read_avro_schema_from_reader(&mut data.as_ref())?
}
};
schemas.push(schema);
}
let merged_schema = Schema::try_merge(schemas)?;
@@ -65,7 +70,7 @@ impl FileFormat for AvroFormat {
&self,
_store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
_file: &FileMeta,
_object: &ObjectMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
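
The match on GetResult above is the pattern this migration introduces throughout: a local store hands back a std::fs::File that can be read directly, while remote stores return a byte stream that must be buffered. The same pattern in isolation, as a sketch (the store and location are whatever the caller supplies):

use object_store::{path::Path, GetResult, ObjectStore};

// Read an object fully into memory, taking the file fast path when the store
// is local and buffering the stream otherwise.
async fn fetch(
    store: &dyn ObjectStore,
    location: &Path,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    Ok(match store.get(location).await? {
        GetResult::File(mut file, _path) => {
            let mut buf = Vec::new();
            std::io::Read::read_to_end(&mut file, &mut buf)?;
            buf
        }
        r @ GetResult::Stream(_) => r.bytes().await?.to_vec(),
    })
}
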
20 changes: 13 additions & 7 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion_data_access::FileMeta;
use datafusion_common::DataFusionError;
use futures::TryFutureExt;
use object_store::{ObjectMeta, ObjectStore};

use super::FileFormat;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
@@ -32,7 +34,6 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
use datafusion_data_access::object_store::ObjectStore;

/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -96,16 +97,21 @@ impl FileFormat for CsvFormat {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
files: &[FileMeta],
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let mut schemas = vec![];

let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);

for file in files {
let mut reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
for object in objects {
let data = store
.get(&object.location)
.and_then(|r| r.bytes())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
&mut reader,
&mut data.as_ref(),
self.delimiter,
Some(records_to_read),
self.has_header,
@@ -128,7 +134,7 @@ impl FileFormat for CsvFormat {
&self,
_store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
_file: &FileMeta,
_object: &ObjectMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
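
The and_then chain above comes from futures::TryFutureExt: get and bytes are both fallible futures over object_store::Error, so they fuse into a single awaitable before the error is boxed into DataFusionError::External. The chain in isolation, as a sketch:

use bytes::Bytes;
use futures::TryFutureExt;
use object_store::{path::Path, ObjectStore};

// GET the object and buffer its payload in one awaited chain.
async fn fetch_bytes(
    store: &dyn ObjectStore,
    location: &Path,
) -> object_store::Result<Bytes> {
    store.get(location).and_then(|r| r.bytes()).await
}
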
43 changes: 26 additions & 17 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
use arrow::json::reader::infer_json_schema_from_iterator;
use arrow::json::reader::ValueIter;
use async_trait::async_trait;
use datafusion_data_access::{object_store::ObjectStore, FileMeta};
use object_store::{GetResult, ObjectMeta, ObjectStore};

use super::FileFormat;
use super::FileScanConfig;
@@ -71,21 +71,33 @@ impl FileFormat for JsonFormat {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
files: &[FileMeta],
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let mut schemas = Vec::new();
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
for file in files {
let reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
let mut reader = BufReader::new(reader);
let iter = ValueIter::new(&mut reader, None);
let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
for object in objects {
let mut take_while = || {
let should_take = records_to_read > 0;
if should_take {
records_to_read -= 1;
}
should_take
}))?;
};

let schema = match store.get(&object.location).await? {
GetResult::File(file, _) => {
let mut reader = BufReader::new(file);
let iter = ValueIter::new(&mut reader, None);
infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
}
r @ GetResult::Stream(_) => {
let data = r.bytes().await?;
let mut reader = BufReader::new(data.as_ref());
let iter = ValueIter::new(&mut reader, None);
infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
}
};

schemas.push(schema);
if records_to_read == 0 {
break;
@@ -100,7 +112,7 @@
&self,
_store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
_file: &FileMeta,
_object: &ObjectMeta,
) -> Result<Statistics> {
Ok(Statistics::default())
}
@@ -120,15 +132,12 @@ mod tests {
use super::super::test_util::scan_format;
use arrow::array::Int64Array;
use futures::StreamExt;
use object_store::local::LocalFileSystem;

use super::*;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use crate::{
datafusion_data_access::object_store::local::{
local_unpartitioned_file, LocalFileSystem,
},
physical_plan::collect,
};
use crate::test::object_store::local_unpartitioned_file;

#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -229,12 +238,12 @@

#[tokio::test]
async fn infer_schema_with_limit() {
let store = Arc::new(LocalFileSystem {}) as _;
let store = Arc::new(LocalFileSystem::new()) as _;
let filename = "tests/jsons/schema_infer_limit.json";
let format = JsonFormat::default().with_schema_infer_max_rec(Some(3));

let file_schema = format
.infer_schema(&store, &[local_unpartitioned_file(filename.to_string())])
.infer_schema(&store, &[local_unpartitioned_file(filename)])
.await
.expect("Schema inference");

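
The take_while closure introduced above lets a single record budget cap schema inference across both GetResult arms. The counting trick on its own, as a runnable sketch:

fn main() {
    // Each call spends one unit of the budget and reports whether the caller
    // should keep consuming values.
    let mut records_to_read = 3usize;
    let mut take_while = || {
        let should_take = records_to_read > 0;
        if should_take {
            records_to_read -= 1;
        }
        should_take
    };

    let taken: Vec<_> = (10..60).step_by(10).take_while(|_| take_while()).collect();
    assert_eq!(taken, vec![10, 20, 30]);
}
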
16 changes: 7 additions & 9 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -36,8 +36,7 @@ use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{ExecutionPlan, Statistics};

use async_trait::async_trait;
use datafusion_data_access::object_store::ObjectStore;
use datafusion_data_access::FileMeta;
use object_store::{ObjectMeta, ObjectStore};

/// This trait abstracts all the file format specific implementations
/// from the `TableProvider`. This helps code re-utilization across
@@ -55,7 +54,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
async fn infer_schema(
&self,
store: &Arc<dyn ObjectStore>,
files: &[FileMeta],
objects: &[ObjectMeta],
) -> Result<SchemaRef>;

/// Infer the statistics for the provided object. The cost and accuracy of the
@@ -69,7 +68,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
&self,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
file: &FileMeta,
object: &ObjectMeta,
) -> Result<Statistics>;

/// Take a list of files and convert it to the appropriate executor
@@ -86,9 +85,8 @@ pub(crate) mod test_util {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use datafusion_data_access::object_store::local::{
local_unpartitioned_file, LocalFileSystem,
};
use crate::test::object_store::local_unpartitioned_file;
use object_store::local::LocalFileSystem;

pub async fn scan_format(
format: &dyn FileFormat,
@@ -97,7 +95,7 @@
projection: Option<Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let store = Arc::new(LocalFileSystem {}) as _;
let store = Arc::new(LocalFileSystem::new()) as _;
let meta = local_unpartitioned_file(format!("{}/{}", store_root, file_name));

let file_schema = format.infer_schema(&store, &[meta.clone()]).await?;
Expand All @@ -107,7 +105,7 @@ pub(crate) mod test_util {
.await?;

let file_groups = vec![vec![PartitionedFile {
file_meta: meta,
object_meta: meta,
partition_values: vec![],
range: None,
}]];
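
PartitionedFile now carries an object_meta rather than a file_meta, and the local test helper moved to crate::test::object_store::local_unpartitioned_file. A hedged sketch of what such a helper plausibly does, stat-ing a local file into an ObjectMeta (field set per object_store 0.3; the helper body is an assumption, not the actual implementation):

use chrono::{DateTime, Utc};
use object_store::{path::Path, ObjectMeta};

// Translate local filesystem metadata into object_store terms.
fn object_meta_for(path: &str) -> std::io::Result<ObjectMeta> {
    let metadata = std::fs::metadata(path)?;
    Ok(ObjectMeta {
        location: Path::from(path),
        last_modified: DateTime::<Utc>::from(metadata.modified()?),
        size: metadata.len() as usize,
    })
}
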