Skip to content

Commit

Permalink
update canal sink connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 22, 2024
1 parent 9f8c71d commit e22a385
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 169 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.eventmesh.common.config.connector.rdb.canal;

import org.apache.eventmesh.common.config.connector.SinkConfig;
import org.apache.eventmesh.common.remote.job.SyncMode;

import lombok.Data;
import lombok.EqualsAndHashCode;
Expand All @@ -9,6 +10,8 @@
@EqualsAndHashCode(callSuper = true)
public class CanalSinkConfig extends SinkConfig {

private SyncMode syncMode; // 同步模式:字段/整条记录

public SinkConnectorConfig sinkConnectorConfig;

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, String nam
int majorVersion, int minorVersion) {
super(jdbcTemplate, lobHandler, name, majorVersion, minorVersion);
sqlTemplate = new MysqlSqlTemplate();

if (StringUtils.contains(databaseVersion, "-TDDL-")) {
isDRDS = true;
// initShardColumns();
}
}

public boolean isCharSpacePadded() {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package org.apache.eventmesh.connector.canal.interceptor;

import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig;
import org.apache.eventmesh.connector.canal.CanalConnectRecord;
import org.apache.eventmesh.connector.canal.MysqlSqlTemplate;
import org.apache.eventmesh.connector.canal.SqlTemplate;
import org.apache.eventmesh.connector.canal.dialect.DbDialect;
import org.apache.eventmesh.connector.canal.dialect.MysqlDialect;
Expand All @@ -31,45 +33,37 @@
* 计算下最新的sql语句
*
*/
public class SqlBuilderLoadInterceptor extends AbstractLoadInterceptor<DbLoadContext, CanalConnectRecord> {
public class SqlBuilderLoadInterceptor {

private DbDialectFactory dbDialectFactory;
private DbDialect dbDialect;

public boolean before(DbLoadContext context, CanalConnectRecord currentData) {
public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) {
// 初步构建sql
// DbDialect dbDialect = dbDialectFactory.getDbDialect(context.getIdentity().getPipelineId(),
// (DbMediaSource) context.getDataMediaSource());
DbDialect dbDialect = new MysqlDialect();
SqlTemplate sqlTemplate = dbDialect.getSqlTemplate();
EventType type = currentData.getEventType();
EventType type = record.getEventType();
String sql = null;

String schemaName = (currentData.isWithoutSchema() ? null : currentData.getSchemaName());
String schemaName = (record.isWithoutSchema() ? null : record.getSchemaName());

/**
* 针对DRDS数据库
*/
String shardColumns = null;
// if(dbDialect.isDRDS()){
// // 获取拆分键
// shardColumns = dbDialect.getShardColumns(schemaName, currentData.getTableName());
//
// }

// 注意insert/update语句对应的字段数序都是将主键排在后面
if (type.isInsert()) {
if (CollectionUtils.isEmpty(currentData.getColumns())
if (CollectionUtils.isEmpty(record.getColumns())
&& (dbDialect.isDRDS())) { // 如果表为全主键,直接进行insert
// sql
sql = sqlTemplate.getInsertSql(schemaName,
currentData.getTableName(),
buildColumnNames(currentData.getKeys()),
buildColumnNames(currentData.getColumns()));
record.getTableName(),
buildColumnNames(record.getKeys()),
buildColumnNames(record.getColumns()));
} else {
sql = sqlTemplate.getMergeSql(schemaName,
currentData.getTableName(),
buildColumnNames(currentData.getKeys()),
buildColumnNames(currentData.getColumns()),
record.getTableName(),
buildColumnNames(record.getKeys()),
buildColumnNames(record.getColumns()),
new String[] {},
!dbDialect.isDRDS(),
shardColumns);
Expand All @@ -87,47 +81,47 @@ public boolean before(DbLoadContext context, CanalConnectRecord currentData) {
// }
// }

boolean existOldKeys = !CollectionUtils.isEmpty(currentData.getOldKeys());
boolean rowMode = context.getPipeline().getParameters().getSyncMode().isRow();
boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys());
boolean rowMode = sinkConfig.getSyncMode().isRow();
String[] keyColumns = null;
String[] otherColumns = null;
if (existOldKeys) {
// 需要考虑主键变更的场景
// 构造sql如下:update table xxx set pk = newPK where pk = oldPk
keyColumns = buildColumnNames(currentData.getOldKeys());
keyColumns = buildColumnNames(record.getOldKeys());
// 这里需要精确获取变更的主键,因为目标为DRDS时主键会包含拆分键,正常的原主键变更只更新对应的单主键列即可
if (dbDialect.isDRDS()) {
otherColumns = buildColumnNames(currentData.getUpdatedColumns(), currentData.getUpdatedKeys());
otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getUpdatedKeys());
} else {
otherColumns = buildColumnNames(currentData.getUpdatedColumns(), currentData.getKeys());
otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys());
}
} else {
keyColumns = buildColumnNames(currentData.getKeys());
otherColumns = buildColumnNames(currentData.getUpdatedColumns());
keyColumns = buildColumnNames(record.getKeys());
otherColumns = buildColumnNames(record.getUpdatedColumns());
}

if (rowMode && !existOldKeys) {// 如果是行记录,并且不存在主键变更,考虑merge sql
sql = sqlTemplate.getMergeSql(schemaName,
currentData.getTableName(),
record.getTableName(),
keyColumns,
otherColumns,
new String[] {},
!dbDialect.isDRDS(),
shardColumns);
} else {// 否则进行update sql
sql = sqlTemplate.getUpdateSql(schemaName, currentData.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(), shardColumns);
sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(), shardColumns);
}
} else if (type.isDelete()) {
sql = sqlTemplate.getDeleteSql(schemaName,
currentData.getTableName(),
buildColumnNames(currentData.getKeys()));
record.getTableName(),
buildColumnNames(record.getKeys()));
}

// 处理下hint sql
if (currentData.getHint() != null) {
currentData.setSql(currentData.getHint() + sql);
if (record.getHint() != null) {
record.setSql(record.getHint() + sql);
} else {
currentData.setSql(sql);
record.setSql(sql);
}
return false;
}
Expand Down Expand Up @@ -156,10 +150,11 @@ private String[] buildColumnNames(List<EventColumn> columns1, List<EventColumn>
return result;
}

// =============== setter / getter =============

public void setDbDialectFactory(DbDialectFactory dbDialectFactory) {
this.dbDialectFactory = dbDialectFactory;
public DbDialect getDbDialect() {
return dbDialect;
}

public void setDbDialect(DbDialect dbDialect) {
this.dbDialect = dbDialect;
}
}
Loading

0 comments on commit e22a385

Please sign in to comment.