Skip to content

Commit

Permalink
feat(ingestion_pipeline)!: support chained storage backends (#46)
Browse files Browse the repository at this point in the history
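The pipeline now supports multiple storage backends. Each backend is added as a step in the stream rather than once at the end, so the order in which storage is added matters. The method was renamed from `store_with` to `then_store_with` to reflect that.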
timonv authored Jun 14, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 473e60e commit 745b8ed
Showing 6 changed files with 98 additions and 57 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -69,7 +69,7 @@ IngestionPipeline::from_loader(FileLoader::new(".").with_extensions(&["rs"]))
10..2048,
)?)
.then_in_batch(10, OpenAIEmbed::new(openai_client.clone()))
.store_with(
.then_store_with(
Qdrant::try_from_url(qdrant_url)?
.batch_size(50)
.vector_size(1536)
@@ -87,6 +87,7 @@ IngestionPipeline::from_loader(FileLoader::new(".").with_extensions(&["rs"]))
- Extremely fast streaming pipeline with parallel processing
- Integrations with OpenAI, Redis, Qdrant and Treesitter
- Bring your own transformers by extending straightforward traits.
- Store into multiple backends
- `tracing` supported

<p align="right">(<a href="#readme-top">back to top</a>)</p>
@@ -137,7 +138,7 @@ IngestionNodes have a path, chunk and metadata. Currently metadata is copied ove
- **then** `(impl Transformer)` transforms the node and puts it on the stream
- **then_in_batch** `(impl BatchTransformer)` transforms multiple nodes and puts them on the stream
- **then_chunk** `(impl ChunkerTransformer)` transforms a single node and emits multiple nodes
- **store_with** `(impl Storage)` Finally stores the nodes, optionally in batches
- **then_store_with** `(impl Storage)` stores the nodes in a storage backend; this step can be chained to store into multiple backends

Additionally, several generic transformers are implemented. They take implementers of `SimplePrompt` and `Embed` to do their work.

2 changes: 1 addition & 1 deletion examples/ingest_codebase.rs
Original file line number Diff line number Diff line change
@@ -56,7 +56,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
10..2048,
)?)
.then_in_batch(10, OpenAIEmbed::new(openai_client.clone()))
.store_with(
.then_store_with(
Qdrant::try_from_url(qdrant_url)?
.batch_size(50)
.vector_size(1536)
83 changes: 57 additions & 26 deletions swiftide/src/ingestion/ingestion_pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::{BatchableTransformer, ChunkerTransformer, Loader, NodeCache, Storage, Transformer};
use crate::{BatchableTransformer, ChunkerTransformer, Loader, NodeCache, Persist, Transformer};
use anyhow::Result;
use futures_util::{StreamExt, TryFutureExt, TryStreamExt};

use std::sync::Arc;

use super::{IngestionNode, IngestionStream};
use super::IngestionStream;

/// A pipeline for ingesting files, adding metadata, chunking, transforming, embedding, and then storing them.
///
@@ -18,7 +18,7 @@ use super::{IngestionNode, IngestionStream};
/// * `concurrency` - The level of concurrency for processing nodes.
pub struct IngestionPipeline {
stream: IngestionStream,
storage: Option<Box<dyn Storage>>,
storage: Vec<Arc<dyn Persist>>,
concurrency: usize,
}

@@ -27,7 +27,7 @@ impl Default for IngestionPipeline {
fn default() -> Self {
Self {
stream: Box::pin(futures_util::stream::empty()),
storage: None,
storage: Default::default(),
concurrency: num_cpus::get(),
}
}
@@ -193,7 +193,7 @@ impl IngestionPipeline {
self
}

/// Configures the pipeline to use the specified storage backend.
/// Persists ingestion nodes using the provided storage backend.
///
/// # Arguments
///
@@ -202,8 +202,41 @@ impl IngestionPipeline {
/// # Returns
///
/// An instance of `IngestionPipeline` with the configured storage backend.
pub fn store_with(mut self, storage: impl Storage + 'static) -> Self {
self.storage = Some(Box::new(storage));
pub fn then_store_with(mut self, storage: impl Persist + 'static) -> Self {
let storage = Arc::new(storage);
self.storage.push(storage.clone());
// add storage to the stream instead of doing it at the end
if storage.batch_size().is_some() {
self.stream = self
.stream
.try_chunks(storage.batch_size().unwrap())
.map_ok(move |nodes| {
let storage = Arc::clone(&storage);
let current_span = tracing::Span::current();
tokio::spawn(
current_span.in_scope(|| async move { storage.batch_store(nodes).await }),
)
.map_err(anyhow::Error::from)
})
.err_into::<anyhow::Error>()
.try_buffer_unordered(self.concurrency)
.try_flatten()
.boxed();
} else {
self.stream = self
.stream
.map_ok(move |node| {
let storage = Arc::clone(&storage);
let current_span = tracing::Span::current();
tokio::spawn(current_span.in_scope(|| async move { storage.store(node).await }))
.map_err(anyhow::Error::from)
})
.err_into::<anyhow::Error>()
.try_buffer_unordered(self.concurrency)
.map(|x| x.and_then(|x| x))
.boxed();
}

self
}

@@ -224,27 +257,24 @@ impl IngestionPipeline {
"Starting ingestion pipeline with {} concurrency",
self.concurrency
);
let Some(ref storage) = self.storage else {
anyhow::bail!("No storage configured for ingestion pipeline")
};
if self.storage.is_empty() {
anyhow::bail!("No storage configured for ingestion pipeline");
}

storage.setup().await?;
// Ensure all storage backends are set up before processing nodes
let setup_futures = self
.storage
.into_iter()
.map(|storage| tokio::spawn(async move { storage.setup().await }))
.collect::<Vec<_>>();
futures_util::future::try_join_all(setup_futures).await?;

let mut total_nodes = 0;
if let Some(batch_size) = storage.batch_size() {
let mut stream = self.stream.chunks(batch_size).boxed();
while let Some(nodes) = stream.next().await {
let nodes = nodes.into_iter().collect::<Result<Vec<IngestionNode>>>()?;
total_nodes += nodes.len();
storage.batch_store(nodes).await?;
}
} else {
while let Some(node) = self.stream.next().await {
total_nodes += 1;
storage.store(node?).await?;
}
while self.stream.next().await.is_some() {
total_nodes += 1;
}

tracing::warn!("Processed {} nodes", total_nodes);
tracing::Span::current().record("total_nodes", total_nodes);

Ok(())
@@ -255,6 +285,7 @@ impl IngestionPipeline {
mod tests {

use super::*;
use crate::ingestion::IngestionNode;
use crate::traits::*;
use futures_util::stream;
use mockall::Sequence;
@@ -266,7 +297,7 @@ mod tests {
let mut transformer = MockTransformer::new();
let mut batch_transformer = MockBatchableTransformer::new();
let mut chunker = MockChunkerTransformer::new();
let mut storage = MockStorage::new();
let mut storage = MockPersist::new();

let mut seq = Sequence::new();

@@ -308,13 +339,13 @@ mod tests {
.times(3)
.in_sequence(&mut seq)
.withf(|node| node.chunk.starts_with("transformed_chunk_"))
.returning(|_| Ok(()));
.returning(Ok);

let pipeline = IngestionPipeline::from_loader(loader)
.then(transformer)
.then_in_batch(1, batch_transformer)
.then_chunk(chunker)
.store_with(storage);
.then_store_with(storage);

pipeline.run().await.unwrap();
}
57 changes: 33 additions & 24 deletions swiftide/src/integrations/qdrant/persist.rs
Original file line number Diff line number Diff line change
@@ -4,13 +4,17 @@
use anyhow::Result;
use async_trait::async_trait;
use futures_util::{stream, StreamExt};

use crate::traits::Storage;
use crate::{
ingestion::{IngestionNode, IngestionStream},
traits::Persist,
};

use super::Qdrant;

#[async_trait]
impl Storage for Qdrant {
impl Persist for Qdrant {
/// Returns the batch size for the Qdrant storage.
///
/// # Returns
@@ -49,16 +53,12 @@ impl Storage for Qdrant {
///
/// This function will return an error if the node conversion or storage operation fails.
#[tracing::instrument(skip_all, err, name = "storage.qdrant.store")]
async fn store(&self, node: crate::ingestion::IngestionNode) -> Result<()> {
async fn store(&self, node: crate::ingestion::IngestionNode) -> Result<IngestionNode> {
let point = node.clone().try_into()?;
self.client
.upsert_points_blocking(
self.collection_name.to_string(),
None,
vec![node.try_into()?],
None,
)
.upsert_points_blocking(self.collection_name.to_string(), None, vec![point], None)
.await?;
Ok(())
Ok(node)
}

/// Stores a batch of ingestion nodes in the Qdrant storage.
@@ -74,19 +74,28 @@ impl Storage for Qdrant {
/// # Errors
///
/// This function will return an error if any node conversion or storage operation fails.
#[tracing::instrument(skip_all, err, name = "storage.qdrant.batch_store")]
async fn batch_store(&self, nodes: Vec<crate::ingestion::IngestionNode>) -> Result<()> {
self.client
.upsert_points_blocking(
self.collection_name.to_string(),
None,
nodes
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?,
None,
)
.await?;
Ok(())
#[tracing::instrument(skip_all, name = "storage.qdrant.batch_store")]
async fn batch_store(&self, nodes: Vec<crate::ingestion::IngestionNode>) -> IngestionStream {
let points = nodes
.iter()
.map(|node| node.clone().try_into())
.collect::<Result<Vec<_>>>();

if points.is_err() {
return stream::iter(vec![Err(points.unwrap_err())]).boxed();
}

let points = points.unwrap();

let result = self
.client
.upsert_points_blocking(self.collection_name.to_string(), None, points, None)
.await;

if result.is_ok() {
stream::iter(nodes.into_iter().map(Ok)).boxed()
} else {
stream::iter(vec![Err(result.unwrap_err())]).boxed()
}
}
}
6 changes: 3 additions & 3 deletions swiftide/src/traits.rs
Original file line number Diff line number Diff line change
@@ -63,10 +63,10 @@ pub trait SimplePrompt: Debug + Send + Sync {
#[cfg_attr(test, automock)]
#[async_trait]
/// Persists nodes
pub trait Storage: Send + Sync {
pub trait Persist: Send + Sync {
async fn setup(&self) -> Result<()>;
async fn store(&self, node: IngestionNode) -> Result<()>;
async fn batch_store(&self, nodes: Vec<IngestionNode>) -> Result<()>;
async fn store(&self, node: IngestionNode) -> Result<IngestionNode>;
async fn batch_store(&self, nodes: Vec<IngestionNode>) -> IngestionStream;
fn batch_size(&self) -> Option<usize> {
None
}
2 changes: 1 addition & 1 deletion swiftide/tests/ingestion_pipeline.rs
Original file line number Diff line number Diff line change
@@ -138,7 +138,7 @@ async fn test_ingestion_pipeline() {
integrations::redis::RedisNodeCache::try_from_url(&redis_url, "prefix").unwrap(),
)
.then_in_batch(1, transformers::OpenAIEmbed::new(openai_client.clone()))
.store_with(
.then_store_with(
integrations::qdrant::Qdrant::try_from_url(qdrant_url)
.unwrap()
.vector_size(1536)

0 comments on commit 745b8ed

Please sign in to comment.