Skip to content

Commit

Permalink
Close thrift metastore client in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed May 15, 2024
1 parent 870c0bb commit dfbda86
Show file tree
Hide file tree
Showing 15 changed files with 84 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ protected QueryRunner createQueryRunner()
this.metastore = new BridgingHiveMetastore(
testingThriftHiveMetastoreBuilder()
.metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
.build());
.build(this::closeAfterClass));

QueryRunner queryRunner = createDeltaLakeQueryRunner();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected QueryRunner createQueryRunner()
metastore = new BridgingHiveMetastore(
testingThriftHiveMetastoreBuilder()
.metastoreClient(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.build());
.build(this::closeAfterClass));

return DeltaLakeQueryRunner.builder("default")
.addMetastoreProperties(hiveMinioDataLake.getHiveHadoop())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public static class Builder<SELF extends Builder<?>>
private ImmutableMap.Builder<String, String> hiveProperties = ImmutableMap.builder();
private List<TpchTable<?>> initialTables = ImmutableList.of();
private Optional<String> initialSchemasLocationBase = Optional.empty();
private Optional<Function<QueryRunner, HiveMetastore>> metastore = Optional.empty();
private Optional<Function<DistributedQueryRunner, HiveMetastore>> metastore = Optional.empty();
private boolean tpcdsCatalogEnabled;
private boolean tpchBucketedCatalogEnabled;
private boolean createTpchSchemas = true;
Expand Down Expand Up @@ -153,7 +153,7 @@ public SELF setInitialSchemasLocationBase(String initialSchemasLocationBase)
}

