Skip to content

Commit

Permalink
Fix race condition that could stall scheduling if operator panicked d…
Browse files Browse the repository at this point in the history
…uring setup (#712)
  • Loading branch information
mwylde committed Aug 8, 2024
1 parent c75a5b1 commit 721cb4f
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 31 deletions.
21 changes: 11 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
reqwest = "0.11.20"

# Redis
redis = { version = "0.24.0", features = ["default", "tokio-rustls-comp", "cluster-async", "connection-manager"] }
redis = { version = "0.26.0", features = ["default", "tokio-rustls-comp", "cluster-async", "connection-manager"] }

# Fluvio
fluvio = {version = "0.21", features = ["openssl"]}
Expand Down
5 changes: 2 additions & 3 deletions crates/arroyo-connectors/src/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
mod operator;

use std::collections::HashMap;

use anyhow::{anyhow, bail};
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::{Connection, Connector};
Expand All @@ -11,6 +9,7 @@ use redis::aio::ConnectionManager;
use redis::cluster::ClusterClient;
use redis::{Client, ConnectionInfo, IntoConnectionInfo};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::oneshot::Receiver;
use typify::import_types;

Expand Down Expand Up @@ -113,7 +112,7 @@ async fn test_inner(
match &client {
RedisClient::Standard(client) => {
let mut connection = client
.get_async_connection()
.get_multiplexed_async_connection()
.await
.map_err(|e| anyhow!("Failed to connect to to Redis Cluster: {:?}", e))?;
tx.send(TestSourceMessage::info(
Expand Down
10 changes: 3 additions & 7 deletions crates/arroyo-connectors/src/redis/operator/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,7 @@ impl RedisWriter {
}

while attempts < 20 {
match self
.pipeline
.query_async::<_, ()>(&mut self.connection)
.await
{
match self.pipeline.query_async::<()>(&mut self.connection).await {
Ok(_) => {
self.pipeline.clear();
self.size_estimate = 0;
Expand All @@ -219,7 +215,7 @@ impl RedisWriter {
}

tokio::time::sleep(Duration::from_millis((50 * (1 << attempts)).min(5_000))).await;
attempts -= 1;
attempts += 1;
}

panic!("Exhausted retries writing to Redis");
Expand Down Expand Up @@ -319,7 +315,7 @@ impl ArrowOperator for RedisSinkFunc {
}

tokio::time::sleep(Duration::from_millis((50 * (1 << attempts)).min(5_000))).await;
attempts -= 1;
attempts += 1;
}

panic!("Failed to establish connection to redis after 20 retries");
Expand Down
10 changes: 9 additions & 1 deletion crates/arroyo-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,16 @@ impl ControllerGrpc for ControllerServer {
&self,
request: Request<WorkerErrorReq>,
) -> Result<Response<WorkerErrorRes>, Status> {
info!("Got worker error.");
let req = request.into_inner();

info!(
job_id = req.job_id,
operator_id = req.operator_id,
message = "operator error",
error_message = req.message,
error_details = req.details
);

let client = self.db.client().await.unwrap();
match queries::controller_queries::execute_create_job_log_message(
&client,
Expand Down
7 changes: 6 additions & 1 deletion crates/arroyo-controller/src/states/scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use arroyo_state::{

use crate::job_controller::job_metrics::JobMetrics;
use crate::{
job_controller::JobController, queries::controller_queries, states::stop_if_desired_non_running,
job_controller::JobController, queries::controller_queries,
states::stop_if_desired_non_running, RunningMessage,
};
use crate::{schedulers::SchedulerError, JobMessage};
use crate::{
Expand Down Expand Up @@ -521,6 +522,10 @@ impl State for Scheduling {
}) => {
started_tasks.insert((operator_id, operator_subtask));
}
Some(JobMessage::RunningMessage(RunningMessage::TaskFailed {worker_id, operator_id, subtask_index, reason})) => {
return Err(ctx.retryable(self, "task failed on startup",
anyhow!("task failed on job startup on {:?}: {}:{}: {}", worker_id, operator_id, subtask_index, reason), 10));
}
Some(JobMessage::ConfigUpdate(c)) => {
stop_if_desired_non_running!(self, &c);
}
Expand Down
20 changes: 20 additions & 0 deletions crates/arroyo-operator/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,4 +881,24 @@ mod tests {

assert_eq!(tx.capacity(), 8);
}

#[tokio::test]
async fn test_panic_propagation() {
let (tx, mut rx) = batch_bounded(8);

let msg = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)])),
vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4]))],
)
.unwrap();

tokio::task::spawn(async move {
let f = rx.recv();
panic!("at the disco");
});

tokio::time::sleep(Duration::from_millis(100)).await;

assert!(tx.send(ArrowMessage::Data(msg)).await.is_err());
}
}
19 changes: 19 additions & 0 deletions crates/arroyo-operator/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ impl OperatorNode {
"Running source {}-{}",
ctx.task_info.operator_name, ctx.task_info.task_index
);

ctx.control_tx
.send(ControlResp::TaskStarted {
operator_id: ctx.task_info.operator_id.clone(),
task_index: ctx.task_info.task_index,
start_time: SystemTime::now(),
})
.await
.unwrap();

let result = s.run(ctx).await;

s.on_close(ctx).await;
Expand Down Expand Up @@ -193,6 +203,15 @@ async fn operator_run_behavior(
ctx.task_info.operator_name, ctx.task_info.task_index
);

ctx.control_tx
.send(ControlResp::TaskStarted {
operator_id: ctx.task_info.operator_id.clone(),
task_index: ctx.task_info.task_index,
start_time: SystemTime::now(),
})
.await
.unwrap();

let task_info = ctx.task_info.clone();
let name = this.name();
let mut counter = CheckpointCounter::new(in_qs.len());
Expand Down
8 changes: 0 additions & 8 deletions crates/arroyo-worker/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,14 +738,6 @@ impl Engine {

let send_copy = control_tx.clone();
tokio::spawn(async move {
send_copy
.send(ControlResp::TaskStarted {
operator_id: operator_id.clone(),
task_index,
start_time: SystemTime::now(),
})
.await
.unwrap();
if let Err(error) = join_task.await {
send_copy
.send(ControlResp::TaskFailed {
Expand Down

0 comments on commit 721cb4f

Please sign in to comment.