Skip to content

Commit

Permalink
feat(python, rust): add set table properties operation (#2264)
Browse files Browse the repository at this point in the history
# Description
- Adds `set table properties` operation
- Some logic is shared for the create operation

# Related Issue(s)
- closes #1663
- fixes #2247

---------

Co-authored-by: R. Tyler Croy <[email protected]>
  • Loading branch information
ion-elgreco and rtyler authored May 16, 2024
1 parent fae1406 commit 507c3a3
Show file tree
Hide file tree
Showing 12 changed files with 686 additions and 47 deletions.
4 changes: 2 additions & 2 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ impl From<&str> for WriterFeatures {
fn from(value: &str) -> Self {
match value {
"appendOnly" | "delta.appendOnly" => WriterFeatures::AppendOnly,
"invariants" | "delta.invariants" => WriterFeatures::Invariants,
"checkConstraints" | "delta.checkConstraints" => WriterFeatures::CheckConstraints,
"invariants" => WriterFeatures::Invariants,
"checkConstraints" => WriterFeatures::CheckConstraints,
"changeDataFeed" | "delta.enableChangeDataFeed" => WriterFeatures::ChangeDataFeed,
"generatedColumns" => WriterFeatures::GeneratedColumns,
"columnMapping" => WriterFeatures::ColumnMapping,
Expand Down
75 changes: 36 additions & 39 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Command for creating a new delta table
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use maplit::hashset;
use serde_json::Value;

use super::transaction::{CommitBuilder, TableReference, PROTOCOL};
Expand All @@ -13,6 +14,9 @@ use crate::kernel::{
Action, DataType, Metadata, Protocol, ReaderFeatures, StructField, StructType, WriterFeatures,
};
use crate::logstore::{LogStore, LogStoreRef};
use crate::operations::set_tbl_properties::{
apply_properties_to_protocol, convert_properties_to_features,
};
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::builder::ensure_table_uri;
use crate::table::config::DeltaConfigKey;
Expand Down Expand Up @@ -237,41 +241,28 @@ impl CreateBuilder {
)
};

let configuration = self.configuration;
let contains_timestampntz = PROTOCOL.contains_timestampntz(&self.columns);

// TODO configure more permissive versions based on configuration. Also how should this ideally be handled?
// We set the lowest protocol we can, and if subsequent writes use newer features we update metadata?

let (min_reader_version, min_writer_version, writer_features, reader_features) =
if contains_timestampntz {
let mut converted_writer_features = self
.configuration
.keys()
.map(|key| key.clone().into())
.filter(|v| !matches!(v, WriterFeatures::Other(_)))
.collect::<HashSet<WriterFeatures>>();

let mut converted_reader_features = self
.configuration
.keys()
.map(|key| key.clone().into())
.filter(|v| !matches!(v, ReaderFeatures::Other(_)))
.collect::<HashSet<ReaderFeatures>>();
converted_writer_features.insert(WriterFeatures::TimestampWithoutTimezone);
converted_reader_features.insert(ReaderFeatures::TimestampWithoutTimezone);
(
3,
7,
Some(converted_writer_features),
Some(converted_reader_features),
)
} else {
(
PROTOCOL.default_reader_version(),
PROTOCOL.default_writer_version(),
None,
None,
)
};
let current_protocol = if contains_timestampntz {
Protocol {
min_reader_version: 3,
min_writer_version: 7,
writer_features: Some(hashset! {WriterFeatures::TimestampWithoutTimezone}),
reader_features: Some(hashset! {ReaderFeatures::TimestampWithoutTimezone}),
}
} else {
Protocol {
min_reader_version: PROTOCOL.default_reader_version(),
min_writer_version: PROTOCOL.default_writer_version(),
reader_features: None,
writer_features: None,
}
};

let protocol = self
.actions
.iter()
Expand All @@ -280,17 +271,23 @@ impl CreateBuilder {
Action::Protocol(p) => p.clone(),
_ => unreachable!(),
})
.unwrap_or_else(|| Protocol {
min_reader_version,
min_writer_version,
writer_features,
reader_features,
});
.unwrap_or_else(|| current_protocol);

let protocol = apply_properties_to_protocol(
&protocol,
&configuration
.iter()
.map(|(k, v)| (k.clone(), v.clone().unwrap()))
.collect::<HashMap<String, String>>(),
true,
)?;

let protocol = convert_properties_to_features(protocol, &configuration);

let mut metadata = Metadata::try_new(
StructType::new(self.columns),
self.partition_columns.unwrap_or_default(),
self.configuration,
configuration,
)?
.with_created_time(chrono::Utc::now().timestamp_millis());
if let Some(name) = self.name {
Expand Down
7 changes: 7 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream;
use arrow::record_batch::RecordBatch;
use optimize::OptimizeBuilder;
use restore::RestoreBuilder;
use set_tbl_properties::SetTablePropertiesBuilder;

#[cfg(feature = "datafusion")]
pub mod constraints;
Expand All @@ -48,6 +49,7 @@ mod load;
pub mod load_cdf;
#[cfg(feature = "datafusion")]
pub mod merge;
pub mod set_tbl_properties;
#[cfg(feature = "datafusion")]
pub mod update;
#[cfg(feature = "datafusion")]
Expand Down Expand Up @@ -219,6 +221,11 @@ impl DeltaOps {
pub fn drop_constraints(self) -> DropConstraintBuilder {
DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap())
}

/// Set table properties
pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap())
}
}

impl From<DeltaTable> for DeltaOps {
Expand Down
Loading

0 comments on commit 507c3a3

Please sign in to comment.