Skip to content

Commit

Permalink
Create Table Concurrent query handling
Browse files Browse the repository at this point in the history
  • Loading branch information
vinay-kl committed Dec 18, 2024
1 parent a87e255 commit c747a0b
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
statisticsAccess.deleteExtendedStatistics(session, schemaTableName, location);
}
else {
setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory));
setRollback(() -> deleteRecursivelyIfLogNotExists(fileSystem, deltaLogDirectory));
protocolEntry = protocolEntryForTable(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, containsTimestampType, tableMetadata.getProperties());
}

Expand Down Expand Up @@ -1440,7 +1440,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(
}
else {
checkPathContainsNoFiles(session, finalLocation);
setRollback(() -> deleteRecursivelyIfExists(fileSystemFactory.create(session), finalLocation));
setRollback(() -> deleteRecursivelyIfLogNotExists(fileSystemFactory.create(session), finalLocation));
protocolEntry = protocolEntryForTable(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, containsTimestampType, tableMetadata.getProperties());
}

Expand Down Expand Up @@ -1550,13 +1550,17 @@ private void checkColumnTypes(List<ColumnMetadata> columnMetadata)
}
}

private static void deleteRecursivelyIfExists(TrinoFileSystem fileSystem, Location path)
private static void deleteRecursivelyIfLogNotExists(TrinoFileSystem fileSystem, Location tablePath)
{
try {
fileSystem.deleteDirectory(path);
Location deltaLogDirectory = Location.of(getTransactionLogDir(tablePath.path()));
boolean transactionLogFileExists = fileSystem.listFiles(deltaLogDirectory).hasNext();
if (!transactionLogFileExists) {
fileSystem.deleteDirectory(tablePath);
}
}
catch (IOException e) {
LOG.warn(e, "IOException while trying to delete '%s'", path);
LOG.warn(e, "IOException while trying to delete '%s'", tablePath);
}
}

Expand Down Expand Up @@ -1697,7 +1701,11 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
// TODO perhaps it should happen in a background thread (https://github.com/trinodb/trino/issues/12011)
cleanupFailedWrite(session, handle.location(), dataFileInfos);
}
if (handle.readVersion().isEmpty()) {
// Table already exist and created by a concurrent transaction as there's conflict while writing the transaction log entry
if (e instanceof TransactionConflictException) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry, Table already exists", e);
}
else if (handle.readVersion().isEmpty()) {
Location transactionLogDir = Location.of(getTransactionLogDir(location));
try {
fileSystemFactory.create(session).deleteDirectory(transactionLogDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;

import static java.util.Objects.requireNonNull;

Expand All @@ -48,6 +49,9 @@ public void write(ConnectorSession session, String clusterId, Location newLogEnt
outputStream.write(entryContents);
}
}
catch (FileAlreadyExistsException e) {
throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath, e);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.MoreFutures;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.testing.QueryAssertions.getTrinoExceptionCause;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@TestInstance(PER_CLASS)
public class TestDeltaLakeLocalConcurrentCreateTableTest
extends AbstractTestQueryFramework
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
Path catalogDir = Files.createTempDirectory("catalog-dir");
closeAfterClass(() -> deleteRecursively(catalogDir, ALLOW_INSECURE));

return DeltaLakeQueryRunner.builder()
.addDeltaProperty("delta.unique-table-location", "false")
.addDeltaProperty("fs.hadoop.enabled", "true")
.addDeltaProperty("hive.metastore", "file")
.addDeltaProperty("hive.metastore.catalog.dir", catalogDir.toUri().toString())
.addDeltaProperty("hive.metastore.disable-location-checks", "true")
.build();
}

@Test
public void testConcurrentCreateTableAsSelect()
throws Exception
{
testConcurrentCreateTableAsSelect(false);
testConcurrentCreateTableAsSelect(true);
}

private void testConcurrentCreateTableAsSelect(boolean partitioned)
throws InterruptedException
{
int threads = 3;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);

try {
// Execute concurrent CTAS operations
executor.invokeAll(ImmutableList.<Callable<Void>>builder()
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("CREATE TABLE test_ctas_1"
+ (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS SELECT 1 as a, 10 as part");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("CREATE TABLE test_ctas_2"
+ (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS SELECT 11 as a, 20 as part");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("CREATE TABLE test_ctas_3"
+ (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS SELECT 21 as a, 30 as part");
return null;
})
.build())
.forEach(MoreFutures::getDone);

// Verify each table was created with correct data
assertThat(query("SELECT * FROM test_ctas_1")).matches("VALUES (1, 10)");
assertThat(query("SELECT * FROM test_ctas_2")).matches("VALUES (11, 20)");
assertThat(query("SELECT * FROM test_ctas_3")).matches("VALUES (21, 30)");

// Verify table histories
assertQuery(
"SELECT version, operation, isolation_level, is_blind_append FROM \"test_ctas_1$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)");
assertQuery(
"SELECT version, operation, isolation_level, is_blind_append FROM \"test_ctas_2$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)");
assertQuery(
"SELECT version, operation, isolation_level, is_blind_append FROM \"test_ctas_3$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)");
}
finally {
// Clean up
assertUpdate("DROP TABLE IF EXISTS test_ctas_1");
assertUpdate("DROP TABLE IF EXISTS test_ctas_2");
assertUpdate("DROP TABLE IF EXISTS test_ctas_3");
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}

@Test
public void testConcurrentCreateTableAsSelectSameTable()
throws Exception
{
testConcurrentCreateTableAsSelectSameTable(false);
testConcurrentCreateTableAsSelectSameTable(true);
}

private void testConcurrentCreateTableAsSelectSameTable(boolean partitioned)
throws Exception
{
int threads = 3;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
String tableName = "test_concurrent_ctas_" + randomNameSuffix();

try {
getQueryRunner().execute("create table stg_test as SELECT a, b, 20220101 as d FROM UNNEST(SEQUENCE(1, 9001), SEQUENCE(1, 9001)) AS t(a, b)");

String selectString = " as (select stg1.a as a, stg1.b as b, stg1.d as part from stg_test stg1, stg_test stg2 where stg1.d=stg2.d)";

// Execute concurrent CTAS operations
executor.invokeAll(IntStream.range(0, threads).mapToObj(i -> (Callable<Void>) () -> {
barrier.await(10, SECONDS);
try {
getQueryRunner().execute("CREATE TABLE " + tableName +
(partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + selectString);
}
catch (Exception e) {
RuntimeException trinoException = getTrinoExceptionCause(e);
assertThat(trinoException).hasMessageContaining("Table already exists");
}
return null;
}).collect(toImmutableList())).forEach(MoreFutures::getDone);

// Verify table exists and has one row
assertThat((long) computeScalar("SELECT count(*) FROM " + tableName)).isEqualTo(81018001L);

// Verify table history shows single creation
assertQuery(
"SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)");

// Ensure the files in the table directory contains only the file written by succeeded query
long pathCount = (long) computeScalar("select count(distinct(\"$path\")) as path FROM " + tableName);
long pathMatchingSuccessfulQueryCount = (long) computeScalar("with a as (select distinct(\"$path\") as path from " + tableName + "), " +
"b as (select element_at(operation_parameters, 'queryId') as queryId from \"" + tableName + "$history\") " +
"select count(1) from a,b where a.path like '%' || b.queryId || '%'");
assertThat(pathCount).isEqualTo(pathMatchingSuccessfulQueryCount);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
assertUpdate("DROP TABLE IF EXISTS stg_test");
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}
}

0 comments on commit c747a0b

Please sign in to comment.