Skip to content

Commit

Permalink
feat: support delta merge for IVF_HNSW_SQ (#2132)
Browse files Browse the repository at this point in the history
Signed-off-by: BubbleCal <[email protected]>
  • Loading branch information
BubbleCal authored Apr 25, 2024
1 parent 444328e commit 6d1364b
Show file tree
Hide file tree
Showing 16 changed files with 499 additions and 70 deletions.
8 changes: 2 additions & 6 deletions python/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,8 @@ pub fn build_sq_storage(

let lower_bound = bounds.get_item(0)?.extract::<f64>()?;
let upper_bound = bounds.get_item(1)?.extract::<f64>()?;
let quantizer = lance_index::vector::sq::ScalarQuantizer::with_bounds(
8,
dim,
MetricType::L2,
lower_bound..upper_bound,
);
let quantizer =
lance_index::vector::sq::ScalarQuantizer::with_bounds(8, dim, lower_bound..upper_bound);
let storage = sq::build_sq_storage(MetricType::L2, row_ids, vectors, quantizer)
.map_err(|e| PyIOError::new_err(e.to_string()))?;
storage.batch().clone().to_pyarrow(py)
Expand Down
23 changes: 10 additions & 13 deletions rust/lance-index/src/vector/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use snafu::{location, Location};

use self::builder::HNSW_METADATA_KEY;
use self::builder::{HnswBuildParams, HNSW_METADATA_KEY};

use super::graph::memory::InMemoryVectorStorage;
use super::graph::OrderedNode;
Expand Down Expand Up @@ -160,12 +160,8 @@ impl Graph for HnswLevel {
pub struct HNSW {
levels: Vec<HnswLevel>,
distance_type: MetricType,
/// Entry point of the graph.
entry_point: u32,

#[allow(dead_code)]
/// Whether to use the heuristic to select neighbors (Algorithm 4 or 3 in the paper).
use_select_heuristic: bool,
params: HnswBuildParams,
}

impl Debug for HNSW {
Expand All @@ -181,8 +177,9 @@ impl Debug for HNSW {

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct HnswMetadata {
entry_point: u32,
level_offsets: Vec<usize>,
pub entry_point: u32,
pub params: HnswBuildParams,
pub level_offsets: Vec<usize>,
}

impl HNSW {
Expand All @@ -191,7 +188,7 @@ impl HNSW {
levels: vec![],
distance_type: MetricType::L2,
entry_point: 0,
use_select_heuristic: true,
params: HnswBuildParams::default(),
}
}

Expand Down Expand Up @@ -281,15 +278,15 @@ impl HNSW {
levels,
distance_type: metric_type,
entry_point: metadata.entry_point,
use_select_heuristic: true,
params: metadata.params,
})
}

fn from_builder(
builder: &HNSWBuilder,
entry_point: u32,
metric_type: MetricType,
use_select_heuristic: bool,
params: HnswBuildParams,
) -> Self {
let mut levels = Vec::with_capacity(builder.num_levels());
for level in 0..builder.num_levels() {
Expand Down Expand Up @@ -330,7 +327,7 @@ impl HNSW {
levels,
distance_type: metric_type,
entry_point,
use_select_heuristic,
params,
}
}

Expand Down Expand Up @@ -390,6 +387,7 @@ impl HNSW {

HnswMetadata {
entry_point: self.entry_point,
params: self.params.clone(),
level_offsets,
}
}
Expand Down Expand Up @@ -519,7 +517,6 @@ mod tests {
use arrow_array::types::Float32Type;
use lance_linalg::matrix::MatrixView;
use lance_testing::datagen::generate_random_array;
use tests::builder::HnswBuildParams;

#[test]
fn test_select_neighbors() {
Expand Down
7 changes: 4 additions & 3 deletions rust/lance-index/src/vector/hnsw/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use itertools::Itertools;
use lance_core::utils::tokio::spawn_cpu;
use lance_core::Result;
use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};

use super::super::graph::beam_search;
use super::{select_neighbors, select_neighbors_heuristic, HNSW};
Expand All @@ -22,7 +23,7 @@ use crate::vector::graph::{Graph, OrderedFloat, OrderedNode};
pub const HNSW_METADATA_KEY: &str = "lance:hnsw";

/// Parameters of building HNSW index
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HnswBuildParams {
/// max level ofm
pub max_level: u16,
Expand Down Expand Up @@ -157,7 +158,7 @@ impl HNSWBuilder {
self,
self.inner.entry_point,
self.inner.vectors.metric_type(),
self.inner.params.use_select_heuristic,
self.inner.params.clone(),
));
}

Expand Down Expand Up @@ -185,7 +186,7 @@ impl HNSWBuilder {
self,
self.inner.entry_point,
self.inner.vectors.metric_type(),
self.inner.params.use_select_heuristic,
self.inner.params.clone(),
))
}
}
Expand Down
81 changes: 79 additions & 2 deletions rust/lance-index/src/vector/quantizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@

use std::sync::Arc;

use arrow_array::FixedSizeListArray;
use arrow::datatypes::Float32Type;
use arrow_array::{FixedSizeListArray, Float32Array};
use async_trait::async_trait;
use lance_arrow::ArrowFloatType;
use lance_core::{Error, Result};
use lance_file::reader::FileReader;
use lance_io::traits::Reader;
use lance_linalg::distance::{Dot, MetricType, L2};
use lance_linalg::distance::{DistanceType, Dot, MetricType, L2};
use lance_table::format::SelfDescribingFileReader;
use snafu::{location, Location};

Expand Down Expand Up @@ -41,6 +42,7 @@ pub trait Quantization {
fn metadata_key(&self) -> &'static str;
fn quantization_type(&self) -> QuantizationType;
fn metadata(&self, _: Option<QuantizationMetadata>) -> Result<serde_json::Value>;
fn from_metadata(metadata: &Self::Metadata, distance_type: DistanceType) -> Result<Quantizer>;
}

pub enum QuantizationType {
Expand Down Expand Up @@ -100,6 +102,18 @@ impl Quantizer {
}
}

impl From<Arc<dyn ProductQuantizer>> for Quantizer {
fn from(pq: Arc<dyn ProductQuantizer>) -> Self {
Self::Product(pq)
}
}

impl From<ScalarQuantizer> for Quantizer {
fn from(sq: ScalarQuantizer) -> Self {
Self::Scalar(sq)
}
}

#[derive(Debug, Clone, Default)]
pub struct QuantizationMetadata {
// For PQ
Expand Down Expand Up @@ -146,10 +160,19 @@ impl Quantization for ScalarQuantizer {

fn metadata(&self, _: Option<QuantizationMetadata>) -> Result<serde_json::Value> {
Ok(serde_json::to_value(ScalarQuantizationMetadata {
dim: self.dim,
num_bits: self.num_bits(),
bounds: self.bounds(),
})?)
}

fn from_metadata(metadata: &Self::Metadata, _: DistanceType) -> Result<Quantizer> {
Ok(Quantizer::Scalar(Self::with_bounds(
metadata.num_bits,
metadata.dim,
metadata.bounds.clone(),
)))
}
}

impl Quantization for dyn ProductQuantizer {
Expand Down Expand Up @@ -187,6 +210,28 @@ impl Quantization for dyn ProductQuantizer {
codebook: args.codebook,
})?)
}

