Do not cache Hadoop LocatedFileStatus objects #14408

Merged (1 commit) on Oct 4, 2022
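
This PR replaces Hadoop's LocatedFileStatus with a lightweight, immutable TrinoFileStatus across directory listing and split loading, so that cached directory listings no longer retain heavyweight Hadoop objects. The TrinoFileStatus class itself is not included in this diff; the following is a minimal sketch of what such a wrapper plausibly looks like, inferred from the accessors the diff relies on (a Path-returning getPath() and block locations carried by the new BlockLocation class below). The exact field set and method names here are assumptions.

package io.trino.plugin.hive.fs;

import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;

import java.util.List;

// Sketch only: eagerly copy the fields Trino needs and drop the reference
// to the Hadoop LocatedFileStatus (field set is assumed, not from the PR)
public class TrinoFileStatus
{
    private final List<BlockLocation> blockLocations;
    private final Path path;
    private final long length;
    private final long modificationTime;

    public TrinoFileStatus(LocatedFileStatus fileStatus)
    {
        this.blockLocations = BlockLocation.fromHiveBlockLocations(fileStatus.getBlockLocations());
        this.path = fileStatus.getPath();
        this.length = fileStatus.getLen();
        this.modificationTime = fileStatus.getModificationTime();
    }

    public List<BlockLocation> getBlockLocations()
    {
        return blockLocations;
    }

    public Path getPath()
    {
        return path;
    }

    public long getLength()
    {
        return length;
    }

    public long getModificationTime()
    {
        return modificationTime;
    }
}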
@@ -30,6 +30,7 @@
import io.trino.plugin.hive.HiveSplit.BucketValidation;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.fs.HiveFileIterator;
+import io.trino.plugin.hive.fs.TrinoFileStatus;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;
@@ -571,7 +572,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
ListenableFuture<Void> lastResult = immediateVoidFuture(); // TODO document in addToQueue() that it is sufficient to hold on to last returned future
for (Path readPath : readPaths) {
// list all files in the partition
-List<LocatedFileStatus> files = new ArrayList<>();
+List<TrinoFileStatus> files = new ArrayList<>();
try {
Iterators.addAll(files, new HiveFileIterator(table, readPath, fs, directoryLister, namenodeStats, FAIL, ignoreAbsentPartitions));
}
@@ -589,11 +590,11 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
}

for (HdfsFileStatusWithId hdfsFileStatusWithId : fileStatusOriginalFiles) {
-List<LocatedFileStatus> locatedFileStatuses = ImmutableList.of((LocatedFileStatus) hdfsFileStatusWithId.getFileStatus());
+List<TrinoFileStatus> fileStatuses = ImmutableList.of(new TrinoFileStatus((LocatedFileStatus) hdfsFileStatusWithId.getFileStatus()));
Optional<AcidInfo> acidInfo = isFullAcid
? Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(hdfsFileStatusWithId.getFileStatus().getPath())))
: Optional.empty();
-lastResult = hiveSplitSource.addToQueue(getBucketedSplits(locatedFileStatuses, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfo));
+lastResult = hiveSplitSource.addToQueue(getBucketedSplits(fileStatuses, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfo));
}

return lastResult;
@@ -683,13 +684,13 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
{
FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, parent);

-Map<Path, LocatedFileStatus> fileStatuses = new HashMap<>();
+Map<Path, TrinoFileStatus> fileStatuses = new HashMap<>();
HiveFileIterator fileStatusIterator = new HiveFileIterator(table, parent, targetFilesystem, directoryLister, namenodeStats, IGNORED, false);
fileStatusIterator.forEachRemaining(status -> fileStatuses.put(getPathWithoutSchemeAndAuthority(status.getPath()), status));

-List<LocatedFileStatus> locatedFileStatuses = new ArrayList<>();
+List<TrinoFileStatus> locatedFileStatuses = new ArrayList<>();
for (Path path : paths) {
-LocatedFileStatus status = fileStatuses.get(getPathWithoutSchemeAndAuthority(path));
+TrinoFileStatus status = fileStatuses.get(getPathWithoutSchemeAndAuthority(path));
// This check will catch all directories in the manifest since HiveFileIterator will not return any directories.
// Some files may not be listed by HiveFileIterator - if those are included in the manifest this check will fail as well.
if (status == null) {
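
Aside: both the listing map and the manifest lookup above are keyed by Path.getPathWithoutSchemeAndAuthority, an existing Hadoop utility, so a manifest entry matches the listed file no matter how its URI was written. A small illustration with made-up paths:

import org.apache.hadoop.fs.Path;

static void pathNormalizationExample()
{
    Path listed = new Path("hdfs://namenode:8020/warehouse/orders/000000_0");
    Path fromManifest = new Path("/warehouse/orders/000000_0");
    // Both normalize to "/warehouse/orders/000000_0", so the lookup succeeds
    assert Path.getPathWithoutSchemeAndAuthority(listed)
            .equals(Path.getPathWithoutSchemeAndAuthority(fromManifest));
}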
@@ -735,7 +736,7 @@ private Iterator<InternalHiveSplit> generateOriginalFilesSplits(
? Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(fileStatus.getPath())))
: Optional.empty();
return splitFactory.createInternalHiveSplit(
-(LocatedFileStatus) fileStatus,
+new TrinoFileStatus((LocatedFileStatus) fileStatus),
OptionalInt.empty(),
OptionalInt.empty(),
splittable,
@@ -780,7 +781,7 @@ private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, F
}

private List<InternalHiveSplit> getBucketedSplits(
-List<LocatedFileStatus> files,
+List<TrinoFileStatus> files,
InternalHiveSplitFactory splitFactory,
BucketSplitInfo bucketSplitInfo,
Optional<BucketConversion> bucketConversion,
@@ -795,8 +796,8 @@ private List<InternalHiveSplit> getBucketedSplits(
checkState(readBucketCount <= tableBucketCount, "readBucketCount(%s) should be less than or equal to tableBucketCount(%s)", readBucketCount, tableBucketCount);

// build mapping of file name to bucket
-ListMultimap<Integer, LocatedFileStatus> bucketFiles = ArrayListMultimap.create();
-for (LocatedFileStatus file : files) {
+ListMultimap<Integer, TrinoFileStatus> bucketFiles = ArrayListMultimap.create();
+for (TrinoFileStatus file : files) {
String fileName = file.getPath().getName();
OptionalInt bucket = getBucketNumber(fileName);
if (bucket.isPresent()) {
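
Background for the mapping being built here: Hive encodes the bucket id as a zero-padded prefix of the file name, and getBucketNumber (defined elsewhere in this class, not shown in this diff) extracts it. Expected behavior, with made-up file names:

OptionalInt bucket = getBucketNumber("000002_0");       // OptionalInt.of(2)
OptionalInt none = getBucketNumber("not-a-bucket.orc"); // OptionalInt.empty()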
@@ -860,7 +861,7 @@ private List<InternalHiveSplit> getBucketedSplits(
"partition bucket count: " + partitionBucketCount + ", effective reading bucket count: " + readBucketCount + ")");
}
if (!eligibleTableBucketNumbers.isEmpty()) {
-for (LocatedFileStatus file : bucketFiles.get(partitionBucketNumber)) {
+for (TrinoFileStatus file : bucketFiles.get(partitionBucketNumber)) {
// OrcDeletedRows will load only delete delta files matching current bucket id,
// so we can pass all delete delta locations here, without filtering.
eligibleTableBucketNumbers.stream()
@@ -873,7 +874,7 @@ private List<InternalHiveSplit> getBucketedSplits(
}

@VisibleForTesting
-static void validateFileBuckets(ListMultimap<Integer, LocatedFileStatus> bucketFiles, int partitionBucketCount, String tableName, String partitionName)
+static void validateFileBuckets(ListMultimap<Integer, TrinoFileStatus> bucketFiles, int partitionBucketCount, String tableName, String partitionName)
{
if (bucketFiles.isEmpty()) {
return;
io/trino/plugin/hive/fs/BlockLocation.java (new file)
@@ -0,0 +1,105 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.fs;

import com.google.common.collect.ImmutableList;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class BlockLocation
{
private final List<String> hosts;
private final long offset;
private final long length;

public static List<BlockLocation> fromHiveBlockLocations(@Nullable org.apache.hadoop.fs.BlockLocation[] blockLocations)
{
if (blockLocations == null) {
return ImmutableList.of();
}

return Arrays.stream(blockLocations)
.map(BlockLocation::new)
.collect(toImmutableList());
}

public BlockLocation(org.apache.hadoop.fs.BlockLocation blockLocation)
{
requireNonNull(blockLocation, "blockLocation is null");
try {
this.hosts = ImmutableList.copyOf(blockLocation.getHosts());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
this.offset = blockLocation.getOffset();
this.length = blockLocation.getLength();
}

public List<String> getHosts()
{
return hosts;
}

public long getOffset()
{
return offset;
}

public long getLength()
{
return length;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BlockLocation that = (BlockLocation) o;
return offset == that.offset
&& length == that.length
&& hosts.equals(that.hosts);
}

@Override
public int hashCode()
{
return Objects.hash(hosts, offset, length);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("hosts", hosts)
.add("offset", offset)
.add("length", length)
.toString();
}
}
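
A possible usage sketch for the new class (all values made up): BlockLocation copies hosts, offset, and length eagerly, and converts the checked IOException thrown by the Hadoop getHosts() accessor into an UncheckedIOException at construction time, so nothing Hadoop-owned survives into the cache.

import java.util.List;

public class BlockLocationExample
{
    public static void main(String[] args)
    {
        // A Hadoop block location: (names, hosts, offset, length)
        org.apache.hadoop.fs.BlockLocation hadoopLocation = new org.apache.hadoop.fs.BlockLocation(
                new String[] {"datanode1:9866"},
                new String[] {"datanode1"},
                0,
                1024);
        List<BlockLocation> copied = BlockLocation.fromHiveBlockLocations(
                new org.apache.hadoop.fs.BlockLocation[] {hadoopLocation});
        // Prints [BlockLocation{hosts=[datanode1], offset=0, length=1024}];
        // no reference to the Hadoop object is retained
        System.out.println(copied);
    }
}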
@@ -26,7 +26,6 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.weakref.jmx.Managed;
@@ -89,28 +88,28 @@ private static SchemaTablePrefix parseTableName(String tableName)
}

@Override
-public RemoteIterator<LocatedFileStatus> list(FileSystem fs, Table table, Path path)
+public RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
throws IOException
{
if (!isCacheEnabledFor(table.getSchemaTableName())) {
-return fs.listLocatedStatus(path);
+return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(path));
}

return listInternal(fs, new DirectoryListingCacheKey(path, false));
}

@Override
-public RemoteIterator<LocatedFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
+public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
throws IOException
{
if (!isCacheEnabledFor(table.getSchemaTableName())) {
-return fs.listFiles(path, true);
+return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true));
}

return listInternal(fs, new DirectoryListingCacheKey(path, true));
}

-private RemoteIterator<LocatedFileStatus> listInternal(FileSystem fs, DirectoryListingCacheKey cacheKey)
+private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, DirectoryListingCacheKey cacheKey)
throws IOException
{
ValueHolder cachedValueHolder = uncheckedCacheGet(cache, cacheKey, ValueHolder::new);
@@ -121,13 +120,13 @@ private RemoteIterator<LocatedFileStatus> listInternal(FileSystem fs, DirectoryL
return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, cacheKey), cacheKey);
}

-private static RemoteIterator<LocatedFileStatus> createListingRemoteIterator(FileSystem fs, DirectoryListingCacheKey cacheKey)
+private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(FileSystem fs, DirectoryListingCacheKey cacheKey)
throws IOException
{
if (cacheKey.isRecursiveFilesOnly()) {
-return fs.listFiles(cacheKey.getPath(), true);
+return new TrinoFileStatusRemoteIterator(fs.listFiles(cacheKey.getPath(), true));
}
-return fs.listLocatedStatus(cacheKey.getPath());
+return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(cacheKey.getPath()));
}

@Override
@@ -152,11 +151,11 @@ public void invalidate(Partition partition)
}
}
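
The TrinoFileStatusRemoteIterator used above is not included in this diff. A minimal sketch of such an adapter, with the constructor signature inferred from the call sites (names are assumptions):

import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.IOException;

import static java.util.Objects.requireNonNull;

public class TrinoFileStatusRemoteIterator
        implements RemoteIterator<TrinoFileStatus>
{
    private final RemoteIterator<LocatedFileStatus> delegate;

    public TrinoFileStatusRemoteIterator(RemoteIterator<LocatedFileStatus> delegate)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
    }

    @Override
    public boolean hasNext()
            throws IOException
    {
        return delegate.hasNext();
    }

    @Override
    public TrinoFileStatus next()
            throws IOException
    {
        // Convert eagerly so callers never observe the Hadoop object
        return new TrinoFileStatus(delegate.next());
    }
}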

-private RemoteIterator<LocatedFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<LocatedFileStatus> iterator, DirectoryListingCacheKey key)
+private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<TrinoFileStatus> iterator, DirectoryListingCacheKey key)
{
return new RemoteIterator<>()
{
-private final List<LocatedFileStatus> files = new ArrayList<>();
+private final List<TrinoFileStatus> files = new ArrayList<>();

@Override
public boolean hasNext()
@@ -172,10 +171,10 @@ public boolean hasNext()
}

@Override
-public LocatedFileStatus next()
+public TrinoFileStatus next()
throws IOException
{
-LocatedFileStatus next = iterator.next();
+TrinoFileStatus next = iterator.next();
files.add(next);
return next;
}
@@ -249,19 +248,19 @@ private static boolean isLocationPresent(Storage storage)
*/
private static class ValueHolder
{
-private final Optional<List<LocatedFileStatus>> files;
+private final Optional<List<TrinoFileStatus>> files;

public ValueHolder()
{
files = Optional.empty();
}

-public ValueHolder(List<LocatedFileStatus> files)
+public ValueHolder(List<TrinoFileStatus> files)
{
this.files = Optional.of(ImmutableList.copyOf(requireNonNull(files, "files is null")));
}

-public Optional<List<LocatedFileStatus>> getFiles()
+public Optional<List<TrinoFileStatus>> getFiles()
{
return files;
}
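
The part of cachingRemoteIterator folded out of this diff is where the cache is actually populated: the wrapping iterator accumulates every returned TrinoFileStatus and, once the underlying listing is exhausted, publishes the complete list. A sketch of that elided step; the cache-update call is an assumption (ValueHolder's list-taking constructor is shown above, but the exact cache interaction is not):

@Override
public boolean hasNext()
        throws IOException
{
    boolean hasNext = iterator.hasNext();
    if (!hasNext) {
        // Assumed: swap in a fully populated ValueHolder so later
        // listings of this key are served entirely from the cache
        cache.put(key, new ValueHolder(files));
    }
    return hasNext;
}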
io/trino/plugin/hive/fs/DirectoryLister.java
@@ -16,7 +16,6 @@
import io.trino.plugin.hive.TableInvalidationCallback;
import io.trino.plugin.hive.metastore.Table;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

@@ -25,9 +24,9 @@
public interface DirectoryLister
extends TableInvalidationCallback
{
-RemoteIterator<LocatedFileStatus> list(FileSystem fs, Table table, Path path)
+RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
throws IOException;

-RemoteIterator<LocatedFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
+RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
throws IOException;
}
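
Both listing methods now return Hadoop's RemoteIterator parameterized with TrinoFileStatus; its hasNext() and next() throw checked IOException, so callers drain it with an explicit loop rather than through java.util.Iterator. An illustrative consumer (the drain helper is not part of the PR):

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.IOException;
import java.util.List;

final class DirectoryListerExample
{
    private DirectoryListerExample() {}

    static List<TrinoFileStatus> drain(RemoteIterator<TrinoFileStatus> iterator)
            throws IOException
    {
        ImmutableList.Builder<TrinoFileStatus> files = ImmutableList.builder();
        while (iterator.hasNext()) {
            files.add(iterator.next());
        }
        return files.build();
    }
}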