Close thrift metastore client in tests #21980

Merged (2 commits) on May 16, 2024
@@ -170,7 +170,7 @@ protected QueryRunner createQueryRunner()
         this.metastore = new BridgingHiveMetastore(
                 testingThriftHiveMetastoreBuilder()
                         .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
-                        .build());
+                        .build(this::closeAfterClass));

         QueryRunner queryRunner = createDeltaLakeQueryRunner();
         try {

@@ -44,7 +44,7 @@ protected QueryRunner createQueryRunner()
         metastore = new BridgingHiveMetastore(
                 testingThriftHiveMetastoreBuilder()
                         .metastoreClient(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
-                        .build());
+                        .build(this::closeAfterClass));

         return DeltaLakeQueryRunner.builder("default")
                 .addMetastoreProperties(hiveMinioDataLake.getHiveHadoop())

@@ -99,7 +99,7 @@ public static class Builder<SELF extends Builder<?>>
         private ImmutableMap.Builder<String, String> hiveProperties = ImmutableMap.builder();
         private List<TpchTable<?>> initialTables = ImmutableList.of();
         private Optional<String> initialSchemasLocationBase = Optional.empty();
-        private Optional<Function<QueryRunner, HiveMetastore>> metastore = Optional.empty();
+        private Optional<Function<DistributedQueryRunner, HiveMetastore>> metastore = Optional.empty();
         private boolean tpcdsCatalogEnabled;
         private boolean tpchBucketedCatalogEnabled;
         private boolean createTpchSchemas = true;
@@ -153,7 +153,7 @@ public SELF setInitialSchemasLocationBase(String initialSchemasLocationBase)
         }

         @CanIgnoreReturnValue
-        public SELF setMetastore(Function<QueryRunner, HiveMetastore> metastore)
+        public SELF setMetastore(Function<DistributedQueryRunner, HiveMetastore> metastore)
         {
             this.metastore = Optional.of(metastore);
             return self();
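
The type change above narrows the `setMetastore` parameter from `QueryRunner` to `DistributedQueryRunner` so that a metastore factory can reach `registerResource`, which only the concrete runner exposes. Assuming this `Builder` belongs to `HiveQueryRunner` and has the usual `builder()` entry point, a caller could tie the metastore's background executor to the runner's lifetime roughly like this (the `hiveHadoop` endpoint is illustrative, not part of this diff):

```java
// Sketch: the factory function now receives the concrete runner, so the
// thrift metastore's write-statistics executor can be registered with it
// and closed when the runner shuts down. hiveHadoop is illustrative.
HiveQueryRunner.builder()
        .setMetastore(runner -> new BridgingHiveMetastore(
                testingThriftHiveMetastoreBuilder()
                        .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
                        .build(runner::registerResource)))
        .build();
```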

@@ -93,7 +93,7 @@ protected QueryRunner createQueryRunner()
         this.metastoreClient = new BridgingHiveMetastore(
                 testingThriftHiveMetastoreBuilder()
                         .metastoreClient(this.hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
-                        .build());
+                        .build(this::closeAfterClass));
         return S3HiveQueryRunner.builder(hiveMinioDataLake)
                 .addExtraProperty("sql.path", "hive.functions")
                 .addExtraProperty("sql.default-function-catalog", "hive")

@@ -28,6 +28,8 @@

 import java.net.URI;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;

 import static com.google.common.base.Preconditions.checkState;
 import static io.trino.plugin.base.security.UserNameProvider.SIMPLE_USER_NAME_PROVIDER;
@@ -95,15 +97,17 @@ public TestingThriftHiveMetastoreBuilder fileSystemFactory(TrinoFileSystemFactory
         return this;
     }

-    public ThriftMetastore build()
+    public ThriftMetastore build(Consumer<AutoCloseable> registerResource)
     {
         checkState(tokenAwareMetastoreClientFactory != null, "metastore client not set");
+        ExecutorService executorService = newFixedThreadPool(thriftMetastoreConfig.getWriteStatisticsThreads());
+        registerResource.accept(executorService);
         ThriftHiveMetastoreFactory metastoreFactory = new ThriftHiveMetastoreFactory(
                 new UgiBasedMetastoreClientFactory(tokenAwareMetastoreClientFactory, SIMPLE_USER_NAME_PROVIDER, thriftMetastoreConfig),
                 new HiveMetastoreConfig().isHideDeltaLakeTables(),
                 thriftMetastoreConfig,
                 fileSystemFactory,
-                newFixedThreadPool(thriftMetastoreConfig.getWriteStatisticsThreads()));
+                executorService);
         return metastoreFactory.createMetastore(Optional.empty());
     }
 }
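
This is the core change of the PR: `build()` still creates the write-statistics `ExecutorService` (which implements `AutoCloseable` as of Java 19), but instead of leaking it, it hands every closeable it allocates to a caller-supplied `Consumer<AutoCloseable>`. A minimal sketch of the new calling convention, with `metastoreEndpoint` standing in for whatever endpoint a test uses:

```java
// Sketch: the caller owns cleanup. Here this::closeAfterClass is the
// registration hook the query-runner tests in this PR pass in; any
// Consumer<AutoCloseable> that guarantees an eventual close() works.
ThriftMetastore metastore = testingThriftHiveMetastoreBuilder()
        .metastoreClient(metastoreEndpoint) // illustrative endpoint
        .build(this::closeAfterClass);
```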

@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.hive.metastore;

+import io.trino.plugin.base.util.AutoCloseableCloser;
 import io.trino.plugin.hive.SchemaAlreadyExistsException;
 import io.trino.plugin.hive.TableAlreadyExistsException;
 import io.trino.plugin.hive.containers.HiveHadoop;
@@ -47,12 +48,12 @@
 final class TestBridgingHiveMetastore
         extends AbstractTestHiveMetastore
 {
-    private final HiveHadoop hiveHadoop;
+    private final AutoCloseableCloser closer = AutoCloseableCloser.create();
     private final HiveMetastore metastore;

     TestBridgingHiveMetastore()
     {
-        hiveHadoop = HiveHadoop.builder().build();
+        HiveHadoop hiveHadoop = closer.register(HiveHadoop.builder().build());
         hiveHadoop.start();

         MetastoreClientAdapterProvider metastoreClientAdapterProvider = delegate -> newProxy(ThriftMetastoreClient.class, (proxy, method, methodArgs) -> {
@@ -72,13 +73,14 @@ final class TestBridgingHiveMetastore
         metastore = new BridgingHiveMetastore(testingThriftHiveMetastoreBuilder()
                 .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint(), metastoreClientAdapterProvider)
                 .thriftMetastoreConfig(new ThriftMetastoreConfig().setDeleteFilesOnDrop(true))
-                .build());
+                .build(closer::register));
     }

     @AfterAll
     void afterAll()
+            throws Exception
     {
-        hiveHadoop.stop();
+        closer.close();
     }

     @Override
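
TestBridgingHiveMetastore now funnels the container and the metastore's executor through one `AutoCloseableCloser`, giving `afterAll` a single teardown point. Assuming the closer follows the usual `Closer` contract of closing registered resources in reverse order, the executor registered via `build(closer::register)` is shut down before the `HiveHadoop` container it talks to. Condensed, the pattern looks like this (not the real test class):

```java
// Condensed sketch of the lifecycle pattern above.
private final AutoCloseableCloser closer = AutoCloseableCloser.create();

ExampleTest()
{
    HiveHadoop hiveHadoop = closer.register(HiveHadoop.builder().build()); // closed last
    hiveHadoop.start();
    metastore = new BridgingHiveMetastore(testingThriftHiveMetastoreBuilder()
            .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
            .build(closer::register)); // executor closed first
}

@AfterAll
void afterAll()
        throws Exception
{
    closer.close(); // one call tears everything down, in reverse order
}
```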

@@ -22,6 +22,7 @@
 import io.trino.hive.thrift.metastore.ColumnStatisticsData;
 import io.trino.hive.thrift.metastore.ColumnStatisticsObj;
 import io.trino.hive.thrift.metastore.LongColumnStatsData;
+import io.trino.plugin.base.util.AutoCloseableCloser;
 import io.trino.plugin.hive.HiveBasicStatistics;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HiveMetastoreClosure;
@@ -118,6 +119,7 @@ public class TestCachingHiveMetastore
     private static final TableInfo TEST_TABLE_INFO = new TableInfo(TEST_SCHEMA_TABLE, TableInfo.ExtendedRelationType.TABLE);
     private static final Duration CACHE_TTL = new Duration(5, TimeUnit.MINUTES);

+    private AutoCloseableCloser closer;
     private MockThriftMetastoreClient mockClient;
     private ThriftMetastore thriftHiveMetastore;
     private ListeningExecutorService executor;
@@ -128,6 +130,7 @@ public class TestCachingHiveMetastore
     @BeforeEach
     public void setUp()
     {
+        closer = AutoCloseableCloser.create();
         mockClient = new MockThriftMetastoreClient();
         thriftHiveMetastore = createThriftHiveMetastore();
         executor = listeningDecorator(newCachedThreadPool(daemonThreadsNamed(getClass().getSimpleName() + "-%s")));
@@ -140,22 +143,25 @@ public void setUp()

     @AfterAll
     public void tearDown()
+            throws Exception
     {
         executor.shutdownNow();
         executor = null;
         metastore = null;
+        closer.close();
+        closer = null;
     }

     private ThriftMetastore createThriftHiveMetastore()
     {
         return createThriftHiveMetastore(mockClient);
     }

-    private static ThriftMetastore createThriftHiveMetastore(ThriftMetastoreClient client)
+    private ThriftMetastore createThriftHiveMetastore(ThriftMetastoreClient client)
     {
         return testingThriftHiveMetastoreBuilder()
                 .metastoreClient(client)
-                .build();
+                .build(closer::register);
     }

     @Test
@@ -1050,7 +1056,7 @@ private PartitionCachingAssertions assertThatCachingWithDisabledPartitionCache()
         return new PartitionCachingAssertions(executor);
     }

-    static class PartitionCachingAssertions
+    class PartitionCachingAssertions
     {
         private final CachingHiveMetastore cachingHiveMetastore;
         private final MockThriftMetastoreClient thriftClient;

@@ -17,9 +17,11 @@
 import io.opentelemetry.api.OpenTelemetry;
 import io.trino.hive.thrift.metastore.Database;
 import io.trino.hive.thrift.metastore.NoSuchObjectException;
+import io.trino.plugin.base.util.AutoCloseableCloser;
 import io.trino.testing.TestingNodeManager;
 import jakarta.servlet.http.HttpServletRequest;
 import org.apache.http.HttpHeaders;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;

@@ -38,6 +40,7 @@

 public class TestThriftHttpMetastoreClient
 {
+    private static final AutoCloseableCloser closer = AutoCloseableCloser.create();
     private static ThriftMetastore delegate;

     @BeforeAll
@@ -46,7 +49,14 @@ public static void setup()
     {
         File tempDir = Files.createTempDirectory(null).toFile();
         tempDir.deleteOnExit();
-        delegate = testingThriftHiveMetastoreBuilder().metastoreClient(createFakeMetastoreClient()).build();
+        delegate = testingThriftHiveMetastoreBuilder().metastoreClient(createFakeMetastoreClient()).build(closer::register);
     }

+    @AfterAll
+    public static void tearDown()
+            throws Exception
+    {
+        closer.close();
+    }
+
     private static ThriftMetastoreClient createFakeMetastoreClient()

@@ -14,6 +14,7 @@
 package io.trino.plugin.hive.metastore.thrift;

 import io.opentelemetry.api.OpenTelemetry;
+import io.trino.plugin.base.util.AutoCloseableCloser;
 import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.plugin.hive.metastore.TableInfo;
 import io.trino.spi.connector.SchemaTableName;
@@ -38,7 +39,7 @@ void test()
         String databricksHost = requireNonNull(System.getenv("DATABRICKS_HOST"), "Environment variable not set: DATABRICKS_HOST");
         String databricksToken = requireNonNull(System.getenv("DATABRICKS_TOKEN"), "Environment variable not set: DATABRICKS_TOKEN");
         String databricksCatalogName = requireNonNull(System.getenv("DATABRICKS_UNITY_CATALOG_NAME"), "Environment variable not set: DATABRICKS_UNITY_CATALOG_NAME");
         URI metastoreUri = URI.create("https://%s:443/api/2.0/unity-hms-proxy/metadata".formatted(databricksHost));

         ThriftHttpMetastoreConfig config = new ThriftHttpMetastoreConfig()
                 .setAuthenticationMode(BEARER)
@@ -47,18 +48,20 @@ void test()
         ThriftMetastoreClient client = ((ThriftMetastoreClientFactory) new HttpThriftMetastoreClientFactory(config, new TestingNodeManager(), OpenTelemetry.noop()))
                 .create(metastoreUri, Optional.empty());

-        HiveMetastore metastore = new BridgingHiveMetastore(testingThriftHiveMetastoreBuilder()
-                .metastoreClient(client)
-                .thriftMetastoreConfig(new ThriftMetastoreConfig().setDeleteFilesOnDrop(true))
-                .build());
+        try (var closer = AutoCloseableCloser.create()) {
+            HiveMetastore metastore = new BridgingHiveMetastore(testingThriftHiveMetastoreBuilder()
+                    .metastoreClient(client)
+                    .thriftMetastoreConfig(new ThriftMetastoreConfig().setDeleteFilesOnDrop(true))
+                    .build(closer::register));

-        List<TableInfo> tables = metastore.getAllDatabases().stream()
-                .map(metastore::getTables)
-                .flatMap(List::stream)
-                .toList();
-        assertThat(tables).isNotEmpty();
+            List<TableInfo> tables = metastore.getAllDatabases().stream()
+                    .map(metastore::getTables)
+                    .flatMap(List::stream)
+                    .toList();
+            assertThat(tables).isNotEmpty();

-        SchemaTableName schemaTableName = tables.getFirst().tableName();
-        assertThat(metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())).isPresent();
+            SchemaTableName schemaTableName = tables.getFirst().tableName();
+            assertThat(metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())).isPresent();
+        }
     }
 }

@@ -164,7 +164,7 @@ public DistributedQueryRunner build()
                     testingThriftHiveMetastoreBuilder()
                             .metastoreClient(hiveMetastoreEndpoint, thriftMetastoreTimeout)
                             .thriftMetastoreConfig(thriftMetastoreConfig)
-                            .build()));
+                            .build(distributedQueryRunner::registerResource)));
             setInitialSchemasLocationBase("s3a://" + bucketName); // cannot use s3:// as Hive metastore is not configured to accept it
             return super.build();
         }

@@ -233,7 +233,7 @@ protected void dropTableFromMetastore(String tableName)
         HiveMetastore metastore = new BridgingHiveMetastore(
                 testingThriftHiveMetastoreBuilder()
                         .metastoreClient(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
-                        .build());
+                        .build(this::closeAfterClass));
         metastore.dropTable(schemaName, tableName, false);
         assertThat(metastore.getTable(schemaName, tableName)).isEmpty();
     }
@@ -244,7 +244,7 @@ protected String getMetadataLocation(String tableName)
         HiveMetastore metastore = new BridgingHiveMetastore(
                 testingThriftHiveMetastoreBuilder()
                         .metastoreClient(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
-                        .build());
+                        .build(this::closeAfterClass));
         return metastore
                 .getTable(schemaName, tableName).orElseThrow()
                 .getParameters().get("metadata_location");

@@ -123,7 +123,7 @@ protected void dropTableFromMetastore(String tableName)
         HiveMetastore metastore = new BridgingHiveMetastore(
                 testingThriftHiveMetastoreBuilder()
                         .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
-                        .build());
+                        .build(this::closeAfterClass));
         metastore.dropTable(schemaName, tableName, false);
         assertThat(metastore.getTable(schemaName, tableName)).isEmpty();
     }
@@ -134,7 +134,7 @@ protected String getMetadataLocation(String tableName)
         HiveMetastore metastore = new BridgingHiveMetastore(
                 testingThriftHiveMetastoreBuilder()
                         .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
-                        .build());
+                        .build(this::closeAfterClass));
         return metastore
                 .getTable(schemaName, tableName).orElseThrow()
                 .getParameters().get("metadata_location");

@@ -175,7 +175,7 @@ protected void dropTableFromMetastore(String tableName)
         HiveMetastore metastore = new BridgingHiveMetastore(
                 testingThriftHiveMetastoreBuilder()
                         .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
-                        .build());
+                        .build(this::closeAfterClass));
         metastore.dropTable(schema, tableName, false);
         assertThat(metastore.getTable(schema, tableName)).isEmpty();
     }
@@ -186,7 +186,7 @@ protected String getMetadataLocation(String tableName)
         HiveMetastore metastore = new BridgingHiveMetastore(
                 testingThriftHiveMetastoreBuilder()
                         .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint())
-                        .build());
+                        .build(this::closeAfterClass));
         return metastore
                 .getTable(schema, tableName).orElseThrow()
                 .getParameters().get("metadata_location");

@@ -25,6 +25,7 @@
 import io.trino.hdfs.authentication.NoHdfsAuthentication;
 import io.trino.hdfs.s3.HiveS3Config;
 import io.trino.hdfs.s3.TrinoS3ConfigurationInitializer;
+import io.trino.plugin.base.util.AutoCloseableCloser;
 import io.trino.plugin.hive.TrinoViewHiveMetastore;
 import io.trino.plugin.hive.containers.HiveMinioDataLake;
 import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
@@ -66,24 +67,23 @@ public class TestTrinoHiveCatalogWithHiveMetastore
 {
     private static final String bucketName = "test-hive-catalog-with-hms-" + randomNameSuffix();

+    private AutoCloseableCloser closer = AutoCloseableCloser.create();
     // Use MinIO for storage, since HDFS is hard to get working in a unit test
     private HiveMinioDataLake dataLake;

     @BeforeAll
     public void setUp()
     {
-        dataLake = new HiveMinioDataLake(bucketName, HIVE3_IMAGE);
+        dataLake = closer.register(new HiveMinioDataLake(bucketName, HIVE3_IMAGE));
         dataLake.start();
     }

     @AfterAll
     public void tearDown()
             throws Exception
     {
-        if (dataLake != null) {
-            dataLake.stop();
-            dataLake = null;
-        }
+        dataLake = null;
+        closer.close();
     }

     @Override
@@ -108,7 +108,7 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations)
                         // Read timed out sometimes happens with the default timeout
                         .setReadTimeout(new Duration(1, MINUTES)))
                 .metastoreClient(dataLake.getHiveHadoop().getHiveMetastoreEndpoint())
-                .build();
+                .build(closer::register);
         CachingHiveMetastore metastore = createPerTransactionCache(new BridgingHiveMetastore(thriftMetastore), 1000);
         return new TrinoHiveCatalog(
                 new CatalogName("catalog"),