diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 7ac321cab0..ecf2d30e94 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -213,15 +213,7 @@ impl IVFIndex { None, None, )?; - let memory_limit = 4 * 1024 * 1024 * 1024; // 4GB TODO: customize this. - let shuffled = shuffle_dataset( - data, - column, - ivf, - pq_index.pq.num_sub_vectors(), - memory_limit, - ) - .await?; + let shuffled = shuffle_dataset(data, column, ivf, pq_index.pq.num_sub_vectors()).await?; let mut ivf_mut = Ivf::new(self.ivf.centroids.clone()); write_index_partitions(&mut writer, &mut ivf_mut, shuffled, Some(self)).await?; diff --git a/rust/lance/src/index/vector/ivf/builder.rs b/rust/lance/src/index/vector/ivf/builder.rs index 9209482164..2b176801ff 100644 --- a/rust/lance/src/index/vector/ivf/builder.rs +++ b/rust/lance/src/index/vector/ivf/builder.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use arrow_schema::{DataType, Field, Schema}; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionContext; -use datafusion::execution::memory_pool::GreedyMemoryPool; +use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::logical_expr::col; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -58,7 +58,6 @@ pub async fn shuffle_dataset( // TODO: Once the transformer can generate schema automatically, // we can remove `num_sub_vectors`. num_sub_vectors: usize, - memory_limit: usize, ) -> Result { let column: Arc = column.into(); let stream = data @@ -96,8 +95,27 @@ pub async fn shuffle_dataset( info!("Building IVF shuffler"); - let runtime_config = - RuntimeConfig::new().with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))); + let memory_limit = if let Ok(memory_limit) = std::env::var("LANCE_MEMORY_LIMIT") { + match memory_limit.parse::() { + Ok(memory_limit) => Some(memory_limit), + Err(err) => { + log::error!( + "Failed to parse LANCE_MEMORY_LIMIT: {}, using default of unbounded.", + err + ); + None + } + } + } else { + None + }; + + let memory_pool: Arc = if let Some(memory_limit) = memory_limit { + Arc::new(GreedyMemoryPool::new(memory_limit)) + } else { + Arc::new(UnboundedMemoryPool::default()) + }; + let runtime_config = RuntimeConfig::new().with_memory_pool(memory_pool); let runtime_env = RuntimeEnv::new(runtime_config)?; let context = SessionContext::new_with_config_rt(Default::default(), Arc::new(runtime_env)); @@ -147,10 +165,7 @@ pub(super) async fn build_partitions( precomputed_partitons, )?; - let memory_limit = 4 * 1024 * 1024 * 1024; // 4GB - - let shuffled = - shuffle_dataset(data, column, ivf_model, pq.num_sub_vectors(), memory_limit).await?; + let shuffled = shuffle_dataset(data, column, ivf_model, pq.num_sub_vectors()).await?; write_index_partitions(writer, ivf, shuffled, None).await?;