diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java index 8d95ef12e1..6dfa5fbc1f 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java @@ -1,6 +1,7 @@ package org.apache.eventmesh.common.config.connector.rdb.canal; import org.apache.eventmesh.common.config.connector.SinkConfig; +import org.apache.eventmesh.common.remote.job.SyncMode; import lombok.Data; import lombok.EqualsAndHashCode; @@ -9,6 +10,8 @@ @EqualsAndHashCode(callSuper = true) public class CanalSinkConfig extends SinkConfig { + private SyncMode syncMode; // 同步模式:字段/整条记录 + public SinkConnectorConfig sinkConnectorConfig; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java index 8877e04ddc..88ac366d3e 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java @@ -48,11 +48,6 @@ public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, String nam int majorVersion, int minorVersion) { super(jdbcTemplate, lobHandler, name, majorVersion, minorVersion); sqlTemplate = new MysqlSqlTemplate(); - - if (StringUtils.contains(databaseVersion, "-TDDL-")) { - isDRDS = true; -// initShardColumns(); - } } public boolean isCharSpacePadded() { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/AbstractLoadInterceptor.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/AbstractLoadInterceptor.java deleted file mode 100644 index 2f15c38744..0000000000 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/AbstractLoadInterceptor.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (C) 2010-2101 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.connector.canal.interceptor; - -import org.apache.eventmesh.connector.canal.dialect.DbDialect; - -import java.util.List; - - - -/** - * 提供接口的默认实现 - * - */ -public class AbstractLoadInterceptor implements LoadInterceptor { - - public void prepare(L context) { - } - - public boolean before(L context, D currentData) { - return false; - } - - public void transactionBegin(L context, List currentDatas, DbDialect dialect) { - } - - public void transactionEnd(L context, List currentDatas, DbDialect dialect) { - } - - public void after(L context, D currentData) { - - } - - public void commit(L context) { - } - - public void error(L context) { - } - -} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/LoadInterceptor.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/LoadInterceptor.java deleted file mode 100644 index a15ecbb70a..0000000000 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/LoadInterceptor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (C) 2010-2101 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.connector.canal.interceptor; - -import org.apache.eventmesh.connector.canal.dialect.DbDialect; - -import java.util.List; - -public interface LoadInterceptor { - - public void prepare(L context); - - /** - * 返回值代表是否需要过滤该记录,true即为过滤不处理 - */ - public boolean before(L context, D currentData); - - public void transactionBegin(L context, List currentDatas, DbDialect dialect); - - public void transactionEnd(L context, List currentDatas, DbDialect dialect); - - public void after(L context, D currentData); - - public void commit(L context); - - public void error(L context); -} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java index 7d42584a7b..2b526cfed8 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java @@ -16,7 +16,9 @@ package org.apache.eventmesh.connector.canal.interceptor; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig; import org.apache.eventmesh.connector.canal.CanalConnectRecord; +import org.apache.eventmesh.connector.canal.MysqlSqlTemplate; import org.apache.eventmesh.connector.canal.SqlTemplate; import org.apache.eventmesh.connector.canal.dialect.DbDialect; import org.apache.eventmesh.connector.canal.dialect.MysqlDialect; @@ -31,45 +33,37 @@ * 计算下最新的sql语句 * */ -public class SqlBuilderLoadInterceptor extends AbstractLoadInterceptor { +public class SqlBuilderLoadInterceptor { - private DbDialectFactory dbDialectFactory; + private DbDialect dbDialect; - public boolean before(DbLoadContext context, CanalConnectRecord currentData) { + public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) { // 初步构建sql -// DbDialect dbDialect = dbDialectFactory.getDbDialect(context.getIdentity().getPipelineId(), -// (DbMediaSource) context.getDataMediaSource()); - DbDialect dbDialect = new MysqlDialect(); SqlTemplate sqlTemplate = dbDialect.getSqlTemplate(); - EventType type = currentData.getEventType(); + EventType type = record.getEventType(); String sql = null; - String schemaName = (currentData.isWithoutSchema() ? null : currentData.getSchemaName()); + String schemaName = (record.isWithoutSchema() ? null : record.getSchemaName()); /** * 针对DRDS数据库 */ String shardColumns = null; -// if(dbDialect.isDRDS()){ -// // 获取拆分键 -// shardColumns = dbDialect.getShardColumns(schemaName, currentData.getTableName()); -// -// } // 注意insert/update语句对应的字段数序都是将主键排在后面 if (type.isInsert()) { - if (CollectionUtils.isEmpty(currentData.getColumns()) + if (CollectionUtils.isEmpty(record.getColumns()) && (dbDialect.isDRDS())) { // 如果表为全主键,直接进行insert // sql sql = sqlTemplate.getInsertSql(schemaName, - currentData.getTableName(), - buildColumnNames(currentData.getKeys()), - buildColumnNames(currentData.getColumns())); + record.getTableName(), + buildColumnNames(record.getKeys()), + buildColumnNames(record.getColumns())); } else { sql = sqlTemplate.getMergeSql(schemaName, - currentData.getTableName(), - buildColumnNames(currentData.getKeys()), - buildColumnNames(currentData.getColumns()), + record.getTableName(), + buildColumnNames(record.getKeys()), + buildColumnNames(record.getColumns()), new String[] {}, !dbDialect.isDRDS(), shardColumns); @@ -87,47 +81,47 @@ public boolean before(DbLoadContext context, CanalConnectRecord currentData) { // } // } - boolean existOldKeys = !CollectionUtils.isEmpty(currentData.getOldKeys()); - boolean rowMode = context.getPipeline().getParameters().getSyncMode().isRow(); + boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys()); + boolean rowMode = sinkConfig.getSyncMode().isRow(); String[] keyColumns = null; String[] otherColumns = null; if (existOldKeys) { // 需要考虑主键变更的场景 // 构造sql如下:update table xxx set pk = newPK where pk = oldPk - keyColumns = buildColumnNames(currentData.getOldKeys()); + keyColumns = buildColumnNames(record.getOldKeys()); // 这里需要精确获取变更的主键,因为目标为DRDS时主键会包含拆分键,正常的原主键变更只更新对应的单主键列即可 if (dbDialect.isDRDS()) { - otherColumns = buildColumnNames(currentData.getUpdatedColumns(), currentData.getUpdatedKeys()); + otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getUpdatedKeys()); } else { - otherColumns = buildColumnNames(currentData.getUpdatedColumns(), currentData.getKeys()); + otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys()); } } else { - keyColumns = buildColumnNames(currentData.getKeys()); - otherColumns = buildColumnNames(currentData.getUpdatedColumns()); + keyColumns = buildColumnNames(record.getKeys()); + otherColumns = buildColumnNames(record.getUpdatedColumns()); } if (rowMode && !existOldKeys) {// 如果是行记录,并且不存在主键变更,考虑merge sql sql = sqlTemplate.getMergeSql(schemaName, - currentData.getTableName(), + record.getTableName(), keyColumns, otherColumns, new String[] {}, !dbDialect.isDRDS(), shardColumns); } else {// 否则进行update sql - sql = sqlTemplate.getUpdateSql(schemaName, currentData.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(), shardColumns); + sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(), shardColumns); } } else if (type.isDelete()) { sql = sqlTemplate.getDeleteSql(schemaName, - currentData.getTableName(), - buildColumnNames(currentData.getKeys())); + record.getTableName(), + buildColumnNames(record.getKeys())); } // 处理下hint sql - if (currentData.getHint() != null) { - currentData.setSql(currentData.getHint() + sql); + if (record.getHint() != null) { + record.setSql(record.getHint() + sql); } else { - currentData.setSql(sql); + record.setSql(sql); } return false; } @@ -156,10 +150,11 @@ private String[] buildColumnNames(List columns1, List return result; } - // =============== setter / getter ============= - - public void setDbDialectFactory(DbDialectFactory dbDialectFactory) { - this.dbDialectFactory = dbDialectFactory; + public DbDialect getDbDialect() { + return dbDialect; } + public void setDbDialect(DbDialect dbDialect) { + this.dbDialect = dbDialect; + } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index eee3ae4d67..a4de4bec5f 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -25,6 +25,9 @@ import org.apache.eventmesh.connector.canal.CanalConnectRecord; import org.apache.eventmesh.connector.canal.DatabaseConnection; import org.apache.eventmesh.connector.canal.MysqlSqlTemplate; +import org.apache.eventmesh.connector.canal.dialect.DbDialect; +import org.apache.eventmesh.connector.canal.dialect.MysqlDialect; +import org.apache.eventmesh.connector.canal.interceptor.SqlBuilderLoadInterceptor; import org.apache.eventmesh.connector.canal.sink.DbLoadData; import org.apache.eventmesh.connector.canal.sink.DbLoadData.TableLoadData; import org.apache.eventmesh.connector.canal.sink.DbLoadMerger; @@ -49,6 +52,7 @@ import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.StatementCallback; +import org.springframework.jdbc.support.lob.DefaultLobHandler; import org.springframework.util.CollectionUtils; import lombok.extern.slf4j.Slf4j; @@ -62,6 +66,10 @@ public class CanalSinkConnector implements Sink, ConnectorCreateService { private MysqlSqlTemplate mysqlSqlTemplate; + private SqlBuilderLoadInterceptor interceptor; + + private DbDialect dbDialect; + @Override public Class configClass() { return CanalSinkConfig.class; @@ -81,6 +89,9 @@ public void init(ConnectorContext connectorContext) throws Exception { DatabaseConnection.sinkConfig = this.sinkConfig; DatabaseConnection.initSinkConnection(); jdbcTemplate = new JdbcTemplate(DatabaseConnection.sinkDataSource); + dbDialect = new MysqlDialect(jdbcTemplate, new DefaultLobHandler()); + interceptor = new SqlBuilderLoadInterceptor(); + interceptor.setDbDialect(dbDialect); } @Override @@ -190,9 +201,9 @@ public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessExce */ private void doBefore(List canalConnectRecordList, final DbLoadData loadData) { for (final CanalConnectRecord record : canalConnectRecordList) { - boolean filter = interceptor.before(context, record); + boolean filter = interceptor.before(sinkConfig, record); if (!filter) { - loadData.merge(item);// 进行分类 + loadData.merge(record);// 进行分类 } } } @@ -213,11 +224,8 @@ private void doLoad(final DbLoadContext context, DbLoadData loadData) { } } - if (context.getPipeline().getParameters().isDryRun()) { - doDryRun(context, batchDatas, true); - } else { - doTwoPhase(context, batchDatas, true); - } + doTwoPhase(context, batchDatas, true); + batchDatas.clear(); // 处理下insert/update @@ -237,23 +245,20 @@ private void doLoad(final DbLoadContext context, DbLoadData loadData) { } } - if (context.getPipeline().getParameters().isDryRun()) { - doDryRun(context, batchDatas, true); - } else { - doTwoPhase(context, batchDatas, true); - } + doTwoPhase(context, batchDatas, true); + batchDatas.clear(); } /** * 将对应的数据按照sql相同进行batch组合 */ - private List> split(List datas) { + private List> split(List records) { List> result = new ArrayList<>(); - if (datas == null || datas.size() == 0) { + if (records == null || records.isEmpty()) { return result; } else { - int[] bits = new int[datas.size()];// 初始化一个标记,用于标明对应的记录是否已分入某个batch + int[] bits = new int[records.size()];// 初始化一个标记,用于标明对应的记录是否已分入某个batch for (int i = 0; i < bits.length; i++) { // 跳过已经被分入batch的 while (i < bits.length && bits[i] == 1) { @@ -267,10 +272,10 @@ private List> split(List datas) { // 开始添加batch,最大只加入batchSize个数的对象 List batch = new ArrayList<>(); bits[i] = 1; - batch.add(datas.get(i)); + batch.add(records.get(i)); for (int j = i + 1; j < bits.length && batch.size() < batchSize; j++) { - if (bits[j] == 0 && canBatch(datas.get(i), datas.get(j))) { - batch.add(datas.get(j)); + if (bits[j] == 0 && canBatch(records.get(i), records.get(j))) { + batch.add(records.get(j)); bits[j] = 1;// 修改为已加入 } } @@ -301,7 +306,7 @@ private boolean canBatch(CanalConnectRecord source, CanalConnectRecord target) { private void doTwoPhase(DbLoadContext context, List> totalRows, boolean canBatch) { // 预处理下数据 List> results = new ArrayList>(); - for (List rows : totalRows) { + for (List rows : totalRows) { if (CollectionUtils.isEmpty(rows)) { continue; // 过滤空记录 } @@ -315,7 +320,7 @@ private void doTwoPhase(DbLoadContext context, List> to Exception ex = null; try { ex = result.get(); - for (EventData data : totalRows.get(i)) { + for (CanalConnectRecord data : totalRows.get(i)) { interceptor.after(context, data);// 通知加载完成 } } catch (Exception e) { @@ -323,20 +328,20 @@ private void doTwoPhase(DbLoadContext context, List> to } if (ex != null) { - logger.warn("##load phase one failed!", ex); + log.warn("##load phase one failed!", ex); partFailed = true; } } - if (true == partFailed) { + if (partFailed) { // if (CollectionUtils.isEmpty(context.getFailedDatas())) { // logger.error("##load phase one failed but failedDatas is empty!"); // return; // } // 尝试的内容换成phase one跑的所有数据,避免因failed datas计算错误而导致丢数据 - List retryEventDatas = new ArrayList(); - for (List rows : totalRows) { + List retryEventDatas = new ArrayList(); + for (List rows : totalRows) { retryEventDatas.addAll(rows); } @@ -345,19 +350,19 @@ private void doTwoPhase(DbLoadContext context, List> to // 可能为null,manager老版本数据序列化传输时,因为数据库中没有skipLoadException变量配置 Boolean skipLoadException = context.getPipeline().getParameters().getSkipLoadException(); if (skipLoadException != null && skipLoadException) {// 如果设置为允许跳过单条异常,则一条条执行数据load,准确过滤掉出错的记录,并进行日志记录 - for (EventData retryEventData : retryEventDatas) { + for (CanalConnectRecord retryEventData : retryEventDatas) { DbLoadWorker worker = new DbLoadWorker(context, Arrays.asList(retryEventData), false);// 强制设置batch为false try { Exception ex = worker.call(); if (ex != null) { // do skip - logger.warn("skip exception for data : {} , caused by {}", + log.warn("skip exception for data : {} , caused by {}", retryEventData, ExceptionUtils.getFullStackTrace(ex)); } } catch (Exception ex) { // do skip - logger.warn("skip exception for data : {} , caused by {}", + log.warn("skip exception for data : {} , caused by {}", retryEventData, ExceptionUtils.getFullStackTrace(ex)); } @@ -371,14 +376,14 @@ private void doTwoPhase(DbLoadContext context, List> to throw ex; // 自己抛自己接 } } catch (Exception ex) { - logger.error("##load phase two failed!", ex); - throw new LoadException(ex); + log.error("##load phase two failed!", ex); + throw new RuntimeException(ex); } } // 清理failed data数据 - for (EventData data : retryEventDatas) { - interceptor.after(context, data);// 通知加载完成 + for (CanalConnectRecord record : retryEventDatas) { + interceptor.after(context, record);// 通知加载完成 } }