diff --git a/swiftide/src/ingestion/ingestion_pipeline.rs b/swiftide/src/ingestion/ingestion_pipeline.rs
index d6ff5f2a..a52038d2 100644
--- a/swiftide/src/ingestion/ingestion_pipeline.rs
+++ b/swiftide/src/ingestion/ingestion_pipeline.rs
@@ -6,6 +6,16 @@ use std::sync::Arc;
 
 use super::{IngestionNode, IngestionStream};
 
+/// A pipeline for ingesting files, adding metadata, chunking, transforming, embedding, and then storing them.
+///
+/// The `IngestionPipeline` struct orchestrates the entire file ingestion process. It is designed to be flexible and
+/// performant, allowing for various stages of data transformation and storage to be configured and executed asynchronously.
+///
+/// # Fields
+///
+/// * `stream` - The stream of `IngestionNode` items to be processed.
+/// * `storage` - Optional storage backend where the processed nodes will be stored.
+/// * `concurrency` - The level of concurrency for processing nodes.
 pub struct IngestionPipeline {
     stream: IngestionStream,
     storage: Option<Box<dyn Storage>>,
@@ -13,6 +23,7 @@ pub struct IngestionPipeline {
 }
 
 impl Default for IngestionPipeline {
+    /// Creates a default `IngestionPipeline` with an empty stream, no storage, and a concurrency level equal to the number of CPUs.
     fn default() -> Self {
         Self {
             stream: Box::pin(futures_util::stream::empty()),
@@ -22,8 +33,16 @@ impl Default for IngestionPipeline {
     }
 }
 
-// A lazy pipeline for ingesting files, adding metadata, chunking, transforming, embedding and then storing them.
 impl IngestionPipeline {
+    /// Creates an `IngestionPipeline` from a given loader.
+    ///
+    /// # Arguments
+    ///
+    /// * `loader` - A loader that implements the `Loader` trait.
+    ///
+    /// # Returns
+    ///
+    /// An instance of `IngestionPipeline` initialized with the provided loader.
     pub fn from_loader(loader: impl Loader + 'static) -> Self {
         let stream = loader.into_stream();
         Self {
@@ -32,27 +51,41 @@ impl IngestionPipeline {
+    /// Sets the concurrency level for the pipeline.
+    ///
+    /// # Arguments
+    ///
+    /// * `concurrency` - The desired level of concurrency.
+    ///
+    /// # Returns
+    ///
+    /// An instance of `IngestionPipeline` with the updated concurrency level.
     pub fn with_concurrency(mut self, concurrency: usize) -> Self {
         self.concurrency = concurrency;
         self
     }
 
+    /// Filters out cached nodes using the provided cache.
+    ///
+    /// # Arguments
+    ///
+    /// * `cache` - A cache that implements the `NodeCache` trait.
+    ///
+    /// # Returns
+    ///
+    /// An instance of `IngestionPipeline` with the updated stream that filters out cached nodes.
     pub fn filter_cached(mut self, cache: impl NodeCache + 'static) -> Self {
         let cache = Arc::new(cache);
         self.stream = self
             .stream
             .try_filter(move |node| {
                 let cache = Arc::clone(&cache);
-                // FIXME: Maybe Cow or arc instead? Lots of nodes
-                // Or we could get the key before the spawn
                 let node = node.clone();
                 let current_span = tracing::Span::current();
                 tokio::spawn(current_span.in_scope(|| async move {
                     if !cache.get(&node).await {
                         cache.set(&node).await;
-                        tracing::debug!("Node not in cache, passing through");
-
                         true
                     } else {
                         tracing::debug!("Node in cache, skipping");
@@ -68,6 +101,15 @@ impl IngestionPipeline {
         self
     }
 
+    /// Adds a transformer to the pipeline.
+    ///
+    /// # Arguments
+    ///
+    /// * `transformer` - A transformer that implements the `Transformer` trait.
+    ///
+    /// # Returns
+    ///
+    /// An instance of `IngestionPipeline` with the updated stream that applies the transformer to each node.
     pub fn then(mut self, transformer: impl Transformer + 'static) -> Self {
         let transformer = Arc::new(transformer);
         self.stream = self
@@ -81,13 +123,22 @@ impl IngestionPipeline {
                 .map_err(anyhow::Error::from)
             })
             .try_buffer_unordered(self.concurrency)
-            // Flatten the double result
             .map(|x| x.and_then(|x| x))
             .boxed();
         self
     }
 
+    /// Adds a batch transformer to the pipeline.
+    ///
+    /// # Arguments
+    ///
+    /// * `batch_size` - The size of the batches to be processed.
+    /// * `transformer` - A transformer that implements the `BatchableTransformer` trait.
+    ///
+    /// # Returns
+    ///
+    /// An instance of `IngestionPipeline` with the updated stream that applies the batch transformer to each batch of nodes.
     pub fn then_in_batch(
         mut self,
         batch_size: usize,
@@ -106,7 +157,6 @@ impl IngestionPipeline {
                 )
                 .map_err(anyhow::Error::from)
             })
-            // We need to coerce both the stream error and tokio error to anyhow manually
             .err_into::<anyhow::Error>()
             .try_buffer_unordered(self.concurrency)
             .try_flatten()
@@ -114,7 +164,15 @@ impl IngestionPipeline {
         self
     }
 
-    // Takes a single node, splits it into multiple, then flattens the stream
+    /// Adds a chunker transformer to the pipeline.
+    ///
+    /// # Arguments
+    ///
+    /// * `chunker` - A transformer that implements the `ChunkerTransformer` trait.
+    ///
+    /// # Returns
+    ///
+    /// An instance of `IngestionPipeline` with the updated stream that applies the chunker transformer to each node.
     pub fn then_chunk(mut self, chunker: impl ChunkerTransformer + 'static) -> Self {
         let chunker = Arc::new(chunker);
         self.stream = self
@@ -127,7 +185,6 @@ impl IngestionPipeline {
                 )
                 .map_err(anyhow::Error::from)
             })
-            // We need to coerce both the stream error and tokio error to anyhow manually
             .err_into::<anyhow::Error>()
             .try_buffer_unordered(self.concurrency)
             .try_flatten()
@@ -136,11 +193,31 @@ impl IngestionPipeline {
         self
     }
 
+    /// Configures the pipeline to use the specified storage backend.
+    ///
+    /// # Arguments
+    ///
+    /// * `storage` - A storage backend that implements the `Storage` trait.
+    ///
+    /// # Returns
+    ///
+    /// An instance of `IngestionPipeline` with the configured storage backend.
     pub fn store_with(mut self, storage: impl Storage + 'static) -> Self {
         self.storage = Some(Box::new(storage));
         self
     }
 
+    /// Runs the ingestion pipeline.
+    ///
+    /// This method processes the stream of nodes, applying all configured transformations and storing the results.
+    ///
+    /// # Returns
+    ///
+    /// A `Result` indicating the success or failure of the pipeline execution.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if no storage backend is configured or if any stage of the pipeline fails.
     #[tracing::instrument(skip_all, fields(total_nodes), name = "ingestion_pipeline.run")]
     pub async fn run(mut self) -> Result<()> {
         tracing::info!(
@@ -155,7 +232,6 @@ impl IngestionPipeline {
         let mut total_nodes = 0;
         if let Some(batch_size) = storage.batch_size() {
-            // Chunk both Ok and Err results, early return on any error
             let mut stream = self.stream.chunks(batch_size).boxed();
             while let Some(nodes) = stream.next().await {
                 let nodes = nodes.into_iter().collect::<Result<Vec<IngestionNode>>>()?;
@@ -183,6 +259,7 @@ mod tests {
     use futures_util::stream;
     use mockall::Sequence;
 
+    /// Tests a simple run of the ingestion pipeline.
     #[test_log::test(tokio::test)]
     async fn test_simple_run() {
         let mut loader = MockLoader::new();
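
Taken together, the methods documented above form a lazy builder: each combinator rebuilds `self.stream`, so stages run in the order they are chained, and nothing executes until `run()` is awaited. Below is a minimal usage sketch of how they compose, written only against the trait bounds shown in this diff. The import paths are assumptions (the traits may be re-exported differently at this commit), and every argument is a caller-supplied placeholder rather than a concrete implementation from the crate.

```rust
use anyhow::Result;
// Assumed re-exports; at this commit these traits may live under another module path.
use swiftide::{
    ingestion::IngestionPipeline, BatchableTransformer, ChunkerTransformer, Loader, NodeCache,
    Storage, Transformer,
};

/// Hypothetical helper that wires a full pipeline out of caller-provided components.
/// Each parameter only needs to satisfy the trait bound the corresponding method requires.
async fn ingest(
    loader: impl Loader + 'static,
    cache: impl NodeCache + 'static,
    chunker: impl ChunkerTransformer + 'static,
    transformer: impl Transformer + 'static,
    batch_transformer: impl BatchableTransformer + 'static,
    storage: impl Storage + 'static,
) -> Result<()> {
    IngestionPipeline::from_loader(loader) // start from the loader's node stream
        .with_concurrency(10) // defaults to the number of CPUs if not set
        .filter_cached(cache) // skip nodes the cache has already seen
        .then_chunk(chunker) // split one node into many, flattened back into the stream
        .then(transformer) // per-node transformation
        .then_in_batch(64, batch_transformer) // transform nodes 64 at a time
        .store_with(storage) // required: run() errors without a storage backend
        .run() // drives the whole stream to completion
        .await
}
```

Note the ordering consequence of the builder rebuilding the stream in place: putting `filter_cached` before the transformers means cached nodes never pay the transformation cost, which appears to be the intended use.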