Skip to content

Commit

Permalink
fix(ingestion_pipeline): concurrency does not work when spawned (#76)
Browse files Browse the repository at this point in the history
Concurrency did not work as expected. When spawning via `tokio::spawn`
the future would be polled directly, and any concurrency setting would
not be respected. Because the spawn had to be removed, tracing for
each step was improved as well.
  • Loading branch information
timonv authored Jun 22, 2024
1 parent f6656be commit 7cbfc4e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 43 deletions.
110 changes: 68 additions & 42 deletions swiftide/src/ingestion/ingestion_pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{BatchableTransformer, ChunkerTransformer, Loader, NodeCache, Persist, Transformer};
use anyhow::Result;
use futures_util::{StreamExt, TryFutureExt, TryStreamExt};
use futures_util::{StreamExt, TryStreamExt};
use tracing::Instrument;

use std::sync::Arc;

Expand Down Expand Up @@ -80,18 +81,19 @@ impl IngestionPipeline {
.stream
.try_filter_map(move |node| {
let cache = Arc::clone(&cache);
let current_span = tracing::Span::current();
tokio::spawn(current_span.in_scope(|| async move {
let span =
tracing::trace_span!("filter_cached", node_cache = ?cache, node = ?node );
async move {
if !cache.get(&node).await {
cache.set(&node).await;
tracing::debug!("Node not in cache, passing through");
Some(node)
Ok(Some(node))
} else {
tracing::debug!("Node in cache, skipping");
None
Ok(None)
}
}))
.map_err(anyhow::Error::from)
}
.instrument(span)
})
.boxed();
self
Expand All @@ -107,20 +109,17 @@ impl IngestionPipeline {
///
/// An instance of `IngestionPipeline` with the updated stream that applies the transformer to each node.
pub fn then(mut self, transformer: impl Transformer + 'static) -> Self {
let transformer = Arc::new(transformer);
let concurrency = transformer.concurrency().unwrap_or(self.concurrency);
let transformer = Arc::new(transformer);
self.stream = self
.stream
.map_ok(move |node| {
let transformer = Arc::clone(&transformer);
let current_span = tracing::Span::current();
tokio::spawn(
current_span.in_scope(|| async move { transformer.transform_node(node).await }),
)
.map_err(anyhow::Error::from)
let transformer = transformer.clone();
let span = tracing::trace_span!("then", transformer = ?transformer, node = ?node );

async move { transformer.transform_node(node).await }.instrument(span)
})
.try_buffer_unordered(concurrency)
.map(|x| x.and_then(|x| x))
.boxed();

self
Expand All @@ -146,18 +145,15 @@ impl IngestionPipeline {
self.stream = self
.stream
.try_chunks(batch_size)
.map_ok(move |chunks| {
.map_ok(move |nodes| {
let transformer = Arc::clone(&transformer);
let current_span = tracing::Span::current();
tokio::spawn(
current_span
.in_scope(|| async move { transformer.batch_transform(chunks).await }),
)
.map_err(anyhow::Error::from)
let span =
tracing::trace_span!("then_in_batch", batchable_transformer = ?transformer, nodes = ?nodes );

async move { Ok(transformer.batch_transform(nodes).await) }.instrument(span)
})
.err_into::<anyhow::Error>()
.try_buffer_unordered(concurrency)
.try_flatten_unordered(concurrency)
.try_buffer_unordered(concurrency) // First get the streams from each future
.try_flatten_unordered(concurrency) // Then flatten all the streams back into one
.boxed();
self
}
Expand All @@ -178,11 +174,9 @@ impl IngestionPipeline {
.stream
.map_ok(move |node| {
let chunker = Arc::clone(&chunker);
let current_span = tracing::Span::current();
tokio::spawn(
current_span.in_scope(|| async move { chunker.transform_node(node).await }),
)
.map_err(anyhow::Error::from)
let span = tracing::trace_span!("then_chunk", chunker = ?chunker, node = ?node );

async move { Ok(chunker.transform_node(node).await) }.instrument(span)
})
.try_buffer_unordered(concurrency)
.try_flatten_unordered(concurrency)
Expand Down Expand Up @@ -210,13 +204,10 @@ impl IngestionPipeline {
.try_chunks(storage.batch_size().unwrap())
.map_ok(move |nodes| {
let storage = Arc::clone(&storage);
let current_span = tracing::Span::current();
tokio::spawn(
current_span.in_scope(|| async move { storage.batch_store(nodes).await }),
)
.map_err(anyhow::Error::from)
let span = tracing::trace_span!("then_store_with_batched", storage = ?storage, nodes = ?nodes );

async move { Ok(storage.batch_store(nodes).await) }.instrument(span)
})
.err_into::<anyhow::Error>()
.try_buffer_unordered(self.concurrency)
.try_flatten_unordered(self.concurrency)
.boxed();
Expand All @@ -225,13 +216,12 @@ impl IngestionPipeline {
.stream
.map_ok(move |node| {
let storage = Arc::clone(&storage);
let current_span = tracing::Span::current();
tokio::spawn(current_span.in_scope(|| async move { storage.store(node).await }))
.map_err(anyhow::Error::from)
let span =
tracing::trace_span!("then_store_with", storage = ?storage, node = ?node );

async move { storage.store(node).await }.instrument(span)
})
.err_into::<anyhow::Error>()
.try_buffer_unordered(self.concurrency)
.map(|x| x.and_then(|x| x))
.boxed();
}

Expand Down Expand Up @@ -313,7 +303,7 @@ impl IngestionPipeline {
let setup_futures = self
.storage
.into_iter()
.map(|storage| tokio::spawn(async move { storage.setup().await }))
.map(|storage| async move { storage.setup().await })
.collect::<Vec<_>>();
futures_util::future::try_join_all(setup_futures).await?;

Expand Down Expand Up @@ -425,4 +415,40 @@ mod tests {
.filter_errors();
pipeline.run().await.unwrap();
}

// Runs a loader -> transformer -> storage pipeline over three nodes, with the
// transformer reporting a concurrency of 3, and checks that every node is
// transformed and stored and that the pipeline finishes without error.
// NOTE(review): this verifies call counts only; nothing here asserts that the
// transformer calls actually overlap in time.
#[tokio::test]
async fn test_concurrent_calls_with_simple_transformer() {
let mut loader = MockLoader::new();
let mut transformer = MockTransformer::new();
let mut storage = MockPersist::new();
let mut seq = Sequence::new();
// The loader is expected to be turned into a stream exactly once, yielding
// three default nodes.
loader
.expect_into_stream()
.times(1)
.in_sequence(&mut seq)
.returning(|| {
Box::pin(stream::iter(vec![
Ok(IngestionNode::default()),
Ok(IngestionNode::default()),
Ok(IngestionNode::default()),
]))
});
// Each of the three nodes must pass through the transformer once; the
// sequence places these calls after the loader call.
transformer
.expect_transform_node()
.times(3)
.in_sequence(&mut seq)
.returning(|mut node| {
node.chunk = "transformed".to_string();
Ok(node)
});
// A transformer-level concurrency, when `Some`, is used by `then` in place of
// the pipeline-wide setting.
transformer.expect_concurrency().returning(|| Some(3));
storage.expect_setup().returning(|| Ok(()));
// Presumably `None` selects the per-node `store` path rather than
// `batch_store` — confirm against `then_store_with`.
storage.expect_batch_size().returning(|| None);
// All three nodes must reach storage; `Ok` echoes each node back unchanged.
storage.expect_store().times(3).returning(Ok);

let pipeline = IngestionPipeline::from_loader(loader)
.then(transformer)
.then_store_with(storage);
// Any error surfaced by the pipeline fails the test here.
pipeline.run().await.unwrap();
}
}
2 changes: 1 addition & 1 deletion swiftide/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub trait SimplePrompt: Debug + Send + Sync {
#[cfg_attr(test, automock)]
#[async_trait]
/// Persists nodes
pub trait Persist: Send + Sync {
pub trait Persist: Debug + Send + Sync {
async fn setup(&self) -> Result<()>;
async fn store(&self, node: IngestionNode) -> Result<IngestionNode>;
async fn batch_store(&self, nodes: Vec<IngestionNode>) -> IngestionStream;
Expand Down

0 comments on commit 7cbfc4e

Please sign in to comment.