Skip to content

Commit

Permalink
Merge pull request #515 from aionnetwork/p2p-outbound
Browse files Browse the repository at this point in the history
P2p outbound
  • Loading branch information
AionJayT authored Jun 13, 2018
2 parents f272658 + 9326849 commit 382531f
Show file tree
Hide file tree
Showing 16 changed files with 882 additions and 661 deletions.
317 changes: 197 additions & 120 deletions modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java

Large diffs are not rendered by default.

7 changes: 2 additions & 5 deletions modP2p/src/org/aion/p2p/INodeMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@

public interface INodeMgr {

void timeoutActive(final IP2pMgr _p2pMgr);

void moveInboundToActive(int _channelHashCode, final IP2pMgr _p2pMgr);

void moveOutboundToActive(int _nodeIdHash, String _shortId, final IP2pMgr _p2pMgr);

void dropActive(int _nodeIdHash, final IP2pMgr _p2pMgr, String _reason);

void timeoutInbound(IP2pMgr _p2pMgr);

Map<Integer, INode> getOutboundNodes();

int activeNodesSize();
Expand All @@ -47,7 +43,7 @@ public interface INodeMgr {

void addTempNode(INode _n);

boolean hasActiveNode(int k);
boolean notActiveNode(int k);

void addOutboundNode(INode _n);

Expand Down Expand Up @@ -77,4 +73,5 @@ public interface INodeMgr {

Map<Integer, INode> getActiveNodesMap();

void timeoutCheck(IP2pMgr mgr);
}
2 changes: 0 additions & 2 deletions modP2p/src/org/aion/p2p/IP2pMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

/** @author chris */
public interface IP2pMgr {
// TODO: need refactor by passing the parameter in the later version to P2pMgr.
int txBroadCastRoute = (Ctrl.SYNC << 8) + 6; // ((Ver.V0 << 16) + (Ctrl.SYNC << 8) + 6);

/** @return Map */
Map<Integer, INode> getActiveNodes();
Expand Down
175 changes: 106 additions & 69 deletions modP2pImpl/src/org/aion/p2p/impl/comm/NodeMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,43 @@
*/
package org.aion.p2p.impl.comm;

import org.aion.p2p.INode;
import org.aion.p2p.INodeMgr;
import org.aion.p2p.IP2pMgr;

import java.util.*;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.aion.p2p.INode;
import org.aion.p2p.INodeMgr;
import org.aion.p2p.IP2pMgr;

public class NodeMgr implements INodeMgr {

private final static int TIMEOUT_INBOUND_NODES = 10000;

private static final int TIMEOUT_OUTBOUND_NODES = 20000;

private final int maxActiveNodes;

private final int maxTempNodes;

private static final Random random = new SecureRandom();

private final Set<String> seedIps = new HashSet<>();
// private final Set<String> multiActiveAllowIps = new HashSet<>(Arrays.asList(
//
// ));

private final BlockingQueue<INode> tempNodes = new LinkedBlockingQueue<>();
private final Map<Integer, INode> outboundNodes = new ConcurrentHashMap<>();
private final Map<Integer, INode> inboundNodes = new ConcurrentHashMap<>();
private final Map<Integer, INode> activeNodes = new ConcurrentHashMap<>();

public NodeMgr(int _maxActiveNodes, int _maxTempNodes){
public NodeMgr(int _maxActiveNodes, int _maxTempNodes) {
this.maxActiveNodes = _maxActiveNodes;
this.maxTempNodes = _maxTempNodes;
}
Expand Down Expand Up @@ -78,11 +87,11 @@ public String dumpNodeInfo(String selfShortId) {
StringBuilder sb = new StringBuilder();
sb.append("\n");
sb.append(String.format(
"======================================================================== p2p-status-%6s =========================================================================\n",
selfShortId));
"======================================================================== p2p-status-%6s =========================================================================\n",
selfShortId));
sb.append(String.format(
"temp[%3d] inbound[%3d] outbound[%3d] active[%3d] s - seed node, td - total difficulty, # - block number, bv - binary version\n",
tempNodesSize(), inboundNodes.size(), outboundNodes.size(), activeNodes.size()));
"temp[%3d] inbound[%3d] outbound[%3d] active[%3d] s - seed node, td - total difficulty, # - block number, bv - binary version\n",
tempNodesSize(), inboundNodes.size(), outboundNodes.size(), activeNodes.size()));
List<INode> sorted = new ArrayList<>(activeNodes.values());
if (sorted.size() > 0) {
sb.append("\n s"); // id & seed
Expand All @@ -95,27 +104,29 @@ public String dumpNodeInfo(String selfShortId) {
sb.append(" bv");
sb.append(" ci\n");
sb.append(
"--------------------------------------------------------------------------------------------------------------------------------------------------------------------\n");
"--------------------------------------------------------------------------------------------------------------------------------------------------------------------\n");
sorted.sort((n1, n2) -> {
int tdCompare = n2.getTotalDifficulty().compareTo(n1.getTotalDifficulty());
if (tdCompare == 0) {
Long n2Bn = n2.getBestBlockNumber();
Long n1Bn = n1.getBestBlockNumber();
return n2Bn.compareTo(n1Bn);
} else
} else {
return tdCompare;
}
});
for (INode n : sorted) {
try {
sb.append(String.format("id:%6s %c %16s %10d %64s %15s %5d %8s %15s %12s\n",
n.getIdShort(),
n.getIfFromBootList() ? 'y' : ' ', n.getTotalDifficulty().toString(10),
n.getBestBlockNumber(),
n.getBestBlockHash() == null ? "" : bytesToHex(n.getBestBlockHash()), n.getIpStr(),
n.getPort(),
n.getConnection(),
n.getBinaryVersion(),
n.getChannel().hashCode())
n.getIdShort(),
n.getIfFromBootList() ? 'y' : ' ', n.getTotalDifficulty().toString(10),
n.getBestBlockNumber(),
n.getBestBlockHash() == null ? "" : bytesToHex(n.getBestBlockHash()),
n.getIpStr(),
n.getPort(),
n.getConnection(),
n.getBinaryVersion(),
n.getChannel().hashCode())
);
} catch (Exception ex) {
ex.printStackTrace();
Expand Down Expand Up @@ -143,7 +154,7 @@ public boolean isSeedIp(String _ip) {
*/
@Override
public synchronized void addTempNode(final INode _n) {
if(tempNodes.size() < maxTempNodes) {
if (tempNodes.size() < maxTempNodes) {
tempNodes.add(_n);
}
}
Expand Down Expand Up @@ -174,8 +185,8 @@ public int activeNodesSize() {
}

@Override
public boolean hasActiveNode(int k) {
return activeNodes.containsKey(k);
public boolean notActiveNode(int k) {
return !activeNodes.containsKey(k);
}

@Override
Expand Down Expand Up @@ -204,42 +215,60 @@ public INode allocNode(String ip, int p0) {

@Override
public List<INode> getActiveNodesList() {
return new ArrayList(activeNodes.values());
return new ArrayList<>(activeNodes.values());
}

@Override
public HashMap getActiveNodesMap() {
synchronized (activeNodes) {
return new HashMap<>(activeNodes);
}
}

@Override
public Map<Integer, INode> getActiveNodesMap() {
synchronized(activeNodes){
return new HashMap(activeNodes);
public void timeoutCheck(IP2pMgr mgr) {
timeoutInbound(mgr);
timeoutOutBound(mgr);
timeoutActive(mgr);
}

private void timeoutOutBound(IP2pMgr mgr) {
Iterator<Integer> outboundIt = getOutboundNodes().keySet().iterator();
while (outboundIt.hasNext()) {
int outBound = outboundIt.next();
INode node = getOutboundNodes().get(outBound);
if (System.currentTimeMillis() - node.getTimestamp()
> TIMEOUT_OUTBOUND_NODES) {
mgr.closeSocket(
node.getChannel(),
"outbound-timeout node=" + node.getIdShort() + " ip=" + node.getIpStr());
outboundIt.remove();
}
}
}

@Override
public INode getRandom() {
int nodesCount = activeNodes.size();
if (nodesCount > 0) {
Random r = new Random(System.currentTimeMillis());
List<Integer> keysArr = new ArrayList<>(activeNodes.keySet());
if (!activeNodes.isEmpty()) {
Object[] keysArr = activeNodes.keySet().toArray();
try {
int randomNodeKeyIndex = r.nextInt(keysArr.size());
int randomNodeKey = keysArr.get(randomNodeKeyIndex);
return this.getActiveNode(randomNodeKey);
return this.getActiveNode((Integer) keysArr[random.nextInt(keysArr.length)]);
} catch (IllegalArgumentException e) {
System.out.println("<p2p get-random-exception>");
return null;
}
} else
} else {
return null;
}
}

/**
* @param _ip String
* @return boolean
* @warning not thread safe
* helper function to check a specific ip a node associated with is
* is allowed to add to active list
* @warning not thread safe helper function to check a specific ip a node associated with is is
* allowed to add to active list
*/
private boolean activeIpAllow(String _ip){
private boolean activeIpAllow(String _ip) {
return true;
// enable this in case
// if(multiActiveAllowIps.contains(_ip))
Expand All @@ -254,20 +283,20 @@ private boolean activeIpAllow(String _ip){

/**
* @param _channelHashCode int
* @param _p2pMgr P2pMgr
* @param _p2pMgr P2pMgr
*/
// Attention: move node from container need sync to avoid node not belong to
// any container during transit.
public synchronized void moveInboundToActive(int _channelHashCode, final IP2pMgr _p2pMgr) {
INode node = inboundNodes.remove(_channelHashCode);
if (node != null) {

if(activeNodes.size() >= maxActiveNodes){
if (activeNodes.size() >= maxActiveNodes) {
_p2pMgr.closeSocket(node.getChannel(), "inbound -> active, active full");
return;
}

if(node.getIdHash() == _p2pMgr.getSelfIdHash()){
if (node.getIdHash() == _p2pMgr.getSelfIdHash()) {
_p2pMgr.closeSocket(node.getChannel(), "inbound -> active, self-connected");
return;
}
Expand All @@ -293,30 +322,32 @@ public synchronized void moveInboundToActive(int _channelHashCode, final IP2pMgr

/**
* @param _nodeIdHash int
* @param _shortId String
* @param _p2pMgr P2pMgr
* @param _shortId String
* @param _p2pMgr P2pMgr
*/
// Attention: move node from container need sync to avoid node not belong to
// any container during transit.
public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId, final IP2pMgr _p2pMgr) {
public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId,
final IP2pMgr _p2pMgr) {
INode node = outboundNodes.remove(_nodeIdHash);
if (node != null) {

if(activeNodes.size() >= maxActiveNodes){
if (activeNodes.size() >= maxActiveNodes) {
_p2pMgr.closeSocket(node.getChannel(), "outbound -> active, active full");
return;
}

if(node.getIdHash() == _p2pMgr.getSelfIdHash()){
if (node.getIdHash() == _p2pMgr.getSelfIdHash()) {
_p2pMgr.closeSocket(node.getChannel(), "outbound -> active, self-connected");
return;
}

node.setConnection("outbound");
INode previous = activeNodes.putIfAbsent(_nodeIdHash, node);
if (previous != null)
_p2pMgr.closeSocket(node.getChannel(), "outbound -> active, node " + previous.getIdShort() + " exits");
else {
if (previous != null) {
_p2pMgr.closeSocket(node.getChannel(),
"outbound -> active, node " + previous.getIdShort() + " exits");
} else {
if (_p2pMgr.isShowLog()) {
System.out.println(
"<p2p outbound -> active node-id=" + _shortId + " ip=" + node.getIpStr()
Expand All @@ -326,10 +357,10 @@ public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId,
}
}

public void timeoutInbound(final IP2pMgr _p2pMgr) {
Iterator inboundIt = inboundNodes.keySet().iterator();
private void timeoutInbound(final IP2pMgr _p2pMgr) {
Iterator<Integer> inboundIt = inboundNodes.keySet().iterator();
while (inboundIt.hasNext()) {
int key = (int) inboundIt.next();
int key = inboundIt.next();
INode node = inboundNodes.get(key);
if (System.currentTimeMillis() - node.getTimestamp() > TIMEOUT_INBOUND_NODES) {
_p2pMgr.closeSocket(node.getChannel(), "inbound-timeout ip=" + node.getIpStr());
Expand All @@ -338,28 +369,31 @@ public void timeoutInbound(final IP2pMgr _p2pMgr) {
}
}

public void timeoutActive(IP2pMgr _p2pMgr) {
private void timeoutActive(IP2pMgr _p2pMgr) {
long now = System.currentTimeMillis();

OptionalDouble average = activeNodes.values().stream().mapToLong(n -> now - n.getTimestamp()).average();
OptionalDouble average = activeNodes.values().stream()
.mapToLong(n -> now - n.getTimestamp()).average();
double timeout = average.orElse(4000) * 5;
timeout = Math.max(10000, Math.min(timeout, 60000));
if (_p2pMgr.isShowLog()) {
System.out.printf("<p2p average-delay=%.0fms>\n", average.orElse(0));
}

Iterator activeIt = activeNodes.keySet().iterator();
Iterator<Integer> activeIt = activeNodes.keySet().iterator();
while (activeIt.hasNext()) {
int key = (int) activeIt.next();
int key = activeIt.next();
INode node = getActiveNode(key);

if (now - node.getTimestamp() > timeout) {
_p2pMgr.closeSocket(node.getChannel(), "active-timeout node=" + node.getIdShort() + " ip=" + node.getIpStr());
_p2pMgr.closeSocket(node.getChannel(),
"active-timeout node=" + node.getIdShort() + " ip=" + node.getIpStr());
activeIt.remove();
}

if (!node.getChannel().isConnected()) {
_p2pMgr.closeSocket(node.getChannel(), "channel-already-closed node=" + node.getIdShort() + " ip=" + node.getIpStr());
_p2pMgr.closeSocket(node.getChannel(),
"channel-already-closed node=" + node.getIdShort() + " ip=" + node.getIpStr());
activeIt.remove();
}
}
Expand All @@ -380,23 +414,26 @@ public void dropActive(int nodeIdHash, final IP2pMgr _p2pMgr, String _reason) {
public void shutdown(final IP2pMgr _p2pMgr) {
try {

synchronized (outboundNodes){
outboundNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), "p2p-shutdown outbound node=" + n.getIdShort() + " ip=" + n.getIpStr()));
synchronized (outboundNodes) {
outboundNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(),
"p2p-shutdown outbound node=" + n.getIdShort() + " ip=" + n.getIpStr()));
outboundNodes.clear();
}

synchronized (inboundNodes){
inboundNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), "p2p-shutdown inbound ip=" + n.getIpStr()));
synchronized (inboundNodes) {
inboundNodes.forEach((k, n) -> _p2pMgr
.closeSocket(n.getChannel(), "p2p-shutdown inbound ip=" + n.getIpStr()));
inboundNodes.clear();
}

synchronized (activeNodes){
activeNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), "p2p-shutdown active node=" + n.getIdShort() + " ip=" + n.getIpStr()));
synchronized (activeNodes) {
activeNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(),
"p2p-shutdown active node=" + n.getIdShort() + " ip=" + n.getIpStr()));
activeNodes.clear();
}

} catch (Exception e) {

e.printStackTrace();
}
}

Expand Down
Loading

0 comments on commit 382531f

Please sign in to comment.