diff --git a/crates/aws/src/logstore.rs b/crates/aws/src/logstore.rs index 8e02659d87..123aadd2d1 100644 --- a/crates/aws/src/logstore.rs +++ b/crates/aws/src/logstore.rs @@ -166,6 +166,20 @@ impl LogStore for S3DynamoDbLogStore { self.table_path.clone() } + async fn refresh(&self) -> DeltaResult<()> { + let entry = self + .lock_client + .get_latest_entry(&self.table_path) + .await + .map_err(|err| DeltaTableError::GenericError { + source: Box::new(err), + })?; + if let Some(entry) = entry { + self.repair_entry(&entry).await?; + } + Ok(()) + } + async fn read_commit_entry(&self, version: i64) -> DeltaResult> { let entry = self .lock_client diff --git a/crates/aws/tests/integration_s3_dynamodb.rs b/crates/aws/tests/integration_s3_dynamodb.rs index ff8f0ae7e9..179c46fc5a 100644 --- a/crates/aws/tests/integration_s3_dynamodb.rs +++ b/crates/aws/tests/integration_s3_dynamodb.rs @@ -156,7 +156,6 @@ async fn test_repair_commit_entry() -> TestResult<()> { #[tokio::test] #[serial] -#[ignore = "https://github.com/delta-io/delta-rs/issues/2109"] async fn test_repair_on_update() -> TestResult<()> { let context = IntegrationContext::new(Box::new(S3Integration::default()))?; let mut table = prepare_table(&context, "repair_on_update").await?; @@ -168,6 +167,19 @@ async fn test_repair_on_update() -> TestResult<()> { Ok(()) } +#[tokio::test] +#[serial] +async fn test_repair_on_load() -> TestResult<()> { + let context = IntegrationContext::new(Box::new(S3Integration::default()))?; + let mut table = prepare_table(&context, "repair_on_update").await?; + let _entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?; + table.load_version(1).await?; + // table should fix the broken entry while loading a specific version + assert_eq!(table.version(), 1); + validate_lock_table_state(&table, 1).await?; + Ok(()) +} + const WORKERS: i64 = 3; const COMMITS: i64 = 15; diff --git a/crates/core/src/kernel/snapshot/log_segment.rs 
b/crates/core/src/kernel/snapshot/log_segment.rs index 66cc428c3f..6ad1690db1 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -18,6 +18,7 @@ use tracing::debug; use super::parse; use crate::kernel::{arrow::json, Action, ActionType, Metadata, Protocol, Schema, StructType}; +use crate::logstore::LogStore; use crate::operations::transaction::get_commit_bytes; use crate::protocol::DeltaOperation; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; @@ -148,15 +149,21 @@ impl LogSegment { table_root: &Path, start_version: i64, end_version: Option, - store: &dyn ObjectStore, + log_store: &dyn LogStore, ) -> DeltaResult { debug!( "try_new_slice: start_version: {}, end_version: {:?}", start_version, end_version ); + log_store.refresh().await?; let log_url = table_root.child("_delta_log"); - let (mut commit_files, checkpoint_files) = - list_log_files(store, &log_url, end_version, Some(start_version)).await?; + let (mut commit_files, checkpoint_files) = list_log_files( + log_store.object_store().as_ref(), + &log_url, + end_version, + Some(start_version), + ) + .await?; // remove all files above requested version if let Some(version) = end_version { commit_files.retain(|meta| meta.location.commit_version() <= Some(version)); diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 715fb2feec..d12018c245 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -30,6 +30,7 @@ use self::parse::{read_adds, read_removes}; use self::replay::{LogMapper, LogReplayScanner, ReplayStream}; use super::{Action, Add, CommitInfo, DataType, Metadata, Protocol, Remove, StructField}; use crate::kernel::StructType; +use crate::logstore::LogStore; use crate::table::config::TableConfig; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; @@ -108,16 +109,16 @@ impl Snapshot { /// Update the snapshot to the given version pub async 
fn update( &mut self, - store: Arc, + log_store: Arc, target_version: Option, ) -> DeltaResult<()> { - self.update_inner(store, target_version).await?; + self.update_inner(log_store, target_version).await?; Ok(()) } async fn update_inner( &mut self, - store: Arc, + log_store: Arc, target_version: Option, ) -> DeltaResult> { if let Some(version) = target_version { @@ -125,16 +126,14 @@ impl Snapshot { return Ok(None); } if version < self.version() { - return Err(DeltaTableError::Generic( - "Cannoit downgrade snapshot".into(), - )); + return Err(DeltaTableError::Generic("Cannot downgrade snapshot".into())); } } let log_segment = LogSegment::try_new_slice( &Path::default(), self.version() + 1, target_version, - store.as_ref(), + log_store.as_ref(), ) .await?; if log_segment.commit_files.is_empty() && log_segment.checkpoint_files.is_empty() { @@ -142,7 +141,7 @@ impl Snapshot { } let (protocol, metadata) = log_segment - .read_metadata(store.clone(), &self.config) + .read_metadata(log_store.object_store().clone(), &self.config) .await?; if let Some(protocol) = protocol { self.protocol = protocol; @@ -376,7 +375,7 @@ impl EagerSnapshot { /// Update the snapshot to the given version pub async fn update( &mut self, - store: Arc, + log_store: Arc, target_version: Option, ) -> DeltaResult<()> { if Some(self.version()) == target_version { @@ -384,12 +383,12 @@ impl EagerSnapshot { } let new_slice = self .snapshot - .update_inner(store.clone(), target_version) + .update_inner(log_store.clone(), target_version) .await?; if let Some(new_slice) = new_slice { let files = std::mem::take(&mut self.files); let log_stream = new_slice.commit_stream( - store.clone(), + log_store.object_store().clone(), &log_segment::COMMIT_SCHEMA, &self.snapshot.config, )?; @@ -398,7 +397,7 @@ impl EagerSnapshot { } else { new_slice .checkpoint_stream( - store, + log_store.object_store(), &log_segment::CHECKPOINT_SCHEMA, &self.snapshot.config, ) diff --git a/crates/core/src/logstore/mod.rs 
b/crates/core/src/logstore/mod.rs index 5deaa9cd36..e6b4c6e2d4 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -165,6 +165,11 @@ pub trait LogStore: Sync + Send { /// Return the name of this LogStore implementation fn name(&self) -> String; + /// Trigger a sync operation on the log store. + async fn refresh(&self) -> DeltaResult<()> { + Ok(()) + } + /// Read data for commit entry with the given version. async fn read_commit_entry(&self, version: i64) -> DeltaResult>; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index ad260295ba..7615c72dc3 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -345,11 +345,7 @@ impl DeltaTable { self.version(), ); match self.state.as_mut() { - Some(state) => { - state - .update(self.log_store.object_store(), max_version) - .await - } + Some(state) => state.update(self.log_store.clone(), max_version).await, _ => { let state = DeltaTableState::try_new( &Path::default(), diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index 104ba2bd32..ab5c229c49 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -14,6 +14,7 @@ use crate::kernel::{ Action, Add, DataType, EagerSnapshot, LogDataHandler, LogicalFile, Metadata, Protocol, Remove, StructType, }; +use crate::logstore::LogStore; use crate::partitions::{DeltaTablePartition, PartitionFilter}; use crate::protocol::DeltaOperation; use crate::{DeltaResult, DeltaTableError}; @@ -196,10 +197,10 @@ impl DeltaTableState { /// Update the state of the table to the given version.
pub async fn update( &mut self, - store: Arc, + log_store: Arc, version: Option, ) -> Result<(), DeltaTableError> { - self.snapshot.update(store, version).await?; + self.snapshot.update(log_store, version).await?; Ok(()) } diff --git a/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py b/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py index 64479e96c8..2cb4f05805 100644 --- a/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py +++ b/python/tests/pyspark_integration/test_concurrent_write_s3_dynamo.py @@ -183,11 +183,11 @@ def delete_dynamodb_table(table_name: str): @pytest.fixture def setup(): - os.environ['AWS_ENDPOINT_URL'] = 'http://localhost:4566' - os.environ['AWS_REGION'] = 'us-east-1' - os.environ['AWS_ACCESS_KEY_ID'] = 'deltalake' - os.environ['AWS_SECRET_ACCESS_KEY'] = 'weloverust' - os.environ['AWS_ALLOW_HTTP'] = 'true' + os.environ["AWS_ENDPOINT_URL"] = "http://localhost:4566" + os.environ["AWS_REGION"] = "us-east-1" + os.environ["AWS_ACCESS_KEY_ID"] = "deltalake" + os.environ["AWS_SECRET_ACCESS_KEY"] = "weloverust" + os.environ["AWS_ALLOW_HTTP"] = "true" id = uuid.uuid4() bucket_name = f"delta-rs-integration-{id}" bucket_url = f"s3://{bucket_name}"