From c42278600bf130bf570dd291dc28bf33ec32e7b6 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Mon, 2 Oct 2023 17:26:53 +0200 Subject: [PATCH] HTTP/2 concurrent streams proper check #7695 --- .../http/http2/Http2ConnectionWriter.java | 39 +++++++----- .../webclient/http2/Http2ClientStream.java | 8 +-- .../webserver/http2/Http2Connection.java | 21 +++---- .../http2/Http2ConnectionStreams.java | 63 +++++++++++++++++++ .../webserver/http2/Http2ServerStream.java | 53 ++++++++-------- 5 files changed, 125 insertions(+), 59 deletions(-) create mode 100644 webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ConnectionStreams.java diff --git a/http/http2/src/main/java/io/helidon/http/http2/Http2ConnectionWriter.java b/http/http2/src/main/java/io/helidon/http/http2/Http2ConnectionWriter.java index 75536aff36a..4735607b026 100755 --- a/http/http2/src/main/java/io/helidon/http/http2/Http2ConnectionWriter.java +++ b/http/http2/src/main/java/io/helidon/http/http2/Http2ConnectionWriter.java @@ -17,7 +17,6 @@ package io.helidon.http.http2; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -76,7 +75,9 @@ public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlag int maxFrameSize = flowControl.maxFrameSize(); - return withStreamLock(() -> { + lock(); + try { + int written = 0; headerBuffer.clear(); headers.write(outboundDynamicTable, responseHuffman, headerBuffer); @@ -131,7 +132,9 @@ public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlag written += Http2FrameHeader.LENGTH; noLockWrite(new Http2FrameData(frameHeader, fragment)); return written; - }); + } finally { + streamLock.unlock(); + } } @Override @@ -144,7 +147,8 @@ public int writeHeaders(Http2Headers headers, // we must enforce parallelism of exactly 1, to make sure the dynamic table is updated // and then immediately written - return withStreamLock(() -> { + lock(); + try { int bytesWritten = 0; bytesWritten += writeHeaders(headers, streamId, flags, flowControl); @@ -154,7 +158,9 @@ public int writeHeaders(Http2Headers headers, bytesWritten += dataFrame.header().length(); return bytesWritten; - }); + } finally { + streamLock.unlock(); + } } /** @@ -164,27 +170,26 @@ public int writeHeaders(Http2Headers headers, * @throws InterruptedException in case we fail to lock on the stream */ public void updateHeaderTableSize(long newSize) throws InterruptedException { - withStreamLock(() -> { + lock(); + try { outboundDynamicTable.protocolMaxTableSize(newSize); - return null; - }); + } finally { + streamLock.unlock(); + } } private void lockedWrite(Http2FrameData frame) { - withStreamLock(() -> { + lock(); + try { noLockWrite(frame); - return null; - }); + } finally { + streamLock.unlock(); + } } - private T withStreamLock(Callable callable) { + private void lock() { try { streamLock.lockInterruptibly(); - try { - return callable.call(); - } finally { - streamLock.unlock(); - } } catch (RuntimeException e) { throw e; } catch (Exception e) { diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java index 63cec5feca7..c58e75adf83 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java @@ -178,9 +178,9 @@ public void closeResource() { } void trailers(Http2Headers headers, boolean endOfStream) { - this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, false, endOfStream, true); + state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, false, endOfStream, true); readState = readState.check(ReadState.END); - this.trailers.complete(headers.httpHeaders()); + trailers.complete(headers.httpHeaders()); } CompletableFuture trailers() { @@ -384,9 +384,7 @@ private Http2FrameData readOne(Duration pollTimeout) { Http2Headers http2Headers = readHeaders(requestHuffman, false); this.trailers(http2Headers, endOfStream); } - default -> { - throw new IllegalStateException("Client is in wrong read state " + readState.name()); - } + default -> throw new IllegalStateException("Client is in wrong read state " + readState.name()); } } break; diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java index 17ca91d6d7d..e489d5734dd 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java @@ -21,9 +21,7 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; @@ -85,7 +83,7 @@ public class Http2Connection implements ServerConnection, InterruptableTask REMOVABLE_STREAMS = Set.of(Http2StreamState.CLOSED, Http2StreamState.HALF_CLOSED_LOCAL); - private final Map streams = new HashMap<>(1000); + private final Http2ConnectionStreams streams = new Http2ConnectionStreams(); private final ConnectionContext ctx; private final Http2Config http2Config; private final HttpRouting routing; @@ -101,6 +99,7 @@ public class Http2Connection implements ServerConnection, InterruptableTask connectionHeaders; + private final long maxClientConcurrentStreams; // initial client settings, until we receive real ones private Http2Settings clientSettings = Http2Settings.builder() .build(); @@ -113,7 +112,6 @@ public class Http2Connection implements ServerConnection, InterruptableTask streamId && context.stream().streamState() != Http2StreamState.IDLE) { throw new Http2Exception(Http2ErrorCode.PROTOCOL, "Stream " + streamId @@ -789,6 +786,7 @@ private StreamContext stream(int streamId) { streamContext = new StreamContext(streamId, http2Config.maxHeaderListSize(), new Http2ServerStream(ctx, + streams, routing, http2Config, subProviders, @@ -797,7 +795,8 @@ private StreamContext stream(int streamId) { clientSettings, connectionWriter, flowControl)); - streams.put(streamId, streamContext); + streams.doMaintenance(maxClientConcurrentStreams); + streams.put(streamContext); } // any request for a specific stream is now considered a valid update of connection (ignoring management messages // on stream 0) @@ -842,7 +841,7 @@ private enum State { UNKNOWN } - private record StreamRunnable(Map streams, + private record StreamRunnable(Http2ConnectionStreams streams, Http2ServerStream stream, Thread handlerThread) implements Runnable { @@ -868,7 +867,7 @@ public void run() { } } - private static class StreamContext { + static class StreamContext { private final List continuationData = new ArrayList<>(); private final long maxHeaderListSize; private final int streamId; diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ConnectionStreams.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ConnectionStreams.java new file mode 100644 index 00000000000..965a9b84b60 --- /dev/null +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ConnectionStreams.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.http2; + +import java.util.Collection; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +class Http2ConnectionStreams { + private final Map streams = new ConcurrentHashMap<>(1000); + private final Queue forRemoval = new ConcurrentLinkedQueue<>(); + + void put(Http2Connection.StreamContext ctx){ + streams.put(ctx.stream().streamId(), ctx); + } + + Http2Connection.StreamContext get(int streamId){ + return streams.get(streamId); + } + + Collection contexts(){ + return streams.values(); + } + + boolean isEmpty(){ + return streams.isEmpty(); + } + + void remove(int streamId){ + forRemoval.add(streamId); + } + + int size(){ + return streams.size(); + } + + void doMaintenance(long maxConcurrentStreams){ + if (streams.size() < maxConcurrentStreams) { + return; + } + for (Integer streamId = forRemoval.poll(); + streamId != null; + streamId = forRemoval.poll()) { + streams.remove(streamId); + } + } +} diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java index 6621b18b242..06b1915510c 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java @@ -65,7 +65,7 @@ /** * Server HTTP/2 stream implementation. */ -public class Http2ServerStream implements Runnable, Http2Stream { +class Http2ServerStream implements Runnable, Http2Stream { private static final DataFrame TERMINATING_FRAME = new DataFrame(Http2FrameHeader.create(0, Http2FrameTypes.DATA, @@ -85,6 +85,8 @@ public class Http2ServerStream implements Runnable, Http2Stream { private final Router router; private final ArrayBlockingQueue inboundData = new ArrayBlockingQueue<>(32); private final StreamFlowControl flowControl; + private final Http2ConnectionStreams streams; + private final HttpRouting routing; private boolean wasLastDataFrame = false; private volatile Http2Headers headers; @@ -94,7 +96,6 @@ public class Http2ServerStream implements Runnable, Http2Stream { private WriteState writeState = WriteState.INIT; private Http2SubProtocolSelector.SubProtocolHandler subProtocolHandler; private long expectedLength = -1; - private HttpRouting routing; private HttpPrologue prologue; // create a semaphore if accessed before we get the one from connection // must be volatile, as it is accessed both from connection thread and from stream thread @@ -105,6 +106,7 @@ public class Http2ServerStream implements Runnable, Http2Stream { * A new HTTP/2 server stream. * * @param ctx connection context + * @param streams * @param routing HTTP routing * @param http2Config HTTP/2 configuration * @param subProviders HTTP/2 sub protocol selectors @@ -114,7 +116,8 @@ public class Http2ServerStream implements Runnable, Http2Stream { * @param writer writer * @param connectionFlowControl connection flow control */ - public Http2ServerStream(ConnectionContext ctx, + Http2ServerStream(ConnectionContext ctx, + Http2ConnectionStreams streams, HttpRouting routing, Http2Config http2Config, List subProviders, @@ -124,6 +127,7 @@ public Http2ServerStream(ConnectionContext ctx, Http2StreamWriter writer, ConnectionFlowControl connectionFlowControl) { this.ctx = ctx; + this.streams = streams; this.routing = routing; this.http2Config = http2Config; this.subProviders = subProviders; @@ -139,20 +143,6 @@ public Http2ServerStream(ConnectionContext ctx, ); } - /** - * Check this stream is not closed. - * This method is called from connection thread. - * - * @throws Http2Exception in case this stream is closed - */ - public void checkNotClosed() throws Http2Exception { - if (state == Http2StreamState.HALF_CLOSED_REMOTE - || state == Http2StreamState.CLOSED) { - throw new Http2Exception(Http2ErrorCode.STREAM_CLOSED, - "Stream " + streamId + " is closed. State: " + state); - } - } - /** * Check if data can be received on this stream. * This method is called from connection thread. @@ -329,20 +319,26 @@ public void run() { } int writeHeaders(Http2Headers http2Headers, boolean endOfStream) { - writeState = writeState.check(endOfStream ? WriteState.END : WriteState.HEADERS_SENT); - + writeState = writeState.checkAndMove(WriteState.HEADERS_SENT); Http2Flag.HeaderFlags flags; if (endOfStream) { + writeState = writeState.checkAndMove(WriteState.END); + streams.remove(this.streamId); flags = Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM); } else { flags = Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS); } + return writer.writeHeaders(http2Headers, streamId, flags, flowControl.outbound()); } int writeHeadersWithData(Http2Headers http2Headers, int contentLength, BufferData bufferData, boolean endOfStream) { - writeState = writeState.check(WriteState.HEADERS_SENT); - writeState = writeState.check(endOfStream ? WriteState.END : WriteState.DATA_SENT); + writeState = writeState.checkAndMove(WriteState.HEADERS_SENT); + writeState = writeState.checkAndMove(WriteState.DATA_SENT); + if (endOfStream) { + writeState = writeState.checkAndMove(WriteState.END); + streams.remove(this.streamId); + } Http2FrameData frameData = new Http2FrameData(Http2FrameHeader.create(contentLength, @@ -357,7 +353,11 @@ int writeHeadersWithData(Http2Headers http2Headers, int contentLength, BufferDat } int writeData(BufferData bufferData, boolean endOfStream) { - writeState = writeState.check(endOfStream ? WriteState.END : WriteState.DATA_SENT); + writeState = writeState.checkAndMove(WriteState.DATA_SENT); + if (endOfStream) { + writeState = writeState.checkAndMove(WriteState.END); + streams.remove(this.streamId); + } Http2FrameData frameData = new Http2FrameData(Http2FrameHeader.create(bufferData.available(), @@ -371,7 +371,8 @@ int writeData(BufferData bufferData, boolean endOfStream) { } int writeTrailers(Http2Headers http2trailers) { - writeState = writeState.check(WriteState.TRAILERS_SENT); + writeState = writeState.checkAndMove(WriteState.TRAILERS_SENT); + streams.remove(this.streamId); return writer.writeHeaders(http2trailers, streamId, @@ -381,7 +382,7 @@ int writeTrailers(Http2Headers http2trailers) { void write100Continue() { if (writeState == WriteState.EXPECTED_100) { - writeState = writeState.check(WriteState.CONTINUE_100_SENT); + writeState = writeState.checkAndMove(WriteState.CONTINUE_100_SENT); Header status = HeaderValues.createCached(Http2Headers.STATUS_NAME, 100); Http2Headers http2Headers = Http2Headers.create(WritableHeaders.create().add(status)); @@ -431,7 +432,7 @@ private void handle() { this.expectedLength = httpHeaders.get(HeaderNames.CONTENT_LENGTH).get(long.class); } if (headers.httpHeaders().contains(HeaderValues.EXPECT_100)) { - writeState = writeState.check(WriteState.EXPECTED_100); + writeState = writeState.checkAndMove(WriteState.EXPECTED_100); } subProtocolHandler = null; @@ -534,7 +535,7 @@ private enum WriteState { this.allowedTransitions = Set.of(allowedTransitions); } - WriteState check(WriteState newState) { + WriteState checkAndMove(WriteState newState) { if (this == newState || allowedTransitions.contains(newState)) { return newState; }