-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Store sessions so users can register tables and query them through flight #269
Conversation
@andygrove this is the big one that we've been sitting on until Arrow upgraded to 22. |
@@ -135,7 +137,23 @@ impl FlightService for BallistaFlightService { | |||
&self, | |||
_request: Request<Streaming<HandshakeRequest>>, | |||
) -> Result<Response<Self::HandshakeStream>, Status> { | |||
Err(Status::unimplemented("handshake")) | |||
let token = uuid::Uuid::new_v4(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When working properly (i.e. supporting alternate endpoints) the Flight SQL JDBC driver will try to talk to the executor with the same credentials as it used with the scheduler. Let's just accept any credentials for now, so that when the issue is fixed in the arrow
repo this will still work.
use uuid::Uuid; | ||
|
||
pub struct FlightSqlServiceImpl { | ||
server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>, | ||
statements: Arc<Mutex<HashMap<Uuid, LogicalPlan>>>, | ||
contexts: Arc<Mutex<HashMap<Uuid, Arc<SessionContext>>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Store a map of SessionContext
to UUID
so we can look up the session for the user and keep the tables registered.
TODO: move this into etcd?
} | ||
let user = parts[0]; | ||
let pass = parts[1]; | ||
if user != "admin" || pass != "password" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: authentication providers? (i.e. oauth, active directory, etc)
|
||
let result = HandshakeResponse { | ||
protocol_version: 0, | ||
payload: token.as_bytes().to_vec(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't actually care about credentials at all, we just need auth to be active in order to get this handshake request and give them back a session token.
addr, e | ||
)) | ||
})?; | ||
let mut flight_client = FlightServiceClient::new(connection); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Proxy the flight to the correct executor instance.
) -> Result<Response<FlightInfo>, Status> { | ||
let ctx = self.create_ctx().await?; | ||
debug!("get_flight_info_prepared_statement"); | ||
let ctx = self.get_ctx(&request)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the UUID token to get the cached SessionContext
.
@@ -517,6 +517,8 @@ message FetchPartition { | |||
uint32 stage_id = 2; | |||
uint32 partition_id = 3; | |||
string path = 4; | |||
string host = 5; | |||
uint32 port = 6; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add host & port to flights so that when the scheduler gets a flight request, it can go find the source. Architecturally, I see this as a good canonicalization of the source of Flight data, but I'm happy to discuss here!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @avantgardnerio! I pulled this PR locally and tested it out. I was able to register tables and query them. This is a huge leap forward.
Which issue does this PR close?
Closes #112.
Rationale for this change
Allow users to do more than
select 1;
thought Flight SQL.What changes are included in this PR?
Are there any user-facing changes?
Users can now register tables and query them through Flight SQL (and JDBC).