Store sessions so users can register tables and query them through flight #269

Merged: 2 commits, Sep 24, 2022
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
@@ -38,7 +38,7 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.8", default-features = false }

-arrow-flight = { version = "22.0.0" }
+arrow-flight = { version = "22.0.0", features = ["flight-sql-experimental"] }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
2 changes: 2 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -517,6 +517,8 @@ message FetchPartition {
   uint32 stage_id = 2;
   uint32 partition_id = 3;
   string path = 4;
+  string host = 5;
+  uint32 port = 6;

Review comment (Contributor Author): Add host & port to flights so that when the scheduler gets a flight request, it can go find the source. Architecturally, I see this as a good canonicalization of the source of Flight data, but I'm happy to discuss here!

 }

 // Mapping from partition id to executor id
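As an illustration of the review comment above, here is a minimal sketch of what carrying the source location on the action allows a receiver to do. The struct is a hand-written stand-in for the generated `FetchPartition` message (field names follow the .proto hunk), and `source_addr` is a hypothetical helper that is not part of this PR.

```rust
/// Hand-written stand-in for the generated `FetchPartition` message.
/// Field names follow the .proto hunk; the type itself is illustrative.
#[derive(Debug, Clone, PartialEq)]
pub struct FetchPartition {
    pub job_id: String,
    pub stage_id: u32,
    pub partition_id: u32,
    pub path: String,
    pub host: String,
    pub port: u32,
}

impl FetchPartition {
    /// Hypothetical helper: the address of the executor that holds the
    /// partition, derived only from fields carried by the message.
    pub fn source_addr(&self) -> String {
        format!("{}:{}", self.host, self.port)
    }
}
```

With the location embedded in the message, whoever receives the action can work out where the data lives without a separate lookup.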
4 changes: 4 additions & 0 deletions ballista/rust/core/src/client.rs
@@ -80,12 +80,16 @@ impl BallistaClient {
         stage_id: usize,
         partition_id: usize,
         path: &str,
+        host: &str,
+        port: u16,
     ) -> Result<SendableRecordBatchStream> {
         let action = Action::FetchPartition {
             job_id: job_id.to_string(),
             stage_id,
             partition_id,
             path: path.to_owned(),
+            host: host.to_string(),
+            port,
         };
         self.execute_action(&action).await
     }
11 changes: 7 additions & 4 deletions ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -317,16 +317,19 @@ async fn fetch_partition(
     let partition_id = location.partition_id.ok_or_else(|| {
         DataFusionError::Internal("Received empty partition id".to_owned())
     })?;
-    let mut ballista_client =
-        BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+    let host = metadata.host.as_str();
+    let port = metadata.port as u16;
+    let mut ballista_client = BallistaClient::try_new(host, port)
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
     ballista_client
         .fetch_partition(
             &partition_id.job_id,
             partition_id.stage_id as usize,
             partition_id.partition_id as usize,
             &location.path,
+            host,
+            port,
         )
         .await
         .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))
11 changes: 7 additions & 4 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -185,16 +185,19 @@ async fn fetch_partition(
     let partition_id = &location.partition_id;
     // TODO for shuffle client connections, we should avoid creating new connections again and again.
     // And we should also avoid to keep alive too many connections for long time.
-    let mut ballista_client =
-        BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+    let host = metadata.host.as_str();
+    let port = metadata.port as u16;
+    let mut ballista_client = BallistaClient::try_new(host, port)
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
     ballista_client
         .fetch_partition(
             &partition_id.job_id,
             partition_id.stage_id as usize,
             partition_id.partition_id as usize,
             &location.path,
+            host,
+            port,
         )
         .await
         .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))
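The TODO in the hunk above notes that a new connection is created for every fetch. One possible direction, sketched here with only the standard library and a stand-in client type, is a cache keyed by `(host, port)`. `FakeClient`, `ClientCache`, and `get_or_connect` are hypothetical names, not part of Ballista. The real `BallistaClient::try_new` is async and fallible, which this sketch ignores. The second half of the TODO (not keeping too many connections alive) would need eviction, which is left out.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Stand-in for a real client; it only remembers the address it "connected" to.
pub struct FakeClient {
    pub addr: String,
}

/// Hypothetical cache that hands out one shared client per (host, port).
#[derive(Default)]
pub struct ClientCache {
    clients: Mutex<HashMap<(String, u16), Arc<FakeClient>>>,
    connects: Mutex<usize>,
}

impl ClientCache {
    /// Return the cached client for this endpoint, creating it on first use.
    pub fn get_or_connect(&self, host: &str, port: u16) -> Arc<FakeClient> {
        let mut clients = self.clients.lock().unwrap();
        clients
            .entry((host.to_string(), port))
            .or_insert_with(|| {
                // Count how many times a "connection" was actually created.
                *self.connects.lock().unwrap() += 1;
                Arc::new(FakeClient {
                    addr: format!("{}:{}", host, port),
                })
            })
            .clone()
    }

    /// Number of connections created so far.
    pub fn connects(&self) -> usize {
        *self.connects.lock().unwrap()
    }
}
```

Repeated calls for the same endpoint return the same `Arc`, so the connect counter only grows when a new `(host, port)` pair is seen.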
14 changes: 14 additions & 0 deletions ballista/rust/core/src/serde/mod.rs
@@ -19,6 +19,7 @@
 //! as convenience code for interacting with the generated code.

 use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
+use arrow_flight::sql::ProstMessageExt;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::{FunctionRegistry, Operator};
 use datafusion::physical_plan::join_utils::JoinSide;
@@ -39,6 +40,19 @@ pub mod generated;
 pub mod physical_plan;
 pub mod scheduler;

+impl ProstMessageExt for protobuf::Action {
+    fn type_url() -> &'static str {
+        "type.googleapis.com/arrow.flight.protocol.sql.Action"
+    }
+
+    fn as_any(&self) -> prost_types::Any {
+        prost_types::Any {
+            type_url: protobuf::Action::type_url().to_string(),
+            value: self.encode_to_vec(),
+        }
+    }
+}
+
 pub fn decode_protobuf(bytes: &[u8]) -> Result<BallistaAction, BallistaError> {
     let mut buf = Cursor::new(bytes);

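The impl above wraps an encoded message in an `Any` together with a type URL, so a receiver can tell which message type the bytes hold. The following is a simplified, standard-library-only sketch of that pattern. `Any` here is a stand-in for `prost_types::Any`, and `PackAny`, `ToyAction`, `is_type`, and the example type URL are all hypothetical. The toy "encoding" is just the raw bytes of a string, not protobuf.

```rust
/// Stand-in for `prost_types::Any`: a type URL plus the encoded payload.
#[derive(Debug, Clone, PartialEq)]
pub struct Any {
    pub type_url: String,
    pub value: Vec<u8>,
}

/// Hypothetical, simplified trait mirroring the shape of the impl above.
pub trait PackAny {
    fn type_url() -> &'static str;
    fn encode_to_vec(&self) -> Vec<u8>;

    /// Wrap the encoded bytes together with the type URL.
    fn as_any(&self) -> Any {
        Any {
            type_url: Self::type_url().to_string(),
            value: self.encode_to_vec(),
        }
    }
}

/// Toy message whose "encoding" is just the bytes of its job id.
pub struct ToyAction {
    pub job_id: String,
}

impl PackAny for ToyAction {
    fn type_url() -> &'static str {
        "type.example.com/toy.Action"
    }

    fn encode_to_vec(&self) -> Vec<u8> {
        self.job_id.as_bytes().to_vec()
    }
}

/// Check whether an `Any` claims to carry a message of type `M`.
pub fn is_type<M: PackAny>(any: &Any) -> bool {
    any.type_url == M::type_url()
}
```

A receiver would typically check the type URL first and only then decode `value` as that message type.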
2 changes: 2 additions & 0 deletions ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -44,6 +44,8 @@ impl TryInto<Action> for protobuf::Action {
                 stage_id: fetch.stage_id as usize,
                 partition_id: fetch.partition_id as usize,
                 path: fetch.path,
+                host: fetch.host,
+                port: fetch.port as u16,
             }),
             _ => Err(BallistaError::General(
                 "scheduler::from_proto(Action) invalid or missing action".to_owned(),
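The protobuf field is a `uint32` while the Rust side uses `u16`, and `as u16` silently truncates values above 65535. As a possible hardening (not something this PR does), a checked conversion could reject out-of-range ports instead. `checked_port` is a hypothetical helper name, and the `String` error type is only a placeholder for the crate's own error type.

```rust
use std::convert::TryFrom;

/// Convert a protobuf `uint32` port into a `u16`, rejecting values that
/// do not fit instead of truncating them as `as u16` would.
pub fn checked_port(port: u32) -> Result<u16, String> {
    u16::try_from(port).map_err(|_| format!("port {} does not fit in u16", port))
}
```

For comparison, `70000u32 as u16` wraps around to 4464, whereas `checked_port(70000)` returns an error.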
2 changes: 2 additions & 0 deletions ballista/rust/core/src/serde/scheduler/mod.rs
@@ -40,6 +40,8 @@ pub enum Action {
         stage_id: usize,
         partition_id: usize,
         path: String,
+        host: String,
+        port: u16,
     },
 }

4 changes: 4 additions & 0 deletions ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -41,12 +41,16 @@ impl TryInto<protobuf::Action> for Action {
                 stage_id,
                 partition_id,
                 path,
+                host,
+                port,
             } => Ok(protobuf::Action {
                 action_type: Some(ActionType::FetchPartition(protobuf::FetchPartition {
                     job_id,
                     stage_id: stage_id as u32,
                     partition_id: partition_id as u32,
                     path,
+                    host,
+                    port: port as u32,
                 })),
                 settings: vec![],
             }),
22 changes: 20 additions & 2 deletions ballista/rust/executor/src/flight_service.rs
@@ -17,6 +17,7 @@

 //! Implementation of the Apache Arrow Flight protocol that wraps an executor.

+use std::convert::TryFrom;
 use std::fs::File;
 use std::pin::Pin;

@@ -35,14 +36,15 @@ use datafusion::arrow::{
     record_batch::RecordBatch,
 };
 use futures::{Stream, StreamExt};
-use log::{debug, warn};
+use log::{debug, info, warn};
 use std::io::{Read, Seek};
 use tokio::sync::mpsc::channel;
 use tokio::{
     sync::mpsc::{Receiver, Sender},
     task,
 };
 use tokio_stream::wrappers::ReceiverStream;
+use tonic::metadata::MetadataValue;
 use tonic::{Request, Response, Status, Streaming};

 type FlightDataSender = Sender<Result<FlightData, Status>>;
@@ -135,7 +137,23 @@ impl FlightService for BallistaFlightService {
         &self,
         _request: Request<Streaming<HandshakeRequest>>,
     ) -> Result<Response<Self::HandshakeStream>, Status> {
-        Err(Status::unimplemented("handshake"))
+        let token = uuid::Uuid::new_v4();

Review comment (Contributor Author): When working properly (i.e. supporting alternate endpoints), the Flight SQL JDBC driver will try to talk to the executor with the same credentials it used with the scheduler. Let's just accept any credentials for now, so that when the issue is fixed in the arrow repo this will still work.

+        info!("do_handshake token={}", token);
+
+        let result = HandshakeResponse {
+            protocol_version: 0,
+            payload: token.as_bytes().to_vec(),
+        };
+        let result = Ok(result);
+        let output = futures::stream::iter(vec![result]);
+        let str = format!("Bearer {}", token);
+        let mut resp: Response<
+            Pin<Box<dyn Stream<Item = Result<_, Status>> + Sync + Send>>,
+        > = Response::new(Box::pin(output));
+        let md = MetadataValue::try_from(str)
+            .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
+        resp.metadata_mut().insert("authorization", md);
+        Ok(resp)
     }

     async fn list_flights(
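The handshake above issues a fresh token and returns it in the `authorization` metadata as `Bearer <token>`, without validating whatever credentials the caller sent. The sketch below covers only the string format of that header, using the standard library alone. `bearer_header` and `parse_bearer` are hypothetical helper names. How a client actually reads the header depends on its gRPC library and is not shown.

```rust
/// Build the header value the handshake attaches to its response.
pub fn bearer_header(token: &str) -> String {
    format!("Bearer {}", token)
}

/// Extract the token from a `Bearer <token>` header value.
/// Returns `None` for other schemes or an empty token.
pub fn parse_bearer(header: &str) -> Option<&str> {
    header.strip_prefix("Bearer ").filter(|t| !t.is_empty())
}
```

A client that received this header could pass the extracted token back on later calls. Because the executor currently accepts any credentials, the token acts as a placeholder rather than as an access check.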