Skip to content

Commit

Permalink
fix!: ensure Iceberg layouts own the SeekableChannelsProvider (#6371)
Browse files Browse the repository at this point in the history
This greatly improves the efficiency of Iceberg reading. Previously, it
was creating a `SeekableChannelsProvider` per URI, and now only one is
created once per layout (/ Table).

To aid in this update, objects that were previously created on-demand in
`IcebergBaseLayout` are now created once upon construction. To enable
this, it was noted that only the URI scheme is relevant for
discrimination, and not actually the full URI to the data files. Thus,
we can use the URI scheme as provided via
`org.apache.iceberg.Table#location` to do any up-front loading.

The various interfaces that take a URI have been updated to take a URI
scheme instead. While this change could technically have been made in a
non-breaking fashion by delegating existing URI methods to URI scheme
methods, the existence of the URI methods encourages the wrong mental
model and is easy to misuse, so they have been removed.

One of the `ParquetTableLocationKey` constructors has been deprecated,
marked for removal. A more appropriate constructor has been added.

BREAKING CHANGE: 
- `SeekableChannelsProviderLoader.fromServiceLoader` has been removed,
replaced with `SeekableChannelsProviderLoader.load`.
- `DataInstructionsProviderLoader.fromServiceLoader` has been removed,
replaced with `DataInstructionsProviderLoader.load`.
- `SeekableChannelsProviderPlugin` methods have been changed, now use a
`String` for the URI scheme instead of a `URI`.
- `DataInstructionsProviderPlugin.createInstructions` method has been
changed, now uses a `String` for the URI scheme instead of a `URI`.
- `IcebergTableParquetLocationKey` has added a new
`SeekableChannelsProvider` parameter.
  • Loading branch information
devinrsmith authored Nov 15, 2024
1 parent 02127ae commit 38a3aa8
Show file tree
Hide file tree
Showing 18 changed files with 195 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
Expand All @@ -19,11 +18,22 @@ public final class SeekableChannelsProviderLoader {

private static volatile SeekableChannelsProviderLoader instance;

/**
* Get a static {@link SeekableChannelsProviderLoader} instance that is loaded with
* {@link SeekableChannelsProviderPlugin} provided via {@link ServiceLoader#load(Class)}.
*
* @return The {@link SeekableChannelsProviderLoader} instance.
*/
public static SeekableChannelsProviderLoader getInstance() {
if (instance == null) {
instance = new SeekableChannelsProviderLoader();
SeekableChannelsProviderLoader localInstance;
if ((localInstance = instance) == null) {
synchronized (SeekableChannelsProviderLoader.class) {
if ((localInstance = instance) == null) {
instance = localInstance = new SeekableChannelsProviderLoader();
}
}
}
return instance;
return localInstance;
}

private final List<SeekableChannelsProviderPlugin> providers;
Expand All @@ -37,21 +47,19 @@ private SeekableChannelsProviderLoader() {
}

/**
* Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the
* plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a
* {@link SeekableChannelsProvider} which can read files from S3.
* Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI scheme.
* For example, for an "S3" URI scheme, we will create a {@link SeekableChannelsProvider} which can read files from S3.
*
* @param uri The URI
* @param uriScheme The URI scheme
* @param specialInstructions An optional object to pass special instructions to the provider.
* @return A {@link SeekableChannelsProvider} for the given URI.
* @return A {@link SeekableChannelsProvider} for the given URI scheme.
*/
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri,
@Nullable final Object specialInstructions) {
public SeekableChannelsProvider load(@NotNull final String uriScheme, @Nullable final Object specialInstructions) {
for (final SeekableChannelsProviderPlugin plugin : providers) {
if (plugin.isCompatible(uri, specialInstructions)) {
return plugin.createProvider(uri, specialInstructions);
if (plugin.isCompatible(uriScheme, specialInstructions)) {
return plugin.createProvider(uriScheme, specialInstructions);
}
}
throw new UnsupportedOperationException("No plugin found for uri: " + uri);
throw new UnsupportedOperationException("No plugin found for uri scheme: " + uriScheme);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,18 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;

/**
* A plugin interface for providing {@link SeekableChannelsProvider} implementations for different URI schemes, e.g. S3.
* Check out {@link SeekableChannelsProviderLoader} for more details.
*/
public interface SeekableChannelsProviderPlugin {
/**
* Check if this plugin is compatible with the given URI and config object.
* Check if this plugin is compatible with the given URI scheme and config object.
*/
boolean isCompatible(@NotNull URI uri, @Nullable Object config);
boolean isCompatible(@NotNull String uriScheme, @Nullable Object config);

/**
* Create a {@link SeekableChannelsProvider} for the given URI and config object.
* Create a {@link SeekableChannelsProvider} for the given URI scheme and config object.
*/
SeekableChannelsProvider createProvider(@NotNull URI uri, @Nullable Object object);
SeekableChannelsProvider createProvider(@NotNull String uriScheme, @Nullable Object object);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.Map;

/**
Expand All @@ -25,17 +24,18 @@
@SuppressWarnings("unused")
public final class S3InstructionsProviderPlugin implements DataInstructionsProviderPlugin {
@Override
public S3Instructions createInstructions(@NotNull final URI uri, @NotNull final Map<String, String> properties) {
public S3Instructions createInstructions(@NotNull final String uriScheme,
@NotNull final Map<String, String> properties) {
final S3Instructions s3Instructions = DeephavenAwsClientFactory.getInstructions(properties).orElse(null);
if (s3Instructions != null) {
return s3Instructions;
}

// If the URI scheme is "s3","s3a","s3n" or if the properties contain one of these specific keys, we can
// create a useful S3Instructions object.
if (uri.getScheme().equals("s3")
|| uri.getScheme().equals("s3a")
|| uri.getScheme().equals("s3n")
if (uriScheme.equals("s3")
|| uriScheme.equals("s3a")
|| uriScheme.equals("s3n")
|| properties.containsKey(AwsClientProperties.CLIENT_REGION)
|| properties.containsKey(S3FileIOProperties.ACCESS_KEY_ID)
|| properties.containsKey(S3FileIOProperties.SECRET_ACCESS_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@

import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;

/**
* A service loader class for loading {@link DataInstructionsProviderPlugin} implementations at runtime which provide
* {@link DataInstructionsProviderLoader} implementations for different URI paths.
* {@link DataInstructionsProviderLoader} implementations for different URI schemes.
*/
public final class DataInstructionsProviderLoader {
/**
Expand All @@ -21,30 +24,33 @@ public final class DataInstructionsProviderLoader {
/**
* Ensure that the {@link DataInstructionsProviderPlugin plugins} are loaded exactly once.
*/
private static void ensureProviders() {
if (cachedProviders == null) {
private static List<DataInstructionsProviderPlugin> ensureProviders() {
List<DataInstructionsProviderPlugin> localProviders;
if ((localProviders = cachedProviders) == null) {
synchronized (DataInstructionsProviderLoader.class) {
if (cachedProviders == null) {
cachedProviders = new ArrayList<>();
if ((localProviders = cachedProviders) == null) {
localProviders = new ArrayList<>();
// Load the plugins
for (final DataInstructionsProviderPlugin plugin : ServiceLoader
.load(DataInstructionsProviderPlugin.class)) {
cachedProviders.add(plugin);
localProviders.add(plugin);
}
cachedProviders = localProviders;
}
}
}
return localProviders;
}

/**
* Get a {@link DataInstructionsProviderLoader} instance for the given property collection.
* Create a {@link DataInstructionsProviderLoader} instance for the given property collection with a static list of
* {@link DataInstructionsProviderPlugin} provided via {@link ServiceLoader#load(Class)}.
*
* @param properties The property collection.
* @return A {@link DataInstructionsProviderLoader} instance.
*/
public static DataInstructionsProviderLoader create(final Map<String, String> properties) {
ensureProviders();
return new DataInstructionsProviderLoader(properties);
return new DataInstructionsProviderLoader(properties, ensureProviders());
}

/**
Expand All @@ -62,27 +68,28 @@ public static DataInstructionsProviderLoader create(final Map<String, String> pr
*
* @param properties The property collection.
*/
private DataInstructionsProviderLoader(final Map<String, String> properties) {
this.properties = properties;
providers = cachedProviders;
private DataInstructionsProviderLoader(
final Map<String, String> properties,
final List<DataInstructionsProviderPlugin> providers) {
this.properties = Objects.requireNonNull(properties);
this.providers = Objects.requireNonNull(providers);
}

/**
* Create a new data instructions object compatible with reading from and writing to the given URI, using the
* plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create an
* {@code S3Instructions} object which can read files from S3.
* Create a new data instructions object compatible with reading from and writing to the given URI scheme. For
* example, for an "S3" URI scheme, we will create an {@code S3Instructions} object which can read files from S3.
*
* @param uri The URI
* @return A data instructions object for the given URI or null if one cannot be found
* @param uriScheme The URI scheme
* @return A data instructions object for the given URI scheme or null if one cannot be found
*/
public Object fromServiceLoader(@NotNull final URI uri) {
public Object load(@NotNull final String uriScheme) {
for (final DataInstructionsProviderPlugin plugin : providers) {
final Object pluginInstructions = plugin.createInstructions(uri, properties);
final Object pluginInstructions = plugin.createInstructions(uriScheme, properties);
if (pluginInstructions != null) {
return pluginInstructions;
}
}
// No plugin found for this URI and property collection.
// No plugin found for this URI scheme and property collection.
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@

import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.Map;

/**
* A plugin interface for providing {@link DataInstructionsProviderPlugin} implementations for different property
* collections and URI values. Check out {@link DataInstructionsProviderLoader} for more details.
* collections and URI schemes. Check out {@link DataInstructionsProviderLoader} for more details.
*/
public interface DataInstructionsProviderPlugin {
/**
* Create a data instructions object for the given URI.
* Create a data instructions object for the given URI scheme.
*/
Object createInstructions(@NotNull URI uri, @NotNull final Map<String, String> properties);
Object createInstructions(@NotNull String uriScheme, @NotNull Map<String, String> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import org.apache.iceberg.*;
import org.apache.iceberg.io.FileIO;
import org.jetbrains.annotations.NotNull;
Expand All @@ -38,65 +40,37 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
final TableDefinition tableDef;

/**
* The instructions for customizations while reading.
* The URI scheme from the Table {@link Table#location() location}.
*/
final IcebergReadInstructions instructions;
private final String uriScheme;

/**
* A cache of {@link IcebergTableLocationKey IcebergTableLocationKeys} keyed by the URI of the file they represent.
*/
final Map<URI, IcebergTableLocationKey> cache;
private final Map<URI, IcebergTableLocationKey> cache;

/**
* The {@link Snapshot} from which to discover data files.
* The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table.
*/
Snapshot snapshot;
private final ParquetInstructions parquetInstructions;

/**
* The data instructions provider for creating instructions from URI and user-supplied properties.
* The {@link SeekableChannelsProvider} object that will be used for {@link IcebergTableParquetLocationKey}
* creation.
*/
final DataInstructionsProviderLoader dataInstructionsProvider;
private final SeekableChannelsProvider channelsProvider;

/**
* The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table. Only
* accessed while synchronized on {@code this}.
* The {@link Snapshot} from which to discover data files.
*/
ParquetInstructions parquetInstructions;
Snapshot snapshot;

protected IcebergTableLocationKey locationKey(
final org.apache.iceberg.FileFormat format,
final URI fileUri,
@Nullable final Map<String, Comparable<?>> partitions) {

if (format == org.apache.iceberg.FileFormat.PARQUET) {
if (parquetInstructions == null) {
// Start with user-supplied instructions (if provided).
final ParquetInstructions.Builder builder = new ParquetInstructions.Builder();

// Add the table definition.
builder.setTableDefinition(tableDef);

// Add any column rename mappings.
if (!instructions.columnRenames().isEmpty()) {
for (Map.Entry<String, String> entry : instructions.columnRenames().entrySet()) {
builder.addColumnNameMapping(entry.getKey(), entry.getValue());
}
}

// Add the data instructions if provided as part of the IcebergReadInstructions.
if (instructions.dataInstructions().isPresent()) {
builder.setSpecialInstructions(instructions.dataInstructions().get());
} else {
// Attempt to create data instructions from the properties collection and URI.
final Object dataInstructions = dataInstructionsProvider.fromServiceLoader(fileUri);
if (dataInstructions != null) {
builder.setSpecialInstructions(dataInstructions);
}
}

parquetInstructions = builder.build();
}
return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions);
return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions, channelsProvider);
}
throw new UnsupportedOperationException(String.format("%s:%d - an unsupported file format %s for URI '%s'",
tableAdapter, snapshot.snapshotId(), format, fileUri));
Expand All @@ -112,23 +86,46 @@ public IcebergBaseLayout(
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
this.tableAdapter = tableAdapter;
this.snapshot = tableAdapter.getSnapshot(instructions);
this.instructions = instructions;
this.dataInstructionsProvider = dataInstructionsProvider;
this.tableDef = tableAdapter.definition(instructions);

this.uriScheme = locationUri(tableAdapter.icebergTable()).getScheme();
// Add the data instructions if provided as part of the IcebergReadInstructions, or else attempt to create
// data instructions from the properties collection and URI scheme.
final Object specialInstructions = instructions.dataInstructions()
.orElseGet(() -> dataInstructionsProvider.load(uriScheme));
{
// Start with user-supplied instructions (if provided).
final ParquetInstructions.Builder builder = new ParquetInstructions.Builder();

// Add the table definition.
builder.setTableDefinition(tableDef);

// Add any column rename mappings.
if (!instructions.columnRenames().isEmpty()) {
for (Map.Entry<String, String> entry : instructions.columnRenames().entrySet()) {
builder.addColumnNameMapping(entry.getKey(), entry.getValue());
}
}
if (specialInstructions != null) {
builder.setSpecialInstructions(specialInstructions);
}
this.parquetInstructions = builder.build();
}
this.channelsProvider = SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions);
this.cache = new HashMap<>();
}

abstract IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri);

@NotNull
private URI dataFileUri(@NotNull DataFile df) {
String path = df.path().toString();
final FileIO fileIO = tableAdapter.icebergTable().io();
if (fileIO instanceof RelativeFileIO) {
path = ((RelativeFileIO) fileIO).absoluteLocation(path);
}
return FileUtils.convertToURI(path, false);
private static String path(String path, FileIO io) {
return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
}

private static URI locationUri(Table table) {
return FileUtils.convertToURI(path(table.location(), table.io()), true);
}

private static URI dataFileUri(Table table, DataFile dataFile) {
return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
}

@Override
Expand All @@ -149,7 +146,12 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
}
try (final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
for (DataFile df : reader) {
final URI fileUri = dataFileUri(df);
final URI fileUri = dataFileUri(table, df);
if (!uriScheme.equals(fileUri.getScheme())) {
throw new TableDataException(String.format(
"%s:%d - multiple URI schemes are not currently supported. uriScheme=%s, fileUri=%s",
table, snapshot.snapshotId(), uriScheme, fileUri));
}
final IcebergTableLocationKey locationKey =
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
if (locationKey != null) {
Expand Down
Loading

0 comments on commit 38a3aa8

Please sign in to comment.