Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5052] Enhancement for source\sink connector #5066

Merged
merged 13 commits into from
Aug 1, 2024
Merged
52 changes: 18 additions & 34 deletions eventmesh-admin-server/conf/eventmesh.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;


-- 导出 eventmesh 的数据库结构
-- export eventmesh database
CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
USE `eventmesh`;

-- 导出 表 eventmesh.event_mesh_data_source 结构
-- export table eventmesh.event_mesh_data_source structure
CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
Expand All @@ -39,11 +39,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 导出 表 eventmesh.event_mesh_job_info 结构
-- export table eventmesh.event_mesh_job_info structure
CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
Expand All @@ -61,11 +59,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `jobID` (`jobID`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 导出 表 eventmesh.event_mesh_mysql_position 结构
-- export table eventmesh.event_mesh_mysql_position structure
CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
Expand All @@ -80,11 +76,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `jobID` (`jobID`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;

-- 导出 表 eventmesh.event_mesh_position_reporter_history 结构
-- export table eventmesh.event_mesh_position_reporter_history structure
CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` (
`id` bigint NOT NULL AUTO_INCREMENT,
`job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
Expand All @@ -94,57 +88,49 @@ CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` (
PRIMARY KEY (`id`),
KEY `job` (`job`),
KEY `address` (`address`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录';

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='record position reporter changes';

-- 导出 表 eventmesh.event_mesh_runtime_heartbeat 结构
-- export table eventmesh.event_mesh_runtime_heartbeat structure
CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`jobID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime本地上报时间',
`reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime local report time',
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
KEY `jobID` (`jobID`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 导出 表 eventmesh.event_mesh_runtime_history 结构
-- export table eventmesh.event_mesh_runtime_history structure
CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
`id` bigint NOT NULL AUTO_INCREMENT,
`job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `address` (`address`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更';

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='record runtime task change history';

-- 导出 表 eventmesh.event_mesh_task_info 结构
-- export table eventmesh.event_mesh_task_info structure
CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'TaskState',
`state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `taskID` (`taskID`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 导出 表 eventmesh.event_mesh_verify 结构
-- export table eventmesh.event_mesh_verify structure
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
`id` int NOT NULL,
`taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
Expand All @@ -157,8 +143,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 数据导出被取消选择。

/*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */;
/*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

/**
* for table 'event_mesh_job_info' db operation
* 2024-05-09 15:51:45
*/
@Service
@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -146,6 +148,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
executor.shutdown();
Expand All @@ -159,7 +166,7 @@ public void put(List<ConnectRecord> sinkRecords) {
List<CanalConnectRecord> canalConnectRecordList = (List<CanalConnectRecord>) connectRecord.getData();
canalConnectRecordList = filterRecord(canalConnectRecordList);
if (isDdlDatas(canalConnectRecordList)) {
doDdl(context, canalConnectRecordList);
doDdl(context, canalConnectRecordList, connectRecord);
} else if (sinkConfig.isGTIDMode()) {
doLoadWithGtid(context, sinkConfig, connectRecord);
} else {
Expand Down Expand Up @@ -197,7 +204,7 @@ private List<CanalConnectRecord> filterRecord(List<CanalConnectRecord> canalConn
.collect(Collectors.toList());
}

private void doDdl(DbLoadContext context, List<CanalConnectRecord> canalConnectRecordList) {
private void doDdl(DbLoadContext context, List<CanalConnectRecord> canalConnectRecordList, ConnectRecord connectRecord) {
for (final CanalConnectRecord record : canalConnectRecordList) {
try {
Boolean result = jdbcTemplate.execute(new StatementCallback<Boolean>() {
Expand All @@ -217,9 +224,30 @@ public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessExce
context.getFailedRecords().add(record);
}
} catch (Throwable e) {
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, e));
throw new RuntimeException(e);
}
}
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}

private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
SendExceptionContext sendExceptionContext = new SendExceptionContext();
sendExceptionContext.setMessageId(record.getRecordId());
sendExceptionContext.setCause(e);
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
sendExceptionContext.setTopic(record.getExtension("topic"));
}
return sendExceptionContext;
}

private SendResult convertToSendResult(ConnectRecord record) {
SendResult result = new SendResult();
result.setMessageId(record.getRecordId());
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
result.setTopic(record.getExtension("topic"));
}
return result;
}

private void doBefore(List<CanalConnectRecord> canalConnectRecordList, final DbLoadData loadData) {
Expand Down Expand Up @@ -291,21 +319,26 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C
Exception ex = null;
try {
ex = result.get();
if (ex == null) {
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
} catch (Exception e) {
ex = e;
}
Boolean skipException = sinkConfig.getSkipException();
if (skipException != null && skipException) {
if (ex != null) {
// do skip
log.warn("skip exception for data : {} , caused by {}",
log.warn("skip exception will ack data : {} , caused by {}",
filteredRows,
ExceptionUtils.getFullStackTrace(ex));
GtidBatchManager.removeGtidBatch(gtid);
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
} else {
if (ex != null) {
log.error("sink connector will shutdown by " + ex.getMessage(), ExceptionUtils.getFullStackTrace(ex));
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, ex));
gtidSingleExecutor.shutdown();
System.exit(1);
} else {
Expand All @@ -314,6 +347,8 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C
}
} else {
log.info("Batch received, waiting for other batches.");
// ack this record
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public String name() {
return null;
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void put(List<ConnectRecord> sinkRecords) {
if (sinkRecords == null || sinkRecords.isEmpty() || sinkRecords.get(0) == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ public String name() {
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
if (!running) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public String name() {
return this.config.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public List<ConnectRecord> poll() {
while (flag.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
Throwable t = this.server.close().cause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
isRunning = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
outputStream.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public String name() {
return this.httpSinkConfig.connectorConfig.getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() throws Exception {
this.sinkHandler.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
if (this.server != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

/**
* Stops the Connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ public String name() {
return "JDBC Source Connector";
}

@Override
public void onException(ConnectRecord record) {

}

/**
* Stops the Connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
producer.close();
Expand Down
Loading
Loading