Extend metadata cache flush procedure to flush specific caches #10385

Merged · 3 commits · Jan 16, 2022

Changes from 2 commits
docs/src/main/sphinx/connector/hive.rst (7 changes: 6 additions & 1 deletion)
@@ -965,7 +965,12 @@ Procedures

* ``system.flush_metadata_cache()``

Flush Hive metadata caches.
Flush all Hive metadata caches.

* ``system.flush_metadata_cache(schema_name => ..., table_name => ..., partition_column => ARRAY[...], partition_value => ARRAY[...])``

Flush Hive metadata cache entries connected with the selected partition.
The procedure requires named parameters to be passed.
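
For example, assuming a hypothetical ``default.orders`` table partitioned on a single ``ds`` column, the call would be::

    CALL system.flush_metadata_cache(
        schema_name => 'default',
        table_name => 'orders',
        partition_column => ARRAY['ds'],
        partition_value => ARRAY['2022-01-01'])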

Special columns
---------------
TestCachingHiveMetastoreWithQueryRunner.java

@@ -136,6 +136,24 @@ public void testFlushHiveMetastoreCacheProcedureCallable()
queryRunner.execute(renamedColumnQuery);
}

Member:

TestCachingHiveMetastoreWithQueryRunner -- this is an odd name for a test class. It's named like this to differentiate it from the unit test TestCachingHiveMetastore, and so it should be called TestCachingHiveMetastoreQueries (alas, this already exists! Let's merge the classes -- as a follow-up).

Member Author:

Yes, we could merge them. There are some differences in the QueryRunner setup, but I think we can manage to adjust.

@Test
public void testIllegalFlushHiveMetastoreCacheProcedureCalls()
{
var illegalParameterMessage = "Illegal parameter set passed. ";
var validUsageExample = "Valid usages:\n - 'flush_metadata_cache()'\n - flush_metadata_cache(schema_name => ..., table_name => ..., partition_column => ARRAY['...'], partition_value => ARRAY['...'])";

assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache('dummy_schema')"))
.hasMessage("Procedure should only be invoked with named parameters. " + validUsageExample);

assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema')"))
.hasMessage(illegalParameterMessage + validUsageExample);
assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema', table_name => 'dummy_table')"))
.hasMessage(illegalParameterMessage + validUsageExample);

assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema', table_name => 'dummy_table', partition_column => ARRAY['dummy_partition'])"))
.hasMessage("Parameters partition_column and partition_value should have same length");
}
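
A positive-path sketch for contrast (not part of this PR's diff; the test name is hypothetical): with all four parameters named and the two arrays of equal length, validation passes, and the flush only invalidates matching cache entries, so it succeeds even for a table that does not exist.

@Test
public void testFlushHiveMetastoreCacheProcedureNamedCall()
{
    // Hypothetical example: fully named parameters with matching array
    // lengths pass validation; only matching partition entries are flushed.
    queryRunner.execute("CALL system.flush_metadata_cache(" +
            "schema_name => 'dummy_schema', " +
            "table_name => 'dummy_table', " +
            "partition_column => ARRAY['dummy_partition'], " +
            "partition_value => ARRAY['dummy_value'])");
}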

@DataProvider
public Object[][] testCacheRefreshOnRoleGrantAndRevokeParams()
{
CachingHiveMetastore.java

@@ -68,6 +68,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Predicate;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
@@ -91,6 +92,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hive.common.FileUtils.makePartName;

/**
* Hive Metastore Cache
Expand Down Expand Up @@ -255,6 +257,17 @@ public void flushCache()
rolesCache.invalidateAll();
}

public void flushPartitionCache(String schemaName, String tableName, List<String> partitionColumns, List<String> partitionValues)
{
requireNonNull(schemaName, "schemaName is null");
requireNonNull(tableName, "tableName is null");
requireNonNull(partitionColumns, "partitionColumns is null");
requireNonNull(partitionValues, "partitionValues is null");

String providedPartitionName = makePartName(partitionColumns, partitionValues);
invalidatePartitionCache(schemaName, tableName, partitionNameToCheck ->
        partitionNameToCheck.map(value -> value.equals(providedPartitionName)).orElse(false));
}
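
For reference, org.apache.hadoop.hive.common.FileUtils.makePartName joins the column and value lists into the canonical Hive-style partition name that the cache keys also carry; a minimal illustration (the column and value literals here are made up):

// Illustration only: makePartName produces the Hive-style partition path
// that flushPartitionCache compares cache entries against.
List<String> columns = List.of("year", "month");
List<String> values = List.of("2022", "01");
String name = makePartName(columns, values); // -> "year=2022/month=01"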

private static <K, V> V get(LoadingCache<K, V> cache, K key)
{
try {
@@ -875,16 +888,26 @@ private Set<RoleGrant> loadPrincipals(String role)
}

private void invalidatePartitionCache(String databaseName, String tableName)
{
invalidatePartitionCache(databaseName, tableName, partitionName -> true);
}

private void invalidatePartitionCache(String databaseName, String tableName, Predicate<Optional<String>> partitionPredicate)
{
HiveTableName hiveTableName = hiveTableName(databaseName, tableName);

Predicate<WithIdentity<HivePartitionName>> hivePartitionPredicate =
partitionName -> partitionName.getKey().getHiveTableName().equals(hiveTableName) &&
partitionPredicate.test(partitionName.getKey().getPartitionName());

partitionCache.asMap().keySet().stream()
.filter(partitionName -> partitionName.getKey().getHiveTableName().equals(hiveTableName))
.filter(hivePartitionPredicate)
.forEach(partitionCache::invalidate);
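// Note: the filter cache is invalidated table-wide below, since a cached
// filter result may list many partitions, including the flushed one.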
partitionFilterCache.asMap().keySet().stream()
.filter(partitionFilter -> partitionFilter.getKey().getHiveTableName().equals(hiveTableName))
.forEach(partitionFilterCache::invalidate);
partitionStatisticsCache.asMap().keySet().stream()
.filter(partitionFilter -> partitionFilter.getKey().getHiveTableName().equals(hiveTableName))
.filter(hivePartitionPredicate)
.forEach(partitionStatisticsCache::invalidate);
}

FlushHiveMetastoreCacheProcedure.java

@@ -17,23 +17,52 @@
import io.trino.plugin.hive.HiveErrorCode;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.spi.TrinoException;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.type.ArrayType;

import javax.inject.Inject;
import javax.inject.Provider;

import java.lang.invoke.MethodHandle;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.spi.block.MethodHandleUtil.methodHandle;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class FlushHiveMetastoreCacheProcedure
implements Provider<Procedure>
{
private static final String PROCEDURE_NAME = "flush_metadata_cache";

private static final String PARAM_SCHEMA_NAME = "schema_name";
private static final String PARAM_TABLE_NAME = "table_name";
private static final String PARAM_PARTITION_COLUMN = "partition_column";
private static final String PARAM_PARTITION_VALUE = "partition_value";

private static final String PROCEDURE_USAGE_EXAMPLES = format(
"Valid usages:%n" +
" - '%1$s()'%n" +
" - %1$s(%2$s => ..., %3$s => ..., %4$s => ARRAY['...'], %5$s => ARRAY['...'])",
PROCEDURE_NAME,
PARAM_SCHEMA_NAME,
PARAM_TABLE_NAME,
PARAM_PARTITION_COLUMN,
PARAM_PARTITION_VALUE);

private static final MethodHandle FLUSH_HIVE_METASTORE_CACHE = methodHandle(
FlushHiveMetastoreCacheProcedure.class,
"flushMetadataCache");
"flushMetadataCache",
String.class,
String.class,
String.class,
List.class,
List.class);
private static final String FAKE_PARAM_DEFAULT_VALUE = "procedure should only be invoked with named parameters";

private final Optional<CachingHiveMetastore> cachingHiveMetastore;

@@ -48,15 +77,47 @@ public Procedure get()
{
return new Procedure(
"system",
"flush_metadata_cache",
ImmutableList.of(),
PROCEDURE_NAME,
ImmutableList.of(
new Procedure.Argument("$fake_first_parameter", VARCHAR, false, FAKE_PARAM_DEFAULT_VALUE),
new Procedure.Argument(PARAM_SCHEMA_NAME, VARCHAR, false, null),
new Procedure.Argument(PARAM_TABLE_NAME, VARCHAR, false, null),
new Procedure.Argument(PARAM_PARTITION_COLUMN, new ArrayType(VARCHAR), false, null),
new Procedure.Argument(PARAM_PARTITION_VALUE, new ArrayType(VARCHAR), false, null)),
FLUSH_HIVE_METASTORE_CACHE.bindTo(this));
}

public void flushMetadataCache()
public void flushMetadataCache(String fakeParam, String schemaName, String tableName, List<String> partitionColumn, List<String> partitionValue)
{
checkState(FAKE_PARAM_DEFAULT_VALUE.equals(fakeParam), "Procedure should only be invoked with named parameters. " + PROCEDURE_USAGE_EXAMPLES);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
doFlushMetadataCache(
Optional.ofNullable(schemaName),
Optional.ofNullable(tableName),
Optional.ofNullable(partitionColumn).orElse(ImmutableList.of()),
Optional.ofNullable(partitionValue).orElse(ImmutableList.of()));
}
}
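
An aside on the sentinel (inferred from the code above, not stated in the PR): the engine fills omitted optional arguments with their declared defaults, so only a positional invocation can overwrite $fake_first_parameter and trip the checkState. A sketch of the two bindings:

// Named call -- the sentinel keeps its default and checkState passes:
//   CALL system.flush_metadata_cache(schema_name => 's', ...)
//   => flushMetadataCache(FAKE_PARAM_DEFAULT_VALUE, "s", ...)
// Positional call -- the first value lands in the hidden first parameter:
//   CALL system.flush_metadata_cache('dummy_schema')
//   => flushMetadataCache("dummy_schema", null, null, null, null)  // fails checkState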

private void doFlushMetadataCache(Optional<String> schemaName, Optional<String> tableName, List<String> partitionColumns, List<String> partitionValues)
{
cachingHiveMetastore
.orElseThrow(() -> new TrinoException(HiveErrorCode.HIVE_METASTORE_ERROR, "Cannot flush, metastore cache is not enabled"))
.flushCache();
CachingHiveMetastore cachingHiveMetastore = this.cachingHiveMetastore
.orElseThrow(() -> new TrinoException(HiveErrorCode.HIVE_METASTORE_ERROR, "Cannot flush, metastore cache is not enabled"));

checkState(
partitionColumns.size() == partitionValues.size(),
"Parameters partition_column and partition_value should have same length");

if (schemaName.isEmpty() && tableName.isEmpty() && partitionColumns.isEmpty()) {
cachingHiveMetastore.flushCache();
}
else if (schemaName.isPresent() && tableName.isPresent() && !partitionColumns.isEmpty()) {
cachingHiveMetastore.flushPartitionCache(schemaName.get(), tableName.get(), partitionColumns, partitionValues);
}
else {
throw new TrinoException(
HiveErrorCode.HIVE_METASTORE_ERROR,
"Illegal parameter set passed. " + PROCEDURE_USAGE_EXAMPLES);
}
}
}