Skip to content

Commit

Permalink
fix(python,rust): missing remove actions during create_or_replace (#…
Browse files Browse the repository at this point in the history
…2437)

# Description
Overwrite mode never added the remove actions for the existing files, which
left the table in an invalid state.
  • Loading branch information
ion-elgreco authored Apr 22, 2024
1 parent 5f137ca commit f12834e
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
60 changes: 58 additions & 2 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ impl CreateBuilder {
};

let mut actions = vec![Action::Protocol(protocol), Action::Metadata(metadata)];

actions.extend(
self.actions
.into_iter()
Expand All @@ -329,7 +330,7 @@ impl std::future::IntoFuture for CreateBuilder {
Box::pin(async move {
let mode = this.mode;
let app_metadata = this.metadata.clone().unwrap_or_default();
let (mut table, actions, operation) = this.into_table_and_actions()?;
let (mut table, mut actions, operation) = this.into_table_and_actions()?;
let log_store = table.log_store();

let table_state = if log_store.is_delta_table_location().await? {
Expand All @@ -342,6 +343,12 @@ impl std::future::IntoFuture for CreateBuilder {
}
SaveMode::Overwrite => {
table.load().await?;
let remove_actions = table
.snapshot()?
.log_data()
.into_iter()
.map(|p| p.remove_action(true).into());
actions.extend(remove_actions);
Some(table.snapshot()?)
}
}
Expand Down Expand Up @@ -371,7 +378,7 @@ mod tests {
use super::*;
use crate::operations::DeltaOps;
use crate::table::config::DeltaConfigKey;
use crate::writer::test_utils::get_delta_schema;
use crate::writer::test_utils::{get_delta_schema, get_record_batch};
use tempfile::TempDir;

#[tokio::test]
Expand Down Expand Up @@ -518,4 +525,53 @@ mod tests {
.unwrap();
assert_ne!(table.metadata().unwrap().id, first_id)
}

#[tokio::test]
async fn test_create_or_replace_existing_table() {
let batch = get_record_batch(None, false);
let schema = get_delta_schema();
let table = DeltaOps::new_in_memory()
.write(vec![batch.clone()])
.with_save_mode(SaveMode::ErrorIfExists)
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_files_count(), 1);

let mut table = DeltaOps(table)
.create()
.with_columns(schema.fields().iter().cloned())
.with_save_mode(SaveMode::Overwrite)
.await
.unwrap();
table.load().await.unwrap();
assert_eq!(table.version(), 1);
/// Checks if files got removed after overwrite
assert_eq!(table.get_files_count(), 0);
}

#[tokio::test]
async fn test_create_or_replace_existing_table_partitioned() {
let batch = get_record_batch(None, false);
let schema = get_delta_schema();
let table = DeltaOps::new_in_memory()
.write(vec![batch.clone()])
.with_save_mode(SaveMode::ErrorIfExists)
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_files_count(), 1);

let mut table = DeltaOps(table)
.create()
.with_columns(schema.fields().iter().cloned())
.with_save_mode(SaveMode::Overwrite)
.with_partition_columns(vec!["id"])
.await
.unwrap();
table.load().await.unwrap();
assert_eq!(table.version(), 1);
/// Checks if files got removed after overwrite
assert_eq!(table.get_files_count(), 0);
}
}
13 changes: 12 additions & 1 deletion python/tests/test_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pyarrow as pa
import pytest

from deltalake import DeltaTable
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaError


Expand Down Expand Up @@ -54,3 +54,14 @@ def test_create_schema(tmp_path: pathlib.Path, sample_data: pa.Table):
)

assert dt.schema().to_pyarrow() == sample_data.schema


def test_create_or_replace_existing_table(
    tmp_path: pathlib.Path, sample_data: pa.Table
):
    """Creating a table in overwrite mode over existing data leaves no files listed."""
    # Seed the location with a table built from the sample data.
    write_deltalake(table_or_uri=tmp_path, data=sample_data)

    # Re-create the table at the same path, partitioned, in overwrite mode.
    replaced = DeltaTable.create(
        tmp_path,
        sample_data.schema,
        mode="overwrite",
        partition_by=["utf8"],
    )

    remaining_files = replaced.files()
    assert remaining_files == []

0 comments on commit f12834e

Please sign in to comment.