Skip to content

Commit

Permalink
Extend flush_metadata_cache procedure to allow flush of specific part…
Browse files Browse the repository at this point in the history
…ition cache entries
  • Loading branch information
Arkadiusz Czajkowski authored and losipiuk committed Jan 16, 2022
1 parent d1a6991 commit 6529d7b
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 12 deletions.
7 changes: 6 additions & 1 deletion docs/src/main/sphinx/connector/hive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,12 @@ Procedures

* ``system.flush_metadata_cache()``

Flush Hive metadata caches.
Flush all Hive metadata caches.

* ``system.flush_metadata_cache(schema_name => ..., table_name => ..., partition_columns => ARRAY[...], partition_values => ARRAY[...])``

Flush Hive metadata cache entries connected with selected partition.
Procedure requires named parameters to be passed

Special columns
---------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,24 @@ public void testFlushHiveMetastoreCacheProcedureCallable()
queryRunner.execute(renamedColumnQuery);
}

@Test
public void testIllegalFlushHiveMetastoreCacheProcedureCalls()
{
var illegalParameterMessage = "Illegal parameter set passed. ";
var validUsageExample = "Valid usages:\n - 'flush_metadata_cache()'\n - flush_metadata_cache(schema_name => ..., table_name => ..., partition_column => ARRAY['...'], partition_value => ARRAY['...'])";

assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache('dummy_schema')"))
.hasMessage("Procedure should only be invoked with named parameters. " + validUsageExample);

assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema')"))
.hasMessage(illegalParameterMessage + validUsageExample);
assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema', table_name => 'dummy_table')"))
.hasMessage(illegalParameterMessage + validUsageExample);

assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema', table_name => 'dummy_table', partition_column => ARRAY['dummy_partition'])"))
.hasMessage("Parameters partition_column and partition_value should have same length");
}

@DataProvider
public Object[][] testCacheRefreshOnRoleGrantAndRevokeParams()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Predicate;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -91,6 +92,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hive.common.FileUtils.makePartName;

