Skip to content

Commit

Permalink
[ISSUE apache#5069] Enhancement for http source/sink connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Aug 4, 2024
1 parent 771a189 commit 857a6b4
Showing 1 changed file with 35 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.util.HttpUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.net.URI;
Expand Down Expand Up @@ -111,10 +113,42 @@ public void handle(ConnectRecord record) {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
deliver(url, httpConnectRecord);
Future<HttpResponse<Buffer>> responseFuture = deliver(url, httpConnectRecord);
responseFuture.onComplete(res -> {
if (res.succeeded()) {
HttpResponse<Buffer> response = res.result();
if (HttpUtils.is2xxSuccessful(response.statusCode())) {
record.getCallback().onSuccess(convertToSendResult(record));
} else {
record.getCallback().onException(buildSendExceptionContext(record, res.cause()));
}
} else {
record.getCallback().onException(buildSendExceptionContext(record, res.cause()));
}
});
}
}

private SendResult convertToSendResult(ConnectRecord record) {
SendResult result = new SendResult();
result.setMessageId(record.getRecordId());
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
result.setTopic(record.getExtension("topic"));
}
return result;
}

private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
SendExceptionContext sendExceptionContext = new SendExceptionContext();
sendExceptionContext.setMessageId(record.getRecordId());
sendExceptionContext.setCause(e);
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
sendExceptionContext.setTopic(record.getExtension("topic"));
}
return sendExceptionContext;
}



/**
* Processes HttpConnectRecord on specified URL while returning its own processing logic.
Expand Down

0 comments on commit 857a6b4

Please sign in to comment.