Concurrency improvements
timonv committed Sep 11, 2024
1 parent 8595553 commit e461129
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions swiftide-indexing/src/pipeline.rs
@@ -1,10 +1,10 @@
 use anyhow::Result;
-use futures_util::{StreamExt, TryStreamExt};
+use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};

Check failure on line 2 in swiftide-indexing/src/pipeline.rs (GitHub Actions / Clippy, GitHub Actions / Test): unused import: `FutureExt`
 use swiftide_core::{
     indexing::IndexingDefaults, BatchableTransformer, ChunkerTransformer, Loader, NodeCache,
     Persist, SimplePrompt, Transformer, WithBatchIndexingDefaults, WithIndexingDefaults,
 };
-use tokio::sync::mpsc;
+use tokio::{sync::mpsc, task};
 use tracing::Instrument;

 use std::{sync::Arc, time::Duration};
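
Note: the CI failures flagged above line up with this hunk. The combinators the diff actually calls (`err_into`, `map_err`) come from `TryFutureExt`, so the newly added `FutureExt` import is never used. A minimal follow-up fix (an assumption, not part of this commit) would trim the import to:

    use futures_util::{StreamExt, TryFutureExt, TryStreamExt};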
@@ -187,13 +187,12 @@ impl Pipeline {
                 let transformer = transformer.clone();
                 let span = tracing::trace_span!("then", node = ?node );

-                async move {
-                    tracing::debug!(?node, "Transforming node");
-                    transformer.transform_node(node).await
-                }
-                .instrument(span)
+                task::spawn(async move { transformer.transform_node(node).await })
+                    .instrument(span)
+                    .err_into::<anyhow::Error>()
             })
             .try_buffer_unordered(concurrency)
+            .map(|x| x.and_then(|x| x))
             .boxed()
             .into();
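
Note: this hunk is the core of the change. Each transform now runs on its own Tokio task rather than inline in the stream, so transformers can use multiple runtime threads. `task::spawn` returns a `JoinHandle`, which itself resolves to `Result<_, JoinError>`, so after buffering the stream yields a nested `Result<Result<Node>>`: `err_into` converts the `JoinError` into `anyhow::Error`, and the final `.map(|x| x.and_then(|x| x))` flattens the two layers. A standalone sketch of the same pattern, with a hypothetical `transform` function standing in for Swiftide's transformers:

    use anyhow::Result;
    use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt};
    use tokio::task;

    // Hypothetical stand-in for a transformer.
    async fn transform(n: u32) -> Result<u32> {
        Ok(n * 2)
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        // Mirrors the pipeline's TryStream of Result<Node> items.
        let inputs = stream::iter((0..10u32).map(Ok::<_, anyhow::Error>));

        let out: Vec<u32> = inputs
            .map_ok(|n| {
                // The spawned JoinHandle resolves to Result<Result<u32>, JoinError>;
                // err_into converts the JoinError into anyhow::Error.
                task::spawn(transform(n)).err_into::<anyhow::Error>()
            })
            .try_buffer_unordered(4) // poll up to 4 spawned tasks at once
            .map(|x| x.and_then(|x| x)) // flatten the nested Result layers
            .try_collect()
            .await?;

        assert_eq!(out.len(), 10);
        Ok(())
    }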

@@ -230,14 +229,16 @@ impl Pipeline {
                 let transformer = Arc::clone(&transformer);
                 let span = tracing::trace_span!("then_in_batch", nodes = ?nodes );

-                async move {
+                tokio::spawn(async move {
                     tracing::debug!(num_nodes = nodes.len(), "Batch transforming nodes");
-                    Ok(transformer.batch_transform(nodes).await)
-                }
+                    transformer.batch_transform(nodes).await
+                })
                 .instrument(span)
+                .map_err(anyhow::Error::from)
             })
-            .err_into::<anyhow::Error>()
             .try_buffer_unordered(concurrency) // First get the streams from each future
-            .try_flatten_unordered(concurrency) // Then flatten all the streams back into one
+            .try_flatten_unordered(None) // Then flatten all the streams back into one
             .boxed()
             .into();
         self
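
Note: besides the spawn, this hunk changes the flatten limit. `try_flatten_unordered` takes `impl Into<Option<usize>>`; passing `None` lifts the cap on how many inner streams are polled at once, where the old code reused `concurrency` as the limit. A minimal sketch of the combinator, making no assumptions about Swiftide's types:

    use futures_util::{stream, TryStreamExt};

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // A TryStream whose Ok items are themselves TryStreams, like the
        // per-batch streams produced by a batch transformer.
        let nested = stream::iter((0..3u32).map(|i| {
            Ok::<_, anyhow::Error>(stream::iter(
                (0..2u32).map(move |j| Ok::<_, anyhow::Error>((i, j))),
            ))
        }));

        // None = no limit on concurrently polled inner streams;
        // a number (e.g. `concurrency`) would cap them instead.
        let flat: Vec<(u32, u32)> = nested.try_flatten_unordered(None).try_collect().await?;
        assert_eq!(flat.len(), 6);
        Ok(())
    }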
@@ -262,12 +263,14 @@ impl Pipeline {
                 let chunker = Arc::clone(&chunker);
                 let span = tracing::trace_span!("then_chunk", chunker = ?chunker, node = ?node );

-                async move {
+                tokio::spawn(async move {
                     tracing::debug!(?node, "Chunking node");
-                    Ok(chunker.transform_node(node).await)
-                }
+                    chunker.transform_node(node).await
+                })
                 .instrument(span)
+                .map_err(anyhow::Error::from)
             })
-            .err_into::<anyhow::Error>()
             .try_buffer_unordered(concurrency)
             .try_flatten_unordered(concurrency)
             .boxed()
@@ -303,10 +306,15 @@ impl Pipeline {
                 let storage = Arc::clone(&storage);
                 let span = tracing::trace_span!("then_store_with_batched", storage = ?storage, nodes = ?nodes );

-                async move {
+                tokio::spawn(async move {
                     tracing::debug!(num_nodes = nodes.len(), "Batch storing nodes");
-                    Ok(storage.batch_store(nodes).await) }.instrument(span)
+                    storage.batch_store(nodes).await
+                })
+                .instrument(span)
+                .map_err(anyhow::Error::from)
             })
-            .err_into::<anyhow::Error>()
             .try_buffer_unordered(self.concurrency)
             .try_flatten_unordered(self.concurrency)
             .boxed().into();
@@ -318,14 +326,16 @@ impl Pipeline {
                 let span =
                     tracing::trace_span!("then_store_with", storage = ?storage, node = ?node );

-                async move {
+                tokio::spawn(async move {
                     tracing::debug!(?node, "Storing node");

                     storage.store(node).await
-                }
+                })
+                .err_into::<anyhow::Error>()
                 .instrument(span)
             })
             .try_buffer_unordered(self.concurrency)
+            .map(|x| x.and_then(|x| x))
             .boxed()
             .into();
     }
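
Note: one nuance of the new shape (a reading of tracing semantics, not something the commit states): `.instrument(span)` is now attached to the `JoinHandle` rather than to the future that gets spawned, so the span is entered while the handle is polled, not while the task body runs. If the old behavior is wanted, the inner future can be instrumented before spawning; a small sketch of the difference:

    use tracing::Instrument;

    #[tokio::main]
    async fn main() {
        let span = tracing::info_span!("work");

        // As in this commit: the span wraps polling of the JoinHandle only.
        let outer = tokio::spawn(async { 1 + 1 }).instrument(span.clone());

        // Alternative: the task body itself runs inside the span.
        let inner = tokio::spawn(async { 1 + 1 }.instrument(span));

        assert_eq!(outer.await.unwrap() + inner.await.unwrap(), 4);
    }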