/**
* Hive Metastore Cache
Expand Down Expand Up @@ -255,6 +257,17 @@ public void flushCache()
rolesCache.invalidateAll();
}

public void flushPartitionCache(String schemaName, String tableName, List<String> partitionColumns, List<String> partitionValues)
{
requireNonNull(schemaName, "schemaName is null");
requireNonNull(tableName, "tableName is null");
requireNonNull(partitionColumns, "partitionColumns is null");
requireNonNull(partitionValues, "partitionValues is null");

String providedPartitionName = makePartName(partitionColumns, partitionValues);
invalidatePartitionCache(schemaName, tableName, partitionNameToCheck -> partitionNameToCheck.map(value -> value.equals(providedPartitionName)).orElse(false));
}

private static <K, V> V get(LoadingCache<K, V> cache, K key)
{
try {
Expand Down Expand Up @@ -875,16 +888,26 @@ private Set<RoleGrant> loadPrincipals(String role)
}

private void invalidatePartitionCache(String databaseName, String tableName)
{
invalidatePartitionCache(databaseName, tableName, partitionName -> true);
}

private void invalidatePartitionCache(String databaseName, String tableName, Predicate<Optional<String>> partitionPredicate)
{
HiveTableName hiveTableName = hiveTableName(databaseName, tableName);

Predicate<WithIdentity<HivePartitionName>> hivePartitionPredicate =
partitionName -> partitionName.getKey().getHiveTableName().equals(hiveTableName) &&
partitionPredicate.test(partitionName.getKey().getPartitionName());

partitionCache.asMap().keySet().stream()
.filter(partitionName -> partitionName.getKey().getHiveTableName().equals(hiveTableName))
.filter(hivePartitionPredicate)
.forEach(partitionCache::invalidate);
partitionFilterCache.asMap().keySet().stream()
.filter(partitionFilter -> partitionFilter.getKey().getHiveTableName().equals(hiveTableName))
.forEach(partitionFilterCache::invalidate);
partitionStatisticsCache.asMap().keySet().stream()
.filter(partitionFilter -> partitionFilter.getKey().getHiveTableName().equals(hiveTableName))
.filter(hivePartitionPredicate)
.forEach(partitionStatisticsCache::invalidate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,52 @@
import io.trino.plugin.hive.HiveErrorCode;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.spi.TrinoException;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.type.ArrayType;

import javax.inject.Inject;
import javax.inject.Provider;

import java.lang.invoke.MethodHandle;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.spi.block.MethodHandleUtil.methodHandle;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class FlushHiveMetastoreCacheProcedure
implements Provider<Procedure>
{
private static final String PROCEDURE_NAME = "flush_metadata_cache";

private static final String PARAM_SCHEMA_NAME = "schema_name";
private static final String PARAM_TABLE_NAME = "table_name";
private static final String PARAM_PARTITION_COLUMN = "partition_column";
private static final String PARAM_PARTITION_VALUE = "partition_value";

private static final String PROCEDURE_USAGE_EXAMPLES = format(
"Valid usages:%n" +
" - '%1$s()'%n" +
" - %1$s(%2$s => ..., %3$s => ..., %4$s => ARRAY['...'], %5$s => ARRAY['...'])",
PROCEDURE_NAME,
PARAM_SCHEMA_NAME,
PARAM_TABLE_NAME,
PARAM_PARTITION_COLUMN,
PARAM_PARTITION_VALUE);

private static final MethodHandle FLUSH_HIVE_METASTORE_CACHE = methodHandle(
FlushHiveMetastoreCacheProcedure.class,
"flushMetadataCache");
"flushMetadataCache",
String.class,
String.class,
String.class,
List.class,
List.class);
private static final String FAKE_PARAM_DEFAULT_VALUE = "procedure should only be invoked with named parameters";

private final Optional<CachingHiveMetastore> cachingHiveMetastore;

Expand All @@ -48,15 +77,47 @@ public Procedure get()
{
return new Procedure(
"system",
"flush_metadata_cache",
ImmutableList.of(),
PROCEDURE_NAME,
ImmutableList.of(
new Procedure.Argument("$fake_first_parameter", VARCHAR, false, FAKE_PARAM_DEFAULT_VALUE),
new Procedure.Argument(PARAM_SCHEMA_NAME, VARCHAR, false, null),
new Procedure.Argument(PARAM_TABLE_NAME, VARCHAR, false, null),
new Procedure.Argument(PARAM_PARTITION_COLUMN, new ArrayType(VARCHAR), false, null),
new Procedure.Argument(PARAM_PARTITION_VALUE, new ArrayType(VARCHAR), false, null)),
FLUSH_HIVE_METASTORE_CACHE.bindTo(this));
}

public void flushMetadataCache()
public void flushMetadataCache(String fakeParam, String schemaName, String tableName, List<String> partitionColumn, List<String> partitionValue)
{
checkState(FAKE_PARAM_DEFAULT_VALUE.equals(fakeParam), "Procedure should only be invoked with named parameters. " + PROCEDURE_USAGE_EXAMPLES);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
doFlushMetadataCache(
Optional.ofNullable(schemaName),
Optional.ofNullable(tableName),
Optional.ofNullable(partitionColumn).orElse(ImmutableList.of()),
Optional.ofNullable(partitionValue).orElse(ImmutableList.of()));
}
}

private void doFlushMetadataCache(Optional<String> schemaName, Optional<String> tableName, List<String> partitionColumns, List<String> partitionValues)
{
cachingHiveMetastore
.orElseThrow(() -> new TrinoException(HiveErrorCode.HIVE_METASTORE_ERROR, "Cannot flush, metastore cache is not enabled"))
.flushCache();
CachingHiveMetastore cachingHiveMetastore = this.cachingHiveMetastore
.orElseThrow(() -> new TrinoException(HiveErrorCode.HIVE_METASTORE_ERROR, "Cannot flush, metastore cache is not enabled"));

checkState(
partitionColumns.size() == partitionValues.size(),
"Parameters partition_column and partition_value should have same length");

if (schemaName.isEmpty() && tableName.isEmpty() && partitionColumns.isEmpty()) {
cachingHiveMetastore.flushCache();
}
else if (schemaName.isPresent() && tableName.isPresent() && !partitionColumns.isEmpty()) {
cachingHiveMetastore.flushPartitionCache(schemaName.get(), tableName.get(), partitionColumns, partitionValues);
}
else {
throw new TrinoException(
HiveErrorCode.HIVE_METASTORE_ERROR,
"Illegal parameter set passed. " + PROCEDURE_USAGE_EXAMPLES);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,22 @@
*/
package io.trino.plugin.hive;

