diff --git a/swiftide-indexing/src/pipeline.rs b/swiftide-indexing/src/pipeline.rs index 8e127305..ea22865a 100644 --- a/swiftide-indexing/src/pipeline.rs +++ b/swiftide-indexing/src/pipeline.rs @@ -1,10 +1,10 @@ use anyhow::Result; -use futures_util::{StreamExt, TryStreamExt}; +use futures_util::{StreamExt, TryFutureExt, TryStreamExt}; use swiftide_core::{ indexing::IndexingDefaults, BatchableTransformer, ChunkerTransformer, Loader, NodeCache, Persist, SimplePrompt, Transformer, WithBatchIndexingDefaults, WithIndexingDefaults, }; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task}; use tracing::Instrument; use std::{sync::Arc, time::Duration}; @@ -187,13 +187,12 @@ impl Pipeline { let transformer = transformer.clone(); let span = tracing::trace_span!("then", node = ?node ); - async move { - tracing::debug!(?node, "Transforming node"); - transformer.transform_node(node).await - } - .instrument(span) + task::spawn(async move { transformer.transform_node(node).await }) + .instrument(span) + .err_into::() }) .try_buffer_unordered(concurrency) + .map(|x| x.and_then(|x| x)) .boxed() .into(); @@ -230,14 +229,16 @@ impl Pipeline { let transformer = Arc::clone(&transformer); let span = tracing::trace_span!("then_in_batch", nodes = ?nodes ); - async move { + tokio::spawn(async move { tracing::debug!(num_nodes = nodes.len(), "Batch transforming nodes"); - Ok(transformer.batch_transform(nodes).await) - } + transformer.batch_transform(nodes).await + }) .instrument(span) + .map_err(anyhow::Error::from) }) + .err_into::() .try_buffer_unordered(concurrency) // First get the streams from each future - .try_flatten_unordered(concurrency) // Then flatten all the streams back into one + .try_flatten_unordered(None) // Then flatten all the streams back into one .boxed() .into(); self @@ -262,14 +263,16 @@ impl Pipeline { let chunker = Arc::clone(&chunker); let span = tracing::trace_span!("then_chunk", chunker = ?chunker, node = ?node ); - async move { + tokio::spawn(async move { tracing::debug!(?node, "Chunking node"); - Ok(chunker.transform_node(node).await) - } + chunker.transform_node(node).await + }) .instrument(span) + .map_err(anyhow::Error::from) }) + .err_into::() .try_buffer_unordered(concurrency) - .try_flatten_unordered(concurrency) + .try_flatten_unordered(None) .boxed() .into(); @@ -303,12 +306,17 @@ impl Pipeline { let storage = Arc::clone(&storage); let span = tracing::trace_span!("then_store_with_batched", storage = ?storage, nodes = ?nodes ); - async move { + tokio::spawn(async move { tracing::debug!(num_nodes = nodes.len(), "Batch storing nodes"); - Ok(storage.batch_store(nodes).await) }.instrument(span) + storage.batch_store(nodes).await + }) + .instrument(span) + .map_err(anyhow::Error::from) + }) + .err_into::() .try_buffer_unordered(self.concurrency) - .try_flatten_unordered(self.concurrency) + .try_flatten_unordered(None) .boxed().into(); } else { self.stream = self @@ -318,14 +326,16 @@ impl Pipeline { let span = tracing::trace_span!("then_store_with", storage = ?storage, node = ?node ); - async move { + tokio::spawn(async move { tracing::debug!(?node, "Storing node"); storage.store(node).await - } + }) + .err_into::() .instrument(span) }) .try_buffer_unordered(self.concurrency) + .map(|x| x.and_then(|x| x)) .boxed() .into(); }