diff --git a/rust/index/src/hnsw_provider.rs b/rust/index/src/hnsw_provider.rs index f82d1bb13119..569d2048a959 100644 --- a/rust/index/src/hnsw_provider.rs +++ b/rust/index/src/hnsw_provider.rs @@ -1,10 +1,7 @@ -use crate::PersistentIndex; +use crate::{HnswIndexConfigError, PersistentIndex}; use super::config::HnswProviderConfig; -use super::{ - HnswIndex, HnswIndexConfig, HnswIndexFromSegmentError, Index, IndexConfig, - IndexConfigFromSegmentError, IndexUuid, -}; +use super::{HnswIndex, HnswIndexConfig, Index, IndexConfig, IndexUuid}; use async_trait::async_trait; use chroma_cache::Cache; diff --git a/rust/index/src/spann/types.rs b/rust/index/src/spann/types.rs index c5b315c6a30f..f884ffb30cf5 100644 --- a/rust/index/src/spann/types.rs +++ b/rust/index/src/spann/types.rs @@ -1,14 +1,17 @@ use std::collections::HashMap; use arrow::error; -use chroma_blockstore::{provider::BlockfileProvider, BlockfileWriter}; +use chroma_blockstore::{provider::BlockfileProvider, BlockfileWriter, BlockfileWriterOptions}; use chroma_distance::DistanceFunction; use chroma_error::{ChromaError, ErrorCodes}; -use chroma_types::SpannPostingList; +use chroma_types::{CollectionUuid, SpannPostingList}; use thiserror::Error; use uuid::Uuid; -use crate::hnsw_provider::{HnswIndexParams, HnswIndexProvider, HnswIndexRef}; +use crate::{ + hnsw_provider::{HnswIndexParams, HnswIndexProvider, HnswIndexRef}, + IndexUuid, +}; // TODO(Sanket): Add locking structures as necessary. pub struct SpannIndexWriter { @@ -59,8 +62,8 @@ impl SpannIndexWriter { async fn hnsw_index_from_id( hnsw_provider: &HnswIndexProvider, - id: &Uuid, - collection_id: &Uuid, + id: &IndexUuid, + collection_id: &CollectionUuid, distance_function: DistanceFunction, dimensionality: usize, ) -> Result { @@ -75,7 +78,7 @@ impl SpannIndexWriter { async fn create_hnsw_index( hnsw_provider: &HnswIndexProvider, - collection_id: &Uuid, + collection_id: &CollectionUuid, distance_function: DistanceFunction, dimensionality: usize, hnsw_params: HnswIndexParams, @@ -102,7 +105,7 @@ impl SpannIndexWriter { ) -> Result, SpannIndexWriterConstructionError> { // Create a reader for the blockfile. Load all the data into the versions map. let mut versions_map = HashMap::new(); - let reader = match blockfile_provider.open::(blockfile_id).await { + let reader = match blockfile_provider.read::(blockfile_id).await { Ok(reader) => reader, Err(_) => { return Err(SpannIndexWriterConstructionError::BlockfileReaderConstructionError) @@ -120,8 +123,11 @@ impl SpannIndexWriter { blockfile_id: &Uuid, blockfile_provider: &BlockfileProvider, ) -> Result { + let mut bf_options = BlockfileWriterOptions::new(); + bf_options = bf_options.unordered_mutations(); + bf_options = bf_options.fork(blockfile_id.clone()); match blockfile_provider - .fork::>(blockfile_id) + .write::>(bf_options) .await { Ok(writer) => Ok(writer), @@ -132,7 +138,12 @@ impl SpannIndexWriter { async fn create_posting_list( blockfile_provider: &BlockfileProvider, ) -> Result { - match blockfile_provider.create::>() { + let mut bf_options = BlockfileWriterOptions::new(); + bf_options = bf_options.unordered_mutations(); + match blockfile_provider + .write::>(bf_options) + .await + { Ok(writer) => Ok(writer), Err(_) => Err(SpannIndexWriterConstructionError::BlockfileWriterConstructionError), } @@ -141,11 +152,11 @@ impl SpannIndexWriter { #[allow(clippy::too_many_arguments)] pub async fn from_id( hnsw_provider: &HnswIndexProvider, - hnsw_id: Option<&Uuid>, + hnsw_id: Option<&IndexUuid>, versions_map_id: Option<&Uuid>, posting_list_id: Option<&Uuid>, hnsw_params: Option, - collection_id: &Uuid, + collection_id: &CollectionUuid, distance_function: DistanceFunction, dimensionality: usize, blockfile_provider: &BlockfileProvider, diff --git a/rust/worker/src/segment/distributed_hnsw_segment.rs b/rust/worker/src/segment/distributed_hnsw_segment.rs index 12b877d54b12..5a64afe192f1 100644 --- a/rust/worker/src/segment/distributed_hnsw_segment.rs +++ b/rust/worker/src/segment/distributed_hnsw_segment.rs @@ -10,10 +10,7 @@ use chroma_index::hnsw_provider::{ HnswIndexParams, HnswIndexProvider, HnswIndexProviderCreateError, HnswIndexProviderForkError, HnswIndexProviderOpenError, HnswIndexRef, }; -use chroma_index::{ - HnswIndexConfig, HnswIndexFromSegmentError, Index, IndexConfig, IndexConfigFromSegmentError, - IndexUuid, -}; +use chroma_index::{HnswIndexConfig, Index, IndexConfig, IndexUuid}; use chroma_index::{DEFAULT_HNSW_EF_CONSTRUCTION, DEFAULT_HNSW_EF_SEARCH, DEFAULT_HNSW_M}; use chroma_types::SegmentUuid; use chroma_types::{get_metadata_value_as, MaterializedLogOperation, MetadataValue, Segment}; diff --git a/rust/worker/src/segment/spann_segment.rs b/rust/worker/src/segment/spann_segment.rs index 2daaf8bd16c7..414dcfbf0b23 100644 --- a/rust/worker/src/segment/spann_segment.rs +++ b/rust/worker/src/segment/spann_segment.rs @@ -3,8 +3,8 @@ use std::collections::HashMap; use arrow::error; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::{ChromaError, ErrorCodes}; -use chroma_index::{hnsw_provider::HnswIndexProvider, spann::types::SpannIndexWriter}; -use chroma_types::{Segment, SegmentScope, SegmentType}; +use chroma_index::{hnsw_provider::HnswIndexProvider, spann::types::SpannIndexWriter, IndexUuid}; +use chroma_types::{Segment, SegmentScope, SegmentType, SegmentUuid}; use thiserror::Error; use uuid::Uuid; @@ -16,7 +16,7 @@ const POSTING_LIST_PATH: &str = "posting_list_path"; pub(crate) struct SpannSegmentWriter { index: SpannIndexWriter, - id: Uuid, + id: SegmentUuid, } #[derive(Error, Debug)] @@ -76,7 +76,10 @@ impl SpannSegmentWriter { return Err(SpannSegmentWriterError::IndexIdParsingError); } }; - (Some(index_uuid), Some(hnsw_params_from_segment(segment))) + ( + Some(IndexUuid(index_uuid)), + Some(hnsw_params_from_segment(segment)), + ) } None => { return Err(SpannSegmentWriterError::HnswInvalidFilePath);