diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index 63eebe5d3981..da1bc3732dc8 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -601,6 +601,20 @@ existing Delta Lake table from the metastores without deleting the data:: CALL example.system.unregister_table(schema_name => 'testdb', table_name => 'customer_orders') +.. _delta-lake-flush-metadata-cache: + +Flush metadata cache +^^^^^^^^^^^^^^^^^^^^ + +* ``system.flush_metadata_cache()`` + + Flush all metadata caches. + +* ``system.flush_metadata_cache(schema_name => ..., table_name => ...)`` + + Flush metadata caches entries connected with selected table. + Procedure requires named parameters to be passed + .. _delta-lake-write-support: Updating data diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 495775171b11..3e1114330f4f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -420,7 +420,7 @@ public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTable return new DeltaLakeTableHandle( dataTableName.getSchemaName(), dataTableName.getTableName(), - metastore.getTableLocation(dataTableName, session), + metastore.getTableLocation(dataTableName), metadata, TupleDomain.all(), TupleDomain.all(), @@ -449,7 +449,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table; - String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session); + String location = metastore.getTableLocation(tableHandle.getSchemaTableName()); Map columnComments = getColumnComments(tableHandle.getMetadataEntry()); Map columnsNullability = getColumnsNullability(tableHandle.getMetadataEntry()); List constraints = ImmutableList.builder() @@ -1415,7 +1415,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tabl Optional checkpointInterval = handle.getMetadataEntry().getCheckpointInterval(); - String tableLocation = metastore.getTableLocation(handle.getSchemaTableName(), session); + String tableLocation = metastore.getTableLocation(handle.getSchemaTableName()); boolean writeCommitted = false; try { @@ -1660,7 +1660,7 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableHandle) { try { - String tableLocation = metastore.getTableLocation(tableHandle.getSchemaTableName(), session); + String tableLocation = metastore.getTableLocation(tableHandle.getSchemaTableName()); Path tableMetadataDirectory = new Path(new Path(tableLocation).getParent().toString(), tableHandle.getTableName()); boolean requiresOptIn = transactionLogWriterFactory.newWriter(session, tableMetadataDirectory.toString()).isUnsafe(); return !requiresOptIn || unsafeWritesEnabled; @@ -2260,7 +2260,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH { DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table; AnalyzeHandle analyzeHandle = tableHandle.getAnalyzeHandle().orElseThrow(() -> new IllegalArgumentException("analyzeHandle not set")); - String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session); + String location = metastore.getTableLocation(tableHandle.getSchemaTableName()); Optional maxFileModificationTime = getMaxFileModificationTime(computedStatistics); updateTableStatistics( session, @@ -2554,7 +2554,7 @@ private List getCommitInfoEntries(SchemaTableName table, Connec { TrinoFileSystem fileSystem = fileSystemFactory.create(session); try { - return TransactionLogTail.loadNewTail(fileSystem, metastore.getTableLocation(table, session), Optional.empty()).getFileEntries().stream() + return TransactionLogTail.loadNewTail(fileSystem, metastore.getTableLocation(table), Optional.empty()).getFileEntries().stream() .map(DeltaLakeTransactionLogEntry::getCommitInfo) .filter(Objects::nonNull) .collect(toImmutableList()); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java index bbd0f7bace91..c18ff8c17b4e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java @@ -25,6 +25,7 @@ import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore; import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure; +import io.trino.plugin.deltalake.procedure.FlushMetadataCacheProcedure; import io.trino.plugin.deltalake.procedure.OptimizeTableProcedure; import io.trino.plugin.deltalake.procedure.RegisterTableProcedure; import io.trino.plugin.deltalake.procedure.UnregisterTableProcedure; @@ -147,6 +148,7 @@ public void setup(Binder binder) procedures.addBinding().toProvider(VacuumProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON); + procedures.addBinding().toProvider(FlushMetadataCacheProcedure.class).in(Scopes.SINGLETON); Multibinder tableProcedures = newSetBinder(binder, TableProcedureMetadata.class); tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index 73b04442b448..efe05f39df59 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -130,7 +130,7 @@ private Stream getSplits( Constraint constraint) { DeltaLakeMetastore metastore = getMetastore(session, transaction); - String tableLocation = metastore.getTableLocation(tableHandle.getSchemaTableName(), session); + String tableLocation = metastore.getTableLocation(tableHandle.getSchemaTableName()); List validDataFiles = metastore.getValidDataFiles(tableHandle.getSchemaTableName(), session); TupleDomain enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint(); TupleDomain nonPartitionConstraint = tableHandle.getNonPartitionConstraint(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java index c259b1eeec8d..68beb6b73650 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastore.java @@ -53,7 +53,7 @@ public interface DeltaLakeMetastore ProtocolEntry getProtocol(ConnectorSession session, TableSnapshot table); - String getTableLocation(SchemaTableName table, ConnectorSession session); + String getTableLocation(SchemaTableName table); TableSnapshot getSnapshot(SchemaTableName table, ConnectorSession session); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastoreModule.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastoreModule.java index b4afa22e5e60..616084931cfc 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastoreModule.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeMetastoreModule.java @@ -37,7 +37,7 @@ protected void setup(Binder binder) bindMetastoreModule("file", new DeltaLakeFileMetastoreModule()); bindMetastoreModule("glue", new DeltaLakeGlueMetastoreModule()); - install(new DecoratedHiveMetastoreModule()); + install(new DecoratedHiveMetastoreModule(false)); } private void bindMetastoreModule(String name, Module module) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java index 9cacae767fda..f4438d68b456 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java @@ -121,18 +121,21 @@ public List getAllTables(String databaseName) public Optional getTable(String databaseName, String tableName) { Optional
candidate = delegate.getTable(databaseName, tableName); - candidate.ifPresent(table -> { - if (isHiveOrPrestoView(table)) { - // this is a Hive view, hence not a table - throw new NotADeltaLakeTableException(databaseName, tableName); - } - if (!TABLE_PROVIDER_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_PROVIDER_PROPERTY))) { - throw new NotADeltaLakeTableException(databaseName, tableName); - } - }); + candidate.ifPresent(HiveMetastoreBackedDeltaLakeMetastore::verifyDeltaLakeTable); return candidate; } + public static void verifyDeltaLakeTable(Table table) + { + if (isHiveOrPrestoView(table)) { + // this is a Hive view, hence not a table + throw new NotADeltaLakeTableException(table.getDatabaseName(), table.getTableName()); + } + if (!TABLE_PROVIDER_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_PROVIDER_PROPERTY))) { + throw new NotADeltaLakeTableException(table.getDatabaseName(), table.getTableName()); + } + } + @Override public void createDatabase(Database database) { @@ -167,7 +170,7 @@ public void createTable(ConnectorSession session, Table table, PrincipalPrivileg @Override public void dropTable(ConnectorSession session, String databaseName, String tableName, boolean deleteData) { - String tableLocation = getTableLocation(new SchemaTableName(databaseName, tableName), session); + String tableLocation = getTableLocation(new SchemaTableName(databaseName, tableName)); delegate.dropTable(databaseName, tableName, deleteData); statisticsAccess.invalidateCache(tableLocation); transactionLogAccess.invalidateCaches(tableLocation); @@ -202,11 +205,16 @@ public ProtocolEntry getProtocol(ConnectorSession session, TableSnapshot tableSn } @Override - public String getTableLocation(SchemaTableName table, ConnectorSession session) + public String getTableLocation(SchemaTableName tableName) + { + Table table = getTable(tableName.getSchemaName(), tableName.getTableName()) + .orElseThrow(() -> new TableNotFoundException(tableName)); + return getTableLocation(table); + } + + public static String getTableLocation(Table table) { - Map serdeParameters = getTable(table.getSchemaName(), table.getTableName()) - .orElseThrow(() -> new TableNotFoundException(table)) - .getStorage().getSerdeParameters(); + Map serdeParameters = table.getStorage().getSerdeParameters(); String location = serdeParameters.get(PATH_PROPERTY); if (location == null) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("No %s property defined for table: %s", PATH_PROPERTY, table)); @@ -218,7 +226,7 @@ public String getTableLocation(SchemaTableName table, ConnectorSession session) public TableSnapshot getSnapshot(SchemaTableName table, ConnectorSession session) { try { - return transactionLogAccess.loadSnapshot(table, getTableLocation(table, session), session); + return transactionLogAccess.loadSnapshot(table, getTableLocation(table), session); } catch (NotADeltaLakeTableException e) { throw e; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DropExtendedStatsProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DropExtendedStatsProcedure.java index 698f58442732..0af6ac85e37e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DropExtendedStatsProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DropExtendedStatsProcedure.java @@ -83,6 +83,6 @@ public void dropStats(ConnectorSession session, ConnectorAccessControl accessCon throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", name)); } accessControl.checkCanInsertIntoTable(null, name); - statsAccess.deleteExtendedStatistics(session, metadata.getMetastore().getTableLocation(name, session)); + statsAccess.deleteExtendedStatistics(session, metadata.getMetastore().getTableLocation(name)); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/FlushMetadataCacheProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/FlushMetadataCacheProcedure.java new file mode 100644 index 000000000000..b75445e8373d --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/FlushMetadataCacheProcedure.java @@ -0,0 +1,114 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.procedure; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.HiveMetastoreFactory; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; +import io.trino.spi.TrinoException; +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.procedure.Procedure; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.lang.invoke.MethodHandle; +import java.util.Optional; + +import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.getTableLocation; +import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.verifyDeltaLakeTable; +import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.invoke.MethodHandles.lookup; +import static java.util.Objects.requireNonNull; + +public class FlushMetadataCacheProcedure + implements Provider +{ + private static final String PROCEDURE_NAME = "flush_metadata_cache"; + + private static final String PARAM_SCHEMA_NAME = "SCHEMA_NAME"; + private static final String PARAM_TABLE_NAME = "TABLE_NAME"; + + private final HiveMetastoreFactory metastoreFactory; + private final Optional cachingHiveMetastore; + private final TransactionLogAccess transactionLogAccess; + + private static final MethodHandle FLUSH_METADATA_CACHE; + + static { + try { + FLUSH_METADATA_CACHE = lookup().unreflect(FlushMetadataCacheProcedure.class.getMethod("flushMetadataCache", ConnectorSession.class, String.class, String.class)); + } + catch (ReflectiveOperationException e) { + throw new AssertionError(e); + } + } + + @Inject + public FlushMetadataCacheProcedure( + HiveMetastoreFactory metastoreFactory, + Optional cachingHiveMetastore, + TransactionLogAccess transactionLogAccess) + { + this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); + this.cachingHiveMetastore = requireNonNull(cachingHiveMetastore, "cachingHiveMetastore is null"); + this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null"); + } + + @Override + public Procedure get() + { + return new Procedure( + "system", + PROCEDURE_NAME, + ImmutableList.of( + new Procedure.Argument(PARAM_SCHEMA_NAME, VARCHAR, false, null), + new Procedure.Argument(PARAM_TABLE_NAME, VARCHAR, false, null)), + FLUSH_METADATA_CACHE.bindTo(this), + true); + } + + public void flushMetadataCache(ConnectorSession session, String schemaName, String tableName) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + doFlushMetadataCache(session, Optional.ofNullable(schemaName), Optional.ofNullable(tableName)); + } + } + + private void doFlushMetadataCache(ConnectorSession session, Optional schemaName, Optional tableName) + { + if (schemaName.isEmpty() && tableName.isEmpty()) { + cachingHiveMetastore.ifPresent(CachingHiveMetastore::flushCache); + transactionLogAccess.flushCache(); + } + else if (schemaName.isPresent() && tableName.isPresent()) { + HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(session.getIdentity())); + Table table = metastore.getTable(schemaName.get(), tableName.get()) + .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(schemaName.get(), tableName.get()))); + verifyDeltaLakeTable(table); + cachingHiveMetastore.ifPresent(caching -> caching.invalidateTable(table.getDatabaseName(), table.getTableName())); + transactionLogAccess.invalidateCaches(getTableLocation(table)); + } + else { + throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Illegal parameter set passed"); + } + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index c1b4772ab7e6..77b7ee15a151 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -175,6 +175,12 @@ public TableSnapshot loadSnapshot(SchemaTableName table, String tableLocation, C return snapshot; } + public void flushCache() + { + tableSnapshots.invalidateAll(); + activeDataFileCache.invalidateAll(); + } + public void invalidateCaches(String tableLocation) { tableSnapshots.invalidate(tableLocation); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFlushMetadataCacheProcedure.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFlushMetadataCacheProcedure.java new file mode 100644 index 000000000000..57bcb27e2cb6 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFlushMetadataCacheProcedure.java @@ -0,0 +1,114 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.containers.HiveMinioDataLake; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.IOException; + +import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; +import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner; +import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; +import static io.trino.plugin.hive.containers.HiveHadoop.HIVE3_IMAGE; + +public class TestDeltaLakeFlushMetadataCacheProcedure + extends AbstractTestQueryFramework +{ + private static final String BUCKET_NAME = "delta-lake-test-flush-metadata-cache"; + + private HiveMetastore metastore; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + HiveMinioDataLake hiveMinioDataLake = new HiveMinioDataLake(BUCKET_NAME, HIVE3_IMAGE); + hiveMinioDataLake.start(); + metastore = new BridgingHiveMetastore( + testingThriftHiveMetastoreBuilder() + .metastoreClient(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint()) + .build()); + + return createS3DeltaLakeQueryRunner( + DELTA_CATALOG, + "default", + ImmutableMap.of("hive.metastore-cache-ttl", "10m"), + hiveMinioDataLake.getMinio().getMinioAddress(), + hiveMinioDataLake.getHiveHadoop()); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + throws IOException + { + metastore = null; + } + + @Test + public void testFlushMetadataCache() + { + assertUpdate("CREATE SCHEMA cached WITH (location = 's3://" + BUCKET_NAME + "/cached')"); + assertUpdate("CREATE TABLE cached.cached AS SELECT * FROM tpch.tiny.nation", 25); + + // Verify that column cache is flushed + // Fill caches + assertQuerySucceeds("SELECT name, regionkey FROM cached.cached"); + + // Verify that table cache is flushed + String showTablesSql = "SHOW TABLES FROM cached"; + // Fill caches + assertQuery(showTablesSql, "VALUES 'cached'"); + + // Rename table outside Trino + metastore.renameTable("cached", "cached", "cached", "renamed"); + + // Should still return old table name from cache + assertQuery(showTablesSql, "VALUES 'cached'"); + + // Should return new table name after cache flush + assertUpdate("CALL system.flush_metadata_cache(schema_name => 'cached', table_name => 'cached')"); + assertQuery(showTablesSql, "VALUES 'renamed'"); + + // Verify that schema cache is flushed + String showSchemasSql = "SHOW SCHEMAS FROM delta_lake"; + // Fill caches + assertQuery(showSchemasSql, "VALUES ('cached'), ('information_schema'), ('default')"); + + // Drop a table and a schema outside Trino + metastore.dropTable("cached", "renamed", false); + metastore.dropDatabase("cached", false); + + // Should still return old schemas from cache + assertQuery(showSchemasSql, "VALUES ('cached'), ('information_schema'), ('default')"); + + // Should not return the old schema name after cache flush + assertUpdate("CALL system.flush_metadata_cache()"); + assertQuery(showSchemasSql, "VALUES ('information_schema'), ('default')"); + } + + @Test + public void testFlushMetadataCacheTableNotFound() + { + assertQueryFails( + "CALL system.flush_metadata_cache(schema_name => 'test_not_existing_schema', table_name => 'test_not_existing_table')", + "Table 'test_not_existing_schema.test_not_existing_table' not found"); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index 5fceb69352a2..5d0f8e8f0d48 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -285,7 +285,7 @@ public ProtocolEntry getProtocol(ConnectorSession session, TableSnapshot tableSn } @Override - public String getTableLocation(SchemaTableName table, ConnectorSession session) + public String getTableLocation(SchemaTableName table) { return TABLE_PATH; } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index 52ae8b3b20c3..000620b55783 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -760,6 +760,43 @@ public void testTableSnapshotsActiveDataFilesCache() "00000000000000000010.checkpoint.parquet", 2)); } + @Test + public void testFlushSnapshotAndActiveFileCache() + throws Exception + { + String tableName = "person"; + String tableDir = getClass().getClassLoader().getResource("databricks/" + tableName).toURI().toString(); + DeltaLakeConfig shortLivedActiveDataFilesCacheConfig = new DeltaLakeConfig(); + shortLivedActiveDataFilesCacheConfig.setDataFileCacheTtl(new Duration(10, TimeUnit.MINUTES)); + setupTransactionLogAccess(tableName, tableDir, shortLivedActiveDataFilesCacheConfig); + + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + assertEquals(addFileEntries.size(), 12); + assertThat(accessTrackingFileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "_last_checkpoint", 1, + "00000000000000000011.json", 1, + "00000000000000000012.json", 1, + "00000000000000000013.json", 1, + "00000000000000000014.json", 1, + "00000000000000000010.checkpoint.parquet", 2)); + + // Flush all cache and then load snapshot and get active files + transactionLogAccess.flushCache(); + transactionLogAccess.loadSnapshot(new SchemaTableName("schema", tableName), tableDir, SESSION); + addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + + assertEquals(addFileEntries.size(), 12); + assertThat(accessTrackingFileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "_last_checkpoint", 2, + "00000000000000000011.json", 2, + "00000000000000000012.json", 2, + "00000000000000000013.json", 2, + "00000000000000000014.json", 2, + "00000000000000000010.checkpoint.parquet", 4)); + } + @Test public void testTableSnapshotsActiveDataFilesCacheDisabled() throws Exception diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestingDeltaLakeMetastoreModule.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestingDeltaLakeMetastoreModule.java index 7dd656d78410..8db4f0ba31f6 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestingDeltaLakeMetastoreModule.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestingDeltaLakeMetastoreModule.java @@ -39,7 +39,7 @@ public TestingDeltaLakeMetastoreModule(HiveMetastore metastore) public void setup(Binder binder) { binder.bind(HiveMetastoreFactory.class).annotatedWith(RawHiveMetastoreFactory.class).toInstance(HiveMetastoreFactory.ofInstance(metastore)); - install(new DecoratedHiveMetastoreModule()); + install(new DecoratedHiveMetastoreModule(false)); binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false); binder.bind(Key.get(boolean.class, AllowDeltaLakeManagedTableRename.class)).toInstance(true); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/DecoratedHiveMetastoreModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/DecoratedHiveMetastoreModule.java index 64bad8be8dbc..e94001191ff3 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/DecoratedHiveMetastoreModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/DecoratedHiveMetastoreModule.java @@ -42,6 +42,13 @@ public class DecoratedHiveMetastoreModule extends AbstractConfigurationAwareModule { + private final boolean installFlushMetadataCacheProcedure; + + public DecoratedHiveMetastoreModule(boolean installFlushMetadataCacheProcedure) + { + this.installFlushMetadataCacheProcedure = installFlushMetadataCacheProcedure; + } + @Override protected void setup(Binder binder) { @@ -56,7 +63,9 @@ protected void setup(Binder binder) newExporter(binder).export(HiveMetastoreFactory.class) .as(generator -> generator.generatedNameOf(CachingHiveMetastore.class)); - newSetBinder(binder, Procedure.class).addBinding().toProvider(FlushHiveMetastoreCacheProcedure.class).in(Scopes.SINGLETON); + if (installFlushMetadataCacheProcedure) { + newSetBinder(binder, Procedure.class).addBinding().toProvider(FlushHiveMetastoreCacheProcedure.class).in(Scopes.SINGLETON); + } } @Provides diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastoreModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastoreModule.java index 943122ace1bc..d703227a43e3 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastoreModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/HiveMetastoreModule.java @@ -56,7 +56,7 @@ protected void setup(Binder binder) bindMetastoreModule("alluxio-deprecated", deferredModule("io.trino.plugin.hive.metastore.alluxio.AlluxioMetastoreModule")); } - install(new DecoratedHiveMetastoreModule()); + install(new DecoratedHiveMetastoreModule(true)); } private void bindMetastoreModule(String name, Module module) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java index a8dd62765d7c..e937260c7c86 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java @@ -47,7 +47,7 @@ protected void setup(Binder binder) binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); binder.bind(MetastoreValidator.class).asEagerSingleton(); binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(HIDE_DELTA_LAKE_TABLES_IN_ICEBERG); - install(new DecoratedHiveMetastoreModule()); + install(new DecoratedHiveMetastoreModule(false)); configBinder(binder).bindConfigDefaults(CachingHiveMetastoreConfig.class, config -> { // ensure caching metastore wrapper isn't created, as it's not leveraged by Iceberg diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java index c40dfa4e6c60..2c385032d152 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java @@ -49,7 +49,7 @@ protected void setup(Binder binder) binder.bind(MetastoreValidator.class).asEagerSingleton(); binder.bind(Key.get(boolean.class, TranslateHiveViews.class)).toInstance(false); binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(HIDE_DELTA_LAKE_TABLES_IN_ICEBERG); - install(new DecoratedHiveMetastoreModule()); + install(new DecoratedHiveMetastoreModule(false)); configBinder(binder).bindConfigDefaults(CachingHiveMetastoreConfig.class, config -> { // ensure caching metastore wrapper isn't created, as it's not leveraged by Iceberg diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestingIcebergFileMetastoreCatalogModule.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestingIcebergFileMetastoreCatalogModule.java index e83e43b05eb4..79228aefa6ef 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestingIcebergFileMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestingIcebergFileMetastoreCatalogModule.java @@ -45,7 +45,7 @@ public TestingIcebergFileMetastoreCatalogModule(HiveMetastore metastore) protected void setup(Binder binder) { binder.bind(HiveMetastoreFactory.class).annotatedWith(RawHiveMetastoreFactory.class).toInstance(HiveMetastoreFactory.ofInstance(metastore)); - install(new DecoratedHiveMetastoreModule()); + install(new DecoratedHiveMetastoreModule(false)); binder.bind(IcebergTableOperationsProvider.class).to(FileMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestingIcebergHiveMetastoreCatalogModule.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestingIcebergHiveMetastoreCatalogModule.java index 0b7e1694c824..15edd94e2431 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestingIcebergHiveMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestingIcebergHiveMetastoreCatalogModule.java @@ -46,7 +46,7 @@ public TestingIcebergHiveMetastoreCatalogModule(HiveMetastore hiveMetastore, Thr @Override protected void setup(Binder binder) { - install(new DecoratedHiveMetastoreModule()); + install(new DecoratedHiveMetastoreModule(false)); binder.bind(ThriftMetastoreFactory.class).toInstance(this.thriftMetastoreFactory); binder.bind(HiveMetastoreFactory.class).annotatedWith(RawHiveMetastoreFactory.class).toInstance(HiveMetastoreFactory.ofInstance(this.hiveMetastore)); binder.bind(IcebergTableOperationsProvider.class).to(HiveMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeActiveFilesCache.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeActiveFilesCache.java new file mode 100644 index 000000000000..60412d201c88 --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeActiveFilesCache.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.deltalake; + +import com.amazonaws.services.s3.AmazonS3; +import com.google.inject.Inject; +import com.google.inject.name.Named; +import io.trino.tempto.BeforeTestWithContext; +import io.trino.testng.services.Flaky; +import org.testng.annotations.Test; + +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; +import static io.trino.tempto.assertions.QueryAssert.assertThat; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.removeS3Directory; +import static io.trino.tests.product.utils.QueryExecutors.onDelta; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.lang.String.format; + +public class TestDeltaLakeActiveFilesCache + extends BaseTestDeltaLakeS3Storage +{ + @Inject + @Named("s3.server_type") + private String s3ServerType; + + private AmazonS3 s3; + + @BeforeTestWithContext + public void setup() + { + super.setUp(); + s3 = new S3ClientFactory().createS3Client(s3ServerType); + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testRefreshTheFilesCacheWhenTableIsRecreated() + { + String tableName = "test_dl_cached_table_files_refresh_" + randomNameSuffix(); + String tableDirectory = "databricks-compatibility-test-" + tableName; + + onTrino().executeQuery(format("CREATE TABLE delta.default.%s (col INT) WITH (location = 's3://%s/%s')", + tableName, + bucketName, + tableDirectory)); + + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES 1"); + // Add the files of the table in the active files cache + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(row(1)); + + // Recreate the table outside Trino to avoid updating the Trino table active files cache + onDelta().executeQuery("DROP TABLE default." + tableName); + // Delete the contents of the table explicitly from storage (because it has been created as `EXTERNAL`) + removeS3Directory(s3, bucketName, tableDirectory); + + onDelta().executeQuery(format("CREATE TABLE default.%s (col INTEGER) USING DELTA LOCATION 's3://%s/%s'", + tableName, + bucketName, + tableDirectory)); + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES 2"); + + // TODO https://github.com/trinodb/trino/issues/13737 Fix failure when active files cache is stale + assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .hasMessageContaining("Error opening Hive split"); + + // Verify flushing cache resolve the query failure + onTrino().executeQuery("CALL delta.system.flush_metadata_cache(schema_name => 'default', table_name => '" + tableName + "')"); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(row(2)); + + onTrino().executeQuery("DROP TABLE delta.default." + tableName); + } +} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java index af763f339fc1..e1b1ee36d855 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.java @@ -14,13 +14,9 @@ package io.trino.tests.product.deltalake; import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import com.google.inject.name.Named; -import io.airlift.log.Logger; import io.trino.tempto.BeforeTestWithContext; import io.trino.tempto.assertions.QueryAssert; import io.trino.tempto.query.QueryResult; @@ -30,7 +26,6 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.List; -import java.util.stream.Collectors; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.tempto.assertions.QueryAssert.Row.row; @@ -44,6 +39,7 @@ import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.removeS3Directory; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -51,8 +47,6 @@ public class TestDeltaLakeDatabricksCreateTableAsSelectCompatibility extends BaseTestDeltaLakeS3Storage { - private static final Logger log = Logger.get(TestDeltaLakeDatabricksCreateTableAsSelectCompatibility.class); - @Inject @Named("s3.server_type") private String s3ServerType; @@ -148,7 +142,7 @@ public void testPrestoCacheInvalidatedOnCreateTable() .collect(toImmutableList())); dropDeltaTableWithRetry("default." + tableName); - removeS3Directory(bucketName, "databricks-compatibility-test-" + tableName); + removeS3Directory(s3, bucketName, "databricks-compatibility-test-" + tableName); assertThat(onTrino().executeQuery("CREATE TABLE delta.default.\"" + tableName + "\" " + "(id, boolean, tinyint) " + @@ -256,17 +250,4 @@ public void testReplaceTableWithSchemaChangeOnCheckpoint() dropDeltaTableWithRetry(tableName); } } - - private void removeS3Directory(String bucketName, String directoryPrefix) - { - ObjectListing listing = s3.listObjects(bucketName, directoryPrefix); - do { - List objectKeys = listing.getObjectSummaries().stream().map(S3ObjectSummary::getKey).collect(Collectors.toUnmodifiableList()); - DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName).withKeys(objectKeys.toArray(new String[0])); - log.info("Deleting keys: %s", objectKeys); - s3.deleteObjects(deleteObjectsRequest); - listing = s3.listNextBatchOfObjects(listing); - } - while (listing.isTruncated()); - } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java index 5e38d39d21b0..757189237fbf 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java @@ -14,6 +14,10 @@ package io.trino.tests.product.deltalake.util; import com.amazonaws.services.glue.model.ConcurrentModificationException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Throwables; import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; @@ -23,8 +27,10 @@ import org.intellij.lang.annotations.Language; import java.time.temporal.ChronoUnit; +import java.util.List; import java.util.Optional; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.MoreCollectors.onlyElement; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; @@ -88,4 +94,17 @@ public static QueryResult dropDeltaTableWithRetry(String tableName) return Failsafe.with(CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY) .get(() -> onDelta().executeQuery("DROP TABLE IF EXISTS " + tableName)); } + + public static void removeS3Directory(AmazonS3 s3, String bucketName, String directoryPrefix) + { + ObjectListing listing = s3.listObjects(bucketName, directoryPrefix); + do { + List objectKeys = listing.getObjectSummaries().stream().map(S3ObjectSummary::getKey).collect(toImmutableList()); + DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName).withKeys(objectKeys.toArray(new String[0])); + log.info("Deleting keys: %s", objectKeys); + s3.deleteObjects(deleteObjectsRequest); + listing = s3.listNextBatchOfObjects(listing); + } + while (listing.isTruncated()); + } }