Skip to content

Commit

Permalink
feat: update flight-sql to latest specs (apache#4250)
Browse files Browse the repository at this point in the history
* feat: update flight-sql to latest specs

* fix: pr feedback
  • Loading branch information
roeap authored May 23, 2023
1 parent df691d5 commit 5752997
Show file tree
Hide file tree
Showing 13 changed files with 3,150 additions and 1,958 deletions.
100 changes: 90 additions & 10 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
use arrow_array::builder::StringBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::sql::{
ActionCreatePreparedStatementResult, Any, ProstMessageExt, SqlInfo,
ActionBeginSavepointRequest, ActionBeginSavepointResult,
ActionBeginTransactionResult, ActionCancelQueryRequest, ActionCancelQueryResult,
ActionCreatePreparedStatementResult, ActionEndSavepointRequest,
ActionEndTransactionRequest, Any, CommandStatementSubstraitPlan, ProstMessageExt,
SqlInfo,
};
use arrow_flight::{
Action, FlightData, FlightEndpoint, HandshakeRequest, HandshakeResponse, IpcMessage,
Expand All @@ -40,8 +44,9 @@ use arrow_flight::{
flight_service_server::FlightService,
flight_service_server::FlightServiceServer,
sql::{
server::FlightSqlService, ActionClosePreparedStatementRequest,
ActionCreatePreparedStatementRequest, CommandGetCatalogs,
server::FlightSqlService, ActionBeginTransactionRequest,
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
ActionCreatePreparedSubstraitPlanRequest, CommandGetCatalogs,
CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
Expand Down Expand Up @@ -177,6 +182,16 @@ impl FlightSqlService for FlightSqlServiceImpl {
))
}

async fn get_flight_info_substrait_plan(
&self,
_query: CommandStatementSubstraitPlan,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented(
"get_flight_info_substrait_plan not implemented",
))
}

async fn get_flight_info_prepared_statement(
&self,
cmd: CommandPreparedStatementQuery,
Expand Down Expand Up @@ -220,6 +235,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
endpoint: endpoints,
total_records: num_rows as i64,
total_bytes: num_bytes as i64,
ordered: false,
};
let resp = Response::new(info);
Ok(resp)
Expand Down Expand Up @@ -441,6 +457,16 @@ impl FlightSqlService for FlightSqlServiceImpl {
Ok(FAKE_UPDATE_RESULT)
}

async fn do_put_substrait_plan(
&self,
_ticket: CommandStatementSubstraitPlan,
_request: Request<Streaming<FlightData>>,
) -> Result<i64, Status> {
Err(Status::unimplemented(
"do_put_substrait_plan not implemented",
))
}

async fn do_put_prepared_statement_query(
&self,
_query: CommandPreparedStatementQuery,
Expand Down Expand Up @@ -486,8 +512,62 @@ impl FlightSqlService for FlightSqlServiceImpl {
&self,
_query: ActionClosePreparedStatementRequest,
_request: Request<Action>,
) {
unimplemented!("Implement do_action_close_prepared_statement")
) -> Result<(), Status> {
Err(Status::unimplemented(
"Implement do_action_close_prepared_statement",
))
}

async fn do_action_create_prepared_substrait_plan(
&self,
_query: ActionCreatePreparedSubstraitPlanRequest,
_request: Request<Action>,
) -> Result<ActionCreatePreparedStatementResult, Status> {
Err(Status::unimplemented(
"Implement do_action_create_prepared_substrait_plan",
))
}

async fn do_action_begin_transaction(
&self,
_query: ActionBeginTransactionRequest,
_request: Request<Action>,
) -> Result<ActionBeginTransactionResult, Status> {
Err(Status::unimplemented(
"Implement do_action_begin_transaction",
))
}

async fn do_action_end_transaction(
&self,
_query: ActionEndTransactionRequest,
_request: Request<Action>,
) -> Result<(), Status> {
Err(Status::unimplemented("Implement do_action_end_transaction"))
}

async fn do_action_begin_savepoint(
&self,
_query: ActionBeginSavepointRequest,
_request: Request<Action>,
) -> Result<ActionBeginSavepointResult, Status> {
Err(Status::unimplemented("Implement do_action_begin_savepoint"))
}

async fn do_action_end_savepoint(
&self,
_query: ActionEndSavepointRequest,
_request: Request<Action>,
) -> Result<(), Status> {
Err(Status::unimplemented("Implement do_action_end_savepoint"))
}

async fn do_action_cancel_query(
&self,
_query: ActionCancelQueryRequest,
_request: Request<Action>,
) -> Result<ActionCancelQueryResult, Status> {
Err(Status::unimplemented("Implement do_action_cancel_query"))
}

async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
Expand Down Expand Up @@ -718,7 +798,7 @@ mod tests {
test_all_clients(|mut client| async move {
auth_client(&mut client).await;

let mut stmt = client.prepare("select 1;".to_string()).await.unwrap();
let mut stmt = client.prepare("select 1;".to_string(), None).await.unwrap();

let flight_info = stmt.execute().await.unwrap();

Expand Down Expand Up @@ -746,7 +826,7 @@ mod tests {
test_all_clients(|mut client| async move {
auth_client(&mut client).await;
let res = client
.execute_update("creat table test(a int);".to_string())
.execute_update("creat table test(a int);".to_string(), None)
.await
.unwrap();
assert_eq!(res, FAKE_UPDATE_RESULT);
Expand All @@ -759,7 +839,7 @@ mod tests {
test_all_clients(|mut client| async move {
// no handshake
assert!(client
.prepare("select 1;".to_string())
.prepare("select 1;".to_string(), None)
.await
.unwrap_err()
.to_string()
Expand All @@ -776,7 +856,7 @@ mod tests {
// forget to set_token
client.handshake("admin", "password").await.unwrap();
assert!(client
.prepare("select 1;".to_string())
.prepare("select 1;".to_string(), None)
.await
.unwrap_err()
.to_string()
Expand All @@ -786,7 +866,7 @@ mod tests {
client.handshake("admin", "password").await.unwrap();
client.set_token("wrong token".to_string());
assert!(client
.prepare("select 1;".to_string())
.prepare("select 1;".to_string(), None)
.await
.unwrap_err()
.to_string()
Expand Down
21 changes: 19 additions & 2 deletions arrow-flight/src/arrow.flight.protocol.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion arrow-flight/src/bin/flight_sql_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ async fn main() {
setup_logging();
let mut client = setup_client(args.client_args).await.expect("setup client");

let info = client.execute(args.query).await.expect("prepare statement");
let info = client
.execute(args.query, None)
.await
.expect("prepare statement");
info!("got flight info");

let schema = Arc::new(Schema::try_from(info.clone()).expect("valid schema"));
Expand Down
2 changes: 2 additions & 0 deletions arrow-flight/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ impl FlightInfo {
endpoint: Vec<FlightEndpoint>,
total_records: i64,
total_bytes: i64,
ordered: bool,
) -> Self {
let IpcMessage(vals) = message;
FlightInfo {
Expand All @@ -432,6 +433,7 @@ impl FlightInfo {
endpoint,
total_records,
total_bytes,
ordered,
}
}

Expand Down
Loading

0 comments on commit 5752997

Please sign in to comment.