From 7a5f72894c623f08acbff5ee4136772c95c27550 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 21 Jul 2023 15:02:41 +0900 Subject: [PATCH] Test S3 request count in Hive connector --- plugin/trino-hive/pom.xml | 31 +++ .../hive/InternalHiveConnectorFactory.java | 5 +- .../io/trino/plugin/hive/HiveQueryRunner.java | 11 +- .../hive/TestingHiveConnectorFactory.java | 13 +- .../trino/plugin/hive/TestingHivePlugin.java | 11 +- ...TestTrinoS3FileSystemAccessOperations.java | 213 ++++++++++++++++++ 6 files changed, 274 insertions(+), 10 deletions(-) create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestTrinoS3FileSystemAccessOperations.java diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index bd19a453a590..cea46290c3d1 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -308,6 +308,12 @@ runtime + + io.opentelemetry + opentelemetry-context + runtime + + io.trino trino-hadoop-toolkit @@ -348,6 +354,24 @@ + + io.opentelemetry + opentelemetry-sdk + test + + + + io.opentelemetry + opentelemetry-sdk-testing + test + + + + io.opentelemetry + opentelemetry-sdk-trace + test + + io.trino @@ -368,6 +392,13 @@ test + + io.trino + trino-filesystem + test-jar + test + + io.trino trino-main diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java index 90d1043551d9..7d231385477b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java @@ -87,7 +87,7 @@ private InternalHiveConnectorFactory() {} public static Connector createConnector(String catalogName, Map config, ConnectorContext context, Module module) { - return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty(), Optional.empty()); + return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); } public static Connector createConnector( @@ -97,6 +97,7 @@ public static Connector createConnector( Module module, Optional metastore, Optional fileSystemFactory, + Optional openTelemetry, Optional directoryLister) { requireNonNull(config, "config is null"); @@ -127,7 +128,7 @@ public static Connector createConnector( new HiveProcedureModule(), new MBeanServerModule(), binder -> { - binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()); + binder.bind(OpenTelemetry.class).toInstance(openTelemetry.orElse(context.getOpenTelemetry())); binder.bind(Tracer.class).toInstance(context.getTracer()); binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion())); binder.bind(NodeManager.class).toInstance(context.getNodeManager()); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java index a493d304a220..19fa8bb0cd6e 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java @@ -19,6 +19,7 @@ import com.google.inject.Module; import io.airlift.log.Logger; import io.airlift.log.Logging; +import io.opentelemetry.api.OpenTelemetry; import io.trino.Session; import io.trino.metadata.QualifiedObjectName; import io.trino.plugin.hive.fs.DirectoryLister; @@ -105,6 +106,7 @@ public static class Builder> File baseDir = queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toFile(); return createTestingFileHiveMetastore(baseDir); }; + private Optional openTelemetry = Optional.empty(); private Module module = EMPTY_MODULE; private Optional directoryLister = Optional.empty(); private boolean tpcdsCatalogEnabled; @@ -173,6 +175,13 @@ public SELF setMetastore(Function metasto return self(); } + @CanIgnoreReturnValue + public SELF setOpenTelemetry(OpenTelemetry openTelemetry) + { + this.openTelemetry = Optional.of(openTelemetry); + return self(); + } + @CanIgnoreReturnValue public SELF setModule(Module module) { @@ -244,7 +253,7 @@ public DistributedQueryRunner build() } HiveMetastore metastore = this.metastore.apply(queryRunner); - queryRunner.installPlugin(new TestingHivePlugin(Optional.of(metastore), module, directoryLister)); + queryRunner.installPlugin(new TestingHivePlugin(Optional.of(metastore), openTelemetry, module, directoryLister)); Map hiveProperties = new HashMap<>(); if (!skipTimezoneSetup) { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java index f353248f6df1..086472941d78 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java @@ -14,6 +14,7 @@ package io.trino.plugin.hive; import com.google.inject.Module; +import io.opentelemetry.api.OpenTelemetry; import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.spi.connector.Connector; @@ -31,17 +32,23 @@ public class TestingHiveConnectorFactory implements ConnectorFactory { private final Optional metastore; + private final Optional openTelemetry; private final Module module; private final Optional directoryLister; public TestingHiveConnectorFactory(HiveMetastore metastore) { - this(Optional.of(metastore), EMPTY_MODULE, Optional.empty()); + this(Optional.of(metastore), Optional.empty(), EMPTY_MODULE, Optional.empty()); } - public TestingHiveConnectorFactory(Optional metastore, Module module, Optional directoryLister) + public TestingHiveConnectorFactory( + Optional metastore, + Optional openTelemetry, + Module module, + Optional directoryLister) { this.metastore = requireNonNull(metastore, "metastore is null"); + this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null"); this.module = requireNonNull(module, "module is null"); this.directoryLister = requireNonNull(directoryLister, "directoryLister is null"); } @@ -55,6 +62,6 @@ public String getName() @Override public Connector create(String catalogName, Map config, ConnectorContext context) { - return createConnector(catalogName, config, context, module, metastore, Optional.empty(), directoryLister); + return createConnector(catalogName, config, context, module, metastore, Optional.empty(), openTelemetry, directoryLister); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java index adc6ad833f5e..13975b1995b3 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Module; +import io.opentelemetry.api.OpenTelemetry; import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.spi.Plugin; @@ -29,22 +30,24 @@ public class TestingHivePlugin implements Plugin { private final Optional metastore; + private final Optional openTelemetry; private final Module module; private final Optional directoryLister; public TestingHivePlugin() { - this(Optional.empty(), EMPTY_MODULE, Optional.empty()); + this(Optional.empty(), Optional.empty(), EMPTY_MODULE, Optional.empty()); } public TestingHivePlugin(HiveMetastore metastore) { - this(Optional.of(metastore), EMPTY_MODULE, Optional.empty()); + this(Optional.of(metastore), Optional.empty(), EMPTY_MODULE, Optional.empty()); } - public TestingHivePlugin(Optional metastore, Module module, Optional directoryLister) + public TestingHivePlugin(Optional metastore, Optional openTelemetry, Module module, Optional directoryLister) { this.metastore = requireNonNull(metastore, "metastore is null"); + this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null"); this.module = requireNonNull(module, "module is null"); this.directoryLister = requireNonNull(directoryLister, "directoryLister is null"); } @@ -52,6 +55,6 @@ public TestingHivePlugin(Optional metastore, Module module, Optio @Override public Iterable getConnectorFactories() { - return ImmutableList.of(new TestingHiveConnectorFactory(metastore, module, directoryLister)); + return ImmutableList.of(new TestingHiveConnectorFactory(metastore, openTelemetry, module, directoryLister)); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestTrinoS3FileSystemAccessOperations.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestTrinoS3FileSystemAccessOperations.java new file mode 100644 index 000000000000..a065406c8d39 --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestTrinoS3FileSystemAccessOperations.java @@ -0,0 +1,213 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.s3; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultiset; +import com.google.common.collect.Multiset; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.trino.hdfs.HdfsConfig; +import io.trino.hdfs.HdfsEnvironment; +import io.trino.hdfs.authentication.NoHdfsAuthentication; +import io.trino.plugin.hive.HiveQueryRunner; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.metastore.HiveMetastoreConfig; +import io.trino.plugin.hive.metastore.file.FileHiveMetastore; +import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.testing.containers.Minio; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.AfterClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.Arrays; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.plugin.hive.HiveQueryRunner.TPCH_SCHEMA; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_CONFIGURATION; +import static io.trino.plugin.hive.util.MultisetAssertions.assertMultisetsEqual; +import static io.trino.testing.DataProviders.toDataProvider; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY; +import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY; +import static java.util.stream.Collectors.toCollection; + +@Test(singleThreaded = true) // S3 request counters shares mutable state so can't be run from many threads simultaneously +public class TestTrinoS3FileSystemAccessOperations + extends AbstractTestQueryFramework +{ + private static final String BUCKET = "test-bucket"; + + private Minio minio; + private InMemorySpanExporter spanExporter; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + minio = closeAfterClass(Minio.builder().build()); + minio.start(); + minio.createBucket(BUCKET); + + spanExporter = closeAfterClass(InMemorySpanExporter.create()); + + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .build(); + + return HiveQueryRunner.builder() + .setMetastore(distributedQueryRunner -> { + File baseDir = distributedQueryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toFile(); + return new FileHiveMetastore( + new NodeVersion("testversion"), + new HdfsEnvironment(HDFS_CONFIGURATION, new HdfsConfig(), new NoHdfsAuthentication()), + new HiveMetastoreConfig().isHideDeltaLakeTables(), + new FileHiveMetastoreConfig() + .setCatalogDirectory(baseDir.toURI().toString()) + .setDisableLocationChecks(true) // matches Glue behavior + .setMetastoreUser("test")); + }) + .setHiveProperties(ImmutableMap.builder() + .put("hive.s3.aws-access-key", MINIO_ACCESS_KEY) + .put("hive.s3.aws-secret-key", MINIO_SECRET_KEY) + .put("hive.s3.endpoint", minio.getMinioAddress()) + .put("hive.s3.path-style-access", "true") + .put("hive.non-managed-table-writes-enabled", "true") + .buildOrThrow()) + .setOpenTelemetry(openTelemetry) + .setInitialSchemasLocationBase("s3://" + BUCKET) + .build(); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + { + // closed by closeAfterClass + spanExporter = null; + minio = null; + } + + @Test(dataProvider = "storageFormats") + public void testSelectWithFilter(StorageFormat format) + { + assertUpdate("DROP TABLE IF EXISTS test_select_from_where"); + String tableLocation = randomTableLocation("test_select_from_where"); + + assertUpdate("CREATE TABLE test_select_from_where WITH (format = '" + format + "', external_location = '" + tableLocation + "') AS SELECT 2 AS age", 1); + + assertFileSystemAccesses("SELECT * FROM test_select_from_where WHERE age = 2", + ImmutableMultiset.builder() + // TODO https://github.com/trinodb/trino/issues/18334 Reduce GetObject call for Parquet format + .addCopies("S3.GetObject", occurrences(format, 1, 2)) + .add("S3.ListObjectsV2") + .addCopies("S3.GetObjectMetadata", occurrences(format, 1, 0)) + .build()); + + assertUpdate("DROP TABLE test_select_from_where"); + } + + @Test(dataProvider = "storageFormats") + public void testSelectPartitionTable(StorageFormat format) + { + assertUpdate("DROP TABLE IF EXISTS test_select_from_partition"); + String tableLocation = randomTableLocation("test_select_from_partition"); + + assertUpdate("CREATE TABLE test_select_from_partition (data int, key varchar)" + + "WITH (partitioned_by = ARRAY['key'], format = '" + format + "', external_location = '" + tableLocation + "')"); + assertUpdate("INSERT INTO test_select_from_partition VALUES (1, 'part1'), (2, 'part2')", 2); + + assertFileSystemAccesses("SELECT * FROM test_select_from_partition", + ImmutableMultiset.builder() + // TODO https://github.com/trinodb/trino/issues/18334 Reduce GetObject call for Parquet format + .addCopies("S3.GetObject", occurrences(format, 2, 4)) + .addCopies("S3.ListObjectsV2", 2) + .addCopies("S3.GetObjectMetadata", occurrences(format, 2, 0)) + .build()); + + assertFileSystemAccesses("SELECT * FROM test_select_from_partition WHERE key = 'part1'", + ImmutableMultiset.builder() + // TODO https://github.com/trinodb/trino/issues/18334 Reduce GetObject call for Parquet format + .addCopies("S3.GetObject", occurrences(format, 1, 2)) + .add("S3.ListObjectsV2") + .addCopies("S3.GetObjectMetadata", occurrences(format, 1, 0)) + .build()); + + assertUpdate("INSERT INTO test_select_from_partition VALUES (11, 'part1')", 1); + assertFileSystemAccesses("SELECT * FROM test_select_from_partition WHERE key = 'part1'", + ImmutableMultiset.builder() + // TODO https://github.com/trinodb/trino/issues/18334 Reduce GetObject call for Parquet format + .addCopies("S3.GetObject", occurrences(format, 2, 4)) + .addCopies("S3.ListObjectsV2", 1) + .addCopies("S3.GetObjectMetadata", occurrences(format, 2, 0)) + .build()); + + assertUpdate("DROP TABLE test_select_from_partition"); + } + + private static String randomTableLocation(String tableName) + { + return "s3://%s/%s/%s-%s".formatted(BUCKET, TPCH_SCHEMA, tableName, randomNameSuffix()); + } + + private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedAccesses) + { + DistributedQueryRunner queryRunner = getDistributedQueryRunner(); + spanExporter.reset(); + queryRunner.executeWithQueryId(queryRunner.getDefaultSession(), query); + assertMultisetsEqual(getOperations(), expectedAccesses); + } + + private Multiset getOperations() + { + return spanExporter.getFinishedSpanItems().stream() + .map(SpanData::getName) + .collect(toCollection(HashMultiset::create)); + } + + @DataProvider + public static Object[][] storageFormats() + { + return Arrays.stream(StorageFormat.values()) + .collect(toDataProvider()); + } + + private static int occurrences(StorageFormat tableType, int orcValue, int parquetValue) + { + checkArgument(!(orcValue == parquetValue), "No need to use Occurrences when ORC and Parquet"); + return switch (tableType) { + case ORC -> orcValue; + case PARQUET -> parquetValue; + }; + } + + enum StorageFormat + { + ORC, + PARQUET, + } +}