Skip to content

Commit

Permalink
feat: typed commit info (delta-io#1207)
Browse files Browse the repository at this point in the history
# Description

Another PR on the road to delta-io#632 - ~~keeping it a draft, as it is based on
delta-io#1206~~

While the `commitInfo` action is defined as completely optional, spark
and delta-rs write at the very least interesting, but often also quite
helpful information into the commit info. To make it easier to work with
and centralize some conventions, we introduce a `CommitInfo` struct,
that exposes some of the fields at the top level. Additionally we
harmonize a bit between spark and delta-rs conventions.

# Related Issue(s)

part of delta-io#632 

# Documentation

<!---
Share links to useful documentation
--->

---------

Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
2 people authored and chitralverma committed Mar 17, 2023
1 parent d543798 commit fe7a417
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 134 deletions.
200 changes: 173 additions & 27 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ mod parquet_read;
#[cfg(feature = "parquet2")]
pub mod parquet2_read;

use crate::{schema::*, DeltaTableError, DeltaTableMetaData};
use crate::delta_config::IsolationLevel;
use crate::{schema::*, DeltaResult, DeltaTableError, DeltaTableMetaData};
use percent_encoding::percent_decode;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
Expand Down Expand Up @@ -44,6 +45,13 @@ pub enum ActionError {
#[from]
source: parquet::errors::ParquetError,
},
/// Faild to serialize operation
#[error("Failed to serialize operation: {source}")]
SerializeOperation {
#[from]
/// The source error
source: serde_json::Error,
},
}

fn decode_path(raw_path: &str) -> Result<String, ActionError> {
Expand Down Expand Up @@ -435,7 +443,43 @@ pub struct Protocol {
pub min_writer_version: DeltaDataTypeInt,
}

type CommitInfo = Map<String, Value>;
/// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored.
/// However the reference implementation as well as delta-rs store useful information that may for instance
/// allow us to be more permissive in commit conflict resolution.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct CommitInfo {
/// Timestamp in millis when the commit was created
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<DeltaDataTypeTimestamp>,
/// Id of the user invoking the commit
#[serde(skip_serializing_if = "Option::is_none")]
pub user_id: Option<String>,
/// Name of the user invoking the commit
#[serde(skip_serializing_if = "Option::is_none")]
pub user_name: Option<String>,
/// The operation performed during the
#[serde(skip_serializing_if = "Option::is_none")]
pub operation: Option<String>,
/// Parameters used for table operation
#[serde(skip_serializing_if = "Option::is_none")]
pub operation_parameters: Option<HashMap<String, serde_json::Value>>,
/// Version of the table when the operation was started
#[serde(skip_serializing_if = "Option::is_none")]
pub read_version: Option<i64>,
/// The isolation level of the commit
#[serde(skip_serializing_if = "Option::is_none")]
pub isolation_level: Option<IsolationLevel>,
/// TODO
#[serde(skip_serializing_if = "Option::is_none")]
pub is_blind_append: Option<bool>,
/// Delta engine which created the commit.
#[serde(skip_serializing_if = "Option::is_none")]
pub engine_info: Option<String>,
/// Additional provenance information for the commit
#[serde(flatten, default)]
pub info: Map<String, serde_json::Value>,
}

/// Represents an action in the Delta log. The Delta log is an aggregate of all actions performed
/// on the table, so the full list of actions is required to properly read a table.
Expand All @@ -459,6 +503,16 @@ pub enum Action {
commitInfo(CommitInfo),
}

impl Action {
/// Create a commit info from a map
pub fn commit_info(info: Map<String, serde_json::Value>) -> Self {
Self::commitInfo(CommitInfo {
info,
..Default::default()
})
}
}

/// Operation performed when creating a new log entry with one or more actions.
/// This is a key element of the `CommitInfo` action.
#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -517,45 +571,78 @@ pub enum DeltaOperation {
}

