Skip to content

Commit

Permalink
[ISSUE #5040] Support gtid mode for sync data with mysql (#5041)
Browse files Browse the repository at this point in the history
* [ISSUE #5040] Support gtid mode for sync data with mysql

* fix conflicts with master

* fix checkstyle error
  • Loading branch information
xwm1992 authored Jul 24, 2024
1 parent 3b770f8 commit f88a194
Show file tree
Hide file tree
Showing 19 changed files with 511 additions and 170 deletions.
3 changes: 3 additions & 0 deletions eventmesh-admin-server/conf/eventmesh.sql
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` int unsigned NOT NULL,
`serverUUID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`position` bigint DEFAULT NULL,
`gtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`currentGtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`timestamp` bigint DEFAULT NULL,
`journalName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand Down
28 changes: 16 additions & 12 deletions eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,28 @@
limitations under the License.
-->
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper">

<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition">
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="jobID" column="jobID" jdbcType="INTEGER"/>
<result property="address" column="address" jdbcType="VARCHAR"/>
<result property="position" column="position" jdbcType="BIGINT"/>
<result property="timestamp" column="timestamp" jdbcType="BIGINT"/>
<result property="journalName" column="journalName" jdbcType="VARCHAR"/>
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="jobID" column="jobID" jdbcType="INTEGER"/>
<result property="serverUUID" column="serverUUID" jdbcType="VARCHAR"/>
<result property="address" column="address" jdbcType="VARCHAR"/>
<result property="position" column="position" jdbcType="BIGINT"/>
<result property="gtid" column="gtid" jdbcType="VARCHAR"/>
<result property="currentGtid" column="currentGtid" jdbcType="VARCHAR"/>
<result property="timestamp" column="timestamp" jdbcType="BIGINT"/>
<result property="journalName" column="journalName" jdbcType="VARCHAR"/>
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
</resultMap>

<sql id="Base_Column_List">
id,jobID,address,
position,timestamp,journalName,
id
,jobID,serverUUID,address,
position,gtid,currentGtid,timestamp,journalName,
createTime,updateTime
</sql>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,16 @@ public class EventMeshMysqlPosition implements Serializable {

private Integer jobID;

private String serverUUID;

private String address;

private Long position;

private String gtid;

private String currentGtid;

private Long timestamp;

private String journalName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,12 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) {
CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset();
if (offset != null) {
position.setPosition(offset.getOffset());
position.setGtid(offset.getGtid());
position.setCurrentGtid(offset.getCurrentGtid());
}
CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition();
if (partition != null) {
position.setServerUUID(partition.getServerUUID());
position.setTimestamp(partition.getTimeStamp());
position.setJournalName(partition.getJournalName());
}
Expand Down Expand Up @@ -148,13 +151,16 @@ public List<RecordPosition> handler(FetchPositionRequest request, Metadata metad
request.getJobID()));
List<RecordPosition> recordPositionList = new ArrayList<>();
for (EventMeshMysqlPosition position : positionList) {
RecordPosition recordPosition = new RecordPosition();
CanalRecordPartition partition = new CanalRecordPartition();
partition.setTimeStamp(position.getTimestamp());
partition.setJournalName(position.getJournalName());
partition.setServerUUID(position.getServerUUID());
RecordPosition recordPosition = new RecordPosition();
recordPosition.setRecordPartition(partition);
CanalRecordOffset offset = new CanalRecordOffset();
offset.setOffset(position.getPosition());
offset.setGtid(position.getGtid());
offset.setCurrentGtid(position.getCurrentGtid());
recordPosition.setRecordOffset(offset);
recordPositionList.add(recordPosition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class CanalSinkConfig extends SinkConfig {
// sync mode: field/row
private SyncMode syncMode;

private boolean isGTIDMode = true;

// skip sink process exception
private Boolean skipException = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public class CanalSourceConfig extends SourceConfig {

private Short clientId;

private String serverUUID;

private boolean isGTIDMode = true;

private Integer batchSize = 10000;

private Long batchTimeout = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public class CanalRecordOffset extends RecordOffset {

private Long offset;

// mysql instance gtid range
private String gtid;

private String currentGtid;

@Override
public Class<? extends RecordOffset> getRecordOffsetClass() {
return CanalRecordOffset.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
@ToString
public class CanalRecordPartition extends RecordPartition {

private String serverUUID;

private String journalName;

private Long timeStamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,14 @@
public class CanalConnectRecord {

private String schemaName;

private String tableName;

// mysql instance gtid range
private String gtid;

private String currentGtid;

/**
* The business type of the changed data (I/U/D/C/A/E), consistent with the EventType defined in EntryProtocol in canal.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ public SqlTemplate getSqlTemplate() {
return sqlTemplate;
}

public boolean isDRDS() {
return false;
}

public String getShardColumns(String schema, String table) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public interface DbDialect {

public boolean isSupportMergeSql();

public boolean isDRDS();

public LobHandler getLobHandler();

public JdbcTemplate getJdbcTemplate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public String getDefaultSchema() {
return null;
}

public boolean isDRDS() {
return false;
}

public String getDefaultCatalog() {
return jdbcTemplate.queryForObject("select database()", String.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,35 +51,21 @@ public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) {
String shardColumns = null;

if (type.isInsert()) {
if (CollectionUtils.isEmpty(record.getColumns())
&& (dbDialect.isDRDS())) {
// sql
sql = sqlTemplate.getInsertSql(schemaName,
record.getTableName(),
buildColumnNames(record.getKeys()),
buildColumnNames(record.getColumns()));
} else {
sql = sqlTemplate.getMergeSql(schemaName,
sql = sqlTemplate.getMergeSql(schemaName,
record.getTableName(),
buildColumnNames(record.getKeys()),
buildColumnNames(record.getColumns()),
new String[] {},
!dbDialect.isDRDS(),
true,
shardColumns);
}
} else if (type.isUpdate()) {

boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys());
boolean rowMode = sinkConfig.getSyncMode().isRow();
String[] keyColumns = null;
String[] otherColumns = null;
if (existOldKeys) {
keyColumns = buildColumnNames(record.getOldKeys());
if (dbDialect.isDRDS()) {
otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getUpdatedKeys());
} else {
otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys());
}
otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys());
} else {
keyColumns = buildColumnNames(record.getKeys());
otherColumns = buildColumnNames(record.getUpdatedColumns());
Expand All @@ -91,10 +77,10 @@ public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) {
keyColumns,
otherColumns,
new String[] {},
!dbDialect.isDRDS(),
true,
shardColumns);
} else {
sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(), shardColumns);
sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, true, shardColumns);
}
} else if (type.isDelete()) {
sql = sqlTemplate.getDeleteSql(schemaName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
@Data
public class DbLoadContext {

private String gtid;

private List<CanalConnectRecord> lastProcessedRecords;

private List<CanalConnectRecord> prepareRecords;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.canal.sink;

import org.apache.eventmesh.connector.canal.CanalConnectRecord;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class GtidBatch {
private int totalBatches;
private List<List<CanalConnectRecord>> batches;
private int receivedBatchCount;

public GtidBatch(int totalBatches) {
this.totalBatches = totalBatches;
this.batches = new CopyOnWriteArrayList<>(new List[totalBatches]);
this.receivedBatchCount = 0;
}

public void addBatch(int batchIndex, List<CanalConnectRecord> batchRecords) {
batches.set(batchIndex, batchRecords);
receivedBatchCount++;
}

public List<List<CanalConnectRecord>> getBatches() {
return batches;
}

public boolean isComplete() {
return receivedBatchCount == totalBatches;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.canal.sink;

import org.apache.eventmesh.connector.canal.CanalConnectRecord;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class GtidBatchManager {

private static ConcurrentHashMap<String, GtidBatch> gtidBatchMap = new ConcurrentHashMap<>();

public static void addBatch(String gtid, int batchIndex, int totalBatches, List<CanalConnectRecord> batchRecords) {
gtidBatchMap.computeIfAbsent(gtid, k -> new GtidBatch(totalBatches)).addBatch(batchIndex, batchRecords);
}

public static GtidBatch getGtidBatch(String gtid) {
return gtidBatchMap.get(gtid);
}

public static boolean isComplete(String gtid) {
GtidBatch batch = gtidBatchMap.get(gtid);
return batch != null && batch.isComplete();
}

public static void removeGtidBatch(String gtid) {
gtidBatchMap.remove(gtid);
}
}
Loading

0 comments on commit f88a194

Please sign in to comment.