Skip to content

Commit

Permalink
Expose FileSystemExchangeSource metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk authored and wendigo committed Jul 26, 2024
1 parent eee8d4d commit 2fdfb47
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.slice.Slice;
import io.trino.plugin.exchange.filesystem.MetricsBuilder.CounterMetricBuilder;
import io.trino.spi.exchange.ExchangeSource;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceOutputSelector;
import io.trino.spi.metrics.Metrics;
import jakarta.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -42,6 +45,7 @@
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
import static io.trino.plugin.exchange.filesystem.MetricsBuilder.SOURCE_FILES_TOTAL;
import static io.trino.spi.exchange.ExchangeSourceOutputSelector.Selection.INCLUDED;
import static java.util.Objects.requireNonNull;

Expand All @@ -66,6 +70,9 @@ public class FileSystemExchangeSource
private final AtomicReference<ListenableFuture<Void>> blocked = new AtomicReference<>();
private final AtomicBoolean closed = new AtomicBoolean();

private final MetricsBuilder metricsBuilder = new MetricsBuilder();
private final CounterMetricBuilder totalFilesMetric = metricsBuilder.getCounterMetric(SOURCE_FILES_TOTAL);

public FileSystemExchangeSource(
FileSystemExchangeStorage exchangeStorage,
FileSystemExchangeStats stats,
Expand All @@ -86,7 +93,9 @@ public synchronized void addSourceHandles(List<ExchangeSourceHandle> handles)
if (closed.get()) {
return;
}
files.addAll(getFiles(handles));
List<ExchangeSourceFile> newFiles = getFiles(handles);
files.addAll(newFiles);
totalFilesMetric.add(newFiles.size());
closeAndCreateReadersIfNecessary();
}

Expand Down Expand Up @@ -289,7 +298,7 @@ private void closeAndCreateReadersIfNecessary()
break;
}
}
activeReaders.add(exchangeStorage.createExchangeStorageReader(readerFiles.build(), maxPageStorageSize));
activeReaders.add(exchangeStorage.createExchangeStorageReader(readerFiles.build(), maxPageStorageSize, metricsBuilder));
}
if (activeReaders.isEmpty()) {
if (noMoreFiles) {
Expand Down Expand Up @@ -338,6 +347,12 @@ private int getNumberOfActiveReaders()
return result;
}

@Override
public Optional<Metrics> getMetrics()
{
return Optional.of(metricsBuilder.buildMetrics());
}

private static List<ExchangeSourceFile> getFiles(List<ExchangeSourceHandle> handles)
{
return handles.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface FileSystemExchangeStorage
void createDirectories(URI dir)
throws IOException;

ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize);
ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize, MetricsBuilder metricsBuilder);

ExchangeStorageWriter createExchangeStorageWriter(URI file);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.exchange.filesystem;

import io.airlift.stats.TDigest;
import io.trino.plugin.base.metrics.LongCount;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.metrics.Metric;
import io.trino.spi.metrics.Metrics;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.collect.ImmutableMap.toImmutableMap;

public class MetricsBuilder
{
public static final String SOURCE_FILES_TOTAL = "FileSystemExchangeSource.filesTotal";
public static final String SOURCE_FILES_PROCESSED = "FileSystemExchangeSource.filesProcessed";

private final ConcurrentMap<String, MetricBuilder> metricBuilders = new ConcurrentHashMap<>();

public Metrics buildMetrics()
{
return new Metrics(
metricBuilders.entrySet().stream()
.collect(toImmutableMap(
Map.Entry::getKey,
entry -> entry.getValue().build())));
}

private interface MetricBuilder
{
Metric<?> build();
}

public CounterMetricBuilder getCounterMetric(String key)
{
return (CounterMetricBuilder) metricBuilders.computeIfAbsent(key, _ -> new CounterMetricBuilder());
}

public DistributionMetricBuilder getDistributionMetric(String key)
{
return (DistributionMetricBuilder) metricBuilders.computeIfAbsent(key, _ -> new DistributionMetricBuilder());
}

public static class CounterMetricBuilder
implements MetricBuilder
{
private final AtomicLong counter = new AtomicLong();

public void increment()
{
counter.incrementAndGet();
}

public void add(long delta)
{
counter.addAndGet(delta);
}

@Override
public Metric<?> build()
{
return new LongCount(counter.get());
}
}

public static class DistributionMetricBuilder
implements MetricBuilder
{
private final TDigest digest = new TDigest();

public synchronized void add(double value)
{
digest.add(value);
}

@Override
public synchronized Metric<?> build()
{
return new TDigestHistogram(TDigest.copyOf(digest));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter;
import io.trino.plugin.exchange.filesystem.FileStatus;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage;
import io.trino.plugin.exchange.filesystem.MetricsBuilder;
import io.trino.plugin.exchange.filesystem.MetricsBuilder.CounterMetricBuilder;
import jakarta.annotation.PreDestroy;
import reactor.core.publisher.Flux;

Expand Down Expand Up @@ -79,6 +81,7 @@
import static io.airlift.slice.SizeOf.instanceSize;
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeFutures.translateFailures;
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR;
import static io.trino.plugin.exchange.filesystem.MetricsBuilder.SOURCE_FILES_PROCESSED;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
import static java.lang.System.arraycopy;
Expand Down Expand Up @@ -124,9 +127,9 @@ public void createDirectories(URI dir)
}

@Override
public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize)
public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize, MetricsBuilder metricsBuilder)
{
return new AzureExchangeStorageReader(blobServiceAsyncClient, sourceFiles, blockSize, maxPageStorageSize);
return new AzureExchangeStorageReader(blobServiceAsyncClient, sourceFiles, metricsBuilder, blockSize, maxPageStorageSize);
}

