Skip to content

Commit

Permalink
Add concurrent writes reconciliation for blind append INSERT operations
Browse files Browse the repository at this point in the history
Allow committing blind append INSERT operations in a concurrent context by
placing these operations right after any other previously concurrently
completed write operations.

Disallow committing blind insert operations in any of the following cases:

- table schema change has been committed in the meantime
- table protocol change has been committed in the meantime

INSERT operations that contain subqueries reading the same table are
subject to concurrent write failures.
  • Loading branch information
findinpath committed Feb 15, 2024
1 parent 916af43 commit efe6102
Show file tree
Hide file tree
Showing 11 changed files with 638 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class DeltaLakeCommitSummary
{
private final List<MetadataEntry> metadataUpdates;
private final Optional<ProtocolEntry> protocol;
private final Optional<Boolean> isBlindAppend;

public DeltaLakeCommitSummary(List<DeltaLakeTransactionLogEntry> transactionLogEntries)
{
requireNonNull(transactionLogEntries, "transactionLogEntries is null");
ImmutableList.Builder<MetadataEntry> metadataUpdatesBuilder = ImmutableList.builder();
Optional<ProtocolEntry> optionalProtocol = Optional.empty();
Optional<CommitInfoEntry> optionalCommitInfo = Optional.empty();

for (DeltaLakeTransactionLogEntry transactionLogEntry : transactionLogEntries) {
if (transactionLogEntry.getMetaData() != null) {
metadataUpdatesBuilder.add(transactionLogEntry.getMetaData());
}
else if (transactionLogEntry.getProtocol() != null) {
optionalProtocol = Optional.of(transactionLogEntry.getProtocol());
}
else if (transactionLogEntry.getCommitInfo() != null) {
optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
}
}

metadataUpdates = metadataUpdatesBuilder.build();
protocol = optionalProtocol;
isBlindAppend = optionalCommitInfo.flatMap(CommitInfoEntry::isBlindAppend);
}

public List<MetadataEntry> getMetadataUpdates()
{
return metadataUpdates;
}

public Optional<ProtocolEntry> getProtocol()
{
return protocol;
}

public Optional<Boolean> getIsBlindAppend()
{
return isBlindAppend;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@
*/
package io.trino.plugin.deltalake.transactionlog.writer;

/**
* Exception thrown to point out that another process has already created
* a namesake transaction log file as the one being written to commit the
* current transaction.
* <p>
* This kind of exception is potentially recoverable with a commit retry
* mechanism.
*/
public class TransactionConflictException
extends RuntimeException
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.transactionlog.writer;

/**
* Exception thrown to point out that other transaction log files
* have been created during the time on which the current transaction
* has been executing and their already committed changes are in
* irremediable conflict with the changes from the current transaction.
*/
public class TransactionFailedException
extends RuntimeException
{
public TransactionFailedException(String message)
{
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.concurrent.MoreFutures;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.Session;
Expand All @@ -40,12 +41,16 @@
import io.trino.tpch.TpchTable;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -79,10 +84,12 @@
import static io.trino.tpch.TpchTable.ORDERS;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.abort;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

Expand Down Expand Up @@ -2262,6 +2269,63 @@ public void testPartitionFilterIncluded()
}
}

@RepeatedTest(3)
public void testConcurrentInsertsReconciliationForBlindInserts()
throws Exception
{
testConcurrentInsertsReconciliationForBlindInserts(false);
testConcurrentInsertsReconciliationForBlindInserts(true);
}

private void testConcurrentInsertsReconciliationForBlindInserts(boolean partitioned)
throws Exception
{
int threads = 3;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
String tableName = "test_concurrent_inserts_table_" + randomNameSuffix();

assertUpdate("CREATE TABLE " + tableName + " (a INT, part INT) " +
(partitioned ? " WITH (partitioned_by = ARRAY['part'])" : ""));

try {
// insert data concurrently
executor.invokeAll(ImmutableList.<Callable<Void>>builder()
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (1, 10)");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (11, 20)");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
return null;
})
.build())
.forEach(MoreFutures::getDone);

assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (21, 30)");
assertQuery("SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"",
"""
VALUES
(0, 'CREATE TABLE', 'WriteSerializable', 0),
(1, 'WRITE', 'WriteSerializable', 0),
(2, 'WRITE', 'WriteSerializable', 1),
(3, 'WRITE', 'WriteSerializable', 2)
""");
}
finally {
assertUpdate("DROP TABLE " + tableName);
executor.shutdownNow();
assertTrue(executor.awaitTermination(10, SECONDS));
}
}

protected List<String> listCheckpointFiles(String transactionLogDirectory)
{
return listFiles(transactionLogDirectory).stream()
Expand Down
Loading

0 comments on commit efe6102

Please sign in to comment.