From 57e032e5602d11a6cef323856c0e4a39abe8e4e7 Mon Sep 17 00:00:00 2001 From: gaojun Date: Tue, 18 Jan 2022 16:11:41 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=B7=BB=E5=8A=A0show=20tables,=20show=20c?= =?UTF-8?q?olumns=20=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ci/tests/argo_engine_server_test.rs | 51 ++++++++++++ ci/tests/stream_load_flight_service_test.rs | 77 +------------------ client/rust-client/Cargo.toml | 1 + client/rust-client/src/argo_engine_context.rs | 40 +++++++++- 4 files changed, 94 insertions(+), 75 deletions(-) diff --git a/ci/tests/argo_engine_server_test.rs b/ci/tests/argo_engine_server_test.rs index 8b137891791fe..9ff41f89895fa 100644 --- a/ci/tests/argo_engine_server_test.rs +++ b/ci/tests/argo_engine_server_test.rs @@ -1 +1,52 @@ +#[cfg(feature = "standalone")] +#[tokio::test] +async fn test_argo_engine_server() { + start_server().await; + test_server().await; +} +#[cfg(feature = "standalone")] +async fn start_server() { + ArgoEngineContext::standalone(&ArgoEngineConfig::new(), 1) + .await + .unwrap(); +} + +#[cfg(feature = "standalone")] +async fn test_server() { + let context_remote = ArgoEngineContext::remote("localhost", 50050, &ArgoEngineConfig::new()); + + let dir = env!("CARGO_MANIFEST_DIR"); + let sql = format!("CREATE EXTERNAL TABLE inte_test_table (a INT, b INT) STORED AS CSV LOCATION '{}/data/test_inte/test.csv'", dir); + println!("sql={}", sql); + let results = context_remote.sql(sql.as_str()).await; + assert!(results.is_ok()); + + let sql = "select * from inte_test_table"; + println!("sql={}", sql); + let df = context_remote.sql(sql).await.unwrap(); + let results = df.collect().await.unwrap(); + let vec1 = vec![ + "+---+---+", + "| a | b |", + "+---+---+", + "| 3 | 4 |", + "+---+---+", + ]; + assert_batches_eq!(vec1, &results); + + let sql = "show tables"; + println!("sql={}", sql); + let df = context_remote.sql(sql).await.unwrap(); + let results = df.collect().await.unwrap(); + let vec1 = vec![ + "+---------------+--------------------+-----------------+------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+-----------------+------------+", + "| datafusion | public | inte_test_table | BASE TABLE |", + "| datafusion | information_schema | tables | VIEW |", + "| datafusion | information_schema | columns | VIEW |", + "+---------------+--------------------+-----------------+------------+", + ]; + assert_batches_eq!(vec1, &results); +} diff --git a/ci/tests/stream_load_flight_service_test.rs b/ci/tests/stream_load_flight_service_test.rs index 14ebd343e1c8e..bb8134a34c64d 100644 --- a/ci/tests/stream_load_flight_service_test.rs +++ b/ci/tests/stream_load_flight_service_test.rs @@ -1,25 +1,5 @@ -use arrow::util::pretty; -use std::error::Error; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::sync::Arc; use tokio; -use datafusion::arrow::datatypes::Schema; - -use arrow_flight::flight_service_server::FlightServiceServer; -use arrow_flight::utils::flight_data_to_arrow_batch; -use arrow_flight::{flight_descriptor, Action, Criteria, FlightDescriptor, Ticket}; -use tokio::sync::Mutex; - -use common::config::ArgoEngineConfig; -use common::error::ArgoEngineError; -use tonic::transport::Server; - -use datafusion::assert_batches_eq; -use datafusion::prelude::ExecutionConfig; -use rust_client::argo_engine_context::ArgoEngineContext; -use rust_client::steam_load_flight_client::StreamLoadFlightClient; - #[cfg(feature = "standalone")] #[tokio::test] async fn test_get() { @@ -39,7 +19,6 @@ async fn test_get() { test_get_schema().await; test_get_flight_info().await; test_list_flight().await; - test_server().await; } #[cfg(feature = "standalone")] @@ -127,57 +106,7 @@ async fn test_list_flight() { #[cfg(feature = "standalone")] async fn start_server() { - ArgoEngineContext::standalone(&ArgoEngineConfig::new(), 4).await.unwrap(); - // let mut context: Option = None; - // let mut n = 0; - // let mut success = false; - // while !success && n < 10 { - // match ArgoEngineContext::standalone(&ArgoEngineConfig::new(), 4).await { - // Err(_) => { - // success = false; - // n += 1; - // - // tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - // log::info!("Attempting to start server...",); - // } - // Ok(s) => { - // success = true; - // n += 1; - // - // context = Some(s); - // } - // } - // } - - // match context { - // Some(m) => Ok(m), - // None => Err(ArgoEngineError::General("start server error".to_string())), - // } + ArgoEngineContext::standalone(&ArgoEngineConfig::new(), 4) + .await + .unwrap(); } - - -#[cfg(feature = "standalone")] -async fn test_server() { - // let context = ArgoEngineContext::standalone(&ArgoEngineConfig::new(), 1) - // .await - // .unwrap(); - - let context_remote = - ArgoEngineContext::remote("localhost", 50050, &ArgoEngineConfig::new()); - - let dir = env!("CARGO_MANIFEST_DIR"); - let sql = format!("CREATE EXTERNAL TABLE inte_test_table (a INT, b INT) STORED AS CSV LOCATION '{}/data/test_inte/test.csv'", dir); - println!("sql={}", sql); - let df = context_remote.sql(sql.as_str()).await.unwrap(); - // let result = df.show().await; - let results = df.collect().await.unwrap(); - pretty::print_batches(&results).unwrap(); - - let sql = "select * from inte_test_table"; - println!("sql={}", sql); - let df = context_remote.sql(sql).await.unwrap(); - // let result = df.show().await; - let results = df.collect().await.unwrap(); - pretty::print_batches(&results).unwrap(); - assert!(results.len() > 0); -} \ No newline at end of file diff --git a/client/rust-client/Cargo.toml b/client/rust-client/Cargo.toml index 9117da7b1972a..3cf12b7f6bca6 100644 --- a/client/rust-client/Cargo.toml +++ b/client/rust-client/Cargo.toml @@ -12,6 +12,7 @@ futures = "0.3.16" tokio = "1.10.0" log = "0.4" tonic = "0.6" +sqlparser = "0.13" server = {path = "../../server" , optional = true } common = { path= "../../common" } diff --git a/client/rust-client/src/argo_engine_context.rs b/client/rust-client/src/argo_engine_context.rs index 135f9c84e46cf..b413063b90cb0 100644 --- a/client/rust-client/src/argo_engine_context.rs +++ b/client/rust-client/src/argo_engine_context.rs @@ -26,7 +26,9 @@ use datafusion::datasource::file_format::avro::AvroFormat; use datafusion::datasource::file_format::csv::CsvFormat; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; -use datafusion::sql::parser::FileType; +use datafusion::prelude::{ExecutionConfig, ExecutionContext}; +use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement}; +use sqlparser::ast::Statement; pub struct ArgoEngineContextState { /// Ballista configuration @@ -122,6 +124,13 @@ impl ArgoEngineContext { ) }; + let is_show = self.is_show_statement(sql).await?; + // the show tables、 show columns sql can not run at scheduler because the tables is store at client + if is_show { + ctx = + ExecutionContext::with_config(ExecutionConfig::new().with_information_schema(true)); + } + // get the catalogs from ArgoEngineServer let catalog_data = self.get_catalogs().await?; // register catalog with DataFusion context @@ -185,6 +194,35 @@ impl ArgoEngineContext { } } + /// is a 'show *' sql + pub async fn is_show_statement(&self, sql: &str) -> Result { + let mut is_show_variable: bool = false; + let statements = DFParser::parse_sql(sql)?; + + if statements.len() != 1 { + return Err(DataFusionError::NotImplemented( + "The context currently only supports a single SQL statement".to_string(), + )); + } + + if let DFStatement::Statement(s) = &statements[0] { + let st: &Statement = s; + match st { + Statement::ShowVariable { .. } => { + is_show_variable = true; + } + Statement::ShowColumns { .. } => { + is_show_variable = true; + } + _ => { + is_show_variable = false; + } + } + }; + + Ok(is_show_variable) + } + /// get catalogs from scheduler pub async fn get_catalogs(&self) -> Result> { let scheduler_url = format!( From 12ea08dd4a1cfbc7084a15ac1cbcd8a01dcad57d Mon Sep 17 00:00:00 2001 From: gaojun Date: Tue, 18 Jan 2022 17:14:08 +0800 Subject: [PATCH 2/3] fix build error --- ci/tests/argo_engine_server_test.rs | 4 ++++ ci/tests/stream_load_flight_service_test.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/ci/tests/argo_engine_server_test.rs b/ci/tests/argo_engine_server_test.rs index 9ff41f89895fa..02f4f1193dc89 100644 --- a/ci/tests/argo_engine_server_test.rs +++ b/ci/tests/argo_engine_server_test.rs @@ -1,3 +1,7 @@ +use datafusion::assert_batches_eq; +use common::config::ArgoEngineConfig; +use rust_client::argo_engine_context::ArgoEngineContext; + #[cfg(feature = "standalone")] #[tokio::test] async fn test_argo_engine_server() { diff --git a/ci/tests/stream_load_flight_service_test.rs b/ci/tests/stream_load_flight_service_test.rs index bb8134a34c64d..e00af2c066ed7 100644 --- a/ci/tests/stream_load_flight_service_test.rs +++ b/ci/tests/stream_load_flight_service_test.rs @@ -1,4 +1,8 @@ +use arrow_flight::{Action, Criteria, flight_descriptor, FlightDescriptor, Ticket}; use tokio; +use common::config::ArgoEngineConfig; +use rust_client::argo_engine_context::ArgoEngineContext; +use rust_client::steam_load_flight_client::StreamLoadFlightClient; #[cfg(feature = "standalone")] #[tokio::test] From 4909540fee3f281da43af9f5dc8628c834187def Mon Sep 17 00:00:00 2001 From: gaojun Date: Tue, 18 Jan 2022 17:20:39 +0800 Subject: [PATCH 3/3] fix build error --- ci/branch_build.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ci/branch_build.sh b/ci/branch_build.sh index e822884fa5a9b..f50a0f2c71b25 100644 --- a/ci/branch_build.sh +++ b/ci/branch_build.sh @@ -41,8 +41,7 @@ echo -e "\033[42;37m 开始 build \033[0m" rustup component add llvm-tools-preview cargo install grcov export RUST_BACKTRACE=full -cargo build --release --features "simd mimalloc" -cargo test --release --features "simd mimalloc" +cargo test --release --features "simd standalone" # 生成测试报告 echo -e "\033[42;37m 生成测试报告 build \033[0m"