Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: data letter queue #73

Merged
merged 16 commits into from
Aug 13, 2024
Merged
Prev Previous commit
Next Next commit
update: tests fixed
heemankv committed Aug 8, 2024

Verified

This commit was signed with the committer’s verified signature.
heemankv Heemank Verma
commit d44bae6afb497ac4955d4caa5ba99c583b9b0354
3 changes: 2 additions & 1 deletion crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -195,7 +195,8 @@ pub async fn handle_job_failure(id: Uuid) -> Result<()> {
let mut metadata = job.metadata.clone();

if job.status == JobStatus::Completed {
return Err(eyre!("Invalid state exists on DL queue: {}", job.status.to_string()));
log::error!("Invalid state exists on DL queue: {}", job.status.to_string());
return Ok(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we fail silently here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DL-queue is supposed to handle actual failed cases.
If JobStatus::Completed job is pushed to DL-queue multiple times by the queuing agent,
we prefer not stopping the orchestrator rather failing silently.

}
// We assume that a Failure status wil only show up if the message is sent twice from a queue
// Can return silently because it's already been processed.
30 changes: 12 additions & 18 deletions crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use super::database::build_job_item;
use crate::config::config;
use crate::jobs::handle_job_failure;
use crate::jobs::types::JobType;
use crate::{jobs::types::JobStatus, tests::config::TestConfigBuilder};
use rstest::rstest;
use std::str::FromStr;
#[cfg(test)]
@@ -25,11 +27,10 @@ use uuid::Uuid;

use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY};
use crate::jobs::job_handler_factory::mock_factory;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::types::{ExternalId, JobItem, JobVerificationStatus};
use crate::jobs::{create_job, increment_key_in_metadata, process_job, verify_job, Job, MockJob};
use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE};
use crate::tests::common::MessagePayloadType;
use crate::tests::config::TestConfigBuilder;

/// Tests `create_job` function when job is not existing in the db.
#[rstest]
@@ -505,7 +506,6 @@ fn build_job_item_by_type_and_status(job_type: JobType, job_status: JobStatus, i
}
}

#[cfg(test)]
impl FromStr for JobStatus {
type Err = String;

@@ -527,7 +527,6 @@ impl FromStr for JobStatus {
}
}

#[cfg(test)]
impl FromStr for JobType {
type Err = String;

@@ -593,11 +592,10 @@ async fn handle_job_failure_job_status_typical_works(#[case] job_type: JobType,

#[rstest]
// code should panic here, how can completed move to dl queue ?
#[case("DataSubmission", "Completed")]
#[case("DataSubmission")]
#[tokio::test]
async fn handle_job_failure__job_status_completed_fails(#[case] job_type: JobType, #[case] job_status: JobStatus) {
use color_eyre::eyre::eyre;

async fn handle_job_failure__job_status_completed_works(#[case] job_type: JobType) {
let job_status = JobStatus::Completed;
TestConfigBuilder::new().build().await;
let internal_id = 1;

@@ -612,16 +610,12 @@ async fn handle_job_failure__job_status_completed_fails(#[case] job_type: JobTyp
database_client.create_job(job.clone()).await.unwrap();

// calling handle_job_failure
let response = handle_job_failure(job_id).await;
handle_job_failure(job_id).await.expect("Test call to handle_job_failure should have passed.");

match response {
Ok(()) => {
panic!("Test call to handle_job_failure should not have passed");
}
Err(err) => {
// Should only fail for Completed case, anything else : raise error
let expected = eyre!("Invalid state exists on DL queue: {}", job_status);
assert_eq!(err.to_string(), expected.to_string());
}
// The completed job status on db is untouched.
let job_fetched_result = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data");

if let Some(job_fetched) = job_fetched_result {
assert_eq!(job_fetched.status, JobStatus::Completed);
}
}