Skip to content

Commit

Permalink
Http/2 Cached connection close detection
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec authored and dalexandrov committed Aug 26, 2023
1 parent 7168699 commit ea27712
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 15 deletions.
24 changes: 24 additions & 0 deletions http/http2/src/main/java/io/helidon/http/http2/Http2Ping.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Ping frame.
*/
public final class Http2Ping implements Http2Frame<Http2Flag.PingFlags> {
private static final byte[] EMPTY_PING_DATA = new byte[8];
private final BufferData data;

Http2Ping(BufferData data) {
Expand All @@ -38,6 +39,15 @@ public static Http2Ping create(BufferData data) {
return new Http2Ping(data);
}

/**
* Create ping.
*
* @return ping frame
*/
public static Http2Ping create() {
return new Http2Ping(BufferData.create(EMPTY_PING_DATA));
}

@Override
public Http2FrameData toFrameData(Http2Settings settings, int streamId, Http2Flag.PingFlags flags) {
Http2FrameHeader header = Http2FrameHeader.create(data.available(),
Expand All @@ -48,6 +58,20 @@ public Http2FrameData toFrameData(Http2Settings settings, int streamId, Http2Fla
return new Http2FrameData(header, data);
}

/**
* Representation of ping data.
*
* @return frame data crated from this ping
*/
public Http2FrameData toFrameData() {
Http2FrameHeader header = Http2FrameHeader.create(data.available(),
frameTypes(),
Http2Flag.PingFlags.create(0),
0);

return new Http2FrameData(header, data);
}

@Override
public String name() {
return Http2FrameType.PING.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -33,6 +34,7 @@
import io.helidon.common.buffers.DataWriter;
import io.helidon.common.socket.SocketContext;
import io.helidon.http.http2.ConnectionFlowControl;
import io.helidon.http.http2.FlowControl;
import io.helidon.http.http2.Http2ConnectionWriter;
import io.helidon.http.http2.Http2ErrorCode;
import io.helidon.http.http2.Http2Exception;
Expand Down Expand Up @@ -70,11 +72,13 @@ class Http2ClientConnection {
private final ConnectionFlowControl connectionFlowControl;
private final Http2Headers.DynamicTable inboundDynamicTable =
Http2Headers.DynamicTable.create(Http2Setting.HEADER_TABLE_SIZE.defaultValue());
private final Http2ClientProtocolConfig protocolConfig;
private final ClientConnection connection;
private final SocketContext ctx;
private final Http2ConnectionWriter writer;
private final DataReader reader;
private final DataWriter dataWriter;
private final Semaphore pingPongSemaphore = new Semaphore(0);
private volatile int lastStreamId;

private Http2Settings serverSettings = Http2Settings.builder()
Expand All @@ -83,16 +87,13 @@ class Http2ClientConnection {

private volatile boolean closed = false;

public boolean closed(){
return closed;
}

Http2ClientConnection(Http2ClientProtocolConfig protocolConfig, ClientConnection connection) {
this.connectionFlowControl = ConnectionFlowControl.clientBuilder(this::writeWindowsUpdate)
.maxFrameSize(protocolConfig.maxFrameSize())
.initialWindowSize(protocolConfig.initialWindowSize())
.blockTimeout(protocolConfig.flowControlBlockTimeout())
.build();
this.protocolConfig = protocolConfig;
this.connection = connection;
this.ctx = connection.helidonSocket();
this.dataWriter = connection.writer();
Expand Down Expand Up @@ -172,7 +173,29 @@ Http2ClientStream tryStream(Http2StreamConfig config) {
}
}

void updateLastStreamId(int lastStreamId){
boolean closed() {
return closed || (protocolConfig.ping() && !ping());
}

boolean ping() {
Http2Ping ping = Http2Ping.create();
Http2FrameData frameData = ping.toFrameData();
sendListener.frameHeader(ctx, 0, frameData.header());
sendListener.frame(ctx, 0, ping);
try {
this.writer().writeData(frameData, FlowControl.Outbound.NOOP);
return pingPongSemaphore.tryAcquire(protocolConfig.pingTimeout().toMillis(), TimeUnit.MILLISECONDS);
} catch (UncheckedIOException | InterruptedException e) {
ctx.log(LOGGER, DEBUG, "Ping failed!", e);
return false;
}
}

void pong() {
pingPongSemaphore.release();
}

void updateLastStreamId(int lastStreamId) {
this.lastStreamId = lastStreamId;
}

Expand Down Expand Up @@ -247,6 +270,7 @@ private void start(Http2ClientProtocolConfig protocolConfig,
}
ctx.log(LOGGER, TRACE, "Client listener interrupted");
} catch (Throwable t) {
closed = true;
ctx.log(LOGGER, DEBUG, "Failed to handle HTTP/2 client connection", t);
}
});
Expand All @@ -261,7 +285,7 @@ private void start(Http2ClientProtocolConfig protocolConfig,
}

private void writeWindowsUpdate(int streamId, Http2WindowUpdate windowUpdateFrame) {
if (streamId == 0){
if (streamId == 0) {
writer.write(windowUpdateFrame.toFrameData(serverSettings, streamId, Http2Flag.NoFlags.create()));
return;
}
Expand Down Expand Up @@ -385,6 +409,8 @@ private boolean handle() {
Http2Flag.PingFlags.create(Http2Flag.ACK),
0);
writer.write(new Http2FrameData(header, frame));
} else {
pong();
}
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,22 @@ default String type() {
*/
@ConfiguredOption("PT0.1S")
Duration flowControlBlockTimeout();

/**
* Check healthiness of cached connections with HTTP/2.0 ping frame.
* Defaults to {@code false}.
*
* @return use ping if true
*/
@ConfiguredOption("false")
boolean ping();

/**
* Timeout for ping probe used for checking healthiness of cached connections.
* Defaults to {@code PT0.5S}, which means 500 milliseconds.
*
* @return timeout
*/
@ConfiguredOption("PT0.5S")
Duration pingTimeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ boolean supports(ConnectionKey ck) {
}

void remove(ConnectionKey connectionKey) {
cache.remove(connectionKey);
Http2ClientConnectionHandler handler = cache.remove(connectionKey);
if (handler != null) {
handler.close();
}
http2Supported.remove(connectionKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class SharedCacheTest {
class SharedCacheTest {
@Test
void cacheHttp1WithServerRestart() {
WebServer webServer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,21 @@ public static WebServer startServer(boolean ssl) {
.route(Http2Route.route(GET, "/versionspecific", (req, res) -> res.send("HTTP/2.0 route\n")))
.route(Http1Route.route(GET, "/versionspecific1", (req, res) -> res.send("HTTP/1.1 route\n")))
.route(Http2Route.route(GET, "/versionspecific2", (req, res) -> res.send("HTTP/2.0 route\n")))
.route(Http.Method.predicate(GET, POST, PUT),
PathMatchers.create("/multi*"),
(req, res) -> res.send("HTTP/" + req.prologue().protocolVersion()
+ " route " + req.prologue().method() + "\n")))
.route(Http.Method.predicate(GET),
PathMatchers.create("/multi*"),
(req, res) -> res.send("HTTP/" + req.prologue().protocolVersion()
+ " route " + req.prologue().method() + "\n"))
.route(Http.Method.predicate(POST, PUT),
PathMatchers.create("/multi*"),
(req, res) ->
{
if (req.content().hasEntity()) {
// Workaround for #7427
req.content().as(String.class);
}
res.send("HTTP/" + req.prologue().protocolVersion()
+ " route " + req.prologue().method() + "\n");
}))
.addRouting(WsRouting.builder()
.endpoint("/ws-echo", new EchoWsListener())
.build())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webserver.tests.upgrade.test;

import io.helidon.http.Http;
import io.helidon.logging.common.LogConfig;
import io.helidon.webclient.http2.Http2Client;
import io.helidon.webclient.http2.Http2ClientProtocolConfig;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.http.HttpRouting;
import io.helidon.webserver.http2.Http2Route;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static io.helidon.http.Http.Method.POST;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

class SharedHttp2CacheTest {

static Param[] params() {
return new Param[] {
new Param(true, true, true),
new Param(true, true, false),
new Param(true, false, false),
new Param(false, false, true),
new Param(false, true, true),
new Param(false, true, false),
new Param(true, false, true),
new Param(false, false, false),
};
}

@ParameterizedTest
@MethodSource("params")
void cacheHttp2WithServerRestart(Param param) {
LogConfig.configureRuntime();
Http.HeaderName clientPortHeader = Http.HeaderNames.create("client-port");
WebServer webServer = null;
try {
HttpRouting routing = HttpRouting.builder()
.route(Http2Route.route(POST, "/versionspecific", (req, res) -> {
req.content().consume(); // Workaround for #7427
res.header(clientPortHeader, String.valueOf(req.remotePeer().port()))
.send();
}))
.build();

webServer = WebServer.builder()
.routing(routing)
.build()
.start();

int port = webServer.port();

Http2Client webClient = Http2Client.builder()
.protocolConfig(Http2ClientProtocolConfig.builder()
.priorKnowledge(param.priorKnowledge())
.ping(param.usePing())
.build())
.baseUri("http://localhost:" + port + "/versionspecific")
.build();

Integer firstReqClientPort;
try (var res = webClient.post().submit("WHATEVER")) {
firstReqClientPort = res.headers().get(clientPortHeader).value(Integer.TYPE);
assertThat(res.status(), is(Http.Status.OK_200));
}

if (param.restart()) {
// Test severing cached connections
webServer.stop();
webServer = WebServer.builder()
.port(port)
.routing(routing)
.build()
.start();
}

Integer secondReqClientPort;
try (var res = webClient.post().submit("WHATEVER")) {
secondReqClientPort = res.headers().get(clientPortHeader).value(Integer.TYPE);
assertThat(res.status(), is(Http.Status.OK_200));
}

if (!param.restart()) {
assertThat("In case of cached connection client port must be the same.",
secondReqClientPort,
is(firstReqClientPort));
}

} finally {
if (webServer != null) {
webServer.stop();
}
}
}

record Param(boolean usePing, boolean priorKnowledge, boolean restart) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand Down Expand Up @@ -208,11 +207,10 @@ void versionSpecificHttp20Negative(String url) throws IOException, InterruptedEx
"HTTP/1.1 GET http://localhost:%d/multi-something",
"HTTP/1.1 PUT https://localhost:%d/multi-something",
"HTTP/1.1 POST https://localhost:%d/multi-something",
//"HTTP/2.0 GET http://localhost:%d/multi-something",
"HTTP/2.0 GET http://localhost:%d/multi-something",
"HTTP/2.0 PUT https://localhost:%d/multi-something",
"HTTP/2.0 POST https://localhost:%d/multi-something",
})
@Disabled("Fails on pipeline")
void versionSpecificHttp20MultipleMethods(String param) throws IOException, InterruptedException {
String[] split = param.split("\s");
String version = split[0];
Expand Down

0 comments on commit ea27712

Please sign in to comment.