Skip to content

Commit

Permalink
Move reactive server instrumentation out of WebFilter
Browse files Browse the repository at this point in the history
Prior to this commit, the Observation instrumentation for Reactive
server applications was implemented with a `WebFilter`. This allowed to
record observations and set up a tracing context for the controller
handlers.

The limitation of this approach is that all processing happening at a
lower level is not aware of any observation. Here, the
`HttpWebHandlerAdapter` handles several interesting aspects:

* logging of HTTP requests and responses at the TRACE level
* logging of client disconnect errors
* handling of unresolved errors

With the current instrumentation, these logging statements will miss the
tracing context information. As a result, this commit deprecates the
`ServerHttpObservationFilter` in favor of a more direct instrumentation
of the `HttpWebHandlerAdapter`. This enables a more precise
instrumentattion and allows to set up the current observation earlier in
the reactor context: log statements will now contain the relevant
information.

Fixes spring-projectsgh-30013
  • Loading branch information
bclozel committed Apr 6, 2023
1 parent cef9166 commit fed0f16
Show file tree
Hide file tree
Showing 16 changed files with 420 additions and 31 deletions.
11 changes: 8 additions & 3 deletions framework-docs/src/docs/asciidoc/integration/observability.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,17 @@ By default, the following `KeyValues` are created:
[[integration.observability.http-server.reactive]]
=== Reactive applications

Applications need to configure the `org.springframework.web.filter.reactive.ServerHttpObservationFilter` reactive `WebFilter` in their application.
Applications need to configure the `WebHttpHandlerBuilder` with a `MeterRegistry` to enable server instrumentation.
This can be done on the `WebHttpHandlerBuilder`, as follows:

include::code:HttpHandlerConfiguration[]

It is using the `org.springframework.http.server.reactive.observation.DefaultServerRequestObservationConvention` by default, backed by the `ServerRequestObservationContext`.

This will only record an observation as an error if the `Exception` has not been handled by the web Framework and has bubbled up to the `WebFilter`.
This will only record an observation as an error if the `Exception` has not been handled by an application Controller.
Typically, all exceptions handled by Spring WebFlux's `@ExceptionHandler` and <<web.adoc#webflux-ann-rest-exceptions,`ProblemDetail` support>> will not be recorded with the observation.
You can, at any point during request processing, set the error field on the `ObservationContext` yourself:
Exceptions handled by configured `WebExceptionHandler` instances will be recorded as errors.
If you would like to customize this behavior, you can set, at any point during request processing, the error field on the `ObservationContext` yourself:

include::code:UserController[]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.docs.integration.observability.httpserver.reactive;

import io.micrometer.observation.ObservationRegistry;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;

