diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java index 73878fb65e..5a2493a371 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java +++ b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java @@ -26,26 +26,31 @@ public class PatternEntry { - private final String patternPath; + private String patternName; + + private String patternPath; private final List conditionList = new ArrayList<>(); - public PatternEntry(final String patternPath) { + public PatternEntry(final String patternName, final String patternPath) { + this.patternName = patternName; this.patternPath = patternPath; } - public void addRuleCondition(Condition patternCondition) { + public void addCondition(Condition patternCondition) { this.conditionList.add(patternCondition); } public String getPatternName() { - return "123"; + return patternName; } public String getPatternPath() { return patternPath; } + // default filter type is OR + // todo: extend the filter type with AND public boolean match(JsonNode jsonElement) { for (final Condition patternCondition : conditionList) { if (patternCondition.match(jsonElement)) { diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java index 0a2fb01760..19cdcc6c59 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java +++ b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java @@ -74,18 +74,18 @@ public static Pattern build(String jsonStr) { } // iter all requiredField - parseRequiredField("$." + key, value, pattern); + parseRequiredField(key, "$." + key, value, pattern); if (value.isEmpty()) { // Empty array throw new JsonException("INVALID_PATTERN_VALUE "); } - PatternEntry patternEntry = new PatternEntry("$." + key); + PatternEntry patternEntry = new PatternEntry(key, "$." + key); for (final JsonNode objNode : value) { // { // "suffix":".jpg" // } Condition condition = parseCondition(objNode); - patternEntry.addRuleCondition(condition); + patternEntry.addCondition(condition); } pattern.addRequiredFieldList(patternEntry); @@ -112,12 +112,12 @@ private static void parseDataField(JsonNode jsonNode, Pattern pattern) { String key = entry.getKey(); // [{"anything-but":"initializing"}] [{"anything-but":123}]} JsonNode value = entry.getValue(); - PatternEntry patternEntry = new PatternEntry(elepath + "." + key); + PatternEntry patternEntry = new PatternEntry(key, elepath + "." + key); if (!value.isObject()) { if (value.isArray()) { for (JsonNode node11 : value) { // {"anything-but":"initializing"} - patternEntry.addRuleCondition(parseCondition(node11)); + patternEntry.addCondition(parseCondition(node11)); } } pattern.addDataList(patternEntry); @@ -146,15 +146,15 @@ private static Condition parseCondition(JsonNode jsonNode) { return null; } - private static void parseRequiredField(String path, JsonNode jsonNode, Pattern pattern) { + private static void parseRequiredField(String patternName, String patternPath, JsonNode jsonNode, Pattern pattern) { if (jsonNode.isEmpty()) { // Empty array throw new JsonException("INVALID_PATTERN_VALUE "); } - PatternEntry patternEntry = new PatternEntry(path); + PatternEntry patternEntry = new PatternEntry(patternName, patternPath); for (final JsonNode objNode : jsonNode) { Condition condition = parseCondition(objNode); - patternEntry.addRuleCondition(condition); + patternEntry.addCondition(condition); } pattern.addRequiredFieldList(patternEntry); diff --git a/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaServiceListener.java b/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaServiceListener.java new file mode 100644 index 0000000000..024e003503 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaServiceListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.api.meta; + +/** + * MetaServiceListener + */ +public interface MetaServiceListener { + + void onChange(K key, V value); +} diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index 54f9533697..02552c3ff8 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -34,6 +34,7 @@ dependencies { implementation "commons-io:commons-io" implementation project(":eventmesh-common") + implementation project(":eventmesh-filter") implementation project(":eventmesh-spi") implementation project(":eventmesh-storage-plugin:eventmesh-storage-api") implementation project(":eventmesh-storage-plugin:eventmesh-storage-standalone") @@ -42,6 +43,7 @@ dependencies { implementation project(":eventmesh-security-plugin:eventmesh-security-acl") implementation project(":eventmesh-security-plugin:eventmesh-security-auth-http-basic") implementation project(":eventmesh-security-plugin:eventmesh-security-auth-token") + implementation project(":eventmesh-transformer") implementation project(":eventmesh-meta:eventmesh-meta-api") implementation project(":eventmesh-meta:eventmesh-meta-nacos") implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index 2cfe2d28d6..68b42eff71 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -82,6 +82,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer { private final EventMeshHTTPConfiguration eventMeshHttpConfiguration; private final MetaStorage metaStorage; + private final Acl acl; private final EventBus eventBus = new EventBus(); @@ -89,6 +90,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer { private ProducerManager producerManager; private SubscriptionManager subscriptionManager; + private FilterEngine filterEngine; + private HttpRetryer httpRetryer; private transient RateLimiter msgRateLimiter; @@ -132,6 +135,9 @@ public void init() throws Exception { producerManager = new ProducerManager(this); producerManager.init(); + filterEngine = new FilterEngine(metaStorage); + filterEngine.init(); + super.setHandlerService(new HandlerService()); super.getHandlerService().setMetrics(this.getMetrics()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java new file mode 100644 index 0000000000..5ffd0213e9 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.boot; + +import org.apache.eventmesh.filter.pattern.Pattern; +import org.apache.eventmesh.runtime.meta.MetaStorage; + +import java.util.HashMap; +import java.util.Map; + +public class FilterEngine { + + /** + * key:topic-group + **/ + public Map filterPatternMap = new HashMap<>(); + + private MetaStorage metaStorage; + + public FilterEngine(MetaStorage metaStorage) { + this.metaStorage = metaStorage; + } + + public void init() { + + } + + //todo: 根据订阅信息监听config + //todo: 根据获取的config更新filtermap +}