Skip to content

Commit

Permalink
Merge branch 'release/0.9.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
mostroverkhov committed Feb 9, 2023
2 parents c9c3e70 + e3dca48 commit e207c5c
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 17 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,18 @@ Per-core throughput [this codec perf-test](https://github.com/jauntsdn/netty-web
comparison with netty's out-of-the-box websocket handlers:
non-masked frames with 8, 64, 125, 1000 bytes of payload over encrypted/non-encrypted connection.

java 9+
```
./gradlew clean build installDist
./perf_server_run.sh
./perf_client_run.sh
```
java 8
```
./gradlew clean build installDist
./perf_server_java8_run.sh
./perf_client_run.sh
```

* non-encrypted

Expand Down Expand Up @@ -166,7 +173,7 @@ repositories {
}
dependencies {
implementation "com.jauntsdn.netty:netty-websocket-http1:0.9.0"
implementation "com.jauntsdn.netty:netty-websocket-http1:0.9.1"
}
```

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group=com.jauntsdn.netty
version=0.9.1
version=0.9.2

googleJavaFormatPluginVersion=0.9
dependencyManagementPluginVersion=1.1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
Expand All @@ -28,13 +30,25 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakeException;
import io.netty.handler.codec.http.websocketx.WebSocketDecoderConfig;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -174,6 +188,123 @@ void serverBuilderMissingHandler() {
});
}

@Timeout(15)
@Test
void clientTimeout() throws InterruptedException {
Channel s =
server =
new ServerBootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.safeRelease(msg);
}
});
}
})
.bind("localhost", 0)
.sync()
.channel();

AttributeKey<ChannelFuture> handshakeKey = AttributeKey.newInstance("handshake");

Channel client =
new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {

HttpClientCodec http1Codec = new HttpClientCodec();
HttpObjectAggregator http1Aggregator = new HttpObjectAggregator(65536);

WebSocketClientProtocolHandler webSocketProtocolHandler =
WebSocketClientProtocolHandler.create()
.handshakeTimeoutMillis(1)
.allowMaskMismatch(true)
.webSocketHandler(
(ctx, webSocketFrameFactory) -> {
throw new AssertionError("should not be called");
})
.build();

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(http1Codec, http1Aggregator, webSocketProtocolHandler);

ChannelFuture handshake = webSocketProtocolHandler.handshakeCompleted();
ch.attr(handshakeKey).set(handshake);
}
})
.connect(s.localAddress())
.sync()
.channel();

ChannelFuture handshakeFuture = client.attr(handshakeKey).get();
handshakeFuture.await();
Throwable cause = handshakeFuture.cause();
Assertions.assertThat(cause).isNotNull();
Assertions.assertThat(cause).isInstanceOf(WebSocketClientHandshakeException.class);
client.closeFuture().await();
Assertions.assertThat(client.isOpen()).isFalse();
}

@Timeout(15)
@Test
void serverNonWebSocketRequest() throws InterruptedException {
WebSocketDecoderConfig decoderConfig = webSocketDecoderConfig(false, true, 125);
TestWebSocketHandler serverHandler = new TestWebSocketHandler();
Channel s = server = testServer("/", decoderConfig, serverHandler);

AttributeKey<Future<FullHttpResponse>> handshakeKey = AttributeKey.newInstance("response");

Channel client =
new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {

HttpClientCodec http1Codec = new HttpClientCodec();
HttpObjectAggregator http1Aggregator = new HttpObjectAggregator(65536);
NonWebSocketRequestHandler nonWebSocketRequestHandler =
new NonWebSocketRequestHandler();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(http1Codec, http1Aggregator, nonWebSocketRequestHandler);

Future<FullHttpResponse> handshake = nonWebSocketRequestHandler.response();
ch.attr(handshakeKey).set(handshake);
}
})
.connect(s.localAddress())
.sync()
.channel();

Future<FullHttpResponse> responseFuture = client.attr(handshakeKey).get();
responseFuture.await();
FullHttpResponse response = responseFuture.getNow();
try {
Assertions.assertThat(response).isNotNull();
Assertions.assertThat(response.status()).isEqualTo(HttpResponseStatus.BAD_REQUEST);
} finally {
response.release();
}
client.closeFuture().await();
Assertions.assertThat(client.isOpen()).isFalse();
}

static Channel testClient(
SocketAddress address,
String path,
Expand Down Expand Up @@ -236,6 +367,51 @@ static Channel testServer(
.channel();
}

static class NonWebSocketRequestHandler extends ChannelInboundHandlerAdapter {
private Promise<FullHttpResponse> responsePromise;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpResponse) {
responsePromise.trySuccess((FullHttpResponse) msg);
return;
}
super.channelRead(ctx, msg);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
responsePromise.tryFailure(new ClosedChannelException());
super.channelInactive(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
responsePromise.tryFailure(cause);
super.exceptionCaught(ctx, cause);
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
responsePromise = new DefaultPromise<>(ctx.executor());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
FullHttpRequest request =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, "/", Unpooled.EMPTY_BUFFER);

ctx.writeAndFlush(request);
}

Future<FullHttpResponse> response() {
return responsePromise;
}
}

static class TestAcceptor extends ChannelInitializer<SocketChannel> {
private final String path;
private final WebSocketDecoderConfig webSocketDecoderConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,28 @@ void tearDown() throws Exception {
}
}

@Test
void serverBuilderMinimalConfigIsValid() {
WebSocketServerProtocolHandler serverProtocolHandler =
WebSocketServerProtocolHandler.create()
.webSocketCallbacksHandler(
(ctx, webSocketFrameFactory) -> {
throw new AssertionError("never called");
})
.build();
}

@Test
void clientBuilderMinimalConfigIsValid() {
WebSocketClientProtocolHandler clientProtocolHandler =
WebSocketClientProtocolHandler.create()
.webSocketHandler(
(ctx, webSocketFrameFactory) -> {
throw new AssertionError("never called");
})
.build();
}

@Timeout(15)
@Test
void frameSizeLimit() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakeException;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -110,16 +111,29 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
mask,
expectMaskedFrames,
allowMaskMismatch);
h.handshake(ctx.channel());
startHandshakeTimeout(ctx, handshakeTimeoutMillis);
h.handshake(ctx.channel())
.addListener(
future -> {
Throwable cause = future.cause();
if (cause != null) {
handshakeCompleted.tryFailure(cause);
ctx.fireExceptionCaught(cause);
cancelHandshakeTimeout();
} else {
ctx.fireUserEventTriggered(
io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler
.ClientHandshakeStateEvent.HANDSHAKE_ISSUED);
}
});
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
cancelHandshakeTimeout();
ChannelPromise completed = handshakeCompleted;
if (!completed.isDone()) {
completed.setFailure(new ClosedChannelException());
completed.tryFailure(new ClosedChannelException());
}
super.channelInactive(ctx);
}
Expand Down Expand Up @@ -153,15 +167,22 @@ private void completeHandshake(ChannelHandlerContext ctx, FullHttpResponse respo
try {
h.finishHandshake(ctx.channel(), response);
handshaker = null;
cancelHandshakeTimeout();
} catch (WebSocketHandshakeException e) {
handshakeCompleted.setFailure(e);
} catch (Exception e) {
handshakeCompleted.tryFailure(e);
if (!(e instanceof WebSocketHandshakeException)) {
ctx.fireExceptionCaught(e);
}
ctx.close();
return;
} finally {
cancelHandshakeTimeout();
}
ctx.pipeline().remove(this);
WebSocketCallbacksHandler.exchange(ctx, webSocketHandler);
handshakeCompleted.setSuccess();
handshakeCompleted.trySuccess();
ctx.fireUserEventTriggered(
io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler
.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE);
}

private void startHandshakeTimeout(ChannelHandlerContext ctx, long timeoutMillis) {
Expand All @@ -170,7 +191,19 @@ private void startHandshakeTimeout(ChannelHandlerContext ctx, long timeoutMillis
.schedule(
() -> {
handshakeTimeoutFuture = null;
ctx.close();
ChannelPromise handshake = handshakeCompleted;
if (!handshake.isDone()
&& handshake.tryFailure(
new WebSocketClientHandshakeException(
"websocket handshake timeout after "
+ handshakeTimeoutMillis
+ " millis"))) {
ctx.flush();
ctx.fireUserEventTriggered(
io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler
.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT);
ctx.close();
}
},
timeoutMillis,
TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -213,7 +246,7 @@ public static final class Builder {
private String subprotocol;
private HttpHeaders headers;
private boolean mask = true;
private boolean allowMaskMismatch;
private boolean allowMaskMismatch = true;
private int maxFramePayloadLength = 65_535;
private long handshakeTimeoutMillis = 15_000;
private WebSocketCallbacksHandler webSocketHandler;
Expand Down
Loading

0 comments on commit e207c5c

Please sign in to comment.