Skip to content

Commit

Permalink
Avoid extra TrinoInputFile#lastModified calls in hive filesystem cache
Browse files Browse the repository at this point in the history
  • Loading branch information
raunaqmorarka committed Sep 11, 2024
1 parent 8afdb09 commit d96dfb9
Show file tree
Hide file tree
Showing 13 changed files with 23 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Optional<ReaderPageSource> createPageSource(
long start,
long length,
long estimatedFileSize,
long fileModifiedTime,
Map<String, String> schema,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public ConnectorPageSource createPageSource(
hiveSplit.getStart(),
hiveSplit.getLength(),
hiveSplit.getEstimatedFileSize(),
hiveSplit.getFileModifiedTime(),
hiveSplit.getSchema(),
hiveTable.getCompactEffectivePredicate().intersect(
dynamicFilter.getCurrentPredicate().transformKeys(HiveColumnHandle.class::cast))
Expand Down Expand Up @@ -175,6 +176,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
long start,
long length,
long estimatedFileSize,
long fileModifiedTime,
Map<String, String> schema,
TupleDomain<HiveColumnHandle> effectivePredicate,
TypeManager typeManager,
Expand Down Expand Up @@ -205,6 +207,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
start,
length,
estimatedFileSize,
fileModifiedTime,
schema,
desiredColumns,
effectivePredicate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public Optional<ReaderPageSource> createPageSource(
long start,
long length,
long estimatedFileSize,
long fileModifiedTime,
Map<String, String> schema,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public Optional<ReaderPageSource> createPageSource(
long start,
long length,
long estimatedFileSize,
long fileModifiedTime,
Map<String, String> schema,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -173,6 +174,7 @@ public Optional<ReaderPageSource> createPageSource(
long start,
long length,
long estimatedFileSize,
long fileModifiedTime,
Map<String, String> schema,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
Expand Down Expand Up @@ -200,6 +202,7 @@ public Optional<ReaderPageSource> createPageSource(
start,
length,
estimatedFileSize,
fileModifiedTime,
readerColumnHandles,
columns,
isUseOrcColumnNames(session),
Expand Down Expand Up @@ -230,6 +233,7 @@ private ConnectorPageSource createOrcPageSource(
long start,
long length,
long estimatedFileSize,
long fileModifiedTime,
List<HiveColumnHandle> columns,
List<HiveColumnHandle> projections,
boolean useOrcColumnNames,
Expand All @@ -253,7 +257,7 @@ private ConnectorPageSource createOrcPageSource(
boolean originalFilesPresent = acidInfo.isPresent() && !acidInfo.get().getOriginalFiles().isEmpty();
try {
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
TrinoInputFile inputFile = fileSystem.newInputFile(path, estimatedFileSize);
TrinoInputFile inputFile = fileSystem.newInputFile(path, estimatedFileSize, Instant.ofEpochMilli(fileModifiedTime));
orcDataSource = new HdfsOrcDataSource(
new OrcDataSourceId(path.toString()),
estimatedFileSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -160,6 +161,7 @@ public Optional<ReaderPageSource> createPageSource(
long start,
long length,
long estimatedFileSize,
long fileModifiedTime,
Map<String, String> schema,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
Expand All @@ -175,7 +177,7 @@ public Optional<ReaderPageSource> createPageSource(
checkArgument(acidInfo.isEmpty(), "Acid is not supported");

TrinoFileSystem fileSystem = fileSystemFactory.create(session);
TrinoInputFile inputFile = fileSystem.newInputFile(path, estimatedFileSize);
TrinoInputFile inputFile = fileSystem.newInputFile(path, estimatedFileSize, Instant.ofEpochMilli(fileModifiedTime));

return Optional.of(createPageSource(
inputFile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public Optional<ReaderPageSource> createPageSource(
long start,
long length,
long estimatedFileSize,
long fileModifiedTime,
Map<String, String> schema,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,12 @@ public void testCacheFileOperations()
.add(new CacheOperation("Input.readFully", "key=p2/"))
.add(new CacheOperation("Alluxio.writeCache", "key=p1/"))
.add(new CacheOperation("Alluxio.writeCache", "key=p2/"))
.add(new CacheOperation("InputFile.lastModified", "key=p1/"))
.add(new CacheOperation("InputFile.lastModified", "key=p2/"))
.build());
assertFileSystemAccesses(
"SELECT * FROM test_cache_file_operations",
ImmutableMultiset.<CacheOperation>builder()
.add(new CacheOperation("Alluxio.readCached", "key=p1/"))
.add(new CacheOperation("Alluxio.readCached", "key=p2/"))
.add(new CacheOperation("InputFile.lastModified", "key=p1/"))
.add(new CacheOperation("InputFile.lastModified", "key=p2/"))
.build());
assertUpdate("INSERT INTO test_cache_file_operations VALUES ('3-xyz', 'p3')", 1);
assertUpdate("INSERT INTO test_cache_file_operations VALUES ('4-xyz', 'p4')", 1);
Expand All @@ -112,11 +108,6 @@ public void testCacheFileOperations()
.add(new CacheOperation("Alluxio.writeCache", "key=p3/"))
.add(new CacheOperation("Alluxio.writeCache", "key=p4/"))
.add(new CacheOperation("Alluxio.writeCache", "key=p5/"))
.add(new CacheOperation("InputFile.lastModified", "key=p1/"))
.add(new CacheOperation("InputFile.lastModified", "key=p2/"))
.add(new CacheOperation("InputFile.lastModified", "key=p3/"))
.add(new CacheOperation("InputFile.lastModified", "key=p4/"))
.add(new CacheOperation("InputFile.lastModified", "key=p5/"))
.build());
assertFileSystemAccesses(
"SELECT * FROM test_cache_file_operations",
Expand All @@ -126,11 +117,6 @@ public void testCacheFileOperations()
.add(new CacheOperation("Alluxio.readCached", "key=p3/"))
.add(new CacheOperation("Alluxio.readCached", "key=p4/"))
.add(new CacheOperation("Alluxio.readCached", "key=p5/"))
.add(new CacheOperation("InputFile.lastModified", "key=p1/"))
.add(new CacheOperation("InputFile.lastModified", "key=p2/"))
.add(new CacheOperation("InputFile.lastModified", "key=p3/"))
.add(new CacheOperation("InputFile.lastModified", "key=p4/"))
.add(new CacheOperation("InputFile.lastModified", "key=p5/"))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,7 @@ private static void testPageSourceFactory(
0,
fileSize,
paddedFileSize,
12345,
splitProperties,
TupleDomain.all(),
TESTING_TYPE_MANAGER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, Connec
fileSplit.getStart(),
fileSplit.getLength(),
fileSplit.getLength(),
12345,
schema,
TupleDomain.all(),
TESTING_TYPE_MANAGER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ private static List<Nation> readFile(
0,
fileSize,
fileSize,
12345,
createSchema(),
columnHandles,
tupleDomain,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableSet;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.memory.MemoryFileSystemFactory;
import io.trino.metastore.HiveType;
import io.trino.orc.OrcReaderOptions;
Expand Down Expand Up @@ -160,7 +161,8 @@ private static ConnectorPageSource createPageSource(
{
OrcPageSourceFactory readerFactory = new OrcPageSourceFactory(new OrcReaderOptions(), fileSystemFactory, STATS, UTC);

long length = fileSystemFactory.create(session).newInputFile(location).length();
TrinoInputFile inputFile = fileSystemFactory.create(session).newInputFile(location);
long length = inputFile.length();
List<HivePageSourceProvider.ColumnMapping> columnMappings = buildColumnMappings(
"",
ImmutableList.of(),
Expand All @@ -180,6 +182,7 @@ private static ConnectorPageSource createPageSource(
0,
length,
length,
inputFile.lastModified().toEpochMilli(),
getTableProperties(),
effectivePredicate,
TESTING_TYPE_MANAGER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ private static ConnectorPageSource createPageSource(ConnectorSession session, Fi
0,
parquetFile.length(),
parquetFile.length(),
parquetFile.lastModified(),
ImmutableMap.of(SERIALIZATION_LIB, HiveStorageFormat.PARQUET.getSerde()),
columns,
domain,
Expand Down

0 comments on commit d96dfb9

Please sign in to comment.