Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add concurrent adding columns (ALTER TABLE ADD COLUMN) test to BaseConnectorTest #12483

Merged
merged 1 commit into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ protected String errorMessageForInsertIntoNotNullColumn(String columnName)
return format("NULL not allowed for column \"%s\"(?s).*", columnName.toUpperCase(ENGLISH));
}

@Override
public void testAddColumnConcurrently()
{
// TODO: Difficult to determine whether the exception is concurrent issue or not from the error message
throw new SkipException("TODO: Enable this test after finding the failure cause");
}

@Override
protected JdbcSqlExecutor onRemoteDatabase()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ public void testDropColumn()
assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN a", "(?s).* Missing columns: 'a' while processing query: 'a', required columns: 'a' 'a'.*");
}

@Override
public void testAddColumnConcurrently()
{
// TODO: Default storage engine doesn't support adding new columns
throw new SkipException("TODO: test not implemented yet");
}

@Override
public void testAddColumn()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ protected void verifyConcurrentUpdateFailurePermissible(Exception e)
assertThat(e).hasMessageContaining("Failed to commit Iceberg update to table");
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageContaining("Cannot update Iceberg table: supplied previous location does not match current location");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably should be "Failed to commit Iceberg .."
but is fine for now.

cc @alexjo2144

}

@Test
public void testDeleteOnV1Table()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,13 @@ public void testInsertRowConcurrently()
throw new SkipException("TODO");
}

@Override
public void testAddColumnConcurrently()
{
// TODO Support these test once kudu connector can create tables with default partitions
throw new SkipException("TODO");
}

@Test
@Override
public void testDelete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,13 @@ public void testLimitPushdown()
assertThat(query("SELECT name FROM nation LIMIT 2147483648")).isNotFullyPushedDown(LimitNode.class);
}

@Override
public void testAddColumnConcurrently()
{
// TODO: Enable after supporting multi-document transaction https://www.mongodb.com/docs/manual/core/transactions/
throw new SkipException("TODO");
}

private void assertOneNotNullResult(String query)
{
MaterializedResult results = getQueryRunner().execute(getSession(), query).toTestTypes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,13 @@ protected String errorMessageForInsertIntoNotNullColumn(String columnName)
return format("ORA-01400: cannot insert NULL into \\(.*\"%s\"\\)\n", columnName.toUpperCase(ENGLISH));
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessage("ORA-14411: The DDL cannot be run concurrently with other DDLs\n");
}

private void predicatePushdownTest(String oracleType, String oracleLiteral, String operator, String filterLiteral)
{
String tableName = ("test_pdown_" + oracleType.replaceAll("[^a-zA-Z0-9]", ""))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,13 @@ protected TestTable createTableWithDoubleAndRealColumns(String name, List<String
return new TestTable(onRemoteDatabase(), name, "(t_double double primary key, u_double double, v_real float, w_real float)", rows);
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageContaining("Concurrent modification to table");
}

@Override
protected SqlExecutor onRemoteDatabase()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,13 @@ protected TestTable createTableWithDoubleAndRealColumns(String name, List<String
return new TestTable(onRemoteDatabase(), name, "(t_double double primary key, u_double double, v_real float, w_real float)", rows);
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageContaining("Concurrent modification to table");
}

@Override
protected SqlExecutor onRemoteDatabase()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import static java.util.Arrays.asList;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -880,4 +881,16 @@ public void testAlterTable()

assertUpdate("DROP TABLE test_alter_table");
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageContaining("Failed to perform metadata operation")
.getCause()
.hasMessageMatching(
"(?s).*SQLIntegrityConstraintViolationException.*" +
"|.*Unique index or primary key violation.*" +
"|.*Deadlock found when trying to get lock; try restarting transaction.*");
Comment on lines +889 to +894
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2847,6 +2847,71 @@ protected void verifyConcurrentInsertFailurePermissible(Exception e)
throw new AssertionError("Unexpected concurrent insert failure", e);
}

// Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic.
@Test(timeOut = 60_000, invocationCount = 4)
public void testAddColumnConcurrently()
throws Exception
{
if (!hasBehavior(SUPPORTS_ADD_COLUMN)) {
// Covered by testAddColumn
return;
}

int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column", "(col integer)")) {
String tableName = table.getName();

List<Future<Optional<String>>> futures = IntStream.range(0, threads)
.mapToObj(threadNumber -> executor.submit(() -> {
barrier.await(30, SECONDS);
try {
String columnName = "col" + threadNumber;
getQueryRunner().execute("ALTER TABLE " + tableName + " ADD COLUMN " + columnName + " integer");
return Optional.of(columnName);
}
catch (Exception e) {
RuntimeException trinoException = getTrinoExceptionCause(e);
try {
verifyConcurrentAddColumnFailurePermissible(trinoException);
}
catch (Throwable verifyFailure) {
if (verifyFailure != e) {
verifyFailure.addSuppressed(e);
}
throw verifyFailure;
}
return Optional.<String>empty();
}
}))
.collect(toImmutableList());

List<String> addedColumns = futures.stream()
.map(future -> tryGetFutureValue(future, 30, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());

assertThat(query("DESCRIBE " + tableName))
.projected(0)
.skippingTypesCheck()
.matches(Stream.concat(Stream.of("col"), addedColumns.stream())
.map(value -> format("'%s'", value))
.collect(joining(",", "VALUES ", "")));
}
finally {
executor.shutdownNow();
executor.awaitTermination(30, SECONDS);
}
}

protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
// By default, do not expect ALTER TABLE ADD COLUMN to fail in case of concurrent inserts
throw new AssertionError("Unexpected concurrent add column failure", e);
}

@Test
public void testUpdateWithPredicates()
{
Expand Down