From cd3575bc1d68874c8bb2d8fbe19abe9bdb52c2ad Mon Sep 17 00:00:00 2001
From: sodaRyCN <757083350@qq.com>
Date: Thu, 11 Jul 2024 16:49:14 +0800
Subject: [PATCH] close to finishing full read and beginning full write

---
 .../connector/rdb/canal/CanalMySQLType.java    |  81 ++++----
 .../rdb/canal/CanalSinkFullConfig.java         |  12 ++
 .../rdb/canal/CanalSourceFullConfig.java       |   1 +
 .../rdb/canal/JobRdbFullPosition.java          |   5 +-
 .../offset/canal/CanalFullRecordOffset.java    |  18 ++
 .../canal/CanalFullRecordPartition.java        |  37 ++++
 .../connector/canal/DatabaseConnection.java    |   4 +-
 .../eventmesh/connector/canal/SqlUtils.java    |  16 +-
 .../connector/CanalSinkFullConnector.java      |  59 ++++++
 .../canal/source/CanalFullProducer.java        | 188 +++++++++++++-----
 .../connector/canal/source/EntryParser.java    |  32 +--
 .../connector/CanalSourceConnector.java        |   5 +-
 .../connector/CanalSourceFullConnector.java    |  76 +++++--
 .../source/position/CanalFullPositionMgr.java  |  89 ++++++---
 .../canal/source/table/RdbTableMgr.java        |  78 ++++----
 15 files changed, 501 insertions(+), 200 deletions(-)
 create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java
 create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java
 create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java
 create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java
index 969e631e93..257822810e 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java
@@ -2,7 +2,8 @@
 
 import com.mysql.cj.MysqlType;
 
