Skip to content

Commit

Permalink
agent: update integration tests for automations-based controllers
Browse files Browse the repository at this point in the history
Updates the integration tests to work with the new automations-based controllers.

Previously the integration test harness would call the controller `Hander` to
have it dequeue and run a controller. That doesn't really work with automations
since the automations server does the dequeing, so I took a different approach.
I updated the `automations::Server` to allow the test harness to control the
semaphor that's used while dequeing tasks. The harness uses a semaphor with
only a single permit, which it itself acquires before creating the server. When
the harness runs a controller, it drops and immediately re-acquires the permit,
which allows the server an opportunity to dequeue and run at most a single
task. This works because the tokio semaphor always grants permits in the order
in which they were requested.

Another notable change is that `TestHarness::run_pending_controllers` now
returns the states as they are _after_ the controller run completes. This
required a few changes to tests, but overall it seems like much more useful and
sensible behavior. Returning the state as it was before the run seemed like
more effort than it was worth, given the differences in how automations works.
  • Loading branch information
psFried committed Jan 20, 2025
1 parent 618deab commit 24577e3
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 82 deletions.
4 changes: 2 additions & 2 deletions crates/agent-sql/src/controllers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct ControllerJob {
pub async fn fetch_controller_job(
controller_task_id: Id,
db: impl sqlx::PgExecutor<'static>,
) -> sqlx::Result<ControllerJob> {
) -> sqlx::Result<Option<ControllerJob>> {
sqlx::query_as!(
ControllerJob,
r#"select
Expand Down Expand Up @@ -62,7 +62,7 @@ pub async fn fetch_controller_job(
where t.task_id = $1::flowid;"#,
controller_task_id as Id,
)
.fetch_one(db)
.fetch_optional(db)
.await
}

Expand Down
4 changes: 3 additions & 1 deletion crates/agent/src/controllers/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ impl<C: ControlPlane + Send + Sync + 'static> Executor for LiveSpecControllerExe
state: &'s mut Self::State,
inbox: &'s mut std::collections::VecDeque<(models::Id, Option<Self::Receive>)>,
) -> anyhow::Result<Self::Outcome> {
let controller_state = fetch_controller_state(task_id, pool).await?;
let controller_state = fetch_controller_state(task_id, pool)
.await?
.unwrap_or_else(|| panic!("failed to fetch controller state for task_id: {task_id}"));
// Note that `failures` here only counts the number of _consecutive_
// failures, and resets to 0 on any sucessful update.
let (status, failures, error, next_run) =
Expand Down
18 changes: 12 additions & 6 deletions crates/agent/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,20 @@ pub struct ControllerState {
pub async fn fetch_controller_state(
controller_task_id: Id,
db: impl sqlx::PgExecutor<'static>,
) -> anyhow::Result<ControllerState> {
let job = agent_sql::controllers::fetch_controller_job(controller_task_id, db)
) -> anyhow::Result<Option<ControllerState>> {
let maybe_job = agent_sql::controllers::fetch_controller_job(controller_task_id, db)
.await
.context("fetching controller job")?;

let Some(job) = maybe_job else {
return Ok(None);
};
// TODO(phil): remove controller_next_run after legacy agents no longer need it.
if job.controller_next_run.is_some() {
anyhow::bail!("live_specs row still has legacy controller_next_run set");
};
ControllerState::parse_db_row(&job)
let state = ControllerState::parse_db_row(&job)?;
Ok(Some(state))
}

impl ControllerState {
Expand Down Expand Up @@ -405,9 +409,11 @@ mod test {

let now = Utc::now();
let then = now + chrono::Duration::seconds(60);
let duration = NextRun::after(then)
let millis = NextRun::after(then)
.with_jitter_percent(20)
.compute_duration();
assert!(duration.as_secs() >= 60);
.compute_duration()
.as_millis();
assert!(millis > 59900, "duration too small, got: {millis}ms");
assert!(millis < 72000, "duration too big, got: {millis}ms");
}
}
21 changes: 11 additions & 10 deletions crates/agent/src/integration_tests/auto_discovers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn test_auto_discovers_add_new_bindings() {
.unwrap_capture()
.auto_discover
.is_some());
assert!(capture_state.next_run.is_some());
harness.assert_controller_pending("marmots/capture").await;

let no_disco_capture_state = harness
.get_controller_state("marmots/no-auto-discover")
Expand Down Expand Up @@ -406,7 +406,7 @@ async fn test_auto_discovers_no_evolution() {
let capture_state = harness.get_controller_state("mules/capture").await;
assert!(capture_state.error.is_some());
assert_eq!(1, capture_state.failures);
assert!(capture_state.next_run.is_some());
harness.assert_controller_pending("mules/capture").await;
let capture_status = capture_state.current_status.unwrap_capture();
// Expect to see that the discover succeeded, but that the publication failed.
insta::assert_json_snapshot!(capture_status, {
Expand Down Expand Up @@ -615,19 +615,21 @@ async fn test_auto_discovers_update_only() {
.await;
assert!(result.status.is_success());

harness.run_pending_controllers(Some(6)).await;
harness.run_pending_controllers(None).await;

// Expect to see that the controller has initialized a blank auto-capture status.
let capture_state = harness.get_controller_state("pikas/capture").await;
assert!(capture_state.next_run.is_some());
harness.assert_controller_pending("pikas/capture").await;
assert!(capture_state
.current_status
.unwrap_capture()
.auto_discover
.is_some());

let disabled_state = harness.get_controller_state("pikas/disabled-capture").await;
assert!(disabled_state.next_run.is_none());
harness
.assert_controller_not_pending("pikas/disabled-capture")
.await;
assert!(disabled_state
.current_status
.unwrap_capture()
Expand All @@ -647,7 +649,9 @@ async fn test_auto_discovers_update_only() {
let ad_disabled_state = harness
.get_controller_state("pikas/capture-auto-disco-disabled")
.await;
assert!(ad_disabled_state.next_run.is_none());
harness
.assert_controller_not_pending("pikas/capture-auto-disco-disabled")
.await;
assert!(ad_disabled_state
.current_status
.unwrap_capture()
Expand Down Expand Up @@ -745,10 +749,7 @@ async fn test_auto_discovers_update_only() {
let capture_state = harness.get_controller_state("pikas/capture").await;
assert!(capture_state.error.is_some());
assert_eq!(1, capture_state.failures);
assert!(
capture_state.next_run.is_some(),
"expect error to be retried automatically"
);
harness.assert_controller_pending("pikas/capture").await;
let auto_discover = capture_state
.current_status
.unwrap_capture()
Expand Down
24 changes: 9 additions & 15 deletions crates/agent/src/integration_tests/dependencies_and_activations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,19 +364,18 @@ async fn test_dependencies_and_controllers() {
.assert_live_spec_soft_deleted("owls/hoots", del_result.pub_id)
.await;

// All the controllers ought to run now. The collection controller should run first and notfiy
// the others.
let runs = harness.run_pending_controllers(Some(1)).await;
assert_eq!("owls/hoots", &runs[0].catalog_name);
harness
.control_plane()
.assert_activations("hoots deletion", vec![("owls/hoots", None)]);
// All the controllers ought to run now. The collection controller should
// run first and notfiy the others. Note that `run_pending_controllers`
// cannot return anything when the spec is deleted since the rows will have
// been deleted from the database.
harness.run_pending_controllers(None).await;
harness.assert_live_spec_hard_deleted("owls/hoots").await;

let _ = harness.run_pending_controllers(None).await;
harness.control_plane().assert_activations(
"after hoots deleted",
vec![
("owls/hoots", None),
("owls/capture", Some(CatalogType::Capture)),
("owls/materialize", Some(CatalogType::Materialization)),
("owls/nests", Some(CatalogType::Collection)),
Expand Down Expand Up @@ -496,7 +495,7 @@ async fn test_dependencies_and_controllers() {
);
harness.control_plane().reset_activations();
let runs = harness.run_pending_controllers(None).await;
assert_controllers_ran(&["owls/capture", "owls/materialize"], runs);
assert_controllers_ran(&["owls/materialize"], runs);

harness.assert_live_spec_hard_deleted("owls/capture").await;
harness
Expand All @@ -510,7 +509,7 @@ async fn test_dependencies_and_controllers() {
.publications
.history[0];
assert_eq!("simulated build failure", &failed_pub.errors[0].detail);
assert!(materialization_state.next_run.is_some());
harness.assert_controller_pending("owls/materialize").await;

// The materialization should now successfully retry and then activate
harness.run_pending_controller("owls/materialize").await;
Expand Down Expand Up @@ -563,12 +562,7 @@ async fn test_dependencies_and_controllers() {
.assert_live_spec_soft_deleted("owls/test-test", del_result.pub_id)
.await;

let runs = harness.run_pending_controllers(None).await;
assert_eq!(
3,
runs.len(),
"expected one run of each controller, got: {runs:?}"
);
harness.run_pending_controllers(None).await;
harness
.assert_live_spec_hard_deleted("owls/materialize")
.await;
Expand Down
Loading

0 comments on commit 24577e3

Please sign in to comment.