Skip to content

Commit

Permalink
core: Provide mechanism to cache manifest file content (#4518)
Browse files Browse the repository at this point in the history
* Core: Add CONTENT_CACHES in ManifestFiles.java

* Fix kryo serialization failure for HadoopFileIO

* Add DEBUG log if ContentCache is not created

* Rename properties related to manifest caching

* Fix small string mistakes in testWeakFileIOReferenceCleanUp

* Clarify config documentation and change access modifier

* Fix checkstyle and catch UnsupportedOperationException
  • Loading branch information
rizaon authored Sep 26, 2022
1 parent 9fcf53b commit 6c7e8d1
Show file tree
Hide file tree
Showing 6 changed files with 777 additions and 3 deletions.
54 changes: 54 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,60 @@ private CatalogProperties() {}
public static final long CACHE_EXPIRATION_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(30);
public static final long CACHE_EXPIRATION_INTERVAL_MS_OFF = -1;

/**
 * Controls whether to use caching during manifest reads or not.
 *
 * <p>Enabling manifest file caching requires the following configuration constraints to be true:
 *
 * <ul>
 *   <li>{@link #IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS} must be a non-negative value.
 *   <li>{@link #IO_MANIFEST_CACHE_MAX_TOTAL_BYTES} must be a positive value.
 *   <li>{@link #IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH} must be a positive value.
 * </ul>
 */
public static final String IO_MANIFEST_CACHE_ENABLED = "io.manifest.cache-enabled";

public static final boolean IO_MANIFEST_CACHE_ENABLED_DEFAULT = false;

/**
 * Controls the maximum duration for which an entry stays in the manifest cache.
 *
 * <p>Must be a non-negative value. Following are specific behaviors of this config:
 *
 * <ul>
 *   <li>Zero - Cache entries expire only if they get evicted due to memory pressure from the
 *       {@link #IO_MANIFEST_CACHE_MAX_TOTAL_BYTES} setting.
 *   <li>Positive values - Cache entries expire if not accessed via the cache after this many
 *       milliseconds.
 * </ul>
 */
public static final String IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS =
    "io.manifest.cache.expiration-interval-ms";

public static final long IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT =
    TimeUnit.SECONDS.toMillis(60);

/**
 * Controls the maximum total amount of bytes to cache in the manifest cache.
 *
 * <p>Must be a positive value.
 */
public static final String IO_MANIFEST_CACHE_MAX_TOTAL_BYTES =
    "io.manifest.cache.max-total-bytes";

public static final long IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT = 100 * 1024 * 1024;

/**
 * Controls the maximum length of a file to be considered for caching.
 *
 * <p>An {@link org.apache.iceberg.io.InputFile} will not be cached if the length is longer than
 * this limit. Must be a positive value.
 */
public static final String IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH =
    "io.manifest.cache.max-content-length";

public static final long IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT = 8 * 1024 * 1024;

public static final String URI = "uri";
public static final String CLIENT_POOL_SIZE = "clients";
public static final int CLIENT_POOL_SIZE_DEFAULT = 2;
Expand Down
106 changes: 104 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,32 @@
*/
package org.apache.iceberg;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.ManifestReader.FileType;
import org.apache.iceberg.avro.AvroEncoderUtil;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ContentCache;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManifestFiles {
private ManifestFiles() {}

private static final Logger LOG = LoggerFactory.getLogger(ManifestFiles.class);

private static final org.apache.avro.Schema MANIFEST_AVRO_SCHEMA =
AvroSchemaUtil.convert(
ManifestFile.schema(),
Expand All @@ -44,6 +53,36 @@ private ManifestFiles() {}
ManifestFile.PARTITION_SUMMARY_TYPE,
GenericPartitionFieldSummary.class.getName()));

/**
 * Builds the FileIO-to-ContentCache map used for manifest caching.
 *
 * <p>Keys are held weakly so a cache entry does not keep its {@link FileIO} alive; values are
 * held softly so the JVM may reclaim {@link ContentCache} instances under memory pressure. The
 * maximum number of FileIO entries is bounded by {@link #maxFileIO()}. Evictions are logged at
 * DEBUG level with the eviction cause, and cache statistics are recorded.
 */
@VisibleForTesting
static Cache<FileIO, ContentCache> newManifestCache() {
  return Caffeine.newBuilder()
      .weakKeys()
      .softValues()
      .maximumSize(maxFileIO())
      .removalListener(
          (io, contentCache, cause) ->
              LOG.debug("Evicted {} from FileIO-level cache ({})", io, cause))
      .recordStats()
      .build();
}

// FileIO-level cache: maps each FileIO instance to its ContentCache. See newManifestCache()
// for the eviction and reference-strength policy.
private static final Cache<FileIO, ContentCache> CONTENT_CACHES = newManifestCache();

/**
 * Returns the {@link ContentCache} associated with the given {@link FileIO}, creating one from
 * the FileIO's manifest-caching properties on first access.
 */
@VisibleForTesting
static ContentCache contentCache(FileIO io) {
  return CONTENT_CACHES.get(
      io,
      fileIO ->
          new ContentCache(
              cacheDurationMs(fileIO), cacheTotalBytes(fileIO), cacheMaxContentLength(fileIO)));
}

/**
 * Drops the manifest file cache object for a {@link FileIO} if one exists.
 *
 * <p>Invalidates the entry and then triggers cache maintenance so the eviction is processed
 * promptly rather than lazily on a later cache access. Synchronized so concurrent callers
 * observe the invalidate-then-cleanup sequence atomically.
 */
public static synchronized void dropCache(FileIO fileIO) {
  CONTENT_CACHES.invalidate(fileIO);
  CONTENT_CACHES.cleanUp();
}

/**
* Returns a {@link CloseableIterable} of file paths in the {@link ManifestFile}.
*
Expand Down Expand Up @@ -86,7 +125,7 @@ public static ManifestReader<DataFile> read(
manifest.content() == ManifestContent.DATA,
"Cannot read a delete manifest with a ManifestReader: %s",
manifest);
InputFile file = io.newInputFile(manifest.path(), manifest.length());
InputFile file = newInputFile(io, manifest.path(), manifest.length());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DATA_FILES);
}
Expand Down Expand Up @@ -140,7 +179,7 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
manifest.content() == ManifestContent.DELETES,
"Cannot read a data manifest with a DeleteManifestReader: %s",
manifest);
InputFile file = io.newInputFile(manifest.path(), manifest.length());
InputFile file = newInputFile(io, manifest.path(), manifest.length());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
}
Expand Down Expand Up @@ -300,4 +339,67 @@ private static ManifestFile copyManifestInternal(

return writer.toManifestFile();
}

/**
 * Returns an {@link InputFile} for the given manifest path, served through the manifest content
 * cache when caching is enabled for the given {@link FileIO}.
 *
 * @param io FileIO used to open the manifest file
 * @param path location of the manifest file
 * @param length known length of the manifest file in bytes
 * @return a possibly caching InputFile for the manifest
 */
private static InputFile newInputFile(FileIO io, String path, long length) {
  boolean enabled;

  try {
    enabled = cachingEnabled(io);
  } catch (UnsupportedOperationException e) {
    // Some FileIO implementations do not support properties(); disable caching for them.
    enabled = false;
  }

  if (enabled) {
    ContentCache cache = contentCache(io);
    Preconditions.checkNotNull(
        cache,
        "ContentCache creation failed. Check that all manifest caching configurations have valid values.");
    LOG.debug("FileIO-level cache stats: {}", CONTENT_CACHES.stats());
    return cache.tryCache(io, path, length);
  }

  // Caching is not enabled for this io, or properties() threw UnsupportedOperationException.
  return io.newInputFile(path, length);
}

/**
 * Reads the maximum number of distinct FileIO instances allowed to have an associated cache,
 * from the {@link SystemProperties#IO_MANIFEST_CACHE_MAX_FILEIO} system property.
 *
 * <p>Falls back to {@link SystemProperties#IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT} when the
 * property is unset or is not a valid unsigned integer.
 */
private static int maxFileIO() {
  String configured = System.getProperty(SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO);
  if (configured == null) {
    return SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT;
  }

  try {
    return Integer.parseUnsignedInt(configured);
  } catch (NumberFormatException ignored) {
    // malformed value: fall back to the default
    return SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT;
  }
}

/**
 * Returns whether manifest caching is enabled for the given {@link FileIO}, per the
 * {@link CatalogProperties#IO_MANIFEST_CACHE_ENABLED} property in {@code io.properties()}.
 */
static boolean cachingEnabled(FileIO io) {
  return PropertyUtil.propertyAsBoolean(
      io.properties(),
      CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
      CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT);
}

/** Returns the configured cache entry expiration interval in milliseconds for this FileIO. */
static long cacheDurationMs(FileIO io) {
  return PropertyUtil.propertyAsLong(
      io.properties(),
      CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
      CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
}

/** Returns the configured maximum total cache size in bytes for this FileIO. */
static long cacheTotalBytes(FileIO io) {
  return PropertyUtil.propertyAsLong(
      io.properties(),
      CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES,
      CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT);
}

/** Returns the configured maximum cacheable file length in bytes for this FileIO. */
static long cacheMaxContentLength(FileIO io) {
  return PropertyUtil.propertyAsLong(
      io.properties(),
      CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
      CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT);
}
}
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/SystemProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ private SystemProperties() {}
/** Whether to use the shared worker pool when planning table scans. */
public static final String SCAN_THREAD_POOL_ENABLED = "iceberg.scan.plan-in-worker-pool";

/**
 * Maximum number of distinct {@link org.apache.iceberg.io.FileIO} instances that are allowed to
 * have an associated {@link org.apache.iceberg.io.ContentCache} in memory at a time.
 */
public static final String IO_MANIFEST_CACHE_MAX_FILEIO = "iceberg.io.manifest.cache.fileio-max";

public static final int IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT = 8;

static boolean getBoolean(String systemProperty, boolean defaultValue) {
String value = System.getProperty(systemProperty);
if (value != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;

public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations {

private SerializableSupplier<Configuration> hadoopConf;
private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());

/**
* Constructor used for dynamic FileIO loading.
Expand All @@ -61,6 +63,11 @@ public Configuration conf() {
return hadoopConf.get();
}

/**
 * Initializes this FileIO with the given catalog properties; the map is copied into a
 * serializable form so the FileIO (and its properties) survive serialization.
 */
@Override
public void initialize(Map<String, String> props) {
  this.properties = SerializableMap.copyOf(props);
}

@Override
public InputFile newInputFile(String path) {
return HadoopInputFile.fromLocation(path, hadoopConf.get());
Expand Down Expand Up @@ -89,7 +96,7 @@ public void deleteFile(String path) {

@Override
public Map<String, String> properties() {
return ImmutableMap.of();
return properties.immutableMap();
}

@Override
Expand Down
Loading

0 comments on commit 6c7e8d1

Please sign in to comment.