Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alluxio cache #18719

Merged
merged 6 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

import io.airlift.configuration.Config;

import static io.trino.filesystem.manager.FileSystemConfig.CacheType.NONE;

public class FileSystemConfig
{
private boolean hadoopEnabled = true;
private boolean nativeAzureEnabled;
private boolean nativeS3Enabled;
private boolean nativeGcsEnabled;
private CacheType cacheType = NONE;

public boolean isHadoopEnabled()
{
Expand Down Expand Up @@ -69,4 +72,21 @@ public FileSystemConfig setNativeGcsEnabled(boolean nativeGcsEnabled)
this.nativeGcsEnabled = nativeGcsEnabled;
return this;
}

/**
 * Returns the file system cache type currently held by this config.
 *
 * @return the value last passed to {@code setCacheType}, or the field's initial value if it was never set
 */
public CacheType getCacheType()
{
    return this.cacheType;
}

/**
 * Sets the file system cache type; bound to the {@code fs.cache} configuration property.
 *
 * @param cacheType the cache type to store (no validation is performed here)
 * @return this config instance, so setter calls can be chained
 */
@Config("fs.cache")
public FileSystemConfig setCacheType(CacheType cacheType)
{
this.cacheType = cacheType;
return this;
}

/**
 * Values accepted by {@link #setCacheType(CacheType)}.
 */
public enum CacheType
{
// Only constant defined so far; presumably means "no file system cache" — confirm against the module that consumes it
NONE,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.azure.AzureFileSystemFactory;
import io.trino.filesystem.azure.AzureFileSystemModule;
import io.trino.filesystem.cache.CacheFileSystemFactory;
import io.trino.filesystem.cache.CacheKeyProvider;
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.filesystem.cache.DefaultCacheKeyProvider;
import io.trino.filesystem.cache.NoneCachingHostAddressProvider;
import io.trino.filesystem.cache.TrinoFileSystemCache;
import io.trino.filesystem.gcs.GcsFileSystemFactory;
import io.trino.filesystem.gcs.GcsFileSystemModule;
import io.trino.filesystem.s3.S3FileSystemFactory;
Expand Down Expand Up @@ -96,6 +100,8 @@ protected void setup(Binder binder)
}

newOptionalBinder(binder, CachingHostAddressProvider.class).setDefault().to(NoneCachingHostAddressProvider.class).in(Scopes.SINGLETON);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than using an optional binder here, the module for the configured cache type should bind the implementation itself: the NONE cache should install NoneCachingHostAddressProvider, and ALLUXIO should install ConsistentHashingHostAddressProvider.

newOptionalBinder(binder, CacheKeyProvider.class).setDefault().to(DefaultCacheKeyProvider.class).in(Scopes.SINGLETON);
newMapBinder(binder, String.class, TrinoFileSystemCache.class);
}

@Provides
Expand All @@ -104,12 +110,28 @@ public TrinoFileSystemFactory createFileSystemFactory(
Optional<HdfsFileSystemLoader> hdfsFileSystemLoader,
LifeCycleManager lifeCycleManager,
Map<String, TrinoFileSystemFactory> factories,
Optional<TrinoFileSystemCache> fileSystemCache,
Optional<CacheKeyProvider> keyProvider,
Tracer tracer)
{
Optional<TrinoFileSystemFactory> hdfsFactory = hdfsFileSystemLoader.map(HdfsFileSystemLoader::create);
hdfsFactory.ifPresent(lifeCycleManager::addInstance);

TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(hdfsFactory, factories);
if (fileSystemCache.isPresent()) {
delegate = new CacheFileSystemFactory(delegate, fileSystemCache.orElseThrow(), keyProvider.orElseThrow());
}
return new TracingFileSystemFactory(tracer, delegate);
}

/**
 * Resolves the file system cache selected by the {@code fs.cache} setting.
 *
 * <p>The registered caches are keyed by {@code String}, so the configured enum value has to be
 * converted to a string before the lookup; querying the map with the enum itself can never match
 * a key and would make every configuration (including the default {@code NONE}) fail.
 *
 * @param config file system configuration carrying the selected cache type
 * @param caches cache implementations contributed through the map binder
 * @return the selected cache, or empty when the cache type is {@code NONE} or unset
 * @throws IllegalArgumentException if a cache type other than {@code NONE} is configured but no
 *         cache is registered for it
 */
