From 9f19e710f28a1111b07208db43a1ac933aa313cf Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 25 May 2022 16:06:48 +0900 Subject: [PATCH] Ignore table already exist error when metastore create retried --- .../plugin/deltalake/DeltaLakeMetadata.java | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 8299f58954cc..d573e627c742 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -48,6 +48,7 @@ import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; import io.trino.plugin.hive.HiveType; +import io.trino.plugin.hive.TableAlreadyExistsException; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HivePrincipal; @@ -862,6 +863,12 @@ public Optional finishCreateTable( .setParameters(deltaTableProperties(session, location, handle.isExternal())); setDeltaStorageFormat(tableBuilder, location, getExternalPath(new HdfsContext(session), location)); Table table = tableBuilder.build(); + // Ensure the table has queryId set. This is relied on for exception handling + String queryId = session.getQueryId(); + verify( + getQueryId(table).orElseThrow(() -> new IllegalArgumentException("Query id is not present")).equals(queryId), + "Table does not have correct query id set", + table); try { // For CTAS there is no risk of multiple writers racing. Using writer without transaction isolation so we are not limiting support for CTAS to @@ -882,7 +889,19 @@ public Optional finishCreateTable( appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), true); transactionLogWriter.flush(); PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow()); - metastore.createTable(session, table, principalPrivileges); + + try { + metastore.createTable(session, table, principalPrivileges); + } + catch (TableAlreadyExistsException e) { + // Ignore TableAlreadyExistsException when table looks like created by us. + // This may happen when an actually successful metastore create call is retried + // e.g. because of a timeout on our side. + Optional existingTable = metastore.getTable(schemaName, tableName); + if (existingTable.isEmpty() || !isCreatedBy(existingTable.get(), queryId)) { + throw e; + } + } } catch (Exception e) { // Remove the transaction log entry if the table creation fails @@ -901,6 +920,12 @@ public Optional finishCreateTable( return Optional.empty(); } + private static boolean isCreatedBy(Table table, String queryId) + { + Optional tableQueryId = getQueryId(table); + return tableQueryId.isPresent() && tableQueryId.get().equals(queryId); + } + @Override public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata) { @@ -2247,4 +2272,9 @@ private static DeltaLakeColumnHandle toColumnHandle(ColumnMetadata column, Colle boolean isPartitionKey = partitionColumns.stream().anyMatch(partition -> partition.equalsIgnoreCase(column.getName())); return new DeltaLakeColumnHandle(column.getName(), column.getType(), isPartitionKey ? PARTITION_KEY : REGULAR); } + + private static Optional getQueryId(Table table) + { + return Optional.ofNullable(table.getParameters().get(PRESTO_QUERY_ID_NAME)); + } }