Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid extra TrinoInputFile#lastModified calls in hive filesystem cache #23327

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.trino.filesystem.TrinoOutputFile;

import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
Expand Down Expand Up @@ -96,15 +97,23 @@ public TrinoInputFile newInputFile(Location location)
{
AzureLocation azureLocation = new AzureLocation(location);
BlobClient client = createBlobClient(azureLocation);
return new AzureInputFile(azureLocation, OptionalLong.empty(), client, readBlockSizeBytes);
return new AzureInputFile(azureLocation, OptionalLong.empty(), Optional.empty(), client, readBlockSizeBytes);
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
AzureLocation azureLocation = new AzureLocation(location);
BlobClient client = createBlobClient(azureLocation);
return new AzureInputFile(azureLocation, OptionalLong.of(length), client, readBlockSizeBytes);
return new AzureInputFile(azureLocation, OptionalLong.of(length), Optional.empty(), client, readBlockSizeBytes);
}

@Override
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
{
AzureLocation azureLocation = new AzureLocation(location);
BlobClient client = createBlobClient(azureLocation);
return new AzureInputFile(azureLocation, OptionalLong.of(length), Optional.of(lastModified), client, readBlockSizeBytes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ class AzureInputFile
private final int readBlockSizeBytes;

private OptionalLong length;
private Optional<Instant> lastModified = Optional.empty();
private Optional<Instant> lastModified;

public AzureInputFile(AzureLocation location, OptionalLong length, BlobClient blobClient, int readBlockSizeBytes)
public AzureInputFile(AzureLocation location, OptionalLong length, Optional<Instant> lastModified, BlobClient blobClient, int readBlockSizeBytes)
{
this.location = requireNonNull(location, "location is null");
location.location().verifyValidFileLocation();
this.length = requireNonNull(length, "length is null");
this.lastModified = requireNonNull(lastModified, "lastModified is null");
this.blobClient = requireNonNull(blobClient, "blobClient is null");
checkArgument(readBlockSizeBytes >= 0, "readBlockSizeBytes is negative");
this.readBlockSizeBytes = readBlockSizeBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -84,15 +85,23 @@ public TrinoInputFile newInputFile(Location location)
{
GcsLocation gcsLocation = new GcsLocation(location);
checkIsValidFile(gcsLocation);
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty());
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty(), Optional.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
GcsLocation gcsLocation = new GcsLocation(location);
checkIsValidFile(gcsLocation);
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length));
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
{
GcsLocation gcsLocation = new GcsLocation(location);
checkIsValidFile(gcsLocation);
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.of(lastModified));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ public class GcsInputFile
private final int readBlockSize;
private final OptionalLong predeclaredLength;
private OptionalLong length;
private Optional<Instant> lastModified = Optional.empty();
private Optional<Instant> lastModified;

public GcsInputFile(GcsLocation location, Storage storage, int readBockSize, OptionalLong predeclaredLength)
public GcsInputFile(GcsLocation location, Storage storage, int readBockSize, OptionalLong predeclaredLength, Optional<Instant> lastModified)
{
this.location = requireNonNull(location, "location is null");
this.storage = requireNonNull(storage, "storage is null");
this.readBlockSize = readBockSize;
this.predeclaredLength = requireNonNull(predeclaredLength, "length is null");
this.length = OptionalLong.empty();
this.lastModified = requireNonNull(lastModified, "lastModified is null");
}

@Override
Expand Down Expand Up @@ -108,7 +109,9 @@ private void loadProperties()
Blob blob = getBlobOrThrow(storage, location);
try {
length = OptionalLong.of(blob.getSize());
lastModified = Optional.of(Instant.from(blob.getUpdateTimeOffsetDateTime()));
if (lastModified.isEmpty()) {
lastModified = Optional.of(Instant.from(blob.getUpdateTimeOffsetDateTime()));
}
}
catch (RuntimeException e) {
throw handleGcsException(e, "fetching properties for file", location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -79,13 +80,19 @@ public S3FileSystem(Executor uploadExecutor, S3Client client, S3Presigner preSig
@Override
public TrinoInputFile newInputFile(Location location)
{
return new S3InputFile(client, context, new S3Location(location), null);
return new S3InputFile(client, context, new S3Location(location), null, null);
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
return new S3InputFile(client, context, new S3Location(location), length);
return new S3InputFile(client, context, new S3Location(location), length, null);
}

@Override
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
{
return new S3InputFile(client, context, new S3Location(location), length, lastModified);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ final class S3InputFile
private Long length;
private Instant lastModified;

public S3InputFile(S3Client client, S3Context context, S3Location location, Long length)
public S3InputFile(S3Client client, S3Context context, S3Location location, Long length, Instant lastModified)
{
this.client = requireNonNull(client, "client is null");
this.location = requireNonNull(location, "location is null");
this.context = requireNonNull(context, "context is null");
this.requestPayer = context.requestPayer();
this.length = length;
this.lastModified = lastModified;
location.location().verifyValidFileLocation();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -72,6 +73,17 @@ public interface TrinoFileSystem
*/
TrinoInputFile newInputFile(Location location, long length);

/**
* Creates a TrinoInputFile with a predeclared length and lastModifiedTime which can be used to read the file data.
* The length will be returned from {@link TrinoInputFile#length()} and the actual file length
* will never be checked. The lastModified will be returned from {@link TrinoInputFile#lastModified()} and the
* actual file last modified time will never be checked. The file location path cannot be empty, and must not end
* with a slash or whitespace.
*
* @throws IllegalArgumentException if location is not valid for this file system
*/
TrinoInputFile newInputFile(Location location, long length, Instant lastModified);

/**
* Creates a TrinoOutputFile which can be used to create or overwrite the file. The file
* location path cannot be empty, and must not end with a slash or whitespace.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Collection;
import java.util.Optional;
import java.util.OptionalLong;
Expand All @@ -45,13 +46,19 @@ public CacheFileSystem(TrinoFileSystem delegate, TrinoFileSystemCache cache, Cac
@Override
public TrinoInputFile newInputFile(Location location)
{
return new CacheInputFile(delegate.newInputFile(location), cache, keyProvider, OptionalLong.empty());
return new CacheInputFile(delegate.newInputFile(location), cache, keyProvider, OptionalLong.empty(), Optional.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
return new CacheInputFile(delegate.newInputFile(location, length), cache, keyProvider, OptionalLong.of(length));
return new CacheInputFile(delegate.newInputFile(location, length), cache, keyProvider, OptionalLong.of(length), Optional.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
{
return new CacheInputFile(delegate.newInputFile(location, length, lastModified), cache, keyProvider, OptionalLong.of(length), Optional.of(lastModified));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ public final class CacheInputFile
private final TrinoFileSystemCache cache;
private final CacheKeyProvider keyProvider;
private OptionalLong length;
private Optional<Instant> lastModified;

public CacheInputFile(TrinoInputFile delegate, TrinoFileSystemCache cache, CacheKeyProvider keyProvider, OptionalLong length)
public CacheInputFile(TrinoInputFile delegate, TrinoFileSystemCache cache, CacheKeyProvider keyProvider, OptionalLong length, Optional<Instant> lastModified)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.cache = requireNonNull(cache, "cache is null");
this.keyProvider = requireNonNull(keyProvider, "keyProvider is null");
this.length = requireNonNull(length, "length is null");
this.lastModified = requireNonNull(lastModified, "lastModified is null");
}

@Override
Expand Down Expand Up @@ -83,7 +85,10 @@ public long length()
public Instant lastModified()
throws IOException
{
return delegate.lastModified();
if (lastModified.isEmpty()) {
lastModified = Optional.of(delegate.lastModified());
}
return lastModified.orElseThrow();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Instant;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
Expand Down Expand Up @@ -60,7 +61,13 @@ public TrinoInputFile newInputFile(Location location)
@Override
public TrinoInputFile newInputFile(Location location, long length)
{
return new LocalInputFile(location, toFilePath(location), length);
return new LocalInputFile(location, toFilePath(location), length, null);
}

@Override
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
{
return new LocalInputFile(location, toFilePath(location), length, lastModified);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,24 @@ public class LocalInputFile
{
private final Location location;
private final Path path;
private OptionalLong length = OptionalLong.empty();
private Optional<Instant> lastModified = Optional.empty();
private OptionalLong length;
private Optional<Instant> lastModified;

public LocalInputFile(Location location, Path path)
{
this.location = requireNonNull(location, "location is null");
this.path = requireNonNull(path, "path is null");
this.length = OptionalLong.empty();
this.lastModified = Optional.empty();
}

public LocalInputFile(Location location, Path path, long length)
public LocalInputFile(Location location, Path path, long length, Instant lastModified)
{
this.location = requireNonNull(location, "location is null");
this.path = requireNonNull(path, "path is null");
checkArgument(length >= 0, "length is negative");
this.length = OptionalLong.of(length);
this.lastModified = Optional.ofNullable(lastModified);
}

public LocalInputFile(File file)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.time.Instant;
import java.util.Iterator;
import java.util.Optional;
import java.util.OptionalLong;
Expand Down Expand Up @@ -54,14 +55,21 @@ public boolean isEmpty()
public TrinoInputFile newInputFile(Location location)
{
String key = toBlobKey(location);
return new MemoryInputFile(location, () -> blobs.get(key), OptionalLong.empty());
return new MemoryInputFile(location, () -> blobs.get(key), OptionalLong.empty(), Optional.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
String key = toBlobKey(location);
return new MemoryInputFile(location, () -> blobs.get(key), OptionalLong.of(length));
return new MemoryInputFile(location, () -> blobs.get(key), OptionalLong.of(length), Optional.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
{
String key = toBlobKey(location);
return new MemoryInputFile(location, () -> blobs.get(key), OptionalLong.of(length), Optional.of(lastModified));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,19 @@ public class MemoryInputFile
private final Location location;
private final Supplier<MemoryBlob> dataSupplier;
private OptionalLong length;
private Optional<Instant> lastModified = Optional.empty();
private Optional<Instant> lastModified;

public MemoryInputFile(Location location, Slice data)
{
this(location, () -> new MemoryBlob(data), OptionalLong.of(data.length()));
this(location, () -> new MemoryBlob(data), OptionalLong.of(data.length()), Optional.empty());
}

public MemoryInputFile(Location location, Supplier<MemoryBlob> dataSupplier, OptionalLong length)
public MemoryInputFile(Location location, Supplier<MemoryBlob> dataSupplier, OptionalLong length, Optional<Instant> lastModified)
{
this.location = requireNonNull(location, "location is null");
this.dataSupplier = requireNonNull(dataSupplier, "dataSupplier is null");
this.length = requireNonNull(length, "length is null");
this.lastModified = requireNonNull(lastModified, "lastModified is null");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.security.ConnectorIdentity;

import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -62,6 +63,12 @@ public TrinoInputFile newInputFile(Location location, long length)
return fileSystem(location).newInputFile(location, length);
}

@Override
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
{
return fileSystem(location).newInputFile(location, length, lastModified);
}

@Override
public TrinoOutputFile newOutputFile(Location location)
{
Expand Down
Loading
Loading