Skip to content

Commit

Permalink
Ignore table already exist error when metastore create retried
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed May 26, 2022
1 parent ab9ea85 commit 9f19e71
Showing 1 changed file with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.TableAlreadyExistsException;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HivePrincipal;
Expand Down Expand Up @@ -862,6 +863,12 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
.setParameters(deltaTableProperties(session, location, handle.isExternal()));
setDeltaStorageFormat(tableBuilder, location, getExternalPath(new HdfsContext(session), location));
Table table = tableBuilder.build();
// Ensure the table has queryId set. This is relied on for exception handling
String queryId = session.getQueryId();
verify(
getQueryId(table).orElseThrow(() -> new IllegalArgumentException("Query id is not present")).equals(queryId),
"Table does not have correct query id set",
table);

try {
// For CTAS there is no risk of multiple writers racing. Using writer without transaction isolation so we are not limiting support for CTAS to
Expand All @@ -882,7 +889,19 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), true);
transactionLogWriter.flush();
PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow());
metastore.createTable(session, table, principalPrivileges);

try {
metastore.createTable(session, table, principalPrivileges);
}
catch (TableAlreadyExistsException e) {
// Ignore TableAlreadyExistsException when table looks like created by us.
// This may happen when an actually successful metastore create call is retried
// e.g. because of a timeout on our side.
Optional<Table> existingTable = metastore.getTable(schemaName, tableName);
if (existingTable.isEmpty() || !isCreatedBy(existingTable.get(), queryId)) {
throw e;
}
}
}
catch (Exception e) {
// Remove the transaction log entry if the table creation fails
Expand All @@ -901,6 +920,12 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
return Optional.empty();
}

private static boolean isCreatedBy(Table table, String queryId)
{
Optional<String> tableQueryId = getQueryId(table);
return tableQueryId.isPresent() && tableQueryId.get().equals(queryId);
}

@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata)
{
Expand Down Expand Up @@ -2247,4 +2272,9 @@ private static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, Colle
boolean isPartitionKey = partitionColumns.stream().anyMatch(partition -> partition.equalsIgnoreCase(column.getName()));
return new DeltaLakeColumnHandle(column.getName(), column.getType(), isPartitionKey ? PARTITION_KEY : REGULAR);
}

private static Optional<String> getQueryId(Table table)
{
return Optional.ofNullable(table.getParameters().get(PRESTO_QUERY_ID_NAME));
}
}

0 comments on commit 9f19e71

Please sign in to comment.