Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closes #973 - Context correlation will not swap instances to prevent application errors #1000

Merged
merged 8 commits into from
Feb 18, 2021
Merged

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,241 @@

import io.opencensus.common.Scope;
import io.opencensus.tags.*;
import org.junit.jupiter.api.Test;
import io.opencensus.tags.Tag;
import io.opencensus.tags.Tags;
import org.junit.jupiter.api.*;
import rocks.inspectit.ocelot.instrumentation.InstrumentationSysTestBase;
import rocks.inspectit.ocelot.instrumentation.special.HelperClasses.TestCallable;
import rocks.inspectit.ocelot.instrumentation.special.HelperClasses.TestRunnable;
import rocks.inspectit.ocelot.utils.TestUtils;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

public class ExecutorContextPropagationTest extends InstrumentationSysTestBase {

private static final Tagger tagger = Tags.getTagger();

@Test
public void testContextPropagationAcrossExecutorForRunnables() throws Exception {
ExecutorService es = Executors.newFixedThreadPool(2);
TagKey keyToPropagate = TagKey.create("propagation/test/tag");
private ExecutorService executorService;

AtomicReference<String> tagValue = new AtomicReference<>(null);
@BeforeAll
public static void beforeAll() {
TestUtils.waitForClassInstrumentations(ThreadPoolExecutor.class, TestRunnable.class, TestCallable.class);
}

Future<?> taskFuture;
@BeforeEach
public void beforeEach() throws ExecutionException, InterruptedException {
// warmup the executor - if this is not be done, the first call to the executor will always be correlated
// because a thread is started, thus, it is correlated due to the Thread.start correlation
executorService = Executors.newSingleThreadExecutor();
executorService.submit(Math::random).get();
}

try (Scope s = tagger.currentBuilder().putLocal(keyToPropagate, TagValue.create("myval")).buildScoped()) {
taskFuture = es.submit(() -> {
Iterator<Tag> it = InternalUtils.getTags(tagger.getCurrentTagContext());
while (it.hasNext()) {
Tag tag = it.next();
if (tag.getKey().equals(keyToPropagate)) {
tagValue.set(tag.getValue().asString());
}
}
});
@AfterEach
public void afterEach() {
executorService.shutdown();
}

@Nested
public class Submit_runnable {

@Test
public void correlateRunnable_lambda() throws Exception {
TagKey tagKey = TagKey.create("tag_key");
TagValue tagValue = TagValue.create("tag_value");
AtomicReference<Iterator<Tag>> refTags = new AtomicReference<>();

Runnable runnable = HelperClasses.getRunnableAsLambda(refTags);

Future<?> taskFuture;
try (Scope s = tagger.currentBuilder().putLocal(tagKey, tagValue).buildScoped()) {
taskFuture = executorService.submit(runnable);
}

taskFuture.get();

assertThat(refTags.get()).hasSize(1)
.extracting("key", "value")
.contains(tuple(tagKey, tagValue));
}
taskFuture.get();
assertThat(tagValue.get()).isEqualTo("myval");

@Test
public void correlateRunnable_anonymous() throws Exception {
TagKey tagKey = TagKey.create("tag_key");
TagValue tagValue = TagValue.create("tag_value");
AtomicReference<Iterator<Tag>> refTags = new AtomicReference<>();

Runnable runnable = HelperClasses.getRunnableAsAnonymous(refTags);
TestUtils.waitForClassInstrumentations(runnable.getClass());

Future<?> taskFuture;
try (Scope s = tagger.currentBuilder().putLocal(tagKey, tagValue).buildScoped()) {
taskFuture = executorService.submit(runnable);
}
taskFuture.get();

assertThat(refTags.get()).hasSize(1)
.extracting("key", "value")
.contains(tuple(tagKey, tagValue));
}

@Test
public void correlateRunnable_named() throws Exception {
TagKey tagKey = TagKey.create("tag_key");
TagValue tagValue = TagValue.create("tag_value");
AtomicReference<Iterator<Tag>> refTags = new AtomicReference<>();

Runnable runnable = HelperClasses.getRunnableAsNamed(refTags);

Future<?> taskFuture;
try (Scope s = tagger.currentBuilder().putLocal(tagKey, tagValue).buildScoped()) {
taskFuture = executorService.submit(runnable);
}
taskFuture.get();

assertThat(refTags.get()).hasSize(1)
.extracting("key", "value")
.contains(tuple(tagKey, tagValue));
}
}

@Nested
public class Execute_runnable {

@Test
public void correlateRunnable_lambda() throws Exception {
TagKey tagKey = TagKey.create("tag_key");
TagValue tagValue = TagValue.create("tag_value");
AtomicReference<Iterator<Tag>> refTags = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);

Runnable runnable = () -> {
refTags.set(InternalUtils.getTags(Tags.getTagger().getCurrentTagContext()));
latch.countDown();
};

@Test
public void testContextPropagationAcrossExecutorForCallables() throws Exception {
ExecutorService es = Executors.newFixedThreadPool(2);
TagKey keyToPropagate = TagKey.create("propagation/test/tag");
try (Scope s = tagger.currentBuilder().putLocal(tagKey, tagValue).buildScoped()) {
executorService.execute(runnable);
}

latch.await();

Future<String> taskFuture;
try (Scope s = tagger.currentBuilder().putLocal(keyToPropagate, TagValue.create("myval")).buildScoped()) {
taskFuture = es.submit(() -> {
Iterator<Tag> it = InternalUtils.getTags(tagger.getCurrentTagContext());
while (it.hasNext()) {
Tag tag = it.next();
if (tag.getKey().equals(keyToPropagate)) {
return tag.getValue().asString();
}
assertThat(refTags.get()).hasSize(1)
.extracting("key", "value")
.contains(tuple(tagKey, tagValue));
}

@Test
public void correlateRunnable_anonymous() throws Exception {
TagKey tagKey = TagKey.create("tag_key");
TagValue tagValue = TagValue.create("tag_value");
AtomicReference<Iterator<Tag>> refTags = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);

Runnable runnable = new Runnable() {
@Override
public void run() {
refTags.set(InternalUtils.getTags(Tags.getTagger().getCurrentTagContext()));
latch.countDown();
}
return null;
});
};
TestUtils.waitForClassInstrumentations(runnable.getClass());

try (Scope s = tagger.currentBuilder().putLocal(tagKey, tagValue).buildScoped()) {
executorService.execute(runnable);
}

latch.await();

assertThat(refTags.get()).hasSize(1)
.extracting("key", "value")
.contains(tuple(tagKey, tagValue));
}

assertThat(taskFuture.get()).isEqualTo("myval");
@Test
public void correlateRunnable_named() throws Exception {
TagKey tagKey = TagKey.create("tag_key");
TagValue tagValue = TagValue.create("tag_value");
AtomicReference<Iterator<Tag>> refTags = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);

Runnable runnable = new TestRunnable(unused -> {
refTags.set(InternalUtils.getTags(Tags.getTagger().getCurrentTagContext()));
latch.countDown();
});

try (Scope s = tagger.currentBuilder().putLocal(tagKey, tagValue).buildScoped()) {
executorService.execute(runnable);
}

latch.await();

assertThat(refTags.get()).hasSize(1)
.extracting("key", "value")
.contains(tuple(tagKey, tagValue));
}
}

@Nested
public class Submit_callable {

@Test
public void submitCallable_lambda() throws Exception {
TagKey tagKey = TagKey.create("tag_key");
TagValue tagValue = TagValue.create("tag_value");

Callable<Iterator<Tag>> callable = HelperClasses.getCallableAsLambda();

Future<Iterator<Tag>> result;
try (Scope s = tagger.currentBuilder().putLocal(tagKey, tagValue).buildScoped()) {
result = executorService.submit(callable);
}

assertThat(result.get()).hasSize(1)
.extracting("key", "value")
.contains(tuple(tagKey, tagValue));
}

@Test
public void submitCallable_anonymous() throws Exception {
TagKey tagKey = TagKey.create("tag_key");
TagValue tagValue = TagValue.create("tag_value");

Callable<Iterator<Tag>> callable = HelperClasses.getCallableAsAnonymous();
TestUtils.waitForClassInstrumentations(callable.getClass());

Future<Iterator<Tag>> result;
try (Scope s = tagger.currentBuilder().putLocal(tagKey, tagValue).buildScoped()) {
result = executorService.submit(callable);
}

assertThat(result.get()).hasSize(1)
.extracting("key", "value")
.contains(tuple(tagKey, tagValue));
}

@Test
public void submitCallable_named() throws Exception {
TagKey tagKey = TagKey.create("tag_key");
TagValue tagValue = TagValue.create("tag_value");

Callable<Iterator<Tag>> callable = HelperClasses.getCallableAsNamed();

Future<Iterator<Tag>> result;
try (Scope s = tagger.currentBuilder().putLocal(tagKey, tagValue).buildScoped()) {
result = executorService.submit(callable);
}

assertThat(result.get()).hasSize(1)
.extracting("key", "value")
.contains(tuple(tagKey, tagValue));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package rocks.inspectit.ocelot.instrumentation.special;

import io.opencensus.tags.InternalUtils;
import io.opencensus.tags.Tag;
import io.opencensus.tags.Tags;

import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

public final class HelperClasses {

// No instance of this class please
private HelperClasses() {
}

public static class TestRunnable implements Runnable {

private final Consumer<Void> callback;

public TestRunnable(Consumer<Void> callback) {
this.callback = callback;
}

@Override
public void run() {
callback.accept(null);
}
}

public static class TestCallable<T> implements Callable<T> {

private final Function<Void, T> callback;

public TestCallable(Function<Void, T> callback) {
this.callback = callback;
}

@Override
public T call() throws Exception {
return callback.apply(null);
}
}

public static Runnable getRunnableAsLambda(AtomicReference<Iterator<Tag>> refTags) {
return () -> {
refTags.set(InternalUtils.getTags(Tags.getTagger().getCurrentTagContext()));
};
}

public static Runnable getRunnableAsAnonymous(AtomicReference<Iterator<Tag>> refTags) {
return new Runnable() {
@Override
public void run() {
refTags.set(InternalUtils.getTags(Tags.getTagger().getCurrentTagContext()));
}
};
}

public static Runnable getRunnableAsNamed(AtomicReference<Iterator<Tag>> refTags) {
return new TestRunnable(unused -> {
refTags.set(InternalUtils.getTags(Tags.getTagger().getCurrentTagContext()));
});
}

public static Callable<Iterator<Tag>> getCallableAsLambda() {
return () -> InternalUtils.getTags(Tags.getTagger().getCurrentTagContext());
}

public static Callable<Iterator<Tag>> getCallableAsAnonymous() {
return new Callable<Iterator<Tag>>() {
@Override
public Iterator<Tag> call() throws Exception {
return InternalUtils.getTags(Tags.getTagger().getCurrentTagContext());
}
};
}

public static Callable<Iterator<Tag>> getCallableAsNamed() {
return new TestCallable<Iterator<Tag>>((unused) -> InternalUtils.getTags(Tags.getTagger().getCurrentTagContext()));
}
}
Loading