Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potentially incorrect table read version for writes on Delta Lake #21330

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1741,26 +1741,20 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
// This check acts as a safeguard in cases where the input columns may differ from the table metadata case-sensitively
checkAllColumnsPassedOnInsert(tableMetadata, inputColumns);

return createInsertHandle(session, retryMode, table, inputColumns);
return createInsertHandle(retryMode, table, inputColumns);
}

private DeltaLakeInsertTableHandle createInsertHandle(ConnectorSession session, RetryMode retryMode, DeltaLakeTableHandle table, List<DeltaLakeColumnHandle> inputColumns)
private DeltaLakeInsertTableHandle createInsertHandle(RetryMode retryMode, DeltaLakeTableHandle table, List<DeltaLakeColumnHandle> inputColumns)
{
String tableLocation = table.getLocation();
try {
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
return new DeltaLakeInsertTableHandle(
table.getSchemaTableName(),
tableLocation,
table.getMetadataEntry(),
table.getProtocolEntry(),
inputColumns,
getMandatoryCurrentVersion(fileSystem, tableLocation, table.getReadVersion()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this line was incorrect.

retryMode != NO_RETRIES);
}
catch (IOException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
}
return new DeltaLakeInsertTableHandle(
table.getSchemaTableName(),
tableLocation,
table.getMetadataEntry(),
table.getProtocolEntry(),
inputColumns,
table.getReadVersion(),
retryMode != NO_RETRIES);
}

private void checkAllColumnsPassedOnInsert(ConnectorTableMetadata tableMetadata, List<DeltaLakeColumnHandle> insertColumns)
Expand Down Expand Up @@ -2046,7 +2040,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
.filter(column -> column.getColumnType() != SYNTHESIZED)
.collect(toImmutableList());

DeltaLakeInsertTableHandle insertHandle = createInsertHandle(session, retryMode, handle, inputColumns);
DeltaLakeInsertTableHandle insertHandle = createInsertHandle(retryMode, handle, inputColumns);

return new DeltaLakeMergeTableHandle(handle, insertHandle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2406,11 +2406,11 @@ private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned)
.map("(%d, 10)"::formatted)
.collect(joining(", ", ", ", "")));
assertQuery(
"SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" +
"SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" +
LongStream.rangeClosed(1, successfulInsertsCount)
.boxed()
.map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1))
.map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1))
.collect(joining(", ", ", ", "")));
}
finally {
Expand Down Expand Up @@ -2458,13 +2458,13 @@ public void testConcurrentInsertsReconciliationForMixedInserts()
"SELECT * FROM " + tableName,
"VALUES (0, 10), (1, 10), (11, 20), (1, 20), (22, 30)");
assertQuery(
"SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
"SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
"""
VALUES
(0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0),
(1, 'WRITE', 'WriteSerializable', 0),
(2, 'WRITE', 'WriteSerializable', 1),
(3, 'WRITE', 'WriteSerializable', 2)
('CREATE TABLE AS SELECT', 'WriteSerializable', true),
('WRITE', 'WriteSerializable', false),
('WRITE', 'WriteSerializable', false),
('WRITE', 'WriteSerializable', true)
""");
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ public void testDeleteWithNonPartitionFilter()
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"))
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.exists"), 2)
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.exists"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "OutputFile.createOrOverwrite"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream"))
.addCopies(new FileOperation(DATA, "key=domain1/", "InputFile.newInput"), 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ private void testConcurrentInsertsReconciliationForBlindInserts(boolean partitio
.forEach(MoreFutures::getDone);

assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (21, 30)");
assertQuery("SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
assertQuery("SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
"""
VALUES
(0, 'CREATE TABLE', 'WriteSerializable', 0),
(1, 'WRITE', 'WriteSerializable', 0),
(2, 'WRITE', 'WriteSerializable', 1),
(3, 'WRITE', 'WriteSerializable', 2)
(0, 'CREATE TABLE', 'WriteSerializable', 0, true),
(1, 'WRITE', 'WriteSerializable', 0, true),
(2, 'WRITE', 'WriteSerializable', 1, true),
(3, 'WRITE', 'WriteSerializable', 2, true)
""");
}
finally {
Expand Down Expand Up @@ -233,11 +233,11 @@ private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned)
.map("(%d, 10)"::formatted)
.collect(joining(", ", ", ", "")));
assertQuery(
"SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
findinpath marked this conversation as resolved.
Show resolved Hide resolved
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" +
"SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" +
LongStream.rangeClosed(1, successfulInsertsCount)
.boxed()
.map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1))
.map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1))
.collect(joining(", ", ", ", "")));
}
finally {
Expand Down Expand Up @@ -303,11 +303,11 @@ public void testConcurrentInsertsSelectingFromTheSamePartition()
.map("(%d, 10)"::formatted)
.collect(joining(", ", ", ", "")));
assertQuery(
"SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" +
"SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" +
LongStream.rangeClosed(1, successfulInsertsCount)
.boxed()
.map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1))
.map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1))
.collect(joining(", ", ", ", "")));
}
finally {
Expand Down Expand Up @@ -356,13 +356,13 @@ public void testConcurrentInsertsReconciliationForMixedInserts()
"SELECT * FROM " + tableName,
"VALUES (0, 10), (1, 10), (11, 20), (1, 20), (22, 30)");
assertQuery(
"SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
findinpath marked this conversation as resolved.
Show resolved Hide resolved
"SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
"""
VALUES
(0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0),
(1, 'WRITE', 'WriteSerializable', 0),
(2, 'WRITE', 'WriteSerializable', 1),
(3, 'WRITE', 'WriteSerializable', 2)
('CREATE TABLE AS SELECT', 'WriteSerializable', true),
('WRITE', 'WriteSerializable', false),
('WRITE', 'WriteSerializable', false),
('WRITE', 'WriteSerializable', true)
findinpath marked this conversation as resolved.
Show resolved Hide resolved
""");
}
finally {
Expand Down Expand Up @@ -409,13 +409,13 @@ public void testConcurrentInsertsSelectingFromDifferentPartitionsOfSameTable()
"SELECT * FROM " + tableName,
"VALUES (0, 10), (11, 20), (22, 30), (1, 40), (13, 40), (25, 40)");
assertQuery(
"SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
"SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
"""
VALUES
(0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0),
(1, 'WRITE', 'WriteSerializable', 0),
(2, 'WRITE', 'WriteSerializable', 1),
(3, 'WRITE', 'WriteSerializable', 2)
('CREATE TABLE AS SELECT', 'WriteSerializable', true),
('WRITE', 'WriteSerializable', false),
('WRITE', 'WriteSerializable', false),
('WRITE', 'WriteSerializable', false)
""");
}
finally {
Expand Down Expand Up @@ -472,14 +472,14 @@ public void testConcurrentInsertsSelectingFromMultipleNonoverlappingPartitionsOf
(55, 60), (56,60), (57, 60), (58,60)
""");
assertQuery(
"SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
"SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
"""
VALUES
(0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0),
(1, 'WRITE', 'WriteSerializable', 0),
(2, 'WRITE', 'WriteSerializable', 1),
(3, 'WRITE', 'WriteSerializable', 2),
(4, 'WRITE', 'WriteSerializable', 3)
(0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true),
(1, 'WRITE', 'WriteSerializable', 0, true),
(2, 'WRITE', 'WriteSerializable', 1, false),
(3, 'WRITE', 'WriteSerializable', 2, false),
(4, 'WRITE', 'WriteSerializable', 3, false)
""");
}
finally {
Expand Down