feat(python, rust): respect column stats collection configurations (delta-io#2428)

# Description
All Rust and Python write actions now properly adhere to the table configuration
that controls which columns statistics are collected for, via either
`dataSkippingNumIndexedCols` or `dataSkippingStatsColumns`.
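
These correspond to the `delta.dataSkippingNumIndexedCols` and `delta.dataSkippingStatsColumns` table properties. A rough sketch of opting a Rust write into them is below; it assumes `WriteBuilder::with_configuration` accepts the properties as plain string key/value pairs (the exact builder method and re-exports may differ by release):

```rust
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::{DeltaOps, DeltaResult};

/// Illustrative only: write a batch while limiting which columns get stats.
async fn write_with_stats_config(uri: &str, batch: RecordBatch) -> DeltaResult<()> {
    let ops = DeltaOps::try_from_uri(uri).await?;
    let _table = ops
        .write(vec![batch])
        .with_configuration(vec![
            // Explicit column list; takes precedence over the count below.
            ("delta.dataSkippingStatsColumns", Some("id,ts")),
            // Fallback: collect stats only for the first N columns.
            ("delta.dataSkippingNumIndexedCols", Some("2")),
        ])
        .await?;
    Ok(())
}
```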

# Related Issue(s)
- closes delta-io#2427

---------

Co-authored-by: R. Tyler Croy <[email protected]>
ion-elgreco and rtyler authored May 11, 2024
1 parent c734926 commit 353e08b
Showing 17 changed files with 513 additions and 14 deletions.
10 changes: 10 additions & 0 deletions crates/core/src/operations/delete.rs
@@ -34,6 +34,7 @@ use serde::Serialize;

use super::datafusion_utils::Expression;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
use super::write::WriterStatsConfig;
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
create_physical_expr_fix, find_files, register_store, DataFusionMixins, DeltaScanBuilder,
@@ -153,6 +154,14 @@ async fn excute_non_empty_expr(
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);

let writer_stats_config = WriterStatsConfig::new(
snapshot.table_config().num_indexed_cols(),
snapshot
.table_config()
.stats_columns()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
);

let add_actions = write_execution_plan(
Some(snapshot),
state.clone(),
@@ -164,6 +173,7 @@
writer_properties,
false,
None,
writer_stats_config,
)
.await?
.into_iter()
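
The delete path above derives the stats settings from the snapshot's table config and hands them to `write_execution_plan` as a `WriterStatsConfig`. As a standalone illustration of the precedence those two values encode (a hypothetical helper, not the crate's API; a negative count is treated as "all columns" per the usual Delta convention):

```rust
/// Illustrative only: which columns end up with stats given the two settings.
fn stats_columns_for(
    schema_columns: &[&str],
    num_indexed_cols: i32,
    stats_columns: Option<&[&str]>,
) -> Vec<String> {
    match stats_columns {
        // An explicit delta.dataSkippingStatsColumns list always wins.
        Some(cols) => cols.iter().map(|c| c.to_string()).collect(),
        // Negative delta.dataSkippingNumIndexedCols: index every column.
        None if num_indexed_cols < 0 => {
            schema_columns.iter().map(|c| c.to_string()).collect()
        }
        // Otherwise take the first N columns in schema order.
        None => schema_columns
            .iter()
            .take(num_indexed_cols as usize)
            .map(|c| c.to_string())
            .collect(),
    }
}

#[test]
fn precedence_example() {
    let schema = ["id", "ts", "value", "payload"];
    assert_eq!(stats_columns_for(&schema, 2, None), vec!["id", "ts"]);
    assert_eq!(
        stats_columns_for(&schema, 2, Some(&["value"][..])),
        vec!["value"]
    );
}
```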
11 changes: 10 additions & 1 deletion crates/core/src/operations/merge/mod.rs
@@ -74,7 +74,7 @@ use crate::kernel::Action;
use crate::logstore::LogStoreRef;
use crate::operations::merge::barrier::find_barrier_node;
use crate::operations::transaction::CommitBuilder;
use crate::operations::write::write_execution_plan;
use crate::operations::write::{write_execution_plan, WriterStatsConfig};
use crate::protocol::{DeltaOperation, MergePredicate};
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};
@@ -1368,6 +1368,14 @@ async fn execute(
// write projected records
let table_partition_cols = current_metadata.partition_columns.clone();

let writer_stats_config = WriterStatsConfig::new(
snapshot.table_config().num_indexed_cols(),
snapshot
.table_config()
.stats_columns()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
);

let rewrite_start = Instant::now();
let add_actions = write_execution_plan(
Some(&snapshot),
@@ -1380,6 +1388,7 @@
writer_properties,
safe_cast,
None,
writer_stats_config,
)
.await?;

16 changes: 15 additions & 1 deletion crates/core/src/operations/optimize.rs
@@ -424,6 +424,10 @@ pub struct MergeTaskParameters {
file_schema: ArrowSchemaRef,
/// Properties passed to parquet writer
writer_properties: WriterProperties,
/// Num index cols to collect stats for
num_indexed_cols: i32,
/// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
stats_columns: Option<Vec<String>>,
}

/// A stream of record batches, with a ParquetError on failure.
@@ -483,7 +487,12 @@ impl MergePlan {
Some(task_parameters.input_parameters.target_size as usize),
None,
)?;
let mut writer = PartitionWriter::try_with_config(object_store, writer_config)?;
let mut writer = PartitionWriter::try_with_config(
object_store,
writer_config,
task_parameters.num_indexed_cols,
task_parameters.stats_columns.clone(),
)?;

let mut read_stream = read_stream.await?;

@@ -841,6 +850,11 @@ pub fn create_merge_plan(
input_parameters,
file_schema,
writer_properties,
num_indexed_cols: snapshot.table_config().num_indexed_cols(),
stats_columns: snapshot
.table_config()
.stats_columns()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
}),
read_table_version: snapshot.version(),
})
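
With `num_indexed_cols` and `stats_columns` threaded through `MergeTaskParameters` into `PartitionWriter`, files rewritten by compaction follow the table's stats settings as well. A hedged usage sketch (assuming the `DeltaOps`/`OptimizeBuilder` surface and the `Metrics` field name shown here):

```rust
use deltalake::operations::optimize::OptimizeType;
use deltalake::{open_table, DeltaOps, DeltaResult};

/// Illustrative only: compact a table; the rewritten files inherit the
/// table's dataSkipping* stats configuration.
async fn compact(uri: &str) -> DeltaResult<()> {
    let table = open_table(uri).await?;
    let (_table, metrics) = DeltaOps(table)
        .optimize()
        .with_type(OptimizeType::Compact)
        .await?;
    println!("files rewritten: {}", metrics.num_files_added);
    Ok(())
}
```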
11 changes: 10 additions & 1 deletion crates/core/src/operations/update.rs
@@ -41,12 +41,12 @@ use futures::future::BoxFuture;
use parquet::file::properties::WriterProperties;
use serde::Serialize;

use super::transaction::PROTOCOL;
use super::write::write_execution_plan;
use super::{
datafusion_utils::Expression,
transaction::{CommitBuilder, CommitProperties},
};
use super::{transaction::PROTOCOL, write::WriterStatsConfig};
use crate::delta_datafusion::{
create_physical_expr_fix, expr::fmt_expr_to_sql, physical::MetricObserverExec,
DataFusionMixins, DeltaColumn, DeltaSessionContext,
@@ -348,6 +348,14 @@ async fn execute(
projection_update.clone(),
)?);

let writer_stats_config = WriterStatsConfig::new(
snapshot.table_config().num_indexed_cols(),
snapshot
.table_config()
.stats_columns()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
);

let add_actions = write_execution_plan(
Some(&snapshot),
state.clone(),
@@ -359,6 +367,7 @@
writer_properties,
safe_cast,
None,
writer_stats_config,
)
.await?;

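
The update path mirrors delete: rewritten files produced by `write_execution_plan` now carry stats only for the configured columns. A hedged usage sketch (hypothetical column names; assumes the `datafusion` feature for `col`/`lit`):

```rust
use deltalake::datafusion::prelude::{col, lit};
use deltalake::{open_table, DeltaOps, DeltaResult};

/// Illustrative only: rows matching the predicate are rewritten, and the
/// new files respect the table's stats configuration.
async fn zero_out_value(uri: &str) -> DeltaResult<()> {
    let table = open_table(uri).await?;
    let (_table, _metrics) = DeltaOps(table)
        .update()
        .with_predicate(col("id").eq(lit(42)))
        .with_update("value", lit(0))
        .await?;
    Ok(())
}
```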
71 changes: 71 additions & 0 deletions crates/core/src/operations/write.rs
@@ -58,6 +58,7 @@ use crate::logstore::LogStoreRef;
use crate::operations::cast::{cast_record_batch, merge_schema};
use crate::protocol::{DeltaOperation, SaveMode};
use crate::storage::ObjectStoreRef;
use crate::table::config::DEFAULT_NUM_INDEX_COLS;
use crate::table::state::DeltaTableState;
use crate::table::Constraint as DeltaConstraint;
use crate::writer::record_batch::divide_by_partition_values;
@@ -337,6 +338,24 @@ impl WriteBuilder {
}
}
}
/// Configuration for the writer on how to collect stats
#[derive(Clone)]
pub struct WriterStatsConfig {
/// Number of columns to collect stats for, idx based
num_indexed_cols: i32,
/// Optional list of columns to collect stats for; takes precedence over num_indexed_cols
stats_columns: Option<Vec<String>>,
}

impl WriterStatsConfig {
/// Create new writer stats config
pub fn new(num_indexed_cols: i32, stats_columns: Option<Vec<String>>) -> Self {
Self {
num_indexed_cols,
stats_columns,
}
}
}

#[allow(clippy::too_many_arguments)]
async fn write_execution_plan_with_predicate(
@@ -351,6 +370,7 @@ async fn write_execution_plan_with_predicate(
writer_properties: Option<WriterProperties>,
safe_cast: bool,
schema_mode: Option<SchemaMode>,
writer_stats_config: WriterStatsConfig,
) -> DeltaResult<Vec<Action>> {
let schema: ArrowSchemaRef = if schema_mode.is_some() {
plan.schema()
@@ -386,6 +406,8 @@
writer_properties.clone(),
target_file_size,
write_batch_size,
writer_stats_config.num_indexed_cols,
writer_stats_config.stats_columns.clone(),
);
let mut writer = DeltaWriter::new(object_store.clone(), config);
let checker_stream = checker.clone();
@@ -438,6 +460,7 @@ pub(crate) async fn write_execution_plan(
writer_properties: Option<WriterProperties>,
safe_cast: bool,
schema_mode: Option<SchemaMode>,
writer_stats_config: WriterStatsConfig,
) -> DeltaResult<Vec<Action>> {
write_execution_plan_with_predicate(
None,
@@ -451,10 +474,12 @@
writer_properties,
safe_cast,
schema_mode,
writer_stats_config,
)
.await
}

#[allow(clippy::too_many_arguments)]
async fn execute_non_empty_expr(
snapshot: &DeltaTableState,
log_store: LogStoreRef,
@@ -463,6 +488,7 @@
expression: &Expr,
rewrite: &[Add],
writer_properties: Option<WriterProperties>,
writer_stats_config: WriterStatsConfig,
) -> DeltaResult<Vec<Action>> {
// For each identified file perform a parquet scan + filter + limit (1) + count.
// If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
@@ -496,13 +522,15 @@
writer_properties,
false,
None,
writer_stats_config,
)
.await?;

Ok(add_actions)
}

// This should only be called with a valid predicate
#[allow(clippy::too_many_arguments)]
async fn prepare_predicate_actions(
predicate: Expr,
log_store: LogStoreRef,
@@ -511,6 +539,7 @@
partition_columns: Vec<String>,
writer_properties: Option<WriterProperties>,
deletion_timestamp: i64,
writer_stats_config: WriterStatsConfig,
) -> DeltaResult<Vec<Action>> {
let candidates =
find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?;
@@ -526,6 +555,7 @@
&predicate,
&candidates.candidates,
writer_properties,
writer_stats_config,
)
.await?
};
Expand Down Expand Up @@ -723,6 +753,18 @@ impl std::future::IntoFuture for WriteBuilder {
_ => (None, None),
};

let config: Option<crate::table::config::TableConfig<'_>> = this
.snapshot
.as_ref()
.map(|snapshot| snapshot.table_config());

let (num_indexed_cols, stats_columns) =
get_num_idx_cols_and_stats_columns(config, this.configuration);

let writer_stats_config = WriterStatsConfig {
num_indexed_cols,
stats_columns,
};
// Here we need to validate if the new data conforms to a predicate if one is provided
let add_actions = write_execution_plan_with_predicate(
predicate.clone(),
@@ -736,6 +778,7 @@
this.writer_properties.clone(),
this.safe_cast,
this.schema_mode,
writer_stats_config.clone(),
)
.await?;
actions.extend(add_actions);
@@ -772,6 +815,7 @@
partition_columns.clone(),
this.writer_properties,
deletion_timestamp,
writer_stats_config,
)
.await?;
if !predicate_actions.is_empty() {
@@ -878,6 +922,33 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowE
Ok(())
}

/// Get the num_idx_columns and stats_columns from the table configuration in the state
/// If table_config does not exist (which can only occur on the first write action) it takes
/// the configuration that was passed to the WriteBuilder.
pub fn get_num_idx_cols_and_stats_columns(
config: Option<crate::table::config::TableConfig<'_>>,
configuration: HashMap<String, Option<String>>,
) -> (i32, Option<Vec<String>>) {
let (num_index_cols, stats_columns) = match &config {
Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()),
_ => (
configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<&str>>())),
),
};
(
num_index_cols,
stats_columns
.clone()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
)
}

#[cfg(test)]
mod tests {
use super::*;
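
On the very first write there is no snapshot to read the table config from, which is what the new `get_num_idx_cols_and_stats_columns` helper handles by falling back to the configuration handed to the `WriteBuilder`. A minimal test-style sketch of that fallback (assuming the helper remains publicly reachable at this path):

```rust
use std::collections::HashMap;

use deltalake_core::operations::write::get_num_idx_cols_and_stats_columns;

#[test]
fn first_write_falls_back_to_passed_configuration() {
    // No snapshot yet, so the helper reads the delta.* keys from the
    // configuration map instead of the table config.
    let configuration: HashMap<String, Option<String>> = HashMap::from([(
        "delta.dataSkippingNumIndexedCols".to_string(),
        Some("4".to_string()),
    )]);
    let (num_indexed_cols, stats_columns) =
        get_num_idx_cols_and_stats_columns(None, configuration);
    assert_eq!(num_indexed_cols, 4);
    assert_eq!(stats_columns, None);
}
```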