diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index dc7e7a1214..b57cad0213 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -678,6 +678,7 @@ def z_order(
         partition_filters: Optional[FilterType] = None,
         target_size: Optional[int] = None,
         max_concurrent_tasks: Optional[int] = None,
+        max_spill_size: int = 20 * 1024 * 1024 * 1024,
     ) -> Dict[str, Any]:
         """
         Reorders the data using a Z-order curve to improve data skipping.
@@ -692,10 +693,15 @@ def z_order(
         :param max_concurrent_tasks: the maximum number of concurrent tasks to use for
             file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
            faster, but will also use more memory.
+        :param max_spill_size: the maximum number of bytes to spill to disk. Defaults to 20 GB.
         :return: the metrics from optimize
         """
         metrics = self.table._table.z_order_optimize(
-            list(columns), partition_filters, target_size, max_concurrent_tasks
+            list(columns),
+            partition_filters,
+            target_size,
+            max_concurrent_tasks,
+            max_spill_size,
         )
         self.table.update_incremental()
         return json.loads(metrics)
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 5ec6d1d56a..466010ed7f 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -220,7 +220,7 @@ def visitor(written_file: Any) -> None:
         if PYARROW_MAJOR_VERSION >= 9:
             size = written_file.size
         else:
-            size = filesystem.get_file_info([path])[0].size  # type: ignore
+            size = filesystem.get_file_info([path])[0].size

         add_actions.append(
             AddAction(
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 4b63567f26..ca7247c365 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -291,16 +291,18 @@ impl RawDeltaTable {
     }

     /// Run z-order variation of optimize
-    #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None))]
+    #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024))]
    pub fn z_order_optimize(
         &mut self,
         z_order_columns: Vec<String>,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
+        max_spill_size: usize,
     ) -> PyResult<String> {
         let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
             .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
+            .with_max_spill_size(max_spill_size)
             .with_type(OptimizeType::ZOrder(z_order_columns));
         if let Some(size) = target_size {
             cmd = cmd.with_target_size(size);
diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index a49c6a9940..939cc119b9 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -25,9 +25,7 @@ use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};

 use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
-use arrow_array::cast::as_generic_binary_array;
-use arrow_array::{ArrayRef, RecordBatch};
-use arrow_schema::ArrowError;
+use arrow_array::RecordBatch;
 use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{Future, StreamExt, TryStreamExt};
@@ -170,6 +168,8 @@ pub struct OptimizeBuilder<'a> {
     preserve_insertion_order: bool,
     /// Max number of concurrent tasks (default is number of cpus)
     max_concurrent_tasks: usize,
+    /// Maximum number of bytes that are allowed to spill to disk
+    max_spill_size: usize,
     /// Optimize type
     optimize_type: OptimizeType,
 }
@@ -186,6 +186,7 @@ impl<'a> OptimizeBuilder<'a> {
             app_metadata: None,
             preserve_insertion_order: false,
             max_concurrent_tasks: num_cpus::get(),
+            max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
             optimize_type: OptimizeType::Compact,
         }
     }
@@ -234,6 +235,12 @@ impl<'a> OptimizeBuilder<'a> {
         self.max_concurrent_tasks = max_concurrent_tasks;
         self
     }
+
+    /// Max spill size
+    pub fn with_max_spill_size(mut self, max_spill_size: usize) -> Self {
+        self.max_spill_size = max_spill_size;
+        self
+    }
 }

 impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
@@ -257,6 +264,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
                 this.target_size.to_owned(),
                 writer_properties,
                 this.max_concurrent_tasks,
+                this.max_spill_size,
             )?;
             let metrics = plan.execute(this.store.clone(), &this.snapshot).await?;
             let mut table = DeltaTable::new_with_state(this.store, this.snapshot);
@@ -335,6 +343,9 @@ pub struct MergePlan {
     /// Whether to preserve insertion order within files
     /// Max number of concurrent tasks
     max_concurrent_tasks: usize,
+    #[allow(dead_code)]
+    /// Maximum number of bytes that are allowed to spill to disk
+    max_spill_size: usize,
 }

 /// Parameters passed to individual merge tasks
@@ -444,12 +455,16 @@ impl MergePlan {
     /// Currently requires loading all the data into memory. This is run for each
     /// partition, so it is not a problem for tables where each partition is small.
     /// But for large unpartitioned tables, this could be a problem.
+    #[cfg(not(feature = "datafusion"))]
     async fn read_zorder(
-        columns: Arc<Vec<String>>,
         files: MergeBin,
-        object_store: ObjectStoreRef,
+        context: Arc<zorder::ZOrderExecContext>,
     ) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
-        let object_store_ref = object_store.clone();
+        use arrow_array::cast::as_generic_binary_array;
+        use arrow_array::ArrayRef;
+        use arrow_schema::ArrowError;
+
+        let object_store_ref = context.object_store.clone();
         // Read all batches into a vec
         let batches: Vec<RecordBatch> = futures::stream::iter(files.clone())
             .then(|file| {
@@ -471,7 +486,7 @@ impl MergePlan {
             .iter()
             .map(|batch| {
                 let mut zorder_columns = Vec::new();
-                for column in columns.iter() {
+                for column in context.columns.iter() {
                     let array = batch.column_by_name(column).ok_or(ArrowError::SchemaError(
                         format!("Column not found in data file: {column}"),
                     ))?;
@@ -498,11 +513,61 @@ impl MergePlan {
             .collect_vec();

         // Interleave the batches
-        let out_batches = util::interleave_batches(batches, Arc::new(indices), false).await?;
+        Ok(
+            util::interleave_batches(batches, indices, 10_000, context.use_inner_threads)
+                .await
+                .map_err(|err| ParquetError::General(format!("Failed to reorder data: {:?}", err)))
+                .boxed(),
+        )
+    }
+
+    /// Datafusion-based z-order read.
+    #[cfg(feature = "datafusion")]
+    async fn read_zorder(
+        files: MergeBin,
+        context: Arc<zorder::ZOrderExecContext>,
+    ) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
+        use datafusion::prelude::{col, ParquetReadOptions};
+        use datafusion_expr::expr::ScalarUDF;
+        use datafusion_expr::Expr;
+
+        let locations = files
+            .iter()
+            .map(|file| format!("delta-rs:///{}", file.location))
+            .collect_vec();
+        let df = context
+            .ctx
+            .read_parquet(locations, ParquetReadOptions::default())
+            .await?;

-        Ok(futures::stream::once(futures::future::ready(out_batches))
-            .map(Ok)
-            .boxed())
+        let original_columns = df
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| col(f.name()))
+            .collect_vec();
+
+        // Add a temporary z-order column we will sort by, and then drop.
+        const ZORDER_KEY_COLUMN: &str = "__zorder_key";
+        let cols = context.columns.iter().map(col).collect_vec();
+        let expr = Expr::ScalarUDF(ScalarUDF::new(
+            Arc::new(zorder::datafusion::zorder_key_udf()),
+            cols,
+        ));
+        let df = df.with_column(ZORDER_KEY_COLUMN, expr)?;
+
+        let df = df.sort(vec![col(ZORDER_KEY_COLUMN).sort(true, true)])?;
+        let df = df.select(original_columns)?;
+
+        let stream = df
+            .execute_stream()
+            .await?
+            .map_err(|err| {
+                ParquetError::General(format!("Z-order failed while scanning data: {:?}", err))
+            })
+            .boxed();
+
+        Ok(stream)
     }

     /// Perform the operations outlined in the plan.
@@ -559,14 +624,25 @@ impl MergePlan {
                     .await?;
             }
             OptimizeOperations::ZOrder(zorder_columns, bins) => {
-                let zorder_columns = Arc::new(zorder_columns);
+                #[cfg(not(feature = "datafusion"))]
+                let exec_context = Arc::new(zorder::ZOrderExecContext::new(
+                    zorder_columns,
+                    object_store.clone(),
+                    // If there aren't enough bins to use all threads, then instead
+                    // use threads within the bins. This is important for the case where
+                    // the table is un-partitioned, in which case the entire table is just
+                    // one big bin.
+                    bins.len() <= num_cpus::get(),
+                ));
+                #[cfg(feature = "datafusion")]
+                let exec_context = Arc::new(zorder::ZOrderExecContext::new(
+                    zorder_columns,
+                    object_store.clone(),
+                    self.max_spill_size,
+                )?);
                 futures::stream::iter(bins)
                     .map(|(partition, files)| {
-                        let batch_stream = Self::read_zorder(
-                            zorder_columns.clone(),
-                            files.clone(),
-                            object_store.clone(),
-                        );
+                        let batch_stream = Self::read_zorder(files.clone(), exec_context.clone());

                         let object_store = object_store.clone();
@@ -651,6 +727,7 @@ pub fn create_merge_plan(
     target_size: Option<i64>,
     writer_properties: WriterProperties,
     max_concurrent_tasks: usize,
+    max_spill_size: usize,
 ) -> Result<MergePlan, DeltaTableError> {
     let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());
@@ -690,6 +767,7 @@ pub fn create_merge_plan(
         }),
         read_table_version: snapshot.version(),
         max_concurrent_tasks,
+        max_spill_size,
     })
 }

@@ -861,39 +939,74 @@ fn build_zorder_plan(

 pub(super) mod util {
     use super::*;
-    use arrow_array::ArrayRef;
-    use arrow_select::interleave::interleave;
     use futures::Future;
-    use itertools::Itertools;
     use tokio::task::JoinError;

     /// Interleaves a vector of record batches based on a set of indices
+    #[cfg(not(feature = "datafusion"))]
     pub async fn interleave_batches(
         batches: Vec<RecordBatch>,
-        indices: Arc<Vec<(usize, usize)>>,
+        indices: Vec<(usize, usize)>,
+        batch_size: usize,
         use_threads: bool,
-    ) -> Result<RecordBatch, DeltaTableError> {
-        // It would be nice if upstream provided this. Though TBH they would
-        // probably prefer we just use DataFusion to sort.
-        let columns: Vec<ArrayRef> = futures::stream::iter(0..batches[0].num_columns())
-            .map(|col_i| {
-                let arrays: Vec<ArrayRef> = batches
-                    .iter()
-                    .map(|batch| batch.column(col_i).clone())
-                    .collect_vec();
-                let indices = indices.clone();
-                let task = tokio::task::spawn_blocking(move || {
-                    let arrays = arrays.iter().map(|arr| arr.as_ref()).collect_vec();
-                    interleave(&arrays, &indices)
-                });
-
-                flatten_join_error(task)
+    ) -> BoxStream<'static, Result<RecordBatch, DeltaTableError>> {
+        use arrow_array::ArrayRef;
+        use arrow_select::interleave::interleave;
+        use futures::TryFutureExt;
+
+        if batches.is_empty() {
+            return futures::stream::empty().boxed();
+        }
+        let num_columns = batches[0].num_columns();
+        let schema = batches[0].schema();
+        let arrays = (0..num_columns)
+            .map(move |col_i| {
+                Arc::new(
+                    batches
+                        .iter()
+                        .map(|batch| batch.column(col_i).clone())
+                        .collect_vec(),
+                )
             })
-            .buffered(if use_threads { num_cpus::get() } else { 1 })
-            .try_collect::<Vec<_>>()
-            .await?;
+            .collect_vec();
+        let arrays = Arc::new(arrays);
+
+        fn interleave_task(
+            array_chunks: Arc<Vec<ArrayRef>>,
+            indices: Arc<Vec<(usize, usize)>>,
+        ) -> impl Future<Output = Result<ArrayRef, DeltaTableError>> + Send + 'static {
+            let fut = tokio::task::spawn_blocking(move || {
+                let array_refs = array_chunks.iter().map(|arr| arr.as_ref()).collect_vec();
+                interleave(&array_refs, &indices)
+            });
+            flatten_join_error(fut)
+        }

-        Ok(RecordBatch::try_new(batches[0].schema(), columns)?)
+        fn interleave_batch(
+            arrays: Arc<Vec<Arc<Vec<ArrayRef>>>>,
+            chunk: Vec<(usize, usize)>,
+            schema: ArrowSchemaRef,
+            use_threads: bool,
+        ) -> impl Future<Output = Result<RecordBatch, DeltaTableError>> + Send + 'static {
+            let num_threads = if use_threads { num_cpus::get() } else { 1 };
+            let chunk = Arc::new(chunk);
+            futures::stream::iter(0..arrays.len())
+                .map(move |i| arrays[i].clone())
+                .map(move |array_chunks| interleave_task(array_chunks.clone(), chunk.clone()))
+                .buffered(num_threads)
+                .try_collect::<Vec<_>>()
+                .and_then(move |columns| {
+                    futures::future::ready(
+                        RecordBatch::try_new(schema, columns).map_err(DeltaTableError::from),
+                    )
+                })
+        }
+
+        futures::stream::iter(indices)
+            .chunks(batch_size)
+            .map(move |chunk| interleave_batch(arrays.clone(), chunk, schema.clone(), use_threads))
+            .buffered(2)
+            .boxed()
     }

     pub async fn flatten_join_error(
@@ -914,7 +1027,7 @@ pub(super) mod util {

 /// Z-order utilities
 pub(super) mod zorder {
-    use std::sync::Arc;
+    use super::*;

     use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
     use arrow_array::{Array, ArrayRef, BinaryArray};
@@ -922,6 +1035,115 @@ pub(super) mod zorder {
     use arrow_row::{Row, RowConverter, SortField};
     use arrow_schema::ArrowError;

+    /// Execution context for Z-order scan
+    #[cfg(not(feature = "datafusion"))]
+    pub struct ZOrderExecContext {
+        /// Columns to z-order by
+        pub columns: Arc<[String]>,
+        /// Object store to use for reading files
+        pub object_store: ObjectStoreRef,
+        /// Whether to use threads when interleaving batches
+        pub use_inner_threads: bool,
+    }
+
+    #[cfg(not(feature = "datafusion"))]
+    impl ZOrderExecContext {
+        pub fn new(
+            columns: Vec<String>,
+            object_store: ObjectStoreRef,
+            use_inner_threads: bool,
+        ) -> Self {
+            let columns = columns.into();
+            Self {
+                columns,
+                object_store,
+                use_inner_threads,
+            }
+        }
+    }
+
+    #[cfg(feature = "datafusion")]
+    pub use self::datafusion::ZOrderExecContext;
+
+    #[cfg(feature = "datafusion")]
+    pub(super) mod datafusion {
+        use super::*;
+        use ::datafusion::{
+            execution::{
+                memory_pool::FairSpillPool,
+                runtime_env::{RuntimeConfig, RuntimeEnv},
+            },
+            prelude::{SessionConfig, SessionContext},
+        };
+        use arrow_schema::DataType;
+        use datafusion_common::DataFusionError;
+        use datafusion_expr::{ColumnarValue, ScalarUDF, Signature, TypeSignature, Volatility};
+        use itertools::Itertools;
+        use url::Url;
+
+        pub const ZORDER_UDF_NAME: &str = "zorder_key";
+
+        pub struct ZOrderExecContext {
+            pub columns: Arc<[String]>,
+            pub ctx: SessionContext,
+        }
+
+        impl ZOrderExecContext {
+            pub fn new(
+                columns: Vec<String>,
+                object_store: ObjectStoreRef,
+                max_spill_size: usize,
+            ) -> Result<Self, DataFusionError> {
+                let columns = columns.into();
+
+                let memory_pool = FairSpillPool::new(max_spill_size);
+                let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool));
+                let runtime = Arc::new(RuntimeEnv::new(config)?);
+                runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);
+
+                let ctx = SessionContext::with_config_rt(SessionConfig::default(), runtime);
+                ctx.register_udf(zorder_key_udf());
+                Ok(Self { columns, ctx })
+            }
+        }
+
+        /// Get the DataFusion UDF struct for zorder_key
+        pub fn zorder_key_udf() -> ScalarUDF {
+            let signature = Signature {
+                type_signature: TypeSignature::VariadicAny,
+                volatility: Volatility::Immutable,
+            };
+            ScalarUDF {
+                name: ZORDER_UDF_NAME.to_string(),
+                signature,
+                return_type: Arc::new(|_| Ok(Arc::new(DataType::Binary))),
+                fun: Arc::new(zorder_key_datafusion),
+            }
+        }
+
+        /// Datafusion zorder UDF body
+        fn zorder_key_datafusion(
+            columns: &[ColumnarValue],
+        ) -> Result<ColumnarValue, DataFusionError> {
+            let length = columns
+                .iter()
+                .map(|col| match col {
+                    ColumnarValue::Array(array) => array.len(),
+                    ColumnarValue::Scalar(_) => 1,
+                })
+                .max()
+                .ok_or(DataFusionError::NotImplemented(
+                    "z-order on zero columns.".to_string(),
+                ))?;
+            let columns = columns
+                .iter()
+                .map(|col| col.clone().into_array(length))
+                .collect_vec();
+            let array = zorder_key(&columns)?;
+            Ok(ColumnarValue::Array(array))
+        }
+    }
+
     /// Creates a new binary array containing the zorder keys for the given columns
     ///
     /// Each value is 16 bytes * number of columns. Each column is converted into
diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs
index 68eee17d07..98984bab0e 100644
--- a/rust/tests/command_optimize.rs
+++ b/rust/tests/command_optimize.rs
@@ -277,6 +277,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
         None,
         WriterProperties::builder().build(),
         1,
+        20,
     )?;

     let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
@@ -340,6 +341,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
         None,
         WriterProperties::builder().build(),
         1,
+        20,
     )?;

     let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
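
From the Python side, the new knob threads through `z_order` above. A minimal usage sketch, assuming the `dt.optimize` accessor that hosts this method; the table URI and column names are placeholders:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # placeholder table URI

# Cap DataFusion's spill-to-disk budget at 4 GiB instead of the 20 GiB default.
metrics = dt.optimize.z_order(
    ["date", "user_id"],  # placeholder z-order columns
    max_spill_size=4 * 1024 * 1024 * 1024,
)
print(metrics)  # metrics dict parsed from the JSON the Rust side returns
```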
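For context on what the `zorder_key` UDF computes: a z-order key interleaves the encodings of the sort columns, so sorting by the single key keeps rows that are close in any of the columns physically close in the output files. A toy sketch of the idea for two integer columns (illustrative only; the Rust implementation interleaves the bytes of a fixed 16-byte-per-column encoding, per the doc comment above):

```python
def zorder_key_2d(x: int, y: int, bits: int = 16) -> int:
    """Interleave the bits of x and y into one Morton (z-order) key."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)      # even bit positions come from x
        key |= ((y >> i) & 1) << (2 * i + 1)  # odd bit positions come from y
    return key

# Sorting by the interleaved key clusters rows in both dimensions at once,
# which is what lets file-level min/max statistics skip data on either column.
rows = [(3, 7), (1, 2), (3, 2), (1, 7)]
print(sorted(rows, key=lambda r: zorder_key_2d(*r)))
# [(1, 2), (3, 2), (1, 7), (3, 7)]
```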