Skip to content

Commit

Permalink
bug: deployer drifting state (#548)
Browse files Browse the repository at this point in the history
  • Loading branch information
chesedo authored Jan 3, 2023
1 parent cb342fd commit eda4769
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 0 deletions.
2 changes: 2 additions & 0 deletions deployer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub async fn start(
.queue_client(GatewayClient::new(args.gateway_uri))
.build();

persistence.cleanup_invalid_states().await.unwrap();

let runnable_deployments = persistence.get_all_runnable_deployments().await.unwrap();
info!(count = %runnable_deployments.len(), "enqueuing runnable deployments");
for existing_deployment in runnable_deployments {
Expand Down
116 changes: 116 additions & 0 deletions deployer/src/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,20 @@ impl Persistence {
.map_err(Error::from)
}

// Clean up all invalid states inside persistence
pub async fn cleanup_invalid_states(&self) -> Result<()> {
sqlx::query("UPDATE deployments SET state = ? WHERE state IN(?, ?, ?, ?)")
.bind(State::Stopped)
.bind(State::Queued)
.bind(State::Built)
.bind(State::Building)
.bind(State::Loading)
.execute(&self.pool)
.await?;

Ok(())
}

pub async fn get_or_create_service(&self, name: &str) -> Result<Service> {
if let Some(service) = self.get_service_by_name(name).await? {
Ok(service)
Expand Down Expand Up @@ -553,6 +567,108 @@ mod tests {
);
}

// Test that we are correctly cleaning up any stale / unexpected states for a deployment
// The reason this does not clean up two (or more) running states for a single deployment is because
// it should theoretically be impossible for a service to have two deployments in the running state.
// And even if a service where to have this, then the start ups of these deployments (more specifically
// the last deployment that is starting up) will stop all the deployments correctly.
#[tokio::test(flavor = "multi_thread")]
async fn cleanup_invalid_states() {
let (p, _) = Persistence::new_in_memory().await;

let service_id = add_service(&p.pool).await.unwrap();

let queued_id = Uuid::new_v4();
let building_id = Uuid::new_v4();
let built_id = Uuid::new_v4();
let loading_id = Uuid::new_v4();

let deployment_crashed = Deployment {
id: Uuid::new_v4(),
service_id,
state: State::Crashed,
last_update: Utc::now(),
address: None,
};
let deployment_stopped = Deployment {
id: Uuid::new_v4(),
service_id,
state: State::Stopped,
last_update: Utc::now(),
address: None,
};
let deployment_running = Deployment {
id: Uuid::new_v4(),
service_id,
state: State::Running,
last_update: Utc::now(),
address: Some(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9876)),
};
let deployment_queued = Deployment {
id: queued_id,
service_id,
state: State::Queued,
last_update: Utc::now(),
address: None,
};
let deployment_building = Deployment {
id: building_id,
service_id,
state: State::Building,
last_update: Utc::now(),
address: None,
};
let deployment_built = Deployment {
id: built_id,
service_id,
state: State::Built,
last_update: Utc::now(),
address: None,
};
let deployment_loading = Deployment {
id: loading_id,
service_id,
state: State::Loading,
last_update: Utc::now(),
address: None,
};

for deployment in [
&deployment_crashed,
&deployment_stopped,
&deployment_running,
&deployment_queued,
&deployment_built,
&deployment_building,
&deployment_loading,
] {
p.insert_deployment(deployment.clone()).await.unwrap();
}

p.cleanup_invalid_states().await.unwrap();

let actual: Vec<_> = p
.get_deployments(&service_id)
.await
.unwrap()
.into_iter()
.map(|deployment| (deployment.id, deployment.state))
.collect();
let expected = vec![
(deployment_crashed.id, State::Crashed),
(deployment_stopped.id, State::Stopped),
(deployment_running.id, State::Running),
(queued_id, State::Stopped),
(built_id, State::Stopped),
(building_id, State::Stopped),
(loading_id, State::Stopped),
];

assert_eq!(
actual, expected,
"invalid states should be moved to the stopped state"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn fetching_runnable_deployments() {
let (p, _) = Persistence::new_in_memory().await;
Expand Down

0 comments on commit eda4769

Please sign in to comment.