Skip to content

Commit

Permalink
add filters
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Nov 8, 2023
1 parent 9f70e1a commit d077d0f
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,31 @@

public class PatternEntry {

private final String patternPath;
private String patternName;

private String patternPath;

private final List<Condition> conditionList = new ArrayList<>();

public PatternEntry(final String patternPath) {
public PatternEntry(final String patternName, final String patternPath) {
this.patternName = patternName;
this.patternPath = patternPath;
}

public void addRuleCondition(Condition patternCondition) {
public void addCondition(Condition patternCondition) {
this.conditionList.add(patternCondition);
}

public String getPatternName() {
return "123";
return patternName;
}

public String getPatternPath() {
return patternPath;
}

// default filter type is OR
// todo: extend the filter type with AND
public boolean match(JsonNode jsonElement) {
for (final Condition patternCondition : conditionList) {
if (patternCondition.match(jsonElement)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,18 @@ public static Pattern build(String jsonStr) {
}

// iter all requiredField
parseRequiredField("$." + key, value, pattern);
parseRequiredField(key, "$." + key, value, pattern);
if (value.isEmpty()) {
// Empty array
throw new JsonException("INVALID_PATTERN_VALUE ");
}
PatternEntry patternEntry = new PatternEntry("$." + key);
PatternEntry patternEntry = new PatternEntry(key, "$." + key);
for (final JsonNode objNode : value) {
// {
// "suffix":".jpg"
// }
Condition condition = parseCondition(objNode);
patternEntry.addRuleCondition(condition);
patternEntry.addCondition(condition);
}

pattern.addRequiredFieldList(patternEntry);
Expand All @@ -112,12 +112,12 @@ private static void parseDataField(JsonNode jsonNode, Pattern pattern) {
String key = entry.getKey();
// [{"anything-but":"initializing"}] [{"anything-but":123}]}
JsonNode value = entry.getValue();
PatternEntry patternEntry = new PatternEntry(elepath + "." + key);
PatternEntry patternEntry = new PatternEntry(key, elepath + "." + key);
if (!value.isObject()) {
if (value.isArray()) {
for (JsonNode node11 : value) {
// {"anything-but":"initializing"}
patternEntry.addRuleCondition(parseCondition(node11));
patternEntry.addCondition(parseCondition(node11));
}
}
pattern.addDataList(patternEntry);
Expand Down Expand Up @@ -146,15 +146,15 @@ private static Condition parseCondition(JsonNode jsonNode) {
return null;
}

private static void parseRequiredField(String path, JsonNode jsonNode, Pattern pattern) {
private static void parseRequiredField(String patternName, String patternPath, JsonNode jsonNode, Pattern pattern) {
if (jsonNode.isEmpty()) {
// Empty array
throw new JsonException("INVALID_PATTERN_VALUE ");
}
PatternEntry patternEntry = new PatternEntry(path);
PatternEntry patternEntry = new PatternEntry(patternName, patternPath);
for (final JsonNode objNode : jsonNode) {
Condition condition = parseCondition(objNode);
patternEntry.addRuleCondition(condition);
patternEntry.addCondition(condition);
}

pattern.addRequiredFieldList(patternEntry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api.meta;

/**
* MetaServiceListener
*/
public interface MetaServiceListener<K, V> {

void onChange(K key, V value);
}
2 changes: 2 additions & 0 deletions eventmesh-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation "commons-io:commons-io"

implementation project(":eventmesh-common")
implementation project(":eventmesh-filter")
implementation project(":eventmesh-spi")
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
implementation project(":eventmesh-storage-plugin:eventmesh-storage-standalone")
Expand All @@ -42,6 +43,7 @@ dependencies {
implementation project(":eventmesh-security-plugin:eventmesh-security-acl")
implementation project(":eventmesh-security-plugin:eventmesh-security-auth-http-basic")
implementation project(":eventmesh-security-plugin:eventmesh-security-auth-token")
implementation project(":eventmesh-transformer")
implementation project(":eventmesh-meta:eventmesh-meta-api")
implementation project(":eventmesh-meta:eventmesh-meta-nacos")
implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
private final EventMeshHTTPConfiguration eventMeshHttpConfiguration;

private final MetaStorage metaStorage;

private final Acl acl;
private final EventBus eventBus = new EventBus();

private ConsumerManager consumerManager;
private ProducerManager producerManager;
private SubscriptionManager subscriptionManager;

private FilterEngine filterEngine;

private HttpRetryer httpRetryer;

private transient RateLimiter msgRateLimiter;
Expand Down Expand Up @@ -132,6 +135,9 @@ public void init() throws Exception {
producerManager = new ProducerManager(this);
producerManager.init();

filterEngine = new FilterEngine(metaStorage);
filterEngine.init();

super.setHandlerService(new HandlerService());
super.getHandlerService().setMetrics(this.getMetrics());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

import org.apache.eventmesh.filter.pattern.Pattern;
import org.apache.eventmesh.runtime.meta.MetaStorage;

import java.util.HashMap;
import java.util.Map;

public class FilterEngine {

/**
* key:topic-group
**/
public Map<String, Pattern> filterPatternMap = new HashMap<>();

private MetaStorage metaStorage;

public FilterEngine(MetaStorage metaStorage) {
this.metaStorage = metaStorage;
}

public void init() {

}

//todo: 根据订阅信息监听config
//todo: 根据获取的config更新filtermap
}

0 comments on commit d077d0f

Please sign in to comment.