Skip to content

Commit

Permalink
Support micromater-tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
be-hase committed May 16, 2023
1 parent 02024a9 commit 71e0393
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 0 deletions.
8 changes: 8 additions & 0 deletions micrometer-tracing/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
dependencies {
api project(":processor")

api "org.apache.kafka:kafka-clients:$kafkaVersion"
api "io.micrometer:micrometer-tracing:1.1.1"

itImplementation project(":testing")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import com.linecorp.decaton.processor.tracing.TracingProvider.ProcessorTraceHandle;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;

final class MicrometerProcessorTraceHandle extends MicrometerTraceHandle implements ProcessorTraceHandle {
private Tracer.SpanInScope scope;

MicrometerProcessorTraceHandle(Tracer tracer, Span span) {
super(tracer, span);
}

@Override
public void processingStart() {
scope = tracer.withSpan(span);
}

@Override
public void processingReturn() {
span.event("return");
scope.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.tracing.TracingProvider;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;

final class MicrometerRecordTraceHandle extends MicrometerTraceHandle implements TracingProvider.RecordTraceHandle {
MicrometerRecordTraceHandle(Tracer tracer, Span span) {
super(tracer, span);
}

@Override
public MicrometerProcessorTraceHandle childFor(DecatonProcessor<?> processor) {
final Span childSpan = tracer.nextSpan(span).name(processor.name()).start();
return new MicrometerProcessorTraceHandle(tracer, childSpan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import com.linecorp.decaton.processor.tracing.TracingProvider.TraceHandle;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;

class MicrometerTraceHandle implements TraceHandle {
protected final Tracer tracer;
protected final Span span;

MicrometerTraceHandle(Tracer tracer, Span span) {
this.tracer = tracer;
this.span = span;
}

@Override
public void processingCompletion() {
span.end();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import com.linecorp.decaton.processor.tracing.TracingProvider;
import io.micrometer.common.lang.Nullable;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import static java.nio.charset.StandardCharsets.UTF_8;

public class MicrometerTracingProvider implements TracingProvider {
private final Tracer tracer;
private final Propagator propagator;

public MicrometerTracingProvider(Tracer tracer, Propagator propagator) {
this.tracer = tracer;
this.propagator = propagator;
}

@Override
public MicrometerRecordTraceHandle traceFor(ConsumerRecord<?, ?> record, String subscriptionId) {
return new MicrometerRecordTraceHandle(
tracer,
propagator.extract(record.headers(), GETTER)
.name("decaton").tag("subscriptionId", subscriptionId).start()
);
}

private static final Propagator.Getter<Headers> GETTER = new Propagator.Getter<Headers>() {
@Override
public String get(Headers carrier, String key) {
return lastStringHeader(carrier, key);
}

@Nullable
private String lastStringHeader(Headers headers, String key) {
final Header header = headers.lastHeader(key);
if (header == null || header.value() == null) {
return null;
}
return new String(header.value(), UTF_8);
}
};
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ include ":docs"
include ":testing"
include ":benchmark"
include ":brave"
include ":micrometer-tracing"

0 comments on commit 71e0393

Please sign in to comment.