diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java index 7a5c4b59..21a1ff94 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java @@ -41,7 +41,6 @@ public class RecordConverter { protected static final ObjectMapper mapper = new ObjectMapper(); protected static final Logger LOGGER = LoggerFactory.getLogger(RecordConverter.class); public static final List TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms"); - public static final String CDC_OPERATION_FIELD_NAME = "__op"; static final boolean eventsAreUnwrapped = IcebergUtil.configIncludesUnwrapSmt(); protected final String destination; protected final byte[] valueData; @@ -75,26 +74,10 @@ public Long cdcSourceTsMsValue(String cdcSourceTsMsField) { return value().get(cdcSourceTsMsField).asLong(0); } - public SchemaConverter schemaConverter() { - try { - return new SchemaConverter(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema")); - } catch (IOException e) { - throw new DebeziumException("Failed to get event schema", e); - } - } - - public Schema icebergSchema(boolean createIdentifierFields) { - return schemaConverter().icebergSchema(createIdentifierFields); - } - - public String destination() { - return destination; - } - - public Operation cdcOpValue() { + public Operation cdcOpValue(String cdcOpField) { final String opFieldValue; - if (value().has(CDC_OPERATION_FIELD_NAME)) { - opFieldValue = value().get(CDC_OPERATION_FIELD_NAME).asText("c"); + if (value().has(cdcOpField)) { + opFieldValue = value().get(cdcOpField).asText("c"); } else if (value().has("ddl") && value().has("databaseName") && value().has("tableChanges")) { // its "schema change topic" https://debezium.io/documentation/reference/3.0/connectors/mysql.html#mysql-schema-change-topic @@ -104,7 +87,7 @@ && value().has("tableChanges")) { } if (opFieldValue == null) { - throw new DebeziumException("The value for field `" + CDC_OPERATION_FIELD_NAME + "` is missing. " + + throw new DebeziumException("The value for field `" + cdcOpField + "` is missing. " + "This field is required when updating or deleting data, when running in upsert mode." ); } @@ -120,12 +103,33 @@ && value().has("tableChanges")) { }else if (opFieldValue.equals("i")) { return Operation.INSERT; } - throw new DebeziumException("Unexpected `" + CDC_OPERATION_FIELD_NAME + "=" + opFieldValue + "` operation value received, expecting one of ['u','d','r','c', 'i']"); + throw new DebeziumException("Unexpected `" + cdcOpField + "=" + opFieldValue + "` operation value received, expecting one of ['u','d','r','c', 'i']"); + } + + public SchemaConverter schemaConverter() { + try { + return new SchemaConverter(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema")); + } catch (IOException e) { + throw new DebeziumException("Failed to get event schema", e); + } + } + + public Schema icebergSchema(boolean createIdentifierFields) { + return schemaConverter().icebergSchema(createIdentifierFields); + } + + public String destination() { + return destination; } public RecordWrapper convert(Schema schema) { GenericRecord row = convert(schema.asStruct(), value()); - Operation op = cdcOpValue(); + return new RecordWrapper(row, Operation.INSERT); + } + + public RecordWrapper convert(Schema schema, String cdcOpField) { + GenericRecord row = convert(schema.asStruct(), value()); + Operation op = cdcOpValue(cdcOpField); return new RecordWrapper(row, op); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index ed294309..ebcb537d 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -42,6 +42,8 @@ public class IcebergTableOperator { private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class); @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") String cdcSourceTsMsField; + @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-field", defaultValue = "__op") + String cdcOpField; @ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true") boolean allowFieldAddition; @ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true") @@ -94,9 +96,9 @@ private int compareByTsThenOp(RecordConverter lhs, RecordConverter rhs) { if (result == 0) { // return (x < y) ? -1 : ((x == y) ? 0 : 1); - result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(), -1) + result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(cdcOpField), -1) .compareTo( - CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(), -1) + CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(cdcOpField), -1) ); } @@ -176,7 +178,7 @@ private void addToTablePerSchema(Table icebergTable, List event BaseTaskWriter writer = writerFactory.create(icebergTable); try (writer) { for (RecordConverter e : events) { - final RecordWrapper record = e.convert(tableSchema); + final RecordWrapper record = e.convert(tableSchema, cdcOpField); writer.write(record); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/RecordConverterTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/RecordConverterTest.java index b90a14f9..1924edf5 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/RecordConverterTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/RecordConverterTest.java @@ -71,7 +71,7 @@ public void testUnwrapJsonRecord() { RecordConverter e = new RecordConverter("test", unwrapWithSchema.getBytes(StandardCharsets.UTF_8), null); Schema schema = e.icebergSchema(true); - RecordWrapper record = e.convert(schema); + RecordWrapper record = e.convert(schema, "__op"); assertEquals("orders", record.getField("__table").toString()); assertEquals(16850, record.getField("order_date")); assertEquals(schema.toString(), """ @@ -111,7 +111,7 @@ public void testNestedArrayJsonRecord() { assertEquals(schema.identifierFieldIds(), Set.of()); assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int"); assertEquals(schema.findField("schedule").type().asListType().elementType().toString(), "string"); - RecordWrapper record = e.convert(schema); + RecordWrapper record = e.convert(schema,"__op"); //System.out.println(record); assertTrue(record.toString().contains("[10000, 10001, 10002, 10003]")); } @@ -138,7 +138,7 @@ public void testNestedGeomJsonRecord() { RecordConverter e = new RecordConverter("test", unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null); Schema schema = e.icebergSchema(true); - RecordWrapper record = e.convert(schema); + RecordWrapper record = e.convert(schema,"__op"); assertEquals(schema.toString(), """ table { 1: id: optional int diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/RecordConverterTestUnwrapped.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/RecordConverterTestUnwrapped.java index 22526962..64c59592 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/RecordConverterTestUnwrapped.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/RecordConverterTestUnwrapped.java @@ -71,7 +71,7 @@ public void testIcebergSchemaConverterWithDelete() throws IOException { }); assertTrue(exception.getMessage().contains("Identifier fields are not supported for unnested events")); // print converted event value! - System.out.println(ie.convert(ie.icebergSchema(false))); + System.out.println(ie.convert(ie.icebergSchema(false),"__op")); } public static class TestProfile implements QuarkusTestProfile {