@Configuration(proxyBeanMethods = false)
public class HttpHandlerConfiguration {

private final ApplicationContext applicationContext;

public HttpHandlerConfiguration(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

@Bean
public HttpHandler httpHandler(ObservationRegistry registry) {
return WebHttpHandlerBuilder.applicationContext(this.applicationContext)
.observationRegistry(registry)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package org.springframework.docs.integration.observability.httpserver.reactive;

import org.springframework.http.ResponseEntity;
import org.springframework.http.server.reactive.observation.ServerRequestObservationContext;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.filter.reactive.ServerHttpObservationFilter;
import org.springframework.web.server.ServerWebExchange;

@Controller
Expand All @@ -28,7 +28,7 @@ public class UserController {
@ExceptionHandler(MissingUserException.class)
ResponseEntity<Void> handleMissingUser(ServerWebExchange exchange, MissingUserException exception) {
// We want to record this exception with the observation
ServerHttpObservationFilter.findObservationContext(exchange)
ServerRequestObservationContext.findCurrent(exchange)
.ifPresent(context -> context.setError(exception));
return ResponseEntity.notFound().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import io.micrometer.observation.transport.RequestReplyReceiverContext;

import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.web.server.ServerWebExchange;

/**
* Context that holds information for metadata collection regarding
Expand All @@ -36,6 +38,12 @@
*/
public class ServerRequestObservationContext extends RequestReplyReceiverContext<ServerHttpRequest, ServerHttpResponse> {

/**
* Name of the request attribute holding the {@link ServerRequestObservationContext context} for the current observation.
* @since 6.1.0
*/
public static final String CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE = ServerRequestObservationContext.class.getName() + ".context";

private final Map<String, Object> attributes;

@Nullable
Expand All @@ -50,6 +58,16 @@ public ServerRequestObservationContext(ServerHttpRequest request, ServerHttpResp
this.attributes = Collections.unmodifiableMap(attributes);
}

/**
* Get the current {@link ServerRequestObservationContext observation context} from the given exchange, if available.
* @param exchange the current exchange
* @return the current observation context
* @since 6.1.0
*/
public static Optional<ServerRequestObservationContext> findCurrent(ServerWebExchange exchange) {
return Optional.ofNullable(exchange.getAttribute(CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE));
}

/**
* Return an immutable map of the current request attributes.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import org.reactivestreams.Publisher;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import reactor.core.publisher.Mono;

import org.springframework.http.server.reactive.ServerHttpResponse;
Expand All @@ -47,7 +48,9 @@
*
* @author Brian Clozel
* @since 6.0
* @deprecated since 6.1.0 in favor of {@link WebHttpHandlerBuilder}.
*/
@Deprecated(since = "6.1.0", forRemoval = true)
public class ServerHttpObservationFilter implements WebFilter {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,12 @@
import java.util.Set;
import java.util.function.Function;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import org.springframework.context.ApplicationContext;
Expand All @@ -36,11 +40,16 @@
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.observation.DefaultServerRequestObservationConvention;
import org.springframework.http.server.reactive.observation.ServerHttpObservationDocumentation;
import org.springframework.http.server.reactive.observation.ServerRequestObservationContext;
import org.springframework.http.server.reactive.observation.ServerRequestObservationConvention;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebHandler;
import org.springframework.web.server.handler.ExceptionHandlingWebHandler;
import org.springframework.web.server.handler.WebHandlerDecorator;
import org.springframework.web.server.i18n.AcceptHeaderLocaleContextResolver;
import org.springframework.web.server.i18n.LocaleContextResolver;
Expand All @@ -55,6 +64,7 @@
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
* @author Brian Clozel
* @since 5.0
*/
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
Expand All @@ -75,6 +85,8 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa
private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS =
Set.of("AbortedException", "ClientAbortException", "EOFException", "EofException");

private static final ServerRequestObservationConvention DEFAULT_OBSERVATION_CONVENTION = new DefaultServerRequestObservationConvention();


private static final Log logger = LogFactory.getLog(HttpWebHandlerAdapter.class);

Expand All @@ -91,6 +103,12 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa
@Nullable
private ForwardedHeaderTransformer forwardedHeaderTransformer;

@Nullable
private ObservationRegistry observationRegistry;

@Nullable
private ServerRequestObservationConvention observationConvention;

@Nullable
private ApplicationContext applicationContext;

Expand Down Expand Up @@ -192,6 +210,44 @@ public ForwardedHeaderTransformer getForwardedHeaderTransformer() {
return this.forwardedHeaderTransformer;
}

/**
* Configure a {@link ObservationRegistry} for recording server exchange observations.
* By default, a {@link ObservationRegistry#NOOP no-op} instance will be used.
* @param observationRegistry the observation registry to use
* @since 6.1.0
*/
public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

/**
* Return the configured {@link ObservationRegistry}.
* @since 6.1.0
*/
@Nullable
public ObservationRegistry getObservationRegistry() {
return this.observationRegistry;
}

/**
* Configure a {@link ServerRequestObservationConvention} for server exchanges observations.
* By default, a {@link DefaultServerRequestObservationConvention} instance will be used.
* @param observationConvention the observation convention to use
* @since 6.1.0
*/
public void setObservationConvention(ServerRequestObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

/**
* Return the Observation convention configured for server exchanges observations.
* @since 6.1.0
*/
@Nullable
public ServerRequestObservationConvention getObservationConvention() {
return this.observationConvention;
}

/**
* Configure the {@code ApplicationContext} associated with the web application,
* if it was initialized with one via
Expand Down Expand Up @@ -247,9 +303,12 @@ public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response)
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));

ServerRequestObservationContext observationContext = new ServerRequestObservationContext(exchange.getRequest(),
exchange.getResponse(), exchange.getAttributes());
exchange.getAttributes().put(ServerRequestObservationContext.CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE, observationContext);

return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.transformDeferred(call -> transform(exchange, observationContext, call))
.then(cleanupMultipart(exchange))
.then(Mono.defer(response::setComplete));
}
Expand All @@ -271,6 +330,42 @@ protected String formatRequest(ServerHttpRequest request) {
return "HTTP " + request.getMethod() + " \"" + request.getPath() + query + "\"";
}

private Publisher<Void> transform(ServerWebExchange exchange, ServerRequestObservationContext observationContext, Mono<Void> call) {
Observation observation = ServerHttpObservationDocumentation.HTTP_REACTIVE_SERVER_REQUESTS.observation(this.observationConvention,
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, this.observationRegistry);
observation.start();
return call
.doOnSuccess(aVoid -> {
logResponse(exchange);
stopObservation(observation, exchange);
})
.onErrorResume(ex -> handleUnresolvedError(exchange, observationContext, ex))
.doOnCancel(() -> cancelObservation(observationContext, observation))
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation));
}

private void stopObservation(Observation observation, ServerWebExchange exchange) {
Throwable throwable = exchange.getAttribute(ExceptionHandlingWebHandler.HANDLED_WEB_EXCEPTION);
if (throwable != null) {
observation.error(throwable);
}
ServerHttpResponse response = exchange.getResponse();
if (response.isCommitted()) {
observation.stop();
}
else {
response.beforeCommit(() -> {
observation.stop();
return Mono.empty();
});
}
}

private void cancelObservation(ServerRequestObservationContext observationContext, Observation observation) {
observationContext.setConnectionAborted(true);
observation.stop();
}

private void logResponse(ServerWebExchange exchange) {
LogFormatUtils.traceDebug(logger, traceOn -> {
HttpStatusCode status = exchange.getResponse().getStatusCode();
Expand All @@ -284,7 +379,7 @@ private String formatHeaders(HttpHeaders responseHeaders) {
responseHeaders.toString() : responseHeaders.isEmpty() ? "{}" : "{masked}";
}

private Mono<Void> handleUnresolvedError(ServerWebExchange exchange, Throwable ex) {
private Mono<Void> handleUnresolvedError(ServerWebExchange exchange, ServerRequestObservationContext observationContext, Throwable ex) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
String logPrefix = exchange.getLogPrefix();
Expand All @@ -304,6 +399,7 @@ else if (lostClientLogger.isDebugEnabled()) {
lostClientLogger.debug(logPrefix + "Client went away: " + ex +
" (stacktrace at TRACE level for '" + DISCONNECTED_CLIENT_LOG_CATEGORY + "')");
}
observationContext.setConnectionAborted(true);
return Mono.empty();
}
else {
Expand Down
Loading

0 comments on commit fed0f16

Please sign in to comment.