import com.amazonaws.services.s3.AmazonS3;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.plugin.hive.authentication.HiveIdentity;
import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.MetastoreConfig;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.PartitionWithStatistics;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.trino.plugin.hive.metastore.thrift.TestingMetastoreLocator;
import io.trino.plugin.hive.metastore.thrift.ThriftHiveMetastore;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreConfig;
import io.trino.plugin.hive.s3.S3HiveQueryRunner;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
Expand All @@ -24,6 +37,8 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
Expand All @@ -39,6 +54,7 @@ public abstract class BaseTestHiveOnDataLake

private String bucketName;
private HiveMinioDataLake dockerizedS3DataLake;
private HiveMetastore metastoreClient;

private final String hiveHadoopImage;

Expand All @@ -55,12 +71,30 @@ protected QueryRunner createQueryRunner()
this.dockerizedS3DataLake = closeAfterClass(
new HiveMinioDataLake(bucketName, ImmutableMap.of(), hiveHadoopImage));
this.dockerizedS3DataLake.start();
this.metastoreClient = new BridgingHiveMetastore(new ThriftHiveMetastore(
new TestingMetastoreLocator(
Optional.empty(),
this.dockerizedS3DataLake.getHiveHadoop().getHiveMetastoreEndpoint()),
new HiveConfig(),
new MetastoreConfig(),
new ThriftMetastoreConfig(),
new HdfsEnvironment(new HiveHdfsConfiguration(
new HdfsConfigurationInitializer(
new HdfsConfig(),
ImmutableSet.of()),
ImmutableSet.of()),
new HdfsConfig(),
new NoHdfsAuthentication()),
false));
return S3HiveQueryRunner.create(
dockerizedS3DataLake,
ImmutableMap.<String, String>builder()
// This is required when using MinIO which requires path style access
.put("hive.insert-existing-partitions-behavior", "OVERWRITE")
.put("hive.non-managed-table-writes-enabled", "true")
// Below are required to enable caching on metastore
.put("hive.metastore-cache-ttl", "1d")
.put("hive.metastore-refresh-interval", "1d")
.build());
}

Expand Down Expand Up @@ -160,6 +194,92 @@ public void testInsertOverwritePartitionedAndBucketedExternalTable()
assertOverwritePartition(externalTableName);
}

