diff --git a/README.md b/README.md
index a11a6f42..6ef267bc 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ IngestionPipeline::from_loader(FileLoader::new(".").with_extensions(&["rs"]))
10..2048,
)?)
.then_in_batch(10, OpenAIEmbed::new(openai_client.clone()))
- .store_with(
+ .then_store_with(
Qdrant::try_from_url(qdrant_url)?
.batch_size(50)
.vector_size(1536)
@@ -87,6 +87,7 @@ IngestionPipeline::from_loader(FileLoader::new(".").with_extensions(&["rs"]))
- Extremely fast streaming pipeline with parallel processing
- Integrations with OpenAI, Redis, Qdrant and Treesitter
- Bring your own transformers by extending straightforward traits.
+- Store into multiple backends
- `tracing` supported
(back to top)
@@ -137,7 +138,7 @@ IngestionNodes have a path, chunk and metadata. Currently metadata is copied ove
- **then** `(impl Transformer)` transforms the node and puts it on the stream
- **then_in_batch** `(impl BatchTransformer)` transforms multiple nodes and puts them on the stream
- **then_chunk** `(impl ChunkerTransformer)` transforms a single node and emits multiple nodes
-- **store_with** `(impl Storage)` Finally stores the nodes, optionally in batches
+- **then_store_with** `(impl Storage)` stores the nodes in a storage backend, this can be chained
Additionally, several generic transformers are implemented. They take implementers of `SimplePrompt` and `Embed` to do their things.
diff --git a/examples/ingest_codebase.rs b/examples/ingest_codebase.rs
index 10c6ec4e..dc707ee7 100644
--- a/examples/ingest_codebase.rs
+++ b/examples/ingest_codebase.rs
@@ -56,7 +56,7 @@ async fn main() -> Result<(), Box> {
10..2048,
)?)
.then_in_batch(10, OpenAIEmbed::new(openai_client.clone()))
- .store_with(
+ .then_store_with(
Qdrant::try_from_url(qdrant_url)?
.batch_size(50)
.vector_size(1536)
diff --git a/swiftide/src/ingestion/ingestion_pipeline.rs b/swiftide/src/ingestion/ingestion_pipeline.rs
index a52038d2..1499bfc8 100644
--- a/swiftide/src/ingestion/ingestion_pipeline.rs
+++ b/swiftide/src/ingestion/ingestion_pipeline.rs
@@ -1,10 +1,10 @@
-use crate::{BatchableTransformer, ChunkerTransformer, Loader, NodeCache, Storage, Transformer};
+use crate::{BatchableTransformer, ChunkerTransformer, Loader, NodeCache, Persist, Transformer};
use anyhow::Result;
use futures_util::{StreamExt, TryFutureExt, TryStreamExt};
use std::sync::Arc;
-use super::{IngestionNode, IngestionStream};
+use super::IngestionStream;
/// A pipeline for ingesting files, adding metadata, chunking, transforming, embedding, and then storing them.
///
@@ -18,7 +18,7 @@ use super::{IngestionNode, IngestionStream};
/// * `concurrency` - The level of concurrency for processing nodes.
pub struct IngestionPipeline {
stream: IngestionStream,
- storage: Option>,
+ storage: Vec>,
concurrency: usize,
}
@@ -27,7 +27,7 @@ impl Default for IngestionPipeline {
fn default() -> Self {
Self {
stream: Box::pin(futures_util::stream::empty()),
- storage: None,
+ storage: Default::default(),
concurrency: num_cpus::get(),
}
}
@@ -193,7 +193,7 @@ impl IngestionPipeline {
self
}
- /// Configures the pipeline to use the specified storage backend.
+ /// Persists ingestion nodes using the provided storage backend.
///
/// # Arguments
///
@@ -202,8 +202,41 @@ impl IngestionPipeline {
/// # Returns
///
/// An instance of `IngestionPipeline` with the configured storage backend.
- pub fn store_with(mut self, storage: impl Storage + 'static) -> Self {
- self.storage = Some(Box::new(storage));
+ pub fn then_store_with(mut self, storage: impl Persist + 'static) -> Self {
+ let storage = Arc::new(storage);
+ self.storage.push(storage.clone());
+ // add storage to the stream instead of doing it at the end
+ if storage.batch_size().is_some() {
+ self.stream = self
+ .stream
+ .try_chunks(storage.batch_size().unwrap())
+ .map_ok(move |nodes| {
+ let storage = Arc::clone(&storage);
+ let current_span = tracing::Span::current();
+ tokio::spawn(
+ current_span.in_scope(|| async move { storage.batch_store(nodes).await }),
+ )
+ .map_err(anyhow::Error::from)
+ })
+ .err_into::()
+ .try_buffer_unordered(self.concurrency)
+ .try_flatten()
+ .boxed();
+ } else {
+ self.stream = self
+ .stream
+ .map_ok(move |node| {
+ let storage = Arc::clone(&storage);
+ let current_span = tracing::Span::current();
+ tokio::spawn(current_span.in_scope(|| async move { storage.store(node).await }))
+ .map_err(anyhow::Error::from)
+ })
+ .err_into::()
+ .try_buffer_unordered(self.concurrency)
+ .map(|x| x.and_then(|x| x))
+ .boxed();
+ }
+
self
}
@@ -224,27 +257,24 @@ impl IngestionPipeline {
"Starting ingestion pipeline with {} concurrency",
self.concurrency
);
- let Some(ref storage) = self.storage else {
- anyhow::bail!("No storage configured for ingestion pipeline")
- };
+ if self.storage.is_empty() {
+ anyhow::bail!("No storage configured for ingestion pipeline");
+ }
- storage.setup().await?;
+ // Ensure all storage backends are set up before processing nodes
+ let setup_futures = self
+ .storage
+ .into_iter()
+ .map(|storage| tokio::spawn(async move { storage.setup().await }))
+ .collect::>();
+ futures_util::future::try_join_all(setup_futures).await?;
let mut total_nodes = 0;
- if let Some(batch_size) = storage.batch_size() {
- let mut stream = self.stream.chunks(batch_size).boxed();
- while let Some(nodes) = stream.next().await {
- let nodes = nodes.into_iter().collect::>>()?;
- total_nodes += nodes.len();
- storage.batch_store(nodes).await?;
- }
- } else {
- while let Some(node) = self.stream.next().await {
- total_nodes += 1;
- storage.store(node?).await?;
- }
+ while self.stream.next().await.is_some() {
+ total_nodes += 1;
}
+ tracing::warn!("Processed {} nodes", total_nodes);
tracing::Span::current().record("total_nodes", total_nodes);
Ok(())
@@ -255,6 +285,7 @@ impl IngestionPipeline {
mod tests {
use super::*;
+ use crate::ingestion::IngestionNode;
use crate::traits::*;
use futures_util::stream;
use mockall::Sequence;
@@ -266,7 +297,7 @@ mod tests {
let mut transformer = MockTransformer::new();
let mut batch_transformer = MockBatchableTransformer::new();
let mut chunker = MockChunkerTransformer::new();
- let mut storage = MockStorage::new();
+ let mut storage = MockPersist::new();
let mut seq = Sequence::new();
@@ -308,13 +339,13 @@ mod tests {
.times(3)
.in_sequence(&mut seq)
.withf(|node| node.chunk.starts_with("transformed_chunk_"))
- .returning(|_| Ok(()));
+ .returning(Ok);
let pipeline = IngestionPipeline::from_loader(loader)
.then(transformer)
.then_in_batch(1, batch_transformer)
.then_chunk(chunker)
- .store_with(storage);
+ .then_store_with(storage);
pipeline.run().await.unwrap();
}
diff --git a/swiftide/src/integrations/qdrant/persist.rs b/swiftide/src/integrations/qdrant/persist.rs
index 81cac74a..cce6d861 100644
--- a/swiftide/src/integrations/qdrant/persist.rs
+++ b/swiftide/src/integrations/qdrant/persist.rs
@@ -4,13 +4,17 @@
use anyhow::Result;
use async_trait::async_trait;
+use futures_util::{stream, StreamExt};
-use crate::traits::Storage;
+use crate::{
+ ingestion::{IngestionNode, IngestionStream},
+ traits::Persist,
+};
use super::Qdrant;
#[async_trait]
-impl Storage for Qdrant {
+impl Persist for Qdrant {
/// Returns the batch size for the Qdrant storage.
///
/// # Returns
@@ -49,16 +53,12 @@ impl Storage for Qdrant {
///
/// This function will return an error if the node conversion or storage operation fails.
#[tracing::instrument(skip_all, err, name = "storage.qdrant.store")]
- async fn store(&self, node: crate::ingestion::IngestionNode) -> Result<()> {
+ async fn store(&self, node: crate::ingestion::IngestionNode) -> Result {
+ let point = node.clone().try_into()?;
self.client
- .upsert_points_blocking(
- self.collection_name.to_string(),
- None,
- vec![node.try_into()?],
- None,
- )
+ .upsert_points_blocking(self.collection_name.to_string(), None, vec![point], None)
.await?;
- Ok(())
+ Ok(node)
}
/// Stores a batch of ingestion nodes in the Qdrant storage.
@@ -74,19 +74,28 @@ impl Storage for Qdrant {
/// # Errors
///
/// This function will return an error if any node conversion or storage operation fails.
- #[tracing::instrument(skip_all, err, name = "storage.qdrant.batch_store")]
- async fn batch_store(&self, nodes: Vec) -> Result<()> {
- self.client
- .upsert_points_blocking(
- self.collection_name.to_string(),
- None,
- nodes
- .into_iter()
- .map(TryInto::try_into)
- .collect::>>()?,
- None,
- )
- .await?;
- Ok(())
+ #[tracing::instrument(skip_all, name = "storage.qdrant.batch_store")]
+ async fn batch_store(&self, nodes: Vec) -> IngestionStream {
+ let points = nodes
+ .iter()
+ .map(|node| node.clone().try_into())
+ .collect::>>();
+
+ if points.is_err() {
+ return stream::iter(vec![Err(points.unwrap_err())]).boxed();
+ }
+
+ let points = points.unwrap();
+
+ let result = self
+ .client
+ .upsert_points_blocking(self.collection_name.to_string(), None, points, None)
+ .await;
+
+ if result.is_ok() {
+ stream::iter(nodes.into_iter().map(Ok)).boxed()
+ } else {
+ stream::iter(vec![Err(result.unwrap_err())]).boxed()
+ }
}
}
diff --git a/swiftide/src/traits.rs b/swiftide/src/traits.rs
index 54b12272..4d076e84 100644
--- a/swiftide/src/traits.rs
+++ b/swiftide/src/traits.rs
@@ -63,10 +63,10 @@ pub trait SimplePrompt: Debug + Send + Sync {
#[cfg_attr(test, automock)]
#[async_trait]
/// Persists nodes
-pub trait Storage: Send + Sync {
+pub trait Persist: Send + Sync {
async fn setup(&self) -> Result<()>;
- async fn store(&self, node: IngestionNode) -> Result<()>;
- async fn batch_store(&self, nodes: Vec) -> Result<()>;
+ async fn store(&self, node: IngestionNode) -> Result;
+ async fn batch_store(&self, nodes: Vec) -> IngestionStream;
fn batch_size(&self) -> Option {
None
}
diff --git a/swiftide/tests/ingestion_pipeline.rs b/swiftide/tests/ingestion_pipeline.rs
index f0973b48..5775c2ad 100644
--- a/swiftide/tests/ingestion_pipeline.rs
+++ b/swiftide/tests/ingestion_pipeline.rs
@@ -138,7 +138,7 @@ async fn test_ingestion_pipeline() {
integrations::redis::RedisNodeCache::try_from_url(&redis_url, "prefix").unwrap(),
)
.then_in_batch(1, transformers::OpenAIEmbed::new(openai_client.clone()))
- .store_with(
+ .then_store_with(
integrations::qdrant::Qdrant::try_from_url(qdrant_url)
.unwrap()
.vector_size(1536)