Write only key fields to delete files (#60)
bryanck authored Aug 2, 2023
1 parent a03a7ac commit 5331c8d
Showing 12 changed files with 543 additions and 16 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -13,7 +13,7 @@ subprojects {
apply plugin: "maven-publish"

group "io.tabular.connect"
version "0.4.9-SNAPSHOT"
version "0.4.9"

repositories {
mavenCentral()
@@ -63,9 +63,9 @@ public class TestContext {
private static final String KC_PLUGIN_DIR = "/test/kafka-connect";

private static final String MINIO_IMAGE = "minio/minio";
-private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:7.4.0";
-private static final String CONNECT_IMAGE = "confluentinc/cp-kafka-connect:7.4.0";
-private static final String REST_CATALOG_IMAGE = "tabulario/iceberg-rest:0.5.0";
+private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:7.4.1";
+private static final String CONNECT_IMAGE = "confluentinc/cp-kafka-connect:7.4.1";
+private static final String REST_CATALOG_IMAGE = "tabulario/iceberg-rest:0.6.0";

private TestContext() {
network = Network.newNetwork();
1 change: 1 addition & 0 deletions kafka-connect/build.gradle
@@ -7,6 +7,7 @@ dependencies {
compileOnly libs.bundles.kafka.connect

testImplementation(testFixtures(project(":iceberg-kafka-connect-events")))
+testImplementation libs.hadoop.common

testImplementation libs.junit.api
testRuntimeOnly libs.junit.engine
@@ -39,6 +39,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
private final Schema deleteSchema;
private final InternalRecordWrapper wrapper;
private final InternalRecordWrapper keyWrapper;
+private final RecordProjection keyProjection;
private final boolean upsertMode;

BaseDeltaTaskWriter(
@@ -55,6 +56,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(schema.identifierFieldIds()));
this.wrapper = new InternalRecordWrapper(schema.asStruct());
this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct());
+this.keyProjection = RecordProjection.create(schema, deleteSchema);
this.upsertMode = upsertMode;
}

@@ -72,8 +74,7 @@ public void write(Record row) throws IOException {
: upsertMode ? Operation.UPDATE : Operation.INSERT;
RowDataDeltaWriter writer = route(row);
if (op == Operation.UPDATE || op == Operation.DELETE) {
-// TODO: use deleteKey()
-writer.delete(row);
+writer.deleteKey(keyProjection.wrap(row));
}
if (op == Operation.UPDATE || op == Operation.INSERT) {
writer.write(row);
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.ListType;
import org.apache.iceberg.types.Types.MapType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

/**
* This is modified from {@link org.apache.iceberg.util.StructProjection} to support record types.
*/
public class RecordProjection implements Record {

/**
* Creates a projecting wrapper for {@link Record} rows.
*
* <p>This projection does not work with repeated types like lists and maps.
*
* @param dataSchema schema of rows wrapped by this projection
* @param projectedSchema result schema of the projected rows
* @return a wrapper to project rows
*/
public static RecordProjection create(Schema dataSchema, Schema projectedSchema) {
return new RecordProjection(dataSchema.asStruct(), projectedSchema.asStruct());
}

private final StructType type;
private final int[] positionMap;
private final RecordProjection[] nestedProjections;
private Record record;

private RecordProjection(StructType structType, StructType projection) {
this(structType, projection, false);
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private RecordProjection(StructType structType, StructType projection, boolean allowMissing) {
this.type = projection;
this.positionMap = new int[projection.fields().size()];
this.nestedProjections = new RecordProjection[projection.fields().size()];

// set up the projection positions and any nested projections that are needed
List<NestedField> dataFields = structType.fields();
for (int pos = 0; pos < positionMap.length; pos += 1) {
Types.NestedField projectedField = projection.fields().get(pos);

boolean found = false;
for (int i = 0; !found && i < dataFields.size(); i += 1) {
Types.NestedField dataField = dataFields.get(i);
if (projectedField.fieldId() == dataField.fieldId()) {
found = true;
positionMap[pos] = i;
switch (projectedField.type().typeId()) {
case STRUCT:
nestedProjections[pos] =
new RecordProjection(
dataField.type().asStructType(), projectedField.type().asStructType());
break;
case MAP:
MapType projectedMap = projectedField.type().asMapType();
MapType originalMap = dataField.type().asMapType();

boolean keyProjectable =
!projectedMap.keyType().isNestedType()
|| projectedMap.keyType().equals(originalMap.keyType());
boolean valueProjectable =
!projectedMap.valueType().isNestedType()
|| projectedMap.valueType().equals(originalMap.valueType());
Preconditions.checkArgument(
keyProjectable && valueProjectable,
"Cannot project a partial map key or value struct. Trying to project %s out of %s",
projectedField,
dataField);

nestedProjections[pos] = null;
break;
case LIST:
ListType projectedList = projectedField.type().asListType();
ListType originalList = dataField.type().asListType();

boolean elementProjectable =
!projectedList.elementType().isNestedType()
|| projectedList.elementType().equals(originalList.elementType());
Preconditions.checkArgument(
elementProjectable,
"Cannot project a partial list element struct. Trying to project %s out of %s",
projectedField,
dataField);

nestedProjections[pos] = null;
break;
default:
nestedProjections[pos] = null;
}
}
}

if (!found && projectedField.isOptional() && allowMissing) {
positionMap[pos] = -1;
nestedProjections[pos] = null;
} else if (!found) {
throw new IllegalArgumentException(
String.format("Cannot find field %s in %s", projectedField, structType));
}
}
}

public RecordProjection wrap(Record newRecord) {
this.record = newRecord;
return this;
}

@Override
public int size() {
return type.fields().size();
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
// record can be null if wrap has not been called before get,
// or if a null record was wrapped.
if (record == null) {
return null;
}

int recordPos = positionMap[pos];
if (nestedProjections[pos] != null) {
Record nestedStruct = record.get(recordPos, Record.class);
if (nestedStruct == null) {
return null;
}

return javaClass.cast(nestedProjections[pos].wrap(nestedStruct));
}

if (recordPos != -1) {
return record.get(recordPos, javaClass);
} else {
return null;
}
}

@Override
public <T> void set(int pos, T value) {
throw new UnsupportedOperationException();
}

@Override
public StructType struct() {
return type;
}

@Override
public Object getField(String name) {
throw new UnsupportedOperationException();
}

@Override
public void setField(String name, Object value) {
throw new UnsupportedOperationException();
}

@Override
public Object get(int pos) {
return get(pos, Object.class);
}

@Override
public Record copy() {
throw new UnsupportedOperationException();
}

@Override
public Record copy(Map<String, Object> overwriteValues) {
throw new UnsupportedOperationException();
}
}
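
For illustration only (not part of the commit): a minimal sketch of how this projection behaves, assuming a made-up schema with identifier field "id" plus a "data" column. The example class name, field names, and values are hypothetical; RecordProjection.create, wrap, and TypeUtil.select mirror the calls used in BaseDeltaTaskWriter above.

import io.tabular.iceberg.connect.data.RecordProjection;
import java.util.Arrays;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class RecordProjectionExample {
  public static void main(String[] args) {
    // Hypothetical schema: "id" (field ID 1) is the identifier field.
    Schema schema =
        new Schema(
            Arrays.asList(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get())),
            Sets.newHashSet(1));

    // Same projection the writer builds: keep only the identifier fields.
    Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(schema.identifierFieldIds()));

    Record row = GenericRecord.create(schema);
    row.setField("id", 42L);
    row.setField("data", "payload");

    RecordProjection keyProjection = RecordProjection.create(schema, deleteSchema);
    Record key = keyProjection.wrap(row);

    System.out.println(key.size());             // 1: only the key field is visible
    System.out.println(key.get(0, Long.class)); // 42
  }
}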
@@ -26,6 +26,7 @@
import io.tabular.iceberg.connect.IcebergSinkConfig;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileFormat;
@@ -37,7 +38,9 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
@@ -88,14 +91,23 @@ public static TaskWriter<Record> createTableWriter(Table table, IcebergSinkConfig
PropertyUtil.propertyAsLong(
table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);

-FileAppenderFactory<Record> appenderFactory =
-    new GenericAppenderFactory(
-        table.schema(),
-        table.spec(),
-        Ints.toArray(table.schema().identifierFieldIds()),
-        table.schema(),
-        null)
-        .setAll(table.properties());
+Set<Integer> equalityFieldIds = table.schema().identifierFieldIds();
+
+FileAppenderFactory<Record> appenderFactory;
+if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
+  appenderFactory =
+      new GenericAppenderFactory(table.schema(), table.spec(), null, null, null)
+          .setAll(table.properties());
+} else {
+  appenderFactory =
+      new GenericAppenderFactory(
+          table.schema(),
+          table.spec(),
+          Ints.toArray(equalityFieldIds),
+          TypeUtil.select(table.schema(), Sets.newHashSet(equalityFieldIds)),
+          null)
+          .setAll(table.properties());
+}

// (partition ID + task ID + operation ID) must be unique
OutputFileFactory fileFactory =
