Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support selecting from local files #152

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,384 changes: 1,011 additions & 373 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,29 @@ default-run = "dft"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] , optional = true }
arrow-flight = { version = "53.0.0", features = ["flight-sql-experimental"] , optional = true }
async-trait = "0.1.80"
clap = { version = "4.5.1", features = ["derive"] }
color-eyre = "0.6.3"
crossterm = { version = "0.28.1", features = ["event-stream"] }
datafusion = "41.0.0"
datafusion-common = "41.0.0"
datafusion-functions-json = { version = "0.41.0", optional = true }
deltalake = { version = "0.19.0", features = ["datafusion"], optional = true }
datafusion = "42.0.0"
datafusion-functions-json = { version = "0.42.0", optional = true }
deltalake = { version = "0.20.0", features = ["datafusion"], optional = true }
directories = "5.0.1"
env_logger = "0.11.5"
futures = "0.3.30"
itertools = "0.13.0"
lazy_static = "1.4.0"
log = "0.4.22"
object_store = { version = "0.10.2", features = ["aws"], optional = true }
object_store = { version = "0.11.0", features = ["aws"], optional = true }
ratatui = "0.28.0"
serde = { version = "1.0.197", features = ["derive"] }
strum = "0.26.2"
tokio = { version = "1.36.0", features = ["rt-multi-thread", "macros"] }
tokio-stream = "0.1.15"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "macros"] }
tokio-stream = "0.1.16"
tokio-util = "0.7.10"
toml = "0.8.12"
tonic = { version = "0.11.0", optional = true }
toml = "0.8.19"
tonic = { version = "0.12.2", optional = true }
tui-logger = {version = "0.12", features = ["tracing-support"]}
tui-textarea = { version = "0.6.1", features = ["search"] }
url = { version = "2.5.2", optional = true }
Expand Down
Binary file added data/alltypes_plain.snappy.parquet
Binary file not shown.
9 changes: 9 additions & 0 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ impl ExecutionContext {
extension.register_on_ctx(config, &mut session_ctx)?;
}

// Enable the dynamic file provider (so that we can read files from the
// local filesystem)
// TODO file a ticket to make the API consuming!!
// I did like
// session_ctx.enable_url_table();
// expecting that to work, but it didn't as the code returns a new session context
// instead of modifying the existing one
Comment on lines +77 to +81
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i agree that would be nicer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apache/datafusion#12551 tracking tickeet

let session_ctx = session_ctx.enable_url_table();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only change needed (it is a nice API thanks to @goldmedal)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is pretty awesome


