From 6d2806d823c8a6b4530feeb5970275cff826e8d1 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Thu, 11 Jul 2024 14:15:34 -0700 Subject: [PATCH] Fix pending requests in preview that could stall out the webui (#684) --- crates/arroyo-api/src/jobs.rs | 8 +++++-- crates/arroyo-controller/src/lib.rs | 23 +++++++++++++++---- crates/arroyo-controller/src/states/mod.rs | 3 +++ .../src/routes/pipelines/PipelineOutputs.tsx | 4 ++-- 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/crates/arroyo-api/src/jobs.rs b/crates/arroyo-api/src/jobs.rs index 98d91d390..4e8707d5b 100644 --- a/crates/arroyo-api/src/jobs.rs +++ b/crates/arroyo-api/src/jobs.rs @@ -22,7 +22,7 @@ use std::convert::Infallible; use std::{collections::HashMap, time::Duration}; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt as _; -use tonic::Request; +use tonic::{Code, Request}; use tracing::info; const PREVIEW_TTL: Duration = Duration::from_secs(60); @@ -441,6 +441,7 @@ pub async fn get_job_output( // validate that the job exists, the user has access, and the graph has a GrpcSink query_job_by_pub_id(&pipeline_pub_id, &job_pub_id, &db, &auth_data).await?; + let pipeline = query_pipeline_by_pub_id(&pipeline_pub_id, &db, &auth_data).await?; if !pipeline @@ -463,7 +464,10 @@ pub async fn get_job_output( job_id: job_pub_id.clone(), })) .await - .unwrap() + .map_err(|e| match e.code() { + Code::FailedPrecondition | Code::NotFound => bad_request(e.message().to_string()), + _ => log_and_map(e), + })? .into_inner(); info!("Subscribed to output"); diff --git a/crates/arroyo-controller/src/lib.rs b/crates/arroyo-controller/src/lib.rs index 17a045e34..80a3d28a1 100644 --- a/crates/arroyo-controller/src/lib.rs +++ b/crates/arroyo-controller/src/lib.rs @@ -408,13 +408,28 @@ impl ControllerGrpc for ControllerServer { &self, request: Request, ) -> Result, Status> { + let job_id = request.into_inner().job_id; + if self + .job_state + .lock() + .await + .get(&job_id) + .ok_or_else(|| Status::not_found(format!("Job {} does not exist", job_id)))? + .state + .read() + .unwrap() + .as_str() + != "Running" + { + return Err(Status::failed_precondition( + "Job must be running to read output", + )); + } + let (tx, rx) = tokio::sync::mpsc::channel(32); let mut data_txs = self.data_txs.lock().await; - data_txs - .entry(request.into_inner().job_id) - .or_default() - .push(tx); + data_txs.entry(job_id).or_default().push(tx); Ok(Response::new(ReceiverStream::new(rx))) } diff --git a/crates/arroyo-controller/src/states/mod.rs b/crates/arroyo-controller/src/states/mod.rs index 7aa3f7962..595b0435e 100644 --- a/crates/arroyo-controller/src/states/mod.rs +++ b/crates/arroyo-controller/src/states/mod.rs @@ -584,6 +584,7 @@ pub async fn run_to_completion( pub struct StateMachine { tx: Option>, pub config: Arc>, + pub state: Arc>, metrics: Arc, JobMetrics>>>, db: DatabaseSource, scheduler: Arc, @@ -601,6 +602,7 @@ impl StateMachine { let mut this = Self { tx: None, config: Arc::new(RwLock::new(config)), + state: Arc::new(RwLock::new(status.state.clone())), metrics, db, scheduler, @@ -726,6 +728,7 @@ impl StateMachine { status: JobStatus, shutdown_guard: &ShutdownGuard, ) { + *self.state.write().unwrap() = status.state.clone(); if *self.config.read().unwrap() != config { let update = JobMessage::ConfigUpdate(config.clone()); { diff --git a/webui/src/routes/pipelines/PipelineOutputs.tsx b/webui/src/routes/pipelines/PipelineOutputs.tsx index ce9276eb1..1a4fec06c 100644 --- a/webui/src/routes/pipelines/PipelineOutputs.tsx +++ b/webui/src/routes/pipelines/PipelineOutputs.tsx @@ -19,7 +19,7 @@ export function PipelineOutputs({ const outputSource = useRef(undefined); const currentJobId = useRef(undefined); const [cols, setCols] = useState(undefined); - const [rows, setRows] = useState([]); + const [rows, _setRows] = useState([]); const [subscribed, setSubscribed] = useState(false); const rowsRead = useRef(0); const rowsInTable = useRef(0); @@ -109,7 +109,7 @@ export function PipelineOutputs({ }, []); useEffect(() => { - if (onDemand && !subscribed) { + if ((onDemand && !subscribed) || job.state != 'Running') { close(); return; }