Skip to content

Commit

Permalink
modify write_json_file to take an iterator of DeltaResult
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Oct 29, 2024
1 parent c7dac51 commit 8f1a446
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 19 deletions.
6 changes: 3 additions & 3 deletions kernel/src/engine/arrow_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,11 +665,11 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
/// Serialize an iterator of engine data chunks to line-delimited JSON bytes: each chunk is
/// converted to an arrow RecordBatch and appended to a single in-memory buffer.
// TODO (zach): this should stream data to the JSON writer and output an iterator.
pub(crate) fn to_json_bytes(
data: impl Iterator<Item = Box<dyn EngineData>> + Send,
data: impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send,
) -> DeltaResult<Vec<u8>> {
let mut writer = LineDelimitedWriter::new(Vec::new());
for chunk in data.into_iter() {
let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?;
let arrow_data = ArrowEngineData::try_from_engine_data(chunk?)?;
let record_batch = arrow_data.record_batch();
writer.write(record_batch)?;
}
Expand Down Expand Up @@ -1436,7 +1436,7 @@ mod tests {
vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
)?;
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
let json = to_json_bytes(Box::new(std::iter::once(data)))?;
let json = to_json_bytes(Box::new(std::iter::once(Ok(data))))?;
assert_eq!(
json,
"{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/engine/default/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>,
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
_overwrite: bool,
) -> DeltaResult<()> {
let buffer = to_json_bytes(data)?;
Expand Down
6 changes: 3 additions & 3 deletions kernel/src/engine/sync/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler {
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>,
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
_overwrite: bool,
) -> DeltaResult<()> {
let path = path
Expand Down Expand Up @@ -120,10 +120,10 @@ mod tests {

let url = Url::from_file_path(path.clone()).unwrap();
handler
.write_json_file(&url, Box::new(std::iter::once(data)), false)
.write_json_file(&url, Box::new(std::iter::once(Ok(data))), false)
.expect("write json file");
assert!(matches!(
handler.write_json_file(&url, Box::new(std::iter::once(empty)), false),
handler.write_json_file(&url, Box::new(std::iter::once(Ok(empty))), false),
Err(Error::FileAlreadyExists(_))
));

Expand Down
2 changes: 1 addition & 1 deletion kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ pub trait JsonHandler: Send + Sync {
fn write_json_file(
&self,
path: &Url,
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>,
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
overwrite: bool,
) -> DeltaResult<()>;
}
Expand Down
16 changes: 5 additions & 11 deletions kernel/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ impl Transaction {
/// will include the failed transaction in case of a conflict so the user can retry.
pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult> {
// step one: construct the iterator of actions we want to commit
// note: only support commit_info right now (and it's required)
let engine_commit_info = self
.commit_info
.as_ref()
Expand All @@ -98,10 +97,7 @@ impl Transaction {
engine,
self.operation.as_deref(),
engine_commit_info.as_ref(),
)?));

// TODO consider IntoIterator so we can have multiple write_metadata iterators (and return
// self in the conflict case for retries)
)));
let adds = generate_adds(engine, self.write_metadata.iter().map(|a| a.as_ref()));
let actions = chain(actions, adds);

Expand Down Expand Up @@ -144,8 +140,8 @@ impl Transaction {
self
}

// Generate the logical-to-physical transform expression for this transaction. At the moment,
// this is a transaction-wide expression.
// Generate the logical-to-physical transform expression which must be evaluated on every data
// chunk before writing. At the moment, this is a transaction-wide expression.
fn generate_logical_to_physical(&self) -> Expression {
// for now, we just pass through all the columns except partition columns.
// note this is _incorrect_ if table config deems we need partition columns.
Expand Down Expand Up @@ -195,7 +191,7 @@ impl Transaction {
fn generate_adds<'a>(
engine: &dyn Engine,
write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'a,
) -> Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + 'a> {
) -> Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a> {
let expression_handler = engine.get_expression_handler();
let write_metadata_schema = get_write_metadata_schema();
let log_schema = get_log_add_schema();
Expand All @@ -211,9 +207,7 @@ fn generate_adds<'a>(
adds_expr,
log_schema.clone().into(),
);
adds_evaluator
.evaluate(write_metadata_batch)
.expect("fixme")
adds_evaluator.evaluate(write_metadata_batch)
}))
}

Expand Down

0 comments on commit 8f1a446

Please sign in to comment.