Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net): add strategy to acquire effective peer if needed #5088

Merged
merged 28 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
248496e
feat(net): add EffectiveService to get effective connection
317787106 Mar 16, 2023
cf37440
Merge branch 'feature/libp2p_v2.0' into feature/isolate
317787106 Mar 16, 2023
05e4232
fix(net): add ReasonCode BELOW_THAN_ME
317787106 Mar 17, 2023
df2840e
feature(net): add config node.enableEffectiveCheck; add one ReasonCode
317787106 Mar 20, 2023
db678c1
fix(net): simplify EffectiveCheckService; set nodeEffectiveCheckEnabl…
317787106 Mar 21, 2023
73b8809
fix(net): update libp2p to test-callback-v2
317787106 Mar 21, 2023
466cc87
fix(net): merge feature/libp2p_v2.0
317787106 Mar 21, 2023
fdb17bc
fix(net): resolve the bug that Lack size is increasing by time
317787106 Mar 22, 2023
891c15a
fix(net): optimize code
317787106 Mar 22, 2023
f7c93fa
fix(net): optimize EffectiveCheckService
317787106 Mar 22, 2023
d23fbb1
fix(net): change node.effectiveCheckEnable default to false
317787106 Mar 23, 2023
dd56bf0
fix(net): use get,add to replace set list in P2pConfig
317787106 Mar 24, 2023
7128fa9
fix(net): don't find new node if executor is closed
317787106 Mar 27, 2023
ae5dd9d
fix(net): remove isClosed, use executor to schedule
317787106 Mar 27, 2023
b417191
fix(net): remove isRunnig
317787106 Mar 27, 2023
6959bd4
fix(net):optimize EffectiveCheckService
317787106 Mar 28, 2023
66f1ea6
fix(net): delete changes in HandshakeService
317787106 Mar 28, 2023
8b23eb7
fix(net):optimize EffectiveCheckService
317787106 Mar 28, 2023
b1d5255
fix(net): optimize EffectiveCheckService
317787106 Mar 29, 2023
8b0f983
merge feature/libp2p_v2.0
317787106 Mar 29, 2023
6f9f5fc
fix(net): merge release_v4.7.2
317787106 Mar 29, 2023
272a16a
fix(net): update libp2p to test-v0.2.20
317787106 Mar 29, 2023
1f61d7f
fix(net): revert PeerConnection
317787106 Mar 29, 2023
bbc9e62
Merge branch 'release_v4.7.2' into feature/isolate
317787106 Mar 29, 2023
a4aaa80
fix(net): add test case EffectiveCheckServiceTest
317787106 Mar 29, 2023
287aa7a
fix(net): add test case for MessageStatistics
317787106 Mar 29, 2023
0ce3ad3
fix(net): add test case for MessageTest
317787106 Mar 30, 2023
29ac079
Merge branch 'release_v4.7.2' into feature/isolate
317787106 Mar 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package org.tron.common.parameter;

import static org.tron.core.Constant.DYNAMIC_ENERGY_FACTOR_DECIMAL;

