Skip to content

Commit

Permalink
well that was a mess
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Oct 3, 2024
1 parent 10355e6 commit 740d112
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 19 deletions.
1 change: 0 additions & 1 deletion kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ impl Protocol {
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct CommitInfo {
pub kernel_version: Option<String>,
// pub engine_info: HashMap<String, String>
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
Expand Down
87 changes: 70 additions & 17 deletions kernel/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,62 +36,115 @@ impl Transaction {
}

pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult> {
use crate::actions::{
ADD_NAME, COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, TRANSACTION_NAME,
};

// step one: construct the iterator of actions we want to commit
let action_schema = get_log_schema();

let actions = self.commit_info.into_iter().map(|commit_info| {
// expression to select all the columns
let mut commit_info_expr = vec![Expression::literal("v0.3.1")];
commit_info_expr.extend(
commit_info
.schema
.fields()
.map(|f| Expression::column(f.name()))
.collect::<Vec<_>>(),
);
let commit_info_expr = Expression::Struct(vec![
Expression::Literal(Scalar::Null(
action_schema
.field(crate::actions::ADD_NAME)
.unwrap()
.data_type()
.clone(),
action_schema.field(ADD_NAME).unwrap().data_type().clone(),
)),
Expression::Literal(Scalar::Null(
action_schema
.field(crate::actions::REMOVE_NAME)
.field(REMOVE_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema
.field(crate::actions::METADATA_NAME)
.field(METADATA_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema
.field(crate::actions::PROTOCOL_NAME)
.field(PROTOCOL_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema
.field(crate::actions::TRANSACTION_NAME)
.field(TRANSACTION_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Struct(
commit_info
.schema
.fields()
.map(|f| Expression::column(f.name()))
.collect(),
),
Expression::Struct(commit_info_expr),
]);

// add the commit info fields to the action schema.
// e.g. if engine's commit info is {engineInfo: string, operation: string}
// then the 'commit_info' field in the actions will be:
// {kernelVersion: string, engineInfo: string, operation: string}
// let action_fields = action_schema
// .project_as_struct(&[
// ADD_NAME,
// REMOVE_NAME,
// METADATA_NAME,
// PROTOCOL_NAME,
// TRANSACTION_NAME,
// ])
// .unwrap()
// .fields();
// let kernel_commit_info_fields = action_schema
// .project_as_struct(&[COMMIT_INFO_NAME])
// .unwrap()
// .fields();
// let engine_commit_info_fields = commit_info.schema.fields();
// let commit_info_fields = kernel_commit_info_fields.chain(engine_commit_info_fields);
// let action_schema = StructType::new(
// std::iter::once(commit_info_fields)
// .chain(action_fields)
// .collect::Vec<_>()
// );

let mut action_fields = action_schema.fields().collect::<Vec<_>>();
let commit_info_field = action_fields.pop().unwrap();
let mut commit_info_fields =
if let DataType::Struct(commit_info_schema) = commit_info_field.data_type() {
commit_info_schema.fields().collect::<Vec<_>>()
} else {
unreachable!()
};
commit_info_fields.extend(commit_info.schema.fields());
let commit_info_schema =
StructType::new(commit_info_fields.into_iter().map(|f| f.clone()).collect());
let mut action_fields = action_fields
.into_iter()
.map(|f| f.clone())
.collect::<Vec<_>>();
action_fields.push(crate::schema::StructField::new(
COMMIT_INFO_NAME,
commit_info_schema,
true,
));
let action_schema = StructType::new(action_fields);

println!("commit_info schema: {:#?}", commit_info.schema);
println!("action_schema: {:#?}", action_schema);

// commit info has arbitrary schema ex: {engineInfo: string, operation: string}
// we want to bundle it up and put it in the commit_info field of the actions.
let commit_info_evaluator = engine.get_expression_handler().get_evaluator(
commit_info.schema.into(),
commit_info_expr,
<StructType as Into<DataType>>::into(action_schema.clone()),
action_schema.into(),
);
commit_info_evaluator
.evaluate(commit_info.data.as_ref())
Expand Down
2 changes: 1 addition & 1 deletion kernel/tests/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn test_commit_info() -> Result<(), Box<dyn std::error::Error>> {
.await?;
assert_eq!(
String::from_utf8(commit1.bytes().await?.to_vec())?,
"{\"commitInfo\":{\"kernelVersion\":\"default engine\"}}\n"
"{\"commitInfo\":{\"kernelVersion\":\"v0.3.1\",\"engineInfo\":\"default engine\"}}\n"
);
Ok(())
}

0 comments on commit 740d112

Please sign in to comment.