Skip to content

Commit

Permalink
Use RecordWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Oct 5, 2024
1 parent a539be3 commit 305a2a0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import static io.debezium.server.iceberg.IcebergChangeConsumer.keyDeserializer;
import static io.debezium.server.iceberg.IcebergChangeConsumer.valDeserializer;
import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.CDC_OPERATION_FIELD_NAME;

/**
* Converts iceberg json event to Iceberg GenericRecord. Extracts event schema and key fields. Converts event schema to Iceberg Schema.
Expand All @@ -42,6 +41,7 @@ public class RecordConverter {
protected static final ObjectMapper mapper = new ObjectMapper();
protected static final Logger LOGGER = LoggerFactory.getLogger(RecordConverter.class);
public static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
public static final String CDC_OPERATION_FIELD_NAME = "__op";
static final boolean eventsAreUnwrapped = IcebergUtil.configIncludesUnwrapSmt();
protected final String destination;
protected final byte[] valueData;
Expand Down Expand Up @@ -91,10 +91,10 @@ public String destination() {
return destination;
}

public Operation cdcOpValue(String cdcOpField) {
public Operation cdcOpValue() {
final String opFieldValue;
if (value().has(cdcOpField)) {
opFieldValue = value().get(cdcOpField).asText("c");
if (value().has(CDC_OPERATION_FIELD_NAME)) {
opFieldValue = value().get(CDC_OPERATION_FIELD_NAME).asText("c");
} else if (value().has("ddl") && value().has("databaseName")
&& value().has("tableChanges")) {
// its "schema change topic" https://debezium.io/documentation/reference/3.0/connectors/mysql.html#mysql-schema-change-topic
Expand All @@ -104,7 +104,7 @@ && value().has("tableChanges")) {
}

if (opFieldValue == null) {
throw new DebeziumException("The value for field `" + cdcOpField + "` is missing. " +
throw new DebeziumException("The value for field `" + CDC_OPERATION_FIELD_NAME + "` is missing. " +
"This field is required when updating or deleting data, when running in upsert mode."
);
}
Expand All @@ -120,12 +120,12 @@ && value().has("tableChanges")) {
}else if (opFieldValue.equals("i")) {
return Operation.INSERT;
}
throw new DebeziumException("Unexpected `" + cdcOpField + "=" + opFieldValue + "` operation value received, expecting one of ['u','d','r','c', 'i']");
throw new DebeziumException("Unexpected `" + CDC_OPERATION_FIELD_NAME + "=" + opFieldValue + "` operation value received, expecting one of ['u','d','r','c', 'i']");
}

public RecordWrapper convert(Schema schema) {
GenericRecord row = convert(schema.asStruct(), value());
Operation op = cdcOpValue(CDC_OPERATION_FIELD_NAME);
Operation op = cdcOpValue();
return new RecordWrapper(row, op);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class IcebergTableOperator {

static final ImmutableMap<Operation, Integer> CDC_OPERATION_PRIORITY = ImmutableMap.of(Operation.INSERT, 1, Operation.READ, 2, Operation.UPDATE, 3, Operation.DELETE, 4);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
public static final String CDC_OPERATION_FIELD_NAME = "__op";
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String cdcSourceTsMsField;
@ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
Expand Down Expand Up @@ -95,9 +94,9 @@ private int compareByTsThenOp(RecordConverter lhs, RecordConverter rhs) {

if (result == 0) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(CDC_OPERATION_FIELD_NAME), -1)
result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(), -1)
.compareTo(
CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(CDC_OPERATION_FIELD_NAME), -1)
CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(), -1)
);
}

Expand Down

0 comments on commit 305a2a0

Please sign in to comment.