From 03497689739212623a92e61535ee897821ec0224 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Wed, 31 Jul 2024 21:07:40 +0800 Subject: [PATCH] [ISSUE #5052] Enhancement for source\sink connector --- .../sink/connector/CanalSinkConnector.java | 41 +++++++++++++- .../connector/CanalSinkFullConnector.java | 5 ++ .../connector/CanalSourceConnector.java | 5 ++ .../connector/CanalSourceFullConnector.java | 5 ++ .../connector/ChatGPTSourceConnector.java | 5 ++ .../sink/connector/DingDingSinkConnector.java | 5 ++ .../sink/connector/FileSinkConnector.java | 5 ++ .../source/connector/FileSourceConnector.java | 5 ++ .../http/sink/HttpSinkConnector.java | 5 ++ .../http/source/HttpSourceConnector.java | 5 ++ .../jdbc/sink/JdbcSinkConnector.java | 5 ++ .../jdbc/source/JdbcSourceConnector.java | 5 ++ .../sink/connector/KafkaSinkConnector.java | 5 ++ .../connector/KafkaSourceConnector.java | 5 ++ .../sink/connector/KnativeSinkConnector.java | 5 ++ .../connector/KnativeSourceConnector.java | 5 ++ .../sink/connector/LarkSinkConnector.java | 5 ++ .../sink/connector/MongodbSinkConnector.java | 5 ++ .../connector/MongodbSourceConnector.java | 5 ++ .../connector/OpenFunctionSinkConnector.java | 5 ++ .../OpenFunctionSourceConnector.java | 5 ++ .../sink/connector/PravegaSinkConnector.java | 5 ++ .../connector/PravegaSourceConnector.java | 5 ++ .../connector/PrometheusSourceConnector.java | 5 ++ .../sink/connector/PulsarSinkConnector.java | 5 ++ .../connector/PulsarSourceConnector.java | 5 ++ .../sink/connector/RabbitMQSinkConnector.java | 5 ++ .../connector/RabbitMQSourceConnector.java | 5 ++ .../sink/connector/RedisSinkConnector.java | 5 ++ .../connector/RedisSourceConnector.java | 5 ++ .../sink/connector/RocketMQSinkConnector.java | 5 ++ .../connector/RocketMQSourceConnector.java | 5 ++ .../source/connector/S3SourceConnector.java | 5 ++ .../source/MessageSendingOperations.java | 2 +- .../connector/SpringSourceConnector.java | 7 ++- .../spring/pub/SpringPubController.java | 10 ++-- .../eventmesh/openconnect/SourceWorker.java | 10 ++-- .../openconnect/api/connector/Connector.java | 11 +++- .../api/connector/SourceConnectorContext.java | 3 + .../api/callback/SendExceptionContext.java} | 6 +- .../api/callback/SendMessageCallback.java | 4 +- .../offsetmgmt}/api/callback/SendResult.java | 2 +- .../offsetmgmt/api/data/ConnectRecord.java | 25 ++++++++- .../runtime/connector/ConnectorRuntime.java | 55 +++++++++++++++---- .../connector/ConnectorRuntimeConfig.java | 2 + .../src/main/resources/connector.yaml | 1 + 46 files changed, 301 insertions(+), 38 deletions(-) rename eventmesh-openconnect/{eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java => eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java} (90%) rename eventmesh-openconnect/{eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect => eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt}/api/callback/SendMessageCallback.java (87%) rename eventmesh-openconnect/{eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect => eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt}/api/callback/SendResult.java (95%) diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index 8ecda8e125..1792b74fbf 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -38,6 +38,8 @@ import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.commons.lang.StringUtils; @@ -146,6 +148,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { executor.shutdown(); @@ -159,7 +166,7 @@ public void put(List sinkRecords) { List canalConnectRecordList = (List) connectRecord.getData(); canalConnectRecordList = filterRecord(canalConnectRecordList); if (isDdlDatas(canalConnectRecordList)) { - doDdl(context, canalConnectRecordList); + doDdl(context, canalConnectRecordList, connectRecord); } else if (sinkConfig.isGTIDMode()) { doLoadWithGtid(context, sinkConfig, connectRecord); } else { @@ -197,7 +204,7 @@ private List filterRecord(List canalConn .collect(Collectors.toList()); } - private void doDdl(DbLoadContext context, List canalConnectRecordList) { + private void doDdl(DbLoadContext context, List canalConnectRecordList, ConnectRecord connectRecord) { for (final CanalConnectRecord record : canalConnectRecordList) { try { Boolean result = jdbcTemplate.execute(new StatementCallback() { @@ -217,9 +224,30 @@ public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessExce context.getFailedRecords().add(record); } } catch (Throwable e) { + connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, e)); throw new RuntimeException(e); } } + connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord)); + } + + private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) { + SendExceptionContext sendExceptionContext = new SendExceptionContext(); + sendExceptionContext.setMessageId(record.getRecordId()); + sendExceptionContext.setCause(e); + if(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) { + sendExceptionContext.setTopic(record.getExtension("topic")); + } + return sendExceptionContext; + } + + private SendResult convertToSendResult(ConnectRecord record) { + SendResult result = new SendResult(); + result.setMessageId(record.getRecordId()); + if(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) { + result.setTopic(record.getExtension("topic")); + } + return result; } private void doBefore(List canalConnectRecordList, final DbLoadData loadData) { @@ -291,6 +319,9 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C Exception ex = null; try { ex = result.get(); + if (ex == null) { + connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord)); + } } catch (Exception e) { ex = e; } @@ -298,14 +329,16 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C if (skipException != null && skipException) { if (ex != null) { // do skip - log.warn("skip exception for data : {} , caused by {}", + log.warn("skip exception will ack data : {} , caused by {}", filteredRows, ExceptionUtils.getFullStackTrace(ex)); GtidBatchManager.removeGtidBatch(gtid); + connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord)); } } else { if (ex != null) { log.error("sink connector will shutdown by " + ex.getMessage(), ExceptionUtils.getFullStackTrace(ex)); + connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, ex)); gtidSingleExecutor.shutdown(); System.exit(1); } else { @@ -314,6 +347,8 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C } } else { log.info("Batch received, waiting for other batches."); + // ack this record + connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord)); } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java index 36c03b156c..2b4c9d7a94 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java @@ -109,6 +109,11 @@ public String name() { return null; } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void put(List sinkRecords) { if (sinkRecords == null || sinkRecords.isEmpty() || sinkRecords.get(0) == null) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index f3f8b2e160..ea5ccdeed0 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -267,6 +267,11 @@ public String name() { return this.sourceConfig.getSourceConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { if (!running) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java index df3c7571c2..97730463b5 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java @@ -159,6 +159,11 @@ public String name() { return this.config.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public List poll() { while (flag.get()) { diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java index 4d54cb2191..6b122087e5 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java @@ -224,6 +224,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { Throwable t = this.server.close().cause(); diff --git a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java index 417d9cef36..8c5a1e6611 100644 --- a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java @@ -103,6 +103,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { isRunning = false; diff --git a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java index 89222b35b0..fabae0d43a 100644 --- a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java @@ -103,6 +103,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { outputStream.flush(); diff --git a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java index 6ea0a0d33b..68b1a50989 100644 --- a/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java @@ -86,6 +86,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { try { diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java index 6d38b45306..8a14756372 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java @@ -107,6 +107,11 @@ public String name() { return this.httpSinkConfig.connectorConfig.getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { this.sinkHandler.stop(); diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java index 4155aff910..1ca325b18d 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java @@ -144,6 +144,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { if (this.server != null) { diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java index 39681bf179..cc00f1e142 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java @@ -139,6 +139,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + /** * Stops the Connector. * diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java index 2b2efcbef2..810a59e723 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java @@ -192,6 +192,11 @@ public String name() { return "JDBC Source Connector"; } + @Override + public void onException(ConnectRecord record) { + + } + /** * Stops the Connector. * diff --git a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java index b257cd0f44..0adafc1ce6 100644 --- a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java @@ -94,6 +94,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { producer.close(); diff --git a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java index a3be1cbf93..d573126934 100644 --- a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java @@ -94,6 +94,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { kafkaConsumer.unsubscribe(); diff --git a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java index a12a1c7461..b14f77ecd4 100644 --- a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/sink/connector/KnativeSinkConnector.java @@ -82,6 +82,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { started.compareAndSet(true, false); diff --git a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java index 537c1ad4d9..1b0c033e8f 100644 --- a/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-knative/src/main/java/org/apache/eventmesh/connector/knative/source/connector/KnativeSourceConnector.java @@ -65,6 +65,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { started.compareAndSet(true, false); diff --git a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java index d340dffd13..9981322e8f 100644 --- a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java @@ -110,6 +110,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { if (!started.compareAndSet(true, false)) { diff --git a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java index 776ea8d71f..1001ffa584 100644 --- a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java @@ -87,6 +87,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { this.client.stop(); diff --git a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java index e57c396719..df3f66d6a6 100644 --- a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java @@ -93,6 +93,11 @@ public String name() { return this.sourceConfig.connectorConfig.getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { this.client.stop(); diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java index 63444efe28..0f00a7e381 100644 --- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java @@ -74,6 +74,11 @@ public String name() { return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { } diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java index b66bf9b18c..534ecfb79d 100644 --- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java @@ -76,6 +76,11 @@ public String name() { return this.sourceConfig.getSourceConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { diff --git a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java index e5f09e4350..e089ef6760 100644 --- a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/sink/connector/PravegaSinkConnector.java @@ -109,6 +109,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { writerMap.forEach((topic, writer) -> writer.close()); diff --git a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java index 2611617d8f..836779dbcf 100644 --- a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java @@ -148,6 +148,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { sourceHandlerMap.forEach((topic, handler) -> { diff --git a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java index 5c78c718e3..0cafed73f3 100644 --- a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java @@ -145,6 +145,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { log.info("prometheus source connector stop."); diff --git a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java index 9ff1f22a29..3f90c6c1be 100644 --- a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java @@ -85,6 +85,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { try { diff --git a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java index 212d3eb487..0bc576221e 100644 --- a/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java @@ -87,6 +87,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { try { diff --git a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java index 4a94a2cb1f..08d1cefbac 100644 --- a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java @@ -95,6 +95,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { if (started) { diff --git a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java index 655c20d9b9..0b7e726bda 100644 --- a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java @@ -117,6 +117,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { if (started) { diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java index 83c3498a99..5b7d27c3ba 100644 --- a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java @@ -85,6 +85,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { this.redissonClient.shutdown(); diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java index 70adce59e2..868639c205 100644 --- a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java @@ -94,6 +94,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { this.topic.removeAllListeners(); diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java index ae9d4824e5..31d45a28f4 100644 --- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java @@ -78,6 +78,11 @@ public String name() { return this.sinkConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { producer.shutdown(); diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java index 8ccb84acce..410f927d75 100644 --- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnector.java @@ -206,6 +206,11 @@ public String name() { return this.sourceConfig.getConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() { consumer.unsubscribe(sourceConfig.getConnectorConfig().getTopic()); diff --git a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java index d0dc30c15e..078ed7691a 100644 --- a/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-s3/src/main/java/org/apache/eventmesh/connector/s3/source/connector/S3SourceConnector.java @@ -121,6 +121,11 @@ public String name() { return this.sourceConfig.getSourceConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { diff --git a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java index a337c1cd81..5f38914bb1 100644 --- a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java +++ b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/MessageSendingOperations.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.connector.spring.source; -import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; /** * Operations for sending messages. diff --git a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java index 2ab5a3a3c0..be103a1f17 100644 --- a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java @@ -25,7 +25,7 @@ import org.apache.eventmesh.common.remote.offset.spring.SpringRecordPartition; import org.apache.eventmesh.connector.spring.source.MessageSendingOperations; import org.apache.eventmesh.openconnect.SourceWorker; -import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; @@ -95,6 +95,11 @@ public String name() { return this.sourceConfig.getSourceConnectorConfig().getConnectorName(); } + @Override + public void onException(ConnectRecord record) { + + } + @Override public void stop() throws Exception { diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java index b7ea8890ee..a734bb6efa 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/spring/pub/SpringPubController.java @@ -19,9 +19,9 @@ import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.connector.spring.source.connector.SpringSourceConnector; -import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext; -import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback; -import org.apache.eventmesh.openconnect.api.callback.SendResult; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import java.util.HashMap; import java.util.Map; @@ -53,8 +53,8 @@ public void onSuccess(SendResult sendResult) { } @Override - public void onException(SendExcepionContext sendExcepionContext) { - log.info("Spring source worker send message to EventMesh failed!", sendExcepionContext.getCause()); + public void onException(SendExceptionContext sendExceptionContext) { + log.info("Spring source worker send message to EventMesh failed!", sendExceptionContext.getCause()); } }); return "success!"; diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java index 6e48aa1de8..c3fa7e7cab 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java @@ -32,9 +32,9 @@ import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.SystemUtils; -import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext; -import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback; -import org.apache.eventmesh.openconnect.api.callback.SendResult; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.source.Source; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; @@ -264,8 +264,8 @@ private SendResult convertToSendResult(CloudEvent event) { return result; } - private SendExcepionContext convertToExceptionContext(CloudEvent event, Throwable cause) { - SendExcepionContext exceptionContext = new SendExcepionContext(); + private SendExceptionContext convertToExceptionContext(CloudEvent event, Throwable cause) { + SendExceptionContext exceptionContext = new SendExceptionContext(); exceptionContext.setTopic(event.getId()); exceptionContext.setMessageId(event.getId()); exceptionContext.setCause(cause); diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java index 8ac09eac38..07e44aea94 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java @@ -34,8 +34,7 @@ public interface Connector extends ComponentLifeCycle { Class configClass(); /** - * This init method is obsolete. For detailed discussion, - * please see here + * This init method is obsolete. For detailed discussion, please see here *

* Initializes the Connector with the provided configuration. * @@ -67,4 +66,12 @@ public interface Connector extends ComponentLifeCycle { */ String name(); + /** + * This method will be called when an exception occurs while processing a ConnectRecord object. This method can be used to handle the exception, + * such as logging error information, or stopping the connector's operation when an exception occurs. + * + * @param record The ConnectRecord object that was being processed when the exception occurred + */ + void onException(ConnectRecord record); + } diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java index 55c88ce55a..f70e77248e 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java @@ -22,6 +22,7 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader; import java.util.List; +import java.util.Map; import lombok.Data; @@ -35,6 +36,8 @@ public class SourceConnectorContext implements ConnectorContext { public SourceConfig sourceConfig; + public Map runtimeConfig; + // initial record position public List recordPositionList; diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java similarity index 90% rename from eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java rename to eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java index 0311ceaef5..974b19a547 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendExcepionContext.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendExceptionContext.java @@ -15,15 +15,15 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.api.callback; +package org.apache.eventmesh.openconnect.offsetmgmt.api.callback; -public class SendExcepionContext { +public class SendExceptionContext { private String messageId; private String topic; private Throwable cause; - public SendExcepionContext() { + public SendExceptionContext() { } public String getMessageId() { diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java similarity index 87% rename from eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java rename to eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java index fd6baba7ec..8346cf36b4 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendMessageCallback.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendMessageCallback.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.api.callback; +package org.apache.eventmesh.openconnect.offsetmgmt.api.callback; /** * Message sending callback interface. @@ -24,5 +24,5 @@ public interface SendMessageCallback { void onSuccess(SendResult sendResult); - void onException(SendExcepionContext sendExcepionContext); + void onException(SendExceptionContext sendExceptionContext); } diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java similarity index 95% rename from eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java rename to eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java index 8cd861f6de..9afc745f3d 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/callback/SendResult.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/callback/SendResult.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.openconnect.api.callback; +package org.apache.eventmesh.openconnect.offsetmgmt.api.callback; public class SendResult { diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java index cda57e3758..b3fc4346c4 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java @@ -20,15 +20,19 @@ import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPartition; import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; import java.util.Objects; import java.util.Set; +import java.util.UUID; /** * SourceDataEntries are generated by SourceTasks and passed to specific message queue to store. */ public class ConnectRecord { + private final String recordId = UUID.randomUUID().toString(); + private Long timestamp; private Object data; @@ -37,6 +41,8 @@ public class ConnectRecord { private KeyValue extensions; + private SendMessageCallback callback; + public ConnectRecord() { } @@ -57,6 +63,10 @@ public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset, this.data = data; } + public String getRecordId() { + return recordId; + } + public Long getTimestamp() { return timestamp; } @@ -127,6 +137,14 @@ public Object getExtensionObj(String key) { return this.extensions.getObject(key); } + public SendMessageCallback getCallback() { + return callback; + } + + public void setCallback(SendMessageCallback callback) { + this.callback = callback; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -136,19 +154,20 @@ public boolean equals(Object o) { return false; } ConnectRecord that = (ConnectRecord) o; - return Objects.equals(timestamp, that.timestamp) && Objects.equals(data, that.data) + return Objects.equals(recordId, that.recordId) && Objects.equals(timestamp, that.timestamp) && Objects.equals(data, that.data) && Objects.equals(position, that.position) && Objects.equals(extensions, that.extensions); } @Override public int hashCode() { - return Objects.hash(timestamp, data, position, extensions); + return Objects.hash(recordId, timestamp, data, position, extensions); } @Override public String toString() { return "ConnectRecord{" - + "timestamp=" + timestamp + + "recordId=" + recordId + + ", timestamp=" + timestamp + ", data=" + data + ", position=" + position + ", extensions=" + extensions diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index 1605319862..b13a5b35c5 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -38,7 +38,9 @@ import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; -import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; import org.apache.eventmesh.openconnect.api.factory.ConnectorPluginFactory; @@ -56,6 +58,7 @@ import org.apache.eventmesh.spi.EventMeshExtensionFactory; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -207,6 +210,7 @@ private void initConnectorService() throws Exception { SourceConfig sourceConfig = (SourceConfig) ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(), sourceConnector.configClass()); SourceConnectorContext sourceConnectorContext = new SourceConnectorContext(); sourceConnectorContext.setSourceConfig(sourceConfig); + sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig()); sourceConnectorContext.setOffsetStorageReader(offsetStorageReader); if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) { sourceConnectorContext.setRecordPositionList(jobResponse.getPosition()); @@ -332,15 +336,35 @@ private void startSourceConnector() throws Exception { reportVerifyRequest(record, connectorRuntimeConfig, ConnectorStage.SOURCE); } + // set a callback for this record + // if used the memory storage callback will be triggered after sink put success + record.setCallback(new SendMessageCallback() { + @Override + public void onSuccess(SendResult result) { + // commit record + sourceConnector.commit(record); + Optional submittedRecordPosition = prepareToUpdateRecordOffset(record); + submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack); + Optional callback = + Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v -> (SendMessageCallback) v); + callback.ifPresent(cb -> cb.onSuccess(convertToSendResult(record))); + } + + @Override + public void onException(SendExceptionContext sendExceptionContext) { + // handle exception + sourceConnector.onException(record); + log.error("send record to sink callback exception, process will shut down, record: {}", record, sendExceptionContext.getCause()); + try { + stop(); + } catch (Exception e) { + log.error("Failed to stop after exception", e); + } + } + }); + queue.put(record); - Optional submittedRecordPosition = prepareToUpdateRecordOffset(record); - Optional callback = - Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v -> (SendMessageCallback) v); - // commit record - this.sourceConnector.commit(record); - submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack); - // TODO:finish the optional callback - // callback.ifPresent(cb -> cb.onSuccess(record)); + offsetManagement.awaitAllMessages(5000, TimeUnit.MILLISECONDS); // update & commit offset updateCommittableOffsets(); @@ -350,13 +374,20 @@ private void startSourceConnector() throws Exception { } } + private SendResult convertToSendResult(ConnectRecord record) { + SendResult result = new SendResult(); + result.setMessageId(record.getRecordId()); + if(StringUtils.isNotEmpty(record.getExtension("topic"))) { + result.setTopic(record.getExtension("topic")); + } + return result; + } + private void reportVerifyRequest(ConnectRecord record, ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) { - UUID uuid = UUID.randomUUID(); - String recordId = uuid.toString(); String md5Str = md5(record.toString()); ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest(); reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID()); - reportVerifyRequest.setRecordID(recordId); + reportVerifyRequest.setRecordID(record.getRecordId()); reportVerifyRequest.setRecordSig(md5Str); reportVerifyRequest.setConnectorName( IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion()); diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java index 5a58cce08e..ab6fc3aaf5 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntimeConfig.java @@ -37,6 +37,8 @@ public class ConnectorRuntimeConfig { private String region; + private Map runtimeConfig; + private String sourceConnectorType; private String sourceConnectorDesc; diff --git a/eventmesh-runtime-v2/src/main/resources/connector.yaml b/eventmesh-runtime-v2/src/main/resources/connector.yaml index bf7f58028b..2e79e5cedc 100644 --- a/eventmesh-runtime-v2/src/main/resources/connector.yaml +++ b/eventmesh-runtime-v2/src/main/resources/connector.yaml @@ -18,3 +18,4 @@ taskID: 1 jobID: 1 region: region1 +runtimeConfig: # this used for connector runtime config