impl DeltaOperation {
/// Retrieve basic commit information to be added to Delta commits
pub fn get_commit_info(&self) -> Map<String, Value> {
let mut commit_info = Map::<String, Value>::new();
let operation = match &self {
DeltaOperation::Create { .. } => "delta-rs.Create",
DeltaOperation::Write { .. } => "delta-rs.Write",
DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate",
DeltaOperation::Optimize { .. } => "delta-rs.Optimize",
DeltaOperation::FileSystemCheck { .. } => "delta-rs.FileSystemCheck",
};
commit_info.insert(
"operation".to_string(),
serde_json::Value::String(operation.into()),
);
/// A human readable name for the operation
pub fn name(&self) -> &str {
// operation names taken from https://learn.microsoft.com/en-us/azure/databricks/delta/history#--operation-metrics-keys
match &self {
DeltaOperation::Create { mode, .. } if matches!(mode, SaveMode::Overwrite) => {
"CREATE OR REPLACE TABLE"
}
DeltaOperation::Create { .. } => "CREATE TABLE",
DeltaOperation::Write { .. } => "WRITE",
DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE",
DeltaOperation::Optimize { .. } => "OPTIMIZE",
DeltaOperation::FileSystemCheck { .. } => "FSCK",
}
}

if let Ok(serde_json::Value::Object(map)) = serde_json::to_value(self) {
let all_operation_fields = map.values().next().unwrap().as_object().unwrap();
let converted_operation_fields: Map<String, Value> = all_operation_fields
/// Parameters configured for operation.
pub fn operation_parameters(&self) -> DeltaResult<HashMap<String, Value>> {
if let Some(Some(Some(map))) = serde_json::to_value(self)
.map_err(|err| ActionError::SerializeOperation { source: err })?
.as_object()
.map(|p| p.values().next().map(|q| q.as_object()))
{
Ok(map
.iter()
.filter(|item| !item.1.is_null())
.map(|(k, v)| {
(
k.clone(),
k.to_owned(),
serde_json::Value::String(if v.is_string() {
String::from(v.as_str().unwrap())
} else {
v.to_string()
}),
)
})
.collect();
.collect())
} else {
Err(ActionError::Generic(
"Operation parameters serialized into unexpected shape".into(),
)
.into())
}
}

commit_info.insert(
"operationParameters".to_string(),
serde_json::Value::Object(converted_operation_fields),
);
};
/// Denotes if the operation changes the data contained in the table
pub fn changes_data(&self) -> bool {
match self {
Self::Optimize { .. } => false,
Self::Create { .. }
| Self::FileSystemCheck {}
| Self::StreamingUpdate { .. }
| Self::Write { .. } => true,
}
}

commit_info
/// Retrieve basic commit information to be added to Delta commits
pub fn get_commit_info(&self) -> CommitInfo {
// TODO infer additional info from operation parameters ...
CommitInfo {
operation: Some(self.name().into()),
operation_parameters: self.operation_parameters().ok(),
..Default::default()
}
}

/// Get predicate expression applied when the operation reads data from the table.
pub fn read_predicate(&self) -> Option<String> {
match self {
// TODO add more operations
Self::Write { predicate, .. } => predicate.clone(),
_ => None,
}
}
}

Expand Down Expand Up @@ -654,4 +741,63 @@ mod tests {
1
);
}

#[test]
fn test_read_commit_info() {
let raw = r#"
{
"timestamp": 1670892998177,
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[\"c1\",\"c2\"]"
},
"isolationLevel": "Serializable",
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "3",
"numOutputRows": "3",
"numOutputBytes": "1356"
},
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0",
"txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c"
}"#;

let info = serde_json::from_str::<CommitInfo>(raw);
assert!(info.is_ok());

// assert that commit info has no required filelds
let raw = "{}";
let info = serde_json::from_str::<CommitInfo>(raw);
assert!(info.is_ok());

// arbitrary field data may be added to commit
let raw = r#"
{
"timestamp": 1670892998177,
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[\"c1\",\"c2\"]"
},
"isolationLevel": "Serializable",
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "3",
"numOutputRows": "3",
"numOutputBytes": "1356"
},
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0",
"txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c",
"additionalField": "more data",
"additionalStruct": {
"key": "value",
"otherKey": 123
}
}"#;

let info = serde_json::from_str::<CommitInfo>(raw).expect("should parse");
assert!(info.info.contains_key("additionalField"));
assert!(info.info.contains_key("additionalStruct"));
}
}
2 changes: 1 addition & 1 deletion rust/src/action/parquet2_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl ActionVariant for CommitInfo {
type Variant = CommitInfo;

fn default_action() -> Action {
Action::commitInfo(CommitInfo::new())
Action::commitInfo(CommitInfo::default())
}

fn try_mut_from_action(a: &mut Action) -> Result<&mut Self, ParseError> {
Expand Down
Loading

0 comments on commit fe7a417

Please sign in to comment.