Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge redundant hive cache tests classes #10645

Merged
merged 3 commits into from
Jan 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public static Builder<Builder<?>> builder()
return new Builder<>();
}

public static Builder<Builder<?>> builder(Session defaultSession)
{
return new Builder<>(defaultSession);
}

public static class Builder<SELF extends Builder<?>>
extends DistributedQueryRunner.Builder<SELF>
{
Expand All @@ -107,7 +112,12 @@ public static class Builder<SELF extends Builder<?>>

protected Builder()
{
super(createSession(Optional.of(new SelectedRole(ROLE, Optional.of("admin")))));
this(createSession(Optional.of(new SelectedRole(ROLE, Optional.of("admin")))));
}

protected Builder(Session defaultSession)
{
super(defaultSession);
}

public SELF setSkipTimezoneSetup(boolean skipTimezoneSetup)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,73 +13,74 @@
*/
package io.trino.plugin.hive.metastore.cache;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.plugin.hive.HivePlugin;
import io.trino.plugin.hive.HiveQueryRunner;
import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
import io.trino.spi.security.Identity;
import io.trino.spi.security.SelectedRole;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import io.trino.testing.QueryRunner;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Lists.cartesianProduct;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.hive.authentication.HiveIdentity.none;
import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
import static io.trino.spi.security.SelectedRole.Type.ROLE;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.nio.file.Files.createTempDirectory;
import static java.util.Collections.nCopies;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@Test(singleThreaded = true)
public class TestCachingHiveMetastoreWithQueryRunner
extends AbstractTestQueryFramework
{
private static final String CATALOG = "test";
private static final String CATALOG = HiveQueryRunner.HIVE_CATALOG;
private static final String SCHEMA = "test";
private static final Session ADMIN = getTestSession(Identity.forUser("admin")
.withConnectorRole(CATALOG, new SelectedRole(ROLE, Optional.of("admin")))
.build());
private static final String ALICE_NAME = "alice";
private static final Session ALICE = getTestSession(new Identity.Builder(ALICE_NAME).build());

private DistributedQueryRunner queryRunner;
private Path temporaryMetastoreDirectory;
private FileHiveMetastore fileHiveMetastore;

@BeforeMethod
public void createQueryRunner()
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
queryRunner = DistributedQueryRunner
.builder(ADMIN)
.setNodeCount(1)
Path temporaryMetastoreDirectory = createTempDirectory(null);
closeAfterClass(() -> deleteRecursively(temporaryMetastoreDirectory, ALLOW_INSECURE));

DistributedQueryRunner queryRunner = HiveQueryRunner.builder(ADMIN)
.setNodeCount(3)
// Required by testPartitionAppend test.
// Coordinator needs to be excluded from workers to deterministically reproduce the original problem
// https://github.com/trinodb/trino/pull/6853
.setCoordinatorProperties(ImmutableMap.of("node-scheduler.include-coordinator", "false"))
.setMetastore(distributedQueryRunner -> fileHiveMetastore = createTestingFileHiveMetastore(temporaryMetastoreDirectory.toFile()))
.setHiveProperties(ImmutableMap.of(
"hive.security", "sql-standard",
"hive.metastore-cache-ttl", "60m",
"hive.metastore-refresh-interval", "10m"))
.build();
queryRunner.installPlugin(new HivePlugin());
temporaryMetastoreDirectory = createTempDirectory(null);
queryRunner.createCatalog(CATALOG, "hive", ImmutableMap.of(
"hive.metastore", "file",
"hive.metastore.catalog.dir", temporaryMetastoreDirectory.toUri().toString(),
"hive.security", "sql-standard",
"hive.metastore-cache-ttl", "60m",
"hive.metastore-refresh-interval", "10m"));

queryRunner.execute(ADMIN, "CREATE SCHEMA " + SCHEMA);
queryRunner.execute("CREATE TABLE test (test INT)");
}

@AfterMethod(alwaysRun = true)
public void cleanUp()
throws IOException
{
queryRunner.close();
deleteRecursively(temporaryMetastoreDirectory, ALLOW_INSECURE);
return queryRunner;
}

private static Session getTestSession(Identity identity)
Expand All @@ -94,46 +95,50 @@ private static Session getTestSession(Identity identity)
@Test
public void testCacheRefreshOnGrantAndRevoke()
{
assertThatThrownBy(() -> queryRunner.execute(ALICE, "SELECT * FROM test"))
assertThatThrownBy(() -> getQueryRunner().execute(ALICE, "SELECT * FROM test"))
aczajkowski marked this conversation as resolved.
Show resolved Hide resolved
.hasMessageContaining("Access Denied");
queryRunner.execute("GRANT SELECT ON test TO " + ALICE_NAME);
queryRunner.execute(ALICE, "SELECT * FROM test");
queryRunner.execute("REVOKE SELECT ON test FROM " + ALICE_NAME);
assertThatThrownBy(() -> queryRunner.execute(ALICE, "SELECT * FROM test"))
getQueryRunner().execute("GRANT SELECT ON test TO " + ALICE_NAME);
getQueryRunner().execute(ALICE, "SELECT * FROM test");
getQueryRunner().execute("REVOKE SELECT ON test FROM " + ALICE_NAME);
assertThatThrownBy(() -> getQueryRunner().execute(ALICE, "SELECT * FROM test"))
.hasMessageContaining("Access Denied");
}

@Test(dataProvider = "testCacheRefreshOnRoleGrantAndRevokeParams")
public void testCacheRefreshOnRoleGrantAndRevoke(List<String> grantRoleStatements, String revokeRoleStatement)
{
assertThatThrownBy(() -> queryRunner.execute(ALICE, "SELECT * FROM test"))
assertThatThrownBy(() -> getQueryRunner().execute(ALICE, "SELECT * FROM test"))
.hasMessageContaining("Access Denied");
queryRunner.execute("CREATE ROLE test_role IN " + CATALOG);
grantRoleStatements.forEach(queryRunner::execute);
queryRunner.execute(ALICE, "SELECT * FROM test");
queryRunner.execute(revokeRoleStatement);
assertThatThrownBy(() -> queryRunner.execute(ALICE, "SELECT * FROM test"))
getQueryRunner().execute("CREATE ROLE test_role IN " + CATALOG);
grantRoleStatements.forEach(getQueryRunner()::execute);
getQueryRunner().execute(ALICE, "SELECT * FROM test");
getQueryRunner().execute(revokeRoleStatement);
assertThatThrownBy(() -> getQueryRunner().execute(ALICE, "SELECT * FROM test"))
.hasMessageContaining("Access Denied");
// Cleanup
String removeByDropStatement = "DROP ROLE test_role IN " + CATALOG;
if (!revokeRoleStatement.equals(removeByDropStatement)) {
getQueryRunner().execute(removeByDropStatement);
}
}

@Test
public void testFlushHiveMetastoreCacheProcedureCallable()
{
queryRunner.execute("CREATE TABLE cached (initial varchar)");
queryRunner.execute("SELECT initial FROM cached");
getQueryRunner().execute("CREATE TABLE cached (initial varchar)");
getQueryRunner().execute("SELECT initial FROM cached");

// Rename column name in Metastore outside Trino
FileHiveMetastore fileHiveMetastore = FileHiveMetastore.createTestingFileHiveMetastore(temporaryMetastoreDirectory.toFile());
fileHiveMetastore.renameColumn(none(), "test", "cached", "initial", "renamed");

String renamedColumnQuery = "SELECT renamed FROM cached";
// Should fail as Trino has old metadata cached
assertThatThrownBy(() -> queryRunner.execute(renamedColumnQuery))
assertThatThrownBy(() -> getQueryRunner().execute(renamedColumnQuery))
.hasMessageMatching(".*Column 'renamed' cannot be resolved");

// Should success after flushing Trino JDBC metadata cache
queryRunner.execute("CALL system.flush_metadata_cache()");
queryRunner.execute(renamedColumnQuery);
getQueryRunner().execute("CALL system.flush_metadata_cache()");
getQueryRunner().execute(renamedColumnQuery);
}

@Test
Expand All @@ -142,18 +147,40 @@ public void testIllegalFlushHiveMetastoreCacheProcedureCalls()
var illegalParameterMessage = "Illegal parameter set passed. ";
var validUsageExample = "Valid usages:\n - 'flush_metadata_cache()'\n - flush_metadata_cache(schema_name => ..., table_name => ..., partition_column => ARRAY['...'], partition_value => ARRAY['...'])";

assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache('dummy_schema')"))
assertThatThrownBy(() -> getQueryRunner().execute("CALL system.flush_metadata_cache('dummy_schema')"))
.hasMessage("Procedure should only be invoked with named parameters. " + validUsageExample);

assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema')"))
assertThatThrownBy(() -> getQueryRunner().execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema')"))
.hasMessage(illegalParameterMessage + validUsageExample);
assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema', table_name => 'dummy_table')"))
assertThatThrownBy(() -> getQueryRunner().execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema', table_name => 'dummy_table')"))
.hasMessage(illegalParameterMessage + validUsageExample);

assertThatThrownBy(() -> queryRunner.execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema', table_name => 'dummy_table', partition_column => ARRAY['dummy_partition'])"))
assertThatThrownBy(() -> getQueryRunner().execute("CALL system.flush_metadata_cache(schema_name => 'dummy_schema', table_name => 'dummy_table', partition_column => ARRAY['dummy_partition'])"))
.hasMessage("Parameters partition_column and partition_value should have same length");
}

@Test
public void testPartitionAppend()
{
int nodeCount = getQueryRunner().getNodeCount();
verify(nodeCount > 1, "this test requires a multinode query runner");
aczajkowski marked this conversation as resolved.
Show resolved Hide resolved

getQueryRunner().execute("CREATE TABLE test_part_append " +
"(name varchar, partkey varchar) " +
"WITH (partitioned_by = ARRAY['partkey'])");

String row = "('some name', 'part1')";

// if metastore caching was enabled on workers than any worker which tries to INSERT into same partition twice
// will fail because it would've cached the absence of the partition
for (int i = 0; i < nodeCount + 1; i++) {
aczajkowski marked this conversation as resolved.
Show resolved Hide resolved
getQueryRunner().execute("INSERT INTO test_part_append VALUES " + row);
}

String expected = Joiner.on(",").join(nCopies(nodeCount + 1, row));
assertQuery("SELECT * FROM test_part_append", "VALUES " + expected);
}

@DataProvider
public Object[][] testCacheRefreshOnRoleGrantAndRevokeParams()
{
Expand Down