From 71e0393251a7f593f4414907d10fb3fe21c4600a Mon Sep 17 00:00:00 2001 From: ryosuke-hasebe Date: Tue, 16 May 2023 15:50:56 +0900 Subject: [PATCH] Support micromater-tracing --- micrometer-tracing/build.gradle | 8 +++ .../MicrometerProcessorTraceHandle.java | 40 ++++++++++++ .../runtime/MicrometerRecordTraceHandle.java | 34 ++++++++++ .../runtime/MicrometerTraceHandle.java | 36 +++++++++++ .../runtime/MicrometerTracingProvider.java | 62 +++++++++++++++++++ settings.gradle | 1 + 6 files changed, 181 insertions(+) create mode 100644 micrometer-tracing/build.gradle create mode 100644 micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerProcessorTraceHandle.java create mode 100644 micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerRecordTraceHandle.java create mode 100644 micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTraceHandle.java create mode 100644 micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTracingProvider.java diff --git a/micrometer-tracing/build.gradle b/micrometer-tracing/build.gradle new file mode 100644 index 00000000..cb5f9392 --- /dev/null +++ b/micrometer-tracing/build.gradle @@ -0,0 +1,8 @@ +dependencies { + api project(":processor") + + api "org.apache.kafka:kafka-clients:$kafkaVersion" + api "io.micrometer:micrometer-tracing:1.1.1" + + itImplementation project(":testing") +} diff --git a/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerProcessorTraceHandle.java b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerProcessorTraceHandle.java new file mode 100644 index 00000000..b2c54dd0 --- /dev/null +++ b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerProcessorTraceHandle.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import com.linecorp.decaton.processor.tracing.TracingProvider.ProcessorTraceHandle; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; + +final class MicrometerProcessorTraceHandle extends MicrometerTraceHandle implements ProcessorTraceHandle { + private Tracer.SpanInScope scope; + + MicrometerProcessorTraceHandle(Tracer tracer, Span span) { + super(tracer, span); + } + + @Override + public void processingStart() { + scope = tracer.withSpan(span); + } + + @Override + public void processingReturn() { + span.event("return"); + scope.close(); + } +} diff --git a/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerRecordTraceHandle.java b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerRecordTraceHandle.java new file mode 100644 index 00000000..b762b27a --- /dev/null +++ b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerRecordTraceHandle.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import com.linecorp.decaton.processor.DecatonProcessor; +import com.linecorp.decaton.processor.tracing.TracingProvider; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; + +final class MicrometerRecordTraceHandle extends MicrometerTraceHandle implements TracingProvider.RecordTraceHandle { + MicrometerRecordTraceHandle(Tracer tracer, Span span) { + super(tracer, span); + } + + @Override + public MicrometerProcessorTraceHandle childFor(DecatonProcessor processor) { + final Span childSpan = tracer.nextSpan(span).name(processor.name()).start(); + return new MicrometerProcessorTraceHandle(tracer, childSpan); + } +} diff --git a/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTraceHandle.java b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTraceHandle.java new file mode 100644 index 00000000..c0e56a28 --- /dev/null +++ b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTraceHandle.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import com.linecorp.decaton.processor.tracing.TracingProvider.TraceHandle; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; + +class MicrometerTraceHandle implements TraceHandle { + protected final Tracer tracer; + protected final Span span; + + MicrometerTraceHandle(Tracer tracer, Span span) { + this.tracer = tracer; + this.span = span; + } + + @Override + public void processingCompletion() { + span.end(); + } +} diff --git a/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTracingProvider.java b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTracingProvider.java new file mode 100644 index 00000000..d0fee5cc --- /dev/null +++ b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTracingProvider.java @@ -0,0 +1,62 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import com.linecorp.decaton.processor.tracing.TracingProvider; +import io.micrometer.common.lang.Nullable; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.propagation.Propagator; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class MicrometerTracingProvider implements TracingProvider { + private final Tracer tracer; + private final Propagator propagator; + + public MicrometerTracingProvider(Tracer tracer, Propagator propagator) { + this.tracer = tracer; + this.propagator = propagator; + } + + @Override + public MicrometerRecordTraceHandle traceFor(ConsumerRecord record, String subscriptionId) { + return new MicrometerRecordTraceHandle( + tracer, + propagator.extract(record.headers(), GETTER) + .name("decaton").tag("subscriptionId", subscriptionId).start() + ); + } + + private static final Propagator.Getter GETTER = new Propagator.Getter() { + @Override + public String get(Headers carrier, String key) { + return lastStringHeader(carrier, key); + } + + @Nullable + private String lastStringHeader(Headers headers, String key) { + final Header header = headers.lastHeader(key); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), UTF_8); + } + }; +} diff --git a/settings.gradle b/settings.gradle index 5b794a37..86b0889b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,3 +10,4 @@ include ":docs" include ":testing" include ":benchmark" include ":brave" +include ":micrometer-tracing"