Skip to content

Commit

Permalink
[ISSUE apache#5052] Enhancement for source\sink connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Jul 31, 2024
1 parent 53b7b8c commit 0349768
Show file tree
Hide file tree
Showing 46 changed files with 301 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -146,6 +148,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
executor.shutdown();
Expand All @@ -159,7 +166,7 @@ public void put(List<ConnectRecord> sinkRecords) {
List<CanalConnectRecord> canalConnectRecordList = (List<CanalConnectRecord>) connectRecord.getData();
canalConnectRecordList = filterRecord(canalConnectRecordList);
if (isDdlDatas(canalConnectRecordList)) {
doDdl(context, canalConnectRecordList);
doDdl(context, canalConnectRecordList, connectRecord);
} else if (sinkConfig.isGTIDMode()) {
doLoadWithGtid(context, sinkConfig, connectRecord);
} else {
Expand Down Expand Up @@ -197,7 +204,7 @@ private List<CanalConnectRecord> filterRecord(List<CanalConnectRecord> canalConn
.collect(Collectors.toList());
}

private void doDdl(DbLoadContext context, List<CanalConnectRecord> canalConnectRecordList) {
private void doDdl(DbLoadContext context, List<CanalConnectRecord> canalConnectRecordList, ConnectRecord connectRecord) {
for (final CanalConnectRecord record : canalConnectRecordList) {
try {
Boolean result = jdbcTemplate.execute(new StatementCallback<Boolean>() {
Expand All @@ -217,9 +224,30 @@ public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessExce
context.getFailedRecords().add(record);
}
} catch (Throwable e) {
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, e));
throw new RuntimeException(e);
}
}
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}

private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
SendExceptionContext sendExceptionContext = new SendExceptionContext();
sendExceptionContext.setMessageId(record.getRecordId());
sendExceptionContext.setCause(e);
if(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
sendExceptionContext.setTopic(record.getExtension("topic"));
}
return sendExceptionContext;
}

private SendResult convertToSendResult(ConnectRecord record) {
SendResult result = new SendResult();
result.setMessageId(record.getRecordId());
if(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
result.setTopic(record.getExtension("topic"));
}
return result;
}

private void doBefore(List<CanalConnectRecord> canalConnectRecordList, final DbLoadData loadData) {
Expand Down Expand Up @@ -291,21 +319,26 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C
Exception ex = null;
try {
ex = result.get();
if (ex == null) {
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
} catch (Exception e) {
ex = e;
}
Boolean skipException = sinkConfig.getSkipException();
if (skipException != null && skipException) {
if (ex != null) {
// do skip
log.warn("skip exception for data : {} , caused by {}",
log.warn("skip exception will ack data : {} , caused by {}",
filteredRows,
ExceptionUtils.getFullStackTrace(ex));
GtidBatchManager.removeGtidBatch(gtid);
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
} else {
if (ex != null) {
log.error("sink connector will shutdown by " + ex.getMessage(), ExceptionUtils.getFullStackTrace(ex));
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, ex));
gtidSingleExecutor.shutdown();
System.exit(1);
} else {
Expand All @@ -314,6 +347,8 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C
}
} else {
log.info("Batch received, waiting for other batches.");
// ack this record
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public String name() {
return null;
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void put(List<ConnectRecord> sinkRecords) {
if (sinkRecords == null || sinkRecords.isEmpty() || sinkRecords.get(0) == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ public String name() {
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
if (!running) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public String name() {
return this.config.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public List<ConnectRecord> poll() {
while (flag.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
Throwable t = this.server.close().cause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
isRunning = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
outputStream.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public String name() {
return this.httpSinkConfig.connectorConfig.getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() throws Exception {
this.sinkHandler.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
if (this.server != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

/**
* Stops the Connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ public String name() {
return "JDBC Source Connector";
}

@Override
public void onException(ConnectRecord record) {

}

/**
* Stops the Connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
producer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
kafkaConsumer.unsubscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
started.compareAndSet(true, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
started.compareAndSet(true, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
if (!started.compareAndSet(true, false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() throws Exception {
this.client.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public String name() {
return this.sourceConfig.connectorConfig.getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() throws Exception {
this.client.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public String name() {
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
writerMap.forEach((topic, writer) -> writer.close());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
sourceHandlerMap.forEach((topic, handler) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
log.info("prometheus source connector stop.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
try {
Expand Down
Loading

0 comments on commit 0349768

Please sign in to comment.