Skip to content

Commit

Permalink
fix(aws_sqs source): Pass span context to spawned tasks (#14254)
Browse files Browse the repository at this point in the history
* fix(aws_sqs source): Pass span context to spawned tasks

So that metrics are correctly tagged.

Fixes: #14252

Signed-off-by: Jesse Szwedko <[email protected]>

* Move span attachment

Signed-off-by: Jesse Szwedko <[email protected]>

Signed-off-by: Jesse Szwedko <[email protected]>
  • Loading branch information
jszwedko authored Sep 2, 2022
1 parent 3d80387 commit bd43ffb
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions src/sources/aws_sqs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use aws_sdk_sqs::{
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, StreamExt};
use tokio::{pin, select, time::Duration};
use tracing_futures::Instrument;
use vector_common::finalizer::UnorderedFinalizer;

use crate::{
Expand Down Expand Up @@ -42,13 +43,16 @@ impl SqsSource {
let (finalizer, mut ack_stream) = Finalizer::new(shutdown.clone());
let client = self.client.clone();
let queue_url = self.queue_url.clone();
tokio::spawn(async move {
while let Some((status, receipts)) = ack_stream.next().await {
if status == BatchStatus::Delivered {
delete_messages(client.clone(), receipts, queue_url.clone()).await;
tokio::spawn(
async move {
while let Some((status, receipts)) = ack_stream.next().await {
if status == BatchStatus::Delivered {
delete_messages(client.clone(), receipts, queue_url.clone()).await;
}
}
}
});
.in_current_span(),
);
Arc::new(finalizer)
});

Expand All @@ -57,16 +61,19 @@ impl SqsSource {
let shutdown = shutdown.clone().fuse();
let mut out = out.clone();
let finalizer = finalizer.clone();
task_handles.push(tokio::spawn(async move {
let finalizer = finalizer.as_ref();
pin!(shutdown);
loop {
select! {
_ = &mut shutdown => break,
_ = source.run_once(&mut out, finalizer) => {},
task_handles.push(tokio::spawn(
async move {
let finalizer = finalizer.as_ref();
pin!(shutdown);
loop {
select! {
_ = &mut shutdown => break,
_ = source.run_once(&mut out, finalizer) => {},
}
}
}
}));
.in_current_span(),
));
}

// Wait for all of the processes to finish. If any one of them panics, we resume
Expand Down

0 comments on commit bd43ffb

Please sign in to comment.