@CanIgnoreReturnValue
public SELF setMetastore(Function<QueryRunner, HiveMetastore> metastore)
public SELF setMetastore(Function<DistributedQueryRunner, HiveMetastore> metastore)
{
this.metastore = Optional.of(metastore);
return self();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected QueryRunner createQueryRunner()
this.metastoreClient = new BridgingHiveMetastore(
testingThriftHiveMetastoreBuilder()
.metastoreClient(this.hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.build());
.build(this::closeAfterClass));
return S3HiveQueryRunner.builder(hiveMinioDataLake)
.addExtraProperty("sql.path", "hive.functions")
.addExtraProperty("sql.default-function-catalog", "hive")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import java.net.URI;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.plugin.base.security.UserNameProvider.SIMPLE_USER_NAME_PROVIDER;
Expand Down Expand Up @@ -95,15 +97,17 @@ public TestingThriftHiveMetastoreBuilder fileSystemFactory(TrinoFileSystemFactor
return this;
}

public ThriftMetastore build()
public ThriftMetastore build(Consumer<AutoCloseable> registerResource)
{
checkState(tokenAwareMetastoreClientFactory != null, "metastore client not set");
ExecutorService executorService = newFixedThreadPool(thriftMetastoreConfig.getWriteStatisticsThreads());
registerResource.accept(executorService);
ThriftHiveMetastoreFactory metastoreFactory = new ThriftHiveMetastoreFactory(
new UgiBasedMetastoreClientFactory(tokenAwareMetastoreClientFactory, SIMPLE_USER_NAME_PROVIDER, thriftMetastoreConfig),
new HiveMetastoreConfig().isHideDeltaLakeTables(),
thriftMetastoreConfig,
fileSystemFactory,
newFixedThreadPool(thriftMetastoreConfig.getWriteStatisticsThreads()));
executorService);
return metastoreFactory.createMetastore(Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.hive.metastore;

import io.trino.plugin.base.util.AutoCloseableCloser;
import io.trino.plugin.hive.SchemaAlreadyExistsException;
import io.trino.plugin.hive.TableAlreadyExistsException;
import io.trino.plugin.hive.containers.HiveHadoop;
Expand Down Expand Up @@ -47,12 +48,12 @@
final class TestBridgingHiveMetastore
extends AbstractTestHiveMetastore
{
private final HiveHadoop hiveHadoop;
private final AutoCloseableCloser closer = AutoCloseableCloser.create();
private final HiveMetastore metastore;

TestBridgingHiveMetastore()
{
hiveHadoop = HiveHadoop.builder().build();
HiveHadoop hiveHadoop = closer.register(HiveHadoop.builder().build());
hiveHadoop.start();

MetastoreClientAdapterProvider metastoreClientAdapterProvider = delegate -> newProxy(ThriftMetastoreClient.class, (proxy, method, methodArgs) -> {
Expand All @@ -72,13 +73,14 @@ final class TestBridgingHiveMetastore
metastore = new BridgingHiveMetastore(testingThriftHiveMetastoreBuilder()
.metastoreClient(hiveHadoop.getHiveMetastoreEndpoint(), metastoreClientAdapterProvider)
.thriftMetastoreConfig(new ThriftMetastoreConfig().setDeleteFilesOnDrop(true))
.build());
.build(closer::register));
}

@AfterAll
void afterAll()
throws Exception
{
hiveHadoop.stop();
closer.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.hive.thrift.metastore.ColumnStatisticsData;
import io.trino.hive.thrift.metastore.ColumnStatisticsObj;
import io.trino.hive.thrift.metastore.LongColumnStatsData;
import io.trino.plugin.base.util.AutoCloseableCloser;
import io.trino.plugin.hive.HiveBasicStatistics;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HiveMetastoreClosure;
Expand Down Expand Up @@ -118,6 +119,7 @@ public class TestCachingHiveMetastore
private static final TableInfo TEST_TABLE_INFO = new TableInfo(TEST_SCHEMA_TABLE, TableInfo.ExtendedRelationType.TABLE);
private static final Duration CACHE_TTL = new Duration(5, TimeUnit.MINUTES);

private AutoCloseableCloser closer;
private MockThriftMetastoreClient mockClient;
private ThriftMetastore thriftHiveMetastore;
private ListeningExecutorService executor;
Expand All @@ -128,6 +130,7 @@ public class TestCachingHiveMetastore
@BeforeEach
public void setUp()
{
closer = AutoCloseableCloser.create();
mockClient = new MockThriftMetastoreClient();
thriftHiveMetastore = createThriftHiveMetastore();
executor = listeningDecorator(newCachedThreadPool(daemonThreadsNamed(getClass().getSimpleName() + "-%s")));
Expand All @@ -140,22 +143,25 @@ public void setUp()

@AfterAll
public void tearDown()
throws Exception
{
executor.shutdownNow();
executor = null;
metastore = null;
closer.close();
closer = null;
}

private ThriftMetastore createThriftHiveMetastore()
{
return createThriftHiveMetastore(mockClient);
}

private static ThriftMetastore createThriftHiveMetastore(ThriftMetastoreClient client)
private ThriftMetastore createThriftHiveMetastore(ThriftMetastoreClient client)
{
return testingThriftHiveMetastoreBuilder()
.metastoreClient(client)
.build();
.build(closer::register);
}

@Test
Expand Down Expand Up @@ -1050,7 +1056,7 @@ private PartitionCachingAssertions assertThatCachingWithDisabledPartitionCache()
return new PartitionCachingAssertions(executor);
}

static class PartitionCachingAssertions
class PartitionCachingAssertions
{
private final CachingHiveMetastore cachingHiveMetastore;
private final MockThriftMetastoreClient thriftClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io.opentelemetry.api.OpenTelemetry;
import io.trino.hive.thrift.metastore.Database;
import io.trino.hive.thrift.metastore.NoSuchObjectException;
import io.trino.plugin.base.util.AutoCloseableCloser;
import io.trino.testing.TestingNodeManager;
import jakarta.servlet.http.HttpServletRequest;
import org.apache.http.HttpHeaders;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

Expand All @@ -38,6 +40,7 @@

public class TestThriftHttpMetastoreClient
{
private static final AutoCloseableCloser closer = AutoCloseableCloser.create();
private static ThriftMetastore delegate;

@BeforeAll
Expand All @@ -46,7 +49,14 @@ public static void setup()
{
File tempDir = Files.createTempDirectory(null).toFile();
tempDir.deleteOnExit();
delegate = testingThriftHiveMetastoreBuilder().metastoreClient(createFakeMetastoreClient()).build();
delegate = testingThriftHiveMetastoreBuilder().metastoreClient(createFakeMetastoreClient()).build(closer::register);
}

@AfterAll
public static void tearDown()
throws Exception
{
closer.close();
}

private static ThriftMetastoreClient createFakeMetastoreClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.hive.metastore.thrift;

import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.base.util.AutoCloseableCloser;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.TableInfo;
import io.trino.spi.connector.SchemaTableName;
Expand All @@ -38,7 +39,7 @@ void test()
String databricksHost = requireNonNull(System.getenv("DATABRICKS_HOST"), "Environment variable not set: DATABRICKS_HOST");
String databricksToken = requireNonNull(System.getenv("DATABRICKS_TOKEN"), "Environment variable not set: DATABRICKS_TOKEN");
String databricksCatalogName = requireNonNull(System.getenv("DATABRICKS_UNITY_CATALOG_NAME"), "Environment variable not set: DATABRICKS_UNITY_CATALOG_NAME");
URI metastoreUri = URI.create("https://%s:443/api/2.0/unity-hms-proxy/metadata".formatted(databricksHost));
URI metastoreUri = URI.create("https://%s:443/api/2.0/unity-hms-proxy/metadata" .formatted(databricksHost));

ThriftHttpMetastoreConfig config = new ThriftHttpMetastoreConfig()
.setAuthenticationMode(BEARER)
Expand All @@ -47,18 +48,20 @@ void test()
ThriftMetastoreClient client = ((ThriftMetastoreClientFactory) new HttpThriftMetastoreClientFactory(config, new TestingNodeManager(), OpenTelemetry.noop()))
.create(metastoreUri, Optional.empty());

HiveMetastore metastore = new BridgingHiveMetastore(testingThriftHiveMetastoreBuilder()
.metastoreClient(client)
.thriftMetastoreConfig(new ThriftMetastoreConfig().setDeleteFilesOnDrop(true))
.build());
try (var closer = AutoCloseableCloser.create()) {
HiveMetastore metastore = new BridgingHiveMetastore(testingThriftHiveMetastoreBuilder()
.metastoreClient(client)
.thriftMetastoreConfig(new ThriftMetastoreConfig().setDeleteFilesOnDrop(true))
.build(closer::register));

List<TableInfo> tables = metastore.getAllDatabases().stream()
.map(metastore::getTables)
.flatMap(List::stream)
.toList();
assertThat(tables).isNotEmpty();
List<TableInfo> tables = metastore.getAllDatabases().stream()
.map(metastore::getTables)
.flatMap(List::stream)
.toList();
assertThat(tables).isNotEmpty();

SchemaTableName schemaTableName = tables.getFirst().tableName();
assertThat(metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())).isPresent();
SchemaTableName schemaTableName = tables.getFirst().tableName();
assertThat(metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())).isPresent();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public DistributedQueryRunner build()
testingThriftHiveMetastoreBuilder()
.metastoreClient(hiveMetastoreEndpoint, thriftMetastoreTimeout)
.thriftMetastoreConfig(thriftMetastoreConfig)
.build()));
.build(distributedQueryRunner::registerResource)));
setInitialSchemasLocationBase("s3a://" + bucketName); // cannot use s3:// as Hive metastore is not configured to accept it
return super.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ protected void dropTableFromMetastore(String tableName)
HiveMetastore metastore = new BridgingHiveMetastore(
testingThriftHiveMetastoreBuilder()
.metastoreClient(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.build());
.build(this::closeAfterClass));
metastore.dropTable(schemaName, tableName, false);
assertThat(metastore.getTable(schemaName, tableName)).isEmpty();
}
Expand All @@ -244,7 +244,7 @@ protected String getMetadataLocation(String tableName)
HiveMetastore metastore = new BridgingHiveMetastore(
testingThriftHiveMetastoreBuilder()
.metastoreClient(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.build());
.build(this::closeAfterClass));
return metastore
.getTable(schemaName, tableName).orElseThrow()
.getParameters().get("metadata_location");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ protected void dropTableFromMetastore(String tableName)
HiveMetastore metastore = new BridgingHiveMetastore(
testingThriftHiveMetastoreBuilder()
.metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
.build());
.build(this::closeAfterClass));
metastore.dropTable(schemaName, tableName, false);
assertThat(metastore.getTable(schemaName, tableName)).isEmpty();
}
Expand All @@ -134,7 +134,7 @@ protected String getMetadataLocation(String tableName)
HiveMetastore metastore = new BridgingHiveMetastore(
testingThriftHiveMetastoreBuilder()
.metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
.build());
.build(this::closeAfterClass));
return metastore
.getTable(schemaName, tableName).orElseThrow()
.getParameters().get("metadata_location");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ protected void dropTableFromMetastore(String tableName)
HiveMetastore metastore = new BridgingHiveMetastore(
testingThriftHiveMetastoreBuilder()
.metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
.build());
.build(this::closeAfterClass));
metastore.dropTable(schema, tableName, false);
assertThat(metastore.getTable(schema, tableName)).isEmpty();
}
Expand All @@ -186,7 +186,7 @@ protected String getMetadataLocation(String tableName)
HiveMetastore metastore = new BridgingHiveMetastore(
testingThriftHiveMetastoreBuilder()
.metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
.build());
.build(this::closeAfterClass));
return metastore
.getTable(schema, tableName).orElseThrow()
.getParameters().get("metadata_location");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.hdfs.authentication.NoHdfsAuthentication;
import io.trino.hdfs.s3.HiveS3Config;
import io.trino.hdfs.s3.TrinoS3ConfigurationInitializer;
import io.trino.plugin.base.util.AutoCloseableCloser;
import io.trino.plugin.hive.TrinoViewHiveMetastore;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
Expand Down Expand Up @@ -66,24 +67,23 @@ public class TestTrinoHiveCatalogWithHiveMetastore
{
private static final String bucketName = "test-hive-catalog-with-hms-" + randomNameSuffix();

private AutoCloseableCloser closer = AutoCloseableCloser.create();
// Use MinIO for storage, since HDFS is hard to get working in a unit test
private HiveMinioDataLake dataLake;

@BeforeAll
public void setUp()
{
dataLake = new HiveMinioDataLake(bucketName, HIVE3_IMAGE);
dataLake = closer.register(new HiveMinioDataLake(bucketName, HIVE3_IMAGE));
dataLake.start();
}

@AfterAll
public void tearDown()
throws Exception
{
if (dataLake != null) {
dataLake.stop();
dataLake = null;
}
dataLake = null;
closer.close();
}

@Override
Expand All @@ -108,7 +108,7 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations)
// Read timed out sometimes happens with the default timeout
.setReadTimeout(new Duration(1, MINUTES)))
.metastoreClient(dataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.build();
.build(closer::register);
CachingHiveMetastore metastore = createPerTransactionCache(new BridgingHiveMetastore(thriftMetastore), 1000);
return new TrinoHiveCatalog(
new CatalogName("catalog"),
Expand Down
Loading

0 comments on commit dfbda86

Please sign in to comment.