fn from_metadata(metadata: &Self::Metadata, distance_type: DistanceType) -> Result<Quantizer> {
Ok(Quantizer::Product(Arc::new(ProductQuantizerImpl::<
Float32Type,
>::new(
metadata.num_sub_vectors,
metadata.num_bits,
metadata.dimension,
Arc::new(
metadata
.codebook
.as_ref()
.unwrap()
.values()
.as_any()
.downcast_ref::<Float32Array>()
.unwrap()
.clone(),
),
distance_type,
))))
}
}

impl<T: ArrowFloatType + Dot + L2 + 'static> Quantization for ProductQuantizerImpl<T> {
Expand Down Expand Up @@ -224,13 +269,34 @@ impl<T: ArrowFloatType + Dot + L2 + 'static> Quantization for ProductQuantizerIm
codebook: args.codebook,
})?)
}

fn from_metadata(metadata: &Self::Metadata, distance_type: DistanceType) -> Result<Quantizer> {
Ok(Quantizer::Product(Arc::new(Self::new(
metadata.num_sub_vectors,
metadata.num_bits,
metadata.dimension,
Arc::new(
metadata
.codebook
.as_ref()
.unwrap()
.values()
.as_any()
.downcast_ref::<T::ArrayType>()
.unwrap()
.clone(),
),
distance_type,
))))
}
}

