Skip to content

Commit

Permalink
chore: tidying up builds without datafusion feature and clippy
Browse files Browse the repository at this point in the history
The recent convert_to_delta changes imported a symbol from a module that
is only compiled when the datafusion feature is enabled, oops!
  • Loading branch information
rtyler committed May 15, 2024
1 parent c86d29f commit fae1406
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 48 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Command for converting a Parquet table to a Delta table in place
// https://github.com/delta-io/delta/blob/1d5dd774111395b0c4dc1a69c94abc169b1c83b6/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala

use crate::operations::write::get_num_idx_cols_and_stats_columns;
use crate::operations::get_num_idx_cols_and_stats_columns;
use crate::{
kernel::{Add, DataType, Schema, StructField},
logstore::{LogStore, LogStoreRef},
Expand Down
27 changes: 27 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,33 @@ impl AsRef<DeltaTable> for DeltaOps {
}
}

/// Get the `num_indexed_cols` and `stats_columns` settings for a write.
///
/// Values are read from the table configuration in the snapshot state when it
/// exists; otherwise (which can only happen on the very first write action)
/// they fall back to the raw configuration map that was passed to the
/// writer builder.
///
/// Returns a tuple of the number of columns to index for data skipping and,
/// if configured, the explicit list of column names to collect stats for.
pub fn get_num_idx_cols_and_stats_columns(
    config: Option<crate::table::config::TableConfig<'_>>,
    configuration: HashMap<String, Option<String>>,
) -> (i32, Option<Vec<String>>) {
    let (num_index_cols, stats_columns) = match &config {
        // Live table state wins over caller-supplied configuration.
        Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()),
        _ => (
            configuration
                .get("delta.dataSkippingNumIndexedCols")
                // NOTE(review): a malformed (non-integer) value panics here
                // rather than falling back to the default — pre-existing
                // behavior, kept intact; confirm whether an error would be
                // preferable.
                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
                .unwrap_or(crate::table::config::DEFAULT_NUM_INDEX_COLS),
            configuration
                .get("delta.dataSkippingStatsColumns")
                .and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<&str>>())),
        ),
    };
    (
        num_index_cols,
        // `stats_columns` is owned here, so map over it directly — the
        // previous `.clone()` before `.map()` was a redundant allocation
        // (clippy: redundant_clone). Convert the borrowed names to owned
        // `String`s for the caller.
        stats_columns.map(|v| v.iter().map(|c| c.to_string()).collect::<Vec<String>>()),
    )
}

#[cfg(feature = "datafusion")]
mod datafusion_utils {
use datafusion::execution::context::SessionState;
Expand Down
9 changes: 1 addition & 8 deletions crates/core/src/operations/transaction/conflict_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl<'a> TransactionInfo<'a> {
#[cfg(not(feature = "datafusion"))]
/// Files read by the transaction
pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
Ok(self.read_snapshot.file_actions().unwrap().into_iter())
Ok(self.read_snapshot.file_actions().unwrap())
}

/// Whether the whole table was read during the transaction
Expand Down Expand Up @@ -311,13 +311,6 @@ impl WinningCommitSummary {
}
}

// pub fn only_add_files(&self) -> bool {
// !self
// .actions
// .iter()
// .any(|action| matches!(action, Action::remove(_)))
// }

pub fn is_blind_append(&self) -> Option<bool> {
self.commit_info
.as_ref()
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/transaction/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ impl ProtocolChecker {
}

/// checks if table contains timestamp_ntz in any field including nested fields.
pub fn contains_timestampntz(&self, fields: &Vec<StructField>) -> bool {
fn check_vec_fields(fields: &Vec<StructField>) -> bool {
pub fn contains_timestampntz(&self, fields: &[StructField]) -> bool {
fn check_vec_fields(fields: &[StructField]) -> bool {
fields.iter().any(|f| _check_type(f.data_type()))
}

Expand Down
30 changes: 1 addition & 29 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use crate::logstore::LogStoreRef;
use crate::operations::cast::{cast_record_batch, merge_schema};
use crate::protocol::{DeltaOperation, SaveMode};
use crate::storage::ObjectStoreRef;
use crate::table::config::DEFAULT_NUM_INDEX_COLS;
use crate::table::state::DeltaTableState;
use crate::table::Constraint as DeltaConstraint;
use crate::writer::record_batch::divide_by_partition_values;
Expand Down Expand Up @@ -759,7 +758,7 @@ impl std::future::IntoFuture for WriteBuilder {
.map(|snapshot| snapshot.table_config());

let (num_indexed_cols, stats_columns) =
get_num_idx_cols_and_stats_columns(config, this.configuration);
super::get_num_idx_cols_and_stats_columns(config, this.configuration);

let writer_stats_config = WriterStatsConfig {
num_indexed_cols,
Expand Down Expand Up @@ -922,33 +921,6 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowE
Ok(())
}

/// Get the num_idx_columns and stats_columns from the table configuration in the state
/// If table_config does not exist (only can occur in the first write action) it takes
/// the configuration that was passed to the writerBuilder.
pub fn get_num_idx_cols_and_stats_columns(
config: Option<crate::table::config::TableConfig<'_>>,
configuration: HashMap<String, Option<String>>,
) -> (i32, Option<Vec<String>>) {
// Prefer the live table configuration; fall back to the raw key/value map
// only when no table state exists yet (first write).
let (num_index_cols, stats_columns) = match &config {
Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()),
_ => (
configuration
.get("delta.dataSkippingNumIndexedCols")
// NOTE(review): a non-integer value panics here instead of
// falling back to the default — confirm this is intended.
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
configuration
.get("delta.dataSkippingStatsColumns")
// Stats columns are stored as a single comma-separated string.
.and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<&str>>())),
),
};
(
num_index_cols,
// Convert the borrowed column names into owned Strings for the caller.
stats_columns
.clone()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ mod tests {
use crate::DeltaOps;

let table = crate::writer::test_utils::create_bare_table();
let partition_cols = vec!["modified".to_string()];
let partition_cols = ["modified".to_string()];
let delta_schema = r#"
{"type" : "struct",
"fields" : [
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/writer/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ fn stats_from_metadata(
let idx_to_iterate = if let Some(stats_cols) = stats_columns {
schema_descriptor
.columns()
.into_iter()
.iter()
.enumerate()
.filter_map(|(index, col)| {
if stats_cols.contains(&col.name().to_string()) {
Expand Down
10 changes: 4 additions & 6 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1732,12 +1732,10 @@ fn get_num_idx_cols_and_stats_columns(
.map_err(PythonError::from)?
.map(|snapshot| snapshot.table_config());

Ok(
deltalake::operations::write::get_num_idx_cols_and_stats_columns(
config,
configuration.unwrap_or_default(),
),
)
Ok(deltalake::operations::get_num_idx_cols_and_stats_columns(
config,
configuration.unwrap_or_default(),
))
}

#[pyclass(name = "DeltaDataChecker", module = "deltalake._internal")]
Expand Down

0 comments on commit fae1406

Please sign in to comment.