feat: replace databuoy with new ingestion pipeline (#322)
timonv authored May 24, 2024
1 parent d1e8b70 commit 7453ddc
Showing 12 changed files with 249 additions and 121 deletions.
4 changes: 3 additions & 1 deletion crates/indexing/Cargo.toml
@@ -9,7 +9,7 @@ ignore = "0.4.22"
code_ops = { path = "../code_ops" }
infrastructure = { path = "../infrastructure" }
anyhow = { workspace = true }
qdrant-client = "1.9.0"
qdrant-client = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
@@ -22,3 +22,5 @@ tokio-stream = "0.1.15"
uuid = { workspace = true }
indoc = { workspace = true }
redis = { workspace = true }
text-splitter = { version = "0.13.1", features = ["markdown"] }
chrono = { workspace = true }
19 changes: 13 additions & 6 deletions crates/indexing/src/ingestion_node.rs
@@ -12,7 +12,6 @@ use qdrant_client::{

#[derive(Debug, Default, Clone)]
pub struct IngestionNode {
// TODO: Can we make the path + nth node the id?
pub id: Option<u64>,
pub path: PathBuf,
pub chunk: String,
@@ -32,12 +31,14 @@ impl IngestionNode {

format!("{}\n{}", metadata, self.chunk)
}

pub fn calculate_hash(&self) -> u64 {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
self.hash(&mut hasher);
hasher.finish()
}
}

// TODO: We could also use hashes as the node id instead of uuid?
// That would remove the need for uuid and the extra delete before insert check in storage
// Potential issue there is that if the metadata implementation changes, storage would not update
// ... Or we add metadata to the hash as well?
impl Hash for IngestionNode {
fn hash<H: Hasher>(&self, state: &mut H) {
self.path.hash(state);
@@ -49,9 +50,15 @@ impl TryInto<qdrant::PointStruct> for IngestionNode {
type Error = anyhow::Error;

fn try_into(mut self) -> Result<qdrant::PointStruct> {
let id = self.calculate_hash();

self.metadata.extend([
("path".to_string(), self.path.to_string_lossy().to_string()),
("content".to_string(), self.chunk),
(
"last_updated_at".to_string(),
chrono::Utc::now().to_rfc3339(),
),
]);

// Damn, who built this API?
@@ -63,7 +70,7 @@ impl TryInto<qdrant::PointStruct> for IngestionNode {
.into();

Ok(qdrant::PointStruct::new(
uuid::Uuid::new_v4().to_string(),
id,
self.vector.context("Vector is not set")?,
payload,
))
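Replacing uuid::Uuid::new_v4() with the node's own hash is what makes re-ingestion idempotent: the same path and chunk always map to the same point id, so a rerun overwrites the existing point instead of adding a duplicate. A minimal sketch of the pattern, with a hypothetical stand-in struct for IngestionNode (note that std's DefaultHasher is not guaranteed stable across Rust versions, so ids can shift after a toolchain upgrade):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;

// Illustrative stand-in for IngestionNode, hashing only the identity-bearing fields.
struct Node {
    path: PathBuf,
    chunk: String,
}

impl Node {
    // Same shape as calculate_hash above: feed the fields through DefaultHasher
    // and use the 64-bit result as the storage point id.
    fn calculate_hash(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.path.hash(&mut hasher);
        self.chunk.hash(&mut hasher);
        hasher.finish()
    }
}

fn main() {
    let a = Node { path: "src/lib.rs".into(), chunk: "fn main() {}".into() };
    let b = Node { path: "src/lib.rs".into(), chunk: "fn main() {}".into() };
    // Identical content yields the identical id, so re-ingesting the same chunk
    // upserts the existing point instead of inserting a duplicate.
    assert_eq!(a.calculate_hash(), b.calculate_hash());
}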
16 changes: 12 additions & 4 deletions crates/indexing/src/ingestion_pipeline.rs
@@ -18,7 +18,6 @@ pub struct IngestionPipeline {

// A lazy pipeline for ingesting files, adding metadata, chunking, transforming, embedding and then storing them.
impl IngestionPipeline {
// TODO: fix lifetime
pub fn from_loader(loader: impl Loader + 'static) -> Self {
let stream = loader.into_stream();
Self {
@@ -33,20 +32,24 @@ impl IngestionPipeline {
self
}

#[tracing::instrument(skip_all)]
pub fn filter_cached(mut self, cache: impl NodeCache + 'static) -> Self {
let cache = Arc::new(cache);
self.stream = self
.stream
.try_filter(move |node| {
let cache = Arc::clone(&cache);
// TODO: Maybe Cow or arc instead? Lots of nodes
// FIXME: Maybe Cow or arc instead? Lots of nodes
let node = node.clone();
tokio::spawn(async move {
if !cache.get(&node).await {
cache.set(&node).await;

tracing::debug!("Node not in cache, passing through");

true
} else {
tracing::debug!("Node in cache, skipping");
false
}
})
@@ -122,27 +125,32 @@ impl IngestionPipeline {
self
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(total_nodes))]
pub async fn run(mut self) -> Result<()> {
let Some(ref storage) = self.storage else {
return Ok(());
anyhow::bail!("No storage configured for ingestion pipeline")
};

storage.setup().await?;

let mut total_nodes = 0;
if let Some(batch_size) = storage.batch_size() {
// Chunk both Ok and Err results, early return on any error
let mut stream = self.stream.chunks(batch_size).boxed();
while let Some(nodes) = stream.next().await {
let nodes = nodes.into_iter().collect::<Result<Vec<IngestionNode>>>()?;
total_nodes += nodes.len();
storage.batch_store(nodes).await?;
}
} else {
while let Some(node) = self.stream.next().await {
total_nodes += 1;
storage.store(node?).await?;
}
}

tracing::Span::current().record("total_nodes", total_nodes);

Ok(())
}
}
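Taken together, filter_cached and run reduce to a familiar futures pattern: a try_filter pass over a stream of Results, then a chunked drain that fails fast on the first error. A self-contained sketch of that shape, assuming futures-util, tokio, and anyhow as dependencies, with a HashSet standing in for the Redis cache and integers for nodes:

use std::collections::HashSet;
use std::sync::Arc;

use futures_util::{stream, StreamExt, TryStreamExt};
use tokio::sync::Mutex;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Stand-in for the loader: a stream of Result items, as in the pipeline.
    let nodes = stream::iter((0..10).map(Ok::<_, anyhow::Error>));

    // filter_cached-style pass: only let through items not seen before.
    let cache = Arc::new(Mutex::new(HashSet::new()));
    let fresh = nodes.try_filter(move |n| {
        let cache = Arc::clone(&cache);
        let n = *n;
        // HashSet::insert returns true only for unseen values.
        async move { cache.lock().await.insert(n) }
    });

    // run()-style batched drain: chunk Ok/Err results, early return on any error.
    let mut batches = fresh.chunks(3).boxed();
    while let Some(batch) = batches.next().await {
        let batch = batch.into_iter().collect::<Result<Vec<i32>, _>>()?;
        println!("storing batch of {}", batch.len());
    }
    Ok(())
}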
32 changes: 32 additions & 0 deletions crates/indexing/src/loaders/file_loader.rs
@@ -16,11 +16,42 @@ impl FileLoader {
}
}

/// Add extensions to the loader
///
/// # Arguments
/// * `extensions` - A list of extensions to add without the leading dot
pub fn with_extensions(mut self, extensions: &[&str]) -> Self {
self.extensions
.extend(extensions.iter().map(ToString::to_string));
self
}

/// Debug helper: eagerly lists the nodes this loader would produce
pub fn list_nodes(&self) -> Vec<IngestionNode> {
ignore::Walk::new(&self.path)
.filter_map(|entry| entry.ok())
.filter(|entry| entry.file_type().map(|ft| ft.is_file()).unwrap_or(false))
.filter(move |entry| {
let extensions = self.extensions.clone();

entry
.path()
.extension()
.map(|ext| extensions.contains(&ext.to_string_lossy().to_string()))
.unwrap_or(false)
})
.map(|entry| entry.into_path())
.map(|entry| {
tracing::debug!("Reading file: {:?}", entry);
let content = std::fs::read_to_string(&entry).unwrap();
IngestionNode {
path: entry,
chunk: content,
..Default::default()
}
})
.collect()
}
}

impl Loader for FileLoader {
@@ -41,6 +72,7 @@ impl Loader for FileLoader {
.map(|entry| entry.into_path())
.map(|entry| {
let content = std::fs::read_to_string(&entry)?;
tracing::debug!("Reading file: {:?}", entry);
Ok(IngestionNode {
path: entry,
chunk: content,
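Because FileLoader walks with the ignore crate, the traversal already honours .gitignore and hidden-file rules; the extension check is plain path filtering on top. A minimal sketch of that walk-and-filter shape (extension hard-coded to "rs" for brevity):

use ignore::Walk;
use std::path::PathBuf;

// Walks `root`, honouring .gitignore, and keeps regular files ending in .rs,
// the same shape as FileLoader's list_nodes/into_stream above.
fn list_rust_files(root: &str) -> Vec<PathBuf> {
    Walk::new(root)
        .filter_map(Result::ok)
        .filter(|entry| entry.file_type().map_or(false, |ft| ft.is_file()))
        .map(|entry| entry.into_path())
        .filter(|path| path.extension().map_or(false, |ext| ext == "rs"))
        .collect()
}

fn main() {
    for path in list_rust_files(".") {
        println!("{}", path.display());
    }
}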
2 changes: 2 additions & 0 deletions crates/indexing/src/loaders/mod.rs
@@ -1 +1,3 @@
pub mod file_loader;

pub use file_loader::FileLoader;
13 changes: 2 additions & 11 deletions crates/indexing/src/node_caches/redis.rs
@@ -1,7 +1,4 @@
use std::{
fmt::Debug,
hash::{Hash as _, Hasher},
};
use std::fmt::Debug;
use tokio::sync::RwLock;

use anyhow::{Context as _, Result};
@@ -42,13 +39,7 @@ impl Redis {
}

fn key_for_node(&self, node: &IngestionNode) -> String {
// We care about speed, not security or correctness
//
// Might be worth using faster algorithms
let mut s = std::hash::DefaultHasher::new();
node.hash(&mut s);

format!("{}:{}", self.key_prefix, s.finish())
format!("{}:{}", self.key_prefix, node.calculate_hash())
}

#[allow(dead_code)]
43 changes: 31 additions & 12 deletions crates/indexing/src/query.rs
@@ -2,22 +2,33 @@ use anyhow::Result;
use indoc::formatdoc;
use infrastructure::Embed;
use infrastructure::SimplePrompt;
use qdrant_client::{client::QdrantClient, qdrant::SearchPoints};

pub async fn query(query: &str) -> Result<String> {
let client = QdrantClient::from_url("http://localhost:6334")
.build()
.unwrap();
use qdrant_client::qdrant::SearchPoints;

/// Performs a naive search using qdrant and openai
///
/// When we add more complicated RAG query logic, it would be nice to have a
/// pipeline similar to ingestion that abstracts away over the storage.
///
/// This is just quick and dirty so we can get databuoy out.
#[tracing::instrument(
skip(query, storage_namespace),
fields(query, response),
err,
name = "indexing.query.naieve"
)]
pub async fn naive(query: &str, storage_namespace: &str) -> Result<String> {
let qdrant = infrastructure::create_qdrant_client()?;
let openai = infrastructure::create_openai_client();

let embedding_model = infrastructure::DEFAULT_OPENAI_EMBEDDING_MODEL;

let mut embedded_query = openai
.embed(vec![query.to_string()], "text-embedding-3-small")
.embed(vec![query.to_string()], embedding_model)
.await?;

let search_result = client
let search_result = qdrant
.search_points(&SearchPoints {
collection_name: "latest-test".to_string(),
collection_name: storage_namespace.to_string(),
vector: embedded_query
.drain(0..1)
.next()
@@ -38,19 +49,27 @@
.fold(acc, |acc, (k, v)| format!("{}\n{}: {}", acc, k, v))
});

tracing::Span::current().record("query", query);

let prompt = formatdoc!(
r#"
Answer the following question:
Answer the following question(s):
{query}
## Constraints
* Only answer based on the provided context below
* Be elaborate and specific in your answers
* Answer the question fully and remember to be concise
## Additional information found
{result_context}
"#,
);

openai.prompt(&prompt, "gpt-4o").await
let response = openai
.prompt(&prompt, infrastructure::DEFAULT_OPENAI_MODEL)
.await?;

tracing::Span::current().record("response", &response);

Ok(response)
}
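The retrieval-augmented part of naive is just a fold over each hit's payload into a context block, then a templated prompt. A small sketch of that step, assuming indoc as a dependency and payloads already flattened to key/value pairs:

use indoc::formatdoc;

// Folds retrieved payload fields into one context block, then wraps it in the
// same prompt shape `naive` builds above.
fn build_prompt(query: &str, payloads: &[(&str, &str)]) -> String {
    let result_context = payloads
        .iter()
        .fold(String::new(), |acc, (k, v)| format!("{acc}\n{k}: {v}"));

    formatdoc!(
        r#"
        Answer the following question(s):
        {query}
        ## Constraints
        * Only answer based on the provided context below
        ## Additional information found
        {result_context}
        "#
    )
}

fn main() {
    let prompt = build_prompt(
        "What does the ingestion pipeline store?",
        &[("path", "src/lib.rs"), ("content", "fn main() {}")],
    );
    println!("{prompt}");
}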