Skip to content

Commit

Permalink
Use shared cache for per transaction file listing cache
Browse files Browse the repository at this point in the history
This allows capping the cache size independently of query
concurrency.
  • Loading branch information
sopel39 committed May 15, 2023
1 parent e9fd487 commit 0f3ff0a
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 50 deletions.
3 changes: 2 additions & 1 deletion docs/src/main/sphinx/connector/hive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ Hive connector documentation.
- How long a cached directory listing is considered valid.
- ``1m``
* - ``hive.per-transaction-file-status-cache.max-retained-size``
- Maximum retained size of cached file status entries per transaction
- Maximum retained size of all entries in the per-transaction file status cache.
The retained size limit is shared across all running queries.
- ``100MB``
* - ``hive.rcfile.time-zone``
- Adjusts binary encoded timestamp values to a specific time zone. For
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.trino.plugin.hive.PropertiesSystemTableProvider;
import io.trino.plugin.hive.aws.athena.PartitionProjectionService;
import io.trino.plugin.hive.fs.FileSystemDirectoryLister;
import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory;
import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
Expand Down Expand Up @@ -162,6 +163,7 @@ public S3SelectTestHelper(String host,
new DefaultHiveMaterializedViewMetadataFactory(),
SqlStandardAccessControlMetadata::new,
new FileSystemDirectoryLister(),
new TransactionScopeCachingDirectoryListerFactory(hiveConfig),
new PartitionProjectionService(this.hiveConfig, ImmutableMap.of(), new TestingTypeManager()),
true);
transactionManager = new HiveTransactionManager(metadataFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@

import io.airlift.concurrent.BoundedExecutor;
import io.airlift.json.JsonCodec;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.aws.athena.PartitionProjectionService;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryLister;
import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory;
import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
Expand Down Expand Up @@ -77,7 +76,7 @@ public class HiveMetadataFactory
private final Optional<Duration> hiveTransactionHeartbeatInterval;
private final ScheduledExecutorService heartbeatService;
private final DirectoryLister directoryLister;
private final DataSize perTransactionFileStatusCacheMaximumDataSize;
private final TransactionScopeCachingDirectoryListerFactory transactionScopeCachingDirectoryListerFactory;
private final PartitionProjectionService partitionProjectionService;
private final boolean allowTableRename;
private final HiveTimestampPrecision hiveViewsTimestampPrecision;
Expand All @@ -103,6 +102,7 @@ public HiveMetadataFactory(
HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory,
AccessControlMetadataFactory accessControlMetadataFactory,
DirectoryLister directoryLister,
TransactionScopeCachingDirectoryListerFactory transactionScopeCachingDirectoryListerFactory,
PartitionProjectionService partitionProjectionService,
@AllowHiveTableRename boolean allowTableRename)
{
Expand Down Expand Up @@ -138,7 +138,7 @@ public HiveMetadataFactory(
hiveMaterializedViewMetadataFactory,
accessControlMetadataFactory,
directoryLister,
hiveConfig.getPerTransactionFileStatusCacheMaxRetainedSize(),
transactionScopeCachingDirectoryListerFactory,
partitionProjectionService,
allowTableRename,
hiveConfig.getTimestampPrecision());
Expand Down Expand Up @@ -176,7 +176,7 @@ public HiveMetadataFactory(
HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory,
AccessControlMetadataFactory accessControlMetadataFactory,
DirectoryLister directoryLister,
DataSize perTransactionFileStatusCacheMaximumDataSize,
TransactionScopeCachingDirectoryListerFactory transactionScopeCachingDirectoryListerFactory,
PartitionProjectionService partitionProjectionService,
boolean allowTableRename,
HiveTimestampPrecision hiveViewsTimestampPrecision)
Expand Down Expand Up @@ -219,7 +219,7 @@ public HiveMetadataFactory(
this.maxPartitionDropsPerQuery = maxPartitionDropsPerQuery;
this.heartbeatService = requireNonNull(heartbeatService, "heartbeatService is null");
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
this.perTransactionFileStatusCacheMaximumDataSize = requireNonNull(perTransactionFileStatusCacheMaximumDataSize, "perTransactionFileStatusCacheMaximumDataSize is null");
this.transactionScopeCachingDirectoryListerFactory = requireNonNull(transactionScopeCachingDirectoryListerFactory, "transactionScopeCachingDirectoryListerFactory is null");
this.partitionProjectionService = requireNonNull(partitionProjectionService, "partitionProjectionService is null");
this.allowTableRename = allowTableRename;
this.hiveViewsTimestampPrecision = requireNonNull(hiveViewsTimestampPrecision, "hiveViewsTimestampPrecision is null");
Expand All @@ -231,14 +231,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
HiveMetastoreClosure hiveMetastoreClosure = new HiveMetastoreClosure(
memoizeMetastore(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize)); // per-transaction cache

DirectoryLister directoryLister;
if (perTransactionFileStatusCacheMaximumDataSize.toBytes() > 0) {
directoryLister = new TransactionScopeCachingDirectoryLister(this.directoryLister, perTransactionFileStatusCacheMaximumDataSize);
}
else {
directoryLister = this.directoryLister;
}

DirectoryLister directoryLister = transactionScopeCachingDirectoryListerFactory.get(this.directoryLister);
SemiTransactionalHiveMetastore metastore = new SemiTransactionalHiveMetastore(
hdfsEnvironment,
hiveMetastoreClosure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.hdfs.TrinoFileSystemCacheStats;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.fs.CachingDirectoryLister;
import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory;
import io.trino.plugin.hive.line.CsvFileWriterFactory;
import io.trino.plugin.hive.line.CsvPageSourceFactory;
import io.trino.plugin.hive.line.JsonFileWriterFactory;
Expand Down Expand Up @@ -106,6 +107,7 @@ public void configure(Binder binder)
.setDefault().to(DefaultHiveMaterializedViewMetadataFactory.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, TransactionalMetadataFactory.class)
.setDefault().to(HiveMetadataFactory.class).in(Scopes.SINGLETON);
binder.bind(TransactionScopeCachingDirectoryListerFactory.class).in(Scopes.SINGLETON);
binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON);
binder.bind(ConnectorSplitManager.class).to(HiveSplitManager.class).in(Scopes.SINGLETON);
newExporter(binder).export(ConnectorSplitManager.class).as(generator -> generator.generatedNameOf(HiveSplitManager.class));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.fs;

import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

/**
 * Cache key that pairs a transaction id with a {@link DirectoryListingCacheKey}.
 * <p>
 * Two keys are equal only when both the transaction id and the wrapped
 * directory listing key are equal.
 */
public class TransactionDirectoryListingCacheKey
{
    // Shallow size of one instance of this class; the wrapped key's retained size is added on top
    private static final long INSTANCE_SIZE = instanceSize(TransactionDirectoryListingCacheKey.class);

    private final long transactionId;
    private final DirectoryListingCacheKey key;

    /**
     * @param transactionId id of the transaction this key belongs to
     * @param key the wrapped directory listing key; must not be null
     * @throws NullPointerException if {@code key} is null
     */
    public TransactionDirectoryListingCacheKey(long transactionId, DirectoryListingCacheKey key)
    {
        this.transactionId = transactionId;
        this.key = requireNonNull(key, "key is null");
    }

    /**
     * @return the wrapped directory listing key
     */
    public DirectoryListingCacheKey getKey()
    {
        return key;
    }

    /**
     * @return the instance size of this class plus the retained size reported by the wrapped key
     */
    public long getRetainedSizeInBytes()
    {
        long wrappedKeySize = key.getRetainedSizeInBytes();
        return INSTANCE_SIZE + wrappedKeySize;
    }

    @Override
    public boolean equals(Object o)
    {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        // Exact class match: instances of other classes (including subclasses) never compare equal
        if (o.getClass() != getClass()) {
            return false;
        }
        TransactionDirectoryListingCacheKey other = (TransactionDirectoryListingCacheKey) o;
        if (transactionId != other.transactionId) {
            return false;
        }
        return key.equals(other.key);
    }

    @Override
    public int hashCode()
    {
        // Combines the same two fields that equals() compares
        return Objects.hash(transactionId, key);
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("transactionId", transactionId)
                .add("key", key)
                .toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.units.DataSize;
import io.trino.collect.cache.EvictableCacheBuilder;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Storage;
import io.trino.plugin.hive.metastore.Table;
Expand All @@ -40,9 +38,10 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.slice.SizeOf.sizeOfObjectArray;
import static java.lang.Math.toIntExact;
import static io.trino.plugin.hive.fs.DirectoryListingCacheKey.allKeysWithPath;
import static java.util.Collections.synchronizedList;
import static java.util.Objects.requireNonNull;

Expand All @@ -55,42 +54,34 @@
public class TransactionScopeCachingDirectoryLister
implements DirectoryLister
{
private final long transactionId;
//TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
// to deal more efficiently with cache invalidation scenarios for partitioned tables.
private final Cache<DirectoryListingCacheKey, FetchingValueHolder> cache;
private final Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache;
private final DirectoryLister delegate;

public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, DataSize maxSize)
public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long transactionId, Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache)
{
this(delegate, maxSize, Optional.empty());
}

@VisibleForTesting
TransactionScopeCachingDirectoryLister(DirectoryLister delegate, DataSize maxSize, Optional<Integer> concurrencyLevel)
{
EvictableCacheBuilder<DirectoryListingCacheKey, FetchingValueHolder> cacheBuilder = EvictableCacheBuilder.newBuilder()
.maximumWeight(maxSize.toBytes())
.weigher((key, value) -> toIntExact(key.getRetainedSizeInBytes() + value.getRetainedSizeInBytes()));
concurrencyLevel.ifPresent(cacheBuilder::concurrencyLevel);
this.cache = cacheBuilder.build();
this.delegate = requireNonNull(delegate, "delegate is null");
this.transactionId = transactionId;
this.cache = requireNonNull(cache, "cache is null");
}

@Override
public RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
throws IOException
{
return listInternal(fs, table, new DirectoryListingCacheKey(path.toString(), false));
return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, new DirectoryListingCacheKey(path.toString(), false)));
}

@Override
public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
throws IOException
{
return listInternal(fs, table, new DirectoryListingCacheKey(path.toString(), true));
return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, new DirectoryListingCacheKey(path.toString(), true)));
}

private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, Table table, DirectoryListingCacheKey cacheKey)
private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey)
throws IOException
{
FetchingValueHolder cachedValueHolder;
Expand All @@ -101,7 +92,7 @@ private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, Table table,
Throwable throwable = e.getCause();
throwIfInstanceOf(throwable, IOException.class);
throwIfUnchecked(throwable);
throw new RuntimeException("Failed to list directory: " + cacheKey.getPath(), throwable);
throw new RuntimeException("Failed to list directory: " + cacheKey.getKey().getPath(), throwable);
}

if (cachedValueHolder.isFullyCached()) {
Expand All @@ -111,21 +102,23 @@ private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, Table table,
return cachingRemoteIterator(cachedValueHolder, cacheKey);
}

private RemoteIterator<TrinoFileStatus> createListingRemoteIterator(FileSystem fs, Table table, DirectoryListingCacheKey cacheKey)
private RemoteIterator<TrinoFileStatus> createListingRemoteIterator(FileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey)
throws IOException
{
if (cacheKey.isRecursiveFilesOnly()) {
return delegate.listFilesRecursively(fs, table, new Path(cacheKey.getPath()));
if (cacheKey.getKey().isRecursiveFilesOnly()) {
return delegate.listFilesRecursively(fs, table, new Path(cacheKey.getKey().getPath()));
}
return delegate.list(fs, table, new Path(cacheKey.getPath()));
return delegate.list(fs, table, new Path(cacheKey.getKey().getPath()));
}

@Override
public void invalidate(Table table)
{
if (isLocationPresent(table.getStorage())) {
if (table.getPartitionColumns().isEmpty()) {
cache.invalidateAll(DirectoryListingCacheKey.allKeysWithPath(new Path(table.getStorage().getLocation())));
cache.invalidateAll(allKeysWithPath(new Path(table.getStorage().getLocation())).stream()
.map(key -> new TransactionDirectoryListingCacheKey(transactionId, key))
.collect(toImmutableList()));
}
else {
// a partitioned table can have multiple paths in cache
Expand All @@ -139,12 +132,14 @@ public void invalidate(Table table)
public void invalidate(Partition partition)
{
if (isLocationPresent(partition.getStorage())) {
cache.invalidateAll(DirectoryListingCacheKey.allKeysWithPath(new Path(partition.getStorage().getLocation())));
cache.invalidateAll(allKeysWithPath(new Path(partition.getStorage().getLocation())).stream()
.map(key -> new TransactionDirectoryListingCacheKey(transactionId, key))
.collect(toImmutableList()));
}
delegate.invalidate(partition);
}

private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(FetchingValueHolder cachedValueHolder, DirectoryListingCacheKey cacheKey)
private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(FetchingValueHolder cachedValueHolder, TransactionDirectoryListingCacheKey cacheKey)
{
return new RemoteIterator<>()
{
Expand Down Expand Up @@ -183,11 +178,11 @@ public TrinoFileStatus next()
@VisibleForTesting
boolean isCached(Path path)
{
return isCached(new DirectoryListingCacheKey(path.toString(), false));
return isCached(new TransactionDirectoryListingCacheKey(transactionId, new DirectoryListingCacheKey(path.toString(), false)));
}

@VisibleForTesting
boolean isCached(DirectoryListingCacheKey cacheKey)
boolean isCached(TransactionDirectoryListingCacheKey cacheKey)
{
FetchingValueHolder cached = cache.getIfPresent(cacheKey);
return cached != null && cached.isFullyCached();
Expand All @@ -199,7 +194,7 @@ private static boolean isLocationPresent(Storage storage)
return storage.getOptionalLocation().isPresent() && !storage.getLocation().isEmpty();
}

private static class FetchingValueHolder
static class FetchingValueHolder
{
private static final long ATOMIC_LONG_SIZE = instanceSize(AtomicLong.class);
private static final long INSTANCE_SIZE = instanceSize(FetchingValueHolder.class);
Expand Down
Loading

0 comments on commit 0f3ff0a

Please sign in to comment.