Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Add optional flag which advertises host for Arrow Flight SQL apache#418
Browse files Browse the repository at this point in the history
- Update scheduler_config_spec.toml to include new flag 'advertise_host'
- Add advertise_host member variable to SchedulerServer
- Add advertise_host argument to new, new_with_policy, new_with_builder, and new_with_state in order to propagate flag
- Add None argument to relevant method calls

Add optional flag which advertises host for Arrow Flight SQL apache#418

- Update logic in job_to_fetch_part to use advertise-host flag when it exists
- Remove default from advertise_host in scheduler_config_spec.toml
- Wrap scheduler_server advertise_host variable in Option
- Update scheduler's main.rs to reflect advertise_host being wrapped in Option

Utilize executor IP for routing to flight results in job_to_fetch_part even when advertise-host flag is set.

Add missing variable and ownership stuff

Remove unnecessary output from do_get_fallback
  • Loading branch information
Dalton Modlin committed Oct 24, 2022
1 parent eec5c18 commit f36c041
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 9 deletions.
5 changes: 5 additions & 0 deletions ballista/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ conf_file_param = "config_file"
name = "version"
doc = "Print version of this executable"

[[param]]
name = "advertise_host"
type = "String"
doc = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT'"

[[param]]
abbr = "b"
name = "config_backend"
Expand Down
33 changes: 24 additions & 9 deletions ballista/scheduler/src/flight_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,26 +196,41 @@ impl FlightSqlServiceImpl {
) -> Result<Vec<FlightEndpoint>, Status> {
let mut fieps: Vec<_> = vec![];
for loc in completed.partition_location.iter() {
let (host, port) = if let Some(ref md) = loc.executor_meta {

let (exec_host, exec_port) = if let Some(ref md) = loc.executor_meta {
(md.host.clone(), md.port)
} else {
Err(Status::internal(
"Invalid partition location, missing executor metadata".to_string(),
"Invalid partition location, missing executor metadata and advertise_host flag is undefined.".to_string(),
))?
};

let (host, port) = match self.server.advertise_host {
Some(_) => {
let advertise_host_flag: Vec<&str> = self
.server
.advertise_host
.as_ref()
.unwrap()
.split(":")
.collect();
(advertise_host_flag[0].to_string(), advertise_host_flag[1].parse().unwrap())
}
None => (exec_host.clone(), exec_port.clone())
};

let fetch = if let Some(ref id) = loc.partition_id {
let fetch = protobuf::FetchPartition {
job_id: id.job_id.clone(),
stage_id: id.stage_id,
partition_id: id.partition_id,
path: loc.path.clone(),
host: host.clone(),
port,
// Use executor ip:port for routing to flight result
host: exec_host.clone(),
port: exec_port,
};
protobuf::Action {
action_type: Some(protobuf::action::ActionType::FetchPartition(
fetch,
)),
action_type: Some(FetchPartition(fetch)),
settings: vec![],
}
} else {
Expand All @@ -227,7 +242,7 @@ impl FlightSqlServiceImpl {
} else {
Err(Status::internal("Error getting stats".to_string()))?
}
let authority = format!("{}:{}", &host, &port); // TODO: my host & port
let authority = format!("{}:{}", &host, &port);
let loc = Location {
uri: format!("grpc+tcp://{}", authority),
};
Expand Down Expand Up @@ -458,7 +473,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
let stream = flight_client
.do_get(request)
.await
.map_err(|e| Status::internal(format!("{:?}", e)))?
.map_err(|e| Status::internal(format!("Error from within flight_client.do_get(): {:?}\n", e)))?
.into_inner();
return Ok(Response::new(Box::pin(stream)));
}
Expand Down
4 changes: 4 additions & 0 deletions ballista/scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ async fn start_server(
scheduling_policy: TaskSchedulingPolicy,
slots_policy: SlotsPolicy,
event_loop_buffer_size: usize,
advertise_host: Option<String>,
) -> Result<()> {
info!(
"Ballista v{} Scheduler listening on {:?}",
Expand All @@ -95,12 +96,14 @@ async fn start_server(
BallistaCodec::default(),
default_session_builder,
event_loop_buffer_size,
advertise_host,
),
_ => SchedulerServer::new(
scheduler_name,
config_backend.clone(),
BallistaCodec::default(),
event_loop_buffer_size,
advertise_host,
),
};

Expand Down Expand Up @@ -255,6 +258,7 @@ async fn main() -> Result<()> {
scheduling_policy,
slots_policy,
event_loop_buffer_size,
opt.advertise_host,
)
.await?;
Ok(())
Expand Down
3 changes: 3 additions & 0 deletions ballista/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
10000,
None,
);
scheduler.init().await?;
let exec_meta = ExecutorRegistration {
Expand Down Expand Up @@ -663,6 +664,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
10000,
None,
);
scheduler.init().await?;

Expand Down Expand Up @@ -743,6 +745,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
10000,
None,
);
scheduler.init().await?;

Expand Down
11 changes: 11 additions & 0 deletions ballista/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub(crate) type SessionBuilder = fn(SessionConfig) -> SessionState;
#[derive(Clone)]
pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
pub scheduler_name: String,
pub advertise_host: Option<String>,
pub(crate) state: Arc<SchedulerState<T, U>>,
pub start_time: u128,
policy: TaskSchedulingPolicy,
Expand All @@ -69,6 +70,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
config: Arc<dyn StateBackendClient>,
codec: BallistaCodec<T, U>,
event_loop_buffer_size: usize,
advertise_host: Option<String>,
) -> Self {
SchedulerServer::new_with_policy(
scheduler_name,
Expand All @@ -78,6 +80,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
codec,
default_session_builder,
event_loop_buffer_size,
advertise_host
)
}

Expand All @@ -87,6 +90,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
codec: BallistaCodec<T, U>,
session_builder: SessionBuilder,
event_loop_buffer_size: usize,
advertise_host: Option<String>,
) -> Self {
SchedulerServer::new_with_policy(
scheduler_name,
Expand All @@ -96,6 +100,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
codec,
session_builder,
event_loop_buffer_size,
advertise_host
)
}

Expand All @@ -107,6 +112,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
codec: BallistaCodec<T, U>,
session_builder: SessionBuilder,
event_loop_buffer_size: usize,
advertise_host: Option<String>,
) -> Self {
let state = Arc::new(SchedulerState::new(
config,
Expand All @@ -121,6 +127,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
scheduling_policy,
state,
event_loop_buffer_size,
advertise_host,
)
}

Expand All @@ -129,6 +136,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
policy: TaskSchedulingPolicy,
state: Arc<SchedulerState<T, U>>,
event_loop_buffer_size: usize,
advertise_host: Option<String>,
) -> Self {
let query_stage_scheduler =
Arc::new(QueryStageScheduler::new(state.clone(), policy));
Expand All @@ -146,6 +154,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
.as_millis(),
policy,
query_stage_event_loop,
advertise_host,
}
}

Expand Down Expand Up @@ -785,6 +794,7 @@ mod test {
BallistaCodec::default(),
default_session_builder,
10000,
None,
);
scheduler.init().await?;

Expand All @@ -805,6 +815,7 @@ mod test {
TaskSchedulingPolicy::PushStaged,
state,
10000,
None
);
scheduler.init().await?;

Expand Down
1 change: 1 addition & 0 deletions ballista/scheduler/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
Arc::new(client),
BallistaCodec::default(),
10000,
None,
);
scheduler_server.init().await?;
let server = SchedulerGrpcServer::new(scheduler_server.clone());
Expand Down

0 comments on commit f36c041

Please sign in to comment.