Delta Lake Inconsistent INSERT outcome in the context of performing concurrent operations #21324

Closed
findinpath opened this issue Mar 29, 2024 · 2 comments · Fixed by #21330
Labels: bug, correctness, delta-lake

Comments

findinpath commented Mar 29, 2024

https://github.com/trinodb/trino/actions/runs/8480015101/job/23235027443

2024-03-29T04:37:45.772-0600	ERROR	SplitRunner-20240329_103745_00063_e4xfv.0.0.0-5-739	io.trino.plugin.deltalake.DeltaLakeMetadata	Failed to write checkpoint for table test_delta_concurrent_writes_df66fjlg6l.test_concurrent_inserts_select_from_same_table_h28nk6v7lf for version 2
io.trino.spi.TrinoException: Error getting snapshot for test_delta_concurrent_writes_df66fjlg6l.test_concurrent_inserts_select_from_same_table_h28nk6v7lf
	at io.trino.plugin.deltalake.DeltaLakeMetadata.getSnapshot(DeltaLakeMetadata.java:485)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.writeCheckpointIfNeeded(DeltaLakeMetadata.java:2469)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.finishInsert(DeltaLakeMetadata.java:1807)
	at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:633)
	at io.trino.tracing.TracingConnectorMetadata.finishInsert(TracingConnectorMetadata.java:718)
	at io.trino.metadata.MetadataManager.finishInsert(MetadataManager.java:1168)
	at io.trino.tracing.TracingMetadata.finishInsert(TracingMetadata.java:706)
	at io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4090)
	at io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)
	at io.trino.operator.Driver.processInternal(Driver.java:398)
	at io.trino.operator.Driver.lambda$process$8(Driver.java:301)
	at io.trino.operator.Driver.tryWithLock(Driver.java:704)
	at io.trino.operator.Driver.process(Driver.java:293)
	at io.trino.operator.Driver.processForDuration(Driver.java:264)
	at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:887)
	at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:76)
	at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
	at io.trino.$gen.Trino_testversion____20240329_103721_265.run(Unknown Source)
	at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
	at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:174)
	at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:161)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.IllegalStateException: No previously loaded snapshot found for query 20240329_103745_00063_e4xfv, table test_delta_concurrent_writes_df66fjlg6l.test_concurrent_inserts_select_from_same_table_h28nk6v7lf [local:///test_delta_concurrent_writes_df66fjlg6l/test_concurrent_inserts_select_from_same_table_h28nk6v7lf-fb80ef1630a249098808c77433e76f23] at version 1
	at com.google.common.base.Preconditions.checkState(Preconditions.java:877)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.getSnapshot(DeltaLakeMetadata.java:474)
	... 27 more
....

Error:  io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameTable -- Time elapsed: 1.223 s <<< FAILURE!
java.lang.AssertionError: 
For query 20240329_103745_00065_e4xfv: 
 SELECT * FROM test_concurrent_inserts_select_from_same_table_h28nk6v7lf
not equal
Actual rows (up to 100 of 1 extra rows shown, 3 rows in total):
    [1, 10]
Expected rows (up to 100 of 1 missing rows shown, 3 rows in total):
    [2, 10]

	at io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder(QueryAssertions.java:409)
	at io.trino.testing.QueryAssertions.assertDistributedQuery(QueryAssertions.java:371)
	at io.trino.testing.QueryAssertions.assertQuery(QueryAssertions.java:187)
	at io.trino.testing.QueryAssertions.assertQuery(QueryAssertions.java:160)
	at io.trino.testing.AbstractTestQueryFramework.assertQuery(AbstractTestQueryFramework.java:350)
	at io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameTable(TestDeltaLakeLocalConcurrentWritesTest.java:228)
	at io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameTable(TestDeltaLakeLocalConcurrentWritesTest.java:177)

Relates to #20983

findinpath added the bug and delta-lake labels on Mar 29, 2024

findinpath commented Mar 29, 2024

2024-03-29T04:37:45.772-0600	ERROR	SplitRunner-20240329_103745_00063_e4xfv.0.0.0-5-739	io.trino.plugin.deltalake.DeltaLakeMetadata	Failed to write checkpoint for table test_delta_concurrent_writes_df66fjlg6l.test_concurrent_inserts_select_from_same_table_h28nk6v7lf for version 2
....
Caused by: java.lang.IllegalStateException: No previously loaded snapshot found for query 20240329_103745_00063_e4xfv, table test_delta_concurrent_writes_df66fjlg6l.test_concurrent_inserts_select_from_same_table_h28nk6v7lf [local:///test_delta_concurrent_writes_df66fjlg6l/test_concurrent_inserts_select_from_same_table_h28nk6v7lf-fb80ef1630a249098808c77433e76f23] at version 1

The exception above occurs because the readVersion stored in DeltaLakeInsertTableHandle bypasses the mechanism that registers versions in queriedVersions:

getMandatoryCurrentVersion(fileSystem, tableLocation, table.getReadVersion()),

checkState(queriedVersions.put(table, snapshot.getVersion()) == null, "queriedLocations changed concurrently for %s", table);

Note that the same behavior is exercised in beginMerge as well.

Explanation

In getTableHandle we read the table at version 0 and likely register that version in queriedVersions.
In beginInsert we read the table at version 1 (because in the meantime a concurrent operation has completed successfully and created the 1.log transaction log file), but we don't register it in queriedVersions. This is why finishInsert does not find version 1 in queriedVersions.

Following this line of reasoning, the TableScanNode is created from readVersion 0 of the DeltaLakeTableHandle (obtained after getTableHandle/applyFilter), not from the DeltaLakeInsertTableHandle created in beginInsert.

The DeltaLakeInsertTableHandle is created with readVersion 1 in beginInsert.
The operation is committed successfully in finishInsert, which creates version 2 of the table. The subsequent writeCheckpointIfNeeded call fails, however, because version 1 was never registered in queriedVersions.
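
The sequence above can be reproduced with a small stand-alone model. This is only a sketch under simplifying assumptions: the class `QueriedVersionsSketch` and its method bodies are hypothetical and are not Trino code; only the names queriedVersions, getTableHandle, beginInsert and finishInsert are borrowed from the discussion, and the real lookup in DeltaLakeMetadata.getSnapshot may be keyed differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the version bookkeeping described in this issue.
final class QueriedVersionsSketch
{
    // Versions registered per table for the current query.
    private final Map<String, Long> queriedVersions = new HashMap<>();

    // Registers the version seen when the table handle is resolved.
    long getTableHandle(String table, long currentVersion)
    {
        queriedVersions.put(table, currentVersion);
        return currentVersion;
    }

    // Reads the latest version again but does not register it (the gap reported here).
    long beginInsert(String table, long currentVersion)
    {
        return currentVersion;
    }

    // Assumed check: the read version must match a version registered earlier in the query.
    Optional<String> finishInsert(String table, long readVersion)
    {
        Long registered = queriedVersions.get(table);
        if (registered == null || registered != readVersion) {
            return Optional.of("No previously loaded snapshot found for table " + table + " at version " + readVersion);
        }
        return Optional.empty();
    }

    static Optional<String> simulate()
    {
        QueriedVersionsSketch query = new QueriedVersionsSketch();
        query.getTableHandle("t", 0);                 // table is at version 0 during planning
        // a concurrent INSERT commits version 1 at this point
        long readVersion = query.beginInsert("t", 1); // version 1 is read but never registered
        return query.finishInsert("t", readVersion);
    }

    public static void main(String[] args)
    {
        System.out.println(simulate().orElse("snapshot lookup succeeded"));
    }
}
```

In this model the lookup fails only because beginInsert returns a version that was never put into the map; registering it there, or reusing the version from getTableHandle, would make the lookup succeed.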

The expected result, if all the statements succeed, is:

// Considering T1, T2, T3 being the order of completion of the concurrent INSERT operations,
// if all the operations would eventually succeed, the entries inserted per thread would look like this:
// T1: (1, 10)
// T2: (2, 10)
// T3: (3, 10)

The table now contains the following data:

(0, 10)  0.log  initial data
(1, 10)  1.log  the first winning commit
(1, 10)  2.log  the second commit, which mistakenly assumes it scanned data as of readVersion 1, while the TableScanNode corresponds to readVersion 0
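
The difference between the expected and the actual rows can be illustrated with a toy model of the table history. This is a sketch resting on a loud assumption: each INSERT is taken to write one row `(number of rows its scan saw, 10)`, which is consistent with the rows listed above but is not confirmed by the issue text. The class `StaleScanSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model: versions.get(v) holds all rows visible at table version v.
final class StaleScanSketch
{
    // Commits a new version that adds one row (rowCountSeenByScan, 10).
    // The scan reads scanVersion, while the commit always builds on the latest version.
    static void insertSelectCount(List<List<String>> versions, int scanVersion)
    {
        int count = versions.get(scanVersion).size();
        List<String> next = new ArrayList<>(versions.get(versions.size() - 1));
        next.add("(" + count + ", 10)");
        versions.add(next);
    }

    // staleScan = true models a TableScanNode planned at version 0 for the second insert.
    static List<String> run(boolean staleScan)
    {
        List<List<String>> versions = new ArrayList<>();
        versions.add(List.of("(0, 10)"));               // version 0: initial data
        insertSelectCount(versions, 0);                 // first winning commit, version 1
        insertSelectCount(versions, staleScan ? 0 : 1); // second commit, version 2
        return versions.get(2);
    }

    public static void main(String[] args)
    {
        System.out.println("stale scan: " + run(true));
        System.out.println("fresh scan: " + run(false));
    }
}
```

With the stale scan the second commit adds a second (1, 10) row; with a scan at version 1 it adds (2, 10), which is the row the test expects.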

Quite likely, the readVersion on the DeltaLakeInsertTableHandle should be taken from the DeltaLakeTableHandle so that it reflects the version the query plan actually scans.
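
A minimal sketch of that idea follows. It is not the change made in the linked fix; the classes `TableHandle` and `InsertHandle` and the methods `beginInsert` and `conflictsWith` are hypothetical stand-ins, and what to do on a detected conflict (fail or retry the query) is left open.

```java
// Hypothetical sketch: the insert handle inherits the version the plan was built on.
final class ReadVersionPropagationSketch
{
    static final class TableHandle
    {
        final String table;
        final long readVersion;

        TableHandle(String table, long readVersion)
        {
            this.table = table;
            this.readVersion = readVersion;
        }
    }

    static final class InsertHandle
    {
        final String table;
        final long readVersion;

        InsertHandle(String table, long readVersion)
        {
            this.table = table;
            this.readVersion = readVersion;
        }
    }

    // Copies the planned read version instead of re-reading the latest table version.
    static InsertHandle beginInsert(TableHandle planned)
    {
        return new InsertHandle(planned.table, planned.readVersion);
    }

    // Assumed commit-time check: another writer committed after the version this query scanned.
    static boolean conflictsWith(InsertHandle insert, long latestCommittedVersion)
    {
        return latestCommittedVersion > insert.readVersion;
    }

    public static void main(String[] args)
    {
        InsertHandle insert = beginInsert(new TableHandle("t", 0));
        System.out.println("readVersion=" + insert.readVersion + ", conflict=" + conflictsWith(insert, 1));
    }
}
```

Because the handle keeps readVersion 0, the concurrent commit of version 1 becomes visible as a conflict at commit time rather than being silently absorbed.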

cc @pajaks

findinpath changed the title from "Flaky TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameTable" to "Delta Lake Inconsistent INSERT outcome in the context of performing concurrent operations" on Mar 30, 2024

ebyhr commented Apr 2, 2024
