-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create Table Concurrent query Failure handling in Delta Lake #24250
base: master
Are you sure you want to change the base?
Create Table Concurrent query Failure handling in Delta Lake #24250
Conversation
078d491
to
c7628b3
Compare
@@ -1263,7 +1263,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe | |||
statisticsAccess.deleteExtendedStatistics(session, schemaTableName, location); | |||
} | |||
else { | |||
setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory)); | |||
// deleteRecursivelyIfNothingExists ensures current CREATE TABLE doesn't delete directory if there's a conflict |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why aren't we checking in the catch
clause whether we're dealing with a TransactionConflictException
instead?
By doing this, we'd likely know whether we're in a concurrency situation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@findinpath the rollback happens in a different thread AFAIK, the exception context needs to be passed on as well. Also the rollback initialisation happens in beginCreateTable & createTable
calls which is much prior to finishCreateTable
which is later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're figuring out that another process already created the table only when trying to write the first transaction log file right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes @findinpath,
trino/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Line 1660 in 5b82e10
transactionLogWriter.flush(); |
this is where the operation fails due Conflict.
...ake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentCreateTableTest.java
Show resolved
Hide resolved
c7628b3
to
32fae9e
Compare
63cc0ed
to
a1ed23d
Compare
.build()) | ||
.forEach(MoreFutures::getDone); | ||
|
||
// Verify table exists and has one row |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should check that catalog doesn't have any files from failed attempts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you mean the failed table creations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added the test case to ensure the files contained are from successful query execution and only
...ake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentCreateTableTest.java
Outdated
Show resolved
Hide resolved
9ca102a
to
7a9b13d
Compare
7a9b13d
to
c747a0b
Compare
{ | ||
try { | ||
fileSystem.deleteDirectory(path); | ||
Location deltaLogDirectory = Location.of(getTransactionLogDir(tablePath.path())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code throws exception which is only logged:
2024-12-18T06:58:49.952-0600 ERROR transaction-finishing-0 io.trino.metadata.CatalogMetadata Connector threw exception on abort
java.lang.IllegalArgumentException: No scheme for file system location: var/folders/ns/drnmv_551mjcn3rd0ccw1tzm0000gn/T/catalog-dir5122020563593486373/tpch/test_concurrent_ctas_vnae2mtwc9/_delta_log
It causes probably CI failures.
It also visible in logs for testConcurrentCreateTableAsSelectSameTable
return null; | ||
}).collect(toImmutableList())).forEach(MoreFutures::getDone); | ||
|
||
// Verify table exists and has one row |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update comment
long pathMatchingSuccessfulQueryCount = (long) computeScalar("with a as (select distinct(\"$path\") as path from " + tableName + "), " + | ||
"b as (select element_at(operation_parameters, 'queryId') as queryId from \"" + tableName + "$history\") " + | ||
"select count(1) from a,b where a.path like '%' || b.queryId || '%'"); | ||
assertThat(pathCount).isEqualTo(pathMatchingSuccessfulQueryCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assert does not check files existence. $history and $path will return only files from succeeded queries.
You can use getTableFiles
like here:
Line 671 in fbb9610
assertEventually(new Duration(5, SECONDS), () -> assertThat(getTableFiles(tableName)).isEmpty()); |
Description
Create Table [as select] concurrent query failure handling
Additional context and related issues
Fixes #24153
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: