Skip to content

Commit

Permalink
HTTP/2 concurrent streams proper check #7695
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec committed Oct 2, 2023
1 parent a57bbca commit c422786
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.helidon.http.http2;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -76,7 +75,9 @@ public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlag

int maxFrameSize = flowControl.maxFrameSize();

return withStreamLock(() -> {
lock();
try {

int written = 0;
headerBuffer.clear();
headers.write(outboundDynamicTable, responseHuffman, headerBuffer);
Expand Down Expand Up @@ -131,7 +132,9 @@ public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlag
written += Http2FrameHeader.LENGTH;
noLockWrite(new Http2FrameData(frameHeader, fragment));
return written;
});
} finally {
streamLock.unlock();
}
}

@Override
Expand All @@ -144,7 +147,8 @@ public int writeHeaders(Http2Headers headers,
// we must enforce parallelism of exactly 1, to make sure the dynamic table is updated
// and then immediately written

return withStreamLock(() -> {
lock();
try {
int bytesWritten = 0;

bytesWritten += writeHeaders(headers, streamId, flags, flowControl);
Expand All @@ -154,7 +158,9 @@ public int writeHeaders(Http2Headers headers,
bytesWritten += dataFrame.header().length();

return bytesWritten;
});
} finally {
streamLock.unlock();
}
}

/**
Expand All @@ -164,27 +170,26 @@ public int writeHeaders(Http2Headers headers,
* @throws InterruptedException in case we fail to lock on the stream
*/
public void updateHeaderTableSize(long newSize) throws InterruptedException {
withStreamLock(() -> {
lock();
try {
outboundDynamicTable.protocolMaxTableSize(newSize);
return null;
});
} finally {
streamLock.unlock();
}
}

private void lockedWrite(Http2FrameData frame) {
withStreamLock(() -> {
lock();
try {
noLockWrite(frame);
return null;
});
} finally {
streamLock.unlock();
}
}

private <T> T withStreamLock(Callable<T> callable) {
private void lock() {
try {
streamLock.lockInterruptibly();
try {
return callable.call();
} finally {
streamLock.unlock();
}
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ public void closeResource() {
}

void trailers(Http2Headers headers, boolean endOfStream) {
this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, false, endOfStream, true);
state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, false, endOfStream, true);
readState = readState.check(ReadState.END);
this.trailers.complete(headers.httpHeaders());
trailers.complete(headers.httpHeaders());
}

CompletableFuture<Headers> trailers() {
Expand Down Expand Up @@ -384,9 +384,7 @@ private Http2FrameData readOne(Duration pollTimeout) {
Http2Headers http2Headers = readHeaders(requestHuffman, false);
this.trailers(http2Headers, endOfStream);
}
default -> {
throw new IllegalStateException("Client is in wrong read state " + readState.name());
}
default -> throw new IllegalStateException("Client is in wrong read state " + readState.name());
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;

Expand Down Expand Up @@ -85,7 +83,7 @@ public class Http2Connection implements ServerConnection, InterruptableTask<Void
private static final int FRAME_HEADER_LENGTH = 9;
private static final Set<Http2StreamState> REMOVABLE_STREAMS =
Set.of(Http2StreamState.CLOSED, Http2StreamState.HALF_CLOSED_LOCAL);
private final Map<Integer, StreamContext> streams = new HashMap<>(1000);
private final Http2ConnectionStreams streams = new Http2ConnectionStreams();
private final ConnectionContext ctx;
private final Http2Config http2Config;
private final HttpRouting routing;
Expand All @@ -101,6 +99,7 @@ public class Http2Connection implements ServerConnection, InterruptableTask<Void
private final ConnectionFlowControl flowControl;
private final WritableHeaders<?> connectionHeaders;

private final long maxClientConcurrentStreams;
// initial client settings, until we receive real ones
private Http2Settings clientSettings = Http2Settings.builder()
.build();
Expand All @@ -113,7 +112,6 @@ public class Http2Connection implements ServerConnection, InterruptableTask<Void
private State state = State.WRITE_SERVER_SETTINGS;
private int continuationExpectedStreamId;
private int lastStreamId;
private long maxClientConcurrentStreams;
private boolean initConnectionHeaders;
private volatile ZonedDateTime lastRequestTimestamp;
private volatile Thread myThread;
Expand Down Expand Up @@ -230,7 +228,7 @@ public void clientSettings(Http2Settings http2Settings) {
//6.9.2/1 - SETTINGS frame can alter the initial flow-control
// window size for streams with active flow-control windows (that is,
// streams in the "open" or "half-closed (remote)" state)
for (StreamContext sctx : streams.values()) {
for (StreamContext sctx : streams.contexts()) {
Http2StreamState streamState = sctx.stream.streamState();
if (streamState == Http2StreamState.OPEN || streamState == Http2StreamState.HALF_CLOSED_REMOTE) {
sctx.stream.flowControl().outbound().resetStreamWindowSize(initialWindowSize.intValue());
Expand Down Expand Up @@ -766,12 +764,11 @@ private StreamContext stream(int streamId) {
StreamContext streamContext = streams.get(streamId);
if (streamContext == null) {
if (same) {
throw new Http2Exception(Http2ErrorCode.STREAM_CLOSED,
"Stream closed");
throw new Http2Exception(Http2ErrorCode.STREAM_CLOSED, "Stream closed");
}
if (streamId < lastStreamId) {
// check if the newer streams are in idle state (if yes, this is OK)
for (StreamContext context : streams.values()) {
for (StreamContext context : streams.contexts()) {
if (context.streamId > streamId && context.stream().streamState() != Http2StreamState.IDLE) {
throw new Http2Exception(Http2ErrorCode.PROTOCOL,
"Stream " + streamId
Expand All @@ -789,6 +786,7 @@ private StreamContext stream(int streamId) {
streamContext = new StreamContext(streamId,
http2Config.maxHeaderListSize(),
new Http2ServerStream(ctx,
streams,
routing,
http2Config,
subProviders,
Expand All @@ -797,7 +795,8 @@ private StreamContext stream(int streamId) {
clientSettings,
connectionWriter,
flowControl));
streams.put(streamId, streamContext);
streams.doMaintenance(maxClientConcurrentStreams);
streams.put(streamContext);
}
// any request for a specific stream is now considered a valid update of connection (ignoring management messages
// on stream 0)
Expand Down Expand Up @@ -842,7 +841,7 @@ private enum State {
UNKNOWN
}

private record StreamRunnable(Map<Integer, StreamContext> streams,
private record StreamRunnable(Http2ConnectionStreams streams,
Http2ServerStream stream,
Thread handlerThread) implements Runnable {

Expand All @@ -868,7 +867,7 @@ public void run() {
}
}

private static class StreamContext {
static class StreamContext {
private final List<Http2FrameData> continuationData = new ArrayList<>();
private final long maxHeaderListSize;
private final int streamId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webserver.http2;

import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

class Http2ConnectionStreams {
private final Map<Integer, Http2Connection.StreamContext> streams = new ConcurrentHashMap<>(1000);
private final Queue<Integer> forRemoval = new ConcurrentLinkedQueue<>();

void put(Http2Connection.StreamContext ctx){
streams.put(ctx.stream().streamId(), ctx);
}

Http2Connection.StreamContext get(int streamId){
return streams.get(streamId);
}

Collection<Http2Connection.StreamContext> contexts(){
return streams.values();
}

boolean isEmpty(){
return streams.isEmpty();
}

void remove(int streamId){
forRemoval.add(streamId);
}

int size(){
return streams.size();
}

void doMaintenance(long maxConcurrentStreams){
if (streams.size() < maxConcurrentStreams) {
return;
}
for (Integer streamId = forRemoval.poll();
streamId != null;
streamId = forRemoval.poll()) {
streams.remove(streamId);
}
}
}
Loading

0 comments on commit c422786

Please sign in to comment.