Skip to content

Commit

Permalink
[ISSUE apache#4621] Implement TransformerEngine for EventMesh Transfo…
Browse files Browse the repository at this point in the history
…rmer
  • Loading branch information
xwm1992 committed Dec 7, 2023
1 parent 6508df8 commit 9b734a3
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {

private FilterEngine filterEngine;

private TransformerEngine transformerEngine;

private HttpRetryer httpRetryer;

private transient RateLimiter msgRateLimiter;
Expand Down Expand Up @@ -137,6 +139,8 @@ public void init() throws Exception {

filterEngine = new FilterEngine(metaStorage, producerManager, consumerManager);

transformerEngine = new TransformerEngine(metaStorage, producerManager, consumerManager);

super.setHandlerService(new HandlerService());
super.getHandlerService().setMetrics(this.getMetrics());

Expand Down Expand Up @@ -180,6 +184,8 @@ public void shutdown() throws Exception {

filterEngine.shutdown();

transformerEngine.shutdown();

consumerManager.shutdown();

httpClientPool.shutdown();
Expand Down Expand Up @@ -354,6 +360,10 @@ public FilterEngine getFilterEngine() {
return filterEngine;
}

public TransformerEngine getTransformerEngine() {
return transformerEngine;
}

public MetaStorage getMetaStorage() {
return metaStorage;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

import org.apache.eventmesh.api.meta.MetaServiceListener;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
import org.apache.eventmesh.runtime.meta.MetaStorage;
import org.apache.eventmesh.transformer.Transformer;
import org.apache.eventmesh.transformer.TransformerBuilder;
import org.apache.eventmesh.transformer.TransformerParam;

import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.JsonNode;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TransformerEngine {

/**
* key:group-topic
**/
private final Map<String, Transformer> transformerMap = new HashMap<>();

private final String transformerPrefix = "transformer-";

private final MetaStorage metaStorage;

private MetaServiceListener metaServiceListener;

private final ProducerManager producerManager;

private final ConsumerManager consumerManager;

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

public TransformerEngine(MetaStorage metaStorage, ProducerManager producerManager, ConsumerManager consumerManager) {
this.metaStorage = metaStorage;
this.producerManager = producerManager;
this.consumerManager = consumerManager;
}

public void start() {
Map<String, String> transformerMetaData = metaStorage.getMetaData(transformerPrefix, true);
for (Entry<String, String> transformerDataEntry : transformerMetaData.entrySet()) {
// transformer-group
String key = transformerDataEntry.getKey();
// topic-transformerParam list
String value = transformerDataEntry.getValue();
updateTransformerMap(key, value);
}
metaServiceListener = this::updateTransformerMap;

// addListeners for producerManager & consumerManager
scheduledExecutorService.scheduleAtFixedRate(() -> {
ConcurrentHashMap<String, EventMeshProducer> producerMap = producerManager.getProducerTable();
for (String producerGroup : producerMap.keySet()) {
for (String transformerKey : transformerMap.keySet()) {
if (!StringUtils.contains(transformerKey, producerGroup)) {
addTransformerListener(producerGroup);
LogUtils.info(log, "addTransformerListener for producer group: " + producerGroup);
}
}
}
ConcurrentHashMap<String, ConsumerGroupManager> consumerMap = consumerManager.getClientTable();
for (String consumerGroup : consumerMap.keySet()) {
for (String transformerKey : transformerMap.keySet()) {
if (!StringUtils.contains(transformerKey, consumerGroup)) {
addTransformerListener(consumerGroup);
LogUtils.info(log, "addTransformerListener for consumer group: " + consumerGroup);
}
}
}
}, 10_000, 5_000, TimeUnit.MILLISECONDS);
}

private void updateTransformerMap(String key, String value) {
String group = StringUtils.substringAfter(key, transformerPrefix);

JsonNode transformerJsonNodeArray = JsonUtils.getJsonNode(value);

if (transformerJsonNodeArray != null) {
for (JsonNode transformerJsonNode : transformerJsonNodeArray) {
String topic = transformerJsonNode.get("topic").asText();
String transformerParam = transformerJsonNode.get("transformerParam").toString();
TransformerParam tfp = JsonUtils.parseObject(transformerParam, TransformerParam.class);
Transformer transformer = TransformerBuilder.buildTransformer(tfp);
transformerMap.put(group + "-" + topic, transformer);
}
}
addTransformerListener(group);
}

public void addTransformerListener(String group) {
String transformerKey = transformerPrefix + group;
try {
metaStorage.getMetaDataWithListener(metaServiceListener, transformerKey);
} catch (Exception e) {
throw new RuntimeException("addTransformerListener exception", e);
}
}

public void shutdown() {
scheduledExecutorService.shutdown();
}

public Transformer getTransformer(String key) {
return transformerMap.get(key);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
import org.apache.eventmesh.transformer.Transformer;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -161,6 +162,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
final String topic = event.getSubject();

Pattern filterPattern = eventMeshHTTPServer.getFilterEngine().getFilterPattern(producerGroup + "-" + topic);
Transformer transformer = eventMeshHTTPServer.getTransformerEngine().getTransformer(producerGroup + "-" + topic);

// validate body
if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
Expand Down Expand Up @@ -252,6 +254,14 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
isFiltered = filterPattern.filter(JsonUtils.toJSONString(event));
}

// apply transformer
if (isFiltered && transformer != null) {
String data = transformer.transform(JsonUtils.toJSONString(event));
event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
.getBytes(StandardCharsets.UTF_8)).build();
sendMessageContext.setEvent(event);
}

if (isFiltered) {
eventMeshProducer.send(sendMessageContext, new SendCallback() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.WebhookUtil;
import org.apache.eventmesh.transformer.Transformer;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
Expand All @@ -53,6 +54,7 @@

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -138,6 +140,20 @@ public void tryHTTPRequest() {
return;
}
}
Transformer transformer = eventMeshHTTPServer.getTransformerEngine()
.getTransformer(handleMsgContext.getConsumerGroup() + "-" + handleMsgContext.getTopic());
if (transformer != null) {
try {
String data = transformer.transform(JsonUtils.toJSONString(event));
event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
.getBytes(StandardCharsets.UTF_8)).build();
} catch (Exception exception) {
LOGGER.warn("apply transformer to cloudevents error, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
this.handleMsgContext.getConsumerGroup(),
this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), exception);
return;
}
}
handleMsgContext.setEvent(event);
super.setEvent(event);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public List<Variable> match(String json) throws JsonProcessingException {

List<Variable> variableList = new ArrayList<>(variablesList.size());
for (Variable element : variablesList) {
if (JsonPathUtils.isValidAndDefinite(element.getJsonPath())) {
String res = JsonPathUtils.matchJsonPathValueWithString(json, element.getJsonPath());
if (JsonPathUtils.isValidAndDefinite(element.getValue())) {
String res = JsonPathUtils.matchJsonPathValueWithString(json, element.getValue());
Variable variable = new Variable(element.getName(), res);
variableList.add(variable);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public Template(String template) {
public String substitute(List<Variable> variables) throws TransformException {

Map<String, String> valuesMap = variables.stream()
.filter(variable -> variable.getJsonPath() != null)
.collect(Collectors.toMap(Variable::getName, Variable::getJsonPath));
.filter(variable -> variable.getValue() != null)
.collect(Collectors.toMap(Variable::getName, Variable::getValue));
StringSubstitutor sub = new StringSubstitutor(valuesMap);

return sub.replace(template);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ public String transform(String json) throws JsonProcessingException {
// 1: get variable match results
List<Variable> variableList = jsonPathParser.match(json);
// 2: use results replace template
String res = template.substitute(variableList);
return res;
return template.substitute(variableList);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,20 @@

public class TransformerBuilder {

public static final class Builder {

private final TransformerType transformerType;
private String template;
private String content;

public Builder(TransformerType transformerType) {
this.transformerType = transformerType;
}

public Builder setContent(String content) {
this.content = content;
return this;
}

public Builder setTemplate(String template) {
this.template = template;
return this;
}

public Transformer build() {
switch (this.transformerType) {
case CONSTANT:
return buildConstantTransformer(this.content);
case ORIGINAL:
return buildOriginalTransformer();
case TEMPLATE:
return buildTemplateTransFormer(this.content, this.template);
default:
throw new TransformException("invalid config");
}
public static Transformer buildTransformer(TransformerParam transformerParam) {
switch (transformerParam.getTransformerType()) {
case ORIGINAL:
return buildOriginalTransformer();
case CONSTANT:
return buildConstantTransformer(transformerParam.getValue());
case TEMPLATE:
return buildTemplateTransFormer(transformerParam.getValue(), transformerParam.getTemplate());
default:
throw new TransformException("invalid config");
}

}


public static Transformer buildTemplateTransFormer(String jsonContent, String template) {
JsonPathParser jsonPathParser = new JsonPathParser(jsonContent);
Template templateEntry = new Template(template);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.transformer;

public class TransformerParam {

private TransformerType transformerType;
private String value;
private String template;

public TransformerParam() {
}

public TransformerParam(TransformerType transformerType, String value, String template) {
this.transformerType = transformerType;
this.value = value;
this.template = template;
}

public TransformerParam(TransformerType transformerType, String value) {
this(transformerType, value, null);
}

public TransformerType getTransformerType() {
return transformerType;
}

public void setTransformerType(TransformerType transformerType) {
this.transformerType = transformerType;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public String getTemplate() {
return template;
}

public void setTemplate(String template) {
this.template = template;
}

}
Loading

0 comments on commit 9b734a3

Please sign in to comment.