Skip to content

Commit

Permalink
Require full table name for dynamic routing (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored May 26, 2023
1 parent a9fc63c commit 00eca36
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 27 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ PARTITIONED BY (hours(ts));
```

## Multi-table fan-out, dynamic routing
This example writes to tables with names based on the value in the `type` field. If a table with
the name does not exist, then the record will be skipped. For example, if the record's `type` field
is set to `list`, then the record is written to the `default.events_list` table.
This example writes to tables with names from the value in the `db_table` field. If a table with
the name does not exist, then the record will be skipped. For example, if the record's `db_table`
field is set to `default.events_list`, then the record is written to the `default.events_list` table.

### Create two destination tables
See above for creating two tables.
Expand All @@ -126,8 +126,8 @@ See above for creating two tables.
"connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "events",
"iceberg.tables.dynamic.namePrefix": "default.events_",
"iceberg.tables.routeField": "type",
"iceberg.tables.dynamic.enabled": "true",
"iceberg.tables.routeField": "db_table",
"iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog",
"iceberg.catalog.uri": ...
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ public class IntegrationDynamicTableTest extends IntegrationTestBase {
private static final String CONNECTOR_NAME = "test_connector";
private static final String TEST_TOPIC = "test-topic";
private static final String TEST_DB = "default";
private static final String TEST_TABLE_PREFIX = "tbl_";
private static final String TEST_TABLE1 = TEST_TABLE_PREFIX + "type1";
private static final String TEST_TABLE2 = TEST_TABLE_PREFIX + "type2";
private static final String TEST_TABLE1 = "tbl1";
private static final String TEST_TABLE2 = "tbl2";
private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);
private static final Schema TEST_SCHEMA =
Expand Down Expand Up @@ -90,9 +89,8 @@ public void testIcebergSink() {
.config("key.converter.schemas.enable", false)
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("value.converter.schemas.enable", false)
.config(
"iceberg.tables.dynamic.namePrefix", format("%s.%s", TEST_DB, TEST_TABLE_PREFIX))
.config("iceberg.tables.routeField", "type")
.config("iceberg.tables.dynamic.enabled", true)
.config("iceberg.tables.routeField", "payload")
.config("iceberg.control.commitIntervalMs", 1000)
.config("iceberg.control.commitTimeoutMs", 1000)
.config("iceberg.catalog", RESTCatalog.class.getName())
Expand Down Expand Up @@ -126,9 +124,9 @@ public void testIcebergSink() {
}

private void runTest() {
String event1 = format(RECORD_FORMAT, 1, "type1", System.currentTimeMillis(), "hello world!");
String event2 = format(RECORD_FORMAT, 2, "type2", System.currentTimeMillis(), "having fun?");
String event3 = format(RECORD_FORMAT, 3, "type3", System.currentTimeMillis(), "ignore me");
String event1 = format(RECORD_FORMAT, 1, "type1", System.currentTimeMillis(), "default.tbl1");
String event2 = format(RECORD_FORMAT, 2, "type2", System.currentTimeMillis(), "default.tbl2");
String event3 = format(RECORD_FORMAT, 3, "type3", System.currentTimeMillis(), "default.tbl3");

producer.send(new ProducerRecord<>(TEST_TOPIC, event1));
producer.send(new ProducerRecord<>(TEST_TOPIC, event2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class IcebergSinkConfig extends AbstractConfig {

private static final String CATALOG_IMPL_PROP = "iceberg.catalog";
private static final String TABLES_PROP = "iceberg.tables";
private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic.namePrefix";
private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic.enabled";
private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.routeField";
private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdcField";
private static final String TABLES_UPSERT_MODE_ENABLED_PROP = "iceberg.tables.upsertModeEnabled";
Expand Down Expand Up @@ -103,10 +103,10 @@ private static ConfigDef newConfigDef() {
"Comma-delimited list of destination tables");
configDef.define(
TABLES_DYNAMIC_PROP,
Type.STRING,
null,
Importance.HIGH,
"Table name prefix to match for destination tables");
Type.BOOLEAN,
false,
Importance.MEDIUM,
"Enable dynamic routing to tables based on a record value");
configDef.define(
TABLES_ROUTE_FIELD_PROP,
Type.STRING,
Expand Down Expand Up @@ -183,9 +183,8 @@ public IcebergSinkConfig(Map<String, String> originalProps) {

private void validate() {
if (getTables() != null) {
checkState(
getDynamicTablesPrefix() == null, "Cannot specify both static and dynamic table names");
} else if (getDynamicTablesPrefix() != null) {
checkState(!getDynamicTablesEnabled(), "Cannot specify both static and dynamic table names");
} else if (getDynamicTablesEnabled()) {
checkState(
getTablesRouteField() != null, "Must specify a route field if using dynamic table names");
} else {
Expand Down Expand Up @@ -228,8 +227,8 @@ public List<String> getTables() {
return getList(TABLES_PROP);
}

public String getDynamicTablesPrefix() {
return getString(TABLES_DYNAMIC_PROP);
public boolean getDynamicTablesEnabled() {
return getBoolean(TABLES_DYNAMIC_PROP);
}

public String getTablesRouteField() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void save(Collection<SinkRecord> sinkRecords) {
}

private void save(SinkRecord record) {
if (config.getDynamicTablesPrefix() != null) {
if (config.getDynamicTablesEnabled()) {
routeRecordDynamically(record);
} else {
routeRecordStatically(record);
Expand Down Expand Up @@ -198,7 +198,7 @@ private void routeRecordDynamically(SinkRecord record) {

String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
String tableName = config.getDynamicTablesPrefix() + routeValue.toLowerCase();
String tableName = routeValue.toLowerCase();
if (tableExists(tableName)) {
getWriterForTable(tableName).write(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testInvalid() {
"topics", "source-topic",
"iceberg.catalog", RESTCatalog.class.getName(),
"iceberg.tables", "db.landing",
"iceberg.tables.dynamic.namePrefix", "db.tbl_");
"iceberg.tables.dynamic.enabled", "true");
assertThatExceptionOfType(ConfigException.class).isThrownBy(() -> new IcebergSinkConfig(props));
}

Expand Down

0 comments on commit 00eca36

Please sign in to comment.