Skip to content

Commit

Permalink
Make ReactorClientHttpConnector lifecycle-aware
Browse files Browse the repository at this point in the history
With this commit, ReactorClientHttpConnector now implements
SmartLifecycle which optionally allows recreating the HttpClient
after ReactorResourceFactory has been updated.

Closes spring-projectsgh-31180
  • Loading branch information
sdeleuze committed Sep 6, 2023
1 parent ab2ad74 commit 9d768a8
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,51 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

import org.springframework.context.Lifecycle;
import org.springframework.context.SmartLifecycle;
import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* Reactor-Netty implementation of {@link ClientHttpConnector}.
*
* <p>This class implements {@link Lifecycle} and can be optionally declared
* as a Spring-managed bean.
*
* @author Brian Clozel
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
* @since 5.0
* @see reactor.netty.http.client.HttpClient
*/
public class ReactorClientHttpConnector implements ClientHttpConnector {
public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLifecycle {

private static final Log logger = LogFactory.getLog(ReactorClientHttpConnector.class);

private final static Function<HttpClient, HttpClient> defaultInitializer = client -> client.compress(true);


private final HttpClient httpClient;
private HttpClient httpClient;

@Nullable
private final ReactorResourceFactory resourceFactory;

@Nullable
private final Function<HttpClient, HttpClient> mapper;

private volatile boolean running = true;

private final Object lifecycleMonitor = new Object();


/**
Expand All @@ -54,6 +75,8 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
*/
public ReactorClientHttpConnector() {
this.httpClient = defaultInitializer.apply(HttpClient.create());
this.resourceFactory = null;
this.mapper = null;
}

/**
Expand All @@ -68,14 +91,20 @@ public ReactorClientHttpConnector() {
* consider declaring a {@link ReactorResourceFactory} bean with
* {@code globalResources=true} in order to ensure the Reactor Netty global
* resources are shut down when the Spring ApplicationContext is closed.
* @param factory the resource factory to obtain the resources from
* @param resourceFactory the resource factory to obtain the resources from
* @param mapper a mapper for further initialization of the created client
* @since 5.1
*/
public ReactorClientHttpConnector(ReactorResourceFactory factory, Function<HttpClient, HttpClient> mapper) {
ConnectionProvider provider = factory.getConnectionProvider();
public ReactorClientHttpConnector(ReactorResourceFactory resourceFactory, Function<HttpClient, HttpClient> mapper) {
this.httpClient = createHttpClient(resourceFactory, mapper);
this.resourceFactory = resourceFactory;
this.mapper = mapper;
}

private static HttpClient createHttpClient(ReactorResourceFactory resourceFactory, Function<HttpClient, HttpClient> mapper) {
ConnectionProvider provider = resourceFactory.getConnectionProvider();
Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
this.httpClient = defaultInitializer.andThen(mapper).andThen(applyLoopResources(factory))
return defaultInitializer.andThen(mapper).andThen(applyLoopResources(resourceFactory))
.apply(HttpClient.create(provider));
}

Expand All @@ -96,6 +125,8 @@ private static Function<HttpClient, HttpClient> applyLoopResources(ReactorResour
public ReactorClientHttpConnector(HttpClient httpClient) {
Assert.notNull(httpClient, "HttpClient is required");
this.httpClient = httpClient;
this.resourceFactory = null;
this.mapper = null;
}


Expand Down Expand Up @@ -131,4 +162,52 @@ private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpCl
return new ReactorClientHttpRequest(method, uri, request, nettyOutbound);
}

@Override
public void start() {
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
if (this.resourceFactory != null && this.mapper != null) {
this.httpClient = createHttpClient(this.resourceFactory, this.mapper);
}
else {
logger.warn("Restarting a ReactorClientHttpConnector bean is only supported with externally managed Reactor Netty resources");
}
this.running = true;
}
}
}

@Override
public void stop() {
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
this.running = false;
}
}
}

@Override
public final void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
stop();
callback.run();
}
}

@Override
public boolean isRunning() {
return this.running;
}

@Override
public boolean isAutoStartup() {
return false;
}

@Override
public int getPhase() {
// Start after ReactorResourceFactory
return 1;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.client.reactive;

import java.util.function.Function;

import org.junit.jupiter.api.Test;
import reactor.netty.http.client.HttpClient;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Sebastien Deleuze
*/
public class ReactorClientHttpConnectorTests {

@Test
void restartWithDefaultConstructor() {
ReactorClientHttpConnector connector = new ReactorClientHttpConnector();
assertThat(connector.isRunning()).isTrue();
connector.start();
assertThat(connector.isRunning()).isTrue();
connector.stop();
assertThat(connector.isRunning()).isFalse();
connector.start();
assertThat(connector.isRunning()).isTrue();
}

@Test
void restartWithExternalResourceFactory() {
ReactorResourceFactory resourceFactory = new ReactorResourceFactory();
resourceFactory.afterPropertiesSet();
Function<HttpClient, HttpClient> mapper = Function.identity();
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(resourceFactory, mapper);
assertThat(connector.isRunning()).isTrue();
connector.start();
assertThat(connector.isRunning()).isTrue();
connector.stop();
assertThat(connector.isRunning()).isFalse();
connector.start();
assertThat(connector.isRunning()).isTrue();
}

@Test
void restartWithHttpClient() {
HttpClient httpClient = HttpClient.create();
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
assertThat(connector.isRunning()).isTrue();
connector.start();
assertThat(connector.isRunning()).isTrue();
connector.stop();
assertThat(connector.isRunning()).isFalse();
connector.start();
assertThat(connector.isRunning()).isTrue();
}

}

0 comments on commit 9d768a8

Please sign in to comment.