Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2p outbound #515

Merged
merged 20 commits into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 197 additions & 120 deletions modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java

Large diffs are not rendered by default.

7 changes: 2 additions & 5 deletions modP2p/src/org/aion/p2p/INodeMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@

public interface INodeMgr {

void timeoutActive(final IP2pMgr _p2pMgr);

void moveInboundToActive(int _channelHashCode, final IP2pMgr _p2pMgr);

void moveOutboundToActive(int _nodeIdHash, String _shortId, final IP2pMgr _p2pMgr);

void dropActive(int _nodeIdHash, final IP2pMgr _p2pMgr, String _reason);

void timeoutInbound(IP2pMgr _p2pMgr);

Map<Integer, INode> getOutboundNodes();

int activeNodesSize();
Expand All @@ -47,7 +43,7 @@ public interface INodeMgr {

void addTempNode(INode _n);

boolean hasActiveNode(int k);
boolean notActiveNode(int k);

void addOutboundNode(INode _n);

Expand Down Expand Up @@ -77,4 +73,5 @@ public interface INodeMgr {

Map<Integer, INode> getActiveNodesMap();

void timeoutCheck(IP2pMgr mgr);
}
2 changes: 0 additions & 2 deletions modP2p/src/org/aion/p2p/IP2pMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

/** @author chris */
public interface IP2pMgr {
// TODO: need refactor by passing the parameter in the later version to P2pMgr.
int txBroadCastRoute = (Ctrl.SYNC << 8) + 6; // ((Ver.V0 << 16) + (Ctrl.SYNC << 8) + 6);

/** @return Map */
Map<Integer, INode> getActiveNodes();
Expand Down
175 changes: 106 additions & 69 deletions modP2pImpl/src/org/aion/p2p/impl/comm/NodeMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,43 @@
*/
package org.aion.p2p.impl.comm;

import org.aion.p2p.INode;
import org.aion.p2p.INodeMgr;
import org.aion.p2p.IP2pMgr;

import java.util.*;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.aion.p2p.INode;
import org.aion.p2p.INodeMgr;
import org.aion.p2p.IP2pMgr;

public class NodeMgr implements INodeMgr {

private final static int TIMEOUT_INBOUND_NODES = 10000;

private static final int TIMEOUT_OUTBOUND_NODES = 20000;

private final int maxActiveNodes;

private final int maxTempNodes;

private static final Random random = new SecureRandom();

private final Set<String> seedIps = new HashSet<>();
// private final Set<String> multiActiveAllowIps = new HashSet<>(Arrays.asList(
//
// ));

private final BlockingQueue<INode> tempNodes = new LinkedBlockingQueue<>();
private final Map<Integer, INode> outboundNodes = new ConcurrentHashMap<>();
private final Map<Integer, INode> inboundNodes = new ConcurrentHashMap<>();
private final Map<Integer, INode> activeNodes = new ConcurrentHashMap<>();

public NodeMgr(int _maxActiveNodes, int _maxTempNodes){
public NodeMgr(int _maxActiveNodes, int _maxTempNodes) {
this.maxActiveNodes = _maxActiveNodes;
this.maxTempNodes = _maxTempNodes;
}
Expand Down Expand Up @@ -78,11 +87,11 @@ public String dumpNodeInfo(String selfShortId) {
StringBuilder sb = new StringBuilder();
sb.append("\n");
sb.append(String.format(
"======================================================================== p2p-status-%6s =========================================================================\n",
selfShortId));
"======================================================================== p2p-status-%6s =========================================================================\n",
selfShortId));
sb.append(String.format(
"temp[%3d] inbound[%3d] outbound[%3d] active[%3d] s - seed node, td - total difficulty, # - block number, bv - binary version\n",
tempNodesSize(), inboundNodes.size(), outboundNodes.size(), activeNodes.size()));
"temp[%3d] inbound[%3d] outbound[%3d] active[%3d] s - seed node, td - total difficulty, # - block number, bv - binary version\n",
tempNodesSize(), inboundNodes.size(), outboundNodes.size(), activeNodes.size()));
List<INode> sorted = new ArrayList<>(activeNodes.values());
if (sorted.size() > 0) {
sb.append("\n s"); // id & seed
Expand All @@ -95,27 +104,29 @@ public String dumpNodeInfo(String selfShortId) {
sb.append(" bv");
sb.append(" ci\n");
sb.append(
"--------------------------------------------------------------------------------------------------------------------------------------------------------------------\n");
"--------------------------------------------------------------------------------------------------------------------------------------------------------------------\n");
sorted.sort((n1, n2) -> {
int tdCompare = n2.getTotalDifficulty().compareTo(n1.getTotalDifficulty());
if (tdCompare == 0) {
Long n2Bn = n2.getBestBlockNumber();
Long n1Bn = n1.getBestBlockNumber();
return n2Bn.compareTo(n1Bn);
} else
} else {
return tdCompare;
}
});
for (INode n : sorted) {
try {
sb.append(String.format("id:%6s %c %16s %10d %64s %15s %5d %8s %15s %12s\n",
n.getIdShort(),
n.getIfFromBootList() ? 'y' : ' ', n.getTotalDifficulty().toString(10),
n.getBestBlockNumber(),
n.getBestBlockHash() == null ? "" : bytesToHex(n.getBestBlockHash()), n.getIpStr(),
n.getPort(),
n.getConnection(),
n.getBinaryVersion(),
n.getChannel().hashCode())
n.getIdShort(),
n.getIfFromBootList() ? 'y' : ' ', n.getTotalDifficulty().toString(10),
n.getBestBlockNumber(),
n.getBestBlockHash() == null ? "" : bytesToHex(n.getBestBlockHash()),
n.getIpStr(),
n.getPort(),
n.getConnection(),
n.getBinaryVersion(),
n.getChannel().hashCode())
);
} catch (Exception ex) {
ex.printStackTrace();
Expand Down Expand Up @@ -143,7 +154,7 @@ public boolean isSeedIp(String _ip) {
*/
@Override
public synchronized void addTempNode(final INode _n) {
if(tempNodes.size() < maxTempNodes) {
if (tempNodes.size() < maxTempNodes) {
tempNodes.add(_n);
}
}
Expand Down Expand Up @@ -174,8 +185,8 @@ public int activeNodesSize() {
}

@Override
public boolean hasActiveNode(int k) {
return activeNodes.containsKey(k);
public boolean notActiveNode(int k) {
return !activeNodes.containsKey(k);
}

@Override
Expand Down Expand Up @@ -204,42 +215,60 @@ public INode allocNode(String ip, int p0) {

@Override
public List<INode> getActiveNodesList() {
return new ArrayList(activeNodes.values());
return new ArrayList<>(activeNodes.values());
}

@Override
public HashMap getActiveNodesMap() {
synchronized (activeNodes) {
return new HashMap<>(activeNodes);
}
}

@Override
public Map<Integer, INode> getActiveNodesMap() {
synchronized(activeNodes){
return new HashMap(activeNodes);
public void timeoutCheck(IP2pMgr mgr) {
timeoutInbound(mgr);
timeoutOutBound(mgr);
timeoutActive(mgr);
}

private void timeoutOutBound(IP2pMgr mgr) {
Iterator<Integer> outboundIt = getOutboundNodes().keySet().iterator();
while (outboundIt.hasNext()) {
int outBound = outboundIt.next();
INode node = getOutboundNodes().get(outBound);
if (System.currentTimeMillis() - node.getTimestamp()
> TIMEOUT_OUTBOUND_NODES) {
mgr.closeSocket(
node.getChannel(),
"outbound-timeout node=" + node.getIdShort() + " ip=" + node.getIpStr());
outboundIt.remove();
}
}
}

@Override
public INode getRandom() {
int nodesCount = activeNodes.size();
if (nodesCount > 0) {
Random r = new Random(System.currentTimeMillis());
List<Integer> keysArr = new ArrayList<>(activeNodes.keySet());
if (!activeNodes.isEmpty()) {
Object[] keysArr = activeNodes.keySet().toArray();
try {
int randomNodeKeyIndex = r.nextInt(keysArr.size());
int randomNodeKey = keysArr.get(randomNodeKeyIndex);
return this.getActiveNode(randomNodeKey);
return this.getActiveNode((Integer) keysArr[random.nextInt(keysArr.length)]);
} catch (IllegalArgumentException e) {
System.out.println("<p2p get-random-exception>");
return null;
}
} else
} else {
return null;
}
}

/**
* @param _ip String
* @return boolean
* @warning not thread safe
* helper function to check a specific ip a node associated with is
* is allowed to add to active list
* @warning not thread safe helper function to check a specific ip a node associated with is is
* allowed to add to active list
*/
private boolean activeIpAllow(String _ip){
private boolean activeIpAllow(String _ip) {
return true;
// enable this in case
// if(multiActiveAllowIps.contains(_ip))
Expand All @@ -254,20 +283,20 @@ private boolean activeIpAllow(String _ip){

/**
* @param _channelHashCode int
* @param _p2pMgr P2pMgr
* @param _p2pMgr P2pMgr
*/
// Attention: move node from container need sync to avoid node not belong to
// any container during transit.
public synchronized void moveInboundToActive(int _channelHashCode, final IP2pMgr _p2pMgr) {
INode node = inboundNodes.remove(_channelHashCode);
if (node != null) {

if(activeNodes.size() >= maxActiveNodes){
if (activeNodes.size() >= maxActiveNodes) {
_p2pMgr.closeSocket(node.getChannel(), "inbound -> active, active full");
return;
}

if(node.getIdHash() == _p2pMgr.getSelfIdHash()){
if (node.getIdHash() == _p2pMgr.getSelfIdHash()) {
_p2pMgr.closeSocket(node.getChannel(), "inbound -> active, self-connected");
return;
}
Expand All @@ -293,30 +322,32 @@ public synchronized void moveInboundToActive(int _channelHashCode, final IP2pMgr

/**
* @param _nodeIdHash int
* @param _shortId String
* @param _p2pMgr P2pMgr
* @param _shortId String
* @param _p2pMgr P2pMgr
*/
// Attention: move node from container need sync to avoid node not belong to
// any container during transit.
public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId, final IP2pMgr _p2pMgr) {
public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId,
final IP2pMgr _p2pMgr) {
INode node = outboundNodes.remove(_nodeIdHash);
if (node != null) {

if(activeNodes.size() >= maxActiveNodes){
if (activeNodes.size() >= maxActiveNodes) {
_p2pMgr.closeSocket(node.getChannel(), "outbound -> active, active full");
return;
}

if(node.getIdHash() == _p2pMgr.getSelfIdHash()){
if (node.getIdHash() == _p2pMgr.getSelfIdHash()) {
_p2pMgr.closeSocket(node.getChannel(), "outbound -> active, self-connected");
return;
}

node.setConnection("outbound");
INode previous = activeNodes.putIfAbsent(_nodeIdHash, node);
if (previous != null)
_p2pMgr.closeSocket(node.getChannel(), "outbound -> active, node " + previous.getIdShort() + " exits");
else {
if (previous != null) {
_p2pMgr.closeSocket(node.getChannel(),
"outbound -> active, node " + previous.getIdShort() + " exits");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor typo: it should read "exists" instead of "exits"

} else {
if (_p2pMgr.isShowLog()) {
System.out.println(
"<p2p outbound -> active node-id=" + _shortId + " ip=" + node.getIpStr()
Expand All @@ -326,10 +357,10 @@ public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId,
}
}

public void timeoutInbound(final IP2pMgr _p2pMgr) {
Iterator inboundIt = inboundNodes.keySet().iterator();
private void timeoutInbound(final IP2pMgr _p2pMgr) {
Iterator<Integer> inboundIt = inboundNodes.keySet().iterator();
while (inboundIt.hasNext()) {
int key = (int) inboundIt.next();
int key = inboundIt.next();
INode node = inboundNodes.get(key);
if (System.currentTimeMillis() - node.getTimestamp() > TIMEOUT_INBOUND_NODES) {
_p2pMgr.closeSocket(node.getChannel(), "inbound-timeout ip=" + node.getIpStr());
Expand All @@ -338,28 +369,31 @@ public void timeoutInbound(final IP2pMgr _p2pMgr) {
}
}

public void timeoutActive(IP2pMgr _p2pMgr) {
private void timeoutActive(IP2pMgr _p2pMgr) {
long now = System.currentTimeMillis();

OptionalDouble average = activeNodes.values().stream().mapToLong(n -> now - n.getTimestamp()).average();
OptionalDouble average = activeNodes.values().stream()
.mapToLong(n -> now - n.getTimestamp()).average();
double timeout = average.orElse(4000) * 5;
timeout = Math.max(10000, Math.min(timeout, 60000));
if (_p2pMgr.isShowLog()) {
System.out.printf("<p2p average-delay=%.0fms>\n", average.orElse(0));
}

Iterator activeIt = activeNodes.keySet().iterator();
Iterator<Integer> activeIt = activeNodes.keySet().iterator();
while (activeIt.hasNext()) {
int key = (int) activeIt.next();
int key = activeIt.next();
INode node = getActiveNode(key);

if (now - node.getTimestamp() > timeout) {
_p2pMgr.closeSocket(node.getChannel(), "active-timeout node=" + node.getIdShort() + " ip=" + node.getIpStr());
_p2pMgr.closeSocket(node.getChannel(),
"active-timeout node=" + node.getIdShort() + " ip=" + node.getIpStr());
activeIt.remove();
}

if (!node.getChannel().isConnected()) {
_p2pMgr.closeSocket(node.getChannel(), "channel-already-closed node=" + node.getIdShort() + " ip=" + node.getIpStr());
_p2pMgr.closeSocket(node.getChannel(),
"channel-already-closed node=" + node.getIdShort() + " ip=" + node.getIpStr());
activeIt.remove();
}
}
Expand All @@ -380,23 +414,26 @@ public void dropActive(int nodeIdHash, final IP2pMgr _p2pMgr, String _reason) {
public void shutdown(final IP2pMgr _p2pMgr) {
try {

synchronized (outboundNodes){
outboundNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), "p2p-shutdown outbound node=" + n.getIdShort() + " ip=" + n.getIpStr()));
synchronized (outboundNodes) {
outboundNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(),
"p2p-shutdown outbound node=" + n.getIdShort() + " ip=" + n.getIpStr()));
outboundNodes.clear();
}

synchronized (inboundNodes){
inboundNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), "p2p-shutdown inbound ip=" + n.getIpStr()));
synchronized (inboundNodes) {
inboundNodes.forEach((k, n) -> _p2pMgr
.closeSocket(n.getChannel(), "p2p-shutdown inbound ip=" + n.getIpStr()));
inboundNodes.clear();
}

synchronized (activeNodes){
activeNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), "p2p-shutdown active node=" + n.getIdShort() + " ip=" + n.getIpStr()));
synchronized (activeNodes) {
activeNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(),
"p2p-shutdown active node=" + n.getIdShort() + " ip=" + n.getIpStr()));
activeNodes.clear();
}

} catch (Exception e) {

e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be log the exception?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in the PR #518 93da2b6

}
}

Expand Down
Loading