Ok(Self {
session_ctx,
#[cfg(feature = "flightsql")]
Expand Down
3 changes: 2 additions & 1 deletion src/execution/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::error::DataFusionError;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
use log::info;
use std::sync::Arc;
Expand Down Expand Up @@ -46,7 +47,7 @@ impl From<PlanVisitor> for ExecutionStats {
}

impl ExecutionPlanVisitor for PlanVisitor {
type Error = datafusion_common::DataFusionError;
type Error = DataFusionError;

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result<bool, Self::Error> {
match plan.metrics() {
Expand Down
3 changes: 2 additions & 1 deletion src/extensions/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! [`DftSessionStateBuilder`] for configuring DataFusion [`SessionState`]

use datafusion::catalog::TableProviderFactory;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::SessionStateBuilder;
Expand Down Expand Up @@ -107,7 +108,7 @@ impl DftSessionStateBuilder {
}

/// Build the [`SessionState`] from the specified configuration
pub fn build(self) -> datafusion_common::Result<SessionState> {
pub fn build(self) -> Result<SessionState> {
let Self {
session_config,
table_factories,
Expand Down
4 changes: 2 additions & 2 deletions src/extensions/functions_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

use crate::config::ExecutionConfig;
use crate::extensions::{DftSessionStateBuilder, Extension};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_common::Result;

#[derive(Debug, Default)]
pub struct JsonFunctionsExtension {}
Expand All @@ -38,7 +38,7 @@ impl Extension for JsonFunctionsExtension {
&self,
_config: &ExecutionConfig,
builder: DftSessionStateBuilder,
) -> datafusion_common::Result<DftSessionStateBuilder> {
) -> Result<DftSessionStateBuilder> {
//
Ok(builder)
}
Expand Down
28 changes: 28 additions & 0 deletions tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,34 @@ fn test_multiple_sql_in_multiple_args2() {
assert.stdout(contains_str(expected));
}


// Validate that the CLI is correctly connected to query files
#[test]
fn test_query_file() {
// Note that the results come out as two batches as the batch size is set to 1
let expected = r##"
+------------+
| double_col |
+------------+
| 0.0 |
+------------+
+------------+
| double_col |
+------------+
| 10.1 |
+------------+
"##;
let assert = Command::cargo_bin("dft")
.unwrap()
.arg("-c")
.arg("SELECT double_col from 'data/alltypes_plain.snappy.parquet'")
.assert()
.success();

assert.stdout(contains_str(expected));
}


/// Creates a temporary file with the given SQL content
pub fn sql_in_file(sql: impl AsRef<str>) -> NamedTempFile {
let file = NamedTempFile::new().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions tests/extensions.rs → tests/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
// specific language governing permissions and limitations
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the tests from 'extensions' to 'execution' to better reflect it is testing execution in general rather than only extension execution

// under the License.

//! Tests for extensions (stored in the `extension_cases` directory)
//! Tests for `ExecutionContext` and extensions (stored in the `execution_cases` directory)

use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::sql::parser::DFParser;
use datafusion_common::Result;
use dft::config::AppConfig;
use dft::execution::ExecutionContext;
use futures::{StreamExt, TryStreamExt};
use log::debug;

mod extension_cases;
mod execution_cases;

/// Encapsulates an `ExecutionContext` for running queries in tests
pub struct TestExecution {
Expand Down
54 changes: 54 additions & 0 deletions tests/execution_cases/basic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Tests for basic execution

use crate::TestExecution;

#[tokio::test]
async fn test_basic_execution() {
let mut execution = TestExecution::new();

let actual = execution.run_and_format("SELECT 1+3, 11").await;

insta::assert_yaml_snapshot!(actual, @r###"
- +---------------------+-----------+
- "| Int64(1) + Int64(3) | Int64(11) |"
- +---------------------+-----------+
- "| 4 | 11 |"
- +---------------------+-----------+
"###);
}

#[tokio::test]
async fn test_ddl_statements() {
let mut execution = TestExecution::new()
.with_setup("CREATE TABLE foo(x int) as VALUES (11), (12), (13)")
.await;

let actual = execution.run_and_format("SELECT x, x+1 FROM foo").await;

insta::assert_yaml_snapshot!(actual, @r###"
- +----+------------------+
- "| x | foo.x + Int64(1) |"
- +----+------------------+
- "| 11 | 12 |"
- "| 12 | 13 |"
- "| 13 | 14 |"
- +----+------------------+
"###);
}
2 changes: 2 additions & 0 deletions tests/extension_cases/mod.rs → tests/execution_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@
// specific language governing permissions and limitations
// under the License.

mod basic;
#[cfg(feature = "functions-json")]
mod functions_json;
mod url_table;
38 changes: 38 additions & 0 deletions tests/execution_cases/url_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Tests for selecting data from local files
use crate::TestExecution;


#[tokio::test]
async fn local_csv_file() {
let mut execution = TestExecution::new();

let actual = execution
.run_and_format("SELECT * from 'data/alltypes_plain.snappy.parquet'")
.await;

insta::assert_yaml_snapshot!(actual, @r###"
- +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+
- "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |"
- +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+
- "| 6 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30342f30312f3039 | 30 | 2009-04-01T00:00:00 |"
- "| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01T00:01:00 |"
- +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+
"###);
}
Loading