@Test
public void testFlushPartitionCache()
{
String tableName = "nation_" + randomTableSuffix();
String fullyQualifiedTestTableName = getTestTableName(tableName);
String partitionColumn = "regionkey";

// Create table with partition on regionkey
computeActual(getCreateTableStatement(
fullyQualifiedTestTableName,
format("partitioned_by=ARRAY['%s']", partitionColumn)));
copyTpchNationToTable(fullyQualifiedTestTableName);

String queryUsingPartitionCacheTemplate = "SELECT name FROM %s WHERE %s=%s";
String partitionValue1 = "0";
String queryUsingPartitionCacheForValue1 = format(queryUsingPartitionCacheTemplate, fullyQualifiedTestTableName, partitionColumn, partitionValue1);
String expectedQueryResultForValue1 = "VALUES 'ALGERIA', 'MOROCCO', 'MOZAMBIQUE', 'ETHIOPIA', 'KENYA'";
String partitionValue2 = "1";
String queryUsingPartitionCacheForValue2 = format(queryUsingPartitionCacheTemplate, fullyQualifiedTestTableName, partitionColumn, partitionValue2);
String expectedQueryResultForValue2 = "VALUES 'ARGENTINA', 'BRAZIL', 'CANADA', 'PERU', 'UNITED STATES'";

// Fill partition cache and check we got expected results
assertQuery(queryUsingPartitionCacheForValue1, expectedQueryResultForValue1);
assertQuery(queryUsingPartitionCacheForValue2, expectedQueryResultForValue2);

// Copy partition to new location and update metadata outside Trino
renamePartitionResourcesOutsideTrino(tableName, partitionColumn, partitionValue1);
renamePartitionResourcesOutsideTrino(tableName, partitionColumn, partitionValue2);

// Should return 0 rows as we moved partition and cache is outdated. We use nonexistent partition
assertQueryReturnsEmptyResult(queryUsingPartitionCacheForValue1);
assertQueryReturnsEmptyResult(queryUsingPartitionCacheForValue2);

// Refresh cache for schema_name => 'dummy_schema', table_name => 'dummy_table', partition_column =>
getQueryRunner().execute(format(
"CALL system.flush_metadata_cache(schema_name => '%s', table_name => '%s', partition_column => ARRAY['%s'], partition_value => ARRAY['%s'])",
HIVE_TEST_SCHEMA,
tableName,
partitionColumn,
partitionValue1));

// Should return expected rows as we refresh cache
assertQuery(queryUsingPartitionCacheForValue1, expectedQueryResultForValue1);
// Should return 0 rows as we left cache untouched
assertQueryReturnsEmptyResult(queryUsingPartitionCacheForValue2);

computeActual(format("DROP TABLE %s", fullyQualifiedTestTableName));
}

private void renamePartitionResourcesOutsideTrino(String tableName, String partitionColumn, String regionKey)
{
String partitionName = format("%s=%s", partitionColumn, regionKey);
String partitionS3KeyPrefix = format("%s/%s/%s", HIVE_TEST_SCHEMA, tableName, partitionName);
String renamedPartitionSuffix = "CP";

// Copy whole partition to new location
AmazonS3 amazonS3 = dockerizedS3DataLake.getS3Client();
amazonS3.listObjects(bucketName)
.getObjectSummaries()
.forEach(object -> {
String objectKey = object.getKey();
if (objectKey.startsWith(partitionS3KeyPrefix)) {
String fileName = objectKey.substring(objectKey.lastIndexOf('/'));
String destinationKey = partitionS3KeyPrefix + renamedPartitionSuffix + fileName;
amazonS3.copyObject(bucketName, objectKey, bucketName, destinationKey);
}
});

// Delete old partition and update metadata to point to location of new copy
HiveIdentity hiveIdentity = HiveIdentity.none();
Table hiveTable = metastoreClient.getTable(hiveIdentity, HIVE_TEST_SCHEMA, tableName).get();
Partition hivePartition = metastoreClient.getPartition(hiveIdentity, hiveTable, List.of(regionKey)).get();
Map<String, PartitionStatistics> partitionStatistics =
metastoreClient.getPartitionStatistics(hiveIdentity, hiveTable, List.of(hivePartition));

metastoreClient.dropPartition(hiveIdentity, HIVE_TEST_SCHEMA, tableName, List.of(regionKey), true);
metastoreClient.addPartitions(hiveIdentity, HIVE_TEST_SCHEMA, tableName, List.of(
new PartitionWithStatistics(
Partition.builder(hivePartition)
.withStorage(builder -> builder.setLocation(
hivePartition.getStorage().getLocation() + renamedPartitionSuffix))
.build(),
partitionName,
partitionStatistics.get(partitionName))));
}

protected void assertInsertFailure(String testTable, String expectedMessageRegExp)
{
assertInsertFailure(getSession(), testTable, expectedMessageRegExp);
Expand Down Expand Up @@ -208,7 +328,12 @@ protected void assertOverwritePartition(String testTable)

protected String getTestTableName()
{
return format("hive.%s.%s", HIVE_TEST_SCHEMA, "nation_" + randomTableSuffix());
return getTestTableName("nation_" + randomTableSuffix());
}

protected String getTestTableName(String tableName)
{
return format("hive.%s.%s", HIVE_TEST_SCHEMA, tableName);
}

protected String getCreateTableStatement(String tableName, String... propertiesEntries)
Expand Down
Loading

0 comments on commit 6529d7b

Please sign in to comment.