Skip to content

Commit

Permalink
Fix port unification issue in triple and dubbo protocol (#13696)
Browse files Browse the repository at this point in the history
* Fix issue of tri Portunification

* style fix
  • Loading branch information
namelessssssssssss authored Jan 31, 2024
1 parent f2478b4 commit 29b0c72
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@
package org.apache.dubbo.remoting.api.pu;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.WireProtocol;
import org.apache.dubbo.remoting.transport.AbstractServer;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class AbstractPortUnificationServer extends AbstractServer {
private final List<WireProtocol> protocols;

/**
* extension name -> activate WireProtocol
*/
private final Map<String, WireProtocol> protocols;

/*
protocol name --> URL object
Expand All @@ -44,12 +50,13 @@ public abstract class AbstractPortUnificationServer extends AbstractServer {

public AbstractPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
this.protocols = url.getOrDefaultFrameworkModel()
.getExtensionLoader(WireProtocol.class)
.getActivateExtension(url, new String[0]);
ExtensionLoader<WireProtocol> extensionLoader =
url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class);
this.protocols = extensionLoader.getActivateExtension(url, new String[0]).stream()
.collect(Collectors.toConcurrentMap(extensionLoader::getExtensionName, Function.identity()));
}

public List<WireProtocol> getProtocols() {
public Map<String, WireProtocol> getProtocols() {
return protocols;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected void doClose() {
} catch (Throwable e) {
logger.warn(TRANSPORT_FAILED_CLOSE, "", "", e.getMessage(), e);
}
for (WireProtocol protocol : getProtocols()) {
for (WireProtocol protocol : getProtocols().values()) {
protocol.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void doClose() {
logger.warn(TRANSPORT_FAILED_CLOSE, "", "", e.getMessage(), e);
}

for (WireProtocol protocol : getProtocols()) {
for (WireProtocol protocol : getProtocols().values()) {
protocol.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import javax.net.ssl.SSLSession;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -51,14 +52,14 @@ public class NettyPortUnificationServerHandler extends ByteToMessageDecoder {
private final URL url;
private final ChannelHandler handler;
private final boolean detectSsl;
private final List<WireProtocol> protocols;
private final Map<String, WireProtocol> protocols;
private final Map<String, URL> urlMapper;
private final Map<String, ChannelHandler> handlerMapper;

public NettyPortUnificationServerHandler(
URL url,
boolean detectSsl,
List<WireProtocol> protocols,
Map<String, WireProtocol> protocols,
ChannelHandler handler,
Map<String, URL> urlMapper,
Map<String, ChannelHandler> handlerMapper) {
Expand Down Expand Up @@ -118,7 +119,11 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
if (providerConnectionConfig != null && isSsl(in)) {
enableSsl(ctx, providerConnectionConfig);
} else {
for (final WireProtocol protocol : protocols) {
Set<String> supportedProtocolNames = new HashSet<>(protocols.keySet());
supportedProtocolNames.retainAll(urlMapper.keySet());

for (final String name : supportedProtocolNames) {
WireProtocol protocol = protocols.get(name);
in.markReaderIndex();
ChannelBuffer buf = new NettyBackedChannelBuffer(in);
final ProtocolDetector.Result result = protocol.detector().detect(buf);
Expand All @@ -127,11 +132,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
case UNRECOGNIZED:
continue;
case RECOGNIZED:
String protocolName = url.getOrDefaultFrameworkModel()
.getExtensionLoader(WireProtocol.class)
.getExtensionName(protocol);
ChannelHandler localHandler = this.handlerMapper.getOrDefault(protocolName, handler);
URL localURL = this.urlMapper.getOrDefault(protocolName, url);
ChannelHandler localHandler = this.handlerMapper.getOrDefault(name, handler);
URL localURL = this.urlMapper.getOrDefault(name, url);
channel.setUrl(localURL);
NettyConfigOperator operator = new NettyConfigOperator(channel, localHandler);
protocol.configServerProtocolHandler(url, operator);
Expand Down

0 comments on commit 29b0c72

Please sign in to comment.