diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index df537624e..b5d37fb98 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -135,7 +135,6 @@ impl Protocol { #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct CommitInfo { pub kernel_version: Option, - // pub engine_info: HashMap } #[derive(Debug, Clone, PartialEq, Eq, Schema)] diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 4e259540e..c8732b4ed 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -36,62 +36,115 @@ impl Transaction { } pub fn commit(self, engine: &dyn Engine) -> DeltaResult { + use crate::actions::{ + ADD_NAME, COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, TRANSACTION_NAME, + }; + // step one: construct the iterator of actions we want to commit let action_schema = get_log_schema(); let actions = self.commit_info.into_iter().map(|commit_info| { // expression to select all the columns + let mut commit_info_expr = vec![Expression::literal("v0.3.1")]; + commit_info_expr.extend( + commit_info + .schema + .fields() + .map(|f| Expression::column(f.name())) + .collect::>(), + ); let commit_info_expr = Expression::Struct(vec![ Expression::Literal(Scalar::Null( - action_schema - .field(crate::actions::ADD_NAME) - .unwrap() - .data_type() - .clone(), + action_schema.field(ADD_NAME).unwrap().data_type().clone(), )), Expression::Literal(Scalar::Null( action_schema - .field(crate::actions::REMOVE_NAME) + .field(REMOVE_NAME) .unwrap() .data_type() .clone(), )), Expression::Literal(Scalar::Null( action_schema - .field(crate::actions::METADATA_NAME) + .field(METADATA_NAME) .unwrap() .data_type() .clone(), )), Expression::Literal(Scalar::Null( action_schema - .field(crate::actions::PROTOCOL_NAME) + .field(PROTOCOL_NAME) .unwrap() .data_type() .clone(), )), Expression::Literal(Scalar::Null( action_schema - .field(crate::actions::TRANSACTION_NAME) + .field(TRANSACTION_NAME) .unwrap() .data_type() .clone(), )), - Expression::Struct( - commit_info - .schema - .fields() - .map(|f| Expression::column(f.name())) - .collect(), - ), + Expression::Struct(commit_info_expr), ]); + // add the commit info fields to the action schema. + // e.g. if engine's commit info is {engineInfo: string, operation: string} + // then the 'commit_info' field in the actions will be: + // {kernelVersion: string, engineInfo: string, operation: string} + // let action_fields = action_schema + // .project_as_struct(&[ + // ADD_NAME, + // REMOVE_NAME, + // METADATA_NAME, + // PROTOCOL_NAME, + // TRANSACTION_NAME, + // ]) + // .unwrap() + // .fields(); + // let kernel_commit_info_fields = action_schema + // .project_as_struct(&[COMMIT_INFO_NAME]) + // .unwrap() + // .fields(); + // let engine_commit_info_fields = commit_info.schema.fields(); + // let commit_info_fields = kernel_commit_info_fields.chain(engine_commit_info_fields); + // let action_schema = StructType::new( + // std::iter::once(commit_info_fields) + // .chain(action_fields) + // .collect::Vec<_>() + // ); + + let mut action_fields = action_schema.fields().collect::>(); + let commit_info_field = action_fields.pop().unwrap(); + let mut commit_info_fields = + if let DataType::Struct(commit_info_schema) = commit_info_field.data_type() { + commit_info_schema.fields().collect::>() + } else { + unreachable!() + }; + commit_info_fields.extend(commit_info.schema.fields()); + let commit_info_schema = + StructType::new(commit_info_fields.into_iter().map(|f| f.clone()).collect()); + let mut action_fields = action_fields + .into_iter() + .map(|f| f.clone()) + .collect::>(); + action_fields.push(crate::schema::StructField::new( + COMMIT_INFO_NAME, + commit_info_schema, + true, + )); + let action_schema = StructType::new(action_fields); + + println!("commit_info schema: {:#?}", commit_info.schema); + println!("action_schema: {:#?}", action_schema); + // commit info has arbitrary schema ex: {engineInfo: string, operation: string} // we want to bundle it up and put it in the commit_info field of the actions. let commit_info_evaluator = engine.get_expression_handler().get_evaluator( commit_info.schema.into(), commit_info_expr, - >::into(action_schema.clone()), + action_schema.into(), ); commit_info_evaluator .evaluate(commit_info.data.as_ref()) diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 9896a5936..2ac241538 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -111,7 +111,7 @@ async fn test_commit_info() -> Result<(), Box> { .await?; assert_eq!( String::from_utf8(commit1.bytes().await?.to_vec())?, - "{\"commitInfo\":{\"kernelVersion\":\"default engine\"}}\n" + "{\"commitInfo\":{\"kernelVersion\":\"v0.3.1\",\"engineInfo\":\"default engine\"}}\n" ); Ok(()) }