From 8afdb09088a2f2b64b4d004c9edcc73286d9d269 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Sat, 7 Sep 2024 11:41:08 +0530 Subject: [PATCH 1/2] Add TrinoFileSystem#newInputFile API to allow reusing lastModified time --- .../filesystem/azure/AzureFileSystem.java | 13 ++++++- .../filesystem/azure/AzureInputFile.java | 5 ++- .../trino/filesystem/gcs/GcsFileSystem.java | 13 ++++++- .../io/trino/filesystem/gcs/GcsInputFile.java | 9 +++-- .../io/trino/filesystem/s3/S3FileSystem.java | 11 +++++- .../io/trino/filesystem/s3/S3InputFile.java | 3 +- .../io/trino/filesystem/TrinoFileSystem.java | 12 ++++++ .../filesystem/cache/CacheFileSystem.java | 11 +++++- .../filesystem/cache/CacheInputFile.java | 9 ++++- .../filesystem/local/LocalFileSystem.java | 9 ++++- .../filesystem/local/LocalInputFile.java | 9 +++-- .../filesystem/memory/MemoryFileSystem.java | 12 +++++- .../filesystem/memory/MemoryInputFile.java | 7 ++-- .../switching/SwitchingFileSystem.java | 7 ++++ .../filesystem/tracing/TracingFileSystem.java | 11 +++++- .../filesystem/tracing/TracingInputFile.java | 9 +++-- .../AbstractTestTrinoFileSystem.java | 33 +++++++++++++++++ .../trino/filesystem/hdfs/HdfsFileSystem.java | 11 +++++- .../trino/filesystem/hdfs/HdfsInputFile.java | 37 +++++++++++-------- .../hive/TestBackgroundHiveSplitLoader.java | 6 +++ 20 files changed, 189 insertions(+), 48 deletions(-) diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java index d3e817fd54ce..00567abed13f 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java @@ -43,6 +43,7 @@ import io.trino.filesystem.TrinoOutputFile; import java.io.IOException; +import java.time.Instant; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; @@ -96,7 +97,7 @@ public TrinoInputFile newInputFile(Location location) { AzureLocation azureLocation = new AzureLocation(location); BlobClient client = createBlobClient(azureLocation); - return new AzureInputFile(azureLocation, OptionalLong.empty(), client, readBlockSizeBytes); + return new AzureInputFile(azureLocation, OptionalLong.empty(), Optional.empty(), client, readBlockSizeBytes); } @Override @@ -104,7 +105,15 @@ public TrinoInputFile newInputFile(Location location, long length) { AzureLocation azureLocation = new AzureLocation(location); BlobClient client = createBlobClient(azureLocation); - return new AzureInputFile(azureLocation, OptionalLong.of(length), client, readBlockSizeBytes); + return new AzureInputFile(azureLocation, OptionalLong.of(length), Optional.empty(), client, readBlockSizeBytes); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + AzureLocation azureLocation = new AzureLocation(location); + BlobClient client = createBlobClient(azureLocation); + return new AzureInputFile(azureLocation, OptionalLong.of(length), Optional.of(lastModified), client, readBlockSizeBytes); } @Override diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureInputFile.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureInputFile.java index dbf14c9f62fc..653cf0365baa 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureInputFile.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureInputFile.java @@ -37,13 +37,14 @@ class AzureInputFile private final int readBlockSizeBytes; private OptionalLong length; - private Optional lastModified = Optional.empty(); + private Optional lastModified; - public AzureInputFile(AzureLocation location, OptionalLong length, BlobClient blobClient, int readBlockSizeBytes) + public AzureInputFile(AzureLocation location, OptionalLong length, Optional lastModified, BlobClient blobClient, int readBlockSizeBytes) { this.location = requireNonNull(location, "location is null"); location.location().verifyValidFileLocation(); this.length = requireNonNull(length, "length is null"); + this.lastModified = requireNonNull(lastModified, "lastModified is null"); this.blobClient = requireNonNull(blobClient, "blobClient is null"); checkArgument(readBlockSizeBytes >= 0, "readBlockSizeBytes is negative"); this.readBlockSizeBytes = readBlockSizeBytes; diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java index a27eb76425c2..10518b43665e 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -84,7 +85,7 @@ public TrinoInputFile newInputFile(Location location) { GcsLocation gcsLocation = new GcsLocation(location); checkIsValidFile(gcsLocation); - return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty()); + return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty(), Optional.empty()); } @Override @@ -92,7 +93,15 @@ public TrinoInputFile newInputFile(Location location, long length) { GcsLocation gcsLocation = new GcsLocation(location); checkIsValidFile(gcsLocation); - return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length)); + return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.empty()); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + GcsLocation gcsLocation = new GcsLocation(location); + checkIsValidFile(gcsLocation); + return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.of(lastModified)); } @Override diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInputFile.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInputFile.java index 71f14f8d203f..f961cdb1a975 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInputFile.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInputFile.java @@ -38,15 +38,16 @@ public class GcsInputFile private final int readBlockSize; private final OptionalLong predeclaredLength; private OptionalLong length; - private Optional lastModified = Optional.empty(); + private Optional lastModified; - public GcsInputFile(GcsLocation location, Storage storage, int readBockSize, OptionalLong predeclaredLength) + public GcsInputFile(GcsLocation location, Storage storage, int readBockSize, OptionalLong predeclaredLength, Optional lastModified) { this.location = requireNonNull(location, "location is null"); this.storage = requireNonNull(storage, "storage is null"); this.readBlockSize = readBockSize; this.predeclaredLength = requireNonNull(predeclaredLength, "length is null"); this.length = OptionalLong.empty(); + this.lastModified = requireNonNull(lastModified, "lastModified is null"); } @Override @@ -108,7 +109,9 @@ private void loadProperties() Blob blob = getBlobOrThrow(storage, location); try { length = OptionalLong.of(blob.getSize()); - lastModified = Optional.of(Instant.from(blob.getUpdateTimeOffsetDateTime())); + if (lastModified.isEmpty()) { + lastModified = Optional.of(Instant.from(blob.getUpdateTimeOffsetDateTime())); + } } catch (RuntimeException e) { throw handleGcsException(e, "fetching properties for file", location); diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java index 5064e9cf5ce2..c4526e7186f7 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.net.URISyntaxException; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -79,13 +80,19 @@ public S3FileSystem(Executor uploadExecutor, S3Client client, S3Presigner preSig @Override public TrinoInputFile newInputFile(Location location) { - return new S3InputFile(client, context, new S3Location(location), null); + return new S3InputFile(client, context, new S3Location(location), null, null); } @Override public TrinoInputFile newInputFile(Location location, long length) { - return new S3InputFile(client, context, new S3Location(location), length); + return new S3InputFile(client, context, new S3Location(location), length, null); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + return new S3InputFile(client, context, new S3Location(location), length, lastModified); } @Override diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java index fe8ac5a271d5..c7a70da308ed 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java @@ -42,13 +42,14 @@ final class S3InputFile private Long length; private Instant lastModified; - public S3InputFile(S3Client client, S3Context context, S3Location location, Long length) + public S3InputFile(S3Client client, S3Context context, S3Location location, Long length, Instant lastModified) { this.client = requireNonNull(client, "client is null"); this.location = requireNonNull(location, "location is null"); this.context = requireNonNull(context, "context is null"); this.requestPayer = context.requestPayer(); this.length = length; + this.lastModified = lastModified; location.location().verifyValidFileLocation(); } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java index 629924bc3b58..8aae52e99dbf 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java @@ -18,6 +18,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Optional; import java.util.Set; @@ -72,6 +73,17 @@ public interface TrinoFileSystem */ TrinoInputFile newInputFile(Location location, long length); + /** + * Creates a TrinoInputFile with a predeclared length and lastModifiedTime which can be used to read the file data. + * The length will be returned from {@link TrinoInputFile#length()} and the actual file length + * will never be checked. The lastModified will be returned from {@link TrinoInputFile#lastModified()} and the + * actual file last modified time will never be checked. The file location path cannot be empty, and must not end + * with a slash or whitespace. + * + * @throws IllegalArgumentException if location is not valid for this file system + */ + TrinoInputFile newInputFile(Location location, long length, Instant lastModified); + /** * Creates a TrinoOutputFile which can be used to create or overwrite the file. The file * location path cannot be empty, and must not end with a slash or whitespace. diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java index b2888ceeddff..cdf338825d48 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Instant; import java.util.Collection; import java.util.Optional; import java.util.OptionalLong; @@ -45,13 +46,19 @@ public CacheFileSystem(TrinoFileSystem delegate, TrinoFileSystemCache cache, Cac @Override public TrinoInputFile newInputFile(Location location) { - return new CacheInputFile(delegate.newInputFile(location), cache, keyProvider, OptionalLong.empty()); + return new CacheInputFile(delegate.newInputFile(location), cache, keyProvider, OptionalLong.empty(), Optional.empty()); } @Override public TrinoInputFile newInputFile(Location location, long length) { - return new CacheInputFile(delegate.newInputFile(location, length), cache, keyProvider, OptionalLong.of(length)); + return new CacheInputFile(delegate.newInputFile(location, length), cache, keyProvider, OptionalLong.of(length), Optional.empty()); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + return new CacheInputFile(delegate.newInputFile(location, length, lastModified), cache, keyProvider, OptionalLong.of(length), Optional.of(lastModified)); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java index a2fc973fde17..3c7f6e1fe3d2 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java @@ -32,13 +32,15 @@ public final class CacheInputFile private final TrinoFileSystemCache cache; private final CacheKeyProvider keyProvider; private OptionalLong length; + private Optional lastModified; - public CacheInputFile(TrinoInputFile delegate, TrinoFileSystemCache cache, CacheKeyProvider keyProvider, OptionalLong length) + public CacheInputFile(TrinoInputFile delegate, TrinoFileSystemCache cache, CacheKeyProvider keyProvider, OptionalLong length, Optional lastModified) { this.delegate = requireNonNull(delegate, "delegate is null"); this.cache = requireNonNull(cache, "cache is null"); this.keyProvider = requireNonNull(keyProvider, "keyProvider is null"); this.length = requireNonNull(length, "length is null"); + this.lastModified = requireNonNull(lastModified, "lastModified is null"); } @Override @@ -83,7 +85,10 @@ public long length() public Instant lastModified() throws IOException { - return delegate.lastModified(); + if (lastModified.isEmpty()) { + lastModified = Optional.of(delegate.lastModified()); + } + return lastModified.orElseThrow(); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java index 9e8b1298995b..b20d29a33d21 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java @@ -27,6 +27,7 @@ import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; +import java.time.Instant; import java.util.Optional; import java.util.Set; import java.util.stream.Stream; @@ -60,7 +61,13 @@ public TrinoInputFile newInputFile(Location location) @Override public TrinoInputFile newInputFile(Location location, long length) { - return new LocalInputFile(location, toFilePath(location), length); + return new LocalInputFile(location, toFilePath(location), length, null); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + return new LocalInputFile(location, toFilePath(location), length, lastModified); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalInputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalInputFile.java index 29550549ea84..071f3bc7d1a4 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalInputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalInputFile.java @@ -35,21 +35,24 @@ public class LocalInputFile { private final Location location; private final Path path; - private OptionalLong length = OptionalLong.empty(); - private Optional lastModified = Optional.empty(); + private OptionalLong length; + private Optional lastModified; public LocalInputFile(Location location, Path path) { this.location = requireNonNull(location, "location is null"); this.path = requireNonNull(path, "path is null"); + this.length = OptionalLong.empty(); + this.lastModified = Optional.empty(); } - public LocalInputFile(Location location, Path path, long length) + public LocalInputFile(Location location, Path path, long length, Instant lastModified) { this.location = requireNonNull(location, "location is null"); this.path = requireNonNull(path, "path is null"); checkArgument(length >= 0, "length is negative"); this.length = OptionalLong.of(length); + this.lastModified = Optional.ofNullable(lastModified); } public LocalInputFile(File file) diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java index aab9063b3ece..e479ce9ca6c0 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.nio.file.FileAlreadyExistsException; +import java.time.Instant; import java.util.Iterator; import java.util.Optional; import java.util.OptionalLong; @@ -54,14 +55,21 @@ public boolean isEmpty() public TrinoInputFile newInputFile(Location location) { String key = toBlobKey(location); - return new MemoryInputFile(location, () -> blobs.get(key), OptionalLong.empty()); + return new MemoryInputFile(location, () -> blobs.get(key), OptionalLong.empty(), Optional.empty()); } @Override public TrinoInputFile newInputFile(Location location, long length) { String key = toBlobKey(location); - return new MemoryInputFile(location, () -> blobs.get(key), OptionalLong.of(length)); + return new MemoryInputFile(location, () -> blobs.get(key), OptionalLong.of(length), Optional.empty()); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + String key = toBlobKey(location); + return new MemoryInputFile(location, () -> blobs.get(key), OptionalLong.of(length), Optional.of(lastModified)); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryInputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryInputFile.java index 5cf15a28bb0e..3f4b548bc327 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryInputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryInputFile.java @@ -34,18 +34,19 @@ public class MemoryInputFile private final Location location; private final Supplier dataSupplier; private OptionalLong length; - private Optional lastModified = Optional.empty(); + private Optional lastModified; public MemoryInputFile(Location location, Slice data) { - this(location, () -> new MemoryBlob(data), OptionalLong.of(data.length())); + this(location, () -> new MemoryBlob(data), OptionalLong.of(data.length()), Optional.empty()); } - public MemoryInputFile(Location location, Supplier dataSupplier, OptionalLong length) + public MemoryInputFile(Location location, Supplier dataSupplier, OptionalLong length, Optional lastModified) { this.location = requireNonNull(location, "location is null"); this.dataSupplier = requireNonNull(dataSupplier, "dataSupplier is null"); this.length = requireNonNull(length, "length is null"); + this.lastModified = requireNonNull(lastModified, "lastModified is null"); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java index 96d2031998a9..58626251981e 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java @@ -23,6 +23,7 @@ import io.trino.spi.security.ConnectorIdentity; import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Optional; import java.util.Set; @@ -62,6 +63,12 @@ public TrinoInputFile newInputFile(Location location, long length) return fileSystem(location).newInputFile(location, length); } + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + return fileSystem(location).newInputFile(location, length, lastModified); + } + @Override public TrinoOutputFile newOutputFile(Location location) { diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java index 3bb24f2ee7af..5dd90fcb1abd 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java @@ -24,6 +24,7 @@ import io.trino.filesystem.UriLocation; import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Optional; import java.util.Set; @@ -46,13 +47,19 @@ public TracingFileSystem(Tracer tracer, TrinoFileSystem delegate) @Override public TrinoInputFile newInputFile(Location location) { - return new TracingInputFile(tracer, delegate.newInputFile(location), Optional.empty()); + return new TracingInputFile(tracer, delegate.newInputFile(location), Optional.empty(), Optional.empty()); } @Override public TrinoInputFile newInputFile(Location location, long length) { - return new TracingInputFile(tracer, delegate.newInputFile(location, length), Optional.of(length)); + return new TracingInputFile(tracer, delegate.newInputFile(location, length), Optional.of(length), Optional.empty()); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + return new TracingInputFile(tracer, delegate.newInputFile(location, length, lastModified), Optional.of(length), Optional.of(lastModified)); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingInputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingInputFile.java index 313b1abf38c4..69415c5da8f5 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingInputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingInputFile.java @@ -34,13 +34,14 @@ final class TracingInputFile private final Tracer tracer; private final TrinoInputFile delegate; private Optional length; - private boolean lastModifiedRequested; + private boolean isLastModifiedKnown; - public TracingInputFile(Tracer tracer, TrinoInputFile delegate, Optional length) + public TracingInputFile(Tracer tracer, TrinoInputFile delegate, Optional length, Optional lastModified) { this.tracer = requireNonNull(tracer, "tracer is null"); this.delegate = requireNonNull(delegate, "delegate is null"); this.length = requireNonNull(length, "length is null"); + this.isLastModifiedKnown = lastModified.isPresent(); } @Override @@ -87,7 +88,7 @@ public Instant lastModified() throws IOException { // skip tracing if lastModified is cached, but delegate anyway - if (lastModifiedRequested) { + if (isLastModifiedKnown) { return delegate.lastModified(); } @@ -95,7 +96,7 @@ public Instant lastModified() .setAttribute(FileSystemAttributes.FILE_LOCATION, toString()) .startSpan(); Instant fileLastModified = withTracing(span, delegate::lastModified); - lastModifiedRequested = true; + isLastModifiedKnown = true; return fileLastModified; } diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java index 68aa73c93759..12690bded366 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java @@ -228,6 +228,39 @@ void testInputFileWithLengthMetadata() } } + @Test + void testInputFileWithLastModifiedMetadata() + throws IOException + { + try (TempBlob tempBlob = randomBlobLocation("inputFileWithLastModifiedMetadata")) { + TrinoInputFile inputFile = getFileSystem().newInputFile(tempBlob.location(), 22, Instant.ofEpochMilli(12345)); + assertThat(inputFile.exists()).isFalse(); + + // getting length for non-existent file returns pre-declared length + assertThat(inputFile.length()).isEqualTo(22); + // getting modified time for non-existent file returns pre-declared modified time + assertThat(inputFile.lastModified()).isEqualTo(Instant.ofEpochMilli(12345)); + // double-check the length did not change in call above + assertThat(inputFile.length()).isEqualTo(22); + + tempBlob.createOrOverwrite("123456"); + + // length always returns the pre-declared length + assertThat(inputFile.length()).isEqualTo(22); + // modified time always returns the pre-declared length + assertThat(inputFile.lastModified()).isEqualTo(Instant.ofEpochMilli(12345)); + // double-check the length did not change when metadata was loaded + assertThat(inputFile.length()).isEqualTo(22); + + // delete file and verify that exists check is not cached + tempBlob.close(); + assertThat(inputFile.exists()).isFalse(); + // input file caches metadata, so results will be unchanged after delete + assertThat(inputFile.length()).isEqualTo(22); + assertThat(inputFile.lastModified()).isEqualTo(Instant.ofEpochMilli(12345)); + } + } + @Test public void testInputFile() throws IOException diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java index 61cdfa4aae67..5adaaba2c608 100644 --- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java +++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java @@ -34,6 +34,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.IdentityHashMap; import java.util.List; @@ -81,13 +82,19 @@ public HdfsFileSystem(HdfsEnvironment environment, HdfsContext context, TrinoHdf @Override public TrinoInputFile newInputFile(Location location) { - return new HdfsInputFile(location, null, environment, context, stats.getOpenFileCalls()); + return new HdfsInputFile(location, null, null, environment, context, stats.getOpenFileCalls()); } @Override public TrinoInputFile newInputFile(Location location, long length) { - return new HdfsInputFile(location, length, environment, context, stats.getOpenFileCalls()); + return new HdfsInputFile(location, length, null, environment, context, stats.getOpenFileCalls()); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + return new HdfsInputFile(location, length, lastModified, environment, context, stats.getOpenFileCalls()); } @Override diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsInputFile.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsInputFile.java index d1281644a740..24ceec67b91e 100644 --- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsInputFile.java +++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsInputFile.java @@ -44,9 +44,9 @@ class HdfsInputFile private final Path file; private final CallStats openFileCallStat; private Long length; - private FileStatus status; + private Instant lastModified; - public HdfsInputFile(Location location, Long length, HdfsEnvironment environment, HdfsContext context, CallStats openFileCallStat) + public HdfsInputFile(Location location, Long length, Instant lastModified, HdfsEnvironment environment, HdfsContext context, CallStats openFileCallStat) { this.location = requireNonNull(location, "location is null"); this.environment = requireNonNull(environment, "environment is null"); @@ -55,6 +55,7 @@ public HdfsInputFile(Location location, Long length, HdfsEnvironment environment this.file = hadoopPath(location); this.length = length; checkArgument(length == null || length >= 0, "length is negative"); + this.lastModified = lastModified; location.verifyValidFileLocation(); } @@ -77,7 +78,7 @@ public long length() throws IOException { if (length == null) { - length = lazyStatus().getLen(); + loadFileStatus(); } return length; } @@ -86,7 +87,10 @@ public long length() public Instant lastModified() throws IOException { - return Instant.ofEpochMilli(lazyStatus().getModificationTime()); + if (lastModified == null) { + loadFileStatus(); + } + return requireNonNull(lastModified, "lastModified is null"); } @Override @@ -128,21 +132,24 @@ private FSDataInputStream openFile() }); } - private FileStatus lazyStatus() + private void loadFileStatus() throws IOException { - if (status == null) { - FileSystem fileSystem = environment.getFileSystem(context, file); - try { - status = environment.doAs(context.getIdentity(), () -> fileSystem.getFileStatus(file)); - } - catch (FileNotFoundException e) { - throw withCause(new FileNotFoundException(toString()), e); + FileSystem fileSystem = environment.getFileSystem(context, file); + try { + FileStatus status = environment.doAs(context.getIdentity(), () -> fileSystem.getFileStatus(file)); + if (length == null) { + length = status.getLen(); } - catch (IOException e) { - throw new IOException("Get status for file %s failed: %s".formatted(location, e.getMessage()), e); + if (lastModified == null) { + lastModified = Instant.ofEpochMilli(status.getModificationTime()); } } - return status; + catch (FileNotFoundException e) { + throw withCause(new FileNotFoundException(toString()), e); + } + catch (IOException e) { + throw new IOException("Get status for file %s failed: %s" .formatted(location, e.getMessage()), e); + } } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java index 51dac62efeef..27f6563b24f7 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -1374,6 +1374,12 @@ public TrinoInputFile newInputFile(Location location, long length) throw new UnsupportedOperationException(); } + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + throw new UnsupportedOperationException(); + } + @Override public TrinoOutputFile newOutputFile(Location location) { From d96dfb95ae242bf53f9c9783c985ed9ce5f3c8fb Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Sun, 8 Sep 2024 13:15:16 +0530 Subject: [PATCH 2/2] Avoid extra TrinoInputFile#lastModified calls in hive filesystem cache --- .../trino/plugin/hive/HivePageSourceFactory.java | 1 + .../trino/plugin/hive/HivePageSourceProvider.java | 3 +++ .../plugin/hive/avro/AvroPageSourceFactory.java | 1 + .../plugin/hive/line/LinePageSourceFactory.java | 1 + .../plugin/hive/orc/OrcPageSourceFactory.java | 6 +++++- .../hive/parquet/ParquetPageSourceFactory.java | 4 +++- .../hive/rcfile/RcFilePageSourceFactory.java | 1 + .../hive/TestHiveAlluxioCacheFileOperations.java | 14 -------------- .../io/trino/plugin/hive/TestHiveFileFormats.java | 1 + .../hive/TestOrcPageSourceMemoryTracking.java | 1 + .../plugin/hive/orc/TestOrcPageSourceFactory.java | 1 + .../trino/plugin/hive/orc/TestOrcPredicates.java | 5 ++++- .../io/trino/plugin/hive/parquet/ParquetUtil.java | 1 + 13 files changed, 23 insertions(+), 17 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java index 344063332a3b..52dfabaf2e08 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java @@ -31,6 +31,7 @@ Optional createPageSource( long start, long length, long estimatedFileSize, + long fileModifiedTime, Map schema, List columns, TupleDomain effectivePredicate, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java index a0b815a41b88..325f2292da83 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java @@ -144,6 +144,7 @@ public ConnectorPageSource createPageSource( hiveSplit.getStart(), hiveSplit.getLength(), hiveSplit.getEstimatedFileSize(), + hiveSplit.getFileModifiedTime(), hiveSplit.getSchema(), hiveTable.getCompactEffectivePredicate().intersect( dynamicFilter.getCurrentPredicate().transformKeys(HiveColumnHandle.class::cast)) @@ -175,6 +176,7 @@ public static Optional createHivePageSource( long start, long length, long estimatedFileSize, + long fileModifiedTime, Map schema, TupleDomain effectivePredicate, TypeManager typeManager, @@ -205,6 +207,7 @@ public static Optional createHivePageSource( start, length, estimatedFileSize, + fileModifiedTime, schema, desiredColumns, effectivePredicate, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java index 525654d81ef4..ac58e390e562 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroPageSourceFactory.java @@ -87,6 +87,7 @@ public Optional createPageSource( long start, long length, long estimatedFileSize, + long fileModifiedTime, Map schema, List columns, TupleDomain effectivePredicate, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java index 796d46bce451..7498bdc9a87b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java @@ -82,6 +82,7 @@ public Optional createPageSource( long start, long length, long estimatedFileSize, + long fileModifiedTime, Map schema, List columns, TupleDomain effectivePredicate, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java index da7a6c1401ef..68f4fde033be 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java @@ -55,6 +55,7 @@ import org.joda.time.DateTimeZone; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -173,6 +174,7 @@ public Optional createPageSource( long start, long length, long estimatedFileSize, + long fileModifiedTime, Map schema, List columns, TupleDomain effectivePredicate, @@ -200,6 +202,7 @@ public Optional createPageSource( start, length, estimatedFileSize, + fileModifiedTime, readerColumnHandles, columns, isUseOrcColumnNames(session), @@ -230,6 +233,7 @@ private ConnectorPageSource createOrcPageSource( long start, long length, long estimatedFileSize, + long fileModifiedTime, List columns, List projections, boolean useOrcColumnNames, @@ -253,7 +257,7 @@ private ConnectorPageSource createOrcPageSource( boolean originalFilesPresent = acidInfo.isPresent() && !acidInfo.get().getOriginalFiles().isEmpty(); try { TrinoFileSystem fileSystem = fileSystemFactory.create(session); - TrinoInputFile inputFile = fileSystem.newInputFile(path, estimatedFileSize); + TrinoInputFile inputFile = fileSystem.newInputFile(path, estimatedFileSize, Instant.ofEpochMilli(fileModifiedTime)); orcDataSource = new HdfsOrcDataSource( new OrcDataSourceId(path.toString()), estimatedFileSize, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index f16399c17d38..dcd88f2523d6 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -61,6 +61,7 @@ import org.joda.time.DateTimeZone; import java.io.IOException; +import java.time.Instant; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -160,6 +161,7 @@ public Optional createPageSource( long start, long length, long estimatedFileSize, + long fileModifiedTime, Map schema, List columns, TupleDomain effectivePredicate, @@ -175,7 +177,7 @@ public Optional createPageSource( checkArgument(acidInfo.isEmpty(), "Acid is not supported"); TrinoFileSystem fileSystem = fileSystemFactory.create(session); - TrinoInputFile inputFile = fileSystem.newInputFile(path, estimatedFileSize); + TrinoInputFile inputFile = fileSystem.newInputFile(path, estimatedFileSize, Instant.ofEpochMilli(fileModifiedTime)); return Optional.of(createPageSource( inputFile, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java index 4ca50893bc93..3fe0d461804e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java @@ -94,6 +94,7 @@ public Optional createPageSource( long start, long length, long estimatedFileSize, + long fileModifiedTime, Map schema, List columns, TupleDomain effectivePredicate, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAlluxioCacheFileOperations.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAlluxioCacheFileOperations.java index ad42ed7e328f..0d8b3955dd7a 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAlluxioCacheFileOperations.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAlluxioCacheFileOperations.java @@ -84,16 +84,12 @@ public void testCacheFileOperations() .add(new CacheOperation("Input.readFully", "key=p2/")) .add(new CacheOperation("Alluxio.writeCache", "key=p1/")) .add(new CacheOperation("Alluxio.writeCache", "key=p2/")) - .add(new CacheOperation("InputFile.lastModified", "key=p1/")) - .add(new CacheOperation("InputFile.lastModified", "key=p2/")) .build()); assertFileSystemAccesses( "SELECT * FROM test_cache_file_operations", ImmutableMultiset.builder() .add(new CacheOperation("Alluxio.readCached", "key=p1/")) .add(new CacheOperation("Alluxio.readCached", "key=p2/")) - .add(new CacheOperation("InputFile.lastModified", "key=p1/")) - .add(new CacheOperation("InputFile.lastModified", "key=p2/")) .build()); assertUpdate("INSERT INTO test_cache_file_operations VALUES ('3-xyz', 'p3')", 1); assertUpdate("INSERT INTO test_cache_file_operations VALUES ('4-xyz', 'p4')", 1); @@ -112,11 +108,6 @@ public void testCacheFileOperations() .add(new CacheOperation("Alluxio.writeCache", "key=p3/")) .add(new CacheOperation("Alluxio.writeCache", "key=p4/")) .add(new CacheOperation("Alluxio.writeCache", "key=p5/")) - .add(new CacheOperation("InputFile.lastModified", "key=p1/")) - .add(new CacheOperation("InputFile.lastModified", "key=p2/")) - .add(new CacheOperation("InputFile.lastModified", "key=p3/")) - .add(new CacheOperation("InputFile.lastModified", "key=p4/")) - .add(new CacheOperation("InputFile.lastModified", "key=p5/")) .build()); assertFileSystemAccesses( "SELECT * FROM test_cache_file_operations", @@ -126,11 +117,6 @@ public void testCacheFileOperations() .add(new CacheOperation("Alluxio.readCached", "key=p3/")) .add(new CacheOperation("Alluxio.readCached", "key=p4/")) .add(new CacheOperation("Alluxio.readCached", "key=p5/")) - .add(new CacheOperation("InputFile.lastModified", "key=p1/")) - .add(new CacheOperation("InputFile.lastModified", "key=p2/")) - .add(new CacheOperation("InputFile.lastModified", "key=p3/")) - .add(new CacheOperation("InputFile.lastModified", "key=p4/")) - .add(new CacheOperation("InputFile.lastModified", "key=p5/")) .build()); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java index f807a2e1a6a8..14b2dc78dc93 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java @@ -1007,6 +1007,7 @@ private static void testPageSourceFactory( 0, fileSize, paddedFileSize, + 12345, splitProperties, TupleDomain.all(), TESTING_TYPE_MANAGER, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java index 70019c71a196..c7595047ed4f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java @@ -577,6 +577,7 @@ public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, Connec fileSplit.getStart(), fileSplit.getLength(), fileSplit.getLength(), + 12345, schema, TupleDomain.all(), TESTING_TYPE_MANAGER, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java index c36538dd220b..9a03efc52380 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java @@ -256,6 +256,7 @@ private static List readFile( 0, fileSize, fileSize, + 12345, createSchema(), columnHandles, tupleDomain, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java index faaf64abc117..bca72e5a8b6c 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableSet; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.metastore.HiveType; import io.trino.orc.OrcReaderOptions; @@ -160,7 +161,8 @@ private static ConnectorPageSource createPageSource( { OrcPageSourceFactory readerFactory = new OrcPageSourceFactory(new OrcReaderOptions(), fileSystemFactory, STATS, UTC); - long length = fileSystemFactory.create(session).newInputFile(location).length(); + TrinoInputFile inputFile = fileSystemFactory.create(session).newInputFile(location); + long length = inputFile.length(); List columnMappings = buildColumnMappings( "", ImmutableList.of(), @@ -180,6 +182,7 @@ private static ConnectorPageSource createPageSource( 0, length, length, + inputFile.lastModified().toEpochMilli(), getTableProperties(), effectivePredicate, TESTING_TYPE_MANAGER, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetUtil.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetUtil.java index 571b74c111a0..c1bba68fdd76 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetUtil.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetUtil.java @@ -95,6 +95,7 @@ private static ConnectorPageSource createPageSource(ConnectorSession session, Fi 0, parquetFile.length(), parquetFile.length(), + parquetFile.lastModified(), ImmutableMap.of(SERIALIZATION_LIB, HiveStorageFormat.PARQUET.getSerde()), columns, domain,