Add integration tests for BraveTracingProvider #93

Merged (5 commits) on Mar 4, 2021

Changes from 4 commits
3 changes: 3 additions & 0 deletions brave/build.gradle
@@ -2,4 +2,7 @@ dependencies {
     compile project(":processor")
 
     compile "io.zipkin.brave:brave-instrumentation-kafka-clients:5.12.6"
+
+    itCompile "junit:junit:$junitVersion"
+    itCompile project(":testing")
 }
BraveTracingTest.java (new file)
@@ -0,0 +1,121 @@
/*
 * Copyright 2020 LINE Corporation
 *
 * LINE Corporation licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package com.linecorp.decaton.processor.runtime;

import static org.junit.Assert.assertEquals;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.testing.KafkaClusterRule;
import com.linecorp.decaton.testing.TestUtils;
import com.linecorp.decaton.testing.processor.ProcessedRecord;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType;
import com.linecorp.decaton.testing.processor.ProcessorTestSuite;
import com.linecorp.decaton.testing.processor.ProducedRecord;

import brave.Tracing;
import brave.kafka.clients.KafkaTracing;

public class BraveTracingTest {

    private KafkaTracing kafkaTracing;
    private Tracing tracing;

    public static class BraveTracePropagation implements ProcessingGuarantee {
        private final Map<String, String> producedTraceIds = new ConcurrentHashMap<>();
        private final Map<String, String> consumedTraceIds = new ConcurrentHashMap<>();
        private final Tracing tracing;

        public BraveTracePropagation(Tracing tracing) {
            this.tracing = tracing;
        }

        @Override
        public void onProduce(ProducedRecord record) {
            producedTraceIds.put(record.task().getId(),
                                 tracing.currentTraceContext().get().traceIdString());
        }

        @Override
        public void onProcess(TaskMetadata metadata, ProcessedRecord record) {
            consumedTraceIds.put(record.task().getId(),
                                 tracing.currentTraceContext().get().traceIdString());
        }

        @Override
        public void doAssert() {
            assertEquals(producedTraceIds, consumedTraceIds);
        }
    }

    @ClassRule
    public static KafkaClusterRule rule = new KafkaClusterRule();

    private String retryTopic;

    @Before
    public void setUp() {
        retryTopic = rule.admin().createRandomTopic(3, 3);
        tracing = Tracing.newBuilder().build();
        kafkaTracing = KafkaTracing.create(tracing);
    }

    @After
    public void tearDown() {
        rule.admin().deleteTopics(true, retryTopic);
    }

    @Test(timeout = 30000)
    public void testTracePropagation() throws Exception {
        // scenario:
        //   * half of the arriving tasks are retried once
        //   * once retried (i.e. retryCount() > 0), a task is not retried again
        final DefaultKafkaProducerSupplier producerSupplier = new DefaultKafkaProducerSupplier();
        ProcessorTestSuite
                .builder(rule)
                .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
                    if (ctx.metadata().retryCount() == 0 && ThreadLocalRandom.current().nextBoolean()) {
                        ctx.retry();
                    }
                }))
                .producerSupplier(
                        bootstrapServers -> kafkaTracing.producer(TestUtils.producer(bootstrapServers)))
Contributor: Is it necessary to pass producerSupplier? Wouldn't

        .producerDecorator(kafkaTracing::producer)

be more natural?

Contributor Author (m50d): I thought there might be future tests where we wanted to supply a completely new producer (e.g. a mock/stub, different configuration parameters) rather than just wrap the existing one.
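For context, a minimal sketch of what the suggested decorator-style option could look like on the test-suite builder. producerDecorator is hypothetical (the suggestion was not adopted in this PR); it would compose a decorator over whatever producerSupplier creates:

    // Hypothetical builder method: not part of this PR or the codebase.
    // Requires java.util.function.Function and java.util.function.UnaryOperator.
    public Builder producerDecorator(
            UnaryOperator<Producer<String, DecatonTaskRequest>> decorator) {
        Function<String, Producer<String, DecatonTaskRequest>> current = producerSupplier;
        producerSupplier = bootstrapServers -> decorator.apply(current.apply(bootstrapServers));
        return this;
    }

With something like this in place, the test could write .producerDecorator(kafkaTracing::producer) and keep the default TestUtils.producer underneath.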
                .retryConfig(RetryConfig.builder()
                                        .retryTopic(retryTopic)
                                        .backoff(Duration.ofMillis(10))
                                        .producerSupplier(config -> kafkaTracing.producer(
                                                producerSupplier.getProducer(config)))
                                        .build())
                .tracingProvider(new BraveTracingProvider(kafkaTracing))
                // If tasks can be retried, there is no guarantee about ordering or serial processing
                .excludeSemantics(
                        GuaranteeType.PROCESS_ORDERING,
                        GuaranteeType.SERIAL_PROCESSING)
                .customSemantics(new BraveTracePropagation(tracing))
                .build()
                .run();
    }
}
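A caveat on the guarantee above: Brave's Tracing.currentTraceContext().get() returns null when no trace is in scope, so the onProduce/onProcess callbacks would throw a NullPointerException rather than a readable assertion failure if propagation broke completely. A slightly more defensive variant, sketch only, using brave.propagation.TraceContext:

    @Override
    public void onProcess(TaskMetadata metadata, ProcessedRecord record) {
        // get() yields null when no span is active; store a sentinel so
        // doAssert() reports a map diff instead of an NPE at capture time.
        TraceContext context = tracing.currentTraceContext().get();
        consumedTraceIds.put(record.task().getId(),
                             context == null ? "<no-trace>" : context.traceIdString());
    }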
@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -34,6 +35,7 @@
 import com.linecorp.decaton.processor.runtime.RetryConfig;
 import com.linecorp.decaton.processor.tracing.TestTracingProvider;
 import com.linecorp.decaton.testing.KafkaClusterRule;
+import com.linecorp.decaton.testing.TestUtils;
 import com.linecorp.decaton.testing.processor.ProcessedRecord;
 import com.linecorp.decaton.testing.processor.ProcessingGuarantee;
 import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType;
@@ -48,7 +50,10 @@ public static class TracePropagation implements ProcessingGuarantee {
 
         @Override
         public void onProduce(ProducedRecord record) {
-            producedTraceIds.put(record.task().getId(), record.traceId());
+            producedTraceIds.put(record.task().getId(),
+                                 new String(
+                                         record.headers().lastHeader(TestTracingProvider.TRACE_HEADER).value(),
+                                         StandardCharsets.UTF_8));
         }
 
         @Override
@@ -93,6 +98,7 @@ public void testTracePropagation() throws Exception {
                     ctx.retry();
                 }
             }))
+            .producerSupplier(config -> new TestTracingProducer(TestUtils.producer(config)))
             .retryConfig(RetryConfig.builder()
                                     .retryTopic(retryTopic)
                                     .backoff(Duration.ofMillis(10))
ProcessorTestSuite.java
@@ -87,6 +87,8 @@ public class ProcessorTestSuite {
     private final Set<ProcessingGuarantee> semantics;
     private final SubscriptionStatesListener statesListener;
     private final TracingProvider tracingProvider;
+    private final Function<String, Producer<String, DecatonTaskRequest>> producerSupplier;
+    private final Function<Integer, String> taskKeySupplier;
 
     private static final int DEFAULT_NUM_TASKS = 10000;
     private static final int NUM_KEYS = 100;
@@ -137,6 +139,10 @@ public static class Builder {
 
         private TracingProvider tracingProvider;
 
+        private Function<String, Producer<String, DecatonTaskRequest>> producerSupplier = TestUtils::producer;
+
+        private Function<Integer, String> taskKeySupplier = taskId -> String.valueOf(taskId % NUM_KEYS);
+
         /**
          * Exclude semantics from assertion.
          * Intended to be used when we test a feature which breaks a subset of semantics
@@ -178,7 +184,9 @@ public ProcessorTestSuite build() {
                                      propertySupplier,
                                      semantics,
                                      statesListener,
-                                     tracingProvider);
+                                     tracingProvider,
+                                     producerSupplier,
+                                     taskKeySupplier);
         }
     }
 
@@ -199,7 +207,7 @@ public void run() {
         ProcessorSubscription[] subscriptions = new ProcessorSubscription[NUM_SUBSCRIPTION_INSTANCES];
 
         try {
-            producer = TestUtils.producer(rule.bootstrapServers());
+            producer = producerSupplier.apply(rule.bootstrapServers());
             for (int i = 0; i < subscriptions.length; i++) {
                 subscriptions[i] = newSubscription(i, topic, Optional.of(rollingRestartLatch));
             }
@@ -317,7 +325,7 @@ private CompletableFuture<Map<Integer, Long>> produceTasks(
         TestTaskSerializer serializer = new TestTaskSerializer();
         for (int i = 0; i < produceFutures.length; i++) {
             TestTask task = new TestTask(String.valueOf(i));
-            String key = String.valueOf(i % NUM_KEYS);
+            String key = taskKeySupplier.apply(i);
             TaskMetadataProto taskMetadata =
                     TaskMetadataProto.newBuilder()
                                      .setTimestampMillis(System.currentTimeMillis())
@@ -329,12 +337,8 @@
                                      .setMetadata(taskMetadata)
                                      .setSerializedTask(ByteString.copyFrom(serializer.serialize(task)))
                                      .build();
-            String traceId = "trace-" + UUID.randomUUID();
-            final RecordHeader traceHeader = new RecordHeader(TestTracingProvider.TRACE_HEADER,
-                                                              traceId.getBytes(StandardCharsets.UTF_8));
             ProducerRecord<String, DecatonTaskRequest> record =
-                    new ProducerRecord<>(topic, null, taskMetadata.getTimestampMillis(), key, request,
-                                         Collections.singleton(traceHeader));
+                    new ProducerRecord<>(topic, null, taskMetadata.getTimestampMillis(), key, request);
             CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
             produceFutures[i] = future;
 
@@ -346,7 +350,7 @@
                                        metadata.partition()),
                          metadata.offset(),
                          task,
-                         traceId));
+                         record.headers()));
             } else {
                 future.completeExceptionally(exception);
             }
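As a usage sketch, the two new builder knobs combine like this (illustrative values; this assumes the builder exposes a fluent setter for taskKeySupplier just as it does for producerSupplier):

    ProcessorTestSuite
            .builder(rule)
            // no-op processor, just to satisfy the builder
            .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {}))
            // wrap the default producer so every record carries the test trace header
            .producerSupplier(bootstrapServers -> new TestTracingProducer(TestUtils.producer(bootstrapServers)))
            // spread tasks over 10 keys instead of the default taskId % NUM_KEYS
            .taskKeySupplier(taskId -> String.valueOf(taskId % 10))
            .build()
            .run();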
ProducedRecord.java
@@ -17,6 +17,7 @@
 package com.linecorp.decaton.testing.processor;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 
 import lombok.Value;
 import lombok.experimental.Accessors;
@@ -44,7 +45,7 @@ public class ProducedRecord {
      */
     TestTask task;
     /**
-     * Trace ID
+     * Headers that were set on the produced task
      */
-    String traceId;
+    Headers headers;
 }
@@ -20,6 +20,7 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.Future;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -69,12 +70,13 @@ public void abortTransaction() throws ProducerFencedException {
     }
 
     private static void propagateCurrentTrace(ProducerRecord<String, DecatonTaskRequest> record) {
-        final String traceId = TestTracingProvider.getCurrentTraceId();
-        if (null != traceId) {
-            final RecordHeader tracingHeader = new RecordHeader(TestTracingProvider.TRACE_HEADER,
-                                                                traceId.getBytes(StandardCharsets.UTF_8));
-            record.headers().add(tracingHeader);
-        }
+        String traceId = TestTracingProvider.getCurrentTraceId();
+        if (null == traceId) {
+            traceId = "trace-" + UUID.randomUUID();
+        }
+        final RecordHeader tracingHeader = new RecordHeader(TestTracingProvider.TRACE_HEADER,
+                                                            traceId.getBytes(StandardCharsets.UTF_8));
+        record.headers().add(tracingHeader);
     }
 
     @Override
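Net effect of this last change: trace-ID generation moves out of ProcessorTestSuite.produceTasks() (which previously attached a random trace-<UUID> header to every record) and into the producer wrapper, which now generates an ID only when no trace is already active. Any test that routes its producer through TestTracingProducer therefore gets a trace header on every record, whether produced directly or re-produced for retry.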