
Commit d4b5dac

Merge remote-tracking branch 'apache/main' into alamb/min_max_nan

alamb committed May 25, 2024
2 parents 43cb27e + 4709fc6
Showing 54 changed files with 1,472 additions and 1,400 deletions.
4 changes: 2 additions & 2 deletions datafusion-cli/Dockerfile
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-FROM rust:1.73-bullseye as builder
+FROM rust:1.78-bookworm as builder
 
 COPY . /usr/src/arrow-datafusion
 COPY ./datafusion /usr/src/arrow-datafusion/datafusion
@@ -28,7 +28,7 @@ RUN rustup component add rustfmt
 
 RUN cargo build --release
 
-FROM debian:bullseye-slim
+FROM debian:bookworm-slim
 
 COPY --from=builder /usr/src/arrow-datafusion/datafusion-cli/target/release/datafusion-cli /usr/local/bin

2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
@@ -379,7 +379,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
     let store = get_object_store(&ctx.state(), scheme, url, &table_options).await?;
 
     // Register the retrieved object store in the session context's runtime environment
-    ctx.runtime_env().register_object_store(url, store);
+    ctx.register_object_store(url, store);
 
     Ok(())
 }
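
This is the first recurring pattern in the commit: call sites move from ctx.runtime_env().register_object_store(..) to the SessionContext::register_object_store convenience method. A minimal sketch of the new call shape (the local-filesystem store and URL are illustrative, not taken from this diff):

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use object_store::local::LocalFileSystem;
use url::Url;

fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register a store for all file:// URLs directly on the context,
    // rather than reaching through ctx.runtime_env() first.
    let url = Url::parse("file://").unwrap();
    ctx.register_object_store(&url, Arc::new(LocalFileSystem::new()));

    Ok(())
}
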
16 changes: 5 additions & 11 deletions datafusion-examples/examples/csv_opener.rs
@@ -17,7 +17,6 @@
 
 use std::{sync::Arc, vec};
 
-use datafusion::common::Statistics;
 use datafusion::{
     assert_batches_eq,
     datasource::{
@@ -58,16 +57,11 @@ async fn main() -> Result<()> {
 
     let path = std::path::Path::new(&path).canonicalize()?;
 
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]],
-        statistics: Statistics::new_unknown(&schema),
-        projection: Some(vec![12, 0]),
-        limit: Some(5),
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-    };
+    let scan_config =
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
+            .with_projection(Some(vec![12, 0]))
+            .with_limit(Some(5))
+            .with_file(PartitionedFile::new(path.display().to_string(), 10));
 
     let result =
         FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
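
The change above is the second recurring pattern in the commit: the FileScanConfig struct literal, which forced callers to spell out every field (including Statistics::new_unknown), is replaced by a builder-style API. A minimal sketch of that pattern; build_scan_config is a hypothetical helper and the import paths are assumptions, not part of this diff:

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::FileScanConfig;

fn build_scan_config(schema: SchemaRef, path: &str, size: u64) -> FileScanConfig {
    // Required fields go through new(); the rest (files, projection, limit,
    // statistics, partition columns, output ordering) have with_* methods
    // with sensible defaults, so unknown statistics no longer need to be
    // constructed by hand.
    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
        .with_file(PartitionedFile::new(path.to_string(), size))
        .with_projection(Some(vec![0, 1]))
        .with_limit(Some(5))
}
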
@@ -49,8 +49,7 @@ async fn main() -> Result<()> {
     let path = format!("s3://{bucket_name}");
     let s3_url = Url::parse(&path).unwrap();
     let arc_s3 = Arc::new(s3);
-    ctx.runtime_env()
-        .register_object_store(&s3_url, arc_s3.clone());
+    ctx.register_object_store(&s3_url, arc_s3.clone());
 
     let path = format!("s3://{bucket_name}/test_data/");
     let file_format = ParquetFormat::default().with_enable_pruning(true);
@@ -48,8 +48,7 @@ async fn main() -> Result<()> {
 
     let path = format!("s3://{bucket_name}");
     let s3_url = Url::parse(&path).unwrap();
-    ctx.runtime_env()
-        .register_object_store(&s3_url, Arc::new(s3));
+    ctx.register_object_store(&s3_url, Arc::new(s3));
 
     // cannot query the parquet files from this bucket because the path contains a whitespace
     // and we don't support that yet
16 changes: 5 additions & 11 deletions datafusion-examples/examples/json_opener.rs
@@ -29,7 +29,6 @@ use datafusion::{
     error::Result,
     physical_plan::metrics::ExecutionPlanMetricsSet,
 };
-use datafusion_common::Statistics;
 
 use futures::StreamExt;
 use object_store::ObjectStore;
@@ -61,16 +60,11 @@ async fn main() -> Result<()> {
         Arc::new(object_store),
     );
 
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]],
-        statistics: Statistics::new_unknown(&schema),
-        projection: Some(vec![1, 0]),
-        limit: Some(5),
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-    };
+    let scan_config =
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
+            .with_projection(Some(vec![1, 0]))
+            .with_limit(Some(5))
+            .with_file(PartitionedFile::new(path.to_string(), 10));
 
     let result =
         FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
2 changes: 1 addition & 1 deletion datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -80,7 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let local_fs = Arc::new(LocalFileSystem::default());
 
     let u = url::Url::parse("file://./")?;
-    ctx.runtime_env().register_object_store(&u, local_fs);
+    ctx.register_object_store(&u, local_fs);
 
     // Register a listing table - this will use all files in the directory as data sources
     // for the query
3 changes: 1 addition & 2 deletions datafusion-examples/examples/query-http-csv.rs
@@ -34,8 +34,7 @@ async fn main() -> Result<()> {
         .with_url(base_url.clone())
         .build()
         .unwrap();
-    ctx.runtime_env()
-        .register_object_store(&base_url, Arc::new(http_store));
+    ctx.register_object_store(&base_url, Arc::new(http_store));
 
     // register csv file with the execution context
     ctx.register_csv(
15 changes: 5 additions & 10 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -154,16 +154,11 @@ pub(crate) mod test_util {
         let exec = format
             .create_physical_plan(
                 state,
-                FileScanConfig {
-                    object_store_url: ObjectStoreUrl::local_filesystem(),
-                    file_schema,
-                    file_groups,
-                    statistics,
-                    projection,
-                    limit,
-                    table_partition_cols: vec![],
-                    output_ordering: vec![],
-                },
+                FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+                    .with_file_groups(file_groups)
+                    .with_statistics(statistics)
+                    .with_projection(projection)
+                    .with_limit(limit),
                 None,
             )
             .await?;
7 changes: 7 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
@@ -48,6 +48,13 @@ pub struct FileRange {
     pub end: i64,
 }
 
+impl FileRange {
+    /// returns true if this file range contains the specified offset
+    pub fn contains(&self, offset: i64) -> bool {
+        offset >= self.start && offset < self.end
+    }
+}
+
 #[derive(Debug, Clone)]
 /// A single file or part of a file that should be read, along with its schema, statistics
 /// and partition column values that need to be appended to each row.
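
The new FileRange::contains helper encodes a half-open interval: the start offset is included and the end offset is excluded, as the implementation above shows. A small illustrative check (the byte offsets are hypothetical):

use datafusion::datasource::listing::FileRange;

fn main() {
    // A range covering bytes [0, 1024) of a file.
    let range = FileRange { start: 0, end: 1024 };

    assert!(range.contains(0)); // start is inclusive
    assert!(range.contains(1023)); // last byte inside the range
    assert!(!range.contains(1024)); // end is exclusive
}
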
17 changes: 7 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
@@ -805,16 +805,13 @@ impl TableProvider for ListingTable {
             .format
             .create_physical_plan(
                 state,
-                FileScanConfig {
-                    object_store_url,
-                    file_schema: Arc::clone(&self.file_schema),
-                    file_groups: partitioned_file_lists,
-                    statistics,
-                    projection: projection.cloned(),
-                    limit,
-                    output_ordering,
-                    table_partition_cols,
-                },
+                FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema))
+                    .with_file_groups(partitioned_file_lists)
+                    .with_statistics(statistics)
+                    .with_projection(projection.cloned())
+                    .with_limit(limit)
+                    .with_output_ordering(output_ordering)
+                    .with_table_partition_cols(table_partition_cols),
                 filters.as_ref(),
             )
             .await
59 changes: 24 additions & 35 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -261,9 +261,7 @@
         let state = session_ctx.state();
 
         let url = Url::parse("file://").unwrap();
-        state
-            .runtime_env()
-            .register_object_store(&url, store.clone());
+        session_ctx.register_object_store(&url, store.clone());
 
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{testdata}/avro/alltypes_plain.avro");
@@ -273,16 +271,11 @@
             .infer_schema(&state, &store, &[meta.clone()])
             .await?;
 
-        let avro_exec = AvroExec::new(FileScanConfig {
-            object_store_url: ObjectStoreUrl::local_filesystem(),
-            file_groups: vec![vec![meta.into()]],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            projection: Some(vec![0, 1, 2]),
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        });
+        let avro_exec = AvroExec::new(
+            FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+                .with_file(meta.into())
+                .with_projection(Some(vec![0, 1, 2])),
+        );
         assert_eq!(
             avro_exec
                 .properties()
@@ -350,16 +343,11 @@
         // Include the missing column in the projection
         let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
 
-        let avro_exec = AvroExec::new(FileScanConfig {
-            object_store_url,
-            file_groups: vec![vec![meta.into()]],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            projection,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        });
+        let avro_exec = AvroExec::new(
+            FileScanConfig::new(object_store_url, file_schema)
+                .with_file(meta.into())
+                .with_projection(projection),
+        );
         assert_eq!(
             avro_exec
                 .properties()
@@ -424,18 +412,19 @@
         let mut partitioned_file = PartitionedFile::from(meta);
         partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];
 
-        let avro_exec = AvroExec::new(FileScanConfig {
-            // select specific columns of the files as well as the partitioning
-            // column which is supposed to be the last column in the table schema.
-            projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
-            object_store_url,
-            file_groups: vec![vec![partitioned_file]],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            limit: None,
-            table_partition_cols: vec![Field::new("date", DataType::Utf8, false)],
-            output_ordering: vec![],
-        });
+        let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
+        let avro_exec = AvroExec::new(
+            FileScanConfig::new(object_store_url, file_schema)
+                // select specific columns of the files as well as the partitioning
+                // column which is supposed to be the last column in the table schema.
+                .with_projection(projection)
+                .with_file(partitioned_file)
+                .with_table_partition_cols(vec![Field::new(
+                    "date",
+                    DataType::Utf8,
+                    false,
+                )]),
+        );
         assert_eq!(
             avro_exec
                 .properties()
22 changes: 10 additions & 12 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -561,7 +561,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.projection = Some(vec![0, 2, 4]);
 
         let csv = CsvExec::new(
@@ -627,7 +627,7 @@
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.projection = Some(vec![4, 0, 2]);
 
         let csv = CsvExec::new(
@@ -693,7 +693,7 @@
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.limit = Some(5);
 
         let csv = CsvExec::new(
@@ -756,7 +756,7 @@
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.limit = Some(5);
 
         let csv = CsvExec::new(
@@ -809,7 +809,7 @@
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
 
         // Add partition columns
         config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)];
@@ -895,7 +895,7 @@
     ) -> Result<()> {
         let ctx = SessionContext::new();
         let url = Url::parse("file://").unwrap();
-        ctx.runtime_env().register_object_store(&url, store.clone());
+        ctx.register_object_store(&url, store.clone());
 
         let task_ctx = ctx.task_ctx();
 
@@ -914,7 +914,7 @@
         )
         .unwrap();
 
-        let config = partitioned_csv_config(file_schema, file_groups).unwrap();
+        let config = partitioned_csv_config(file_schema, file_groups);
         let csv = CsvExec::new(
             config,
             true,
@@ -968,9 +968,7 @@
         store.put(&path, data).await.unwrap();
 
         let url = Url::parse("memory://").unwrap();
-        session_ctx
-            .runtime_env()
-            .register_object_store(&url, Arc::new(store));
+        session_ctx.register_object_store(&url, Arc::new(store));
 
         let df = session_ctx
             .read_csv("memory:///", CsvReadOptions::new())
@@ -999,7 +997,7 @@
         let tmp_dir = TempDir::new()?;
         let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
         let local_url = Url::parse("file://local").unwrap();
-        ctx.runtime_env().register_object_store(&local_url, local);
+        ctx.register_object_store(&local_url, local);
         let options = CsvReadOptions::default()
             .schema_infer_max_records(2)
             .has_header(true);
@@ -1039,7 +1037,7 @@
         let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
         let local_url = Url::parse("file://local").unwrap();
 
-        ctx.runtime_env().register_object_store(&local_url, local);
+        ctx.register_object_store(&local_url, local);
 
         // execute a simple query and write the results to CSV
         let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";