diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index d1770f0d3495a..6a5ca487fb361 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -7,6 +7,7 @@ use aws_sdk_sqs::{ use chrono::{DateTime, TimeZone, Utc}; use futures::{FutureExt, StreamExt}; use tokio::{pin, select, time::Duration}; +use tracing_futures::Instrument; use vector_common::finalizer::UnorderedFinalizer; use crate::{ @@ -42,13 +43,16 @@ impl SqsSource { let (finalizer, mut ack_stream) = Finalizer::new(shutdown.clone()); let client = self.client.clone(); let queue_url = self.queue_url.clone(); - tokio::spawn(async move { - while let Some((status, receipts)) = ack_stream.next().await { - if status == BatchStatus::Delivered { - delete_messages(client.clone(), receipts, queue_url.clone()).await; + tokio::spawn( + async move { + while let Some((status, receipts)) = ack_stream.next().await { + if status == BatchStatus::Delivered { + delete_messages(client.clone(), receipts, queue_url.clone()).await; + } } } - }); + .in_current_span(), + ); Arc::new(finalizer) }); @@ -57,16 +61,19 @@ impl SqsSource { let shutdown = shutdown.clone().fuse(); let mut out = out.clone(); let finalizer = finalizer.clone(); - task_handles.push(tokio::spawn(async move { - let finalizer = finalizer.as_ref(); - pin!(shutdown); - loop { - select! { - _ = &mut shutdown => break, - _ = source.run_once(&mut out, finalizer) => {}, + task_handles.push(tokio::spawn( + async move { + let finalizer = finalizer.as_ref(); + pin!(shutdown); + loop { + select! { + _ = &mut shutdown => break, + _ = source.run_once(&mut out, finalizer) => {}, + } } } - })); + .in_current_span(), + )); } // Wait for all of the processes to finish. If any one of them panics, we resume