Skip to content

Commit

Permalink
[ISSUE #696] Add trace plugin (#749)
Browse files Browse the repository at this point in the history
* add docs

* add trace plugin

* fix ConfigurationWrapperTest error

* fix checkstyle

* fix checkstyle
  • Loading branch information
Roc-00 authored Feb 24, 2022
1 parent 9e42e65 commit e56e4db
Show file tree
Hide file tree
Showing 25 changed files with 643 additions and 264 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ public class CommonConfiguration {
public String eventMeshRegistryPluginType = "namesrv";

public List<String> eventMeshMetricsPluginType;
public String eventMeshTraceExporterType = "Log";
public int eventMeshTraceExporterMaxExportSize = 512;
public int eventMeshTraceExporterMaxQueueSize = 2048;
public int eventMeshTraceExporterExportTimeout = 30;
public int eventMeshTraceExporterExportInterval = 5;
public String eventMeshTraceExportZipkinIp = "localhost";
public int eventMeshTraceExportZipkinPort = 9411;
public String eventMeshTracePluginType;

public String namesrvAddr = "";
public Integer eventMeshRegisterIntervalInMills = 10 * 1000;
Expand Down Expand Up @@ -126,61 +120,19 @@ public void init() {
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshRegistryPluginType),
String.format("%s error", ConfKeys.KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE));

String eventMeshTraceExporterTypeStr =
configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_TRACE_EXPORTER_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTraceExporterTypeStr),
String.format("%s error", ConfKeys.KEYS_ENENTMESH_TRACE_EXPORTER_TYPE));
eventMeshTraceExporterType =
StringUtils.deleteWhitespace(eventMeshTraceExporterTypeStr);

String eventMeshTraceExporterMaxExportSizeStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_MAX_EXPORT_SIZE);
if (StringUtils.isNotEmpty(eventMeshTraceExporterMaxExportSizeStr)) {
eventMeshTraceExporterMaxExportSize =
Integer.parseInt(StringUtils.deleteWhitespace(eventMeshTraceExporterMaxExportSizeStr));
}

String eventMeshTraceExporterMaxQueueSizeStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_MAX_QUEUE_SIZE);
if (StringUtils.isNotEmpty(eventMeshTraceExporterMaxQueueSizeStr)) {
eventMeshTraceExporterMaxQueueSize =
Integer.parseInt(StringUtils.deleteWhitespace(eventMeshTraceExporterMaxQueueSizeStr));
}

String eventMeshTraceExporterExportTimeoutStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_TIMEOUT);
if (StringUtils.isNotEmpty(eventMeshTraceExporterExportTimeoutStr)) {
eventMeshTraceExporterExportTimeout =
Integer.parseInt(StringUtils.deleteWhitespace(eventMeshTraceExporterExportTimeoutStr));
}

String eventMeshTraceExporterExportIntervalStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_INTERVAL);
if (StringUtils.isNotEmpty(eventMeshTraceExporterExportIntervalStr)) {
eventMeshTraceExporterExportInterval =
Integer.parseInt(StringUtils.deleteWhitespace(eventMeshTraceExporterExportIntervalStr));
}

String eventMeshTraceExportZipkinIpStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_IP);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTraceExportZipkinIpStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_IP));
eventMeshTraceExportZipkinIp = StringUtils.deleteWhitespace(eventMeshTraceExportZipkinIpStr);

String eventMeshTraceExportZipkinPortStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_PORT);
if (StringUtils.isNotEmpty(eventMeshTraceExportZipkinPortStr)) {
eventMeshTraceExportZipkinPort =
Integer.parseInt(StringUtils.deleteWhitespace(eventMeshTraceExportZipkinPortStr));
}

String metricsPluginType = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_METRICS_PLUGIN_TYPE);
if (StringUtils.isNotEmpty(metricsPluginType)) {
eventMeshMetricsPluginType = Arrays.stream(metricsPluginType.split(","))
.filter(StringUtils::isNotBlank)
.map(String::trim)
.collect(Collectors.toList());
}

String eventMeshTracePluginTypeStr =
configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_TRACE_PLUGIN_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshTracePluginTypeStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_TRACE_PLUGIN_TYPE));
eventMeshTracePluginType = StringUtils.deleteWhitespace(eventMeshTracePluginTypeStr);
}
}

