From bb2e0ad37bf521aebaf4839cf989855ae062f2c9 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Tue, 14 Jan 2025 14:40:32 -0500
Subject: [PATCH] Revert "Add support for Iceberg table identifiers with
 special characters (#33293)" (#33575)

This reverts commit d6e0b0c5c0f4cd8acad230833c2ed42e220d6178.
---
 .../IO_Iceberg_Integration_Tests.json         |  2 +-
 sdks/java/io/iceberg/build.gradle             |  4 +-
 .../sdk/io/iceberg/AppendFilesToTables.java   |  3 +-
 .../beam/sdk/io/iceberg/FileWriteResult.java  |  5 +--
 .../IcebergReadSchemaTransformProvider.java   |  3 +-
 .../sdk/io/iceberg/IcebergScanConfig.java     |  7 +---
 .../beam/sdk/io/iceberg/IcebergUtils.java     | 17 --------
 .../iceberg/OneTableDynamicDestinations.java  |  4 +-
 .../iceberg/PortableIcebergDestinations.java  |  3 +-
 .../sdk/io/iceberg/IcebergIOReadTest.java     | 24 ++---------
 .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 40 -------------------
 11 files changed, 18 insertions(+), 94 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index b73af5e61a43..7ab7bcd9a9c6 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 1
+  "modification": 2
 }
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 1775dfc5b77b..41e12921e6f8 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -55,10 +55,8 @@ dependencies {
   implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
-  runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
   implementation library.java.hadoop_common
-  implementation library.java.jackson_core
-  implementation library.java.jackson_databind
+  runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
 
   testImplementation project(":sdks:java:managed")
   testImplementation library.java.hadoop_client
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index 72220faf3004..fed72a381d5e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -47,6 +47,7 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.slf4j.Logger;
@@ -132,7 +133,7 @@ public void processElement(
         return;
       }
 
-      Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey()));
+      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
 
       // vast majority of the time, we will simply append data files.
      // in the rare case we get a batch that contains multiple partition specs, we will group
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
index d58ac8696d37..bf00bf8519fc 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
@@ -25,7 +25,6 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.catalog.TableIdentifierParser;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 @AutoValue
@@ -42,7 +41,7 @@ abstract class FileWriteResult {
   @SchemaIgnore
   public TableIdentifier getTableIdentifier() {
     if (cachedTableIdentifier == null) {
-      cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString());
+      cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString());
     }
     return cachedTableIdentifier;
   }
@@ -68,7 +67,7 @@ abstract static class Builder {
 
     @SchemaIgnore
     public Builder setTableIdentifier(TableIdentifier tableId) {
-      return setTableIdentifierString(TableIdentifierParser.toJson(tableId));
+      return setTableIdentifierString(tableId.toString());
     }
 
     public abstract FileWriteResult build();
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index 951442e2c95f..d44149fda08e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -31,6 +31,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.catalog.TableIdentifier;
 
 /**
  * SchemaTransform implementation for {@link IcebergIO#readRows}. Reads records from Iceberg and
@@ -85,7 +86,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
             .getPipeline()
             .apply(
                 IcebergIO.readRows(configuration.getIcebergCatalog())
-                    .from(IcebergUtils.parseTableIdentifier(configuration.getTable())));
+                    .from(TableIdentifier.parse(configuration.getTable())));
 
     return PCollectionRowTuple.of(OUTPUT_TAG, output);
   }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
index 640283d83c2e..60372b172af7 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
@@ -23,7 +23,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.catalog.TableIdentifierParser;
 import org.apache.iceberg.expressions.Expression;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -52,9 +51,7 @@ public enum ScanType {
   public Table getTable() {
     if (cachedTable == null) {
       cachedTable =
-          getCatalogConfig()
-              .catalog()
-              .loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier()));
+          getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
     }
     return cachedTable;
   }
@@ -129,7 +126,7 @@ public abstract static class Builder {
     public abstract Builder setTableIdentifier(String tableIdentifier);
 
     public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
-      return this.setTableIdentifier(TableIdentifierParser.toJson(tableIdentifier));
+      return this.setTableIdentifier(tableIdentifier.toString());
     }
 
     public Builder setTableIdentifier(String... names) {
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index bd2f743172dc..ef19a5881366 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -19,9 +19,6 @@
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -39,8 +36,6 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.catalog.TableIdentifierParser;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Type;
@@ -52,9 +47,6 @@
 
 /** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */
 public class IcebergUtils {
-
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
   private IcebergUtils() {}
 
   private static final Map BEAM_TYPES_TO_ICEBERG_TYPES =
@@ -514,13 +506,4 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
     // LocalDateTime, LocalDate, LocalTime
     return icebergValue;
   }
-
-  public static TableIdentifier parseTableIdentifier(String table) {
-    try {
-      JsonNode jsonNode = OBJECT_MAPPER.readTree(table);
-      return TableIdentifierParser.fromJson(jsonNode);
-    } catch (JsonProcessingException e) {
-      return TableIdentifier.parse(table);
-    }
-  }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java
index be810aa20a13..861a8ad198a8 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java
@@ -41,7 +41,7 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable
   @VisibleForTesting
   TableIdentifier getTableIdentifier() {
     if (tableId == null) {
-      tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString));
+      tableId = TableIdentifier.parse(checkStateNotNull(tableIdString));
     }
     return tableId;
   }
@@ -86,6 +86,6 @@ public void writeExternal(ObjectOutput out) throws IOException {
   @Override
   public void readExternal(ObjectInput in) throws IOException {
     tableIdString = in.readUTF();
-    tableId = IcebergUtils.parseTableIdentifier(tableIdString);
+    tableId = TableIdentifier.parse(tableIdString);
   }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
index 58f70463bc76..47f661bba3f8 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
@@ -24,6 +24,7 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 class PortableIcebergDestinations implements DynamicDestinations {
@@ -72,7 +73,7 @@ public String getTableStringIdentifier(ValueInSingleWindow<Row> element) {
   @Override
   public IcebergDestination instantiateDestination(String dest) {
     return IcebergDestination.builder()
-        .setTableIdentifier(IcebergUtils.parseTableIdentifier(dest))
+        .setTableIdentifier(TableIdentifier.parse(dest))
         .setTableCreateConfig(null)
         .setFileFormat(FileFormat.fromString(fileFormat))
         .build();
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index 1c3f9b53f31a..0406ff31e61e 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -23,8 +23,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -70,11 +68,11 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@RunWith(Parameterized.class)
+@RunWith(JUnit4.class)
 public class IcebergIOReadTest {
 
   private static final Logger LOG = LoggerFactory.getLogger(IcebergIOReadTest.class);
@@ -85,21 +83,6 @@ public class IcebergIOReadTest {
 
   @Rule public TestPipeline testPipeline = TestPipeline.create();
 
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() {
-    return Arrays.asList(
-        new Object[][] {
-          {String.format("{\"namespace\": [\"default\"], \"name\": \"%s\"}", tableId())},
-          {String.format("default.%s", tableId())},
-        });
-  }
-
-  public static String tableId() {
-    return "table" + Long.toString(UUID.randomUUID().hashCode(), 16);
-  }
-
-  @Parameterized.Parameter public String tableStringIdentifier;
-
   static class PrintRow extends DoFn<Row, Row> {
     @ProcessElement
     public void process(@Element Row row, OutputReceiver<Row> output) throws Exception {
@@ -111,7 +94,8 @@ public void process(@Element Row row, OutputReceiver<Row> output) throws Excepti
 
   @Test
   public void testSimpleScan() throws Exception {
-    TableIdentifier tableId = IcebergUtils.parseTableIdentifier(tableStringIdentifier);
+    TableIdentifier tableId =
+        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
     Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
 
     final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
index 918c6b1146ee..134f05c34bfb 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
@@ -19,13 +19,11 @@
 
 import static org.apache.beam.sdk.io.iceberg.IcebergUtils.TypeAndMaxId;
 import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType;
-import static org.apache.beam.sdk.io.iceberg.IcebergUtils.parseTableIdentifier;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 import java.math.BigDecimal;
@@ -34,7 +32,6 @@
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
@@ -52,7 +49,6 @@
 import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.junit.runners.Parameterized;
 
 /** Test class for {@link IcebergUtils}. */
 @RunWith(Enclosed.class)
@@ -806,40 +802,4 @@ public void testStructIcebergSchemaToBeamSchema() {
       assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema);
     }
   }
-
-  @RunWith(Parameterized.class)
-  public static class TableIdentifierParseTests {
-
-    @Parameterized.Parameters
-    public static Collection<Object[]> data() {
-      return Arrays.asList(
-          new Object[][] {
-            {
-              "{\"namespace\": [\"dogs\", \"owners.and.handlers\"], \"name\": \"food\"}",
-              "dogs.owners.and.handlers.food",
-              true
-            },
-            {"dogs.owners.and.handlers.food", "dogs.owners.and.handlers.food", true},
-            {"{\"name\": \"food\"}", "food", true},
-            {"{\"table_name\": \"food\"}", "{\"table_name\": \"food\"}", false},
-          });
-    }
-
-    @Parameterized.Parameter public String input;
-
-    @Parameterized.Parameter(1)
-    public String expected;
-
-    @Parameterized.Parameter(2)
-    public boolean shouldSucceed;
-
-    @Test
-    public void test() {
-      if (shouldSucceed) {
-        assertEquals(expected, parseTableIdentifier(input).toString());
-      } else {
-        assertThrows(IllegalArgumentException.class, () -> parseTableIdentifier(input));
-      }
-    }
-  }
 }
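
For context (not part of the patch): with this revert, every call site above goes back to org.apache.iceberg.catalog.TableIdentifier.parse on a dotted string, instead of the removed JSON-aware IcebergUtils.parseTableIdentifier helper. A minimal standalone sketch of the restored parsing behavior, assuming only the Iceberg API on the classpath; the class name and sample values below are illustrative only.

import org.apache.iceberg.catalog.TableIdentifier;

// Standalone sketch, not part of the patch: how table identifier strings are
// interpreted once this revert lands.
public class TableIdentifierParseSketch {
  public static void main(String[] args) {
    // TableIdentifier.parse splits on '.', so every dot becomes a namespace separator.
    TableIdentifier parsed = TableIdentifier.parse("dogs.owners.and.handlers.food");
    System.out.println(parsed.namespace()); // dogs.owners.and.handlers
    System.out.println(parsed.name()); // food

    // A table name that itself contains dots has to be built explicitly; the JSON form
    // {"namespace": ["default"], "name": "some.table"} is no longer understood by the IO
    // once IcebergUtils.parseTableIdentifier is removed.
    TableIdentifier explicit = TableIdentifier.of("default", "some.table");
    System.out.println(explicit); // default.some.table
  }
}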