Skip to content

Commit

Permalink
Add optional flag which advertises host for Arrow Flight SQL #418
Browse files Browse the repository at this point in the history
- Update scheduler_config_spec.toml to include new flag 'advertise_host'
- Add advertise_host member variable to SchedulerServer
- Add advertise_host argument to new, new_with_policy, new_with_builder, and new_with_state in order to propagate the flag
- Pass None for the new argument at existing constructor call sites (tests and the standalone scheduler)

Add optional flag which advertises host for Arrow Flight SQL #418

- Update logic in job_to_fetch_part to use advertise-host flag when it exists
- Remove default from advertise_host in scheduler_config_spec.toml
- Wrap scheduler_server advertise_host variable in Option
- Update scheduler's main.rs to reflect advertise_host being wrapped in Option

Utilize executor IP for routing to flight results in job_to_fetch_part even when advertise-host flag is set.

Add missing variable and fix ownership handling

Remove unnecessary output from do_get_fallback

Respond to PR feedback
- Rename advertise-host to advertise-endpoint
- Replace unwrap calls with expect where possible
- Add missing error handling when parsing advertise-endpoint flag

PR feedback

Co-authored-by: Andy Grove <[email protected]>

PR Feedback
- Use slice pattern matching rather than array indexing when parsing advertise-endpoint

PR Feedback
- Fix clippy issue
  • Loading branch information
Dalton Modlin committed Oct 31, 2022
1 parent eec5c18 commit f0d1f5b
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 23 deletions.
5 changes: 5 additions & 0 deletions ballista/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ conf_file_param = "config_file"
name = "version"
doc = "Print version of this executable"

[[param]]
name = "advertise_endpoint"
type = "String"
doc = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT'"

