diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java index f7a697625c..85484b2ce9 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java @@ -27,15 +27,20 @@ @EqualsAndHashCode(callSuper = true) public class CanalSinkConfig extends SinkConfig { - private Integer batchSize = 50; // batchSize + // batchSize + private Integer batchSize = 50; - private Boolean useBatch = true; // enable batch + // enable batch + private Boolean useBatch = true; - private Integer poolSize = 5; // sink thread size for single channel + // sink thread size for single channel + private Integer poolSize = 5; - private SyncMode syncMode; // sync mode: field/row + // sync mode: field/row + private SyncMode syncMode; - private Boolean skipException = false; // skip sink process exception + // skip sink process exception + private Boolean skipException = false; public SinkConnectorConfig sinkConnectorConfig; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java index e5edc5a78e..d6e6a7790d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java @@ -54,27 +54,23 @@ public class CanalSourceConfig extends SourceConfig { // ================================= channel parameter // ================================ - private Boolean enableRemedy = false; // enable remedy + // enable remedy + private Boolean enableRemedy = false; - private SyncMode syncMode; // sync mode: field/row + // sync mode: field/row + private SyncMode syncMode; - private SyncConsistency syncConsistency; // sync consistency + // sync consistency + private SyncConsistency syncConsistency; // ================================= system parameter // ================================ - private String systemSchema; // Default is retl + // Column name of the bidirectional synchronization mark + private String needSyncMarkTableColumnName = "needSync"; - private String systemMarkTable; // Bidirectional synchronization mark table - - private String systemMarkTableColumn; // Column name of the bidirectional synchronization mark - - private String systemMarkTableInfo; - // nfo information of the bidirectional synchronization mark, similar to BI_SYNC - - private String systemBufferTable; // sync buffer table - - private String systemDualTable; // sync heartbeat table + // Column value of the bidirectional synchronization mark + private String needSyncMarkTableColumnValue = "needSync"; private SourceConnectorConfig sourceConnectorConfig; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java index 32bb79b54e..1a47a05211 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java @@ -29,19 +29,11 @@ public class MysqlDialect extends AbstractDbDialect { - private Map, String> shardColumns; - public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler) { super(jdbcTemplate, lobHandler); sqlTemplate = new MysqlSqlTemplate(); } - public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, String name, String databaseVersion, - int majorVersion, int minorVersion) { - super(jdbcTemplate, lobHandler, name, majorVersion, minorVersion); - sqlTemplate = new MysqlSqlTemplate(); - } - public boolean isCharSpacePadded() { return false; } @@ -66,16 +58,8 @@ public boolean isDRDS() { return false; } - public String getShardColumns(String schema, String table) { - if (isDRDS()) { - return shardColumns.get(Arrays.asList(schema, table)); - } else { - return null; - } - } - public String getDefaultCatalog() { - return (String) jdbcTemplate.queryForObject("select database()", String.class); + return jdbcTemplate.queryForObject("select database()", String.class); } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java index ab0776c17d..24d6b42f8b 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java @@ -28,11 +28,16 @@ import org.springframework.util.CollectionUtils; +import lombok.Getter; +import lombok.Setter; + /** * compute latest sql */ public class SqlBuilderLoadInterceptor { + @Getter + @Setter private DbDialect dbDialect; public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) { @@ -128,12 +133,4 @@ private String[] buildColumnNames(List columns1, List } return result; } - - public DbDialect getDbDialect() { - return dbDialect; - } - - public void setDbDialect(DbDialect dbDialect) { - this.dbDialect = dbDialect; - } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index 3031a15df0..c54462374c 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -23,7 +23,7 @@ import org.apache.eventmesh.connector.canal.model.EventColumnIndexComparable; import org.apache.eventmesh.connector.canal.model.EventType; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -49,57 +49,67 @@ public class EntryParser { public List parse(CanalSourceConfig sourceConfig, List datas) { List recordList = new ArrayList<>(); List transactionDataBuffer = new ArrayList<>(); + // need check weather the entry is loopback + boolean needSync; try { for (Entry entry : datas) { switch (entry.getEntryType()) { - case TRANSACTIONBEGIN: - break; case ROWDATA: - transactionDataBuffer.add(entry); + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + needSync = checkNeedSync(sourceConfig, rowChange.getRowDatas(0)); + if (needSync) { + transactionDataBuffer.add(entry); + } break; case TRANSACTIONEND: - for (Entry bufferEntry : transactionDataBuffer) { - List recordParsedList = internParse(sourceConfig, bufferEntry); - if (CollectionUtils.isEmpty(recordParsedList)) { - continue; - } - long totalSize = bufferEntry.getHeader().getEventLength(); - long eachSize = totalSize / recordParsedList.size(); - for (CanalConnectRecord record : recordParsedList) { - if (record == null) { - continue; - } - record.setSize(eachSize); - recordList.add(record); - } - } + parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer); transactionDataBuffer.clear(); break; default: break; } } + parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer); + } catch (Exception e) { + throw new RuntimeException(e); + } + return recordList; + } - for (Entry bufferEntry : transactionDataBuffer) { - List recordParsedList = internParse(sourceConfig, bufferEntry); - if (CollectionUtils.isEmpty(recordParsedList)) { + private void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, List recordList, + List transactionDataBuffer) { + for (Entry bufferEntry : transactionDataBuffer) { + List recordParsedList = internParse(sourceConfig, bufferEntry); + if (CollectionUtils.isEmpty(recordParsedList)) { + continue; + } + long totalSize = bufferEntry.getHeader().getEventLength(); + long eachSize = totalSize / recordParsedList.size(); + for (CanalConnectRecord record : recordParsedList) { + if (record == null) { continue; } + record.setSize(eachSize); + recordList.add(record); + } + } + } - long totalSize = bufferEntry.getHeader().getEventLength(); - long eachSize = totalSize / recordParsedList.size(); - for (CanalConnectRecord record : recordParsedList) { - if (record == null) { - continue; - } - record.setSize(eachSize); - recordList.add(record); - } + private boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData rowData) { + Column markedColumn = getColumnIgnoreCase(rowData.getAfterColumnsList(), sourceConfig.getNeedSyncMarkTableColumnName()); + if (markedColumn != null) { + return StringUtils.equalsIgnoreCase(markedColumn.getValue(), sourceConfig.getNeedSyncMarkTableColumnValue()); + } + return false; + } + + private Column getColumnIgnoreCase(List columns, String columName) { + for (Column column : columns) { + if (column.getName().equalsIgnoreCase(columName)) { + return column; } - } catch (Exception e) { - throw new RuntimeException(e); } - return recordList; + return null; } private List internParse(CanalSourceConfig sourceConfig, Entry entry) { @@ -127,20 +137,9 @@ private List internParse(CanalSourceConfig sourceConfig, Ent return null; } - if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemSchema(), schemaName)) { - // do noting - if (eventType.isDdl()) { - return null; - } - - if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemDualTable(), tableName)) { - return null; - } - } else { - if (eventType.isDdl()) { - log.warn("unsupported ddl event type: {}", eventType); - return null; - } + if (eventType.isDdl()) { + log.warn("unsupported ddl event type: {}", eventType); + return null; } List recordList = new ArrayList<>(); @@ -164,13 +163,12 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr List beforeColumns = rowData.getBeforeColumnsList(); List afterColumns = rowData.getAfterColumnsList(); - String tableName = canalConnectRecord.getSchemaName() + "." + canalConnectRecord.getTableName(); boolean isRowMode = canalSourceConfig.getSyncMode().isRow(); - Map keyColumns = new LinkedHashMap(); - Map oldKeyColumns = new LinkedHashMap(); - Map notKeyColumns = new LinkedHashMap(); + Map keyColumns = new LinkedHashMap<>(); + Map oldKeyColumns = new LinkedHashMap<>(); + Map notKeyColumns = new LinkedHashMap<>(); if (eventType.isInsert()) { for (Column column : afterColumns) { @@ -195,7 +193,7 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr keyColumns.put(column.getName(), copyEventColumn(column, true)); } else { if (isRowMode && entry.getHeader().getSourceType() == CanalEntry.Type.ORACLE) { - notKeyColumns.put(column.getName(), copyEventColumn(column, isRowMode)); + notKeyColumns.put(column.getName(), copyEventColumn(column, true)); } } } @@ -233,7 +231,7 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr } canalConnectRecord.setColumns(columns); } else { - throw new RuntimeException("this row data has no pks , entry: " + entry.toString() + " and rowData: " + throw new RuntimeException("this row data has no pks , entry: " + entry + " and rowData: " + rowData); } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java index 10c647c8f1..ceb509ef71 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java @@ -32,7 +32,7 @@ public String getSelectSql(String schemaName, String tableName, String[] pkNames } sql.append(" from ").append(getFullName(schemaName, tableName)).append(" where ( "); - appendColumnEquals(sql, pkNames, "and"); + appendColumnEquals(sql, pkNames); sql.append(" ) "); return sql.toString().intern(); } @@ -41,7 +41,7 @@ public String getUpdateSql(String schemaName, String tableName, String[] pkNames StringBuilder sql = new StringBuilder("update " + getFullName(schemaName, tableName) + " set "); appendExcludeSingleShardColumnEquals(sql, columnNames, ",", updatePks, shardColumn); sql.append(" where ("); - appendColumnEquals(sql, pkNames, "and"); + appendColumnEquals(sql, pkNames); sql.append(")"); return sql.toString().intern(); } @@ -65,7 +65,7 @@ public String getInsertSql(String schemaName, String tableName, String[] pkNames public String getDeleteSql(String schemaName, String tableName, String[] pkNames) { StringBuilder sql = new StringBuilder("delete from " + getFullName(schemaName, tableName) + " where "); - appendColumnEquals(sql, pkNames, "and"); + appendColumnEquals(sql, pkNames); return sql.toString().intern(); } @@ -91,12 +91,12 @@ protected void appendColumnQuestions(StringBuilder sql, String[] columns) { } } - protected void appendColumnEquals(StringBuilder sql, String[] columns, String separator) { + protected void appendColumnEquals(StringBuilder sql, String[] columns) { int size = columns.length; for (int i = 0; i < size; i++) { sql.append(" ").append(appendEscape(columns[i])).append(" = ").append("? "); if (i != size - 1) { - sql.append(separator); + sql.append("and"); } } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java index a169ed20f1..37b45c746f 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java @@ -47,7 +47,7 @@ public String getMergeSql(String schemaName, String tableName, String[] pkNames, size = columnNames.length; for (int i = 0; i < size; i++) { - if (!includePks && shardColumn != null && columnNames[i].equals(shardColumn)) { + if (!includePks && columnNames[i].equals(shardColumn)) { continue; } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java index 0fade897f6..56b3a59675 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java @@ -45,7 +45,7 @@ public class RuntimeInstance { private Map adminServerInfoMap = new HashMap<>(); - private final RegistryService registryService; +// private final RegistryService registryService; private Runtime runtime; @@ -57,20 +57,20 @@ public class RuntimeInstance { public RuntimeInstance(RuntimeInstanceConfig runtimeInstanceConfig) { this.runtimeInstanceConfig = runtimeInstanceConfig; - this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType()); +// this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType()); } public void init() throws Exception { - registryService.init(); - QueryInstances queryInstances = new QueryInstances(); - queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName()); - queryInstances.setHealth(true); - List adminServerRegisterInfoList = registryService.selectInstances(queryInstances); - if (!adminServerRegisterInfoList.isEmpty()) { - adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList); - } else { - throw new RuntimeException("admin server address is empty, please check"); - } +// registryService.init(); +// QueryInstances queryInstances = new QueryInstances(); +// queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName()); +// queryInstances.setHealth(true); +// List adminServerRegisterInfoList = registryService.selectInstances(queryInstances); +// if (!adminServerRegisterInfoList.isEmpty()) { +// adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList); +// } else { +// throw new RuntimeException("admin server address is empty, please check"); +// } runtimeInstanceConfig.setAdminServerAddr(adminServerAddr); runtimeFactory = initRuntimeFactory(runtimeInstanceConfig); runtime = runtimeFactory.createRuntime(runtimeInstanceConfig); @@ -80,19 +80,19 @@ public void init() throws Exception { public void start() throws Exception { if (!StringUtils.isBlank(adminServerAddr)) { - registryService.subscribe((event) -> { - log.info("runtime receive registry event: {}", event); - List registerServerInfoList = event.getInstances(); - Map registerServerInfoMap = new HashMap<>(); - for (RegisterServerInfo registerServerInfo : registerServerInfoList) { - registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo); - } - if (!registerServerInfoMap.isEmpty()) { - adminServerInfoMap = registerServerInfoMap; - updateAdminServerAddr(); - } - - }, runtimeInstanceConfig.getAdminServiceName()); +// registryService.subscribe((event) -> { +// log.info("runtime receive registry event: {}", event); +// List registerServerInfoList = event.getInstances(); +// Map registerServerInfoMap = new HashMap<>(); +// for (RegisterServerInfo registerServerInfo : registerServerInfoList) { +// registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo); +// } +// if (!registerServerInfoMap.isEmpty()) { +// adminServerInfoMap = registerServerInfoMap; +// updateAdminServerAddr(); +// } +// +// }, runtimeInstanceConfig.getAdminServiceName()); runtime.start(); isStarted = true; } else {