From f88a19453c0b8c91f331dddbc7ef8ac13c3a9a8d Mon Sep 17 00:00:00 2001 From: mike_xwm Date: Wed, 24 Jul 2024 14:34:18 +0800 Subject: [PATCH] [ISSUE #5040] Support gtid mode for sync data with mysql (#5041) * [ISSUE #5040] Support gtid mode for sync data with mysql * fix conflicts with master * fix checkstyle error --- eventmesh-admin-server/conf/eventmesh.sql | 3 + .../mapper/EventMeshMysqlPositionMapper.xml | 28 +- .../web/db/entity/EventMeshMysqlPosition.java | 6 + .../position/impl/MysqlPositionHandler.java | 8 +- .../connector/rdb/canal/CanalSinkConfig.java | 2 + .../rdb/canal/CanalSourceConfig.java | 4 + .../offset/canal/CanalRecordOffset.java | 5 + .../offset/canal/CanalRecordPartition.java | 2 + .../connector/canal/CanalConnectRecord.java | 6 + .../canal/dialect/AbstractDbDialect.java | 4 - .../connector/canal/dialect/DbDialect.java | 2 - .../connector/canal/dialect/MysqlDialect.java | 4 - .../SqlBuilderLoadInterceptor.java | 24 +- .../connector/canal/sink/DbLoadContext.java | 2 + .../connector/canal/sink/GtidBatch.java | 48 +++ .../canal/sink/GtidBatchManager.java | 45 ++ .../sink/connector/CanalSinkConnector.java | 404 +++++++++++++----- .../connector/canal/source/EntryParser.java | 33 +- .../connector/CanalSourceConnector.java | 51 ++- 19 files changed, 511 insertions(+), 170 deletions(-) create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql index 3b6fc9b777..226101661c 100644 --- a/eventmesh-admin-server/conf/eventmesh.sql +++ b/eventmesh-admin-server/conf/eventmesh.sql @@ -71,8 +71,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` ( CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `jobID` int unsigned NOT NULL, + `serverUUID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `position` bigint DEFAULT NULL, + `gtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, + `currentGtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `timestamp` bigint DEFAULT NULL, `journalName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, diff --git a/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml index bc3a3292a2..cbb7c094d8 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml @@ -16,24 +16,28 @@ limitations under the License. --> + PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" + "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> - - - - - - - - + + + + + + + + + + + - id,jobID,address, - position,timestamp,journalName, + id + ,jobID,serverUUID,address, + position,gtid,currentGtid,timestamp,journalName, createTime,updateTime diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java index ffe3e446d4..65a38b54b5 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java @@ -38,10 +38,16 @@ public class EventMeshMysqlPosition implements Serializable { private Integer jobID; + private String serverUUID; + private String address; private Long position; + private String gtid; + + private String currentGtid; + private Long timestamp; private String journalName; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java index 525fe02c0d..f2c174c3b7 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java @@ -115,9 +115,12 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) { CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset(); if (offset != null) { position.setPosition(offset.getOffset()); + position.setGtid(offset.getGtid()); + position.setCurrentGtid(offset.getCurrentGtid()); } CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition(); if (partition != null) { + position.setServerUUID(partition.getServerUUID()); position.setTimestamp(partition.getTimeStamp()); position.setJournalName(partition.getJournalName()); } @@ -148,13 +151,16 @@ public List handler(FetchPositionRequest request, Metadata metad request.getJobID())); List recordPositionList = new ArrayList<>(); for (EventMeshMysqlPosition position : positionList) { - RecordPosition recordPosition = new RecordPosition(); CanalRecordPartition partition = new CanalRecordPartition(); partition.setTimeStamp(position.getTimestamp()); partition.setJournalName(position.getJournalName()); + partition.setServerUUID(position.getServerUUID()); + RecordPosition recordPosition = new RecordPosition(); recordPosition.setRecordPartition(partition); CanalRecordOffset offset = new CanalRecordOffset(); offset.setOffset(position.getPosition()); + offset.setGtid(position.getGtid()); + offset.setCurrentGtid(position.getCurrentGtid()); recordPosition.setRecordOffset(offset); recordPositionList.add(recordPosition); } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java index 85484b2ce9..80aec7bfe9 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java @@ -39,6 +39,8 @@ public class CanalSinkConfig extends SinkConfig { // sync mode: field/row private SyncMode syncMode; + private boolean isGTIDMode = true; + // skip sink process exception private Boolean skipException = false; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java index d75ceb6b58..707f102901 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java @@ -45,6 +45,10 @@ public class CanalSourceConfig extends SourceConfig { private Short clientId; + private String serverUUID; + + private boolean isGTIDMode = true; + private Integer batchSize = 10000; private Long batchTimeout = -1L; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java index 90c94c99bd..d0f2053f4d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java @@ -30,6 +30,11 @@ public class CanalRecordOffset extends RecordOffset { private Long offset; + // mysql instance gtid range + private String gtid; + + private String currentGtid; + @Override public Class getRecordOffsetClass() { return CanalRecordOffset.class; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java index 72d404bab9..ded82306e3 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java @@ -29,6 +29,8 @@ @ToString public class CanalRecordPartition extends RecordPartition { + private String serverUUID; + private String journalName; private Long timeStamp; diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java index a723b24dc3..36ecd158f6 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java @@ -31,8 +31,14 @@ public class CanalConnectRecord { private String schemaName; + private String tableName; + // mysql instance gtid range + private String gtid; + + private String currentGtid; + /** * The business type of the changed data (I/U/D/C/A/E), consistent with the EventType defined in EntryProtocol in canal. */ diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java index f5c2245b9f..4cf0f82ec9 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java @@ -97,10 +97,6 @@ public SqlTemplate getSqlTemplate() { return sqlTemplate; } - public boolean isDRDS() { - return false; - } - public String getShardColumns(String schema, String table) { return null; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java index a18edfd5b2..781c2fe954 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java @@ -48,8 +48,6 @@ public interface DbDialect { public boolean isSupportMergeSql(); - public boolean isDRDS(); - public LobHandler getLobHandler(); public JdbcTemplate getJdbcTemplate(); diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java index acd491ba64..bfe5628716 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java @@ -50,10 +50,6 @@ public String getDefaultSchema() { return null; } - public boolean isDRDS() { - return false; - } - public String getDefaultCatalog() { return jdbcTemplate.queryForObject("select database()", String.class); } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java index 24d6b42f8b..0ad07577f9 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java @@ -51,35 +51,21 @@ public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) { String shardColumns = null; if (type.isInsert()) { - if (CollectionUtils.isEmpty(record.getColumns()) - && (dbDialect.isDRDS())) { - // sql - sql = sqlTemplate.getInsertSql(schemaName, - record.getTableName(), - buildColumnNames(record.getKeys()), - buildColumnNames(record.getColumns())); - } else { - sql = sqlTemplate.getMergeSql(schemaName, + sql = sqlTemplate.getMergeSql(schemaName, record.getTableName(), buildColumnNames(record.getKeys()), buildColumnNames(record.getColumns()), new String[] {}, - !dbDialect.isDRDS(), + true, shardColumns); - } } else if (type.isUpdate()) { - boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys()); boolean rowMode = sinkConfig.getSyncMode().isRow(); String[] keyColumns = null; String[] otherColumns = null; if (existOldKeys) { keyColumns = buildColumnNames(record.getOldKeys()); - if (dbDialect.isDRDS()) { - otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getUpdatedKeys()); - } else { - otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys()); - } + otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys()); } else { keyColumns = buildColumnNames(record.getKeys()); otherColumns = buildColumnNames(record.getUpdatedColumns()); @@ -91,10 +77,10 @@ public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) { keyColumns, otherColumns, new String[] {}, - !dbDialect.isDRDS(), + true, shardColumns); } else { - sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(), shardColumns); + sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, true, shardColumns); } } else if (type.isDelete()) { sql = sqlTemplate.getDeleteSql(schemaName, diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java index 561d894870..3498e87e7b 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java @@ -28,6 +28,8 @@ @Data public class DbLoadContext { + private String gtid; + private List lastProcessedRecords; private List prepareRecords; diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java new file mode 100644 index 0000000000..dd6559b832 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.sink; + +import org.apache.eventmesh.connector.canal.CanalConnectRecord; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class GtidBatch { + private int totalBatches; + private List> batches; + private int receivedBatchCount; + + public GtidBatch(int totalBatches) { + this.totalBatches = totalBatches; + this.batches = new CopyOnWriteArrayList<>(new List[totalBatches]); + this.receivedBatchCount = 0; + } + + public void addBatch(int batchIndex, List batchRecords) { + batches.set(batchIndex, batchRecords); + receivedBatchCount++; + } + + public List> getBatches() { + return batches; + } + + public boolean isComplete() { + return receivedBatchCount == totalBatches; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java new file mode 100644 index 0000000000..30060aa8f5 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.canal.sink; + +import org.apache.eventmesh.connector.canal.CanalConnectRecord; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +public class GtidBatchManager { + + private static ConcurrentHashMap gtidBatchMap = new ConcurrentHashMap<>(); + + public static void addBatch(String gtid, int batchIndex, int totalBatches, List batchRecords) { + gtidBatchMap.computeIfAbsent(gtid, k -> new GtidBatch(totalBatches)).addBatch(batchIndex, batchRecords); + } + + public static GtidBatch getGtidBatch(String gtid) { + return gtidBatchMap.get(gtid); + } + + public static boolean isComplete(String gtid) { + GtidBatch batch = gtidBatchMap.get(gtid); + return batch != null && batch.isComplete(); + } + + public static void removeGtidBatch(String gtid) { + gtidBatchMap.remove(gtid); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index 8f9df7595b..5f3c0a2bca 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -31,6 +31,8 @@ import org.apache.eventmesh.connector.canal.sink.DbLoadData; import org.apache.eventmesh.connector.canal.sink.DbLoadData.TableLoadData; import org.apache.eventmesh.connector.canal.sink.DbLoadMerger; +import org.apache.eventmesh.connector.canal.sink.GtidBatch; +import org.apache.eventmesh.connector.canal.sink.GtidBatchManager; import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; @@ -38,7 +40,6 @@ import org.apache.eventmesh.openconnect.api.sink.Sink; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; - import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; @@ -52,6 +53,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -86,9 +88,12 @@ public class CanalSinkConnector implements Sink, ConnectorCreateService { private ExecutorService executor; + private ExecutorService gtidSingleExecutor; + private int batchSize = 50; private boolean useBatch = true; + private RdbTableMgr tableMgr; @Override @@ -123,6 +128,7 @@ public void init(ConnectorContext connectorContext) throws Exception { new ArrayBlockingQueue<>(sinkConfig.getPoolSize() * 4), new NamedThreadFactory("canalSink"), new ThreadPoolExecutor.CallerRunsPolicy()); + gtidSingleExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "gtidSingleExecutor")); } @Override @@ -143,6 +149,7 @@ public String name() { @Override public void stop() { executor.shutdown(); + gtidSingleExecutor.shutdown(); } @Override @@ -153,6 +160,8 @@ public void put(List sinkRecords) { canalConnectRecordList = filterRecord(canalConnectRecordList); if (isDdlDatas(canalConnectRecordList)) { doDdl(context, canalConnectRecordList); + } else if (sinkConfig.isGTIDMode()) { + doLoadWithGtid(context, sinkConfig, connectRecord); } else { canalConnectRecordList = DbLoadMerger.merge(canalConnectRecordList); @@ -257,6 +266,57 @@ private void doLoad(DbLoadContext context, CanalSinkConfig sinkConfig, DbLoadDat batchDatas.clear(); } + private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, ConnectRecord connectRecord) { + int batchIndex = connectRecord.getExtension("batchIndex", Integer.class); + int totalBatches = connectRecord.getExtension("totalBatches", Integer.class); + List canalConnectRecordList = (List) connectRecord.getData(); + String gtid = canalConnectRecordList.get(0).getCurrentGtid(); + GtidBatchManager.addBatch(gtid, batchIndex, totalBatches, canalConnectRecordList); + // check whether the batch is complete + if (GtidBatchManager.isComplete(gtid)) { + GtidBatch batch = GtidBatchManager.getGtidBatch(gtid); + List> totalRows = batch.getBatches(); + List filteredRows = new ArrayList<>(); + for (List canalConnectRecords : totalRows) { + canalConnectRecords = filterRecord(canalConnectRecords); + if (!CollectionUtils.isEmpty(canalConnectRecords)) { + for (final CanalConnectRecord record : canalConnectRecords) { + boolean filter = interceptor.before(sinkConfig, record); + filteredRows.add(record); + } + } + } + context.setGtid(gtid); + Future result = gtidSingleExecutor.submit(new DbLoadWorker(context, filteredRows, dbDialect, false, sinkConfig)); + Exception ex = null; + try { + ex = result.get(); + } catch (Exception e) { + ex = e; + } + Boolean skipException = sinkConfig.getSkipException(); + if (skipException != null && skipException) { + if (ex != null) { + // do skip + log.warn("skip exception for data : {} , caused by {}", + filteredRows, + ExceptionUtils.getFullStackTrace(ex)); + GtidBatchManager.removeGtidBatch(gtid); + } + } else { + if (ex != null) { + log.error("sink connector will shutdown by " + ex.getMessage(), ExceptionUtils.getFullStackTrace(ex)); + gtidSingleExecutor.shutdown(); + System.exit(1); + } else { + GtidBatchManager.removeGtidBatch(gtid); + } + } + } else { + log.info("Batch received, waiting for other batches."); + } + } + private List> split(List records) { List> result = new ArrayList<>(); if (records == null || records.isEmpty()) { @@ -296,12 +356,12 @@ private boolean canBatch(CanalConnectRecord source, CanalConnectRecord target) { } private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List> totalRows, boolean canBatch) { - List> results = new ArrayList>(); + List> results = new ArrayList<>(); for (List rows : totalRows) { if (CollectionUtils.isEmpty(rows)) { continue; } - results.add(executor.submit(new DbLoadWorker(context, rows, dbDialect, canBatch))); + results.add(executor.submit(new DbLoadWorker(context, rows, dbDialect, canBatch, sinkConfig))); } boolean partFailed = false; @@ -330,7 +390,7 @@ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List< Boolean skipException = sinkConfig.getSkipException(); if (skipException != null && skipException) { for (CanalConnectRecord retryRecord : retryRecords) { - DbLoadWorker worker = new DbLoadWorker(context, Arrays.asList(retryRecord), dbDialect, false); + DbLoadWorker worker = new DbLoadWorker(context, Arrays.asList(retryRecord), dbDialect, false, sinkConfig); try { Exception ex = worker.call(); if (ex != null) { @@ -347,7 +407,7 @@ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List< } } } else { - DbLoadWorker worker = new DbLoadWorker(context, retryRecords, dbDialect, false); + DbLoadWorker worker = new DbLoadWorker(context, retryRecords, dbDialect, false, sinkConfig); try { Exception ex = worker.call(); if (ex != null) { @@ -355,7 +415,9 @@ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List< } } catch (Exception ex) { log.error("##load phase two failed!", ex); - throw new RuntimeException(ex); + log.error("sink connector will shutdown by " + ex.getMessage(), ex); + executor.shutdown(); + System.exit(1); } } } @@ -371,16 +433,21 @@ class DbLoadWorker implements Callable { private final DbDialect dbDialect; private final List records; private final boolean canBatch; + + private final CanalSinkConfig sinkConfig; + private final List allFailedRecords = new ArrayList<>(); private final List allProcessedRecords = new ArrayList<>(); private final List processedRecords = new ArrayList<>(); private final List failedRecords = new ArrayList<>(); - public DbLoadWorker(DbLoadContext context, List records, DbDialect dbDialect, boolean canBatch) { + public DbLoadWorker(DbLoadContext context, List records, DbDialect dbDialect, boolean canBatch, + CanalSinkConfig sinkConfig) { this.context = context; this.records = records; this.canBatch = canBatch; this.dbDialect = dbDialect; + this.sinkConfig = sinkConfig; } public Exception call() throws Exception { @@ -394,132 +461,239 @@ public Exception call() throws Exception { private Exception doCall() { RuntimeException error = null; ExecuteResult exeResult = null; - int index = 0; - while (index < records.size()) { - final List splitDatas = new ArrayList<>(); - if (useBatch && canBatch) { - int end = Math.min(index + batchSize, records.size()); - splitDatas.addAll(records.subList(index, end)); - index = end; - } else { - splitDatas.add(records.get(index)); - index = index + 1; - } + if (sinkConfig.isGTIDMode()) { int retryCount = 0; - while (true) { - try { - if (!CollectionUtils.isEmpty(failedRecords)) { - splitDatas.clear(); - splitDatas.addAll(failedRecords); - } else { - failedRecords.addAll(splitDatas); + final List toExecuteRecords = new ArrayList<>(); + try { + if (!CollectionUtils.isEmpty(failedRecords)) { + // if failedRecords not empty, make it retry + toExecuteRecords.addAll(failedRecords); + } else { + toExecuteRecords.addAll(records); + // add to failed record first, maybe get lob or datasource error + failedRecords.addAll(toExecuteRecords); + } + JdbcTemplate template = dbDialect.getJdbcTemplate(); + String sourceGtid = context.getGtid(); + if (StringUtils.isNotEmpty(sourceGtid)) { + String setGtid = "SET @@session.gtid_next = '" + sourceGtid + "';"; + template.execute(setGtid); + } else { + log.error("gtid is empty in gtid mode"); + throw new RuntimeException("gtid is empty in gtid mode"); + } + + final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator(); + int affect = (Integer) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> { + try { + failedRecords.clear(); + processedRecords.clear(); + int affect1 = 0; + for (CanalConnectRecord record : toExecuteRecords) { + int affects = template.update(record.getSql(), new PreparedStatementSetter() { + public void setValues(PreparedStatement ps) throws SQLException { + doPreparedStatement(ps, dbDialect, lobCreator, record); + } + }); + affect1 = affect1 + affects; + processStat(record, affects, false); + } + return affect1; + } catch (Exception e) { + // rollback + status.setRollbackOnly(); + throw new RuntimeException("Failed to executed", e); + } finally { + lobCreator.close(); } + }); + + // reset gtid + String resetGtid = "SET @@session.gtid_next = AUTOMATIC;"; + dbDialect.getJdbcTemplate().execute(resetGtid); + error = null; + exeResult = ExecuteResult.SUCCESS; + } catch (DeadlockLoserDataAccessException ex) { + error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex)); + exeResult = ExecuteResult.RETRY; + } catch (Throwable ex) { + error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex)); + exeResult = ExecuteResult.ERROR; + } - final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator(); - if (useBatch && canBatch) { - final String sql = splitDatas.get(0).getSql(); - int[] affects = new int[splitDatas.size()]; - affects = (int[]) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> { - try { - failedRecords.clear(); - processedRecords.clear(); - JdbcTemplate template = dbDialect.getJdbcTemplate(); - int[] affects1 = template.batchUpdate(sql, new BatchPreparedStatementSetter() { - - public void setValues(PreparedStatement ps, int idx) throws SQLException { - doPreparedStatement(ps, dbDialect, lobCreator, splitDatas.get(idx)); - } - - public int getBatchSize() { - return splitDatas.size(); - } - }); - return affects1; - } finally { - lobCreator.close(); - } - }); + if (ExecuteResult.SUCCESS == exeResult) { + allFailedRecords.addAll(failedRecords); + allProcessedRecords.addAll(processedRecords); + failedRecords.clear(); + processedRecords.clear(); + } else if (ExecuteResult.RETRY == exeResult) { + retryCount = retryCount + 1; + processedRecords.clear(); + failedRecords.clear(); + failedRecords.addAll(toExecuteRecords); + int retry = 3; + if (retryCount >= retry) { + processFailedDatas(toExecuteRecords.size()); + throw new RuntimeException(String.format("execute retry %s times failed", retryCount), error); + } else { + try { + int retryWait = 3000; + int wait = retryCount * retryWait; + wait = Math.max(wait, retryWait); + Thread.sleep(wait); + } catch (InterruptedException ex) { + Thread.interrupted(); + processFailedDatas(toExecuteRecords.size()); + throw new RuntimeException(ex); + } + } + } else { + processedRecords.clear(); + failedRecords.clear(); + failedRecords.addAll(toExecuteRecords); + processFailedDatas(toExecuteRecords.size()); + throw error; + } + } else { + int index = 0; + while (index < records.size()) { + final List toExecuteRecords = new ArrayList<>(); + if (useBatch && canBatch) { + int end = Math.min(index + batchSize, records.size()); + toExecuteRecords.addAll(records.subList(index, end)); + index = end; + } else { + toExecuteRecords.add(records.get(index)); + index = index + 1; + } - for (int i = 0; i < splitDatas.size(); i++) { - assert affects != null; - processStat(splitDatas.get(i), affects[i], true); + int retryCount = 0; + while (true) { + try { + if (!CollectionUtils.isEmpty(failedRecords)) { + toExecuteRecords.clear(); + toExecuteRecords.addAll(failedRecords); + } else { + failedRecords.addAll(toExecuteRecords); } - } else { - final CanalConnectRecord record = splitDatas.get(0); - int affect = 0; - affect = (Integer) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> { - try { - failedRecords.clear(); - processedRecords.clear(); - JdbcTemplate template = dbDialect.getJdbcTemplate(); - int affect1 = template.update(record.getSql(), new PreparedStatementSetter() { - - public void setValues(PreparedStatement ps) throws SQLException { - doPreparedStatement(ps, dbDialect, lobCreator, record); - } - }); - return affect1; - } finally { - lobCreator.close(); + + final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator(); + if (useBatch && canBatch) { + JdbcTemplate template = dbDialect.getJdbcTemplate(); + final String sql = toExecuteRecords.get(0).getSql(); + + int[] affects = new int[toExecuteRecords.size()]; + + affects = (int[]) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> { + try { + failedRecords.clear(); + processedRecords.clear(); + int[] affects1 = template.batchUpdate(sql, new BatchPreparedStatementSetter() { + + public void setValues(PreparedStatement ps, int idx) throws SQLException { + doPreparedStatement(ps, dbDialect, lobCreator, toExecuteRecords.get(idx)); + } + + public int getBatchSize() { + return toExecuteRecords.size(); + } + }); + return affects1; + } catch (Exception e) { + // rollback + status.setRollbackOnly(); + throw new RuntimeException("Failed to execute batch with GTID", e); + } finally { + lobCreator.close(); + } + }); + + for (int i = 0; i < toExecuteRecords.size(); i++) { + assert affects != null; + processStat(toExecuteRecords.get(i), affects[i], true); } - }); - processStat(record, affect, false); - } + } else { + final CanalConnectRecord record = toExecuteRecords.get(0); + JdbcTemplate template = dbDialect.getJdbcTemplate(); + int affect = 0; + affect = (Integer) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> { + try { + failedRecords.clear(); + processedRecords.clear(); + int affect1 = template.update(record.getSql(), new PreparedStatementSetter() { + + public void setValues(PreparedStatement ps) throws SQLException { + doPreparedStatement(ps, dbDialect, lobCreator, record); + } + }); + return affect1; + } catch (Exception e) { + // rollback + status.setRollbackOnly(); + throw new RuntimeException("Failed to executed", e); + } finally { + lobCreator.close(); + } + }); + processStat(record, affect, false); + } - error = null; - exeResult = ExecuteResult.SUCCESS; - } catch (DeadlockLoserDataAccessException ex) { - error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex)); - exeResult = ExecuteResult.RETRY; - } catch (Throwable ex) { - error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex)); - exeResult = ExecuteResult.ERROR; - } + error = null; + exeResult = ExecuteResult.SUCCESS; + } catch (DeadlockLoserDataAccessException ex) { + error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex)); + exeResult = ExecuteResult.RETRY; + } catch (Throwable ex) { + error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex)); + exeResult = ExecuteResult.ERROR; + } - if (ExecuteResult.SUCCESS == exeResult) { - allFailedRecords.addAll(failedRecords); - allProcessedRecords.addAll(processedRecords); - failedRecords.clear(); - processedRecords.clear(); - break; // do next eventData - } else if (ExecuteResult.RETRY == exeResult) { - retryCount = retryCount + 1; - processedRecords.clear(); - failedRecords.clear(); - failedRecords.addAll(splitDatas); - int retry = 3; - if (retryCount >= retry) { - processFailedDatas(index); - throw new RuntimeException(String.format("execute retry %s times failed", retryCount), error); - } else { - try { - int retryWait = 3000; - int wait = retryCount * retryWait; - wait = Math.max(wait, retryWait); - Thread.sleep(wait); - } catch (InterruptedException ex) { - Thread.interrupted(); + if (ExecuteResult.SUCCESS == exeResult) { + allFailedRecords.addAll(failedRecords); + allProcessedRecords.addAll(processedRecords); + failedRecords.clear(); + processedRecords.clear(); + break; // do next eventData + } else if (ExecuteResult.RETRY == exeResult) { + retryCount = retryCount + 1; + processedRecords.clear(); + failedRecords.clear(); + failedRecords.addAll(toExecuteRecords); + int retry = 3; + if (retryCount >= retry) { processFailedDatas(index); - throw new RuntimeException(ex); + throw new RuntimeException(String.format("execute retry %s times failed", retryCount), error); + } else { + try { + int retryWait = 3000; + int wait = retryCount * retryWait; + wait = Math.max(wait, retryWait); + Thread.sleep(wait); + } catch (InterruptedException ex) { + Thread.interrupted(); + processFailedDatas(index); + throw new RuntimeException(ex); + } } + } else { + processedRecords.clear(); + failedRecords.clear(); + failedRecords.addAll(toExecuteRecords); + processFailedDatas(index); + throw error; } - } else { - processedRecords.clear(); - failedRecords.clear(); - failedRecords.addAll(splitDatas); - processFailedDatas(index); - throw error; } } } + context.getFailedRecords().addAll(allFailedRecords); context.getProcessedRecords().addAll(allProcessedRecords); return null; } private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobCreator lobCreator, - CanalConnectRecord record) throws SQLException { + CanalConnectRecord record) throws SQLException { EventType type = record.getEventType(); List columns = new ArrayList(); if (type.isInsert()) { @@ -530,11 +704,7 @@ private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobC } else if (type.isUpdate()) { boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys()); columns.addAll(record.getUpdatedColumns()); - if (existOldKeys && dbDialect.isDRDS()) { - columns.addAll(record.getUpdatedKeys()); - } else { - columns.addAll(record.getKeys()); - } + columns.addAll(record.getKeys()); if (existOldKeys) { columns.addAll(record.getOldKeys()); } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index 8ef60ff04d..708d5d120c 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -60,9 +60,17 @@ public static Map> parse(CanalSourceConfig source switch (entry.getEntryType()) { case ROWDATA: RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); - needSync = checkNeedSync(sourceConfig, rowChange.getRowDatas(0)); - if (needSync) { - transactionDataBuffer.add(entry); + if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode()) { + String currentGtid = entry.getHeader().getPropsList().get(0).getValue(); + if (currentGtid.contains(sourceConfig.getServerUUID())) { + transactionDataBuffer.add(entry); + } + } else { + // if not gtid mode, need check weather the entry is loopback by specified column value + needSync = checkNeedSync(sourceConfig, rowChange.getRowDatas(0)); + if (needSync) { + transactionDataBuffer.add(entry); + } } break; case TRANSACTIONEND: @@ -169,6 +177,14 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi canalConnectRecord.setExecuteTime(entry.getHeader().getExecuteTime()); canalConnectRecord.setJournalName(entry.getHeader().getLogfileName()); canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset()); + // if enabled gtid mode, gtid not null + if (canalSourceConfig.isGTIDMode()) { + String currentGtid = entry.getHeader().getPropsList().get(0).getValue(); + String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID()); + canalConnectRecord.setGtid(gtidRange); + canalConnectRecord.setCurrentGtid(currentGtid); + } + EventType eventType = canalConnectRecord.getEventType(); List beforeColumns = rowData.getBeforeColumnsList(); @@ -248,6 +264,17 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi return canalConnectRecord; } + public static String replaceGtidRange(String gtid, String currentGtid, String serverUUID) { + String[] gtidRangeArray = gtid.split(","); + for (int i = 0; i < gtidRangeArray.length; i++) { + String gtidRange = gtidRangeArray[i]; + if (gtidRange.startsWith(serverUUID)) { + gtidRangeArray[i] = gtidRange.replaceFirst("\\d+$", currentGtid.split(":")[1]); + } + } + return String.join(",", gtidRangeArray); + } + private static void checkUpdateKeyColumns(Map oldKeyColumns, Map keyColumns) { if (oldKeyColumns.isEmpty()) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index 4b96177319..6cd575cb77 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -150,6 +150,8 @@ protected void startEventParserInternal(CanalEventParser parser, boolean isGroup return instance; } }); + DatabaseConnection.sourceConfig = sourceConfig.getSourceConnectorConfig(); + DatabaseConnection.initSourceConnection(); tableMgr = new RdbTableMgr(sourceConfig.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource); } @@ -180,6 +182,9 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) { parameter.setDbUsername(sourceConfig.getSourceConnectorConfig().getUserName()); parameter.setDbPassword(sourceConfig.getSourceConnectorConfig().getPassWord()); + // set if enabled gtid mode + parameter.setGtidEnable(sourceConfig.isGTIDMode()); + // check positions // example: Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}", // "{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}") @@ -193,6 +198,14 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) { recordPositionMap.put("journalName", canalRecordPartition.getJournalName()); recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp()); recordPositionMap.put("position", canalRecordOffset.getOffset()); + String gtidRange = canalRecordOffset.getGtid(); + if (gtidRange != null) { + if (canalRecordOffset.getCurrentGtid() != null) { + gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(), + sourceConfig.getServerUUID()); + } + recordPositionMap.put("gtid", gtidRange); + } positions.add(JsonUtils.toJSONString(recordPositionMap)); }); parameter.setPositions(positions); @@ -237,7 +250,13 @@ public void start() throws Exception { @Override public void commit(ConnectRecord record) { long batchId = Long.parseLong(record.getExtension("messageId")); - canalServer.ack(clientIdentity, batchId); + int batchIndex = record.getExtension("batchIndex", Integer.class); + int totalBatches = record.getExtension("totalBatches", Integer.class); + if (batchIndex == totalBatches - 1) { + log.debug("ack records batchIndex:{}, totalBatches:{}, batchId:{}", + batchIndex, totalBatches, batchId); + canalServer.ack(clientIdentity, batchId); + } } @Override @@ -301,21 +320,37 @@ public List poll() { if (!connectorRecordMap.isEmpty()) { Set>> entrySet = connectorRecordMap.entrySet(); for (Map.Entry> entry : entrySet) { - // Xid offset - Long binLogOffset = entry.getKey(); List connectRecordList = entry.getValue(); CanalConnectRecord lastRecord = entry.getValue().get(connectRecordList.size() - 1); CanalRecordPartition canalRecordPartition = new CanalRecordPartition(); + canalRecordPartition.setServerUUID(sourceConfig.getServerUUID()); canalRecordPartition.setJournalName(lastRecord.getJournalName()); canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime()); - + // Xid offset with gtid + Long binLogOffset = entry.getKey(); CanalRecordOffset canalRecordOffset = new CanalRecordOffset(); canalRecordOffset.setOffset(binLogOffset); + if (StringUtils.isNotEmpty(lastRecord.getGtid()) && StringUtils.isNotEmpty(lastRecord.getCurrentGtid())) { + canalRecordOffset.setGtid(lastRecord.getGtid()); + canalRecordOffset.setCurrentGtid(lastRecord.getCurrentGtid()); + } - ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis()); - connectRecord.addExtension("messageId", String.valueOf(message.getId())); - connectRecord.setData(connectRecordList); - result.add(connectRecord); + // split record list + List> splitLists = new ArrayList<>(); + for (int i = 0; i < connectRecordList.size(); i += sourceConfig.getBatchSize()) { + int end = Math.min(i + sourceConfig.getBatchSize(), connectRecordList.size()); + List subList = connectRecordList.subList(i, end); + splitLists.add(subList); + } + + for (int i = 0; i < splitLists.size(); i++) { + ConnectRecord connectRecord = new ConnectRecord(canalRecordPartition, canalRecordOffset, System.currentTimeMillis()); + connectRecord.addExtension("messageId", String.valueOf(message.getId())); + connectRecord.addExtension("batchIndex", i); + connectRecord.addExtension("totalBatches", splitLists.size()); + connectRecord.setData(splitLists.get(i)); + result.add(connectRecord); + } } } else { // for the message has been filtered need ack message