[[param]]
abbr = "b"
name = "config_backend"
Expand Down
38 changes: 29 additions & 9 deletions ballista/scheduler/src/flight_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,26 +196,41 @@ impl FlightSqlServiceImpl {
) -> Result<Vec<FlightEndpoint>, Status> {
let mut fieps: Vec<_> = vec![];
for loc in completed.partition_location.iter() {
let (host, port) = if let Some(ref md) = loc.executor_meta {
let (exec_host, exec_port) = if let Some(ref md) = loc.executor_meta {
(md.host.clone(), md.port)
} else {
Err(Status::internal(
"Invalid partition location, missing executor metadata".to_string(),
"Invalid partition location, missing executor metadata and advertise_endpoint flag is undefined.".to_string(),
))?
};

let (host, port) = match &self.server.advertise_endpoint {
Some(endpoint) => {
let advertise_endpoint_vec: Vec<&str> = endpoint.split(":").collect();
match advertise_endpoint_vec.as_slice() {
[host_ip, port] => {
(String::from(*host_ip), FromStr::from_str(*port).expect("Failed to parse port from advertise-endpoint."))
}
_ => {
Err(Status::internal("advertise-endpoint flag has incorrect format. Expected IP:Port".to_string()))?
}
}
}
None => (exec_host.clone(), exec_port.clone()),
};

let fetch = if let Some(ref id) = loc.partition_id {
let fetch = protobuf::FetchPartition {
job_id: id.job_id.clone(),
stage_id: id.stage_id,
partition_id: id.partition_id,
path: loc.path.clone(),
host: host.clone(),
port,
// Use executor ip:port for routing to flight result
host: exec_host.clone(),
port: exec_port,
};
protobuf::Action {
action_type: Some(protobuf::action::ActionType::FetchPartition(
fetch,
)),
action_type: Some(FetchPartition(fetch)),
settings: vec![],
}
} else {
Expand All @@ -227,7 +242,7 @@ impl FlightSqlServiceImpl {
} else {
Err(Status::internal("Error getting stats".to_string()))?
}
let authority = format!("{}:{}", &host, &port); // TODO: my host & port
let authority = format!("{}:{}", &host, &port);
let loc = Location {
uri: format!("grpc+tcp://{}", authority),
};
Expand Down Expand Up @@ -458,7 +473,12 @@ impl FlightSqlService for FlightSqlServiceImpl {
let stream = flight_client
.do_get(request)
.await
.map_err(|e| Status::internal(format!("{:?}", e)))?
.map_err(|e| {
Status::internal(format!(
"Error from within flight_client.do_get(): {:?}\n",
e
))
})?
.into_inner();
return Ok(Response::new(Box::pin(stream)));
}
Expand Down
8 changes: 6 additions & 2 deletions ballista/scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};

use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::default_session_builder;

use log::info;

#[macro_use]
Expand Down Expand Up @@ -75,6 +75,7 @@ async fn start_server(
scheduling_policy: TaskSchedulingPolicy,
slots_policy: SlotsPolicy,
event_loop_buffer_size: usize,
advertise_endpoint: Option<String>,
) -> Result<()> {
info!(
"Ballista v{} Scheduler listening on {:?}",
Expand All @@ -85,6 +86,7 @@ async fn start_server(
"Starting Scheduler grpc server with task scheduling policy of {:?}",
scheduling_policy
);

let mut scheduler_server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
match scheduling_policy {
TaskSchedulingPolicy::PushStaged => SchedulerServer::new_with_policy(
Expand All @@ -93,14 +95,15 @@ async fn start_server(
scheduling_policy,
slots_policy,
BallistaCodec::default(),
default_session_builder,
event_loop_buffer_size,
advertise_endpoint,
),
_ => SchedulerServer::new(
scheduler_name,
config_backend.clone(),
BallistaCodec::default(),
event_loop_buffer_size,
advertise_endpoint,
),
};

Expand Down Expand Up @@ -255,6 +258,7 @@ async fn main() -> Result<()> {
scheduling_policy,
slots_policy,
event_loop_buffer_size,
opt.advertise_endpoint,
)
.await?;
Ok(())
Expand Down
3 changes: 3 additions & 0 deletions ballista/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
10000,
None,
);
scheduler.init().await?;
let exec_meta = ExecutorRegistration {
Expand Down Expand Up @@ -663,6 +664,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
10000,
None,
);
scheduler.init().await?;

Expand Down Expand Up @@ -743,6 +745,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
10000,
None,
);
scheduler.init().await?;

Expand Down
44 changes: 32 additions & 12 deletions ballista/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub(crate) type SessionBuilder = fn(SessionConfig) -> SessionState;
#[derive(Clone)]
pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
pub scheduler_name: String,
pub advertise_endpoint: Option<String>,
pub(crate) state: Arc<SchedulerState<T, U>>,
pub start_time: u128,
policy: TaskSchedulingPolicy,
Expand All @@ -69,15 +70,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
config: Arc<dyn StateBackendClient>,
codec: BallistaCodec<T, U>,
event_loop_buffer_size: usize,
advertise_endpoint: Option<String>,
) -> Self {
SchedulerServer::new_with_policy(
scheduler_name,
let state = Arc::new(SchedulerState::new(
config,
TaskSchedulingPolicy::PullStaged,
SlotsPolicy::Bias,
codec,
default_session_builder,
codec,
scheduler_name.clone(),
SlotsPolicy::Bias,
));

SchedulerServer::new_with_state(
scheduler_name,
TaskSchedulingPolicy::PullStaged,
state,
event_loop_buffer_size,
advertise_endpoint,
)
}

Expand All @@ -87,15 +95,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
codec: BallistaCodec<T, U>,
session_builder: SessionBuilder,
event_loop_buffer_size: usize,
advertise_endpoint: Option<String>,
) -> Self {
SchedulerServer::new_with_policy(
scheduler_name,
let state = Arc::new(SchedulerState::new(
config,
TaskSchedulingPolicy::PullStaged,
SlotsPolicy::Bias,
codec,
session_builder,
codec,
scheduler_name.clone(),
SlotsPolicy::Bias,
));

SchedulerServer::new_with_state(
scheduler_name,
TaskSchedulingPolicy::PullStaged,
state,
event_loop_buffer_size,
advertise_endpoint,
)
}

Expand All @@ -105,12 +120,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
scheduling_policy: TaskSchedulingPolicy,
slots_policy: SlotsPolicy,
codec: BallistaCodec<T, U>,
session_builder: SessionBuilder,
event_loop_buffer_size: usize,
advertise_endpoint: Option<String>,
) -> Self {
let state = Arc::new(SchedulerState::new(
config,
session_builder,
default_session_builder,
codec,
scheduler_name.clone(),
slots_policy,
Expand All @@ -121,6 +136,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
scheduling_policy,
state,
event_loop_buffer_size,
advertise_endpoint,
)
}

Expand All @@ -129,6 +145,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
policy: TaskSchedulingPolicy,
state: Arc<SchedulerState<T, U>>,
event_loop_buffer_size: usize,
advertise_endpoint: Option<String>,
) -> Self {
let query_stage_scheduler =
Arc::new(QueryStageScheduler::new(state.clone(), policy));
Expand All @@ -146,6 +163,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
.as_millis(),
policy,
query_stage_event_loop,
advertise_endpoint,
}
}

Expand Down Expand Up @@ -785,6 +803,7 @@ mod test {
BallistaCodec::default(),
default_session_builder,
10000,
None,
);
scheduler.init().await?;

Expand All @@ -805,6 +824,7 @@ mod test {
TaskSchedulingPolicy::PushStaged,
state,
10000,
None,
);
scheduler.init().await?;

Expand Down
1 change: 1 addition & 0 deletions ballista/scheduler/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
Arc::new(client),
BallistaCodec::default(),
10000,
None,
);
scheduler_server.init().await?;
let server = SchedulerGrpcServer::new(scheduler_server.clone());
Expand Down

0 comments on commit f0d1f5b

Please sign in to comment.