From b9bf5dc182a37c7ed9832fb586e633fb41a43a50 Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Sun, 23 May 2021 12:07:25 +0200 Subject: [PATCH 1/6] #352: BallistaContext::collect() logging is too noisy, change info messages for trace inside collect loop # Please enter the commit message for your changes. Lines starting # with '#' will be kept; you may remove them yourself if you want to. # An empty message aborts the commit. # # Date: Sun May 23 12:07:25 2021 +0200 # # On branch ballista_context_collect_info # Changes to be committed: # modified: ballista/rust/client/src/context.rs # --- ballista/rust/client/src/context.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index e26dcac256d2..9b2e5e780f10 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -39,7 +39,7 @@ use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::csv::CsvReadOptions; use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream}; -use log::{error, info}; +use log::{error, info, trace}; #[allow(dead_code)] struct BallistaContextState { @@ -200,11 +200,11 @@ impl BallistaContext { let wait_future = tokio::time::sleep(Duration::from_millis(100)); match status { job_status::Status::Queued(_) => { - info!("Job {} still queued...", job_id); + trace!("Job {} still queued...", job_id); wait_future.await; } job_status::Status::Running(_) => { - info!("Job {} is running...", job_id); + trace!("Job {} is running...", job_id); wait_future.await; } job_status::Status::Failed(err) => { From b1642bebb65ea44d53be29031ca8d66011bf7e36 Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Sun, 23 May 2021 12:07:25 +0200 Subject: [PATCH 2/6] #352: BallistaContext::collect() logging is too noisy, change info messages for trace inside collect loop --- ballista/rust/client/src/context.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index e26dcac256d2..9b2e5e780f10 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -39,7 +39,7 @@ use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::csv::CsvReadOptions; use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream}; -use log::{error, info}; +use log::{error, info, trace}; #[allow(dead_code)] struct BallistaContextState { @@ -200,11 +200,11 @@ impl BallistaContext { let wait_future = tokio::time::sleep(Duration::from_millis(100)); match status { job_status::Status::Queued(_) => { - info!("Job {} still queued...", job_id); + trace!("Job {} still queued...", job_id); wait_future.await; } job_status::Status::Running(_) => { - info!("Job {} is running...", job_id); + trace!("Job {} is running...", job_id); wait_future.await; } job_status::Status::Failed(err) => { From d192291fab4e60af744155f72a012767c8a4d960 Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Sun, 23 May 2021 19:58:08 +0200 Subject: [PATCH 3/6] BallistaContext::collect info or trace a job as queued/running if first time --- ballista/rust/client/src/context.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 9b2e5e780f10..02bfb92bc071 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -41,6 +41,17 @@ use datafusion::physical_plan::csv::CsvReadOptions; use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream}; use log::{error, info, trace}; +macro_rules! info_or_trace_if { + ($first:ident, $($arg:tt)+) => ( + if $first { + info!($($arg)+); + } else { + trace!($($arg)+); + } + $first = false; + ) +} + #[allow(dead_code)] struct BallistaContextState { /// Scheduler host @@ -186,6 +197,10 @@ impl BallistaContext { .into_inner() .job_id; + // first time we info/trace this job as queued or running + let mut first_q_it: bool = true; + let mut first_r_it: bool = true; + loop { let GetJobStatusResult { status } = scheduler .get_job_status(GetJobStatusParams { @@ -200,11 +215,11 @@ impl BallistaContext { let wait_future = tokio::time::sleep(Duration::from_millis(100)); match status { job_status::Status::Queued(_) => { - trace!("Job {} still queued...", job_id); + info_or_trace_if!(first_q_it, "Job {} still queued...", job_id); wait_future.await; } job_status::Status::Running(_) => { - trace!("Job {} is running...", job_id); + info_or_trace_if!(first_r_it, "Job {} is running...", job_id); wait_future.await; } job_status::Status::Failed(err) => { From f2700756776673900032f072f027affcc4dd2066 Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Wed, 26 May 2021 22:06:46 +0200 Subject: [PATCH 4/6] BallistaContext::collect loop info message when status changed --- ballista/rust/client/src/context.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 02bfb92bc071..d793b8905388 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -42,13 +42,12 @@ use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream}; use log::{error, info, trace}; macro_rules! info_or_trace_if { - ($first:ident, $($arg:tt)+) => ( - if $first { + ($info:expr, $($arg:tt)+) => ( + if $info { info!($($arg)+); } else { trace!($($arg)+); } - $first = false; ) } @@ -197,9 +196,7 @@ impl BallistaContext { .into_inner() .job_id; - // first time we info/trace this job as queued or running - let mut first_q_it: bool = true; - let mut first_r_it: bool = true; + let mut prev_status: Option = None; loop { let GetJobStatusResult { status } = scheduler @@ -213,14 +210,21 @@ impl BallistaContext { DataFusionError::Internal("Received empty status message".to_owned()) })?; let wait_future = tokio::time::sleep(Duration::from_millis(100)); + let has_status_change = prev_status.map(|x| x != status).unwrap_or(false); match status { job_status::Status::Queued(_) => { - info_or_trace_if!(first_q_it, "Job {} still queued...", job_id); + info_or_trace_if!( + has_status_change, + "Job {} still queued...", + job_id + ); wait_future.await; + prev_status = Some(status); } job_status::Status::Running(_) => { - info_or_trace_if!(first_r_it, "Job {} is running...", job_id); + info_or_trace_if!(has_status_change, "Job {} is running...", job_id); wait_future.await; + prev_status = Some(status); } job_status::Status::Failed(err) => { let msg = format!("Job {} failed: {}", job_id, err.error); From 9f7c6fde5ea17380824d0e619207ff56419315b6 Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Wed, 26 May 2021 22:37:45 +0200 Subject: [PATCH 5/6] BallistaContext::collect Remove unnecessary macro for trace/info --- ballista/rust/client/src/context.rs | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index d793b8905388..2d965bbb7c28 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -39,17 +39,7 @@ use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::csv::CsvReadOptions; use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream}; -use log::{error, info, trace}; - -macro_rules! info_or_trace_if { - ($info:expr, $($arg:tt)+) => ( - if $info { - info!($($arg)+); - } else { - trace!($($arg)+); - } - ) -} +use log::{error, info}; #[allow(dead_code)] struct BallistaContextState { @@ -213,16 +203,16 @@ impl BallistaContext { let has_status_change = prev_status.map(|x| x != status).unwrap_or(false); match status { job_status::Status::Queued(_) => { - info_or_trace_if!( - has_status_change, - "Job {} still queued...", - job_id - ); + if has_status_change { + info!("Job {} still queued...", job_id); + } wait_future.await; prev_status = Some(status); } job_status::Status::Running(_) => { - info_or_trace_if!(has_status_change, "Job {} is running...", job_id); + if has_status_change { + info!("Job {} is running...", job_id); + } wait_future.await; prev_status = Some(status); } From 8dbc2c79d8d89055b5d256d53e0fab0f9d265814 Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Wed, 26 May 2021 22:58:59 +0200 Subject: [PATCH 6/6] BallistaContext::collect Fix has_status_change default value (true) --- ballista/rust/client/src/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 2d965bbb7c28..df97e3a22984 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -200,7 +200,7 @@ impl BallistaContext { DataFusionError::Internal("Received empty status message".to_owned()) })?; let wait_future = tokio::time::sleep(Duration::from_millis(100)); - let has_status_change = prev_status.map(|x| x != status).unwrap_or(false); + let has_status_change = prev_status.map(|x| x != status).unwrap_or(true); match status { job_status::Status::Queued(_) => { if has_status_change {