From 65afe44aa5758e6c6fdbe9ef4011c0d3809ddccc Mon Sep 17 00:00:00 2001 From: walklown Date: Mon, 1 Apr 2024 00:14:18 +0800 Subject: [PATCH 1/4] Triple protocol http1 upgrade support --- .../rpc/protocol/tri/TripleHttp2Protocol.java | 81 +++++++++++++++---- .../h12/HttpServerAfterUpgradeHandler.java | 69 ++++++++++++++++ 2 files changed, 133 insertions(+), 17 deletions(-) create mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/HttpServerAfterUpgradeHandler.java diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java index ea088c9af19..a462d9e1aae 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java @@ -31,8 +31,10 @@ import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2FrameCodec; import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2ProtocolSelectorHandler; import org.apache.dubbo.remoting.utils.UrlUtils; +import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.model.FrameworkModel; import org.apache.dubbo.rpc.model.ScopeModelAware; +import org.apache.dubbo.rpc.protocol.tri.h12.HttpServerAfterUpgradeHandler; import org.apache.dubbo.rpc.protocol.tri.h12.TripleProtocolDetector; import org.apache.dubbo.rpc.protocol.tri.h12.http1.DefaultHttp11ServerTransportListenerFactory; import org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListenerFactory; @@ -48,14 +50,18 @@ import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.HttpServerUpgradeHandler; +import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2ServerUpgradeCodec; import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.logging.LogLevel; +import io.netty.util.AsciiString; import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_ENABLE_PUSH_KEY; import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_HEADER_TABLE_SIZE_KEY; @@ -143,31 +149,54 @@ public void configServerProtocolHandler(URL url, ChannelOperator operator) { } private void configurerHttp1Handlers(URL url, List handlers) { - handlers.add(new ChannelHandlerPretender(new HttpServerCodec())); + final HttpServerCodec sourceCodec = new HttpServerCodec(); + handlers.add(new ChannelHandlerPretender(sourceCodec)); + // Triple protocol http1 upgrade support + handlers.add(new ChannelHandlerPretender(new HttpServerUpgradeHandler( + sourceCodec, + protocol -> { + if (!AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { + // Not upgrade request + return null; + } + return buildHttp2ServerUpgradeCodec(url); + }, + Integer.MAX_VALUE))); + // If the upgrade was successful, remove the message from the output list + // so that it's not propagated to the next handler. This request will + // be propagated as a user event instead. handlers.add(new ChannelHandlerPretender(new HttpObjectAggregator(Integer.MAX_VALUE))); handlers.add(new ChannelHandlerPretender(new NettyHttp1Codec())); handlers.add(new ChannelHandlerPretender(new NettyHttp1ConnectionHandler( url, frameworkModel, DefaultHttp11ServerTransportListenerFactory.INSTANCE))); } + private Http2ServerUpgradeCodec buildHttp2ServerUpgradeCodec(URL url) { + Configuration config = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel()); + final Http2FrameCodec codec = buildHttp2FrameCodec(config, url.getOrDefaultApplicationModel()); + final Http2MultiplexHandler handler = new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Http2StreamChannel ch) { + final ChannelPipeline p = ch.pipeline(); + p.addLast(new NettyHttp2FrameCodec()); + p.addLast(new NettyHttp2ProtocolSelectorHandler( + url, frameworkModel, GenericHttp2ServerTransportListenerFactory.INSTANCE)); + } + }); + + return new Http2ServerUpgradeCodec( + codec, + new HttpServerAfterUpgradeHandler(), + new HttpWriteQueueHandler(), + new FlushConsolidationHandler(64, true), + new TripleServerConnectionHandler(), + handler, + new TripleTailHandler()); + } + private void configurerHttp2Handlers(URL url, List handlers) { Configuration config = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel()); - final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer() - .customizeConnection((connection) -> connection - .remote() - .flowController( - new TriHttp2RemoteFlowController(connection, url.getOrDefaultApplicationModel()))) - .gracefulShutdownTimeoutMillis(10000) - .initialSettings(new Http2Settings() - .headerTableSize( - config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE)) - .maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE)) - .initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_INIT_SIZE)) - .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, DEFAULT_MAX_FRAME_SIZE)) - .maxHeaderListSize( - config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, DEFAULT_MAX_HEADER_LIST_SIZE))) - .frameLogger(SERVER_LOGGER) - .build(); + final Http2FrameCodec codec = buildHttp2FrameCodec(config, url.getOrDefaultApplicationModel()); final Http2MultiplexHandler handler = new Http2MultiplexHandler(new ChannelInitializer() { @Override protected void initChannel(Http2StreamChannel ch) { @@ -184,4 +213,22 @@ protected void initChannel(Http2StreamChannel ch) { handlers.add(new ChannelHandlerPretender(handler)); handlers.add(new ChannelHandlerPretender(new TripleTailHandler())); } + + private Http2FrameCodec buildHttp2FrameCodec(Configuration config, ApplicationModel applicationModel) { + return TripleHttp2FrameCodecBuilder.forServer() + .customizeConnection((connection) -> connection + .remote() + .flowController(new TriHttp2RemoteFlowController(connection, applicationModel))) + .gracefulShutdownTimeoutMillis(10000) + .initialSettings(new Http2Settings() + .headerTableSize( + config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE)) + .maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE)) + .initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_INIT_SIZE)) + .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, DEFAULT_MAX_FRAME_SIZE)) + .maxHeaderListSize( + config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, DEFAULT_MAX_HEADER_LIST_SIZE))) + .frameLogger(SERVER_LOGGER) + .build(); + } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/HttpServerAfterUpgradeHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/HttpServerAfterUpgradeHandler.java new file mode 100644 index 00000000000..30c89087d50 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/HttpServerAfterUpgradeHandler.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.h12; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.HttpServerUpgradeHandler; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2CodecUtil; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2FrameStream; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.InboundHttpToHttp2Adapter; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; + +/** + * If an upgrade occurred, the program need send a simple response via HTTP/2 on stream 1 (the stream specifically reserved + * for cleartext HTTP upgrade). However, {@link Http2FrameCodec} send 'upgradeRequest' to upgraded channel handlers by + * {@link InboundHttpToHttp2Adapter} (As it noted that this may behave strangely). So we need to distinguish the 'upgradeRequest' + * and send the response.
+ * + * @see HttpServerUpgradeHandler + * @see Http2FrameCodec + * @see InboundHttpToHttp2Adapter + * @since 3.3.0 + */ +@Sharable +public class HttpServerAfterUpgradeHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof DefaultHttp2HeadersFrame) { + DefaultHttp2HeadersFrame headersFrame = (DefaultHttp2HeadersFrame) msg; + if (headersFrame.stream().id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && headersFrame.isEndStream()) { + // upgradeRequest + sendResponse(ctx, headersFrame.stream()); + return; + } + } + super.channelRead(ctx, msg); + } + + /** + * Send a frame for the response status + */ + private static void sendResponse(ChannelHandlerContext ctx, Http2FrameStream stream) { + Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText()); + ctx.write(new DefaultHttp2HeadersFrame(headers).stream(stream)); + ctx.write(new DefaultHttp2DataFrame(true).stream(stream)); + } +} From 7b9da205bae184e9f3a69dae1436f810b3d86701 Mon Sep 17 00:00:00 2001 From: walklown Date: Fri, 26 Apr 2024 11:18:21 +0800 Subject: [PATCH 2/4] support Application-Layer Protocol Negotiation --- dubbo-remoting/dubbo-remoting-netty4/pom.xml | 4 + .../NettyPortUnificationServerHandler.java | 100 ++++++++++-------- .../transport/netty4/ssl/SslContexts.java | 15 ++- 3 files changed, 76 insertions(+), 43 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-netty4/pom.xml b/dubbo-remoting/dubbo-remoting-netty4/pom.xml index ec180ddb3fb..06d548d1bdf 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/pom.xml +++ b/dubbo-remoting/dubbo-remoting-netty4/pom.xml @@ -84,6 +84,10 @@ linux-aarch_64 runtime + + io.netty + netty-codec-http2 + org.apache.dubbo diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java index fe07a9e69a0..2b6480741a9 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java @@ -39,6 +39,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandshakeCompletionEvent; @@ -119,48 +121,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t if (providerConnectionConfig != null && isSsl(in)) { enableSsl(ctx, providerConnectionConfig); } else { - Set supportedProtocolNames = new HashSet<>(protocols.keySet()); - supportedProtocolNames.retainAll(urlMapper.keySet()); - - for (final String name : supportedProtocolNames) { - WireProtocol protocol = protocols.get(name); - in.markReaderIndex(); - ChannelBuffer buf = new NettyBackedChannelBuffer(in); - final ProtocolDetector.Result result = protocol.detector().detect(buf); - in.resetReaderIndex(); - switch (result.flag()) { - case UNRECOGNIZED: - continue; - case RECOGNIZED: - ChannelHandler localHandler = this.handlerMapper.getOrDefault(name, handler); - URL localURL = this.urlMapper.getOrDefault(name, url); - channel.setUrl(localURL); - NettyConfigOperator operator = new NettyConfigOperator(channel, localHandler); - operator.setDetectResult(result); - protocol.configServerProtocolHandler(url, operator); - ctx.pipeline().remove(this); - case NEED_MORE_DATA: - return; - default: - return; - } - } - byte[] preface = new byte[in.readableBytes()]; - in.readBytes(preface); - Set supported = url.getApplicationModel() - .getExtensionLoader(WireProtocol.class) - .getSupportedExtensions(); - LOGGER.error( - INTERNAL_ERROR, - "unknown error in remoting module", - "", - String.format( - "Can not recognize protocol from downstream=%s . " + "preface=%s protocols=%s", - ctx.channel().remoteAddress(), Bytes.bytes2hex(preface), supported)); - - // Unknown protocol; discard everything and close the connection. - in.clear(); - ctx.close(); + invokeProtocol(ctx, url, channel, in); } } @@ -171,6 +132,17 @@ private void enableSsl(ChannelHandlerContext ctx, ProviderCert providerConnectio p.addLast( "unificationA", new NettyPortUnificationServerHandler(url, false, protocols, handler, urlMapper, handlerMapper)); + p.addLast("ALPN", new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_1_1) { + @Override + protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception { + if (!ApplicationProtocolNames.HTTP_2.equals(protocol)) { + return; + } + NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); + ByteBuf in = ctx.alloc().buffer(); + invokeProtocol(ctx, url, channel, in); + } + }); p.remove(this); } @@ -181,4 +153,48 @@ private boolean isSsl(ByteBuf buf) { } return false; } + + private void invokeProtocol(ChannelHandlerContext ctx, URL url, NettyChannel channel, ByteBuf in) { + Set supportedProtocolNames = new HashSet<>(protocols.keySet()); + supportedProtocolNames.retainAll(urlMapper.keySet()); + + for (final String name : supportedProtocolNames) { + WireProtocol protocol = protocols.get(name); + in.markReaderIndex(); + ChannelBuffer buf = new NettyBackedChannelBuffer(in); + final ProtocolDetector.Result result = protocol.detector().detect(buf); + in.resetReaderIndex(); + switch (result.flag()) { + case UNRECOGNIZED: + continue; + case RECOGNIZED: + ChannelHandler localHandler = this.handlerMapper.getOrDefault(name, handler); + URL localURL = this.urlMapper.getOrDefault(name, url); + channel.setUrl(localURL); + NettyConfigOperator operator = new NettyConfigOperator(channel, localHandler); + operator.setDetectResult(result); + protocol.configServerProtocolHandler(url, operator); + ctx.pipeline().remove(this); + case NEED_MORE_DATA: + return; + default: + return; + } + } + byte[] preface = new byte[in.readableBytes()]; + in.readBytes(preface); + Set supported = + url.getApplicationModel().getExtensionLoader(WireProtocol.class).getSupportedExtensions(); + LOGGER.error( + INTERNAL_ERROR, + "unknown error in remoting module", + "", + String.format( + "Can not recognize protocol from downstream=%s . " + "preface=%s protocols=%s", + ctx.channel().remoteAddress(), Bytes.bytes2hex(preface), supported)); + + // Unknown protocol; discard everything and close the connection. + in.clear(); + ctx.close(); + } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java index beb7e611420..13b1bbda874 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java @@ -31,11 +31,15 @@ import java.security.Provider; import java.security.Security; +import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.OpenSsl; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.SupportedCipherSuiteFilter; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE_STREAM; @@ -77,7 +81,16 @@ public static SslContext buildServerSslContext(ProviderCert providerConnectionCo safeCloseStream(serverPrivateKeyPathStream); } try { - return sslClientContextBuilder.sslProvider(findSslProvider()).build(); + return sslClientContextBuilder + .sslProvider(findSslProvider()) + .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2, + ApplicationProtocolNames.HTTP_1_1)) + .build(); } catch (SSLException e) { throw new IllegalStateException("Build SslSession failed.", e); } From 6016080e0cb78403589a9d4cb9ed3e4c5dab073a Mon Sep 17 00:00:00 2001 From: walklown Date: Wed, 8 May 2024 23:47:20 +0800 Subject: [PATCH 3/4] support Application-Layer Protocol Negotiation --- .../NettyPortUnificationServerHandler.java | 6 ++-- .../rpc/protocol/tri/TripleHttp2Protocol.java | 32 +++++++++---------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java index 2b6480741a9..f3c490376ad 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java @@ -121,7 +121,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t if (providerConnectionConfig != null && isSsl(in)) { enableSsl(ctx, providerConnectionConfig); } else { - invokeProtocol(ctx, url, channel, in); + detectProtocol(ctx, url, channel, in); } } @@ -140,7 +140,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) thr } NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); ByteBuf in = ctx.alloc().buffer(); - invokeProtocol(ctx, url, channel, in); + detectProtocol(ctx, url, channel, in); } }); p.remove(this); @@ -154,7 +154,7 @@ private boolean isSsl(ByteBuf buf) { return false; } - private void invokeProtocol(ChannelHandlerContext ctx, URL url, NettyChannel channel, ByteBuf in) { + private void detectProtocol(ChannelHandlerContext ctx, URL url, NettyChannel channel, ByteBuf in) { Set supportedProtocolNames = new HashSet<>(protocols.keySet()); supportedProtocolNames.retainAll(urlMapper.keySet()); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java index a462d9e1aae..7a205b2ed9d 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java @@ -155,11 +155,20 @@ private void configurerHttp1Handlers(URL url, List handlers) { handlers.add(new ChannelHandlerPretender(new HttpServerUpgradeHandler( sourceCodec, protocol -> { - if (!AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { - // Not upgrade request - return null; + if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { + Configuration config = + ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel()); + return new Http2ServerUpgradeCodec( + buildHttp2FrameCodec(config, url.getOrDefaultApplicationModel()), + new HttpServerAfterUpgradeHandler(), + new HttpWriteQueueHandler(), + new FlushConsolidationHandler(64, true), + new TripleServerConnectionHandler(), + buildHttp2MultiplexHandler(url), + new TripleTailHandler()); } - return buildHttp2ServerUpgradeCodec(url); + // Not upgrade request + return null; }, Integer.MAX_VALUE))); // If the upgrade was successful, remove the message from the output list @@ -171,10 +180,8 @@ private void configurerHttp1Handlers(URL url, List handlers) { url, frameworkModel, DefaultHttp11ServerTransportListenerFactory.INSTANCE))); } - private Http2ServerUpgradeCodec buildHttp2ServerUpgradeCodec(URL url) { - Configuration config = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel()); - final Http2FrameCodec codec = buildHttp2FrameCodec(config, url.getOrDefaultApplicationModel()); - final Http2MultiplexHandler handler = new Http2MultiplexHandler(new ChannelInitializer() { + private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url) { + return new Http2MultiplexHandler(new ChannelInitializer() { @Override protected void initChannel(Http2StreamChannel ch) { final ChannelPipeline p = ch.pipeline(); @@ -183,15 +190,6 @@ protected void initChannel(Http2StreamChannel ch) { url, frameworkModel, GenericHttp2ServerTransportListenerFactory.INSTANCE)); } }); - - return new Http2ServerUpgradeCodec( - codec, - new HttpServerAfterUpgradeHandler(), - new HttpWriteQueueHandler(), - new FlushConsolidationHandler(64, true), - new TripleServerConnectionHandler(), - handler, - new TripleTailHandler()); } private void configurerHttp2Handlers(URL url, List handlers) { From 383671982451301ded6fcd69fd09be6881703e3a Mon Sep 17 00:00:00 2001 From: walklown Date: Fri, 10 May 2024 22:46:05 +0800 Subject: [PATCH 4/4] support Application-Layer Protocol Negotiation --- .../dubbo/rpc/protocol/tri/TripleHttp2Protocol.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java index 7a205b2ed9d..4e91dabb35c 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java @@ -195,15 +195,7 @@ protected void initChannel(Http2StreamChannel ch) { private void configurerHttp2Handlers(URL url, List handlers) { Configuration config = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel()); final Http2FrameCodec codec = buildHttp2FrameCodec(config, url.getOrDefaultApplicationModel()); - final Http2MultiplexHandler handler = new Http2MultiplexHandler(new ChannelInitializer() { - @Override - protected void initChannel(Http2StreamChannel ch) { - final ChannelPipeline p = ch.pipeline(); - p.addLast(new NettyHttp2FrameCodec()); - p.addLast(new NettyHttp2ProtocolSelectorHandler( - url, frameworkModel, GenericHttp2ServerTransportListenerFactory.INSTANCE)); - } - }); + final Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url); handlers.add(new ChannelHandlerPretender(new HttpWriteQueueHandler())); handlers.add(new ChannelHandlerPretender(codec)); handlers.add(new ChannelHandlerPretender(new FlushConsolidationHandler(64, true)));