materialize write_metadata as vec
zachschuermann committed Oct 29, 2024
1 parent a6290f2 commit c7dac51
Showing 6 changed files with 124 additions and 81 deletions.
2 changes: 1 addition & 1 deletion kernel/src/engine/default/json.rs
@@ -96,7 +96,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>,
_overwrite: bool,
) -> DeltaResult<()> {
let buffer = to_json_bytes(data)?;
112 changes: 69 additions & 43 deletions kernel/src/engine/default/parquet.rs
@@ -1,8 +1,10 @@
//! Default Parquet handler implementation
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

use arrow_array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use arrow_array::{BooleanArray, Int64Array, RecordBatch, StringArray};
use futures::StreamExt;
use object_store::path::Path;
@@ -43,6 +45,53 @@ impl ParquetMetadata {
pub fn new(file_meta: FileMeta) -> Self {
Self { file_meta }
}

// convert ParquetMetadata into a record batch which matches the 'write_metadata' schema
fn create_write_metadata(
&self,
partition_values: HashMap<String, String>,
data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let ParquetMetadata { file_meta } = self;
let FileMeta {
location,
last_modified,
size,
} = file_meta;
let write_metadata_schema = crate::transaction::get_write_metadata_schema();

// create the record batch of the write metadata
let path = Arc::new(StringArray::from(vec![location.to_string()]));
let key_builder = StringBuilder::new();
let val_builder = StringBuilder::new();
let names = MapFieldNames {
entry: "key_value".to_string(),
key: "key".to_string(),
value: "value".to_string(),
};
let mut builder = MapBuilder::new(Some(names), key_builder, val_builder);
if partition_values.is_empty() {
builder.append(true).unwrap();
} else {
for (k, v) in partition_values {
builder.keys().append_value(&k);
builder.values().append_value(&v);
builder.append(true).unwrap();
}
}
let partitions = Arc::new(builder.finish());
// this means max size we can write is i64::MAX (~8EB)
let size: i64 = (*size)
.try_into()
.map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
let size = Arc::new(Int64Array::from(vec![size]));
let data_change = Arc::new(BooleanArray::from(vec![data_change]));
let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
Arc::new(write_metadata_schema.as_ref().try_into()?),
vec![path, partitions, size, modification_time, data_change],
)?)))
}
}
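For reference, the record batch assembled above has to line up column-for-column with the schema returned by `get_write_metadata_schema`. A minimal Arrow sketch of that shape; the field names (`path`, `partitionValues`, `size`, `modificationTime`, `dataChange`) and nullability are assumptions inferred from the columns built here, not quoted from the kernel:

```rust
use arrow_schema::{DataType, Field, Schema};

// Sketch only: field names and nullability are assumptions inferred from the
// columns built in `create_write_metadata`, not copied from
// `get_write_metadata_schema` itself.
fn assumed_write_metadata_arrow_schema() -> Schema {
    Schema::new(vec![
        Field::new("path", DataType::Utf8, false),
        // map entries named "key_value" to match the MapFieldNames used above
        Field::new_map(
            "partitionValues",
            "key_value",
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
            false, // keys are not sorted
            false, // the map column itself is non-nullable
        ),
        Field::new("size", DataType::Int64, false),
        Field::new("modificationTime", DataType::Int64, false),
        Field::new("dataChange", DataType::Boolean, false),
    ])
}
```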

impl<E: TaskExecutor> DefaultParquetHandler<E> {
@@ -62,6 +111,11 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
self
}

// Write `data` to `path`/<uuid>.parquet as parquet using ArrowWriter and return the parquet
// metadata.
//
// Note: after encoding the data as parquet, this issues a PUT followed by a HEAD to storage in
// order to obtain metadata about the object just written.
async fn write_parquet(
&self,
path: &url::Url,
@@ -97,55 +151,21 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
Ok(ParquetMetadata::new(file_meta))
}
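The PUT-then-HEAD sequence called out in the comment above reduces to two `object_store` calls. A sketch of just that storage round-trip, assuming hypothetical `store`, `path`, and `buffer` bindings (the real method also generates the `<uuid>.parquet` name and encodes `data` with `ArrowWriter` first):

```rust
use bytes::Bytes;
use object_store::{path::Path, ObjectMeta, ObjectStore};

// Sketch of the storage round-trip only: PUT the encoded bytes, then HEAD the
// same key so the returned ObjectMeta carries the size and last_modified
// needed to build a FileMeta.
async fn put_then_head(
    store: &dyn ObjectStore,
    path: &Path,
    buffer: Bytes,
) -> object_store::Result<ObjectMeta> {
    store.put(path, buffer.into()).await?;
    store.head(path).await
}
```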

/// Write `data` to `path`/<uuid>.parquet as parquet using ArrowWriter and return the parquet
/// metadata as an EngineData batch which matches the [write metadata] schema
///
/// [write metadata]: crate::transaction::get_write_metadata_schema
pub async fn write_parquet_file(
&self,
path: &url::Url,
data: Box<dyn EngineData>,
partition_values: std::collections::HashMap<String, String>,
partition_values: HashMap<String, String>,
data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let ParquetMetadata { file_meta } = self.write_parquet(path, data).await?;
let FileMeta {
location,
last_modified,
size,
} = file_meta;
let write_metadata_schema = crate::transaction::get_write_metadata_schema();

// create the record batch of the write metadata
let path = Arc::new(StringArray::from(vec![location.to_string()]));
use arrow_array::builder::StringBuilder;
let key_builder = StringBuilder::new();
let val_builder = StringBuilder::new();
let names = arrow_array::builder::MapFieldNames {
entry: "key_value".to_string(),
key: "key".to_string(),
value: "value".to_string(),
};
let mut builder =
arrow_array::builder::MapBuilder::new(Some(names), key_builder, val_builder);
if partition_values.is_empty() {
builder.append(true).unwrap();
} else {
for (k, v) in partition_values {
builder.keys().append_value(&k);
builder.values().append_value(&v);
builder.append(true).unwrap();
}
}
let partitions = Arc::new(builder.finish());
// this means max size we can write is i64::MAX (~8EB)
let size: i64 = size
.try_into()
.map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
let size = Arc::new(Int64Array::from(vec![size]));
let data_change = Arc::new(BooleanArray::from(vec![data_change]));
let modification_time = Arc::new(Int64Array::from(vec![last_modified]));

Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
Arc::new(write_metadata_schema.as_ref().try_into()?),
vec![path, partitions, size, modification_time, data_change],
)?)))
let parquet_metadata = self.write_parquet(path, data).await?;
let write_metadata =
parquet_metadata.create_write_metadata(partition_values, data_change)?;
Ok(write_metadata)
}
}

@@ -401,4 +421,10 @@ mod tests {
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 10);
}

#[test]
fn test_into_write_metadata() {}

#[tokio::test]
async fn test_write_parquet() {}
}
2 changes: 1 addition & 1 deletion kernel/src/engine/sync/json.rs
@@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler {
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>,
_overwrite: bool,
) -> DeltaResult<()> {
let path = path
2 changes: 1 addition & 1 deletion kernel/src/lib.rs
@@ -244,7 +244,7 @@ pub trait JsonHandler: Send + Sync {
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>,
overwrite: bool,
) -> DeltaResult<()>;
}
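The `+ '_` added to `write_json_file` here and in both engine implementations matters because the transaction now passes an iterator that borrows from its `write_metadata: Vec<Box<dyn EngineData>>` rather than owning the batches, and `Box<dyn Iterator + Send>` alone would imply a `'static` bound. A self-contained illustration of the same pattern with stand-in types (`Action` and `Txn` are not kernel types):

```rust
// Stand-in trait; the Send + Sync bounds mirror what the kernel's iterator
// over &dyn EngineData effectively requires.
trait Action: Send + Sync {}

struct Txn {
    write_metadata: Vec<Box<dyn Action>>,
}

// The lifetime on the boxed iterator (spelled `+ '_` at the trait boundary in
// the kernel) is what allows it to borrow `txn.write_metadata`; without it,
// the boxed trait object would default to 'static and the borrow is rejected.
fn borrowed_actions<'a>(txn: &'a Txn) -> Box<dyn Iterator<Item = &'a dyn Action> + Send + 'a> {
    Box::new(txn.write_metadata.iter().map(|a| a.as_ref()))
}
```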
73 changes: 42 additions & 31 deletions kernel/src/transaction.rs
@@ -1,12 +1,11 @@
use std::collections::HashMap;
use std::iter;
use std::mem;
use std::sync::{Arc, LazyLock};
use std::time::{SystemTime, UNIX_EPOCH};

use crate::actions::get_log_commit_info_schema;
use crate::actions::schemas::{GetNullableContainerStructField, GetStructField};
use crate::actions::{ADD_NAME, COMMIT_INFO_NAME};
use crate::actions::COMMIT_INFO_NAME;
use crate::actions::{get_log_add_schema, get_log_commit_info_schema};
use crate::error::Error;
use crate::expressions::{column_expr, ColumnName, Scalar, StructData};
use crate::path::ParsedLogPath;
@@ -57,7 +56,7 @@ pub struct Transaction {
read_snapshot: Arc<Snapshot>,
operation: Option<String>,
commit_info: Option<Arc<dyn EngineData>>,
write_metadata: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
write_metadata: Vec<Box<dyn EngineData>>,
}

impl std::fmt::Debug for Transaction {
@@ -82,7 +81,7 @@ impl Transaction {
read_snapshot: snapshot.into(),
operation: None,
commit_info: None,
write_metadata: Box::new(std::iter::empty()),
write_metadata: vec![],
}
}

@@ -103,7 +102,7 @@ impl Transaction {

// TODO consider IntoIterator so we can have multiple write_metadata iterators (and return
// self in the conflict case for retries)
let adds = generate_adds(engine, self.write_metadata);
let adds = generate_adds(engine, self.write_metadata.iter().map(|a| a.as_ref()));
let actions = chain(actions, adds);

// step two: set new commit version (current_version + 1) and path to write
@@ -115,8 +114,7 @@ impl Transaction {
let json_handler = engine.get_json_handler();
match json_handler.write_json_file(&commit_path.location, Box::new(actions), false) {
Ok(()) => Ok(CommitResult::Committed(commit_version)),
// FIXME
// Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)),
Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)),
Err(e) => Err(e),
}
}
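With the FIXME resolved, `FileAlreadyExists` now surfaces as `CommitResult::Conflict`, handing the transaction back to the caller instead of dropping it. A hedged sketch of the retry loop this enables; the module paths are assumptions about the crate layout at this commit, and how an engine refreshes its state between attempts is left abstract:

```rust
use delta_kernel::transaction::{CommitResult, Transaction};
use delta_kernel::{DeltaResult, Engine, Version};

// Hedged sketch: retry until committed. A real engine would refresh or rebase
// its view of the table between attempts; the kernel does not prescribe that here.
fn commit_with_retry(mut txn: Transaction, engine: &dyn Engine) -> DeltaResult<Version> {
    loop {
        match txn.commit(engine)? {
            CommitResult::Committed(version) => return Ok(version),
            CommitResult::Conflict(t, conflicting_version) => {
                // the transaction is handed back so it is not lost on conflict
                eprintln!("commit conflicted at version {conflicting_version}; retrying");
                txn = t;
            }
        }
    }
}
```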
@@ -167,6 +165,9 @@ impl Transaction {

/// Get the write context for this transaction. At the moment, this is constant for the whole
/// transaction.
// Note: after we introduce metadata updates (modify table schema, etc.), we need to make sure
// that engines cannot call this method after a metadata change, since the write context could
// have invalid metadata.
pub fn write_context(&self) -> WriteContext {
let target_dir = self.read_snapshot.table_root();
let snapshot_schema = self.read_snapshot.schema();
@@ -181,30 +182,23 @@ impl Transaction {
}

/// Add write metadata about files to include in the transaction. This API can be called
/// multiple times to add multiple iterators.
/// multiple times to add multiple batches.
///
/// TODO what is expected schema for the batches?
pub fn add_write_metadata(
&mut self,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
) {
let write_metadata = mem::replace(&mut self.write_metadata, Box::new(std::iter::empty()));
self.write_metadata = Box::new(chain(write_metadata, data));
/// The expected schema for the write metadata is given by [`get_write_metadata_schema`].
pub fn add_write_metadata(&mut self, data: Box<dyn EngineData>) {
self.write_metadata.push(data);
}
}

// this does something similar to adding top-level 'commitInfo' named struct. we should unify.
fn generate_adds(
// convert write_metadata into add actions using an expression to transform the data in a single
// pass
fn generate_adds<'a>(
engine: &dyn Engine,
write_metadata: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
) -> Box<dyn Iterator<Item = Box<dyn EngineData>> + Send> {
write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'a,
) -> Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + 'a> {
let expression_handler = engine.get_expression_handler();
let write_metadata_schema = get_write_metadata_schema();
let log_schema: DataType = DataType::struct_type(vec![StructField::new(
ADD_NAME,
write_metadata_schema.clone(),
true,
)]);
let log_schema = get_log_add_schema();

Box::new(write_metadata.map(move |write_metadata_batch| {
let adds_expr = Expression::struct_from([Expression::struct_from(
@@ -215,22 +209,23 @@ fn generate_adds(
let adds_evaluator = expression_handler.get_evaluator(
write_metadata_schema.clone(),
adds_expr,
log_schema.clone(),
log_schema.clone().into(),
);
adds_evaluator
.evaluate(write_metadata_batch.as_ref())
.evaluate(write_metadata_batch)
.expect("fixme")
}))
}

/// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to
/// write table data.
///
/// [`Transaction`]: struct.Transaction.html
pub struct WriteContext {
pub target_dir: Url,
pub schema: SchemaRef,
pub partition_cols: Vec<String>,
pub logical_to_physical: Expression,
target_dir: Url,
schema: SchemaRef,
partition_cols: Vec<String>,
logical_to_physical: Expression,
}

impl WriteContext {
@@ -247,6 +242,22 @@ impl WriteContext {
logical_to_physical,
}
}

pub fn target_dir(&self) -> &Url {
&self.target_dir
}

pub fn schema(&self) -> &SchemaRef {
&self.schema
}

pub fn partition_cols(&self) -> &[String] {
&self.partition_cols
}

pub fn logical_to_physical(&self) -> &Expression {
&self.logical_to_physical
}
}
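With the `WriteContext` fields now private, writers go through the accessors above. A condensed, hedged sketch of the engine-side flow (it mirrors the updated tests below; `write_one_file` is a placeholder for the engine's own parquet writing, and the module paths are assumptions):

```rust
use delta_kernel::transaction::{CommitResult, Transaction};
use delta_kernel::{DeltaResult, Engine, EngineData};
use url::Url;

// Hedged sketch: a writer consumes the context only through the accessors and
// feeds the resulting write metadata back one batch at a time.
fn write_and_commit(
    mut txn: Transaction,
    engine: &dyn Engine,
    write_one_file: impl Fn(&Url, &[String]) -> DeltaResult<Box<dyn EngineData>>,
) -> DeltaResult<CommitResult> {
    let ctx = txn.write_context();
    // a real writer would also use ctx.schema() and ctx.logical_to_physical()
    // to shape the data before encoding it as parquet
    let write_metadata = write_one_file(ctx.target_dir(), ctx.partition_cols())?;
    // batches are now added one at a time instead of as an iterator
    txn.add_write_metadata(write_metadata);
    txn.commit(engine)
}
```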

/// Result after committing a transaction. If 'committed', the version is the new version written
14 changes: 10 additions & 4 deletions kernel/tests/write.rs
@@ -343,7 +343,7 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
let parquet_handler = &engine.parquet;
parquet_handler
.write_parquet_file(
&write_context.target_dir,
write_context.target_dir(),
data.expect("FIXME"),
std::collections::HashMap::new(),
true,
@@ -377,7 +377,10 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
.await
.into_iter()
.map(|data| data.unwrap());
txn.add_write_metadata(Box::new(write_metadata));

for meta in write_metadata {
txn.add_write_metadata(meta);
}

// commit!
txn.commit(engine.as_ref())?;
@@ -526,7 +529,7 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
let parquet_handler = &engine.parquet;
parquet_handler
.write_parquet_file(
&write_context.target_dir,
write_context.target_dir(),
data.expect("FIXME"),
std::collections::HashMap::from([(
partition_col.to_string(),
@@ -544,7 +547,10 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
.await
.into_iter()
.map(|data| data.unwrap());
txn.add_write_metadata(Box::new(write_metadata));

for meta in write_metadata {
txn.add_write_metadata(meta);
}

// commit!
txn.commit(engine.as_ref())?;
