feat: Use DH S3Instructions to build Iceberg AWS clients #6113

Merged: 23 commits, Oct 29, 2024 (changes shown from 22 commits)

Commits
8066110  feat: Use DH S3Instructions to build Iceberg AWS clients (devinrsmith, Sep 23, 2024)
4dc52bd  Refactor some shared code (devinrsmith, Sep 23, 2024)
9c13e93  Merge remote-tracking branch 'upstream/main' into dh-iceberg-s3-client (devinrsmith, Oct 8, 2024)
de78d8a  Review response (devinrsmith, Oct 10, 2024)
28f7643  small changes (devinrsmith, Oct 10, 2024)
c8ecd2c  Plumb through to iceberg.py (devinrsmith, Oct 10, 2024)
3804ef9  remove import (devinrsmith, Oct 10, 2024)
385ac25  Refactor, don't have IcebergTools extendable (devinrsmith, Oct 10, 2024)
b909ff3  Delay cleanup until catalog is unreachable (devinrsmith, Oct 11, 2024)
a4d2b5a  Review response (devinrsmith, Oct 15, 2024)
7da2369  review response (devinrsmith, Oct 18, 2024)
a6f51a4  Merge remote-tracking branch 'upstream/main' into dh-iceberg-s3-client (devinrsmith, Oct 18, 2024)
14aa4fb  Update iceberg python documentation (devinrsmith, Oct 18, 2024)
b62debc  Review response (devinrsmith, Oct 21, 2024)
11ba983  Merge remote-tracking branch 'upstream/main' into dh-iceberg-s3-client (devinrsmith, Oct 21, 2024)
6d0bbad  Merge remote-tracking branch 'upstream/main' into dh-iceberg-s3-client (devinrsmith, Oct 28, 2024)
1340833  Fix (devinrsmith, Oct 28, 2024)
790f086  changes (devinrsmith, Oct 28, 2024)
57b98a4  Migrate cleanup to CleanupReferenceProcessor (devinrsmith, Oct 28, 2024)
e3a9c65  fix deps for javadoc (devinrsmith, Oct 29, 2024)
3e78ce8  Merge remote-tracking branch 'upstream/main' into dh-iceberg-s3-client (devinrsmith, Oct 29, 2024)
bb7e239  fixup after merge (devinrsmith, Oct 29, 2024)
098388f  review response (devinrsmith, Oct 29, 2024)

38 changes: 30 additions & 8 deletions extensions/iceberg/s3/build.gradle
@@ -10,7 +10,7 @@ evaluationDependsOn Docker.registryProject('minio')
description 'Iceberg: Support to read iceberg catalogs.'

dependencies {
implementation project(':extensions-iceberg')
api project(':extensions-iceberg')

// Bring in the AWS / S3 extensions
api platform(libs.iceberg.bom)
@@ -25,12 +25,19 @@ dependencies {
implementation libs.awssdk.s3
implementation libs.awssdk.crt.client
runtimeOnly libs.awssdk.sts
runtimeOnly libs.awssdk.glue
implementation libs.awssdk.glue

// We don't want to explicitly pull in dependencies for dynamodb (org.apache.iceberg.aws.dynamodb.DynamoDbCatalog),
// but we need to be able to compile against it to implement AwsClientFactory
compileOnly libs.awssdk.dynamodb

// We don't want to explicitly pull in dependencies for KMS (there doesn't seem to be anything in Iceberg that
// actually calls it?), but we need to be able to compile against it to implement AwsClientFactory
compileOnly libs.awssdk.kms

compileOnly libs.autoservice
annotationProcessor libs.autoservice.compiler

testImplementation libs.junit4
testImplementation project(':engine-test-utils')

testImplementation libs.testcontainers
@@ -44,12 +51,27 @@ dependencies {
testRuntimeOnly project(':test-configs')
testRuntimeOnly project(':log-to-slf4j')
testRuntimeOnly libs.slf4j.simple

testImplementation platform(libs.junit.bom)
testImplementation libs.junit.jupiter
testRuntimeOnly libs.junit.jupiter.engine
testRuntimeOnly libs.junit.platform.launcher
}

TestTools.addEngineOutOfBandTest(project)
test {
useJUnitPlatform {
excludeTags("testcontainers")
}
}

tasks.register('testOutOfBand', Test) {
useJUnitPlatform {
includeTags("testcontainers")
}

testOutOfBand.dependsOn Docker.registryTask(project, 'localstack')
testOutOfBand.systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')
dependsOn Docker.registryTask(project, 'localstack')
systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')

testOutOfBand.dependsOn Docker.registryTask(project, 'minio')
testOutOfBand.systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
dependsOn Docker.registryTask(project, 'minio')
systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
}
DeephavenAwsClientFactory.java (new file)
@@ -0,0 +1,137 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.s3;

import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.s3.S3FileIOAwsClientFactory;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
* An {@link AwsClientFactory} and {@link S3FileIOAwsClientFactory} implementation that assumes ownership of AWS client
* creation as configured via {@link S3Instructions}.
*/
public final class DeephavenAwsClientFactory implements AwsClientFactory, S3FileIOAwsClientFactory {

private static final String UUID_KEY = DeephavenAwsClientFactory.class.getName() + ".__uuid";

/**
* Adds {@link DeephavenAwsClientFactory} to {@code propertiesOut} with the keys
* {@value AwsProperties#CLIENT_FACTORY} and {@value S3FileIOProperties#CLIENT_FACTORY}; it is an error if either of
* these properties is already set. After the corresponding {@link org.apache.iceberg.catalog.Catalog} is no longer
* in use, the caller should invoke the returned {@link Runnable} to clean up.
*
* @param instructions the instructions
* @param propertiesOut the properties
* @return the runnable to be invoked for cleanup once the Catalog is no longer in use
*/
public static Runnable addToProperties(S3Instructions instructions, Map<String, String> propertiesOut) {
Objects.requireNonNull(instructions);
putOrThrow(propertiesOut, AwsProperties.CLIENT_FACTORY, DeephavenAwsClientFactory.class.getName());
putOrThrow(propertiesOut, S3FileIOProperties.CLIENT_FACTORY, DeephavenAwsClientFactory.class.getName());
final String uuid = UUID.randomUUID().toString();
putOrThrow(propertiesOut, UUID_KEY, uuid);
S3_INSTRUCTIONS_MAP.put(uuid, instructions);
return () -> S3_INSTRUCTIONS_MAP.remove(uuid);
}

/**
* Get the {@link S3Instructions} registered by the corresponding {@link #addToProperties(S3Instructions, Map)} call,
* if the properties were built that way. If the properties were built with {@link #addToProperties(S3Instructions, Map)}
* but the cleanup {@link Runnable} has already been invoked, an {@link IllegalStateException} will be thrown.
*
* @param properties the properties
* @return the instructions
*/
public static Optional<S3Instructions> getInstructions(Map<String, String> properties) {
final String uuid = properties.get(UUID_KEY);
if (uuid == null) {
return Optional.empty();
}
final S3Instructions instructions = S3_INSTRUCTIONS_MAP.get(uuid);
if (instructions == null) {
throw new IllegalStateException(
"This S3Instructions were already cleaned up; please ensure that the returned Runnable from addToProperties is not invoked until the Catalog is no longer in use.");
}
return Optional.of(instructions);
}

private static <K, V> void putOrThrow(Map<K, V> map, K key, V value) {
if (map.putIfAbsent(key, value) != null) {
throw new IllegalArgumentException(String.format("Key '%s' already exists in map", key));
}
}

private static final Map<String, S3Instructions> S3_INSTRUCTIONS_MAP = new ConcurrentHashMap<>();

private S3Instructions instructions;

public DeephavenAwsClientFactory() {
// This follows the pattern established by other Iceberg classes that have an initialize method; they have a
// default value that is set in construction, with the expectation that they are properly constructed in the
// initialize call. While those implementations likely could be stricter and implemented defensively (throwing
// an exception if any other methods are called before initialize), that does not seem to be the pattern in use.
// We do not _expect_ the default instructions as set here to ever be used, but we are choosing to follow the
// "established convention" in the rare case there is some caller misusing this in a way that does not effect
// the correctness of the end Catalog.
this.instructions = S3Instructions.DEFAULT;
}

@Override
public void initialize(Map<String, String> properties) {
this.instructions = getInstructions(properties).orElseThrow(() -> new IllegalArgumentException(
"DeephavenAwsClientFactory was setup improperly; it must be configured with DeephavenAwsClientFactory.addToProperties"));
}

@Override
public S3Client s3() {
// Iceberg calls this from org.apache.iceberg.aws.s3.S3FileIO which multiple Catalog implementations use. This
// implementation is backed by the same configuration primitives that our own async S3 client uses. It is well
// tested and provides parity between how Iceberg S3 and Deephaven S3 clients are initialized.
return S3ClientFactory.getSyncClient(instructions);
}

@Override
public GlueClient glue() {
// Iceberg calls this from org.apache.iceberg.aws.glue.GlueCatalog, and it's possible that other
// custom Catalog implementations could make use of this interface. This implementation has been manually
// tested and confirmed to work in simple cases.
return GlueClient.builder()
.applyMutation(b -> S3ClientFactory.applyAllSharedSync(b, instructions))
.build();
}

@Override
public KmsClient kms() {
// Iceberg does not call this method. It is likely part of the interface to support advanced authorization that
// enterprise users may need. In those scenarios, the user likely owns the full Catalog creation as well, with
// their own custom AwsClientFactory, so it's not clear if this implementation will be of value. That said, it
// is easy to build and follows the same pattern as the other clients, so it is provided on a "best-effort"
// basis without further testing.
return KmsClient.builder()
.applyMutation(b -> S3ClientFactory.applyAllSharedSync(b, instructions))
.build();
}

@Override
public DynamoDbClient dynamo() {
// Iceberg calls this from org.apache.iceberg.aws.dynamodb.DynamoDbCatalog, and it's possible that other
// custom Catalog implementations could make use of this interface. This implementation is easy to build
// and follows the same pattern as the other clients, so it is provided on a "best-effort" basis without
// further testing.
return DynamoDbClient.builder()
.applyMutation(b -> S3ClientFactory.applyAllSharedSync(b, instructions))
.build();
}
}
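
Editor's note: the intended lifecycle of addToProperties and the cleanup Runnable is sketched below. This is hypothetical caller code, not part of the diff; the class and method names are made up for illustration, and it only uses the API shown in the file above.

import io.deephaven.extensions.s3.DeephavenAwsClientFactory;
import io.deephaven.extensions.s3.S3Instructions;

import java.util.HashMap;
import java.util.Map;

public class ClientFactoryLifecycleSketch {

    // Hypothetical helper illustrating the addToProperties / cleanup lifecycle.
    static void useWithCatalogProperties(S3Instructions instructions) {
        final Map<String, String> catalogProps = new HashMap<>();

        // Registers DeephavenAwsClientFactory under both AwsProperties.CLIENT_FACTORY and
        // S3FileIOProperties.CLIENT_FACTORY, and stashes the instructions under a fresh UUID key.
        final Runnable cleanup = DeephavenAwsClientFactory.addToProperties(instructions, catalogProps);

        // ... pass catalogProps along when building the Iceberg Catalog ...

        // While the Catalog is alive, the factory (and S3InstructionsProviderPlugin) can recover
        // the registered instructions from the same properties map.
        DeephavenAwsClientFactory.getInstructions(catalogProps)
                .ifPresent(found -> System.out.println("registered: " + found));

        // Only once the Catalog is no longer in use: release the registered instructions.
        // (IcebergToolsS3#createAdapter automates this via CleanupReferenceProcessor.)
        cleanup.run();
    }
}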
IcebergToolsS3.java
@@ -4,9 +4,14 @@
package io.deephaven.iceberg.util;

import com.google.common.base.Strings;
import io.deephaven.extensions.s3.DeephavenAwsClientFactory;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.util.reference.CleanupReferenceProcessor;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.HttpClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.rest.RESTCatalog;
@@ -20,8 +25,7 @@
* Tools for accessing tables in the Iceberg table format.
*/
@SuppressWarnings("unused")
public class IcebergToolsS3 extends IcebergTools {
private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";
public final class IcebergToolsS3 {

/**
* Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
@@ -104,4 +108,52 @@ public static IcebergCatalogAdapter createGlue(

return new IcebergCatalogAdapter(catalog, properties);
}

/**
* Create an Iceberg catalog adapter.
*
* <p>
* This is the preferred way to configure an Iceberg catalog adapter when the caller is responsible for providing
* AWS / S3 connectivity details; specifically, this allows for parity of construction logic between
* Iceberg-managed and Deephaven-managed AWS clients. For advanced use-cases, users are encouraged to use
* {@link S3Instructions#profileName() profiles}, which allow a rich degree of configurability. The
* {@code instructions} will automatically be used as special instructions if
* {@link IcebergReadInstructions#dataInstructions()} is not explicitly set. The caller is still responsible for
* providing any other properties necessary to configure their {@link org.apache.iceberg.catalog.Catalog}
* implementation.
*
* <p>
* In cases where the caller prefers to use Iceberg's AWS properties (found amongst {@link AwsProperties},
* {@link S3FileIOProperties}, and {@link HttpClientProperties}), they should use
* {@link IcebergTools#createAdapter(String, Map, Map) IcebergTools} directly. In this case, parity will be limited
* to what {@link S3InstructionsProviderPlugin} is able to infer; in advanced cases, it's possible that there will
* be a difference in construction logic between the Iceberg-managed and Deephaven-managed AWS clients which
* manifests itself as being able to browse {@link org.apache.iceberg.catalog.Catalog} metadata, but not retrieve
* {@link org.apache.iceberg.Table} data.
*
* <p>
* Note: this method does not explicitly set, nor impose, that {@link org.apache.iceberg.aws.s3.S3FileIO} be used.
* It's possible that a {@link org.apache.iceberg.catalog.Catalog} implementation depends on an AWS client for
* purposes unrelated to storing the warehouse data via S3.
*
* @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
* @param properties a map containing the Iceberg catalog properties to use
* @param hadoopConfig a map containing Hadoop configuration properties to use
* @param instructions the s3 instructions
* @return the Iceberg catalog adapter
*/
public static IcebergCatalogAdapter createAdapter(
@Nullable final String name,
@NotNull final Map<String, String> properties,
@NotNull final Map<String, String> hadoopConfig,
[Review comment on the hadoopConfig parameter]

Contributor: Is hadoopConfig used in an S3-backed adapter with S3 file IO? Can we drop this as a parameter and create an empty config internal to this function?

Author (devinrsmith): It's a good question; org.apache.iceberg.aws.s3.S3FileIO does not use hadoop conf. That said, this method is not imposing S3FileIO. It looks like GlueCatalog is technically written in a way where hadoopConf can be passed along if something besides S3FileIO is used... maybe it's possible to use GlueCatalog and not use S3 as the warehouse... for example, maybe some sort of other AWS NFS storage, I'm not sure. I'm going to add more explicit documentation about this.

@NotNull final S3Instructions instructions) {
final Map<String, String> newProperties = new HashMap<>(properties);
final Runnable cleanup = DeephavenAwsClientFactory.addToProperties(instructions, newProperties);
final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(name, newProperties, hadoopConfig);
// When the Catalog becomes phantom reachable, we can invoke the DeephavenAwsClientFactory cleanup.
// Note: it would be incorrect to register the cleanup against the adapter since the Catalog can outlive the
// adapter (and the DeephavenAwsClientFactory properties are needed by the Catalog).
CleanupReferenceProcessor.getDefault().registerPhantom(adapter.catalog(), cleanup);
return adapter;
}
}
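
Editor's note: a usage sketch for the new createAdapter overload (not part of the PR). The class name, method name, "my-catalog", and the URI/warehouse values are placeholders; the catalog property keys are standard Iceberg CatalogProperties constants, and the full set of properties required depends on the chosen Catalog implementation.

import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergToolsS3;
import org.apache.iceberg.CatalogProperties;

import java.util.HashMap;
import java.util.Map;

public class CreateAdapterSketch {

    // Hypothetical helper: builds an adapter whose AWS clients come from the given S3Instructions.
    static IcebergCatalogAdapter adapterFor(S3Instructions instructions) {
        final Map<String, String> properties = new HashMap<>();
        // Standard Iceberg catalog properties; values here are placeholders.
        properties.put(CatalogProperties.URI, "http://localhost:8181");
        properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://my-bucket/warehouse");

        // AWS / S3 connectivity comes from the S3Instructions; hadoopConfig can typically stay
        // empty when S3FileIO handles the warehouse storage.
        return IcebergToolsS3.createAdapter(
                "my-catalog",      // name; may be null to derive one from the catalog URI
                properties,
                Map.of(),          // hadoopConfig
                instructions);
    }
}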
S3InstructionsProviderPlugin.java
@@ -5,6 +5,7 @@

import com.google.auto.service.AutoService;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.DeephavenAwsClientFactory;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderPlugin;
import org.apache.iceberg.aws.AwsClientProperties;
@@ -15,13 +16,21 @@
import java.util.Map;

/**
* {@link io.deephaven.iceberg.internal.DataInstructionsProviderPlugin} implementation used for reading files from S3.
* {@link DataInstructionsProviderPlugin} implementation for producing {@link S3Instructions}. The produced
* instructions will come from {@link DeephavenAwsClientFactory#getInstructions(Map)} if present; otherwise, a
* best-effort attempt is made to create equivalent instructions based on properties from
* {@link AwsClientProperties} and {@link S3FileIOProperties}.
*/
@AutoService(io.deephaven.iceberg.internal.DataInstructionsProviderPlugin.class)
@AutoService(DataInstructionsProviderPlugin.class)
@SuppressWarnings("unused")
public final class S3InstructionsProviderPlugin implements DataInstructionsProviderPlugin {
@Override
public Object createInstructions(@NotNull final URI uri, @NotNull final Map<String, String> properties) {
public S3Instructions createInstructions(@NotNull final URI uri, @NotNull final Map<String, String> properties) {
final S3Instructions s3Instructions = DeephavenAwsClientFactory.getInstructions(properties).orElse(null);
if (s3Instructions != null) {
return s3Instructions;
}

// If the URI scheme is "s3","s3a","s3n" or if the properties contain one of these specific keys, we can
// create a useful S3Instructions object.
if (uri.getScheme().equals("s3")
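
Editor's note: a hedged sketch of the fallback path described in the class javadoc above (not from the PR). The class name is made up, the package of S3InstructionsProviderPlugin is assumed to be io.deephaven.iceberg.util, the property values are placeholders, and in practice the plugin is invoked through the DataInstructionsProviderPlugin discovery mechanism rather than directly.

import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.util.S3InstructionsProviderPlugin;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIOProperties;

import java.net.URI;
import java.util.Map;

public class BestEffortInstructionsSketch {
    public static void main(String[] args) {
        // Iceberg-style AWS/S3 properties (real Iceberg constant keys, placeholder values) that the
        // plugin can inspect when no DeephavenAwsClientFactory registration is present.
        final Map<String, String> icebergProps = Map.of(
                AwsClientProperties.CLIENT_REGION, "us-east-1",
                S3FileIOProperties.ENDPOINT, "http://localhost:9000",
                S3FileIOProperties.ACCESS_KEY_ID, "test",
                S3FileIOProperties.SECRET_ACCESS_KEY, "test");

        // Direct call for illustration only; the returned instructions approximate the Iceberg config.
        final S3Instructions instructions = new S3InstructionsProviderPlugin()
                .createInstructions(URI.create("s3://my-bucket/warehouse/table"), icebergProps);
        System.out.println(instructions);
    }
}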
IcebergLocalStackTest.java
@@ -5,7 +5,8 @@

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
import org.junit.BeforeClass;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;
Expand All @@ -15,9 +16,11 @@
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;

@Tag("testcontainers")
@Deprecated
[Review thread on the @Deprecated annotation]

Contributor: I don't understand why we had to deprecate these? They might still give us coverage on a lot of smaller cases that Larry tested. Is it deprecated in a sense that new tests should not be added?

Author (devinrsmith): It's mostly that I think we should not add new tests here, and work to migrate them as mentioned in IcebergToolsTest. @lbooker42 has so far owned this layer, but it should be relatively easy to migrate to db_resource, and then we get the benefit of:

  1. No separate container (s3) needed (+ no need to upload files)
  2. No need to have custom catalog IcebergTestCatalog
  3. On disk JDBC catalog + on disk warehouse

I see db_resource as mainly a way to test out how well we can interoperate with Iceberg that has been written via different processes (pyiceberg, spark, etc).

For more thorough testing (once we have our own writing support) we should be able to extend SqliteCatalogBase (or create further specialized tests that look similar to it), which can work with any warehouse - currently the same logic is tested out via local disk, minio, and localstack.

Contributor: I agree, this new catalog testing code is the more comprehensive way for all future tests. Once we can migrate these tests so we don't lose any coverage, we should remove these as well as the IcebergTestCatalog class.
cc: @lbooker42

Contributor: I concur.

public class IcebergLocalStackTest extends IcebergToolsTest {

@BeforeClass
@BeforeAll
public static void initContainer() {
// ensure container is started so container startup time isn't associated with a specific test
LocalStack.init();
@@ -34,7 +37,7 @@ public S3AsyncClient s3AsyncClient() {
}

@Override
public Map<String, String> s3Properties() {
public Map<String, String> properties() {
return Map.of(
ENDPOINT, LocalStack.s3Endpoint(),
CLIENT_REGION, LocalStack.region(),
IcebergMinIOTest.java
@@ -7,7 +7,8 @@
import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO;
import io.deephaven.stats.util.OSUtil;
import org.junit.jupiter.api.Assumptions;
import org.junit.BeforeClass;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;
@@ -17,9 +18,11 @@
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;

@Tag("testcontainers")
@Deprecated
public class IcebergMinIOTest extends IcebergToolsTest {

@BeforeClass
@BeforeAll
public static void initContainer() {
// TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()");
@@ -38,12 +41,11 @@ public S3AsyncClient s3AsyncClient() {
}

@Override
public Map<String, String> s3Properties() {
public Map<String, String> properties() {
return Map.of(
ENDPOINT, MinIO.s3Endpoint(),
CLIENT_REGION, MinIO.region(),
ACCESS_KEY_ID, MinIO.accessKey(),
SECRET_ACCESS_KEY, MinIO.secretAccessKey());
}

}