From 7cbfc4e13745ee5a6776a97fc6db06608fae8e81 Mon Sep 17 00:00:00 2001 From: Timon Vonk Date: Sat, 22 Jun 2024 21:19:52 +0200 Subject: [PATCH] fix(ingestion_pipeline): concurrency does not work when spawned (#76) Concurrency did not work as expected. When spawning via `tokio::spawn` the future would be polled directly, and any concurrency setting would not be respected. Because the spawn had to be removed, tracing for each step was improved as well. --- swiftide/src/ingestion/ingestion_pipeline.rs | 110 ++++++++++++------- swiftide/src/traits.rs | 2 +- 2 files changed, 69 insertions(+), 43 deletions(-) diff --git a/swiftide/src/ingestion/ingestion_pipeline.rs b/swiftide/src/ingestion/ingestion_pipeline.rs index 50618fd8..fe134bf8 100644 --- a/swiftide/src/ingestion/ingestion_pipeline.rs +++ b/swiftide/src/ingestion/ingestion_pipeline.rs @@ -1,6 +1,7 @@ use crate::{BatchableTransformer, ChunkerTransformer, Loader, NodeCache, Persist, Transformer}; use anyhow::Result; -use futures_util::{StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{StreamExt, TryStreamExt}; +use tracing::Instrument; use std::sync::Arc; @@ -80,18 +81,19 @@ impl IngestionPipeline { .stream .try_filter_map(move |node| { let cache = Arc::clone(&cache); - let current_span = tracing::Span::current(); - tokio::spawn(current_span.in_scope(|| async move { + let span = + tracing::trace_span!("filter_cached", node_cache = ?cache, node = ?node ); + async move { if !cache.get(&node).await { cache.set(&node).await; tracing::debug!("Node not in cache, passing through"); - Some(node) + Ok(Some(node)) } else { tracing::debug!("Node in cache, skipping"); - None + Ok(None) } - })) - .map_err(anyhow::Error::from) + } + .instrument(span) }) .boxed(); self @@ -107,20 +109,17 @@ impl IngestionPipeline { /// /// An instance of `IngestionPipeline` with the updated stream that applies the transformer to each node. 
pub fn then(mut self, transformer: impl Transformer + 'static) -> Self { - let transformer = Arc::new(transformer); let concurrency = transformer.concurrency().unwrap_or(self.concurrency); + let transformer = Arc::new(transformer); self.stream = self .stream .map_ok(move |node| { - let transformer = Arc::clone(&transformer); - let current_span = tracing::Span::current(); - tokio::spawn( - current_span.in_scope(|| async move { transformer.transform_node(node).await }), - ) - .map_err(anyhow::Error::from) + let transformer = transformer.clone(); + let span = tracing::trace_span!("then", transformer = ?transformer, node = ?node ); + + async move { transformer.transform_node(node).await }.instrument(span) }) .try_buffer_unordered(concurrency) - .map(|x| x.and_then(|x| x)) .boxed(); self @@ -146,18 +145,15 @@ impl IngestionPipeline { self.stream = self .stream .try_chunks(batch_size) - .map_ok(move |chunks| { + .map_ok(move |nodes| { let transformer = Arc::clone(&transformer); - let current_span = tracing::Span::current(); - tokio::spawn( - current_span - .in_scope(|| async move { transformer.batch_transform(chunks).await }), - ) - .map_err(anyhow::Error::from) + let span = + tracing::trace_span!("then_in_batch", batchable_transformer = ?transformer, nodes = ?nodes ); + + async move { Ok(transformer.batch_transform(nodes).await) }.instrument(span) }) - .err_into::() - .try_buffer_unordered(concurrency) - .try_flatten_unordered(concurrency) + .try_buffer_unordered(concurrency) // First get the streams from each future + .try_flatten_unordered(concurrency) // Then flatten all the streams back into one .boxed(); self } @@ -178,11 +174,9 @@ impl IngestionPipeline { .stream .map_ok(move |node| { let chunker = Arc::clone(&chunker); - let current_span = tracing::Span::current(); - tokio::spawn( - current_span.in_scope(|| async move { chunker.transform_node(node).await }), - ) - .map_err(anyhow::Error::from) + let span = tracing::trace_span!("then_chunk", chunker = ?chunker, 
node = ?node ); + + async move { Ok(chunker.transform_node(node).await) }.instrument(span) }) .try_buffer_unordered(concurrency) .try_flatten_unordered(concurrency) @@ -210,13 +204,10 @@ impl IngestionPipeline { .try_chunks(storage.batch_size().unwrap()) .map_ok(move |nodes| { let storage = Arc::clone(&storage); - let current_span = tracing::Span::current(); - tokio::spawn( - current_span.in_scope(|| async move { storage.batch_store(nodes).await }), - ) - .map_err(anyhow::Error::from) + let span = tracing::trace_span!("then_store_with_batched", storage = ?storage, nodes = ?nodes ); + + async move { Ok(storage.batch_store(nodes).await) }.instrument(span) }) - .err_into::() .try_buffer_unordered(self.concurrency) .try_flatten_unordered(self.concurrency) .boxed(); @@ -225,13 +216,12 @@ impl IngestionPipeline { .stream .map_ok(move |node| { let storage = Arc::clone(&storage); - let current_span = tracing::Span::current(); - tokio::spawn(current_span.in_scope(|| async move { storage.store(node).await })) - .map_err(anyhow::Error::from) + let span = + tracing::trace_span!("then_store_with", storage = ?storage, node = ?node ); + + async move { storage.store(node).await }.instrument(span) }) - .err_into::() .try_buffer_unordered(self.concurrency) - .map(|x| x.and_then(|x| x)) .boxed(); } @@ -313,7 +303,7 @@ impl IngestionPipeline { let setup_futures = self .storage .into_iter() - .map(|storage| tokio::spawn(async move { storage.setup().await })) + .map(|storage| async move { storage.setup().await }) .collect::>(); futures_util::future::try_join_all(setup_futures).await?; @@ -425,4 +415,40 @@ mod tests { .filter_errors(); pipeline.run().await.unwrap(); } + + #[tokio::test] + async fn test_concurrent_calls_with_simple_transformer() { + let mut loader = MockLoader::new(); + let mut transformer = MockTransformer::new(); + let mut storage = MockPersist::new(); + let mut seq = Sequence::new(); + loader + .expect_into_stream() + .times(1) + .in_sequence(&mut seq) + .returning(|| 
{ + Box::pin(stream::iter(vec![ + Ok(IngestionNode::default()), + Ok(IngestionNode::default()), + Ok(IngestionNode::default()), + ])) + }); + transformer + .expect_transform_node() + .times(3) + .in_sequence(&mut seq) + .returning(|mut node| { + node.chunk = "transformed".to_string(); + Ok(node) + }); + transformer.expect_concurrency().returning(|| Some(3)); + storage.expect_setup().returning(|| Ok(())); + storage.expect_batch_size().returning(|| None); + storage.expect_store().times(3).returning(Ok); + + let pipeline = IngestionPipeline::from_loader(loader) + .then(transformer) + .then_store_with(storage); + pipeline.run().await.unwrap(); + } } diff --git a/swiftide/src/traits.rs b/swiftide/src/traits.rs index 7ce5af11..f45337ab 100644 --- a/swiftide/src/traits.rs +++ b/swiftide/src/traits.rs @@ -89,7 +89,7 @@ pub trait SimplePrompt: Debug + Send + Sync { #[cfg_attr(test, automock)] #[async_trait] /// Persists nodes -pub trait Persist: Send + Sync { +pub trait Persist: Debug + Send + Sync { async fn setup(&self) -> Result<()>; async fn store(&self, node: IngestionNode) -> Result; async fn batch_store(&self, nodes: Vec) -> IngestionStream;