@Override
Expand Down Expand Up @@ -279,6 +282,7 @@ private static class AzureExchangeStorageReader
private final Queue<ExchangeSourceFile> sourceFiles;
private final int blockSize;
private final int bufferSize;
CounterMetricBuilder sourceFilesProcessedMetric;

@GuardedBy("this")
private ExchangeSourceFile currentFile;
Expand All @@ -295,11 +299,14 @@ private static class AzureExchangeStorageReader
public AzureExchangeStorageReader(
BlobServiceAsyncClient blobServiceAsyncClient,
List<ExchangeSourceFile> sourceFiles,
MetricsBuilder metricsBuilder,
int blockSize,
int maxPageStorageSize)
{
this.blobServiceAsyncClient = requireNonNull(blobServiceAsyncClient, "blobServiceAsyncClient is null");
this.sourceFiles = new ArrayDeque<>(requireNonNull(sourceFiles, "sourceFiles is null"));
requireNonNull(metricsBuilder, "metricsBuilder is null");
sourceFilesProcessedMetric = metricsBuilder.getCounterMetric(SOURCE_FILES_PROCESSED);
this.blockSize = blockSize;
// Make sure buffer can accommodate at least one complete Slice, and keep reads aligned to block boundaries
this.bufferSize = maxPageStorageSize + blockSize;
Expand Down Expand Up @@ -440,6 +447,7 @@ private void fillBuffer()
}

if (fileOffset == fileSize) {
sourceFilesProcessedMetric.increment();
currentFile = sourceFiles.poll();
if (currentFile == null) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter;
import io.trino.plugin.exchange.filesystem.FileStatus;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage;
import io.trino.plugin.exchange.filesystem.MetricsBuilder;
import io.trino.plugin.exchange.filesystem.MetricsBuilder.CounterMetricBuilder;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand All @@ -50,6 +52,7 @@
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.trino.plugin.exchange.filesystem.MetricsBuilder.SOURCE_FILES_PROCESSED;
import static java.lang.Math.toIntExact;
import static java.nio.file.Files.createFile;
import static java.util.Objects.requireNonNull;
Expand All @@ -67,9 +70,9 @@ public void createDirectories(URI dir)
}

@Override
public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize)
public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize, MetricsBuilder metricsBuilder)
{
return new LocalExchangeStorageReader(sourceFiles);
return new LocalExchangeStorageReader(sourceFiles, metricsBuilder);
}

@Override
Expand Down Expand Up @@ -140,15 +143,18 @@ private static class LocalExchangeStorageReader

@GuardedBy("this")
private final Queue<ExchangeSourceFile> sourceFiles;
CounterMetricBuilder sourceFilesProcessedMetric;

@GuardedBy("this")
private InputStreamSliceInput sliceInput;
@GuardedBy("this")
private boolean closed;

public LocalExchangeStorageReader(List<ExchangeSourceFile> sourceFiles)
public LocalExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, MetricsBuilder metricsBuilder)
{
this.sourceFiles = new ArrayDeque<>(requireNonNull(sourceFiles, "sourceFiles is null"));
requireNonNull(metricsBuilder, "metricsBuilder is null");
sourceFilesProcessedMetric = metricsBuilder.getCounterMetric(SOURCE_FILES_PROCESSED);
}

@Override
Expand All @@ -165,6 +171,7 @@ public synchronized Slice read()
}
else {
sliceInput.close();
sourceFilesProcessedMetric.increment();
}
}

Expand Down
Loading

0 comments on commit 2fdfb47

Please sign in to comment.