Expand Down Expand Up @@ -213,20 +165,8 @@ static class ConfKeys {

public static String KEYS_ENENTMESH_REGISTRY_PLUGIN_TYPE = "eventMesh.registry.plugin.type";

public static String KEYS_ENENTMESH_TRACE_EXPORTER_TYPE = "eventmesh.trace.exporter.type";

public static String KEYS_EVENTMESH_TRACE_EXPORTER_MAX_EXPORT_SIZE = "eventmesh.trace.exporter.max.export.size";

public static String KEYS_EVENTMESH_TRACE_EXPORTER_MAX_QUEUE_SIZE = "eventmesh.trace.exporter.max.queue.size";

public static String KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_TIMEOUT = "eventmesh.trace.exporter.export.timeout";

public static String KEYS_EVENTMESH_TRACE_EXPORTER_EXPORT_INTERVAL = "eventmesh.trace.exporter.export.interval";

public static String KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_IP = "eventmesh.trace.export.zipkin.ip";

public static String KEYS_EVENTMESH_TRACE_EXPORT_ZIPKIN_PORT = "eventmesh.trace.export.zipkin.port";

public static String KEYS_EVENTMESH_METRICS_PLUGIN_TYPE = "eventmesh.metrics.plugin";

public static String KEYS_EVENTMESH_TRACE_PLUGIN_TYPE = "eventmesh.trace.plugin";
}
}
3 changes: 1 addition & 2 deletions eventmesh-common/src/test/resources/configuration.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ eventMesh.server.hostIp=value6
eventMesh.connector.plugin.type=rocketmq
eventMesh.security.plugin.type=acl
eventMesh.registry.plugin.type=namesrv
eventmesh.trace.export.zipkin.ip=localhost
eventmesh.trace.exporter.type=Zipkin
eventmesh.trace.plugin=zipkin
3 changes: 3 additions & 0 deletions eventmesh-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ dependencies {
implementation project(":eventmesh-metrics-plugin:eventmesh-metrics-api")
implementation project(":eventmesh-metrics-plugin:eventmesh-metrics-opentelemetry")

implementation project(":eventmesh-trace-plugin:eventmesh-trace-api")
implementation project(":eventmesh-trace-plugin:eventmesh-trace-zipkin")

testImplementation "org.mockito:mockito-core"
testImplementation "org.powermock:powermock-module-junit4"
testImplementation "org.powermock:powermock-api-mockito2"
Expand Down
17 changes: 2 additions & 15 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,5 @@ eventMesh.registry.plugin.type=namesrv
# metrics plugin, if you have multiple plugin, you can use ',' to split
eventmesh.metrics.plugin=opentelemetry

#trace exporter
eventmesh.trace.exporter.type=Log

#set the maximum batch size to use
eventmesh.trace.exporter.max.export.size=512
#set the queue size. This must be >= the export batch size
eventmesh.trace.exporter.max.queue.size=2048
#set the max amount of time an export can run before getting(TimeUnit=SECONDS)
eventmesh.trace.exporter.export.timeout=30
#set time between two different exports(TimeUnit=SECONDS)
eventmesh.trace.exporter.export.interval=5

#zipkin
eventmesh.trace.export.zipkin.ip=localhost
eventmesh.trace.export.zipkin.port=9411
# trace plugin
eventmesh.trace.plugin=zipkin
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.runtime.trace.AttributeKeys;
import org.apache.eventmesh.runtime.trace.OpenTelemetryTraceFactory;
import org.apache.eventmesh.runtime.trace.SpanKey;
import org.apache.eventmesh.runtime.util.RemotingHelper;

Expand Down Expand Up @@ -111,12 +110,10 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {

private boolean useTLS;

private Boolean useTrace = true; //Determine whether trace is enabled
public Boolean useTrace = false; //Determine whether trace is enabled

public TextMapPropagator textMapPropagator;

public OpenTelemetryTraceFactory openTelemetryTraceFactory;

public Tracer tracer;

public ThreadPoolExecutor asyncContextCompleteHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
import org.apache.eventmesh.runtime.core.protocol.http.push.AbstractHTTPPushRequest;
import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.runtime.trace.OpenTelemetryTraceFactory;
import org.apache.eventmesh.trace.api.TracePluginFactory;
import org.apache.eventmesh.trace.api.TraceService;

import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.Optional;
Expand All @@ -61,6 +64,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {

private EventMeshHTTPConfiguration eventMeshHttpConfiguration;

private TraceService traceService;

public final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping =
new ConcurrentHashMap<>();

Expand Down Expand Up @@ -216,9 +221,16 @@ public void init() throws Exception {

registerHTTPRequestProcessor();

super.openTelemetryTraceFactory = new OpenTelemetryTraceFactory(eventMeshHttpConfiguration);
super.tracer = openTelemetryTraceFactory.getTracer(this.getClass().toString());
super.textMapPropagator = openTelemetryTraceFactory.getTextMapPropagator();
//get the trace-plugin
if (StringUtils.isNotEmpty(eventMeshHttpConfiguration.eventMeshTracePluginType)) {

traceService =
TracePluginFactory.getTraceService(eventMeshHttpConfiguration.eventMeshTracePluginType);
traceService.init();
super.tracer = traceService.getTracer(super.getClass().toString());
super.textMapPropagator = traceService.getTextMapPropagator();
super.useTrace = true;
}

logger.info("--------------------------EventMeshHTTPServer inited");
}
Expand All @@ -240,6 +252,10 @@ public void shutdown() throws Exception {

metrics.shutdown();

if (traceService != null) {
traceService.shutdown();
}

consumerManager.shutdown();

shutdownThreadPool();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.exporter;
package org.apache.eventmesh.runtime.trace;

import java.util.Collection;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public enum EventMeshExtensionType {
SECURITY("security"),
PROTOCOL("protocol"),
METRICS("metrics"),
TRACE("trace"),
;

private final String extensionTypeName;
Expand Down
Loading

0 comments on commit e56e4db

Please sign in to comment.