From 3e3dfabf1498a95a8f37fd143c398abffc4fdf59 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Wed, 24 Jul 2024 09:49:49 +0200 Subject: [PATCH 1/6] Add option to change case of destination/iceberg table names --- .../server/iceberg/IcebergChangeConsumer.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index b16c6432..05af3a3c 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -84,6 +84,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu protected Optional destinationRegexp; @ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "") protected Optional destinationRegexpReplace; + @ConfigProperty(name = "debezium.sink.iceberg.destination-uppercase-names", defaultValue = "false") + protected boolean destinationUppercaseNames; + @ConfigProperty(name = "debezium.sink.iceberg.destination-lowercase-names", defaultValue = "false") + protected boolean destinationLowercaseNames; @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "") Optional tablePrefix; @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") @@ -144,7 +148,7 @@ public void handleBatch(List> records, DebeziumEngin Map> result = records.stream() .map((ChangeEvent e) - -> new IcebergChangeEvent(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key()))) + -> new IcebergChangeEvent(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key()))) .collect(Collectors.groupingBy(IcebergChangeEvent::destination)); // consume list of events for each destination table @@ -178,8 +182,8 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample } try { return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(createIdentifierFields), writeFormat); - } catch (Exception e){ - throw new DebeziumException("Failed to create table from debezium event schema:"+tableId+" Error:" + e.getMessage(), e); + } catch (Exception e) { + throw new DebeziumException("Failed to create table from debezium event schema:" + tableId + " Error:" + e.getMessage(), e); } }); } @@ -204,6 +208,12 @@ public TableIdentifier mapDestination(String destination) { .replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse("")) .replace(".", "_"); - return TableIdentifier.of(Namespace.of(namespace), tablePrefix.orElse("") + tableName); + if (destinationUppercaseNames) { + return TableIdentifier.of(Namespace.of(namespace.toUpperCase()), (tablePrefix.orElse("") + tableName).toUpperCase()); + } else if (destinationLowercaseNames) { + return TableIdentifier.of(Namespace.of(namespace.toLowerCase()), (tablePrefix.orElse("") + tableName).toLowerCase()); + } else { + return TableIdentifier.of(Namespace.of(namespace), tablePrefix.orElse("") + tableName); + } } } From 8ba3252fde9bc1cfca84ca0c726f5a849e9edd67 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Wed, 24 Jul 2024 15:24:57 +0200 Subject: [PATCH 2/6] Add option to change case of destination/iceberg table names --- .../debezium/server/iceberg/IcebergChangeConsumerTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index a5e0570a..07c44d61 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -334,6 +334,12 @@ public void testPartitionedTable() { public void testMapDestination() { assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table1")); assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2")); + icebergConsumer.destinationUppercaseNames = true; + icebergConsumer.destinationLowercaseNames = false; + assertEquals(TableIdentifier.of(Namespace.of(namespace.toUpperCase()), "DEBEZIUMCDC_TABLE_LOWERCASE"), icebergConsumer.mapDestination("table_lowercase")); + icebergConsumer.destinationUppercaseNames = false; + icebergConsumer.destinationLowercaseNames = true; + assertEquals(TableIdentifier.of(Namespace.of(namespace.toLowerCase()), "debeziumcdc_table_camelcase"), icebergConsumer.mapDestination("table_CamelCase")); } public static class TestProfile implements QuarkusTestProfile { From e79fdd227a33a403ef7aea8b36fa58e45a685349 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Wed, 24 Jul 2024 15:34:34 +0200 Subject: [PATCH 3/6] Add option to change case of destination/iceberg table names --- .../server/iceberg/IcebergChangeConsumer.java | 16 ++++++++-------- .../iceberg/IcebergChangeConsumerTest.java | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 05af3a3c..360625d5 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -84,10 +84,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu protected Optional destinationRegexp; @ConfigProperty(name = "debezium.sink.iceberg.destination-regexp-replace", defaultValue = "") protected Optional destinationRegexpReplace; - @ConfigProperty(name = "debezium.sink.iceberg.destination-uppercase-names", defaultValue = "false") - protected boolean destinationUppercaseNames; - @ConfigProperty(name = "debezium.sink.iceberg.destination-lowercase-names", defaultValue = "false") - protected boolean destinationLowercaseNames; + @ConfigProperty(name = "debezium.sink.iceberg.destination-uppercase-table-names", defaultValue = "false") + protected boolean destinationUppercaseTableNames; + @ConfigProperty(name = "debezium.sink.iceberg.destination-lowercase-table-names", defaultValue = "false") + protected boolean destinationLowercaseTableNames; @ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "") Optional tablePrefix; @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") @@ -208,10 +208,10 @@ public TableIdentifier mapDestination(String destination) { .replaceAll(destinationRegexp.orElse(""), destinationRegexpReplace.orElse("")) .replace(".", "_"); - if (destinationUppercaseNames) { - return TableIdentifier.of(Namespace.of(namespace.toUpperCase()), (tablePrefix.orElse("") + tableName).toUpperCase()); - } else if (destinationLowercaseNames) { - return TableIdentifier.of(Namespace.of(namespace.toLowerCase()), (tablePrefix.orElse("") + tableName).toLowerCase()); + if (destinationUppercaseTableNames) { + return TableIdentifier.of(Namespace.of(namespace), (tablePrefix.orElse("") + tableName).toUpperCase()); + } else if (destinationLowercaseTableNames) { + return TableIdentifier.of(Namespace.of(namespace), (tablePrefix.orElse("") + tableName).toLowerCase()); } else { return TableIdentifier.of(Namespace.of(namespace), tablePrefix.orElse("") + tableName); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 07c44d61..2e60c9fe 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -334,11 +334,11 @@ public void testPartitionedTable() { public void testMapDestination() { assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table1")); assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2")); - icebergConsumer.destinationUppercaseNames = true; - icebergConsumer.destinationLowercaseNames = false; + icebergConsumer.destinationUppercaseTableNames = true; + icebergConsumer.destinationLowercaseTableNames = false; assertEquals(TableIdentifier.of(Namespace.of(namespace.toUpperCase()), "DEBEZIUMCDC_TABLE_LOWERCASE"), icebergConsumer.mapDestination("table_lowercase")); - icebergConsumer.destinationUppercaseNames = false; - icebergConsumer.destinationLowercaseNames = true; + icebergConsumer.destinationUppercaseTableNames = false; + icebergConsumer.destinationLowercaseTableNames = true; assertEquals(TableIdentifier.of(Namespace.of(namespace.toLowerCase()), "debeziumcdc_table_camelcase"), icebergConsumer.mapDestination("table_CamelCase")); } From 946abf7916c227e62d3840c87032eba1c90af99a Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Wed, 24 Jul 2024 15:50:34 +0200 Subject: [PATCH 4/6] Add option to change case of destination/iceberg table names --- .../io/debezium/server/iceberg/IcebergChangeConsumerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 2e60c9fe..4e4df5dc 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -336,10 +336,10 @@ public void testMapDestination() { assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2")); icebergConsumer.destinationUppercaseTableNames = true; icebergConsumer.destinationLowercaseTableNames = false; - assertEquals(TableIdentifier.of(Namespace.of(namespace.toUpperCase()), "DEBEZIUMCDC_TABLE_LOWERCASE"), icebergConsumer.mapDestination("table_lowercase")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_LOWERCASE"), icebergConsumer.mapDestination("table_lowercase")); icebergConsumer.destinationUppercaseTableNames = false; icebergConsumer.destinationLowercaseTableNames = true; - assertEquals(TableIdentifier.of(Namespace.of(namespace.toLowerCase()), "debeziumcdc_table_camelcase"), icebergConsumer.mapDestination("table_CamelCase")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_camelcase"), icebergConsumer.mapDestination("table_CamelCase")); } public static class TestProfile implements QuarkusTestProfile { From 2d4f8cd62c7eb00cd819489e80158b4be0a34bb3 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Tue, 30 Jul 2024 14:15:49 +0200 Subject: [PATCH 5/6] Add option to change case of destination/iceberg table names --- .../debezium/server/iceberg/IcebergChangeConsumerTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 4e4df5dc..e65805a8 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -336,10 +336,13 @@ public void testMapDestination() { assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2")); icebergConsumer.destinationUppercaseTableNames = true; icebergConsumer.destinationLowercaseTableNames = false; - assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_LOWERCASE"), icebergConsumer.mapDestination("table_lowercase")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("table_name")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("Table_Name")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("TABLE_NAME")); icebergConsumer.destinationUppercaseTableNames = false; icebergConsumer.destinationLowercaseTableNames = true; - assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_camelcase"), icebergConsumer.mapDestination("table_CamelCase")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_name"), icebergConsumer.mapDestination("Table_Name")); + assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_name"), icebergConsumer.mapDestination("TABLE_NAME")); } public static class TestProfile implements QuarkusTestProfile { From 9d876c59c2507993b5981dd6f3bab86db6637818 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Wed, 31 Jul 2024 10:14:10 +0200 Subject: [PATCH 6/6] Add option to change case of destination/iceberg table names --- docs/DOCS.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/DOCS.md b/docs/DOCS.md index fc5e8ae0..1137c5b1 100644 --- a/docs/DOCS.md +++ b/docs/DOCS.md @@ -29,6 +29,8 @@ tables created automatically with the first start. | `debezium.sink.iceberg.create-identifier-fields` | `true` | When set to false the consumer will create tables without identifier fields. useful when user wants to consume nested events with append only mode. | | `debezium.sink.iceberg.destination-regexp` | `` | Regexp to modify destination iceberg table name. For example with this setting, its possible to combine some tables `table_ptt1`,`table_ptt2` to one `table_combined`. | | `debezium.sink.iceberg.destination-regexp-replace` | `` | Regexp replace part to modify destination iceberg table name | +| `debezium.sink.iceberg.destination-uppercase-table-names` | `false` | Maps debezium destination to uppercase iceberg table names | +| `debezium.sink.iceberg.destination-lowercase-table-names` | `false` | Maps debezium destination to lowercase iceberg table names | | `debezium.sink.batch.batch-size-wait` | `NoBatchSizeWait` | Batch size wait strategy, Used to optimize data file size and upload interval. explained below. | | `debezium.sink.iceberg.{iceberg.prop.name}` | | [Iceberg config](https://iceberg.apache.org/docs/latest/configuration/) this settings are passed to Iceberg (without the prefix) | | `debezium.source.offset.storage` | `io.debezium.server.iceberg.offset.IcebergOffsetBackingStore` | The name of the Java class that is responsible for persistence of connector offsets. see [debezium doc](https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming) |