/// Loader to load partitioned PQ storage from disk.
pub struct IvfQuantizationStorage<Q: Quantization> {
reader: FileReader,

metric_type: MetricType,
quantizer: Quantizer,
metadata: Q::Metadata,

ivf: IvfData,
Expand All @@ -241,6 +307,7 @@ impl<Q: Quantization> Clone for IvfQuantizationStorage<Q> {
Self {
reader: self.reader.clone(),
metric_type: self.metric_type,
quantizer: self.quantizer.clone(),
metadata: self.metadata.clone(),
ivf: self.ivf.clone(),
}
Expand Down Expand Up @@ -276,14 +343,24 @@ impl<Q: Quantization> IvfQuantizationStorage<Q> {
let ivf_data = IvfData::load(&reader).await?;

let metadata = Q::Metadata::load(&reader).await?;
let quantizer = Q::from_metadata(&metadata, metric_type)?;
Ok(Self {
reader,
metric_type,
quantizer,
metadata,
ivf: ivf_data,
})
}

pub fn quantizer(&self) -> &Quantizer {
&self.quantizer
}

pub fn metadata(&self) -> &Q::Metadata {
&self.metadata
}

/// Get the number of partitions in the storage.
pub fn num_partitions(&self) -> usize {
self.ivf.num_partitions()
Expand Down
22 changes: 6 additions & 16 deletions rust/lance-index/src/vector/sq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use arrow_array::{Array, ArrayRef, FixedSizeListArray, UInt8Array};
use itertools::Itertools;
use lance_arrow::*;
use lance_core::{Error, Result};
use lance_linalg::distance::MetricType;
use num_traits::*;
use snafu::{location, Location};

Expand All @@ -30,32 +29,23 @@ pub struct ScalarQuantizer {
/// Original dimension of the vectors.
pub dim: usize,

/// Distance type.
pub metric_type: MetricType,

pub bounds: Range<f64>,
}

impl ScalarQuantizer {
pub fn new(num_bits: u16, dim: usize, metric_type: MetricType) -> Self {
pub fn new(num_bits: u16, dim: usize) -> Self {
Self {
num_bits,
dim,
metric_type,
bounds: Range::<f64> {
start: f64::MAX,
end: f64::MIN,
},
}
}

pub fn with_bounds(
num_bits: u16,
dim: usize,
metric_type: MetricType,
bounds: Range<f64>,
) -> Self {
let mut sq = Self::new(num_bits, dim, metric_type);
pub fn with_bounds(num_bits: u16, dim: usize, bounds: Range<f64>) -> Self {
let mut sq = Self::new(num_bits, dim);
sq.bounds = bounds;
sq
}
Expand Down Expand Up @@ -163,7 +153,7 @@ mod tests {
let vectors =
FixedSizeListArray::try_new_from_values(float_array, float_values.len() as i32)
.unwrap();
let mut sq = ScalarQuantizer::new(8, float_values.len(), MetricType::L2);
let mut sq = ScalarQuantizer::new(8, float_values.len());

sq.update_bounds::<Float16Type>(&vectors).unwrap();
assert_eq!(sq.bounds.start, float_values[0].to_f64());
Expand Down Expand Up @@ -192,7 +182,7 @@ mod tests {
let vectors =
FixedSizeListArray::try_new_from_values(float_array, float_values.len() as i32)
.unwrap();
let mut sq = ScalarQuantizer::new(8, float_values.len(), MetricType::L2);
let mut sq = ScalarQuantizer::new(8, float_values.len());

sq.update_bounds::<Float32Type>(&vectors).unwrap();
assert_eq!(sq.bounds.start, float_values[0].to_f64().unwrap());
Expand Down Expand Up @@ -221,7 +211,7 @@ mod tests {
let vectors =
FixedSizeListArray::try_new_from_values(float_array, float_values.len() as i32)
.unwrap();
let mut sq = ScalarQuantizer::new(8, float_values.len(), MetricType::L2);
let mut sq = ScalarQuantizer::new(8, float_values.len());

sq.update_bounds::<Float64Type>(&vectors).unwrap();
assert_eq!(sq.bounds.start, float_values[0]);
Expand Down
5 changes: 2 additions & 3 deletions rust/lance-index/src/vector/sq/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Default for SQBuildParams {
}

impl SQBuildParams {
pub fn build(&self, data: &dyn Array, metric_type: MetricType) -> Result<ScalarQuantizer> {
pub fn build(&self, data: &dyn Array, _: MetricType) -> Result<ScalarQuantizer> {
let fsl = data.as_fixed_size_list_opt().ok_or(Error::Index {
message: format!(
"SQ builder: input is not a FixedSizeList: {}",
Expand All @@ -42,8 +42,7 @@ impl SQBuildParams {
location: location!(),
})?;

let mut quantizer =
ScalarQuantizer::new(self.num_bits, fsl.value_length() as usize, metric_type);
let mut quantizer = ScalarQuantizer::new(self.num_bits, fsl.value_length() as usize);

match fsl.value_type() {
DataType::Float16 => {
Expand Down
1 change: 1 addition & 0 deletions rust/lance-index/src/vector/sq/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub const SQ_METADATA_KEY: &str = "lance:sq";

#[derive(Clone, Serialize, Deserialize)]
pub struct ScalarQuantizationMetadata {
pub dim: usize,
pub num_bits: u16,
pub bounds: Range<f64>,
}
Expand Down
Loading

0 comments on commit 6d1364b

Please sign in to comment.