feat: allow multiple incremental commits in optimize
Currently "optimize" executes the whole plan in one commit, which might
fail. The larger the table, the more likely it is to fail and the more
expensive the failure is.

Add an option in OptimizeBuilder that allows specifying a commit
interval. If that is provided, the plan executor will periodically
commit the accumulated actions.
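
For illustration, a minimal sketch of how a caller might opt in (the import path, builder construction, and return shape here are assumptions inferred from this diff, not a verified API):

use std::time::Duration;
// Path assumed from rust/src/operations/optimize.rs; adjust to the crate's re-exports.
use deltalake::operations::optimize::OptimizeBuilder;

async fn optimize_in_chunks(table: deltalake::DeltaTable) -> Result<(), deltalake::DeltaTableError> {
    // Without with_min_commit_interval, the whole plan still lands in a single
    // commit; with it, accumulated actions are committed no more often than
    // the given interval.
    let (_table, metrics) = OptimizeBuilder::new(table.object_store(), table.state)
        .with_min_commit_interval(Duration::from_secs(600)) // commit roughly every 10 minutes
        .await?;
    println!("optimize metrics: {:?}", metrics);
    Ok(())
}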
kvap committed Sep 9, 2023
1 parent 30c55d4 commit acca200
Showing 2 changed files with 116 additions and 77 deletions.
191 changes: 114 additions & 77 deletions rust/src/operations/optimize.rs
@@ -22,7 +22,7 @@

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow_array::RecordBatch;
@@ -172,6 +172,7 @@ pub struct OptimizeBuilder<'a> {
max_spill_size: usize,
/// Optimize type
optimize_type: OptimizeType,
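/// Minimum interval before accumulated actions are committed while the plan executes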
min_commit_interval: Option<Duration>,
}

impl<'a> OptimizeBuilder<'a> {
@@ -188,6 +189,7 @@ impl<'a> OptimizeBuilder<'a> {
max_concurrent_tasks: num_cpus::get(),
max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
optimize_type: OptimizeType::Compact,
min_commit_interval: None,
}
}

@@ -241,6 +243,12 @@ impl<'a> OptimizeBuilder<'a> {
self.max_spill_size = max_spill_size;
self
}

/// Min commit interval. If set, the plan executor periodically commits the
/// accumulated actions while the plan runs, instead of a single commit at the end.
pub fn with_min_commit_interval(mut self, min_commit_interval: Duration) -> Self {
self.min_commit_interval = Some(min_commit_interval);
self
}
}

impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
@@ -265,6 +273,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
writer_properties,
this.max_concurrent_tasks,
this.max_spill_size,
this.min_commit_interval,
)?;
let metrics = plan.execute(this.store.clone(), &this.snapshot).await?;
let mut table = DeltaTable::new_with_state(this.store, this.snapshot);
@@ -347,6 +356,7 @@ pub struct MergePlan {
#[allow(dead_code)]
/// Maximum number of bytes that are allowed to spill to disk
max_spill_size: usize,
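/// Minimum interval before accumulated actions are committed while the plan executes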
min_commit_interval: Option<Duration>,
}

/// Parameters passed to individual merge tasks
@@ -580,50 +590,50 @@ impl MergePlan {
let mut actions = vec![];

// Need to move metrics and operations out of self, so we can use self in the stream
let mut metrics = std::mem::take(&mut self.metrics);
let orig_metrics = std::mem::take(&mut self.metrics);
let mut metrics = orig_metrics.clone();
let mut total_metrics = orig_metrics.clone();

let operations = std::mem::take(&mut self.operations);

match operations {
OptimizeOperations::Compact(bins) => {
futures::stream::iter(bins)
.flat_map(|(partition, bins)| {
futures::stream::iter(bins).map(move |bin| (partition.clone(), bin))
})
.map(|(partition, files)| {
let object_store_ref = object_store.clone();
let batch_stream = futures::stream::iter(files.clone())
.then(move |file| {
let object_store_ref = object_store_ref.clone();
async move {
let file_reader =
ParquetObjectReader::new(object_store_ref, file);
ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()
}
})
.try_flatten()
.boxed();

let rewrite_result = tokio::task::spawn(Self::rewrite_files(
self.task_parameters.clone(),
partition,
files,
object_store.clone(),
futures::future::ready(Ok(batch_stream)),
));
util::flatten_join_error(rewrite_result)
})
.buffer_unordered(self.max_concurrent_tasks)
.try_for_each(|(partial_actions, partial_metrics)| {
debug!("Recording metrics for a completed partition");
actions.extend(partial_actions);
metrics.add(&partial_metrics);
async { Ok(()) }
})
.await?;
}
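// Turn the planned operations into a lazy stream of rewrite tasks instead of
// executing them eagerly, so their results can be committed incrementally below.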
let stream = match operations {
OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
.flat_map(|(partition, bins)| {
futures::stream::iter(bins).map(move |bin| (partition.clone(), bin))
})
.map(|(partition, files)| {
debug!(
"merging a group of {} files in partition {:?}",
files.len(),
partition,
);
for file in files.iter() {
debug!(" file {}", file.location);
}
let object_store_ref = object_store.clone();
let batch_stream = futures::stream::iter(files.clone())
.then(move |file| {
let object_store_ref = object_store_ref.clone();
async move {
let file_reader = ParquetObjectReader::new(object_store_ref, file);
ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()
}
})
.try_flatten()
.boxed();

let rewrite_result = tokio::task::spawn(Self::rewrite_files(
self.task_parameters.clone(),
partition,
files,
object_store.clone(),
futures::future::ready(Ok(batch_stream)),
));
util::flatten_join_error(rewrite_result)
})
.boxed(),
OptimizeOperations::ZOrder(zorder_columns, bins) => {
#[cfg(not(feature = "datafusion"))]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
@@ -641,61 +651,86 @@ impl MergePlan {
object_store.clone(),
self.max_spill_size,
)?);
let task_parameters = self.task_parameters.clone();
let object_store = object_store.clone();
futures::stream::iter(bins)
.map(|(partition, files)| {
.map(move |(partition, files)| {
let batch_stream = Self::read_zorder(files.clone(), exec_context.clone());

let object_store = object_store.clone();

let rewrite_result = tokio::task::spawn(Self::rewrite_files(
self.task_parameters.clone(),
task_parameters.clone(),
partition,
files,
object_store,
batch_stream,
));
util::flatten_join_error(rewrite_result)
})
.buffer_unordered(self.max_concurrent_tasks)
.try_for_each(|(partial_actions, partial_metrics)| {
debug!("Recording metrics for a completed partition");
actions.extend(partial_actions);
metrics.add(&partial_metrics);
async { Ok(()) }
})
.await?;
.boxed()
}
}
};

metrics.preserve_insertion_order = true;
if metrics.num_files_added == 0 {
metrics.files_added.min = 0;
}
if metrics.num_files_removed == 0 {
metrics.files_removed.min = 0;
}
let mut stream = stream.buffer_unordered(self.max_concurrent_tasks);

// TODO: Check for remove actions on optimized partitions. If an
// optimized partition was updated then abort the commit. Requires (#593).
if !actions.is_empty() {
let mut metadata = Map::new();
metadata.insert("readVersion".to_owned(), self.read_table_version.into());
let maybe_map_metrics = serde_json::to_value(metrics.clone());
if let Ok(map) = maybe_map_metrics {
metadata.insert("operationMetrics".to_owned(), map);
let mut table = DeltaTable::new_with_state(object_store.clone(), snapshot.clone());

let mut last_commit = Instant::now();
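// Drain the rewrite results one at a time. `None` from the stream means every
// task has finished, so any remaining actions must get a final commit.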
loop {
let next = stream.next().await.transpose()?;

let end = next.is_none();

if let Some((partial_actions, partial_metrics)) = next {
debug!("Recording metrics for a completed partition");
actions.extend(partial_actions);
metrics.add(&partial_metrics);
total_metrics.add(&partial_metrics);
}

commit(
object_store.as_ref(),
&actions,
self.task_parameters.input_parameters.clone().into(),
snapshot,
Some(metadata),
)
.await?;
let now = Instant::now();
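// Flush the accumulated actions once the min commit interval has elapsed,
// or unconditionally when the stream is exhausted.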
if !actions.is_empty()
    && (self.min_commit_interval
        .map_or(false, |i| now.duration_since(last_commit) > i)
        || end)
{
let actions = std::mem::take(&mut actions);
last_commit = now;

metrics.preserve_insertion_order = true;
let mut metadata = Map::new();
metadata.insert("readVersion".to_owned(), self.read_table_version.into());
let maybe_map_metrics =
serde_json::to_value(std::mem::replace(&mut metrics, orig_metrics.clone()));
if let Ok(map) = maybe_map_metrics {
metadata.insert("operationMetrics".to_owned(), map);
}

table.update_incremental(None).await?;
debug!("committing {} actions", actions.len());
// TODO: Check for remove actions on optimized partitions. If an
// optimized partition was updated then abort the commit. Requires (#593).
commit(
table.object_store().as_ref(),
&actions,
self.task_parameters.input_parameters.clone().into(),
table.get_state(),
Some(metadata),
)
.await?;
}

if end {
break;
}
}

total_metrics.preserve_insertion_order = true;
if total_metrics.num_files_added == 0 {
total_metrics.files_added.min = 0;
}
if total_metrics.num_files_removed == 0 {
total_metrics.files_removed.min = 0;
}

Ok(metrics)
Ok(total_metrics)
}
}

@@ -729,6 +764,7 @@ pub fn create_merge_plan(
writer_properties: WriterProperties,
max_concurrent_tasks: usize,
max_spill_size: usize,
min_commit_interval: Option<Duration>,
) -> Result<MergePlan, DeltaTableError> {
let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());

@@ -769,6 +805,7 @@
read_table_version: snapshot.version(),
max_concurrent_tasks,
max_spill_size,
min_commit_interval,
})
}

2 changes: 2 additions & 0 deletions rust/tests/command_optimize.rs
@@ -276,6 +276,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
WriterProperties::builder().build(),
1,
20,
None,
)?;

let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
@@ -346,6 +347,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
WriterProperties::builder().build(),
1,
20,
None,
)?;

let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
