Skip to content

Commit

Permalink
Maintenance: peer forwarder from trace event migration branch (#1239)
Browse files Browse the repository at this point in the history
* MAINT: migrate and adapt to both ExportTraceServiceRequest and event

Signed-off-by: Qi Chen <[email protected]>

* MAINT: merge test cases on event

Signed-off-by: Qi Chen <[email protected]>

* MAINT: unsupported record data type

Signed-off-by: Qi Chen <[email protected]>

* MAINT: TODO

Signed-off-by: Qi Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Apr 1, 2022
1 parent 46b205a commit 9f3e90d
Show file tree
Hide file tree
Showing 4 changed files with 497 additions and 67 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/peer-forwarder/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ repositories {

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:otel-proto-common')
testImplementation project(':data-prepper-api').sourceSets.test.output
implementation "io.opentelemetry:opentelemetry-proto:${versionMap.opentelemetryProto}"
implementation "com.linecorp.armeria:armeria:1.9.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@

import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.prepper.AbstractPrepper;
import com.amazon.dataprepper.model.prepper.Prepper;
import com.amazon.dataprepper.model.processor.AbstractProcessor;
import com.amazon.dataprepper.model.processor.Processor;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.trace.Span;
import com.amazon.dataprepper.plugins.otel.codec.OTelProtoCodec;
import com.amazon.dataprepper.plugins.prepper.peerforwarder.discovery.StaticPeerListProvider;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import org.apache.commons.codec.DecoderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
Expand All @@ -33,9 +37,11 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@DataPrepperPlugin(name = "peer_forwarder", pluginType = Prepper.class)
public class PeerForwarder extends AbstractPrepper<Record<ExportTraceServiceRequest>, Record<ExportTraceServiceRequest>> {
@DataPrepperPlugin(name = "peer_forwarder", pluginType = Processor.class)
public class PeerForwarder extends AbstractProcessor<Record<Object>, Record<Object>> {
public static final String REQUESTS = "requests";
public static final String LATENCY = "latency";
public static final String ERRORS = "errors";
Expand All @@ -47,6 +53,7 @@ public class PeerForwarder extends AbstractPrepper<Record<ExportTraceServiceRequ

private static final Logger LOG = LoggerFactory.getLogger(PeerForwarder.class);

private final OTelProtoCodec.OTelProtoEncoder oTelProtoEncoder;
private final HashRing hashRing;
private final PeerClientPool peerClientPool;
private final int maxNumSpansPerRequest;
Expand All @@ -58,10 +65,12 @@ public class PeerForwarder extends AbstractPrepper<Record<ExportTraceServiceRequ
private final ExecutorService executorService;

public PeerForwarder(final PluginSetting pluginSetting,
final OTelProtoCodec.OTelProtoEncoder oTelProtoEncoder,
final PeerClientPool peerClientPool,
final HashRing hashRing,
final int maxNumSpansPerRequest) {
super(pluginSetting);
this.oTelProtoEncoder = oTelProtoEncoder;
this.peerClientPool = peerClientPool;
this.hashRing = hashRing;
this.maxNumSpansPerRequest = maxNumSpansPerRequest;
Expand All @@ -79,19 +88,39 @@ public PeerForwarder(final PluginSetting pluginSetting) {
public PeerForwarder(final PluginSetting pluginSetting, final PeerForwarderConfig peerForwarderConfig) {
this(
pluginSetting,
new OTelProtoCodec.OTelProtoEncoder(),
peerForwarderConfig.getPeerClientPool(),
peerForwarderConfig.getHashRing(),
peerForwarderConfig.getMaxNumSpansPerRequest()
);
}

@Override
public List<Record<ExportTraceServiceRequest>> doExecute(final Collection<Record<ExportTraceServiceRequest>> records) {
public List<Record<Object>> doExecute(final Collection<Record<Object>> records) {
final List<Span> spans = new ArrayList<>();
final List<ExportTraceServiceRequest> exportTraceServiceRequests = new ArrayList<>();
records.forEach(record -> {
final Object recordData = record.getData();
// TODO: remove support for ExportTraceServiceRequest in 2.0
if (recordData instanceof ExportTraceServiceRequest) {
exportTraceServiceRequests.add((ExportTraceServiceRequest) recordData);
} else if (recordData instanceof Span) {
spans.add((Span) recordData);
} else {
throw new RuntimeException("Unsupported record data type: " + recordData.getClass());
}
});
final List<ExportTraceServiceRequest> requestsToProcessLocally = executeExportTraceServiceRequests(exportTraceServiceRequests);
final List<Span> spansToProcessLocally = executeSpans(spans);
return Stream.concat(requestsToProcessLocally.stream(), spansToProcessLocally.stream()).map(Record::new).collect(Collectors.toList());
}

private List<ExportTraceServiceRequest> executeExportTraceServiceRequests(final List<ExportTraceServiceRequest> requests) {
final Map<String, List<ResourceSpans>> groupedRS = new HashMap<>();

// Group ResourceSpans by consistent hashing of traceId
for (final Record<ExportTraceServiceRequest> record : records) {
for (final ResourceSpans rs : record.getData().getResourceSpansList()) {
for (final ExportTraceServiceRequest request : requests) {
for (final ResourceSpans rs : request.getResourceSpansList()) {
final List<Map.Entry<String, ResourceSpans>> rsBatch = PeerForwarderUtils.splitByTrace(rs);
for (final Map.Entry<String, ResourceSpans> entry : rsBatch) {
final String traceId = entry.getKey();
Expand All @@ -102,8 +131,8 @@ public List<Record<ExportTraceServiceRequest>> doExecute(final Collection<Record
}
}

final List<Record<ExportTraceServiceRequest>> recordsToProcessLocally = new ArrayList<>();
final List<CompletableFuture<Record>> forwardedRequestFutures = new ArrayList<>();
final List<ExportTraceServiceRequest> requestsToProcessLocally = new ArrayList<>();
final List<CompletableFuture<ExportTraceServiceRequest>> forwardedRequestFutures = new ArrayList<>();

for (final Map.Entry<String, List<ResourceSpans>> entry : groupedRS.entrySet()) {
final TraceServiceGrpc.TraceServiceBlockingStub client = getClient(entry.getKey());
Expand All @@ -116,7 +145,7 @@ public List<Record<ExportTraceServiceRequest>> doExecute(final Collection<Record
if (currSpansCount >= maxNumSpansPerRequest) {
final ExportTraceServiceRequest currRequest = currRequestBuilder.build();
if (isLocalClient(client)) {
recordsToProcessLocally.add(new Record<>(currRequest));
requestsToProcessLocally.add(currRequest);
} else {
forwardedRequestFutures.add(processRequest(client, currRequest));
}
Expand All @@ -130,33 +159,98 @@ public List<Record<ExportTraceServiceRequest>> doExecute(final Collection<Record
if (currSpansCount > 0) {
final ExportTraceServiceRequest currRequest = currRequestBuilder.build();
if (client == null) {
recordsToProcessLocally.add(new Record<>(currRequest));
requestsToProcessLocally.add(currRequest);
} else {
forwardedRequestFutures.add(processRequest(client, currRequest));
}
}
}

for (final CompletableFuture<Record> future : forwardedRequestFutures) {
for (final CompletableFuture<ExportTraceServiceRequest> future : forwardedRequestFutures) {
try {
final Record record = future.get();
if (record != null) {
recordsToProcessLocally.add(record);
final ExportTraceServiceRequest request = future.get();
if (request != null) {
requestsToProcessLocally.add(request);
}
} catch (InterruptedException | ExecutionException e) {
LOG.error("Problem with asynchronous peer forwarding", e);
}
}

return recordsToProcessLocally;
return requestsToProcessLocally;
}

private List<Span> executeSpans(final List<Span> spans) {
final Map<String, List<Span>> spansByTraceId = PeerForwarderUtils.splitByTrace(spans);
// Group ResourceSpans by consistent hashing of traceId
final Map<String, List<Span>> spansByEndPoint = new HashMap<>();
for (final Map.Entry<String, List<Span>> entry: spansByTraceId.entrySet()) {
final String traceId = entry.getKey();
final String dataPrepperIp = hashRing.getServerIp(traceId).orElse(StaticPeerListProvider.LOCAL_ENDPOINT);
spansByEndPoint.computeIfAbsent(dataPrepperIp, x -> new ArrayList<>()).addAll(entry.getValue());
}

final List<Span> spansToProcessLocally = new ArrayList<>();
final Map<CompletableFuture<ExportTraceServiceRequest>, List<Span>> forwardedRequestFuturesToSpans = new HashMap<>();

for (final Map.Entry<String, List<Span>> entry : spansByEndPoint.entrySet()) {
final TraceServiceGrpc.TraceServiceBlockingStub client = getClient(entry.getKey());
if (isLocalClient(client)) {
spansToProcessLocally.addAll(entry.getValue());
continue;
}

// Create ExportTraceRequest for storing single batch of spans
ExportTraceServiceRequest.Builder currRequestBuilder = ExportTraceServiceRequest.newBuilder();
List<Span> currBatchSpans = new ArrayList<>();
for (final Span span : entry.getValue()) {
try {
final ResourceSpans rs = oTelProtoEncoder.convertToResourceSpans(span);
currRequestBuilder.addResourceSpans(rs);
currBatchSpans.add(span);
} catch (UnsupportedEncodingException | DecoderException e) {
LOG.error("failed to encode span with spanId: {} into opentelemetry-protobuf, span will be processed locally.",
span.getSpanId(), e);
spansToProcessLocally.add(span);
}
if (currBatchSpans.size() >= maxNumSpansPerRequest) {
final ExportTraceServiceRequest currRequest = currRequestBuilder.build();
forwardedRequestFuturesToSpans.put(processRequest(client, currRequest), currBatchSpans);
currRequestBuilder = ExportTraceServiceRequest.newBuilder();
currBatchSpans = new ArrayList<>();
}
}
// Dealing with the last batch request
if (currBatchSpans.size() > 0) {
final ExportTraceServiceRequest currRequest = currRequestBuilder.build();
forwardedRequestFuturesToSpans.put(processRequest(client, currRequest), currBatchSpans);
}
}

for (final Map.Entry<CompletableFuture<ExportTraceServiceRequest>, List<Span>> entry : forwardedRequestFuturesToSpans.entrySet()) {
try {
final CompletableFuture<ExportTraceServiceRequest> future = entry.getKey();
final ExportTraceServiceRequest request = future.get();
if (request != null) {
final List<Span> spansFailedToForward = entry.getValue();
spansToProcessLocally.addAll(spansFailedToForward);
}
} catch (InterruptedException | ExecutionException e) {
LOG.error("Problem with asynchronous peer forwarding, current batch of spans will be processed locally.", e);
final List<Span> spansFailedToForward = entry.getValue();
spansToProcessLocally.addAll(spansFailedToForward);
}
}

return spansToProcessLocally;
}

/**
* Asynchronously forwards a request to the peer address. Returns a record with an empty payload if
* the request succeeds, otherwise the payload will contain the failed ExportTraceServiceRequest to
* be processed locally.
*/
private CompletableFuture<Record> processRequest(final TraceServiceGrpc.TraceServiceBlockingStub client,
private CompletableFuture<ExportTraceServiceRequest> processRequest(final TraceServiceGrpc.TraceServiceBlockingStub client,
final ExportTraceServiceRequest request) {
final String peerIp = client.getChannel().authority();
final Timer forwardRequestTimer = forwardRequestTimers.computeIfAbsent(
Expand All @@ -166,7 +260,7 @@ private CompletableFuture<Record> processRequest(final TraceServiceGrpc.TraceSer
final Counter forwardRequestErrorCounter = forwardRequestErrorCounters.computeIfAbsent(
peerIp, ip -> pluginMetrics.counterWithTags(ERRORS, DESTINATION, ip));

final CompletableFuture<Record> callFuture = CompletableFuture.supplyAsync(() ->
final CompletableFuture<ExportTraceServiceRequest> callFuture = CompletableFuture.supplyAsync(() ->
{
forwardedRequestCounter.increment();
try {
Expand All @@ -175,7 +269,7 @@ private CompletableFuture<Record> processRequest(final TraceServiceGrpc.TraceSer
} catch (Exception e) {
LOG.error("Failed to forward request to address: {}", peerIp, e);
forwardRequestErrorCounter.increment();
return new Record<>(request);
return request;
}
}, executorService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@

package com.amazon.dataprepper.plugins.prepper.peerforwarder;

import com.amazon.dataprepper.model.trace.Span;
import com.linecorp.armeria.internal.shaded.bouncycastle.util.encoders.Hex;
import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.Span;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public final class PeerForwarderUtils {
public static int getResourceSpansSize(final ResourceSpans rs) {
Expand All @@ -25,7 +27,7 @@ public static List<Map.Entry<String, ResourceSpans>> splitByTrace(final Resource
final List<Map.Entry<String, ResourceSpans>> result = new ArrayList<>();
for (final InstrumentationLibrarySpans ils: rs.getInstrumentationLibrarySpansList()) {
final Map<String, ResourceSpans.Builder> batches = new HashMap<>();
for (final Span span: ils.getSpansList()) {
for (final io.opentelemetry.proto.trace.v1.Span span: ils.getSpansList()) {
final String sTraceId = Hex.toHexString(span.getTraceId().toByteArray());
if (!batches.containsKey(sTraceId)) {
final ResourceSpans.Builder newRSBuilder = ResourceSpans.newBuilder()
Expand All @@ -43,4 +45,8 @@ public static List<Map.Entry<String, ResourceSpans>> splitByTrace(final Resource

return result;
}

public static Map<String, List<Span>> splitByTrace(final Collection<Span> spans) {
return spans.stream().collect(Collectors.groupingBy(Span::getTraceId));
}
}
Loading

0 comments on commit 9f3e90d

Please sign in to comment.