[ENH] Add flush sysdb to compactor (#2003)
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
   - This PR adds a flush sysdb operator to the compaction orchestrator.
   - Changes include:
     - A new sysdb API for writing the new compaction metadata to the sysdb.
     - A `flush_sysdb` operator, used by the compaction orchestrator to register the compaction result with the sysdb (see the sketch below).
 - New functionality
   - ...
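
For orientation, here is a minimal, self-contained sketch of the operator shape this PR adds. The `Operator` trait, the input/output types, and the `String` error are simplified stand-ins (the real definitions live under `rust/worker/src/execution/`); only the field names are taken from the diff.

```rust
// Hedged sketch: a simplified Operator trait and a flush-sysdb operator that
// mimics the contract in this PR. Not the crate's real types.
use async_trait::async_trait;

#[async_trait]
trait Operator<I, O>: Send + Sync {
    type Error;
    async fn run(&self, input: &I) -> Result<O, Self::Error>;
}

/// Mirrors FlushSysDbInput: where compaction stopped in the log, plus the
/// collection version the caller expects the sysdb to hold.
struct FlushSysDbInput {
    tenant: String,
    collection_id: String,
    log_position: i64,
    collection_version: i32,
}

struct FlushSysDbOutput {
    /// Version after a successful flush: the expected version plus one.
    collection_version: i32,
}

struct FlushSysDbOperator;

#[async_trait]
impl Operator<FlushSysDbInput, FlushSysDbOutput> for FlushSysDbOperator {
    type Error = String;

    async fn run(&self, input: &FlushSysDbInput) -> Result<FlushSysDbOutput, String> {
        // The real operator delegates to sysdb.flush_compaction(...), which
        // registers the flushed segment file paths and bumps the version.
        println!(
            "flushing {}/{} at log position {}",
            input.tenant, input.collection_id, input.log_position
        );
        Ok(FlushSysDbOutput {
            collection_version: input.collection_version + 1,
        })
    }
}
```

In the actual diff, the orchestrator holds a `Box<dyn SysDb>` (threaded through `CompactionManager::new`) and hands a clone of it to the operator input, which keeps the operator itself stateless.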

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
Ishiihara authored Apr 12, 2024
1 parent 9fbc9cc commit 7e330ba
Showing 16 changed files with 523 additions and 18 deletions.
9 changes: 7 additions & 2 deletions rust/worker/src/compactor/compaction_manager.rs
@@ -12,6 +12,7 @@ use crate::execution::orchestration::CompactOrchestrator;
use crate::execution::orchestration::CompactionResponse;
use crate::log::log::Log;
use crate::memberlist::Memberlist;
use crate::sysdb::sysdb::SysDb;
use crate::system::Component;
use crate::system::ComponentContext;
use crate::system::Handler;
@@ -32,6 +33,7 @@ pub(crate) struct CompactionManager {
scheduler: Scheduler,
// Dependencies
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
// Dispatcher
dispatcher: Option<Box<dyn Receiver<TaskMessage>>>,
// Config
Expand All @@ -57,13 +59,15 @@ impl CompactionManager {
pub(crate) fn new(
scheduler: Scheduler,
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
compaction_manager_queue_size: usize,
compaction_interval: Duration,
) -> Self {
CompactionManager {
system: None,
scheduler,
log,
sysdb,
dispatcher: None,
compaction_manager_queue_size,
compaction_interval,
@@ -96,6 +100,7 @@ impl CompactionManager {
system.clone(),
collection_uuid.unwrap(),
self.log.clone(),
self.sysdb.clone(),
dispatcher.clone(),
None,
);
@@ -196,6 +201,7 @@ impl Configurable<CompactionServiceConfig> for CompactionManager {
Ok(CompactionManager::new(
scheduler,
log,
sysdb,
compaction_manager_queue_size,
Duration::from_secs(compaction_interval_sec),
))
@@ -257,8 +263,6 @@ mod tests {
use crate::types::LogRecord;
use crate::types::Operation;
use crate::types::OperationRecord;
use std::str::FromStr;
use uuid::Uuid;

#[tokio::test]
async fn test_compaction_manager() {
Expand Down Expand Up @@ -362,6 +366,7 @@ mod tests {
let mut manager = CompactionManager::new(
scheduler,
log,
sysdb,
compaction_manager_queue_size,
compaction_interval,
);
2 changes: 1 addition & 1 deletion rust/worker/src/compactor/scheduler.rs
@@ -102,6 +102,7 @@ impl Scheduler {
last_compaction_time,
first_record_time: collection_info.first_log_ts,
offset: collection_info.first_log_offset,
collection_version: collection[0].version,
});
}
Err(e) => {
@@ -185,7 +186,6 @@ mod tests {
use crate::types::Operation;
use crate::types::OperationRecord;
use std::str::FromStr;
use uuid::Uuid;

#[tokio::test]
async fn test_scheduler() {
3 changes: 3 additions & 0 deletions rust/worker/src/compactor/scheduler_policy.rs
@@ -47,6 +47,7 @@ impl SchedulerPolicy for LasCompactionTimeSchedulerPolicy {
collection_id: collection.id.clone(),
tenant_id: collection.tenant_id.clone(),
offset: collection.offset,
collection_version: collection.collection_version,
});
}
tasks
@@ -67,13 +68,15 @@ mod tests {
last_compaction_time: 1,
first_record_time: 1,
offset: 0,
collection_version: 0,
},
CollectionRecord {
id: "test2".to_string(),
tenant_id: "test".to_string(),
last_compaction_time: 0,
first_record_time: 0,
offset: 0,
collection_version: 0,
},
];
let jobs = scheduler_policy.determine(collections.clone(), 1);
1 change: 1 addition & 0 deletions rust/worker/src/compactor/types.rs
@@ -3,6 +3,7 @@ pub(crate) struct CompactionJob {
pub(crate) collection_id: String,
pub(crate) tenant_id: String,
pub(crate) offset: i64,
pub(crate) collection_version: i32,
}

#[derive(Clone, Debug)]
1 change: 0 additions & 1 deletion rust/worker/src/execution/data/data_chunk.rs
@@ -108,7 +108,6 @@ impl<'a> Iterator for DataChunkIteraror<'a> {
#[cfg(test)]
mod tests {
use super::*;
use crate::types::LogRecord;
use crate::types::Operation;
use crate::types::OperationRecord;

2 changes: 0 additions & 2 deletions rust/worker/src/execution/operators/brute_force_knn.rs
@@ -1,9 +1,7 @@
use crate::execution::data::data_chunk::DataChunk;
use crate::{distance::DistanceFunction, execution::operator::Operator};
use async_trait::async_trait;
use std::cmp::Ord;
use std::cmp::Ordering;
use std::cmp::PartialOrd;
use std::collections::BinaryHeap;

/// The brute force k-nearest neighbors operator is responsible for computing the k-nearest neighbors
200 changes: 200 additions & 0 deletions rust/worker/src/execution/operators/flush_sysdb.rs
@@ -0,0 +1,200 @@
use crate::execution::operator::Operator;
use crate::sysdb::sysdb::FlushCompactionError;
use crate::sysdb::sysdb::SysDb;
use crate::types::FlushCompactionResponse;
use crate::types::SegmentFlushInfo;
use async_trait::async_trait;
use std::sync::Arc;

/// The flush sysdb operator is responsible for flushing compaction data to the sysdb.
#[derive(Debug)]
pub struct FlushSysDbOperator {}

impl FlushSysDbOperator {
/// Create a new flush sysdb operator.
pub fn new() -> Box<Self> {
Box::new(FlushSysDbOperator {})
}
}

#[derive(Debug)]
/// The input for the flush sysdb operator.
/// This input is used to flush compaction data to the sysdb.
/// # Parameters
/// * `tenant` - The tenant id.
/// * `collection_id` - The collection id.
/// * `log_position` - The log position. Note that this is the log position for the last record that
/// was flushed to S3.
/// * `collection_version` - The collection version. This is the current collection version before
/// the flush operation. This version will be incremented by 1 after the flush operation. If the
/// collection version in sysdb is not the same as the current collection version, the flush operation
/// will fail.
/// * `segment_flush_info` - The segment flush info.
pub struct FlushSysDbInput {
tenant: String,
collection_id: String,
log_position: i64,
collection_version: i32,
segment_flush_info: Arc<[SegmentFlushInfo]>,
sysdb: Box<dyn SysDb>,
}

impl FlushSysDbInput {
/// Create a new flush sysdb input.
pub fn new(
tenant: String,
collection_id: String,
log_position: i64,
collection_version: i32,
segment_flush_info: Arc<[SegmentFlushInfo]>,
sysdb: Box<dyn SysDb>,
) -> Self {
FlushSysDbInput {
tenant,
collection_id,
log_position,
collection_version,
segment_flush_info,
sysdb,
}
}
}

/// The output for the flush sysdb operator.
/// # Parameters
/// * `result` - The result of the flush compaction operation.
#[derive(Debug)]
pub struct FlushSysDbOutput {
result: FlushCompactionResponse,
}

pub type FlushSysDbResult = Result<FlushSysDbOutput, FlushCompactionError>;

#[async_trait]
impl Operator<FlushSysDbInput, FlushSysDbOutput> for FlushSysDbOperator {
type Error = FlushCompactionError;

async fn run(&self, input: &FlushSysDbInput) -> FlushSysDbResult {
let mut sysdb = input.sysdb.clone();
let result = sysdb
.flush_compaction(
input.tenant.clone(),
input.collection_id.clone(),
input.log_position,
input.collection_version,
input.segment_flush_info.clone(),
)
.await;
match result {
Ok(response) => Ok(FlushSysDbOutput { result: response }),
Err(error) => Err(error),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::sysdb::test_sysdb::TestSysDb;
use crate::types::Collection;
use crate::types::Segment;
use crate::types::SegmentScope;
use crate::types::SegmentType;
use std::collections::HashMap;
use std::str::FromStr;
use uuid::Uuid;

#[tokio::test]
async fn test_flush_sysdb_operator() {
let mut sysdb = Box::new(TestSysDb::new());
let collection_version = 0;
let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
let tenant_1 = "tenant_1".to_string();
let collection_1 = Collection {
id: collection_uuid_1,
name: "collection_1".to_string(),
metadata: None,
dimension: Some(1),
tenant: tenant_1.clone(),
database: "database_1".to_string(),
log_position: 0,
version: collection_version,
};

let collection_uuid_2 = Uuid::from_str("00000000-0000-0000-0000-000000000002").unwrap();
let tenant_2 = "tenant_2".to_string();
let collection_2 = Collection {
id: collection_uuid_2,
name: "collection_2".to_string(),
metadata: None,
dimension: Some(1),
tenant: tenant_2.clone(),
database: "database_2".to_string(),
log_position: 0,
version: collection_version,
};
sysdb.add_collection(collection_1);
sysdb.add_collection(collection_2);

let mut file_path_1 = HashMap::new();
file_path_1.insert("hnsw".to_string(), vec!["path_1".to_string()]);
let segment_id_1 = Uuid::from_str("00000000-0000-0000-0000-000000000003").unwrap();

let segment_1 = Segment {
id: segment_id_1.clone(),
r#type: SegmentType::HnswDistributed,
scope: SegmentScope::VECTOR,
collection: Some(collection_uuid_1),
metadata: None,
file_path: file_path_1.clone(),
};

let mut file_path_2 = HashMap::new();
file_path_2.insert("hnsw".to_string(), vec!["path_2".to_string()]);
let segment_id_2 = Uuid::from_str("00000000-0000-0000-0000-000000000004").unwrap();
let segment_2 = Segment {
id: segment_id_2.clone(),
r#type: SegmentType::HnswDistributed,
scope: SegmentScope::VECTOR,
collection: Some(collection_uuid_2),
metadata: None,
file_path: file_path_2.clone(),
};
sysdb.add_segment(segment_1);
sysdb.add_segment(segment_2);

let mut file_path_3 = HashMap::new();
file_path_3.insert("hnsw".to_string(), vec!["path_3".to_string()]);

let mut file_path_4 = HashMap::new();
file_path_4.insert("hnsw".to_string(), vec!["path_4".to_string()]);
let segment_flush_info = vec![
SegmentFlushInfo {
segment_id: segment_id_1.clone(),
file_paths: file_path_3.clone(),
},
SegmentFlushInfo {
segment_id: segment_id_2.clone(),
file_paths: file_path_4.clone(),
},
];

let log_position = 100;
let operator = FlushSysDbOperator::new();
let input = FlushSysDbInput::new(
tenant_1.clone(),
collection_uuid_1.to_string(),
log_position,
collection_version,
segment_flush_info.into(),
sysdb,
);

let result = operator.run(&input).await;

assert!(result.is_ok());
let result = result.unwrap();
assert_eq!(result.result.collection_id, collection_uuid_1.to_string());
assert_eq!(result.result.collection_version, collection_version + 1);
}
}
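
The doc comment on `FlushSysDbInput` above pins down the concurrency contract: the flush succeeds only if the caller's `collection_version` matches what the sysdb currently stores, and a successful flush increments the version by one. Below is a minimal sketch of that check using a hypothetical in-memory store; it is not the crate's `TestSysDb`, just the version-check logic in isolation.

```rust
use std::collections::HashMap;

#[derive(Debug)]
enum FlushCompactionError {
    CollectionNotFound,
    VersionMismatch { stored: i32, supplied: i32 },
}

/// Hypothetical in-memory stand-in for the sysdb:
/// collection id -> (log_position, collection_version).
struct InMemorySysDb {
    collections: HashMap<String, (i64, i32)>,
}

impl InMemorySysDb {
    /// Returns the new collection version on success.
    fn flush_compaction(
        &mut self,
        collection_id: &str,
        log_position: i64,
        collection_version: i32,
    ) -> Result<i32, FlushCompactionError> {
        let entry = self
            .collections
            .get_mut(collection_id)
            .ok_or(FlushCompactionError::CollectionNotFound)?;
        // Optimistic concurrency: a stale version means another compaction
        // already flushed, so this result must not be registered.
        if entry.1 != collection_version {
            return Err(FlushCompactionError::VersionMismatch {
                stored: entry.1,
                supplied: collection_version,
            });
        }
        entry.0 = log_position; // advance the flushed log position
        entry.1 += 1; // bump the version
        Ok(entry.1)
    }
}

fn main() {
    let mut sysdb = InMemorySysDb { collections: HashMap::new() };
    sysdb.collections.insert("collection_1".into(), (0, 0));
    assert_eq!(sysdb.flush_compaction("collection_1", 100, 0).unwrap(), 1);
    // Replaying the flush with the now-stale version 0 is rejected.
    assert!(sysdb.flush_compaction("collection_1", 200, 0).is_err());
}
```

This is the same behavior the test above asserts: after one flush, the response carries `collection_version + 1`.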
1 change: 1 addition & 0 deletions rust/worker/src/execution/operators/mod.rs
@@ -1,4 +1,5 @@
pub(super) mod brute_force_knn;
pub(super) mod flush_sysdb;
pub(super) mod normalize_vectors;
pub(super) mod partition;
pub(super) mod pull_log;
1 change: 0 additions & 1 deletion rust/worker/src/execution/operators/pull_log.rs
@@ -148,7 +148,6 @@ mod tests {
use crate::types::Operation;
use crate::types::OperationRecord;
use std::str::FromStr;
use uuid::Uuid;

#[tokio::test]
async fn test_pull_logs() {