Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: HTTP2 concurrent streams check #7697

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.helidon.http.http2;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -76,7 +75,9 @@ public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlag

int maxFrameSize = flowControl.maxFrameSize();

return withStreamLock(() -> {
lock();
try {

int written = 0;
headerBuffer.clear();
headers.write(outboundDynamicTable, responseHuffman, headerBuffer);
Expand Down Expand Up @@ -131,7 +132,9 @@ public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlag
written += Http2FrameHeader.LENGTH;
noLockWrite(new Http2FrameData(frameHeader, fragment));
return written;
});
} finally {
streamLock.unlock();
}
}

@Override
Expand All @@ -144,7 +147,8 @@ public int writeHeaders(Http2Headers headers,
// we must enforce parallelism of exactly 1, to make sure the dynamic table is updated
// and then immediately written

return withStreamLock(() -> {
lock();
try {
int bytesWritten = 0;

bytesWritten += writeHeaders(headers, streamId, flags, flowControl);
Expand All @@ -154,7 +158,9 @@ public int writeHeaders(Http2Headers headers,
bytesWritten += dataFrame.header().length();

return bytesWritten;
});
} finally {
streamLock.unlock();
}
}

/**
Expand All @@ -164,27 +170,26 @@ public int writeHeaders(Http2Headers headers,
* @throws InterruptedException in case we fail to lock on the stream
*/
public void updateHeaderTableSize(long newSize) throws InterruptedException {
withStreamLock(() -> {
lock();
try {
outboundDynamicTable.protocolMaxTableSize(newSize);
return null;
});
} finally {
streamLock.unlock();
}
}

private void lockedWrite(Http2FrameData frame) {
withStreamLock(() -> {
lock();
try {
noLockWrite(frame);
return null;
});
} finally {
streamLock.unlock();
}
}

private <T> T withStreamLock(Callable<T> callable) {
private void lock() {
danielkec marked this conversation as resolved.
Show resolved Hide resolved
try {
streamLock.lockInterruptibly();
try {
return callable.call();
} finally {
streamLock.unlock();
}
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ public void closeResource() {
}

void trailers(Http2Headers headers, boolean endOfStream) {
this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, false, endOfStream, true);
state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, false, endOfStream, true);
readState = readState.check(ReadState.END);
this.trailers.complete(headers.httpHeaders());
trailers.complete(headers.httpHeaders());
}

CompletableFuture<Headers> trailers() {
Expand Down Expand Up @@ -384,9 +384,7 @@ private Http2FrameData readOne(Duration pollTimeout) {
Http2Headers http2Headers = readHeaders(requestHuffman, false);
this.trailers(http2Headers, endOfStream);
}
default -> {
throw new IllegalStateException("Client is in wrong read state " + readState.name());
}
default -> throw new IllegalStateException("Client is in wrong read state " + readState.name());
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webserver.http2;

/**
* Operations with expected contention over connection stream collection,
* which are going to be deferred to connection dispatch thread to be executed on.
*/
sealed interface Http2ConcurrentConnectionStreams permits Http2ConnectionStreams {
/**
* Contention expected between connection dispatch thread and stream threads.
*
* @param streamId id of the stream to be removed at nearest maintenance
*/
void remove(int streamId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;

Expand Down Expand Up @@ -85,7 +83,7 @@ public class Http2Connection implements ServerConnection, InterruptableTask<Void
private static final int FRAME_HEADER_LENGTH = 9;
private static final Set<Http2StreamState> REMOVABLE_STREAMS =
Set.of(Http2StreamState.CLOSED, Http2StreamState.HALF_CLOSED_LOCAL);
private final Map<Integer, StreamContext> streams = new HashMap<>(1000);
private final Http2ConnectionStreams streams = new Http2ConnectionStreams();
private final ConnectionContext ctx;
private final Http2Config http2Config;
private final HttpRouting routing;
Expand All @@ -101,6 +99,7 @@ public class Http2Connection implements ServerConnection, InterruptableTask<Void
private final ConnectionFlowControl flowControl;
private final WritableHeaders<?> connectionHeaders;

private final long maxClientConcurrentStreams;
// initial client settings, until we receive real ones
private Http2Settings clientSettings = Http2Settings.builder()
.build();
Expand All @@ -113,7 +112,6 @@ public class Http2Connection implements ServerConnection, InterruptableTask<Void
private State state = State.WRITE_SERVER_SETTINGS;
private int continuationExpectedStreamId;
private int lastStreamId;
private long maxClientConcurrentStreams;
private boolean initConnectionHeaders;
private volatile ZonedDateTime lastRequestTimestamp;
private volatile Thread myThread;
Expand Down Expand Up @@ -230,7 +228,7 @@ public void clientSettings(Http2Settings http2Settings) {
//6.9.2/1 - SETTINGS frame can alter the initial flow-control
// window size for streams with active flow-control windows (that is,
// streams in the "open" or "half-closed (remote)" state)
for (StreamContext sctx : streams.values()) {
for (StreamContext sctx : streams.contexts()) {
Http2StreamState streamState = sctx.stream.streamState();
if (streamState == Http2StreamState.OPEN || streamState == Http2StreamState.HALF_CLOSED_REMOTE) {
sctx.stream.flowControl().outbound().resetStreamWindowSize(initialWindowSize.intValue());
Expand Down Expand Up @@ -766,12 +764,11 @@ private StreamContext stream(int streamId) {
StreamContext streamContext = streams.get(streamId);
if (streamContext == null) {
if (same) {
throw new Http2Exception(Http2ErrorCode.STREAM_CLOSED,
"Stream closed");
throw new Http2Exception(Http2ErrorCode.STREAM_CLOSED, "Stream closed");
}
if (streamId < lastStreamId) {
// check if the newer streams are in idle state (if yes, this is OK)
for (StreamContext context : streams.values()) {
for (StreamContext context : streams.contexts()) {
if (context.streamId > streamId && context.stream().streamState() != Http2StreamState.IDLE) {
throw new Http2Exception(Http2ErrorCode.PROTOCOL,
"Stream " + streamId
Expand All @@ -789,6 +786,7 @@ private StreamContext stream(int streamId) {
streamContext = new StreamContext(streamId,
http2Config.maxHeaderListSize(),
new Http2ServerStream(ctx,
streams,
routing,
http2Config,
subProviders,
Expand All @@ -797,7 +795,8 @@ private StreamContext stream(int streamId) {
clientSettings,
connectionWriter,
flowControl));
streams.put(streamId, streamContext);
streams.put(streamContext);
streams.doMaintenance(maxClientConcurrentStreams);
}
// any request for a specific stream is now considered a valid update of connection (ignoring management messages
// on stream 0)
Expand Down Expand Up @@ -842,7 +841,7 @@ private enum State {
UNKNOWN
}

private record StreamRunnable(Map<Integer, StreamContext> streams,
private record StreamRunnable(Http2ConnectionStreams streams,
Http2ServerStream stream,
Thread handlerThread) implements Runnable {

Expand All @@ -868,7 +867,7 @@ public void run() {
}
}

private static class StreamContext {
static class StreamContext {
private final List<Http2FrameData> continuationData = new ArrayList<>();
private final long maxHeaderListSize;
private final int streamId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webserver.http2;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* HTTP/2 streams bound to single connection.
* Only methods implemented from {@link io.helidon.webserver.http2.Http2ConcurrentConnectionStreams}
* are thread safe, rest needs to be called from connection dispatch thread only!
*/
final class Http2ConnectionStreams implements Http2ConcurrentConnectionStreams {
private final Map<Integer, Http2Connection.StreamContext> streams = new HashMap<>(1000);
private final Queue<Integer> forRemoval = new ConcurrentLinkedQueue<>();

@Override
public void remove(int streamId) {
forRemoval.add(streamId);
}

void put(Http2Connection.StreamContext ctx) {
streams.put(ctx.stream().streamId(), ctx);
}

Http2Connection.StreamContext get(int streamId) {
return streams.get(streamId);
}

Collection<Http2Connection.StreamContext> contexts() {
return streams.values();
}

boolean isEmpty() {
return streams.isEmpty();
}

int size() {
return streams.size();
}

void doMaintenance(long maxConcurrentStreams) {
if (streams.size() < maxConcurrentStreams) {
return;
}
for (Integer streamId = forRemoval.poll();
streamId != null;
streamId = forRemoval.poll()) {
streams.remove(streamId);
}
}
}
Loading