Skip to content

Commit

Permalink
Add JMX support for native s3 filesystems
Browse files Browse the repository at this point in the history
  • Loading branch information
ljw9111 authored and pettyjamesm committed Sep 13, 2024
1 parent c93321b commit 4c4edc4
Show file tree
Hide file tree
Showing 14 changed files with 424 additions and 18 deletions.
25 changes: 25 additions & 0 deletions lib/trino-filesystem-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand All @@ -47,6 +52,16 @@
<artifactId>http-client</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>stats</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
Expand Down Expand Up @@ -98,6 +113,11 @@
<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
Expand Down Expand Up @@ -129,6 +149,11 @@
<artifactId>identity-spi</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>metrics-spi</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.s3;

import com.google.errorprone.annotations.ThreadSafe;
import io.airlift.stats.CounterStat;
import io.airlift.stats.TimeStat;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import java.time.Duration;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

@ThreadSafe
public class AwsSdkV2ApiCallStats
{
private final TimeStat latency = new TimeStat(MILLISECONDS);
private final CounterStat calls = new CounterStat();
private final CounterStat failures = new CounterStat();
private final CounterStat retries = new CounterStat();
private final CounterStat throttlingExceptions = new CounterStat();
private final CounterStat serverErrors = new CounterStat();

@Managed
@Nested
public TimeStat getLatency()
{
return latency;
}

@Managed
@Nested
public CounterStat getCalls()
{
return calls;
}

@Managed
@Nested
public CounterStat getFailures()
{
return failures;
}

@Managed
@Nested
public CounterStat getRetries()
{
return retries;
}

@Managed
@Nested
public CounterStat getThrottlingExceptions()
{
return throttlingExceptions;
}

@Managed
@Nested
public CounterStat getServerErrors()
{
return serverErrors;
}

public void updateLatency(Duration duration)
{
latency.addNanos(duration.toNanos());
}

public void updateCalls()
{
calls.update(1);
}

public void updateFailures()
{
failures.update(1);
}

public void updateRetries(int retryCount)
{
retries.update(retryCount);
}

public void updateThrottlingExceptions()
{
throttlingExceptions.update(1);
}

public void updateServerErrors()
{
serverErrors.update(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public final class S3FileSystemFactory
private final S3Presigner preSigner;

@Inject
public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig config)
public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig config, S3FileSystemStats stats)
{
this.loader = new S3FileSystemLoader(openTelemetry, config);
this.loader = new S3FileSystemLoader(openTelemetry, config, stats);
this.client = loader.createClient();
this.preSigner = loader.createPreSigner();
this.context = loader.context();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
Expand Down Expand Up @@ -59,22 +60,23 @@ final class S3FileSystemLoader
private final ExecutorService uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s"));

@Inject
public S3FileSystemLoader(S3SecurityMappingProvider mappingProvider, OpenTelemetry openTelemetry, S3FileSystemConfig config)
public S3FileSystemLoader(S3SecurityMappingProvider mappingProvider, OpenTelemetry openTelemetry, S3FileSystemConfig config, S3FileSystemStats stats)
{
this(Optional.of(mappingProvider), openTelemetry, config);
this(Optional.of(mappingProvider), openTelemetry, config, stats);
}

S3FileSystemLoader(OpenTelemetry openTelemetry, S3FileSystemConfig config)
S3FileSystemLoader(OpenTelemetry openTelemetry, S3FileSystemConfig config, S3FileSystemStats stats)
{
this(Optional.empty(), openTelemetry, config);
this(Optional.empty(), openTelemetry, config, stats);
}

private S3FileSystemLoader(Optional<S3SecurityMappingProvider> mappingProvider, OpenTelemetry openTelemetry, S3FileSystemConfig config)
private S3FileSystemLoader(Optional<S3SecurityMappingProvider> mappingProvider, OpenTelemetry openTelemetry, S3FileSystemConfig config, S3FileSystemStats stats)
{
this.mappingProvider = requireNonNull(mappingProvider, "mappingProvider is null");
this.httpClient = createHttpClient(config);

this.clientFactory = s3ClientFactory(httpClient, openTelemetry, config);
requireNonNull(stats, "stats is null");
this.clientFactory = s3ClientFactory(httpClient, openTelemetry, config, stats.newMetricPublisher());

this.preSigner = s3PreSigner(httpClient, openTelemetry, config);

Expand Down Expand Up @@ -122,9 +124,9 @@ Executor uploadExecutor()
return uploadExecutor;
}

private static S3ClientFactory s3ClientFactory(SdkHttpClient httpClient, OpenTelemetry openTelemetry, S3FileSystemConfig config)
private static S3ClientFactory s3ClientFactory(SdkHttpClient httpClient, OpenTelemetry openTelemetry, S3FileSystemConfig config, MetricPublisher metricPublisher)
{
ClientOverrideConfiguration overrideConfiguration = createOverrideConfiguration(openTelemetry, config);
ClientOverrideConfiguration overrideConfiguration = createOverrideConfiguration(openTelemetry, config, metricPublisher);

Optional<AwsCredentialsProvider> staticCredentialsProvider = createStaticCredentialsProvider(config);
Optional<String> staticRegion = Optional.ofNullable(config.getRegion());
Expand Down Expand Up @@ -240,7 +242,7 @@ private static StsClient createStsClient(S3FileSystemConfig config, Optional<Aws
return sts.build();
}

private static ClientOverrideConfiguration createOverrideConfiguration(OpenTelemetry openTelemetry, S3FileSystemConfig config)
private static ClientOverrideConfiguration createOverrideConfiguration(OpenTelemetry openTelemetry, S3FileSystemConfig config, MetricPublisher metricPublisher)
{
return ClientOverrideConfiguration.builder()
.addExecutionInterceptor(AwsSdkTelemetry.builder(openTelemetry)
Expand All @@ -250,6 +252,7 @@ private static ClientOverrideConfiguration createOverrideConfiguration(OpenTelem
.retryStrategy(getRetryStrategy(config.getRetryMode()).toBuilder()
.maxAttempts(config.getMaxErrorRetries())
.build())
.addMetricPublisher(metricPublisher)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class S3FileSystemModule
extends AbstractConfigurationAwareModule
Expand All @@ -52,6 +53,9 @@ protected void setup(Binder binder)
binder.bind(TrinoFileSystemFactory.class).annotatedWith(FileSystemS3.class)
.to(S3FileSystemFactory.class).in(SINGLETON);
}

binder.bind(S3FileSystemStats.class).in(SINGLETON);
newExporter(binder).export(S3FileSystemStats.class).withGeneratedName();
}

public static class S3SecurityMappingModule
Expand Down
Loading

0 comments on commit 4c4edc4

Please sign in to comment.