diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java index dc51f084a2..cfe1ff14dc 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java @@ -92,6 +92,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer { private FilterEngine filterEngine; + private TransformerEngine transformerEngine; + private HttpRetryer httpRetryer; private transient RateLimiter msgRateLimiter; @@ -137,6 +139,8 @@ public void init() throws Exception { filterEngine = new FilterEngine(metaStorage, producerManager, consumerManager); + transformerEngine = new TransformerEngine(metaStorage, producerManager, consumerManager); + super.setHandlerService(new HandlerService()); super.getHandlerService().setMetrics(this.getMetrics()); @@ -180,6 +184,8 @@ public void shutdown() throws Exception { filterEngine.shutdown(); + transformerEngine.shutdown(); + consumerManager.shutdown(); httpClientPool.shutdown(); @@ -354,6 +360,10 @@ public FilterEngine getFilterEngine() { return filterEngine; } + public TransformerEngine getTransformerEngine() { + return transformerEngine; + } + public MetaStorage getMetaStorage() { return metaStorage; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java new file mode 100644 index 0000000000..245ddf4889 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.boot; + +import org.apache.eventmesh.api.meta.MetaServiceListener; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.common.utils.LogUtils; +import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager; +import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager; +import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; +import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager; +import org.apache.eventmesh.runtime.meta.MetaStorage; +import org.apache.eventmesh.transformer.Transformer; +import org.apache.eventmesh.transformer.TransformerBuilder; +import org.apache.eventmesh.transformer.TransformerParam; + +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.databind.JsonNode; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TransformerEngine { + + /** + * key:group-topic + **/ + private final Map transformerMap = new HashMap<>(); + + private final String transformerPrefix = "transformer-"; + + private final MetaStorage metaStorage; + + private MetaServiceListener metaServiceListener; + + private final ProducerManager producerManager; + + private final ConsumerManager consumerManager; + + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + + public TransformerEngine(MetaStorage metaStorage, ProducerManager producerManager, ConsumerManager consumerManager) { + this.metaStorage = metaStorage; + this.producerManager = producerManager; + this.consumerManager = consumerManager; + } + + public void start() { + Map transformerMetaData = metaStorage.getMetaData(transformerPrefix, true); + for (Entry transformerDataEntry : transformerMetaData.entrySet()) { + // transformer-group + String key = transformerDataEntry.getKey(); + // topic-transformerParam list + String value = transformerDataEntry.getValue(); + updateTransformerMap(key, value); + } + metaServiceListener = this::updateTransformerMap; + + // addListeners for producerManager & consumerManager + scheduledExecutorService.scheduleAtFixedRate(() -> { + ConcurrentHashMap producerMap = producerManager.getProducerTable(); + for (String producerGroup : producerMap.keySet()) { + for (String transformerKey : transformerMap.keySet()) { + if (!StringUtils.contains(transformerKey, producerGroup)) { + addTransformerListener(producerGroup); + LogUtils.info(log, "addTransformerListener for producer group: " + producerGroup); + } + } + } + ConcurrentHashMap consumerMap = consumerManager.getClientTable(); + for (String consumerGroup : consumerMap.keySet()) { + for (String transformerKey : transformerMap.keySet()) { + if (!StringUtils.contains(transformerKey, consumerGroup)) { + addTransformerListener(consumerGroup); + LogUtils.info(log, "addTransformerListener for consumer group: " + consumerGroup); + } + } + } + }, 10_000, 5_000, TimeUnit.MILLISECONDS); + } + + private void updateTransformerMap(String key, String value) { + String group = StringUtils.substringAfter(key, transformerPrefix); + + JsonNode transformerJsonNodeArray = JsonUtils.getJsonNode(value); + + if (transformerJsonNodeArray != null) { + for (JsonNode transformerJsonNode : transformerJsonNodeArray) { + String topic = transformerJsonNode.get("topic").asText(); + String transformerParam = transformerJsonNode.get("transformerParam").toString(); + TransformerParam tfp = JsonUtils.parseObject(transformerParam, TransformerParam.class); + Transformer transformer = TransformerBuilder.buildTransformer(tfp); + transformerMap.put(group + "-" + topic, transformer); + } + } + addTransformerListener(group); + } + + public void addTransformerListener(String group) { + String transformerKey = transformerPrefix + group; + try { + metaStorage.getMetaDataWithListener(metaServiceListener, transformerKey); + } catch (Exception e) { + throw new RuntimeException("addTransformerListener exception", e); + } + } + + public void shutdown() { + scheduledExecutorService.shutdown(); + } + + public Transformer getTransformer(String key) { + return transformerMap.get(key); + } + +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java index 52f740151c..34e4ffcb35 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java @@ -45,6 +45,7 @@ import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; +import org.apache.eventmesh.transformer.Transformer; import org.apache.commons.lang3.StringUtils; @@ -161,6 +162,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final final String topic = event.getSubject(); Pattern filterPattern = eventMeshHTTPServer.getFilterEngine().getFilterPattern(producerGroup + "-" + topic); + Transformer transformer = eventMeshHTTPServer.getTransformerEngine().getTransformer(producerGroup + "-" + topic); // validate body if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic) @@ -252,6 +254,14 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final isFiltered = filterPattern.filter(JsonUtils.toJSONString(event)); } + // apply transformer + if (isFiltered && transformer != null) { + String data = transformer.transform(JsonUtils.toJSONString(event)); + event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data)) + .getBytes(StandardCharsets.UTF_8)).build(); + sendMessageContext.setEvent(event); + } + if (isFiltered) { eventMeshProducer.send(sendMessageContext, new SendCallback() { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index bee14c992e..e86cda5501 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -38,6 +38,7 @@ import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.WebhookUtil; +import org.apache.eventmesh.transformer.Transformer; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; @@ -53,6 +54,7 @@ import java.io.IOException; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -138,6 +140,20 @@ public void tryHTTPRequest() { return; } } + Transformer transformer = eventMeshHTTPServer.getTransformerEngine() + .getTransformer(handleMsgContext.getConsumerGroup() + "-" + handleMsgContext.getTopic()); + if (transformer != null) { + try { + String data = transformer.transform(JsonUtils.toJSONString(event)); + event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data)) + .getBytes(StandardCharsets.UTF_8)).build(); + } catch (Exception exception) { + LOGGER.warn("apply transformer to cloudevents error, group:{}, topic:{}, bizSeqNo={}, uniqueId={}", + this.handleMsgContext.getConsumerGroup(), + this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), exception); + return; + } + } handleMsgContext.setEvent(event); super.setEvent(event); diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java index a220fc0abd..a0ebde12d2 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java +++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java @@ -72,8 +72,8 @@ public List match(String json) throws JsonProcessingException { List variableList = new ArrayList<>(variablesList.size()); for (Variable element : variablesList) { - if (JsonPathUtils.isValidAndDefinite(element.getJsonPath())) { - String res = JsonPathUtils.matchJsonPathValueWithString(json, element.getJsonPath()); + if (JsonPathUtils.isValidAndDefinite(element.getValue())) { + String res = JsonPathUtils.matchJsonPathValueWithString(json, element.getValue()); Variable variable = new Variable(element.getName(), res); variableList.add(variable); } else { diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java index 5d2358dfe6..19c3b5cec3 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java +++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java @@ -34,8 +34,8 @@ public Template(String template) { public String substitute(List variables) throws TransformException { Map valuesMap = variables.stream() - .filter(variable -> variable.getJsonPath() != null) - .collect(Collectors.toMap(Variable::getName, Variable::getJsonPath)); + .filter(variable -> variable.getValue() != null) + .collect(Collectors.toMap(Variable::getName, Variable::getValue)); StringSubstitutor sub = new StringSubstitutor(valuesMap); return sub.replace(template); diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java index 2a4db5fc3d..bc9907ff48 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java +++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java @@ -37,8 +37,7 @@ public String transform(String json) throws JsonProcessingException { // 1: get variable match results List variableList = jsonPathParser.match(json); // 2: use results replace template - String res = template.substitute(variableList); - return res; + return template.substitute(variableList); } } diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java index 05ad583550..6e007b34b3 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java +++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java @@ -19,41 +19,20 @@ public class TransformerBuilder { - public static final class Builder { - - private final TransformerType transformerType; - private String template; - private String content; - - public Builder(TransformerType transformerType) { - this.transformerType = transformerType; - } - - public Builder setContent(String content) { - this.content = content; - return this; - } - - public Builder setTemplate(String template) { - this.template = template; - return this; - } - - public Transformer build() { - switch (this.transformerType) { - case CONSTANT: - return buildConstantTransformer(this.content); - case ORIGINAL: - return buildOriginalTransformer(); - case TEMPLATE: - return buildTemplateTransFormer(this.content, this.template); - default: - throw new TransformException("invalid config"); - } + public static Transformer buildTransformer(TransformerParam transformerParam) { + switch (transformerParam.getTransformerType()) { + case ORIGINAL: + return buildOriginalTransformer(); + case CONSTANT: + return buildConstantTransformer(transformerParam.getValue()); + case TEMPLATE: + return buildTemplateTransFormer(transformerParam.getValue(), transformerParam.getTemplate()); + default: + throw new TransformException("invalid config"); } - } + public static Transformer buildTemplateTransFormer(String jsonContent, String template) { JsonPathParser jsonPathParser = new JsonPathParser(jsonContent); Template templateEntry = new Template(template); diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java new file mode 100644 index 0000000000..d747d7be4c --- /dev/null +++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.transformer; + +public class TransformerParam { + + private TransformerType transformerType; + private String value; + private String template; + + public TransformerParam() { + } + + public TransformerParam(TransformerType transformerType, String value, String template) { + this.transformerType = transformerType; + this.value = value; + this.template = template; + } + + public TransformerParam(TransformerType transformerType, String value) { + this(transformerType, value, null); + } + + public TransformerType getTransformerType() { + return transformerType; + } + + public void setTransformerType(TransformerType transformerType) { + this.transformerType = transformerType; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getTemplate() { + return template; + } + + public void setTemplate(String template) { + this.template = template; + } + +} diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java index 5ca2142c34..bf344afaed 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java +++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java @@ -17,8 +17,49 @@ package org.apache.eventmesh.transformer; +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; + public enum TransformerType { - ORIGINAL, - CONSTANT, - TEMPLATE + ORIGINAL(1, "original"), + CONSTANT(2, "constant"), + TEMPLATE(3, "template"); + + private int code; + + private String type; + + TransformerType(int code, String type) { + this.code = code; + this.type = type; + } + + @JsonCreator + public static TransformerType getItem(String type) { + for(TransformerType transformerType : values()) { + if (Objects.equals(transformerType.getType(), type)) { + return transformerType; + } + } + return null; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + @JsonValue + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } } diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java index 6b3cc4da47..c9259d335c 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java +++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java @@ -21,11 +21,11 @@ public class Variable { private String name; - private String jsonPath; + private String value; - public Variable(String name, String jsonPath) { + public Variable(String name, String value) { this.name = name; - this.jsonPath = jsonPath; + this.value = value; } public String getName() { @@ -36,11 +36,11 @@ public void setName(String name) { this.name = name; } - public String getJsonPath() { - return jsonPath; + public String getValue() { + return value; } - public void setJsonPath(String jsonPath) { - this.jsonPath = jsonPath; + public void setValue(String value) { + this.value = value; } } diff --git a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java b/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java index 2127e50edc..a55cde0baf 100644 --- a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java +++ b/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java @@ -42,8 +42,10 @@ public class TransformTest { @Test public void testOriginalTransformer() throws JsonProcessingException { + TransformerParam transformerParam = new TransformerParam(); + transformerParam.setTransformerType(TransformerType.ORIGINAL); - Transformer transformer = new TransformerBuilder.Builder(TransformerType.ORIGINAL).build(); + Transformer transformer = TransformerBuilder.buildTransformer(transformerParam); String output = transformer.transform(EVENT); Assertions.assertEquals(EVENT, output); @@ -54,7 +56,8 @@ public void testOriginalTransformer() throws JsonProcessingException { @Test public void testConstantTransformer() throws JsonProcessingException { - Transformer transformer = new TransformerBuilder.Builder(TransformerType.CONSTANT).setContent("constant test").build(); + TransformerParam transformerParam = new TransformerParam(TransformerType.CONSTANT, "constant test"); + Transformer transformer = TransformerBuilder.buildTransformer(transformerParam); String output = transformer.transform(EVENT); Assertions.assertEquals("constant test", output); @@ -72,9 +75,9 @@ public void testTemplateTransFormerWithStringValue() throws JsonProcessingExcept String output = transform.transform(EVENT); Assertions.assertEquals("Transformers test:data name is test-transformer", output); - Transformer transformer1 = new TransformerBuilder.Builder(TransformerType.TEMPLATE) - .setContent(content) - .setTemplate(template).build(); + TransformerParam transformerParam = new TransformerParam(TransformerType.TEMPLATE, content, template); + + Transformer transformer1 = TransformerBuilder.buildTransformer(transformerParam); String output1 = transformer1.transform(EVENT); Assertions.assertEquals("Transformers test:data name is test-transformer", output1);