feat(integrations)!: implement Persist for Redis (#80)
timonv authored Jun 23, 2024
1 parent 062107b commit 9004323
Showing 9 changed files with 451 additions and 139 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions examples/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
tokio = { version = "1.0", features = ["full"] }
swiftide = { path = "../swiftide/", features = ["all"] }
tracing-subscriber = "0.3"
serde_json = "1.0"

[[example]]
name = "ingest-codebase"
@@ -16,3 +17,7 @@ path = "ingest_codebase.rs"
[[example]]
name = "fastembed"
path = "fastembed.rs"

[[example]]
name = "ingest-redis"
path = "ingest_into_redis.rs"
7 changes: 2 additions & 5 deletions examples/ingest_codebase.rs
@@ -21,7 +21,7 @@
use swiftide::{
ingestion,
integrations::{self, qdrant::Qdrant, redis::RedisNodeCache},
integrations::{self, qdrant::Qdrant, redis::Redis},
loaders::FileLoader,
transformers::{ChunkCode, Embed, MetadataQACode},
};
@@ -46,10 +46,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.to_owned();

ingestion::IngestionPipeline::from_loader(FileLoader::new(".").with_extensions(&["rs"]))
.filter_cached(RedisNodeCache::try_from_url(
redis_url,
"swiftide-examples",
)?)
.filter_cached(Redis::try_from_url(redis_url, "swiftide-examples")?)
.then(MetadataQACode::new(openai_client.clone()))
.then_chunk(ChunkCode::try_for_language_and_chunk_size(
"rust",
51 changes: 51 additions & 0 deletions examples/ingest_into_redis.rs
@@ -0,0 +1,51 @@
//! # [Swiftide] Ingest into Redis example
//!
//! This example demonstrates how to ingest the Swiftide codebase itself and persist the
//! resulting nodes into Redis. Note that for it to work correctly you need to have Redis
//! running.
//!
//! The pipeline will:
//! - Load all `.rs` files from the current directory
//! - Chunk the code into pieces of 10 to 2048 bytes
//! - Store the nodes in Redis in batches of 50, persisting only each node's metadata as JSON
//!
//! [Swiftide]: https://github.com/bosun-ai/swiftide
//! [examples]: https://github.com/bosun-ai/swiftide/blob/master/examples
use swiftide::{
ingestion, integrations::redis::Redis, loaders::FileLoader, transformers::ChunkCode,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

let redis_url = std::env::var("REDIS_URL")
.as_deref()
.unwrap_or("redis://localhost:6379")
.to_owned();

ingestion::IngestionPipeline::from_loader(FileLoader::new(".").with_extensions(&["rs"]))
.then_chunk(ChunkCode::try_for_language_and_chunk_size(
"rust",
10..2048,
)?)
.then_store_with(
// By default the value is the full node serialized to JSON.
// We can customize this by providing a custom function.
Redis::try_build_from_url(&redis_url)?
.persist_value_fn(|node| Ok(serde_json::to_string(&node.metadata)?))
.batch_size(50)
.build()?,
)
.run()
.await?;
Ok(())
}
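
For a quick local check of what this example persisted, the entries can be read back directly with the `redis` crate. This is a hedged sketch and not part of the commit; the `*.rs:*` key pattern assumes the default `<path>:<hash>` key format produced by `persist_key_for_node` in the Redis integration below.

```rust
use redis::Commands;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = redis::Client::open("redis://localhost:6379")?;
    let mut con = client.get_connection()?;

    // KEYS is fine for a quick local inspection; the pattern matches the default
    // "<path>:<hash>" persist keys generated for .rs files.
    let keys: Vec<String> = redis::cmd("KEYS").arg("*.rs:*").query(&mut con)?;
    for key in keys {
        // Each value is the node's metadata serialized as JSON, per the example's persist_value_fn.
        let value: String = con.get(key.as_str())?;
        println!("{key} => {value}");
    }
    Ok(())
}
```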
4 changes: 3 additions & 1 deletion swiftide/src/ingestion/ingestion_node.rs
@@ -23,12 +23,14 @@ use std::{
path::PathBuf,
};

use serde::{Deserialize, Serialize};

/// Represents a unit of data in the ingestion process.
///
/// `IngestionNode` encapsulates all necessary information for a single unit of data being processed
/// in the ingestion pipeline. It includes fields for an identifier, file path, data chunk, optional
/// vector representation, and metadata.
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct IngestionNode {
/// Optional identifier for the node.
pub id: Option<u64>,
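
The new `Serialize`, `Deserialize`, and `PartialEq` derives are what allow the Redis persist step to store a full node as JSON by default. A minimal round-trip sketch, assuming the remaining `IngestionNode` fields are public and default-constructible:

```rust
use swiftide::ingestion::IngestionNode;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Only the fields shown in this diff are set explicitly; the rest come from Default.
    let node = IngestionNode {
        id: Some(1),
        path: "src/lib.rs".into(),
        ..Default::default()
    };

    // Serialize enables the default persist_value_for_node behavior (full node as JSON).
    let json = serde_json::to_string(&node)?;
    // Deserialize and PartialEq make a lossless round trip easy to verify.
    let roundtrip: IngestionNode = serde_json::from_str(&json)?;
    assert_eq!(node, roundtrip);
    Ok(())
}
```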
200 changes: 196 additions & 4 deletions swiftide/src/integrations/redis/mod.rs
@@ -1,19 +1,211 @@
//! This module provides the integration with Redis for caching nodes in the Swiftide system.
//!
//! The primary component of this module is the `RedisNodeCache`, which is re-exported for use
//! in other parts of the system. The `RedisNodeCache` struct is responsible for managing and
//! The primary component of this module is the `Redis` struct, which is re-exported for use
//! in other parts of the system. It is responsible for managing and
//! caching nodes during the ingestion process, leveraging Redis for efficient storage and retrieval.
//!
//! # Overview
//!
//! The `RedisNodeCache` struct provides methods for:
//! The `Redis` struct provides methods for:
//! - Connecting to a Redis database
//! - Checking if a node is cached
//! - Setting a node in the cache
//! - Resetting the cache (primarily for testing purposes)
//! - Persisting processed nodes as key-value pairs (the `Persist` support added in this commit)
//!
//! This integration is essential for ensuring efficient node management and caching in the Swiftide system.
use anyhow::{Context as _, Result};
use derive_builder::Builder;
use tokio::sync::RwLock;

use crate::ingestion::IngestionNode;

mod node_cache;
mod persist;

/// `Redis` provides both a node cache and a persistence backend using Redis.
/// As a cache it helps optimize the ingestion process by skipping nodes that have already been
/// processed; as a persist step it stores processed nodes as key-value pairs.
///
/// # Fields
///
/// * `client` - The Redis client used to interact with the Redis server.
/// * `connection_manager` - Manages the Redis connections asynchronously.
/// * `cache_key_prefix` - A prefix used for cache keys stored in Redis to avoid collisions.
#[derive(Builder)]
#[builder(pattern = "owned", setter(strip_option))]
pub struct Redis {
client: redis::Client,
#[builder(default, setter(skip))]
connection_manager: RwLock<Option<redis::aio::ConnectionManager>>,
#[builder(default)]
cache_key_prefix: String,
#[builder(default = "10")]
/// The batch size used for persisting nodes. Defaults to a conservative 10.
batch_size: usize,
#[builder(default)]
/// Customize the key used for persisting nodes
persist_key_fn: Option<fn(&IngestionNode) -> Result<String>>,
#[builder(default)]
/// Customize the value used for persisting nodes
persist_value_fn: Option<fn(&IngestionNode) -> Result<String>>,
}

impl Redis {
/// Creates a new `Redis` instance from a given Redis URL and key prefix.
///
/// # Parameters
///
/// * `url` - The URL of the Redis server.
/// * `prefix` - The prefix to be used for keys stored in Redis.
///
/// # Returns
///
/// A `Result` containing the `Redis` instance or an error if the client could not be created.
///
/// # Errors
///
/// Returns an error if the Redis client cannot be opened.
pub fn try_from_url(url: impl AsRef<str>, prefix: impl AsRef<str>) -> Result<Self> {
let client = redis::Client::open(url.as_ref()).context("Failed to open redis client")?;
Ok(Self {
client,
connection_manager: RwLock::new(None),
cache_key_prefix: prefix.as_ref().to_string(),
batch_size: 10,
persist_key_fn: None,
persist_value_fn: None,
})
}

/// Returns a `RedisBuilder` with its client preconfigured from the given Redis URL.
///
/// # Errors
///
/// Returns an error if the Redis client cannot be opened.
pub fn try_build_from_url(url: impl AsRef<str>) -> Result<RedisBuilder> {
Ok(RedisBuilder::default()
.client(redis::Client::open(url.as_ref()).context("Failed to open redis client")?))
}

/// Returns a default `RedisBuilder` for constructing a `Redis` instance.
pub fn builder() -> RedisBuilder {
RedisBuilder::default()
}

/// Lazily connects to the Redis server and returns the connection manager.
///
/// # Returns
///
/// An `Option` containing the `ConnectionManager` if the connection is successful, or `None` if it fails.
///
/// # Errors
///
/// Logs an error and returns `None` if the connection manager cannot be obtained.
async fn lazy_connect(&self) -> Option<redis::aio::ConnectionManager> {
if self.connection_manager.read().await.is_none() {
let result = self.client.get_connection_manager().await;
if let Err(e) = result {
tracing::error!("Failed to get connection manager: {}", e);
return None;
}
let mut cm = self.connection_manager.write().await;
*cm = result.ok();
}

self.connection_manager.read().await.clone()
}

/// Generates a Redis key for a given node using the key prefix and the node's hash.
///
/// # Parameters
///
/// * `node` - The node for which the key is to be generated.
///
/// # Returns
///
/// A `String` representing the Redis key for the node.
fn cache_key_for_node(&self, node: &IngestionNode) -> String {
format!("{}:{}", self.cache_key_prefix, node.calculate_hash())
}

/// Generates a key for a given node to be persisted in Redis.
fn persist_key_for_node(&self, node: &IngestionNode) -> Result<String> {
if let Some(key_fn) = self.persist_key_fn {
key_fn(node)
} else {
let hash = node.calculate_hash();
Ok(format!("{}:{}", node.path.to_string_lossy(), hash))
}
}

/// Generates a value for a given node to be persisted in Redis.
/// If a custom function is provided, it is used to generate the value;
/// otherwise, the full node is serialized as JSON.
fn persist_value_for_node(&self, node: &IngestionNode) -> Result<String> {
if let Some(value_fn) = self.persist_value_fn {
value_fn(node)
} else {
Ok(serde_json::to_string(node)?)
}
}

/// Resets the cache by deleting all keys with the specified prefix.
/// This function is intended for testing purposes and is inefficient for production use.
///
/// # Panics
///
/// Panics if the keys cannot be retrieved or deleted.
#[allow(dead_code)]
async fn reset_cache(&self) {
if let Some(mut cm) = self.lazy_connect().await {
let keys: Vec<String> = redis::cmd("KEYS")
.arg(format!("{}:*", self.cache_key_prefix))
.query_async(&mut cm)
.await
.expect("Could not get keys");

for key in &keys {
let _: usize = redis::cmd("DEL")
.arg(key)
.query_async(&mut cm)
.await
.expect("Failed to reset cache");
}
}
}

/// Gets a persisted node's value from Redis using the GET command.
/// Takes a node and returns a `Result<Option<String>>`; `None` means no value was stored for the node's key.
#[allow(dead_code)]
async fn get_node(&self, node: &IngestionNode) -> Result<Option<String>> {
if let Some(mut cm) = self.lazy_connect().await {
let key = self.persist_key_for_node(node)?;
let result: Option<String> = redis::cmd("GET")
.arg(key)
.query_async(&mut cm)
.await
.context("Error getting from redis")?;
Ok(result)
} else {
anyhow::bail!("Failed to connect to Redis")
}
}
}

// The Redis connection manager does not implement Debug, so implement Debug manually and omit that field.
impl std::fmt::Debug for Redis {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Redis")
.field("client", &self.client)
.finish()
}
}

pub use node_cache::RedisNodeCache;
impl Clone for Redis {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
connection_manager: RwLock::new(None),
cache_key_prefix: self.cache_key_prefix.clone(),
batch_size: self.batch_size,
persist_key_fn: self.persist_key_fn,
persist_value_fn: self.persist_value_fn,
}
}
}
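
Putting the new pieces together, the builder exposes the persistence knobs (`persist_key_fn`, `persist_value_fn`, `batch_size`) alongside the client configuration. A hedged usage sketch; the key format in the closure and the field access are illustrative assumptions, not part of this diff:

```rust
use swiftide::{ingestion::IngestionNode, integrations::redis::Redis};

fn build_redis() -> Result<Redis, Box<dyn std::error::Error>> {
    // Non-capturing closures coerce to the plain fn pointers the builder fields expect.
    let redis = Redis::try_build_from_url("redis://localhost:6379")?
        // Override the default "<path>:<hash>" persist key; keying by path alone is illustrative.
        .persist_key_fn(|node: &IngestionNode| Ok(node.path.to_string_lossy().to_string()))
        // Persist only the metadata instead of the full node serialized as JSON.
        .persist_value_fn(|node: &IngestionNode| Ok(serde_json::to_string(&node.metadata)?))
        // Flush to Redis in batches larger than the default of 10.
        .batch_size(50)
        .build()?;
    Ok(redis)
}
```

The resulting `Redis` can then be handed to `.then_store_with(...)` exactly as in the `ingest_into_redis.rs` example above.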
