Skip to content

Commit

Permalink
Include source task/attempt for each file in FileSystemExchangeSource…
Browse files Browse the repository at this point in the history
…Handle

To allow post filtering at the FileSystemExchangeSource level
  • Loading branch information
arhimondr committed Oct 7, 2022
1 parent 6cf37fa commit 78fd3b1
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.plugin.exchange.filesystem;

import io.trino.spi.exchange.ExchangeId;

import javax.annotation.concurrent.Immutable;
import javax.crypto.SecretKey;

Expand All @@ -27,12 +29,18 @@ public class ExchangeSourceFile
private final URI fileUri;
private final Optional<SecretKey> secretKey;
private final long fileSize;
private final ExchangeId exchangeId;
private final int sourceTaskPartitionId;
private final int sourceTaskAttemptId;

public ExchangeSourceFile(URI fileUri, Optional<SecretKey> secretKey, long fileSize)
public ExchangeSourceFile(URI fileUri, Optional<SecretKey> secretKey, long fileSize, ExchangeId exchangeId, int sourceTaskPartitionId, int sourceTaskAttemptId)
{
this.fileUri = requireNonNull(fileUri, "fileUri is null");
this.secretKey = requireNonNull(secretKey, "secretKey is null");
this.fileSize = fileSize;
this.exchangeId = requireNonNull(exchangeId, "exchangeId is null");
this.sourceTaskPartitionId = sourceTaskPartitionId;
this.sourceTaskAttemptId = sourceTaskAttemptId;
}

public URI getFileUri()
Expand All @@ -49,4 +57,19 @@ public long getFileSize()
{
return fileSize;
}

public ExchangeId getExchangeId()
{
return exchangeId;
}

public int getSourceTaskPartitionId()
{
return sourceTaskPartitionId;
}

