Skip to content

Commit

Permalink
update canal connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 21, 2024
1 parent d4d9f52 commit 832ddae
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ public class SinkConnectorConfig {

private String url;

private String dbAddress;

private int dbPort;

private String userName;

private String passWord;

private String schemaName;

private String tableName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class SourceConnectorConfig {

private String connectorName;

private String url;

private String dbAddress;

private int dbPort;
Expand All @@ -40,4 +42,8 @@ public class SourceConnectorConfig {

private String passWord;

private String schemaName;

private String tableName;

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@
@Data
public class CanalConnectRecord {

/**
* 内部维护的一套tableId,与manager中得到的table Id对应
*/
private long tableId = -1;

private String tableName;

private String schemaName;
Expand Down Expand Up @@ -164,7 +159,6 @@ private List<EventColumn> cloneColumn(List<EventColumn> columns) {

public CanalConnectRecord clone() {
CanalConnectRecord record = new CanalConnectRecord();
record.setTableId(tableId);
record.setTableName(tableName);
record.setSchemaName(schemaName);
record.setDdlSchemaName(ddlSchemaName);
Expand Down Expand Up @@ -195,7 +189,6 @@ public int hashCode() {
result = prime * result + ((oldKeys == null) ? 0 : oldKeys.hashCode());
result = prime * result + (int) (pairId ^ (pairId >>> 32));
result = prime * result + ((schemaName == null) ? 0 : schemaName.hashCode());
result = prime * result + (int) (tableId ^ (tableId >>> 32));
result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
return result;
}
Expand Down Expand Up @@ -249,9 +242,6 @@ public boolean equals(Object obj) {
} else if (!schemaName.equals(other.schemaName)) {
return false;
}
if (tableId != other.tableId) {
return false;
}
if (tableName == null) {
if (other.tableName != null) {
return false;
Expand All @@ -265,8 +255,7 @@ public boolean equals(Object obj) {
@Override
public String toString() {
return "CanalConnectRecord{" +
"tableId=" + tableId +
", tableName='" + tableName + '\'' +
"tableName='" + tableName + '\'' +
", schemaName='" + schemaName + '\'' +
", eventType=" + eventType +
", executeTime=" + executeTime +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.eventmesh.connector.canal.sink.connector;

import javafx.fxml.LoadException;

import org.apache.eventmesh.common.config.connector.Config;

import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig;
import org.apache.eventmesh.connector.canal.CanalConnectRecord;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
Expand All @@ -44,14 +47,14 @@ public Class<? extends Config> configClass() {
@Override
public void init(Config config) throws Exception {
// init config for canal source connector

this.sinkConfig = (CanalSinkConfig)config;
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
// init config for canal source connector
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;

this.sinkConfig = (CanalSinkConfig)sinkConnectorContext.getSinkConfig();
}

@Override
Expand All @@ -76,11 +79,36 @@ public void stop() {

@Override
public void put(List<ConnectRecord> sinkRecords) {
for (ConnectRecord connectRecord : sinkRecords) {
List<CanalConnectRecord> canalConnectRecordList = (List<CanalConnectRecord>)connectRecord.getData();
for (CanalConnectRecord canalConnectRecord : canalConnectRecordList) {
if (sinkConfig.getSinkConnectorConfig().getSchemaName().equals(canalConnectRecord.getSchemaName()) &&
sinkConfig.getSinkConnectorConfig().getTableName().equals(canalConnectRecord.getTableName())) {


}
}
}
}

@Override
public Sink create() {
return new CanalSinkConnector();
}

/**
* 分析整个数据,将datas划分为多个批次. ddl sql前的DML并发执行,然后串行执行ddl后,再并发执行DML
*
* @return
*/
private boolean isDdlDatas(List<CanalConnectRecord> canalConnectRecordList) {
boolean result = false;
for (CanalConnectRecord canalConnectRecord : canalConnectRecordList) {
result |= canalConnectRecord.getEventType().isDdl();
if (result && !canalConnectRecord.getEventType().isDdl()) {
throw new RuntimeException("ddl/dml can't be in one batch, it's may be a bug , pls submit issues.");
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,6 @@
*/
@Slf4j
public class EntryParser {
// private DbDialectFactory dbDialectFactory;

private static final String RETL_CLIENT_FLAG = "_SYNC";

private static final String compatibleMarkTable = "retl_client";

private static final String compatibleMarkInfoColumn = "client_info";

private static final String compatibleMarkIdentifierColumn = "client_identifier";

/**
* 将对应canal送出来的Entry对象解析为ConnectRecord
Expand Down Expand Up @@ -131,6 +122,12 @@ public List<CanalConnectRecord> parse(CanalSourceConfig sourceConfig, List<Entry
}

private List<CanalConnectRecord> internParse(CanalSourceConfig sourceConfig, Entry entry) {
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
if (!schemaName.equals(sourceConfig.getSourceConnectorConfig().getSchemaName()) || !tableName.equals(sourceConfig.getSourceConnectorConfig().getTableName())) {
return null;
}

RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
Expand All @@ -142,8 +139,6 @@ private List<CanalConnectRecord> internParse(CanalSourceConfig sourceConfig, Ent
return null;
}

String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
EventType eventType = EventType.valueOf(rowChange.getEventType().name());

// 处理下DDL操作
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,10 @@
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.sink.AbstractCanalEventSink;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

Expand Down Expand Up @@ -105,7 +101,7 @@ public void init(ConnectorContext connectorContext) throws Exception {
this.sourceConfig = (CanalSourceConfig) sourceConnectorContext.getSourceConfig();
this.offsetStorageReader = sourceConnectorContext.getOffsetStorageReader();
// init source database connection
DatabaseConnection.sourceConfig = sourceConfig;
// DatabaseConnection.sourceConfig = sourceConfig;
// DatabaseConnection.initSourceConnection();

canalServer = CanalServerWithEmbedded.instance();
Expand Down

0 comments on commit 832ddae

Please sign in to comment.