Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce RecordWrapper for better handling of deletes #426

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.debezium.DebeziumException;
import io.debezium.server.iceberg.tableoperator.Operation;
import io.debezium.server.iceberg.tableoperator.RecordWrapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -72,8 +74,36 @@ public Long cdcSourceTsMsValue(String cdcSourceTsMsField) {
return value().get(cdcSourceTsMsField).asLong(0);
}

public String cdcOpValue(String cdcOpField) {
return value().get(cdcOpField).asText("c");
/**
 * Resolves the CDC operation of this event from the given operation field.
 * <p>
 * Events produced by the Debezium "schema change topic" (identified by the
 * presence of the {@code ddl}, {@code databaseName} and {@code tableChanges}
 * fields) carry no op field and are treated as inserts.
 *
 * @param cdcOpField name of the field holding the Debezium operation code
 *                   (typically {@code __op})
 * @return the mapped {@link Operation}
 * @throws DebeziumException if the field is missing (and the event is not a
 *                           schema change event) or holds an unknown value
 */
public Operation cdcOpValue(String cdcOpField) {
  final String opFieldValue;
  if (value().has(cdcOpField)) {
    opFieldValue = value().get(cdcOpField).asText("c");
  } else if (value().has("ddl") && value().has("databaseName") && value().has("tableChanges")) {
    // its "schema change topic" https://debezium.io/documentation/reference/3.0/connectors/mysql.html#mysql-schema-change-topic
    opFieldValue = "c";
  } else {
    // fail fast instead of assigning null and re-checking afterwards
    throw new DebeziumException("The value for field `" + cdcOpField + "` is missing. " +
        "This field is required when updating or deleting data, when running in upsert mode."
    );
  }

  // map the Debezium op code to the internal Operation enum
  return switch (opFieldValue) {
    case "u" -> Operation.UPDATE;
    case "d" -> Operation.DELETE;
    case "r" -> Operation.READ;
    case "c", "i" -> Operation.INSERT;
    default -> throw new DebeziumException("Unexpected `" + cdcOpField + "=" + opFieldValue + "` operation value received, expecting one of ['u','d','r','c', 'i']");
  };
}

public SchemaConverter schemaConverter() {
Expand All @@ -92,8 +122,15 @@ public String destination() {
return destination;
}

public GenericRecord convert(Schema schema) {
return convert(schema.asStruct(), value());
/**
 * Converts this event into an Iceberg record for append-only writes.
 * The record is unconditionally tagged as {@link Operation#INSERT},
 * so the op field of the event (if any) is ignored.
 *
 * @param schema target Iceberg table schema
 * @return the converted record wrapped with an INSERT operation
 */
public RecordWrapper convertAsAppend(Schema schema) {
  return new RecordWrapper(convert(schema.asStruct(), value()), Operation.INSERT);
}

/**
 * Converts this event into an Iceberg record tagged with the CDC operation
 * read from the given op field (used by upsert-mode delta writers).
 *
 * @param schema     target Iceberg table schema
 * @param cdcOpField name of the field holding the Debezium operation code
 * @return the converted record wrapped with its resolved operation
 */
public RecordWrapper convert(Schema schema, String cdcOpField) {
  final GenericRecord converted = convert(schema.asStruct(), value());
  return new RecordWrapper(converted, cdcOpValue(cdcOpField));
}

private static GenericRecord convert(Types.StructType tableFields, JsonNode data) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.debezium.server.iceberg.tableoperator;

import com.google.common.collect.Sets;
import io.debezium.DebeziumException;
import org.apache.iceberg.*;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
Expand All @@ -14,8 +13,6 @@
import java.io.IOException;
import java.util.Set;

import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.cdcOpField;

abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {

private final Schema schema;
Expand Down Expand Up @@ -49,20 +46,14 @@ InternalRecordWrapper wrapper() {
return wrapper;
}

@Override
@Override/**/
public void write(Record row) throws IOException {
RowDataDeltaWriter writer = route(row);
final Object opFieldValue = row.getField(cdcOpField);
if (opFieldValue == null) {
throw new DebeziumException("The value for field `" + cdcOpField + "` is missing. " +
"This field is required when updating or deleting data, when running in upsert mode."
);
}

if (opFieldValue.equals("c")) {
Operation rowOperation = ((RecordWrapper) row).op();
if (rowOperation == Operation.INSERT) {
// new row
writer.write(row);
} else if (opFieldValue.equals("d") && !keepDeletes) {
} else if (rowOperation == Operation.DELETE && !keepDeletes) {
// deletes. doing hard delete. when keepDeletes = FALSE we dont keep deleted record
writer.deleteKey(keyProjection.wrap(row));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.iceberg.*;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.WriteResult;
Expand All @@ -39,11 +38,12 @@
@Dependent
public class IcebergTableOperator {

static final ImmutableMap<String, Integer> CDC_OPERATION_PRIORITY = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
static final ImmutableMap<Operation, Integer> CDC_OPERATION_PRIORITY = ImmutableMap.of(Operation.INSERT, 1, Operation.READ, 2, Operation.UPDATE, 3, Operation.DELETE, 4);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
protected static final String cdcOpField = "__op";
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String cdcSourceTsMsField;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-op-field", defaultValue = "__op")
String cdcOpField;
@ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
boolean allowFieldAddition;
@ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true")
Expand Down Expand Up @@ -178,7 +178,7 @@ private void addToTablePerSchema(Table icebergTable, List<RecordConverter> event
BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
try (writer) {
for (RecordConverter e : events) {
final GenericRecord record = e.convert(tableSchema);
final RecordWrapper record = (upsert && !tableSchema.identifierFieldIds().isEmpty()) ? e.convert(tableSchema, cdcOpField) : e.convertAsAppend(tableSchema);
writer.write(record);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.debezium.server.iceberg.tableoperator;

/**
 * The kind of change a CDC event represents, derived from the Debezium
 * operation code ('c'/'i' -> INSERT, 'u' -> UPDATE, 'd' -> DELETE,
 * 'r' -> READ). Carried alongside each row by {@link RecordWrapper} so the
 * writers can decide how to apply the row.
 */
public enum Operation {
  INSERT,
  UPDATE,
  DELETE,
  READ
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.debezium.server.iceberg.tableoperator;

import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types.StructType;

import java.util.Map;

/**
 * An Iceberg {@link Record} that carries the CDC {@link Operation} of the
 * change event it was converted from. All {@link Record} methods delegate to
 * the wrapped record; only {@link #op()} is added. This lets the delta task
 * writers decide per row whether to append, delete, or delete+append without
 * re-reading the op field from the row data.
 */
public class RecordWrapper implements Record {

  // the actual row data; every Record method is forwarded to it
  private final Record delegate;
  // CDC operation associated with this row (INSERT/UPDATE/DELETE/READ)
  private final Operation op;

  public RecordWrapper(Record delegate, Operation op) {
    this.delegate = delegate;
    this.op = op;
  }

  /** Returns the CDC operation this row represents. */
  public Operation op() {
    return op;
  }

  @Override
  public StructType struct() {
    return delegate.struct();
  }

  @Override
  public Object getField(String name) {
    return delegate.getField(name);
  }

  @Override
  public void setField(String name, Object value) {
    delegate.setField(name, value);
  }

  @Override
  public Object get(int pos) {
    return delegate.get(pos);
  }

  // copies preserve the operation tag along with the copied row data
  @Override
  public Record copy() {
    return new RecordWrapper(delegate.copy(), op);
  }

  @Override
  public Record copy(Map<String, Object> overwriteValues) {
    return new RecordWrapper(delegate.copy(overwriteValues), op);
  }

  @Override
  public int size() {
    return delegate.size();
  }

  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    return delegate.get(pos, javaClass);
  }

  @Override
  public <T> void set(int pos, T value) {
    delegate.set(pos, value);
  }

  // note: toString shows only the delegate's fields, not the operation
  @Override
  public String toString() {
    return delegate.toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public Map<String, String> getConfigOverrides() {
config.put("quarkus.profile", "mysql");
config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers,inventory.test_delete_table");
//config.put("%mysql.debezium.source.include.schema.changes", "false");
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.tableoperator.RecordWrapper;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void testUnwrapJsonRecord() {
RecordConverter e = new RecordConverter("test",
unwrapWithSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema(true);
GenericRecord record = e.convert(schema);
RecordWrapper record = e.convert(schema, "__op");
assertEquals("orders", record.getField("__table").toString());
assertEquals(16850, record.getField("order_date"));
assertEquals(schema.toString(), """
Expand Down Expand Up @@ -110,7 +111,7 @@ public void testNestedArrayJsonRecord() {
assertEquals(schema.identifierFieldIds(), Set.of());
assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int");
assertEquals(schema.findField("schedule").type().asListType().elementType().toString(), "string");
GenericRecord record = e.convert(schema);
RecordWrapper record = e.convert(schema,"__op");
//System.out.println(record);
assertTrue(record.toString().contains("[10000, 10001, 10002, 10003]"));
}
Expand All @@ -137,7 +138,7 @@ public void testNestedGeomJsonRecord() {
RecordConverter e = new RecordConverter("test",
unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema(true);
GenericRecord record = e.convert(schema);
RecordWrapper record = e.convert(schema,"__op");
assertEquals(schema.toString(), """
table {
1: id: optional int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testIcebergSchemaConverterWithDelete() throws IOException {
});
assertTrue(exception.getMessage().contains("Identifier fields are not supported for unnested events"));
// print converted event value!
System.out.println(ie.convert(ie.icebergSchema(false)));
System.out.println(ie.convert(ie.icebergSchema(false),"__op"));
}

public static class TestProfile implements QuarkusTestProfile {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.IcebergChangeEventBuilder;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.quarkus.test.common.WithTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
Expand All @@ -39,6 +40,7 @@
*/
@QuarkusTest
@WithTestResource(value = S3Minio.class)
@WithTestResource(value = SourcePostgresqlDB.class)
class IcebergTableOperatorTest extends BaseSparkTest {

static String testTable = "inventory.test_table_operator";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void testUnpartitionedDeltaWriter() throws IOException {
row.setField("id2", "123");
row.setField("__op", "u");

writer.write(row);
writer.write(new RecordWrapper(row, Operation.UPDATE));
WriteResult result = writer.complete();

// in upsert mode, each write is a delete + append, so we'll have 1 data file and 1 delete file
Expand Down