Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4824] Add HTTP Sink Connector #4837

Merged
merged 27 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8275b59
feat: Add HTTP Sink Connector
cnzakii Apr 13, 2024
6776490
refactor: Replace okHttpClient with vertx.WebClient
cnzakii Apr 13, 2024
003f078
fix: Resolving dependency conflicts
cnzakii Apr 14, 2024
cd21ea3
test: Add HttpSinkConnectorTest
cnzakii Apr 14, 2024
dc08ce1
Merge branch 'apache:master' into feat_4824
cnzakii Apr 14, 2024
8486080
fix: Add License
cnzakii Apr 14, 2024
f2e3835
fix: Solving dependency issues
cnzakii Apr 14, 2024
eda1d14
fix: License Check
cnzakii Apr 14, 2024
611c8d6
feat: Add HTTPS/SSL support
cnzakii Apr 14, 2024
64f2822
fix: Optimize logging
cnzakii Apr 15, 2024
77ad32f
feat: Add webhook functionality
cnzakii Apr 16, 2024
3eb3af6
fix: Fix some bugs
cnzakii Apr 16, 2024
3c59c8e
test: add callback test
cnzakii Apr 16, 2024
408aaa6
refactor: Add webhook Support
cnzakii Apr 17, 2024
5cb55a8
fix: Optimization tests and configuration additions
cnzakii Apr 18, 2024
1050f8a
fix: code style
cnzakii Apr 18, 2024
fe2b732
feat: rebuild WebhookHttpSinkHandler and add RetryHttpSinkHandler
cnzakii Apr 27, 2024
7e2d978
fix: fix ci
cnzakii Apr 27, 2024
9c9e4a8
refactor: Use failsafe alternative resilience4j and optimize webhook …
cnzakii Apr 28, 2024
33b14e4
fix: fix License Check
cnzakii Apr 28, 2024
4af7455
fix: update something
cnzakii Apr 29, 2024
5e2bf52
fix: fix ci
cnzakii Apr 29, 2024
3dcc5a9
fix: update something
cnzakii Apr 29, 2024
6f3b361
fix: Optimized naming
cnzakii Apr 29, 2024
71c12e8
fix: fix ci
cnzakii Apr 29, 2024
6732e53
fix: fix style check error
cnzakii Apr 29, 2024
2f2e81e
test: update HttpSinkConnectorTest
cnzakii Apr 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion eventmesh-connectors/eventmesh-connector-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ dependencies {
implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0'
implementation 'io.vertx:vertx-web:4.4.6'
implementation 'io.vertx:vertx-web-client:4.4.6'
implementation 'io.github.resilience4j:resilience4j-retry:1.7.1'
implementation 'io.vertx:vertx-web-validation:4.4.6'
implementation 'dev.failsafe:failsafe:3.3.2'

testImplementation "org.apache.httpcomponents:httpclient"
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,22 @@ private void doInit() {
// Fill default values if absent
SinkConnectorConfig.populateFieldsWithDefaults(this.httpSinkConfig.connectorConfig);
// Create different handlers for different configurations
HttpSinkHandler sinkHandler0;
HttpSinkHandler nonRetryHandler;
if (this.httpSinkConfig.connectorConfig.getWebhookConfig().isActivate()) {
sinkHandler0 = new WebhookHttpSinkHandler(this.httpSinkConfig.connectorConfig);
nonRetryHandler = new WebhookHttpSinkHandler(this.httpSinkConfig.connectorConfig);
} else {
sinkHandler0 = new CommonHttpSinkHandler(this.httpSinkConfig.connectorConfig);
nonRetryHandler = new CommonHttpSinkHandler(this.httpSinkConfig.connectorConfig);
}

if (this.httpSinkConfig.connectorConfig.getRetryConfig().getMaxAttempts() > 1) {
// Wrap the sink handler with a retry handler
this.sinkHandler = new RetryHttpSinkHandler(this.httpSinkConfig.connectorConfig, sinkHandler0);
} else {
int maxRetries = this.httpSinkConfig.connectorConfig.getRetryConfig().getMaxRetries();
if (maxRetries < 0) {
throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
} else if (maxRetries == 0) {
cnzakii marked this conversation as resolved.
Show resolved Hide resolved
// Use the original sink handler
this.sinkHandler = sinkHandler0;
this.sinkHandler = nonRetryHandler;
} else {
// Wrap the sink handler with a retry handler
this.sinkHandler = new RetryHttpSinkHandler(this.httpSinkConfig.connectorConfig, nonRetryHandler);
}
}

Expand Down Expand Up @@ -112,7 +115,7 @@ public void put(List<ConnectRecord> sinkRecords) {
continue;
}
// Handle the ConnectRecord
this.sinkHandler.multiHandle(sinkRecord);
this.sinkHandler.handle(sinkRecord);
} catch (Exception e) {
log.error("Failed to sink message via HTTP. ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

@Data
public class HttpRetryConfig {
// maximum number of attempts to retry, default 3, if set to 0 or 1, no retry
private int maxAttempts = 3;
// maximum number of retries, default 3, minimum 0
private int maxRetries = 3;

// retry interval, default 2000ms
private int interval = 2000;

// Default value is false, indicating that only requests with network-level errors will be retried.
// If set to true, all failed requests will be retried, including network-level errors and non-2xx responses.
private boolean retryAll = false;
private boolean retryOnNonSuccess = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ public class HttpWebhookConfig {
private int port;

// timeunit: ms
private int idleTimeout = 5000;
private int serverIdleTimeout = 5000;

// max size of the storage queue
private int maxStorageSize = 5000;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.sink.data;

import java.time.LocalDateTime;

import lombok.Builder;
import lombok.Data;

/**
* Metadata for an HTTP export operation.
*/
@Data
@Builder
public class HttpExportMetadata {
private String url;

private int code;

private String message;

private LocalDateTime receivedTime;

private String uuid;

private String retriedBy;

private int retryNum;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.sink.data;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
* Represents an HTTP export record containing metadata and data to be exported.
*/
@Data
@AllArgsConstructor
public class HttpExportRecord {

private HttpExportMetadata metadata;

private Object data;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.sink.data;

import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
* Represents a page of HTTP export records.
*/
@Data
@AllArgsConstructor
public class HttpExportRecordPage {

private int pageNum;

private int pageSize;

private List<HttpExportRecord> pageItems;

}
Original file line number Diff line number Diff line change
Expand Up @@ -99,32 +99,32 @@ private void doInitWebClient() {
this.webClient = WebClient.create(vertx, options);
}


/**
* Handles a ConnectRecord by sending it asynchronously to all configured URLs.
* Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
*
* @param record the ConnectRecord to handle
* @param record the ConnectRecord to process
*/
@Override
public void multiHandle(ConnectRecord record) {
public void handle(ConnectRecord record) {
for (URI url : this.urls) {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
handle(url, httpConnectRecord);
deliver(url, httpConnectRecord);
}
}


/**
* Sends the HttpConnectRecord to the specified URL using WebClient.
* Processes HttpConnectRecord on specified URL while returning its own processing logic.
* This method sends the HttpConnectRecord to the specified URL using the WebClient.
*
* @param url the URL to send the HttpConnectRecord
* @param httpConnectRecord the HttpConnectRecord to send
* @return the Future of the HTTP request
* @param url URI to which the HttpConnectRecord should be sent
* @param httpConnectRecord HttpConnectRecord to process
* @return processing chain
*/
@Override
public Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord httpConnectRecord) {
public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord) {
// create headers
MultiMap headers = HttpHeaders.headers()
.set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
Expand All @@ -142,18 +142,22 @@ public Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord httpConnec
.ssl(Objects.equals(url.getScheme(), "https"))
.sendJson(httpConnectRecord)
.onSuccess(res -> {
if (log.isDebugEnabled()) {
log.debug("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset);
} else {
log.info("Request sent successfully.");
}
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset);
// log the response
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
if (log.isDebugEnabled()) {
log.debug("Received successful response: statusCode={}, responseBody={}", res.statusCode(), res.bodyAsString());
log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, offset, res.bodyAsString());
} else {
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
}
} else {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
if (log.isDebugEnabled()) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, offset, res.bodyAsString());
} else {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
}
}

})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
* sending them over HTTP or HTTPS, with additional support for handling multiple requests and asynchronous processing.
*
* <p>Any class that needs to process ConnectRecords via HTTP or HTTPS should implement this interface.
* Implementing classes must provide implementations for the {@link #start()}, {@link #multiHandle(ConnectRecord)},
* {@link #handle(URI, HttpConnectRecord)}, and {@link #stop()} methods.</p>
* Implementing classes must provide implementations for the {@link #start()}, {@link #handle(ConnectRecord)},
* {@link #deliver(URI, HttpConnectRecord)}, and {@link #stop()} methods.</p>
*
* <p>Implementing classes should ensure thread safety and handle HTTP/HTTPS communication efficiently.
* The {@link #start()} method initializes any necessary resources for HTTP/HTTPS communication.
* The {@link #multiHandle(ConnectRecord)} method processes a ConnectRecord multiple times by sending it over HTTP or HTTPS.
* The {@link #handle(URI, HttpConnectRecord)} method processes a single ConnectRecord by sending it over HTTP or HTTPS to the specified URL.
* The {@link #stop()} method releases any resources used for HTTP/HTTPS communication.</p>
* The {@link #start()} method initializes any necessary resources for HTTP/HTTPS communication. The {@link #handle(ConnectRecord)} method processes a
* ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord)} method processes HttpConnectRecord on specified URL
* while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.</p>
*
* <p>It's recommended to handle exceptions gracefully within the {@link #handle(URI, HttpConnectRecord)} method
* <p>It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord)} method
* to prevent message loss or processing interruptions.</p>
*/
public interface HttpSinkHandler {
Expand All @@ -51,19 +50,21 @@ public interface HttpSinkHandler {
void start();

/**
* Processes the ConnectRecord multiple times.
* Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
*
* @param record the ConnectRecord to handle
* @param record the ConnectRecord to process
*/
void multiHandle(ConnectRecord record);
void handle(ConnectRecord record);


/**
* Processes the ConnectRecord once.
* Processes HttpConnectRecord on specified URL while returning its own processing logic
*
* @param url the URL to send the ConnectRecord to
* @param httpConnectRecord the ConnectRecord to handle
* @param url URI to which the HttpConnectRecord should be sent
* @param httpConnectRecord HttpConnectRecord to process
* @return processing chain
*/
Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord httpConnectRecord);
Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord);

/**
* Cleans up and releases resources used by the HTTP/HTTPS handler. This method should be called when the handler is no longer needed.
Expand Down
Loading