Skip to content

Commit

Permalink
Support rx java
Browse files Browse the repository at this point in the history
RxJava Support

fixes #235
shivangshah authored and marcingrzejszczak committed Apr 4, 2016
1 parent 60cb5eb commit beafc3a
Showing 9 changed files with 382 additions and 10 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -15,4 +15,6 @@ _site/
*.logtjmeter
.checkstyle
.DS_Store
*.log
*.log
/spring-cloud-sleuth-core/nb-configuration.xml
/spring-cloud-sleuth-core/nbactions.xml
7 changes: 7 additions & 0 deletions docs/src/main/asciidoc/spring-cloud-sleuth.adoc
Original file line number Diff line number Diff line change
@@ -407,6 +407,13 @@ In order to pass the tracing information you have to wrap the same logic in the
include::../../../../spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/hystrix/TraceCommandTests.java[tags=trace_hystrix_command,indent=0]
----

=== `RxJavaSchedulersHook` to add support for `RxJava` Schedulers

We're registering a custom https://github.com/ReactiveX/RxJava/wiki/Plugins#rxjavaschedulershook [`RxJavaSchedulersHook`]
that wraps all `Action0` instances into their Sleuth representative -
the `TraceAction`. The hook either starts or continues a span depending on the fact whether tracing was already going
on before the Action was scheduled. To disable the custom RxJavaSchedulersHook set the `spring.sleuth.rxjava.schedulers.hook.enabled` to `false`.

=== HTTP integration

Features from this section can be disabled by providing the `spring.sleuth.web.enabled` property with value equal to `false`.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.springframework.cloud.sleuth.instrument.rxjava;

import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rx.plugins.RxJavaSchedulersHook;

