docs(swiftide): documented file swiftide/src/ingestion/ingestion_pipeline.rs (#14)

Co-authored-by: bosun-ai[bot] <157630444+bosun-ai[bot]@users.noreply.github.com>
Co-authored-by: Timon Vonk <[email protected]>
bosun-ai[bot] and timonv authored Jun 13, 2024
1 parent 9ec93be commit 95a6200
Showing 1 changed file with 87 additions and 10 deletions.
97 changes: 87 additions & 10 deletions swiftide/src/ingestion/ingestion_pipeline.rs
@@ -6,13 +6,24 @@ use std::sync::Arc;

use super::{IngestionNode, IngestionStream};

/// A pipeline for ingesting files, adding metadata, chunking, transforming, embedding, and then storing them.
///
/// The `IngestionPipeline` struct orchestrates the entire file ingestion process. It is designed to be flexible and
/// performant, allowing for various stages of data transformation and storage to be configured and executed asynchronously.
///
/// # Fields
///
/// * `stream` - The stream of `IngestionNode` items to be processed.
/// * `storage` - Optional storage backend where the processed nodes will be stored.
/// * `concurrency` - The level of concurrency for processing nodes.
pub struct IngestionPipeline {
    stream: IngestionStream,
    storage: Option<Box<dyn Storage>>,
    concurrency: usize,
}

impl Default for IngestionPipeline {
    /// Creates a default `IngestionPipeline` with an empty stream, no storage, and a concurrency level equal to the number of CPUs.
    fn default() -> Self {
        Self {
            stream: Box::pin(futures_util::stream::empty()),
@@ -22,8 +33,16 @@ impl Default for IngestionPipeline {
    }
}

impl IngestionPipeline {
    /// Creates an `IngestionPipeline` from a given loader.
    ///
    /// # Arguments
    ///
    /// * `loader` - A loader that implements the `Loader` trait.
    ///
    /// # Returns
    ///
    /// An instance of `IngestionPipeline` initialized with the provided loader.
    pub fn from_loader(loader: impl Loader + 'static) -> Self {
        let stream = loader.into_stream();
        Self {
@@ -32,27 +51,41 @@
        }
    }
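
For illustration, a minimal `Loader` sketch. It assumes the trait only requires `into_stream` (as the call above suggests) and that `IngestionNode` is constructible with a `chunk` field plus `Default`; both are assumptions, not the crate's confirmed API:

```rust
use futures_util::stream;

// Hypothetical one-shot loader: yields a single hard-coded node.
struct OneNodeLoader;

impl Loader for OneNodeLoader {
    fn into_stream(self) -> IngestionStream {
        let node = IngestionNode {
            chunk: "fn main() {}".to_string(), // assumed field
            ..Default::default()               // assumed Default impl
        };
        Box::pin(stream::iter(vec![Ok(node)]))
    }
}

// let pipeline = IngestionPipeline::from_loader(OneNodeLoader);
```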

    /// Sets the concurrency level for the pipeline.
    ///
    /// # Arguments
    ///
    /// * `concurrency` - The desired level of concurrency.
    ///
    /// # Returns
    ///
    /// An instance of `IngestionPipeline` with the updated concurrency level.
    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
        self.concurrency = concurrency;
        self
    }
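
Worth noting: each stage reads `self.concurrency` at the moment it is chained (see the `try_buffer_unordered(self.concurrency)` calls below), so `with_concurrency` only affects stages added after it. A sketch, reusing the hypothetical loader from above:

```rust
let pipeline = IngestionPipeline::from_loader(OneNodeLoader).with_concurrency(8);
// Stages chained from here on buffer up to 8 futures at a time;
// calling with_concurrency again later would not change them.
```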

    /// Filters out cached nodes using the provided cache.
    ///
    /// # Arguments
    ///
    /// * `cache` - A cache that implements the `NodeCache` trait.
    ///
    /// # Returns
    ///
    /// An instance of `IngestionPipeline` with the updated stream that filters out cached nodes.
    pub fn filter_cached(mut self, cache: impl NodeCache + 'static) -> Self {
        let cache = Arc::new(cache);
        self.stream = self
            .stream
            .try_filter(move |node| {
                let cache = Arc::clone(&cache);
                // FIXME: Maybe Cow or Arc instead? Lots of nodes.
                // Or we could get the key before the spawn.
                let node = node.clone();
                let current_span = tracing::Span::current();
                tokio::spawn(current_span.in_scope(|| async move {
                    if !cache.get(&node).await {
                        cache.set(&node).await;

                        tracing::debug!("Node not in cache, passing through");

                        true
                    } else {
                        tracing::debug!("Node in cache, skipping");
@@ -68,6 +101,15 @@ impl IngestionPipeline {
        self
    }
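
As a sketch of the cache side, here is a toy in-memory `NodeCache`. It assumes the trait is roughly `async fn get(&self, node: &IngestionNode) -> bool` plus `async fn set(&self, node: &IngestionNode)` (inferred from the calls above), and it keys nodes on their `Debug` output; all of that is assumption, not the crate's real cache:

```rust
use std::collections::HashSet;
use std::sync::Mutex;

// Toy cache: remembers nodes by their Debug representation.
#[derive(Default)]
struct MemoryCache {
    seen: Mutex<HashSet<String>>,
}

#[async_trait::async_trait]
impl NodeCache for MemoryCache {
    // Returns true when the node has been seen before.
    async fn get(&self, node: &IngestionNode) -> bool {
        self.seen.lock().unwrap().contains(&format!("{node:?}"))
    }

    async fn set(&self, node: &IngestionNode) {
        self.seen.lock().unwrap().insert(format!("{node:?}"));
    }
}

// let pipeline = pipeline.filter_cached(MemoryCache::default());
```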

    /// Adds a transformer to the pipeline.
    ///
    /// # Arguments
    ///
    /// * `transformer` - A transformer that implements the `Transformer` trait.
    ///
    /// # Returns
    ///
    /// An instance of `IngestionPipeline` with the updated stream that applies the transformer to each node.
    pub fn then(mut self, transformer: impl Transformer + 'static) -> Self {
        let transformer = Arc::new(transformer);
        self.stream = self
@@ -81,13 +123,22 @@ impl IngestionPipeline {
                    .map_err(anyhow::Error::from)
            })
            .try_buffer_unordered(self.concurrency)
            // Flatten the nested `Result` produced by `tokio::spawn`
            .map(|x| x.and_then(|x| x))
            .boxed();

        self
    }
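
A toy `Transformer` to make the shape concrete. It assumes the trait boils down to `async fn transform_node(&self, node: IngestionNode) -> Result<IngestionNode>` (inferred from how `then` spawns the call and flattens the double result) and that `IngestionNode` exposes a `metadata` map; hypothetical, not the crate's confirmed trait:

```rust
// Hypothetical transformer: stamps every node with a metadata entry.
struct TagLanguage;

#[async_trait::async_trait]
impl Transformer for TagLanguage {
    async fn transform_node(&self, mut node: IngestionNode) -> anyhow::Result<IngestionNode> {
        node.metadata
            .insert("language".to_string(), "rust".to_string()); // assumed field
        Ok(node)
    }
}

// let pipeline = pipeline.then(TagLanguage);
```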

    /// Adds a batch transformer to the pipeline.
    ///
    /// # Arguments
    ///
    /// * `batch_size` - The size of the batches to be processed.
    /// * `transformer` - A transformer that implements the `BatchableTransformer` trait.
    ///
    /// # Returns
    ///
    /// An instance of `IngestionPipeline` with the updated stream that applies the batch transformer to each batch of nodes.
    pub fn then_in_batch(
        mut self,
        batch_size: usize,
@@ -106,15 +157,22 @@
                )
                .map_err(anyhow::Error::from)
            })
            // We need to coerce both the stream error and tokio error to anyhow manually
            .err_into::<anyhow::Error>()
            .try_buffer_unordered(self.concurrency)
            .try_flatten()
            .boxed();
        self
    }
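
A usage sketch only, since the batch call itself is collapsed in the diff above; `MyEmbedder` is a hypothetical, undefined stand-in for any `BatchableTransformer`, such as an embedding step that amortizes per-request overhead across a batch:

```rust
// Process 64 nodes per call; the resulting stream of streams is
// flattened back into single nodes by try_flatten above.
let pipeline = pipeline.then_in_batch(64, MyEmbedder::default());
```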

    // Takes a single node, splits it into multiple, then flattens the stream.
    /// Adds a chunker transformer to the pipeline.
    ///
    /// # Arguments
    ///
    /// * `chunker` - A transformer that implements the `ChunkerTransformer` trait.
    ///
    /// # Returns
    ///
    /// An instance of `IngestionPipeline` with the updated stream that applies the chunker transformer to each node.
    pub fn then_chunk(mut self, chunker: impl ChunkerTransformer + 'static) -> Self {
        let chunker = Arc::new(chunker);
        self.stream = self
@@ -127,7 +185,6 @@ impl IngestionPipeline {
                )
                .map_err(anyhow::Error::from)
            })
            // We need to coerce both the stream error and tokio error to anyhow manually
            .err_into::<anyhow::Error>()
            .try_buffer_unordered(self.concurrency)
            .try_flatten()
@@ -136,11 +193,31 @@

        self
    }
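
A hypothetical chunker sketch. It assumes `ChunkerTransformer::transform_node` returns an `IngestionStream` of sub-nodes (inferred from the `try_flatten` above) and that `IngestionNode` has a `String` `chunk` field and implements `Clone` (the latter visible in `filter_cached`); the trait shape is an assumption:

```rust
// Hypothetical chunker: splits a node's chunk on blank lines.
struct SplitOnBlankLines;

#[async_trait::async_trait]
impl ChunkerTransformer for SplitOnBlankLines {
    async fn transform_node(&self, node: IngestionNode) -> IngestionStream {
        let chunks: Vec<anyhow::Result<IngestionNode>> = node
            .chunk
            .split("\n\n")
            .map(|part| {
                Ok(IngestionNode {
                    chunk: part.to_string(),
                    ..node.clone()
                })
            })
            .collect();
        Box::pin(futures_util::stream::iter(chunks))
    }
}

// let pipeline = pipeline.then_chunk(SplitOnBlankLines);
```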

    /// Configures the pipeline to use the specified storage backend.
    ///
    /// # Arguments
    ///
    /// * `storage` - A storage backend that implements the `Storage` trait.
    ///
    /// # Returns
    ///
    /// An instance of `IngestionPipeline` with the configured storage backend.
    pub fn store_with(mut self, storage: impl Storage + 'static) -> Self {
        self.storage = Some(Box::new(storage));
        self
    }
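
Putting the builder together with `run` (documented next), an end-to-end sketch under the assumptions above; every concrete component (`OneNodeLoader`, `MemoryCache`, `TagLanguage`, `SplitOnBlankLines`, `MyEmbedder`, `MyStore`) is hypothetical:

```rust
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    IngestionPipeline::from_loader(OneNodeLoader)
        .with_concurrency(8)
        .filter_cached(MemoryCache::default())
        .then(TagLanguage)
        .then_chunk(SplitOnBlankLines)
        .then_in_batch(64, MyEmbedder::default())
        .store_with(MyStore::default()) // without this, run() returns an error
        .run()
        .await
}
```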

    /// Runs the ingestion pipeline.
    ///
    /// This method processes the stream of nodes, applying all configured transformations and storing the results.
    ///
    /// # Returns
    ///
    /// A `Result` indicating the success or failure of the pipeline execution.
    ///
    /// # Errors
    ///
    /// Returns an error if no storage backend is configured or if any stage of the pipeline fails.
    #[tracing::instrument(skip_all, fields(total_nodes), name = "ingestion_pipeline.run")]
    pub async fn run(mut self) -> Result<()> {
        tracing::info!(
@@ -155,7 +232,6 @@ impl IngestionPipeline {

        let mut total_nodes = 0;
        if let Some(batch_size) = storage.batch_size() {
            // Chunk both Ok and Err results, early return on any error
            let mut stream = self.stream.chunks(batch_size).boxed();
            while let Some(nodes) = stream.next().await {
                let nodes = nodes.into_iter().collect::<Result<Vec<IngestionNode>>>()?;
@@ -183,6 +259,7 @@ mod tests {
    use futures_util::stream;
    use mockall::Sequence;

    /// Tests a simple run of the ingestion pipeline.
    #[test_log::test(tokio::test)]
    async fn test_simple_run() {
        let mut loader = MockLoader::new();