public int getSourceTaskAttemptId()
{
return sourceTaskAttemptId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeSourceHandle.SourceFile;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
import io.trino.spi.exchange.ExchangeSinkHandle;
Expand Down Expand Up @@ -58,6 +59,7 @@
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR;
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeSink.COMMITTED_MARKER_FILE_NAME;
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeSink.DATA_FILE_SUFFIX;
import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -216,60 +218,65 @@ private ListenableFuture<List<ExchangeSourceHandle>> createExchangeSourceHandles
return Futures.transform(
processAll(finishedTaskPartitions, this::getCommittedPartitions, fileListingParallelism, executor),
partitionsList -> {
Multimap<Integer, FileStatus> partitionFiles = ArrayListMultimap.create();
partitionsList.forEach(partitions -> partitions.forEach(partitionFiles::put));
Multimap<Integer, SourceFile> sourceFiles = ArrayListMultimap.create();
partitionsList.forEach(partitions -> partitions.forEach(sourceFiles::put));
ImmutableList.Builder<ExchangeSourceHandle> result = ImmutableList.builder();
for (Integer partitionId : partitionFiles.keySet()) {
Collection<FileStatus> files = partitionFiles.get(partitionId);
for (Integer partitionId : sourceFiles.keySet()) {
Collection<SourceFile> files = sourceFiles.get(partitionId);
long currentExchangeHandleDataSizeInBytes = 0;
ImmutableList.Builder<FileStatus> currentExchangeHandleFiles = ImmutableList.builder();
for (FileStatus file : files) {
ImmutableList.Builder<SourceFile> currentExchangeHandleFiles = ImmutableList.builder();
for (SourceFile file : files) {
if (currentExchangeHandleDataSizeInBytes > 0 && currentExchangeHandleDataSizeInBytes + file.getFileSize() > exchangeSourceHandleTargetDataSizeInBytes) {
result.add(new FileSystemExchangeSourceHandle(partitionId, currentExchangeHandleFiles.build(), secretKey.map(SecretKey::getEncoded)));
result.add(new FileSystemExchangeSourceHandle(exchangeContext.getExchangeId(), partitionId, currentExchangeHandleFiles.build(), secretKey.map(SecretKey::getEncoded)));
currentExchangeHandleDataSizeInBytes = 0;
currentExchangeHandleFiles = ImmutableList.builder();
}
currentExchangeHandleDataSizeInBytes += file.getFileSize();
currentExchangeHandleFiles.add(file);
}
if (currentExchangeHandleDataSizeInBytes > 0) {
result.add(new FileSystemExchangeSourceHandle(partitionId, currentExchangeHandleFiles.build(), secretKey.map(SecretKey::getEncoded)));
result.add(new FileSystemExchangeSourceHandle(exchangeContext.getExchangeId(), partitionId, currentExchangeHandleFiles.build(), secretKey.map(SecretKey::getEncoded)));
}
}
return result.build();
},
executor);
}

private ListenableFuture<Multimap<Integer, FileStatus>> getCommittedPartitions(int taskPartitionId)
private ListenableFuture<Multimap<Integer, SourceFile>> getCommittedPartitions(int partition)
{
URI sinkOutputPath = getTaskOutputDirectory(taskPartitionId);
URI sinkOutputPath = getTaskOutputDirectory(partition);
return stats.getGetCommittedPartitions().record(Futures.transform(
exchangeStorage.listFilesRecursively(sinkOutputPath),
sinkOutputFiles -> {
String committedMarkerFilePath = sinkOutputFiles.stream()
List<String> committedMarkerFilePaths = sinkOutputFiles.stream()
.map(FileStatus::getFilePath)
.filter(filePath -> filePath.endsWith(COMMITTED_MARKER_FILE_NAME))
.findFirst()
.orElseThrow(() -> new IllegalStateException(format("No committed attempts found under sink output path %s", sinkOutputPath)));
.collect(toImmutableList());

if (committedMarkerFilePaths.isEmpty()) {
throw new IllegalStateException(format("No committed attempts found under sink output path %s", sinkOutputPath));
}

String committedMarkerFilePath = committedMarkerFilePaths.get(0);
// Committed marker file path format: {sinkOutputPath}/{attemptId}/committed
String[] parts = committedMarkerFilePath.split(PATH_SEPARATOR);
checkState(parts.length >= 3, "committedMarkerFilePath %s is malformed", committedMarkerFilePath);
String committedAttemptId = parts[parts.length - 2];
int attemptIdOffset = committedMarkerFilePath.length() - committedAttemptId.length()
String stringCommittedAttemptId = parts[parts.length - 2];
int attemptIdOffset = committedMarkerFilePath.length() - stringCommittedAttemptId.length()
- PATH_SEPARATOR.length() - COMMITTED_MARKER_FILE_NAME.length();

// Data output file path format: {sinkOutputPath}/{attemptId}/{sourcePartitionId}_{splitId}.data
List<FileStatus> partitionFiles = sinkOutputFiles.stream()
.filter(file -> file.getFilePath().startsWith(committedAttemptId + PATH_SEPARATOR, attemptIdOffset) && file.getFilePath().endsWith(DATA_FILE_SUFFIX))
.filter(file -> file.getFilePath().startsWith(stringCommittedAttemptId + PATH_SEPARATOR, attemptIdOffset) && file.getFilePath().endsWith(DATA_FILE_SUFFIX))
.collect(toImmutableList());

ImmutableMultimap.Builder<Integer, FileStatus> result = ImmutableMultimap.builder();
ImmutableMultimap.Builder<Integer, SourceFile> result = ImmutableMultimap.builder();
for (FileStatus partitionFile : partitionFiles) {
Matcher matcher = PARTITION_FILE_NAME_PATTERN.matcher(new File(partitionFile.getFilePath()).getName());
checkState(matcher.matches(), "Unexpected partition file: %s", partitionFile);
int partitionId = Integer.parseInt(matcher.group(1));
result.put(partitionId, partitionFile);
int partitionId = parseInt(matcher.group(1));
result.put(partitionId, new SourceFile(partitionFile.getFilePath(), partitionFile.getFileSize(), partition, parseInt(stringCommittedAttemptId)));
}
return result.build();
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,14 @@ private static List<ExchangeSourceFile> getFiles(List<ExchangeSourceHandle> hand
Optional<SecretKey> secretKey = handle.getSecretKey().map(key -> new SecretKeySpec(key, 0, key.length, "AES"));
return new AbstractMap.SimpleEntry<>(handle, secretKey);
})
.flatMap(entry -> entry.getKey().getFiles().stream().map(fileStatus ->
.flatMap(entry -> entry.getKey().getFiles().stream().map(sourceFile ->
new ExchangeSourceFile(
URI.create(fileStatus.getFilePath()),
URI.create(sourceFile.getFilePath()),
entry.getValue(),
fileStatus.getFileSize())))
sourceFile.getFileSize(),
entry.getKey().getExchangeId(),
sourceFile.getSourceTaskPartitionId(),
sourceFile.getSourceTaskAttemptId())))
.collect(toImmutableList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.SizeOf;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeSourceHandle;
import org.openjdk.jol.info.ClassLayout;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -34,21 +36,30 @@ public class FileSystemExchangeSourceHandle
{
private static final int INSTANCE_SIZE = toIntExact(ClassLayout.parseClass(FileSystemExchangeSourceHandle.class).instanceSize());

private final ExchangeId exchangeId;
private final int partitionId;
private final List<FileStatus> files;
private final List<SourceFile> files;
private final Optional<byte[]> secretKey;

@JsonCreator
public FileSystemExchangeSourceHandle(
@JsonProperty("exchangeId") ExchangeId exchangeId,
@JsonProperty("partitionId") int partitionId,
@JsonProperty("files") List<FileStatus> files,
@JsonProperty("files") List<SourceFile> files,
@JsonProperty("secretKey") Optional<byte[]> secretKey)
{
this.exchangeId = requireNonNull(exchangeId, "exchangeId is null");
this.partitionId = partitionId;
this.files = ImmutableList.copyOf(requireNonNull(files, "files is null"));
this.secretKey = requireNonNull(secretKey, "secretKey is null");
}

@JsonProperty
public ExchangeId getExchangeId()
{
return exchangeId;
}

@Override
@JsonProperty
public int getPartitionId()
Expand All @@ -60,20 +71,20 @@ public int getPartitionId()
public long getDataSizeInBytes()
{
return files.stream()
.mapToLong(FileStatus::getFileSize)
.mapToLong(SourceFile::getFileSize)
.sum();
}

@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ estimatedSizeOf(files, FileStatus::getRetainedSizeInBytes)
+ estimatedSizeOf(files, SourceFile::getRetainedSizeInBytes)
+ sizeOf(secretKey, SizeOf::sizeOf);
}

@JsonProperty
public List<FileStatus> getFiles()
public List<SourceFile> getFiles()
{
return files;
}
Expand All @@ -88,9 +99,92 @@ public Optional<byte[]> getSecretKey()
public String toString()
{
return toStringHelper(this)
.add("exchangeId", exchangeId)
.add("partitionId", partitionId)
.add("files", files)
.add("secretKey", secretKey.map(value -> "[REDACTED]"))
.toString();
}

public static class SourceFile
{
private static final int INSTANCE_SIZE = toIntExact(ClassLayout.parseClass(SourceFile.class).instanceSize());

private final String filePath;
private final long fileSize;
private final int sourceTaskPartitionId;
private final int sourceTaskAttemptId;

@JsonCreator
public SourceFile(
@JsonProperty("filePath") String filePath,
@JsonProperty("fileSize") long fileSize,
@JsonProperty("sourceTaskPartitionId") int sourceTaskPartitionId,
@JsonProperty("sourceTaskAttemptId") int sourceTaskAttemptId)
{
this.filePath = requireNonNull(filePath, "filePath is null");
this.fileSize = fileSize;
this.sourceTaskPartitionId = sourceTaskPartitionId;
this.sourceTaskAttemptId = sourceTaskAttemptId;
}

@JsonProperty
public String getFilePath()
{
return filePath;
}

@JsonProperty
public long getFileSize()
{
return fileSize;
}

@JsonProperty
public int getSourceTaskPartitionId()
{
return sourceTaskPartitionId;
}

@JsonProperty
public int getSourceTaskAttemptId()
{
return sourceTaskAttemptId;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SourceFile that = (SourceFile) o;
return fileSize == that.fileSize && sourceTaskPartitionId == that.sourceTaskPartitionId && sourceTaskAttemptId == that.sourceTaskAttemptId && Objects.equals(filePath, that.filePath);
}

@Override
public int hashCode()
{
return Objects.hash(filePath, fileSize, sourceTaskPartitionId, sourceTaskAttemptId);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("filePath", filePath)
.add("fileSize", fileSize)
.add("sourceTaskPartitionId", sourceTaskPartitionId)
.add("sourceTaskAttemptId", sourceTaskAttemptId)
.toString();
}

public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE + estimatedSizeOf(filePath);
}
}
}

0 comments on commit 78fd3b1

Please sign in to comment.