Refactor scheduler state with different management policy for volatile and stable states #1810

Merged
2 commits merged on Feb 16, 2022
19 changes: 11 additions & 8 deletions ballista/rust/core/proto/ballista.proto
@@ -848,13 +848,16 @@ message ColumnStats {
uint32 distinct_count = 4;
}

// Used by scheduler
message ExecutorMetadata {
string id = 1;
string host = 2;
uint32 port = 3;
uint32 grpc_port = 4;
ExecutorSpecification specification = 5;
}

// Used by grpc
message ExecutorRegistration {
string id = 1;
// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/tokio-rs/prost/issues/430 and https://github.com/tokio-rs/prost/pull/455)
@@ -864,10 +867,11 @@ message ExecutorRegistration {
}
uint32 port = 3;
uint32 grpc_port = 4;
ExecutorSpecification specification = 5;
}

message ExecutorHeartbeat {
ExecutorMetadata meta = 1;
string executor_id = 1;
// Unix epoch-based timestamp in seconds
uint64 timestamp = 2;
ExecutorState state = 3;
@@ -929,7 +933,7 @@ message ShuffleWritePartition {
}

message TaskStatus {
PartitionId partition_id = 1;
PartitionId task_id = 1;
oneof status {
RunningTask running = 2;
FailedTask failed = 3;
@@ -957,19 +961,18 @@ message PollWorkResult {

message RegisterExecutorParams {
ExecutorRegistration metadata = 1;
ExecutorSpecification specification = 2;
}

message RegisterExecutorResult {
bool success = 1;
}

message SendHeartBeatParams {
ExecutorRegistration metadata = 1;
message HeartBeatParams {
string executor_id = 1;
ExecutorState state = 2;
}

message SendHeartBeatResult {
message HeartBeatResult {
// TODO: this follows Spark, where the heartbeat response can ask the BlockManager to re-register
bool reregister = 1;
}
@@ -981,7 +984,7 @@ message StopExecutorResult {
}

message UpdateTaskStatusParams {
ExecutorRegistration metadata = 1;
string executor_id = 1;
// All tasks must be reported until they reach the failed or completed state
repeated TaskStatus task_status = 2;
}
@@ -1067,7 +1070,7 @@ service SchedulerGrpc {

// The push-based task scheduler uses only this interface,
// rather than the PollWork interface, to report executor states
rpc SendHeartBeat (SendHeartBeatParams) returns (SendHeartBeatResult) {}
rpc HeartBeatFromExecutor (HeartBeatParams) returns (HeartBeatResult) {}

rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {}

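With this change, heartbeats carry only the volatile state keyed by executor_id, while the stable ExecutorMetadata (including its specification) is registered separately. Below is a minimal sketch of how an executor could call the renamed RPC through the generated client; the method name heart_beat_from_executor and the Option-wrapped state field are assumptions about the prost/tonic codegen, not code from this pull request.

```rust
use ballista_core::serde::protobuf::{
    scheduler_grpc_client::SchedulerGrpcClient, HeartBeatParams,
};
use tonic::transport::Channel;

// Sketch only: report volatile executor state and honor a possible
// re-register request (the Spark-inspired `reregister` flag).
async fn report_heartbeat(
    scheduler: &mut SchedulerGrpcClient<Channel>,
    executor_id: &str,
) -> Result<bool, tonic::Status> {
    let result = scheduler
        .heart_beat_from_executor(HeartBeatParams {
            executor_id: executor_id.to_owned(),
            // A real executor would populate ExecutorState metrics here.
            state: None,
        })
        .await?;
    Ok(result.into_inner().reregister)
}
```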
18 changes: 12 additions & 6 deletions ballista/rust/core/src/serde/scheduler/mod.rs
@@ -66,43 +66,47 @@ impl PartitionId {
#[derive(Debug, Clone)]
pub struct PartitionLocation {
pub partition_id: PartitionId,
pub executor_meta: ExecutorMeta,
pub executor_meta: ExecutorMetadata,
pub partition_stats: PartitionStats,
pub path: String,
}

/// Meta-data for an executor, used when fetching shuffle partitions from other executors
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ExecutorMeta {
pub struct ExecutorMetadata {
pub id: String,
pub host: String,
pub port: u16,
pub grpc_port: u16,
pub specification: ExecutorSpecification,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorMetadata> for ExecutorMeta {
impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
fn into(self) -> protobuf::ExecutorMetadata {
protobuf::ExecutorMetadata {
id: self.id,
host: self.host,
port: self.port as u32,
grpc_port: self.grpc_port as u32,
specification: Some(self.specification.into()),
}
}
}

impl From<protobuf::ExecutorMetadata> for ExecutorMeta {
impl From<protobuf::ExecutorMetadata> for ExecutorMetadata {
fn from(meta: protobuf::ExecutorMetadata) -> Self {
Self {
id: meta.id,
host: meta.host,
port: meta.port as u16,
grpc_port: meta.grpc_port as u16,
specification: meta.specification.unwrap().into(),
}
}
}

/// Specification of an executor, indicating executor resources, like total task slots
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct ExecutorSpecification {
pub task_slots: u32,
@@ -136,6 +140,7 @@ impl From<protobuf::ExecutorSpecification> for ExecutorSpecification {
}
}

/// Available resources on an executor, like available task slots (borrowed from Spark)
#[derive(Debug, Clone, Serialize)]
pub struct ExecutorData {
pub executor_id: String,
@@ -204,6 +209,7 @@ impl From<protobuf::ExecutorData> for ExecutorData {
}
}

/// The internal state of an executor, like CPU usage, memory usage, etc.
#[derive(Debug, Clone, Copy, Serialize)]
pub struct ExecutorState {
// in bytes
@@ -359,7 +365,7 @@ pub struct ExecutePartition {
/// The physical plan for this query stage
pub plan: Arc<dyn ExecutionPlan>,
/// Location of shuffle partitions that this query stage may depend on
pub shuffle_locations: HashMap<PartitionId, ExecutorMeta>,
pub shuffle_locations: HashMap<PartitionId, ExecutorMetadata>,
/// Output partitioning for shuffle writes
pub output_partitioning: Option<Partitioning>,
}
@@ -370,7 +376,7 @@ impl ExecutePartition {
stage_id: usize,
partition_id: Vec<usize>,
plan: Arc<dyn ExecutionPlan>,
shuffle_locations: HashMap<PartitionId, ExecutorMeta>,
shuffle_locations: HashMap<PartitionId, ExecutorMetadata>,
output_partitioning: Option<Partitioning>,
) -> Self {
Self {
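Since the renamed ExecutorMetadata now carries its ExecutorSpecification, the protobuf conversions round-trip the whole struct. Here is a minimal usage sketch, assuming only the Into/From impls shown above; it is not code from this pull request.

```rust
use ballista_core::serde::protobuf;
use ballista_core::serde::scheduler::{ExecutorMetadata, ExecutorSpecification};

// Sketch only: the conversion into protobuf wraps the specification in
// Some(..), and the conversion back unwraps it, so serialized metadata must
// always carry a specification.
fn round_trip(meta: ExecutorMetadata) -> ExecutorMetadata {
    let proto: protobuf::ExecutorMetadata = meta.into();
    ExecutorMetadata::from(proto)
}

fn example() -> ExecutorMetadata {
    round_trip(ExecutorMetadata {
        id: "executor-1".to_owned(),
        host: "localhost".to_owned(),
        port: 50051,
        grpc_port: 50052,
        specification: ExecutorSpecification { task_slots: 4 },
    })
}
```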
20 changes: 12 additions & 8 deletions ballista/rust/executor/src/execution_loop.rs
@@ -23,7 +23,6 @@ use datafusion::physical_plan::ExecutionPlan;
use log::{debug, error, info, warn};
use tonic::transport::Channel;

use ballista_core::serde::protobuf::ExecutorRegistration;
use ballista_core::serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
TaskDefinition, TaskStatus,
@@ -33,16 +32,23 @@ use crate::as_task_status;
use crate::executor::Executor;
use ballista_core::error::BallistaError;
use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};

pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
mut scheduler: SchedulerGrpcClient<Channel>,
executor: Arc<Executor>,
executor_meta: ExecutorRegistration,
concurrent_tasks: usize,
codec: BallistaCodec<T, U>,
) {
let available_tasks_slots = Arc::new(AtomicUsize::new(concurrent_tasks));
let executor_specification: ExecutorSpecification = executor
.metadata
.specification
.as_ref()
.unwrap()
.clone()
.into();
let available_tasks_slots =
Arc::new(AtomicUsize::new(executor_specification.task_slots as usize));
let (task_status_sender, mut task_status_receiver) =
std::sync::mpsc::channel::<TaskStatus>();

@@ -61,7 +67,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
tonic::Status,
> = scheduler
.poll_work(PollWorkParams {
metadata: Some(executor_meta.clone()),
metadata: Some(executor.metadata.clone()),
can_accept_task: available_tasks_slots.load(Ordering::SeqCst) > 0,
task_status,
})
@@ -74,7 +80,6 @@
if let Some(task) = result.into_inner().task {
match run_received_tasks(
executor.clone(),
executor_meta.id.clone(),
available_tasks_slots.clone(),
task_status_sender,
task,
@@ -106,7 +111,6 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
executor: Arc<Executor>,
executor_id: String,
available_tasks_slots: Arc<AtomicUsize>,
task_status_sender: Sender<TaskStatus>,
task: TaskDefinition,
@@ -146,7 +150,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
available_tasks_slots.fetch_add(1, Ordering::SeqCst);
let _ = task_status_sender.send(as_task_status(
execution_result,
executor_id,
executor.metadata.id.clone(),
task_id,
));
});
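The poll loop now derives its task capacity from the executor's own specification rather than a separately passed concurrent_tasks value. Below is a simplified sketch of the slot-accounting pattern it relies on; the helper functions are illustrative, not taken from this pull request.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Illustrative helpers only: poll_loop and run_received_tasks inline this
// logic rather than calling functions like these.
fn try_reserve_slot(available_tasks_slots: &Arc<AtomicUsize>) -> bool {
    // Advertise `can_accept_task` only while at least one slot is free,
    // then take a slot before spawning the received task.
    if available_tasks_slots.load(Ordering::SeqCst) > 0 {
        available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
        true
    } else {
        false
    }
}

fn release_slot(available_tasks_slots: &Arc<AtomicUsize>) {
    // Mirrors the fetch_add(1, ..) performed once a spawned task completes.
    available_tasks_slots.fetch_add(1, Ordering::SeqCst);
}
```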
24 changes: 8 additions & 16 deletions ballista/rust/executor/src/executor.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf;
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::protobuf::ExecutorRegistration;
use datafusion::error::DataFusionError;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -31,34 +31,26 @@ use datafusion::prelude::{ExecutionConfig, ExecutionContext};

/// Ballista executor
pub struct Executor {
/// Directory for storing partial results
work_dir: String,
/// Metadata
pub metadata: ExecutorRegistration,

/// Specification like total task slots
pub specification: ExecutorSpecification,
/// Directory for storing partial results
pub work_dir: String,

/// DataFusion execution context
pub ctx: Arc<ExecutionContext>,
}

impl Executor {
/// Create a new executor instance
pub fn new(work_dir: &str, ctx: Arc<ExecutionContext>) -> Self {
Executor::new_with_specification(
work_dir,
ExecutorSpecification { task_slots: 4 },
ctx,
)
}

pub fn new_with_specification(
pub fn new(
metadata: ExecutorRegistration,
work_dir: &str,
specification: ExecutorSpecification,
ctx: Arc<ExecutionContext>,
) -> Self {
Self {
metadata,
work_dir: work_dir.to_owned(),
specification,
ctx,
}
}
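After this refactor, Executor::new takes the full ExecutorRegistration as its metadata instead of defaulting to a fixed ExecutorSpecification. Below is a hypothetical construction sketch, assumed to live inside the executor crate; the shape of the generated ExecutorRegistration struct (the optional_host oneof and the Default impl) is inferred from the proto above rather than shown in this diff.

```rust
use std::sync::Arc;

use ballista_core::serde::protobuf::{
    executor_registration::OptionalHost, ExecutorRegistration,
};
use datafusion::prelude::ExecutionContext;

use crate::executor::Executor;

// Sketch only: build the registration once and hand it to the executor,
// which then exposes it as `executor.metadata` to the poll loop.
fn build_executor(work_dir: &str) -> Executor {
    let metadata = ExecutorRegistration {
        id: "executor-1".to_owned(),
        optional_host: Some(OptionalHost::Host("localhost".to_owned())),
        port: 50051,
        grpc_port: 50052,
        // A real deployment must also set the specification field, which the
        // poll loop unwraps; its protobuf shape is not shown in this diff.
        ..Default::default()
    };
    Executor::new(metadata, work_dir, Arc::new(ExecutionContext::new()))
}
```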