diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml index fbea04c..4b80974 100644 --- a/.github/workflows/ci-build.yml +++ b/.github/workflows/ci-build.yml @@ -15,7 +15,7 @@ jobs: strategy: matrix: - os: [ ubuntu-20.04, macos-11, windows-2019 ] + os: [ ubuntu-20.04, macos-12, windows-2019 ] jdk: [ 8, 11, 17, 21 ] fail-fast: false diff --git a/README.md b/README.md index 0c837e2..caf9744 100644 --- a/README.md +++ b/README.md @@ -5,30 +5,33 @@ Alternative Netty implementation of [RFC6455](https://tools.ietf.org/html/rfc6455) - the WebSocket protocol. -Its advantage is significant per-core throughput improvement (1.8 - 2x) for small frames in comparison to netty's out-of-the-box -websocket codecs, and minimal heap allocations on frame path. Library is compatible with +Its advantages are significant per-core throughput improvement (1.8 - 2x) for small frames compared to netty's out-of-the-box +websocket codecs, minimal heap allocations on frame path, and compatibility with [netty-websocket-http2](https://github.com/jauntsdn/netty-websocket-http2). ### use case & scope -* Intended for efficiently encoded, dense binary data: no extensions (compression) support / outbound text frames / inbound -utf8 validation. +* Intended for dense binary data & small text messages: no extensions (compression) support. + +* No per-frame heap allocations in websocket frameFactory / decoder. * Library assumes small frames - many have payload <= 125 bytes, most are < 1500, maximum supported is 65k (65535 bytes). -* Just codec - fragments, pings, close frames are decoded & validated only. It is responsibility of user code +* Just codec - fragments, pings, close frames are decoded & protocol validated only. It is responsibility of user code to handle frames according to protocol (reassemble frame fragments, perform graceful close, -respond to pings). - -* Dedicated decoder for case of exchanging tiny messages over TLS connection: -only non-masked frames with <= 125 bytes of payload for minimal per-webSocket state (memory) overhead. - -* No per-frame heap allocations in websocket frameFactory / decoder. +respond to pings) and do utf8 validation of inbound text frames ([utility](https://github.com/jauntsdn/netty-websocket-http1/blob/fb7bbb12d4fc0e62a72845dee89fe8f1d86f9a0a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java#L81) is provided). * Single-threaded (transport IO event-loop) callbacks / frame factory API - -in practice user code has its own message types to carry data, external means (e.g. mpsc / spsc queues) may be used to +in practice user code has its own message types to carry data, and external means (e.g. mpsc / spsc queues) may be used to properly publish messages on eventloop thread. +* On encoder side 3 use cases are supported: frame factory [[1]](https://github.com/jauntsdn/netty-websocket-http1/blob/fb7bbb12d4fc0e62a72845dee89fe8f1d86f9a0a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java#L1475) (create bytebuffer and encode frame prefix), +frame encoder [[2]](https://github.com/jauntsdn/netty-websocket-http1/blob/fb7bbb12d4fc0e62a72845dee89fe8f1d86f9a0a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java#L1019) (encode frame prefix into provided bytebuffer), +frame bulk-encoder [[3]](https://github.com/jauntsdn/netty-websocket-http1/blob/fb7bbb12d4fc0e62a72845dee89fe8f1d86f9a0a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java#L707) (much more performant - encode multiple frames into provided bytebuffer). + +* Dedicated decoder for case of exchanging tiny messages over TLS connection: +only non-masked frames with <= 125 bytes of payload for minimal per-webSocket state (memory) overhead. + ### performance Per-core throughput [this codec perf-test](https://github.com/jauntsdn/netty-websocket-http1/tree/develop/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest), @@ -77,6 +80,11 @@ to create outbound frames. It is library user responsibility to mask outbound fr public interface WebSocketFrameFactory { ByteBuf createBinaryFrame(ByteBufAllocator allocator, int binaryDataSize); + + // ByteBuf createTextFrame(ByteBufAllocator allocator, int binaryDataSize); + + // ByteBuf create*Fragment*(ByteBufAllocator allocator, int textDataSize); + // create*Frame are omitted for control frames, created in similar fashion ByteBuf mask(ByteBuf frame); @@ -159,7 +167,7 @@ repositories { } dependencies { - implementation "com.jauntsdn.netty:netty-websocket-http1:1.1.3" + implementation "com.jauntsdn.netty:netty-websocket-http1:1.1.4" } ``` diff --git a/gradle.properties b/gradle.properties index 06fc45d..40aa9bb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group=com.jauntsdn.netty -version=1.1.4 +version=1.2.0 googleJavaFormatPluginVersion=0.9 dependencyManagementPluginVersion=1.1.0 @@ -7,7 +7,7 @@ gitPluginVersion=0.13.0 osDetectorPluginVersion=1.7.3 versionsPluginVersion=0.45.0 -nettyVersion=4.1.109.Final +nettyVersion=4.1.112.Final nettyTcnativeVersion=2.0.65.Final hdrHistogramVersion=2.1.12 slf4jVersion=1.7.36 diff --git a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/bulkencoder/server/Main.java b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/bulkencoder/server/Main.java index e6289b4..6359436 100644 --- a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/bulkencoder/server/Main.java +++ b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/bulkencoder/server/Main.java @@ -52,6 +52,8 @@ public static void main(String[] args) throws Exception { boolean isNativeTransport = Boolean.parseBoolean(System.getProperty("NATIVE_TRANSPORT", "true")); boolean isEncrypted = Boolean.parseBoolean(System.getProperty("ENCRYPT", "true")); + String keyStoreFile = System.getProperty("KEYSTORE", "localhost.p12"); + String keyStorePassword = System.getProperty("KEYSTORE_PASS", "localhost"); boolean isOpensslAvailable = OpenSsl.isAvailable(); boolean isEpollAvailable = Transport.isEpollAvailable(); @@ -67,7 +69,8 @@ public static void main(String[] args) throws Exception { Transport transport = Transport.get(isNativeTransport); logger.info("\n==> io transport: {}", transport.type()); - SslContext sslContext = isEncrypted ? Security.serverSslContext() : null; + SslContext sslContext = + isEncrypted ? Security.serverSslContext(keyStoreFile, keyStorePassword) : null; ServerBootstrap bootstrap = new ServerBootstrap(); Channel server = diff --git a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java index e029068..667450f 100644 --- a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java +++ b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java @@ -52,6 +52,8 @@ public static void main(String[] args) throws Exception { boolean isNativeTransport = Boolean.parseBoolean(System.getProperty("NATIVE_TRANSPORT", "true")); boolean isEncrypted = Boolean.parseBoolean(System.getProperty("ENCRYPT", "true")); + String keyStoreFile = System.getProperty("KEYSTORE", "localhost.p12"); + String keyStorePassword = System.getProperty("KEYSTORE_PASS", "localhost"); boolean isOpensslAvailable = OpenSsl.isAvailable(); boolean isEpollAvailable = Transport.isEpollAvailable(); @@ -67,7 +69,8 @@ public static void main(String[] args) throws Exception { Transport transport = Transport.get(isNativeTransport); logger.info("\n==> io transport: {}", transport.type()); - SslContext sslContext = isEncrypted ? Security.serverSslContext() : null; + SslContext sslContext = + isEncrypted ? Security.serverSslContext(keyStoreFile, keyStorePassword) : null; ServerBootstrap bootstrap = new ServerBootstrap(); Channel server = diff --git a/netty-websocket-http1-perftest/src/main/resources/localhost.p12 b/netty-websocket-http1-perftest/src/main/resources/localhost.p12 new file mode 100644 index 0000000..3d07e30 Binary files /dev/null and b/netty-websocket-http1-perftest/src/main/resources/localhost.p12 differ diff --git a/netty-websocket-http1-soaktest/build.gradle b/netty-websocket-http1-soaktest/build.gradle index e8e5d8a..83523b1 100644 --- a/netty-websocket-http1-soaktest/build.gradle +++ b/netty-websocket-http1-soaktest/build.gradle @@ -33,16 +33,6 @@ dependencies { runtimeOnly "ch.qos.logback:logback-classic" } -task runServer(type: JavaExec) { - classpath = sourceSets.main.runtimeClasspath - mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.soaktest.server.Main" -} - -task runClient(type: JavaExec) { - classpath = sourceSets.main.runtimeClasspath - mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.soaktest.client.Main" -} - task serverScripts(type: CreateStartScripts) { mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.soaktest.server.Main" applicationName = "${project.name}-server" diff --git a/netty-websocket-http1-soaktest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/soaktest/server/Main.java b/netty-websocket-http1-soaktest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/soaktest/server/Main.java index dbfe6e8..c1df42a 100644 --- a/netty-websocket-http1-soaktest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/soaktest/server/Main.java +++ b/netty-websocket-http1-soaktest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/soaktest/server/Main.java @@ -54,6 +54,8 @@ public static void main(String[] args) throws Exception { int frameSizeLimit = Integer.parseInt(System.getProperty("SIZE", "65535")); boolean expectMasked = Boolean.parseBoolean(System.getProperty("MASKED", "false")); boolean maskMismatch = !Boolean.parseBoolean(System.getProperty("STRICT", "false")); + String keyStoreFile = System.getProperty("KEYSTORE", "localhost.p12"); + String keyStorePassword = System.getProperty("KEYSTORE_PASS", "localhost"); boolean isOpensslAvailable = OpenSsl.isAvailable(); boolean isEpollAvailable = Transport.isEpollAvailable(); @@ -67,7 +69,7 @@ public static void main(String[] args) throws Exception { Transport transport = Transport.get(/*native IO*/ true); logger.info("\n==> io transport: {}", transport.type()); - SslContext sslContext = Security.serverSslContext(); + SslContext sslContext = Security.serverSslContext(keyStoreFile, keyStorePassword); ServerBootstrap bootstrap = new ServerBootstrap(); Channel server = diff --git a/netty-websocket-http1-soaktest/src/main/resources/localhost.p12 b/netty-websocket-http1-soaktest/src/main/resources/localhost.p12 new file mode 100644 index 0000000..3d07e30 Binary files /dev/null and b/netty-websocket-http1-soaktest/src/main/resources/localhost.p12 differ diff --git a/netty-websocket-http1-test/gradle.lockfile b/netty-websocket-http1-test/gradle.lockfile index 66f1289..e24ff6c 100644 --- a/netty-websocket-http1-test/gradle.lockfile +++ b/netty-websocket-http1-test/gradle.lockfile @@ -9,16 +9,16 @@ com.google.errorprone:javac-shaded:9+181-r4173-1=googleJavaFormat1.6 com.google.googlejavaformat:google-java-format:1.6=googleJavaFormat1.6 com.google.guava:guava:22.0=googleJavaFormat1.6 com.google.j2objc:j2objc-annotations:1.1=googleJavaFormat1.6 -io.netty:netty-buffer:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-codec-http:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-codec:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-common:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-handler:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-resolver:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport-classes-epoll:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport-classes-kqueue:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport-native-unix-common:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-buffer:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-codec-http:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-codec:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-handler:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-resolver:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport-classes-epoll:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport-classes-kqueue:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport-native-unix-common:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy:1.14.11=testCompileClasspath,testRuntimeClasspath org.apiguardian:apiguardian-api:1.1.2=testCompileClasspath org.assertj:assertj-core:3.25.3=testCompileClasspath,testRuntimeClasspath diff --git a/netty-websocket-http1-test/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/test/Security.java b/netty-websocket-http1-test/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/test/Security.java index c3a8956..aa3422c 100644 --- a/netty-websocket-http1-test/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/test/Security.java +++ b/netty-websocket-http1-test/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/test/Security.java @@ -22,10 +22,11 @@ import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SupportedCipherSuiteFilter; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; -import io.netty.handler.ssl.util.SelfSignedCertificate; -import java.security.SecureRandom; +import java.io.InputStream; +import java.security.KeyStore; import java.util.Arrays; import java.util.List; +import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,11 +34,16 @@ public final class Security { private static final Logger logger = LoggerFactory.getLogger(Security.class); - public static SslContext serverSslContext() throws Exception { - SecureRandom random = new SecureRandom(); - SelfSignedCertificate ssc = new SelfSignedCertificate("com.jauntsdn", random, 1024); + public static SslContext serverSslContext(String keystoreFile, String keystorePassword) + throws Exception { + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509"); + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + InputStream keystoreStream = Security.class.getClassLoader().getResourceAsStream(keystoreFile); + char[] keystorePasswordArray = keystorePassword.toCharArray(); + keyStore.load(keystoreStream, keystorePasswordArray); + keyManagerFactory.init(keyStore, keystorePasswordArray); - return SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + return SslContextBuilder.forServer(keyManagerFactory) .protocols("TLSv1.3") .sslProvider(sslProvider()) .ciphers(supportedCypherSuites(), SupportedCipherSuiteFilter.INSTANCE) diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java index e4a4dd7..4ee69c0 100644 --- a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java @@ -84,7 +84,7 @@ void tearDown() { @ParameterizedTest void binaryFramesEncoder(boolean mask) throws Exception { int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; - Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), mask, false); + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); BinaryFramesEncoderClientHandler clientHandler = new BinaryFramesEncoderClientHandler(maxFrameSize); Channel client = @@ -98,12 +98,31 @@ void binaryFramesEncoder(boolean mask) throws Exception { client.close(); } + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesEncoder(boolean mask) throws Exception { + int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + TextFramesEncoderClientHandler clientHandler = + new TextFramesEncoderClientHandler(maxFrameSize, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory.Encoder encoder = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(encoder).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + @Timeout(300) @ValueSource(booleans = {true, false}) @ParameterizedTest void binaryFramesBulkEncoder(boolean mask) throws Exception { int maxFrameSize = 1000; - Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), mask, false); + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); BinaryFramesEncoderClientBulkHandler clientHandler = new BinaryFramesEncoderClientBulkHandler(maxFrameSize); Channel client = @@ -117,6 +136,116 @@ void binaryFramesBulkEncoder(boolean mask) throws Exception { client.close(); } + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesBulkEncoder(boolean mask) throws Exception { + int maxFrameSize = 1000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + TextFramesEncoderClientBulkHandler clientHandler = + new TextFramesEncoderClientBulkHandler(maxFrameSize, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory.BulkEncoder encoder = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(encoder).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void binaryFramesFragmentsEncoder(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundBinaryFragmentationEncoderClientHandler clientHandler = + new OutboundBinaryFragmentationEncoderClientHandler(maxFrameSize / 3 - 1); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void binaryFramesFragmentsFactory(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundBinaryFragmentationClientHandler clientHandler = + new OutboundBinaryFragmentationClientHandler(maxFrameSize / 3 - 1); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesFragmentsEncoder(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundTextFragmentationEncoderClientHandler clientHandler = + new OutboundTextFragmentationEncoderClientHandler(maxFrameSize / 3 - 1, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesFragmentsFactory(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundTextFragmentationClientHandler clientHandler = + new OutboundTextFragmentationClientHandler(maxFrameSize / 3 - 1, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesFactory(boolean mask) throws Exception { + int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + TextFramesFactoryClientHandler clientHandler = + new TextFramesFactoryClientHandler(maxFrameSize, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory frameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(frameFactory).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + @Timeout(300) @MethodSource("maskingArgs") @ParameterizedTest @@ -124,7 +253,7 @@ void allSizeBinaryFramesDefaultDecoder( boolean mask, Class webSocketFrameFactoryType, Class webSocketDecoderType) throws Exception { int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; - Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), mask, false); + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); BinaryFramesTestClientHandler clientHandler = new BinaryFramesTestClientHandler(maxFrameSize); Channel client = webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); @@ -142,7 +271,7 @@ void allSizeBinaryFramesDefaultDecoder( @Test void binaryFramesSmallDecoder() throws Exception { int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; - Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), false, false); + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), false, false); BinaryFramesTestClientHandler clientHandler = new BinaryFramesTestClientHandler(maxFrameSize); Channel client = webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); @@ -336,8 +465,8 @@ void fragmentDefaultDecoder( int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; Channel s = server = nettyServer(new FragmentTestServerHandler(1500), mask, false); - FragmentationFramesTestClientHandler clientHandler = - new FragmentationFramesTestClientHandler(3333); + InboundFragmentationFramesTestClientHandler clientHandler = + new InboundFragmentationFramesTestClientHandler(3333); Channel client = webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); @@ -356,8 +485,8 @@ void fragmentSmallDecoder() throws Exception { int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; Channel s = server = nettyServer(new FragmentTestServerHandler(33), false, false); - FragmentationFramesTestClientHandler clientHandler = - new FragmentationFramesTestClientHandler(70); + InboundFragmentationFramesTestClientHandler clientHandler = + new InboundFragmentationFramesTestClientHandler(70); Channel client = webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); @@ -450,7 +579,7 @@ protected void initChannel(SocketChannel ch) { WebSocketDecoderConfig.newBuilder() .expectMaskedFrames(expectMaskedFrames) .allowMaskMismatch(allowMaskMismatch) - .withUTF8Validator(false) + .withUTF8Validator(true) .allowExtensions(false) .maxFramePayloadLength(65535) .build(); @@ -610,25 +739,28 @@ private void sendFrames(ChannelHandlerContext c, int toSend) { } } - static class BinaryFramesEncoderClientHandler + static class TextFramesEncoderClientBulkHandler implements WebSocketCallbacksHandler, WebSocketFrameListener { - private final CompletableFuture onHandshakeComplete = + private final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); private final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); - private WebSocketFrameFactory.Encoder binaryFrameEncoder; private final int framesCount; + private final char expectedAsciiChar; + private WebSocketFrameFactory.BulkEncoder textFrameEncoder; private int receivedFrames; private int sentFrames; + private ByteBuf outBuffer; private volatile ChannelHandlerContext ctx; - BinaryFramesEncoderClientHandler(int maxFrameSize) { + TextFramesEncoderClientBulkHandler(int maxFrameSize, char expectedAsciiChar) { this.framesCount = maxFrameSize; + this.expectedAsciiChar = expectedAsciiChar; } @Override public WebSocketFrameListener exchange( ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { - this.binaryFrameEncoder = webSocketFrameFactory.encoder(); + this.textFrameEncoder = webSocketFrameFactory.bulkEncoder(); return this; } @@ -647,9 +779,9 @@ public void onChannelRead( payload.release(); return; } - if (opcode != WebSocketProtocol.OPCODE_BINARY) { + if (opcode != WebSocketProtocol.OPCODE_TEXT) { onFrameExchangeComplete.completeExceptionally( - new AssertionError("received non-binary frame: " + Long.toHexString(opcode))); + new AssertionError("received non-text frame: " + Long.toHexString(opcode))); payload.release(); return; } @@ -669,10 +801,14 @@ public void onChannelRead( } for (int i = 0; i < readableBytes; i++) { - byte b = payload.readByte(); - if (b != (byte) 0xFE) { + char ch = (char) payload.readByte(); + if (ch != expectedAsciiChar) { onFrameExchangeComplete.completeExceptionally( - new AssertionError("received frame with unexpected content: " + Long.toHexString(b))); + new AssertionError( + "received frame with unexpected content: " + + ch + + ", expected: " + + expectedAsciiChar)); payload.release(); return; } @@ -683,25 +819,21 @@ public void onChannelRead( } } - @Override - public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { - boolean writable = ctx.channel().isWritable(); - if (sentFrames > 0 && writable) { - int toSend = framesCount - sentFrames; - if (toSend > 0) { - sendFrames(ctx, toSend); - } - } - } - @Override public void onOpen(ChannelHandlerContext ctx) { this.ctx = ctx; - onHandshakeComplete.complete(binaryFrameEncoder); + int bufferSize = 4 * framesCount; + this.outBuffer = ctx.alloc().buffer(bufferSize, bufferSize); + onHandshakeComplete.complete(textFrameEncoder); } @Override public void onClose(ChannelHandlerContext ctx) { + ByteBuf out = outBuffer; + if (out != null) { + outBuffer = null; + out.release(); + } if (!onFrameExchangeComplete.isDone()) { onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); } @@ -714,7 +846,7 @@ public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } } - CompletableFuture onHandshakeCompleted() { + CompletableFuture onHandshakeCompleted() { return onHandshakeComplete; } @@ -725,59 +857,60 @@ CompletableFuture startFramesExchange() { } private void sendFrames(ChannelHandlerContext c, int toSend) { - Channel ch = c.channel(); - WebSocketFrameFactory.Encoder frameEncoder = binaryFrameEncoder; - boolean pendingFlush = false; - ByteBufAllocator allocator = c.alloc(); + WebSocketFrameFactory.BulkEncoder frameEncoder = textFrameEncoder; for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { if (!c.channel().isOpen()) { return; } int payloadSize = sentFrames; - int frameSize = frameEncoder.sizeofBinaryFrame(payloadSize); - ByteBuf binaryFrame = allocator.buffer(frameSize); - binaryFrame.writerIndex(frameSize - payloadSize); + int frameSize = frameEncoder.sizeofTextFrame(payloadSize); + ByteBuf out = outBuffer; + if (frameSize > out.capacity() - out.writerIndex()) { + int readableBytes = out.readableBytes(); + int bufferSize = 4 * framesCount; + outBuffer = c.alloc().buffer(bufferSize, bufferSize); + if (c.channel().bytesBeforeUnwritable() < readableBytes) { + c.writeAndFlush(out, c.voidPromise()); + } else { + c.write(out, c.voidPromise()); + } + out = outBuffer; + } + int mask = frameEncoder.encodeTextFramePrefix(out, payloadSize); for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) { - binaryFrame.writeByte(0xFE); + out.writeByte(expectedAsciiChar); } - ByteBuf maskedBinaryFrame = frameEncoder.encodeBinaryFrame(binaryFrame); + frameEncoder.maskTextFrame(out, mask, payloadSize); sentFrames++; - if (ch.bytesBeforeUnwritable() < binaryFrame.capacity()) { - c.writeAndFlush(maskedBinaryFrame, c.voidPromise()); - pendingFlush = false; - if (!ch.isWritable()) { - return; - } - } else { - c.write(maskedBinaryFrame, c.voidPromise()); - pendingFlush = true; - } } - if (pendingFlush) { + ByteBuf out = outBuffer; + if (out.readableBytes() > 0) { + c.writeAndFlush(out, c.voidPromise()); + } else { c.flush(); } } } - static class BinaryFramesTestClientHandler + static class BinaryFramesEncoderClientHandler implements WebSocketCallbacksHandler, WebSocketFrameListener { - private final CompletableFuture onHandshakeComplete = + private final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); private final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); - private WebSocketFrameFactory webSocketFrameFactory; + private WebSocketFrameFactory.Encoder binaryFrameEncoder; private final int framesCount; private int receivedFrames; private int sentFrames; private volatile ChannelHandlerContext ctx; - BinaryFramesTestClientHandler(int maxFrameSize) { + BinaryFramesEncoderClientHandler(int maxFrameSize) { this.framesCount = maxFrameSize; } @Override public WebSocketFrameListener exchange( ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { - this.webSocketFrameFactory = webSocketFrameFactory; + this.binaryFrameEncoder = webSocketFrameFactory.encoder(); return this; } @@ -846,7 +979,7 @@ public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { @Override public void onOpen(ChannelHandlerContext ctx) { this.ctx = ctx; - onHandshakeComplete.complete(webSocketFrameFactory); + onHandshakeComplete.complete(binaryFrameEncoder); } @Override @@ -863,7 +996,7 @@ public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } } - CompletableFuture onHandshakeCompleted() { + CompletableFuture onHandshakeCompleted() { return onHandshakeComplete; } @@ -875,18 +1008,21 @@ CompletableFuture startFramesExchange() { private void sendFrames(ChannelHandlerContext c, int toSend) { Channel ch = c.channel(); - WebSocketFrameFactory factory = webSocketFrameFactory; + WebSocketFrameFactory.Encoder frameEncoder = binaryFrameEncoder; boolean pendingFlush = false; + ByteBufAllocator allocator = c.alloc(); for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { if (!c.channel().isOpen()) { return; } - int frameSize = sentFrames; - ByteBuf binaryFrame = factory.createBinaryFrame(c.alloc(), frameSize); - for (int payloadIdx = 0; payloadIdx < frameSize; payloadIdx++) { + int payloadSize = sentFrames; + int frameSize = frameEncoder.sizeofBinaryFrame(payloadSize); + ByteBuf binaryFrame = allocator.buffer(frameSize); + binaryFrame.writerIndex(frameSize - payloadSize); + for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) { binaryFrame.writeByte(0xFE); } - ByteBuf maskedBinaryFrame = factory.mask(binaryFrame); + ByteBuf maskedBinaryFrame = frameEncoder.encodeBinaryFrame(binaryFrame); sentFrames++; if (ch.bytesBeforeUnwritable() < binaryFrame.capacity()) { c.writeAndFlush(maskedBinaryFrame, c.voidPromise()); @@ -905,26 +1041,27 @@ private void sendFrames(ChannelHandlerContext c, int toSend) { } } - static class TextFramesTestClientHandler + static class TextFramesEncoderClientHandler implements WebSocketCallbacksHandler, WebSocketFrameListener { - final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); - final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); - final int framesCount; - final char expectedContent; - WebSocketFrameFactory webSocketFrameFactory; - int receivedFrames; - volatile ChannelHandlerContext ctx; + private final CompletableFuture onHandshakeComplete = + new CompletableFuture<>(); + private final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + private WebSocketFrameFactory.Encoder textFrameEncoder; + private final int framesCount; + private final char expectedAsciiChar; + private int receivedFrames; + private int sentFrames; + private volatile ChannelHandlerContext ctx; - TextFramesTestClientHandler(int maxFrameSize, char expectedContent) { + TextFramesEncoderClientHandler(int maxFrameSize, char expectedAsciiChar) { this.framesCount = maxFrameSize; - this.expectedContent = expectedContent; + this.expectedAsciiChar = expectedAsciiChar; } @Override public WebSocketFrameListener exchange( ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { - this.webSocketFrameFactory = webSocketFrameFactory; - onHandshakeComplete.complete(webSocketFrameFactory); + this.textFrameEncoder = webSocketFrameFactory.encoder(); return this; } @@ -950,31 +1087,29 @@ public void onChannelRead( return; } - CharSequence content = - payload.readCharSequence(payload.readableBytes(), StandardCharsets.UTF_8); + int readableBytes = payload.readableBytes(); int expectedSize = receivedFrames; - if (expectedSize != content.length()) { + if (expectedSize != readableBytes) { onFrameExchangeComplete.completeExceptionally( new AssertionError( "received frame of unexpected size: " + expectedSize + ", actual: " - + content.length())); + + readableBytes)); payload.release(); return; } - for (int i = 0; i < content.length(); i++) { - char ch = content.charAt(i); - char expectedCh = expectedContent; - if (ch != expectedCh) { + for (int i = 0; i < readableBytes; i++) { + char ch = (char) payload.readByte(); + if (ch != expectedAsciiChar) { onFrameExchangeComplete.completeExceptionally( new AssertionError( - "received text frame with unexpected content: " + "received frame with unexpected content: " + ch + ", expected: " - + expectedCh)); + + expectedAsciiChar)); payload.release(); return; } @@ -985,35 +1120,490 @@ public void onChannelRead( } } - CompletableFuture onHandshakeCompleted() { - return onHandshakeComplete; - } - - CompletableFuture onFrameExchangeCompleted() { - return onFrameExchangeComplete; + @Override + public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { + boolean writable = ctx.channel().isWritable(); + if (sentFrames > 0 && writable) { + int toSend = framesCount - sentFrames; + if (toSend > 0) { + sendFrames(ctx, toSend); + } + } } @Override public void onOpen(ChannelHandlerContext ctx) { this.ctx = ctx; + onHandshakeComplete.complete(textFrameEncoder); } - } - - static class FragmentationFramesTestClientHandler - implements WebSocketCallbacksHandler, WebSocketFrameListener { - final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); - final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); - final int frameSize; - WebSocketFrameFactory webSocketFrameFactory; - volatile ChannelHandlerContext ctx; - FragmentationFramesTestClientHandler(int frameSize) { - this.frameSize = frameSize; + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } } @Override - public WebSocketFrameListener exchange( - ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c, framesCount - sentFrames)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c, int toSend) { + Channel ch = c.channel(); + WebSocketFrameFactory.Encoder frameEncoder = textFrameEncoder; + boolean pendingFlush = false; + ByteBufAllocator allocator = c.alloc(); + for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { + if (!c.channel().isOpen()) { + return; + } + int payloadSize = sentFrames; + int frameSize = frameEncoder.sizeofTextFrame(payloadSize); + ByteBuf textFrame = allocator.buffer(frameSize); + textFrame.writerIndex(frameSize - payloadSize); + for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) { + textFrame.writeByte(expectedAsciiChar); + } + ByteBuf maskedTextFrame = frameEncoder.encodeTextFrame(textFrame); + sentFrames++; + if (ch.bytesBeforeUnwritable() < textFrame.capacity()) { + c.writeAndFlush(maskedTextFrame, c.voidPromise()); + pendingFlush = false; + if (!ch.isWritable()) { + return; + } + } else { + c.write(maskedTextFrame, c.voidPromise()); + pendingFlush = true; + } + } + if (pendingFlush) { + c.flush(); + } + } + } + + static class TextFramesFactoryClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + private final CompletableFuture onHandshakeComplete = + new CompletableFuture<>(); + private final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + private WebSocketFrameFactory frameFactory; + private final int framesCount; + private final char expectedAsciiChar; + private int receivedFrames; + private int sentFrames; + private volatile ChannelHandlerContext ctx; + + TextFramesFactoryClientHandler(int maxFrameSize, char expectedAsciiChar) { + this.framesCount = maxFrameSize; + this.expectedAsciiChar = expectedAsciiChar; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.frameFactory = webSocketFrameFactory; + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_TEXT) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-text frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + int readableBytes = payload.readableBytes(); + + int expectedSize = receivedFrames; + if (expectedSize != readableBytes) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + readableBytes)); + payload.release(); + return; + } + + for (int i = 0; i < readableBytes; i++) { + char ch = (char) payload.readByte(); + if (ch != expectedAsciiChar) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame with unexpected content: " + + ch + + ", expected: " + + expectedAsciiChar)); + payload.release(); + return; + } + } + payload.release(); + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + @Override + public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { + boolean writable = ctx.channel().isWritable(); + if (sentFrames > 0 && writable) { + int toSend = framesCount - sentFrames; + if (toSend > 0) { + sendFrames(ctx, toSend); + } + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(frameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c, framesCount - sentFrames)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c, int toSend) { + Channel ch = c.channel(); + WebSocketFrameFactory factory = frameFactory; + boolean pendingFlush = false; + ByteBufAllocator allocator = c.alloc(); + for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { + if (!c.channel().isOpen()) { + return; + } + int payloadSize = sentFrames; + ByteBuf textFrame = factory.createTextFrame(allocator, payloadSize); + for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) { + textFrame.writeByte(expectedAsciiChar); + } + ByteBuf maskedTextFrame = factory.mask(textFrame); + sentFrames++; + if (ch.bytesBeforeUnwritable() < textFrame.capacity()) { + c.writeAndFlush(maskedTextFrame, c.voidPromise()); + pendingFlush = false; + if (!ch.isWritable()) { + return; + } + } else { + c.write(maskedTextFrame, c.voidPromise()); + pendingFlush = true; + } + } + if (pendingFlush) { + c.flush(); + } + } + } + + static class BinaryFramesTestClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + private final CompletableFuture onHandshakeComplete = + new CompletableFuture<>(); + private final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + private WebSocketFrameFactory webSocketFrameFactory; + private final int framesCount; + private int receivedFrames; + private int sentFrames; + private volatile ChannelHandlerContext ctx; + + BinaryFramesTestClientHandler(int maxFrameSize) { + this.framesCount = maxFrameSize; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_BINARY) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-binary frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + int readableBytes = payload.readableBytes(); + + int expectedSize = receivedFrames; + if (expectedSize != readableBytes) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + readableBytes)); + payload.release(); + return; + } + + for (int i = 0; i < readableBytes; i++) { + byte b = payload.readByte(); + if (b != (byte) 0xFE) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with unexpected content: " + Long.toHexString(b))); + payload.release(); + return; + } + } + payload.release(); + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + @Override + public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { + boolean writable = ctx.channel().isWritable(); + if (sentFrames > 0 && writable) { + int toSend = framesCount - sentFrames; + if (toSend > 0) { + sendFrames(ctx, toSend); + } + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(webSocketFrameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c, framesCount - sentFrames)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c, int toSend) { + Channel ch = c.channel(); + WebSocketFrameFactory factory = webSocketFrameFactory; + boolean pendingFlush = false; + for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { + if (!c.channel().isOpen()) { + return; + } + int frameSize = sentFrames; + ByteBuf binaryFrame = factory.createBinaryFrame(c.alloc(), frameSize); + for (int payloadIdx = 0; payloadIdx < frameSize; payloadIdx++) { + binaryFrame.writeByte(0xFE); + } + ByteBuf maskedBinaryFrame = factory.mask(binaryFrame); + sentFrames++; + if (ch.bytesBeforeUnwritable() < binaryFrame.capacity()) { + c.writeAndFlush(maskedBinaryFrame, c.voidPromise()); + pendingFlush = false; + if (!ch.isWritable()) { + return; + } + } else { + c.write(maskedBinaryFrame, c.voidPromise()); + pendingFlush = true; + } + } + if (pendingFlush) { + c.flush(); + } + } + } + + static class TextFramesTestClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final int framesCount; + final char expectedContent; + WebSocketFrameFactory webSocketFrameFactory; + int receivedFrames; + volatile ChannelHandlerContext ctx; + + TextFramesTestClientHandler(int maxFrameSize, char expectedContent) { + this.framesCount = maxFrameSize; + this.expectedContent = expectedContent; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + onHandshakeComplete.complete(webSocketFrameFactory); + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_TEXT) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-text frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + CharSequence content = + payload.readCharSequence(payload.readableBytes(), StandardCharsets.UTF_8); + + int expectedSize = receivedFrames; + if (expectedSize != content.length()) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + content.length())); + payload.release(); + return; + } + + for (int i = 0; i < content.length(); i++) { + char ch = content.charAt(i); + char expectedCh = expectedContent; + if (ch != expectedCh) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received text frame with unexpected content: " + + ch + + ", expected: " + + expectedCh)); + payload.release(); + return; + } + } + payload.release(); + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture onFrameExchangeCompleted() { + return onFrameExchangeComplete; + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + } + + static class InboundFragmentationFramesTestClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final int frameSize; + WebSocketFrameFactory webSocketFrameFactory; + volatile ChannelHandlerContext ctx; + + InboundFragmentationFramesTestClientHandler(int frameSize) { + this.frameSize = frameSize; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { this.webSocketFrameFactory = webSocketFrameFactory; inboundFrame = new CompositeByteBuf(ctx.alloc(), true, 3); return this; @@ -1120,6 +1710,605 @@ private void sendFrames(ChannelHandlerContext c) { } } + static class OutboundBinaryFragmentationEncoderClientHandler + extends OutboundBinaryFragmentationClientHandler { + OutboundBinaryFragmentationEncoderClientHandler(int frameSize) { + super(frameSize); + } + + static ByteBuf withPayload( + ByteBufAllocator allocator, WebSocketFrameFactory.Encoder encoder, byte content, int size) { + int frameSize = encoder.sizeofBinaryFrame(size); + ByteBuf binaryFrame = allocator.buffer(frameSize); + binaryFrame.writerIndex(frameSize - size); + for (int i = 0; i < size; i++) { + binaryFrame.writeByte(content); + } + return binaryFrame; + } + + @Override + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory.Encoder encoder = webSocketFrameFactory.encoder(); + ByteBufAllocator allocator = c.alloc(); + while (framesSent < frameSize) { + int size = ++framesSent; + + ByteBuf shortFragmentStart = + encoder.encodeBinaryFragmentStart(withPayload(allocator, encoder, (byte) 0xFE, size)); + ByteBuf shortFragmentEnd = + encoder.encodeContinuationFragmentEnd( + withPayload(allocator, encoder, (byte) 0xFE, size)); + + ByteBuf longFragmentStart = + encoder.encodeBinaryFragmentStart(withPayload(allocator, encoder, (byte) 0xFE, size)); + ByteBuf longFragmentContinuation = + encoder.encodeContinuationFragment(withPayload(allocator, encoder, (byte) 0xFE, size)); + ByteBuf longFragmentEnd = + encoder.encodeContinuationFragmentEnd( + withPayload(allocator, encoder, (byte) 0xFE, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + } + + static class OutboundTextFragmentationEncoderClientHandler + extends OutboundTextFragmentationClientHandler { + + OutboundTextFragmentationEncoderClientHandler(int frameSize, char expectedContent) { + super(frameSize, expectedContent); + } + + static ByteBuf withPayload( + ByteBufAllocator allocator, WebSocketFrameFactory.Encoder encoder, char content, int size) { + int frameSize = encoder.sizeofTextFrame(size); + ByteBuf binaryFrame = allocator.buffer(frameSize); + binaryFrame.writerIndex(frameSize - size); + for (int i = 0; i < size; i++) { + binaryFrame.writeByte(content); + } + return binaryFrame; + } + + @Override + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory.Encoder encoder = webSocketFrameFactory.encoder(); + ByteBufAllocator allocator = c.alloc(); + while (framesSent < frameSize) { + int size = ++framesSent; + + char expected = expectedContent; + ByteBuf shortFragmentStart = + encoder.encodeTextFragmentStart(withPayload(allocator, encoder, expected, size)); + ByteBuf shortFragmentEnd = + encoder.encodeContinuationFragmentEnd(withPayload(allocator, encoder, expected, size)); + + ByteBuf longFragmentStart = + encoder.encodeTextFragmentStart(withPayload(allocator, encoder, expected, size)); + ByteBuf longFragmentContinuation = + encoder.encodeContinuationFragment(withPayload(allocator, encoder, expected, size)); + ByteBuf longFragmentEnd = + encoder.encodeContinuationFragmentEnd(withPayload(allocator, encoder, expected, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + } + + static class OutboundTextFragmentationClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final int frameSize; + final char expectedContent; + WebSocketFrameFactory webSocketFrameFactory; + volatile ChannelHandlerContext ctx; + int framesReceived; + int framesSent; + int fragmentsReceived; + + OutboundTextFragmentationClientHandler(int frameSize, char expectedContent) { + this.frameSize = frameSize; + this.expectedContent = expectedContent; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode == WebSocketProtocol.OPCODE_CLOSE) { + onFrameExchangeComplete.completeExceptionally(new AssertionError("received close frame")); + payload.release(); + return; + } + switch (fragmentsReceived) { + case /*short start*/ 0: + case /*long start*/ 2: + { + if (opcode != WebSocketProtocol.OPCODE_TEXT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-text opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-fragmented frame")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + for (int i = 0; i < payload.readableBytes(); i++) { + char ch = (char) payload.readByte(); + if (ch != expectedContent) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + ch + + ", expected: " + + expectedContent)); + return; + } + } + fragmentsReceived++; + } + break; + case /*short end*/ 1: + case /*long end*/ 4: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (!finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final fragment, expected final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + for (int i = 0; i < payload.readableBytes(); i++) { + char ch = (char) payload.readByte(); + if (ch != expectedContent) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + ch + + ", expected: " + + expectedContent)); + return; + } + } + payload.release(); + if (fragmentsReceived == /*long end*/ 4) { + fragmentsReceived = 0; + if (++framesReceived == frameSize) { + onFrameExchangeComplete.complete(null); + } + } else { + fragmentsReceived++; + } + } + break; + case /*long continuation*/ 3: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received final fragment, expected non-final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + for (int i = 0; i < payload.readableBytes(); i++) { + char ch = (char) payload.readByte(); + if (ch != expectedContent) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + ch + + ", expected: " + + expectedContent)); + return; + } + } + payload.release(); + fragmentsReceived++; + } + break; + default: + throw new AssertionError("Unexpected fragmentsReceived state: " + fragmentsReceived); + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(webSocketFrameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c)); + return onFrameExchangeComplete; + } + + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory factory = webSocketFrameFactory; + while (framesSent < frameSize) { + int size = ++framesSent; + ByteBuf shortFragmentStart = + factory.mask( + withPayload( + factory.createTextFragmentStart(c.alloc(), size), expectedContent, size)); + ByteBuf shortFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), expectedContent, size)); + + ByteBuf longFragmentStart = + factory.mask( + withPayload( + factory.createTextFragmentStart(c.alloc(), size), expectedContent, size)); + ByteBuf longFragmentContinuation = + factory.mask( + withPayload( + factory.createContinuationFragment(c.alloc(), size), expectedContent, size)); + ByteBuf longFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), expectedContent, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + + static ByteBuf withPayload(ByteBuf fragment, char content, int size) { + for (int i = 0; i < size; i++) { + fragment.writeByte(content); + } + return fragment; + } + } + + static class OutboundBinaryFragmentationClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final int frameSize; + WebSocketFrameFactory webSocketFrameFactory; + volatile ChannelHandlerContext ctx; + int framesReceived; + int framesSent; + int fragmentsReceived; + + OutboundBinaryFragmentationClientHandler(int frameSize) { + this.frameSize = frameSize; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode == WebSocketProtocol.OPCODE_CLOSE) { + onFrameExchangeComplete.completeExceptionally(new AssertionError("received close frame")); + payload.release(); + return; + } + switch (fragmentsReceived) { + case /*short start*/ 0: + case /*long start*/ 2: + { + if (opcode != WebSocketProtocol.OPCODE_BINARY) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-binary opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-fragmented frame")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + byte expectedPayload = (byte) 0xFE; + for (int i = 0; i < payload.readableBytes(); i++) { + byte b = payload.readByte(); + if (b != expectedPayload) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + Long.toHexString(b) + + ", expected: " + + Long.toHexString(expectedPayload))); + return; + } + } + fragmentsReceived++; + } + break; + case /*short end*/ 1: + case /*long end*/ 4: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (!finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final fragment, expected final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + byte expectedPayload = (byte) 0xFE; + for (int i = 0; i < payload.readableBytes(); i++) { + byte b = payload.readByte(); + if (b != expectedPayload) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + Long.toHexString(b) + + ", expected: " + + Long.toHexString(expectedPayload))); + return; + } + } + payload.release(); + if (fragmentsReceived == /*long end*/ 4) { + fragmentsReceived = 0; + if (++framesReceived == frameSize) { + onFrameExchangeComplete.complete(null); + } + } else { + fragmentsReceived++; + } + } + break; + case /*long continuation*/ 3: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received final fragment, expected non-final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + byte expectedPayload = (byte) 0xFE; + for (int i = 0; i < payload.readableBytes(); i++) { + byte b = payload.readByte(); + if (b != expectedPayload) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + Long.toHexString(b) + + ", expected: " + + Long.toHexString(expectedPayload))); + return; + } + } + payload.release(); + fragmentsReceived++; + } + break; + default: + throw new AssertionError("Unexpected fragmentsReceived state: " + fragmentsReceived); + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(webSocketFrameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c)); + return onFrameExchangeComplete; + } + + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory factory = webSocketFrameFactory; + while (framesSent < frameSize) { + int size = ++framesSent; + ByteBuf shortFragmentStart = + factory.mask( + withPayload(factory.createBinaryFragmentStart(c.alloc(), size), (byte) 0xFE, size)); + ByteBuf shortFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), (byte) 0xFE, size)); + + ByteBuf longFragmentStart = + factory.mask( + withPayload(factory.createBinaryFragmentStart(c.alloc(), size), (byte) 0xFE, size)); + ByteBuf longFragmentContinuation = + factory.mask( + withPayload( + factory.createContinuationFragment(c.alloc(), size), (byte) 0xFE, size)); + ByteBuf longFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), (byte) 0xFE, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + + static ByteBuf withPayload(ByteBuf fragment, byte content, int size) { + for (int i = 0; i < size; i++) { + fragment.writeByte(content); + } + return fragment; + } + } + static class TextFramesTestServerHandler extends ChannelInboundHandlerAdapter { final String content; final int framesCount; @@ -1186,7 +2375,7 @@ private void sendFrames(ChannelHandlerContext c, int toSend) { } } - static class BinaryFramesTestServerHandler extends ChannelInboundHandlerAdapter { + static class WebSocketFramesTestServerHandler extends ChannelInboundHandlerAdapter { boolean ready = true; boolean pendingFlush; diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketHandshakeTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketHandshakeTest.java index cca27cf..61bf79b 100644 --- a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketHandshakeTest.java +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketHandshakeTest.java @@ -54,6 +54,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; @@ -172,26 +173,6 @@ void smallDecoderConfig() throws Exception { client.close(); } - @Test - void clientBuilderMissingHandler() { - org.junit.jupiter.api.Assertions.assertThrows( - IllegalStateException.class, - () -> { - WebSocketClientProtocolHandler clientProtocolHandler = - WebSocketClientProtocolHandler.create().build(); - }); - } - - @Test - void serverBuilderMissingHandler() { - org.junit.jupiter.api.Assertions.assertThrows( - IllegalStateException.class, - () -> { - WebSocketServerProtocolHandler serverProtocolHandler = - WebSocketServerProtocolHandler.create().build(); - }); - } - @Timeout(15) @Test void clientTimeout() throws InterruptedException { @@ -309,6 +290,74 @@ protected void initChannel(SocketChannel ch) { Assertions.assertThat(client.isOpen()).isFalse(); } + @Test + void noCallbackHandlerHandshake() throws Exception { + String path = "/test"; + NoCallbackServerHandler noCallbackServerHandler = new NoCallbackServerHandler(); + NoCallbackClientHandler noCallbackClientHandler = new NoCallbackClientHandler(); + + Channel s = + server = + new ServerBootstrap() + .group(new NioEventLoopGroup(1)) + .channel(NioServerSocketChannel.class) + .childHandler( + new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + HttpServerCodec http1Codec = new HttpServerCodec(); + HttpObjectAggregator http1Aggregator = new HttpObjectAggregator(65536); + WebSocketServerProtocolHandler webSocketProtocolHandler = + WebSocketServerProtocolHandler.create() + .path(path) + .decoderConfig(webSocketDecoderConfig(true, true, 125)) + .build(); + + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast( + http1Codec, + http1Aggregator, + webSocketProtocolHandler, + noCallbackServerHandler); + } + }) + .bind("localhost", 0) + .sync() + .channel(); + + Channel client = + new Bootstrap() + .group(new NioEventLoopGroup(1)) + .channel(NioSocketChannel.class) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + HttpClientCodec http1Codec = new HttpClientCodec(); + HttpObjectAggregator http1Aggregator = new HttpObjectAggregator(65536); + WebSocketClientProtocolHandler webSocketProtocolHandler = + WebSocketClientProtocolHandler.create() + .path(path) + .allowMaskMismatch(true) + .maxFramePayloadLength(125) + .mask(true) + .build(); + + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast( + http1Codec, + http1Aggregator, + webSocketProtocolHandler, + noCallbackClientHandler); + } + }) + .connect(s.localAddress()) + .sync() + .channel(); + + noCallbackClientHandler.exchangeCompleted.get(5, TimeUnit.SECONDS); + } + @SuppressWarnings("deprecation") @Timeout(15) @Test @@ -565,6 +614,118 @@ public void onClose(ChannelHandlerContext ctx) { } } + private static class NoCallbackClientHandler extends ChannelInboundHandlerAdapter { + Promise exchangeCompleted; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + exchangeCompleted = ctx.newPromise(); + super.handlerAdded(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + exchangeCompleted.tryFailure(new ClosedChannelException()); + } + + @SuppressWarnings("Convert2Lambda") + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt + == io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler + .ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { + WebSocketCallbacksHandler.exchange( + ctx, + new WebSocketCallbacksHandler() { + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + ctx.writeAndFlush( + webSocketFrameFactory.mask( + webSocketFrameFactory.createBinaryFrame(ctx.alloc(), 1).writeByte(0xFE))); + + return new WebSocketFrameListener() { + @Override + public void onChannelRead( + ChannelHandlerContext context, + boolean finalFragment, + int rsv, + int opcode, + ByteBuf payload) { + int readableBytes = payload.readableBytes(); + if (readableBytes != 1) { + payload.release(); + exchangeCompleted.setFailure( + new IllegalStateException("unexpected payload size: " + readableBytes)); + return; + } + byte content = payload.readByte(); + if (content != (byte) 0xFE) { + payload.release(); + exchangeCompleted.setFailure( + new IllegalStateException( + "unexpected payload content: " + Integer.toHexString(content))); + return; + } + payload.release(); + exchangeCompleted.setSuccess(null); + } + }; + } + }); + } + super.userEventTriggered(ctx, evt); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.safeRelease(msg); + } + } + + private static class NoCallbackServerHandler extends ChannelInboundHandlerAdapter { + + @SuppressWarnings("Convert2Lambda") + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt + instanceof + io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete) { + WebSocketCallbacksHandler.exchange( + ctx, + new WebSocketCallbacksHandler() { + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + return new WebSocketFrameListener() { + @Override + public void onChannelRead( + ChannelHandlerContext context, + boolean finalFragment, + int rsv, + int opcode, + ByteBuf payload) { + ByteBuf binaryFrame = + webSocketFrameFactory.mask( + webSocketFrameFactory.createBinaryFrame( + ctx.alloc(), payload.readableBytes())); + binaryFrame.writeBytes(payload); + payload.release(); + ctx.writeAndFlush(binaryFrame); + } + }; + } + }); + } + super.userEventTriggered(ctx, evt); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.safeRelease(msg); + } + } + static WebSocketDecoderConfig webSocketDecoderConfig( boolean expectMasked, boolean allowMaskMismatch, int maxFramePayloadLength) { return WebSocketDecoderConfig.newBuilder() diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java index b4bdb5c..9843801 100644 --- a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java @@ -19,6 +19,8 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; @@ -37,7 +39,10 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -268,6 +273,76 @@ void invalidFragmentCompletion() throws Exception { } } + @Test + void utf8Validator() { + String ascii = "Are those shy Eurasian footwear, cowboy chaps, or jolly earthmoving headgear"; + String utf8 = "Чуєш їх, доцю, га? Кумедна ж ти, прощайся без ґольфів!"; + List asciiList = stringList(ByteBufAllocator.DEFAULT, ascii); + List utf8List = stringList(ByteBufAllocator.DEFAULT, utf8); + try { + WebSocketFrameListener.Utf8FrameValidator validator = + WebSocketFrameListener.Utf8FrameValidator.create(); + for (ByteBuf byteBuf : asciiList) { + Assertions.assertThat(validator.validateTextFrame(byteBuf)).isTrue(); + } + for (ByteBuf byteBuf : utf8List) { + Assertions.assertThat(validator.validateTextFrame(byteBuf)).isTrue(); + } + } finally { + for (ByteBuf byteBuf : asciiList) { + byteBuf.release(); + } + for (ByteBuf byteBuf : utf8List) { + byteBuf.release(); + } + } + } + + static List stringList(ByteBufAllocator allocator, String string) { + int length = string.length(); + List list = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + String substring = string.substring(0, i + 1); + ByteBuf byteBuf = ByteBufUtil.writeUtf8(allocator, substring); + list.add(byteBuf); + } + return list; + } + + @Test + void utf8TextFrameValidator() { + ByteBufAllocator alloc = ByteBufAllocator.DEFAULT; + List utf8 = + Arrays.asList( + ByteBufUtil.writeUtf8(alloc, "ab"), + ByteBufUtil.writeUtf8(alloc, "c"), + ByteBufUtil.writeUtf8(alloc, "def"), + ByteBufUtil.writeUtf8(alloc, "ghijk"), + ByteBufUtil.writeUtf8(alloc, "lmn")); + ByteBuf nonUtf8 = alloc.buffer(2).writeByte(0xc3).writeByte(0x28); + + WebSocketFrameListener.Utf8FrameValidator validator = + WebSocketFrameListener.Utf8FrameValidator.create(); + + try { + Assertions.assertThat(validator.validateTextFrame(utf8.get(0))).isTrue(); + Assertions.assertThat(validator.state).isEqualTo(0); + Assertions.assertThat(validator.codep).isEqualTo(0); + Assertions.assertThat(validator.validateTextFragmentStart(utf8.get(1))).isTrue(); + Assertions.assertThat(validator.validateFragmentContinuation(utf8.get(2))).isTrue(); + Assertions.assertThat(validator.validateFragmentEnd(utf8.get(3))).isTrue(); + Assertions.assertThat(validator.state).isEqualTo(0); + Assertions.assertThat(validator.codep).isEqualTo(0); + Assertions.assertThat(validator.validateTextFrame(utf8.get(4))).isTrue(); + Assertions.assertThat(validator.validateTextFrame(nonUtf8)).isFalse(); + } finally { + for (ByteBuf string : utf8) { + string.release(); + } + nonUtf8.release(); + } + } + static WebSocketDecoderConfig decoderConfig(int maxFramePayloadLength) { return WebSocketDecoderConfig.newBuilder() .allowMaskMismatch(true) diff --git a/netty-websocket-http1/gradle.lockfile b/netty-websocket-http1/gradle.lockfile index 99ee8e2..9a50314 100644 --- a/netty-websocket-http1/gradle.lockfile +++ b/netty-websocket-http1/gradle.lockfile @@ -7,13 +7,13 @@ com.google.errorprone:javac-shaded:9+181-r4173-1=googleJavaFormat1.6 com.google.googlejavaformat:google-java-format:1.6=googleJavaFormat1.6 com.google.guava:guava:22.0=googleJavaFormat1.6 com.google.j2objc:j2objc-annotations:1.1=googleJavaFormat1.6 -io.netty:netty-buffer:4.1.109.Final=compileClasspath -io.netty:netty-codec-http:4.1.109.Final=compileClasspath -io.netty:netty-codec:4.1.109.Final=compileClasspath -io.netty:netty-common:4.1.109.Final=compileClasspath -io.netty:netty-handler:4.1.109.Final=compileClasspath -io.netty:netty-resolver:4.1.109.Final=compileClasspath -io.netty:netty-transport-native-unix-common:4.1.109.Final=compileClasspath -io.netty:netty-transport:4.1.109.Final=compileClasspath +io.netty:netty-buffer:4.1.112.Final=compileClasspath +io.netty:netty-codec-http:4.1.112.Final=compileClasspath +io.netty:netty-codec:4.1.112.Final=compileClasspath +io.netty:netty-common:4.1.112.Final=compileClasspath +io.netty:netty-handler:4.1.112.Final=compileClasspath +io.netty:netty-resolver:4.1.112.Final=compileClasspath +io.netty:netty-transport-native-unix-common:4.1.112.Final=compileClasspath +io.netty:netty-transport:4.1.112.Final=compileClasspath org.codehaus.mojo:animal-sniffer-annotations:1.14=googleJavaFormat1.6 empty=annotationProcessor diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java index a587daa..3ecd570 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java @@ -20,6 +20,7 @@ import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_CLOSE; import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_PING; import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_PONG; +import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_TEXT; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -55,6 +56,14 @@ static class FrameFactory static final int PREFIX_SIZE_SMALL = 6; static final int BINARY_FRAME_SMALL = OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; + static final int TEXT_FRAME_SMALL = + OPCODE_TEXT << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; + + static final int BINARY_FRAGMENT_START_SMALL = OPCODE_BINARY << 8 | /*MASK*/ (byte) 1 << 7; + static final int TEXT_FRAGMENT_START_SMALL = OPCODE_TEXT << 8 | /*MASK*/ (byte) 1 << 7; + static final int DATA_FRAGMENT_CONTINUATION_SMALL = /*MASK*/ (byte) 1 << 7; + static final int DATA_FRAGMENT_CONTINUATION_END_SMALL = /*FIN*/ + (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; static final int CLOSE_FRAME = OPCODE_CLOSE << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; @@ -65,27 +74,74 @@ static class FrameFactory static final int PREFIX_SIZE_MEDIUM = 8; static final int BINARY_FRAME_MEDIUM = (BINARY_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; + static final int TEXT_FRAME_MEDIUM = (TEXT_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; + + static final int BINARY_FRAGMENT_START_MEDIUM = + (BINARY_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int TEXT_FRAGMENT_START_MEDIUM = + (TEXT_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_END_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_END_SMALL | /*LEN*/ (byte) 126) << 16; static final WebSocketFrameFactory INSTANCE = new FrameFactory(); - @Override - public ByteBuf createBinaryFrame(ByteBufAllocator allocator, int payloadSize) { + static ByteBuf createDataFrame( + ByteBufAllocator allocator, int payloadSize, int prefixSmall, int prefixMedium) { if (payloadSize <= 125) { return allocator .buffer(PREFIX_SIZE_SMALL + payloadSize) - .writeShort(BINARY_FRAME_SMALL | payloadSize) + .writeShort(prefixSmall | payloadSize) .readerIndex(2) .writeInt(mask()); } else if (payloadSize <= 65_535) { return allocator .buffer(PREFIX_SIZE_MEDIUM + payloadSize) - .writeLong((long) (BINARY_FRAME_MEDIUM | payloadSize) << 32 | mask()) + .writeLong((long) (prefixMedium | payloadSize) << 32 | mask()) .readerIndex(4); } else { throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); } } + @Override + public ByteBuf createBinaryFrame(ByteBufAllocator allocator, int payloadSize) { + return createDataFrame(allocator, payloadSize, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public ByteBuf createTextFrame(ByteBufAllocator allocator, int payloadSize) { + return createDataFrame(allocator, payloadSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + + @Override + public ByteBuf createBinaryFragmentStart(ByteBufAllocator allocator, int binaryDataSize) { + return createDataFrame( + allocator, binaryDataSize, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createTextFragmentStart(ByteBufAllocator allocator, int textDataSize) { + return createDataFrame( + allocator, textDataSize, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragment(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, dataSize, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragmentEnd(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, + dataSize, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + @Override public ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason) { if (!WebSocketCloseStatus.isValidStatusCode(statusCode)) { @@ -155,11 +211,50 @@ public BulkEncoder bulkEncoder() { @Override public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { + return encodeDataFrame(binaryFrame, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public ByteBuf encodeTextFrame(ByteBuf textFrame) { + return encodeDataFrame(textFrame, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + + @Override + public ByteBuf encodeBinaryFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeTextFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame(fragmentFrame, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragment(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragmentEnd(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + + @Override + public int sizeofFragment(int payloadSize) { + return sizeOfDataFrame(payloadSize); + } + + static ByteBuf encodeDataFrame(ByteBuf binaryFrame, int prefixSmall, int prefixMedium) { int frameSize = binaryFrame.readableBytes(); int smallPrefixSize = 6; if (frameSize <= 125 + smallPrefixSize) { int payloadSize = frameSize - smallPrefixSize; - binaryFrame.setShort(0, BINARY_FRAME_SMALL | payloadSize); + binaryFrame.setShort(0, prefixSmall | payloadSize); int mask = mask(); binaryFrame.setInt(2, mask); return mask(mask, binaryFrame, smallPrefixSize, binaryFrame.writerIndex()); @@ -169,7 +264,7 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { if (frameSize <= 65_535 + mediumPrefixSize) { int payloadSize = frameSize - mediumPrefixSize; int mask = mask(); - binaryFrame.setLong(0, ((BINARY_FRAME_MEDIUM | (long) payloadSize) << 32) | mask); + binaryFrame.setLong(0, ((prefixMedium | (long) payloadSize) << 32) | mask); return mask(mask, binaryFrame, mediumPrefixSize, binaryFrame.writerIndex()); } int payloadSize = frameSize - 12; @@ -178,8 +273,18 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { @Override public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) { + return encodeDataFramePrefix(byteBuf, payloadSize, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public int encodeTextFramePrefix(ByteBuf byteBuf, int textPayloadSize) { + return encodeDataFramePrefix(byteBuf, textPayloadSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + + static int encodeDataFramePrefix( + ByteBuf byteBuf, int payloadSize, int prefixSmall, int prefixMedium) { if (payloadSize <= 125) { - byteBuf.writeShort(BINARY_FRAME_SMALL | payloadSize); + byteBuf.writeShort(prefixSmall | payloadSize); int mask = mask(); byteBuf.writeInt(mask); return mask; @@ -187,7 +292,7 @@ public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) { if (payloadSize <= 65_535) { int mask = mask(); - byteBuf.writeLong(((BINARY_FRAME_MEDIUM | (long) payloadSize) << 32) | mask); + byteBuf.writeLong(((prefixMedium | (long) payloadSize) << 32) | mask); return mask; } throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); @@ -195,6 +300,15 @@ public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) { @Override public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) { + return maskDataFrame(byteBuf, mask, payloadSize); + } + + @Override + public ByteBuf maskTextFrame(ByteBuf byteBuf, int mask, int textPayloadSize) { + return maskDataFrame(byteBuf, mask, textPayloadSize); + } + + static ByteBuf maskDataFrame(ByteBuf byteBuf, int mask, int payloadSize) { int end = byteBuf.writerIndex(); int start = end - payloadSize; return mask(mask, byteBuf, start, end); @@ -202,6 +316,15 @@ public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) { @Override public int sizeofBinaryFrame(int payloadSize) { + return sizeOfDataFrame(payloadSize); + } + + @Override + public int sizeofTextFrame(int textPayloadSize) { + return sizeOfDataFrame(textPayloadSize); + } + + static int sizeOfDataFrame(int payloadSize) { if (payloadSize <= 125) { return payloadSize + 6; } diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java index a120f7e..dfd3fde 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java @@ -20,6 +20,7 @@ import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_CLOSE; import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_PING; import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_PONG; +import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_TEXT; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -53,6 +54,12 @@ static class FrameFactory WebSocketFrameFactory.BulkEncoder { static final int PREFIX_SIZE_SMALL = 2; static final int BINARY_FRAME_SMALL = OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15; + static final int TEXT_FRAME_SMALL = OPCODE_TEXT << 8 | /*FIN*/ (byte) 1 << 15; + + static final int BINARY_FRAGMENT_START_SMALL = OPCODE_BINARY << 8; + static final int TEXT_FRAGMENT_START_SMALL = OPCODE_TEXT << 8; + static final int DATA_FRAGMENT_CONTINUATION_SMALL = 0; + static final int DATA_FRAGMENT_CONTINUATION_END_SMALL = /*FIN*/ (byte) 1 << 15; static final int CLOSE_FRAME = OPCODE_CLOSE << 8 | /*FIN*/ (byte) 1 << 15; static final int PING_FRAME = OPCODE_PING << 8 | /*FIN*/ (byte) 1 << 15; @@ -60,25 +67,72 @@ static class FrameFactory static final int PREFIX_SIZE_MEDIUM = 4; static final int BINARY_FRAME_MEDIUM = (BINARY_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; + static final int TEXT_FRAME_MEDIUM = (TEXT_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; + + static final int BINARY_FRAGMENT_START_MEDIUM = + (BINARY_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int TEXT_FRAGMENT_START_MEDIUM = + (TEXT_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_END_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_END_SMALL | /*LEN*/ (byte) 126) << 16; static final WebSocketFrameFactory INSTANCE = new FrameFactory(); - @Override - public ByteBuf createBinaryFrame(ByteBufAllocator allocator, int payloadSize) { + static ByteBuf createDataFrame( + ByteBufAllocator allocator, int payloadSize, int prefixSmall, int prefixMedium) { if (payloadSize <= 125) { return allocator .buffer(PREFIX_SIZE_SMALL + payloadSize) - .writeShort(BINARY_FRAME_SMALL | payloadSize); + .writeShort(prefixSmall | payloadSize); } if (payloadSize <= 65_535) { return allocator .buffer(PREFIX_SIZE_MEDIUM + payloadSize) - .writeInt(BINARY_FRAME_MEDIUM | payloadSize); + .writeInt(prefixMedium | payloadSize); } throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); } + @Override + public ByteBuf createBinaryFrame(ByteBufAllocator allocator, int payloadSize) { + return createDataFrame(allocator, payloadSize, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public ByteBuf createTextFrame(ByteBufAllocator allocator, int textDataSize) { + return createDataFrame(allocator, textDataSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + + @Override + public ByteBuf createBinaryFragmentStart(ByteBufAllocator allocator, int binaryDataSize) { + return createDataFrame( + allocator, binaryDataSize, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createTextFragmentStart(ByteBufAllocator allocator, int textDataSize) { + return createDataFrame( + allocator, textDataSize, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragment(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, dataSize, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragmentEnd(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, + dataSize, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + @Override public ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason) { if (!WebSocketCloseStatus.isValidStatusCode(statusCode)) { @@ -136,17 +190,56 @@ public BulkEncoder bulkEncoder() { @Override public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { + return encodeDataFrame(binaryFrame, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public ByteBuf encodeTextFrame(ByteBuf textFrame) { + return encodeDataFrame(textFrame, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + + @Override + public ByteBuf encodeBinaryFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeTextFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame(fragmentFrame, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragment(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragmentEnd(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + + @Override + public int sizeofFragment(int payloadSize) { + return sizeOfDataFrame(payloadSize); + } + + static ByteBuf encodeDataFrame(ByteBuf binaryFrame, int prefixSmall, int prefixMedium) { int frameSize = binaryFrame.readableBytes(); int smallPrefixSize = 2; if (frameSize <= 125 + smallPrefixSize) { int payloadSize = frameSize - smallPrefixSize; - return binaryFrame.setShort(0, BINARY_FRAME_SMALL | payloadSize); + return binaryFrame.setShort(0, prefixSmall | payloadSize); } int mediumPrefixSize = 4; if (frameSize <= 65_535 + mediumPrefixSize) { int payloadSize = frameSize - mediumPrefixSize; - return binaryFrame.setInt(0, BINARY_FRAME_MEDIUM | payloadSize); + return binaryFrame.setInt(0, prefixMedium | payloadSize); } int payloadSize = frameSize - 8; throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); @@ -154,10 +247,20 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { @Override public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) { + return encodeDataFramePrefix(byteBuf, payloadSize, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public int encodeTextFramePrefix(ByteBuf byteBuf, int textPayloadSize) { + return encodeDataFramePrefix(byteBuf, textPayloadSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + + static int encodeDataFramePrefix( + ByteBuf byteBuf, int payloadSize, int prefixSmall, int prefixMedium) { if (payloadSize <= 125) { - byteBuf.writeShort(BINARY_FRAME_SMALL | payloadSize); + byteBuf.writeShort(prefixSmall | payloadSize); } else if (payloadSize <= 65_535) { - byteBuf.writeInt(BINARY_FRAME_MEDIUM | payloadSize); + byteBuf.writeInt(prefixMedium | payloadSize); } else { throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); } @@ -169,8 +272,22 @@ public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) { return byteBuf; } + @Override + public ByteBuf maskTextFrame(ByteBuf byteBuf, int mask, int textPayloadSize) { + return byteBuf; + } + @Override public int sizeofBinaryFrame(int payloadSize) { + return sizeOfDataFrame(payloadSize); + } + + @Override + public int sizeofTextFrame(int textPayloadSize) { + return sizeOfDataFrame(textPayloadSize); + } + + static int sizeOfDataFrame(int payloadSize) { if (payloadSize <= 125) { return payloadSize + 2; } diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java index ea31759..3e3a812 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java @@ -63,7 +63,7 @@ private WebSocketClientProtocolHandler( boolean allowMaskMismatch, int maxFramePayloadLength, long handshakeTimeoutMillis, - WebSocketCallbacksHandler webSocketHandler) { + @Nullable WebSocketCallbacksHandler webSocketHandler) { this.address = address; this.path = path; this.subprotocol = subprotocol; @@ -93,6 +93,11 @@ public ChannelFuture handshakeCompleted() { return completed; } + @Override + public boolean isSharable() { + return false; + } + @Override public void handlerAdded(ChannelHandlerContext ctx) { handshakeCompleted = ctx.newPromise(); @@ -178,7 +183,10 @@ private void completeHandshake(ChannelHandlerContext ctx, FullHttpResponse respo cancelHandshakeTimeout(); } ctx.pipeline().remove(this); - WebSocketCallbacksHandler.exchange(ctx, webSocketHandler); + WebSocketCallbacksHandler handler = webSocketHandler; + if (handler != null) { + WebSocketCallbacksHandler.exchange(ctx, handler); + } handshakeCompleted.trySuccess(); ctx.fireUserEventTriggered( io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler @@ -338,16 +346,13 @@ public Builder handshakeTimeoutMillis(long handshakeTimeoutMillis) { * @param webSocketHandler handler to process successfully handshaked webSocket * @return this Builder instance */ - public Builder webSocketHandler(WebSocketCallbacksHandler webSocketHandler) { - this.webSocketHandler = Objects.requireNonNull(webSocketHandler, "webSocketHandler"); + public Builder webSocketHandler(@Nullable WebSocketCallbacksHandler webSocketHandler) { + this.webSocketHandler = webSocketHandler; return this; } /** @return new WebSocketClientProtocolHandler instance */ public WebSocketClientProtocolHandler build() { - if (webSocketHandler == null) { - throw new IllegalStateException("webSocketHandler was not provided"); - } int maxPayloadLength = maxFramePayloadLength; boolean maskMismatch = allowMaskMismatch; diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java index bbc8ee8..39cab66 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java @@ -20,13 +20,38 @@ import io.netty.buffer.ByteBufAllocator; /** - * Creates frame ByteBufs containing webSocket prefix. It is user's responsibility to call ByteBuf - * mask(ByteBuf) after frame payload is written. + * Creates frame bytebuffers containing webSocket prefix. It is user's responsibility to call + * ByteBuf mask(ByteBuf) after data frame payload is written. */ public interface WebSocketFrameFactory { ByteBuf createBinaryFrame(ByteBufAllocator allocator, int binaryDataSize); + default ByteBuf createTextFrame(ByteBufAllocator allocator, int textDataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createTextFrame() not implemented"); + } + + default ByteBuf createBinaryFragmentStart(ByteBufAllocator allocator, int binaryDataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createBinaryFragmentStart() not implemented"); + } + + default ByteBuf createTextFragmentStart(ByteBufAllocator allocator, int textDataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createTextFragmentStart() not implemented"); + } + + default ByteBuf createContinuationFragment(ByteBufAllocator allocator, int dataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createContinuationFragment() not implemented"); + } + + default ByteBuf createContinuationFragmentEnd(ByteBufAllocator allocator, int dataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createContinuationFragmentEnd() not implemented"); + } + ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason); ByteBuf createPingFrame(ByteBufAllocator allocator, int binaryDataSize); @@ -41,15 +66,50 @@ default BulkEncoder bulkEncoder() { throw new UnsupportedOperationException("WebSocketFrameFactory.bulkEncoder() not implemented"); } - /** Encodes prefix of single binary websocket frame into provided bytebuffer. */ + /** Encodes prefix of single data websocket frame into provided bytebuffer. */ interface Encoder { ByteBuf encodeBinaryFrame(ByteBuf binaryFrame); int sizeofBinaryFrame(int payloadSize); + + default ByteBuf encodeTextFrame(ByteBuf textFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.Encoder.encodeTextFrame() not implemented"); + } + + default int sizeofTextFrame(int textPayloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.Encoder.sizeofTextFrame() not implemented"); + } + + default ByteBuf encodeBinaryFragmentStart(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeBinaryFragmentStart() not implemented"); + } + + default ByteBuf encodeTextFragmentStart(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeTextFragmentStart() not implemented"); + } + + default ByteBuf encodeContinuationFragment(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeContinuationFragment() not implemented"); + } + + default ByteBuf encodeContinuationFragmentEnd(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeContinuationFragmentEnd() not implemented"); + } + + default int sizeofFragment(int payloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.Encoder.sizeofFragment() not implemented"); + } } - /** Encodes prefixes of multiple binary websocket frames into provided bytebuffer. */ + /** Encodes prefixes of multiple data websocket frames into provided bytebuffer. */ interface BulkEncoder { /** @return frame mask, or -1 if masking not applicable */ @@ -58,5 +118,21 @@ interface BulkEncoder { ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize); int sizeofBinaryFrame(int payloadSize); + + /** @return frame mask, or -1 if masking not applicable */ + default int encodeTextFramePrefix(ByteBuf byteBuf, int textPayloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.BulkEncoder.encodeTextFramePrefix() not implemented"); + } + + default ByteBuf maskTextFrame(ByteBuf byteBuf, int mask, int textPayloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.BulkEncoder.maskTextFrame() not implemented"); + } + + default int sizeofTextFrame(int textPayloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.BulkEncoder.sizeofTextFrame() not implemented"); + } } } diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java index 7c5bfdf..9dd7957 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java @@ -72,4 +72,141 @@ public static String reason(ByteBuf payload) { Short.BYTES, payload.readableBytes() - Short.BYTES, CharsetUtil.UTF_8); } } + + /** + * UTF8 finite state machine based implementation from + * https://bjoern.hoehrmann.de/utf-8/decoder/dfa/ optimized for ASCII content. + */ + final class Utf8FrameValidator { + public static final int UTF8_VALIDATION_ERROR_CODE = 1007; + public static final String UTF8_VALIDATION_ERROR_MESSAGE = + "inbound text frame with non-utf8 contents"; + + private static final int UTF8_ACCEPT = 0; + private static final int UTF8_REJECT = 12; + + private static final byte[] TYPES = { + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, + 9, 9, 9, 9, 9, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, + 7, 7, 7, 7, 7, 7, 8, 8, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, + 2, 2, 2, 2, 2, 2, 2, 10, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 3, 3, 11, 6, 6, 6, 5, 8, 8, 8, + 8, 8, 8, 8, 8, 8, 8, 8 + }; + + private static final byte[] STATES = { + 0, 12, 24, 36, 60, 96, 84, 12, 12, 12, 48, 72, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, + 12, 0, 12, 12, 12, 12, 12, 0, 12, 0, 12, 12, 12, 24, 12, 12, 12, 12, 12, 24, 12, 24, 12, 12, + 12, 12, 12, 12, 12, 12, 12, 24, 12, 12, 12, 12, 12, 24, 12, 12, 12, 12, 12, 12, 12, 24, 12, + 12, 12, 12, 12, 12, 12, 12, 12, 36, 12, 36, 12, 12, 12, 36, 12, 12, 12, 12, 12, 36, 12, 36, + 12, 12, 12, 36, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12 + }; + + int state = UTF8_ACCEPT; + int codep; + + private Utf8FrameValidator() {} + + public static Utf8FrameValidator create() { + return new Utf8FrameValidator(); + } + + /** + * @param buffer text frame payload + * @return true if payload is utf8 encoded, false otherwise + */ + public boolean validateTextFrame(ByteBuf buffer) { + checkUtf8(buffer); + int st = state; + state = UTF8_ACCEPT; + codep = 0; + return st == UTF8_ACCEPT; + } + + /** + * @param buffer text fragment frame payload + * @return true if payload is utf8 encoded, false otherwise + */ + public boolean validateTextFragmentStart(ByteBuf buffer) { + checkUtf8(buffer); + return state != UTF8_REJECT; + } + + /** + * @param buffer text fragment frame payload + * @return true if payload is utf8 encoded, false otherwise + */ + public boolean validateFragmentContinuation(ByteBuf buffer) { + return validateTextFragmentStart(buffer); + } + + /** + * @param buffer text fragment frame payload + * @return true if payload is utf8 encoded, false otherwise + */ + public boolean validateFragmentEnd(ByteBuf buffer) { + return validateTextFrame(buffer); + } + + private void checkUtf8(ByteBuf buffer) { + int readableBytes = buffer.readableBytes(); + int from = buffer.readerIndex(); + int to = from + readableBytes; + boolean cont = true; + int step = Long.BYTES; + while (to - from >= step) { + long bytes = buffer.getLong(from); + if ( + /*is non-ascii*/ (bytes & 0x8080808080808080L) != 0) { + for (int i = 0; i < step; i++) { + byte b = (byte) ((bytes >> 8 * (step - (i + 1))) & 0xFF); + cont = checkUtf8(b); + if (!cont) { + break; + } + } + } + from += step; + } + if (cont) { + step = Integer.BYTES; + while (to - from >= step) { + int bytes = buffer.getInt(from); + if ( + /*is non-ascii*/ (bytes & 0x80808080) != 0) { + for (int i = 0; i < step; i++) { + byte b = (byte) ((bytes >> 8 * (step - (i + 1))) & 0xFF); + cont = checkUtf8(b); + if (!cont) { + break; + } + } + } + from += step; + } + } + if (cont) { + while (to - from >= 1) { + byte b = buffer.getByte(from); + cont = checkUtf8(b); + if (!cont) { + break; + } + from += 1; + } + } + } + + private boolean checkUtf8(byte bufferByte) { + byte type = TYPES[bufferByte & 0xFF]; + int st = state; + codep = st != UTF8_ACCEPT ? bufferByte & 0x3f | codep << 6 : 0xff >> type & bufferByte; + st = state = STATES[st + type]; + + return st != UTF8_REJECT; + } + } } diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java index d8ae34a..9811a6d 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java @@ -20,6 +20,8 @@ import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -65,7 +67,7 @@ private WebSocketServerProtocolHandler( String subprotocols, WebSocketDecoderConfig webSocketDecoderConfig, long handshakeTimeoutMillis, - WebSocketCallbacksHandler webSocketHandler) { + @Nullable WebSocketCallbacksHandler webSocketHandler) { this.path = path; this.subprotocols = subprotocols; this.decoderConfig = webSocketDecoderConfig; @@ -87,6 +89,11 @@ public ChannelFuture handshakeCompleted() { return completed; } + @Override + public boolean isSharable() { + return false; + } + @Override public void handlerAdded(ChannelHandlerContext ctx) { handshakeCompleted = ctx.newPromise(); @@ -177,18 +184,23 @@ private void handleHandshakeResult( if (cause != null) { handshake.tryFailure(cause); if (cause instanceof WebSocketHandshakeException) { + String errorMessage = cause.getMessage(); + ByteBuf errorContent = + errorMessage == null || errorMessage.isEmpty() + ? Unpooled.EMPTY_BUFFER + : ByteBufUtil.writeUtf8(ctx.alloc(), errorMessage); FullHttpResponse response = - new DefaultFullHttpResponse( - HTTP_1_1, - HttpResponseStatus.BAD_REQUEST, - Unpooled.wrappedBuffer(cause.getMessage().getBytes())); + new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST, errorContent); ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } else { ctx.fireExceptionCaught(cause); ctx.close(); } } else { - WebSocketCallbacksHandler.exchange(ctx, webSocketHandler); + WebSocketCallbacksHandler handler = webSocketHandler; + if (handler != null) { + WebSocketCallbacksHandler.exchange(ctx, handler); + } handshake.trySuccess(); ChannelPipeline p = ctx.channel().pipeline(); p.fireUserEventTriggered( @@ -294,19 +306,15 @@ public Builder handshakeTimeoutMillis(long handshakeTimeoutMillis) { * @param webSocketHandler handler to process successfully handshaked webSocket * @return this Builder instance */ - public Builder webSocketCallbacksHandler(WebSocketCallbacksHandler webSocketHandler) { - this.webSocketCallbacksHandler = Objects.requireNonNull(webSocketHandler, "webSocketHandler"); + public Builder webSocketCallbacksHandler(@Nullable WebSocketCallbacksHandler webSocketHandler) { + this.webSocketCallbacksHandler = webSocketHandler; return this; } /** @return new WebSocketServerProtocolHandler instance */ public WebSocketServerProtocolHandler build() { - WebSocketCallbacksHandler handler = webSocketCallbacksHandler; - if (handler == null) { - throw new IllegalStateException("webSocketCallbacksHandler was not provided"); - } return new WebSocketServerProtocolHandler( - path, subprotocols, decoderConfig, handshakeTimeoutMillis, handler); + path, subprotocols, decoderConfig, handshakeTimeoutMillis, webSocketCallbacksHandler); } private static long requirePositive(long val, String desc) { diff --git a/soak_client.sh b/soak_client.sh deleted file mode 100755 index 73d13df..0000000 --- a/soak_client.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh - -./gradlew netty-websocket-http1-soaktest:runClient \ No newline at end of file diff --git a/soak_client_run.sh b/soak_client_run.sh new file mode 100755 index 0000000..1e81c47 --- /dev/null +++ b/soak_client_run.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +cd netty-websocket-http1-soaktest/build/install/netty-websocket-http1-soaktest/bin && ./netty-websocket-http1-soaktest-client \ No newline at end of file diff --git a/soak_server.sh b/soak_server.sh deleted file mode 100755 index 8df1e31..0000000 --- a/soak_server.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh - -./gradlew netty-websocket-http1-soaktest:runServer \ No newline at end of file diff --git a/soak_server_run.sh b/soak_server_run.sh new file mode 100755 index 0000000..afc1f0b --- /dev/null +++ b/soak_server_run.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +export NETTY_WEBSOCKET_HTTP1_PERFTEST_SERVER_OPTS='--add-exports java.base/sun.security.x509=ALL-UNNAMED' + +cd netty-websocket-http1-soaktest/build/install/netty-websocket-http1-soaktest/bin && ./netty-websocket-http1-soaktest-server \ No newline at end of file