diff --git a/src/main/java/io/debezium/connector/vitess/VitessType.java b/src/main/java/io/debezium/connector/vitess/VitessType.java index ba33374f..b5520d9d 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessType.java +++ b/src/main/java/io/debezium/connector/vitess/VitessType.java @@ -41,10 +41,32 @@ public int getJdbcId() { return jdbcId; } + public Integer getEnumOrdinal(String value) { + int index = enumValues.indexOf(value); + if (index == -1) { + return Integer.valueOf(value); + } + return Integer.valueOf(index + 1); + } + public List getEnumValues() { return enumValues; } + public Long getSetNumeral(String value) { + String[] members = value.split(","); + Long result = 0L; + for (String member : members) { + long index = enumValues.indexOf(member); + if (index == -1) { + index = Long.valueOf(member); + } + Double power = Math.pow(2, index); + result = result + Long.valueOf(power.longValue()); + } + return result; + } + public boolean isEnum() { return !enumValues.isEmpty(); } diff --git a/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessageColumnValueResolver.java b/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessageColumnValueResolver.java index 6000609e..65614188 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessageColumnValueResolver.java +++ b/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessageColumnValueResolver.java @@ -22,8 +22,14 @@ public static Object resolveValue( case Types.SMALLINT: return value.asShort(); case Types.INTEGER: + if (vitessType.isEnum()) { + return vitessType.getEnumOrdinal(value.asString()); + } return value.asInteger(); case Types.BIGINT: + if (vitessType.isEnum()) { + return vitessType.getSetNumeral(value.asString()); + } return value.asLong(); case Types.BLOB: case Types.BINARY: diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index f5a726d5..089d97bb 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -990,6 +990,28 @@ public void testInitialOnlySnapshot() throws Exception { consumer.await(Math.min(2, TestHelper.waitTimeForRecords()), TimeUnit.SECONDS); } + @Test + public void shouldHandleEnumAndSetDuringTableCopy() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl"); + + TestHelper.execute(INSERT_ENUM_TYPE_STMT, TEST_UNSHARDED_KEYSPACE); + TestHelper.execute(INSERT_SET_TYPE_STMT, TEST_UNSHARDED_KEYSPACE); + + final String enumTableName = "enum_table"; + final String setTableName = "set_table"; + + String tableInclude = TEST_UNSHARDED_KEYSPACE + "." + enumTableName + "," + TEST_UNSHARDED_KEYSPACE + "." + setTableName; + startConnector(Function.identity(), false, false, 1, + -1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL, TestHelper.TEST_SHARD); + + int expectedRecordsCount = 2; + consumer = testConsumer(expectedRecordsCount); + + consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + + stopConnector(); + } + private void testOffsetStorage(boolean offsetStoragePerTask) throws Exception { TestHelper.executeDDL("vitess_create_tables.ddl", TEST_UNSHARDED_KEYSPACE);