From 6c94eb1ba4e4cae2c92c59493e481966bad2c3fe Mon Sep 17 00:00:00 2001 From: laohu <2732554140@qq.com> Date: Fri, 9 Sep 2022 18:05:26 +0800 Subject: [PATCH 1/8] =?UTF-8?q?fix(webhook,runtime):1.=20=E5=AE=8C?= =?UTF-8?q?=E6=88=90webhook=E8=81=94=E8=B0=83=202.=20=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=E5=8C=96=E4=B8=8D=E6=AD=A3=E7=A1=AE=E4=BB=A3=E7=A0=81=203.=20?= =?UTF-8?q?=E4=BF=AE=E6=8A=A4pulsar=E6=8F=92=E4=BB=B6=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gradle.properties | 2 +- .../webhook/WebHookProtocolAdaptor.java | 43 +++++++++++++++---- .../http/processor/HandlerService.java | 10 ++--- eventmesh-webhook/build.gradle | 2 +- .../eventmesh-webhook-admin/build.gradle | 4 +- .../AdminWebHookConfigOperationManage.java | 3 +- .../admin/FileWebHookConfigOperation.java | 14 ++---- .../admin/NacosWebHookConfigOperation.java | 13 +++--- .../webhook/api/WebHookOperationConstant.java | 4 +- .../webhook/api/utils/StringUtils.java | 8 ++++ .../webhook/receive/WebHookController.java | 18 +++----- .../webhook/receive/WebHookMQProducer.java | 9 +++- .../storage/HookConfigOperationManage.java | 14 +++--- .../receive/storage/WebhookFileListener.java | 36 ++++++++++------ 14 files changed, 106 insertions(+), 74 deletions(-) create mode 100644 eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/utils/StringUtils.java diff --git a/eventmesh-connector-plugin/eventmesh-connector-pulsar/gradle.properties b/eventmesh-connector-plugin/eventmesh-connector-pulsar/gradle.properties index b4c14f4fc7..fdbe0e83c9 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pulsar/gradle.properties +++ b/eventmesh-connector-plugin/eventmesh-connector-pulsar/gradle.properties @@ -15,4 +15,4 @@ # pluginType=connector -pluginName=rocketmq \ No newline at end of file +pluginName=pulsar diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-webhook/src/main/java/org/apache/eventmesh/protocol/webhook/WebHookProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-webhook/src/main/java/org/apache/eventmesh/protocol/webhook/WebHookProtocolAdaptor.java index 6fb448ee83..2a102ad6cb 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-webhook/src/main/java/org/apache/eventmesh/protocol/webhook/WebHookProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-webhook/src/main/java/org/apache/eventmesh/protocol/webhook/WebHookProtocolAdaptor.java @@ -17,14 +17,20 @@ package org.apache.eventmesh.protocol.webhook; +import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; +import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.protocol.http.WebhookProtocolTransportObject; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; + import java.net.URI; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; @@ -34,13 +40,14 @@ public class WebHookProtocolAdaptor implements ProtocolAdaptor toBatchCloudEvent(WebhookProtocolTransportObject protoco @Override public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException { - return null; + final HttpEventWrapper httpEventWrapper = new HttpEventWrapper(); + Map sysHeaderMap = new HashMap<>(); + // ce attributes + Set attributeNames = cloudEvent.getAttributeNames(); + // ce extensions + Set extensionNames = cloudEvent.getExtensionNames(); + for (String attributeName : attributeNames) { + sysHeaderMap.put(attributeName, cloudEvent.getAttribute(attributeName)); + } + for (String extensionName : extensionNames) { + sysHeaderMap.put(extensionName, cloudEvent.getExtension(extensionName)); + } + sysHeaderMap.put("cloudEventId", cloudEvent.getId()); + sysHeaderMap.put("cloudEventName", cloudEvent.getSubject()); + sysHeaderMap.put("cloudEventSource", cloudEvent.getSource().toString()); + sysHeaderMap.put("type", cloudEvent.getType()); + httpEventWrapper.setSysHeaderMap(sysHeaderMap); + httpEventWrapper.setBody(cloudEvent.getData().toBytes()); + return httpEventWrapper; } @Override public String getProtocolType() { - return "webhook"; + return "webhookProtocolAdaptor"; } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java index 4222035eea..350ec68a00 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java @@ -94,7 +94,7 @@ public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecuto if (httpProcessorMap.containsKey(path)) { throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ", - path, httpProcessor.getClass().getSimpleName())); + path, httpProcessor.getClass().getSimpleName())); } ProcessorWrapper processorWrapper = new ProcessorWrapper(); processorWrapper.threadPoolExecutor = threadPoolExecutor; @@ -154,10 +154,10 @@ private void sendResponse(ChannelHandlerContext ctx, HttpRequest httpRequest, Ht ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> { if (!f.isSuccess()) { httpLogger.warn("send response to [{}] fail, will close this channel", - RemotingHelper.parseChannelRemoteAddr(f.channel())); - if (isClose) { - f.channel().close(); - } + RemotingHelper.parseChannelRemoteAddr(f.channel())); + } + if (isClose) { + f.channel().close(); } }); } diff --git a/eventmesh-webhook/build.gradle b/eventmesh-webhook/build.gradle index 8d8657a4e6..b461b7c920 100644 --- a/eventmesh-webhook/build.gradle +++ b/eventmesh-webhook/build.gradle @@ -25,7 +25,7 @@ task copyEventMeshAdmin(dependsOn: ['jar']) { into('../eventmesh-webhook/dist/apps/') from project.jar.getArchivePath() exclude { - "eventmesh-webhook-${version}.jar" + "eventmesh-webhook-${version}.jar" } } copy { diff --git a/eventmesh-webhook/eventmesh-webhook-admin/build.gradle b/eventmesh-webhook/eventmesh-webhook-admin/build.gradle index af232747a0..4b1b2dac19 100644 --- a/eventmesh-webhook/eventmesh-webhook-admin/build.gradle +++ b/eventmesh-webhook/eventmesh-webhook-admin/build.gradle @@ -18,7 +18,7 @@ dependencies { implementation project(":eventmesh-common") - implementation project(":eventmesh-webhook:eventmesh-webhook-api") + implementation project(":eventmesh-webhook:eventmesh-webhook-api") implementation 'org.slf4j:slf4j-api' implementation "com.alibaba.nacos:nacos-client:2.0.4" @@ -26,7 +26,7 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-core" implementation "com.fasterxml.jackson.core:jackson-annotations" - testImplementation project(":eventmesh-webhook:eventmesh-webhook-api") + testImplementation project(":eventmesh-webhook:eventmesh-webhook-api") } diff --git a/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/AdminWebHookConfigOperationManage.java b/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/AdminWebHookConfigOperationManage.java index 84ecf7014a..d5b228fa17 100644 --- a/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/AdminWebHookConfigOperationManage.java +++ b/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/AdminWebHookConfigOperationManage.java @@ -33,8 +33,6 @@ public class AdminWebHookConfigOperationManage { - public Logger logger = LoggerFactory.getLogger(this.getClass()); - private static final Map> map = new HashMap<>(); static { @@ -42,6 +40,7 @@ public class AdminWebHookConfigOperationManage { map.put("nacos", NacosWebHookConfigOperation.class); } + public Logger logger = LoggerFactory.getLogger(this.getClass()); private ConfigurationWrapper configurationWrapper; private WebHookConfigOperation webHookConfigOperation; diff --git a/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/FileWebHookConfigOperation.java b/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/FileWebHookConfigOperation.java index 6b517df398..9bb517ff10 100644 --- a/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/FileWebHookConfigOperation.java +++ b/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/FileWebHookConfigOperation.java @@ -21,6 +21,7 @@ import org.apache.eventmesh.webhook.api.WebHookConfig; import org.apache.eventmesh.webhook.api.WebHookConfigOperation; import org.apache.eventmesh.webhook.api.WebHookOperationConstant; +import org.apache.eventmesh.webhook.api.utils.StringUtils; import java.io.BufferedReader; import java.io.BufferedWriter; @@ -31,8 +32,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -166,14 +165,9 @@ private String getWebhookConfigManuDir(WebHookConfig webHookConfig) { private File getWebhookConfigFile(WebHookConfig webHookConfig) { String webhookConfigFilePath = null; - try { - // use URLEncoder.encode before, because the path may contain some speacial char like '/', which is illegal as a file name. - webhookConfigFilePath = this.getWebhookConfigManuDir(webHookConfig) - + WebHookOperationConstant.FILE_SEPARATOR + URLEncoder.encode(webHookConfig.getCallbackPath(), "UTF-8") - + WebHookOperationConstant.FILE_EXTENSION; - } catch (UnsupportedEncodingException e) { - logger.error("get webhookConfig file path {} failed", webHookConfig.getCallbackPath(), e); - } + webhookConfigFilePath = this.getWebhookConfigManuDir(webHookConfig) + WebHookOperationConstant.FILE_SEPARATOR + + StringUtils.getFileName(webHookConfig.getCallbackPath()); + assert webhookConfigFilePath != null; return new File(webhookConfigFilePath); } diff --git a/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/NacosWebHookConfigOperation.java b/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/NacosWebHookConfigOperation.java index 33b3ccdbbb..82567065de 100644 --- a/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/NacosWebHookConfigOperation.java +++ b/eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/NacosWebHookConfigOperation.java @@ -23,7 +23,6 @@ import static org.apache.eventmesh.webhook.api.WebHookOperationConstant.TIMEOUT_MS; - import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.webhook.api.ManufacturerObject; import org.apache.eventmesh.webhook.api.WebHookConfig; @@ -80,7 +79,7 @@ public Integer insertWebHookConfig(WebHookConfig webHookConfig) { return 0; } result = configService.publishConfig(getWebHookConfigDataId(webHookConfig), getManuGroupId(webHookConfig), - JsonUtils.serialize(webHookConfig), ConfigType.JSON.getType()); + JsonUtils.serialize(webHookConfig), ConfigType.JSON.getType()); } catch (NacosException e) { logger.error("insertWebHookConfig failed", e); return 0; @@ -92,7 +91,7 @@ public Integer insertWebHookConfig(WebHookConfig webHookConfig) { manufacturerObject.addManufacturer(manufacturerName); manufacturerObject.getManufacturerEvents(manufacturerName).add(getWebHookConfigDataId(webHookConfig)); configService.publishConfig(MANUFACTURERS_DATA_ID, CONSTANTS_WEBHOOK, - JsonUtils.serialize(manufacturerObject), ConfigType.JSON.getType()); + JsonUtils.serialize(manufacturerObject), ConfigType.JSON.getType()); } catch (NacosException e) { logger.error("update manufacturersInfo error", e); //rollback insert @@ -115,7 +114,7 @@ public Integer updateWebHookConfig(WebHookConfig webHookConfig) { return 0; } result = configService.publishConfig(getWebHookConfigDataId(webHookConfig), - getManuGroupId(webHookConfig), JsonUtils.serialize(webHookConfig), ConfigType.JSON.getType()); + getManuGroupId(webHookConfig), JsonUtils.serialize(webHookConfig), ConfigType.JSON.getType()); } catch (NacosException e) { logger.error("updateWebHookConfig failed", e); } @@ -136,7 +135,7 @@ public Integer deleteWebHookConfig(WebHookConfig webHookConfig) { ManufacturerObject manufacturerObject = getManufacturersInfo(); manufacturerObject.getManufacturerEvents(manufacturerName).remove(getWebHookConfigDataId(webHookConfig)); configService.publishConfig(MANUFACTURERS_DATA_ID, CONSTANTS_WEBHOOK, - JsonUtils.serialize(manufacturerObject), ConfigType.JSON.getType()); + JsonUtils.serialize(manufacturerObject), ConfigType.JSON.getType()); } catch (NacosException e) { logger.error("update manufacturersInfo error", e); } @@ -170,7 +169,7 @@ public List queryWebHookConfigByManufacturer(WebHookConfig webHoo // nacos API is not able to get all config, so use foreach for (int i = startIndex; i < endIndex && i < manufacturerEvents.size(); i++) { String content = configService.getConfig(manufacturerEvents.get(i) + DATA_ID_EXTENSION, - getManuGroupId(webHookConfig), TIMEOUT_MS); + getManuGroupId(webHookConfig), TIMEOUT_MS); webHookConfigs.add(JsonUtils.deserialize(content, WebHookConfig.class)); } } @@ -201,7 +200,7 @@ private String getManuGroupId(WebHookConfig webHookConfig) { private ManufacturerObject getManufacturersInfo() throws NacosException { String manufacturersContent = configService.getConfig(MANUFACTURERS_DATA_ID, CONSTANTS_WEBHOOK, TIMEOUT_MS); return StringUtil.isNullOrEmpty(manufacturersContent) - ? new ManufacturerObject() : JsonUtils.deserialize(manufacturersContent, ManufacturerObject.class); + ? new ManufacturerObject() : JsonUtils.deserialize(manufacturersContent, ManufacturerObject.class); } } diff --git a/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/WebHookOperationConstant.java b/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/WebHookOperationConstant.java index 2a887f682f..92de716e0c 100644 --- a/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/WebHookOperationConstant.java +++ b/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/WebHookOperationConstant.java @@ -42,8 +42,8 @@ public class WebHookOperationConstant { public static final String getFilePath(String filePath) { if (filePath.startsWith("#{eventMeshHome}")) { String configPath = System.getProperty("confPath", System.getenv("confPath")); - - filePath = filePath.replace("#{eventMeshHome}", configPath.substring(0, configPath.lastIndexOf(FILE_SEPARATOR))); + + filePath = filePath.replace("#{eventMeshHome}", configPath.substring(0, configPath.lastIndexOf(FILE_SEPARATOR))); } return filePath; } diff --git a/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/utils/StringUtils.java b/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/utils/StringUtils.java new file mode 100644 index 0000000000..890c0cdfda --- /dev/null +++ b/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/utils/StringUtils.java @@ -0,0 +1,8 @@ +package org.apache.eventmesh.webhook.api.utils; + +public class StringUtils { + + public static final String getFileName(String path) { + return path.substring(1).replace('/', '.'); + } +} diff --git a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/WebHookController.java b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/WebHookController.java index c3c9c7b3d0..1b13cbbb21 100644 --- a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/WebHookController.java +++ b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/WebHookController.java @@ -41,13 +41,11 @@ public class WebHookController { - public Logger logger = LoggerFactory.getLogger(this.getClass()); - /** * protocol pool */ private final ProtocolManage protocolManage = new ProtocolManage(); - + public Logger logger = LoggerFactory.getLogger(this.getClass()); /** * config pool */ @@ -61,7 +59,7 @@ public class WebHookController { private ConfigurationWrapper configurationWrapper; public void init() throws Exception { - this.webHookMQProducer = new WebHookMQProducer( + this.webHookMQProducer = new WebHookMQProducer(configurationWrapper.getProperties(), configurationWrapper.getProp("eventMesh.webHook.producer.connector")); this.hookConfigOperationManage = new HookConfigOperationManage(configurationWrapper); this.protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor("webhook"); @@ -98,7 +96,7 @@ public void execute(String path, Map header, byte[] body) throws try { protocol.execute(webHookRequest, webHookConfig, header); } catch (Exception e) { - throw new Exception("Webhook Message Parse Failed."); + throw new Exception("Webhook Message Parse Failed. " + e.getMessage(), e); } // 3. convert to cloudEvent obj @@ -107,13 +105,9 @@ public void execute(String path, Map header, byte[] body) throws String eventType = manufacturerName + "." + webHookConfig.getManufacturerEventName(); WebhookProtocolTransportObject webhookProtocolTransportObject = WebhookProtocolTransportObject.builder() - .cloudEventId(cloudEventId) - .eventType(eventType) - .cloudEventName(webHookConfig.getCloudEventName()) - .cloudEventSource(webHookConfig.getCloudEventSource()) - .dataContentType(webHookConfig.getDataContentType()) - .body(body) - .build(); + .cloudEventId(cloudEventId).eventType(eventType).cloudEventName(webHookConfig.getCloudEventName()) + .cloudEventSource("www." + webHookConfig.getManufacturerName() + ".com") + .dataContentType(webHookConfig.getDataContentType()).body(body).build(); // 4. send cloudEvent webHookMQProducer.send(this.protocolAdaptor.toCloudEvent(webhookProtocolTransportObject), new SendCallback() { diff --git a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/WebHookMQProducer.java b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/WebHookMQProducer.java index 4058b233fb..45f3f28444 100644 --- a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/WebHookMQProducer.java +++ b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/WebHookMQProducer.java @@ -22,6 +22,8 @@ import org.apache.eventmesh.api.factory.ConnectorPluginFactory; import org.apache.eventmesh.api.producer.Producer; +import java.util.Properties; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,12 +35,17 @@ public class WebHookMQProducer { protected Producer hookMQProducer; - public WebHookMQProducer(String connectorPluginType) { + public WebHookMQProducer(Properties properties, String connectorPluginType) { this.hookMQProducer = ConnectorPluginFactory.getMeshMQProducer(connectorPluginType); if (hookMQProducer == null) { logger.error("can't load the hookMQProducer plugin, please check."); throw new RuntimeException("doesn't load the hookMQProducer plugin, please check."); } + try { + this.hookMQProducer.init(properties); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } } public void send(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception { diff --git a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/HookConfigOperationManage.java b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/HookConfigOperationManage.java index d47c7f89ca..000a62a979 100644 --- a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/HookConfigOperationManage.java +++ b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/HookConfigOperationManage.java @@ -26,6 +26,7 @@ import org.apache.eventmesh.webhook.api.WebHookConfig; import org.apache.eventmesh.webhook.api.WebHookConfigOperation; import org.apache.eventmesh.webhook.api.WebHookOperationConstant; +import org.apache.eventmesh.webhook.api.utils.StringUtils; import java.io.FileNotFoundException; import java.util.List; @@ -43,16 +44,13 @@ public class HookConfigOperationManage implements WebHookConfigOperation { - public Logger logger = LoggerFactory.getLogger(this.getClass()); - - private String operationMode; - - private ConfigService nacosConfigService; - /** * webhook config pool -> key is CallbackPath */ private final Map cacheWebHookConfig = new ConcurrentHashMap<>(); + public Logger logger = LoggerFactory.getLogger(this.getClass()); + private String operationMode; + private ConfigService nacosConfigService; public HookConfigOperationManage() { } @@ -60,7 +58,7 @@ public HookConfigOperationManage() { /** * Initialize according to operationMode * - * @param configurationWrapper + * @param configurationWrapper */ public HookConfigOperationManage(ConfigurationWrapper configurationWrapper) throws FileNotFoundException, NacosException { @@ -80,7 +78,7 @@ private void nacosModeInit(Properties config) throws NacosException { @Override public WebHookConfig queryWebHookConfigById(WebHookConfig webHookConfig) { if ("file".equals(operationMode)) { - return cacheWebHookConfig.get(webHookConfig.getCallbackPath()); + return cacheWebHookConfig.get(StringUtils.getFileName(webHookConfig.getCallbackPath())); } else if ("nacos".equals(operationMode)) { try { String content = nacosConfigService.getConfig(webHookConfig.getManufacturerEventName() + DATA_ID_EXTENSION, diff --git a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/WebhookFileListener.java b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/WebhookFileListener.java index 1c02030278..780a2ae515 100644 --- a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/WebhookFileListener.java +++ b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/WebhookFileListener.java @@ -24,6 +24,7 @@ import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.webhook.api.WebHookConfig; import org.apache.eventmesh.webhook.api.WebHookOperationConstant; +import org.apache.eventmesh.webhook.api.utils.StringUtils; import java.io.BufferedReader; import java.io.File; @@ -49,16 +50,12 @@ public class WebhookFileListener { + private final Set pathSet = new LinkedHashSet<>(); // monitored subdirectory + private final Map watchKeyPathMap = new HashMap<>(); // WatchKey's path public Logger logger = LoggerFactory.getLogger(this.getClass()); - private String filePath; - private Map cacheWebHookConfig; - private final Set pathSet = new LinkedHashSet<>(); // monitored subdirectory - - private final Map watchKeyPathMap = new HashMap<>(); // WatchKey's path - public WebhookFileListener() { } @@ -113,7 +110,11 @@ public void cacheInit(File webhookConfigFile) { logger.error("cacheInit failed", e); } WebHookConfig webHookConfig = JsonUtils.deserialize(fileContent.toString(), WebHookConfig.class); - cacheWebHookConfig.put(webHookConfig.getCallbackPath(), webHookConfig); + cacheWebHookConfig.put(webhookConfigFile.getName(), webHookConfig); + } + + public void deleteConfig(File webhookConfigFile) { + cacheWebHookConfig.remove(webhookConfigFile.getName()); } /** @@ -155,18 +156,25 @@ public void fileWatchRegister() { for (WatchEvent event : key.pollEvents()) { String flashPath = watchKeyPathMap.get(key); // manufacturer change - if (flashPath.equals(filePath)) { - if (ENTRY_CREATE == event.kind()) { + String path = flashPath + "/" + event.context(); + File file = new File(path); + if (ENTRY_CREATE == event.kind() || ENTRY_MODIFY == event.kind()) { + if (file.isFile()) { + cacheInit(file); + } else { try { - key = Paths.get(filePath + WebHookOperationConstant.FILE_SEPARATOR + event.context()) - .register(service, ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE); + key = Paths.get(path).register(service, ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE); + watchKeyPathMap.put(key, path); } catch (IOException e) { logger.error("registerWatchKey failed", e); } - watchKeyPathMap.put(key, filePath + WebHookOperationConstant.FILE_SEPARATOR + event.context()); } - } else { // config change - cacheInit(new File(flashPath + WebHookOperationConstant.FILE_SEPARATOR + event.context())); + } else if (ENTRY_DELETE == event.kind()) { + if (file.isDirectory()) { + watchKeyPathMap.remove(key); + } else { + deleteConfig(file); + } } } if (!key.reset()) { From 77311b01cadba09b3dbaadace0f990a5a7b7f37c Mon Sep 17 00:00:00 2001 From: laohu <2732554140@qq.com> Date: Fri, 9 Sep 2022 21:46:10 +0800 Subject: [PATCH 2/8] =?UTF-8?q?fix(config):1.=E5=90=8C=E6=AD=A5=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/config/ConfigurationWrapper.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigurationWrapper.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigurationWrapper.java index b24426c92b..c3e88e34e6 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigurationWrapper.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigurationWrapper.java @@ -64,12 +64,12 @@ public boolean support(FileChangeContext changeContext) { public ConfigurationWrapper(String directoryPath, String fileName, boolean reload) { this.directoryPath = directoryPath - .replace('/', File.separator.charAt(0)) - .replace('\\', File.separator.charAt(0)); + .replace('/', File.separator.charAt(0)) + .replace('\\', File.separator.charAt(0)); this.fileName = fileName; this.file = (directoryPath + File.separator + fileName) - .replace('/', File.separator.charAt(0)) - .replace('\\', File.separator.charAt(0)); + .replace('/', File.separator.charAt(0)) + .replace('\\', File.separator.charAt(0)); this.reload = reload; init(); } @@ -104,7 +104,7 @@ public int getIntProp(String configKey, int defaultValue) { return defaultValue; } Preconditions.checkState(StringUtils.isNumeric(configValue), - String.format("key:%s, value:%s error", configKey, configValue)); + String.format("key:%s, value:%s error", configKey, configValue)); return Integer.parseInt(configValue); } @@ -138,4 +138,8 @@ public T getPropertiesByConfig(String prefix, Class clazz, boolean remove return (T) objectMapper.convertValue(getPropertiesByConfig(prefix, removePrefix), clazz); } + public Properties getProperties() { + return this.properties; + } + } \ No newline at end of file From 4284418a456c88ea87053bc77fff7bd4b79a4230 Mon Sep 17 00:00:00 2001 From: laohu <2732554140@qq.com> Date: Fri, 9 Sep 2022 22:28:06 +0800 Subject: [PATCH 3/8] =?UTF-8?q?fix(config):1.=E5=90=8C=E6=AD=A5=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../eventmesh/webhook/receive/protocol/GithubProtocol.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/protocol/GithubProtocol.java b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/protocol/GithubProtocol.java index 67804ca25f..05e3d4b2cf 100644 --- a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/protocol/GithubProtocol.java +++ b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/protocol/GithubProtocol.java @@ -49,7 +49,7 @@ public void execute(WebHookRequest webHookRequest, WebHookConfig webHookConfig, } try { - webHookRequest.setManufacturerEventId(header.get("X-GitHub-Delivery")); + webHookRequest.setManufacturerEventId(header.get("x-gitHub-delivery")); webHookRequest.setManufacturerEventName(webHookConfig.getManufacturerEventName()); webHookRequest.setManufacturerSource(getManufacturerName()); } catch (Exception e) { From 1f6884f2e74828c94c9991333a1beb68073607e3 Mon Sep 17 00:00:00 2001 From: laohu <2732554140@qq.com> Date: Fri, 9 Sep 2022 22:52:15 +0800 Subject: [PATCH 4/8] =?UTF-8?q?fix(config):1.=E5=90=8C=E6=AD=A5=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../eventmesh/webhook/receive/protocol/GithubProtocol.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/protocol/GithubProtocol.java b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/protocol/GithubProtocol.java index 05e3d4b2cf..c6d1e9d158 100644 --- a/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/protocol/GithubProtocol.java +++ b/eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/protocol/GithubProtocol.java @@ -49,7 +49,7 @@ public void execute(WebHookRequest webHookRequest, WebHookConfig webHookConfig, } try { - webHookRequest.setManufacturerEventId(header.get("x-gitHub-delivery")); + webHookRequest.setManufacturerEventId(header.get("x-github-delivery")); webHookRequest.setManufacturerEventName(webHookConfig.getManufacturerEventName()); webHookRequest.setManufacturerSource(getManufacturerName()); } catch (Exception e) { From 97b3b3a774bee7bff3c48daaec825170125f6cc2 Mon Sep 17 00:00:00 2001 From: laohu <2732554140@qq.com> Date: Tue, 13 Sep 2022 21:42:38 +0800 Subject: [PATCH 5/8] fix(runtime):1. webhook test --- .gitignore | 2 ++ .../runtime/protocol/processor/WebHookProcessorTest.java | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 817436eb14..48794e2a68 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,5 @@ h2/db.mv.db all-dependencies.txt self-modules.txt third-party-dependencies.txt +/bin/ +*/**/bin/ diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/protocol/processor/WebHookProcessorTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/protocol/processor/WebHookProcessorTest.java index 052ec99e00..52bdb9f163 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/protocol/processor/WebHookProcessorTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/protocol/processor/WebHookProcessorTest.java @@ -82,7 +82,7 @@ public void testHandler() { CloudEvent msgSendToMq = captor.getValue(); Assert.assertNotNull(msgSendToMq); Assert.assertTrue(StringUtils.isNoneBlank(msgSendToMq.getId())); - Assert.assertEquals("github", msgSendToMq.getSource().getPath()); + Assert.assertEquals("www.github.com", msgSendToMq.getSource().getPath()); Assert.assertEquals("github.ForkEvent", msgSendToMq.getType()); Assert.assertEquals(BytesCloudEventData.wrap("\"mock_data\":0".getBytes()), msgSendToMq.getData()); } catch (Exception e) { @@ -97,7 +97,7 @@ private HttpRequest buildMockWebhookRequest() { FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/webhook/github/eventmesh/all", buffer); request.headers().set("content-type", "application/json"); // encrypt method see: GithubProtocol - request.headers().set("X-Hub-Signature-256", "sha256=ddb62e1182e2e6d364c0b5d03f2413fd5d1f68d99d1a4b3873e0d6850650d4b3"); + request.headers().set("x-hub-signature-256", "sha256=ddb62e1182e2e6d364c0b5d03f2413fd5d1f68d99d1a4b3873e0d6850650d4b3"); return request; } From f089c5da680a6b7ede6c9e4fd63cccc086f9839e Mon Sep 17 00:00:00 2001 From: laohu <2732554140@qq.com> Date: Tue, 13 Sep 2022 22:30:17 +0800 Subject: [PATCH 6/8] fix(webhook):1. licensed --- .../webhook/api/utils/StringUtils.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/utils/StringUtils.java b/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/utils/StringUtils.java index 890c0cdfda..99a13c82ba 100644 --- a/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/utils/StringUtils.java +++ b/eventmesh-webhook/eventmesh-webhook-api/src/main/java/org/apache/eventmesh/webhook/api/utils/StringUtils.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.webhook.api.utils; public class StringUtils { From 9fdfa135b95ec2b40aa7afa67b2f2cbf5c93b213 Mon Sep 17 00:00:00 2001 From: laohu <2732554140@qq.com> Date: Tue, 13 Sep 2022 23:49:45 +0800 Subject: [PATCH 7/8] fix(*):1. licensed --- tools/dependency-check/known-dependencies.txt | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt index 3238fd2dfa..d8d05e2ace 100644 --- a/tools/dependency-check/known-dependencies.txt +++ b/tools/dependency-check/known-dependencies.txt @@ -166,4 +166,14 @@ system-rules-1.16.1.jar validation-api-1.1.0.Final.jar zipkin-2.23.2.jar zipkin-reporter-2.16.3.jar -zipkin-sender-okhttp3-2.16.3.jar \ No newline at end of file +zipkin-sender-okhttp3-2.16.3.jar +truth-0.30.jar +commons-cli-1.2.jar +commons-collections-3.2.2.jar +commons-validator-1.7.jar +dledger-0.2.3.jar +fastjson-1.2.76.jar +jcommander-1.72.jar +jna-4.2.2.jar +commons-beanutils-1.9.4.jar +commons-digester-2.1.jar \ No newline at end of file From f54ea71a9d741541f04ce4f88ca41c4eb7905ef3 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Wed, 14 Sep 2022 11:35:20 +0800 Subject: [PATCH 8/8] update known-dependencies.txt --- tools/dependency-check/known-dependencies.txt | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt index d8d05e2ace..8fadc114bc 100644 --- a/tools/dependency-check/known-dependencies.txt +++ b/tools/dependency-check/known-dependencies.txt @@ -18,16 +18,23 @@ cloudevents-core-2.2.0.jar cloudevents-core-2.3.0.jar cloudevents-json-jackson-2.2.0.jar cloudevents-protobuf-2.3.0.jar +commons-beanutils-1.9.4.jar +commons-cli-1.2.jar commons-codec-1.11.jar +commons-collections-3.2.2.jar commons-collections4-4.1.jar +commons-digester-2.1.jar +commons-io-2.11.0.jar commons-lang3-3.6.jar commons-logging-1.2.jar -commons-io-2.11.0.jar commons-text-1.9.jar +commons-validator-1.7.jar consul-api-1.4.5.jar disruptor-3.4.2.jar +dledger-0.2.3.jar error_prone_annotations-2.9.0.jar failureaccess-1.0.1.jar +fastjson-1.2.76.jar google-auth-library-credentials-0.22.2.jar grpc-api-1.42.2.jar grpc-auth-1.39.0.jar @@ -59,10 +66,12 @@ javax.ws.rs-api-2.1.jar jboss-marshalling-2.0.11.Final.jar jboss-marshalling-river-2.0.11.Final.jar jcip-annotations-1.0.jar +jcommander-1.72.jar jcommander-1.78.jar jetcd-common-0.3.0.jar jetcd-core-0.3.0.jar jetcd-resolver-0.3.0.jar +jna-4.2.2.jar jodd-bean-5.1.6.jar jodd-core-5.1.6.jar jsr305-3.0.2.jar @@ -83,10 +92,10 @@ netty-codec-4.1.73.Final.jar netty-codec-4.1.79.Final.jar netty-codec-dns-4.1.73.Final.jar netty-codec-haproxy-4.1.73.Final.jar -netty-codec-http-4.1.73.Final.jar -netty-codec-http-4.1.79.Final.jar netty-codec-http2-4.1.73.Final.jar netty-codec-http2-4.1.79.Final.jar +netty-codec-http-4.1.73.Final.jar +netty-codec-http-4.1.79.Final.jar netty-codec-memcache-4.1.73.Final.jar netty-codec-mqtt-4.1.73.Final.jar netty-codec-redis-4.1.73.Final.jar @@ -135,45 +144,48 @@ opentelemetry-sdk-common-1.3.0.jar opentelemetry-sdk-metrics-1.3.0-alpha.jar opentelemetry-sdk-trace-1.3.0.jar opentelemetry-semconv-1.3.0-alpha.jar +perfmark-api-0.23.0.jar pravega-client-0.11.0.jar pravega-common-0.11.0.jar pravega-shared-authplugin-0.11.0.jar pravega-shared-controller-api-0.11.0.jar pravega-shared-protocol-0.11.0.jar pravega-shared-security-0.11.0.jar -perfmark-api-0.23.0.jar -proto-google-common-protos-2.0.1.jar protobuf-java-3.18.2.jar protobuf-java-3.19.4.jar protobuf-java-3.21.5.jar protobuf-java-util-3.15.0.jar -protobuf-java-util-3.5.1.jar protobuf-java-util-3.21.5.jar +protobuf-java-util-3.5.1.jar +proto-google-common-protos-2.0.1.jar pulsar-client-2.10.1.jar pulsar-client-admin-api-2.10.1.jar pulsar-client-api-2.10.1.jar -reactor-core-3.4.13.jar reactive-streams-1.0.3.jar +reactor-core-3.4.13.jar redisson-3.17.3.jar reflections-0.9.11.jar +rocketmq-acl-4.9.3.jar +rocketmq-broker-4.9.3.jar +rocketmq-client-4.9.3.jar +rocketmq-common-4.9.3.jar +rocketmq-filter-4.9.3.jar +rocketmq-logging-4.9.3.jar +rocketmq-namesrv-4.9.3.jar +rocketmq-remoting-4.9.3.jar +rocketmq-srvutil-4.9.3.jar +rocketmq-store-4.9.3.jar +rocketmq-test-4.9.3.jar +rocketmq-tools-4.9.3.jar rxjava-3.0.12.jar -simpleclient-0.8.1.jar simpleclient_common-0.8.1.jar simpleclient_httpserver-0.8.1.jar +simpleclient-0.8.1.jar slf4j-api-1.7.30.jar snakeyaml-1.30.jar system-rules-1.16.1.jar +truth-0.30.jar validation-api-1.1.0.Final.jar zipkin-2.23.2.jar zipkin-reporter-2.16.3.jar zipkin-sender-okhttp3-2.16.3.jar -truth-0.30.jar -commons-cli-1.2.jar -commons-collections-3.2.2.jar -commons-validator-1.7.jar -dledger-0.2.3.jar -fastjson-1.2.76.jar -jcommander-1.72.jar -jna-4.2.2.jar -commons-beanutils-1.9.4.jar -commons-digester-2.1.jar \ No newline at end of file