From 78fd3b1f7b9aa763bc41623157158cb94b8d15bc Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Tue, 4 Oct 2022 14:46:09 -0400 Subject: [PATCH] Include source task/attempt for each file in FileSystemExchangeSourceHandle To allow post filtering at the FileSystemExchangeSource level --- .../filesystem/ExchangeSourceFile.java | 25 ++++- .../filesystem/FileSystemExchange.java | 45 ++++---- .../filesystem/FileSystemExchangeSource.java | 9 +- .../FileSystemExchangeSourceHandle.java | 104 +++++++++++++++++- 4 files changed, 155 insertions(+), 28 deletions(-) diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java index c8f049d94060..f51545c14962 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/ExchangeSourceFile.java @@ -13,6 +13,8 @@ */ package io.trino.plugin.exchange.filesystem; +import io.trino.spi.exchange.ExchangeId; + import javax.annotation.concurrent.Immutable; import javax.crypto.SecretKey; @@ -27,12 +29,18 @@ public class ExchangeSourceFile private final URI fileUri; private final Optional secretKey; private final long fileSize; + private final ExchangeId exchangeId; + private final int sourceTaskPartitionId; + private final int sourceTaskAttemptId; - public ExchangeSourceFile(URI fileUri, Optional secretKey, long fileSize) + public ExchangeSourceFile(URI fileUri, Optional secretKey, long fileSize, ExchangeId exchangeId, int sourceTaskPartitionId, int sourceTaskAttemptId) { this.fileUri = requireNonNull(fileUri, "fileUri is null"); this.secretKey = requireNonNull(secretKey, "secretKey is null"); this.fileSize = fileSize; + this.exchangeId = requireNonNull(exchangeId, "exchangeId is null"); + this.sourceTaskPartitionId = sourceTaskPartitionId; + this.sourceTaskAttemptId = sourceTaskAttemptId; } public URI getFileUri() @@ -49,4 +57,19 @@ public long getFileSize() { return fileSize; } + + public ExchangeId getExchangeId() + { + return exchangeId; + } + + public int getSourceTaskPartitionId() + { + return sourceTaskPartitionId; + } + + public int getSourceTaskAttemptId() + { + return sourceTaskAttemptId; + } } diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java index 41688234ca27..8f60a4f7bdab 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchange.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.trino.plugin.exchange.filesystem.FileSystemExchangeSourceHandle.SourceFile; import io.trino.spi.exchange.Exchange; import io.trino.spi.exchange.ExchangeContext; import io.trino.spi.exchange.ExchangeSinkHandle; @@ -58,6 +59,7 @@ import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR; import static io.trino.plugin.exchange.filesystem.FileSystemExchangeSink.COMMITTED_MARKER_FILE_NAME; import static io.trino.plugin.exchange.filesystem.FileSystemExchangeSink.DATA_FILE_SUFFIX; +import static java.lang.Integer.parseInt; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -216,16 +218,16 @@ private ListenableFuture> createExchangeSourceHandles return Futures.transform( processAll(finishedTaskPartitions, this::getCommittedPartitions, fileListingParallelism, executor), partitionsList -> { - Multimap partitionFiles = ArrayListMultimap.create(); - partitionsList.forEach(partitions -> partitions.forEach(partitionFiles::put)); + Multimap sourceFiles = ArrayListMultimap.create(); + partitionsList.forEach(partitions -> partitions.forEach(sourceFiles::put)); ImmutableList.Builder result = ImmutableList.builder(); - for (Integer partitionId : partitionFiles.keySet()) { - Collection files = partitionFiles.get(partitionId); + for (Integer partitionId : sourceFiles.keySet()) { + Collection files = sourceFiles.get(partitionId); long currentExchangeHandleDataSizeInBytes = 0; - ImmutableList.Builder currentExchangeHandleFiles = ImmutableList.builder(); - for (FileStatus file : files) { + ImmutableList.Builder currentExchangeHandleFiles = ImmutableList.builder(); + for (SourceFile file : files) { if (currentExchangeHandleDataSizeInBytes > 0 && currentExchangeHandleDataSizeInBytes + file.getFileSize() > exchangeSourceHandleTargetDataSizeInBytes) { - result.add(new FileSystemExchangeSourceHandle(partitionId, currentExchangeHandleFiles.build(), secretKey.map(SecretKey::getEncoded))); + result.add(new FileSystemExchangeSourceHandle(exchangeContext.getExchangeId(), partitionId, currentExchangeHandleFiles.build(), secretKey.map(SecretKey::getEncoded))); currentExchangeHandleDataSizeInBytes = 0; currentExchangeHandleFiles = ImmutableList.builder(); } @@ -233,7 +235,7 @@ private ListenableFuture> createExchangeSourceHandles currentExchangeHandleFiles.add(file); } if (currentExchangeHandleDataSizeInBytes > 0) { - result.add(new FileSystemExchangeSourceHandle(partitionId, currentExchangeHandleFiles.build(), secretKey.map(SecretKey::getEncoded))); + result.add(new FileSystemExchangeSourceHandle(exchangeContext.getExchangeId(), partitionId, currentExchangeHandleFiles.build(), secretKey.map(SecretKey::getEncoded))); } } return result.build(); @@ -241,35 +243,40 @@ private ListenableFuture> createExchangeSourceHandles executor); } - private ListenableFuture> getCommittedPartitions(int taskPartitionId) + private ListenableFuture> getCommittedPartitions(int partition) { - URI sinkOutputPath = getTaskOutputDirectory(taskPartitionId); + URI sinkOutputPath = getTaskOutputDirectory(partition); return stats.getGetCommittedPartitions().record(Futures.transform( exchangeStorage.listFilesRecursively(sinkOutputPath), sinkOutputFiles -> { - String committedMarkerFilePath = sinkOutputFiles.stream() + List committedMarkerFilePaths = sinkOutputFiles.stream() .map(FileStatus::getFilePath) .filter(filePath -> filePath.endsWith(COMMITTED_MARKER_FILE_NAME)) - .findFirst() - .orElseThrow(() -> new IllegalStateException(format("No committed attempts found under sink output path %s", sinkOutputPath))); + .collect(toImmutableList()); + + if (committedMarkerFilePaths.isEmpty()) { + throw new IllegalStateException(format("No committed attempts found under sink output path %s", sinkOutputPath)); + } + + String committedMarkerFilePath = committedMarkerFilePaths.get(0); // Committed marker file path format: {sinkOutputPath}/{attemptId}/committed String[] parts = committedMarkerFilePath.split(PATH_SEPARATOR); checkState(parts.length >= 3, "committedMarkerFilePath %s is malformed", committedMarkerFilePath); - String committedAttemptId = parts[parts.length - 2]; - int attemptIdOffset = committedMarkerFilePath.length() - committedAttemptId.length() + String stringCommittedAttemptId = parts[parts.length - 2]; + int attemptIdOffset = committedMarkerFilePath.length() - stringCommittedAttemptId.length() - PATH_SEPARATOR.length() - COMMITTED_MARKER_FILE_NAME.length(); // Data output file path format: {sinkOutputPath}/{attemptId}/{sourcePartitionId}_{splitId}.data List partitionFiles = sinkOutputFiles.stream() - .filter(file -> file.getFilePath().startsWith(committedAttemptId + PATH_SEPARATOR, attemptIdOffset) && file.getFilePath().endsWith(DATA_FILE_SUFFIX)) + .filter(file -> file.getFilePath().startsWith(stringCommittedAttemptId + PATH_SEPARATOR, attemptIdOffset) && file.getFilePath().endsWith(DATA_FILE_SUFFIX)) .collect(toImmutableList()); - ImmutableMultimap.Builder result = ImmutableMultimap.builder(); + ImmutableMultimap.Builder result = ImmutableMultimap.builder(); for (FileStatus partitionFile : partitionFiles) { Matcher matcher = PARTITION_FILE_NAME_PATTERN.matcher(new File(partitionFile.getFilePath()).getName()); checkState(matcher.matches(), "Unexpected partition file: %s", partitionFile); - int partitionId = Integer.parseInt(matcher.group(1)); - result.put(partitionId, partitionFile); + int partitionId = parseInt(matcher.group(1)); + result.put(partitionId, new SourceFile(partitionFile.getFilePath(), partitionFile.getFileSize(), partition, parseInt(stringCommittedAttemptId))); } return result.build(); }, diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java index fba10c86c61e..cc5a358a7409 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSource.java @@ -319,11 +319,14 @@ private static List getFiles(List hand Optional secretKey = handle.getSecretKey().map(key -> new SecretKeySpec(key, 0, key.length, "AES")); return new AbstractMap.SimpleEntry<>(handle, secretKey); }) - .flatMap(entry -> entry.getKey().getFiles().stream().map(fileStatus -> + .flatMap(entry -> entry.getKey().getFiles().stream().map(sourceFile -> new ExchangeSourceFile( - URI.create(fileStatus.getFilePath()), + URI.create(sourceFile.getFilePath()), entry.getValue(), - fileStatus.getFileSize()))) + sourceFile.getFileSize(), + entry.getKey().getExchangeId(), + sourceFile.getSourceTaskPartitionId(), + sourceFile.getSourceTaskAttemptId()))) .collect(toImmutableList()); } } diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java index abc021cba2da..e3bfa077a63a 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java @@ -17,10 +17,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.airlift.slice.SizeOf; +import io.trino.spi.exchange.ExchangeId; import io.trino.spi.exchange.ExchangeSourceHandle; import org.openjdk.jol.info.ClassLayout; import java.util.List; +import java.util.Objects; import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; @@ -34,21 +36,30 @@ public class FileSystemExchangeSourceHandle { private static final int INSTANCE_SIZE = toIntExact(ClassLayout.parseClass(FileSystemExchangeSourceHandle.class).instanceSize()); + private final ExchangeId exchangeId; private final int partitionId; - private final List files; + private final List files; private final Optional secretKey; @JsonCreator public FileSystemExchangeSourceHandle( + @JsonProperty("exchangeId") ExchangeId exchangeId, @JsonProperty("partitionId") int partitionId, - @JsonProperty("files") List files, + @JsonProperty("files") List files, @JsonProperty("secretKey") Optional secretKey) { + this.exchangeId = requireNonNull(exchangeId, "exchangeId is null"); this.partitionId = partitionId; this.files = ImmutableList.copyOf(requireNonNull(files, "files is null")); this.secretKey = requireNonNull(secretKey, "secretKey is null"); } + @JsonProperty + public ExchangeId getExchangeId() + { + return exchangeId; + } + @Override @JsonProperty public int getPartitionId() @@ -60,7 +71,7 @@ public int getPartitionId() public long getDataSizeInBytes() { return files.stream() - .mapToLong(FileStatus::getFileSize) + .mapToLong(SourceFile::getFileSize) .sum(); } @@ -68,12 +79,12 @@ public long getDataSizeInBytes() public long getRetainedSizeInBytes() { return INSTANCE_SIZE - + estimatedSizeOf(files, FileStatus::getRetainedSizeInBytes) + + estimatedSizeOf(files, SourceFile::getRetainedSizeInBytes) + sizeOf(secretKey, SizeOf::sizeOf); } @JsonProperty - public List getFiles() + public List getFiles() { return files; } @@ -88,9 +99,92 @@ public Optional getSecretKey() public String toString() { return toStringHelper(this) + .add("exchangeId", exchangeId) .add("partitionId", partitionId) .add("files", files) .add("secretKey", secretKey.map(value -> "[REDACTED]")) .toString(); } + + public static class SourceFile + { + private static final int INSTANCE_SIZE = toIntExact(ClassLayout.parseClass(SourceFile.class).instanceSize()); + + private final String filePath; + private final long fileSize; + private final int sourceTaskPartitionId; + private final int sourceTaskAttemptId; + + @JsonCreator + public SourceFile( + @JsonProperty("filePath") String filePath, + @JsonProperty("fileSize") long fileSize, + @JsonProperty("sourceTaskPartitionId") int sourceTaskPartitionId, + @JsonProperty("sourceTaskAttemptId") int sourceTaskAttemptId) + { + this.filePath = requireNonNull(filePath, "filePath is null"); + this.fileSize = fileSize; + this.sourceTaskPartitionId = sourceTaskPartitionId; + this.sourceTaskAttemptId = sourceTaskAttemptId; + } + + @JsonProperty + public String getFilePath() + { + return filePath; + } + + @JsonProperty + public long getFileSize() + { + return fileSize; + } + + @JsonProperty + public int getSourceTaskPartitionId() + { + return sourceTaskPartitionId; + } + + @JsonProperty + public int getSourceTaskAttemptId() + { + return sourceTaskAttemptId; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SourceFile that = (SourceFile) o; + return fileSize == that.fileSize && sourceTaskPartitionId == that.sourceTaskPartitionId && sourceTaskAttemptId == that.sourceTaskAttemptId && Objects.equals(filePath, that.filePath); + } + + @Override + public int hashCode() + { + return Objects.hash(filePath, fileSize, sourceTaskPartitionId, sourceTaskAttemptId); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("filePath", filePath) + .add("fileSize", fileSize) + .add("sourceTaskPartitionId", sourceTaskPartitionId) + .add("sourceTaskAttemptId", sourceTaskAttemptId) + .toString(); + } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + estimatedSizeOf(filePath); + } + } }