Skip to content

Commit

Permalink
Fix pending requests in preview that could stall out the webui (#684)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Jul 11, 2024
1 parent caacf38 commit 6d2806d
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
8 changes: 6 additions & 2 deletions crates/arroyo-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::convert::Infallible;
use std::{collections::HashMap, time::Duration};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt as _;
use tonic::Request;
use tonic::{Code, Request};
use tracing::info;

const PREVIEW_TTL: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -441,6 +441,7 @@ pub async fn get_job_output(

// validate that the job exists, the user has access, and the graph has a GrpcSink
query_job_by_pub_id(&pipeline_pub_id, &job_pub_id, &db, &auth_data).await?;

let pipeline = query_pipeline_by_pub_id(&pipeline_pub_id, &db, &auth_data).await?;

if !pipeline
Expand All @@ -463,7 +464,10 @@ pub async fn get_job_output(
job_id: job_pub_id.clone(),
}))
.await
.unwrap()
.map_err(|e| match e.code() {
Code::FailedPrecondition | Code::NotFound => bad_request(e.message().to_string()),
_ => log_and_map(e),
})?
.into_inner();

info!("Subscribed to output");
Expand Down
23 changes: 19 additions & 4 deletions crates/arroyo-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,28 @@ impl ControllerGrpc for ControllerServer {
&self,
request: Request<GrpcOutputSubscription>,
) -> Result<Response<Self::SubscribeToOutputStream>, Status> {
let job_id = request.into_inner().job_id;
if self
.job_state
.lock()
.await
.get(&job_id)
.ok_or_else(|| Status::not_found(format!("Job {} does not exist", job_id)))?
.state
.read()
.unwrap()
.as_str()
!= "Running"
{
return Err(Status::failed_precondition(
"Job must be running to read output",
));
}

let (tx, rx) = tokio::sync::mpsc::channel(32);

let mut data_txs = self.data_txs.lock().await;
data_txs
.entry(request.into_inner().job_id)
.or_default()
.push(tx);
data_txs.entry(job_id).or_default().push(tx);

Ok(Response::new(ReceiverStream::new(rx)))
}
Expand Down
3 changes: 3 additions & 0 deletions crates/arroyo-controller/src/states/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ pub async fn run_to_completion(
pub struct StateMachine {
tx: Option<Sender<JobMessage>>,
pub config: Arc<RwLock<JobConfig>>,
pub state: Arc<RwLock<String>>,
metrics: Arc<tokio::sync::RwLock<HashMap<Arc<String>, JobMetrics>>>,
db: DatabaseSource,
scheduler: Arc<dyn Scheduler>,
Expand All @@ -601,6 +602,7 @@ impl StateMachine {
let mut this = Self {
tx: None,
config: Arc::new(RwLock::new(config)),
state: Arc::new(RwLock::new(status.state.clone())),
metrics,
db,
scheduler,
Expand Down Expand Up @@ -726,6 +728,7 @@ impl StateMachine {
status: JobStatus,
shutdown_guard: &ShutdownGuard,
) {
*self.state.write().unwrap() = status.state.clone();
if *self.config.read().unwrap() != config {
let update = JobMessage::ConfigUpdate(config.clone());
{
Expand Down
4 changes: 2 additions & 2 deletions webui/src/routes/pipelines/PipelineOutputs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export function PipelineOutputs({
const outputSource = useRef<EventSource | undefined>(undefined);
const currentJobId = useRef<string | undefined>(undefined);
const [cols, setCols] = useState<any | undefined>(undefined);
const [rows, setRows] = useState<any[]>([]);
const [rows, _setRows] = useState<any[]>([]);
const [subscribed, setSubscribed] = useState<boolean>(false);
const rowsRead = useRef(0);
const rowsInTable = useRef(0);
Expand Down Expand Up @@ -109,7 +109,7 @@ export function PipelineOutputs({
}, []);

useEffect(() => {
if (onDemand && !subscribed) {
if ((onDemand && !subscribed) || job.state != 'Running') {
close();
return;
}
Expand Down

0 comments on commit 6d2806d

Please sign in to comment.