feat: Added support to write iceberg tables #5989

Merged (64 commits) on Nov 22, 2024
Changes from 31 commits
Commits
8c81883
Initial commit
malhotrashivam Aug 27, 2024
758c1f3
Added type info map and modified instructions class hierarchy
malhotrashivam Aug 28, 2024
48cb8d8
Minor tweaks to the instructions class hierarchy
malhotrashivam Aug 28, 2024
244bc99
Merged writeTable and appendTable into addPartition
malhotrashivam Aug 29, 2024
09340c2
Split IcebergParquetWriteInstructions into WriteInstr and ParquetWrit…
malhotrashivam Aug 30, 2024
33b60e2
Merge branch 'main' into sm-ice-write
malhotrashivam Sep 25, 2024
c70b50e
Resolving more conflicts
malhotrashivam Sep 25, 2024
cd278ab
Merge branch 'main' into sm-ice-write
malhotrashivam Sep 30, 2024
689e8a1
Added unit tests and moved Iceberg tests to Junit5
malhotrashivam Sep 30, 2024
d7f2c81
Preparing change for code review Part 1
malhotrashivam Oct 1, 2024
131a552
Preparing for review Part 2
malhotrashivam Oct 1, 2024
3021585
Added more unit tests
malhotrashivam Oct 1, 2024
9f82ba0
Review with Larry part 1
malhotrashivam Oct 3, 2024
cbae64e
Fix for failing job
malhotrashivam Oct 3, 2024
7de59b0
Review with Larry Part 2
malhotrashivam Oct 3, 2024
83a0b14
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 3, 2024
c83ddbd
Review with Devin Part 1
malhotrashivam Oct 7, 2024
f0f86cc
Fix for failing jobs
malhotrashivam Oct 7, 2024
adb21e9
Review with Devin Part 2
malhotrashivam Oct 8, 2024
744ce60
Review with Devin Part 3
malhotrashivam Oct 8, 2024
a8252ce
Review with Devin Part 4
malhotrashivam Oct 8, 2024
38f55f0
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 14, 2024
5a64faf
Minor tweaks
malhotrashivam Oct 14, 2024
ba70f1a
More tweaks
malhotrashivam Oct 14, 2024
96db353
Updated some comments
malhotrashivam Oct 14, 2024
6e2c233
Updated javadoc and added new tests
malhotrashivam Oct 15, 2024
de6eba0
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 17, 2024
0ebeba2
Review with Ryan Part 1
malhotrashivam Oct 17, 2024
31f46ba
Review with Ryan Part 2
malhotrashivam Oct 18, 2024
946def0
Fix for failing parquet reads
malhotrashivam Oct 18, 2024
bd8535c
Added more tests for writeDataFile
malhotrashivam Oct 18, 2024
78bd605
Added tests for on write callback
malhotrashivam Oct 18, 2024
500cfe6
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 21, 2024
e2aba1f
Added support for writing partitioned tables
malhotrashivam Oct 22, 2024
e4a936e
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 23, 2024
b32ad68
Minor tweaks
malhotrashivam Oct 24, 2024
98d7e8e
Merge branch 'main' into sm-ice-write
malhotrashivam Oct 28, 2024
f77e46c
Continuing the rebase
malhotrashivam Oct 28, 2024
a1d7912
Merge branch 'main' into sm-ice-write
malhotrashivam Nov 3, 2024
4aae03d
Resolving more conflicts
malhotrashivam Nov 3, 2024
b9d198a
Fix for failing tests
malhotrashivam Nov 3, 2024
19f5715
Minor tweaks to tests
malhotrashivam Nov 3, 2024
b4204bc
Minor changes to interface
malhotrashivam Nov 4, 2024
06bc407
Review with Ryan and Devin contd.
malhotrashivam Nov 6, 2024
0d5d213
Added more tests for refreshing iceberg + partitioned append
malhotrashivam Nov 6, 2024
7161d19
Added some comments
malhotrashivam Nov 7, 2024
43d26d7
Merge branch 'main' into sm-ice-write
malhotrashivam Nov 11, 2024
34135b1
Review with Devin Part 1
malhotrashivam Nov 11, 2024
9d05a1d
Review with Devin Part 2
malhotrashivam Nov 13, 2024
e39dced
Review with Devin contd.
malhotrashivam Nov 15, 2024
cdc9263
Merge branch 'main' into sm-ice-write
malhotrashivam Nov 15, 2024
1fa0146
Merge branch 'main' into sm-ice-write
malhotrashivam Nov 20, 2024
0c9e522
Resolving more conflicts
malhotrashivam Nov 20, 2024
51701a4
Fixes in javadoc
malhotrashivam Nov 20, 2024
2b09e0c
Added more tests and updated Python support
malhotrashivam Nov 20, 2024
5121849
More python changes
malhotrashivam Nov 21, 2024
d26bc86
Python code review with Jianfeng Part
malhotrashivam Nov 21, 2024
5cb0d5a
Drop support for overwriting
malhotrashivam Nov 21, 2024
aa49f7f
Review with Devin and Jianfeng/Chip contd.
malhotrashivam Nov 22, 2024
2dacdf8
Continuing review with Devin
malhotrashivam Nov 22, 2024
d0a09fa
Review with Chip contd.
malhotrashivam Nov 22, 2024
776432a
Review with Devin contd.
malhotrashivam Nov 22, 2024
95743c3
Review with Devin contd.
malhotrashivam Nov 22, 2024
ff31d13
Added compatibility check for table definitions
malhotrashivam Nov 22, 2024
21 changes: 15 additions & 6 deletions extensions/iceberg/s3/build.gradle
@@ -29,7 +29,6 @@ dependencies {
     compileOnly libs.autoservice
     annotationProcessor libs.autoservice.compiler
 
-    testImplementation libs.junit4
     testImplementation project(':engine-test-utils')
 
     testImplementation libs.testcontainers
@@ -45,10 +44,20 @@ dependencies {
     testRuntimeOnly libs.slf4j.simple
 }
 
-TestTools.addEngineOutOfBandTest(project)
+test {
+    useJUnitPlatform {
+        excludeTags("testcontainers")
+    }
+}
+
+tasks.register('testOutOfBand', Test) {
+    useJUnitPlatform {
+        includeTags("testcontainers")
+    }
 
-testOutOfBand.dependsOn Docker.registryTask(project, 'localstack')
-testOutOfBand.systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')
+    dependsOn Docker.registryTask(project, 'localstack')
+    systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')
 
-testOutOfBand.dependsOn Docker.registryTask(project, 'minio')
-testOutOfBand.systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
+    dependsOn Docker.registryTask(project, 'minio')
+    systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
+}
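
The Gradle change above splits the suite by JUnit 5 tag: the default `test` task excludes anything tagged `testcontainers`, and `testOutOfBand` includes only those tests. As a rough illustration of that selection rule (not how Gradle or the JUnit Platform implement it), the sketch below models tests as a map from class name to tags. `TagSelectionSketch`, `excluding`, `including`, and `SomeUnitTest` are hypothetical names introduced here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative model only: not Gradle's or JUnit Platform's implementation.
final class TagSelectionSketch {

    // Keeps tests that do NOT carry the tag, like excludeTags(...) on the default test task.
    static List<String> excluding(Map<String, Set<String>> testsToTags, String tag) {
        return testsToTags.entrySet().stream()
                .filter(e -> !e.getValue().contains(tag))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    // Keeps only tests that carry the tag, like includeTags(...) on testOutOfBand.
    static List<String> including(Map<String, Set<String>> testsToTags, String tag) {
        return testsToTags.entrySet().stream()
                .filter(e -> e.getValue().contains(tag))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Set<String>> tests = new LinkedHashMap<>();
        tests.put("SomeUnitTest", Set.of()); // hypothetical untagged test
        tests.put("IcebergLocalStackTest", Set.of("testcontainers"));
        tests.put("IcebergMinIOTest", Set.of("testcontainers"));

        System.out.println("test: " + excluding(tests, "testcontainers"));
        System.out.println("testOutOfBand: " + including(tests, "testcontainers"));
    }
}
```

Under this model every test lands in exactly one of the two tasks, which is the property the tag split relies on.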
Original file line number Diff line number Diff line change
@@ -7,7 +7,9 @@
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.aws.AwsClientProperties;
 import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.aws.s3.S3FileIO;
 import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.rest.RESTCatalog;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -16,12 +18,10 @@
 import java.util.Map;
 
 /**
- * Tools for accessing tables in the Iceberg table format.
+ * Tools for accessing tables in the Iceberg table format from S3.
  */
 @SuppressWarnings("unused")
 public class IcebergToolsS3 extends IcebergTools {
-    private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";
-
     /**
      * Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
      * value, the system defaults will be used.
@@ -47,14 +47,6 @@ public static IcebergCatalogAdapter createS3Rest(
 
         // Set up the properties map for the Iceberg catalog
         final Map<String, String> properties = new HashMap<>();
-
-        final RESTCatalog catalog = new RESTCatalog();
-
-        properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
-        properties.put(CatalogProperties.URI, catalogURI);
-        properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
-
-        // Configure the properties map from the Iceberg instructions.
         if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) {
             properties.put(S3FileIOProperties.ACCESS_KEY_ID, accessKeyId);
             properties.put(S3FileIOProperties.SECRET_ACCESS_KEY, secretAccessKey);
@@ -66,9 +58,8 @@ public static IcebergCatalogAdapter createS3Rest(
             properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
         }
 
-        final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
-        catalog.initialize(catalogName, properties);
-        return IcebergCatalogAdapter.of(catalog);
+        final Catalog catalog = new RESTCatalog();
+        return createAdapterCommon(name, catalogURI, warehouseLocation, catalog, properties);
     }
 
     /**
@@ -90,15 +81,27 @@ public static IcebergCatalogAdapter createGlue(
         // Set up the properties map for the Iceberg catalog
         final Map<String, String> properties = new HashMap<>();
 
-        final GlueCatalog catalog = new GlueCatalog();
+        final Catalog catalog = new GlueCatalog();
+        return createAdapterCommon(name, catalogURI, warehouseLocation, catalog, properties);
+    }
 
+    private static IcebergCatalogAdapter createAdapterCommon(
+            @Nullable final String name,
+            @NotNull final String catalogURI,
+            @NotNull final String warehouseLocation,
+            @NotNull final Catalog catalog,
+            @NotNull final Map<String, String> properties) {
         properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
         properties.put(CatalogProperties.URI, catalogURI);
         properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
 
+        // Following is needed to write new manifest files when writing new data.
+        // Not setting this will result in using ResolvingFileIO.
+        properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
+
         final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
         catalog.initialize(catalogName, properties);
 
-        return new IcebergCatalogAdapter(catalog, properties);
+        return IcebergCatalogAdapter.of(catalog, properties);
     }
 }
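
`createAdapterCommon` centralizes two things: filling in the shared catalog properties (including pinning `S3FileIO` as the file IO implementation) and defaulting the catalog name when none is given. A minimal, dependency-free sketch of that logic follows. The string keys are assumed to match the values of the Iceberg `CatalogProperties` constants; `CatalogPropertiesSketch`, its methods, and the example URI and warehouse path are hypothetical, and nothing here initializes a real catalog.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: mirrors the property handling in the diff without depending on Iceberg.
final class CatalogPropertiesSketch {
    // Assumed to equal the values of the corresponding CatalogProperties constants.
    static final String CATALOG_IMPL = "catalog-impl";
    static final String URI = "uri";
    static final String WAREHOUSE_LOCATION = "warehouse";
    static final String FILE_IO_IMPL = "io-impl";

    // Returns a copy of the caller's properties with the shared entries added.
    // (The method in the diff mutates the map it is given; copying is a choice made for this sketch.)
    static Map<String, String> withCommonProperties(
            Map<String, String> properties,
            String catalogImplClassName,
            String catalogURI,
            String warehouseLocation) {
        final Map<String, String> result = new HashMap<>(properties);
        result.put(CATALOG_IMPL, catalogImplClassName);
        result.put(URI, catalogURI);
        result.put(WAREHOUSE_LOCATION, warehouseLocation);
        // Pin the file IO implementation rather than leaving the choice to the default.
        result.put(FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
        return result;
    }

    // Same defaulting rule as the diff: fall back to a name derived from the URI.
    static String catalogName(String name, String catalogURI) {
        return name != null ? name : "IcebergCatalog-" + catalogURI;
    }

    public static void main(String[] args) {
        final String uri = "http://localhost:8181"; // hypothetical REST catalog URI
        final Map<String, String> props = withCommonProperties(
                Map.of(), "org.apache.iceberg.rest.RESTCatalog", uri, "s3://warehouse/");
        System.out.println(catalogName(null, uri));
        System.out.println(props.get(FILE_IO_IMPL));
    }
}
```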
Original file line number Diff line number Diff line change
@@ -4,8 +4,9 @@
 package io.deephaven.iceberg.util;
 
 import io.deephaven.extensions.s3.S3Instructions.Builder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
 import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
-import org.junit.BeforeClass;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 
 import java.util.Map;
@@ -15,10 +16,11 @@
 import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
 import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;
 
-public class IcebergLocalStackTest extends IcebergToolsTest {
+@Tag("testcontainers")
+class IcebergLocalStackTest extends IcebergToolsTest {
 
-    @BeforeClass
-    public static void initContainer() {
+    @BeforeAll
+    static void initContainer() {
         // ensure container is started so container startup time isn't associated with a specific test
         LocalStack.init();
     }
Original file line number Diff line number Diff line change
@@ -7,7 +7,8 @@
 import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO;
 import io.deephaven.stats.util.OSUtil;
 import org.junit.jupiter.api.Assumptions;
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 
 import java.util.Map;
@@ -17,10 +18,11 @@
 import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
 import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;
 
-public class IcebergMinIOTest extends IcebergToolsTest {
+@Tag("testcontainers")
+class IcebergMinIOTest extends IcebergToolsTest {
 
-    @BeforeClass
-    public static void initContainer() {
+    @BeforeAll
+    static void initContainer() {
         // TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
         Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()");
         // ensure container is started so container startup time isn't associated with a specific test
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.iceberg.util;
+
+import io.deephaven.parquet.table.ParquetInstructions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+class IcebergParquetWriteInstructionsTest {
+
+    @Test
+    void defaults() {
+        final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder().build();
+        assertThat(instructions.tableDefinition().isEmpty()).isTrue();
+        assertThat(instructions.dataInstructions().isEmpty()).isTrue();
+        assertThat(instructions.dhToIcebergColumnRenames().isEmpty()).isTrue();
+        assertThat(instructions.createTableIfNotExist()).isFalse();
+        assertThat(instructions.verifySchema()).isEmpty();
+        assertThat(instructions.compressionCodecName()).isEqualTo("SNAPPY");
+        assertThat(instructions.maximumDictionaryKeys()).isEqualTo(1048576);
+        assertThat(instructions.maximumDictionarySize()).isEqualTo(1048576);
+    }
+
+    @Test
+    void testSetCreateTableIfNotExist() {
+        assertThat(IcebergParquetWriteInstructions.builder()
+                .createTableIfNotExist(true)
+                .build()
+                .createTableIfNotExist())
+                .isTrue();
+    }
+
+    @Test
+    void testSetVerifySchema() {
+        assertThat(IcebergParquetWriteInstructions.builder()
+                .verifySchema(true)
+                .build()
+                .verifySchema())
+                .hasValue(true);
+    }
+
+    @Test
+    void testSetCompressionCodecName() {
+        assertThat(IcebergParquetWriteInstructions.builder()
+                .compressionCodecName("GZIP")
+                .build()
+                .compressionCodecName())
+                .isEqualTo("GZIP");
+    }
+
+    @Test
+    void testSetMaximumDictionaryKeys() {
+        assertThat(IcebergParquetWriteInstructions.builder()
+                .maximumDictionaryKeys(100)
+                .build()
+                .maximumDictionaryKeys())
+                .isEqualTo(100);
+    }
+
+    @Test
+    void testSetMaximumDictionarySize() {
+        assertThat(IcebergParquetWriteInstructions.builder()
+                .maximumDictionarySize(100)
+                .build()
+                .maximumDictionarySize())
+                .isEqualTo(100);
+    }
+
+    @Test
+    void testSetTargetPageSize() {
+        assertThat(IcebergParquetWriteInstructions.builder()
+                .targetPageSize(1 << 20)
+                .build()
+                .targetPageSize())
+                .isEqualTo(1 << 20);
+    }
+
+    @Test
+    void testMinMaximumDictionaryKeys() {
+
+        try {
+            IcebergParquetWriteInstructions.builder()
+                    .maximumDictionaryKeys(-1)
+                    .build();
+            failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+        } catch (IllegalArgumentException e) {
+            assertThat(e).hasMessageContaining("maximumDictionaryKeys");
+        }
+    }
+
+    @Test
+    void testMinMaximumDictionarySize() {
+        try {
+            IcebergParquetWriteInstructions.builder()
+                    .maximumDictionarySize(-1)
+                    .build();
+            failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+        } catch (IllegalArgumentException e) {
+            assertThat(e).hasMessageContaining("maximumDictionarySize");
+        }
+    }
+
+    @Test
+    void testMinTargetPageSize() {
+        try {
+            IcebergParquetWriteInstructions.builder()
+                    .targetPageSize(1024)
+                    .build();
+            failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+        } catch (IllegalArgumentException e) {
+            assertThat(e).hasMessageContaining("targetPageSize");
+        }
+    }
+
+    @Test
+    void testSetToIcebergColumnRename() {
+        final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder()
+                .putDhToIcebergColumnRenames("dh1", "ice1")
+                .putDhToIcebergColumnRenames("dh2", "ice2")
+                .build();
+        assertThat(instructions.dhToIcebergColumnRenames().size()).isEqualTo(2);
+        assertThat(instructions.dhToIcebergColumnRenames().get("dh1")).isEqualTo("ice1");
+        assertThat(instructions.dhToIcebergColumnRenames().get("dh2")).isEqualTo("ice2");
+
+        final IcebergParquetWriteInstructions instructions2 = IcebergParquetWriteInstructions.builder()
+                .putAllDhToIcebergColumnRenames(Map.of(
+                        "dh1", "ice1",
+                        "dh2", "ice2",
+                        "dh3", "ice3"))
+                .build();
+        assertThat(instructions2.dhToIcebergColumnRenames().size()).isEqualTo(3);
+        assertThat(instructions2.dhToIcebergColumnRenames().get("dh1")).isEqualTo("ice1");
+        assertThat(instructions2.dhToIcebergColumnRenames().get("dh2")).isEqualTo("ice2");
+        assertThat(instructions2.dhToIcebergColumnRenames().get("dh3")).isEqualTo("ice3");
+    }
+
+    @Test
+    void testToIcebergColumnRenameUniqueness() {
+        try {
+            IcebergParquetWriteInstructions.builder()
+                    .putDhToIcebergColumnRenames("dh1", "ice1")
+                    .putDhToIcebergColumnRenames("dh2", "ice1")
+                    .build();
+            failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+        } catch (final IllegalArgumentException e) {
+            assertThat(e).hasMessageContaining("Duplicate values in column renames");
+        }
+    }
+
+    @Test
+    void toParquetInstructionTest() {
+        final IcebergParquetWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder()
+                .compressionCodecName("GZIP")
+                .maximumDictionaryKeys(100)
+                .maximumDictionarySize(200)
+                .targetPageSize(1 << 20)
+                .build();
+        final Map<Integer, String> fieldIdToName = Map.of(2, "field2", 3, "field3");
+        final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions(
+                null, fieldIdToName);
+
+        assertThat(parquetInstructions.getCompressionCodecName()).isEqualTo("GZIP");
+        assertThat(parquetInstructions.getMaximumDictionaryKeys()).isEqualTo(100);
+        assertThat(parquetInstructions.getMaximumDictionarySize()).isEqualTo(200);
+        assertThat(parquetInstructions.getTargetPageSize()).isEqualTo(1 << 20);
+        assertThat(parquetInstructions.getFieldId("field1")).isEmpty();
+        assertThat(parquetInstructions.getFieldId("field2")).hasValue(2);
+        assertThat(parquetInstructions.getFieldId("field3")).hasValue(3);
+        assertThat(parquetInstructions.onWriteCompleted()).isEmpty();
+    }
+}
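
`testToIcebergColumnRenameUniqueness` expects the builder to reject two Deephaven columns that map to the same Iceberg name. The check itself is not shown in this diff; one plausible way to write it, assuming the renames are held in a plain `Map<String, String>`, is sketched below. `ColumnRenameCheckSketch` and `checkUniqueTargets` are hypothetical names, and only the message prefix is taken from the test.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch only: not the validation code from the pull request.
final class ColumnRenameCheckSketch {

    // Throws if two source columns are renamed to the same target column name.
    static void checkUniqueTargets(Map<String, String> dhToIcebergColumnRenames) {
        final Set<String> seen = new HashSet<>();
        for (final String icebergName : dhToIcebergColumnRenames.values()) {
            // Set.add returns false when the value was already present.
            if (!seen.add(icebergName)) {
                throw new IllegalArgumentException(
                        "Duplicate values in column renames: " + icebergName);
            }
        }
    }

    public static void main(String[] args) {
        // Distinct targets: returns normally.
        checkUniqueTargets(Map.of("dh1", "ice1", "dh2", "ice2"));
        try {
            // Both columns target "ice1": rejected.
            checkUniqueTargets(Map.of("dh1", "ice1", "dh2", "ice1"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Rejecting duplicates at build time keeps the rename map invertible, so each Iceberg column name traces back to exactly one Deephaven column.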