Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Oct 7, 2024
1 parent 3c18380 commit 5779f06
Showing 1 changed file with 83 additions and 74 deletions.
157 changes: 83 additions & 74 deletions kernel/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@ use crate::snapshot::Snapshot;
use crate::{DataType, Expression};
use crate::{DeltaResult, Engine, EngineData};

const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION");

/// A transaction represents an in-progress write to a table.
///
/// It holds the snapshot it was created from together with optional engine-supplied commit
/// info, and is consumed by `commit`.
pub struct Transaction {
// Snapshot this transaction reads from; `commit` writes at this snapshot's version + 1.
read_snapshot: Arc<Snapshot>,
// Engine-provided commit info; `None` until `commit_info` is called on the transaction.
commit_info: Option<EngineCommitInfo>,
}

// Since the engine can include any commit info it likes, we unify the data/schema pair as a single
// struct with Arc semantics.
#[derive(Clone)]
struct EngineCommitInfo {
data: Box<dyn EngineData>,
schema: Schema,
data: Arc<dyn EngineData>,
schema: SchemaRef,
}

impl std::fmt::Debug for Transaction {
Expand All @@ -28,21 +34,26 @@ impl std::fmt::Debug for Transaction {
}

impl Transaction {
/// Create a new transaction from a snapshot. The snapshot will be used to read the current
/// state of the table (e.g. to read the current version).
///
/// Instead of using this API, the more typical API is
/// [Table::new_transaction](crate::table::Table::new_transaction) to create a transaction from
/// a table automatically backed by the latest snapshot.
pub fn new(snapshot: impl Into<Arc<Snapshot>>) -> Self {
Transaction {
read_snapshot: snapshot.into(),
commit_info: None,
}
}

/// Consume the transaction and commit the in-progress write to the table.
pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult> {
// step one: construct the iterator of actions we want to commit
//
// TODO: enforce single row commit info
// TODO: for now we always require commit info
let (actions, _actions_schema) = generate_commit_info(engine, self.commit_info)?;
// note: only support commit_info right now.
let (actions, _actions_schema) = generate_commit_info(engine, self.commit_info.clone())?;

// step two: figure out the commit version and path to write
// step two: set new commit version (current_version + 1) and path to write
let commit_version = &self.read_snapshot.version() + 1;
let commit_file_name = format!("{:020}", commit_version) + ".json";
let commit_path = &self
Expand All @@ -53,26 +64,40 @@ impl Transaction {

// step three: commit the actions as a json file in the log
let json_handler = engine.get_json_handler();

json_handler.write_json_file(commit_path, Box::new(actions), false)?;
Ok(CommitResult::Committed(commit_version))
match json_handler.write_json_file(commit_path, Box::new(actions), false) {
Ok(()) => Ok(CommitResult::Committed(commit_version)),
Err(crate::error::Error::ObjectStore(object_store::Error::AlreadyExists {
path: _,
source: _,
})) => Ok(CommitResult::Conflict(self, commit_version)),
Err(e) => Err(e),
}
}

/// Add commit info to the transaction. This is commit-wide metadata that is written as the
/// first action in the commit. Note it is required in order to commit. If the engine does not
/// require any commit info, pass an empty `EngineData`.
pub fn commit_info(&mut self, commit_info: Box<dyn EngineData>, schema: Schema) {
self.commit_info = Some(EngineCommitInfo {
data: commit_info,
schema,
data: commit_info.into(),
schema: schema.into(),
});
}
}

/// Result after committing a transaction. If 'committed', the version is the new version written
/// to the log. If 'conflict', the transaction is returned so the caller can resolve the conflict
/// (along with the version which conflicted).
///
/// Both variants are produced by `Transaction::commit`; any other failure while writing the
/// commit file is surfaced as an `Err` instead of a `CommitResult`.
pub enum CommitResult {
/// The transaction was successfully committed at the version.
///
/// The version is the read snapshot's version plus one.
Committed(crate::Version),
/// The transaction conflicted with an existing version (at the version given).
///
/// Returned when writing the commit file fails with an object-store `AlreadyExists` error,
/// i.e. a commit file for that version is already present in the log.
Conflict(Transaction, crate::Version),
}

// Given the engine's commit info (data and schema as [EngineCommitInfo]), we want to create both:
// (1) the commitInfo action to commit (to which more actions can later be appended), and
// (2) the schema for the actions to commit (this is determined by the engine's commit info schema).
fn generate_commit_info<'a>(
engine: &'a dyn Engine,
engine_commit_info: Option<EngineCommitInfo>,
Expand All @@ -84,80 +109,64 @@ fn generate_commit_info<'a>(
ADD_NAME, COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, TRANSACTION_NAME,
};

let action_schema =
Arc::new(engine_commit_info
.as_ref()
.map_or(get_log_schema().clone(), |commit_info| {
let mut action_fields = get_log_schema().fields().collect::<Vec<_>>();
let commit_info_field = action_fields.pop().unwrap();
let mut commit_info_fields =
if let DataType::Struct(commit_info_schema) = commit_info_field.data_type() {
commit_info_schema.fields().collect::<Vec<_>>()
} else {
unreachable!()
};
commit_info_fields.extend(commit_info.schema.fields());
let commit_info_schema =
StructType::new(commit_info_fields.into_iter().map(|f| f.clone()).collect());
let mut action_fields = action_fields
.into_iter()
.map(|f| f.clone())
.collect::<Vec<_>>();
action_fields.push(crate::schema::StructField::new(
COMMIT_INFO_NAME,
commit_info_schema,
true,
));
StructType::new(action_fields)
}));
// TODO: enforce single row commit info
// TODO: for now we always require commit info
let action_schema = Arc::new(engine_commit_info.as_ref().map_or(
get_log_schema().clone(),
|commit_info| {
let mut action_fields = get_log_schema().fields().collect::<Vec<_>>();
let commit_info_field = action_fields.pop().unwrap();
let mut commit_info_fields =
if let DataType::Struct(commit_info_schema) = commit_info_field.data_type() {
commit_info_schema.fields().collect::<Vec<_>>()
} else {
unreachable!()
};
commit_info_fields.extend(commit_info.schema.fields());
let commit_info_schema =
StructType::new(commit_info_fields.into_iter().map(|f| f.clone()).collect());
let mut action_fields = action_fields
.into_iter()
.map(|f| f.clone())
.collect::<Vec<_>>();
action_fields.push(crate::schema::StructField::new(
COMMIT_INFO_NAME,
commit_info_schema,
true,
));
StructType::new(action_fields)
},
));

let action_schema_ref = Arc::clone(&action_schema);
let actions = engine_commit_info.into_iter().map(move |commit_info| {
// TODO RENAME
let engine_commit_info_data = commit_info.data;
let engine_commit_info_schema = commit_info.schema;
// expression to select all the columns
let mut commit_info_expr = vec![Expression::literal("v0.3.1")];
let mut commit_info_expr = vec![Expression::literal(format!("v{}", KERNEL_VERSION))];
commit_info_expr.extend(
engine_commit_info_schema
.fields()
.map(|f| Expression::column(f.name()))
.collect::<Vec<_>>(),
);
let commit_info_expr = Expression::Struct(vec![
Expression::Literal(Scalar::Null(
action_schema_ref.field(ADD_NAME).unwrap().data_type().clone(),
)),
Expression::Literal(Scalar::Null(
action_schema_ref
.field(REMOVE_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema_ref
.field(METADATA_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Literal(Scalar::Null(
action_schema_ref
.field(PROTOCOL_NAME)
.unwrap()
.data_type()
.clone(),
)),
// generate expression with null for all the fields except the commit_info field, and
// append the commit_info to the end.
let commit_info_expr_fields = [
ADD_NAME,
REMOVE_NAME,
METADATA_NAME,
PROTOCOL_NAME,
TRANSACTION_NAME,
]
.iter()
.map(|name| {
Expression::Literal(Scalar::Null(
action_schema_ref
.field(TRANSACTION_NAME)
.unwrap()
.data_type()
.clone(),
)),
Expression::Struct(commit_info_expr),
]);
action_schema_ref.field(name).unwrap().data_type().clone(),
))
})
.chain(std::iter::once(Expression::Struct(commit_info_expr)))
.collect::<Vec<_>>();
let commit_info_expr = Expression::Struct(commit_info_expr_fields);

// commit info has arbitrary schema ex: {engineInfo: string, operation: string}
// we want to bundle it up and put it in the commit_info field of the actions.
Expand Down

0 comments on commit 5779f06

Please sign in to comment.