Skip to content

Commit

Permalink
Merge 6f3b361 into 70f9892
Browse files Browse the repository at this point in the history
  • Loading branch information
cnzakii authored Apr 29, 2024
2 parents 70f9892 + 6f3b361 commit d235a48
Show file tree
Hide file tree
Showing 20 changed files with 1,592 additions and 4 deletions.
6 changes: 5 additions & 1 deletion eventmesh-connectors/eventmesh-connector-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-common")
implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'

implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0'
implementation 'io.vertx:vertx-web:4.4.6'
implementation 'io.vertx:vertx-web-client:4.4.6'
implementation 'dev.failsafe:failsafe:3.3.2'

testImplementation "org.apache.httpcomponents:httpclient"
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.connector.http.server;

import org.apache.eventmesh.connector.http.config.HttpServerConfig;
import org.apache.eventmesh.connector.http.sink.HttpSinkConnector;
import org.apache.eventmesh.connector.http.source.connector.HttpSourceConnector;
import org.apache.eventmesh.openconnect.Application;
import org.apache.eventmesh.openconnect.util.ConfigUtil;
Expand All @@ -33,7 +34,8 @@ public static void main(String[] args) throws Exception {
}

if (serverConfig.isSinkEnable()) {
// TODO support sink connector
Application httpSinkApp = new Application();
httpSinkApp.run(HttpSinkConnector.class);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.sink;

import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.handle.CommonHttpSinkHandler;
import org.apache.eventmesh.connector.http.sink.handle.HttpSinkHandler;
import org.apache.eventmesh.connector.http.sink.handle.RetryHttpSinkHandler;
import org.apache.eventmesh.connector.http.sink.handle.WebhookHttpSinkHandler;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.util.List;
import java.util.Objects;

import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HttpSinkConnector implements Sink {

private HttpSinkConfig httpSinkConfig;

@Getter
private HttpSinkHandler sinkHandler;

@Override
public Class<? extends Config> configClass() {
return HttpSinkConfig.class;
}

@Override
public void init(Config config) throws Exception {
this.httpSinkConfig = (HttpSinkConfig) config;
doInit();
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
this.httpSinkConfig = (HttpSinkConfig) sinkConnectorContext.getSinkConfig();
doInit();
}

@SneakyThrows
private void doInit() {
// Fill default values if absent
SinkConnectorConfig.populateFieldsWithDefaults(this.httpSinkConfig.connectorConfig);
// Create different handlers for different configurations
HttpSinkHandler nonRetryHandler;
if (this.httpSinkConfig.connectorConfig.getWebhookConfig().isActivate()) {
nonRetryHandler = new WebhookHttpSinkHandler(this.httpSinkConfig.connectorConfig);
} else {
nonRetryHandler = new CommonHttpSinkHandler(this.httpSinkConfig.connectorConfig);
}

int maxRetries = this.httpSinkConfig.connectorConfig.getRetryConfig().getMaxRetries();
if (maxRetries == 0) {
// Use the original sink handler
this.sinkHandler = nonRetryHandler;
} else if (maxRetries > 0) {
// Wrap the sink handler with a retry handler
this.sinkHandler = new RetryHttpSinkHandler(this.httpSinkConfig.connectorConfig, nonRetryHandler);
} else {
throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
}
}

@Override
public void start() throws Exception {
this.sinkHandler.start();
}

@Override
public void commit(ConnectRecord record) {

}

@Override
public String name() {
return this.httpSinkConfig.connectorConfig.getConnectorName();
}

@Override
public void stop() throws Exception {
this.sinkHandler.stop();
}

@Override
public void put(List<ConnectRecord> sinkRecords) {
for (ConnectRecord sinkRecord : sinkRecords) {
try {
if (Objects.isNull(sinkRecord)) {
log.warn("ConnectRecord data is null, ignore.");
continue;
}
// Handle the ConnectRecord
this.sinkHandler.handle(sinkRecord);
} catch (Exception e) {
log.error("Failed to sink message via HTTP. ", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.sink.config;

import lombok.Data;

@Data
public class HttpRetryConfig {
// maximum number of retries, default 3, minimum 0
private int maxRetries = 3;

// retry interval, default 2000ms
private int interval = 2000;

// Default value is false, indicating that only requests with network-level errors will be retried.
// If set to true, all failed requests will be retried, including network-level errors and non-2xx responses.
private boolean retryOnNonSuccess = false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.sink.config;

import org.apache.eventmesh.openconnect.api.config.SinkConfig;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class HttpSinkConfig extends SinkConfig {

public SinkConnectorConfig connectorConfig;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.sink.config;

import lombok.Data;

@Data
public class HttpWebhookConfig {

private boolean activate = false;

// Path to display/export callback data
private String exportPath = "/export";

private int port;

// timeunit: ms
private int serverIdleTimeout = 5000;

// max size of the storage queue
private int maxStorageSize = 5000;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.sink.config;

import io.vertx.core.http.HttpClientOptions;

import lombok.Data;

@Data
public class SinkConnectorConfig {

private String connectorName;

private String[] urls;

// keepAlive, default true
private boolean keepAlive = HttpClientOptions.DEFAULT_KEEP_ALIVE;

// timeunit: ms, default 60000ms
private int keepAliveTimeout = HttpClientOptions.DEFAULT_KEEP_ALIVE_TIMEOUT * 1000; // Keep units consistent

// timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms
private int connectionTimeout = 5000;

// timeunit: ms, default 5000ms
private int idleTimeout;

// maximum number of HTTP/1 connections a client will pool, default 5
private int maxConnectionPoolSize = HttpClientOptions.DEFAULT_MAX_POOL_SIZE;

// retry config
private HttpRetryConfig retryConfig = new HttpRetryConfig();

// webhook config
private HttpWebhookConfig webhookConfig = new HttpWebhookConfig();


/**
* Fill default values if absent (When there are multiple default values for a field)
*
* @param config SinkConnectorConfig
*/
public static void populateFieldsWithDefaults(SinkConnectorConfig config) {
/*
* set default values for idleTimeout
* recommended scope: common(5s - 10s), webhook(15s - 30s)
*/
final int commonHttpIdleTimeout = 5000;
final int webhookHttpIdleTimeout = 15000;

// Set default values for idleTimeout
if (config.getIdleTimeout() == 0) {
int idleTimeout = config.webhookConfig.isActivate() ? webhookHttpIdleTimeout : commonHttpIdleTimeout;
config.setIdleTimeout(idleTimeout);
}

}
}
Loading

0 comments on commit d235a48

Please sign in to comment.