Skip to content

Commit

Permalink
update canal source connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed May 10, 2024
1 parent d6314ea commit 023938f
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.apache.eventmesh.common.config.connector.rdb.canal;

import org.apache.eventmesh.common.config.connector.SinkConfig;
import org.apache.eventmesh.common.config.connector.rdb.jdbc.SinkConnectorConfig;

import lombok.Data;
import lombok.EqualsAndHashCode;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.apache.eventmesh.common.config.connector.rdb.canal;

import org.apache.eventmesh.common.config.connector.SourceConfig;
import org.apache.eventmesh.common.config.connector.rdb.jdbc.SourceConnectorConfig;

import lombok.Data;
import lombok.EqualsAndHashCode;
Expand All @@ -10,5 +9,13 @@
@EqualsAndHashCode(callSuper = true)
public class CanalSourceConfig extends SourceConfig {

private String destination;

private Short clientId;

private Integer batchSize;

private Long batchTimeout;

private SourceConnectorConfig sourceConnectorConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,11 @@
@Data
public class SinkConnectorConfig {

private String connectorName;

private String url;

private String userName;

private String passWord;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,11 @@
public class SourceConnectorConfig {

private String connectorName;

private String url;

private String userName;

private String passWord;

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
@Slf4j
public class IPUtils {

public static String getLocalAddress() {
public static String localAddress = init();

private static String init() {
// if the progress works under docker environment
// return the host ip about this docker located from environment value
String dockerHostIp = System.getenv("docker_host_ip");
Expand Down Expand Up @@ -114,6 +116,11 @@ public static String getLocalAddress() {
return null;
}

public static String getLocalAddress() {
return localAddress;

}

public static boolean isValidIPV4Address(String ip) {

// Regex for digit from 0 to 255.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-common")
implementation canal
implementation "com.alibaba:druid:1.2.6"
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation "org.mockito:mockito-core"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.apache.eventmesh.connector.canal;


import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig;
import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig;

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;
import java.sql.SQLException;

public class DatabaseConnection {

private static DruidDataSource sourceDataSource;

private static DruidDataSource sinkDataSource;

public static CanalSourceConfig sourceConfig;

public static CanalSinkConfig sinkConfig;

public static void initSourceConnection() {
sourceDataSource = new DruidDataSource();
sourceDataSource.setUrl(sourceConfig.getSourceConnectorConfig().getUrl());
sourceDataSource.setUsername(sourceConfig.getSourceConnectorConfig().getUserName());
sourceDataSource.setPassword(sourceConfig.getSourceConnectorConfig().getPassWord());
sourceDataSource.setInitialSize(5);
sourceDataSource.setMinIdle(5);
sourceDataSource.setMaxActive(20);
sourceDataSource.setMaxWait(60000);
sourceDataSource.setTimeBetweenEvictionRunsMillis(60000);
sourceDataSource.setMinEvictableIdleTimeMillis(300000);
sourceDataSource.setValidationQuery("SELECT 1");
sourceDataSource.setTestWhileIdle(true);
sourceDataSource.setTestOnBorrow(false);
sourceDataSource.setTestOnReturn(false);
sourceDataSource.setPoolPreparedStatements(true);
sourceDataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
}

public static void initSinkConnection() {
sinkDataSource = new DruidDataSource();
sinkDataSource.setUrl(sinkConfig.getSinkConnectorConfig().getUrl());
sinkDataSource.setUsername(sinkConfig.getSinkConnectorConfig().getUserName());
sinkDataSource.setPassword(sinkConfig.getSinkConnectorConfig().getPassWord());
sinkDataSource.setInitialSize(5);
sinkDataSource.setMinIdle(5);
sinkDataSource.setMaxActive(20);
sinkDataSource.setMaxWait(60000);
sinkDataSource.setTimeBetweenEvictionRunsMillis(60000);
sinkDataSource.setMinEvictableIdleTimeMillis(300000);
sinkDataSource.setValidationQuery("SELECT 1");
sinkDataSource.setTestWhileIdle(true);
sinkDataSource.setTestOnBorrow(false);
sinkDataSource.setTestOnReturn(false);
sinkDataSource.setPoolPreparedStatements(true);
sinkDataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
}


public static Connection getSourceConnection() throws SQLException {
return sourceDataSource.getConnection();
}

public static Connection getSinkConnection() throws SQLException {
return sinkDataSource.getConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
@Slf4j
public class CanalSinkConnector implements Sink, ConnectorCreateService<Sink> {

private CanalSinkConfig sinkConfig;

@Override
public Class<? extends Config> configClass() {
return CanalSinkConfig.class;
Expand Down Expand Up @@ -64,7 +66,7 @@ public void commit(ConnectRecord record) {

@Override
public String name() {
return "";
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig;
import org.apache.eventmesh.connector.canal.DatabaseConnection;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
Expand All @@ -28,13 +29,26 @@
import java.util.List;


import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CanalSourceConnector implements Source, ConnectorCreateService<Source> {

private CanalSourceConfig sourceConfig;

private CanalServerWithEmbedded canalServer;

private ClientIdentity clientIdentity;

private String filter;

private volatile boolean running = false;

@Override
public Class<? extends Config> configClass() {
return CanalSourceConfig.class;
Expand All @@ -44,18 +58,38 @@ public Class<? extends Config> configClass() {
public void init(Config config) throws Exception {
// init config for canal source connector
this.sourceConfig = (CanalSourceConfig) config;

}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext;
this.sourceConfig = (CanalSourceConfig) sourceConnectorContext.getSourceConfig();
// init source database connection
DatabaseConnection.sourceConfig = sourceConfig;
DatabaseConnection.initSourceConnection();

canalServer = CanalServerWithEmbedded.instance();
canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
@Override
public CanalInstance generate(String destination) {
return null;
}
});
}


@Override
public void start() throws Exception {
if (running) {
return;
}
canalServer.start();

canalServer.start(sourceConfig.getDestination());
this.clientIdentity = new ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(), filter);
canalServer.subscribe(clientIdentity);

running = true;
}


Expand All @@ -66,7 +100,7 @@ public void commit(ConnectRecord record) {

@Override
public String name() {
return this.sourceConfig.getSourceConnectorConfig().getName();
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}

@Override
Expand Down

0 comments on commit 023938f

Please sign in to comment.