diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index b16c6432..360625d5 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -84,6 +84,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu protected Optional destinationRegexp; @ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "") protected Optional destinationRegexpReplace; + @ConfigProperty(name = "debezium.sink.iceberg.destination-uppercase-table-names", defaultValue = "false") + protected boolean destinationUppercaseTableNames; + @ConfigProperty(name = "debezium.sink.iceberg.destination-lowercase-table-names", defaultValue = "false") + protected boolean destinationLowercaseTableNames; @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "") Optional tablePrefix; @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") @@ -144,7 +148,7 @@ public void handleBatch(List> records, DebeziumEngin Map> result = records.stream() .map((ChangeEvent e) - -> new IcebergChangeEvent(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key()))) + -> new IcebergChangeEvent(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key()))) .collect(Collectors.groupingBy(IcebergChangeEvent::destination)); // consume list of events for each destination table @@ -178,8 +182,8 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample } try { return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(createIdentifierFields), writeFormat); - } catch (Exception e){ - throw new DebeziumException("Failed to create table from debezium event schema:"+tableId+" Error:" + e.getMessage(), e); + } catch (Exception e) { + throw new DebeziumException("Failed to create table from debezium event schema:" + tableId + " Error:" + e.getMessage(), e); } }); } @@ -204,6 +208,12 @@ public TableIdentifier mapDestination(String destination) { .replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse("")) .replace(".", "_"); - return TableIdentifier.of(Namespace.of(namespace), tablePrefix.orElse("") + tableName); + if (destinationUppercaseTableNames) { + return TableIdentifier.of(Namespace.of(namespace), (tablePrefix.orElse("") + tableName).toUpperCase()); + } else if (destinationLowercaseTableNames) { + return TableIdentifier.of(Namespace.of(namespace), (tablePrefix.orElse("") + tableName).toLowerCase()); + } else { + return TableIdentifier.of(Namespace.of(namespace), tablePrefix.orElse("") + tableName); + } } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index a5e0570a..e65805a8 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -334,6 +334,15 @@ public void testPartitionedTable() { public void testMapDestination() { assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table1")); assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2")); + icebergConsumer.destinationUppercaseTableNames = true; + icebergConsumer.destinationLowercaseTableNames = false; + assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("table_name")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("Table_Name")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("TABLE_NAME")); + icebergConsumer.destinationUppercaseTableNames = false; + icebergConsumer.destinationLowercaseTableNames = true; + assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_name"), icebergConsumer.mapDestination("Table_Name")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_name"), icebergConsumer.mapDestination("TABLE_NAME")); } public static class TestProfile implements QuarkusTestProfile { diff --git a/docs/DOCS.md b/docs/DOCS.md index fc5e8ae0..1137c5b1 100644 --- a/docs/DOCS.md +++ b/docs/DOCS.md @@ -29,6 +29,8 @@ tables created automatically with the first start. | `debezium.sink.iceberg.create-identifier-fields` | `true` | When set to false the consumer will create tables without identifier fields. useful when user wants to consume nested events with append only mode. | | `debezium.sink.iceberg.destination-regexp` | `` | Regexp to modify destination iceberg table name. For example with this setting, its possible to combine some tables `table_ptt1`,`table_ptt2` to one `table_combined`. | | `debezium.sink.iceberg.destination-regexp-replace` | `` | Regexp replace part to modify destination iceberg table name | +| `debezium.sink.iceberg.destination-uppercase-table-names` | `false` | Maps debezium destination to uppercase iceberg table names | +| `debezium.sink.iceberg.destination-lowercase-table-names` | `false` | Maps debezium destination to lowercase iceberg table names | | `debezium.sink.batch.batch-size-wait` | `NoBatchSizeWait` | Batch size wait strategy, Used to optimize data file size and upload interval. explained below. | | `debezium.sink.iceberg.{iceberg.prop.name}` | | [Iceberg config](https://iceberg.apache.org/docs/latest/configuration/) this settings are passed to Iceberg (without the prefix) | | `debezium.source.offset.storage` | `io.debezium.server.iceberg.offset.IcebergOffsetBackingStore` | The name of the Java class that is responsible for persistence of connector offsets. see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming) |