From 5c4d176545f9727a164d39e5ce2f60e0d0ec95c8 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 3 Jan 2023 10:10:52 +0200 Subject: [PATCH 1/3] add perftest module (#1) --- netty-websocket-http1-perftest/build.gradle | 66 ++++ .../http/websocketx/perftest/client/Main.java | 364 ++++++++++++++++++ .../http/websocketx/perftest/server/Main.java | 177 +++++++++ .../src/main/resources/logback.xml | 13 + perf_client_run.sh | 3 + perf_server_run.sh | 3 + settings.gradle | 1 + 7 files changed, 627 insertions(+) create mode 100644 netty-websocket-http1-perftest/build.gradle create mode 100644 netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/client/Main.java create mode 100644 netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/server/Main.java create mode 100644 netty-websocket-http1-perftest/src/main/resources/logback.xml create mode 100755 perf_client_run.sh create mode 100755 perf_server_run.sh diff --git a/netty-websocket-http1-perftest/build.gradle b/netty-websocket-http1-perftest/build.gradle new file mode 100644 index 0000000..9a23563 --- /dev/null +++ b/netty-websocket-http1-perftest/build.gradle @@ -0,0 +1,66 @@ +/* + * Copyright 2022 - present Maksym Ostroverkhov. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id "application" +} + +description = "Perf test for netty based implementation of rfc6455 - the websocket protocol" + +dependencies { + implementation project(":netty-websocket-http1-test") + implementation "org.hdrhistogram:HdrHistogram" + implementation "org.slf4j:slf4j-api" + + if (osdetector.os == "linux") { + runtimeOnly "io.netty:netty-transport-native-epoll::${osdetector.classifier}" + } else if (osdetector.os == "osx") { + runtimeOnly "io.netty:netty-transport-native-kqueue::${osdetector.classifier}" + } + runtimeOnly "io.netty:netty-tcnative-boringssl-static::${osdetector.classifier}" + runtimeOnly "ch.qos.logback:logback-classic" +} + +task runServer(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server.Main" +} + +task runClient(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client.Main" +} + +task serverScripts(type: CreateStartScripts) { + mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server.Main" + applicationName = "${project.name}-server" + classpath = startScripts.classpath + outputDir = startScripts.outputDir +} + +task clientScripts(type: CreateStartScripts) { + mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client.Main" + applicationName = "${project.name}-client" + classpath = startScripts.classpath + outputDir = startScripts.outputDir +} + +startScripts.dependsOn serverScripts +startScripts.dependsOn clientScripts + +tasks.named("startScripts") { + enabled = false +} \ No newline at end of file diff --git a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/client/Main.java b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/client/Main.java new file mode 100644 index 0000000..645ab4d --- /dev/null +++ b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/client/Main.java @@ -0,0 +1,364 @@ +/* + * Copyright 2022 - present Maksym Ostroverkhov. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client; + +import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketCallbacksHandler; +import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; +import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketFrameFactory; +import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketFrameListener; +import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol; +import com.jauntsdn.netty.handler.codec.http.websocketx.test.Security; +import com.jauntsdn.netty.handler.codec.http.websocketx.test.Transport; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.ResourceLeakDetector; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.HdrHistogram.Histogram; +import org.HdrHistogram.Recorder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Main { + private static final Logger logger = LoggerFactory.getLogger(Main.class); + + public static void main(String[] args) throws Exception { + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); + + String host = System.getProperty("HOST", "localhost"); + int port = Integer.parseInt(System.getProperty("PORT", "8088")); + int duration = Integer.parseInt(System.getProperty("DURATION", "600")); + boolean isNativeTransport = Boolean.parseBoolean(System.getProperty("NATIVE", "true")); + boolean isEncrypted = Boolean.parseBoolean(System.getProperty("ENCRYPT", "true")); + boolean isMasked = Boolean.parseBoolean(System.getProperty("MASK", "false")); + boolean isTotalFrames = Boolean.parseBoolean(System.getProperty("TOTAL", "false")); + int frameSize = Integer.parseInt(System.getProperty("FRAME", "64")); + int outboundFramesWindow = Integer.parseInt(System.getProperty("WINDOW", "2222")); + + boolean isOpensslAvailable = OpenSsl.isAvailable(); + boolean isEpollAvailable = Transport.isEpollAvailable(); + boolean isKqueueAvailable = Transport.isKqueueAvailable(); + + logger.info("\n==> http1 websocket test client\n"); + logger.info("\n==> remote address: {}:{}", host, port); + logger.info("\n==> duration: {}", duration); + logger.info("\n==> native transport: {}", isNativeTransport); + logger.info("\n==> epoll available: {}", isEpollAvailable); + logger.info("\n==> kqueue available: {}", isKqueueAvailable); + logger.info("\n==> openssl available: {}\n", isOpensslAvailable); + logger.info("\n==> encryption: {}\n", isEncrypted); + logger.info("\n==> frame masking: {}\n", isMasked); + logger.info("\n==> frame payload size: {}", frameSize); + logger.info("\n==> outbound frames window: {}", outboundFramesWindow); + + Transport transport = Transport.get(isNativeTransport); + logger.info("\n==> io transport: {}", transport.type()); + SslContext sslContext = isEncrypted ? Security.clientLocalSslContext() : null; + + List framesPayload = framesPayload(1000, frameSize); + FrameCounters frameCounters = new FrameCounters(isTotalFrames); + + Channel channel = + new Bootstrap() + .group(transport.eventLoopGroup()) + .channel(transport.clientChannel()) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + + HttpClientCodec http1Codec = new HttpClientCodec(); + HttpObjectAggregator http1Aggregator = new HttpObjectAggregator(65536); + WebSocketCallbacksHandler webSocketHandler = + new WebSocketClientHandler( + frameCounters, + framesPayload, + ThreadLocalRandom.current(), + outboundFramesWindow); + + WebSocketClientProtocolHandler webSocketProtocolHandler = + WebSocketClientProtocolHandler.create() + .path("/echo") + .mask(isMasked) + .allowMaskMismatch(true) + .maxFramePayloadLength(65_535) + .webSocketHandler(webSocketHandler) + .build(); + + ChannelPipeline pipeline = ch.pipeline(); + if (sslContext != null) { + SslHandler sslHandler = sslContext.newHandler(ch.alloc()); + pipeline.addLast(sslHandler); + } + pipeline.addLast(http1Codec, http1Aggregator, webSocketProtocolHandler); + } + }) + .connect(new InetSocketAddress(host, port)) + .sync() + .channel(); + + int warmupMillis = 5000; + logger.info("==> warming up for {} millis...", warmupMillis); + channel + .eventLoop() + .schedule( + () -> { + logger.info("==> warm up completed"); + frameCounters.start(); + channel + .eventLoop() + .scheduleAtFixedRate( + new StatsReporter(frameCounters, frameSize), + 1000, + 1000, + TimeUnit.MILLISECONDS); + }, + warmupMillis, + TimeUnit.MILLISECONDS); + + channel.closeFuture().sync(); + logger.info("Client terminated"); + } + + private static class FrameCounters { + private final Recorder histogram; + private int frameCount; + private boolean isStarted; + + public FrameCounters(boolean totalFrames) { + histogram = totalFrames ? null : new Recorder(36000000000L, 3); + } + + private long totalFrameCount; + + public void start() { + isStarted = true; + } + + public void countFrame(long timestamp) { + if (!isStarted) { + return; + } + + if (histogram == null) { + totalFrameCount++; + } else { + frameCount++; + if (timestamp >= 0) { + histogram.recordValue(System.nanoTime() - timestamp); + } + } + } + + public Recorder histogram() { + return histogram; + } + + public int frameCount() { + int count = frameCount; + frameCount = 0; + return count; + } + + public long totalFrameCount() { + return totalFrameCount; + } + } + + private static class StatsReporter implements Runnable { + private final FrameCounters frameCounters; + private final int frameSize; + private int iteration; + + public StatsReporter(FrameCounters frameCounters, int frameSize) { + this.frameCounters = frameCounters; + this.frameSize = frameSize; + } + + @Override + public void run() { + Recorder histogram = frameCounters.histogram(); + if (histogram != null) { + Histogram h = histogram.getIntervalHistogram(); + long p50 = h.getValueAtPercentile(50) / 1000; + long p95 = h.getValueAtPercentile(95) / 1000; + long p99 = h.getValueAtPercentile(99) / 1000; + int count = frameCounters.frameCount(); + + logger.info("p50 => {} micros", p50); + logger.info("p95 => {} micros", p95); + logger.info("p99 => {} micros", p99); + logger.info("throughput => {} messages", count); + logger.info("throughput => {} kbytes\n", count * frameSize / (float) 1024); + } else { + if (++iteration % 10 == 0) { + logger.info( + "total frames, iteration {} => {}", iteration, frameCounters.totalFrameCount()); + } + } + } + } + + static class WebSocketClientHandler implements WebSocketCallbacksHandler, WebSocketFrameListener { + + private final FrameCounters frameCounters; + private final List dataList; + private final Random random; + private final int window; + private int sendIndex; + private boolean isClosed; + private FrameWriter frameWriter; + + WebSocketClientHandler( + FrameCounters frameCounters, List dataList, Random random, int window) { + this.frameCounters = frameCounters; + this.dataList = dataList; + this.random = random; + this.window = window; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory frameFactory) { + frameWriter = new FrameWriter(ctx, frameFactory, window); + return this; + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + frameWriter.startWrite(); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + isClosed = true; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (opcode != WebSocketProtocol.OPCODE_BINARY) { + payload.release(); + return; + } + + long timeStamp = payload.readLong(); + frameCounters.countFrame(timeStamp); + payload.release(); + frameWriter.tryContinueWrite(); + } + + @Override + public void onChannelReadComplete(ChannelHandlerContext ctx) {} + + @Override + public void onUserEventTriggered(ChannelHandlerContext ctx, Object evt) {} + + @Override + public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { + Channel ch = ctx.channel(); + if (!ch.isWritable()) { + ch.flush(); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!isClosed) { + isClosed = true; + logger.error("Channel error", cause); + ctx.close(); + } + } + + class FrameWriter { + private final ChannelHandlerContext ctx; + private final WebSocketFrameFactory frameFactory; + private final int window; + private int queued; + + FrameWriter(ChannelHandlerContext ctx, WebSocketFrameFactory frameFactory, int window) { + this.ctx = ctx; + this.frameFactory = frameFactory; + this.window = window; + } + + void startWrite() { + if (isClosed) { + return; + } + int cur = queued; + int w = window; + int writeCount = w - cur; + ChannelHandlerContext c = ctx; + for (int i = 0; i < writeCount; i++) { + c.write(webSocketFrame(c), c.voidPromise()); + } + queued = w; + c.flush(); + } + + void tryContinueWrite() { + int q = --queued; + if (q <= window / 2) { + startWrite(); + } + } + + ByteBuf webSocketFrame(ChannelHandlerContext ctx) { + List dl = dataList; + int dataIndex = random.nextInt(dl.size()); + ByteBuf data = dl.get(dataIndex); + int index = sendIndex++; + int dataSize = data.readableBytes(); + + WebSocketFrameFactory factory = frameFactory; + + ByteBuf frame = factory.createBinaryFrame(ctx.alloc(), Long.BYTES + dataSize); + frame.writeLong(index % 50_000 == 0 ? System.nanoTime() : -1).writeBytes(data, 0, dataSize); + + return factory.mask(frame); + } + } + } + + private static List framesPayload(int count, int size) { + Random random = new Random(); + List data = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + byte[] bytes = new byte[size]; + random.nextBytes(bytes); + data.add(Unpooled.wrappedBuffer(bytes)); + } + return data; + } +} diff --git a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/server/Main.java b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/server/Main.java new file mode 100644 index 0000000..01e828a --- /dev/null +++ b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/server/Main.java @@ -0,0 +1,177 @@ +/* + * Copyright 2022 - present Maksym Ostroverkhov. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server; + +import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketCallbacksHandler; +import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketFrameFactory; +import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketFrameListener; +import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol; +import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import com.jauntsdn.netty.handler.codec.http.websocketx.test.Security; +import com.jauntsdn.netty.handler.codec.http.websocketx.test.Transport; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketDecoderConfig; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.ResourceLeakDetector; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Main { + private static final Logger logger = LoggerFactory.getLogger(Main.class); + + public static void main(String[] args) throws Exception { + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); + + String host = System.getProperty("HOST", "localhost"); + int port = Integer.parseInt(System.getProperty("PORT", "8088")); + boolean isNativeTransport = + Boolean.parseBoolean(System.getProperty("NATIVE_TRANSPORT", "true")); + boolean isEncrypted = Boolean.parseBoolean(System.getProperty("ENCRYPT", "true")); + + boolean isOpensslAvailable = OpenSsl.isAvailable(); + boolean isEpollAvailable = Transport.isEpollAvailable(); + boolean isKqueueAvailable = Transport.isKqueueAvailable(); + + logger.info("\n==> http1 websocket load test server\n"); + logger.info("\n==> bind address: {}:{}", host, port); + logger.info("\n==> native transport: {}", isNativeTransport); + logger.info("\n==> epoll available: {}", isEpollAvailable); + logger.info("\n==> kqueue available: {}", isKqueueAvailable); + logger.info("\n==> openssl available: {}", isOpensslAvailable); + logger.info("\n==> encryption: {}\n", isEncrypted); + + Transport transport = Transport.get(isNativeTransport); + logger.info("\n==> io transport: {}", transport.type()); + SslContext sslContext = isEncrypted ? Security.serverSslContext() : null; + + ServerBootstrap bootstrap = new ServerBootstrap(); + Channel server = + bootstrap + .group(transport.eventLoopGroup()) + .channel(transport.serverChannel()) + .childHandler(new ConnectionAcceptor(sslContext)) + .bind(host, port) + .sync() + .channel(); + logger.info("\n==> Server is listening on {}:{}", host, port); + server.closeFuture().sync(); + } + + private static class ConnectionAcceptor extends ChannelInitializer { + private final SslContext sslContext; + private final WebSocketDecoderConfig webSocketDecoderConfig; + + ConnectionAcceptor(SslContext sslContext) { + this.sslContext = sslContext; + this.webSocketDecoderConfig = + WebSocketDecoderConfig.newBuilder() + .allowMaskMismatch(true) + .expectMaskedFrames(false) + .maxFramePayloadLength(65_535) + .withUTF8Validator(false) + .build(); + } + + @Override + protected void initChannel(SocketChannel ch) { + HttpServerCodec http1Codec = new HttpServerCodec(); + HttpObjectAggregator http1Aggregator = new HttpObjectAggregator(65536); + WebSocketCallbacksHandler webSocketHandler = new WebSocketServerCallbacksHandler(); + WebSocketServerProtocolHandler webSocketProtocolHandler = + WebSocketServerProtocolHandler.create() + .path("/echo") + .decoderConfig(webSocketDecoderConfig) + .webSocketCallbacksHandler(webSocketHandler) + .build(); + + ChannelPipeline pipeline = ch.pipeline(); + if (sslContext != null) { + SslHandler sslHandler = sslContext.newHandler(ch.alloc()); + pipeline.addLast(sslHandler); + } + pipeline.addLast(http1Codec).addLast(http1Aggregator).addLast(webSocketProtocolHandler); + } + } + + private static class WebSocketServerCallbacksHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + private WebSocketFrameFactory frameFactory; + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory frameFactory) { + this.frameFactory = frameFactory; + return this; + } + + @Override + public void onOpen(ChannelHandlerContext ctx) {} + + @Override + public void onClose(ChannelHandlerContext ctx) {} + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (opcode != WebSocketProtocol.OPCODE_BINARY) { + payload.release(); + throw new IllegalStateException("received non-binary opcode"); + } + WebSocketFrameFactory factory = frameFactory; + ByteBuf frame = factory.createBinaryFrame(ctx.alloc(), payload.readableBytes()); + frame.writeBytes(payload); + factory.mask(frame); + payload.release(); + ctx.write(frame, ctx.voidPromise()); + } + + @Override + public void onChannelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { + if (!ctx.channel().isWritable()) { + ctx.flush(); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (cause instanceof IOException) { + return; + } + logger.info("Unexpected websocket error", cause); + ctx.close(); + } + + @Override + public void onUserEventTriggered(ChannelHandlerContext ctx, Object evt) {} + } +} diff --git a/netty-websocket-http1-perftest/src/main/resources/logback.xml b/netty-websocket-http1-perftest/src/main/resources/logback.xml new file mode 100644 index 0000000..d15181d --- /dev/null +++ b/netty-websocket-http1-perftest/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + + %date{HH:mm:ss.SSS} %-10thread %-42logger %msg%n + + + + + + + diff --git a/perf_client_run.sh b/perf_client_run.sh new file mode 100755 index 0000000..4f76e80 --- /dev/null +++ b/perf_client_run.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +cd netty-websocket-http1-perftest/build/install/netty-websocket-http1-perftest/bin && ./netty-websocket-http1-perftest-client \ No newline at end of file diff --git a/perf_server_run.sh b/perf_server_run.sh new file mode 100755 index 0000000..d12ba1b --- /dev/null +++ b/perf_server_run.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +cd netty-websocket-http1-perftest/build/install/netty-websocket-http1-perftest/bin && ./netty-websocket-http1-perftest-server \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 6087417..951f71e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -28,5 +28,6 @@ rootProject.name = "netty-websocket-http1-parent" include "netty-websocket-http1" include "netty-websocket-http1-test" +include "netty-websocket-http1-perftest" include "netty-websocket-http1-soaktest" From bc942b19958c1486ef7414bee9c69ef36a55bfa5 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 3 Jan 2023 10:07:06 +0200 Subject: [PATCH 2/3] add websocket codec test --- .../http/websocketx/WebSocketCodecTest.java | 1285 +++++++++++++++++ 1 file changed, 1285 insertions(+) create mode 100644 netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java new file mode 100644 index 0000000..116550d --- /dev/null +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java @@ -0,0 +1,1285 @@ +/* + * Copyright 2022 - present Maksym Ostroverkhov. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.jauntsdn.netty.handler.codec.http.websocketx; + +import static org.junit.jupiter.params.provider.Arguments.arguments; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; +import io.netty.handler.codec.http.websocketx.WebSocketDecoderConfig; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.util.ReferenceCountUtil; +import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class WebSocketCodecTest { + static final Logger logger = LoggerFactory.getLogger(WebSocketCodecTest.class); + static final int SMALL_CODEC_MAX_FRAME_SIZE = 125; + static final int DEFAULT_CODEC_MAX_FRAME_SIZE = + Integer.parseInt(System.getProperty("MAX_FRAME_SIZE", "5000")); + + Channel server; + + @AfterEach + void tearDown() { + Channel s = server; + if (s != null) { + s.close(); + } + } + + @Timeout(300) + @MethodSource("maskingArgs") + @ParameterizedTest + void allSizeBinaryFramesDefaultDecoder( + boolean mask, Class webSocketFrameFactoryType, Class webSocketDecoderType) + throws Exception { + int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), mask, false); + BinaryFramesTestClientHandler clientHandler = new BinaryFramesTestClientHandler(maxFrameSize); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory).isExactlyInstanceOf(webSocketFrameFactoryType); + Assertions.assertThat(client.pipeline().get(webSocketDecoderType)).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(15) + @Test + void binaryFramesSmallDecoder() throws Exception { + int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), false, false); + BinaryFramesTestClientHandler clientHandler = new BinaryFramesTestClientHandler(maxFrameSize); + Channel client = + webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory) + .isExactlyInstanceOf(NonMaskingWebSocketEncoder.FrameFactory.class); + Assertions.assertThat(client.pipeline().get(SmallWebSocketDecoder.class)).isNotNull(); + + CompletableFuture onCompleted = clientHandler.startFramesExchange(); + onCompleted.join(); + client.close(); + } + + @Timeout(300) + @MethodSource("maskingArgs") + @ParameterizedTest + void allSizeTextFramesDefaultDecoder( + boolean mask, Class webSocketFrameFactoryType, Class webSocketDecoderType) + throws Exception { + int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; + char content = 'a'; + Channel s = + server = nettyServer(new TextFramesTestServerHandler(maxFrameSize, content), mask, false); + TextFramesTestClientHandler clientHandler = + new TextFramesTestClientHandler(maxFrameSize, content); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory).isExactlyInstanceOf(webSocketFrameFactoryType); + Assertions.assertThat(client.pipeline().get(webSocketDecoderType)).isNotNull(); + + clientHandler.onFrameExchangeCompleted().join(); + client.close(); + } + + @Timeout(15) + @Test + void textFramesSmallDecoder() throws Exception { + int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; + char content = 'a'; + Channel s = + server = nettyServer(new TextFramesTestServerHandler(maxFrameSize, content), false, false); + TextFramesTestClientHandler clientHandler = + new TextFramesTestClientHandler(maxFrameSize, content); + Channel client = + webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory) + .isExactlyInstanceOf(NonMaskingWebSocketEncoder.FrameFactory.class); + Assertions.assertThat(client.pipeline().get(SmallWebSocketDecoder.class)).isNotNull(); + + clientHandler.onFrameExchangeCompleted().join(); + client.close(); + } + + @Timeout(15) + @MethodSource("maskingArgs") + @ParameterizedTest + void pingFramesDefaultDecoder( + boolean mask, Class webSocketFrameFactoryType, Class webSocketDecoderType) + throws Exception { + int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new PingPongTestServerHandler(), mask, false); + PingFramesTestClientHandler clientHandler = + new PingFramesTestClientHandler(maxFrameSize, (byte) 0xFE); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory).isExactlyInstanceOf(webSocketFrameFactoryType); + Assertions.assertThat(client.pipeline().get(webSocketDecoderType)).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(15) + @Test + void pingFramesSmallDecoder() throws Exception { + int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new PingPongTestServerHandler(), false, false); + PingFramesTestClientHandler clientHandler = + new PingFramesTestClientHandler(maxFrameSize, (byte) 0xFE); + Channel client = + webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory) + .isExactlyInstanceOf(NonMaskingWebSocketEncoder.FrameFactory.class); + Assertions.assertThat(client.pipeline().get(SmallWebSocketDecoder.class)).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(15) + @MethodSource("maskingArgs") + @ParameterizedTest + void pongFramesDefaultDecoder( + boolean mask, Class webSocketFrameFactoryType, Class webSocketDecoderType) + throws Exception { + int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new PingPongTestServerHandler(), mask, false); + PongFramesTestClientHandler clientHandler = + new PongFramesTestClientHandler(maxFrameSize, (byte) 0xFE); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory).isExactlyInstanceOf(webSocketFrameFactoryType); + Assertions.assertThat(client.pipeline().get(webSocketDecoderType)).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(15) + @Test + void pongFramesSmallDecoder() throws Exception { + int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new PingPongTestServerHandler(), false, false); + PongFramesTestClientHandler clientHandler = + new PongFramesTestClientHandler(maxFrameSize, (byte) 0xFE); + Channel client = + webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory) + .isExactlyInstanceOf(NonMaskingWebSocketEncoder.FrameFactory.class); + Assertions.assertThat(client.pipeline().get(SmallWebSocketDecoder.class)).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(15) + @ParameterizedTest + @MethodSource("maskingArgs") + void closeFramesDefaultDecoder( + boolean mask, Class webSocketFrameFactoryType, Class webSocketDecoderType) + throws Exception { + int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new CloseTestServerHandler(), mask, false); + CloseFramesTestClientHandler clientHandler = + new CloseFramesTestClientHandler(WebSocketCloseStatus.NORMAL_CLOSURE, "NORMAL_CLOSURE"); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory).isExactlyInstanceOf(webSocketFrameFactoryType); + Assertions.assertThat(client.pipeline().get(webSocketDecoderType)).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(15) + @Test + void closeFramesSmallDecoder() throws Exception { + int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new CloseTestServerHandler(), false, false); + CloseFramesTestClientHandler clientHandler = + new CloseFramesTestClientHandler(WebSocketCloseStatus.NORMAL_CLOSURE, "NORMAL_CLOSURE"); + Channel client = + webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory) + .isExactlyInstanceOf(NonMaskingWebSocketEncoder.FrameFactory.class); + Assertions.assertThat(client.pipeline().get(SmallWebSocketDecoder.class)).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(15) + @ParameterizedTest + @MethodSource("maskingArgs") + void fragmentDefaultDecoder( + boolean mask, Class webSocketFrameFactoryType, Class webSocketDecoderType) + throws Exception { + int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; + + Channel s = server = nettyServer(new FragmentTestServerHandler(1500), mask, false); + FragmentationFramesTestClientHandler clientHandler = + new FragmentationFramesTestClientHandler(3333); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory).isExactlyInstanceOf(webSocketFrameFactoryType); + Assertions.assertThat(client.pipeline().get(webSocketDecoderType)).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(15) + @Test + void fragmentSmallDecoder() throws Exception { + int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; + + Channel s = server = nettyServer(new FragmentTestServerHandler(33), false, false); + FragmentationFramesTestClientHandler clientHandler = + new FragmentationFramesTestClientHandler(70); + Channel client = + webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); + + WebSocketFrameFactory webSocketFrameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(webSocketFrameFactory) + .isExactlyInstanceOf(NonMaskingWebSocketEncoder.FrameFactory.class); + Assertions.assertThat(client.pipeline().get(SmallWebSocketDecoder.class)).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + static Stream maskingArgs() { + return Stream.of( + arguments(true, MaskingWebSocketEncoder.FrameFactory.class, DefaultWebSocketDecoder.class), + arguments( + false, NonMaskingWebSocketEncoder.FrameFactory.class, DefaultWebSocketDecoder.class)); + } + + static Channel nettyServer( + ChannelHandler webSocketHandler, boolean expectMaskedFrames, boolean allowMaskMismatch) + throws Exception { + return new ServerBootstrap() + .group(new NioEventLoopGroup(1)) + .channel(NioServerSocketChannel.class) + .childHandler( + new ServerConnectionAcceptor(webSocketHandler, expectMaskedFrames, allowMaskMismatch)) + .bind("localhost", 0) + .sync() + .channel(); + } + + static Channel webSocketCallbacksClient( + SocketAddress address, + boolean mask, + boolean allowMaskMismatch, + int maxFramePayloadLength, + WebSocketCallbacksHandler webSocketHandler) + throws InterruptedException { + Channel channel = + new Bootstrap() + .group(new NioEventLoopGroup(1)) + .channel(NioSocketChannel.class) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + + HttpClientCodec http1Codec = new HttpClientCodec(); + HttpObjectAggregator http1Aggregator = new HttpObjectAggregator(65536); + + WebSocketClientProtocolHandler webSocketProtocolHandler = + WebSocketClientProtocolHandler.create() + .path("/test") + .mask(mask) + .allowMaskMismatch(allowMaskMismatch) + .maxFramePayloadLength(maxFramePayloadLength) + .webSocketHandler(webSocketHandler) + .build(); + + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(http1Codec, http1Aggregator, webSocketProtocolHandler); + } + }) + .connect(address) + .sync() + .channel(); + return channel; + } + + static class ServerConnectionAcceptor extends ChannelInitializer { + final ChannelHandler webSocketHandler; + final boolean expectMaskedFrames; + final boolean allowMaskMismatch; + + ServerConnectionAcceptor( + ChannelHandler webSocketHandler, boolean expectMaskedFrames, boolean allowMaskMismatch) { + this.webSocketHandler = webSocketHandler; + this.expectMaskedFrames = expectMaskedFrames; + this.allowMaskMismatch = allowMaskMismatch; + } + + @Override + protected void initChannel(SocketChannel ch) { + HttpServerCodec http1Codec = new HttpServerCodec(); + HttpObjectAggregator http1Aggregator = new HttpObjectAggregator(65536); + + WebSocketDecoderConfig decoderConfig = + WebSocketDecoderConfig.newBuilder() + .expectMaskedFrames(expectMaskedFrames) + .allowMaskMismatch(allowMaskMismatch) + .withUTF8Validator(false) + .allowExtensions(false) + .maxFramePayloadLength(65535) + .build(); + + WebSocketServerProtocolHandler webSocketProtocolHandler = + new WebSocketServerProtocolHandler("/test", null, false, false, 15_000, decoderConfig); + + ch.pipeline() + .addLast(http1Codec, http1Aggregator, webSocketProtocolHandler, webSocketHandler); + } + } + + static class BinaryFramesTestClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + private final CompletableFuture onHandshakeComplete = + new CompletableFuture<>(); + private final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + private WebSocketFrameFactory webSocketFrameFactory; + private final int framesCount; + private int receivedFrames; + private int sentFrames; + private volatile ChannelHandlerContext ctx; + + BinaryFramesTestClientHandler(int maxFrameSize) { + this.framesCount = maxFrameSize; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_BINARY) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-binary frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + int readableBytes = payload.readableBytes(); + + int expectedSize = receivedFrames; + if (expectedSize != readableBytes) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + readableBytes)); + payload.release(); + return; + } + + for (int i = 0; i < readableBytes; i++) { + byte b = payload.readByte(); + if (b != (byte) 0xFE) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with unexpected content: " + Long.toHexString(b))); + payload.release(); + return; + } + } + payload.release(); + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + @Override + public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { + boolean writable = ctx.channel().isWritable(); + if (sentFrames > 0 && writable) { + int toSend = framesCount - sentFrames; + if (toSend > 0) { + sendFrames(ctx, toSend); + } + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(webSocketFrameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c, framesCount - sentFrames)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c, int toSend) { + Channel ch = c.channel(); + WebSocketFrameFactory factory = webSocketFrameFactory; + boolean pendingFlush = false; + for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { + if (!c.channel().isOpen()) { + return; + } + int frameSize = sentFrames; + ByteBuf binaryFrame = factory.createBinaryFrame(c.alloc(), frameSize); + for (int payloadIdx = 0; payloadIdx < frameSize; payloadIdx++) { + binaryFrame.writeByte(0xFE); + } + ByteBuf maskedBinaryFrame = factory.mask(binaryFrame); + sentFrames++; + if (ch.bytesBeforeUnwritable() < binaryFrame.capacity()) { + c.writeAndFlush(maskedBinaryFrame, c.voidPromise()); + pendingFlush = false; + if (!ch.isWritable()) { + return; + } + } else { + c.write(maskedBinaryFrame, c.voidPromise()); + pendingFlush = true; + } + } + if (pendingFlush) { + c.flush(); + } + } + } + + static class TextFramesTestClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final int framesCount; + final char expectedContent; + WebSocketFrameFactory webSocketFrameFactory; + int receivedFrames; + volatile ChannelHandlerContext ctx; + + TextFramesTestClientHandler(int maxFrameSize, char expectedContent) { + this.framesCount = maxFrameSize; + this.expectedContent = expectedContent; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + onHandshakeComplete.complete(webSocketFrameFactory); + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_TEXT) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-text frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + CharSequence content = + payload.readCharSequence(payload.readableBytes(), StandardCharsets.UTF_8); + + int expectedSize = receivedFrames; + if (expectedSize != content.length()) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + content.length())); + payload.release(); + return; + } + + for (int i = 0; i < content.length(); i++) { + char ch = content.charAt(i); + char expectedCh = expectedContent; + if (ch != expectedCh) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received text frame with unexpected content: " + + ch + + ", expected: " + + expectedCh)); + payload.release(); + return; + } + } + payload.release(); + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture onFrameExchangeCompleted() { + return onFrameExchangeComplete; + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + } + + static class FragmentationFramesTestClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final int frameSize; + WebSocketFrameFactory webSocketFrameFactory; + volatile ChannelHandlerContext ctx; + + FragmentationFramesTestClientHandler(int frameSize) { + this.frameSize = frameSize; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + inboundFrame = new CompositeByteBuf(ctx.alloc(), true, 3); + return this; + } + + CompositeByteBuf inboundFrame; + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + + CompositeByteBuf inbound = inboundFrame; + /*first*/ + if (inbound.numComponents() == 0) { + if (opcode != WebSocketProtocol.OPCODE_BINARY) { + payload.release(); + inbound.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-binary opcode: " + opcode)); + } + if (finalFragment) { + payload.release(); + inbound.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-fragmented frame")); + } + } else { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + inbound.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-continuation frame, opcode: " + opcode)); + } + } + inbound.addComponent(true, payload); + + if (finalFragment) { + int readableBytes = inbound.readableBytes(); + if (readableBytes != frameSize) { + inbound.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "Unexpected inbound frame size: " + readableBytes + ", expected: " + frameSize)); + } + for (int i = 0; i < readableBytes; i++) { + byte b = inbound.readByte(); + if (b != (byte) 0xFE) { + inbound.release(); + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame with unexpected content: " + Long.toHexString(b))); + return; + } + } + onFrameExchangeComplete.complete(null); + inbound.release(); + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(webSocketFrameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory factory = webSocketFrameFactory; + ByteBuf binaryFrame = factory.createBinaryFrame(c.alloc(), frameSize); + for (int payloadIdx = 0; payloadIdx < frameSize; payloadIdx++) { + binaryFrame.writeByte(0xFE); + } + c.writeAndFlush(factory.mask(binaryFrame)); + } + } + + static class TextFramesTestServerHandler extends ChannelInboundHandlerAdapter { + final String content; + final int framesCount; + int sentFrames; + + TextFramesTestServerHandler(int maxFrameSize, char content) { + this.framesCount = maxFrameSize; + this.content = String.valueOf(content); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof WebSocketFrame) { + ReferenceCountUtil.release(msg); + return; + } + super.channelRead(ctx, msg); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + boolean writable = ctx.channel().isWritable(); + if (sentFrames > 0 && writable) { + int toSend = framesCount - sentFrames; + if (toSend > 0) { + sendFrames(ctx, toSend); + } + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { + sendFrames(ctx, framesCount - sentFrames); + } + super.userEventTriggered(ctx, evt); + } + + private void sendFrames(ChannelHandlerContext c, int toSend) { + Channel ch = c.channel(); + boolean pendingFlush = false; + for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { + int payloadSize = sentFrames; + ByteBuf payload = c.alloc().buffer(payloadSize); + for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) { + payload.writeCharSequence(content, StandardCharsets.UTF_8); + } + TextWebSocketFrame webSocketFrame = new TextWebSocketFrame(payload); + sentFrames++; + if (ch.bytesBeforeUnwritable() < payload.capacity()) { + c.writeAndFlush(webSocketFrame, c.voidPromise()); + pendingFlush = false; + if (!ch.isWritable()) { + return; + } + } else { + c.write(webSocketFrame, c.voidPromise()); + pendingFlush = true; + } + } + if (pendingFlush) { + c.flush(); + } + } + } + + static class BinaryFramesTestServerHandler extends ChannelInboundHandlerAdapter { + boolean ready = true; + boolean pendingFlush; + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { + ctx.channel().config().setAutoRead(false); + ctx.read(); + } + super.userEventTriggered(ctx, evt); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof WebSocketFrame) { + ctx.write(msg); + pendingFlush = true; + if (ready) { + ctx.read(); + } + return; + } + super.channelRead(ctx, msg); + ctx.read(); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + if (!ctx.channel().isWritable()) { + ready = false; + ctx.flush(); + } else { + ready = true; + ctx.read(); + } + super.channelWritabilityChanged(ctx); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (pendingFlush) { + pendingFlush = false; + ctx.flush(); + } + super.channelReadComplete(ctx); + } + } + + static class PongFramesTestClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final int framesCount; + final byte content; + WebSocketFrameFactory webSocketFrameFactory; + int receivedFrames; + volatile ChannelHandlerContext ctx; + + PongFramesTestClientHandler(int maxFrameSize, byte content) { + this.framesCount = maxFrameSize; + this.content = content; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.ctx = ctx; + this.webSocketFrameFactory = webSocketFrameFactory; + onHandshakeComplete.complete(webSocketFrameFactory); + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_PING) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-ping frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + int readableBytes = payload.readableBytes(); + int expectedSize = receivedFrames; + if (expectedSize != readableBytes) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + readableBytes)); + payload.release(); + return; + } + + for (int i = 0; i < readableBytes; i++) { + byte b = payload.readByte(); + if (b != content) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received ping frame with unexpected content: " + + b + + ", expected: " + + Long.toHexString(content))); + payload.release(); + return; + } + } + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c) { + Channel ch = c.channel(); + WebSocketFrameFactory factory = webSocketFrameFactory; + boolean pendingFlush = false; + for (int frameIdx = 0; frameIdx < framesCount; frameIdx++) { + if (!c.channel().isOpen()) { + return; + } + ByteBuf pongFrame = factory.createPongFrame(c.alloc(), frameIdx); + for (int payloadIdx = 0; payloadIdx < frameIdx; payloadIdx++) { + pongFrame.writeByte(content); + } + ByteBuf maskedFrame = factory.mask(pongFrame); + if (ch.bytesBeforeUnwritable() < pongFrame.capacity()) { + c.writeAndFlush(maskedFrame, c.voidPromise()); + pendingFlush = false; + } else { + c.write(maskedFrame, c.voidPromise()); + pendingFlush = true; + } + } + if (pendingFlush) { + c.flush(); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + } + + static class PingFramesTestClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final int framesCount; + final byte content; + WebSocketFrameFactory webSocketFrameFactory; + int receivedFrames; + volatile ChannelHandlerContext ctx; + + PingFramesTestClientHandler(int maxFrameSize, byte content) { + this.framesCount = maxFrameSize; + this.content = content; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.ctx = ctx; + this.webSocketFrameFactory = webSocketFrameFactory; + onHandshakeComplete.complete(webSocketFrameFactory); + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_PONG) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-pong frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + int readableBytes = payload.readableBytes(); + int expectedSize = receivedFrames; + if (expectedSize != readableBytes) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + readableBytes)); + payload.release(); + return; + } + + for (int i = 0; i < readableBytes; i++) { + byte b = payload.readByte(); + if (b != content) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received pong frame with unexpected content: " + + b + + ", expected: " + + Long.toHexString(content))); + payload.release(); + return; + } + } + payload.release(); + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c) { + Channel ch = c.channel(); + WebSocketFrameFactory factory = webSocketFrameFactory; + boolean pendingFlush = false; + for (int frameIdx = 0; frameIdx < framesCount; frameIdx++) { + if (!c.channel().isOpen()) { + return; + } + ByteBuf pingFrame = factory.createPingFrame(c.alloc(), frameIdx); + for (int payloadIdx = 0; payloadIdx < frameIdx; payloadIdx++) { + pingFrame.writeByte(content); + } + ByteBuf maskedFrame = factory.mask(pingFrame); + if (ch.bytesBeforeUnwritable() < pingFrame.capacity()) { + c.writeAndFlush(maskedFrame, c.voidPromise()); + pendingFlush = false; + } else { + c.write(maskedFrame, c.voidPromise()); + pendingFlush = true; + } + } + if (pendingFlush) { + c.flush(); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + } + + static class CloseFramesTestClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final WebSocketCloseStatus closeStatus; + final String closeReason; + WebSocketFrameFactory webSocketFrameFactory; + volatile ChannelHandlerContext ctx; + + CloseFramesTestClientHandler(WebSocketCloseStatus closeStatus, String closeReason) { + this.closeStatus = closeStatus; + this.closeReason = closeReason; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.ctx = ctx; + this.webSocketFrameFactory = webSocketFrameFactory; + onHandshakeComplete.complete(webSocketFrameFactory); + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_CLOSE) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-close frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + WebSocketCloseStatus status = + WebSocketCloseStatus.valueOf(CloseFramePayload.statusCode(payload)); + String reason = CloseFramePayload.reason(payload); + if (!closeStatus.equals(status) || !closeReason.equals(reason)) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received close frame with unexpected content, status: " + + status + + ", reason: " + + reason)); + } + + payload.release(); + onFrameExchangeComplete.complete(null); + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory factory = webSocketFrameFactory; + ByteBuf closeFrame = factory.createCloseFrame(c.alloc(), closeStatus.code(), closeReason); + ctx.writeAndFlush(factory.mask(closeFrame)); + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + } + + static class PingPongTestServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof PingWebSocketFrame) { + PingWebSocketFrame pingFrame = (PingWebSocketFrame) msg; + ctx.writeAndFlush(new PongWebSocketFrame(pingFrame.content().retain())); + pingFrame.release(); + } else if (msg instanceof PongWebSocketFrame) { + PongWebSocketFrame pongFrame = (PongWebSocketFrame) msg; + ctx.writeAndFlush(new PingWebSocketFrame(pongFrame.content().retain())); + pongFrame.release(); + } else { + super.channelRead(ctx, msg); + } + } + } + + static class CloseTestServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof CloseWebSocketFrame) { + CloseWebSocketFrame closeWebSocketFrame = (CloseWebSocketFrame) msg; + ctx.writeAndFlush( + new CloseWebSocketFrame( + closeWebSocketFrame.statusCode(), closeWebSocketFrame.reasonText())); + closeWebSocketFrame.release(); + } else { + super.channelRead(ctx, msg); + } + } + } + + static class FragmentTestServerHandler extends ChannelInboundHandlerAdapter { + private final int fragmentSize; + + FragmentTestServerHandler(int fragmentSize) { + this.fragmentSize = fragmentSize; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof BinaryWebSocketFrame) { + BinaryWebSocketFrame webSocketFrame = (BinaryWebSocketFrame) msg; + ByteBuf content = webSocketFrame.content(); + ByteBuf fragmentContent = content.readRetainedSlice(fragmentSize); + ctx.write(new BinaryWebSocketFrame(false, 0, fragmentContent)); + while (content.readableBytes() > fragmentSize) { + ctx.write( + new ContinuationWebSocketFrame(false, 0, content.readRetainedSlice(fragmentSize))); + } + ctx.writeAndFlush( + new ContinuationWebSocketFrame( + true, 0, content.readRetainedSlice(content.readableBytes()))); + webSocketFrame.release(); + } else { + super.channelRead(ctx, msg); + } + } + } +} From 788c036849d97c433626fd840e5aa61b5230765d Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 3 Jan 2023 10:46:44 +0200 Subject: [PATCH 3/3] readme --- README.md | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 183 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d7dfc92..419eaec 100644 --- a/README.md +++ b/README.md @@ -1 +1,183 @@ -# netty-websocket-http1 \ No newline at end of file +# netty-websocket-http1 + +Alternative Netty implementation of [RFC6455](https://tools.ietf.org/html/rfc6455) - the WebSocket protocol. + +Its advantage is significant per-core throughput improvement (1.8 - 2x) for small frames in comparison to netty's out-of-the-box +websocket codecs, and minimal heap allocations on frame path. Library is compatible with +[netty-websocket-http2](https://github.com/jauntsdn/netty-websocket-http2). + +### use case & scope + +* Intended for efficiently encoded, dense binary data: no extensions (compression) support / outbound text frames / inbound +utf8 validation. + +* Library assumes small frames - many have payload <= 125 bytes, most are < 1500, maximum supported is 65k (65535 bytes). + +* Just codec - fragments, pings, close frames are decoded & validated only. It is responsibility of user code +to handle frames according to protocol (reassemble frame fragments, perform graceful close, +respond to pings). + +* Dedicated decoder for case of exchanging tiny messages over TLS connection: +only non-masked frames with <= 125 bytes of payload for minimal per-webSocket state (memory) overhead. + +* No per-frame heap allocations in websocket frameFactory / decoder. + +* Single-threaded (transport IO event-loop) callbacks / frame factory API - +in practice user code has its own message types to carry data, external means (e.g. mpsc / spsc queues) may be used to +properly publish messages on eventloop thread. + +### performance + +Per-core throughput [this codec perf-test](https://github.com/jauntsdn/netty-websocket-http1/tree/develop/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest), +[netty built-in codec perf-test](https://github.com/jauntsdn/netty-websocket-http1/tree/netty-codec/netty-builtin-websocket-perftest/src/main/java/io/netty/handler/codec/http/websocketx/perftest) +comparison with netty's out-of-the-box websocket handlers: +non-masked frames with 8, 64, 125, 1000 bytes of payload over encrypted/non-encrypted connection. + +``` +./gradlew clean build installDist +./perf_server_run.sh +./perf_client_run.sh +``` + +* non-encrypted + +| payload size | this codec, million messages | netty's codec, million messages | +| :--- | :---: | :---: | +| 8 | 2.45 | 1.35 | +| 64 | 2.35 | 1.25 | +| 125 | 2.3 | 1.15 | +| 1000 | 1.15 | 0.64 | + +* encrypted + +| payload size | this codec, million messages | netty's codec, million messages | +| :--- | :---: | :---: | +| 8 | 2.8 | 1.45 | +| 64 | 2.3 | 1.2 | +| 125 | 1.9 | 1.1 | +| 1000 | 0.52| 0.35 | + +### websocket-http2 + +Library may be combined with [jauntsdn/websocket-http2](https://github.com/jauntsdn/netty-websocket-http2) using [http1 codec](https://github.com/jauntsdn/netty-websocket-http2/blob/develop/netty-websocket-http2/src/main/java/com/jauntsdn/netty/handler/codec/http2/websocketx/Http1WebSocketCodec.java) + +for significantly improved per-core throughput [this codec perf-test](), [netty built-in codec perf-test](): +for 8, 125, 1000 byte payload frames over encrypted connection results are as follows: + +| payload size | this codec, million msgs | netty's codec, million msgs | +| :---: | :---: | :---: | +| 8 | 0.93 | 0.56 | +| 125 | 0.74 | 0.464 | +| 1000 | 0.275 | 0.211 | + +### frameFactory / callbacks API + +[WebSocketFrameFactory](https://github.com/jauntsdn/netty-websocket-http1/blob/develop/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java) +to create outbound frames. It is library user responsibility to mask outbound frame once payload is written +`ByteBuf WebSocketFrameFactory.mask(ByteBuf)` + +```java +public interface WebSocketFrameFactory { + + ByteBuf createBinaryFrame(ByteBufAllocator allocator, int binaryDataSize); + // create*Frame are omitted for control frames, created in similar fashion + + ByteBuf mask(ByteBuf frame); +} +``` + +[WebSocketFrameListener](https://github.com/jauntsdn/netty-websocket-http1/blob/develop/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java) +to receive inbound frames + +```java +public interface WebSocketFrameListener { + + void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload); + + // netty handler callbacks are omitted for brevity + + // lifecycle + default void onOpen(ChannelHandlerContext ctx) {} + + default void onClose(ChannelHandlerContext ctx) {} +} +``` + +[WebSocketCallbacksHandler](https://github.com/jauntsdn/netty-websocket-http1/blob/develop/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCallbacksHandler.java) +to exchange `WebSocketFrameListener` for `WebSocketFrameFactory` on successful websocket handshake + +```java +public interface WebSocketCallbacksHandler { + + WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory); +} +``` + +### configuration + +#### default messages decoder + +Always expects masked frames, allows mask mismatch [test example](https://github.com/jauntsdn/netty-websocket-http1/blob/bc942b19958c1486ef7414bee9c69ef36a55bfa5/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketHandshakeTest.java#L121) + +server: `allowMaskMismatch: true, maxFramePayloadLength: 65_535` + +client: `allowMaskMismatch: true, maxFramePayloadLength: 65_535` + +#### small messages decoder + +Always expects non-masking frames, disallows mask mismatch [test example](https://github.com/jauntsdn/netty-websocket-http1/blob/bc942b19958c1486ef7414bee9c69ef36a55bfa5/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketHandshakeTest.java#L140): + +server: `expectMasked: false, allowMaskMismatch: false, maxFramePayloadLength: 125` + +client: `mask: false, allowMaskMismatch: false, maxFramePayloadLength: 125` + +### testing + +* WebSocket frames [integration test](https://github.com/jauntsdn/netty-websocket-http1/blob/develop/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java): +control & data frames of all allowed sizes, jauntsdn/netty-websocket-http1 client, netty websocket server. + +* WebSocket frames long-running [soak test](https://github.com/jauntsdn/netty-websocket-http1/tree/develop/netty-websocket-http1-soaktest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/soaktest): +exercising all logic paths of codec with 100m of randomized frames over multiple connections: netty websocket client, jauntsdn/netty-websocket-http1 server. + +* [Perf test](https://github.com/jauntsdn/netty-websocket-http1/tree/develop/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest): +per-core throughput of jauntsdn/netty-websocket-http1 client & server. + +### examples + +`netty-websocket-http1-perftest` may serve as API showcase for both [client](https://github.com/jauntsdn/netty-websocket-http1/blob/develop/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/client/Main.java) +and [server](https://github.com/jauntsdn/netty-websocket-http1/blob/develop/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/server/Main.java): + +### build & binaries + +``` +./gradlew +``` + +Releases are published on MavenCentral +```groovy +repositories { + mavenCentral() +} + +dependencies { + implementation "com.jauntsdn.netty:netty-websocket-http1:" +} +``` + +## LICENSE + +Copyright 2022-Present Maksym Ostroverkhov. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file