Skip to content

Commit

Permalink
Implement support for infinite tracing payload compression and batchi…
Browse files Browse the repository at this point in the history
…ng. Payloads being sent to trace observers will be compressed with `gzip` compression by default and will attempt to send batches of up to 100 spans by default. This replaces the existing default of no compression and no batching. The data is still streaming over HTTP/2 using gRPC but utilizing compression and batching to significantly reduce payload sizes and bytes over the wire, resulting in lower network transfer costs.
  • Loading branch information
towest-nr committed Feb 2, 2023
1 parent 193ee21 commit 302e103
Show file tree
Hide file tree
Showing 18 changed files with 678 additions and 65 deletions.
1 change: 1 addition & 0 deletions infinite-tracing/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
implementation(project(":newrelic-api"))

testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.6.2")
testImplementation("org.mockito:mockito-junit-jupiter:3.3.3")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
}
Expand Down
40 changes: 29 additions & 11 deletions infinite-tracing/src/main/java/com/newrelic/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ChannelManager {
@GuardedBy("lock") private CountDownLatch backoffLatch;
@GuardedBy("lock") private ManagedChannel managedChannel;
@GuardedBy("lock") private boolean recreateSpanObserver = true;
@GuardedBy("lock") private ClientCallStreamObserver<V1.Span> spanObserver;
@GuardedBy("lock") private Observer observer;
@GuardedBy("lock") private String agentRunToken;
@GuardedBy("lock") private Map<String, String> requestMetadata;

Expand Down Expand Up @@ -62,7 +62,7 @@ void updateMetadata(String agentRunToken, Map<String, String> requestMetadata) {
*
* @return a span observer
*/
ClientCallStreamObserver<V1.Span> getSpanObserver() {
Observer getObserver() {
// Obtain the lock, and await the backoff if in progress
CountDownLatch latch;
synchronized (lock) {
Expand All @@ -88,23 +88,41 @@ ClientCallStreamObserver<V1.Span> getSpanObserver() {
managedChannel = buildChannel();
}
if (recreateSpanObserver) {
if (spanObserver != null) {
if (observer != null) {
logger.log(Level.FINE, "Cancelling and recreating gRPC span observer.");
spanObserver.cancel("CLOSING_CONNECTION", new ChannelClosingException());
observer.cancel("CLOSING_CONNECTION", new ChannelClosingException());
}
IngestServiceStub ingestServiceStub = buildStub(managedChannel);
ResponseObserver responseObserver = buildResponseObserver();
spanObserver = (ClientCallStreamObserver<V1.Span>) ingestServiceStub.recordSpan(responseObserver);
if (config.getUseBatching()) {
observer = buildSpanBatchObserver((ClientCallStreamObserver<V1.SpanBatch>) ingestServiceStub.recordSpanBatch(responseObserver));
} else {
observer = buildSpanObserver((ClientCallStreamObserver<V1.Span>) ingestServiceStub.recordSpan(responseObserver));
}
aggregator.incrementCounter("Supportability/InfiniteTracing/Connect");
recreateSpanObserver = false;
}
return spanObserver;
return observer;
}
}

@VisibleForTesting
Observer buildSpanObserver(ClientCallStreamObserver<V1.Span> observer) {
return new SpanObserver(observer);
}

@VisibleForTesting
Observer buildSpanBatchObserver(ClientCallStreamObserver<V1.SpanBatch> observer) {
return new SpanBatchObserver(observer);
}

@VisibleForTesting
IngestServiceStub buildStub(ManagedChannel managedChannel) {
return IngestServiceGrpc.newStub(managedChannel);
IngestServiceStub ingestServiceStub = IngestServiceGrpc.newStub(managedChannel);
if (config.getCompression() != null) {
ingestServiceStub = ingestServiceStub.withCompression(config.getCompression());
}
return ingestServiceStub;
}

@VisibleForTesting
Expand All @@ -113,7 +131,7 @@ ResponseObserver buildResponseObserver() {
}

/**
* Mark that the span observer should be canceled and recreated the next time {@link #getSpanObserver()} is called.
* Mark that the span observer should be canceled and recreated the next time {@link #getObserver()} is called.
*/
void recreateSpanObserver() {
synchronized (lock) {
Expand All @@ -122,7 +140,7 @@ void recreateSpanObserver() {
}

/**
* Shutdown the channel, cancel the span observer, and backoff. The next time {@link #getSpanObserver()}
* Shutdown the channel, cancel the span observer, and backoff. The next time {@link #getObserver()}
* is called, it will await the backoff and the channel will be recreated.
*
* @param backoffSeconds the number of seconds to await before the channel can be recreated
Expand Down Expand Up @@ -160,7 +178,7 @@ void shutdownChannelAndBackoff(int backoffSeconds) {
}

/**
* Shutdown the channel and do not recreate it. The next time {@link #getSpanObserver()} is called
* Shutdown the channel and do not recreate it. The next time {@link #getObserver()} is called
* an exception will be thrown.
*/
void shutdownChannelForever() {
Expand Down Expand Up @@ -200,4 +218,4 @@ ManagedChannel buildChannel() {
return channelBuilder.build();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ public class InfiniteTracingConfig {
private final Double flakyPercentage;
private final Long flakyCode;
private final boolean usePlaintext;
private final String compression;
private final boolean useBatching;
private final int maxBatchSize;
private final int lingerMs;

public InfiniteTracingConfig(Builder builder) {
this.licenseKey = builder.licenseKey;
Expand All @@ -23,6 +27,10 @@ public InfiniteTracingConfig(Builder builder) {
this.flakyPercentage = builder.flakyPercentage;
this.flakyCode = builder.flakyCode;
this.usePlaintext = builder.usePlaintext;
this.compression = builder.compression;
this.useBatching = builder.useBatching;
this.maxBatchSize = builder.maxBatchSize;
this.lingerMs = builder.lingerMs;
}

public static Builder builder() {
Expand Down Expand Up @@ -61,6 +69,22 @@ public boolean getUsePlaintext() {
return usePlaintext;
}

public String getCompression() {
return compression;
}

public boolean getUseBatching() {
return useBatching;
}

public int getMaxBatchSize() {
return maxBatchSize;
}

public int getLingerMs() {
return lingerMs;
}

public static class Builder {
public int maxQueueSize;
public Logger logger;
Expand All @@ -70,6 +94,10 @@ public static class Builder {
private Double flakyPercentage;
private Long flakyCode;
private boolean usePlaintext;
private String compression;
private boolean useBatching;
private int maxBatchSize;
private int lingerMs;

/**
* The New Relic APM license key configured for the application.
Expand Down Expand Up @@ -137,13 +165,50 @@ public Builder flakyCode(Long flakyCode) {
/**
* The optional boolean connect using plaintext
*
* @param usePlaintext
* @param usePlaintext true to use plaintext, false otherwise
*/
public Builder usePlaintext(boolean usePlaintext) {
this.usePlaintext = usePlaintext;
return this;
}

/**
* The optional compression type to use when sending to the Trace Observer.
*
* @param compression The compression type to use. Available options are "gzip" or "none".
*/
public Builder compression(String compression) {
this.compression = compression;
return this;
}

/**
* The optional boolean to use batching when sending to the Trace Observer.
*
* @param useBatching true to use batching, false otherwise
*/
public Builder useBatching(boolean useBatching) {
this.useBatching = useBatching;
return this;
}

/**
* Sets the maximum batch size when batching is enabled.
*/
public Builder maxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
return this;
}

/**
* Sets the maximum amount of time to wait for a span batch
* to fill before sending if below the {@link #maxBatchSize}
*/
public Builder lingerMs(int lingerMs) {
this.lingerMs = lingerMs;
return this;
}

public InfiniteTracingConfig build() {
return new InfiniteTracingConfig(this);
}
Expand Down
32 changes: 32 additions & 0 deletions infinite-tracing/src/main/java/com/newrelic/Observer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.newrelic;

import com.newrelic.trace.v1.V1;

import javax.annotation.Nullable;

/**
* Shared interface between batching and non-batching trace observer implementations
*/
public interface Observer {

/**
* Sends a single span to the observer
*/
void onNext(V1.Span span);

/**
* Sends a batch of spans to the observer.
*/
void onNext(V1.SpanBatch spanBatch);

/**
* Whether the observer is in a ready state to accept spans
*/
boolean isReady();

/**
* Cancel the connection to the observer
*/
void cancel(@Nullable String message, @Nullable Throwable cause);

}
36 changes: 36 additions & 0 deletions infinite-tracing/src/main/java/com/newrelic/SpanBatchObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.newrelic;

import com.newrelic.trace.v1.V1;
import io.grpc.stub.ClientCallStreamObserver;

import javax.annotation.Nullable;

public class SpanBatchObserver implements Observer {

private final ClientCallStreamObserver<V1.SpanBatch> observer;

public SpanBatchObserver(ClientCallStreamObserver<V1.SpanBatch> observer) {
this.observer = observer;
}

@Override
public void onNext(V1.Span span) {
// This should only be used by the SpanObserver
throw new UnsupportedOperationException();
}

@Override
public void onNext(V1.SpanBatch spanBatch) {
observer.onNext(spanBatch);
}

@Override
public boolean isReady() {
return observer.isReady();
}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
observer.cancel(message, cause);
}
}
16 changes: 15 additions & 1 deletion infinite-tracing/src/main/java/com/newrelic/SpanConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import com.newrelic.agent.model.SpanEvent;
import com.newrelic.trace.v1.V1;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

class SpanConverter {

Expand Down Expand Up @@ -32,6 +34,18 @@ static V1.Span convert(SpanEvent spanEvent) {
.build();
}

/**
* Convert the batch of span events to the equivalent gRPC spans.
*
* @param spanEvents the span event batch
* @return the gRPC span batch
*/
static V1.SpanBatch convert(Collection<SpanEvent> spanEvents) {
return V1.SpanBatch.newBuilder()
.addAllSpans(spanEvents.stream().map(SpanConverter::convert).collect(Collectors.toList()))
.build();
}

private static Map<String, V1.AttributeValue> copyAttributes(Map<String, Object> original) {
Map<String, V1.AttributeValue> copy = new HashMap<>();
if (original == null) {
Expand All @@ -53,4 +67,4 @@ private static Map<String, V1.AttributeValue> copyAttributes(Map<String, Object>
return copy;
}

}
}
Loading

0 comments on commit 302e103

Please sign in to comment.