Improve code, Improve class and method namings (#410)
* Rename IcebergChangeEvent to RecordConverter

* Rename ChangeEventSchema to SchemaConverter

* Rename changeEventSchema() to schemaConverter()

* Rename asIcebergRecord(Schema schema) to convert(Schema schema)

* Rename IcebergChangeEventSchemaData to RecordSchemaData

* Rename cdcOperations to CDC_OPERATION_PRIORITY

* Rename opFieldName to cdcOpField and sourceTsMsColumn to cdcSourceTsMsField
ismailsimsek authored Sep 5, 2024
1 parent 6e553bb commit a7f541c
Showing 15 changed files with 130 additions and 150 deletions.
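
For orientation, a minimal caller-side sketch of the renamed API follows. Only the RecordConverter constructor, icebergSchema(boolean) and convert(Schema) calls are taken from this diff; the wrapper class, method name and event bytes are illustrative assumptions.

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import io.debezium.server.iceberg.RecordConverter;

// Hypothetical helper showing the renamed call chain end to end.
public class RenamedApiSketch {

  // valueBytes/keyBytes are assumed to hold a schema-carrying Debezium JSON event.
  static GenericRecord toIcebergRecord(String destination, byte[] valueBytes, byte[] keyBytes) {
    // formerly: new IcebergChangeEvent(destination, valueBytes, keyBytes)
    RecordConverter converter = new RecordConverter(destination, valueBytes, keyBytes);

    // formerly backed by changeEventSchema(); true = create identifier fields
    Schema tableSchema = converter.icebergSchema(true);

    // formerly: converter.asIcebergRecord(tableSchema)
    return converter.convert(tableSchema);
  }
}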

==== changed file ====
@@ -145,14 +145,14 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
Instant start = Instant.now();

//group events by destination (per iceberg table)
- Map<String, List<IcebergChangeEvent>> result =
+ Map<String, List<RecordConverter>> result =
records.stream()
.map((ChangeEvent<Object, Object> e)
- -> new IcebergChangeEvent(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key())))
- .collect(Collectors.groupingBy(IcebergChangeEvent::destination));
+ -> new RecordConverter(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key())))
+ .collect(Collectors.groupingBy(RecordConverter::destination));

// consume list of events for each destination table
- for (Map.Entry<String, List<IcebergChangeEvent>> tableEvents : result.entrySet()) {
+ for (Map.Entry<String, List<RecordConverter>> tableEvents : result.entrySet()) {
Table icebergTable = this.loadIcebergTable(mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0));
icebergTableOperator.addToTable(icebergTable, tableEvents.getValue());
}
@@ -175,7 +175,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
* @param sampleEvent sample debezium event. event schema used to create iceberg table when table not found
* @return iceberg table, throws RuntimeException when table not found, and it's not possible to create it
*/
- public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
+ public Table loadIcebergTable(TableIdentifier tableId, RecordConverter sampleEvent) {
return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");

==== changed file ====
This file was deleted.

==== changed file ====
@@ -34,10 +34,10 @@
*
* @author Ismail Simsek
*/
- public class IcebergChangeEvent {
+ public class RecordConverter {

protected static final ObjectMapper mapper = new ObjectMapper();
- protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
+ protected static final Logger LOGGER = LoggerFactory.getLogger(RecordConverter.class);
public static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
static final boolean eventsAreUnwrapped = IcebergUtil.configIncludesUnwrapSmt();
protected final String destination;
@@ -46,7 +46,7 @@ public class IcebergChangeEvent {
private JsonNode value;
private JsonNode key;

- public IcebergChangeEvent(String destination, byte[] valueData, byte[] keyData) {
+ public RecordConverter(String destination, byte[] valueData, byte[] keyData) {
this.destination = destination;
this.valueData = valueData;
this.keyData = keyData;
@@ -68,27 +68,27 @@ public JsonNode value() {
return value;
}

- public ChangeEventSchema changeEventSchema() {
+ public SchemaConverter schemaConverter() {
try {
- return new ChangeEventSchema(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema"));
+ return new SchemaConverter(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema"));
} catch (IOException e) {
throw new DebeziumException("Failed to get event schema", e);
}
}

public Schema icebergSchema(boolean createIdentifierFields) {
- return changeEventSchema().icebergSchema(createIdentifierFields);
+ return schemaConverter().icebergSchema(createIdentifierFields);
}

public String destination() {
return destination;
}

- public GenericRecord asIcebergRecord(Schema schema) {
- return asIcebergRecord(schema.asStruct(), value());
+ public GenericRecord convert(Schema schema) {
+ return convert(schema.asStruct(), value());
}

- private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
+ private static GenericRecord convert(Types.StructType tableFields, JsonNode data) {
LOGGER.debug("Processing nested field:{}", tableFields);
GenericRecord record = GenericRecord.create(tableFields);

@@ -172,7 +172,7 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
HashMap<Object, Object> mapVal = new HashMap<>();
node.fields().forEachRemaining(f -> {
if (valType.isStructType()) {
- mapVal.put(f.getKey(), asIcebergRecord(valType.asStructType(), f.getValue()));
+ mapVal.put(f.getKey(), convert(valType.asStructType(), f.getValue()));
} else {
mapVal.put(f.getKey(), f.getValue());
}
@@ -182,7 +182,7 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
case STRUCT:
// create it as struct, nested type
// recursive call to get nested data/record
- val = asIcebergRecord(field.type().asStructType(), node);
+ val = convert(field.type().asStructType(), node);
break;
default:
// default to String type
@@ -195,11 +195,11 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
}


- public static class ChangeEventSchema {
+ public static class SchemaConverter {
private final JsonNode valueSchema;
private final JsonNode keySchema;

- ChangeEventSchema(JsonNode valueSchema, JsonNode keySchema) {
+ SchemaConverter(JsonNode valueSchema, JsonNode keySchema) {
this.valueSchema = valueSchema;
this.keySchema = keySchema;
}
@@ -220,13 +220,13 @@ protected JsonNode keySchema() {
* @param schemaData keeps information of iceberg schema like fields, nextFieldId and identifier fields
* @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField.
*/
- private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergChangeEventSchemaData schemaData, JsonNode keySchemaNode) {
+ private static RecordSchemaData debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, RecordSchemaData schemaData, JsonNode keySchemaNode) {
String fieldType = fieldSchema.get("type").textValue();
boolean isPkField = !(keySchemaNode == null || keySchemaNode.isNull());
switch (fieldType) {
case "struct":
int rootStructId = schemaData.nextFieldId().getAndIncrement();
- final IcebergChangeEventSchemaData subSchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
+ final RecordSchemaData subSchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
for (JsonNode subFieldSchema : fieldSchema.get("fields")) {
String subFieldName = subFieldSchema.get("field").textValue();
JsonNode equivalentNestedKeyField = findNodeFieldByName(subFieldName, keySchemaNode);
@@ -243,10 +243,10 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode
int rootMapId = schemaData.nextFieldId().getAndIncrement();
int keyFieldId = schemaData.nextFieldId().getAndIncrement();
int valFieldId = schemaData.nextFieldId().getAndIncrement();
- final IcebergChangeEventSchemaData keySchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
+ final RecordSchemaData keySchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("keys"), fieldName + "_key", keySchemaData, null);
schemaData.nextFieldId().incrementAndGet();
- final IcebergChangeEventSchemaData valSchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
+ final RecordSchemaData valSchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("values"), fieldName + "_val", valSchemaData, null);
final Types.MapType mapField = Types.MapType.ofOptional(keyFieldId, valFieldId, keySchemaData.fields().get(0).type(), valSchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootMapId, fieldName, mapField));
@@ -257,7 +257,7 @@ private static IcebergChangeEventSchemaData debeziumFieldToIcebergField(JsonNode
throw new DebeziumException("Cannot set array field '" + fieldName + "' as a identifier field, array types are not supported as an identifier field!");
}
int rootArrayId = schemaData.nextFieldId().getAndIncrement();
- final IcebergChangeEventSchemaData arraySchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
+ final RecordSchemaData arraySchemaData = schemaData.copyKeepIdentifierFieldIdsAndNextFieldId();
debeziumFieldToIcebergField(fieldSchema.get("items"), fieldName + "_items", arraySchemaData, null);
final Types.ListType listField = Types.ListType.ofOptional(schemaData.nextFieldId().getAndIncrement(), arraySchemaData.fields().get(0).type());
schemaData.fields().add(Types.NestedField.optional(rootArrayId, fieldName, listField));
@@ -301,7 +301,7 @@ private static JsonNode findNodeFieldByName(String fieldName, JsonNode node) {
* @param schemaNode
* @return
*/
- private static IcebergChangeEventSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode, IcebergChangeEventSchemaData schemaData) {
+ private static RecordSchemaData icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode, RecordSchemaData schemaData) {
LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
for (JsonNode field : getNodeFieldsArray(schemaNode)) {
String fieldName = field.get("field").textValue();
@@ -318,7 +318,7 @@ private Schema icebergSchema(boolean createIdentifierFields) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

- IcebergChangeEventSchemaData schemaData = new IcebergChangeEventSchemaData();
+ RecordSchemaData schemaData = new RecordSchemaData();
if (!createIdentifierFields) {
LOGGER.warn("Creating identifier fields is disabled, creating table without identifier field!");
icebergSchemaFields(valueSchema, null, schemaData);
@@ -388,7 +388,7 @@ private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- ChangeEventSchema that = (ChangeEventSchema) o;
+ SchemaConverter that = (SchemaConverter) o;
return Objects.equals(valueSchema, that.valueSchema) && Objects.equals(keySchema, that.keySchema);
}

==== changed file ====
@@ -0,0 +1,28 @@
package io.debezium.server.iceberg;

import org.apache.iceberg.types.Types;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

record RecordSchemaData(List<Types.NestedField> fields, Set<Integer> identifierFieldIds,
AtomicInteger nextFieldId) {


public RecordSchemaData(Integer nextFieldId) {
this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(nextFieldId));
}

public RecordSchemaData() {
this(new ArrayList<>(), new HashSet<>(), new AtomicInteger(1));
}

public RecordSchemaData copyKeepIdentifierFieldIdsAndNextFieldId() {
return new RecordSchemaData(new ArrayList<>(), this.identifierFieldIds, this.nextFieldId);
}


}
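
A small usage sketch, as an editorial note rather than part of the commit: copyKeepIdentifierFieldIdsAndNextFieldId() starts with an empty field list but shares the identifier-field id set and the AtomicInteger counter, so nested struct/map/array conversions keep allocating unique field ids. The class below is hypothetical and assumes it lives in the same package, since RecordSchemaData is package-private.

package io.debezium.server.iceberg;

// Hypothetical demonstration class; only RecordSchemaData and its methods come from the diff above.
class RecordSchemaDataSketch {
  public static void main(String[] args) {
    RecordSchemaData root = new RecordSchemaData();              // counter starts at 1
    int rootStructId = root.nextFieldId().getAndIncrement();     // takes id 1

    RecordSchemaData nested = root.copyKeepIdentifierFieldIdsAndNextFieldId();
    int nestedFieldId = nested.nextFieldId().getAndIncrement();  // takes id 2

    // the shared AtomicInteger means both instances agree that the next free id is 3
    System.out.println(rootStructId + " " + nestedFieldId + " -> next: " + root.nextFieldId().get());
  }
}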

==== changed file ====
@@ -14,7 +14,7 @@
import java.io.IOException;
import java.util.Set;

- import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.opFieldName;
+ import static io.debezium.server.iceberg.tableoperator.IcebergTableOperator.cdcOpField;

abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {

@@ -53,9 +53,9 @@ InternalRecordWrapper wrapper() {
@Override
public void write(Record row) throws IOException {
RowDataDeltaWriter writer = route(row);
- final Object opFieldValue = row.getField(opFieldName);
+ final Object opFieldValue = row.getField(cdcOpField);
if (opFieldValue == null) {
throw new DebeziumException("The value for field `" + opFieldName + "` is missing. " +
throw new DebeziumException("The value for field `" + cdcOpField + "` is missing. " +
"This field is required when updating or deleting data, when running in upsert mode."
);
}

==== changed file ====
@@ -11,7 +11,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.debezium.DebeziumException;
- import io.debezium.server.iceberg.IcebergChangeEvent;
+ import io.debezium.server.iceberg.RecordConverter;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.iceberg.*;
@@ -38,11 +38,11 @@
@Dependent
public class IcebergTableOperator {

- static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
+ static final ImmutableMap<String, Integer> CDC_OPERATION_PRIORITY = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
- protected static final String opFieldName = "__op";
+ protected static final String cdcOpField = "__op";
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
- String sourceTsMsColumn;
+ String cdcSourceTsMsField;
@ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
boolean allowFieldAddition;
@ConfigProperty(name = "debezium.sink.iceberg.create-identifier-fields", defaultValue = "true")
@@ -53,9 +53,9 @@ public class IcebergTableOperator {
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;

- protected List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> events) {
+ protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) {

- ConcurrentHashMap<JsonNode, IcebergChangeEvent> deduplicatedEvents = new ConcurrentHashMap<>();
+ ConcurrentHashMap<JsonNode, RecordConverter> deduplicatedEvents = new ConcurrentHashMap<>();

events.forEach(e -> {
if (e.key() == null || e.key().isNull()) {
@@ -91,13 +91,13 @@ protected List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> eve
*/
private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {

- int result = Long.compare(lhs.get(sourceTsMsColumn).asLong(0), rhs.get(sourceTsMsColumn).asLong(0));
+ int result = Long.compare(lhs.get(cdcSourceTsMsField).asLong(0), rhs.get(cdcSourceTsMsField).asLong(0));

if (result == 0) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
- result = cdcOperations.getOrDefault(lhs.get(opFieldName).asText("c"), -1)
+ result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.get(cdcOpField).asText("c"), -1)
.compareTo(
- cdcOperations.getOrDefault(rhs.get(opFieldName).asText("c"), -1)
+ CDC_OPERATION_PRIORITY.getOrDefault(rhs.get(cdcOpField).asText("c"), -1)
);
}

@@ -139,7 +139,7 @@ private void applyFieldAddition(Table icebergTable, Schema newSchema) {
* @param icebergTable
* @param events
*/
- public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
+ public void addToTable(Table icebergTable, List<RecordConverter> events) {

// when operation mode is not upsert deduplicate the events to avoid inserting duplicate row
if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
@@ -150,12 +150,12 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
// if field additions not enabled add set of events to table
addToTablePerSchema(icebergTable, events);
} else {
- Map<IcebergChangeEvent.ChangeEventSchema, List<IcebergChangeEvent>> eventsGroupedBySchema =
+ Map<RecordConverter.SchemaConverter, List<RecordConverter>> eventsGroupedBySchema =
events.stream()
- .collect(Collectors.groupingBy(IcebergChangeEvent::changeEventSchema));
+ .collect(Collectors.groupingBy(RecordConverter::schemaConverter));
LOGGER.debug("Batch got {} records with {} different schema!!", events.size(), eventsGroupedBySchema.keySet().size());

- for (Map.Entry<IcebergChangeEvent.ChangeEventSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
+ for (Map.Entry<RecordConverter.SchemaConverter, List<RecordConverter>> schemaEvents : eventsGroupedBySchema.entrySet()) {
// extend table schema if new fields found
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema(createIdentifierFields));
// add set of events to table
@@ -171,12 +171,12 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
* @param icebergTable
* @param events
*/
- private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
+ private void addToTablePerSchema(Table icebergTable, List<RecordConverter> events) {
// Initialize a task writer to write both INSERT and equality DELETE.
final Schema tableSchema = icebergTable.schema();
try (BaseTaskWriter<Record> writer = writerFactory.create(icebergTable)) {
- for (IcebergChangeEvent e : events) {
- final GenericRecord record = e.asIcebergRecord(tableSchema);
+ for (RecordConverter e : events) {
+ final GenericRecord record = e.convert(tableSchema);
writer.write(record);
}
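
The deduplication hunks above pick one event per key by comparing the source timestamp first and the CDC operation priority second. The standalone sketch below restates that ordering; it assumes, which this diff does not show, that the event with the greatest timestamp/priority pair is the one kept, and it replaces the JsonNode arguments with plain values for readability.

import java.util.Map;

// Hypothetical restatement of compareByTsThenOp(...); field handling simplified to plain values.
final class CdcEventOrderingSketch {

  static final Map<String, Integer> CDC_OPERATION_PRIORITY = Map.of("c", 1, "r", 2, "u", 3, "d", 4);

  static int compare(long lhsTs, String lhsOp, long rhsTs, String rhsOp) {
    int result = Long.compare(lhsTs, rhsTs);
    if (result == 0) {
      // same __source_ts_ms: later lifecycle operation ranks higher (c < r < u < d)
      result = CDC_OPERATION_PRIORITY.getOrDefault(lhsOp, -1)
          .compareTo(CDC_OPERATION_PRIORITY.getOrDefault(rhsOp, -1));
    }
    return result;
  }

  public static void main(String[] args) {
    // an update and a delete carrying the same source timestamp: the delete ranks higher
    System.out.println(compare(1725494400000L, "u", 1725494400000L, "d")); // prints a negative number
  }
}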

==== changed file ====
@@ -19,8 +19,6 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

@@ -44,7 +44,7 @@ public void testSimpleUpload() throws Exception {

// make sure its not unwrapped
assertEquals(IcebergUtil.configIncludesUnwrapSmt(), false);
- assertEquals(IcebergChangeEvent.eventsAreUnwrapped, false);
+ assertEquals(RecordConverter.eventsAreUnwrapped, false);

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
@@ -83,7 +81,7 @@ public void testDeleteEvents() throws Exception {

// make sure its not unwrapped
assertEquals(IcebergUtil.configIncludesUnwrapSmt(), false);
- assertEquals(IcebergChangeEvent.eventsAreUnwrapped, false);
+ assertEquals(RecordConverter.eventsAreUnwrapped, false);

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
(diffs for the remaining changed files are not shown here)
