From 0e682c0a70983e2b7e0a546e2f017ca722a08119 Mon Sep 17 00:00:00 2001 From: Arkadiusz Czajkowski Date: Tue, 21 Dec 2021 08:59:29 +0100 Subject: [PATCH] Extend flush_metadata_cache procedure to allow flush of specific partition cache entries --- docs/src/main/sphinx/connector/hive.rst | 7 +- ...stCachingHiveMetastoreWithQueryRunner.java | 18 +++ .../metastore/cache/CachingHiveMetastore.java | 27 +++- .../FlushHiveMetastoreCacheProcedure.java | 75 ++++++++++- .../plugin/hive/BaseTestHiveOnDataLake.java | 127 +++++++++++++++++- .../hive/containers/HiveMinioDataLake.java | 10 +- 6 files changed, 252 insertions(+), 12 deletions(-) diff --git a/docs/src/main/sphinx/connector/hive.rst b/docs/src/main/sphinx/connector/hive.rst index 1761af044bb6..49a263c792e8 100644 --- a/docs/src/main/sphinx/connector/hive.rst +++ b/docs/src/main/sphinx/connector/hive.rst @@ -965,7 +965,12 @@ Procedures * ``system.flush_metadata_cache()`` - Flush Hive metadata caches. + Flush all Hive metadata caches. + +* ``system.flush_metadata_cache(schema_name => ..., table_name => ..., partition_column => ARRAY[...], partition_value => ARRAY[...])`` + + Flush Hive metadata cache entries associated with the selected partition. 
+ Procedure requires named parameters to be passed Special columns --------------- diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastoreWithQueryRunner.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastoreWithQueryRunner.java index 5a6290d8929d..1985fd9598cd 100644 --- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastoreWithQueryRunner.java +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastoreWithQueryRunner.java @@ -136,6 +136,24 @@ public void testFlushHiveMetastoreCacheProcedureCallable() queryRunner.execute(renamedColumnQuery); } + @Test + public void testIllegalFlushHiveMetastoreCacheProcedureCalls() + { + var illegalParameterMessage = "Illegal parameter set passed. "; + var validUsageExample = "Valid usages:\n - 'flush_metadata_cache()'\n - flush_metadata_cache(schema_name => ..., table_name => ..., partition_column => ARRAY['...'], partition_value => ARRAY['...'])"; + + assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache('dummy_schema')")) + .hasMessage("Procedure should only be invoked with named parameters. 
" + validUsageExample); + + assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema')")) + .hasMessage(illegalParameterMessage + validUsageExample); + assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema', table_name => 'dummy_table')")) + .hasMessage(illegalParameterMessage + validUsageExample); + + assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema', table_name => 'dummy_table', partition_column => ARRAY['dummy_partition'])")) + .hasMessage("Parameters partition_column and partition_value should have same length"); + } + @DataProvider public Object[][] testCacheRefreshOnRoleGrantAndRevokeParams() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java index 6774eae6690c..6a516c4a724d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java @@ -68,6 +68,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.function.Function; +import java.util.function.Predicate; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; @@ -91,6 +92,7 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.hadoop.hive.common.FileUtils.makePartName; /** * Hive Metastore Cache @@ -255,6 +257,17 @@ public void flushCache() rolesCache.invalidateAll(); } + public void flushPartitionCache(String schemaName, String tableName, List partitionColumns, List partitionValues) + { + 
requireNonNull(schemaName, "schemaName is null"); + requireNonNull(tableName, "tableName is null"); + requireNonNull(partitionColumns, "partitionColumns is null"); + requireNonNull(partitionValues, "partitionValues is null"); + + String providedPartitionName = makePartName(partitionColumns, partitionValues); + invalidatePartitionCache(schemaName, tableName, partitionNameToCheck -> partitionNameToCheck.map(value -> value.equals(providedPartitionName)).orElse(false)); + } + private static V get(LoadingCache cache, K key) { try { @@ -875,16 +888,26 @@ private Set loadPrincipals(String role) } private void invalidatePartitionCache(String databaseName, String tableName) + { + invalidatePartitionCache(databaseName, tableName, partitionName -> true); + } + + private void invalidatePartitionCache(String databaseName, String tableName, Predicate> partitionPredicate) { HiveTableName hiveTableName = hiveTableName(databaseName, tableName); + + Predicate> hivePartitionPredicate = + partitionName -> partitionName.getKey().getHiveTableName().equals(hiveTableName) && + partitionPredicate.test(partitionName.getKey().getPartitionName()); + partitionCache.asMap().keySet().stream() - .filter(partitionName -> partitionName.getKey().getHiveTableName().equals(hiveTableName)) + .filter(hivePartitionPredicate) .forEach(partitionCache::invalidate); partitionFilterCache.asMap().keySet().stream() .filter(partitionFilter -> partitionFilter.getKey().getHiveTableName().equals(hiveTableName)) .forEach(partitionFilterCache::invalidate); partitionStatisticsCache.asMap().keySet().stream() - .filter(partitionFilter -> partitionFilter.getKey().getHiveTableName().equals(hiveTableName)) + .filter(hivePartitionPredicate) .forEach(partitionStatisticsCache::invalidate); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/procedure/FlushHiveMetastoreCacheProcedure.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/procedure/FlushHiveMetastoreCacheProcedure.java 
index 402625bb0d14..2108ea6155b8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/procedure/FlushHiveMetastoreCacheProcedure.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/procedure/FlushHiveMetastoreCacheProcedure.java @@ -17,23 +17,52 @@ import io.trino.plugin.hive.HiveErrorCode; import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; import io.trino.spi.TrinoException; +import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.procedure.Procedure; +import io.trino.spi.type.ArrayType; import javax.inject.Inject; import javax.inject.Provider; import java.lang.invoke.MethodHandle; +import java.util.List; import java.util.Optional; +import static com.google.common.base.Preconditions.checkState; import static io.trino.spi.block.MethodHandleUtil.methodHandle; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class FlushHiveMetastoreCacheProcedure implements Provider { + private static final String PROCEDURE_NAME = "flush_metadata_cache"; + + private static final String PARAM_SCHEMA_NAME = "schema_name"; + private static final String PARAM_TABLE_NAME = "table_name"; + private static final String PARAM_PARTITION_COLUMN = "partition_column"; + private static final String PARAM_PARTITION_VALUE = "partition_value"; + + private static final String PROCEDURE_USAGE_EXAMPLES = format( + "Valid usages:%n" + + " - '%1$s()'%n" + + " - %1$s(%2$s => ..., %3$s => ..., %4$s => ARRAY['...'], %5$s => ARRAY['...'])", + PROCEDURE_NAME, + PARAM_SCHEMA_NAME, + PARAM_TABLE_NAME, + PARAM_PARTITION_COLUMN, + PARAM_PARTITION_VALUE); + private static final MethodHandle FLUSH_HIVE_METASTORE_CACHE = methodHandle( FlushHiveMetastoreCacheProcedure.class, - "flushMetadataCache"); + "flushMetadataCache", + String.class, + String.class, + String.class, + List.class, + List.class); + private static final String 
FAKE_PARAM_DEFAULT_VALUE = "procedure should only be invoked with named parameters"; private final Optional cachingHiveMetastore; @@ -48,15 +77,47 @@ public Procedure get() { return new Procedure( "system", - "flush_metadata_cache", - ImmutableList.of(), + PROCEDURE_NAME, + ImmutableList.of( + new Procedure.Argument("$fake_first_parameter", VARCHAR, false, FAKE_PARAM_DEFAULT_VALUE), + new Procedure.Argument(PARAM_SCHEMA_NAME, VARCHAR, false, null), + new Procedure.Argument(PARAM_TABLE_NAME, VARCHAR, false, null), + new Procedure.Argument(PARAM_PARTITION_COLUMN, new ArrayType(VARCHAR), false, null), + new Procedure.Argument(PARAM_PARTITION_VALUE, new ArrayType(VARCHAR), false, null)), FLUSH_HIVE_METASTORE_CACHE.bindTo(this)); } - public void flushMetadataCache() + public void flushMetadataCache(String fakeParam, String schemaName, String tableName, List partitionColumn, List partitionValue) + { + checkState(FAKE_PARAM_DEFAULT_VALUE.equals(fakeParam), "Procedure should only be invoked with named parameters. 
" + PROCEDURE_USAGE_EXAMPLES); + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + doFlushMetadataCache( + Optional.ofNullable(schemaName), + Optional.ofNullable(tableName), + Optional.ofNullable(partitionColumn).orElse(ImmutableList.of()), + Optional.ofNullable(partitionValue).orElse(ImmutableList.of())); + } + } + + private void doFlushMetadataCache(Optional schemaName, Optional tableName, List partitionColumns, List partitionValues) { - cachingHiveMetastore - .orElseThrow(() -> new TrinoException(HiveErrorCode.HIVE_METASTORE_ERROR, "Cannot flush, metastore cache is not enabled")) - .flushCache(); + CachingHiveMetastore cachingHiveMetastore = this.cachingHiveMetastore + .orElseThrow(() -> new TrinoException(HiveErrorCode.HIVE_METASTORE_ERROR, "Cannot flush, metastore cache is not enabled")); + + checkState( + partitionColumns.size() == partitionValues.size(), + "Parameters partition_column and partition_value should have same length"); + + if (schemaName.isEmpty() && tableName.isEmpty() && partitionColumns.isEmpty()) { + cachingHiveMetastore.flushCache(); + } + else if (schemaName.isPresent() && tableName.isPresent() && !partitionColumns.isEmpty()) { + cachingHiveMetastore.flushPartitionCache(schemaName.get(), tableName.get(), partitionColumns, partitionValues); + } + else { + throw new TrinoException( + HiveErrorCode.HIVE_METASTORE_ERROR, + "Illegal parameter set passed. 
" + PROCEDURE_USAGE_EXAMPLES); + } } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java index d4a98f222854..13102898f216 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java @@ -13,9 +13,22 @@ */ package io.trino.plugin.hive; +import com.amazonaws.services.s3.AmazonS3; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.trino.Session; +import io.trino.plugin.hive.authentication.HiveIdentity; +import io.trino.plugin.hive.authentication.NoHdfsAuthentication; import io.trino.plugin.hive.containers.HiveMinioDataLake; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.MetastoreConfig; +import io.trino.plugin.hive.metastore.Partition; +import io.trino.plugin.hive.metastore.PartitionWithStatistics; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore; +import io.trino.plugin.hive.metastore.thrift.TestingMetastoreLocator; +import io.trino.plugin.hive.metastore.thrift.ThriftHiveMetastore; +import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreConfig; import io.trino.plugin.hive.s3.S3HiveQueryRunner; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; @@ -24,6 +37,8 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Optional; import static io.trino.testing.MaterializedResult.resultBuilder; import static io.trino.testing.sql.TestTable.randomTableSuffix; @@ -39,6 +54,7 @@ public abstract class BaseTestHiveOnDataLake private String bucketName; private HiveMinioDataLake dockerizedS3DataLake; + private HiveMetastore metastoreClient; private final String hiveHadoopImage; @@ -55,12 +71,30 @@ 
protected QueryRunner createQueryRunner() this.dockerizedS3DataLake = closeAfterClass( new HiveMinioDataLake(bucketName, ImmutableMap.of(), hiveHadoopImage)); this.dockerizedS3DataLake.start(); + this.metastoreClient = new BridgingHiveMetastore(new ThriftHiveMetastore( + new TestingMetastoreLocator( + Optional.empty(), + this.dockerizedS3DataLake.getHiveHadoop().getHiveMetastoreEndpoint()), + new HiveConfig(), + new MetastoreConfig(), + new ThriftMetastoreConfig(), + new HdfsEnvironment(new HiveHdfsConfiguration( + new HdfsConfigurationInitializer( + new HdfsConfig(), + ImmutableSet.of()), + ImmutableSet.of()), + new HdfsConfig(), + new NoHdfsAuthentication()), + false)); return S3HiveQueryRunner.create( dockerizedS3DataLake, ImmutableMap.builder() // This is required when using MinIO which requires path style access .put("hive.insert-existing-partitions-behavior", "OVERWRITE") .put("hive.non-managed-table-writes-enabled", "true") + // Below are required to enable caching on metastore + .put("hive.metastore-cache-ttl", "1d") + .put("hive.metastore-refresh-interval", "1d") .build()); } @@ -160,6 +194,92 @@ public void testInsertOverwritePartitionedAndBucketedExternalTable() assertOverwritePartition(externalTableName); } + @Test + public void testFlushPartitionCache() + { + String tableName = "nation_" + randomTableSuffix(); + String fullyQualifiedTestTableName = getTestTableName(tableName); + String partitionColumn = "regionkey"; + + // Create table with partition on regionkey + computeActual(getCreateTableStatement( + fullyQualifiedTestTableName, + format("partitioned_by=ARRAY['%s']", partitionColumn))); + copyTpchNationToTable(fullyQualifiedTestTableName); + + String queryUsingPartitionCacheTemplate = "SELECT name FROM %s WHERE %s=%s"; + String partitionValue1 = "0"; + String queryUsingPartitionCacheForValue1 = format(queryUsingPartitionCacheTemplate, fullyQualifiedTestTableName, partitionColumn, partitionValue1); + String expectedQueryResultForValue1 = "VALUES 
'ALGERIA', 'MOROCCO', 'MOZAMBIQUE', 'ETHIOPIA', 'KENYA'"; + String partitionValue2 = "1"; + String queryUsingPartitionCacheForValue2 = format(queryUsingPartitionCacheTemplate, fullyQualifiedTestTableName, partitionColumn, partitionValue2); + String expectedQueryResultForValue2 = "VALUES 'ARGENTINA', 'BRAZIL', 'CANADA', 'PERU', 'UNITED STATES'"; + + // Fill partition cache and check we got expected results + assertQuery(queryUsingPartitionCacheForValue1, expectedQueryResultForValue1); + assertQuery(queryUsingPartitionCacheForValue2, expectedQueryResultForValue2); + + // Copy partition to new location and update metadata outside Trino + renamePartitionResourcesOutsideTrino(tableName, partitionColumn, partitionValue1); + renamePartitionResourcesOutsideTrino(tableName, partitionColumn, partitionValue2); + + // Should return 0 rows as we moved partition and cache is outdated. We use nonexistent partition + assertQueryReturnsEmptyResult(queryUsingPartitionCacheForValue1); + assertQueryReturnsEmptyResult(queryUsingPartitionCacheForValue2); + + // Refresh cache for schema_name => 'dummy_schema', table_name => 'dummy_table', partition_column => + getQueryRunner().execute(format( + "CALL system.flush_metadata_cache(schema_name => '%s', table_name => '%s', partition_column => ARRAY['%s'], partition_value => ARRAY['%s'])", + HIVE_TEST_SCHEMA, + tableName, + partitionColumn, + partitionValue1)); + + // Should return expected rows as we refresh cache + assertQuery(queryUsingPartitionCacheForValue1, expectedQueryResultForValue1); + // Should return 0 rows as we left cache untouched + assertQueryReturnsEmptyResult(queryUsingPartitionCacheForValue2); + + computeActual(format("DROP TABLE %s", fullyQualifiedTestTableName)); + } + + private void renamePartitionResourcesOutsideTrino(String tableName, String partitionColumn, String regionKey) + { + String partitionName = format("%s=%s", partitionColumn, regionKey); + String partitionS3KeyPrefix = format("%s/%s/%s", HIVE_TEST_SCHEMA, 
tableName, partitionName); + String renamedPartitionSuffix = "CP"; + + // Copy whole partition to new location + AmazonS3 amazonS3 = dockerizedS3DataLake.getS3Client(); + amazonS3.listObjects(bucketName) + .getObjectSummaries() + .forEach(object -> { + String objectKey = object.getKey(); + if (objectKey.startsWith(partitionS3KeyPrefix)) { + String fileName = objectKey.substring(objectKey.lastIndexOf('/')); + String destinationKey = partitionS3KeyPrefix + renamedPartitionSuffix + fileName; + amazonS3.copyObject(bucketName, objectKey, bucketName, destinationKey); + } + }); + + // Delete old partition and update metadata to point to location of new copy + HiveIdentity hiveIdentity = HiveIdentity.none(); + Table hiveTable = metastoreClient.getTable(hiveIdentity, HIVE_TEST_SCHEMA, tableName).get(); + Partition hivePartition = metastoreClient.getPartition(hiveIdentity, hiveTable, List.of(regionKey)).get(); + Map partitionStatistics = + metastoreClient.getPartitionStatistics(hiveIdentity, hiveTable, List.of(hivePartition)); + + metastoreClient.dropPartition(hiveIdentity, HIVE_TEST_SCHEMA, tableName, List.of(regionKey), true); + metastoreClient.addPartitions(hiveIdentity, HIVE_TEST_SCHEMA, tableName, List.of( + new PartitionWithStatistics( + Partition.builder(hivePartition) + .withStorage(builder -> builder.setLocation( + hivePartition.getStorage().getLocation() + renamedPartitionSuffix)) + .build(), + partitionName, + partitionStatistics.get(partitionName)))); + } + protected void assertInsertFailure(String testTable, String expectedMessageRegExp) { assertInsertFailure(getSession(), testTable, expectedMessageRegExp); @@ -208,7 +328,12 @@ protected void assertOverwritePartition(String testTable) protected String getTestTableName() { - return format("hive.%s.%s", HIVE_TEST_SCHEMA, "nation_" + randomTableSuffix()); + return getTestTableName("nation_" + randomTableSuffix()); + } + + protected String getTestTableName(String tableName) + { + return format("hive.%s.%s", 
HIVE_TEST_SCHEMA, tableName); } protected String getCreateTableStatement(String tableName, String... propertiesEntries) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java index d71cb7a1f04f..4b3f881564c6 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java @@ -41,6 +41,7 @@ public class HiveMinioDataLake private final AutoCloseableCloser closer = AutoCloseableCloser.create(); private State state = State.INITIAL; + private AmazonS3 s3Client; public HiveMinioDataLake(String bucketName, Map hiveHadoopFilesToMount) { @@ -76,7 +77,7 @@ public void start() state = State.STARTING; minio.start(); hiveHadoop.start(); - AmazonS3 s3Client = AmazonS3ClientBuilder + s3Client = AmazonS3ClientBuilder .standard() .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( "http://localhost:" + minio.getMinioApiEndpoint().getPort(), @@ -86,9 +87,16 @@ public void start() new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY))) .build(); s3Client.createBucket(this.bucketName); + closer.register(() -> s3Client.shutdown()); state = State.STARTED; } + public AmazonS3 getS3Client() + { + checkState(state == State.STARTED, "Can't provide client when MinIO state is: %s", state); + return s3Client; + } + public void stop() throws Exception {