diff --git a/core/build.gradle b/core/build.gradle index d7cc90394d2..1a09b846706 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -156,6 +156,13 @@ dependencies { optionalImplementation libs.brotli4j.osx optionalImplementation libs.brotli4j.osx.aarch64 optionalImplementation libs.brotli4j.windows + + // for testing the observation API with tracing + testImplementation (libs.micrometer.tracing.integration.test) { + exclude group: "org.mockito" + } + testImplementation libs.brave.context.slf4j + testImplementation libs.brave.instrumentation.http.tests } if (!rootProject.hasProperty('noWeb')) { diff --git a/core/src/main/java/com/linecorp/armeria/client/observation/ClientObservationContext.java b/core/src/main/java/com/linecorp/armeria/client/observation/ClientObservationContext.java new file mode 100644 index 00000000000..e13f2502217 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/observation/ClientObservationContext.java @@ -0,0 +1,78 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.observation; + +import com.google.common.base.MoreObjects; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.RequestHeadersBuilder; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.common.logging.RequestLog; + +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.transport.RequestReplySenderContext; + +/** + * A {@link Context} which may be used in conjunction with {@link ObservationClient} + * to implement custom {@link ObservationConvention}s or {@link ObservationHandler}s. + *
{@code
+ * ObservationConvention convention = ...
+ * WebClient.builder()
+ *          .decorator(ObservationClient.newDecorator(registry, convention))
+ * ...
+ * }
+ */ +@UnstableApi +public final class ClientObservationContext + extends RequestReplySenderContext { + + private final ClientRequestContext clientRequestContext; + private final HttpRequest httpRequest; + + ClientObservationContext(ClientRequestContext clientRequestContext, RequestHeadersBuilder carrier, + HttpRequest httpRequest) { + super(RequestHeadersBuilder::add); + this.clientRequestContext = clientRequestContext; + this.httpRequest = httpRequest; + setCarrier(carrier); + } + + /** + * The {@link ClientRequestContext} associated with this {@link Context}. + */ + public ClientRequestContext requestContext() { + return clientRequestContext; + } + + /** + * The {@link HttpRequest} associated with this {@link Context}. + */ + public HttpRequest httpRequest() { + return httpRequest; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).omitNullValues() + .add("clientRequestContext", clientRequestContext) + .add("httpRequest", httpRequest) + .toString(); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/observation/DefaultHttpClientObservationConvention.java b/core/src/main/java/com/linecorp/armeria/client/observation/DefaultHttpClientObservationConvention.java new file mode 100644 index 00000000000..8c6d3e99947 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/observation/DefaultHttpClientObservationConvention.java @@ -0,0 +1,152 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.observation; + +import static com.google.common.base.MoreObjects.firstNonNull; + +import java.net.InetSocketAddress; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.observation.HttpClientObservationDocumentation.HighCardinalityKeys; +import com.linecorp.armeria.client.observation.HttpClientObservationDocumentation.LowCardinalityKeys; +import com.linecorp.armeria.common.SerializationFormat; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.common.logging.RequestLogAccess; +import com.linecorp.armeria.common.logging.RequestLogProperty; + +import io.micrometer.common.KeyValue; +import io.micrometer.common.KeyValues; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; + +class DefaultHttpClientObservationConvention implements ObservationConvention { + + static final DefaultHttpClientObservationConvention INSTANCE = + new DefaultHttpClientObservationConvention(); + + @Override + public KeyValues getLowCardinalityKeyValues(ClientObservationContext context) { + final ClientRequestContext ctx = context.requestContext(); + int expectedSize = 1; + final RequestLog log = context.getResponse(); + KeyValue protocol = null; + KeyValue serializationFormat = null; + KeyValue statusCode = null; + if (log != null) { + protocol = LowCardinalityKeys.HTTP_PROTOCOL.withValue(protocol(log)); + statusCode = LowCardinalityKeys.STATUS_CODE + .withValue(log.responseStatus().codeAsText()); + expectedSize = 3; + final String serFmt = serializationFormat(log); + if (serFmt != null) { + expectedSize = 4; + serializationFormat = LowCardinalityKeys.HTTP_SERIALIZATION_FORMAT.withValue(serFmt); + } + } + final ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(expectedSize); + builder.add(LowCardinalityKeys.HTTP_METHOD.withValue(ctx.method().name())); + addIfNotNull(protocol, builder); + addIfNotNull(statusCode, builder); + addIfNotNull(serializationFormat, builder); + return KeyValues.of(builder.build()); + } + + private static void addIfNotNull(@Nullable KeyValue keyValue, ImmutableList.Builder builder) { + if (keyValue != null) { + builder.add(keyValue); + } + } + + @Override + public KeyValues getHighCardinalityKeyValues(ClientObservationContext context) { + final ClientRequestContext ctx = context.requestContext(); + int expectedSize = 3; + KeyValue addressRemote = null; + KeyValue addressLocal = null; + KeyValue error = null; + if (context.getResponse() != null) { + final RequestLog log = ctx.log().ensureComplete(); + final InetSocketAddress raddr = ctx.remoteAddress(); + if (raddr != null) { + expectedSize = expectedSize + 1; + addressRemote = HighCardinalityKeys.ADDRESS_REMOTE.withValue(raddr.toString()); + } + final InetSocketAddress laddr = ctx.localAddress(); + if (laddr != null) { + expectedSize = expectedSize + 1; + addressLocal = HighCardinalityKeys.ADDRESS_LOCAL.withValue(laddr.toString()); + } + + final Throwable responseCause = log.responseCause(); + if (responseCause != null) { + expectedSize = expectedSize + 1; + error = HighCardinalityKeys.ERROR.withValue(responseCause.toString()); + } else if (log.responseStatus().isError()) { + expectedSize = expectedSize + 1; + error = HighCardinalityKeys.ERROR.withValue(log.responseStatus().codeAsText()); + } + } + final ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(expectedSize); + builder.add(HighCardinalityKeys.HTTP_PATH.withValue(ctx.path()), + HighCardinalityKeys.HTTP_HOST.withValue(firstNonNull(ctx.authority(), "UNKNOWN")), + HighCardinalityKeys.HTTP_URL.withValue(ctx.uri().toString())); + addIfNotNull(addressRemote, builder); + addIfNotNull(addressLocal, builder); + addIfNotNull(error, builder); + return KeyValues.of(builder.build()); + } + + /** + * Returns the {@link SessionProtocol#uriText()} of the {@link RequestLog}. + */ + private static String protocol(RequestLog requestLog) { + return requestLog.sessionProtocol().uriText(); + } + + /** + * Returns the {@link SerializationFormat#uriText()} if it's not {@link SerializationFormat#NONE}. + */ + @Nullable + private static String serializationFormat(RequestLog requestLog) { + final SerializationFormat serFmt = requestLog.serializationFormat(); + return serFmt == SerializationFormat.NONE ? null : serFmt.uriText(); + } + + @Override + public String getName() { + return "http.client.requests"; + } + + @Override + public String getContextualName(ClientObservationContext context) { + final RequestLogAccess logAccess = context.requestContext().log(); + if (logAccess.isAvailable(RequestLogProperty.NAME)) { + return logAccess.partial().fullName(); + } else { + return context.getName(); + } + } + + @Override + public boolean supportsContext(Context context) { + return context instanceof ClientObservationContext; + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/observation/HttpClientObservationDocumentation.java b/core/src/main/java/com/linecorp/armeria/client/observation/HttpClientObservationDocumentation.java new file mode 100644 index 00000000000..e52cfbe3892 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/observation/HttpClientObservationDocumentation.java @@ -0,0 +1,195 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.observation; + +import java.net.URI; + +import com.linecorp.armeria.common.Request; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.Observation.Event; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.docs.ObservationDocumentation; + +enum HttpClientObservationDocumentation implements ObservationDocumentation { + + /** + * A span collected by {@link ObservationClient}. + */ + OBSERVATION { + @Override + public Class> getDefaultConvention() { + return DefaultHttpClientObservationConvention.class; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return LowCardinalityKeys.values(); + } + + @Override + public KeyName[] getHighCardinalityKeyNames() { + return HighCardinalityKeys.values(); + } + + @Override + public Event[] getEvents() { + return Events.values(); + } + }; + + enum LowCardinalityKeys implements KeyName { + + /** + * The HTTP method of the request. + */ + HTTP_METHOD { + @Override + public String asString() { + return "http.method"; + } + }, + + /** + * The session-level protocol used for the request. + */ + STATUS_CODE { + @Override + public String asString() { + return "http.status_code"; + } + }, + + /** + * The session-level protocol used for the request. + */ + HTTP_PROTOCOL { + @Override + public String asString() { + return "http.protocol"; + } + }, + + /** + * The serialization format used for the HTTP request if exists. + * An example can be the `gproto` format when using gRPC. + */ + HTTP_SERIALIZATION_FORMAT { + @Override + public String asString() { + return "http.serfmt"; + } + } + } + + enum HighCardinalityKeys implements KeyName { + + /** + * The absolute path part of the current {@link Request} URI, excluding the query part. + */ + HTTP_PATH { + @Override + public String asString() { + return "http.path"; + } + }, + + /** + * The authority of the current {@link Request}. + */ + HTTP_HOST { + @Override + public String asString() { + return "http.host"; + } + }, + + /** + * The {@link URI} associated with the current {@link Request}. + */ + HTTP_URL { + @Override + public String asString() { + return "http.url"; + } + }, + + /** + * The remote address of this request. + */ + ADDRESS_REMOTE { + @Override + public String asString() { + return "address.remote"; + } + }, + + /** + * The local address of this request. + */ + ADDRESS_LOCAL { + @Override + public String asString() { + return "address.local"; + } + }, + + /** + * The response cause for why the request has failed. + */ + ERROR { + @Override + public String asString() { + return "error"; + } + } + } + + enum Events implements Event { + + /** + * Semi-official annotation for the time the first bytes were sent on the wire. + */ + WIRE_SEND { + @Override + public String getName() { + return "ws"; + } + + @Override + public String getContextualName() { + return "ws"; + } + }, + + /** + * Semi-official annotation for the time the first bytes were received on the wire. + */ + WIRE_RECEIVE { + @Override + public String getName() { + return "wr"; + } + + @Override + public String getContextualName() { + return "wr"; + } + } + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/observation/ObservationClient.java b/core/src/main/java/com/linecorp/armeria/client/observation/ObservationClient.java new file mode 100644 index 00000000000..3d4bcb30f93 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/observation/ObservationClient.java @@ -0,0 +1,161 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.observation; + +import static java.util.Objects.requireNonNull; + +import java.util.function.Function; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.HttpClient; +import com.linecorp.armeria.client.SimpleDecoratingHttpClient; +import com.linecorp.armeria.client.observation.HttpClientObservationDocumentation.Events; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.RequestHeadersBuilder; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.common.logging.RequestLogProperty; +import com.linecorp.armeria.internal.common.RequestContextExtension; +import com.linecorp.armeria.server.observation.ObservationService; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.ObservationRegistry; + +/** + * Decorates an {@link HttpClient} to trace outbound {@link HttpRequest}s using + * Micrometer Observation. + * The following may be a typical implementation using a brave implementation: + *
{@code
+ * // create a tracer
+ * BraveCurrentTraceContext braveCurrentTraceContext = new BraveCurrentTraceContext(
+ *   tracing.currentTraceContext());
+ * BravePropagator bravePropagator = new BravePropagator(tracing);
+ * BraveTracer braveTracer = new BraveTracer(tracing.tracer(), braveCurrentTraceContext,
+ *                                           new BraveBaggageManager());
+ *
+ * // add tracing handlers
+ * List> tracingHandlers =
+ *   Arrays.asList(new PropagatingSenderTracingObservationHandler<>(braveTracer, bravePropagator),
+ *                 new PropagatingReceiverTracingObservationHandler<>(braveTracer, bravePropagator),
+ *                 new DefaultTracingObservationHandler(braveTracer));
+ *
+ * // create a registry
+ * ObservationRegistry observationRegistry = ObservationRegistry.create();
+ *
+ * // add the tracing handlers
+ * observationRegistry.observationConfig().observationHandler(
+ *         new FirstMatchingCompositeObservationHandler(tracingHandlers));
+ *
+ * // add the decorator
+ * WebClient.builder()
+ *          .decorator(ObservationClient.newDecorator(observationRegistry))
+ * ...
+ * }
+ * + * @see ObservationService + */ +@UnstableApi +public final class ObservationClient extends SimpleDecoratingHttpClient { + + /** + * Creates a new micrometer observation integrated {@link HttpClient} decorator + * using the specified {@link ObservationRegistry}. + */ + public static Function newDecorator( + ObservationRegistry observationRegistry) { + requireNonNull(observationRegistry, "observationRegistry"); + return delegate -> new ObservationClient(delegate, observationRegistry, null); + } + + /** + * Creates a new micrometer observation integrated {@link HttpClient} decorator + * using the specified {@link ObservationRegistry} and {@link ObservationConvention}. + */ + public static Function newDecorator( + ObservationRegistry observationRegistry, + ObservationConvention observationConvention) { + requireNonNull(observationRegistry, "observationRegistry"); + requireNonNull(observationConvention, "observationConvention"); + return delegate -> new ObservationClient(delegate, observationRegistry, + observationConvention); + } + + private final ObservationRegistry observationRegistry; + + @Nullable + private final ObservationConvention httpClientObservationConvention; + + private ObservationClient( + HttpClient delegate, ObservationRegistry observationRegistry, + @Nullable ObservationConvention httpClientObservationConvention) { + super(delegate); + this.observationRegistry = requireNonNull(observationRegistry, "observationRegistry"); + this.httpClientObservationConvention = httpClientObservationConvention; + } + + @Override + public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception { + final RequestHeadersBuilder newHeaders = req.headers().toBuilder(); + final ClientObservationContext clientObservationContext = + new ClientObservationContext(ctx, newHeaders, req); + final Observation observation = HttpClientObservationDocumentation.OBSERVATION.observation( + httpClientObservationConvention, DefaultHttpClientObservationConvention.INSTANCE, + () -> clientObservationContext, observationRegistry).start(); + final HttpRequest newReq = req.withHeaders(newHeaders); + ctx.updateRequest(newReq); + final RequestContextExtension ctxExtension = ctx.as(RequestContextExtension.class); + + if (observationRegistry.isNoop() || observation.isNoop()) { + return unwrap().execute(ctx, newReq); + } + + if (ctxExtension != null) { + // Make the span the current span and run scope decorators when the ctx is pushed. + ctxExtension.hook(observation::openScope); + } + + enrichObservation(ctx, clientObservationContext, observation); + + return observation.scopedChecked(() -> unwrap().execute(ctx, newReq)); + } + + private static void enrichObservation(ClientRequestContext ctx, + ClientObservationContext clientObservationContext, + Observation observation) { + ctx.log() + .whenAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME) + .thenAccept(requestLog -> observation.event(Events.WIRE_SEND)); + + ctx.log() + .whenAvailable(RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME) + .thenAccept(requestLog -> { + if (requestLog.responseFirstBytesTransferredTimeNanos() != null) { + observation.event(Events.WIRE_RECEIVE); + } + }); + + ctx.log().whenComplete() + .thenAccept(requestLog -> { + // TODO: ClientConnectionTimings - there is no way to record events + // with a specific timestamp for an observation + clientObservationContext.setResponse(requestLog); + observation.stop(); + }); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/observation/package-info.java b/core/src/main/java/com/linecorp/armeria/client/observation/package-info.java new file mode 100644 index 00000000000..eae4ef33f51 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/observation/package-info.java @@ -0,0 +1,25 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Observation basing on Micrometer Observation. + */ +@UnstableApi +@NonNullByDefault +package com.linecorp.armeria.client.observation; + +import com.linecorp.armeria.common.annotation.NonNullByDefault; +import com.linecorp.armeria.common.annotation.UnstableApi; diff --git a/core/src/main/java/com/linecorp/armeria/server/observation/DefaultServiceObservationConvention.java b/core/src/main/java/com/linecorp/armeria/server/observation/DefaultServiceObservationConvention.java new file mode 100644 index 00000000000..3dc137ba49f --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/server/observation/DefaultServiceObservationConvention.java @@ -0,0 +1,153 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server.observation; + +import static com.google.common.base.MoreObjects.firstNonNull; + +import java.net.InetSocketAddress; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.common.SerializationFormat; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.common.logging.RequestLogAccess; +import com.linecorp.armeria.common.logging.RequestLogProperty; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.observation.HttpServiceObservationDocumentation.HighCardinalityKeys; +import com.linecorp.armeria.server.observation.HttpServiceObservationDocumentation.LowCardinalityKeys; + +import io.micrometer.common.KeyValue; +import io.micrometer.common.KeyValues; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; + +final class DefaultServiceObservationConvention implements ObservationConvention { + + static final DefaultServiceObservationConvention INSTANCE = + new DefaultServiceObservationConvention(); + + @Override + public KeyValues getLowCardinalityKeyValues(ServiceObservationContext context) { + final ServiceRequestContext ctx = context.requestContext(); + int expectedSize = 1; + KeyValue protocol = null; + KeyValue serializationFormat = null; + KeyValue statusCode = null; + if (context.getResponse() != null) { + final RequestLog log = ctx.log().ensureComplete(); + protocol = LowCardinalityKeys.HTTP_PROTOCOL.withValue(protocol(log)); + statusCode = LowCardinalityKeys.STATUS_CODE + .withValue(log.responseStatus().codeAsText()); + expectedSize = 3; + final String serFmt = serializationFormat(log); + if (serFmt != null) { + expectedSize = 4; + serializationFormat = LowCardinalityKeys.HTTP_SERIALIZATION_FORMAT.withValue(serFmt); + } + } + final ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(expectedSize); + builder.add(LowCardinalityKeys.HTTP_METHOD.withValue(ctx.method().name())); + addIfNotNull(protocol, builder); + addIfNotNull(statusCode, builder); + addIfNotNull(serializationFormat, builder); + return KeyValues.of(builder.build()); + } + + private void addIfNotNull(@Nullable KeyValue keyValue, ImmutableList.Builder builder) { + if (keyValue != null) { + builder.add(keyValue); + } + } + + @Override + public KeyValues getHighCardinalityKeyValues(ServiceObservationContext context) { + final ServiceRequestContext ctx = context.requestContext(); + int expectedSize = 3; + KeyValue addressRemote = null; + KeyValue addressLocal = null; + KeyValue error = null; + if (context.getResponse() != null) { + final RequestLog log = ctx.log().ensureComplete(); + final InetSocketAddress raddr = ctx.remoteAddress(); + if (raddr != null) { + expectedSize = expectedSize + 1; + addressRemote = HighCardinalityKeys.ADDRESS_REMOTE.withValue(raddr.toString()); + } + final InetSocketAddress laddr = ctx.localAddress(); + if (laddr != null) { + expectedSize = expectedSize + 1; + addressLocal = HighCardinalityKeys.ADDRESS_LOCAL.withValue(laddr.toString()); + } + + final Throwable responseCause = log.responseCause(); + if (responseCause != null) { + expectedSize = expectedSize + 1; + error = HighCardinalityKeys.ERROR.withValue(responseCause.toString()); + } else if (log.responseStatus().isError()) { + expectedSize = expectedSize + 1; + error = HighCardinalityKeys.ERROR.withValue(log.responseStatus().codeAsText()); + } + } + final ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(expectedSize); + builder.add(HighCardinalityKeys.HTTP_PATH.withValue(ctx.path()), + HighCardinalityKeys.HTTP_HOST.withValue(firstNonNull(context.httpRequest().authority(), + "UNKNOWN")), + HighCardinalityKeys.HTTP_URL.withValue(ctx.uri().toString())); + addIfNotNull(addressRemote, builder); + addIfNotNull(addressLocal, builder); + addIfNotNull(error, builder); + return KeyValues.of(builder.build()); + } + + /** + * Returns the {@link SessionProtocol#uriText()} of the {@link RequestLog}. + */ + private static String protocol(RequestLog requestLog) { + return requestLog.sessionProtocol().uriText(); + } + + /** + * Returns the {@link SerializationFormat#uriText()} if it's not {@link SerializationFormat#NONE}. + */ + @Nullable + private static String serializationFormat(RequestLog requestLog) { + final SerializationFormat serFmt = requestLog.serializationFormat(); + return serFmt == SerializationFormat.NONE ? null : serFmt.uriText(); + } + + @Override + public String getName() { + return "http.server.requests"; + } + + @Override + public String getContextualName(ServiceObservationContext context) { + final RequestLogAccess logAccess = context.requestContext().log(); + if (logAccess.isAvailable(RequestLogProperty.NAME)) { + return logAccess.partial().fullName(); + } else { + return context.getName(); + } + } + + @Override + public boolean supportsContext(Context context) { + return context instanceof ServiceObservationContext; + } +} diff --git a/core/src/main/java/com/linecorp/armeria/server/observation/HttpServiceObservationDocumentation.java b/core/src/main/java/com/linecorp/armeria/server/observation/HttpServiceObservationDocumentation.java new file mode 100644 index 00000000000..8604a447f8f --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/server/observation/HttpServiceObservationDocumentation.java @@ -0,0 +1,195 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server.observation; + +import java.net.URI; + +import com.linecorp.armeria.common.Request; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.Observation.Event; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.docs.ObservationDocumentation; + +enum HttpServiceObservationDocumentation implements ObservationDocumentation { + + /** + * A span collected by {@link ObservationService}. + */ + OBSERVATION { + @Override + public Class> getDefaultConvention() { + return DefaultServiceObservationConvention.class; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return LowCardinalityKeys.values(); + } + + @Override + public KeyName[] getHighCardinalityKeyNames() { + return HighCardinalityKeys.values(); + } + + @Override + public Event[] getEvents() { + return Events.values(); + } + }; + + enum LowCardinalityKeys implements KeyName { + + /** + * The HTTP method of the request. + */ + HTTP_METHOD { + @Override + public String asString() { + return "http.method"; + } + }, + + /** + * The HTTP status code of the response. + */ + STATUS_CODE { + @Override + public String asString() { + return "http.status_code"; + } + }, + + /** + * The session-level protocol used for the request. + */ + HTTP_PROTOCOL { + @Override + public String asString() { + return "http.protocol"; + } + }, + + /** + * The serialization format used for the HTTP request if exists. + * An example can be the `gproto` format when using gRPC. + */ + HTTP_SERIALIZATION_FORMAT { + @Override + public String asString() { + return "http.serfmt"; + } + } + } + + enum HighCardinalityKeys implements KeyName { + + /** + * The absolute path part of the current {@link Request} URI, excluding the query part. + */ + HTTP_PATH { + @Override + public String asString() { + return "http.path"; + } + }, + + /** + * The authority of the current {@link Request}. + */ + HTTP_HOST { + @Override + public String asString() { + return "http.host"; + } + }, + + /** + * The {@link URI} associated with the current {@link Request}. + */ + HTTP_URL { + @Override + public String asString() { + return "http.url"; + } + }, + + /** + * The remote address of this request. + */ + ADDRESS_REMOTE { + @Override + public String asString() { + return "address.remote"; + } + }, + + /** + * The local address of this request. + */ + ADDRESS_LOCAL { + @Override + public String asString() { + return "address.local"; + } + }, + + /** + * The response cause for why the request has failed. + */ + ERROR { + @Override + public String asString() { + return "error"; + } + } + } + + enum Events implements Event { + + /** + * Semi-official annotation for the time the first bytes were sent on the wire. + */ + WIRE_SEND { + @Override + public String getName() { + return "ws"; + } + + @Override + public String getContextualName() { + return "ws"; + } + }, + + /** + * Semi-official annotation for the time the first bytes were received on the wire. + */ + WIRE_RECEIVE { + @Override + public String getName() { + return "wr"; + } + + @Override + public String getContextualName() { + return "wr"; + } + } + } +} diff --git a/core/src/main/java/com/linecorp/armeria/server/observation/ObservationService.java b/core/src/main/java/com/linecorp/armeria/server/observation/ObservationService.java new file mode 100644 index 00000000000..a86a85b3d14 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/server/observation/ObservationService.java @@ -0,0 +1,162 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server.observation; + +import static java.util.Objects.requireNonNull; + +import java.util.Set; +import java.util.function.Function; + +import com.linecorp.armeria.client.observation.ObservationClient; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.common.logging.RequestLogProperty; +import com.linecorp.armeria.internal.common.RequestContextExtension; +import com.linecorp.armeria.server.HttpService; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.SimpleDecoratingHttpService; +import com.linecorp.armeria.server.TransientServiceOption; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.ObservationRegistry; + +/** + * Decorates an {@link HttpService} to trace inbound {@link HttpRequest}s using + * Micrometer Observation. + * The following may be a typical implementation using a brave implementation: + *
{@code
+ * // create a tracer
+ * BraveCurrentTraceContext braveCurrentTraceContext = new BraveCurrentTraceContext(
+ *   tracing.currentTraceContext());
+ * BravePropagator bravePropagator = new BravePropagator(tracing);
+ * BraveTracer braveTracer = new BraveTracer(tracing.tracer(), braveCurrentTraceContext,
+ *                                           new BraveBaggageManager());
+ *
+ * // add tracing handlers
+ * List> tracingHandlers =
+ *   Arrays.asList(new PropagatingSenderTracingObservationHandler<>(braveTracer, bravePropagator),
+ *                 new PropagatingReceiverTracingObservationHandler<>(braveTracer, bravePropagator),
+ *                 new DefaultTracingObservationHandler(braveTracer));
+ *
+ * // create a registry
+ * ObservationRegistry observationRegistry = ObservationRegistry.create();
+ *
+ * // add the tracing handlers
+ * observationRegistry.observationConfig().observationHandler(
+ *         new FirstMatchingCompositeObservationHandler(tracingHandlers));
+ *
+ * // add the decorator
+ * Server.builder()
+ *       .decorator(ObservationService.newDecorator(observationRegistry))
+ * ...
+ * }
+ * + * @see ObservationClient + */ +@UnstableApi +public final class ObservationService extends SimpleDecoratingHttpService { + + /** + * Creates a new micrometer observation integrated {@link HttpService} decorator using the + * specified {@link ObservationRegistry} instance. + */ + public static Function + newDecorator(ObservationRegistry observationRegistry) { + requireNonNull(observationRegistry, "observationRegistry"); + return service -> new ObservationService(service, observationRegistry, null); + } + + /** + * Creates a new micrometer observation integrated {@link HttpService} decorator using the + * specified {@link ObservationRegistry} and {@link ObservationConvention}. + */ + public static Function + newDecorator(ObservationRegistry observationRegistry, + ObservationConvention observationConvention) { + requireNonNull(observationRegistry, "observationRegistry"); + requireNonNull(observationConvention, "observationConvention"); + return service -> new ObservationService( + service, observationRegistry, observationConvention); + } + + private final ObservationRegistry observationRegistry; + @Nullable + private final ObservationConvention observationConvention; + + private ObservationService( + HttpService delegate, ObservationRegistry observationRegistry, + @Nullable ObservationConvention observationConvention) { + super(delegate); + this.observationRegistry = requireNonNull(observationRegistry, "observationRegistry"); + this.observationConvention = observationConvention; + } + + @Override + public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exception { + final Set transientServiceOptions = ctx.config().transientServiceOptions(); + if (!transientServiceOptions.contains(TransientServiceOption.WITH_TRACING) || + !transientServiceOptions.contains(TransientServiceOption.WITH_METRIC_COLLECTION)) { + return unwrap().serve(ctx, req); + } + + final ServiceObservationContext serviceObservationContext = new ServiceObservationContext(ctx, req); + final Observation observation = HttpServiceObservationDocumentation.OBSERVATION.observation( + observationConvention, DefaultServiceObservationConvention.INSTANCE, + () -> serviceObservationContext, observationRegistry).start(); + + final RequestContextExtension ctxExtension = ctx.as(RequestContextExtension.class); + + if (observationRegistry.isNoop() || observation.isNoop()) { + return unwrap().serve(ctx, req); + } + + if (ctxExtension != null) { + // Make the span the current span and run scope decorators when the ctx is pushed. + ctxExtension.hook(observation::openScope); + } + + enrichObservation(ctx, serviceObservationContext, observation); + + return observation.scopedChecked(() -> unwrap().serve(ctx, req)); + } + + private static void enrichObservation(ServiceRequestContext ctx, + ServiceObservationContext serviceObservationContext, + Observation observation) { + ctx.log() + .whenAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME) + .thenAccept(requestLog -> observation.event( + HttpServiceObservationDocumentation.Events.WIRE_RECEIVE)); + + ctx.log() + .whenAvailable(RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME) + .thenAccept(requestLog -> { + if (requestLog.responseFirstBytesTransferredTimeNanos() != null) { + observation.event(HttpServiceObservationDocumentation.Events.WIRE_SEND); + } + }); + + ctx.log().whenComplete() + .thenAccept(requestLog -> { + serviceObservationContext.setResponse(requestLog); + observation.stop(); + }); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/server/observation/ServiceObservationContext.java b/core/src/main/java/com/linecorp/armeria/server/observation/ServiceObservationContext.java new file mode 100644 index 00000000000..8be71d28fce --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/server/observation/ServiceObservationContext.java @@ -0,0 +1,75 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server.observation; + +import com.google.common.base.MoreObjects; + +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.server.ServiceRequestContext; + +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.transport.RequestReplyReceiverContext; + +/** + * A {@link Context} which may be used in conjunction with {@link ObservationService} + * to implement custom {@link ObservationConvention}s or {@link ObservationHandler}s. + *
{@code
+ * ObservationConvention convention = ...
+ * Server.builder()
+ *       .decorator(ObservationService.newDecorator(registry, convention))
+ * ...
+ * }
+ */ +@UnstableApi +public final class ServiceObservationContext extends RequestReplyReceiverContext { + + private final ServiceRequestContext serviceRequestContext; + private final HttpRequest httpRequest; + + ServiceObservationContext(ServiceRequestContext serviceRequestContext, HttpRequest httpRequest) { + super((c, key) -> c.headers().get(key)); + this.serviceRequestContext = serviceRequestContext; + this.httpRequest = httpRequest; + setCarrier(httpRequest); + } + + /** + * The {@link ServiceRequestContext} associated with this {@link Context}. + */ + public ServiceRequestContext requestContext() { + return serviceRequestContext; + } + + /** + * The {@link HttpRequest} associated with this {@link Context}. + */ + public HttpRequest httpRequest() { + return httpRequest; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).omitNullValues() + .add("serviceRequestContext", serviceRequestContext) + .add("httpRequest", httpRequest) + .toString(); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/server/observation/package-info.java b/core/src/main/java/com/linecorp/armeria/server/observation/package-info.java new file mode 100644 index 00000000000..0fa2ad50f70 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/server/observation/package-info.java @@ -0,0 +1,25 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Observation basing on Micrometer Observation. + */ +@UnstableApi +@NonNullByDefault +package com.linecorp.armeria.server.observation; + +import com.linecorp.armeria.common.annotation.NonNullByDefault; +import com.linecorp.armeria.common.annotation.UnstableApi; diff --git a/core/src/test/java/com/linecorp/armeria/client/observation/ObservationClientIntegrationTest.java b/core/src/test/java/com/linecorp/armeria/client/observation/ObservationClientIntegrationTest.java new file mode 100644 index 00000000000..011331980b7 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/client/observation/ObservationClientIntegrationTest.java @@ -0,0 +1,229 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.client.observation; + +import java.io.IOException; +import java.util.List; +import java.util.function.BiConsumer; + +import org.junit.AssumptionViolatedException; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.internal.common.observation.MicrometerObservationRegistryUtils; +import com.linecorp.armeria.server.ServiceRequestContext; + +import brave.propagation.CurrentTraceContext; +import brave.propagation.ThreadLocalCurrentTraceContext; +import brave.test.http.ITHttpAsyncClient; +import io.micrometer.common.KeyValues; +import io.micrometer.observation.ObservationRegistry; +import okhttp3.Protocol; + +@RunWith(Parameterized.class) +public class ObservationClientIntegrationTest extends ITHttpAsyncClient { + + @Parameters + public static List sessionProtocols() { + return ImmutableList.of(SessionProtocol.H1C, SessionProtocol.H2C); + } + + private final List protocols; + private final SessionProtocol sessionProtocol; + + private ObservationRegistry observationRegistry; + + private DefaultHttpClientObservationConvention clientObservationConvention; + + public ObservationClientIntegrationTest(SessionProtocol sessionProtocol) { + this.sessionProtocol = sessionProtocol; + + if (sessionProtocol == SessionProtocol.H2C) { + protocols = ImmutableList.of(Protocol.H2_PRIOR_KNOWLEDGE); + } else { + protocols = ImmutableList.of(Protocol.HTTP_1_1); + } + } + + @Before + @Override + public void setup() throws IOException { + clientObservationConvention = DefaultHttpClientObservationConvention.INSTANCE; + server.setProtocols(protocols); + super.setup(); + } + + @Override + protected CurrentTraceContext.Builder currentTraceContextBuilder() { + return ThreadLocalCurrentTraceContext.newBuilder(); + } + + @Override + protected WebClient newClient(int port) { + return WebClient.builder(sessionProtocol.uriText() + "://127.0.0.1:" + port) + .decorator(ObservationClient.newDecorator( + observationRegistry(), clientObservationConvention)) + .build(); + } + + private ObservationRegistry observationRegistry() { + observationRegistry = MicrometerObservationRegistryUtils.observationRegistry(httpTracing); + return this.observationRegistry; + } + + @Test + @Override + public void callbackContextIsFromInvocationTime_root() { + try (SafeCloseable ignored = serverContext().push()) { + super.callbackContextIsFromInvocationTime_root(); + } + } + + @Test + @Override + public void addsStatusCodeWhenNotOk_async() { + try (SafeCloseable ignored = serverContext().push()) { + super.addsStatusCodeWhenNotOk_async(); + } + } + + @Test + @Override + public void usesParentFromInvocationTime() { + try (SafeCloseable ignored = serverContext().push()) { + super.usesParentFromInvocationTime(); + } + } + + @Test + @Override + @Ignore("TODO: maybe integrate with brave's clock") + public void clientTimestampAndDurationEnclosedByParent() { + } + + @Override + @Test + public void supportsPortableCustomization() throws IOException { + clientObservationConvention = new DefaultHttpClientObservationConvention() { + + @Override + public KeyValues getHighCardinalityKeyValues(ClientObservationContext context) { + context.setRemoteServiceName("remote-service"); // TODO: As a side effect + KeyValues values = + super.getHighCardinalityKeyValues( + context); + values = values.and( + KeyValues.of("http.url", + context.requestContext().uri().toString(), + "request_customizer.is_span", "false")); + if (context.getResponse() != null) { + values = values.and("response_customizer.is_span", "false"); + } + return values; + } + + @Override + public String getContextualName(ClientObservationContext context) { + return context.httpRequest().method() + .toString().toLowerCase() + " " + + context.httpRequest().path() + .substring(0, context.httpRequest().path().indexOf("?")); + } + }; + super.supportsPortableCustomization(); + } + + @Test + @Override + @Ignore("TODO: somehow propagate the parent context to the client callback") + public void callbackContextIsFromInvocationTime() { + // TODO(trustin): Can't make this pass because span is updated *after* we invoke the callback + // ITHttpAsyncClient gave us. + } + + @Override + @Ignore + public void supportsDeprecatedPortableCustomization() { + } + + @Override + @Ignore("We're not using HttpTracing at all") + public void customSampler() { + } + + @Test + @Override + public void redirect() { + throw new AssumptionViolatedException("Armeria does not support client redirect."); + } + + @Override + @Test + @Ignore + public void reportsServerAddress() throws IOException { + super.reportsServerAddress(); + } + + @Override + protected void closeClient(WebClient client) { + } + + @Override + protected void get(WebClient client, String pathIncludingQuery) { + client.blocking().get(pathIncludingQuery); + } + + @Override + protected void get(WebClient client, String path, BiConsumer callback) { + final HttpResponse res = client.get(path); + // Use 'handleAsync' to make sure a callback + // is invoked without the current trace context + res.aggregate().handleAsync((response, cause) -> { + if (cause == null) { + callback.accept(response.status().code(), null); + } else { + callback.accept(null, cause); + } + return null; + }); + } + + @Override + protected void post(WebClient client, String pathIncludingQuery, String body) { + client.blocking().post(pathIncludingQuery, body); + } + + @Override + protected void options(WebClient client, String path) { + client.blocking().options(path); + } + + static ServiceRequestContext serverContext() { + return ServiceRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + } +} diff --git a/core/src/test/java/com/linecorp/armeria/client/observation/ObservationClientTest.java b/core/src/test/java/com/linecorp/armeria/client/observation/ObservationClientTest.java new file mode 100644 index 00000000000..770e48ac856 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/client/observation/ObservationClientTest.java @@ -0,0 +1,293 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.observation; + +import static com.linecorp.armeria.client.observation.ObservationClient.newDecorator; +import static com.linecorp.armeria.internal.common.observation.MicrometerObservationRegistryUtils.observationRegistry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.client.BlockingWebClient; +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.HttpClient; +import com.linecorp.armeria.client.UnprocessedRequestException; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.client.endpoint.EmptyEndpointGroupException; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.HttpHeaderNames; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.RpcRequest; +import com.linecorp.armeria.common.RpcResponse; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.internal.common.observation.SpanCollector; + +import brave.Span.Kind; +import brave.Tracing; +import brave.handler.MutableSpan; +import brave.http.HttpTracing; +import brave.propagation.CurrentTraceContext; +import brave.propagation.CurrentTraceContext.ScopeDecorator; +import brave.propagation.ThreadLocalCurrentTraceContext; +import brave.sampler.Sampler; +import io.micrometer.common.KeyValues; + +class ObservationClientTest { + + private static final String TEST_SERVICE = "test-service"; + + private static final String TEST_SPAN = "java.lang.Object/hello"; + + @AfterEach + void tearDown() { + Tracing.current().close(); + } + + @Test + void newDecorator_shouldWorkWhenRequestContextCurrentTraceContextNotConfigured() { + newDecorator(observationRegistry( + HttpTracing.create( + Tracing.newBuilder().build())), + new DefaultHttpClientObservationConvention() { + @Override + public KeyValues getHighCardinalityKeyValues(ClientObservationContext context) { + context.setRemoteServiceName("remote-service"); + return super.getHighCardinalityKeyValues(context); + } + }); + } + + @Test + void shouldSubmitSpanWhenSampled() throws Exception { + final SpanCollector collector = new SpanCollector(); + + final Tracing tracing = Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .addSpanHandler(collector) + .sampler(Sampler.create(1.0f)) + .build(); + final RequestLog requestLog = testRemoteInvocation(tracing, null); + + // check span name + final MutableSpan span = collector.spans().poll(10, TimeUnit.SECONDS); + assertThat(span).isNotNull(); + assertThat(span.name()).isEqualTo(TEST_SPAN); + + // check kind + assertThat(span.kind()).isSameAs(Kind.CLIENT); + + // only one span should be submitted + assertThat(collector.spans().poll(1, TimeUnit.SECONDS)).isNull(); + + // check # of annotations (we add wire annotations) + assertThat(span.annotations()).hasSize(2); + assertTags(span); + + assertThat(span.traceId().length()).isEqualTo(16); + + // check duration is correct from request log - + // we're not setting timestamps so the values will not be the same + assertThat(span.finishTimestamp() - span.startTimestamp()) + .isNotEqualTo(requestLog.totalDurationNanos() / 1000); + + // check service name + assertThat(span.localServiceName()).isEqualTo(TEST_SERVICE); + + // check remote service name + assertThat(span.remoteServiceName()).isEqualTo(null); + } + + @Test + void shouldSubmitSpanWithCustomRemoteName() throws Exception { + final SpanCollector collector = new SpanCollector(); + + final Tracing tracing = Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .addSpanHandler(collector) + .sampler(Sampler.create(1.0f)) + .build(); + testRemoteInvocation(tracing, "fooService"); + + // check span name + final MutableSpan span = collector.spans().poll(10, TimeUnit.SECONDS); + + // check tags + assertThat(span).isNotNull(); + assertThat(span.tags()).containsEntry("http.host", "foo.com") + .containsEntry("http.method", "POST") + .containsEntry("http.path", "/hello/armeria") + .containsEntry("http.url", "http://foo.com/hello/armeria") + .containsEntry("http.protocol", "h2c"); + + // check service name + assertThat(span.localServiceName()).isEqualTo(TEST_SERVICE); + + // check remote service name + assertThat(span.remoteServiceName()).isEqualTo("fooService"); + } + + @Test + void scopeDecorator() throws Exception { + final SpanCollector collector = new SpanCollector(); + final AtomicInteger scopeDecoratorCallingCounter = new AtomicInteger(); + final ScopeDecorator scopeDecorator = (currentSpan, scope) -> { + scopeDecoratorCallingCounter.getAndIncrement(); + return scope; + }; + final CurrentTraceContext traceContext = + ThreadLocalCurrentTraceContext.newBuilder().addScopeDecorator(scopeDecorator).build(); + + final Tracing tracing = Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .currentTraceContext(traceContext) + .addSpanHandler(collector) + .sampler(Sampler.create(1.0f)) + .build(); + testRemoteInvocation(tracing, null); + + // check span name + final MutableSpan span = collector.spans().poll(10, TimeUnit.SECONDS); + + // check tags + assertTags(span); + + // check service name + assertThat(span.localServiceName()).isEqualTo(TEST_SERVICE); + // check the client invocation had the current span in scope. + assertThat(scopeDecoratorCallingCounter.get()).isOne(); + } + + @Test + void shouldNotSubmitSpanWhenNotSampled() throws Exception { + final SpanCollector collector = new SpanCollector(); + final Tracing tracing = Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .addSpanHandler(collector) + .sampler(Sampler.create(0.0f)) + .build(); + testRemoteInvocation(tracing, null); + + assertThat(collector.spans().poll(1, TimeUnit.SECONDS)).isNull(); + } + + @Test + void testEmptyEndpointTags() { + final SpanCollector collector = new SpanCollector(); + final Tracing tracing = Tracing.newBuilder() + .addSpanHandler(collector) + .currentTraceContext(ThreadLocalCurrentTraceContext.create()) + .sampler(Sampler.ALWAYS_SAMPLE) + .build(); + final BlockingWebClient blockingWebClient = + WebClient.builder(SessionProtocol.HTTP, EndpointGroup.of()) + .decorator(newDecorator( + observationRegistry(tracing))).build() + .blocking(); + assertThatThrownBy(() -> blockingWebClient.get("/")) + .isInstanceOf(UnprocessedRequestException.class) + .hasCauseInstanceOf(EmptyEndpointGroupException.class); + assertThat(collector.spans()).hasSize(1); + final MutableSpan span = collector.spans().poll(); + assertThat(span.tag("http.host")).isEqualTo("UNKNOWN"); + assertThat(span.tag("http.url")).isEqualTo("http:/"); + } + + private static RequestLog testRemoteInvocation(Tracing tracing, @Nullable String remoteServiceName) + throws Exception { + + // prepare parameters + final HttpRequest req = HttpRequest + .of(RequestHeaders.of(HttpMethod.POST, "/hello/armeria", + HttpHeaderNames.SCHEME, "http", + HttpHeaderNames.AUTHORITY, "foo.com")); + final RpcRequest rpcReq = RpcRequest.of(Object.class, + "hello", "Armeria"); + final HttpResponse res = HttpResponse.of(HttpStatus.OK); + final RpcResponse rpcRes = RpcResponse.of("Hello, Armeria!"); + final ClientRequestContext ctx = ClientRequestContext.builder(req).build(); + // the authority is extracted even when the request doesn't declare an authority + final RequestHeaders headersWithoutAuthority = + req.headers().toBuilder().removeAndThen(HttpHeaderNames.AUTHORITY).build(); + ctx.updateRequest(req.withHeaders(headersWithoutAuthority)); + final HttpRequest actualReq = ctx.request(); + assertThat(actualReq).isNotNull(); + + ctx.logBuilder().requestFirstBytesTransferred(); + ctx.logBuilder().requestContent(rpcReq, actualReq); + ctx.logBuilder().endRequest(); + + try (SafeCloseable ignored = ctx.push()) { + final HttpClient delegate = mock(HttpClient.class); + when(delegate.execute(any(), any())).thenReturn(res); + + final ObservationClient stub = newDecorator(observationRegistry( + HttpTracing.create(tracing)), new DefaultHttpClientObservationConvention() { + @Override + public KeyValues getHighCardinalityKeyValues(ClientObservationContext context) { + context.setRemoteServiceName(remoteServiceName); + return super.getHighCardinalityKeyValues(context); + } + }).apply(delegate); + // do invoke + final HttpResponse actualRes = stub.execute(ctx, actualReq); + + assertThat(actualRes).isEqualTo(res); + + verify(delegate, times(1)).execute(same(ctx), argThat(arg -> { + final RequestHeaders headers = arg.headers(); + return headers.contains(HttpHeaderNames.of("x-b3-traceid")) && + headers.contains(HttpHeaderNames.of("x-b3-spanid")) && + headers.contains(HttpHeaderNames.of("x-b3-sampled")); + })); + } + + ctx.logBuilder().responseHeaders(ResponseHeaders.of(HttpStatus.OK)); + ctx.logBuilder().responseFirstBytesTransferred(); + ctx.logBuilder().responseContent(rpcRes, res); + ctx.logBuilder().endResponse(); + return ctx.log().ensureComplete(); + } + + private static void assertTags(@Nullable MutableSpan span) { + assertThat(span).isNotNull(); + assertThat(span.tags()).containsEntry("http.host", "foo.com") + .containsEntry("http.method", "POST") + .containsEntry("http.path", "/hello/armeria") + .containsEntry("http.url", "http://foo.com/hello/armeria") + .containsEntry("http.protocol", "h2c"); + } +} diff --git a/core/src/test/java/com/linecorp/armeria/internal/common/observation/MicrometerObservationRegistryUtils.java b/core/src/test/java/com/linecorp/armeria/internal/common/observation/MicrometerObservationRegistryUtils.java new file mode 100644 index 00000000000..e2a88d4d5f0 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/internal/common/observation/MicrometerObservationRegistryUtils.java @@ -0,0 +1,72 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.common.observation; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import brave.Tracing; +import brave.http.HttpTracing; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.observation.MeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.tracing.brave.bridge.BraveBaggageManager; +import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext; +import io.micrometer.tracing.brave.bridge.BravePropagator; +import io.micrometer.tracing.brave.bridge.BraveTracer; +import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; +import io.micrometer.tracing.handler.TracingObservationHandler; + +public final class MicrometerObservationRegistryUtils { + + public static ObservationRegistry observationRegistry(HttpTracing httpTracing) { + return observationRegistry(httpTracing.tracing()); + } + + public static ObservationRegistry observationRegistry(Tracing tracing) { + final BraveCurrentTraceContext braveCurrentTraceContext = new BraveCurrentTraceContext( + tracing.currentTraceContext()); + final BravePropagator bravePropagator = new BravePropagator(tracing); + final BraveTracer braveTracer = new BraveTracer(tracing.tracer(), braveCurrentTraceContext, + new BraveBaggageManager()); + final List> tracingHandlers = + Arrays.asList(new PropagatingSenderTracingObservationHandler<>(braveTracer, bravePropagator), + new PropagatingReceiverTracingObservationHandler<>(braveTracer, bravePropagator), + new DefaultTracingObservationHandler(braveTracer)); + + final MeterRegistry meterRegistry = new SimpleMeterRegistry(); + final List> meterHandlers = Collections.singletonList( + new DefaultMeterObservationHandler(meterRegistry)); + + final ObservationRegistry observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig().observationHandler( + new ObservationHandler.CompositeObservationHandler.FirstMatchingCompositeObservationHandler( + tracingHandlers)); + observationRegistry.observationConfig().observationHandler( + new ObservationHandler.CompositeObservationHandler.FirstMatchingCompositeObservationHandler( + meterHandlers)); + return observationRegistry; + } + + private MicrometerObservationRegistryUtils() {} +} diff --git a/core/src/test/java/com/linecorp/armeria/internal/common/observation/SpanCollector.java b/core/src/test/java/com/linecorp/armeria/internal/common/observation/SpanCollector.java new file mode 100644 index 00000000000..01fa28a5b1b --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/internal/common/observation/SpanCollector.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.common.observation; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedTransferQueue; + +import brave.handler.MutableSpan; +import brave.handler.SpanHandler; +import brave.propagation.TraceContext; + +public final class SpanCollector extends SpanHandler { + + private final BlockingQueue spans = new LinkedTransferQueue<>(); + + @Override + public boolean end(TraceContext context, MutableSpan span, Cause cause) { + return spans.add(span); + } + + public BlockingQueue spans() { + return spans; + } +} diff --git a/core/src/test/java/com/linecorp/armeria/internal/common/observation/it/CustomObservationTest.java b/core/src/test/java/com/linecorp/armeria/internal/common/observation/it/CustomObservationTest.java new file mode 100644 index 00000000000..166fbcc3d7d --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/internal/common/observation/it/CustomObservationTest.java @@ -0,0 +1,143 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.common.observation.it; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.linecorp.armeria.client.BlockingWebClient; +import com.linecorp.armeria.client.ClientRequestContextCaptor; +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.client.HttpClient; +import com.linecorp.armeria.client.observation.ClientObservationContext; +import com.linecorp.armeria.client.observation.ObservationClient; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.internal.common.observation.MicrometerObservationRegistryUtils; +import com.linecorp.armeria.internal.common.observation.SpanCollector; +import com.linecorp.armeria.server.HttpService; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.observation.ObservationService; +import com.linecorp.armeria.server.observation.ServiceObservationContext; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; + +import brave.Tracing; +import brave.sampler.Sampler; +import io.micrometer.common.KeyValues; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.ObservationRegistry; + +class CustomObservationTest { + + private static final SpanCollector spanHandler = new SpanCollector(); + + private static ObservationRegistry newTracing(String name) { + return MicrometerObservationRegistryUtils.observationRegistry(tracingBuilder(name)); + } + + private static Tracing tracingBuilder(String name) { + return Tracing.newBuilder() + .localServiceName(name) + .addSpanHandler(spanHandler) + .sampler(Sampler.ALWAYS_SAMPLE) + .build(); + } + + private static Function + customConventionServiceDecorator(String name) { + final ObservationConvention convention = + new ObservationConvention() { + @Override + public KeyValues getLowCardinalityKeyValues(ServiceObservationContext context) { + return KeyValues.of("ctx.id", context.requestContext().id().shortText()); + } + + @Override + public String getName() { + return "custom.convention.service"; + } + + @Override + public boolean supportsContext(Context context) { + return context instanceof ServiceObservationContext; + } + }; + return ObservationService.newDecorator(newTracing(name), convention); + } + + private static Function + customConventionClientDecorator(String name) { + final ObservationConvention convention = + new ObservationConvention() { + @Override + public KeyValues getLowCardinalityKeyValues(ClientObservationContext context) { + return KeyValues.of("ctx.id", context.requestContext().id().shortText()); + } + + @Override + public String getName() { + return "custom.convention.client"; + } + + @Override + public boolean supportsContext(Context context) { + return context instanceof ClientObservationContext; + } + }; + return ObservationClient.newDecorator(newTracing(name), convention); + } + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + sb.route() + .path("/foo") + .decorator(customConventionServiceDecorator("tracing/foo")) + .build((ctx, req) -> HttpResponse.of("foo")); + } + }; + + @BeforeEach + void beforeEach() { + spanHandler.spans().clear(); + } + + @Test + void testCustomConvention() throws Exception { + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + final BlockingWebClient client = server.blockingWebClient( + cb -> cb.decorator(customConventionClientDecorator("tracing/foo"))); + assertThat(client.get("/foo").contentUtf8()).isEqualTo("foo"); + await().until(() -> spanHandler.spans().size() == 2); + final List entries = + spanHandler.spans().stream().flatMap(span -> span.tags().values().stream()) + .collect(Collectors.toList()); + assertThat(entries).containsExactlyInAnyOrder( + captor.get().id().shortText(), + server.requestContextCaptor().take().id().shortText()); + } + } +} diff --git a/core/src/test/java/com/linecorp/armeria/server/observation/ObservationServiceTest.java b/core/src/test/java/com/linecorp/armeria/server/observation/ObservationServiceTest.java new file mode 100644 index 00000000000..f5e58524908 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/server/observation/ObservationServiceTest.java @@ -0,0 +1,246 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server.observation; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpHeaderNames; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.RpcRequest; +import com.linecorp.armeria.common.RpcResponse; +import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.common.logging.RequestLogBuilder; +import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.internal.common.observation.MicrometerObservationRegistryUtils; +import com.linecorp.armeria.internal.common.observation.SpanCollector; +import com.linecorp.armeria.server.HttpService; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.TransientHttpService; +import com.linecorp.armeria.server.TransientServiceOption; + +import brave.Span.Kind; +import brave.Tracing; +import brave.handler.MutableSpan; +import brave.handler.SpanHandler; +import brave.http.HttpTracing; +import brave.propagation.CurrentTraceContext; +import brave.propagation.CurrentTraceContext.ScopeDecorator; +import brave.propagation.ThreadLocalCurrentTraceContext; +import brave.sampler.Sampler; + +class ObservationServiceTest { + + private static final String TEST_SERVICE = "test-service"; + + @AfterEach + public void tearDown() { + Tracing.current().close(); + } + + @Test + void newDecorator_shouldWorkWhenRequestContextCurrentTraceContextConfigured() { + ObservationService.newDecorator( + MicrometerObservationRegistryUtils.observationRegistry(HttpTracing.create( + Tracing.newBuilder() + .currentTraceContext(ThreadLocalCurrentTraceContext.create()) + .build()))); + } + + @Test + void shouldSubmitSpanWhenRequestIsSampled() throws Exception { + final SpanCollector collector = new SpanCollector(); + final RequestLog requestLog = testServiceInvocation(collector, + ThreadLocalCurrentTraceContext.create(), + 1.0f); + + // check span name + final MutableSpan span = collector.spans().take(); + assertThat(span.name()).isEqualTo(requestLog.fullName()); + + // check kind + assertThat(span.kind()).isSameAs(Kind.SERVER); + + // only one span should be submitted + assertThat(collector.spans().poll(1, TimeUnit.SECONDS)).isNull(); + + // check # of annotations (we add wire annotations) + assertThat(span.annotations()).hasSize(2); + + // check tags + assertTags(span); + + // check service name + assertThat(span.localServiceName()).isEqualTo(TEST_SERVICE); + + // check duration is correct from request log - we do it differently + assertThat(span.finishTimestamp() - span.startTimestamp()) + .isNotEqualTo(requestLog.totalDurationNanos() / 1000); + } + + @Test + void shouldNotSubmitSpanWhenRequestIsNotSampled() throws Exception { + final SpanCollector collector = new SpanCollector(); + testServiceInvocation(collector, ThreadLocalCurrentTraceContext.create(), 0.0f); + + // don't submit any spans + assertThat(collector.spans().poll(1, TimeUnit.SECONDS)).isNull(); + } + + @Test + void scopeDecorator() throws Exception { + final AtomicInteger scopeDecoratorCallingCounter = new AtomicInteger(); + final ScopeDecorator scopeDecorator = (currentSpan, scope) -> { + scopeDecoratorCallingCounter.getAndIncrement(); + return scope; + }; + final CurrentTraceContext traceContext = + ThreadLocalCurrentTraceContext.newBuilder() + .addScopeDecorator(scopeDecorator) + .build(); + final SpanCollector collector = new SpanCollector(); + testServiceInvocation(collector, traceContext, 1.0f); + + // check span name + final MutableSpan span = collector.spans().take(); + + // check tags + assertTags(span); + + // check service name + assertThat(span.localServiceName()).isEqualTo(TEST_SERVICE); + // check the service invocation had the current span in scope. + assertThat(scopeDecoratorCallingCounter.get()).isOne(); + } + + @Test + void transientService() throws Exception { + final SpanCollector collector = new SpanCollector(); + final Tracing tracing = Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .addSpanHandler(collector) + .currentTraceContext(ThreadLocalCurrentTraceContext.create()) + .sampler(Sampler.ALWAYS_SAMPLE) + .build(); + + final HttpService transientService = new TransientHttpService() { + @Override + public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exception { + return HttpResponse.of(HttpStatus.OK); + } + + @Override + public Set transientServiceOptions() { + return Collections.emptySet(); + } + }; + + final HttpRequest req = HttpRequest.of(RequestHeaders.of(HttpMethod.GET, "/internal/healthcheck", + HttpHeaderNames.SCHEME, "http", + HttpHeaderNames.AUTHORITY, "foo.com")); + final ServiceRequestContext ctx = ServiceRequestContext.builder(req) + .service(transientService) + .build(); + final RequestLogBuilder logBuilder = ctx.logBuilder(); + logBuilder.endRequest(); + + try (SafeCloseable ignored = ctx.push()) { + final ObservationService + service = ObservationService.newDecorator( + MicrometerObservationRegistryUtils.observationRegistry(tracing)).apply(transientService); + + // do invoke + final AggregatedHttpResponse res = service.serve(ctx, req).aggregate().join(); + logBuilder.responseHeaders(res.headers()); + logBuilder.responseFirstBytesTransferred(); + logBuilder.endResponse(); + } + + // don't submit any spans + assertThat(collector.spans().poll(1, TimeUnit.SECONDS)).isNull(); + } + + private static RequestLog testServiceInvocation(SpanHandler spanHandler, + CurrentTraceContext traceContext, + float samplingRate) throws Exception { + final Tracing tracing = Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .addSpanHandler(spanHandler) + .currentTraceContext(traceContext) + .sampler(Sampler.create(samplingRate)) + .build(); + + final HttpTracing httpTracing = HttpTracing.newBuilder(tracing) + .build(); + + final HttpRequest req = HttpRequest.of(RequestHeaders.of(HttpMethod.POST, "/hello/trustin", + HttpHeaderNames.SCHEME, "http", + HttpHeaderNames.AUTHORITY, "foo.com")); + final ServiceRequestContext ctx = ServiceRequestContext.builder(req).build(); + final RpcRequest rpcReq = RpcRequest.of(ObservationServiceTest.class, "hello", "trustin"); + final HttpResponse res = HttpResponse.of(HttpStatus.OK); + final RpcResponse rpcRes = RpcResponse.of("Hello, trustin!"); + final RequestLogBuilder logBuilder = ctx.logBuilder(); + logBuilder.requestContent(rpcReq, req); + logBuilder.endRequest(); + + try (SafeCloseable ignored = ctx.push()) { + final HttpService delegate = mock(HttpService.class); + final ObservationService + service = ObservationService.newDecorator( + MicrometerObservationRegistryUtils.observationRegistry(httpTracing)).apply(delegate); + when(delegate.serve(ctx, req)).thenReturn(res); + + // do invoke + service.serve(ctx, req); + + verify(delegate, times(1)).serve(eq(ctx), eq(req)); + } + + logBuilder.responseHeaders(ResponseHeaders.of(HttpStatus.OK)); + logBuilder.responseFirstBytesTransferred(); + logBuilder.responseContent(rpcRes, res); + logBuilder.endResponse(); + return ctx.log().ensureComplete(); + } + + private static void assertTags(MutableSpan span) { + assertThat(span.tags()).containsEntry("http.host", "foo.com") + .containsEntry("http.method", "POST") + .containsEntry("http.path", "/hello/trustin") + .containsEntry("http.url", "http://foo.com/hello/trustin") + .containsEntry("http.protocol", "h2c"); + } +} diff --git a/core/src/test/java/com/linecorp/armeria/server/observation/SpanPropagationTest.java b/core/src/test/java/com/linecorp/armeria/server/observation/SpanPropagationTest.java new file mode 100644 index 00000000000..3cf63cfaa84 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/server/observation/SpanPropagationTest.java @@ -0,0 +1,138 @@ +/* + * Copyright 2021 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License + */ + +package com.linecorp.armeria.server.observation; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import org.hamcrest.Matchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.MDC; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.ClientRequestContextCaptor; +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.client.logging.LoggingClient; +import com.linecorp.armeria.client.observation.ObservationClient; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.internal.common.observation.MicrometerObservationRegistryUtils; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.logging.LoggingService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; + +import brave.Tracing; +import brave.baggage.BaggageFields; +import brave.context.slf4j.MDCScopeDecorator; +import brave.propagation.CurrentTraceContext; +import brave.propagation.ThreadLocalCurrentTraceContext; +import brave.propagation.TraceContext; + +class SpanPropagationTest { + + private static final CurrentTraceContext traceCtx = + ThreadLocalCurrentTraceContext.newBuilder() + .addScopeDecorator(MDCScopeDecorator.get()) + .build(); + private static final Tracing tracing = Tracing.newBuilder() + .currentTraceContext(traceCtx) + .build(); + private static final AtomicReference clientTraceCtx = new AtomicReference<>(); + private static final AtomicReference serviceTraceCtx = new AtomicReference<>(); + + private static final AtomicReference> serviceMdcContextRef = new AtomicReference<>(); + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + + sb.service("/trace", (ctx, req) -> { + ctx.log().whenComplete() + .thenAcceptAsync(log -> { + serviceMdcContextRef.set(MDC.getCopyOfContextMap()); + serviceTraceCtx.set(tracing.currentTraceContext().get()); + }, ctx.eventLoop()); + return HttpResponse.from( + server.webClient(cb -> cb.decorator(ObservationClient.newDecorator( + MicrometerObservationRegistryUtils.observationRegistry(tracing)))) + .get("/bar").aggregate().thenApply(res -> { + return HttpResponse.of("OK"); + })); + }); + + sb.service("/bar", (ctx, req) -> { + return HttpResponse.of("OK"); + }); + sb.decorator(LoggingService.newDecorator()); + sb.decorator(ObservationService.newDecorator( + MicrometerObservationRegistryUtils.observationRegistry(tracing))); + } + }; + + @BeforeEach + void beforeEach() { + clientTraceCtx.set(null); + serviceTraceCtx.set(null); + } + + @Test + void mdcScopeDecorator() { + final WebClient client = WebClient.builder(server.httpUri()) + .decorator(((delegate, ctx, req) -> { + clientTraceCtx.set(tracing.currentTraceContext().get()); + return delegate.execute(ctx, req); + })) + .decorator(ObservationClient.newDecorator( + MicrometerObservationRegistryUtils.observationRegistry( + tracing))) + .decorator(LoggingClient.newDecorator()) + .build(); + + final AtomicReference> clientMdcContextRef = new AtomicReference<>(); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + client.prepare().get("/trace").execute().aggregate(); + final ClientRequestContext cctx = captor.get(); + cctx.log().whenComplete() + .thenAcceptAsync(log -> { + clientMdcContextRef.set(MDC.getCopyOfContextMap()); + }, cctx.eventLoop()); + } + + await().untilAtomic(serviceMdcContextRef, Matchers.notNullValue()); + await().untilAtomic(clientMdcContextRef, Matchers.notNullValue()); + + final Map serviceMdcContext = serviceMdcContextRef.get(); + final String serviceTraceId = serviceMdcContext.get(BaggageFields.TRACE_ID.name()); + final String serviceSpanId = serviceMdcContext.get(BaggageFields.SPAN_ID.name()); + assertThat(serviceTraceId).isEqualTo(serviceTraceCtx.get().traceIdString()); + assertThat(serviceSpanId).isEqualTo(serviceTraceCtx.get().spanIdString()); + + final Map clientMdcContext = clientMdcContextRef.get(); + final String clientTraceId = clientMdcContext.get(BaggageFields.TRACE_ID.name()); + final String clientSpanId = clientMdcContext.get(BaggageFields.SPAN_ID.name()); + assertThat(clientTraceId).isEqualTo(clientTraceCtx.get().traceIdString()); + assertThat(clientSpanId).isEqualTo(clientTraceCtx.get().spanIdString()); + + assertThat(serviceTraceCtx.get().parentIdString()).isEqualTo(clientTraceCtx.get().spanIdString()); + } +} diff --git a/dependencies.toml b/dependencies.toml index 33ffceb8e3a..1ca3eed55b9 100644 --- a/dependencies.toml +++ b/dependencies.toml @@ -79,6 +79,8 @@ logback12 = "1.2.11" logback13 = "1.3.8" logback14 = "1.4.8" micrometer = "1.11.2" +micrometer-tracing = "1.1.3" +micrometer-docs-generator = "1.0.2" micrometer13 = "1.3.20" mockito = "4.11.0" monix = "3.4.1" @@ -770,17 +772,34 @@ javadocs = "https://www.javadoc.io/doc/ch.qos.logback/logback-classic/1.4.7/" [libraries.micrometer-core] module = "io.micrometer:micrometer-core" version.ref = "micrometer" -javadocs = "https://www.javadoc.io/doc/io.micrometer/micrometer-core/1.10.6/" +javadocs = "https://www.javadoc.io/doc/io.micrometer/micrometer-core/1.10.8/" +[libraries.micrometer-observation] +module = "io.micrometer:micrometer-observation" +version.ref = "micrometer" +javadocs = "https://www.javadoc.io/doc/io.micrometer/micrometer-observation/1.10.8/" +[libraries.micrometer-observation-test] +module = "io.micrometer:micrometer-observation-test" +version.ref = "micrometer" [libraries.micrometer-prometheus] module = "io.micrometer:micrometer-registry-prometheus" version.ref = "micrometer" -javadocs = "https://www.javadoc.io/doc/io.micrometer/micrometer-registry-prometheus/1.10.6/" +javadocs = "https://www.javadoc.io/doc/io.micrometer/micrometer-registry-prometheus/1.10.8/" [libraries.micrometer-spring-legacy] module = "io.micrometer:micrometer-spring-legacy" version.ref = "micrometer" javadocs = "https://www.javadoc.io/doc/io.micrometer/micrometer-spring-legacy/1.3.20/" exclusions = ["org.springframework:spring-web", "org.springframework:spring-webmvc"] +[libraries.micrometer-tracing-integration-test] +module = "io.micrometer:micrometer-tracing-integration-test" +version.ref = "micrometer-tracing" +javadocs = "https://www.javadoc.io/doc/io.micrometer/micrometer-tracing-integration-test/1.1.3/" + +[libraries.micrometer-docs-generator] +module = "io.micrometer:micrometer-docs-generator" +version.ref = "micrometer-docs-generator" +javadocs = "https://www.javadoc.io/doc/io.micrometer/micrometer-docs-generator/1.0.2/" + [libraries.micrometer13-core] module = "io.micrometer:micrometer-core" version.ref = "micrometer13" diff --git a/thrift/thrift0.12/build.gradle b/thrift/thrift0.12/build.gradle index dc097443160..fb1886b45dd 100644 --- a/thrift/thrift0.12/build.gradle +++ b/thrift/thrift0.12/build.gradle @@ -24,6 +24,12 @@ dependencies { testImplementation libs.dropwizard.metrics.core testImplementation libs.micrometer.prometheus testImplementation libs.prometheus + + // micrometer tracing + testImplementation (libs.micrometer.tracing.integration.test) { + exclude group: "org.mockito" + } + testImplementation libs.brave.instrumentation.http.tests } tasks.processResources.from "${rootProject.projectDir}/thrift/thrift0.13/src/main/resources" diff --git a/thrift/thrift0.13/build.gradle b/thrift/thrift0.13/build.gradle index 82dfe3a1835..dbbe31c2229 100644 --- a/thrift/thrift0.13/build.gradle +++ b/thrift/thrift0.13/build.gradle @@ -15,6 +15,12 @@ dependencies { testImplementation libs.dropwizard.metrics.core testImplementation libs.micrometer.prometheus testImplementation libs.prometheus + + // micrometer tracing + testImplementation (libs.micrometer.tracing.integration.test) { + exclude group: "org.mockito" + } + testImplementation libs.brave.instrumentation.http.tests } // Use the old compiler. diff --git a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/observation/BraveIntegrationTest.java b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/observation/BraveIntegrationTest.java new file mode 100644 index 00000000000..01a2b5c4a4f --- /dev/null +++ b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/observation/BraveIntegrationTest.java @@ -0,0 +1,544 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.it.observation; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.util.concurrent.Futures.allAsList; +import static com.google.common.util.concurrent.Futures.transformAsync; +import static com.linecorp.armeria.client.observation.ObservationClient.newDecorator; +import static com.linecorp.armeria.common.SessionProtocol.H1C; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.transport.TTransportException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +import com.linecorp.armeria.client.BlockingWebClient; +import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.InvalidResponseHeadersException; +import com.linecorp.armeria.client.ResponseTimeoutException; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.client.thrift.ThriftClients; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.thrift.ThriftFuture; +import com.linecorp.armeria.internal.testing.BlockingUtils; +import com.linecorp.armeria.server.AbstractHttpService; +import com.linecorp.armeria.server.HttpService; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.observation.ObservationService; +import com.linecorp.armeria.server.thrift.THttpService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; + +import brave.ScopedSpan; +import brave.Span; +import brave.Tracer.SpanInScope; +import brave.Tracing; +import brave.handler.MutableSpan; +import brave.handler.SpanHandler; +import brave.propagation.CurrentTraceContext; +import brave.propagation.ThreadLocalCurrentTraceContext; +import brave.propagation.TraceContext; +import brave.sampler.Sampler; +import io.micrometer.observation.ObservationRegistry; +import testing.thrift.main.HelloService; +import testing.thrift.main.HelloService.AsyncIface; + +class BraveIntegrationTest { + + private static final SpanHandlerImpl spanHandler = new SpanHandlerImpl(); + + private static HelloService.Iface fooClient; + private static HelloService.Iface fooClientWithoutTracing; + private static HelloService.Iface timeoutClient; + private static HelloService.Iface timeoutClientClientTimesOut; + private static HelloService.Iface http1TimeoutClientClientTimesOut; + private static HelloService.AsyncIface barClient; + private static HelloService.AsyncIface quxClient; + private static HelloService.Iface zipClient; + private static WebClient poolWebClient; + + @RegisterExtension + static ServerExtension server = new ServerExtension(true) { + @Override + protected void configure(ServerBuilder sb) throws Exception { + // Our test that triggers a timeout will take 10 seconds to run to avoid flakiness + // that a client cancels a request before a server receives it. + sb.requestTimeout(Duration.ofSeconds(10)); + + sb.service("/foo", decorate("service/foo", THttpService.of( + (AsyncIface) (name, resultHandler) -> + barClient.hello("Miss. " + name, new DelegatingCallback(resultHandler))))); + + sb.service("/bar", decorate("service/bar", THttpService.of( + (AsyncIface) (name, resultHandler) -> { + if (name.startsWith("Miss. ")) { + name = "Ms. " + name.substring(6); + } + quxClient.hello(name, new DelegatingCallback(resultHandler)); + }))); + + sb.service("/zip", decorate("service/zip", THttpService.of( + (AsyncIface) (name, resultHandler) -> { + final ThriftFuture f1 = new ThriftFuture<>(); + final ThriftFuture f2 = new ThriftFuture<>(); + quxClient.hello(name, f1); + quxClient.hello(name, f2); + CompletableFuture.allOf(f1, f2).whenCompleteAsync((aVoid, throwable) -> { + resultHandler.onComplete(f1.getNow(null) + ", and " + f2.getNow(null)); + }, RequestContext.current().eventLoop()); + }))); + + sb.service("/qux", decorate("service/qux", THttpService.of( + (AsyncIface) (name, resultHandler) -> resultHandler.onComplete("Hello, " + name + '!')))); + + sb.service("/pool", decorate("service/pool", new AbstractHttpService() { + @Override + protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) + throws Exception { + final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(2)); + final CountDownLatch countDownLatch = new CountDownLatch(2); + + final ListenableFuture> spanAware = allAsList(IntStream.range(1, 3).mapToObj( + i -> executorService.submit( + RequestContext.current().makeContextAware(() -> { + if (i == 2) { + countDownLatch.countDown(); + countDownLatch.await(); + } + final Span span = Tracing.currentTracer().nextSpan().start(); + try (SpanInScope unused = + Tracing.currentTracer().withSpanInScope(span)) { + if (i == 1) { + countDownLatch.countDown(); + countDownLatch.await(); + // to wait second task get span. + Thread.sleep(1000L); + } + } finally { + span.finish(); + } + return null; + }))).collect(toImmutableList())); + + final CompletableFuture responseFuture = new CompletableFuture<>(); + final HttpResponse res = HttpResponse.from(responseFuture); + transformAsync(spanAware, + result -> allAsList(IntStream.range(1, 3).mapToObj( + i -> executorService.submit( + RequestContext.current().makeContextAware(() -> { + final ScopedSpan span = Tracing.currentTracer() + .startScopedSpan("aloha"); + try { + return null; + } finally { + span.finish(); + } + }) + )).collect(toImmutableList())), + RequestContext.current().eventLoop()) + .addListener(() -> { + responseFuture.complete(HttpResponse.of(HttpStatus.OK, + MediaType.PLAIN_TEXT_UTF_8, + "Lee")); + }, RequestContext.current().eventLoop()); + return res; + } + })); + + sb.service("/timeout", decorate("service/timeout", THttpService.of( + // This service never calls the handler and will timeout. + (AsyncIface) (name, resultHandler) -> { + }))); + + sb.service("/http", (req, ctx) -> HttpResponse.of(HttpStatus.OK)); + } + }; + + @BeforeEach + void setupClients() { + fooClient = ThriftClients.builder(server.httpUri()) + .path("/foo") + .decorator(newDecorator(newTracing("client/foo"))) + .build(HelloService.Iface.class); + zipClient = ThriftClients.builder(server.httpUri()) + .path("/zip") + .decorator(newDecorator(newTracing("client/zip"))) + .build(HelloService.Iface.class); + fooClientWithoutTracing = ThriftClients.newClient(server.httpUri() + "/foo", HelloService.Iface.class); + barClient = newClient("/bar"); + quxClient = newClient("/qux"); + poolWebClient = WebClient.of(server.httpUri()); + timeoutClient = ThriftClients.builder(server.httpUri()) + .path("/timeout") + .decorator(newDecorator(newTracing("client/timeout"))) + .build(HelloService.Iface.class); + timeoutClientClientTimesOut = + ThriftClients.builder(server.httpUri()) + .path("/timeout") + .decorator(newDecorator(newTracing("client/timeout"))) + .responseTimeout(Duration.ofSeconds(3)) + .build(HelloService.Iface.class); + http1TimeoutClientClientTimesOut = + ThriftClients.builder(server.uri(H1C)) + .path("/timeout") + .decorator(newDecorator(newTracing("client/timeout"))) + .responseTimeout(Duration.ofSeconds(3)) + .build(HelloService.Iface.class); + } + + @AfterEach + void tearDown() { + Tracing.current().close(); + } + + @AfterEach + void shouldHaveNoExtraSpans() { + assertThat(spanHandler.spans).isEmpty(); + } + + private static ObservationService decorate(String name, HttpService service) { + return ObservationService.newDecorator(newTracing(name)).apply(service); + } + + private static HelloService.AsyncIface newClient(String path) { + return ThriftClients.builder(server.httpUri()) + .path(path) + .decorator(newDecorator(newTracing("client" + path))) + .build(HelloService.AsyncIface.class); + } + + private static ObservationRegistry newTracing(String name) { + final CurrentTraceContext currentTraceContext = ThreadLocalCurrentTraceContext.create(); + return MicrometerObservationRegistryUtils + .observationRegistry(tracingBuilder(name, currentTraceContext)); + } + + private static Tracing tracingBuilder(String name, CurrentTraceContext currentTraceContext) { + return Tracing.newBuilder() + .currentTraceContext(currentTraceContext) + .localServiceName(name) + .addSpanHandler(spanHandler) + .sampler(Sampler.ALWAYS_SAMPLE) + .build(); + } + + @Test + @Disabled("TODO: We don't support these") + void testTimingAnnotations() { + // Use separate client factory to make sure connection is created. + try (ClientFactory clientFactory = ClientFactory.builder().build()) { + final BlockingWebClient client = + WebClient.builder(server.httpUri()) + .factory(clientFactory) + .decorator( + newDecorator(newTracing("timed-client"))) + .build() + .blocking(); + assertThat(client.get("/http").status()).isEqualTo(HttpStatus.OK); + final MutableSpan[] initialConnectSpans = spanHandler.take(1); + assertThat(initialConnectSpans[0].annotations()) + .extracting(Map.Entry::getValue).containsExactlyInAnyOrder( + "connection-acquire.start", + "socket-connect.start", + "socket-connect.end", + "connection-acquire.end", + "ws", + "wr"); + + // Make another request which will reuse the connection so no connection timing. + assertThat(client.get("/http").status()).isEqualTo(HttpStatus.OK); + + final MutableSpan[] secondConnectSpans = spanHandler.take(1); + assertThat(secondConnectSpans[0].annotations()) + .extracting(Map.Entry::getValue).containsExactlyInAnyOrder( + "ws", + "wr"); + } + } + + @Test + void testServiceHasMultipleClientRequests() throws Exception { + assertThat(zipClient.hello("Lee")).isEqualTo("Hello, Lee!, and Hello, Lee!"); + + final MutableSpan[] spans = spanHandler.take(6); + final String traceId = spans[0].traceId(); + assertThat(spans).allMatch(s -> s.traceId().equals(traceId)); + } + + @Test + void testClientInitiatedTrace() throws Exception { + assertThat(fooClient.hello("Lee")).isEqualTo("Hello, Ms. Lee!"); + + final MutableSpan[] spans = spanHandler.take(6); + final String traceId = spans[0].traceId(); + assertThat(spans).allMatch(s -> s.traceId().equals(traceId)); + + // Find all spans. + final MutableSpan clientFooSpan = findSpan(spans, "client/foo"); + final MutableSpan serviceFooSpan = findSpan(spans, "service/foo"); + final MutableSpan clientBarSpan = findSpan(spans, "client/bar"); + final MutableSpan serviceBarSpan = findSpan(spans, "service/bar"); + final MutableSpan clientQuxSpan = findSpan(spans, "client/qux"); + final MutableSpan serviceQuxSpan = findSpan(spans, "service/qux"); + + // client/foo and service/foo should have no parents. + assertThat(clientFooSpan.parentId()).isNull(); + assertThat(serviceFooSpan.parentId()).isEqualTo(clientFooSpan.id()); + + // client/foo and service/foo should have the ID values identical with their traceIds. + assertThat(clientFooSpan.id()).isEqualTo(traceId); + assertThat(serviceFooSpan.id()).isEqualTo(serviceFooSpan.localRootId()); + + // Check the parentIds. + assertThat(serviceFooSpan.parentId()).isEqualTo(clientFooSpan.id()); + assertThat(clientBarSpan.parentId()).isEqualTo(serviceFooSpan.id()); + assertThat(serviceBarSpan.parentId()).isEqualTo(clientBarSpan.id()); + assertThat(clientQuxSpan.parentId()).isEqualTo(serviceBarSpan.id()); + assertThat(serviceQuxSpan.parentId()).isEqualTo(clientQuxSpan.id()); + + // Check the service names. + assertThat(clientFooSpan.localServiceName()).isEqualTo("client/foo"); + assertThat(serviceFooSpan.localServiceName()).isEqualTo("service/foo"); + assertThat(clientBarSpan.localServiceName()).isEqualTo("client/bar"); + assertThat(serviceBarSpan.localServiceName()).isEqualTo("service/bar"); + assertThat(clientQuxSpan.localServiceName()).isEqualTo("client/qux"); + assertThat(serviceQuxSpan.localServiceName()).isEqualTo("service/qux"); + + // Check RPC request can update http request. + assertThat(clientFooSpan.tags().get("http.protocol")).isEqualTo("h2c"); + assertThat(clientFooSpan.tags().get("http.host")).startsWith("127.0.0.1"); + + // Check the span names. + assertThat(spans).allMatch( + s -> "testing.thrift.main.HelloService$AsyncIface/hello".equals(s.name()) || + "testing.thrift.main.HelloService$Iface/hello".equals(s.name())); + + // Check wire times + final long clientStartTime = clientFooSpan.startTimestamp(); + final long clientWireSendTime = clientFooSpan.annotations().stream() + .filter(a -> "ws".equals(a.getValue())) + .findFirst().get().getKey(); + final long clientWireReceiveTime = clientFooSpan.annotations().stream() + .filter(a -> "wr".equals(a.getValue())) + .findFirst().get().getKey(); + final long clientEndTime = clientFooSpan.finishTimestamp(); + + final long serverStartTime = serviceFooSpan.startTimestamp(); + final long serverWireSendTime = serviceFooSpan.annotations().stream() + .filter(a -> "ws".equals(a.getValue())) + .findFirst().get().getKey(); + final long serverWireReceiveTime = serviceFooSpan.annotations().stream() + .filter(a -> "wr".equals(a.getValue())) + .findFirst().get().getKey(); + final long serverEndTime = serviceFooSpan.finishTimestamp(); + + // These values are taken at microsecond precision and should be reliable to compare to each other. + + // Because of the small deltas among these numbers in a unit test, a thread context switch can cause + // client - server values to not compare correctly. We go ahead and only verify values recorded from the + // same thread. + + assertThat(clientStartTime).isNotZero(); + assertThat(clientWireSendTime).isGreaterThanOrEqualTo(clientStartTime); + assertThat(clientWireReceiveTime).isGreaterThanOrEqualTo(clientWireSendTime); + assertThat(clientEndTime).isGreaterThanOrEqualTo(clientWireReceiveTime); + + // Server start time and wire receive time are essentially the same in our current model, and whether + // one is greater than the other is mostly an implementation detail, so we don't compare them to each + // other. + + assertThat(serverWireSendTime).isGreaterThanOrEqualTo(serverStartTime); + assertThat(serverWireSendTime).isGreaterThanOrEqualTo(serverWireReceiveTime); + assertThat(serverEndTime).isGreaterThanOrEqualTo(serverWireSendTime); + } + + @Test + void testServiceInitiatedTrace() throws Exception { + assertThat(fooClientWithoutTracing.hello("Lee")).isEqualTo("Hello, Ms. Lee!"); + + final MutableSpan[] spans = spanHandler.take(5); + final String traceId = spans[0].traceId(); + assertThat(spans).allMatch(s -> s.traceId().equals(traceId)); + + // Find all spans. + final MutableSpan serviceFooSpan = findSpan(spans, "service/foo"); + final MutableSpan clientBarSpan = findSpan(spans, "client/bar"); + final MutableSpan serviceBarSpan = findSpan(spans, "service/bar"); + final MutableSpan clientQuxSpan = findSpan(spans, "client/qux"); + final MutableSpan serviceQuxSpan = findSpan(spans, "service/qux"); + + // service/foo should have no parent. + assertThat(serviceFooSpan.parentId()).isNull(); + + // service/foo should have the ID value identical with its traceId. + assertThat(serviceFooSpan.id()).isEqualTo(traceId); + + // Check the parentIds + assertThat(clientBarSpan.parentId()).isEqualTo(serviceFooSpan.id()); + assertThat(serviceBarSpan.parentId()).isEqualTo(clientBarSpan.id()); + assertThat(clientQuxSpan.parentId()).isEqualTo(serviceBarSpan.id()); + assertThat(serviceQuxSpan.parentId()).isEqualTo(clientQuxSpan.id()); + + // Check the service names. + assertThat(serviceFooSpan.localServiceName()).isEqualTo("service/foo"); + assertThat(clientBarSpan.localServiceName()).isEqualTo("client/bar"); + assertThat(serviceBarSpan.localServiceName()).isEqualTo("service/bar"); + assertThat(clientQuxSpan.localServiceName()).isEqualTo("client/qux"); + assertThat(serviceQuxSpan.localServiceName()).isEqualTo("service/qux"); + + // Check the span names. + assertThat(spans).allMatch( + s -> "testing.thrift.main.HelloService$AsyncIface/hello".equals(s.name())); + } + + @Test + void testSpanInThreadPoolHasSameTraceId() throws Exception { + poolWebClient.get("pool").aggregate().get(); + final MutableSpan[] spans = spanHandler.take(5); + assertThat(Arrays.stream(spans).map(MutableSpan::traceId).collect(toImmutableSet())).hasSize(1); + assertThat(Arrays.stream(spans).map(MutableSpan::parentId) + .filter(Objects::nonNull) + .collect(toImmutableSet())).hasSize(1); + } + + @Test + void testServerTimesOut() throws Exception { + assertThatThrownBy(() -> timeoutClient.hello("name")) + .isInstanceOf(TTransportException.class) + .hasCauseInstanceOf(InvalidResponseHeadersException.class); + final MutableSpan[] spans = spanHandler.take(2); + + final MutableSpan serverSpan = findSpan(spans, "service/timeout"); + final MutableSpan clientSpan = findSpan(spans, "client/timeout"); + + // Server timed out meaning it did still send a timeout response to the client and we have all + // annotations. + assertThat(serverSpan.annotations()).hasSize(2); + assertThat(clientSpan.annotations()).hasSize(2); + } + + @Test + void testHttp2ClientTimesOut() throws Exception { + testClientTimesOut(timeoutClientClientTimesOut); + } + + @Test + void testHttp1ClientTimesOut() throws Exception { + testClientTimesOut(http1TimeoutClientClientTimesOut); + } + + private static void testClientTimesOut(HelloService.Iface client) { + assertThatThrownBy(() -> client.hello("name")) + .isInstanceOf(TTransportException.class) + .hasCauseInstanceOf(ResponseTimeoutException.class); + final MutableSpan[] spans = spanHandler.take(2); + + final MutableSpan serverSpan = findSpan(spans, "service/timeout"); + final MutableSpan clientSpan = findSpan(spans, "client/timeout"); + + // Collect all annotations except for connection attempts. + final List serverAnnotations = serverSpan.annotations().stream() + .map(Map.Entry::getValue) + .collect(toImmutableList()); + final List clientAnnotations = clientSpan.annotations().stream() + .filter(a -> !a.getValue().contains("connect")) + .map(Map.Entry::getValue) + .collect(toImmutableList()); + + // Client timed out, so no response data was ever sent from the server. + // There is a wire send in the server and no wire receive in the client. + assertThat(serverAnnotations).containsExactly("wr"); + assertThat(clientAnnotations).containsExactly("ws"); + } + + private static MutableSpan findSpan(MutableSpan[] spans, String serviceName) { + return Arrays.stream(spans) + .filter(s -> serviceName.equals(s.localServiceName())) + .findAny() + .orElseThrow(() -> new AssertionError( + "Can't find a Span with service name: " + serviceName)); + } + + private static class DelegatingCallback implements AsyncMethodCallback { + private final AsyncMethodCallback resultHandler; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + DelegatingCallback(AsyncMethodCallback resultHandler) { + this.resultHandler = resultHandler; + } + + @Override + public void onComplete(String response) { + resultHandler.onComplete(response); + } + + @Override + public void onError(Exception exception) { + resultHandler.onError(exception); + } + } + + private static class SpanHandlerImpl extends SpanHandler { + private final BlockingQueue spans = new LinkedBlockingQueue<>(); + + @Override + public boolean end(TraceContext context, MutableSpan span, Cause cause) { + return BlockingUtils.blockingRun(() -> spans.add(span)); + } + + MutableSpan[] take(int numSpans) { + final List taken = new ArrayList<>(); + while (taken.size() < numSpans) { + BlockingUtils.blockingRun(() -> taken.add(spans.poll(30, TimeUnit.SECONDS))); + } + + // Reverse the collected spans to sort the spans by request time. + Collections.reverse(taken); + return taken.toArray(new MutableSpan[numSpans]); + } + } +} diff --git a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/observation/MicrometerObservationRegistryUtils.java b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/observation/MicrometerObservationRegistryUtils.java new file mode 100644 index 00000000000..a9b43909687 --- /dev/null +++ b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/observation/MicrometerObservationRegistryUtils.java @@ -0,0 +1,72 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.it.observation; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import brave.Tracing; +import brave.http.HttpTracing; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.observation.MeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.tracing.brave.bridge.BraveBaggageManager; +import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext; +import io.micrometer.tracing.brave.bridge.BravePropagator; +import io.micrometer.tracing.brave.bridge.BraveTracer; +import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; +import io.micrometer.tracing.handler.TracingObservationHandler; + +public final class MicrometerObservationRegistryUtils { + + public static ObservationRegistry observationRegistry(HttpTracing httpTracing) { + return observationRegistry(httpTracing.tracing()); + } + + public static ObservationRegistry observationRegistry(Tracing tracing) { + final BraveCurrentTraceContext braveCurrentTraceContext = new BraveCurrentTraceContext( + tracing.currentTraceContext()); + final BravePropagator bravePropagator = new BravePropagator(tracing); + final BraveTracer braveTracer = new BraveTracer(tracing.tracer(), braveCurrentTraceContext, + new BraveBaggageManager()); + final List> tracingHandlers = + Arrays.asList(new PropagatingSenderTracingObservationHandler<>(braveTracer, bravePropagator), + new PropagatingReceiverTracingObservationHandler<>(braveTracer, bravePropagator), + new DefaultTracingObservationHandler(braveTracer)); + + final MeterRegistry meterRegistry = new SimpleMeterRegistry(); + final List> meterHandlers = Collections.singletonList( + new DefaultMeterObservationHandler(meterRegistry)); + + final ObservationRegistry observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig().observationHandler( + new ObservationHandler.CompositeObservationHandler.FirstMatchingCompositeObservationHandler( + tracingHandlers)); + observationRegistry.observationConfig().observationHandler( + new ObservationHandler.CompositeObservationHandler.FirstMatchingCompositeObservationHandler( + meterHandlers)); + return observationRegistry; + } + + private MicrometerObservationRegistryUtils() {} +} diff --git a/thrift/thrift0.14/build.gradle b/thrift/thrift0.14/build.gradle index dc695101cdd..0547e2853b4 100644 --- a/thrift/thrift0.14/build.gradle +++ b/thrift/thrift0.14/build.gradle @@ -16,6 +16,12 @@ dependencies { testImplementation libs.dropwizard.metrics.core testImplementation libs.micrometer.prometheus testImplementation libs.prometheus + + // micrometer tracing + testImplementation (libs.micrometer.tracing.integration.test) { + exclude group: "org.mockito" + } + testImplementation libs.brave.instrumentation.http.tests } // Use the sources from ':thrift0.13'. diff --git a/thrift/thrift0.15/build.gradle b/thrift/thrift0.15/build.gradle index 822afb8e783..038418a9772 100644 --- a/thrift/thrift0.15/build.gradle +++ b/thrift/thrift0.15/build.gradle @@ -16,6 +16,12 @@ dependencies { testImplementation libs.dropwizard.metrics.core testImplementation libs.micrometer.prometheus testImplementation libs.prometheus + + // micrometer tracing + testImplementation (libs.micrometer.tracing.integration.test) { + exclude group: "org.mockito" + } + testImplementation libs.brave.instrumentation.http.tests } // Use the sources from ':thrift0.13' and ':thrift0.14'. diff --git a/thrift/thrift0.16/build.gradle b/thrift/thrift0.16/build.gradle index 7b82de32f7f..4d729a63e10 100644 --- a/thrift/thrift0.16/build.gradle +++ b/thrift/thrift0.16/build.gradle @@ -17,6 +17,12 @@ dependencies { testImplementation libs.dropwizard.metrics.core testImplementation libs.micrometer.prometheus testImplementation libs.prometheus + + // micrometer tracing + testImplementation (libs.micrometer.tracing.integration.test) { + exclude group: "org.mockito" + } + testImplementation libs.brave.instrumentation.http.tests } // Use the sources from ':thrift0.13' and ':thrift0.14'. ':thrift0.15' is empty so no need to copy anything. diff --git a/thrift/thrift0.17/build.gradle b/thrift/thrift0.17/build.gradle index 9290cb3e04b..e30fa5477d2 100644 --- a/thrift/thrift0.17/build.gradle +++ b/thrift/thrift0.17/build.gradle @@ -17,6 +17,12 @@ dependencies { testImplementation libs.dropwizard.metrics.core testImplementation libs.micrometer.prometheus testImplementation libs.prometheus + + // micrometer tracing + testImplementation (libs.micrometer.tracing.integration.test) { + exclude group: "org.mockito" + } + testImplementation libs.brave.instrumentation.http.tests } // Use the sources from ':thrift0.13' and ':thrift0.14'. ':thrift0.15' is empty so no need to copy anything. diff --git a/thrift/thrift0.18/build.gradle b/thrift/thrift0.18/build.gradle index 1f6711a033e..5d32519855b 100644 --- a/thrift/thrift0.18/build.gradle +++ b/thrift/thrift0.18/build.gradle @@ -17,6 +17,12 @@ dependencies { testImplementation libs.dropwizard.metrics.core testImplementation libs.micrometer.prometheus testImplementation libs.prometheus + + // micrometer tracing + testImplementation (libs.micrometer.tracing.integration.test) { + exclude group: "org.mockito" + } + testImplementation libs.brave.instrumentation.http.tests } // Use the sources from ':thrift0.13' and ':thrift0.14'. ':thrift0.15' is empty so no need to copy anything. diff --git a/thrift/thrift0.9/build.gradle b/thrift/thrift0.9/build.gradle index 65401ffe63f..d3b44b2660c 100644 --- a/thrift/thrift0.9/build.gradle +++ b/thrift/thrift0.9/build.gradle @@ -23,6 +23,12 @@ dependencies { testImplementation libs.dropwizard.metrics.core testImplementation libs.micrometer.prometheus testImplementation libs.prometheus + + // micrometer tracing + testImplementation (libs.micrometer.tracing.integration.test) { + exclude group: "org.mockito" + } + testImplementation libs.brave.instrumentation.http.tests } // Use the sources from ':thrift0.13'.