diff --git a/build.gradle b/build.gradle
index 21e52608..89a341be 100644
--- a/build.gradle
+++ b/build.gradle
@@ -13,7 +13,7 @@ subprojects {
   apply plugin: "maven-publish"
 
   group "io.tabular.connect"
-  version "0.4.9-SNAPSHOT"
+  version "0.4.9"
 
   repositories {
     mavenCentral()
diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/TestContext.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/TestContext.java
index 17fc3704..02404d27 100644
--- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/TestContext.java
+++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/TestContext.java
@@ -63,9 +63,9 @@ public class TestContext {
 
   private static final String KC_PLUGIN_DIR = "/test/kafka-connect";
   private static final String MINIO_IMAGE = "minio/minio";
-  private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:7.4.0";
-  private static final String CONNECT_IMAGE = "confluentinc/cp-kafka-connect:7.4.0";
-  private static final String REST_CATALOG_IMAGE = "tabulario/iceberg-rest:0.5.0";
+  private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:7.4.1";
+  private static final String CONNECT_IMAGE = "confluentinc/cp-kafka-connect:7.4.1";
+  private static final String REST_CATALOG_IMAGE = "tabulario/iceberg-rest:0.6.0";
 
   private TestContext() {
     network = Network.newNetwork();
diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle
index e2ba7285..e3027d8b 100644
--- a/kafka-connect/build.gradle
+++ b/kafka-connect/build.gradle
@@ -7,6 +7,7 @@ dependencies {
   compileOnly libs.bundles.kafka.connect
 
   testImplementation(testFixtures(project(":iceberg-kafka-connect-events")))
+  testImplementation libs.hadoop.common
   testImplementation libs.junit.api
   testRuntimeOnly libs.junit.engine
 
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/BaseDeltaTaskWriter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/BaseDeltaTaskWriter.java
index 5ba8af38..e8da2900 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/BaseDeltaTaskWriter.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/BaseDeltaTaskWriter.java
@@ -39,6 +39,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
   private final Schema deleteSchema;
   private final InternalRecordWrapper wrapper;
   private final InternalRecordWrapper keyWrapper;
+  private final RecordProjection keyProjection;
   private final boolean upsertMode;
 
   BaseDeltaTaskWriter(
@@ -55,6 +56,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
     this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(schema.identifierFieldIds()));
     this.wrapper = new InternalRecordWrapper(schema.asStruct());
     this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct());
+    this.keyProjection = RecordProjection.create(schema, deleteSchema);
     this.upsertMode = upsertMode;
   }
 
@@ -72,8 +74,7 @@ public void write(Record row) throws IOException {
             : upsertMode ? Operation.UPDATE : Operation.INSERT;
     RowDataDeltaWriter writer = route(row);
     if (op == Operation.UPDATE || op == Operation.DELETE) {
-      // TODO: use deleteKey()
-      writer.delete(row);
+      writer.deleteKey(keyProjection.wrap(row));
     }
     if (op == Operation.UPDATE || op == Operation.INSERT) {
       writer.write(row);
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordProjection.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordProjection.java
new file mode 100644
index 00000000..cc1e4f30
--- /dev/null
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordProjection.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.data;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+
+/**
+ * This is modified from {@link org.apache.iceberg.util.StructProjection} to support record types.
+ */
+public class RecordProjection implements Record {
+
+  /**
+   * Creates a projecting wrapper for {@link Record} rows.
+   *
+   * <p>This projection does not work with repeated types like lists and maps.
+   *
+   * @param dataSchema schema of rows wrapped by this projection
+   * @param projectedSchema result schema of the projected rows
+   * @return a wrapper to project rows
+   */
+  public static RecordProjection create(Schema dataSchema, Schema projectedSchema) {
+    return new RecordProjection(dataSchema.asStruct(), projectedSchema.asStruct());
+  }
+
+  private final StructType type;
+  private final int[] positionMap;
+  private final RecordProjection[] nestedProjections;
+  private Record record;
+
+  private RecordProjection(StructType structType, StructType projection) {
+    this(structType, projection, false);
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private RecordProjection(StructType structType, StructType projection, boolean allowMissing) {
+    this.type = projection;
+    this.positionMap = new int[projection.fields().size()];
+    this.nestedProjections = new RecordProjection[projection.fields().size()];
+
+    // set up the projection positions and any nested projections that are needed
+    List<NestedField> dataFields = structType.fields();
+    for (int pos = 0; pos < positionMap.length; pos += 1) {
+      Types.NestedField projectedField = projection.fields().get(pos);
+
+      boolean found = false;
+      for (int i = 0; !found && i < dataFields.size(); i += 1) {
+        Types.NestedField dataField = dataFields.get(i);
+        if (projectedField.fieldId() == dataField.fieldId()) {
+          found = true;
+          positionMap[pos] = i;
+          switch (projectedField.type().typeId()) {
+            case STRUCT:
+              nestedProjections[pos] =
+                  new RecordProjection(
+                      dataField.type().asStructType(), projectedField.type().asStructType());
+              break;
+            case MAP:
+              MapType projectedMap = projectedField.type().asMapType();
+              MapType originalMap = dataField.type().asMapType();
+
+              boolean keyProjectable =
+                  !projectedMap.keyType().isNestedType()
+                      || projectedMap.keyType().equals(originalMap.keyType());
+              boolean valueProjectable =
+                  !projectedMap.valueType().isNestedType()
+                      || projectedMap.valueType().equals(originalMap.valueType());
+              Preconditions.checkArgument(
+                  keyProjectable && valueProjectable,
+                  "Cannot project a partial map key or value struct. Trying to project %s out of %s",
+                  projectedField,
+                  dataField);
+
+              nestedProjections[pos] = null;
+              break;
+            case LIST:
+              ListType projectedList = projectedField.type().asListType();
+              ListType originalList = dataField.type().asListType();
+
+              boolean elementProjectable =
+                  !projectedList.elementType().isNestedType()
+                      || projectedList.elementType().equals(originalList.elementType());
+              Preconditions.checkArgument(
+                  elementProjectable,
+                  "Cannot project a partial list element struct. Trying to project %s out of %s",
+                  projectedField,
+                  dataField);
+
+              nestedProjections[pos] = null;
+              break;
+            default:
+              nestedProjections[pos] = null;
+          }
+        }
+      }
+
+      if (!found && projectedField.isOptional() && allowMissing) {
+        positionMap[pos] = -1;
+        nestedProjections[pos] = null;
+      } else if (!found) {
+        throw new IllegalArgumentException(
+            String.format("Cannot find field %s in %s", projectedField, structType));
+      }
+    }
+  }
+
+  public RecordProjection wrap(Record newRecord) {
+    this.record = newRecord;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return type.fields().size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    // struct can be null if wrap is not called first before the get call
+    // or if a null struct is wrapped.
+    if (record == null) {
+      return null;
+    }
+
+    int recordPos = positionMap[pos];
+    if (nestedProjections[pos] != null) {
+      Record nestedStruct = record.get(recordPos, Record.class);
+      if (nestedStruct == null) {
+        return null;
+      }
+
+      return javaClass.cast(nestedProjections[pos].wrap(nestedStruct));
+    }
+
+    if (recordPos != -1) {
+      return record.get(recordPos, javaClass);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public StructType struct() {
+    return type;
+  }
+
+  @Override
+  public Object getField(String name) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setField(String name, Object value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Object get(int pos) {
+    return get(pos, Object.class);
+  }
+
+  @Override
+  public Record copy() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Record copy(Map<String, Object> overwriteValues) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java
index 856a099d..e9151e19 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java
@@ -26,6 +26,7 @@
 import io.tabular.iceberg.connect.IcebergSinkConfig;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.FileFormat;
@@ -37,7 +38,9 @@
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.kafka.connect.data.Struct;
 import org.slf4j.Logger;
@@ -88,14 +91,23 @@ public static TaskWriter<Record> createTableWriter(Table table, IcebergSinkConfig config) {
         PropertyUtil.propertyAsLong(
             table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
 
-    FileAppenderFactory<Record> appenderFactory =
-        new GenericAppenderFactory(
-                table.schema(),
-                table.spec(),
-                Ints.toArray(table.schema().identifierFieldIds()),
-                table.schema(),
-                null)
-            .setAll(table.properties());
+    Set<Integer> equalityFieldIds = table.schema().identifierFieldIds();
+
+    FileAppenderFactory<Record> appenderFactory;
+    if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
+      appenderFactory =
+          new GenericAppenderFactory(table.schema(), table.spec(), null, null, null)
+              .setAll(table.properties());
+    } else {
+      appenderFactory =
+          new GenericAppenderFactory(
+                  table.schema(),
+                  table.spec(),
+                  Ints.toArray(equalityFieldIds),
+                  TypeUtil.select(table.schema(), Sets.newHashSet(equalityFieldIds)),
+                  null)
+              .setAll(table.properties());
+    }
 
     // (partition ID + task ID + operation ID) must be unique
     OutputFileFactory fileFactory =
diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/BaseWriterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/BaseWriterTest.java
new file mode 100644
index 00000000..0b8d926d
--- /dev/null
+++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/BaseWriterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.data;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.tabular.iceberg.connect.IcebergSinkConfig;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.LocationProviders;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+
+public class BaseWriterTest {
+
+  protected InMemoryFileIO fileIO;
+  protected Table table;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          ImmutableList.of(
+              Types.NestedField.required(1, "id", Types.LongType.get()),
+              Types.NestedField.required(2, "data", Types.StringType.get()),
+              Types.NestedField.required(3, "id2", Types.LongType.get())),
+          ImmutableSet.of(1, 3));
+
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).identity("data").build();
+
+  @BeforeEach
+  public void setup() {
+    fileIO = new InMemoryFileIO();
+
+    table = mock(Table.class);
+    when(table.schema()).thenReturn(SCHEMA);
+    when(table.spec()).thenReturn(PartitionSpec.unpartitioned());
+    when(table.io()).thenReturn(fileIO);
+    when(table.locationProvider())
+        .thenReturn(LocationProviders.locationsFor("file", ImmutableMap.of()));
+    when(table.encryption()).thenReturn(new PlaintextEncryptionManager());
+    when(table.properties()).thenReturn(ImmutableMap.of());
+  }
+
+  protected WriteResult writeTest(
+      List<Record> rows, IcebergSinkConfig config, Class<?> expectedWriterClass) {
+    try (TaskWriter<Record> writer = Utilities.createTableWriter(table, config)) {
+      assertEquals(expectedWriterClass, writer.getClass());
+
+      rows.forEach(
+          row -> {
+            try {
+              writer.write(row);
+            } catch (IOException e) {
+              throw new UncheckedIOException(e);
+            }
+          });
+
+      return writer.complete();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}
diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/PartitionedAppendWriterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/PartitionedAppendWriterTest.java
new file mode 100644
index 00000000..e7615364
--- /dev/null
+++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/PartitionedAppendWriterTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.data;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.tabular.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+public class PartitionedAppendWriterTest extends BaseWriterTest {
+
+  @Test
+  public void testPartitionedAppendWriter() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+
+    when(table.spec()).thenReturn(SPEC);
+
+    Record row1 = GenericRecord.create(SCHEMA);
+    row1.setField("id", 123L);
+    row1.setField("data", "hello world!");
+    row1.setField("id2", 123L);
+
+    Record row2 = GenericRecord.create(SCHEMA);
+    row2.setField("id", 234L);
+    row2.setField("data", "foobar");
+    row2.setField("id2", 234L);
+
+    WriteResult result =
+        writeTest(ImmutableList.of(row1, row2), config, PartitionedAppendWriter.class);
+
+    // 1 data file for each partition (2 total)
+    assertEquals(2, result.dataFiles().length);
+    assertEquals(0, result.deleteFiles().length);
+  }
+}
diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/PartitionedDeltaWriterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/PartitionedDeltaWriterTest.java
new file mode 100644
index 00000000..8454b6b0
--- /dev/null
+++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/PartitionedDeltaWriterTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.data;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.tabular.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+public class PartitionedDeltaWriterTest extends BaseWriterTest {
+
+  @Test
+  public void testPartitionedDeltaWriter() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.isUpsertMode()).thenReturn(true);
+
+    when(table.spec()).thenReturn(SPEC);
+
+    Record row1 = GenericRecord.create(SCHEMA);
+    row1.setField("id", 123L);
+    row1.setField("data", "hello world!");
+    row1.setField("id2", 123L);
+
+    Record row2 = GenericRecord.create(SCHEMA);
+    row2.setField("id", 234L);
+    row2.setField("data", "foobar");
+    row2.setField("id2", 234L);
+
+    WriteResult result =
+        writeTest(ImmutableList.of(row1, row2), config, PartitionedDeltaWriter.class);
+
+    // in upsert mode, each write is a delete + append, so we'll have 1 data file
+    // and 1 delete file for each partition (2 total)
+    assertEquals(2, result.dataFiles().length);
+    assertEquals(2, result.deleteFiles().length);
+  }
+}
diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/UnpartitionedDeltaWriterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/UnpartitionedDeltaWriterTest.java
new file mode 100644
index 00000000..da203e5d
--- /dev/null
+++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/UnpartitionedDeltaWriterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.data;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.tabular.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+public class UnpartitionedDeltaWriterTest extends BaseWriterTest {
+
+  @Test
+  public void testUnpartitionedDeltaWriter() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.isUpsertMode()).thenReturn(true);
+
+    Record row = GenericRecord.create(SCHEMA);
+    row.setField("id", 123L);
+    row.setField("data", "hello world!");
+    row.setField("id2", 123L);
+
+    WriteResult result = writeTest(ImmutableList.of(row), config, UnpartitionedDeltaWriter.class);
+
+    // in upsert mode, each write is a delete + append, so we'll have 1 data file
+    // and 1 delete file
+    assertEquals(1, result.dataFiles().length);
+    assertEquals(1, result.deleteFiles().length);
+  }
+}
diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/UnpartitionedWriterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/UnpartitionedWriterTest.java
new file mode 100644
index 00000000..806f7cb6
--- /dev/null
+++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/UnpartitionedWriterTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.data;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import io.tabular.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+public class UnpartitionedWriterTest extends BaseWriterTest {
+
+  @Test
+  public void testUnpartitionedWriter() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+
+    Record row1 = GenericRecord.create(SCHEMA);
+    row1.setField("id", 123L);
+    row1.setField("data", "hello world!");
+    row1.setField("id2", 123L);
+
+    Record row2 = GenericRecord.create(SCHEMA);
+    row2.setField("id", 234L);
+    row2.setField("data", "foobar");
+    row2.setField("id2", 234L);
+
+    WriteResult result =
+        writeTest(ImmutableList.of(row1, row2), config, UnpartitionedWriter.class);
+
+    assertEquals(1, result.dataFiles().length);
+    assertEquals(0, result.deleteFiles().length);
+  }
+}
diff --git a/versions.toml b/versions.toml
index c4ce133b..2de1780d 100644
--- a/versions.toml
+++ b/versions.toml
@@ -6,10 +6,10 @@
 aws-ver = "2.20.18"
 hadoop-ver = "3.3.6"
 hive-ver = "2.3.9"
 http-client-ver = "5.2.1"
-iceberg-ver = "1.3.1-tabular.4"
+iceberg-ver = "1.3.1-tabular.11"
 jackson-ver = "2.14.2"
 junit-ver = "5.9.2"
-kafka-ver = "3.4.0"
+kafka-ver = "3.5.1"
 slf4j-ver = "1.7.36"
 testcontainers-ver = "1.18.1"
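
Usage note: the sketch below illustrates how the new RecordProjection fits into the delete path. BaseDeltaTaskWriter projects each incoming row down to the table's identifier fields and hands the result to deleteKey(), so equality deletes carry only the key columns instead of the full row. The demo class and field values are hypothetical (the schema mirrors the one in BaseWriterTest); only the RecordProjection and Iceberg calls shown are APIs from this change.

import io.tabular.iceberg.connect.data.RecordProjection;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class RecordProjectionDemo { // hypothetical demo class, not part of the PR
  public static void main(String[] args) {
    // row schema with two identifier fields (1 = id, 3 = id2), as in BaseWriterTest
    Schema schema =
        new Schema(
            ImmutableList.of(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.required(2, "data", Types.StringType.get()),
                Types.NestedField.required(3, "id2", Types.LongType.get())),
            ImmutableSet.of(1, 3));

    // delete schema = identifier fields only, the same way BaseDeltaTaskWriter derives it
    Schema deleteSchema = TypeUtil.select(schema, ImmutableSet.of(1, 3));
    RecordProjection keyProjection = RecordProjection.create(schema, deleteSchema);

    Record row = GenericRecord.create(schema);
    row.setField("id", 123L);
    row.setField("data", "hello world!");
    row.setField("id2", 456L);

    // wrap() mutates and returns the same projection instance, so one keyProjection
    // per writer is reused for every row; positions resolve against the projected
    // schema: 0 -> id, 1 -> id2
    Record key = keyProjection.wrap(row);
    System.out.println(key.get(0, Long.class)); // 123
    System.out.println(key.get(1, Long.class)); // 456
  }
}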