From 1b26912006d03ca89266bf4455e969ed0eb6f0e8 Mon Sep 17 00:00:00 2001 From: Milan Cermak Date: Thu, 5 Dec 2024 10:46:31 +0100 Subject: [PATCH] feat: postgresql store --- Cargo.lock | 116 +++++- Cargo.toml | 5 +- rig-postgres/CHANGELOG.md | 12 + rig-postgres/Cargo.toml | 22 ++ rig-postgres/LICENSE | 7 + rig-postgres/README.md | 37 ++ .../examples/vector_search_postgres.rs | 89 +++++ rig-postgres/src/lib.rs | 364 ++++++++++++++++++ 8 files changed, 644 insertions(+), 8 deletions(-) create mode 100644 rig-postgres/CHANGELOG.md create mode 100644 rig-postgres/Cargo.toml create mode 100644 rig-postgres/LICENSE create mode 100644 rig-postgres/README.md create mode 100644 rig-postgres/examples/vector_search_postgres.rs create mode 100644 rig-postgres/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 62e304f9..15448632 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,9 +69,9 @@ checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" [[package]] name = "anyhow" -version = "1.0.93" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" +checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" [[package]] name = "arc-swap" @@ -1926,6 +1926,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -4181,6 +4187,16 @@ dependencies = [ "indexmap 2.6.0", ] +[[package]] +name = "pgvector" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0e8871b6d7ca78348c6cd29b911b94851f3429f0cd403130ca17f26c1fb91a6" +dependencies = [ + "bytes", + "postgres-types", +] + [[package]] name = "phf" version = "0.11.2" @@ -4273,6 +4289,35 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "postgres-protocol" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23" +dependencies = [ + "base64 0.22.1", + "byteorder", + "bytes", + "fallible-iterator 0.2.0", + "hmac", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f" +dependencies = [ + "bytes", + "fallible-iterator 0.2.0", + "postgres-protocol", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -4883,6 +4928,24 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "rig-postgres" +version = "0.1.0" +dependencies = [ + "anyhow", + "log", + "pgvector", + "rig-core", + "serde", + "serde_json", + "tokio", + "tokio-postgres", + "tokio-test", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "rig-qdrant" version = "0.1.3" @@ -4946,7 +5009,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" dependencies = [ "bitflags 2.6.0", - "fallible-iterator", + "fallible-iterator 0.3.0", "fallible-streaming-iterator", "hashlink", "libsqlite3-sys", @@ -6059,9 +6122,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.41.1" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" +checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" dependencies = [ "backtrace", "bytes", @@ -6096,6 +6159,32 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-postgres" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b5d3742945bc7d7f210693b0c58ae542c6fd47b17adbbda0885f3dcb34a6bdb" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator 0.2.0", + "futures-channel", + "futures-util", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "rand", + "socket2 0.5.7", + "tokio", + "tokio-util", + "whoami", +] + [[package]] name = "tokio-rusqlite" version = "0.6.0" @@ -6524,6 +6613,12 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.95" @@ -6645,6 +6740,17 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53a85b86a771b1c87058196170769dd264f66c0782acf1ae6cc51bfd64b39082" +[[package]] +name = "whoami" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" +dependencies = [ + "redox_syscall 0.5.7", + "wasite", + "web-sys", +] + [[package]] name = "widestring" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index df48d1b3..aa556a7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,6 @@ resolver = "2" members = [ "rig-core", "rig-lancedb", - "rig-mongodb", "rig-neo4j", + "rig-mongodb", "rig-neo4j", "rig-postgres", "rig-qdrant", "rig-core/rig-core-derive", - "rig-sqlite" -] + "rig-sqlite"] diff --git a/rig-postgres/CHANGELOG.md b/rig-postgres/CHANGELOG.md new file mode 100644 index 00000000..13f3af9c --- /dev/null +++ b/rig-postgres/CHANGELOG.md @@ -0,0 +1,12 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +### Added + +- Postgres vector store diff --git a/rig-postgres/Cargo.toml b/rig-postgres/Cargo.toml new file mode 100644 index 00000000..b6fbbeac --- /dev/null +++ b/rig-postgres/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "rig-postgres" +version = "0.1.0" +edition = "2021" +description = "PostgreSQL-based vector store implementation for the rig framework" +license = "MIT" + +[dependencies] +pgvector = { version = "0.4.0", features = ["postgres"] } +rig-core = { path = "../rig-core", version = "0.5.0", features = ["derive"] } +serde = { version = "1.0.215", features = ["derive"] } +serde_json = "1.0.133" +tokio-postgres = "0.7.12" +tracing = "0.1.40" + +[dev-dependencies] +anyhow = "1.0.94" +log = "0.4.22" +tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } +tokio-test = "0.4.4" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +uuid = { version = "1.11.0", features = ["v4"] } diff --git a/rig-postgres/LICENSE b/rig-postgres/LICENSE new file mode 100644 index 00000000..878b5fbc --- /dev/null +++ b/rig-postgres/LICENSE @@ -0,0 +1,7 @@ +Copyright (c) 2024, Playgrounds Analytics Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/rig-postgres/README.md b/rig-postgres/README.md new file mode 100644 index 00000000..a58f17f7 --- /dev/null +++ b/rig-postgres/README.md @@ -0,0 +1,37 @@ +
+ + + + Rig logo + + + + + + + SQLite logo + +
+ +

+ +## Rig-postgres + +This companion crate implements a Rig vector store based on PostgreSQL. + +## Usage + +Add the companion crate to your `Cargo.toml`, along with the rig-core crate: + +```toml +[dependencies] +rig-core = "0.4.0" +rig-postgres = "0.1.0" +``` + +You can also run `cargo add rig-core rig-postgres` to add the most recent versions of the dependencies to your project. + +See the [`/examples`](./examples) folder for usage examples. + +## Important Note + +The crate utilizes the [pgvector](https://github.com/pgvector/pgvector) PostgreSQL extension. It will automatically install it in your database if it's not yet present. diff --git a/rig-postgres/examples/vector_search_postgres.rs b/rig-postgres/examples/vector_search_postgres.rs new file mode 100644 index 00000000..88b1f24f --- /dev/null +++ b/rig-postgres/examples/vector_search_postgres.rs @@ -0,0 +1,89 @@ +use rig::{ + embeddings::EmbeddingsBuilder, + providers::openai::{Client, TEXT_EMBEDDING_3_SMALL}, + vector_store::VectorStoreIndex, + Embed, +}; +use rig_postgres::{Column, PostgresVectorIndex, PostgresVectorStore, PostgresVectorStoreTable}; +use serde::Deserialize; +use tokio_postgres::types::ToSql; + +#[derive(Clone, Debug, Deserialize, Embed)] +pub struct Document { + id: String, + #[embed] + content: String, +} + +impl PostgresVectorStoreTable for Document { + fn name() -> &'static str { + "documents" + } + + fn schema() -> Vec { + vec![ + Column::new("id", "TEXT PRIMARY KEY"), + Column::new("content", "TEXT"), + ] + } + + fn column_values(&self) -> Vec<(&'static str, Box)> { + vec![ + ("id", Box::new(self.id.clone())), + ("content", Box::new(self.content.clone())), + ] + } +} + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + // tracing_subscriber::fmt().with_env_filter( + // tracing_subscriber::EnvFilter::from_default_env() + // .add_directive(tracing::Level::DEBUG.into()) + // .add_directive("hyper=off".parse().unwrap()) + // ).init(); + + // set up postgres connection + let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL not set"); + let db_config: tokio_postgres::Config = database_url.parse()?; + let (psql, connection) = db_config.connect(tokio_postgres::NoTls).await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + tracing::error!("Connection error: {}", e); + } + }); + + // set up embedding model + let openai_api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY not set"); + let openai = Client::new(&openai_api_key); + let model = openai.embedding_model(TEXT_EMBEDDING_3_SMALL); + + // generate embeddings + let documents: Vec = vec![ + "The Mediterranean diet emphasizes fish, olive oil, and vegetables, believed to reduce chronic diseases.", + "Photosynthesis in plants converts light energy into glucose and produces essential oxygen.", + "20th-century innovations, from radios to smartphones, centered on electronic advancements.", + ].into_iter().map(|content| Document { + id: uuid::Uuid::new_v4().to_string(), + content: content.to_string(), + }).collect(); + let embeddings = EmbeddingsBuilder::new(model.clone()) + .documents(documents)? + .build() + .await?; + + // add embeddings to store + let store = PostgresVectorStore::new(psql, &model).await?; + store.add_rows(embeddings).await?; + + // query the index + let index = PostgresVectorIndex::new(model, store); + let results = index.top_n::("What is photosynthesis", 1).await?; + println!("top_n results: \n{:?}", results); + + let ids = index.top_n_ids("What is photosynthesis?", 1).await?; + println!("top_n_ids results:\n{:?}", ids); + + Ok(()) +} diff --git a/rig-postgres/src/lib.rs b/rig-postgres/src/lib.rs new file mode 100644 index 00000000..f611f536 --- /dev/null +++ b/rig-postgres/src/lib.rs @@ -0,0 +1,364 @@ +use pgvector::Vector; +use rig::{ + embeddings::embedding::{Embedding, EmbeddingModel}, + vector_store::{VectorStoreError, VectorStoreIndex}, + OneOrMany, +}; +use serde::Deserialize; +use std::marker::PhantomData; +use tokio_postgres::{types::ToSql, Client}; +use tracing::debug; + +pub struct Column { + name: &'static str, + col_type: &'static str, +} + +impl Column { + pub fn new(name: &'static str, col_type: &'static str) -> Self { + Self { name, col_type } + } +} + +/// Example of a document type taht can be used with PostgresVectoreStore +/// ```rust +/// +/// use rig::Embed; +/// use rig_postgres::{Column, PostgresVectorStoreTable}; +/// use tokio_postgres::types::ToSql; +/// +/// #[derive(Clone, Debug, Embed)] +/// pub struct Document { +/// id: String, +/// #[embed] +/// content: String, +/// } +/// +/// impl PostgresVectorStoreTable for Document { +/// fn name() -> &'static str { +/// "documents" +/// } +/// +/// fn schema() -> Vec { +/// vec![ +/// Column::new("id", "TEXT PRIMARY KEY"), +/// Column::new("content", "TEXT"), +/// ] +/// } +/// +/// fn column_values(&self) -> Vec<(&'static str, Box)> { +/// vec![ +/// ("id", Box::new(self.id.clone())), +/// ("content", Box::new(self.content.clone())), +/// ] +/// } +/// } +/// ``` +pub trait PostgresVectorStoreTable: Send + Sync + Clone { + fn name() -> &'static str; + fn schema() -> Vec; + fn column_values(&self) -> Vec<(&'static str, Box)>; +} + +pub struct PostgresVectorStore { + client: Client, + _phantom: PhantomData<(E, T)>, +} + +impl PostgresVectorStore { + pub async fn new(client: Client, embedding_model: &E) -> Result { + let dims = embedding_model.ndims(); + let table_name = T::name(); + let schema = T::schema(); + + async { + // ensure extension is installed + client + .execute("CREATE EXTENSION IF NOT EXISTS vector;", &[]) + .await?; + + // create the table according to schema, with an extra `embeddings vector(...)` column + let columns = schema + .iter() + .map(|col| format!("{} {}", col.name, col.col_type)) + .collect::>() + .join(", "); + let embeddings_col = format!("embeddings vector({})", dims); + + debug!("Creating table: {}", table_name); + let create_table = format!("CREATE TABLE IF NOT EXISTS {} ({}, {})", table_name, columns, embeddings_col); + client.execute(&create_table, &[]).await?; + + // create the index on the `embeddings` column + debug!("Creating index on embeddings column"); + client + .execute( + &format!( + "CREATE INDEX IF NOT EXISTS {}_embeddings_idx ON {} USING hnsw(embeddings vector_cosine_ops)", + table_name, table_name + ), + &[], + ) + .await?; + + Ok::<_, tokio_postgres::Error>(()) + } + .await + .map_err(|e| VectorStoreError::DatastoreError(Box::new(e)))?; + + Ok(Self { + client, + _phantom: PhantomData, + }) + } + + pub async fn add_rows( + &self, + documents: Vec<(T, OneOrMany)>, + ) -> Result<(), VectorStoreError> { + let table_name = T::name(); + let (mut columns, mut placeholders): (Vec<&str>, Vec) = T::schema() + .iter() + .enumerate() + .map(|(index, col)| (col.name, format!("${}", index + 1))) + .unzip(); + columns.push("embeddings"); + placeholders.push(format!("${}", placeholders.len() + 1)); + let columns = columns.join(", "); + let placeholders = placeholders.join(", "); + + let query_string = &format!( + "INSERT INTO {} ({}) VALUES ({})", + table_name, columns, placeholders + ); + let query = self + .client + .prepare(query_string) + .await + .map_err(|e| VectorStoreError::DatastoreError(Box::new(e)))?; + + debug!( + "Inserting {} rows into table: {}", + documents.len(), + table_name + ); + for (doc, embeddings) in &documents { + let embedding_vector: Vector = embeddings + .iter() + .flat_map(|e| e.vec.iter().map(|e| *e as f32)) + .collect::>() + .into(); + + let values = doc.column_values(); + let params: Vec<&(dyn ToSql + Sync)> = values + .iter() + .map(|(_, v)| &**v) + .chain(std::iter::once(&embedding_vector as &(dyn ToSql + Sync))) + .collect(); + + self.client + .execute(&query, ¶ms) + .await + .map_err(|e| VectorStoreError::DatastoreError(Box::new(e)))?; + } + + Ok(()) + } +} + +/// PostgreSQL vector store implementation for Rig. +/// +/// This crate provides a PostgreSQL vector store implementation for Rig. It uses the pgvector extension +/// to store embeddings and perform similarity searches. +/// +/// # Example +/// ```rust,ignore +/// use rig::{ +/// embeddings::EmbeddingsBuilder, +/// providers::openai::{Client, TEXT_EMBEDDING_3_SMALL}, +/// vector_store::VectorStoreIndex, +/// Embed, +/// }; +/// use rig_postgres::{Column, PostgresVectorIndex, PostgresVectorStore, PostgresVectorStoreTable}; +/// use serde::Deserialize; +/// use tokio_postgres::types::ToSql; +/// +/// #[derive(Clone, Debug, Deserialize, Embed)] +/// pub struct Document { +/// id: String, +/// #[embed] +/// content: String, +/// } +/// +/// impl PostgresVectorStoreTable for Document { +/// fn name() -> &'static str { +/// "documents" +/// } +/// +/// fn schema() -> Vec { +/// vec![ +/// Column::new("id", "TEXT PRIMARY KEY"), +/// Column::new("content", "TEXT"), +/// ] +/// } +/// +/// fn column_values(&self) -> Vec<(&'static str, Box)> { +/// vec![ +/// ("id", Box::new(self.id.clone())), +/// ("content", Box::new(self.content.clone())), +/// ] +/// } +/// } +/// +/// # tokio_test::block_on(async { +/// # Result::<(), Box>::Ok({ +/// // set up postgres connection +/// let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL not set"); +/// let db_config: tokio_postgres::Config = database_url.parse()?; +/// let (psql, connection) = db_config.connect(tokio_postgres::NoTls).await?; +/// +/// tokio::spawn(async move { +/// if let Err(e) = connection.await { +/// tracing::error!("Connection error: {}", e); +/// } +/// }); +/// +/// // set up embedding model +/// let openai_api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY not set"); +/// let openai = Client::new(&openai_api_key); +/// let model = openai.embedding_model(TEXT_EMBEDDING_3_SMALL); +/// +/// // generate embeddings +/// let documents: Vec = vec![ +/// "The Mediterranean diet emphasizes fish, olive oil, and vegetables, believed to reduce chronic diseases.", +/// "Photosynthesis in plants converts light energy into glucose and produces essential oxygen.", +/// "20th-century innovations, from radios to smartphones, centered on electronic advancements.", +/// ].into_iter().map(|content| Document { +/// id: uuid::Uuid::new_v4().to_string(), +/// content: content.to_string(), +/// }).collect(); +/// let embeddings = EmbeddingsBuilder::new(model.clone()) +/// .documents(documents)? +/// .build() +/// .await?; +/// +/// // add embeddings to store +/// let store = PostgresVectorStore::new(psql, &model).await?; +/// store.add_rows(embeddings).await?; +/// +/// // query the index +/// let index = PostgresVectorIndex::new(model, store); +/// let results = index.top_n::("What is photosynthesis", 1).await?; +/// # }) +/// # }); +/// ``` +pub struct PostgresVectorIndex { + store: PostgresVectorStore, + embedding_model: E, +} + +impl PostgresVectorIndex { + pub fn new(embedding_model: E, store: PostgresVectorStore) -> Self { + Self { + store, + embedding_model, + } + } +} + +impl VectorStoreIndex + for PostgresVectorIndex +{ + async fn top_n Deserialize<'a> + Send>( + &self, + query: &str, + n: usize, + ) -> Result, VectorStoreError> { + let embedding = self.embedding_model.embed_text(query).await?; + let vector: Vector = embedding + .vec + .iter() + .map(|e| *e as f32) + .collect::>() + .into(); + let table_name = T::name(); + let column_names = T::schema() + .iter() + .map(|col| col.name) + .collect::>() + .join(", "); + + let query_string = format!( + "SELECT {}, embeddings <=> $1 AS distance FROM {} ORDER BY embeddings <=> $1 LIMIT $2", + column_names, table_name + ); + let rows = self + .store + .client + .query(&query_string, &[&vector, &(n as i64)]) + .await + .map_err(|e| VectorStoreError::DatastoreError(Box::new(e)))?; + + let res = rows + .into_iter() + .map(|row| { + let rlen = row.len(); + let id: String = row.get(0); // assuming id is first column + let distance: f64 = row.get(rlen - 1); // distance is last column + + let mut map = serde_json::Map::new(); + for (i, column) in row.columns().iter().enumerate().take(rlen - 1) { + let name = column.name(); + let value = serde_json::Value::String(row.get(i)); + map.insert(name.to_string(), value); + } + + let doc_value = serde_json::Value::Object(map); + match serde_json::from_value::(doc_value) { + Ok(doc) => Ok((distance, id, doc)), + Err(e) => Err(VectorStoreError::DatastoreError(Box::new(e))), + } + }) + .collect::, _>>()?; + + Ok(res) + } + + async fn top_n_ids( + &self, + query: &str, + n: usize, + ) -> Result, VectorStoreError> { + let embedding = self.embedding_model.embed_text(query).await?; + let vector: Vector = embedding + .vec + .iter() + .map(|e| *e as f32) + .collect::>() + .into(); + let table_name = T::name(); + let id_col_name = T::schema()[0].name; + + let query = format!( + "SELECT {}, embeddings <=> $1 AS distance FROM {} ORDER BY embeddings <=> $1 LIMIT $2", + id_col_name, table_name + ); + let rows = self + .store + .client + .query(&query, &[&vector, &(n as i64)]) + .await + .map_err(|e| VectorStoreError::DatastoreError(Box::new(e)))?; + + let res = rows + .into_iter() + .map(|row| { + let id: String = row.get(0); + let distance: f64 = row.get(1); + Ok::<(f64, String), VectorStoreError>((distance, id)) + }) + .collect::, _>>()?; + + Ok(res) + } +}