Skip to content

Commit

Permalink
Merge pull request #190 from newrelic/spring-webflux-subscribe
Browse files Browse the repository at this point in the history
Spring webflux subscribe
  • Loading branch information
tspring authored Jan 21, 2021
2 parents f522baf + 3bdf099 commit 0d8d810
Show file tree
Hide file tree
Showing 21 changed files with 406 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation;

import com.newrelic.api.agent.NewRelic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.publisher;

import com.newrelic.api.agent.weaver.NewField;
Expand Down
2 changes: 1 addition & 1 deletion instrumentation/netty-reactor-0.8.0/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jar {
}

verifyInstrumentation {
passesOnly 'io.projectreactor.netty:reactor-netty:[0.8.0.RELEASE,)'
passesOnly 'io.projectreactor.netty:reactor-netty:[0.8.0.RELEASE,0.9.0.RELEASE)'
}

site {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation;

import com.newrelic.api.agent.NewRelic;

public class NettyReactorConfig {
public static final boolean errorsEnabled = NewRelic.getAgent().getConfig().getValue("reactor-netty.errors.enabled", true);
public static final boolean errorsEnabled = NewRelic.getAgent().getConfig()
.getValue("reactor-netty.errors.enabled", true);

private NettyReactorConfig() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.publisher;

import com.newrelic.api.agent.weaver.NewField;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.scheduler;

import com.newrelic.api.agent.Trace;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.scheduler;

import com.newrelic.api.agent.Trace;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package reactor.netty.http.server;

import com.newrelic.api.agent.weaver.SkipIfPresent;

@SkipIfPresent
final class HttpServerMetricsHandler {
}
17 changes: 17 additions & 0 deletions instrumentation/netty-reactor-0.9.0/NOTICE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
This product contains a modified part of OpenTelemetry:

* License:

Copyright 2019 The OpenTelemetry Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.

* Homepage: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/master/LICENSE
27 changes: 27 additions & 0 deletions instrumentation/netty-reactor-0.9.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
dependencies {
implementation(project(":agent-bridge"))
implementation("io.projectreactor.netty:reactor-netty:0.9.0.RELEASE")
}

sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

compileTestJava {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

compileJava.options.bootstrapClasspath = null

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.netty-reactor-0.9.0' }
}

verifyInstrumentation {
passesOnly 'io.projectreactor.netty:reactor-netty:[0.9.0.RELEASE,)'
}

site {
title 'Netty Reactor'
type 'Appserver'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation;

import com.newrelic.api.agent.NewRelic;

public class NettyReactorConfig {
public static final boolean errorsEnabled = NewRelic.getAgent().getConfig()
.getValue("reactor-netty.errors.enabled", true);

private NettyReactorConfig() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.nr.instrumentation.reactor.netty;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;
import com.nr.instrumentation.NettyReactorConfig;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

import java.util.function.BiFunction;
import java.util.function.Function;

// Based on OpenTelemetry code
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/master/instrumentation-core/reactor-3.1/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java
public class TokenLinkingSubscriber<T> implements CoreSubscriber<T> {
private final Token token;
private final Subscriber<? super T> subscriber;
private Context context;

public TokenLinkingSubscriber(Subscriber<? super T> subscriber, Context ctx) {
this.subscriber = subscriber;
this.context = ctx;
// newrelic-token is added by spring-webflux instrumentation
this.token = ctx.getOrDefault("newrelic-token", null);
}

@Override
public void onSubscribe(Subscription subscription) {
withNRToken(() -> subscriber.onSubscribe(subscription));
}

@Override
public void onNext(T o) {
withNRToken(() -> subscriber.onNext(o));
}

@Override
public void onError(Throwable throwable) {
withNRError(() -> subscriber.onError(throwable), throwable);
}

@Override
public void onComplete() {
subscriber.onComplete();
}

@Override
public Context currentContext() {
return context;
}

@Trace(async = true, excludeFromTransactionTrace = true)
private void withNRToken(Runnable runnable) {
if (token != null && AgentBridge.getAgent().getTransaction(false) == null) {
token.link();
}
runnable.run();
}

@Trace(async = true, excludeFromTransactionTrace = true)
private void withNRError(Runnable runnable, Throwable throwable) {
if (token != null && token.isActive()) {
token.linkAndExpire();
if (NettyReactorConfig.errorsEnabled) {
NewRelic.noticeError(throwable);
}
}
runnable.run();
}

public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tokenLift() {
return Operators.lift(new TokenLifter<>());
}

private static class TokenLifter<T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

public TokenLifter() {
}

@Override
public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? super T> sub) {
// if Flux/Mono #just, #empty, #error
if (publisher instanceof Fuseable.ScalarCallable) {
return sub;
}
Token token = sub.currentContext().getOrDefault("newrelic-token", null);
if (token != null ) {
return new TokenLinkingSubscriber<>(sub, sub.currentContext());
}
return sub;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.publisher;

import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;

import java.util.concurrent.atomic.AtomicBoolean;

@Weave(originalName = "reactor.core.publisher.Hooks")
public abstract class Hooks_Instrumentation {
@NewField
public static AtomicBoolean instrumented = new AtomicBoolean(false);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.publisher;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
import com.newrelic.api.agent.weaver.Weaver;
import reactor.util.context.Context;

@Weave(originalName = "reactor.core.publisher.LambdaMonoSubscriber")
abstract class LambdaMonoSubscriber_Instrumentation {
@NewField
private Context nrContext;
final Context initialContext = Weaver.callOriginal();

@WeaveAllConstructors
protected LambdaMonoSubscriber_Instrumentation() {
// LamdaMonoSubscriber creates a new Context, so we create a new token and put it on the Context
// to be linked by TokenLinkingSubscriber but expired on onComplete here
if (AgentBridge.getAgent().getTransaction(false) != null
&& initialContext.getOrDefault("newrelic-token", null) == null) {
nrContext = Context.of("newrelic-token", NewRelic.getAgent().getTransaction().getToken());
}
}

public final void onComplete() {
Token token = this.currentContext().getOrDefault("newrelic-token", null);
if (token != null) {
token.expire();
this.nrContext = null;

}
}

public Context currentContext() {
if (nrContext != null) {
return initialContext.putAll(nrContext);
}
return Weaver.callOriginal();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.publisher;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
import com.newrelic.api.agent.weaver.Weaver;
import reactor.util.context.Context;

@Weave(originalName = "reactor.core.publisher.LambdaSubscriber")
abstract class LambdaSubscriber_Instrumentation {
final Context initialContext = Weaver.callOriginal();
@NewField
private Context nrContext;

@WeaveAllConstructors
protected LambdaSubscriber_Instrumentation() {
if (AgentBridge.getAgent().getTransaction(false) != null
&& initialContext.getOrDefault("newrelic-token", null) == null) {
nrContext = Context.of("newrelic-token", NewRelic.getAgent().getTransaction().getToken());
}
}

public final void onComplete() {
Token token = this.currentContext().getOrDefault("newrelic-token", null);
if (token != null) {
token.expire();
this.nrContext = null;

}
}

public Context currentContext() {
if (nrContext != null) {
//return nrContext;
return initialContext.putAll(nrContext);
}
return Weaver.callOriginal();
}

}
Loading

0 comments on commit 0d8d810

Please sign in to comment.