diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 162cd68352ff..ee2f65699309 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -23,13 +23,17 @@ use std::path::PathBuf; use std::sync::{Arc, Mutex}; use ballista_core::config::BallistaConfig; -use ballista_core::{datasource::DfTableAdapter, utils::create_datafusion_context}; +use ballista_core::{ + datasource::DfTableAdapter, utils::create_df_ctx_with_ballista_query_planner, +}; use datafusion::catalog::TableReference; use datafusion::dataframe::DataFrame; -use datafusion::error::Result; +use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::dataframe_impl::DataFrameImpl; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::csv::CsvReadOptions; +use datafusion::sql::parser::FileType; struct BallistaContextState { /// Ballista configuration @@ -129,12 +133,14 @@ impl BallistaContext { let path = fs::canonicalize(&path)?; // use local DataFusion context for now but later this might call the scheduler - let guard = self.state.lock().unwrap(); - let mut ctx = create_datafusion_context( - &guard.scheduler_host, - guard.scheduler_port, - guard.config(), - ); + let mut ctx = { + let guard = self.state.lock().unwrap(); + create_df_ctx_with_ballista_query_planner( + &guard.scheduler_host, + guard.scheduler_port, + guard.config(), + ) + }; let df = ctx.read_parquet(path.to_str().unwrap())?; Ok(df) } @@ -151,12 +157,14 @@ impl BallistaContext { let path = fs::canonicalize(&path)?; // use local DataFusion context for now but later this might call the scheduler - let guard = self.state.lock().unwrap(); - let mut ctx = create_datafusion_context( - &guard.scheduler_host, - guard.scheduler_port, - guard.config(), - ); + let mut ctx = { + let guard = self.state.lock().unwrap(); + create_df_ctx_with_ballista_query_planner( + &guard.scheduler_host, + guard.scheduler_port, + guard.config(), + ) + }; let df = ctx.read_csv(path.to_str().unwrap(), options)?; Ok(df) } @@ -187,23 +195,59 @@ impl BallistaContext { /// Create a DataFrame from a SQL statement pub fn sql(&self, sql: &str) -> Result> { - // use local DataFusion context for now but later this might call the scheduler - // register tables - let state = self.state.lock().unwrap(); - let mut ctx = create_datafusion_context( - &state.scheduler_host, - state.scheduler_port, - state.config(), - ); - for (name, plan) in &state.tables { - let plan = ctx.optimize(plan)?; - let execution_plan = ctx.create_physical_plan(&plan)?; - ctx.register_table( - TableReference::Bare { table: name }, - Arc::new(DfTableAdapter::new(plan, execution_plan)), - )?; + let mut ctx = { + let state = self.state.lock().unwrap(); + create_df_ctx_with_ballista_query_planner( + &state.scheduler_host, + state.scheduler_port, + state.config(), + ) + }; + + // register tables with DataFusion context + { + let state = self.state.lock().unwrap(); + for (name, plan) in &state.tables { + let plan = ctx.optimize(plan)?; + let execution_plan = ctx.create_physical_plan(&plan)?; + ctx.register_table( + TableReference::Bare { table: name }, + Arc::new(DfTableAdapter::new(plan, execution_plan)), + )?; + } + } + + let plan = ctx.create_logical_plan(sql)?; + match plan { + LogicalPlan::CreateExternalTable { + ref schema, + ref name, + ref location, + ref file_type, + ref has_header, + } => match file_type { + FileType::CSV => { + self.register_csv( + name, + location, + CsvReadOptions::new() + .schema(&schema.as_ref().to_owned().into()) + .has_header(*has_header), + )?; + Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan))) + } + FileType::Parquet => { + self.register_parquet(name, location)?; + Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan))) + } + _ => Err(DataFusionError::NotImplemented(format!( + "Unsupported file type {:?}.", + file_type + ))), + }, + + _ => ctx.sql(sql), } - ctx.sql(sql) } } diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index 4187faa6645a..d753f70fa22e 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -30,6 +30,7 @@ use crate::memory_stream::MemoryStream; use crate::serde::scheduler::PartitionStats; use crate::config::BallistaConfig; +use datafusion::arrow::datatypes::Schema; use datafusion::arrow::error::Result as ArrowResult; use datafusion::arrow::{ array::{ @@ -51,6 +52,7 @@ use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::csv::CsvExec; +use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal}; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::hash_aggregate::HashAggregateExec; @@ -236,8 +238,9 @@ fn build_exec_plan_diagram( Ok(node_id) } -/// Create a DataFusion context that is compatible with Ballista -pub fn create_datafusion_context( +/// Create a DataFusion context that uses the BallistaQueryPlanner to send logical plans +/// to a Ballista scheduler +pub fn create_df_ctx_with_ballista_query_planner( scheduler_host: &str, scheduler_port: u16, config: &BallistaConfig, @@ -272,11 +275,17 @@ impl QueryPlanner for BallistaQueryPlanner { logical_plan: &LogicalPlan, _ctx_state: &ExecutionContextState, ) -> std::result::Result, DataFusionError> { - Ok(Arc::new(DistributedQueryExec::new( - self.scheduler_url.clone(), - self.config.clone(), - logical_plan.clone(), - ))) + match logical_plan { + LogicalPlan::CreateExternalTable { .. } => { + // table state is managed locally in the BallistaContext, not in the scheduler + Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())))) + } + _ => Ok(Arc::new(DistributedQueryExec::new( + self.scheduler_url.clone(), + self.config.clone(), + logical_plan.clone(), + ))), + } } } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index d2f3045cd727..9d40276d96fb 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -20,7 +20,7 @@ name = "datafusion-cli" version = "4.0.0-SNAPSHOT" authors = ["Apache Arrow "] edition = "2018" -keywords = [ "arrow", "query", "sql", "cli", "repl" ] +keywords = [ "arrow", "datafusion", "ballista", "query", "sql", "cli", "repl" ] license = "Apache-2.0" homepage = "https://github.com/apache/arrow-datafusion" repository = "https://github.com/apache/arrow-datafusion" @@ -31,4 +31,5 @@ clap = "2.33" rustyline = "8.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } datafusion = { path = "../datafusion", version = "5.1.0" } -arrow = { version = "5.0" } +ballista = { path = "../ballista/rust/client", version = "0.6.0" } +arrow = { version = "5.0" } \ No newline at end of file diff --git a/datafusion-cli/README.md b/datafusion-cli/README.md new file mode 100644 index 000000000000..6a4707e2a1d4 --- /dev/null +++ b/datafusion-cli/README.md @@ -0,0 +1,74 @@ + + +# DataFusion Command-line Interface + +The DataFusion CLI allows SQL queries to be executed by an in-process DataFusion context, or by a distributed +Ballista context. + +```ignore +USAGE: + datafusion-cli [FLAGS] [OPTIONS] + +FLAGS: + -h, --help Prints help information + -q, --quiet Reduce printing other than the results and work quietly + -V, --version Prints version information + +OPTIONS: + -c, --batch-size The batch size of each query, or use DataFusion default + -p, --data-path Path to your data, default to current directory + -f, --file ... Execute commands from file(s), then exit + --format Output format [default: table] [possible values: csv, tsv, table, json, ndjson] + --host Ballista scheduler host + --port Ballista scheduler port +``` + +## Example + +Create a CSV file to query. + +```bash,ignore +$ echo "1,2" > data.csv +``` + +```sql,ignore +$ datafusion-cli + +DataFusion CLI v4.0.0-SNAPSHOT + +> CREATE EXTERNAL TABLE foo (a INT, b INT) STORED AS CSV LOCATION 'data.csv'; +0 rows in set. Query took 0.001 seconds. + +> SELECT * FROM foo; ++---+---+ +| a | b | ++---+---+ +| 1 | 2 | ++---+---+ +1 row in set. Query took 0.017 seconds. +``` + +## Ballista + +The DataFusion CLI can connect to a Ballista scheduler for query execution. + +```bash,ignore +datafusion-cli --host localhost --port 50050 +``` \ No newline at end of file diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs index 5b110d315364..74b91ac9f82f 100644 --- a/datafusion-cli/src/lib.rs +++ b/datafusion-cli/src/lib.rs @@ -14,6 +14,11 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +#![doc = include_str!("../README.md")] +#![allow(unused_imports)] +pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION"); + pub mod print_format; use datafusion::arrow::record_batch::RecordBatch; diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 7742051dd352..4a4588812c24 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -17,14 +17,6 @@ #![allow(bare_trait_objects)] -use clap::{crate_version, App, Arg}; -use datafusion::error::Result; -use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; -use datafusion_cli::{ - print_format::{all_print_formats, PrintFormat}, - PrintOptions, -}; -use rustyline::Editor; use std::env; use std::fs::File; use std::io::prelude::*; @@ -32,8 +24,27 @@ use std::io::BufReader; use std::path::Path; use std::time::Instant; +use ballista::context::BallistaContext; +use ballista::prelude::BallistaConfig; +use clap::{crate_version, App, Arg}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; +use datafusion_cli::{ + print_format::{all_print_formats, PrintFormat}, + PrintOptions, DATAFUSION_CLI_VERSION, +}; +use rustyline::Editor; + +/// The CLI supports using a local DataFusion context or a distributed BallistaContext +enum Context { + /// In-process execution with DataFusion + Local(ExecutionContext), + /// Distributed execution with Ballista + Remote(BallistaContext), +} + #[tokio::main] -pub async fn main() { +pub async fn main() -> Result<()> { let matches = App::new("DataFusion") .version(crate_version!()) .about( @@ -82,6 +93,18 @@ pub async fn main() { ) .takes_value(true), ) + .arg( + Arg::with_name("host") + .help("Ballista scheduler host") + .long("host") + .takes_value(true), + ) + .arg( + Arg::with_name("port") + .help("Ballista scheduler port") + .long("port") + .takes_value(true), + ) .arg( Arg::with_name("quiet") .help("Reduce printing other than the results and work quietly") @@ -91,6 +114,17 @@ pub async fn main() { ) .get_matches(); + let quiet = matches.is_present("quiet"); + + if !quiet { + println!("DataFusion CLI v{}\n", DATAFUSION_CLI_VERSION); + } + + let host = matches.value_of("host"); + let port = matches + .value_of("port") + .and_then(|port| port.parse::().ok()); + if let Some(path) = matches.value_of("data-path") { let p = Path::new(path); env::set_current_dir(&p).unwrap(); @@ -105,31 +139,43 @@ pub async fn main() { execution_config = execution_config.with_batch_size(batch_size); }; + let ctx: Result = match (host, port) { + (Some(h), Some(p)) => { + let config: BallistaConfig = BallistaConfig::new() + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + Ok(Context::Remote(BallistaContext::remote(h, p, &config))) + } + _ => Ok(Context::Local(ExecutionContext::with_config( + execution_config.clone(), + ))), + }; + let mut ctx = ctx?; + let format = matches .value_of("format") .expect("No format is specified") .parse::() .expect("Invalid format"); - let quiet = matches.is_present("quiet"); let print_options = PrintOptions { format, quiet }; if let Some(file_paths) = matches.values_of("file") { let files = file_paths .map(|file_path| File::open(file_path).unwrap()) .collect::>(); - let mut ctx = ExecutionContext::with_config(execution_config); for file in files { let mut reader = BufReader::new(file); exec_from_lines(&mut ctx, &mut reader, print_options.clone()).await; } } else { - exec_from_repl(execution_config, print_options).await; + exec_from_repl(&mut ctx, print_options).await; } + + Ok(()) } async fn exec_from_lines( - ctx: &mut ExecutionContext, + ctx: &mut Context, reader: &mut BufReader, print_options: PrintOptions, ) { @@ -168,9 +214,7 @@ async fn exec_from_lines( } } -async fn exec_from_repl(execution_config: ExecutionConfig, print_options: PrintOptions) { - let mut ctx = ExecutionContext::with_config(execution_config); - +async fn exec_from_repl(ctx: &mut Context, print_options: PrintOptions) { let mut rl = Editor::<()>::new(); rl.load_history(".history").ok(); @@ -186,7 +230,7 @@ async fn exec_from_repl(execution_config: ExecutionConfig, print_options: PrintO Ok(ref line) if line.trim_end().ends_with(';') => { query.push_str(line.trim_end()); rl.add_history_entry(query.clone()); - match exec_and_print(&mut ctx, print_options.clone(), query).await { + match exec_and_print(ctx, print_options.clone(), query).await { Ok(_) => {} Err(err) => println!("{:?}", err), } @@ -234,15 +278,19 @@ fn is_exit_command(line: &str) -> bool { } async fn exec_and_print( - ctx: &mut ExecutionContext, + ctx: &mut Context, print_options: PrintOptions, sql: String, ) -> Result<()> { let now = Instant::now(); - let df = ctx.sql(&sql)?; - let results = df.collect().await?; + let df = match ctx { + Context::Local(datafusion) => datafusion.sql(&sql)?, + Context::Remote(ballista) => ballista.sql(&sql)?, + }; + let results = df.collect().await?; print_options.print_batches(&results, now)?; + Ok(()) }