From 84ce5ba63266fabfd3580cd7f917ed3d5b999819 Mon Sep 17 00:00:00 2001 From: Jungwoo Lee Date: Mon, 9 Sep 2024 20:39:29 +0000 Subject: [PATCH] Add JMX support for native s3 filesystems --- lib/trino-filesystem-s3/pom.xml | 25 ++ .../filesystem/s3/AwsSdkV2ApiCallStats.java | 107 +++++++ .../filesystem/s3/S3FileSystemFactory.java | 4 +- .../filesystem/s3/S3FileSystemLoader.java | 21 +- .../filesystem/s3/S3FileSystemModule.java | 4 + .../filesystem/s3/S3FileSystemStats.java | 263 ++++++++++++++++++ .../filesystem/s3/TestS3FileSystemAwsS3.java | 2 +- .../s3/TestS3FileSystemLocalStack.java | 2 +- .../filesystem/s3/TestS3FileSystemMinIo.java | 2 +- .../filesystem/s3/TestS3FileSystemS3Mock.java | 2 +- ...gVendingRestCatalogConnectorSmokeTest.java | 3 +- .../snowflake/TestTrinoSnowflakeCatalog.java | 3 +- .../TestFileSystemSpoolingManager.java | 3 +- .../product/deltalake/TestDeltaLakeJmx.java | 1 + 14 files changed, 424 insertions(+), 18 deletions(-) create mode 100644 lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/AwsSdkV2ApiCallStats.java create mode 100644 lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemStats.java diff --git a/lib/trino-filesystem-s3/pom.xml b/lib/trino-filesystem-s3/pom.xml index b1a7a5672a0b..403c614e9649 100644 --- a/lib/trino-filesystem-s3/pom.xml +++ b/lib/trino-filesystem-s3/pom.xml @@ -22,6 +22,11 @@ jackson-annotations + + com.google.errorprone + error_prone_annotations + + com.google.guava guava @@ -47,6 +52,16 @@ http-client + + io.airlift + log + + + + io.airlift + stats + + io.airlift units @@ -98,6 +113,11 @@ jakarta.validation-api + + org.weakref + jmxutils + + software.amazon.awssdk apache-client @@ -129,6 +149,11 @@ identity-spi + + software.amazon.awssdk + metrics-spi + + software.amazon.awssdk regions diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/AwsSdkV2ApiCallStats.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/AwsSdkV2ApiCallStats.java new file mode 100644 index 000000000000..e4fab155ccf8 --- /dev/null +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/AwsSdkV2ApiCallStats.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.s3; + +import com.google.errorprone.annotations.ThreadSafe; +import io.airlift.stats.CounterStat; +import io.airlift.stats.TimeStat; +import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; + +import java.time.Duration; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +@ThreadSafe +public class AwsSdkV2ApiCallStats +{ + private final TimeStat latency = new TimeStat(MILLISECONDS); + private final CounterStat calls = new CounterStat(); + private final CounterStat failures = new CounterStat(); + private final CounterStat retries = new CounterStat(); + private final CounterStat throttlingExceptions = new CounterStat(); + private final CounterStat serverErrors = new CounterStat(); + + @Managed + @Nested + public TimeStat getLatency() + { + return latency; + } + + @Managed + @Nested + public CounterStat getCalls() + { + return calls; + } + + @Managed + @Nested + public CounterStat getFailures() + { + return failures; + } + + @Managed + @Nested + public CounterStat getRetries() + { + return retries; + } + + @Managed + @Nested + public CounterStat getThrottlingExceptions() + { + return throttlingExceptions; + } + + @Managed + @Nested + public CounterStat getServerErrors() + { + return serverErrors; + } + + public void updateLatency(Duration duration) + { + latency.addNanos(duration.toNanos()); + } + + public void updateCalls() + { + calls.update(1); + } + + public void updateFailures() + { + failures.update(1); + } + + public void updateRetries(int retryCount) + { + retries.update(retryCount); + } + + public void updateThrottlingExceptions() + { + throttlingExceptions.update(1); + } + + public void updateServerErrors() + { + serverErrors.update(1); + } +} diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java index 7658053af686..4d52f4746b7c 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java @@ -32,9 +32,9 @@ public final class S3FileSystemFactory private final Executor uploadExecutor; @Inject - public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig config) + public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig config, S3FileSystemStats stats) { - this.loader = new S3FileSystemLoader(openTelemetry, config); + this.loader = new S3FileSystemLoader(openTelemetry, config, stats); this.client = loader.createClient(); this.context = loader.context(); this.uploadExecutor = loader.uploadExecutor(); diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java index 98c8fb8b92a4..e72feb5d61cb 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java @@ -27,6 +27,7 @@ import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.apache.ProxyConfiguration; +import software.amazon.awssdk.metrics.MetricPublisher; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; @@ -56,22 +57,23 @@ final class S3FileSystemLoader private final ExecutorService uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s")); @Inject - public S3FileSystemLoader(S3SecurityMappingProvider mappingProvider, OpenTelemetry openTelemetry, S3FileSystemConfig config) + public S3FileSystemLoader(S3SecurityMappingProvider mappingProvider, OpenTelemetry openTelemetry, S3FileSystemConfig config, S3FileSystemStats stats) { - this(Optional.of(mappingProvider), openTelemetry, config); + this(Optional.of(mappingProvider), openTelemetry, config, stats); } - S3FileSystemLoader(OpenTelemetry openTelemetry, S3FileSystemConfig config) + S3FileSystemLoader(OpenTelemetry openTelemetry, S3FileSystemConfig config, S3FileSystemStats stats) { - this(Optional.empty(), openTelemetry, config); + this(Optional.empty(), openTelemetry, config, stats); } - private S3FileSystemLoader(Optional mappingProvider, OpenTelemetry openTelemetry, S3FileSystemConfig config) + private S3FileSystemLoader(Optional mappingProvider, OpenTelemetry openTelemetry, S3FileSystemConfig config, S3FileSystemStats stats) { this.mappingProvider = requireNonNull(mappingProvider, "mappingProvider is null"); this.httpClient = createHttpClient(config); - this.clientFactory = s3ClientFactory(httpClient, openTelemetry, config); + requireNonNull(stats, "stats is null"); + this.clientFactory = s3ClientFactory(httpClient, openTelemetry, config, stats.newMetricPublisher()); this.context = new S3Context( toIntExact(config.getStreamingPartSize().toBytes()), @@ -112,9 +114,9 @@ Executor uploadExecutor() return uploadExecutor; } - private static S3ClientFactory s3ClientFactory(SdkHttpClient httpClient, OpenTelemetry openTelemetry, S3FileSystemConfig config) + private static S3ClientFactory s3ClientFactory(SdkHttpClient httpClient, OpenTelemetry openTelemetry, S3FileSystemConfig config, MetricPublisher metricPublisher) { - ClientOverrideConfiguration overrideConfiguration = createOverrideConfiguration(openTelemetry, config); + ClientOverrideConfiguration overrideConfiguration = createOverrideConfiguration(openTelemetry, config, metricPublisher); Optional staticCredentialsProvider = createStaticCredentialsProvider(config); Optional staticRegion = Optional.ofNullable(config.getRegion()); @@ -187,7 +189,7 @@ private static StsClient createStsClient(S3FileSystemConfig config, Optional> ALLOWED_METRICS = Set.of(API_CALL_SUCCESSFUL, RETRY_COUNT, API_CALL_DURATION, ERROR_TYPE); + + private static final Logger log = Logger.get(JmxMetricPublisher.class); + + private final S3FileSystemStats stats; + + public JmxMetricPublisher(S3FileSystemStats stats) + { + this.stats = requireNonNull(stats, "stats is null"); + } + + @Override + public void publish(MetricCollection metricCollection) + { + try { + Optional serviceId = metricCollection.metricValues(SERVICE_ID).stream().filter(Objects::nonNull).findFirst(); + Optional operationName = metricCollection.metricValues(OPERATION_NAME).stream().filter(Objects::nonNull).findFirst(); + if (serviceId.isEmpty() || operationName.isEmpty()) { + log.warn("ServiceId or OperationName is empty for AWS MetricCollection: %s", metricCollection); + return; + } + + if (!serviceId.get().equals("S3")) { + return; + } + + AwsSdkV2ApiCallStats apiCallStats = getApiCallStats(operationName.get()); + publishMetrics(metricCollection, apiCallStats); + } + catch (Exception e) { + log.warn(e, "Publishing AWS metrics failed"); + } + } + + private void publishMetrics(MetricCollection metricCollection, AwsSdkV2ApiCallStats apiCallStats) + { + metricCollection.stream() + .filter(metricRecord -> metricRecord.value() != null && ALLOWED_METRICS.contains(metricRecord.metric())) + .forEach(metricRecord -> { + if (metricRecord.metric().equals(API_CALL_SUCCESSFUL)) { + Boolean value = (Boolean) metricRecord.value(); + + stats.total.updateCalls(); + apiCallStats.updateCalls(); + + if (value.equals(Boolean.FALSE)) { + stats.total.updateFailures(); + apiCallStats.updateFailures(); + } + } + else if (metricRecord.metric().equals(RETRY_COUNT)) { + int value = (int) metricRecord.value(); + + stats.total.updateRetries(value); + apiCallStats.updateRetries(value); + } + else if (metricRecord.metric().equals(API_CALL_DURATION)) { + Duration value = (Duration) metricRecord.value(); + + stats.total.updateLatency(value); + apiCallStats.updateLatency(value); + } + else if (metricRecord.metric().equals(ERROR_TYPE)) { + String value = (String) metricRecord.value(); + + if (value.equals(THROTTLING.toString())) { + stats.total.updateThrottlingExceptions(); + apiCallStats.updateThrottlingExceptions(); + } + else if (value.equals(SERVER_ERROR.toString())) { + stats.total.updateServerErrors(); + apiCallStats.updateServerErrors(); + } + } + }); + + metricCollection.children().forEach(child -> publishMetrics(child, apiCallStats)); + } + + @Override + public void close() + { + } + + private AwsSdkV2ApiCallStats getApiCallStats(String operationName) + { + return switch (operationName) { + case "HeadObject" -> stats.headObject; + case "GetObject" -> stats.getObject; + case "ListObjectsV2" -> stats.listObjectsV2; + case "PutObject" -> stats.putObject; + case "DeleteObject" -> stats.deleteObject; + case "DeleteObjects" -> stats.deleteObjects; + case "CreateMultipartUpload" -> stats.createMultipartUpload; + case "CompleteMultipartUpload" -> stats.completeMultipartUpload; + case "AbortMultipartUpload" -> stats.abortMultipartUpload; + case "UploadPart" -> stats.uploadPart; + default -> S3FileSystemStats.dummy; + }; + } + } + + private static class DummyAwsSdkV2ApiCallStats + extends AwsSdkV2ApiCallStats + { + @Override + public void updateLatency(Duration duration) {} + + @Override + public void updateCalls() {} + + @Override + public void updateFailures() {} + + @Override + public void updateRetries(int retryCount) {} + + @Override + public void updateThrottlingExceptions() {} + + @Override + public void updateServerErrors() {} + } +} diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemAwsS3.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemAwsS3.java index 79c7db65c3c1..5ed6166590f6 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemAwsS3.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemAwsS3.java @@ -72,7 +72,7 @@ protected S3FileSystemFactory createS3FileSystemFactory() .setAwsSecretKey(secretKey) .setRegion(region) .setSupportsExclusiveCreate(true) - .setStreamingPartSize(DataSize.valueOf("5.5MB"))); + .setStreamingPartSize(DataSize.valueOf("5.5MB")), new S3FileSystemStats()); } private static String environmentVariable(String name) diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemLocalStack.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemLocalStack.java index f879f3305ad7..b00c3487f1ce 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemLocalStack.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemLocalStack.java @@ -68,6 +68,6 @@ protected S3FileSystemFactory createS3FileSystemFactory() .setAwsSecretKey(LOCALSTACK.getSecretKey()) .setEndpoint(LOCALSTACK.getEndpointOverride(Service.S3).toString()) .setRegion(LOCALSTACK.getRegion()) - .setStreamingPartSize(DataSize.valueOf("5.5MB"))); + .setStreamingPartSize(DataSize.valueOf("5.5MB")), new S3FileSystemStats()); } } diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemMinIo.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemMinIo.java index d57c646d8fa4..2c82e5aadf27 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemMinIo.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemMinIo.java @@ -80,7 +80,7 @@ protected S3FileSystemFactory createS3FileSystemFactory() .setAwsAccessKey(Minio.MINIO_ACCESS_KEY) .setAwsSecretKey(Minio.MINIO_SECRET_KEY) .setSupportsExclusiveCreate(true) - .setStreamingPartSize(DataSize.valueOf("5.5MB"))); + .setStreamingPartSize(DataSize.valueOf("5.5MB")), new S3FileSystemStats()); } @Test diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemS3Mock.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemS3Mock.java index 79b4330e5847..64d6d636d62a 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemS3Mock.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemS3Mock.java @@ -75,6 +75,6 @@ protected S3FileSystemFactory createS3FileSystemFactory() .setRegion(Region.US_EAST_1.id()) .setPathStyleAccess(true) .setStreamingPartSize(DataSize.valueOf("5.5MB")) - .setSupportsExclusiveCreate(false)); + .setSupportsExclusiveCreate(false), new S3FileSystemStats()); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java index 615135bdf17a..c2efa55daa8d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java @@ -18,6 +18,7 @@ import io.trino.filesystem.Location; import io.trino.filesystem.s3.S3FileSystemConfig; import io.trino.filesystem.s3.S3FileSystemFactory; +import io.trino.filesystem.s3.S3FileSystemStats; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -138,7 +139,7 @@ public void initFileSystem() .setEndpoint(minio.getMinioAddress()) .setPathStyleAccess(true) .setAwsAccessKey(MINIO_ACCESS_KEY) - .setAwsSecretKey(MINIO_SECRET_KEY) + .setAwsSecretKey(MINIO_SECRET_KEY), new S3FileSystemStats() ).create(SESSION); } diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java index 4ab20988f1f8..7c371b8ae755 100644 --- a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java @@ -19,6 +19,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.trino.filesystem.s3.S3FileSystemConfig; import io.trino.filesystem.s3.S3FileSystemFactory; +import io.trino.filesystem.s3.S3FileSystemStats; import io.trino.metastore.TableInfo; import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.iceberg.ColumnIdentity; @@ -159,7 +160,7 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations) .setAwsAccessKey(S3_ACCESS_KEY) .setAwsSecretKey(S3_SECRET_KEY) .setRegion(S3_REGION) - .setStreamingPartSize(DataSize.valueOf("5.5MB"))); + .setStreamingPartSize(DataSize.valueOf("5.5MB")), new S3FileSystemStats()); CatalogName catalogName = new CatalogName("snowflake_test_catalog"); TrinoIcebergSnowflakeCatalogFileIOFactory catalogFileIOFactory = new TrinoIcebergSnowflakeCatalogFileIOFactory(s3FileSystemFactory, ConnectorIdentity.ofUser("trino")); diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManager.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManager.java index 04264a8d652b..641ef60dbb7b 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManager.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManager.java @@ -16,6 +16,7 @@ import io.airlift.units.DataSize; import io.trino.filesystem.s3.S3FileSystemConfig; import io.trino.filesystem.s3.S3FileSystemFactory; +import io.trino.filesystem.s3.S3FileSystemStats; import io.trino.spi.QueryId; import io.trino.spi.protocol.SpooledLocation; import io.trino.spi.protocol.SpooledSegmentHandle; @@ -122,6 +123,6 @@ private SpoolingManager getSpoolingManager() .setAwsAccessKey(Minio.MINIO_ACCESS_KEY) .setAwsSecretKey(Minio.MINIO_SECRET_KEY) .setStreamingPartSize(DataSize.valueOf("5.5MB")); - return new FileSystemSpoolingManager(spoolingConfig, new S3FileSystemFactory(noop(), filesystemConfig)); + return new FileSystemSpoolingManager(spoolingConfig, new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats())); } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeJmx.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeJmx.java index d2b2286ebe55..fa2474d723ea 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeJmx.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeJmx.java @@ -46,6 +46,7 @@ public void testJmxTablesExposedByDeltaLakeConnectorBackedByGlueMetastore() public void testJmxTablesExposedByDeltaLakeConnectorBackedByThriftMetastore() { assertThat(onTrino().executeQuery("SHOW TABLES IN jmx.current LIKE '%name=delta%'")).containsOnly( + row("io.trino.filesystem.s3:name=delta,type=s3filesystemstats"), row("io.trino.plugin.hive.metastore.cache:name=delta,type=cachinghivemetastore"), row("io.trino.plugin.hive.metastore.thrift:name=delta,type=thrifthivemetastore"), row("io.trino.plugin.hive:catalog=delta,name=delta,type=fileformatdatasourcestats"),