Skip to content

Commit

Permalink
handle enum during vstream copy
Browse files Browse the repository at this point in the history
Enable support for enums during vstream copy phase.

There are two reasons that the connector does not handle `enum` for PSDB branches.

 1. The upstream debezium-connector-vitess simply does not support
    `enum` during the VStream copy phase. It tries to cast the row value
    to an integer, but the value is a string. It seems support for
    `enum` landed in 2021
    debezium#20, and
    support for snapshots (VStream Copy) landed in 2022
    debezium#112,
    without taking the former into account. This is easily fixed by
    finding finding the index of the string
    value in the list of values obtained from `column_type` during the
    schema discovery phase at the beginning of the VStream.
 2. However, this isn't working on some PSDB branches which don't have
    the fix vitessio/vitess#13045 for this bug
    vitessio/vitess#12981. Fixable by
    backporting the bugfix or upgrading those branches.

Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
maxenglander committed Oct 21, 2024
1 parent f1d3d25 commit 504fadd
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 0 deletions.
22 changes: 22 additions & 0 deletions src/main/java/io/debezium/connector/vitess/VitessType.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,32 @@ public int getJdbcId() {
return jdbcId;
}

public Integer getEnumOrdinal(String value) {
int index = enumValues.indexOf(value);
if (index == -1) {
return Integer.valueOf(value);
}
return Integer.valueOf(index + 1);
}

public List<String> getEnumValues() {
return enumValues;
}

public Long getSetNumeral(String value) {
String[] members = value.split(",");
Long result = 0L;
for (String member : members) {
long index = enumValues.indexOf(member);
if (index == -1) {
index = Long.valueOf(member);
}
Double power = Math.pow(2, index);
result = result + Long.valueOf(power.longValue());
}
return result;
}

public boolean isEnum() {
return !enumValues.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ public static Object resolveValue(
case Types.SMALLINT:
return value.asShort();
case Types.INTEGER:
if (vitessType.isEnum()) {
return vitessType.getEnumOrdinal(value.asString());
}
return value.asInteger();
case Types.BIGINT:
if (vitessType.isEnum()) {
return vitessType.getSetNumeral(value.asString());
}
return value.asLong();
case Types.BLOB:
case Types.BINARY:
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,28 @@ public void testInitialOnlySnapshot() throws Exception {
consumer.await(Math.min(2, TestHelper.waitTimeForRecords()), TimeUnit.SECONDS);
}

@Test
public void shouldHandleEnumAndSetDuringTableCopy() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl");

TestHelper.execute(INSERT_ENUM_TYPE_STMT, TEST_UNSHARDED_KEYSPACE);
TestHelper.execute(INSERT_SET_TYPE_STMT, TEST_UNSHARDED_KEYSPACE);

final String enumTableName = "enum_table";
final String setTableName = "set_table";

String tableInclude = TEST_UNSHARDED_KEYSPACE + "." + enumTableName + "," + TEST_UNSHARDED_KEYSPACE + "." + setTableName;
startConnector(Function.identity(), false, false, 1,
-1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL, TestHelper.TEST_SHARD);

int expectedRecordsCount = 2;
consumer = testConsumer(expectedRecordsCount);

consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);

stopConnector();
}

private void testOffsetStorage(boolean offsetStoragePerTask) throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl", TEST_UNSHARDED_KEYSPACE);

Expand Down

0 comments on commit 504fadd

Please sign in to comment.