Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extend configuration handling #1206

Merged
merged 1 commit into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
446 changes: 331 additions & 115 deletions rust/src/delta_config.rs

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ pub mod operations;
pub mod partitions;
pub mod schema;
pub mod storage;
pub mod table_properties;
pub mod table_state;
pub mod time_utils;

Expand Down
21 changes: 14 additions & 7 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::transaction::commit;
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
use crate::action::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
use crate::builder::ensure_table_uri;
use crate::delta_config::DeltaConfigKey;
use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct};
use crate::storage::DeltaObjectStore;
use crate::{DeltaResult, DeltaTable, DeltaTableError};
Expand Down Expand Up @@ -152,19 +153,25 @@ impl CreateBuilder {
}

/// Set configuration on created table
pub fn with_configuration(mut self, configuration: HashMap<String, Option<String>>) -> Self {
self.configuration = configuration;
pub fn with_configuration(
mut self,
configuration: HashMap<DeltaConfigKey, Option<impl Into<String>>>,
) -> Self {
self.configuration = configuration
.into_iter()
.map(|(k, v)| (k.as_ref().into(), v.map(|s| s.into())))
.collect();
self
}

/// Specify a table property in the table configuration
pub fn with_configuration_property(
mut self,
key: impl Into<String>,
key: DeltaConfigKey,
value: Option<impl Into<String>>,
) -> Self {
self.configuration
.insert(key.into(), value.map(|v| v.into()));
.insert(key.as_ref().into(), value.map(|v| v.into()));
self
}

Expand Down Expand Up @@ -311,8 +318,8 @@ impl std::future::IntoFuture for CreateBuilder {
#[cfg(all(test, feature = "parquet"))]
mod tests {
use super::*;
use crate::delta_config::DeltaConfigKey;
use crate::operations::DeltaOps;
use crate::table_properties::APPEND_ONLY;
use crate::writer::test_utils::get_delta_schema;
use tempdir::TempDir;

Expand Down Expand Up @@ -396,14 +403,14 @@ mod tests {
let table = CreateBuilder::new()
.with_location("memory://")
.with_columns(schema.get_fields().clone())
.with_configuration_property(APPEND_ONLY, Some("true"))
.with_configuration_property(DeltaConfigKey::AppendOnly, Some("true"))
.await
.unwrap();
let append = table
.get_metadata()
.unwrap()
.configuration
.get(APPEND_ONLY)
.get(DeltaConfigKey::AppendOnly.as_ref())
.unwrap()
.as_ref()
.unwrap()
Expand Down
26 changes: 3 additions & 23 deletions rust/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ use crate::storage::ObjectStoreRef;
use crate::table_state::DeltaTableState;
use crate::writer::utils::arrow_schema_without_partitions;
use crate::writer::utils::PartitionPath;
use crate::{table_properties, DeltaDataTypeVersion};
use crate::DeltaDataTypeVersion;
use crate::{
DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError, ObjectMeta, PartitionFilter,
};
use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use futures::future::BoxFuture;
use futures::StreamExt;
use log::{debug, error};
use log::debug;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use parquet::file::properties::WriterProperties;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -380,34 +380,14 @@ impl MergePlan {
}
}

fn get_target_file_size(snapshot: &DeltaTableState) -> DeltaDataTypeLong {
let mut target_size = 268_435_456;
if let Some(meta) = snapshot.current_metadata() {
let config_str = meta.configuration.get(table_properties::TARGET_FILE_SIZE);
if let Some(s) = config_str {
if let Some(s) = s {
let r = s.parse::<i64>();
if let Ok(size) = r {
target_size = size;
} else {
error!("Unable to parse value of 'delta.targetFileSize'. Using default value");
}
} else {
error!("Check your configuration of 'delta.targetFileSize'. Using default value");
}
}
}
target_size
}

/// Build a Plan on which files to merge together. See [OptimizeBuilder]
pub fn create_merge_plan(
snapshot: &DeltaTableState,
filters: &[PartitionFilter<'_, &str>],
target_size: Option<DeltaDataTypeLong>,
writer_properties: WriterProperties,
) -> Result<MergePlan, DeltaTableError> {
let target_size = target_size.unwrap_or_else(|| get_target_file_size(snapshot));
let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());
let mut candidates = HashMap::new();
let mut operations: HashMap<PartitionPath, PartitionMergePlan> = HashMap::new();
let mut metrics = Metrics::default();
Expand Down
69 changes: 0 additions & 69 deletions rust/src/table_properties.rs

This file was deleted.

34 changes: 25 additions & 9 deletions rust/src/table_state.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
//! The module for delta table state.

use crate::action::{self, Action, Add};
use crate::delta_config;
use crate::delta_config::TableConfig;
use crate::partitions::{DeltaTablePartition, PartitionFilter};
use crate::schema::SchemaDataType;
use crate::Schema;
use crate::{
ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableError,
DeltaTableMetaData,
};
use chrono::Utc;
use lazy_static::lazy_static;
use object_store::{path::Path, ObjectStore};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
Expand Down Expand Up @@ -228,6 +230,22 @@ impl DeltaTableState {
self.current_metadata.as_ref()
}

/// The table schema
pub fn schema(&self) -> Option<&Schema> {
self.current_metadata.as_ref().map(|m| &m.schema)
}

/// Well known table configuration
pub fn table_config(&self) -> TableConfig<'_> {
lazy_static! {
static ref DUMMY_CONF: HashMap<String, Option<String>> = HashMap::new();
}
self.current_metadata
.as_ref()
.map(|meta| TableConfig(&meta.configuration))
.unwrap_or_else(|| TableConfig(&DUMMY_CONF))
}

/// Merges new state information into our state
///
/// The DeltaTableState also carries the version information for the given state,
Expand Down Expand Up @@ -322,14 +340,12 @@ impl DeltaTableState {
action::Action::metaData(v) => {
let md = DeltaTableMetaData::try_from(v)
.map_err(|e| ApplyLogError::InvalidJson { source: e })?;
self.tombstone_retention_millis = delta_config::TOMBSTONE_RETENTION
.get_interval_from_metadata(&md)?
.as_millis() as i64;
self.log_retention_millis = delta_config::LOG_RETENTION
.get_interval_from_metadata(&md)?
.as_millis() as i64;
self.enable_expired_log_cleanup =
delta_config::ENABLE_EXPIRED_LOG_CLEANUP.get_boolean_from_metadata(&md)?;
let table_config = TableConfig(&md.configuration);
self.tombstone_retention_millis =
table_config.deleted_file_retention_duration().as_millis() as i64;
self.log_retention_millis =
table_config.log_retention_duration().as_millis() as i64;
self.enable_expired_log_cleanup = table_config.enable_expired_log_cleanup();
self.current_metadata = Some(md);
}
action::Action::txn(v) => {
Expand Down
12 changes: 7 additions & 5 deletions rust/tests/checkpoint_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ mod delete_expired_delta_log_in_checkpoint {

use ::object_store::path::Path as ObjectStorePath;
use chrono::Utc;
use deltalake::delta_config::DeltaConfigKey;
use deltalake::*;
use maplit::hashmap;

Expand All @@ -99,8 +100,8 @@ mod delete_expired_delta_log_in_checkpoint {
let mut table = fs_common::create_table(
"./tests/data/checkpoints_with_expired_logs/expired",
Some(hashmap! {
delta_config::LOG_RETENTION.key.clone() => Some("interval 10 minute".to_string()),
delta_config::ENABLE_EXPIRED_LOG_CLEANUP.key.clone() => Some("true".to_string())
DeltaConfigKey::LogRetentionDuration.as_ref().into() => Some("interval 10 minute".to_string()),
DeltaConfigKey::EnableExpiredLogCleanup.as_ref().into() => Some("true".to_string())
}),
)
.await;
Expand Down Expand Up @@ -163,8 +164,8 @@ mod delete_expired_delta_log_in_checkpoint {
let mut table = fs_common::create_table(
"./tests/data/checkpoints_with_expired_logs/not_delete_expired",
Some(hashmap! {
delta_config::LOG_RETENTION.key.clone() => Some("interval 1 second".to_string()),
delta_config::ENABLE_EXPIRED_LOG_CLEANUP.key.clone() => Some("false".to_string())
DeltaConfigKey::LogRetentionDuration.as_ref().into() => Some("interval 1 second".to_string()),
DeltaConfigKey::EnableExpiredLogCleanup.as_ref().into() => Some("false".to_string())
}),
)
.await;
Expand Down Expand Up @@ -212,6 +213,7 @@ mod checkpoints_with_tombstones {
use ::object_store::path::Path as ObjectStorePath;
use chrono::Utc;
use deltalake::action::*;
use deltalake::delta_config::DeltaConfigKey;
use deltalake::*;
use maplit::hashmap;
use parquet::file::reader::{FileReader, SerializedFileReader};
Expand All @@ -237,7 +239,7 @@ mod checkpoints_with_tombstones {
#[tokio::test]
async fn test_expired_tombstones() {
let mut table = fs_common::create_table("./tests/data/checkpoints_tombstones/expired", Some(hashmap! {
delta_config::TOMBSTONE_RETENTION.key.clone() => Some("interval 1 minute".to_string())
DeltaConfigKey::DeletedFileRetentionDuration.as_ref().into() => Some("interval 1 minute".to_string())
})).await;

let a1 = fs_common::add(3 * 60 * 1000); // 3 mins ago,
Expand Down