From 023938f31362993b555a43b99698a2ede6abe053 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Fri, 10 May 2024 18:02:47 +0800 Subject: [PATCH] update canal source connector --- .../connector/rdb/canal/CanalSinkConfig.java | 1 - .../rdb/canal/CanalSourceConfig.java | 9 ++- .../rdb/canal/SinkConnectorConfig.java | 7 ++ .../rdb/canal/SourceConnectorConfig.java | 7 ++ .../eventmesh/common/utils/IPUtils.java | 9 ++- .../eventmesh-connector-canal/build.gradle | 1 + .../connector/canal/DatabaseConnection.java | 68 +++++++++++++++++++ .../sink/connector/CanalSinkConnector.java | 4 +- .../connector/CanalSourceConnector.java | 38 ++++++++++- 9 files changed, 138 insertions(+), 6 deletions(-) create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java index 464e659723..8d95ef12e1 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java @@ -1,7 +1,6 @@ package org.apache.eventmesh.common.config.connector.rdb.canal; import org.apache.eventmesh.common.config.connector.SinkConfig; -import org.apache.eventmesh.common.config.connector.rdb.jdbc.SinkConnectorConfig; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java index 24614c9c68..0a29656bbe 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java @@ -1,7 +1,6 @@ package org.apache.eventmesh.common.config.connector.rdb.canal; import org.apache.eventmesh.common.config.connector.SourceConfig; -import org.apache.eventmesh.common.config.connector.rdb.jdbc.SourceConnectorConfig; import lombok.Data; import lombok.EqualsAndHashCode; @@ -10,5 +9,13 @@ @EqualsAndHashCode(callSuper = true) public class CanalSourceConfig extends SourceConfig { + private String destination; + + private Short clientId; + + private Integer batchSize; + + private Long batchTimeout; + private SourceConnectorConfig sourceConnectorConfig; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java index ac44c8c9fb..eb6435e537 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SinkConnectorConfig.java @@ -27,4 +27,11 @@ @Data public class SinkConnectorConfig { + private String connectorName; + + private String url; + + private String userName; + + private String passWord; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java index 8347d944c6..af174a97ce 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java @@ -31,4 +31,11 @@ public class SourceConnectorConfig { private String connectorName; + + private String url; + + private String userName; + + private String passWord; + } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/IPUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/IPUtils.java index 998735181e..cd3e5c7f6a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/IPUtils.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/IPUtils.java @@ -46,7 +46,9 @@ @Slf4j public class IPUtils { - public static String getLocalAddress() { + public static String localAddress = init(); + + private static String init() { // if the progress works under docker environment // return the host ip about this docker located from environment value String dockerHostIp = System.getenv("docker_host_ip"); @@ -114,6 +116,11 @@ public static String getLocalAddress() { return null; } + public static String getLocalAddress() { + return localAddress; + + } + public static boolean isValidIPV4Address(String ip) { // Regex for digit from 0 to 255. diff --git a/eventmesh-connectors/eventmesh-connector-canal/build.gradle b/eventmesh-connectors/eventmesh-connector-canal/build.gradle index 9e1cd63e93..640cb5ce42 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/build.gradle +++ b/eventmesh-connectors/eventmesh-connector-canal/build.gradle @@ -25,6 +25,7 @@ dependencies { api project(":eventmesh-openconnect:eventmesh-openconnect-java") implementation project(":eventmesh-common") implementation canal + implementation "com.alibaba:druid:1.2.6" compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation "org.mockito:mockito-core" diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java new file mode 100644 index 0000000000..87aa71273c --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java @@ -0,0 +1,68 @@ +package org.apache.eventmesh.connector.canal; + + +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig; + +import com.alibaba.druid.pool.DruidDataSource; + +import java.sql.Connection; +import java.sql.SQLException; + +public class DatabaseConnection { + + private static DruidDataSource sourceDataSource; + + private static DruidDataSource sinkDataSource; + + public static CanalSourceConfig sourceConfig; + + public static CanalSinkConfig sinkConfig; + + public static void initSourceConnection() { + sourceDataSource = new DruidDataSource(); + sourceDataSource.setUrl(sourceConfig.getSourceConnectorConfig().getUrl()); + sourceDataSource.setUsername(sourceConfig.getSourceConnectorConfig().getUserName()); + sourceDataSource.setPassword(sourceConfig.getSourceConnectorConfig().getPassWord()); + sourceDataSource.setInitialSize(5); + sourceDataSource.setMinIdle(5); + sourceDataSource.setMaxActive(20); + sourceDataSource.setMaxWait(60000); + sourceDataSource.setTimeBetweenEvictionRunsMillis(60000); + sourceDataSource.setMinEvictableIdleTimeMillis(300000); + sourceDataSource.setValidationQuery("SELECT 1"); + sourceDataSource.setTestWhileIdle(true); + sourceDataSource.setTestOnBorrow(false); + sourceDataSource.setTestOnReturn(false); + sourceDataSource.setPoolPreparedStatements(true); + sourceDataSource.setMaxPoolPreparedStatementPerConnectionSize(20); + } + + public static void initSinkConnection() { + sinkDataSource = new DruidDataSource(); + sinkDataSource.setUrl(sinkConfig.getSinkConnectorConfig().getUrl()); + sinkDataSource.setUsername(sinkConfig.getSinkConnectorConfig().getUserName()); + sinkDataSource.setPassword(sinkConfig.getSinkConnectorConfig().getPassWord()); + sinkDataSource.setInitialSize(5); + sinkDataSource.setMinIdle(5); + sinkDataSource.setMaxActive(20); + sinkDataSource.setMaxWait(60000); + sinkDataSource.setTimeBetweenEvictionRunsMillis(60000); + sinkDataSource.setMinEvictableIdleTimeMillis(300000); + sinkDataSource.setValidationQuery("SELECT 1"); + sinkDataSource.setTestWhileIdle(true); + sinkDataSource.setTestOnBorrow(false); + sinkDataSource.setTestOnReturn(false); + sinkDataSource.setPoolPreparedStatements(true); + sinkDataSource.setMaxPoolPreparedStatementPerConnectionSize(20); + } + + + public static Connection getSourceConnection() throws SQLException { + return sourceDataSource.getConnection(); + } + + public static Connection getSinkConnection() throws SQLException { + return sinkDataSource.getConnection(); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index c159301faa..468e50acc9 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -34,6 +34,8 @@ @Slf4j public class CanalSinkConnector implements Sink, ConnectorCreateService { + private CanalSinkConfig sinkConfig; + @Override public Class configClass() { return CanalSinkConfig.class; @@ -64,7 +66,7 @@ public void commit(ConnectRecord record) { @Override public String name() { - return ""; + return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } @Override diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index 68165a1b9a..a076899671 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -19,6 +19,7 @@ import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig; +import org.apache.eventmesh.connector.canal.DatabaseConnection; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; @@ -28,6 +29,11 @@ import java.util.List; +import com.alibaba.otter.canal.instance.core.CanalInstance; +import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; +import com.alibaba.otter.canal.protocol.ClientIdentity; +import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded; + import lombok.extern.slf4j.Slf4j; @Slf4j @@ -35,6 +41,14 @@ public class CanalSourceConnector implements Source, ConnectorCreateService configClass() { return CanalSourceConfig.class; @@ -44,18 +58,38 @@ public Class configClass() { public void init(Config config) throws Exception { // init config for canal source connector this.sourceConfig = (CanalSourceConfig) config; - } @Override public void init(ConnectorContext connectorContext) throws Exception { SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext; this.sourceConfig = (CanalSourceConfig) sourceConnectorContext.getSourceConfig(); + // init source database connection + DatabaseConnection.sourceConfig = sourceConfig; + DatabaseConnection.initSourceConnection(); + + canalServer = CanalServerWithEmbedded.instance(); + canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() { + @Override + public CanalInstance generate(String destination) { + return null; + } + }); } @Override public void start() throws Exception { + if (running) { + return; + } + canalServer.start(); + + canalServer.start(sourceConfig.getDestination()); + this.clientIdentity = new ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(), filter); + canalServer.subscribe(clientIdentity); + + running = true; } @@ -66,7 +100,7 @@ public void commit(ConnectRecord record) { @Override public String name() { - return this.sourceConfig.getSourceConnectorConfig().getName(); + return this.sourceConfig.getSourceConnectorConfig().getConnectorName(); } @Override