@Provides
@Singleton
public Optional<TrinoFileSystemCache> createFileSystemCache(FileSystemConfig config, Map<String, TrinoFileSystemCache> caches)
{
    FileSystemConfig.CacheType cacheType = config.getCacheType();

    // NONE (or an unset value) selects no cache: the factory provider only wraps the
    // delegate when a cache is present, so an empty Optional is the valid answer here.
    if (cacheType == null || cacheType == FileSystemConfig.CacheType.NONE) {
        return Optional.empty();
    }

    // NOTE(review): assumes caches are registered under the enum constant name
    // (for example "ALLUXIO") — confirm against the modules that populate the map binder.
    TrinoFileSystemCache cache = caches.get(cacheType.name());
    if (cache == null) {
        throw new IllegalArgumentException("Unknown cache type %s".formatted(cacheType));
    }
    return Optional.of(cache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.trino.filesystem.manager.FileSystemConfig.CacheType.NONE;

public class TestFileSystemConfig
{
Expand All @@ -31,7 +32,8 @@ public void testDefaults()
.setHadoopEnabled(true)
.setNativeAzureEnabled(false)
.setNativeS3Enabled(false)
.setNativeGcsEnabled(false));
.setNativeGcsEnabled(false)
.setCacheType(NONE));
}

@Test
Expand All @@ -42,13 +44,15 @@ public void testExplicitPropertyMappings()
.put("fs.native-azure.enabled", "true")
.put("fs.native-s3.enabled", "true")
.put("fs.native-gcs.enabled", "true")
.put("fs.cache", "none")
.buildOrThrow();

FileSystemConfig expected = new FileSystemConfig()
.setHadoopEnabled(false)
.setNativeAzureEnabled(true)
.setNativeS3Enabled(true)
.setNativeGcsEnabled(true);
.setNativeGcsEnabled(true)
.setCacheType(NONE);

assertFullMapping(properties, expected);
}
Expand Down
6 changes: 6 additions & 0 deletions lib/trino-filesystem/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.cache;

import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.Set;

import static java.util.Objects.requireNonNull;

public final class CacheFileSystem
implements TrinoFileSystem
{
private final TrinoFileSystem delegate;
private final TrinoFileSystemCache cache;
private final CacheKeyProvider keyProvider;

public CacheFileSystem(TrinoFileSystem delegate, TrinoFileSystemCache cache, CacheKeyProvider keyProvider)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.cache = requireNonNull(cache, "cache is null");
this.keyProvider = requireNonNull(keyProvider, "keyProvider is null");
}

@Override
public TrinoInputFile newInputFile(Location location)
{
return new CacheInputFile(delegate.newInputFile(location), cache, keyProvider);
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
return new CacheInputFile(delegate.newInputFile(location, length), cache, keyProvider);
}

@Override
public TrinoOutputFile newOutputFile(Location location)
{
TrinoOutputFile output = delegate.newOutputFile(location);
try {
cache.expire(location);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return output;
}

@Override
public void deleteFile(Location location)
throws IOException
{
delegate.deleteFile(location);
raunaqmorarka marked this conversation as resolved.
Show resolved Hide resolved
cache.expire(location);
}

@Override
public void deleteDirectory(Location location)
throws IOException
{
delegate.deleteDirectory(location);
}

@Override
public void renameFile(Location source, Location target)
raunaqmorarka marked this conversation as resolved.
Show resolved Hide resolved
throws IOException
{
delegate.renameFile(source, target);
cache.expire(source);
cache.expire(target);
}

@Override
public FileIterator listFiles(Location location)
throws IOException
{
return delegate.listFiles(location);
}

@Override
public Optional<Boolean> directoryExists(Location location)
throws IOException
{
return delegate.directoryExists(location);
}

@Override
public void createDirectory(Location location)
throws IOException
{
delegate.createDirectory(location);
}

@Override
public void renameDirectory(Location source, Location target)
throws IOException
{
delegate.renameDirectory(source, target);
}

@Override
public Set<Location> listDirectories(Location location)
throws IOException
{
return delegate.listDirectories(location);
}

@Override
public Optional<Location> createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix)
throws IOException
{
return delegate.createTemporaryDirectory(targetPath, temporaryPrefix, relativePrefix);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.cache;

import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.spi.security.ConnectorIdentity;

import static java.util.Objects.requireNonNull;

/**
 * {@link TrinoFileSystemFactory} decorator that wraps every file system created by the
 * underlying factory in a {@link CacheFileSystem}, using one cache and key provider for all of them.
 */
public final class CacheFileSystemFactory
        implements TrinoFileSystemFactory
{
    private final TrinoFileSystemFactory delegate;
    private final TrinoFileSystemCache cache;
    private final CacheKeyProvider keyProvider;

    /**
     * @param delegate factory producing the file systems to wrap
     * @param cache cache passed to each created {@link CacheFileSystem}
     * @param keyProvider key provider passed to each created {@link CacheFileSystem}
     * @throws NullPointerException if any argument is null
     */
    public CacheFileSystemFactory(TrinoFileSystemFactory delegate, TrinoFileSystemCache cache, CacheKeyProvider keyProvider)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.cache = requireNonNull(cache, "cache is null");
        this.keyProvider = requireNonNull(keyProvider, "keyProvider is null");
    }

    /**
     * Creates the delegate's file system for {@code identity} and wraps it with caching.
     */
    @Override
    public TrinoFileSystem create(ConnectorIdentity identity)
    {
        TrinoFileSystem fileSystem = delegate.create(identity);
        return new CacheFileSystem(fileSystem, cache, keyProvider);
    }
}
Loading