-import java.sql.JDBCType;
+import java.util.HashMap;
+import java.util.Map;
 
 public enum CanalMySQLType {
     BIT("BIT"),
@@ -12,37 +13,38 @@ public enum CanalMySQLType {
     INT("INT"),
     BIGINT("BIGINT"),
     DECIMAL("DECIMAL"),
-    FLOAT("FLOAT", JDBCType.REAL),
-    DOUBLE("DOUBLE", JDBCType.DOUBLE),
-    DATE("DATE", JDBCType.DATE),
-    DATETIME("DATETIME", JDBCType.TIMESTAMP),
-    TIMESTAMP("TIMESTAMP", JDBCType.TIMESTAMP),
-    TIME("TIME", JDBCType.TIME),
-    YEAR("YEAR", JDBCType.DATE),
-    CHAR("CHAR", JDBCType.CHAR),
-    VARCHAR("VARCHAR", JDBCType.VARCHAR),
-    BINARY("BINARY", JDBCType.BINARY),
-    VARBINARY("VARBINARY", JDBCType.VARBINARY),
-    TINYBLOB("TINYBLOB", JDBCType.VARBINARY),
-    BLOB("BLOB", JDBCType.LONGVARBINARY),
-    MEDIUMBLOB("MEDIUMBLOB", JDBCType.LONGVARBINARY),
-    LONGBLOB("LONGBLOB", JDBCType.LONGVARBINARY),
-    TINYTEXT("TINYTEXT", JDBCType.VARCHAR),
-    TEXT("TEXT", JDBCType.LONGVARCHAR),
-    MEDIUMTEXT("MEDIUMTEXT", JDBCType.LONGVARCHAR),
-    LONGTEXT("LONGTEXT", JDBCType.LONGVARCHAR),
-    ENUM("ENUM", JDBCType.CHAR),
-    SET("SET", JDBCType.CHAR),
-    JSON("JSON", JDBCType.LONGVARCHAR),
-    GEOMETRY("GEOMETRY", JDBCType.BINARY),
-    POINT("POINT", JDBCType.BINARY),
-    LINESTRING("LINESTRING", JDBCType.BINARY),
-    POLYGON("POLYGON", JDBCType.BINARY),
-    MULTIPOINT("MULTIPOINT", JDBCType.BINARY),
-    GEOMETRY_COLLECTION("GEOMETRYCOLLECTION", JDBCType.BINARY),
-    GEOM_COLLECTION("GEOMCOLLECTION", JDBCType.BINARY),
-    MULTILINESTRING("MULTILINESTRING", JDBCType.BINARY),
-    MULTIPOLYGON("MULTIPOLYGON", JDBCType.BINARY);
+    FLOAT("FLOAT"),
+    DOUBLE("DOUBLE"),
+    DATE("DATE"),
+    DATETIME("DATETIME"),
+    TIMESTAMP("TIMESTAMP"),
+    TIME("TIME"),
+    YEAR("YEAR"),
+    CHAR("CHAR"),
+    VARCHAR("VARCHAR"),
+    BINARY("BINARY"),
+    VARBINARY("VARBINARY"),
+    TINYBLOB("TINYBLOB"),
+    BLOB("BLOB"),
+    MEDIUMBLOB("MEDIUMBLOB"),
+    LONGBLOB("LONGBLOB"),
+    TINYTEXT("TINYTEXT"),
+    TEXT("TEXT"),
+    MEDIUMTEXT("MEDIUMTEXT"),
+    LONGTEXT("LONGTEXT"),
+    ENUM("ENUM"),
+    SET("SET"),
+    JSON("JSON"),
+    GEOMETRY("GEOMETRY"),
+    // MysqlType does not include the following types
+    POINT("POINT"),
+    LINESTRING("LINESTRING"),
+    POLYGON("POLYGON"),
+    MULTIPOINT("MULTIPOINT"),
+    GEOMETRY_COLLECTION("GEOMETRYCOLLECTION"),
+    GEOM_COLLECTION("GEOMCOLLECTION"),
+    MULTILINESTRING("MULTILINESTRING"),
+    MULTIPOLYGON("MULTIPOLYGON");
 
     private final String codeKey;
     private final MysqlType mysqlType;
@@ -51,13 +53,18 @@ public enum CanalMySQLType {
         this.codeKey = codeKey;
         this.mysqlType = MysqlType.getByName(codeKey);
     }
-
-    public static CanalMySQLType valueOfCode(String code) {
+    private static final Map<String, CanalMySQLType> TYPES = new HashMap<>();
+    static {
         CanalMySQLType[] values = values();
         for (CanalMySQLType tableType : values) {
-            if (tableType.codeKey.equalsIgnoreCase(code)) {
-                return tableType;
-            }
+            TYPES.put(tableType.codeKey, tableType);
+        }
+    }
+
+    public static CanalMySQLType valueOfCode(String code) {
+        CanalMySQLType type = TYPES.get(code.toUpperCase());
+        if (type != null) {
+            return type;
         }
         switch (MysqlType.getByName(code)) {
             case BOOLEAN:
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java
new file mode 100644
index 0000000000..5a6deb77cd
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java
@@ -0,0 +1,12 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.eventmesh.common.config.connector.SinkConfig;
+
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class CanalSinkFullConfig extends SinkConfig {
+
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
index fcfa6a0e92..6508e49fdb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
@@ -13,4 +13,5 @@ public class CanalSourceFullConfig extends SourceConfig {
     private SourceConnectorConfig connectorConfig;
     private List<RecordPosition> startPosition;
     private int parallel;
+    private int flushSize;
 }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java
index b4aeffd721..5f0f5326ed 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java
@@ -1,15 +1,18 @@
 package org.apache.eventmesh.common.config.connector.rdb.canal;
 
 import lombok.Data;
+import lombok.ToString;
 
 import java.math.BigDecimal;
 
 @Data
+@ToString
 public class JobRdbFullPosition {
     private String jobId;
     private String schema;
     private String tableName;
-    private String curPrimaryKey;
+    private String primaryKeyRecords;
     private long maxCount;
     private boolean finished;
+    private BigDecimal percent;
 }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java
new file mode 100644
index 0000000000..f1f3a7c132
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java
@@ -0,0 +1,18 @@
+package org.apache.eventmesh.common.remote.offset.canal;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition;
+import org.apache.eventmesh.common.remote.offset.RecordOffset;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString
+public class CanalFullRecordOffset extends RecordOffset {
+    private JobRdbFullPosition position;
+    @Override
+    public Class<? extends RecordOffset> getRecordOffsetClass() {
+        return CanalFullRecordOffset.class;
+    }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java
new file mode 100644
index 0000000000..a325444be5
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.eventmesh.common.remote.offset.canal; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.eventmesh.common.remote.offset.RecordPartition; + + +@Data +@ToString +@EqualsAndHashCode(callSuper = true) +public class CanalFullRecordPartition extends RecordPartition { + private String schema; + private String table; + + @Override + public Class getRecordPartitionClass() { + return CanalFullRecordPartition.class; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java index 00f9693be3..dc576186ea 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java @@ -18,14 +18,13 @@ package org.apache.eventmesh.connector.canal; +import com.alibaba.druid.pool.DruidDataSource; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig; import java.sql.Connection; import java.sql.SQLException; -import com.alibaba.druid.pool.DruidDataSource; - public class DatabaseConnection { public static DruidDataSource sourceDataSource; @@ -42,6 +41,7 @@ public static DruidDataSource createDruidDataSource(String url, String UserName, dataSource.setUsername(UserName); dataSource.setPassword(passWord); dataSource.setInitialSize(5); + dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver"); dataSource.setMinIdle(5); dataSource.setMaxActive(20); dataSource.setMaxWait(60000); diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java index 6d4ca03697..ed72a9eb4d 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java @@ -17,12 +17,11 @@ package org.apache.eventmesh.connector.canal; -import static org.apache.eventmesh.connector.canal.ByteArrayConverter.SQL_BYTES; -import static org.apache.eventmesh.connector.canal.SqlTimestampConverter.SQL_TIMESTAMP; - import com.mysql.cj.MysqlType; import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; @@ -42,8 +41,8 @@ import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.eventmesh.connector.canal.ByteArrayConverter.SQL_BYTES; +import static org.apache.eventmesh.connector.canal.SqlTimestampConverter.SQL_TIMESTAMP; public class SqlUtils { @@ -127,8 +126,13 @@ public static String genPrepareSqlOfInClause(int size) { } public static void setInClauseParameters(PreparedStatement preparedStatement, List params) throws SQLException { + setInClauseParameters(preparedStatement, 0, params); + } + + public static void setInClauseParameters(PreparedStatement preparedStatement, int paramIndexStart, + List params) 
throws SQLException { for (int i = 0; i < params.size(); i++) { - preparedStatement.setString(i + 1, params.get(i)); + preparedStatement.setString(paramIndexStart + i, params.get(i)); } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java new file mode 100644 index 0000000000..a43f6a4413 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java @@ -0,0 +1,59 @@ +package org.apache.eventmesh.connector.canal.sink.connector; + +import org.apache.eventmesh.common.config.connector.Config; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkFullConfig; +import org.apache.eventmesh.openconnect.api.ConnectorCreateService; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.List; + +public class CanalSinkFullConnector implements Sink, ConnectorCreateService { + private CanalSinkFullConfig config; + @Override + public void start() throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public Sink create() { + return new CanalSinkFullConnector(); + } + + @Override + public Class configClass() { + return CanalSinkFullConfig.class; + } + + @Override + public void init(Config config) throws Exception { + this.config = (CanalSinkFullConfig) config; + } + + @Override + public void init(ConnectorContext connectorContext) throws Exception { + this.config = (CanalSinkFullConfig)((SinkConnectorContext)connectorContext).getSinkConfig(); + } + + @Override + public void commit(ConnectRecord record) { + + } + + @Override + public String name() { + return null; + } + + @Override + public void put(List sinkRecords) { + + } +} diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java index 9c3a6d07fb..979346512c 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java @@ -1,15 +1,20 @@ package org.apache.eventmesh.connector.canal.source; -import com.mysql.cj.MysqlType; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalMySQLType; +import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition; import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef; import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; import org.apache.eventmesh.common.exception.EventMeshException; +import 
org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset; +import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordPartition; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.canal.source.position.TableFullPosition; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.locationtech.jts.geom.GeometryFactory; import org.locationtech.jts.io.WKBReader; import javax.sql.DataSource; @@ -26,14 +31,16 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; - @Slf4j public class CanalFullProducer { private BlockingQueue> queue; @@ -45,7 +52,7 @@ public class CanalFullProducer { private final AtomicReference choosePrimaryKey = new AtomicReference<>(null); private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); private static final DateTimeFormatter DATE_STAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private static final WKBReader WKB_READER = new wk + private static final WKBReader WKB_READER = new WKBReader(new GeometryFactory()); public CanalFullProducer(BlockingQueue> queue, DataSource dataSource, MySQLTableDef tableDefinition, TableFullPosition position, int flushSize) { @@ -56,42 +63,108 @@ public CanalFullProducer(BlockingQueue> queue, DataSource da this.flushSize = flushSize; } + public void choosePrimaryKey() { + for (RdbColumnDefinition col : tableDefinition.getColumnDefinitions().values()) { + if (position.getCurPrimaryKeyCols().get(col.getName()) != null) { + choosePrimaryKey.set(col.getName()); + return; + } + } + throw new EventMeshException("illegal: can't pick any primary key"); + } + + public void start(AtomicBoolean flag) { + choosePrimaryKey(); boolean isNextPage = false; - ArrayList records = new ArrayList<>(); + List> rows = new LinkedList<>(); while (flag.get()) { - String scanSql = generateScanSql(tableDefinition, !isNextPage); - log.info("scan sql is [{}] , cur position [{}]", scanSql, - JsonUtils.toJSONString(position.getCurPrimaryKeyCols())); + String scanSql = generateScanSql(!isNextPage); + log.info("scan sql is [{}] , cur position [{}], choose primary key [{}]", scanSql, + JsonUtils.toJSONString(position.getCurPrimaryKeyCols()), choosePrimaryKey.get()); try (Connection connection = dataSource.getConnection(); PreparedStatement statement = connection.prepareStatement(scanSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { statement.setFetchSize(Integer.MIN_VALUE); - setPrepareStatementValue(statement, position); + setPrepareStatementValue(statement); try (ResultSet resultSet = statement.executeQuery()) { + Map lastCol = null; while (flag.get() && resultSet.next()) { - - - if (records.size() < flushSize) { + Map columnValues = new LinkedHashMap<>(); + for (Map.Entry col : + tableDefinition.getColumnDefinitions().entrySet()) { + columnValues.put(col.getKey(), readColumn(resultSet, col.getKey(), + ((MySQLColumnDef) col.getValue()).getType())); + } + lastCol = columnValues; + rows.add(lastCol); + if (rows.size() < flushSize) { continue; } - queue.put(records); - records = new ArrayList<>(); + refreshPosition(lastCol); + commitConnectRecord(rows); + rows = 
new LinkedList<>(); } + + if (lastCol == null || checkIsScanFinish(lastCol)) { + log.info("full scan db [{}] table [{}] finish", tableDefinition.getSchemaName(), + tableDefinition.getTableName()); + commitConnectRecord(rows); + return; + } + refreshPosition(lastCol); } catch (InterruptedException ignore) { + log.info("full scan db [{}] table [{}] interrupted", tableDefinition.getSchemaName(), + tableDefinition.getTableName()); + Thread.currentThread().interrupt(); + return; } } catch (SQLException e) { - log.error("create connection fail", e); + log.error("catch SQLException fail", e); + LockSupport.parkNanos(3000 * 1000L); + } catch (Exception e) { + log.error("process schema [{}] table [{}] catch unknown exception", tableDefinition.getSchemaName(), + tableDefinition.getTableName(), e); LockSupport.parkNanos(3000 * 1000L); } if (!isNextPage) { isNextPage = true; } } + } + + private void commitConnectRecord(List> rows) throws InterruptedException { + if (rows == null || rows.isEmpty()) { + return; + } + ArrayList records = new ArrayList<>(); + CanalFullRecordOffset offset = new CanalFullRecordOffset(); + JobRdbFullPosition jobRdbFullPosition = new JobRdbFullPosition(); + jobRdbFullPosition.setPrimaryKeyRecords(JsonUtils.toJSONString(position)); + offset.setPosition(jobRdbFullPosition); + CanalFullRecordPartition partition = new CanalFullRecordPartition(); + partition.setSchema(tableDefinition.getSchemaName()); + partition.setTable(tableDefinition.getTableName()); + records.add(new ConnectRecord(partition, offset, System.currentTimeMillis(), rows)); + queue.put(records); + } + private boolean checkIsScanFinish(Map lastCol) { + Object lastPrimaryValue = lastCol.get(choosePrimaryKey.get()); + Object maxPrimaryValue = position.getMaxPrimaryKeyCols().get(choosePrimaryKey.get()); + if (lastPrimaryValue instanceof Number) { + BigDecimal last = new BigDecimal(String.valueOf(lastPrimaryValue)); + BigDecimal max = + new BigDecimal(String.valueOf(maxPrimaryValue)); + return last.compareTo(max) > 0; + } + if (lastPrimaryValue instanceof Comparable) { + return ((Comparable) lastPrimaryValue).compareTo(maxPrimaryValue) > 0; + } + return false; } - public Object readColumn(ResultSet rs, String col, MysqlType colType) throws Exception { + public Object readColumn(ResultSet rs, String col, CanalMySQLType colType) throws Exception { switch (colType) { case TINYINT: case SMALLINT: @@ -103,7 +176,7 @@ public Object readColumn(ResultSet rs, String col, MysqlType colType) throws Exc } else { uLong = rs.getLong(col); } - if (uLong.compareTo(2147483647L) > 0) { + if (uLong.compareTo((long) Integer.MAX_VALUE) > 0) { return uLong; } return uLong.intValue(); @@ -152,27 +225,55 @@ public Object readColumn(ResultSet rs, String col, MysqlType colType) throws Exc case LONGBLOB: return rs.getBytes(col); case GEOMETRY: - String wkb = rs.getString(col); - if (wkb == null) { - return null; - } - return safeToGisWKT("0x" + wkb); + return toGeometry("0x" + rs.getString(col)); + case GEOMETRY_COLLECTION: + case GEOM_COLLECTION: + case POINT: + case LINESTRING: + case POLYGON: + case MULTIPOINT: + case MULTILINESTRING: + case MULTIPOLYGON: + return null; default: return rs.getObject(col); } } - private void refreshPosition() { + protected static String toGeometry(Object value) throws Exception { + if (value == null) { + return null; + } + if (value instanceof String) { + String strVal = (String) value; + if (!strVal.startsWith("0x") && !strVal.startsWith("0X")) { + return (String) value; + } + return 
WKB_READER.read(hex2bytes(strVal.substring(2))).toText(); + } else if (value instanceof byte[]) { + return WKB_READER.read((byte[]) value).toText(); + } else { + throw new UnsupportedOperationException("class " + value.getClass() + ", value '" + value + "' , " + + "safeToGisWKT" + + " failed."); + } + } + private void refreshPosition(Map lastCol) { + Map nextPosition = new LinkedHashMap<>(); + for (Map.Entry entry : position.getCurPrimaryKeyCols().entrySet()) { + nextPosition.put(entry.getKey(), lastCol.get(entry.getKey())); + } + position.setCurPrimaryKeyCols(nextPosition); } - private void setPrepareStatementValue(PreparedStatement statement, TableFullPosition position) throws SQLException { + private void setPrepareStatementValue(PreparedStatement statement) throws SQLException { String colName = choosePrimaryKey.get(); if (colName == null) { return; } RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(colName); - Object value = position.getCurPrimaryKeyCols().get(choosePrimaryKey); + Object value = position.getCurPrimaryKeyCols().get(colName); String str; switch (columnDefinition.getJdbcType()) { case BIT: @@ -212,12 +313,12 @@ private void setPrepareStatementValue(PreparedStatement statement, TableFullPosi case DATE: Instant d; if (value instanceof Long) { - Long val = (Long)value; + Long val = (Long) value; d = Instant.ofEpochMilli(val); str = d.atZone(ZoneId.systemDefault()).toLocalDateTime().format(DATE_FORMATTER); } else if (value instanceof Integer) { - Integer val = (Integer)value; - d = Instant.ofEpochMilli((long)val); + Integer val = (Integer) value; + d = Instant.ofEpochMilli((long) val); str = d.atZone(ZoneId.systemDefault()).toLocalDateTime().format(DATE_FORMATTER); } else if (value instanceof String) { str = (String) value; @@ -225,18 +326,18 @@ private void setPrepareStatementValue(PreparedStatement statement, TableFullPosi if (!(value instanceof LocalDate)) { throw new IllegalArgumentException("unsupported date class type:" + value.getClass().getSimpleName()); } - str = ((LocalDate)value).format(DATE_FORMATTER); + str = ((LocalDate) value).format(DATE_FORMATTER); } statement.setString(1, str); break; case TIMESTAMP: if (value instanceof String) { - str = (String)value; + str = (String) value; } else { if (!(value instanceof LocalDateTime)) { throw new IllegalArgumentException("unsupported timestamp class type:" + value.getClass().getSimpleName()); } - str = ((LocalDateTime)value).format(DATE_STAMP_FORMATTER); + str = ((LocalDateTime) value).format(DATE_STAMP_FORMATTER); } statement.setString(1, str); break; @@ -263,10 +364,10 @@ public static byte[] hex2bytes(String hexStr) { char c1 = hexStr.charAt(index); char c2 = hexStr.charAt(index + 1); if (c1 < '0' || c1 > 'F' || c2 < '0' || c2 > 'F') { - throw new EventMeshException(String.format("illegal byte [%s], [%s]" , c1, c2)); + throw new EventMeshException(String.format("illegal byte [%s], [%s]", c1, c2)); } - ret[i] = (byte) ((byte)c1 << 4); - ret[i] = (byte) (ret[i] | (byte)(c2)); + ret[i] = (byte) ((byte) c1 << 4); + ret[i] = (byte) (ret[i] | (byte) (c2)); } return ret; } @@ -289,37 +390,30 @@ private void generateQueryColumnsSql(StringBuilder builder, Collection= ? "); } else { builder.append(" > ? 
"); } - builder.append(" order by ").append(Constants.MySQLQuot).append(colName).append(Constants.MySQLQuot) + builder.append(" order by ").append(Constants.MySQLQuot).append(choosePrimaryKey.get()).append(Constants.MySQLQuot) .append(" asc "); } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index 3ca2f7ec2f..8ef60ff04d 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -22,6 +22,7 @@ import org.apache.eventmesh.connector.canal.model.EventColumn; import org.apache.eventmesh.connector.canal.model.EventColumnIndexComparable; import org.apache.eventmesh.connector.canal.model.EventType; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.commons.lang3.StringUtils; @@ -31,7 +32,6 @@ import java.util.List; import java.util.Map; -import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.springframework.util.CollectionUtils; import com.alibaba.otter.canal.protocol.CanalEntry; @@ -48,7 +48,8 @@ @Slf4j public class EntryParser { - public static Map> parse(CanalSourceConfig sourceConfig, List datas) { + public static Map> parse(CanalSourceConfig sourceConfig, List datas, + RdbTableMgr tables) { List recordList = new ArrayList<>(); List transactionDataBuffer = new ArrayList<>(); // need check weather the entry is loopback @@ -65,7 +66,7 @@ public static Map> parse(CanalSourceConfig source } break; case TRANSACTIONEND: - parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer); + parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer, tables); if (!recordList.isEmpty()) { recordMap.put(entry.getHeader().getLogfileOffset(), recordList); } @@ -81,10 +82,11 @@ public static Map> parse(CanalSourceConfig source return recordMap; } - private void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, List recordList, - List transactionDataBuffer) { + private static void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, + List recordList, + List transactionDataBuffer, RdbTableMgr tables) { for (Entry bufferEntry : transactionDataBuffer) { - List recordParsedList = internParse(sourceConfig, bufferEntry); + List recordParsedList = internParse(sourceConfig, bufferEntry, tables); if (CollectionUtils.isEmpty(recordParsedList)) { continue; } @@ -100,15 +102,17 @@ private void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, List } } - private boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData rowData) { - Column markedColumn = getColumnIgnoreCase(rowData.getAfterColumnsList(), sourceConfig.getNeedSyncMarkTableColumnName()); + private static boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData rowData) { + Column markedColumn = getColumnIgnoreCase(rowData.getAfterColumnsList(), + sourceConfig.getNeedSyncMarkTableColumnName()); if (markedColumn != null) { - return StringUtils.equalsIgnoreCase(markedColumn.getValue(), sourceConfig.getNeedSyncMarkTableColumnValue()); + return StringUtils.equalsIgnoreCase(markedColumn.getValue(), + sourceConfig.getNeedSyncMarkTableColumnValue()); } return false; } - private Column getColumnIgnoreCase(List 
columns, String columName) { + private static Column getColumnIgnoreCase(List columns, String columName) { for (Column column : columns) { if (column.getName().equalsIgnoreCase(columName)) { return column; @@ -117,7 +121,8 @@ private Column getColumnIgnoreCase(List columns, String columName) { return null; } - private static List internParse(CanalSourceConfig sourceConfig, Entry entry, RdbTableMgr tableMgr) { + private static List internParse(CanalSourceConfig sourceConfig, Entry entry, + RdbTableMgr tableMgr) { String schemaName = entry.getHeader().getSchemaName(); String tableName = entry.getHeader().getTableName(); if (tableMgr.getTable(schemaName, tableName) == null) { @@ -155,7 +160,8 @@ private static List internParse(CanalSourceConfig sourceConf return recordList; } - private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry, RowChange rowChange, RowData rowData) { + private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry, + RowChange rowChange, RowData rowData) { CanalConnectRecord canalConnectRecord = new CanalConnectRecord(); canalConnectRecord.setTableName(entry.getHeader().getTableName()); canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName()); @@ -243,7 +249,7 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi } private static void checkUpdateKeyColumns(Map oldKeyColumns, - Map keyColumns) { + Map keyColumns) { if (oldKeyColumns.isEmpty()) { return; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index c1c381c91f..a598352f73 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -17,16 +17,15 @@ package org.apache.eventmesh.connector.canal.source.connector; -import com.alibaba.otter.canal.parse.inbound.TableMeta; import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig; -import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset; import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.canal.CanalConnectRecord; import org.apache.eventmesh.connector.canal.source.EntryParser; +import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; @@ -296,7 +295,7 @@ public List poll() { List result = new ArrayList<>(); // key: Xid offset - Map> connectorRecordMap = EntryParser.parse(sourceConfig, entries); + Map> connectorRecordMap = EntryParser.parse(sourceConfig, entries, tableMgr); if (!connectorRecordMap.isEmpty()) { Set>> entrySet = connectorRecordMap.entrySet(); diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java index b7e15157f0..60a84b4241 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java @@ -1,6 +1,5 @@ package org.apache.eventmesh.connector.canal.source.connector; -import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.common.AbstractComponent; import org.apache.eventmesh.common.EventMeshThreadFactory; import org.apache.eventmesh.common.config.connector.Config; @@ -8,10 +7,13 @@ import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; -import org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.canal.DatabaseConnection; +import org.apache.eventmesh.connector.canal.source.CanalFullProducer; import org.apache.eventmesh.connector.canal.source.position.CanalFullPositionMgr; +import org.apache.eventmesh.connector.canal.source.position.TableFullPosition; import org.apache.eventmesh.connector.canal.source.table.RdbSimpleTable; import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; @@ -20,14 +22,19 @@ import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; -import javax.sql.DataSource; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.sql.DataSource; + +import lombok.extern.slf4j.Slf4j; @Slf4j public class CanalSourceFullConnector extends AbstractComponent implements Source, ConnectorCreateService { @@ -37,6 +44,7 @@ public class CanalSourceFullConnector extends AbstractComponent implements Sourc private ThreadPoolExecutor executor; private final Map dataSources = new HashMap<>(); private final BlockingQueue> queue = new LinkedBlockingQueue<>(); + private final AtomicBoolean flag = new AtomicBoolean(true); @Override protected void startup() throws Exception { @@ -46,35 +54,46 @@ protected void startup() throws Exception { log.info("connector [{}] has finished the job", config.getConnectorConfig().getConnectorName()); return; } - executor = new ThreadPoolExecutor(config.getParallel(), config.getParallel(),0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), new EventMeshThreadFactory("canal-source-full")); + executor = new ThreadPoolExecutor(config.getParallel(), config.getParallel(), 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), new 
EventMeshThreadFactory("canal-source-full")); + List producers = new LinkedList<>(); if (config.getConnectorConfig().getDatabases() != null) { for (RdbDBDefinition db : config.getConnectorConfig().getDatabases()) { for (RdbTableDefinition table : db.getTables()) { - log.info("it will create producer of db [{}] table [{}]", db.getSchemaName(), table.getTableName()); - DataSource dataSource = dataSources.computeIfAbsent(db.getSchemaName(), - k -> DatabaseConnection.createDruidDataSource(null, config.getConnectorConfig().getUserName(), - config.getConnectorConfig().getPassWord())); - RdbSimpleTable simpleTable = new RdbSimpleTable(db.getSchemaName(), table.getTableName()); - JobRdbFullPosition position = positionMgr.getPosition(simpleTable); - if (position == null) { - throw new EventMeshException(String.format("db [%s] table [%s] have none position info", + try { + log.info("it will create producer of db [{}] table [{}]", db.getSchemaName(), table.getTableName()); + DataSource dataSource = dataSources.computeIfAbsent(db.getSchemaName(), + k -> DatabaseConnection.createDruidDataSource(config.getConnectorConfig().getUrl(), + config.getConnectorConfig().getUserName(), + config.getConnectorConfig().getPassWord())); + RdbSimpleTable simpleTable = new RdbSimpleTable(db.getSchemaName(), table.getTableName()); + JobRdbFullPosition position = positionMgr.getPosition(simpleTable); + if (position == null) { + throw new EventMeshException(String.format("db [%s] table [%s] have none position info", db.getSchemaName(), table.getTableName())); - } - RdbTableDefinition tableDefinition = tableMgr.getTable(simpleTable); - if (tableDefinition == null) { - throw new EventMeshException(String.format("db [%s] table [%s] have none table definition info", + } + RdbTableDefinition tableDefinition = tableMgr.getTable(simpleTable); + if (tableDefinition == null) { + throw new EventMeshException(String.format("db [%s] table [%s] have none table definition info", db.getSchemaName(), table.getTableName())); - } + } + producers.add(new CanalFullProducer(queue, dataSource, (MySQLTableDef) tableDefinition, + JsonUtils.parseObject(position.getPrimaryKeyRecords(), TableFullPosition.class), + config.getFlushSize())); + } catch (Exception e) { + log.error("create schema [{}] table [{}] producers fail", db.getSchemaName(), + table.getTableName(), e); + } } } } + producers.forEach(p -> executor.execute(() -> p.start(flag))); } @Override protected void shutdown() throws Exception { - + flag.set(false); } @Override @@ -89,7 +108,7 @@ public Class configClass() { @Override public void init(Config config) throws Exception { - this.config = (CanalSourceFullConfig)config; + this.config = (CanalSourceFullConfig) config; } @Override @@ -102,7 +121,7 @@ public void init(ConnectorContext connectorContext) throws Exception { @Override public void commit(ConnectRecord record) { - + // nothing } @Override @@ -112,6 +131,21 @@ public String name() { @Override public List poll() { + while (flag.get()) { + try { + List records = queue.poll(5, TimeUnit.SECONDS); + if (records == null || records.isEmpty()) { + continue; + } + return records; + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + log.info("[{}] thread interrupted", this.getClass()); + return null; + } + } + log.info("[{}] life flag is stop, so return null", this.getClass()); return null; } + } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java index 8003ef1784..fa93871b0f 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java @@ -1,18 +1,18 @@ package org.apache.eventmesh.connector.canal.source.position; import com.alibaba.druid.pool.DruidDataSource; -import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.AbstractComponent; import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceFullConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition; -import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants; import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.canal.DatabaseConnection; import org.apache.eventmesh.connector.canal.source.table.RdbSimpleTable; @@ -25,7 +25,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -54,10 +53,18 @@ protected void startup() throws Exception { thread.setName("task-full-position-timer"); return thread; }, new ScheduledThreadPoolExecutor.DiscardOldestPolicy()); - if (config.getStartPosition() != null) { - processPositions(config.getStartPosition()); - } + prepareRecordPosition(); + initPositions(); + } + public void prepareRecordPosition() { + if (config.getStartPosition() != null && !config.getStartPosition().isEmpty()) { + for (RecordPosition record : config.getStartPosition()) { + CanalFullRecordOffset offset = (CanalFullRecordOffset) record.getRecordOffset(); + RdbSimpleTable table = new RdbSimpleTable(offset.getPosition().getSchema(), offset.getPosition().getTableName()); + positions.put(table, offset.getPosition()); + } + } } public JobRdbFullPosition getPosition(RdbSimpleTable table) { @@ -74,30 +81,39 @@ public boolean isFinished() { return true; } - private void processPositions(List startPosition) throws SQLException { - for (RdbDBDefinition database : config.getConnectorConfig().getDatabases()) { - for (RdbTableDefinition table : database.getTables()) { - RdbSimpleTable simpleTable = new RdbSimpleTable(database.getSchemaName(), table.getTableName()); - RdbTableDefinition tableDefinition; - if ((tableDefinition = tableMgr.getTable(simpleTable)) == null) { - log.error("db [{}] table [{}] definition is null", database.getSchemaName(), table.getTableName()); - continue; - } - log.info("init position of data [{}] table [{}]", database.getSchemaName(), table.getTableName()); - - JobRdbFullPosition recordPosition = 
positions.get(simpleTable); - if (recordPosition == null || !recordPosition.isFinished()) { - try (DruidDataSource dataSource = DatabaseConnection.createDruidDataSource(null, - config.getConnectorConfig().getUserName(), config.getConnectorConfig().getPassWord())) { - positions.put(simpleTable, initPosition(dataSource, (MySQLTableDef)tableDefinition, recordPosition)); + private void initPositions() { + try (DruidDataSource dataSource = + DatabaseConnection.createDruidDataSource(config.getConnectorConfig().getUrl(), + config.getConnectorConfig().getUserName(), config.getConnectorConfig().getPassWord())) { + for (RdbDBDefinition database : config.getConnectorConfig().getDatabases()) { + for (RdbTableDefinition table : database.getTables()) { + try { + RdbSimpleTable simpleTable = new RdbSimpleTable(database.getSchemaName(), table.getTableName()); + RdbTableDefinition tableDefinition; + if ((tableDefinition = tableMgr.getTable(simpleTable)) == null) { + log.error("db [{}] table [{}] definition is null", database.getSchemaName(), + table.getTableName()); + continue; + } + log.info("init position of data [{}] table [{}]", database.getSchemaName(), + table.getTableName()); + + JobRdbFullPosition recordPosition = positions.get(simpleTable); + if (recordPosition == null || !recordPosition.isFinished()) { + positions.put(simpleTable, fetchTableInfo(dataSource, (MySQLTableDef) tableDefinition, + recordPosition)); + } + } catch (Exception e) { + log.error("process schema [{}] table [{}] position fail", database.getSchemaName(), + table.getTableName(), e); } } } } } - private JobRdbFullPosition initPosition(DataSource dataSource, MySQLTableDef tableDefinition, - JobRdbFullPosition recordPosition) throws SQLException { + private JobRdbFullPosition fetchTableInfo(DataSource dataSource, MySQLTableDef tableDefinition, + JobRdbFullPosition recordPosition) throws SQLException { TableFullPosition position = new TableFullPosition(); Map preMinPrimaryKeys = new LinkedHashMap<>(); Map preMaxPrimaryKeys = new LinkedHashMap<>(); @@ -112,31 +128,36 @@ private JobRdbFullPosition initPosition(DataSource dataSource, MySQLTableDef tab } long rowCount = queryCurTableRowCount(dataSource, tableDefinition); JobRdbFullPosition jobRdbFullPosition = new JobRdbFullPosition(); - if (recordPosition != null ) { - if (StringUtils.isNotBlank(recordPosition.getCurPrimaryKey())) { - TableFullPosition record = JsonUtils.parseObject(recordPosition.getCurPrimaryKey(), TableFullPosition.class); + if (recordPosition != null) { + if (StringUtils.isNotBlank(recordPosition.getPrimaryKeyRecords())) { + TableFullPosition record = JsonUtils.parseObject(recordPosition.getPrimaryKeyRecords(), + TableFullPosition.class); if (record != null && record.getCurPrimaryKeyCols() != null && !record.getCurPrimaryKeyCols().isEmpty()) { position.setCurPrimaryKeyCols(record.getCurPrimaryKeyCols()); } } + jobRdbFullPosition.setPercent(recordPosition.getPercent()); } jobRdbFullPosition.setSchema(tableDefinition.getSchemaName()); jobRdbFullPosition.setTableName(tableDefinition.getTableName()); jobRdbFullPosition.setMaxCount(rowCount); - jobRdbFullPosition.setCurPrimaryKey(JsonUtils.toJSONString(position.getCurPrimaryKeyCols())); + jobRdbFullPosition.setPrimaryKeyRecords(JsonUtils.toJSONString(position)); return jobRdbFullPosition; } private long queryCurTableRowCount(DataSource datasource, MySQLTableDef tableDefinition) throws SQLException { String sql = - "select AVG_ROW_LENGTH,DATA_LENGTH from information_schema.TABLES where TABLE_SCHEMA=" + 
Constants.MySQLQuot + tableDefinition.getSchemaName() + Constants.MySQLQuot + " and TABLE_NAME="+ Constants.MySQLQuot + tableDefinition.getTableName() + Constants.MySQLQuot; - try (Statement statement = datasource.getConnection().createStatement(); ResultSet resultSet = statement.executeQuery(sql)) { + "select `AVG_ROW_LENGTH`,`DATA_LENGTH` from information_schema.TABLES where `TABLE_SCHEMA`='" + tableDefinition.getSchemaName() +"' and `TABLE_NAME`='" + tableDefinition.getTableName() +"'"; + try (Statement statement = datasource.getConnection().createStatement(); ResultSet resultSet = + statement.executeQuery(sql)) { long result = 0L; if (resultSet.next()) { long avgRowLength = resultSet.getLong("AVG_ROW_LENGTH"); long dataLength = resultSet.getLong("DATA_LENGTH"); - result = dataLength / avgRowLength; + if (avgRowLength != 0L) { + result = dataLength / avgRowLength; + } } return result; } @@ -175,7 +196,8 @@ private Object fetchMinPrimaryKey(DataSource dataSource, MySQLTableDef tableDefi .append(Constants.MySQLQuot).append(tableDefinition.getSchemaName()).append(Constants.MySQLQuot).append(".").append(Constants.MySQLQuot).append(tableDefinition.getTableName()).append(Constants.MySQLQuot); appendPrePrimaryKey(prePrimary, builder); String sql = builder.toString(); - try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)){ + log.info("fetch min primary sql [{}]", sql); + try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { setValue2Statement(statement, prePrimary, tableDefinition); try (ResultSet resultSet = statement.executeQuery()) { if (resultSet.next()) { @@ -198,7 +220,8 @@ private Object fetchMaxPrimaryKey(DataSource dataSource, MySQLTableDef tableDefi .append(Constants.MySQLQuot).append(tableDefinition.getSchemaName()).append(Constants.MySQLQuot).append(".").append(Constants.MySQLQuot).append(tableDefinition.getTableName()).append(Constants.MySQLQuot); appendPrePrimaryKey(prePrimary, builder); String sql = builder.toString(); - try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)){ + log.info("fetch max primary sql [{}]", sql); + try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { setValue2Statement(statement, prePrimary, tableDefinition); try (ResultSet resultSet = statement.executeQuery()) { if (resultSet.next()) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java index 6cacc56122..18d134db96 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java @@ -1,9 +1,9 @@ package org.apache.eventmesh.connector.canal.source.table; import com.alibaba.druid.pool.DruidDataSource; -import com.mysql.cj.MysqlType; import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.common.AbstractComponent; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalMySQLType; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition; import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition; @@ -18,20 +18,20 @@ 
import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** * Description: */ @Slf4j public class RdbTableMgr extends AbstractComponent { - private SourceConnectorConfig config; + private final SourceConnectorConfig config; private final Map tables = new HashMap<>(); public RdbTableMgr(SourceConnectorConfig config) { @@ -47,59 +47,63 @@ public RdbTableDefinition getTable(RdbSimpleTable table) { } @Override - protected void startup() throws Exception { + protected void startup() { if (config != null && config.getDatabases() != null) { for (RdbDBDefinition db : config.getDatabases()) { if (db.getTables() == null) { log.warn("init db [{}] position, but it's tables are null", db.getSchemaName()); continue; } - try (DruidDataSource dataSource = DatabaseConnection.createDruidDataSource(null, + try (DruidDataSource dataSource = DatabaseConnection.createDruidDataSource(config.getUrl(), config.getUserName(), config.getPassWord())) { - List tableNames = - db.getTables().stream().map(RdbTableDefinition::getTableName).collect(Collectors.toList()); - Map> primaryKeys = queryTablePrimaryKey(dataSource, tableNames); - Map> columns = queryColumns(dataSource, tableNames); for (RdbTableDefinition table : db.getTables()) { - MySQLTableDef mysqlTable = new MySQLTableDef(); - mysqlTable.setSchemaName(db.getSchemaName()); - mysqlTable.setTableName(table.getTableName()); - if (primaryKeys == null || primaryKeys.isEmpty() || primaryKeys.get(table.getTableName()) == null) { - log.warn("init db [{}] table [{}] info, and primary keys are empty", db.getSchemaName(), - table.getTableName()); - } else { - mysqlTable.setPrimaryKeys(new HashSet<>(primaryKeys.get(table.getTableName()))); - } - if (columns == null || columns.isEmpty() || columns.get(table.getTableName()) == null) { - log.warn("init db [{}] table [{}] info, and columns are empty", db.getSchemaName(), - table.getTableName()); - } else { - LinkedHashMap cols = new LinkedHashMap<>(); - columns.get(table.getTableName()).forEach(x -> cols.put(x.getName(), x)); - mysqlTable.setColumnDefinitions(cols); - } + try { + MySQLTableDef mysqlTable = new MySQLTableDef(); + mysqlTable.setSchemaName(db.getSchemaName()); + mysqlTable.setTableName(table.getTableName()); + List tables = Collections.singletonList(table.getTableName()); + Map> primaryKeys = queryTablePrimaryKey(dataSource, db.getSchemaName(), + tables); + Map> columns = queryColumns(dataSource, db.getSchemaName(), tables); + if (primaryKeys == null || primaryKeys.isEmpty() || primaryKeys.get(table.getTableName()) == null) { + log.warn("init db [{}] table [{}] info, and primary keys are empty", db.getSchemaName(), + table.getTableName()); + } else { + mysqlTable.setPrimaryKeys(new HashSet<>(primaryKeys.get(table.getTableName()))); + } + if (columns == null || columns.isEmpty() || columns.get(table.getTableName()) == null) { + log.warn("init db [{}] table [{}] info, and columns are empty", db.getSchemaName(), + table.getTableName()); + } else { + LinkedHashMap cols = new LinkedHashMap<>(); + columns.get(table.getTableName()).forEach(x -> cols.put(x.getName(), x)); + mysqlTable.setColumnDefinitions(cols); + } - tables.put(new RdbSimpleTable(db.getSchemaName(), table.getTableName()), mysqlTable); + this.tables.put(new RdbSimpleTable(db.getSchemaName(), 
table.getTableName()), mysqlTable); + } catch (Exception e) { + log.error("init rdb table schema [{}] table [{}] fail", db.getSchemaName(), + table.getTableName(), e); + } } - } catch (Exception e) { - log.error("init db [{}] tables info fail", db.getSchemaName(), e); } - } } } - private Map> queryTablePrimaryKey(DruidDataSource dataSource, List tables) throws SQLException { + private Map> queryTablePrimaryKey(DruidDataSource dataSource, String schema, + List tables) throws SQLException { Map> primaryKeys = new LinkedHashMap<>(); String prepareTables = SqlUtils.genPrepareSqlOfInClause(tables.size()); String sql = "select L.TABLE_NAME,L.COLUMN_NAME,R.CONSTRAINT_TYPE from " + - "INFORMATION_SCHEMA.KEY_COLUMN_USAGE L left join INFORMATION_SCHEMA.TABLE_CONSTRAINTS R on C" + + "INFORMATION_SCHEMA.KEY_COLUMN_USAGE L left join INFORMATION_SCHEMA.TABLE_CONSTRAINTS R on L" + ".TABLE_SCHEMA = R.TABLE_SCHEMA and L.TABLE_NAME = R.TABLE_NAME and L.CONSTRAINT_CATALOG = R" + ".CONSTRAINT_CATALOG and L.CONSTRAINT_SCHEMA = R.CONSTRAINT_SCHEMA and L.CONSTRAINT_NAME = R" + ".CONSTRAINT_NAME where L.TABLE_SCHEMA = ? and L.TABLE_NAME in " + prepareTables + " and R" + ".CONSTRAINT_TYPE IN ('PRIMARY KEY') order by L.ORDINAL_POSITION asc"; try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { - SqlUtils.setInClauseParameters(statement, tables); + statement.setString(1, schema); + SqlUtils.setInClauseParameters(statement, 2, tables); ResultSet resultSet = statement.executeQuery(); if (resultSet == null) { return null; @@ -120,7 +124,7 @@ private Map> queryTablePrimaryKey(DruidDataSource dataSourc return primaryKeys; } - private Map> queryColumns(DataSource dataSource, List tables) throws SQLException { + private Map> queryColumns(DataSource dataSource, String schema, List tables) throws SQLException { String prepareTables = SqlUtils.genPrepareSqlOfInClause(tables.size()); String sql = "select TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME,IS_NULLABLE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH," + "CHARACTER_OCTET_LENGTH,NUMERIC_SCALE,NUMERIC_PRECISION,DATETIME_PRECISION,CHARACTER_SET_NAME," + @@ -129,7 +133,8 @@ private Map> queryColumns(DataSource dataSource, Li "ORDINAL_POSITION asc"; Map> cols = new LinkedHashMap<>(); try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) { - SqlUtils.setInClauseParameters(statement, tables); + statement.setString(1, schema); + SqlUtils.setInClauseParameters(statement, 2, tables); ResultSet resultSet = statement.executeQuery(); if (resultSet == null) { return null; @@ -139,10 +144,9 @@ private Map> queryColumns(DataSource dataSource, Li String colName = resultSet.getString("COLUMN_NAME"); String dataType = resultSet.getString("DATA_TYPE"); JDBCType jdbcType = SqlUtils.toJDBCType(dataType); - MysqlType type = MysqlType.getByName(dataType); MySQLColumnDef col = new MySQLColumnDef(); col.setJdbcType(jdbcType); - col.setType(type); + col.setType(CanalMySQLType.valueOfCode(dataType)); col.setName(colName); cols.compute(tableName, (k, v) -> { if (v == null) {