From f0d61bd5c4eb3f18fac65773c4379f99e9b62749 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 13 Jun 2023 20:08:00 -0700
Subject: [PATCH 1/6] process z-order in chunks

---
 rust/src/operations/optimize.rs | 55 +++++++++++++++++----------------
 1 file changed, 29 insertions(+), 26 deletions(-)

diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index a49c6a9940..a17bc4262b 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -498,10 +498,8 @@ impl MergePlan {
             .collect_vec();
 
         // Interleave the batches
-        let out_batches = util::interleave_batches(batches, Arc::new(indices), false).await?;
-
-        Ok(futures::stream::once(futures::future::ready(out_batches))
-            .map(Ok)
+        Ok(util::interleave_batches(batches, indices, 10_000)
+            .map_err(|err| ParquetError::General(format!("Failed to reorder data: {:?}", err)))
             .boxed())
     }
 
@@ -861,39 +859,44 @@ fn build_zorder_plan(
 
 pub(super) mod util {
     use super::*;
-    use arrow_array::ArrayRef;
     use arrow_select::interleave::interleave;
     use futures::Future;
     use itertools::Itertools;
     use tokio::task::JoinError;
 
     /// Interleaves a vector of record batches based on a set of indices
-    pub async fn interleave_batches(
+    pub fn interleave_batches(
         batches: Vec<RecordBatch>,
-        indices: Arc<Vec<(usize, usize)>>,
-        use_threads: bool,
-    ) -> Result<RecordBatch, DeltaTableError> {
-        // It would be nice if upstream provided this. Though TBH they would
-        // probably prefer we just use DataFusion to sort.
-        let columns: Vec<ArrayRef> = futures::stream::iter(0..batches[0].num_columns())
-            .map(|col_i| {
-                let arrays: Vec<ArrayRef> = batches
+        indices: Vec<(usize, usize)>,
+        batch_size: usize,
+    ) -> BoxStream<'static, Result<RecordBatch, DeltaTableError>> {
+        if batches.is_empty() {
+            return futures::stream::empty().boxed();
+        }
+        let num_columns = batches[0].num_columns();
+        let schema = batches[0].schema();
+        let arrays = (0..num_columns)
+            .map(move |col_i| {
+                batches
                     .iter()
                     .map(|batch| batch.column(col_i).clone())
-                    .collect_vec();
-                let indices = indices.clone();
-                let task = tokio::task::spawn_blocking(move || {
-                    let arrays = arrays.iter().map(|arr| arr.as_ref()).collect_vec();
-                    interleave(&arrays, &indices)
-                });
-
-                flatten_join_error(task)
+                    .collect_vec()
             })
-            .buffered(if use_threads { num_cpus::get() } else { 1 })
-            .try_collect::<Vec<_>>()
-            .await?;
+            .collect_vec();
 
-        Ok(RecordBatch::try_new(batches[0].schema(), columns)?)
+        futures::stream::iter(indices)
+            .chunks(batch_size)
+            .map(move |chunk| {
+                let columns = arrays
+                    .iter()
+                    .map(|array_chunks| {
+                        let array_refs = array_chunks.iter().map(|arr| arr.as_ref()).collect_vec();
+                        interleave(&array_refs, &chunk)
+                    })
+                    .collect::<Result<Vec<_>, ArrowError>>()?;
+                Ok(RecordBatch::try_new(schema.clone(), columns)?)
+            })
+            .boxed()
     }
 
     pub async fn flatten_join_error(
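
The chunked rewrite above is built on arrow_select's `interleave` kernel, which gathers rows from a set of arrays by `(array index, row index)` pairs; chunking the index list caps how much reordered output is materialized at once. A minimal illustrative sketch of the kernel's behaviour (not part of the patch):

    use arrow_array::{Array, Int32Array};
    use arrow_select::interleave::interleave;

    fn main() -> Result<(), arrow_schema::ArrowError> {
        let a = Int32Array::from(vec![10, 20, 30]);
        let b = Int32Array::from(vec![40, 50]);
        // Each (array index, row index) pair selects one row from the inputs,
        // so the output below is [50, 10, 40, 30].
        let out = interleave(&[&a, &b], &[(1, 1), (0, 0), (1, 0), (0, 2)])?;
        assert_eq!(out.len(), 4);
        Ok(())
    }

In `read_zorder`, the index list is the z-order sort permutation over all batches in the bin, so each chunk of 10,000 indices becomes one output `RecordBatch`.
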
From 31e50f0432a63b57516acf914731c1be0ea2d587 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 13 Jun 2023 21:43:05 -0700
Subject: [PATCH 2/6] add datafusion-based z-order

---
 rust/src/operations/optimize.rs | 134 ++++++++++++++++++++++++++++++--
 1 file changed, 129 insertions(+), 5 deletions(-)

diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index a17bc4262b..58999ffb1a 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -25,9 +25,7 @@ use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
-use arrow_array::cast::as_generic_binary_array;
-use arrow_array::{ArrayRef, RecordBatch};
-use arrow_schema::ArrowError;
+use arrow_array::RecordBatch;
 use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{Future, StreamExt, TryStreamExt};
@@ -444,11 +442,17 @@ impl MergePlan {
     /// Currently requires loading all the data into memory. This is run for each
     /// partition, so it is not a problem for tables where each partition is small.
     /// But for large unpartitioned tables, this could be a problem.
+    #[cfg(not(feature = "datafusion"))]
     async fn read_zorder(
         columns: Arc<Vec<String>>,
         files: MergeBin,
         object_store: ObjectStoreRef,
+        _use_threads: bool,
     ) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
+        use arrow_array::cast::as_generic_binary_array;
+        use arrow_array::ArrayRef;
+        use arrow_schema::ArrowError;
+
         let object_store_ref = object_store.clone();
         // Read all batches into a vec
         let batches: Vec<RecordBatch> = futures::stream::iter(files.clone())
@@ -503,6 +507,71 @@ impl MergePlan {
             .boxed())
     }
 
+    /// Datafusion-based z-order read.
+    #[cfg(feature = "datafusion")]
+    async fn read_zorder(
+        columns: Arc<Vec<String>>,
+        files: MergeBin,
+        object_store: ObjectStoreRef,
+        _use_threads: bool,
+    ) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
+        use datafusion::execution::memory_pool::FairSpillPool;
+        use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+        // TODO: use threads
+        use datafusion::prelude::{col, ParquetReadOptions, SessionConfig};
+
+        // TODO: make this configurable.
+        // TODO: push this up and share between bins or maybe an overall runtime.
+        let memory_pool = FairSpillPool::new(8 * 1024 * 1024 * 1024);
+        let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool));
+        let runtime = Arc::new(RuntimeEnv::new(config)?);
+        runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);
+
+        use datafusion::prelude::SessionContext;
+        use datafusion_expr::expr::ScalarUDF;
+        use datafusion_expr::Expr;
+        use url::Url;
+        let ctx = SessionContext::with_config_rt(SessionConfig::default(), runtime);
+        ctx.register_udf(zorder::datafusion::zorder_key_udf());
+
+        let locations = files
+            .iter()
+            .map(|file| format!("delta-rs:///{}", file.location))
+            .collect_vec();
+        let df = ctx
+            .read_parquet(locations, ParquetReadOptions::default())
+            .await?;
+
+        let original_columns = df
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| col(f.name()))
+            .collect_vec();
+
+        // Add a temporary z-order column we will sort by, and then drop.
+        const ZORDER_KEY_COLUMN: &str = "__zorder_key";
+        let cols = columns.iter().map(col).collect_vec();
+        let expr = Expr::ScalarUDF(ScalarUDF::new(
+            Arc::new(zorder::datafusion::zorder_key_udf()),
+            cols,
+        ));
+        let df = df.with_column(ZORDER_KEY_COLUMN, expr)?;
+
+        let df = df.sort(vec![col(ZORDER_KEY_COLUMN).sort(true, true)])?;
+        let df = df.select(original_columns)?;
+
+        let stream = df
+            .execute_stream()
+            .await?
+            .map_err(|err| {
+                ParquetError::General(format!("Z-order failed while scanning data: {:?}", err))
+            })
+            .boxed();
+
+        Ok(stream)
+    }
+
     /// Perform the operations outlined in the plan.
     pub async fn execute(
         mut self,
@@ -558,12 +627,18 @@ impl MergePlan {
                 }
                 OptimizeOperations::ZOrder(zorder_columns, bins) => {
                     let zorder_columns = Arc::new(zorder_columns);
+                    // If there aren't enough bins to use all threads, then instead
+                    // use threads within the bins. This is important for the case where
+                    // the table is un-partitioned, in which case the entire table is just
+                    // one big bin.
+                    let use_threads_within_bin = bins.len() <= num_cpus::get();
                     futures::stream::iter(bins)
                         .map(|(partition, files)| {
                             let batch_stream = Self::read_zorder(
                                 zorder_columns.clone(),
                                 files.clone(),
                                 object_store.clone(),
+                                use_threads_within_bin,
                             );
 
                             let object_store = object_store.clone();
@@ -859,17 +934,19 @@ fn build_zorder_plan(
 
 pub(super) mod util {
     use super::*;
-    use arrow_select::interleave::interleave;
     use futures::Future;
-    use itertools::Itertools;
     use tokio::task::JoinError;
 
     /// Interleaves a vector of record batches based on a set of indices
+    #[cfg(not(feature = "datafusion"))]
     pub fn interleave_batches(
         batches: Vec<RecordBatch>,
         indices: Vec<(usize, usize)>,
         batch_size: usize,
     ) -> BoxStream<'static, Result<RecordBatch, DeltaTableError>> {
+        use arrow_schema::ArrowError;
+        use arrow_select::interleave::interleave;
+
         if batches.is_empty() {
             return futures::stream::empty().boxed();
         }
@@ -924,6 +1001,53 @@ pub(super) mod zorder {
     use arrow_buffer::bit_util::{get_bit_raw, set_bit_raw, unset_bit_raw};
     use arrow_row::{Row, RowConverter, SortField};
     use arrow_schema::ArrowError;
+
+    #[cfg(feature = "datafusion")]
+    pub(super) mod datafusion {
+        use arrow_schema::DataType;
+        use datafusion_common::DataFusionError;
+        use datafusion_expr::{ColumnarValue, ScalarUDF, Signature, TypeSignature, Volatility};
+        use itertools::Itertools;
+
+        use super::*;
+
+        pub const ZORDER_UDF_NAME: &str = "zorder_key";
+
+        /// Get the DataFusion UDF struct for zorder_key
+        pub fn zorder_key_udf() -> ScalarUDF {
+            let signature = Signature {
+                type_signature: TypeSignature::VariadicAny,
+                volatility: Volatility::Immutable,
+            };
+            ScalarUDF {
+                name: ZORDER_UDF_NAME.to_string(),
+                signature,
+                return_type: Arc::new(|_| Ok(Arc::new(DataType::Binary))),
+                fun: Arc::new(zorder_key_datafusion),
+            }
+        }
+
+        /// Datafusion zorder UDF body
+        pub fn zorder_key_datafusion(
+            columns: &[ColumnarValue],
+        ) -> Result<ColumnarValue, DataFusionError> {
+            let length = columns
+                .iter()
+                .map(|col| match col {
+                    ColumnarValue::Array(array) => array.len(),
+                    ColumnarValue::Scalar(_) => 1,
+                })
+                .max()
+                .ok_or(DataFusionError::NotImplemented(
+                    "z-order on zero columns.".to_string(),
+                ))?;
+            let columns = columns
+                .iter()
+                .map(|col| col.clone().into_array(length))
+                .collect_vec();
+            let array = zorder_key(&columns)?;
+            Ok(ColumnarValue::Array(array))
+        }
+    }
 
     /// Creates a new binary array containing the zorder keys for the given columns
     ///
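
For context on what the `zorder_key` UDF registered above computes: a Z-order (Morton) key interleaves the bits of the per-column keys, so rows that are close in every dimension sort near each other. The real implementation interleaves arrow_row-encoded bytes of arbitrary width; this toy two-column version over u8 values only illustrates the idea:

    /// Interleave the bits of two 8-bit values into a 16-bit Morton key:
    /// bit i of `x` lands at position 2 * i, bit i of `y` at position 2 * i + 1.
    fn morton_key(x: u8, y: u8) -> u16 {
        let mut key = 0u16;
        for i in 0..8 {
            key |= (((x >> i) & 1) as u16) << (2 * i);
            key |= (((y >> i) & 1) as u16) << (2 * i + 1);
        }
        key
    }

    fn main() {
        // Nearby points get nearby keys, which is why sorting by the key
        // improves data skipping on both columns at once.
        assert!(morton_key(3, 4) < morton_key(200, 180));
    }
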
From 9c50ba4e787bf9b85acbab6eb904f29a4d4b7a19 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Thu, 15 Jun 2023 22:34:12 -0700
Subject: [PATCH 3/6] parallel z-order, even if only one partition

---
 rust/src/operations/optimize.rs | 72 ++++++++++++++++++++++++---------
 1 file changed, 52 insertions(+), 20 deletions(-)

diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index 58999ffb1a..3c5765a5af 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -447,7 +447,7 @@ impl MergePlan {
         columns: Arc<Vec<String>>,
         files: MergeBin,
         object_store: ObjectStoreRef,
-        _use_threads: bool,
+        use_threads: bool,
     ) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
         use arrow_array::cast::as_generic_binary_array;
         use arrow_array::ArrayRef;
         use arrow_schema::ArrowError;
@@ -502,9 +502,12 @@ impl MergePlan {
             .collect_vec();
 
         // Interleave the batches
-        Ok(util::interleave_batches(batches, indices, 10_000)
-            .map_err(|err| ParquetError::General(format!("Failed to reorder data: {:?}", err)))
-            .boxed())
+        Ok(
+            util::interleave_batches(batches, indices, 10_000, use_threads)
+                .await
+                .map_err(|err| ParquetError::General(format!("Failed to reorder data: {:?}", err)))
+                .boxed(),
+        )
     }
 
     /// Datafusion-based z-order read.
@@ -939,13 +942,15 @@ pub(super) mod util {
 
     /// Interleaves a vector of record batches based on a set of indices
     #[cfg(not(feature = "datafusion"))]
-    pub fn interleave_batches(
+    pub async fn interleave_batches(
         batches: Vec<RecordBatch>,
         indices: Vec<(usize, usize)>,
         batch_size: usize,
+        use_threads: bool,
     ) -> BoxStream<'static, Result<RecordBatch, DeltaTableError>> {
-        use arrow_schema::ArrowError;
+        use arrow_array::ArrayRef;
         use arrow_select::interleave::interleave;
+        use futures::TryFutureExt;
 
         if batches.is_empty() {
             return futures::stream::empty().boxed();
         }
@@ -954,25 +959,52 @@ pub(super) mod util {
         let schema = batches[0].schema();
         let arrays = (0..num_columns)
             .map(move |col_i| {
-                batches
-                    .iter()
-                    .map(|batch| batch.column(col_i).clone())
-                    .collect_vec()
+                Arc::new(
+                    batches
+                        .iter()
+                        .map(|batch| batch.column(col_i).clone())
+                        .collect_vec(),
+                )
             })
             .collect_vec();
+        let arrays = Arc::new(arrays);
+
+        fn interleave_task(
+            array_chunks: Arc<Vec<ArrayRef>>,
+            indices: Arc<Vec<(usize, usize)>>,
+        ) -> impl Future<Output = Result<ArrayRef, DeltaTableError>> + Send + 'static {
+            let fut = tokio::task::spawn_blocking(move || {
+                let array_refs = array_chunks.iter().map(|arr| arr.as_ref()).collect_vec();
+                interleave(&array_refs, &indices)
+            });
+            flatten_join_error(fut)
+        }
+
+        fn interleave_batch(
+            arrays: Arc<Vec<Arc<Vec<ArrayRef>>>>,
+            chunk: Vec<(usize, usize)>,
+            schema: ArrowSchemaRef,
+            use_threads: bool,
+        ) -> impl Future<Output = Result<RecordBatch, DeltaTableError>> + Send + 'static {
+            let num_threads = if use_threads { num_cpus::get() } else { 1 };
+            let chunk = Arc::new(chunk);
+            futures::stream::iter(0..arrays.len())
+                .map(move |i| arrays[i].clone())
+                .map(move |array_chunks| interleave_task(array_chunks.clone(), chunk.clone()))
+                .buffered(num_threads)
+                .try_collect::<Vec<_>>()
+                .and_then(move |columns| {
+                    futures::future::ready(
+                        RecordBatch::try_new(schema.clone(), columns)
+                            .map_err(|err| DeltaTableError::from(err)),
+                    )
+                })
+        }
 
         futures::stream::iter(indices)
             .chunks(batch_size)
-            .map(move |chunk| {
-                let columns = arrays
-                    .iter()
-                    .map(|array_chunks| {
-                        let array_refs = array_chunks.iter().map(|arr| arr.as_ref()).collect_vec();
-                        interleave(&array_refs, &chunk)
-                    })
-                    .collect::<Result<Vec<_>, ArrowError>>()?;
-                Ok(RecordBatch::try_new(schema.clone(), columns)?)
-            })
+            .map(move |chunk| interleave_batch(arrays.clone(), chunk, schema.clone(), use_threads))
+            .buffered(2)
             .boxed()
     }
 
From cc21ec11e155b2f96130bc62fe3892135edf8559 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Sun, 18 Jun 2023 13:47:32 -0700
Subject: [PATCH 4/6] Add spill parameter

---
 rust/src/operations/optimize.rs | 153 ++++++++++++++++++++++----------
 rust/tests/command_optimize.rs  |   2 +
 2 files changed, 110 insertions(+), 45 deletions(-)

diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index 3c5765a5af..939cc119b9 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -168,6 +168,8 @@ pub struct OptimizeBuilder<'a> {
     preserve_insertion_order: bool,
     /// Max number of concurrent tasks (default is number of cpus)
     max_concurrent_tasks: usize,
+    /// Maximum number of bytes that are allowed to spill to disk
+    max_spill_size: usize,
     /// Optimize type
     optimize_type: OptimizeType,
 }
@@ -184,6 +186,7 @@ impl<'a> OptimizeBuilder<'a> {
             app_metadata: None,
             preserve_insertion_order: false,
             max_concurrent_tasks: num_cpus::get(),
+            max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
             optimize_type: OptimizeType::Compact,
         }
     }
@@ -232,6 +235,12 @@ impl<'a> OptimizeBuilder<'a> {
         self.max_concurrent_tasks = max_concurrent_tasks;
         self
     }
+
+    /// Max spill size
+    pub fn with_max_spill_size(mut self, max_spill_size: usize) -> Self {
+        self.max_spill_size = max_spill_size;
+        self
+    }
 }
 
 impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
@@ -255,6 +264,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
                 this.target_size.to_owned(),
                 writer_properties,
                 this.max_concurrent_tasks,
+                this.max_spill_size,
             )?;
             let metrics = plan.execute(this.store.clone(), &this.snapshot).await?;
             let mut table = DeltaTable::new_with_state(this.store, this.snapshot);
@@ -333,6 +343,9 @@ pub struct MergePlan {
     /// Whether to preserve insertion order within files
     /// Max number of concurrent tasks
     max_concurrent_tasks: usize,
+    #[allow(dead_code)]
+    /// Maximum number of bytes that are allowed to spill to disk
+    max_spill_size: usize,
 }
 
 /// Parameters passed to individual merge tasks
@@ -444,16 +457,14 @@ impl MergePlan {
     /// But for large unpartitioned tables, this could be a problem.
     #[cfg(not(feature = "datafusion"))]
     async fn read_zorder(
-        columns: Arc<Vec<String>>,
         files: MergeBin,
-        object_store: ObjectStoreRef,
-        use_threads: bool,
+        context: Arc<zorder::ZOrderExecContext>,
     ) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
         use arrow_array::cast::as_generic_binary_array;
         use arrow_array::ArrayRef;
         use arrow_schema::ArrowError;
 
-        let object_store_ref = object_store.clone();
+        let object_store_ref = context.object_store.clone();
         // Read all batches into a vec
         let batches: Vec<RecordBatch> = futures::stream::iter(files.clone())
             .then(|file| {
@@ -475,7 +486,7 @@ impl MergePlan {
             .iter()
             .map(|batch| {
                 let mut zorder_columns = Vec::new();
-                for column in columns.iter() {
+                for column in context.columns.iter() {
                     let array = batch.column_by_name(column).ok_or(ArrowError::SchemaError(
                         format!("Column not found in data file: {column}"),
                     ))?;
@@ -503,7 +514,7 @@ impl MergePlan {
 
         // Interleave the batches
         Ok(
-            util::interleave_batches(batches, indices, 10_000, use_threads)
+            util::interleave_batches(batches, indices, 10_000, context.use_inner_threads)
                 .await
                 .map_err(|err| ParquetError::General(format!("Failed to reorder data: {:?}", err)))
                 .boxed(),
         )
     }
 
     /// Datafusion-based z-order read.
     #[cfg(feature = "datafusion")]
     async fn read_zorder(
-        columns: Arc<Vec<String>>,
         files: MergeBin,
-        object_store: ObjectStoreRef,
-        _use_threads: bool,
+        context: Arc<zorder::ZOrderExecContext>,
     ) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
-        use datafusion::execution::memory_pool::FairSpillPool;
-        use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-        // TODO: use threads
-        use datafusion::prelude::{col, ParquetReadOptions, SessionConfig};
-
-        // TODO: make this configurable.
-        // TODO: push this up and share between bins or maybe an overall runtime.
-        let memory_pool = FairSpillPool::new(8 * 1024 * 1024 * 1024);
-        let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool));
-        let runtime = Arc::new(RuntimeEnv::new(config)?);
-        runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);
-
-        use datafusion::prelude::SessionContext;
+        use datafusion::prelude::{col, ParquetReadOptions};
         use datafusion_expr::expr::ScalarUDF;
         use datafusion_expr::Expr;
-        use url::Url;
-        let ctx = SessionContext::with_config_rt(SessionConfig::default(), runtime);
-        ctx.register_udf(zorder::datafusion::zorder_key_udf());
 
         let locations = files
             .iter()
             .map(|file| format!("delta-rs:///{}", file.location))
             .collect_vec();
-        let df = ctx
+        let df = context
+            .ctx
             .read_parquet(locations, ParquetReadOptions::default())
             .await?;
 
@@ -554,7 +549,7 @@ impl MergePlan {
 
         // Add a temporary z-order column we will sort by, and then drop.
         const ZORDER_KEY_COLUMN: &str = "__zorder_key";
-        let cols = columns.iter().map(col).collect_vec();
+        let cols = context.columns.iter().map(col).collect_vec();
         let expr = Expr::ScalarUDF(ScalarUDF::new(
             Arc::new(zorder::datafusion::zorder_key_udf()),
             cols,
         ));
         let df = df.with_column(ZORDER_KEY_COLUMN, expr)?;
 
@@ -629,20 +624,25 @@ impl MergePlan {
                     .await?;
                 }
                 OptimizeOperations::ZOrder(zorder_columns, bins) => {
-                    let zorder_columns = Arc::new(zorder_columns);
-                    // If there aren't enough bins to use all threads, then instead
-                    // use threads within the bins. This is important for the case where
-                    // the table is un-partitioned, in which case the entire table is just
-                    // one big bin.
-                    let use_threads_within_bin = bins.len() <= num_cpus::get();
+                    #[cfg(not(feature = "datafusion"))]
+                    let exec_context = Arc::new(zorder::ZOrderExecContext::new(
+                        zorder_columns,
+                        object_store.clone(),
+                        // If there aren't enough bins to use all threads, then instead
+                        // use threads within the bins. This is important for the case where
+                        // the table is un-partitioned, in which case the entire table is just
+                        // one big bin.
+                        bins.len() <= num_cpus::get(),
+                    ));
+                    #[cfg(feature = "datafusion")]
+                    let exec_context = Arc::new(zorder::ZOrderExecContext::new(
+                        zorder_columns,
+                        object_store.clone(),
+                        self.max_spill_size,
+                    )?);
                     futures::stream::iter(bins)
                         .map(|(partition, files)| {
-                            let batch_stream = Self::read_zorder(
-                                zorder_columns.clone(),
-                                files.clone(),
-                                object_store.clone(),
-                                use_threads_within_bin,
-                            );
+                            let batch_stream = Self::read_zorder(files.clone(), exec_context.clone());
 
                             let object_store = object_store.clone();
@@ -727,6 +727,7 @@ pub fn create_merge_plan(
     target_size: Option<i64>,
     writer_properties: WriterProperties,
     max_concurrent_tasks: usize,
+    max_spill_size: usize,
 ) -> Result<MergePlan, DeltaTableError> {
     let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());
 
@@ -766,6 +767,7 @@ pub fn create_merge_plan(
         }),
         read_table_version: snapshot.version(),
         max_concurrent_tasks,
+        max_spill_size,
     })
 }
 
@@ -995,8 +997,7 @@ pub(super) mod util {
                 .try_collect::<Vec<_>>()
                 .and_then(move |columns| {
                     futures::future::ready(
-                        RecordBatch::try_new(schema.clone(), columns)
-                            .map_err(|err| DeltaTableError::from(err)),
+                        RecordBatch::try_new(schema, columns).map_err(DeltaTableError::from),
                     )
                 })
         }
@@ -1026,24 +1027,86 @@ pub(super) mod util {
 
 /// Z-order utilities
 pub(super) mod zorder {
-    use std::sync::Arc;
+    use super::*;
 
     use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
     use arrow_array::{Array, ArrayRef, BinaryArray};
     use arrow_buffer::bit_util::{get_bit_raw, set_bit_raw, unset_bit_raw};
     use arrow_row::{Row, RowConverter, SortField};
     use arrow_schema::ArrowError;
+
+    /// Execution context for Z-order scan
+    #[cfg(not(feature = "datafusion"))]
+    pub struct ZOrderExecContext {
+        /// Columns to z-order by
+        pub columns: Arc<[String]>,
+        /// Object store to use for reading files
+        pub object_store: ObjectStoreRef,
+        /// Whether to use threads when interleaving batches
+        pub use_inner_threads: bool,
+    }
+
+    #[cfg(not(feature = "datafusion"))]
+    impl ZOrderExecContext {
+        pub fn new(
+            columns: Vec<String>,
+            object_store: ObjectStoreRef,
+            use_inner_threads: bool,
+        ) -> Self {
+            let columns = columns.into();
+            Self {
+                columns,
+                object_store,
+                use_inner_threads,
+            }
+        }
+    }
+
+    #[cfg(feature = "datafusion")]
+    pub use self::datafusion::ZOrderExecContext;
+
     #[cfg(feature = "datafusion")]
     pub(super) mod datafusion {
+        use super::*;
+        use ::datafusion::{
+            execution::{
+                memory_pool::FairSpillPool,
+                runtime_env::{RuntimeConfig, RuntimeEnv},
+            },
+            prelude::{SessionConfig, SessionContext},
+        };
         use arrow_schema::DataType;
         use datafusion_common::DataFusionError;
         use datafusion_expr::{ColumnarValue, ScalarUDF, Signature, TypeSignature, Volatility};
         use itertools::Itertools;
 
-        use super::*;
-
         pub const ZORDER_UDF_NAME: &str = "zorder_key";
 
+        pub struct ZOrderExecContext {
+            pub columns: Arc<[String]>,
+            pub ctx: SessionContext,
+        }
+
+        impl ZOrderExecContext {
+            pub fn new(
+                columns: Vec<String>,
+                object_store: ObjectStoreRef,
+                max_spill_size: usize,
+            ) -> Result<Self, DataFusionError> {
+                let columns = columns.into();
+
+                let memory_pool = FairSpillPool::new(max_spill_size);
+                let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool));
+                let runtime = Arc::new(RuntimeEnv::new(config)?);
+                runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);
+
+                use url::Url;
+                let ctx = SessionContext::with_config_rt(SessionConfig::default(), runtime);
+                ctx.register_udf(datafusion::zorder_key_udf());
+                Ok(Self { columns, ctx })
+            }
+        }
+
         /// Get the DataFusion UDF struct for zorder_key
         pub fn zorder_key_udf() -> ScalarUDF {
             let signature = Signature {
@@ -1059,7 +1122,7 @@ pub(super) mod zorder {
         }
 
         /// Datafusion zorder UDF body
-        pub fn zorder_key_datafusion(
+        fn zorder_key_datafusion(
            columns: &[ColumnarValue],
         ) -> Result<ColumnarValue, DataFusionError> {
             let length = columns
diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs
index 68eee17d07..98984bab0e 100644
--- a/rust/tests/command_optimize.rs
+++ b/rust/tests/command_optimize.rs
@@ -277,6 +277,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
         None,
         WriterProperties::builder().build(),
         1,
+        20,
     )?;
 
     let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
@@ -340,6 +341,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
         None,
         WriterProperties::builder().build(),
         1,
+        20,
     )?;
 
     let uri = context.tmp_dir.path().to_str().to_owned().unwrap();

From f3e37ddd1939a4e7eadb82f425aa09daf204cb45 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Sun, 18 Jun 2023 13:56:26 -0700
Subject: [PATCH 5/6] expose in python

---
 python/deltalake/table.py | 8 +++++++-
 python/src/lib.rs         | 4 +++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index dc7e7a1214..b57cad0213 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -678,6 +678,7 @@ def z_order(
         partition_filters: Optional[FilterType] = None,
         target_size: Optional[int] = None,
         max_concurrent_tasks: Optional[int] = None,
+        max_spill_size: int = 20 * 1024 * 1024 * 1024,
     ) -> Dict[str, Any]:
         """
         Reorders the data using a Z-order curve to improve data skipping.
@@ -692,10 +693,15 @@
         :param max_concurrent_tasks: the maximum number of concurrent tasks to use for
             file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
             faster, but will also use more memory.
+        :param max_spill_size: the maximum number of bytes to spill to disk. Defaults to 20GB.
         :return: the metrics from optimize
         """
         metrics = self.table._table.z_order_optimize(
-            list(columns), partition_filters, target_size, max_concurrent_tasks
+            list(columns),
+            partition_filters,
+            target_size,
+            max_concurrent_tasks,
+            max_spill_size,
         )
         self.table.update_incremental()
         return json.loads(metrics)
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 4b63567f26..ca7247c365 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -291,16 +291,18 @@ impl RawDeltaTable {
     }
 
     /// Run z-order variation of optimize
-    #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None))]
+    #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024))]
     pub fn z_order_optimize(
         &mut self,
         z_order_columns: Vec<String>,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
+        max_spill_size: usize,
     ) -> PyResult<String> {
         let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
             .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
+            .with_max_spill_size(max_spill_size)
             .with_type(OptimizeType::ZOrder(z_order_columns));
         if let Some(size) = target_size {
             cmd = cmd.with_target_size(size);

From c8e7a55183464a1db626e39db6b6c06b9f71cafa Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Mon, 26 Jun 2023 14:54:08 -0700
Subject: [PATCH 6/6] lint fix

---
 python/deltalake/writer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 5ec6d1d56a..466010ed7f 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -220,7 +220,7 @@ def visitor(written_file: Any) -> None:
         if PYARROW_MAJOR_VERSION >= 9:
             size = written_file.size
         else:
-            size = filesystem.get_file_info([path])[0].size  # type: ignore
+            size = filesystem.get_file_info([path])[0].size
 
         add_actions.append(
             AddAction(
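
End-to-end, the series lets callers bound how much z-order work spills to disk. A hedged usage sketch of the Rust API added above — the table URI and column names are placeholders, and it assumes the builder's IntoFuture output is the usual (table, metrics) pair:

    use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};

    #[tokio::main]
    async fn main() -> Result<(), deltalake::DeltaTableError> {
        // Placeholder table URI and z-order columns; adjust for a real table.
        let table = deltalake::open_table("path/to/table").await?;
        let (_table, metrics) = OptimizeBuilder::new(table.object_store(), table.state.clone())
            .with_type(OptimizeType::ZOrder(vec!["x".to_string(), "y".to_string()]))
            .with_max_spill_size(4 * 1024 * 1024 * 1024) // allow up to 4 GiB of spill to disk
            .await?;
        println!("{:?}", metrics);
        Ok(())
    }

From Python, the same knob is exposed as the `max_spill_size` argument added to `z_order` in patch 5.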