Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into dev-batchlimit
Browse files Browse the repository at this point in the history
# Conflicts:
#	eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
  • Loading branch information
jinrongluo committed Feb 9, 2022
2 parents 0761ae5 + b8891ee commit 3368ed8
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 0 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,8 @@ subprojects {

dependency "io.cloudevents:cloudevents-core:2.2.0"
dependency "io.cloudevents:cloudevents-json-jackson:2.2.0"

dependency "com.github.seancfoley:ipaddress:5.3.3"
}
}
}
2 changes: 2 additions & 0 deletions eventmesh-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ dependencies {
implementation "org.apache.logging.log4j:log4j-core"
implementation "org.apache.logging.log4j:log4j-slf4j-impl"

implementation 'com.github.seancfoley:ipaddress:5.3.3'

implementation "com.lmax:disruptor"

api "com.fasterxml.jackson.core:jackson-databind"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,35 @@

package org.apache.eventmesh.common.utils;

import org.apache.commons.lang3.StringUtils;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.Channel;

import inet.ipaddr.HostName;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;

public class IPUtils {

private static final Logger logger = LoggerFactory.getLogger(IPUtils.class);

public static String getLocalAddress() {
// if the progress works under docker environment
// return the host ip about this docker located from environment value
Expand Down Expand Up @@ -173,4 +187,59 @@ public static String parseChannelRemoteAddr(final Channel channel) {

return "";
}

public static boolean isValidDomainOrIp(String url, List<IPAddress> ipV4ReservedAddrs, List<IPAddress> ipV6ReservedAddrs) {
if (StringUtils.isBlank(url)) {
return false;
}
// Engine only need to verify DNS transformed result
if (isValidIp(url)) {
return true;
}
IPAddress ipAddress = domain2Ip(url);
if (ipAddress == null) {
return false;
}
if (ipAddress.isIPv4()) {
return isReservedIp(ipAddress, ipV4ReservedAddrs);
} else {
return isReservedIp(ipAddress, ipV6ReservedAddrs);
}
}

public static boolean isValidIp(String url) {
try {
IPAddressString ipString = new IPAddressString(url);
if (!ipString.isValid()) {
return new IPAddressString(new URL(url).getHost()).isValid();
}
} catch (Exception e) {
logger.warn("Invalid URL format url={}", url, e);
return false;
}
return true;
}

public static IPAddress domain2Ip(String url) {
HostName hostName = new HostName(url);
if (hostName.isValid()) {
return hostName.getAddress();
}
try {
String host = new URL(url).getHost();
return new HostName(host).getAddress();
} catch (MalformedURLException e) {
logger.error("Invalid URL format url={}", url, e);
return null;
}
}

private static boolean isReservedIp(IPAddress ipAddress, List<IPAddress> reservedIps) {
for (IPAddress address : reservedIps) {
if (address.contains(ipAddress)) {
return true;
}
}
return false;
}
}
2 changes: 2 additions & 0 deletions eventmesh-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ dependencies {
implementation "org.apache.httpcomponents:httpclient"
implementation 'io.netty:netty-all'

implementation 'com.github.seancfoley:ipaddress:5.3.3'

implementation project(":eventmesh-common")
implementation project(":eventmesh-spi")
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
Expand Down
4 changes: 4 additions & 0 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000
eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200

#ip address blacklist
eventmesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventmesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8

#connector plugin
eventMesh.connector.plugin.type=standalone

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,23 @@

import org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;

public class EventMeshHTTPConfiguration extends CommonConfiguration {

public static Logger logger = LoggerFactory.getLogger(EventMeshHTTPConfiguration.class);

public int httpServerPort = 10105;

public boolean eventMeshServerBatchMsgBatchEnabled = Boolean.TRUE;
Expand Down Expand Up @@ -74,6 +87,10 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {

public int eventMeshEventBatchSize = 10;

public List<IPAddress> eventMeshIpv4BlackList = Collections.emptyList();

public List<IPAddress> eventMeshIpv6BlackList = Collections.emptyList();

public EventMeshHTTPConfiguration(ConfigurationWrapper configurationWrapper) {
super(configurationWrapper);
}
Expand Down Expand Up @@ -262,7 +279,32 @@ public void init() {
if (StringUtils.isNotEmpty(eventBatchSize) && StringUtils.isNumeric(eventBatchSize)) {
eventMeshEventBatchSize = Integer.parseInt(eventBatchSize);
}

String ipv4BlackList = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST);
if (StringUtils.isNotEmpty(ipv4BlackList)) {
eventMeshIpv4BlackList = getBlacklist(ipv4BlackList);
}

String ipv6BlackList = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST);
if (StringUtils.isNotEmpty(ipv6BlackList)) {
eventMeshIpv6BlackList = getBlacklist(ipv6BlackList);
}
}
}

private static List<IPAddress> getBlacklist(String cidrs) {
List<String> cidrList = Splitter.on(",").omitEmptyStrings()
.trimResults().splitToList(cidrs);

List<IPAddress> ipAddresses = Lists.newArrayList();
for (String cidr : cidrList) {
try {
ipAddresses.add(new IPAddressString(cidr).toAddress());
} catch (Exception e) {
logger.warn("Invalid cidr={}", cidr, e);
}
}
return ipAddresses;
}

static class ConfKeys {
Expand Down Expand Up @@ -314,5 +356,9 @@ static class ConfKeys {
public static String KEY_EVENTMESH_SERVER_EVENTSIZE = "eventmesh.server.eventSize";

public static String KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventmesh.server.eventBatchSize";

public static String KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST = "eventmesh.server.blacklist.ipv4";

public static String KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST = "eventmesh.server.blacklist.ipv6";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,30 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
String url = subscribeRequestBody.getUrl();
String consumerGroup = subscribeRequestBody.getConsumerGroup();

// validate URL
try {
if (!IPUtils.isValidDomainOrIp(url, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv4BlackList,
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIpv6BlackList)) {
httpLogger.error("subscriber url {} is not valid", url);
responseEventMeshCommand = request.createHttpCommandResponse(
subscribeResponseHeader,
SubscribeResponseBody
.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " invalid URL: " + url));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
} catch (Exception e) {
httpLogger.error("subscriber url {} is not valid, error {}", url, e.getMessage());
responseEventMeshCommand = request.createHttpCommandResponse(
subscribeResponseHeader,
SubscribeResponseBody
.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " invalid URL: " + url));
asyncContext.onComplete(responseEventMeshCommand);
return;
}

synchronized (eventMeshHTTPServer.localClientInfoMapping) {

registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);
Expand Down
1 change: 1 addition & 0 deletions tools/third-party-dependencies/known-dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ guava-31.0.1-jre.jar
hamcrest-core-1.3.jar
httpclient-4.5.13.jar
httpcore-4.4.13.jar
ipaddress-5.3.3.jar
j2objc-annotations-1.3.jar
jackson-annotations-2.11.0.jar
jackson-core-2.11.0.jar
Expand Down
Loading

0 comments on commit 3368ed8

Please sign in to comment.