From 80f08dbf349ea27390fe0a9ea258b045a3b864ad Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Fri, 25 Oct 2024 12:37:14 -0700 Subject: [PATCH] [write stage0] add Transaction with commit info and commit implementation (#370) This PR does 4 main things: 1. ~reorganize `transaction.rs` so that the transaction action is now moved to actions module~ **EDIT:** now in #386 1. new `Transaction` API which includes: a. `Table.new_transaction()` to create a new transaction from the latest snapshot of the table b. `Transaction.with_commit_info(engine_commit_info: Box)` to add single-row commit info in the form of a `map`. required to commit. c. `Transaction.with_operation(operation: String)` to set the operation name of the transaction (persisted in commit info) d. `Transaction.commit() // consumes transaction` to commit the transaction to the log (currently only supporting committing the commit info) 1. new engine API: `write_json_file(impl Iterator>)` (and a default engine implementation for this) 1. new integration test suite `write.rs` to house many of our write tests as it's implemented resolves #378 --------- Co-authored-by: Ryan Johnson Co-authored-by: Nick Lanham --- ffi/src/lib.rs | 6 + kernel/Cargo.toml | 3 + kernel/src/actions/mod.rs | 58 +++- kernel/src/engine/arrow_utils.rs | 37 +- kernel/src/engine/default/json.rs | 27 ++ kernel/src/engine/sync/json.rs | 102 +++++- kernel/src/error.rs | 12 + kernel/src/lib.rs | 30 ++ kernel/src/path.rs | 43 ++- kernel/src/table.rs | 6 + kernel/src/transaction.rs | 544 ++++++++++++++++++++++++++++++ kernel/tests/write.rs | 253 ++++++++++++++ 12 files changed, 1109 insertions(+), 12 deletions(-) create mode 100644 kernel/src/transaction.rs create mode 100644 kernel/tests/write.rs diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index 75a5f34a3..ff3d60f2e 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -329,6 +329,9 @@ pub enum KernelError { InternalError, InvalidExpression, InvalidLogPath, + InvalidCommitInfo, + FileAlreadyExists, + MissingCommitInfo, } impl From for KernelError { @@ -376,6 +379,9 @@ impl From for KernelError { } => Self::from(*source), Error::InvalidExpressionEvaluation(_) => KernelError::InvalidExpression, Error::InvalidLogPath(_) => KernelError::InvalidLogPath, + Error::InvalidCommitInfo(_) => KernelError::InvalidCommitInfo, + Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists, + Error::MissingCommitInfo => KernelError::MissingCommitInfo, } } } diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 5680e5afe..8205a2ccf 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -36,6 +36,8 @@ delta_kernel_derive = { path = "../derive-macros", version = "0.4.0" } # used for developer-visibility visibility = "0.1.1" +# Used in the sync engine +tempfile = { version = "3", optional = true } # Used in default engine arrow-buffer = { workspace = true, optional = true } arrow-array = { workspace = true, optional = true, features = ["chrono-tz"] } @@ -99,6 +101,7 @@ sync-engine = [ "arrow-json", "arrow-select", "parquet", + "tempfile", ] integration-test = [ "hdfs-native-object-store/integration-test", diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index ab2a9fee3..b23dd6511 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -46,6 +46,10 @@ static LOG_SCHEMA: LazyLock = LazyLock::new(|| { .into() }); +static LOG_COMMIT_INFO_SCHEMA: LazyLock = LazyLock::new(|| { + StructType::new([Option::::get_struct_field(COMMIT_INFO_NAME)]).into() +}); + #[cfg_attr(feature = "developer-visibility", 
visibility::make(pub))] #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] fn get_log_schema() -> &'static SchemaRef { @@ -58,6 +62,10 @@ fn get_log_add_schema() -> &'static SchemaRef { &LOG_ADD_SCHEMA } +pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef { + &LOG_COMMIT_INFO_SCHEMA +} + #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { /// Name of the encoding for files in this table @@ -147,8 +155,26 @@ impl Protocol { } #[derive(Debug, Clone, PartialEq, Eq, Schema)] -pub struct CommitInfo { - pub kernel_version: Option, +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] +#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] +struct CommitInfo { + /// The time this logical file was created, as milliseconds since the epoch. + /// Read: optional, write: required (that is, kernel always writes). + /// If in-commit timestamps are enabled, this is always required. + pub(crate) timestamp: Option, + /// An arbitrary string that identifies the operation associated with this commit. This is + /// specified by the engine. Read: optional, write: required (that is, kernel alwarys writes). + pub(crate) operation: Option, + /// Map of arbitrary string key-value pairs that provide additional information about the + /// operation. This is specified by the engine. For now this is always empty on write. + pub(crate) operation_parameters: Option>, + /// The version of the delta_kernel crate used to write this commit. The kernel will always + /// write this field, but it is optional since many tables will not have this field (i.e. any + /// tables not written by kernel). + pub(crate) kernel_version: Option, + /// A place for the engine to store additional metadata associated with this commit encoded as + /// a map of strings. + pub(crate) engine_commit_info: Option>, } #[derive(Debug, Clone, PartialEq, Eq, Schema)] @@ -427,4 +453,32 @@ mod tests { )])); assert_eq!(schema, expected); } + + #[test] + fn test_commit_info_schema() { + let schema = get_log_schema() + .project(&["commitInfo"]) + .expect("Couldn't get commitInfo field"); + + let expected = Arc::new(StructType::new(vec![StructField::new( + "commitInfo", + StructType::new(vec![ + StructField::new("timestamp", DataType::LONG, true), + StructField::new("operation", DataType::STRING, true), + StructField::new( + "operationParameters", + MapType::new(DataType::STRING, DataType::STRING, false), + true, + ), + StructField::new("kernelVersion", DataType::STRING, true), + StructField::new( + "engineCommitInfo", + MapType::new(DataType::STRING, DataType::STRING, false), + true, + ), + ]), + true, + )])); + assert_eq!(schema, expected); + } } diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs index f8680b403..d8daba774 100644 --- a/kernel/src/engine/arrow_utils.rs +++ b/kernel/src/engine/arrow_utils.rs @@ -16,7 +16,7 @@ use arrow_array::{ cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait, RecordBatch, StringArray, StructArray, }; -use arrow_json::ReaderBuilder; +use arrow_json::{LineDelimitedWriter, ReaderBuilder}; use arrow_schema::{ DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields, SchemaRef as ArrowSchemaRef, @@ -662,6 +662,21 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR Ok(concat_batches(&schema, output.iter())?) } +/// serialize an arrow RecordBatch to a JSON string by appending to a buffer. 
+// TODO (zach): this should stream data to the JSON writer and output an iterator. +pub(crate) fn to_json_bytes( + data: impl Iterator> + Send, +) -> DeltaResult> { + let mut writer = LineDelimitedWriter::new(Vec::new()); + for chunk in data.into_iter() { + let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?; + let record_batch = arrow_data.record_batch(); + writer.write(record_batch)?; + } + writer.finish()?; + Ok(writer.into_inner()) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -1408,4 +1423,24 @@ mod tests { assert_eq!(mask_indices, expect_mask); assert_eq!(reorder_indices, expect_reorder); } + + #[test] + fn test_write_json() -> DeltaResult<()> { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "string", + ArrowDataType::Utf8, + true, + )])); + let data = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(vec!["string1", "string2"]))], + )?; + let data: Box = Box::new(ArrowEngineData::new(data)); + let json = to_json_bytes(Box::new(std::iter::once(data)))?; + assert_eq!( + json, + "{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes() + ); + Ok(()) + } } diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index 7ea8b38cb..b03b26bc6 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -11,10 +11,12 @@ use bytes::{Buf, Bytes}; use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{DynObjectStore, GetResultPayload}; +use url::Url; use super::executor::TaskExecutor; use super::file_stream::{FileOpenFuture, FileOpener, FileStream}; use crate::engine::arrow_utils::parse_json as arrow_parse_json; +use crate::engine::arrow_utils::to_json_bytes; use crate::schema::SchemaRef; use crate::{ DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, @@ -89,6 +91,31 @@ impl JsonHandler for DefaultJsonHandler { self.readahead, ) } + + // note: for now we just buffer all the data and write it out all at once + fn write_json_file( + &self, + path: &Url, + data: Box> + Send>, + _overwrite: bool, + ) -> DeltaResult<()> { + let buffer = to_json_bytes(data)?; + // Put if absent + let store = self.store.clone(); // cheap Arc + let path = Path::from(path.path()); + let path_str = path.to_string(); + self.task_executor + .block_on(async move { + store + .put_opts(&path, buffer.into(), object_store::PutMode::Create.into()) + .await + }) + .map_err(|e| match e { + object_store::Error::AlreadyExists { .. 
} => Error::FileAlreadyExists(path_str), + e => e.into(), + })?; + Ok(()) + } } /// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`] diff --git a/kernel/src/engine/sync/json.rs b/kernel/src/engine/sync/json.rs index a63854ea7..016fb2658 100644 --- a/kernel/src/engine/sync/json.rs +++ b/kernel/src/engine/sync/json.rs @@ -1,13 +1,17 @@ -use std::{fs::File, io::BufReader}; +use std::{fs::File, io::BufReader, io::Write}; use arrow_schema::SchemaRef as ArrowSchemaRef; +use tempfile::NamedTempFile; +use url::Url; use super::read_files; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_utils::parse_json as arrow_parse_json; +use crate::engine::arrow_utils::to_json_bytes; use crate::schema::SchemaRef; use crate::{ - DeltaResult, EngineData, ExpressionRef, FileDataReadResultIterator, FileMeta, JsonHandler, + DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, + JsonHandler, }; pub(crate) struct SyncJsonHandler; @@ -41,4 +45,98 @@ impl JsonHandler for SyncJsonHandler { ) -> DeltaResult> { arrow_parse_json(json_strings, output_schema) } + + // For sync writer we write data to a tmp file then atomically rename it to the final path. + // This is highly OS-dependent and for now relies on the atomicity of tempfile's + // `persist_noclobber`. + fn write_json_file( + &self, + path: &Url, + data: Box> + Send>, + _overwrite: bool, + ) -> DeltaResult<()> { + let path = path + .to_file_path() + .map_err(|_| crate::Error::generic("sync client can only read local files"))?; + let Some(parent) = path.parent() else { + return Err(crate::Error::generic(format!( + "no parent found for {:?}", + path + ))); + }; + + // write data to tmp file + let mut tmp_file = NamedTempFile::new_in(parent)?; + let buf = to_json_bytes(data)?; + tmp_file.write_all(&buf)?; + tmp_file.flush()?; + + // use 'persist_noclobber' to atomically rename tmp file to final path + tmp_file + .persist_noclobber(path.clone()) + .map_err(|e| match e { + tempfile::PersistError { error, .. 
} + if error.kind() == std::io::ErrorKind::AlreadyExists => + { + Error::FileAlreadyExists(path.to_string_lossy().to_string()) + } + e => Error::IOError(e.into()), + })?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::sync::Arc; + + use arrow_array::{RecordBatch, StringArray}; + use arrow_schema::DataType as ArrowDataType; + use arrow_schema::Field; + use arrow_schema::Schema as ArrowSchema; + use serde_json::json; + use url::Url; + + #[test] + fn test_write_json_file() -> DeltaResult<()> { + let test_dir = tempfile::tempdir().unwrap(); + let path = test_dir.path().join("00000000000000000001.json"); + let handler = SyncJsonHandler; + + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + "dog", + ArrowDataType::Utf8, + true, + )])); + let data = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(vec!["remi", "wilson"]))], + )?; + let data: Box = Box::new(ArrowEngineData::new(data)); + let empty: Box = + Box::new(ArrowEngineData::new(RecordBatch::new_empty(schema))); + + let url = Url::from_file_path(path.clone()).unwrap(); + handler + .write_json_file(&url, Box::new(std::iter::once(data)), false) + .expect("write json file"); + assert!(matches!( + handler.write_json_file(&url, Box::new(std::iter::once(empty)), false), + Err(Error::FileAlreadyExists(_)) + )); + + let file = std::fs::read_to_string(path)?; + let json: Vec<_> = serde_json::Deserializer::from_str(&file) + .into_iter::() + .flatten() + .collect(); + assert_eq!( + json, + vec![json!({"dog": "remi"}), json!({"dog": "wilson"}),] + ); + + Ok(()) + } } diff --git a/kernel/src/error.rs b/kernel/src/error.rs index 78cab4ad6..6396f5b8f 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -159,6 +159,18 @@ pub enum Error { /// Unable to parse the name of a log path #[error("Invalid log path: {0}")] InvalidLogPath(String), + + /// Invalid commit info passed to the transaction + #[error("Invalid commit info: {0}")] + InvalidCommitInfo(String), + + /// Commit info was not passed to the transaction + #[error("Missing commit info")] + MissingCommitInfo, + + /// The file already exists at the path, prohibiting a non-overwrite write + #[error("File already exists: {0}")] + FileAlreadyExists(String), } // Convenience constructors for Error types that take a String argument diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 6829c8eca..6d47d9ae2 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -73,6 +73,7 @@ pub mod scan; pub mod schema; pub mod snapshot; pub mod table; +pub mod transaction; pub(crate) mod utils; pub use engine_data::{DataVisitor, EngineData}; @@ -206,6 +207,35 @@ pub trait JsonHandler: Send + Sync { physical_schema: SchemaRef, predicate: Option, ) -> DeltaResult; + + /// Atomically (!) write a single JSON file. Each row of the input data should be written as a + /// new JSON object appended to the file. this write must: + /// (1) serialize the data to newline-delimited json (each row is a json object literal) + /// (2) write the data to storage atomically (i.e. if the file already exists, fail unless the + /// overwrite flag is set) + /// + /// For example, the JSON data should be written as { "column1": "val1", "column2": "val2", .. } + /// with each row on a new line. + /// + /// NOTE: Null columns should not be written to the JSON file. For example, if a row has columns + /// ["a", "b"] and the value of "b" is null, the JSON object should be written as + /// { "a": "..." }. 
Note that including nulls is technically valid JSON, but would bloat the + /// log, therefore we recommend omitting them. + /// + /// # Parameters + /// + /// - `path` - URL specifying the location to write the JSON file + /// - `data` - Iterator of EngineData to write to the JSON file. Each row should be written as + /// a new JSON object appended to the file. (that is, the file is newline-delimeted JSON, and + /// each row is a JSON object on a single line) + /// - `overwrite` - If true, overwrite the file if it exists. If false, the call must fail if + /// the file exists. + fn write_json_file( + &self, + path: &Url, + data: Box> + Send>, + overwrite: bool, + ) -> DeltaResult<()>; } /// Provides Parquet file related functionalities to Delta Kernel. diff --git a/kernel/src/path.rs b/kernel/src/path.rs index bdb443509..cc91a8ea9 100644 --- a/kernel/src/path.rs +++ b/kernel/src/path.rs @@ -73,6 +73,12 @@ impl AsUrl for FileMeta { } } +impl AsUrl for Url { + fn as_url(&self) -> &Url { + self + } +} + impl ParsedLogPath { // NOTE: We can't actually impl TryFrom because Option is a foreign struct even if T is local. #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] @@ -176,19 +182,31 @@ impl ParsedLogPath { } } +impl ParsedLogPath { + /// Create a new ParsedCommitPath for a new json commit file at the specified version + pub(crate) fn new_commit( + table_root: &Url, + version: Version, + ) -> DeltaResult> { + let filename = format!("{:020}.json", version); + let location = table_root.join("_delta_log/")?.join(&filename)?; + let path = Self::try_from(location)? + .ok_or_else(|| Error::internal_error("attempted to create invalid commit path"))?; + if !path.is_commit() { + return Err(Error::internal_error( + "ParsedLogPath::new_commit created a non-commit path", + )); + } + Ok(path) + } +} + #[cfg(test)] mod tests { use std::path::PathBuf; use super::*; - // Easier to test directly with Url instead of FileMeta! - impl AsUrl for Url { - fn as_url(&self) -> &Url { - self - } - } - fn table_log_dir_url() -> Url { let path = PathBuf::from("./tests/data/table-with-dv-small/_delta_log/"); let path = std::fs::canonicalize(path).unwrap(); @@ -518,4 +536,15 @@ mod tests { .unwrap(); ParsedLogPath::try_from(log_path).expect_err("non-numeric hi"); } + + #[test] + fn test_new_commit() { + let table_log_dir = table_log_dir_url(); + let log_path = ParsedLogPath::new_commit(&table_log_dir, 10).unwrap(); + assert_eq!(log_path.version, 10); + assert!(log_path.is_commit()); + assert_eq!(log_path.extension, "json"); + assert!(matches!(log_path.file_type, LogPathFileType::Commit)); + assert_eq!(log_path.filename, "00000000000000000010.json"); + } } diff --git a/kernel/src/table.rs b/kernel/src/table.rs index 590490410..d7aba9894 100644 --- a/kernel/src/table.rs +++ b/kernel/src/table.rs @@ -6,6 +6,7 @@ use std::path::PathBuf; use url::Url; use crate::snapshot::Snapshot; +use crate::transaction::Transaction; use crate::{DeltaResult, Engine, Error, Version}; /// In-memory representation of a Delta table, which acts as an immutable root entity for reading @@ -76,6 +77,11 @@ impl Table { pub fn snapshot(&self, engine: &dyn Engine, version: Option) -> DeltaResult { Snapshot::try_new(self.location.clone(), engine, version) } + + /// Create a new write transaction for this table. 
+ pub fn new_transaction(&self, engine: &dyn Engine) -> DeltaResult { + Ok(Transaction::new(self.snapshot(engine, None)?)) + } } #[derive(Debug)] diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs new file mode 100644 index 000000000..81b0f31f8 --- /dev/null +++ b/kernel/src/transaction.rs @@ -0,0 +1,544 @@ +use std::iter; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::actions::get_log_commit_info_schema; +use crate::actions::COMMIT_INFO_NAME; +use crate::error::Error; +use crate::expressions::{column_expr, Scalar, StructData}; +use crate::path::ParsedLogPath; +use crate::schema::{StructField, StructType}; +use crate::snapshot::Snapshot; +use crate::{DataType, DeltaResult, Engine, EngineData, Expression, Version}; + +const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION"); +const UNKNOWN_OPERATION: &str = "UNKNOWN"; + +/// A transaction represents an in-progress write to a table. After creating a transaction, changes +/// to the table may be staged via the transaction methods before calling `commit` to commit the +/// changes to the table. +/// +/// # Examples +/// +/// ```rust,ignore +/// // create a transaction +/// let mut txn = table.new_transaction(&engine)?; +/// // stage table changes (right now only commit info) +/// txn.commit_info(Box::new(ArrowEngineData::new(engine_commit_info))); +/// // commit! (consume the transaction) +/// txn.commit(&engine)?; +/// ``` +pub struct Transaction { + read_snapshot: Arc, + operation: Option, + commit_info: Option>, +} + +impl std::fmt::Debug for Transaction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!( + "Transaction {{ read_snapshot version: {}, commit_info: {} }}", + self.read_snapshot.version(), + self.commit_info.is_some() + )) + } +} + +impl Transaction { + /// Create a new transaction from a snapshot. The snapshot will be used to read the current + /// state of the table (e.g. to read the current version). + /// + /// Instead of using this API, the more typical (user-facing) API is + /// [Table::new_transaction](crate::table::Table::new_transaction) to create a transaction from + /// a table automatically backed by the latest snapshot. + pub(crate) fn new(snapshot: impl Into>) -> Self { + Transaction { + read_snapshot: snapshot.into(), + operation: None, + commit_info: None, + } + } + + /// Consume the transaction and commit it to the table. The result is a [CommitResult] which + /// will include the failed transaction in case of a conflict so the user can retry. 
+ pub fn commit(self, engine: &dyn Engine) -> DeltaResult { + // step one: construct the iterator of actions we want to commit + // note: only support commit_info right now (and it's required) + let engine_commit_info = self + .commit_info + .as_ref() + .ok_or_else(|| Error::MissingCommitInfo)?; + let actions = Box::new(iter::once(generate_commit_info( + engine, + self.operation.as_deref(), + engine_commit_info.as_ref(), + )?)); + + // step two: set new commit version (current_version + 1) and path to write + let commit_version = self.read_snapshot.version() + 1; + let commit_path = + ParsedLogPath::new_commit(self.read_snapshot.table_root(), commit_version)?; + + // step three: commit the actions as a json file in the log + let json_handler = engine.get_json_handler(); + match json_handler.write_json_file(&commit_path.location, Box::new(actions), false) { + Ok(()) => Ok(CommitResult::Committed(commit_version)), + Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)), + Err(e) => Err(e), + } + } + + /// Set the operation that this transaction is performing. This string will be persisted in the + /// commit and visible to anyone who describes the table history. + pub fn with_operation(mut self, operation: String) -> Self { + self.operation = Some(operation); + self + } + + /// WARNING: This is an unstable API and will likely change in the future. + /// + /// Add commit info to the transaction. This is commit-wide metadata that is written as the + /// first action in the commit. The engine data passed here must have exactly one row, and we + /// only read one column: `engineCommitInfo` which must be a map encoding the + /// metadata. + /// + /// The engine is required to provide commit info before committing the transaction. If the + /// engine would like to omit engine-specific commit info, it can do so by passing pass a + /// commit_info engine data chunk with one row and one column of type `Map` + /// that can either be `null` or contain an empty map. + /// + /// Any other columns in the data chunk are ignored. + pub fn with_commit_info(mut self, commit_info: Box) -> Self { + self.commit_info = Some(commit_info.into()); + self + } +} + +/// Result after committing a transaction. If 'committed', the version is the new version written +/// to the log. If 'conflict', the transaction is returned so the caller can resolve the conflict +/// (along with the version which conflicted). +// TODO(zach): in order to make the returning of a transcation useful, we need to add APIs to +// update the transaction to a new version etc. +#[derive(Debug)] +pub enum CommitResult { + /// The transaction was successfully committed at the version. + Committed(Version), + /// The transaction conflicted with an existing version (at the version given). + Conflict(Transaction, Version), +} + +// given the engine's commit info we want to create commitInfo action to commit (and append more actions to) +fn generate_commit_info( + engine: &dyn Engine, + operation: Option<&str>, + engine_commit_info: &dyn EngineData, +) -> DeltaResult> { + if engine_commit_info.length() != 1 { + return Err(Error::InvalidCommitInfo(format!( + "Engine commit info should have exactly one row, found {}", + engine_commit_info.length() + ))); + } + + let timestamp: i64 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|_| Error::generic("time went backwards"))? 
+ .as_millis() + .try_into() + .map_err(|_| Error::generic("milliseconds since unix_epoch exceeded i64 size"))?; + let commit_info_exprs = [ + // TODO(zach): we should probably take a timestamp closer to actual commit time? + Expression::literal(timestamp), + Expression::literal(operation.unwrap_or(UNKNOWN_OPERATION)), + // HACK (part 1/2): since we don't have proper map support, we create a literal struct with + // one null field to create data that serializes as "operationParameters": {} + Expression::literal(Scalar::Struct(StructData::try_new( + vec![StructField::new( + "operation_parameter_int", + DataType::INTEGER, + true, + )], + vec![Scalar::Null(DataType::INTEGER)], + )?)), + Expression::literal(format!("v{}", KERNEL_VERSION)), + column_expr!("engineCommitInfo"), + ]; + let commit_info_expr = Expression::struct_from([Expression::struct_from(commit_info_exprs)]); + let commit_info_schema = get_log_commit_info_schema().as_ref(); + + // HACK (part 2/2): we need to modify the commit info schema to match the expression above (a + // struct with a single null int field). + let mut commit_info_empty_struct_schema = commit_info_schema.clone(); + let commit_info_field = commit_info_empty_struct_schema + .fields + .get_mut(COMMIT_INFO_NAME) + .ok_or_else(|| Error::missing_column(COMMIT_INFO_NAME))?; + let DataType::Struct(mut commit_info_data_type) = commit_info_field.data_type().clone() else { + return Err(Error::internal_error( + "commit_info_field should be a struct", + )); + }; + let engine_commit_info_schema = + commit_info_data_type.project_as_struct(&["engineCommitInfo"])?; + let hack_data_type = DataType::Struct(Box::new(StructType::new(vec![StructField::new( + "hack_operation_parameter_int", + DataType::INTEGER, + true, + )]))); + + commit_info_data_type + .fields + .get_mut("operationParameters") + .ok_or_else(|| Error::missing_column("operationParameters"))? 
+ .data_type = hack_data_type; + commit_info_field.data_type = DataType::Struct(commit_info_data_type); + + let commit_info_evaluator = engine.get_expression_handler().get_evaluator( + engine_commit_info_schema.into(), + commit_info_expr, + commit_info_empty_struct_schema.into(), + ); + + commit_info_evaluator.evaluate(engine_commit_info) +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::engine::arrow_data::ArrowEngineData; + use crate::engine::arrow_expression::ArrowExpressionHandler; + use crate::{ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler}; + + use arrow::json::writer::LineDelimitedWriter; + use arrow::record_batch::RecordBatch; + use arrow_array::builder::StringBuilder; + use arrow_schema::Schema as ArrowSchema; + use arrow_schema::{DataType as ArrowDataType, Field}; + + struct ExprEngine(Arc); + + impl ExprEngine { + fn new() -> Self { + ExprEngine(Arc::new(ArrowExpressionHandler)) + } + } + + impl Engine for ExprEngine { + fn get_expression_handler(&self) -> Arc { + self.0.clone() + } + + fn get_json_handler(&self) -> Arc { + unimplemented!() + } + + fn get_parquet_handler(&self) -> Arc { + unimplemented!() + } + + fn get_file_system_client(&self) -> Arc { + unimplemented!() + } + } + + fn build_map(entries: Vec<(&str, &str)>) -> arrow_array::MapArray { + let key_builder = StringBuilder::new(); + let val_builder = StringBuilder::new(); + let names = arrow_array::builder::MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }; + let mut builder = + arrow_array::builder::MapBuilder::new(Some(names), key_builder, val_builder); + for (key, val) in entries { + builder.keys().append_value(key); + builder.values().append_value(val); + builder.append(true).unwrap(); + } + builder.finish() + } + + // convert it to JSON just for ease of comparison (and since we ultimately persist as JSON) + fn as_json_and_scrub_timestamp(data: Box) -> serde_json::Value { + let record_batch: RecordBatch = data + .into_any() + .downcast::() + .unwrap() + .into(); + + let buf = Vec::new(); + let mut writer = LineDelimitedWriter::new(buf); + writer.write_batches(&[&record_batch]).unwrap(); + writer.finish().unwrap(); + let buf = writer.into_inner(); + + let mut result: serde_json::Value = serde_json::from_slice(&buf).unwrap(); + *result + .get_mut("commitInfo") + .unwrap() + .get_mut("timestamp") + .unwrap() = serde_json::Value::Number(0.into()); + result + } + + #[test] + fn test_generate_commit_info() -> DeltaResult<()> { + let engine = ExprEngine::new(); + let engine_commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "engineCommitInfo", + ArrowDataType::Map( + Arc::new(Field::new( + "entries", + ArrowDataType::Struct( + vec![ + Field::new("key", ArrowDataType::Utf8, false), + Field::new("value", ArrowDataType::Utf8, true), + ] + .into(), + ), + false, + )), + false, + ), + false, + )])); + + let map_array = build_map(vec![("engineInfo", "default engine")]); + let commit_info_batch = + RecordBatch::try_new(engine_commit_info_schema, vec![Arc::new(map_array)])?; + + let actions = generate_commit_info( + &engine, + Some("test operation"), + &ArrowEngineData::new(commit_info_batch), + )?; + + let expected = serde_json::json!({ + "commitInfo": { + "timestamp": 0, + "operation": "test operation", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + "engineCommitInfo": { + "engineInfo": "default engine" + } + } + }); + + assert_eq!(actions.length(), 1); + let result = 
as_json_and_scrub_timestamp(actions); + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn test_commit_info_with_multiple_columns() -> DeltaResult<()> { + let engine = ExprEngine::new(); + let engine_commit_info_schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "engineCommitInfo", + ArrowDataType::Map( + Arc::new(Field::new( + "entries", + ArrowDataType::Struct( + vec![ + Field::new("key", ArrowDataType::Utf8, false), + Field::new("value", ArrowDataType::Utf8, true), + ] + .into(), + ), + false, + )), + false, + ), + false, + ), + Field::new("operation", ArrowDataType::Utf8, true), + ])); + + let map_array = build_map(vec![("engineInfo", "default engine")]); + + let commit_info_batch = RecordBatch::try_new( + engine_commit_info_schema, + vec![ + Arc::new(map_array), + Arc::new(arrow_array::StringArray::from(vec!["some_string"])), + ], + )?; + + let actions = generate_commit_info( + &engine, + Some("test operation"), + &ArrowEngineData::new(commit_info_batch), + )?; + + let expected = serde_json::json!({ + "commitInfo": { + "timestamp": 0, + "operation": "test operation", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + "engineCommitInfo": { + "engineInfo": "default engine" + } + } + }); + + assert_eq!(actions.length(), 1); + let result = as_json_and_scrub_timestamp(actions); + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn test_invalid_commit_info_missing_column() -> DeltaResult<()> { + let engine = ExprEngine::new(); + let engine_commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "some_column_name", + ArrowDataType::Utf8, + true, + )])); + let commit_info_batch = RecordBatch::try_new( + engine_commit_info_schema, + vec![Arc::new(arrow_array::StringArray::new_null(1))], + )?; + + let _ = generate_commit_info( + &engine, + Some("test operation"), + &ArrowEngineData::new(commit_info_batch), + ) + .map_err(|e| match e { + Error::Arrow(arrow_schema::ArrowError::SchemaError(_)) => (), + Error::Backtraced { source, .. } + if matches!( + &*source, + Error::Arrow(arrow_schema::ArrowError::SchemaError(_)) + ) => {} + _ => panic!("expected arrow schema error error, got {:?}", e), + }); + + Ok(()) + } + + #[test] + fn test_invalid_commit_info_invalid_column_type() -> DeltaResult<()> { + let engine = ExprEngine::new(); + let engine_commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "engineCommitInfo", + ArrowDataType::Utf8, + true, + )])); + let commit_info_batch = RecordBatch::try_new( + engine_commit_info_schema, + vec![Arc::new(arrow_array::StringArray::new_null(1))], + )?; + + let _ = generate_commit_info( + &engine, + Some("test operation"), + &ArrowEngineData::new(commit_info_batch), + ) + .map_err(|e| match e { + Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(_)) => (), + Error::Backtraced { source, .. 
} + if matches!( + &*source, + Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(_)) + ) => {} + _ => panic!("expected arrow invalid arg error, got {:?}", e), + }); + + Ok(()) + } + + fn assert_empty_commit_info( + data: Box, + write_engine_commit_info: bool, + ) -> DeltaResult<()> { + assert_eq!(data.length(), 1); + let expected = if write_engine_commit_info { + serde_json::json!({ + "commitInfo": { + "timestamp": 0, + "operation": "test operation", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + "engineCommitInfo": {} + } + }) + } else { + serde_json::json!({ + "commitInfo": { + "timestamp": 0, + "operation": "test operation", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + } + }) + }; + let result = as_json_and_scrub_timestamp(data); + assert_eq!(result, expected); + Ok(()) + } + + // Three cases for empty commit info: + // 1. `engineCommitInfo` column with an empty Map + // 2. `engineCommitInfo` null column of type Map + // 3. a column that has a name other than `engineCommitInfo`; Delta can detect that the column + // is missing and substitute a null literal in its place. The type of that column doesn't + // matter, Delta will ignore it. + #[test] + fn test_empty_commit_info() -> DeltaResult<()> { + // test with null map and empty map + for is_null in [true, false] { + let engine = ExprEngine::new(); + let engine_commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "engineCommitInfo", + ArrowDataType::Map( + Arc::new(Field::new( + "entries", + ArrowDataType::Struct( + vec![ + Field::new("key", ArrowDataType::Utf8, false), + Field::new("value", ArrowDataType::Utf8, true), + ] + .into(), + ), + false, + )), + false, + ), + true, + )])); + use arrow_array::builder::StringBuilder; + let key_builder = StringBuilder::new(); + let val_builder = StringBuilder::new(); + let names = arrow_array::builder::MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }; + let mut builder = + arrow_array::builder::MapBuilder::new(Some(names), key_builder, val_builder); + builder.append(is_null).unwrap(); + let array = builder.finish(); + + let commit_info_batch = + RecordBatch::try_new(engine_commit_info_schema, vec![Arc::new(array)])?; + + let actions = generate_commit_info( + &engine, + Some("test operation"), + &ArrowEngineData::new(commit_info_batch), + )?; + + assert_empty_commit_info(actions, is_null)?; + } + Ok(()) + } +} diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs new file mode 100644 index 000000000..212b06cae --- /dev/null +++ b/kernel/tests/write.rs @@ -0,0 +1,253 @@ +use std::sync::Arc; + +use arrow::array::StringArray; +use arrow::record_batch::RecordBatch; +use arrow_schema::Schema as ArrowSchema; +use arrow_schema::{DataType as ArrowDataType, Field}; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::ObjectStore; +use serde_json::{json, to_vec}; +use url::Url; + +use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; +use delta_kernel::engine::default::DefaultEngine; +use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; +use delta_kernel::{Error as KernelError, Table}; + +// setup default engine with in-memory object store. 
+fn setup( + table_name: &str, +) -> ( + Arc, + DefaultEngine, + Url, +) { + let table_root_path = Path::from(format!("/{table_name}")); + let url = Url::parse(&format!("memory:///{}/", table_root_path)).unwrap(); + let storage = Arc::new(InMemory::new()); + ( + storage.clone(), + DefaultEngine::new( + storage, + table_root_path, + Arc::new(TokioBackgroundExecutor::new()), + ), + url, + ) +} + +// we provide this table creation function since we only do appends to existing tables for now. +// this will just create an empty table with the given schema. (just protocol + metadata actions) +async fn create_table( + store: Arc, + table_path: Url, + schema: SchemaRef, + partition_columns: &[&str], +) -> Result> { + let table_id = "test_id"; + let schema = serde_json::to_string(&schema)?; + + let protocol = json!({ + "protocol": { + "minReaderVersion": 3, + "minWriterVersion": 7, + "readerFeatures": [], + "writerFeatures": [] + } + }); + let metadata = json!({ + "metaData": { + "id": table_id, + "format": { + "provider": "parquet", + "options": {} + }, + "schemaString": schema, + "partitionColumns": partition_columns, + "configuration": {}, + "createdTime": 1677811175819u64 + } + }); + + let data = [ + to_vec(&protocol).unwrap(), + b"\n".to_vec(), + to_vec(&metadata).unwrap(), + ] + .concat(); + + // put 0.json with protocol + metadata + let path = table_path.path(); + let path = format!("{path}_delta_log/00000000000000000000.json"); + store.put(&Path::from(path), data.into()).await?; + Ok(Table::new(table_path)) +} + +#[tokio::test] +async fn test_commit_info() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table"); + + // create a simple table: one int column named 'number' + let schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table(store.clone(), table_location, schema, &[]).await?; + + // create commit info of the form {engineCommitInfo: Map { "engineInfo": "default engine" } } + let commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "engineCommitInfo", + ArrowDataType::Map( + Arc::new(Field::new( + "entries", + ArrowDataType::Struct( + vec![ + Field::new("key", ArrowDataType::Utf8, false), + Field::new("value", ArrowDataType::Utf8, true), + ] + .into(), + ), + false, + )), + false, + ), + false, + )])); + + use arrow_array::builder::StringBuilder; + let key_builder = StringBuilder::new(); + let val_builder = StringBuilder::new(); + let names = arrow_array::builder::MapFieldNames { + entry: "entries".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }; + let mut builder = arrow_array::builder::MapBuilder::new(Some(names), key_builder, val_builder); + builder.keys().append_value("engineInfo"); + builder.values().append_value("default engine"); + builder.append(true).unwrap(); + let array = builder.finish(); + + let commit_info_batch = + RecordBatch::try_new(commit_info_schema.clone(), vec![Arc::new(array)])?; + + // create a transaction + let txn = table + .new_transaction(&engine)? + .with_commit_info(Box::new(ArrowEngineData::new(commit_info_batch))); + + // commit! 
+ txn.commit(&engine)?; + + let commit1 = store + .get(&Path::from( + "/test_table/_delta_log/00000000000000000001.json", + )) + .await?; + + let mut parsed_commit: serde_json::Value = serde_json::from_slice(&commit1.bytes().await?)?; + *parsed_commit + .get_mut("commitInfo") + .unwrap() + .get_mut("timestamp") + .unwrap() = serde_json::Value::Number(0.into()); + + let expected_commit = json!({ + "commitInfo": { + "timestamp": 0, + "operation": "UNKNOWN", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + "engineCommitInfo": { + "engineInfo": "default engine" + } + } + }); + + assert_eq!(parsed_commit, expected_commit); + Ok(()) +} + +#[tokio::test] +async fn test_empty_commit() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table"); + + // create a simple table: one int column named 'number' + let schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table(store.clone(), table_location, schema, &[]).await?; + + assert!(matches!( + table.new_transaction(&engine)?.commit(&engine).unwrap_err(), + KernelError::MissingCommitInfo + )); + + Ok(()) +} + +#[tokio::test] +async fn test_invalid_commit_info() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table"); + + // create a simple table: one int column named 'number' + let schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table(store.clone(), table_location, schema, &[]).await?; + + // empty commit info test + let commit_info_schema = Arc::new(ArrowSchema::empty()); + let commit_info_batch = RecordBatch::new_empty(commit_info_schema.clone()); + assert!(commit_info_batch.num_rows() == 0); + let txn = table + .new_transaction(&engine)? + .with_commit_info(Box::new(ArrowEngineData::new(commit_info_batch))); + + // commit! + assert!(matches!( + txn.commit(&engine), + Err(KernelError::InvalidCommitInfo(_)) + )); + + // two-row commit info test + let commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "engineInfo", + ArrowDataType::Utf8, + true, + )])); + let commit_info_batch = RecordBatch::try_new( + commit_info_schema.clone(), + vec![Arc::new(StringArray::from(vec![ + "row1: default engine", + "row2: default engine", + ]))], + )?; + + let txn = table + .new_transaction(&engine)? + .with_commit_info(Box::new(ArrowEngineData::new(commit_info_batch))); + + // commit! + assert!(matches!( + txn.commit(&engine), + Err(KernelError::InvalidCommitInfo(_)) + )); + Ok(()) +}
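
A usage sketch of the new transaction API, pulled together from the docs and tests above. It assumes a `Table` and an `Engine` (for example `DefaultEngine`) have already been constructed as in `kernel/tests/write.rs`; the `engine_commit_info`/`commit_example` helpers, the `"INSERT"` operation string, and the map contents are illustrative only, not part of this PR.

```rust
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};

use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::transaction::CommitResult;
use delta_kernel::{DeltaResult, Engine, Table};

/// Build the single-row `engineCommitInfo` batch that `with_commit_info` expects:
/// one Map<String, String> column carrying whatever metadata the engine wants to record.
fn engine_commit_info() -> DeltaResult<RecordBatch> {
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "engineCommitInfo",
        ArrowDataType::Map(
            Arc::new(Field::new(
                "entries",
                ArrowDataType::Struct(
                    vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        ),
        false,
    )]));
    let names = MapFieldNames {
        entry: "entries".to_string(),
        key: "key".to_string(),
        value: "value".to_string(),
    };
    let mut builder = MapBuilder::new(Some(names), StringBuilder::new(), StringBuilder::new());
    builder.keys().append_value("engineInfo");
    builder.values().append_value("my engine");
    builder.append(true).unwrap();
    Ok(RecordBatch::try_new(schema, vec![Arc::new(builder.finish())])?)
}

/// Stage commit info, commit, and report the outcome.
fn commit_example(table: &Table, engine: &dyn Engine) -> DeltaResult<()> {
    let txn = table
        .new_transaction(engine)?
        .with_operation("INSERT".to_string())
        .with_commit_info(Box::new(ArrowEngineData::new(engine_commit_info()?)));
    match txn.commit(engine)? {
        CommitResult::Committed(version) => println!("committed version {version}"),
        // the failed transaction is handed back so a caller can eventually rebase and
        // retry; the APIs for that are left to later write stages
        CommitResult::Conflict(_txn, version) => println!("conflicted at version {version}"),
    }
    Ok(())
}
```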
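
For engines implementing the new `write_json_file` API, the contract is: serialize each row as one JSON object per line, and write put-if-absent (surface `Error::FileAlreadyExists` when `overwrite` is false and the target already exists, since that is how `commit` detects conflicts). The sketch below illustrates that contract against a local filesystem with `std::fs::OpenOptions::create_new`; it is not one of the implementations in this PR, which use `tempfile`'s `persist_noclobber` (sync engine) and `object_store`'s `PutMode::Create` (default engine). A production engine should also stage to a temp file first so readers never observe a partially written commit.

```rust
use std::fs::OpenOptions;
use std::io::Write;

use url::Url;
use delta_kernel::{DeltaResult, Error};

// Hypothetical helper: `json_bytes` is already-serialized newline-delimited JSON
// (the kernel produces this internally before calling write_json_file).
fn write_json_file_local(path: &Url, json_bytes: &[u8], overwrite: bool) -> DeltaResult<()> {
    let path = path
        .to_file_path()
        .map_err(|_| Error::generic("this sketch only handles file:// URLs"))?;
    let mut opts = OpenOptions::new();
    opts.write(true);
    if overwrite {
        opts.create(true).truncate(true);
    } else {
        // create_new(true) is the put-if-absent piece: the open fails with
        // AlreadyExists if another writer got there first
        opts.create_new(true);
    }
    let mut file = opts.open(&path).map_err(|e| match e.kind() {
        std::io::ErrorKind::AlreadyExists => {
            Error::FileAlreadyExists(path.to_string_lossy().to_string())
        }
        _ => Error::IOError(e),
    })?;
    file.write_all(json_bytes)?;
    file.sync_all()?;
    Ok(())
}
```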
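
Commit file naming, for reference: `ParsedLogPath::new_commit` resolves commit `version` to `_delta_log/` plus the version zero-padded to 20 digits with a `.json` extension, relative to the table root. A small illustration follows; the helper names are made up for the example.

```rust
use url::Url;

// Hypothetical helpers mirroring what ParsedLogPath::new_commit does internally.
fn commit_file_name(version: u64) -> String {
    // version 10 -> "00000000000000000010.json"
    format!("{version:020}.json")
}

fn commit_file_url(table_root: &Url, version: u64) -> Result<Url, url::ParseError> {
    table_root
        .join("_delta_log/")?
        .join(&commit_file_name(version))
}
```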