Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not merge -- infinite tracing compression and batching #1266

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/actions/setup-environment/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ runs:
11
8

- name: Check Gradle wrapper
uses: gradle/wrapper-validation-action@v1

- name: set gradle.properties
shell: bash
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/GHA-Scala-Functional-Tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:

- name: Upload coverage to Codecov
if: success()
uses: codecov/codecov-action@v2
uses: codecov/codecov-action@v3
with:
files: '**/build/reports/jacoco/test/jacocoTestReport.xml'
fail_ci_if_error: false #default is false, but being explicit about what to expect.
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/GHA-Unit-Tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:

- name: Upload coverage to Codecov
if: success()
uses: codecov/codecov-action@v2
uses: codecov/codecov-action@v3
with:
files: '**/build/reports/jacoco/test/jacocoTestReport.xml'
fail_ci_if_error: false #default is false, but being explicit about what to expect.
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/X-Reusable-VerifyInstrumentation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,8 @@ jobs:
path: /home/runner/work/newrelic-java-agent/newrelic-java-agent/newrelic-agent/build/newrelicJar/newrelic.jar
key: ${{ github.run_id }}

# Verify instrumentation must run with Java 17
- name: Running verifyInstrumentation on (${{ matrix.modules }})
env:
JAVA_HOME: ${{ env.JAVA_HOME_17_X64 }}
run: ./gradlew $GRADLE_OPTIONS --info :instrumentation:${{ matrix.modules }}:verifyInstrumentation
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.security.Principal;
import java.util.HashMap;
import java.util.Map;
Expand Down
1 change: 1 addition & 0 deletions infinite-tracing/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation(project(":newrelic-api"))

testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.6.2")
testImplementation("org.mockito:mockito-junit-jupiter:3.3.3")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
}
Expand Down
40 changes: 29 additions & 11 deletions infinite-tracing/src/main/java/com/newrelic/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ChannelManager {
@GuardedBy("lock") private CountDownLatch backoffLatch;
@GuardedBy("lock") private ManagedChannel managedChannel;
@GuardedBy("lock") private boolean recreateSpanObserver = true;
@GuardedBy("lock") private ClientCallStreamObserver<V1.Span> spanObserver;
@GuardedBy("lock") private Observer observer;
@GuardedBy("lock") private String agentRunToken;
@GuardedBy("lock") private Map<String, String> requestMetadata;

Expand Down Expand Up @@ -62,7 +62,7 @@ void updateMetadata(String agentRunToken, Map<String, String> requestMetadata) {
*
* @return a span observer
*/
ClientCallStreamObserver<V1.Span> getSpanObserver() {
Observer getObserver() {
// Obtain the lock, and await the backoff if in progress
CountDownLatch latch;
synchronized (lock) {
Expand All @@ -88,23 +88,41 @@ ClientCallStreamObserver<V1.Span> getSpanObserver() {
managedChannel = buildChannel();
}
if (recreateSpanObserver) {
if (spanObserver != null) {
if (observer != null) {
logger.log(Level.FINE, "Cancelling and recreating gRPC span observer.");
spanObserver.cancel("CLOSING_CONNECTION", new ChannelClosingException());
observer.cancel("CLOSING_CONNECTION", new ChannelClosingException());
}
IngestServiceStub ingestServiceStub = buildStub(managedChannel);
ResponseObserver responseObserver = buildResponseObserver();
spanObserver = (ClientCallStreamObserver<V1.Span>) ingestServiceStub.recordSpan(responseObserver);
if (config.getUseBatching()) {
observer = buildSpanBatchObserver((ClientCallStreamObserver<V1.SpanBatch>) ingestServiceStub.recordSpanBatch(responseObserver));
} else {
observer = buildSpanObserver((ClientCallStreamObserver<V1.Span>) ingestServiceStub.recordSpan(responseObserver));
}
aggregator.incrementCounter("Supportability/InfiniteTracing/Connect");
recreateSpanObserver = false;
}
return spanObserver;
return observer;
}
}

@VisibleForTesting
Observer buildSpanObserver(ClientCallStreamObserver<V1.Span> observer) {
return new SpanObserver(observer);
}

@VisibleForTesting
Observer buildSpanBatchObserver(ClientCallStreamObserver<V1.SpanBatch> observer) {
return new SpanBatchObserver(observer);
}

@VisibleForTesting
IngestServiceStub buildStub(ManagedChannel managedChannel) {
return IngestServiceGrpc.newStub(managedChannel);
IngestServiceStub ingestServiceStub = IngestServiceGrpc.newStub(managedChannel);
if (config.getUseCompression()) {
ingestServiceStub = ingestServiceStub.withCompression("gzip");
}
return ingestServiceStub;
}

@VisibleForTesting
Expand All @@ -113,7 +131,7 @@ ResponseObserver buildResponseObserver() {
}

/**
* Mark that the span observer should be canceled and recreated the next time {@link #getSpanObserver()} is called.
* Mark that the span observer should be canceled and recreated the next time {@link #getObserver()} is called.
*/
void recreateSpanObserver() {
synchronized (lock) {
Expand All @@ -122,7 +140,7 @@ void recreateSpanObserver() {
}

/**
* Shutdown the channel, cancel the span observer, and backoff. The next time {@link #getSpanObserver()}
* Shutdown the channel, cancel the span observer, and backoff. The next time {@link #getObserver()}
* is called, it will await the backoff and the channel will be recreated.
*
* @param backoffSeconds the number of seconds to await before the channel can be recreated
Expand Down Expand Up @@ -160,7 +178,7 @@ void shutdownChannelAndBackoff(int backoffSeconds) {
}

/**
* Shutdown the channel and do not recreate it. The next time {@link #getSpanObserver()} is called
* Shutdown the channel and do not recreate it. The next time {@link #getObserver()} is called
* an exception will be thrown.
*/
void shutdownChannelForever() {
Expand Down Expand Up @@ -200,4 +218,4 @@ ManagedChannel buildChannel() {
return channelBuilder.build();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public class InfiniteTracing implements Consumer<SpanEvent> {
*/
public void start(String agentRunToken, Map<String, String> requestMetadata) {
synchronized (lock) {
// Record supportability metrics related to Infinite Tracing configuration settings
aggregator.incrementCounter("Supportability/InfiniteTracing/gRPC/Compression/" + (config.getUseCompression() ? "enabled" : "disabled"));
aggregator.incrementCounter("Supportability/InfiniteTracing/gRPC/Batching/" + (config.getUseBatching() ? "enabled" : "disabled"));

if (spanEventSenderFuture != null) {
channelManager.updateMetadata(agentRunToken, requestMetadata);
channelManager.shutdownChannelAndBackoff(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class InfiniteTracingConfig {
private final Double flakyPercentage;
private final Long flakyCode;
private final boolean usePlaintext;
private final boolean useCompression;
private final boolean useBatching;

public InfiniteTracingConfig(Builder builder) {
this.licenseKey = builder.licenseKey;
Expand All @@ -23,6 +25,8 @@ public InfiniteTracingConfig(Builder builder) {
this.flakyPercentage = builder.flakyPercentage;
this.flakyCode = builder.flakyCode;
this.usePlaintext = builder.usePlaintext;
this.useCompression = builder.useCompression;
this.useBatching = builder.useBatching;
}

public static Builder builder() {
Expand Down Expand Up @@ -61,6 +65,14 @@ public boolean getUsePlaintext() {
return usePlaintext;
}

public boolean getUseCompression() {
return useCompression;
}

public boolean getUseBatching() {
return useBatching;
}

public static class Builder {
public int maxQueueSize;
public Logger logger;
Expand All @@ -70,6 +82,8 @@ public static class Builder {
private Double flakyPercentage;
private Long flakyCode;
private boolean usePlaintext;
private boolean useCompression;
private boolean useBatching;

/**
* The New Relic APM license key configured for the application.
Expand Down Expand Up @@ -137,13 +151,33 @@ public Builder flakyCode(Long flakyCode) {
/**
* The optional boolean connect using plaintext
*
* @param usePlaintext
* @param usePlaintext true to use plaintext, false otherwise
*/
public Builder usePlaintext(boolean usePlaintext) {
this.usePlaintext = usePlaintext;
return this;
}

/**
* The optional boolean to use compression when sending to the Trace Observer.
*
* @param useCompression true to use compression, false otherwise
*/
public Builder useCompression(boolean useCompression) {
this.useCompression = useCompression;
return this;
}

/**
* The optional boolean to use batching when sending to the Trace Observer.
*
* @param useBatching true to use batching, false otherwise
*/
public Builder useBatching(boolean useBatching) {
this.useBatching = useBatching;
return this;
}

public InfiniteTracingConfig build() {
return new InfiniteTracingConfig(this);
}
Expand Down
32 changes: 32 additions & 0 deletions infinite-tracing/src/main/java/com/newrelic/Observer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.newrelic;

import com.newrelic.trace.v1.V1;

import javax.annotation.Nullable;

/**
* Shared interface between batching and non-batching trace observer implementations
*/
public interface Observer {

/**
* Sends a single span to the observer
*/
void onNext(V1.Span span);

/**
* Sends a batch of spans to the observer.
*/
void onNext(V1.SpanBatch spanBatch);

/**
* Whether the observer is in a ready state to accept spans
*/
boolean isReady();

/**
* Cancel the connection to the observer
*/
void cancel(@Nullable String message, @Nullable Throwable cause);

}
36 changes: 36 additions & 0 deletions infinite-tracing/src/main/java/com/newrelic/SpanBatchObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.newrelic;

import com.newrelic.trace.v1.V1;
import io.grpc.stub.ClientCallStreamObserver;

import javax.annotation.Nullable;

public class SpanBatchObserver implements Observer {

private final ClientCallStreamObserver<V1.SpanBatch> observer;

public SpanBatchObserver(ClientCallStreamObserver<V1.SpanBatch> observer) {
this.observer = observer;
}

@Override
public void onNext(V1.Span span) {
// This should only be used by the SpanObserver
throw new UnsupportedOperationException();
}

@Override
public void onNext(V1.SpanBatch spanBatch) {
observer.onNext(spanBatch);
}

@Override
public boolean isReady() {
return observer.isReady();
}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
observer.cancel(message, cause);
}
}
16 changes: 15 additions & 1 deletion infinite-tracing/src/main/java/com/newrelic/SpanConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import com.newrelic.agent.model.SpanEvent;
import com.newrelic.trace.v1.V1;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

class SpanConverter {

Expand Down Expand Up @@ -32,6 +34,18 @@ static V1.Span convert(SpanEvent spanEvent) {
.build();
}

/**
* Convert the batch of span events to the equivalent gRPC spans.
*
* @param spanEvents the span event batch
* @return the gRPC span batch
*/
static V1.SpanBatch convert(Collection<SpanEvent> spanEvents) {
return V1.SpanBatch.newBuilder()
.addAllSpans(spanEvents.stream().map(SpanConverter::convert).collect(Collectors.toList()))
.build();
}

private static Map<String, V1.AttributeValue> copyAttributes(Map<String, Object> original) {
Map<String, V1.AttributeValue> copy = new HashMap<>();
if (original == null) {
Expand All @@ -53,4 +67,4 @@ private static Map<String, V1.AttributeValue> copyAttributes(Map<String, Object>
return copy;
}

}
}
Loading