/**
* {@link org.springframework.boot.autoconfigure.EnableAutoConfiguration Auto-configuration} that
* enables support for RxJava via {@link RxJavaSchedulersHook}.
*
* @author Shivang Shah
* @since 1.0.0
*/
@Configuration
@AutoConfigureAfter(TraceAutoConfiguration.class)
@ConditionalOnBean(Tracer.class)
@ConditionalOnClass(RxJavaSchedulersHook.class)
@ConditionalOnProperty(value = "spring.sleuth.rxjava.schedulers.hook.enabled", matchIfMissing = true)
public class RxJavaAutoConfiguration {

@Bean
SleuthRxJavaSchedulersHook sleuthRxJavaSchedulersHook(Tracer tracer, TraceKeys traceKeys) {
return new SleuthRxJavaSchedulersHook(tracer, traceKeys);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package org.springframework.cloud.sleuth.instrument.rxjava;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;
import rx.functions.Action0;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaSchedulersHook;
import rx.plugins.SleuthRxJavaPlugins;

/**
* {@link RxJavaSchedulersHook} that wraps a {@link Action0} into its tracing
* representation.
*
* @author Shivang Shah
* @since 1.0.0
*/
class SleuthRxJavaSchedulersHook extends RxJavaSchedulersHook {

private static final Log log = LogFactory.getLog(SleuthRxJavaSchedulersHook.class);

private static final String RXJAVA_COMPONENT = "rxjava";
private final Tracer tracer;
private final TraceKeys traceKeys;
private RxJavaSchedulersHook delegate;

SleuthRxJavaSchedulersHook(Tracer tracer, TraceKeys traceKeys) {
this.tracer = tracer;
this.traceKeys = traceKeys;
try {
this.delegate = SleuthRxJavaPlugins.getInstance().getSchedulersHook();
if (this.delegate instanceof SleuthRxJavaSchedulersHook) {
return;
}
RxJavaErrorHandler errorHandler = SleuthRxJavaPlugins.getInstance().getErrorHandler();
RxJavaObservableExecutionHook observableExecutionHook
= SleuthRxJavaPlugins.getInstance().getObservableExecutionHook();
logCurrentStateOfRxJavaPlugins(errorHandler, observableExecutionHook);
SleuthRxJavaPlugins.resetPlugins();
SleuthRxJavaPlugins.getInstance().registerSchedulersHook(this);
SleuthRxJavaPlugins.getInstance().registerErrorHandler(errorHandler);
SleuthRxJavaPlugins.getInstance().registerObservableExecutionHook(observableExecutionHook);
} catch (Exception e) {
log.error("Failed to register Sleuth RxJava SchedulersHook", e);
}
}

private void logCurrentStateOfRxJavaPlugins(RxJavaErrorHandler errorHandler,
RxJavaObservableExecutionHook observableExecutionHook) {
log.debug("Current RxJava plugins configuration is ["
+ "schedulersHook [" + this.delegate + "],"
+ "errorHandler [" + errorHandler + "],"
+ "observableExecutionHook [" + observableExecutionHook + "],"
+ "]");
log.debug("Registering Sleuth RxJava Schedulers Hook.");
}

@Override
public Action0 onSchedule(Action0 action) {
if (action instanceof TraceAction) {
return action;
}
Action0 wrappedAction = this.delegate != null
? this.delegate.onSchedule(action) : action;
if (wrappedAction instanceof TraceAction) {
return action;
}
return super.onSchedule(new TraceAction(this.tracer, this.traceKeys, wrappedAction));
}

static class TraceAction implements Action0 {

private final Action0 actual;
private Tracer tracer;
private TraceKeys traceKeys;
private Span parent;

public TraceAction(Tracer tracer, TraceKeys traceKeys, Action0 actual) {
this.tracer = tracer;
this.traceKeys = traceKeys;
this.parent = tracer.getCurrentSpan();
this.actual = actual;
}

@Override
public void call() {
Span span = this.parent;
boolean created = false;
if (span != null) {
span = this.tracer.continueSpan(span);
} else {
span = this.tracer.createSpan(RXJAVA_COMPONENT);
this.tracer.addTag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, RXJAVA_COMPONENT);
this.tracer.addTag(this.traceKeys.getAsync().getPrefix()
+ this.traceKeys.getAsync().getThreadNameKey(), Thread.currentThread().getName());
created = true;
}
try {
this.actual.call();
} finally {
if (created) {
this.tracer.close(span);
} else {
this.tracer.detach(span);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package rx.plugins;

/**
* {@link RxJavaPlugins} helper class to access the package scope method
* of {@link RxJavaPlugins#reset()}. Will disappear once this gets closed
* https://github.com/ReactiveX/RxJava/issues/2297
*
* @author Shivang Shah
* @since 1.0.0
*/
public class SleuthRxJavaPlugins extends RxJavaPlugins {

SleuthRxJavaPlugins() {
super();
}

public static void resetPlugins() {
getInstance().reset();
}
}
Original file line number Diff line number Diff line change
@@ -14,7 +14,8 @@ org.springframework.cloud.sleuth.instrument.web.TraceWebAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.web.client.TraceWebClientAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.web.client.TraceWebAsyncClientAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.web.client.feign.TraceFeignClientAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.zuul.TraceZuulAutoConfiguration
org.springframework.cloud.sleuth.instrument.zuul.TraceZuulAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.rxjava.RxJavaAutoConfiguration

# Environment Post Processor
org.springframework.boot.env.EnvironmentPostProcessor=\
Original file line number Diff line number Diff line change
@@ -16,13 +16,15 @@

package org.springframework.cloud.sleuth;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import static org.assertj.core.api.BDDAssertions.then;

/**
@@ -36,24 +38,24 @@ public class LogTest {
ObjectMapper objectMapper = new ObjectMapper();

@Test public void ctor_missing_event() throws IOException {
thrown.expect(NullPointerException.class);
thrown.expectMessage("event");
this.thrown.expect(NullPointerException.class);
this.thrown.expectMessage("event");

new Log(1234L, null);
}

@Test public void serialization_round_trip() throws IOException {
Log log = new Log(1234L, "cs");

String serialized = objectMapper.writeValueAsString(log);
Log deserialized = objectMapper.readValue(serialized, Log.class);
String serialized = this.objectMapper.writeValueAsString(log);
Log deserialized = this.objectMapper.readValue(serialized, Log.class);

then(deserialized).isEqualTo(log);
}

@Test public void deserialize_missing_event() throws IOException {
thrown.expect(JsonMappingException.class);
this.thrown.expect(JsonMappingException.class);

objectMapper.readValue("{\"timestamp\": 1234}", Log.class);
this.objectMapper.readValue("{\"timestamp\": 1234}", Log.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.springframework.cloud.sleuth.instrument.rxjava;

import java.util.ArrayList;
import java.util.List;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.cloud.sleuth.Sampler;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.SpanReporter;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import rx.Observable;
import rx.functions.Action0;
import rx.plugins.SleuthRxJavaPlugins;
import rx.schedulers.Schedulers;
import static org.springframework.cloud.sleuth.assertions.SleuthAssertions.then;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {
SleuthRxJavaIntegrationTests.TestConfig.class})
public class SleuthRxJavaIntegrationTests {

@Autowired
Tracer tracer;
@Autowired
TraceKeys traceKeys;
@Autowired
Listener listener;
@Autowired
SleuthRxJavaSchedulersHook sleuthRxJavaSchedulersHook;
StringBuilder caller = new StringBuilder();

@Before
public void cleanTrace() {
this.listener.getEvents().clear();
}

@BeforeClass
@AfterClass
public static void cleanUp() {
SleuthRxJavaPlugins.resetPlugins();
}

@Test
public void should_create_new_span_when_no_current_span_when_rx_java_action_is_executed() {
Observable.defer(() -> Observable.just(
(Action0) () -> this.caller = new StringBuilder("actual_action")
)).subscribeOn(Schedulers.newThread()).toBlocking()
.subscribe(Action0::call);

then(this.caller.toString()).isEqualTo("actual_action");
then(this.tracer.getCurrentSpan()).isNull();
then(this.listener.getEvents().size()).isEqualTo(1);
then(this.listener.getEvents().get(0)).hasNameEqualTo("rxjava");
then(this.listener.getEvents().get(0)).hasATag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, "rxjava");
then(this.listener.getEvents().get(0)).isALocalComponentSpan();
}

@Test
public void should_continue_current_span_when_rx_java_action_is_executed() {
Span spanInCurrentThread = this.tracer.createSpan("current_span");
this.tracer.addTag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, "current_span");

Observable.defer(() -> Observable.just(
(Action0) () -> this.caller = new StringBuilder("actual_action")
)).subscribeOn(Schedulers.newThread()).toBlocking()
.subscribe(Action0::call);

then(this.caller.toString()).isEqualTo("actual_action");
then(this.tracer.getCurrentSpan()).isNotNull();
//making sure here that no new spans were created or reported as closed
then(this.listener.getEvents().size()).isEqualTo(0);
then(spanInCurrentThread).hasNameEqualTo(spanInCurrentThread.getName());
then(spanInCurrentThread).hasATag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, "current_span");
then(spanInCurrentThread).isALocalComponentSpan();
}

@Component
public static class Listener implements SpanReporter {

List<Span> events = new ArrayList<>();

public List<Span> getEvents() {
return this.events;
}

@Override
public void report(Span span) {
this.events.add(span);
}
}

@Configuration
@EnableAutoConfiguration
public static class TestConfig {

@Bean
Listener listener() {
return new Listener();
}

@Bean
Sampler alwaysSampler() {
return new AlwaysSampler();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.springframework.cloud.sleuth.instrument.rxjava;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;

import rx.functions.Action0;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaSchedulersHook;
import rx.plugins.SleuthRxJavaPlugins;

import static org.assertj.core.api.BDDAssertions.then;

/**
*
* @author Shivang Shah
*/
@RunWith(MockitoJUnitRunner.class)
public class SleuthRxJavaSchedulersHookTest {

@Mock
Tracer tracer;
TraceKeys traceKeys = new TraceKeys();

private static StringBuilder caller;

@Before
@After
public void setup() {
SleuthRxJavaPlugins.resetPlugins();
caller = new StringBuilder();
}

@Test
public void should_not_override_existing_custom_hooks() {
SleuthRxJavaPlugins.getInstance().registerErrorHandler(new MyRxJavaErrorHandler());
SleuthRxJavaPlugins.getInstance().registerObservableExecutionHook(new MyRxJavaObservableExecutionHook());
new SleuthRxJavaSchedulersHook(this.tracer, this.traceKeys);
then(SleuthRxJavaPlugins.getInstance().getErrorHandler()).isExactlyInstanceOf(MyRxJavaErrorHandler.class);
then(SleuthRxJavaPlugins.getInstance().getObservableExecutionHook()).isExactlyInstanceOf(MyRxJavaObservableExecutionHook.class);
}

@Test
public void should_wrap_delegates_action_in_wrapped_action_when_delegate_is_present_on_schedule() {
SleuthRxJavaPlugins.getInstance().registerSchedulersHook(new MyRxJavaSchedulersHook());
SleuthRxJavaSchedulersHook schedulersHook = new SleuthRxJavaSchedulersHook(
this.tracer, this.traceKeys);
Action0 action = schedulersHook.onSchedule(() -> {
caller = new StringBuilder("hello");
});
action.call();
then(action).isInstanceOf(SleuthRxJavaSchedulersHook.TraceAction.class);
then(caller.toString()).isEqualTo("called_from_schedulers_hook");
}

static class MyRxJavaObservableExecutionHook extends RxJavaObservableExecutionHook {
}

static class MyRxJavaSchedulersHook extends RxJavaSchedulersHook {

@Override
public Action0 onSchedule(Action0 action) {
return () -> {
caller = new StringBuilder("called_from_schedulers_hook");
};
}
}

static class MyRxJavaErrorHandler extends RxJavaErrorHandler {
}
}

0 comments on commit beafc3a

Please sign in to comment.