[write stage0] add Transaction with commit info and commit implementation (#370)

This PR does 4 main things:
1. ~~reorganize `transaction.rs` so that the transaction action is now moved
to the actions module~~ **EDIT:** now in #386
1. new `Transaction` API (a usage sketch follows this list), which includes:
a. `Table.new_transaction()` to create a new transaction from the latest
snapshot of the table
b. `Transaction.with_commit_info(engine_commit_info: Box<dyn EngineData>)`
to add single-row commit info in the form of a `map<string, string>`;
required to commit
c. `Transaction.with_operation(operation: String)` to set the operation
name of the transaction (persisted in commit info)
d. `Transaction.commit() // consumes transaction` to commit the
transaction to the log (currently only the commit info is committed)
1. new engine API: `write_json_file(impl Iterator<Item = Box<dyn
EngineData>>)` (and a default engine implementation for this)
1. new integration test suite `write.rs` to house many of our write
tests as the write path is implemented
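
A minimal usage sketch of the new API in Rust. Construction of the engine is
elided; passing the engine to `new_transaction` and `commit` is an assumption
here (reading the snapshot and writing the log both need one), and a second
sketch below shows one way to build the commit-info `EngineData`.

```rust
use delta_kernel::{DeltaResult, Engine, EngineData, Table};

// Sketch only: commit a transaction that carries engine commit info.
// `commit_info` must be a single row containing a map<string, string>.
fn commit_with_info(
    engine: &dyn Engine,
    commit_info: Box<dyn EngineData>,
) -> DeltaResult<()> {
    let table = Table::try_from_uri("file:///path/to/table")?;
    let txn = table
        .new_transaction(engine)? // from the latest snapshot (assumed signature)
        .with_operation("INSERT".to_string()) // persisted in commit info
        .with_commit_info(commit_info); // required to commit
    txn.commit(engine)?; // consumes the transaction
    Ok(())
}
```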

resolves #378
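
With the default (Arrow-backed) engine, the single-row `map<string, string>`
can be built with arrow's `MapBuilder` and wrapped in `ArrowEngineData`. A
hedged sketch: the `engineCommitInfo` column name and the column's
nullability are assumptions modeled on the commit-info schema in this PR.

```rust
use std::sync::Arc;

use arrow_array::builder::{MapBuilder, StringBuilder};
use arrow_array::RecordBatch;
use arrow_schema::{Field, Schema};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::{DeltaResult, EngineData};

// Sketch only: a single row holding the map { "engineInfo": "example writer" }.
fn example_commit_info() -> DeltaResult<Box<dyn EngineData>> {
    let mut builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
    builder.keys().append_value("engineInfo");
    builder.values().append_value("example writer");
    builder.append(true)?; // close the first (and only) map entry -> one row
    let map = builder.finish();
    let schema = Schema::new(vec![Field::new(
        "engineCommitInfo", // assumed column name
        map.data_type().clone(),
        false,
    )]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map)])?;
    Ok(Box::new(ArrowEngineData::new(batch)))
}
```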

---------

Co-authored-by: Ryan Johnson <[email protected]>
Co-authored-by: Nick Lanham <[email protected]>
3 people authored Oct 25, 2024
1 parent f5d0a42 commit 80f08db
Showing 12 changed files with 1,109 additions and 12 deletions.
6 changes: 6 additions & 0 deletions ffi/src/lib.rs
@@ -329,6 +329,9 @@ pub enum KernelError {
InternalError,
InvalidExpression,
InvalidLogPath,
InvalidCommitInfo,
FileAlreadyExists,
MissingCommitInfo,
}

impl From<Error> for KernelError {
@@ -376,6 +379,9 @@ impl From<Error> for KernelError {
} => Self::from(*source),
Error::InvalidExpressionEvaluation(_) => KernelError::InvalidExpression,
Error::InvalidLogPath(_) => KernelError::InvalidLogPath,
Error::InvalidCommitInfo(_) => KernelError::InvalidCommitInfo,
Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists,
Error::MissingCommitInfo => KernelError::MissingCommitInfo,
}
}
}
3 changes: 3 additions & 0 deletions kernel/Cargo.toml
@@ -36,6 +36,8 @@ delta_kernel_derive = { path = "../derive-macros", version = "0.4.0" }
# used for developer-visibility
visibility = "0.1.1"

# Used in the sync engine
tempfile = { version = "3", optional = true }
# Used in default engine
arrow-buffer = { workspace = true, optional = true }
arrow-array = { workspace = true, optional = true, features = ["chrono-tz"] }
@@ -99,6 +101,7 @@ sync-engine = [
"arrow-json",
"arrow-select",
"parquet",
"tempfile",
]
integration-test = [
"hdfs-native-object-store/integration-test",
58 changes: 56 additions & 2 deletions kernel/src/actions/mod.rs
@@ -46,6 +46,10 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
.into()
});

static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
StructType::new([Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME)]).into()
});

#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn get_log_schema() -> &'static SchemaRef {
@@ -58,6 +62,10 @@ fn get_log_add_schema() -> &'static SchemaRef {
&LOG_ADD_SCHEMA
}

pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
&LOG_COMMIT_INFO_SCHEMA
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct Format {
/// Name of the encoding for files in this table
@@ -147,8 +155,26 @@ impl Protocol {
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct CommitInfo {
pub kernel_version: Option<String>,
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct CommitInfo {
/// The time this commit was created, as milliseconds since the epoch.
/// Read: optional, write: required (that is, kernel always writes).
/// If in-commit timestamps are enabled, this is always required.
pub(crate) timestamp: Option<i64>,
/// An arbitrary string that identifies the operation associated with this commit. This is
/// specified by the engine. Read: optional, write: required (that is, kernel always writes).
pub(crate) operation: Option<String>,
/// Map of arbitrary string key-value pairs that provide additional information about the
/// operation. This is specified by the engine. For now this is always empty on write.
pub(crate) operation_parameters: Option<HashMap<String, String>>,
/// The version of the delta_kernel crate used to write this commit. The kernel will always
/// write this field, but it is optional since many tables will not have this field (i.e. any
/// tables not written by kernel).
pub(crate) kernel_version: Option<String>,
/// A place for the engine to store additional metadata associated with this commit encoded as
/// a map of strings.
pub(crate) engine_commit_info: Option<HashMap<String, String>>,
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
@@ -427,4 +453,32 @@ mod tests {
)]));
assert_eq!(schema, expected);
}

#[test]
fn test_commit_info_schema() {
let schema = get_log_schema()
.project(&["commitInfo"])
.expect("Couldn't get commitInfo field");

let expected = Arc::new(StructType::new(vec![StructField::new(
"commitInfo",
StructType::new(vec![
StructField::new("timestamp", DataType::LONG, true),
StructField::new("operation", DataType::STRING, true),
StructField::new(
"operationParameters",
MapType::new(DataType::STRING, DataType::STRING, false),
true,
),
StructField::new("kernelVersion", DataType::STRING, true),
StructField::new(
"engineCommitInfo",
MapType::new(DataType::STRING, DataType::STRING, false),
true,
),
]),
true,
)]));
assert_eq!(schema, expected);
}
}
37 changes: 36 additions & 1 deletion kernel/src/engine/arrow_utils.rs
@@ -16,7 +16,7 @@ use arrow_array::{
cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
RecordBatch, StringArray, StructArray,
};
use arrow_json::ReaderBuilder;
use arrow_json::{LineDelimitedWriter, ReaderBuilder};
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
SchemaRef as ArrowSchemaRef,
@@ -662,6 +662,21 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
Ok(concat_batches(&schema, output.iter())?)
}

/// Serialize `EngineData` chunks to line-delimited JSON bytes by appending to a buffer.
// TODO (zach): this should stream data to the JSON writer and output an iterator.
pub(crate) fn to_json_bytes(
data: impl Iterator<Item = Box<dyn EngineData>> + Send,
) -> DeltaResult<Vec<u8>> {
let mut writer = LineDelimitedWriter::new(Vec::new());
for chunk in data.into_iter() {
let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?;
let record_batch = arrow_data.record_batch();
writer.write(record_batch)?;
}
writer.finish()?;
Ok(writer.into_inner())
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -1408,4 +1423,24 @@ mod tests {
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn test_write_json() -> DeltaResult<()> {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"string",
ArrowDataType::Utf8,
true,
)]));
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
)?;
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
let json = to_json_bytes(Box::new(std::iter::once(data)))?;
assert_eq!(
json,
"{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
);
Ok(())
}
}
27 changes: 27 additions & 0 deletions kernel/src/engine/default/json.rs
@@ -11,10 +11,12 @@ use bytes::{Buf, Bytes};
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{DynObjectStore, GetResultPayload};
use url::Url;

use super::executor::TaskExecutor;
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
use crate::engine::arrow_utils::to_json_bytes;
use crate::schema::SchemaRef;
use crate::{
DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
@@ -89,6 +91,31 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
self.readahead,
)
}

// note: for now we just buffer all the data and write it out all at once
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
_overwrite: bool,
) -> DeltaResult<()> {
let buffer = to_json_bytes(data)?;
// Put if absent
let store = self.store.clone(); // cheap Arc
let path = Path::from(path.path());
let path_str = path.to_string();
self.task_executor
.block_on(async move {
store
.put_opts(&path, buffer.into(), object_store::PutMode::Create.into())
.await
})
.map_err(|e| match e {
object_store::Error::AlreadyExists { .. } => Error::FileAlreadyExists(path_str),
e => e.into(),
})?;
Ok(())
}
}

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
102 changes: 100 additions & 2 deletions kernel/src/engine/sync/json.rs
@@ -1,13 +1,17 @@
use std::{fs::File, io::BufReader};
use std::{fs::File, io::BufReader, io::Write};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use tempfile::NamedTempFile;
use url::Url;

use super::read_files;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
use crate::engine::arrow_utils::to_json_bytes;
use crate::schema::SchemaRef;
use crate::{
DeltaResult, EngineData, ExpressionRef, FileDataReadResultIterator, FileMeta, JsonHandler,
DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
JsonHandler,
};

pub(crate) struct SyncJsonHandler;
@@ -41,4 +45,98 @@ impl JsonHandler for SyncJsonHandler {
) -> DeltaResult<Box<dyn EngineData>> {
arrow_parse_json(json_strings, output_schema)
}

// For the sync writer we write data to a tmp file, then atomically rename it to the final path.
// This is highly OS-dependent and for now relies on the atomicity of tempfile's
// `persist_noclobber`.
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
_overwrite: bool,
) -> DeltaResult<()> {
let path = path
.to_file_path()
.map_err(|_| crate::Error::generic("sync client can only write local files"))?;
let Some(parent) = path.parent() else {
return Err(crate::Error::generic(format!(
"no parent found for {:?}",
path
)));
};

// write data to tmp file
let mut tmp_file = NamedTempFile::new_in(parent)?;
let buf = to_json_bytes(data)?;
tmp_file.write_all(&buf)?;
tmp_file.flush()?;

// use 'persist_noclobber' to atomically rename tmp file to final path
tmp_file
.persist_noclobber(path.clone())
.map_err(|e| match e {
tempfile::PersistError { error, .. }
if error.kind() == std::io::ErrorKind::AlreadyExists =>
{
Error::FileAlreadyExists(path.to_string_lossy().to_string())
}
e => Error::IOError(e.into()),
})?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

use std::sync::Arc;

use arrow_array::{RecordBatch, StringArray};
use arrow_schema::DataType as ArrowDataType;
use arrow_schema::Field;
use arrow_schema::Schema as ArrowSchema;
use serde_json::json;
use url::Url;

#[test]
fn test_write_json_file() -> DeltaResult<()> {
let test_dir = tempfile::tempdir().unwrap();
let path = test_dir.path().join("00000000000000000001.json");
let handler = SyncJsonHandler;

let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"dog",
ArrowDataType::Utf8,
true,
)]));
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(StringArray::from(vec!["remi", "wilson"]))],
)?;
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
let empty: Box<dyn EngineData> =
Box::new(ArrowEngineData::new(RecordBatch::new_empty(schema)));

let url = Url::from_file_path(path.clone()).unwrap();
handler
.write_json_file(&url, Box::new(std::iter::once(data)), false)
.expect("write json file");
assert!(matches!(
handler.write_json_file(&url, Box::new(std::iter::once(empty)), false),
Err(Error::FileAlreadyExists(_))
));

let file = std::fs::read_to_string(path)?;
let json: Vec<_> = serde_json::Deserializer::from_str(&file)
.into_iter::<serde_json::Value>()
.flatten()
.collect();
assert_eq!(
json,
vec![json!({"dog": "remi"}), json!({"dog": "wilson"}),]
);

Ok(())
}
}
12 changes: 12 additions & 0 deletions kernel/src/error.rs
@@ -159,6 +159,18 @@ pub enum Error {
/// Unable to parse the name of a log path
#[error("Invalid log path: {0}")]
InvalidLogPath(String),

/// Invalid commit info passed to the transaction
#[error("Invalid commit info: {0}")]
InvalidCommitInfo(String),

/// Commit info was not passed to the transaction
#[error("Missing commit info")]
MissingCommitInfo,

/// The file already exists at the path, prohibiting a non-overwrite write
#[error("File already exists: {0}")]
FileAlreadyExists(String),
}

// Convenience constructors for Error types that take a String argument