Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use DH S3Instructions to build Iceberg AWS clients #6113

Merged
merged 23 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8066110
feat: Use DH S3Instructions to build Iceberg AWS clients
devinrsmith Sep 23, 2024
4dc52bd
Refactor some shared code
devinrsmith Sep 23, 2024
9c13e93
Merge remote-tracking branch 'upstream/main' into dh-iceberg-s3-client
devinrsmith Oct 8, 2024
de78d8a
Review response
devinrsmith Oct 10, 2024
28f7643
small changes
devinrsmith Oct 10, 2024
c8ecd2c
Plumb through to iceberg.py
devinrsmith Oct 10, 2024
3804ef9
remove import
devinrsmith Oct 10, 2024
385ac25
Refactor, don't have IcebergTools extendable
devinrsmith Oct 10, 2024
b909ff3
Delay cleanup until catalog is unreachable
devinrsmith Oct 11, 2024
a4d2b5a
Review response
devinrsmith Oct 15, 2024
7da2369
review response
devinrsmith Oct 18, 2024
a6f51a4
Merge remote-tracking branch 'upstream/main' into dh-iceberg-s3-client
devinrsmith Oct 18, 2024
14aa4fb
Update iceberg python documentation
devinrsmith Oct 18, 2024
b62debc
Review response
devinrsmith Oct 21, 2024
11ba983
Merge remote-tracking branch 'upstream/main' into dh-iceberg-s3-client
devinrsmith Oct 21, 2024
6d0bbad
Merge remote-tracking branch 'upstream/main' into dh-iceberg-s3-client
devinrsmith Oct 28, 2024
1340833
Fix
devinrsmith Oct 28, 2024
790f086
changes
devinrsmith Oct 28, 2024
57b98a4
Migrate cleanup to CleanupReferenceProcessor
devinrsmith Oct 28, 2024
e3a9c65
fix deps for javadoc
devinrsmith Oct 29, 2024
3e78ce8
Merge remote-tracking branch 'upstream/main' into dh-iceberg-s3-client
devinrsmith Oct 29, 2024
bb7e239
fixup after merge
devinrsmith Oct 29, 2024
098388f
review response
devinrsmith Oct 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
* An {@link AwsClientFactory} and {@link S3FileIOAwsClientFactory} implementation that assumes ownership of AWS client
* creation as configured via {@link S3Instructions}.
*/
public class DeephavenAwsClientFactory implements AwsClientFactory, S3FileIOAwsClientFactory {
public final class DeephavenAwsClientFactory implements AwsClientFactory, S3FileIOAwsClientFactory {

private static final String UUID_KEY = DeephavenAwsClientFactory.class.getName() + ".__uuid";

/**
* Adds {@link DeephavenAwsClientFactory} to {@code propertiesOut} with the keys
* {@value AwsProperties#CLIENT_FACTORY} and {@value S3FileIOProperties#CLIENT_FACTORY}; it is an error if either of
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
* these properties is already set. Also sets {@value S3FileIOProperties#PRELOAD_CLIENT_ENABLED} to "true" to ensure
* the S3 client is initialized during the catalog creation process. After the necessary objects have been
* initialized, the caller should call the returned {@link Runnable} to clean up.
* these properties is already set. After the corresponding {@link org.apache.iceberg.catalog.Catalog} is no longer
* in use, the caller should invoke the returned {@link Runnable} to clean up.
*
* @param instructions the instructions
* @param propertiesOut the properties
Expand All @@ -42,14 +42,31 @@ public static Runnable addToProperties(S3Instructions instructions, Map<String,
put(propertiesOut, S3FileIOProperties.CLIENT_FACTORY, DeephavenAwsClientFactory.class.getName());
final String uuid = UUID.randomUUID().toString();
put(propertiesOut, UUID_KEY, uuid);
// We are enabling preload to ensure that #initialize gets called during the creation of the catalog while we
// know the instructions are in the map.
// Note: glue client is already preloaded on init if needed
propertiesOut.put(S3FileIOProperties.PRELOAD_CLIENT_ENABLED, "true");
S3_INSTRUCTIONS_MAP.put(uuid, instructions);
return () -> S3_INSTRUCTIONS_MAP.remove(uuid);
}

/**
* Get the {@link S3Instructions} as set in the corresponding {@link #addToProperties(S3Instructions, Map)} if the
* properties were built with that. If the properties were built with {@link #addToProperties(S3Instructions, Map)},
* but the {@link Runnable} was already invoked for cleanup, an {@link IllegalStateException} will be thrown.
*
* @param properties the properties
* @return the instructions
*/
public static Optional<S3Instructions> get(Map<String, String> properties) {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
final String uuid = properties.get(UUID_KEY);
if (uuid == null) {
return Optional.empty();
}
final S3Instructions instructions = S3_INSTRUCTIONS_MAP.get(uuid);
if (instructions == null) {
throw new IllegalStateException(
"This S3Iinstructions were already cleaned up; please ensure that the returned Runnable from addToProperties is not invoked until the Catalog is no longer in use.");
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
}
return Optional.of(instructions);
}

private static <K, V> void put(Map<K, V> map, K key, V value) {
if (map.putIfAbsent(key, value) != null) {
throw new IllegalArgumentException(String.format("Key '%s' already exist in map", key));
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -62,16 +79,8 @@ private static <K, V> void put(Map<K, V> map, K key, V value) {

@Override
public void initialize(Map<String, String> properties) {
final String uuid = properties.get(UUID_KEY);
if (uuid == null) {
throw new IllegalArgumentException(
"DeephavenAwsClientFactory was setup improperly; it must be configured with DeephavenAwsClientFactory.addToProperties");
}
final S3Instructions s3i = S3_INSTRUCTIONS_MAP.get(uuid);
if (s3i == null) {
throw new IllegalStateException("This DeephavenAwsClientFactory was already cleaned up");
}
this.instructions = s3i;
this.instructions = get(properties).orElseThrow(() -> new IllegalArgumentException(
"DeephavenAwsClientFactory was setup improperly; it must be configured with DeephavenAwsClientFactory.addToProperties"));
}

private void checkInit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.lang.ref.Cleaner;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -26,6 +27,9 @@
public final class IcebergToolsS3 {
private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";

// Note: this could move to a shared location
private static final Cleaner CLEANER = Cleaner.create();

/**
* Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
* value, the system defaults will be used.
Expand Down Expand Up @@ -112,7 +116,9 @@ public static IcebergCatalogAdapter createGlue(
* properties (found amongst {@link S3FileIOProperties}, {@link HttpClientProperties}, and {@link AwsProperties}),
* the clients are created in the same way that Deephaven's AWS clients are configured with respect to
* {@code instructions}. This ensures, amongst other things, that Iceberg's AWS configuration and credentials are
* in-sync with Deephaven's AWS configuration and credentials for S3 access.
* in-sync with Deephaven's AWS configuration and credentials for S3 access. The {@code instructions} will
* automatically be used as special instructions if {@link IcebergInstructions#dataInstructions()} if not explicitly
* set.
*
* <p>
* The caller is still responsible for providing the properties necessary as specified in
Expand All @@ -133,10 +139,11 @@ public static IcebergCatalogAdapter createAdapter(
@NotNull final S3Instructions instructions) {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
final Map<String, String> newProperties = new HashMap<>(properties);
final Runnable cleanup = DeephavenAwsClientFactory.addToProperties(instructions, newProperties);
try {
return IcebergTools.createAdapter(name, newProperties, hadoopConfig);
} finally {
cleanup.run();
}
final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(name, newProperties, hadoopConfig);
// When the Catalog becomes unreachable, we can invoke the DeephavenAwsClientFactory cleanup.
// Note: it would be incorrect to register the cleanup against the adapter since the Catalog can outlive the
// adapter (and the DeephavenAwsClientFactory properties are needed by the Catalog).
CLEANER.register(adapter.catalog(), cleanup);
return adapter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I had to do something similar for the S3Request objects, Ryan suggested using CleanupReferenceProcessor. You can check that too, if that has any advantages.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "register" method on Cleaner is very nice. I tried to add a similarly helpful register method to CleanupReferenceProcessor (essentially, creating a reference behind the scenes that ties an object and a cleanup action), and it almost worked... for some reason though, the caller needs to explicitly retain the returned reference, whereas the same limitation does not apply to Cleaner. It could be I was missing some subtle aspect of the Reference stuff - will have convo w/ Ryan.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got a separate PR that adds similar functionality to CleanupReferenceProcessor; #6213

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.google.auto.service.AutoService;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.DeephavenAwsClientFactory;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderPlugin;
import org.apache.iceberg.aws.AwsClientProperties;
Expand All @@ -15,13 +16,18 @@
import java.util.Map;

/**
* {@link io.deephaven.iceberg.internal.DataInstructionsProviderPlugin} implementation used for reading files from S3.
* {@link DataInstructionsProviderPlugin} implementation used for reading files from S3.
*/
@AutoService(io.deephaven.iceberg.internal.DataInstructionsProviderPlugin.class)
@AutoService(DataInstructionsProviderPlugin.class)
@SuppressWarnings("unused")
public final class S3InstructionsProviderPlugin implements DataInstructionsProviderPlugin {
@Override
public Object createInstructions(@NotNull final URI uri, @NotNull final Map<String, String> properties) {
final S3Instructions s3Instructions = DeephavenAwsClientFactory.get(properties).orElse(null);
if (s3Instructions != null) {
return s3Instructions;
}

// If the URI scheme is "s3","s3a","s3n" or if the properties contain one of these specific keys, we can
// create a useful S3Instructions object.
if (uri.getScheme().equals("s3")
Expand Down
4 changes: 2 additions & 2 deletions py/server/deephaven/experimental/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ def adapter(

If the user is responsible for connecting to an AWS catalog or S3-compatible API for the warehouse, setting
s3_instructions is the recommended way of configuring these details, as it allows Deephaven to own the creation of
the Iceberg-internal AWS / S3 client. When set, these instructions should also be included as part of
IcebergInstructions data_instructions. Note: some REST Catalog implementations may be responsible for providing the
the Iceberg-internal AWS / S3 client. When set, these instructions will also be automatically included as part of
IcebergInstructions data_instructions. Note: some REST Catalog implementations may be responsible for providing S3
warehouse credentials, in which case the user is not responsible for setting this.

Other common properties include:
Expand Down