import com.beust.jcommander.Parameter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.Setter;
import org.quartz.CronExpression;
Expand Down Expand Up @@ -144,6 +141,9 @@ public class CommonParameter {
public boolean nodeDiscoveryPersist;
@Getter
@Setter
public boolean nodeEffectiveCheckEnable;
@Getter
@Setter
public int nodeConnectionTimeout;
@Getter
@Setter
Expand Down
1 change: 1 addition & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class Constant {
public static final String BLOCK_NEED_SYNC_CHECK = "block.needSyncCheck";
public static final String NODE_DISCOVERY_ENABLE = "node.discovery.enable";
public static final String NODE_DISCOVERY_PERSIST = "node.discovery.persist";
public static final String NODE_EFFECTIVE_CHECK_ENABLE = "node.effectiveCheckEnable";
public static final String NODE_CONNECTION_TIMEOUT = "node.connection.timeout";
public static final String NODE_FETCH_BLOCK_TIMEOUT = "node.fetchBlock.timeout";
public static final String NODE_CHANNEL_READ_TIMEOUT = "node.channel.read.timeout";
Expand Down
18 changes: 9 additions & 9 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public static void clearParam() {
PARAMETER.needSyncCheck = false;
PARAMETER.nodeDiscoveryEnable = false;
PARAMETER.nodeDiscoveryPersist = false;
PARAMETER.nodeEffectiveCheckEnable = false;
PARAMETER.nodeConnectionTimeout = 2000;
PARAMETER.activeNodes = new ArrayList<>();
PARAMETER.passiveNodes = new ArrayList<>();
Expand Down Expand Up @@ -559,6 +560,10 @@ public static void setParam(final String[] args, final String confFileName) {
config.hasPath(Constant.NODE_DISCOVERY_PERSIST)
&& config.getBoolean(Constant.NODE_DISCOVERY_PERSIST);

PARAMETER.nodeEffectiveCheckEnable =
config.hasPath(Constant.NODE_EFFECTIVE_CHECK_ENABLE)
&& config.getBoolean(Constant.NODE_EFFECTIVE_CHECK_ENABLE);

PARAMETER.nodeConnectionTimeout =
config.hasPath(Constant.NODE_CONNECTION_TIMEOUT)
? config.getInt(Constant.NODE_CONNECTION_TIMEOUT) * 1000
Expand Down Expand Up @@ -1222,9 +1227,6 @@ public static List<InetSocketAddress> getInetSocketAddress(
List<String> list = config.getStringList(path);
for (String configString : list) {
InetSocketAddress inetSocketAddress = NetUtil.parseInetSocketAddress(configString);
if (inetSocketAddress == null) {
continue;
}
if (filter) {
String ip = inetSocketAddress.getAddress().getHostAddress();
int port = inetSocketAddress.getPort();
Expand All @@ -1250,9 +1252,7 @@ public static List<InetAddress> getInetAddress(
List<String> list = config.getStringList(path);
for (String configString : list) {
InetSocketAddress inetSocketAddress = NetUtil.parseInetSocketAddress(configString);
if (inetSocketAddress != null) {
ret.add(inetSocketAddress.getAddress());
}
ret.add(inetSocketAddress.getAddress());
}
return ret;
}
Expand Down Expand Up @@ -1321,9 +1321,7 @@ private static List<InetSocketAddress> loadSeeds(final com.typesafe.config.Confi
if (PARAMETER.seedNodes != null && !PARAMETER.seedNodes.isEmpty()) {
for (String s : PARAMETER.seedNodes) {
InetSocketAddress inetSocketAddress = NetUtil.parseInetSocketAddress(s);
if (inetSocketAddress != null) {
inetSocketAddressList.add(inetSocketAddress);
}
inetSocketAddressList.add(inetSocketAddress);
}
} else {
inetSocketAddressList = getInetSocketAddress(config, Constant.SEED_NODE_IP_LIST, false);
Expand Down Expand Up @@ -1632,6 +1630,7 @@ public static void logConfig() {
logger.info("Bind IP: {}", parameter.getNodeDiscoveryBindIp());
logger.info("External IP: {}", parameter.getNodeExternalIp());
logger.info("Listen port: {}", parameter.getNodeListenPort());
logger.info("Node ipv6 enable: {}", parameter.isNodeEnableIpv6());
logger.info("Discover enable: {}", parameter.isNodeDiscoveryEnable());
logger.info("Active node size: {}", parameter.getActiveNodes().size());
logger.info("Passive node size: {}", parameter.getPassiveNodes().size());
Expand All @@ -1646,6 +1645,7 @@ public static void logConfig() {
logger.info("Trx reference block: {}", parameter.getTrxReferenceBlock());
logger.info("Open full tcp disconnect: {}", parameter.isOpenFullTcpDisconnect());
logger.info("Node detect enable: {}", parameter.isNodeDetectEnable());
logger.info("Node effective check enable: {}", parameter.isNodeEffectiveCheckEnable());
logger.info("Rate limiter global qps: {}", parameter.getRateLimiterGlobalQps());
logger.info("Rate limiter global ip qps: {}", parameter.getRateLimiterGlobalIpQps());
logger.info("************************ Backup config ************************");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.tron.core.net.messagehandler.TransactionsMsgHandler;
import org.tron.core.net.peer.PeerConnection;
import org.tron.core.net.peer.PeerManager;
import org.tron.core.net.service.effective.EffectiveCheckService;
import org.tron.core.net.service.handshake.HandshakeService;
import org.tron.core.net.service.keepalive.KeepAliveService;
import org.tron.p2p.P2pEventHandler;
Expand Down Expand Up @@ -78,6 +79,9 @@ public class P2pEventHandlerImpl extends P2pEventHandler {
@Autowired
private KeepAliveService keepAliveService;

@Autowired
private EffectiveCheckService effectiveCheckService;

private byte MESSAGE_MAX_TYPE = 127;

public P2pEventHandlerImpl() {
Expand All @@ -102,6 +106,7 @@ public synchronized void onDisconnect(Channel channel) {
if (peerConnection != null) {
peerConnection.onDisconnect();
}
effectiveCheckService.onDisconnect(channel.getInetSocketAddress());
}

@Override
Expand Down
16 changes: 10 additions & 6 deletions framework/src/main/java/org/tron/core/net/TronNetService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.tron.core.net.peer.PeerManager;
import org.tron.core.net.peer.PeerStatusCheck;
import org.tron.core.net.service.adv.AdvService;
import org.tron.core.net.service.effective.EffectiveCheckService;
import org.tron.core.net.service.fetchblock.FetchBlockService;
import org.tron.core.net.service.nodepersist.NodePersistService;
import org.tron.core.net.service.relay.RelayService;
Expand Down Expand Up @@ -69,6 +70,9 @@ public class TronNetService {
@Autowired
private RelayService relayService;

@Autowired
private EffectiveCheckService effectiveCheckService;

private volatile boolean init;

private static void setP2pConfig(P2pConfig config) {
Expand All @@ -90,6 +94,7 @@ public void start() {
tronStatsManager.init();
PeerManager.init();
relayService.init();
effectiveCheckService.init();
logger.info("Net service start successfully");
} catch (Exception e) {
logger.error("Net service start failed", e);
Expand All @@ -108,6 +113,7 @@ public void close() {
peerStatusCheck.close();
transactionsMsgHandler.close();
fetchBlockService.close();
effectiveCheckService.close();
p2pService.close();
relayService.close();
logger.info("Net service closed successfully");
Expand Down Expand Up @@ -144,13 +150,11 @@ public static boolean hasIpv4Stack(Set<String> ipSet) {
private P2pConfig getConfig() {
List<InetSocketAddress> seeds = parameter.getSeedNode().getAddressList();
seeds.addAll(nodePersistService.dbRead());
for (InetSocketAddress inetSocketAddress : seeds) {
logger.debug("Seed InetSocketAddress: {}", inetSocketAddress);
}
logger.debug("Seed InetSocketAddress: {}", seeds);
P2pConfig config = new P2pConfig();
config.setSeedNodes(seeds);
config.setActiveNodes(parameter.getActiveNodes());
config.setTrustNodes(parameter.getPassiveNodes());
config.getSeedNodes().addAll(seeds);
config.getActiveNodes().addAll(parameter.getActiveNodes());
config.getTrustNodes().addAll(parameter.getPassiveNodes());
config.getActiveNodes().forEach(n -> config.getTrustNodes().add(n.getAddress()));
parameter.getFastForwardNodes().forEach(f -> config.getTrustNodes().add(f.getAddress()));
int maxConnections = parameter.getMaxConnections();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package org.tron.core.net.service.effective;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetSocketAddress;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.core.config.args.Args;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.TronNetService;
import org.tron.core.net.peer.PeerConnection;
import org.tron.p2p.discover.Node;
import org.tron.protos.Protocol.ReasonCode;

@Slf4j(topic = "net")
@Component
public class EffectiveCheckService {

@Getter
private final boolean isEffectiveCheck = Args.getInstance().isNodeEffectiveCheckEnable();
@Autowired
private TronNetDelegate tronNetDelegate;

private final Cache<InetSocketAddress, Boolean> nodesCache = CacheBuilder.newBuilder()
.initialCapacity(100)
.maximumSize(10000)
.expireAfterWrite(20, TimeUnit.MINUTES).build();
@Getter
@Setter
private volatile InetSocketAddress cur;
private final AtomicInteger count = new AtomicInteger(0);
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("effective-thread-%d").build());
private long MAX_HANDSHAKE_TIME = 60_000;

public void init() {
if (isEffectiveCheck) {
executor.scheduleWithFixedDelay(() -> {
try {
findEffectiveNode();
} catch (Exception e) {
logger.error("Check effective connection processing failed", e);
}
}, 60, 5, TimeUnit.SECONDS);
} else {
logger.info("EffectiveCheckService is disabled");
}
}

public void triggerNext() {
try {
executor.submit(this::findEffectiveNode);
} catch (Exception e) {
logger.warn("Submit effective service task failed, message:{}", e.getMessage());
}
}

public void close() {
if (executor != null) {
try {
executor.shutdown();
} catch (Exception e) {
logger.error("Exception in shutdown effective service worker, {}", e.getMessage());
}
}
}

public boolean isIsolateLand() {
return (int) tronNetDelegate.getActivePeer().stream()
.filter(PeerConnection::isNeedSyncFromUs)
.count() == tronNetDelegate.getActivePeer().size();
}

//try to find node which we can sync from
private void findEffectiveNode() {
if (!isIsolateLand()) {
if (count.get() > 0) {
logger.info("Success to verify effective node {}", cur);
resetCount();
}
return;
}

if (cur != null) {
tronNetDelegate.getActivePeer().forEach(p -> {
if (p.getInetSocketAddress().equals(cur)
&& System.currentTimeMillis() - p.getChannel().getStartTime() >= MAX_HANDSHAKE_TIME) {
// we encounter no effective connection again, so we disconnect with last used node
logger.info("Disconnect with {}", cur);
p.disconnect(ReasonCode.BELOW_THAN_ME);
}
});
logger.info("Thread is running");
return;
}

List<Node> tableNodes = TronNetService.getP2pService().getConnectableNodes();
tableNodes.sort(Comparator.comparingLong(node -> -node.getUpdateTime()));
Set<InetSocketAddress> usedAddressSet = new HashSet<>();
tronNetDelegate.getActivePeer().forEach(p -> usedAddressSet.add(p.getInetSocketAddress()));
Optional<Node> chosenNode = tableNodes.stream()
.filter(node -> nodesCache.getIfPresent(node.getPreferInetSocketAddress()) == null)
.filter(node -> !usedAddressSet.contains(node.getPreferInetSocketAddress()))
.filter(node -> !TronNetService.getP2pConfig().getActiveNodes()
.contains(node.getPreferInetSocketAddress()))
.findFirst();
if (!chosenNode.isPresent()) {
logger.warn("No available node to choose");
return;
}

count.incrementAndGet();
nodesCache.put(chosenNode.get().getPreferInetSocketAddress(), true);
cur = new InetSocketAddress(chosenNode.get().getPreferInetSocketAddress().getAddress(),
chosenNode.get().getPreferInetSocketAddress().getPort());

logger.info("Try to get effective connection by using {} at seq {}", cur, count.get());
TronNetService.getP2pService().connect(chosenNode.get(), future -> {
if (future.isCancelled()) {
// Connection attempt cancelled by user
cur = null;
} else if (!future.isSuccess()) {
// You might get a NullPointerException here because the future might not be completed yet.
logger.warn("Connect to chosen peer {} fail, cause:{}", cur, future.cause().getMessage());
future.channel().close();
cur = null;
triggerNext();
} else {
// Connection established successfully
}
});
}

private void resetCount() {
count.set(0);
}

public void onDisconnect(InetSocketAddress inetSocketAddress) {
if (inetSocketAddress.equals(cur)) {
logger.warn("Close chosen peer: {}", cur);
cur = null;
triggerNext();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.tron.core.net.message.handshake.HelloMessage;
import org.tron.core.net.peer.PeerConnection;
import org.tron.core.net.peer.PeerManager;
import org.tron.core.net.service.effective.EffectiveCheckService;
import org.tron.core.net.service.relay.RelayService;
import org.tron.p2p.discover.Node;
import org.tron.protos.Protocol.ReasonCode;
Expand All @@ -22,6 +23,9 @@ public class HandshakeService {
@Autowired
private RelayService relayService;

@Autowired
private EffectiveCheckService effectiveCheckService;

@Autowired
private ChainBaseManager chainBaseManager;

Expand Down Expand Up @@ -98,6 +102,15 @@ public void processHelloMessage(PeerConnection peer, HelloMessage msg) {
return;
}

if (msg.getHeadBlockId().getNum() < chainBaseManager.getHeadBlockId().getNum()
&& peer.getInetSocketAddress().equals(effectiveCheckService.getCur())) {
logger.info("Peer's head block {} is below than we, peer->{}, me->{}",
peer.getInetSocketAddress(), msg.getHeadBlockId().getNum(),
chainBaseManager.getHeadBlockId().getNum());
peer.disconnect(ReasonCode.BELOW_THAN_ME);
return;
}

peer.setHelloMessageReceive(msg);

peer.getChannel().updateAvgLatency(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,17 @@ private void addTcpMessage(Message msg, boolean flag) {
break;
case TRX:
if (flag) {
tronInMessage.add();
tronInTrx.add();
} else {
tronOutMessage.add();
tronOutTrx.add();
}
break;
case BLOCK:
if (flag) {
tronInBlock.add();
} else {
tronOutBlock.add();
}
tronOutBlock.add();
break;
default:
break;
Expand Down
Loading