diff --git a/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java b/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java index 920f04776e..2fc855c505 100644 --- a/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java +++ b/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java @@ -1,21 +1,54 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ package org.aion.zero.impl.sync; import static com.google.common.truth.Truth.assertThat; import java.math.BigInteger; import java.nio.channels.SocketChannel; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.aion.crypto.ECKey; import org.aion.crypto.ECKeyFac; import org.aion.crypto.HashUtil; -import org.aion.p2p.*; +import org.aion.p2p.Handler; +import org.aion.p2p.INode; +import org.aion.p2p.IP2pMgr; +import org.aion.p2p.IPeerMetric; +import org.aion.p2p.Msg; import org.aion.zero.impl.StandaloneBlockchain; import org.aion.zero.impl.sync.handler.BlockPropagationHandler; import org.aion.zero.impl.types.AionBlock; import org.junit.Test; -/** Unit tests for block propagation */ +/** + * Unit tests for block propagation + */ public class BlockPropagationTest { private static class NodeMock implements INode { @@ -50,7 +83,8 @@ public BigInteger getTotalDifficulty() { @Override public void updateStatus( - long _bestBlockNumber, byte[] _bestBlockHash, BigInteger _totalDifficulty) {} + long _bestBlockNumber, byte[] _bestBlockHash, BigInteger _totalDifficulty) { + } @Override public byte[] getIp() { @@ -83,40 +117,64 @@ public String getBinaryVersion() { } @Override - public void setPort(int _port) { throw new IllegalStateException("not implemented"); } + public void setPort(int _port) { + throw new IllegalStateException("not implemented"); + } @Override - public void setConnection(String _connection) { throw new IllegalStateException("not implemented"); } + public void setConnection(String _connection) { + throw new IllegalStateException("not implemented"); + } @Override - public IPeerMetric getPeerMetric() { throw new IllegalStateException("not implemented"); } + public IPeerMetric getPeerMetric() { + throw new IllegalStateException("not implemented"); + } @Override - public void refreshTimestamp() { throw new IllegalStateException("not implemented"); } + public void refreshTimestamp() { + throw new IllegalStateException("not implemented"); + } @Override - public void setChannel(SocketChannel _channel) { throw new IllegalStateException("not implemented"); } + public void setChannel(SocketChannel _channel) { + throw new IllegalStateException("not implemented"); + } @Override - public void setId(byte[] _id) { throw new IllegalStateException("not implemented"); } + public void setId(byte[] _id) { + throw new IllegalStateException("not implemented"); + } @Override - public void setBinaryVersion(String _revision) { throw new IllegalStateException("not implemented"); } + public void setBinaryVersion(String _revision) { + throw new IllegalStateException("not implemented"); + } @Override - public boolean getIfFromBootList() { throw new IllegalStateException("not implemented"); } + public boolean getIfFromBootList() { + throw new IllegalStateException("not implemented"); + } @Override - public byte[] getBestBlockHash() { throw new IllegalStateException("not implemented"); } + public byte[] getBestBlockHash() { + throw new IllegalStateException("not implemented"); + } @Override - public String getConnection() { throw new IllegalStateException("not implemented"); } + public String getConnection() { + throw new IllegalStateException("not implemented"); + } @Override - public SocketChannel getChannel() { throw new IllegalStateException("not implemented"); } + public SocketChannel getChannel() { + throw new IllegalStateException("not implemented"); + } @Override - public void setFromBootList(boolean _ifBoot) { throw new IllegalStateException("not implemented"); } + public void setFromBootList(boolean _ifBoot) { + throw new IllegalStateException("not implemented"); + } } private static class P2pMock implements IP2pMgr { @@ -133,10 +191,12 @@ public Map getActiveNodes() { } @Override - public void shutdown() {} + public void shutdown() { + } @Override - public void run() {} + public void run() { + } @Override public List versions() { @@ -149,10 +209,12 @@ public int chainId() { } @Override - public void errCheck(int nodeIdHashcode, String _displayId) {} + public void errCheck(int nodeIdHashcode, String _displayId) { + } @Override - public void register(List _hs) {} + public void register(List _hs) { + } @Override public INode getRandom() { @@ -160,7 +222,8 @@ public INode getRandom() { } @Override - public void send(int _id, String s, Msg _msg) {} + public void send(int _id, String _displayId, Msg _msg) { + } @Override public boolean isShowLog() { @@ -168,7 +231,8 @@ public boolean isShowLog() { } @Override - public void closeSocket(SocketChannel _sc, String _reason) {} + public void closeSocket(SocketChannel _sc, String _reason) { + } @Override public int getSelfIdHash() { @@ -176,7 +240,9 @@ public int getSelfIdHash() { } @Override - public void dropActive(int _nodeIdHash, String _reason) { throw new IllegalStateException("not implemented."); } + public void dropActive(int _nodeIdHash, String _reason) { + throw new IllegalStateException("not implemented."); + } @Override public void configChannel(SocketChannel _channel) { @@ -184,20 +250,27 @@ public void configChannel(SocketChannel _channel) { } @Override - public int getMaxActiveNodes() { throw new IllegalStateException("not implemented."); } + public int getMaxActiveNodes() { + throw new IllegalStateException("not implemented."); + } @Override - public boolean isSyncSeedsOnly() { throw new IllegalStateException("not implemented."); } + public boolean isSyncSeedsOnly() { + return false; + } @Override public int getMaxTempNodes() { throw new IllegalStateException("not implemented."); } @Override - public boolean validateNode(INode _node) { throw new IllegalStateException("not implemented."); } + public boolean validateNode(INode _node) { + throw new IllegalStateException("not implemented."); + } @Override - public int getSelfNetId() { throw new IllegalStateException("not implemented."); } - + public int getSelfNetId() { + throw new IllegalStateException("not implemented."); + } } private static List generateDefaultAccounts() { @@ -208,19 +281,21 @@ private static List generateDefaultAccounts() { return accs; } - /** Test that we don't propagate back to the sender */ + /** + * Test that we don't propagate back to the sender + */ @Test public void testBlockPropagationReceiver() { List accounts = generateDefaultAccounts(); StandaloneBlockchain.Bundle bundle = - new StandaloneBlockchain.Builder() - .withValidatorConfiguration("simple") - .withDefaultAccounts(accounts) - .build(); + new StandaloneBlockchain.Builder() + .withValidatorConfiguration("simple") + .withDefaultAccounts(accounts) + .build(); AionBlock block = - bundle.bc.createNewBlock(bundle.bc.getGenesis(), Collections.EMPTY_LIST, true); + bundle.bc.createNewBlock(bundle.bc.getGenesis(), Collections.EMPTY_LIST, true); assertThat(block.getNumber()).isEqualTo(1); byte[] sender = HashUtil.h256("node1".getBytes()); @@ -230,29 +305,29 @@ public void testBlockPropagationReceiver() { node.put(1, senderMock); P2pMock p2pMock = - new P2pMock(node) { - @Override - public void send(int _nodeId, String s, Msg _msg) { - throw new RuntimeException("should not have called send"); - } - }; + new P2pMock(node) { + @Override + public void send(int _nodeId, String s, Msg _msg) { + throw new RuntimeException("should not have called send"); + } + }; StandaloneBlockchain.Bundle anotherBundle = - new StandaloneBlockchain.Builder() - .withValidatorConfiguration("simple") - .withDefaultAccounts(accounts) - .build(); + new StandaloneBlockchain.Builder() + .withValidatorConfiguration("simple") + .withDefaultAccounts(accounts) + .build(); BlockPropagationHandler handler = - new BlockPropagationHandler( - 1024, - anotherBundle.bc, // NOTE: not the same blockchain that generated the block - p2pMock, - anotherBundle.bc.getBlockHeaderValidator(), - false); + new BlockPropagationHandler( + 1024, + anotherBundle.bc, // NOTE: not the same blockchain that generated the block + p2pMock, + anotherBundle.bc.getBlockHeaderValidator(), + false); assertThat(handler.processIncomingBlock(senderMock.getIdHash(), "test", block)) - .isEqualTo(BlockPropagationHandler.PropStatus.CONNECTED); + .isEqualTo(BlockPropagationHandler.PropStatus.CONNECTED); } // given two peers, and one sends you a new block, propagate to the other @@ -261,13 +336,13 @@ public void testPropagateBlockToPeer() { List accounts = generateDefaultAccounts(); StandaloneBlockchain.Bundle bundle = - new StandaloneBlockchain.Builder() - .withValidatorConfiguration("simple") - .withDefaultAccounts(accounts) - .build(); + new StandaloneBlockchain.Builder() + .withValidatorConfiguration("simple") + .withDefaultAccounts(accounts) + .build(); AionBlock block = - bundle.bc.createNewBlock(bundle.bc.getGenesis(), Collections.EMPTY_LIST, true); + bundle.bc.createNewBlock(bundle.bc.getGenesis(), Collections.EMPTY_LIST, true); assertThat(block.getNumber()).isEqualTo(1); byte[] sender = HashUtil.h256("node1".getBytes()); @@ -282,20 +357,21 @@ public void testPropagateBlockToPeer() { AtomicInteger times = new AtomicInteger(); P2pMock p2pMock = - new P2pMock(node) { - @Override - public void send(int _nodeId, String s, Msg _msg) { - if (_nodeId != receiverMock.getIdHash()) - throw new RuntimeException("should only send to receiver"); - times.getAndIncrement(); + new P2pMock(node) { + @Override + public void send(int _nodeId, String s, Msg _msg) { + if (_nodeId != receiverMock.getIdHash()) { + throw new RuntimeException("should only send to receiver"); } - }; + times.getAndIncrement(); + } + }; StandaloneBlockchain.Bundle anotherBundle = - new StandaloneBlockchain.Builder() - .withValidatorConfiguration("simple") - .withDefaultAccounts(accounts) - .build(); + new StandaloneBlockchain.Builder() + .withValidatorConfiguration("simple") + .withDefaultAccounts(accounts) + .build(); assertThat(bundle.bc.genesis.getHash()).isEqualTo(anotherBundle.bc.genesis.getHash()); assertThat(block.getParentHash()).isEqualTo(bundle.bc.genesis.getHash()); @@ -305,16 +381,16 @@ public void send(int _nodeId, String s, Msg _msg) { assertThat(bestBlock.getHash()).isEqualTo(anotherBundle.bc.genesis.getHash()); BlockPropagationHandler handler = - new BlockPropagationHandler( - 1024, - anotherBundle.bc, // NOTE: not the same blockchain that generated the block - p2pMock, - anotherBundle.bc.getBlockHeaderValidator(), - false); + new BlockPropagationHandler( + 1024, + anotherBundle.bc, // NOTE: not the same blockchain that generated the block + p2pMock, + anotherBundle.bc.getBlockHeaderValidator(), + false); // block is processed assertThat(handler.processIncomingBlock(senderMock.getIdHash(), "test", block)) - .isEqualTo(BlockPropagationHandler.PropStatus.PROP_CONNECTED); + .isEqualTo(BlockPropagationHandler.PropStatus.PROP_CONNECTED); assertThat(times.get()).isEqualTo(1); } @@ -323,13 +399,13 @@ public void testIgnoreSameBlock() { List accounts = generateDefaultAccounts(); StandaloneBlockchain.Bundle bundle = - new StandaloneBlockchain.Builder() - .withValidatorConfiguration("simple") - .withDefaultAccounts(accounts) - .build(); + new StandaloneBlockchain.Builder() + .withValidatorConfiguration("simple") + .withDefaultAccounts(accounts) + .build(); AionBlock block = - bundle.bc.createNewBlock(bundle.bc.getGenesis(), Collections.EMPTY_LIST, true); + bundle.bc.createNewBlock(bundle.bc.getGenesis(), Collections.EMPTY_LIST, true); assertThat(block.getNumber()).isEqualTo(1); byte[] sender = HashUtil.h256("node1".getBytes()); @@ -344,36 +420,37 @@ public void testIgnoreSameBlock() { AtomicInteger times = new AtomicInteger(); P2pMock p2pMock = - new P2pMock(node) { - @Override - public void send(int _nodeId, String s, Msg _msg) { - if (_nodeId != receiverMock.getIdHash()) - throw new RuntimeException("should only send to receiver"); - times.getAndIncrement(); + new P2pMock(node) { + @Override + public void send(int _nodeId, String s, Msg _msg) { + if (_nodeId != receiverMock.getIdHash()) { + throw new RuntimeException("should only send to receiver"); } - }; + times.getAndIncrement(); + } + }; StandaloneBlockchain.Bundle anotherBundle = - new StandaloneBlockchain.Builder() - .withValidatorConfiguration("simple") - .withDefaultAccounts(accounts) - .build(); + new StandaloneBlockchain.Builder() + .withValidatorConfiguration("simple") + .withDefaultAccounts(accounts) + .build(); assertThat(bundle.bc.genesis.getHash()).isEqualTo(anotherBundle.bc.genesis.getHash()); BlockPropagationHandler handler = - new BlockPropagationHandler( - 1024, - anotherBundle.bc, // NOTE: not the same blockchain that generated the block - p2pMock, - anotherBundle.bc.getBlockHeaderValidator(), - false); + new BlockPropagationHandler( + 1024, + anotherBundle.bc, // NOTE: not the same blockchain that generated the block + p2pMock, + anotherBundle.bc.getBlockHeaderValidator(), + false); // block is processed assertThat(handler.processIncomingBlock(senderMock.getIdHash(), "test", block)) - .isEqualTo(BlockPropagationHandler.PropStatus.PROP_CONNECTED); + .isEqualTo(BlockPropagationHandler.PropStatus.PROP_CONNECTED); assertThat(handler.processIncomingBlock(senderMock.getIdHash(), "test", block)) - .isEqualTo(BlockPropagationHandler.PropStatus.DROPPED); + .isEqualTo(BlockPropagationHandler.PropStatus.DROPPED); assertThat(times.get()).isEqualTo(1); } @@ -383,13 +460,13 @@ public void testIgnoreSelfBlock() { List accounts = generateDefaultAccounts(); StandaloneBlockchain.Bundle bundle = - new StandaloneBlockchain.Builder() - .withValidatorConfiguration("simple") - .withDefaultAccounts(accounts) - .build(); + new StandaloneBlockchain.Builder() + .withValidatorConfiguration("simple") + .withDefaultAccounts(accounts) + .build(); AionBlock block = - bundle.bc.createNewBlock(bundle.bc.getGenesis(), Collections.EMPTY_LIST, true); + bundle.bc.createNewBlock(bundle.bc.getGenesis(), Collections.EMPTY_LIST, true); assertThat(block.getNumber()).isEqualTo(1); byte[] sender = HashUtil.h256("node1".getBytes()); @@ -400,26 +477,26 @@ public void testIgnoreSelfBlock() { AtomicInteger sendCount = new AtomicInteger(); P2pMock p2pMock = - new P2pMock(node) { - @Override - public void send(int _nodeId, String s, Msg _msg) { - sendCount.getAndIncrement(); - } - }; + new P2pMock(node) { + @Override + public void send(int _nodeId, String s, Msg _msg) { + sendCount.getAndIncrement(); + } + }; StandaloneBlockchain.Bundle anotherBundle = - new StandaloneBlockchain.Builder() - .withValidatorConfiguration("simple") - .withDefaultAccounts(accounts) - .build(); + new StandaloneBlockchain.Builder() + .withValidatorConfiguration("simple") + .withDefaultAccounts(accounts) + .build(); BlockPropagationHandler handler = - new BlockPropagationHandler( - 1024, - anotherBundle.bc, // NOTE: not the same blockchain that generated the block - p2pMock, - anotherBundle.bc.getBlockHeaderValidator(), - false); + new BlockPropagationHandler( + 1024, + anotherBundle.bc, // NOTE: not the same blockchain that generated the block + p2pMock, + anotherBundle.bc.getBlockHeaderValidator(), + false); // pretend that we propagate the new block handler.propagateNewBlock(block); // send counter incremented @@ -428,7 +505,7 @@ public void send(int _nodeId, String s, Msg _msg) { // so our blockchain should view this block as a new block // therefore if the filter fails, this block will actually be CONNECTED assertThat(handler.processIncomingBlock(senderMock.getIdHash(), "test", block)) - .isEqualTo(BlockPropagationHandler.PropStatus.DROPPED); + .isEqualTo(BlockPropagationHandler.PropStatus.DROPPED); // we expect the counter to be incremented once (on propagation) assertThat(sendCount.get()).isEqualTo(1); diff --git a/modP2p/src/org/aion/p2p/INodeMgr.java b/modP2p/src/org/aion/p2p/INodeMgr.java index 0938d84324..cb7c831f20 100644 --- a/modP2p/src/org/aion/p2p/INodeMgr.java +++ b/modP2p/src/org/aion/p2p/INodeMgr.java @@ -27,16 +27,12 @@ public interface INodeMgr { - void timeoutActive(final IP2pMgr _p2pMgr); - void moveInboundToActive(int _channelHashCode, final IP2pMgr _p2pMgr); void moveOutboundToActive(int _nodeIdHash, String _shortId, final IP2pMgr _p2pMgr); void dropActive(int _nodeIdHash, final IP2pMgr _p2pMgr, String _reason); - void timeoutInbound(IP2pMgr _p2pMgr); - Map getOutboundNodes(); int activeNodesSize(); @@ -47,7 +43,7 @@ public interface INodeMgr { void addTempNode(INode _n); - boolean hasActiveNode(int k); + boolean notActiveNode(int k); void addOutboundNode(INode _n); @@ -77,4 +73,5 @@ public interface INodeMgr { Map getActiveNodesMap(); + void timeoutCheck(IP2pMgr mgr); } diff --git a/modP2p/src/org/aion/p2p/IP2pMgr.java b/modP2p/src/org/aion/p2p/IP2pMgr.java index f6c0f40295..186756bf88 100644 --- a/modP2p/src/org/aion/p2p/IP2pMgr.java +++ b/modP2p/src/org/aion/p2p/IP2pMgr.java @@ -29,8 +29,6 @@ /** @author chris */ public interface IP2pMgr { - // TODO: need refactor by passing the parameter in the later version to P2pMgr. - int txBroadCastRoute = (Ctrl.SYNC << 8) + 6; // ((Ver.V0 << 16) + (Ctrl.SYNC << 8) + 6); /** @return Map */ Map getActiveNodes(); diff --git a/modP2pImpl/src/org/aion/p2p/impl/comm/NodeMgr.java b/modP2pImpl/src/org/aion/p2p/impl/comm/NodeMgr.java index 3dd908915d..60b02452d0 100644 --- a/modP2pImpl/src/org/aion/p2p/impl/comm/NodeMgr.java +++ b/modP2pImpl/src/org/aion/p2p/impl/comm/NodeMgr.java @@ -22,34 +22,43 @@ */ package org.aion.p2p.impl.comm; -import org.aion.p2p.INode; -import org.aion.p2p.INodeMgr; -import org.aion.p2p.IP2pMgr; - -import java.util.*; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.OptionalDouble; +import java.util.Random; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import org.aion.p2p.INode; +import org.aion.p2p.INodeMgr; +import org.aion.p2p.IP2pMgr; public class NodeMgr implements INodeMgr { private final static int TIMEOUT_INBOUND_NODES = 10000; + private static final int TIMEOUT_OUTBOUND_NODES = 20000; + private final int maxActiveNodes; private final int maxTempNodes; + private static final Random random = new SecureRandom(); + private final Set seedIps = new HashSet<>(); - // private final Set multiActiveAllowIps = new HashSet<>(Arrays.asList( - // - // )); private final BlockingQueue tempNodes = new LinkedBlockingQueue<>(); private final Map outboundNodes = new ConcurrentHashMap<>(); private final Map inboundNodes = new ConcurrentHashMap<>(); private final Map activeNodes = new ConcurrentHashMap<>(); - public NodeMgr(int _maxActiveNodes, int _maxTempNodes){ + public NodeMgr(int _maxActiveNodes, int _maxTempNodes) { this.maxActiveNodes = _maxActiveNodes; this.maxTempNodes = _maxTempNodes; } @@ -78,11 +87,11 @@ public String dumpNodeInfo(String selfShortId) { StringBuilder sb = new StringBuilder(); sb.append("\n"); sb.append(String.format( - "======================================================================== p2p-status-%6s =========================================================================\n", - selfShortId)); + "======================================================================== p2p-status-%6s =========================================================================\n", + selfShortId)); sb.append(String.format( - "temp[%3d] inbound[%3d] outbound[%3d] active[%3d] s - seed node, td - total difficulty, # - block number, bv - binary version\n", - tempNodesSize(), inboundNodes.size(), outboundNodes.size(), activeNodes.size())); + "temp[%3d] inbound[%3d] outbound[%3d] active[%3d] s - seed node, td - total difficulty, # - block number, bv - binary version\n", + tempNodesSize(), inboundNodes.size(), outboundNodes.size(), activeNodes.size())); List sorted = new ArrayList<>(activeNodes.values()); if (sorted.size() > 0) { sb.append("\n s"); // id & seed @@ -95,27 +104,29 @@ public String dumpNodeInfo(String selfShortId) { sb.append(" bv"); sb.append(" ci\n"); sb.append( - "--------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"); + "--------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"); sorted.sort((n1, n2) -> { int tdCompare = n2.getTotalDifficulty().compareTo(n1.getTotalDifficulty()); if (tdCompare == 0) { Long n2Bn = n2.getBestBlockNumber(); Long n1Bn = n1.getBestBlockNumber(); return n2Bn.compareTo(n1Bn); - } else + } else { return tdCompare; + } }); for (INode n : sorted) { try { sb.append(String.format("id:%6s %c %16s %10d %64s %15s %5d %8s %15s %12s\n", - n.getIdShort(), - n.getIfFromBootList() ? 'y' : ' ', n.getTotalDifficulty().toString(10), - n.getBestBlockNumber(), - n.getBestBlockHash() == null ? "" : bytesToHex(n.getBestBlockHash()), n.getIpStr(), - n.getPort(), - n.getConnection(), - n.getBinaryVersion(), - n.getChannel().hashCode()) + n.getIdShort(), + n.getIfFromBootList() ? 'y' : ' ', n.getTotalDifficulty().toString(10), + n.getBestBlockNumber(), + n.getBestBlockHash() == null ? "" : bytesToHex(n.getBestBlockHash()), + n.getIpStr(), + n.getPort(), + n.getConnection(), + n.getBinaryVersion(), + n.getChannel().hashCode()) ); } catch (Exception ex) { ex.printStackTrace(); @@ -143,7 +154,7 @@ public boolean isSeedIp(String _ip) { */ @Override public synchronized void addTempNode(final INode _n) { - if(tempNodes.size() < maxTempNodes) { + if (tempNodes.size() < maxTempNodes) { tempNodes.add(_n); } } @@ -174,8 +185,8 @@ public int activeNodesSize() { } @Override - public boolean hasActiveNode(int k) { - return activeNodes.containsKey(k); + public boolean notActiveNode(int k) { + return !activeNodes.containsKey(k); } @Override @@ -204,42 +215,60 @@ public INode allocNode(String ip, int p0) { @Override public List getActiveNodesList() { - return new ArrayList(activeNodes.values()); + return new ArrayList<>(activeNodes.values()); + } + + @Override + public HashMap getActiveNodesMap() { + synchronized (activeNodes) { + return new HashMap<>(activeNodes); + } } @Override - public Map getActiveNodesMap() { - synchronized(activeNodes){ - return new HashMap(activeNodes); + public void timeoutCheck(IP2pMgr mgr) { + timeoutInbound(mgr); + timeoutOutBound(mgr); + timeoutActive(mgr); + } + + private void timeoutOutBound(IP2pMgr mgr) { + Iterator outboundIt = getOutboundNodes().keySet().iterator(); + while (outboundIt.hasNext()) { + int outBound = outboundIt.next(); + INode node = getOutboundNodes().get(outBound); + if (System.currentTimeMillis() - node.getTimestamp() + > TIMEOUT_OUTBOUND_NODES) { + mgr.closeSocket( + node.getChannel(), + "outbound-timeout node=" + node.getIdShort() + " ip=" + node.getIpStr()); + outboundIt.remove(); + } } } @Override public INode getRandom() { - int nodesCount = activeNodes.size(); - if (nodesCount > 0) { - Random r = new Random(System.currentTimeMillis()); - List keysArr = new ArrayList<>(activeNodes.keySet()); + if (!activeNodes.isEmpty()) { + Object[] keysArr = activeNodes.keySet().toArray(); try { - int randomNodeKeyIndex = r.nextInt(keysArr.size()); - int randomNodeKey = keysArr.get(randomNodeKeyIndex); - return this.getActiveNode(randomNodeKey); + return this.getActiveNode((Integer) keysArr[random.nextInt(keysArr.length)]); } catch (IllegalArgumentException e) { System.out.println(""); return null; } - } else + } else { return null; + } } /** * @param _ip String * @return boolean - * @warning not thread safe - * helper function to check a specific ip a node associated with is - * is allowed to add to active list + * @warning not thread safe helper function to check a specific ip a node associated with is is + * allowed to add to active list */ - private boolean activeIpAllow(String _ip){ + private boolean activeIpAllow(String _ip) { return true; // enable this in case // if(multiActiveAllowIps.contains(_ip)) @@ -254,7 +283,7 @@ private boolean activeIpAllow(String _ip){ /** * @param _channelHashCode int - * @param _p2pMgr P2pMgr + * @param _p2pMgr P2pMgr */ // Attention: move node from container need sync to avoid node not belong to // any container during transit. @@ -262,12 +291,12 @@ public synchronized void moveInboundToActive(int _channelHashCode, final IP2pMgr INode node = inboundNodes.remove(_channelHashCode); if (node != null) { - if(activeNodes.size() >= maxActiveNodes){ + if (activeNodes.size() >= maxActiveNodes) { _p2pMgr.closeSocket(node.getChannel(), "inbound -> active, active full"); return; } - if(node.getIdHash() == _p2pMgr.getSelfIdHash()){ + if (node.getIdHash() == _p2pMgr.getSelfIdHash()) { _p2pMgr.closeSocket(node.getChannel(), "inbound -> active, self-connected"); return; } @@ -293,30 +322,32 @@ public synchronized void moveInboundToActive(int _channelHashCode, final IP2pMgr /** * @param _nodeIdHash int - * @param _shortId String - * @param _p2pMgr P2pMgr + * @param _shortId String + * @param _p2pMgr P2pMgr */ // Attention: move node from container need sync to avoid node not belong to // any container during transit. - public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId, final IP2pMgr _p2pMgr) { + public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId, + final IP2pMgr _p2pMgr) { INode node = outboundNodes.remove(_nodeIdHash); if (node != null) { - if(activeNodes.size() >= maxActiveNodes){ + if (activeNodes.size() >= maxActiveNodes) { _p2pMgr.closeSocket(node.getChannel(), "outbound -> active, active full"); return; } - if(node.getIdHash() == _p2pMgr.getSelfIdHash()){ + if (node.getIdHash() == _p2pMgr.getSelfIdHash()) { _p2pMgr.closeSocket(node.getChannel(), "outbound -> active, self-connected"); return; } node.setConnection("outbound"); INode previous = activeNodes.putIfAbsent(_nodeIdHash, node); - if (previous != null) - _p2pMgr.closeSocket(node.getChannel(), "outbound -> active, node " + previous.getIdShort() + " exits"); - else { + if (previous != null) { + _p2pMgr.closeSocket(node.getChannel(), + "outbound -> active, node " + previous.getIdShort() + " exits"); + } else { if (_p2pMgr.isShowLog()) { System.out.println( " active node-id=" + _shortId + " ip=" + node.getIpStr() @@ -326,10 +357,10 @@ public synchronized void moveOutboundToActive(int _nodeIdHash, String _shortId, } } - public void timeoutInbound(final IP2pMgr _p2pMgr) { - Iterator inboundIt = inboundNodes.keySet().iterator(); + private void timeoutInbound(final IP2pMgr _p2pMgr) { + Iterator inboundIt = inboundNodes.keySet().iterator(); while (inboundIt.hasNext()) { - int key = (int) inboundIt.next(); + int key = inboundIt.next(); INode node = inboundNodes.get(key); if (System.currentTimeMillis() - node.getTimestamp() > TIMEOUT_INBOUND_NODES) { _p2pMgr.closeSocket(node.getChannel(), "inbound-timeout ip=" + node.getIpStr()); @@ -338,28 +369,31 @@ public void timeoutInbound(final IP2pMgr _p2pMgr) { } } - public void timeoutActive(IP2pMgr _p2pMgr) { + private void timeoutActive(IP2pMgr _p2pMgr) { long now = System.currentTimeMillis(); - OptionalDouble average = activeNodes.values().stream().mapToLong(n -> now - n.getTimestamp()).average(); + OptionalDouble average = activeNodes.values().stream() + .mapToLong(n -> now - n.getTimestamp()).average(); double timeout = average.orElse(4000) * 5; timeout = Math.max(10000, Math.min(timeout, 60000)); if (_p2pMgr.isShowLog()) { System.out.printf("\n", average.orElse(0)); } - Iterator activeIt = activeNodes.keySet().iterator(); + Iterator activeIt = activeNodes.keySet().iterator(); while (activeIt.hasNext()) { - int key = (int) activeIt.next(); + int key = activeIt.next(); INode node = getActiveNode(key); if (now - node.getTimestamp() > timeout) { - _p2pMgr.closeSocket(node.getChannel(), "active-timeout node=" + node.getIdShort() + " ip=" + node.getIpStr()); + _p2pMgr.closeSocket(node.getChannel(), + "active-timeout node=" + node.getIdShort() + " ip=" + node.getIpStr()); activeIt.remove(); } if (!node.getChannel().isConnected()) { - _p2pMgr.closeSocket(node.getChannel(), "channel-already-closed node=" + node.getIdShort() + " ip=" + node.getIpStr()); + _p2pMgr.closeSocket(node.getChannel(), + "channel-already-closed node=" + node.getIdShort() + " ip=" + node.getIpStr()); activeIt.remove(); } } @@ -380,23 +414,26 @@ public void dropActive(int nodeIdHash, final IP2pMgr _p2pMgr, String _reason) { public void shutdown(final IP2pMgr _p2pMgr) { try { - synchronized (outboundNodes){ - outboundNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), "p2p-shutdown outbound node=" + n.getIdShort() + " ip=" + n.getIpStr())); + synchronized (outboundNodes) { + outboundNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), + "p2p-shutdown outbound node=" + n.getIdShort() + " ip=" + n.getIpStr())); outboundNodes.clear(); } - synchronized (inboundNodes){ - inboundNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), "p2p-shutdown inbound ip=" + n.getIpStr())); + synchronized (inboundNodes) { + inboundNodes.forEach((k, n) -> _p2pMgr + .closeSocket(n.getChannel(), "p2p-shutdown inbound ip=" + n.getIpStr())); inboundNodes.clear(); } - synchronized (activeNodes){ - activeNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), "p2p-shutdown active node=" + n.getIdShort() + " ip=" + n.getIpStr())); + synchronized (activeNodes) { + activeNodes.forEach((k, n) -> _p2pMgr.closeSocket(n.getChannel(), + "p2p-shutdown active node=" + n.getIdShort() + " ip=" + n.getIpStr())); activeNodes.clear(); } } catch (Exception e) { - + e.printStackTrace(); } } diff --git a/modP2pImpl/src/org/aion/p2p/impl1/P2pMgr.java b/modP2pImpl/src/org/aion/p2p/impl1/P2pMgr.java index 1bbd677803..76b704f5bd 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/P2pMgr.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/P2pMgr.java @@ -25,34 +25,62 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketException; -import java.nio.channels.*; -import java.util.*; -import java.util.concurrent.*; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.aion.p2p.*; +import org.aion.p2p.Ctrl; +import org.aion.p2p.Handler; +import org.aion.p2p.Header; +import org.aion.p2p.INode; +import org.aion.p2p.INodeMgr; +import org.aion.p2p.IP2pMgr; +import org.aion.p2p.Msg; import org.aion.p2p.P2pConstant; +import org.aion.p2p.Ver; import org.aion.p2p.impl.TaskRequestActiveNodes; import org.aion.p2p.impl.TaskUPnPManager; import org.aion.p2p.impl.comm.Node; import org.aion.p2p.impl.comm.NodeMgr; -import org.aion.p2p.impl.zero.msg.*; +import org.aion.p2p.impl.zero.msg.ReqHandshake1; +import org.aion.p2p.impl.zero.msg.ResHandshake1; import org.aion.p2p.impl1.tasks.MsgIn; import org.aion.p2p.impl1.tasks.MsgOut; -import org.aion.p2p.impl1.tasks.TaskReceive; -import org.aion.p2p.impl1.tasks.TaskSend; import org.aion.p2p.impl1.tasks.TaskClear; import org.aion.p2p.impl1.tasks.TaskConnectPeers; import org.aion.p2p.impl1.tasks.TaskInbound; +import org.aion.p2p.impl1.tasks.TaskReceive; +import org.aion.p2p.impl1.tasks.TaskSend; import org.aion.p2p.impl1.tasks.TaskStatus; import org.apache.commons.collections4.map.LRUMap; -/** @author Chris p2p://{uuid}@{ip}:{port} */ +/** + * @author Chris p2p://{uuid}@{ip}:{port} + */ public final class P2pMgr implements IP2pMgr { + private static final int PERIOD_SHOW_STATUS = 10000; private static final int PERIOD_REQUEST_ACTIVE_NODES = 1000; private static final int PERIOD_UPNP_PORT_MAPPING = 3600000; private static final int TIMEOUT_MSG_READ = 10000; + // TODO: need refactor by passing the parameter in the later version to P2pMgr. + public static int txBroadCastRoute = (Ctrl.SYNC << 8) + 6; // ((Ver.V0 << 16) + (Ctrl.SYNC << 8) + 6); + private final int maxTempNodes, maxActiveNodes, selfNetId, selfNodeIdHash, selfPort; private final boolean syncSeedsOnly, showStatus, showLog, upnpEnable; private final String selfRevision, selfShortId; @@ -61,6 +89,7 @@ public final class P2pMgr implements IP2pMgr { private final Map> handlers = new ConcurrentHashMap<>(); private final Set versions = new HashSet<>(); private final Map errCnt = Collections.synchronizedMap(new LRUMap<>(128)); + private final AtomicBoolean start = new AtomicBoolean(true); private ServerSocketChannel tcpServer; private Selector selector; @@ -68,7 +97,6 @@ public final class P2pMgr implements IP2pMgr { private int errTolerance; private BlockingQueue sendMsgQue = new LinkedBlockingQueue<>(); private BlockingQueue receiveMsgQue = new LinkedBlockingQueue<>(); - private AtomicBoolean start = new AtomicBoolean(true); private static ReqHandshake1 cachedReqHandshake1; private static ResHandshake1 cachedResHandshake1; @@ -91,19 +119,19 @@ public enum Dest { * @param _showLog boolean */ public P2pMgr( - int _netId, - String _revision, - String _nodeId, - String _ip, - int _port, - final String[] _bootNodes, - boolean _upnpEnable, - int _maxTempNodes, - int _maxActiveNodes, - boolean _showStatus, - boolean _showLog, - boolean _bootlistSyncOnly, - int _errorTolerance) { + int _netId, + String _revision, + String _nodeId, + String _ip, + int _port, + final String[] _bootNodes, + boolean _upnpEnable, + int _maxTempNodes, + int _maxActiveNodes, + boolean _showStatus, + boolean _showLog, + boolean _bootlistSyncOnly, + int _errorTolerance) { this.selfNetId = _netId; this.selfRevision = _revision; @@ -124,7 +152,7 @@ public P2pMgr( for (String _bootNode : _bootNodes) { Node node = Node.parseP2p(_bootNode); - if (node != null && validateNode(node)) { + if (validateNode(node)) { nodeMgr.addTempNode(node); nodeMgr.seedIpAdd(node.getIpStr()); } @@ -163,13 +191,15 @@ public void run() { }); } - for (int i = 0; i < TaskSend.TOTAL_LANE; i++) { + int pNum = Runtime.getRuntime().availableProcessors(); + + for (int i = 0; i < (pNum << 1); i++) { Thread thrdOut = new Thread(getSendInstance(i), "p2p-out-" + i); thrdOut.setPriority(Thread.NORM_PRIORITY); thrdOut.start(); } - for (int i = 0, m = Runtime.getRuntime().availableProcessors(); i < m; i++) { + for (int i = 0; i < pNum; i++) { Thread t = new Thread(getReceiveInstance(), "p2p-worker-" + i); t.setPriority(Thread.NORM_PRIORITY); t.start(); @@ -182,6 +212,7 @@ public void run() { PERIOD_UPNP_PORT_MAPPING, TimeUnit.MILLISECONDS); } + if (showStatus) { scheduledWorkers.scheduleWithFixedDelay( getStatusInstance(), @@ -189,6 +220,7 @@ public void run() { PERIOD_SHOW_STATUS, TimeUnit.MILLISECONDS); } + if (!syncSeedsOnly) { scheduledWorkers.scheduleWithFixedDelay( new TaskRequestActiveNodes(this), @@ -196,6 +228,7 @@ public void run() { PERIOD_REQUEST_ACTIVE_NODES, TimeUnit.MILLISECONDS); } + Thread thrdClear = new Thread(getClearInstance(), "p2p-clear"); thrdClear.setPriority(Thread.NORM_PRIORITY); thrdClear.start(); @@ -204,9 +237,13 @@ public void run() { thrdConn.setPriority(Thread.NORM_PRIORITY); thrdConn.start(); } catch (SocketException e) { - if (showLog) { System.out.println(" " + e.getMessage()); } + if (showLog) { + System.out.println(" " + e.getMessage()); + } } catch (IOException e) { - if (showLog) { System.out.println(""); } + if (showLog) { + System.out.println(""); + } } } @@ -248,7 +285,6 @@ public void shutdown() { for (List hdrs : handlers.values()) { hdrs.forEach(Handler::shutDown); } - nodeMgr.shutdown(this); } @@ -271,16 +307,27 @@ public void errCheck(int _nodeIdHash, String _displayId) { } } - /** @param _sc SocketChannel */ + /** + * @param _sc SocketChannel + */ public void closeSocket(final SocketChannel _sc, String _reason) { - if (showLog) { System.out.println(""); } + if (showLog) { + System.out.println(""); + } - try { + if (_sc != null) { SelectionKey sk = _sc.keyFor(selector); - _sc.close(); - if (sk != null) { sk.cancel(); } - } catch (IOException e) { - if (showLog) { System.out.println(""); } + if (sk != null) { + sk.cancel(); + } + + try { + _sc.close(); + } catch (IOException e) { + if (showLog) { + System.out.println(""); + } + } } } @@ -304,25 +351,24 @@ public boolean validateNode(final INode _node) { if (_node != null) { boolean notSelfId = !Arrays.equals(_node.getId(), this.selfNodeId); boolean notSameIpOrPort = - !(Arrays.equals(selfIp, _node.getIp()) && selfPort == _node.getPort()); - boolean notActive = !nodeMgr.hasActiveNode(_node.getIdHash()); + !(Arrays.equals(selfIp, _node.getIp()) && selfPort == _node.getPort()); + boolean notActive = nodeMgr.notActiveNode(_node.getIdHash()); boolean notOutbound = !nodeMgr.getOutboundNodes().containsKey(_node.getIdHash()); return notSelfId && notSameIpOrPort && notActive && notOutbound; - } else return false; + } else { + return false; + } } - /** @param _channel SocketChannel TODO: check option */ + /** + * @param _channel SocketChannel TODO: check option + */ @Override public void configChannel(final SocketChannel _channel) throws IOException { _channel.configureBlocking(false); _channel.socket().setSoTimeout(TIMEOUT_MSG_READ); - - // set buffer to 256k. _channel.socket().setReceiveBufferSize(P2pConstant.RECV_BUFFER_SIZE); _channel.socket().setSendBufferSize(P2pConstant.SEND_BUFFER_SIZE); - // _channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - // _channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - // _channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); } private void ban(int nodeIdHashcode) { @@ -339,7 +385,7 @@ public INode getRandom() { @Override public Map getActiveNodes() { - return new HashMap<>(this.nodeMgr.getActiveNodesMap()); + return this.nodeMgr.getActiveNodesMap(); } @Override @@ -455,4 +501,5 @@ private ReqHandshake1 getReqHandshake1Instance(List versions) { this.selfRevision.getBytes(), versions); } + } diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/ChannelBuffer.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/ChannelBuffer.java index 3a16e08e76..698ee21678 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/tasks/ChannelBuffer.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/ChannelBuffer.java @@ -35,27 +35,52 @@ */ class ChannelBuffer { + // buffer for buffer remaining after NIO select read. + byte[] remainBuffer; + + int buffRemain = 0; + + int nodeIdHash = 0; + + String displayId = ""; + + Header header = null; + + private byte[] bsHead = new byte[Header.LEN]; + + byte[] body = null; + + Lock lock = new java.util.concurrent.locks.ReentrantLock(); + + /** + * Indicates whether this channel is closed. + */ + AtomicBoolean isClosed = new AtomicBoolean(false); + + private boolean showLog; + + private Map routes = new HashMap<>(); + class RouteStatus { + long timestamp; int count; - RouteStatus(){ + + RouteStatus() { this.timestamp = System.currentTimeMillis(); count = 0; } } - private boolean showLog; - - private Map routes = new HashMap<>(); - ChannelBuffer(boolean _showLog){ + ChannelBuffer(boolean _showLog) { this.showLog = _showLog; } /** - * @param _route int - * @param _maxReqsPerSec int requests within 1 s - * @return boolean flag if under route control + * @param _route int + * @param _maxReqsPerSec int requests within 1 s + * @return boolean flag if under route control */ synchronized boolean shouldRoute(int _route, int _maxReqsPerSec) { long now = System.currentTimeMillis(); @@ -67,52 +92,28 @@ synchronized boolean shouldRoute(int _route, int _maxReqsPerSec) { return true; } boolean shouldRoute = prev.count < _maxReqsPerSec; - if(shouldRoute) { + if (shouldRoute) { prev.count++; } - if(showLog) { - if(!shouldRoute) { + if (showLog) { + if (!shouldRoute) { System.out.println( ""); } - // too many msgs - //else - // System.out.println(""); } return shouldRoute; - } else + } else { return true; + } } - RouteStatus getRouteCount(int _route){ + RouteStatus getRouteCount(int _route) { return routes.get(_route); } - // buffer for buffer remaining after NIO select read. - byte[] remainBuffer; - - int buffRemain = 0; - - int nodeIdHash = 0; - - String displayId = ""; - - Header header = null; - - private byte[] bsHead = new byte[Header.LEN]; - - byte[] body = null; - - Lock lock = new java.util.concurrent.locks.ReentrantLock(); - - /** - * Indicates whether this channel is closed. - */ - AtomicBoolean isClosed = new AtomicBoolean(false); - void readHead(ByteBuffer buf) { buf.get(bsHead); try { @@ -138,15 +139,15 @@ void refreshBody() { /** * @return boolean */ - boolean isHeaderCompleted() { - return header != null; + boolean isHeaderNotCompleted() { + return header == null; } /** * @return boolean */ - boolean isBodyCompleted() { - return this.header != null && this.body != null && body.length == header.getLen(); + boolean isBodyNotCompleted() { + return this.header == null || this.body == null || body.length != header.getLen(); } } diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgIn.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgIn.java index 046d0a03b2..bff5fd7e23 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgIn.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgIn.java @@ -39,7 +39,7 @@ public class MsgIn { * @param route The route. * @param msg The message. */ - public MsgIn(int nodeId, String displayId, int route, byte[] msg) { + MsgIn(final int nodeId, final String displayId, final int route, final byte[] msg) { this.nodeId = nodeId; this.displayId = displayId; this.route = route; @@ -50,11 +50,11 @@ public int getNodeId() { return this.nodeId; } - public String getDisplayId() { + String getDisplayId() { return this.displayId; } - public int getRoute() { + int getRoute() { return this.route; } diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgOut.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgOut.java index 31cfb97e4a..1c6c3c0b0b 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgOut.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/MsgOut.java @@ -29,8 +29,10 @@ * An outgoing message. */ public class MsgOut { + private final int nodeId; private final String displayId; + private final int lane; private final Msg msg; private final Dest dest; private final long timestamp; @@ -43,19 +45,20 @@ public class MsgOut { * @param msg The message. * @param dest The destination. */ - public MsgOut(int nodeId, String displayId, Msg msg, Dest dest) { + public MsgOut(final int nodeId, final String displayId, final Msg msg, final Dest dest) { this.nodeId = nodeId; this.displayId = displayId; this.msg = msg; this.dest = dest; - timestamp = System.currentTimeMillis(); + this.lane = TaskSend.hash2Lane(nodeId); + this.timestamp = System.currentTimeMillis(); } public int getNodeId() { return this.nodeId; } - public String getDisplayId() { + String getDisplayId() { return this.displayId; } @@ -63,11 +66,15 @@ public Msg getMsg() { return this.msg; } - public Dest getDest() { + Dest getDest() { return this.dest; } public long getTimestamp() { return this.timestamp; } + + int getLane() { + return this.lane; + } } diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskClear.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskClear.java index b1ed935769..c4c65bee53 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskClear.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskClear.java @@ -22,21 +22,19 @@ */ package org.aion.p2p.impl1.tasks; -import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; -import org.aion.p2p.INode; import org.aion.p2p.INodeMgr; import org.aion.p2p.IP2pMgr; public class TaskClear implements Runnable { + private static final int PERIOD_CLEAR = 20000; - private static final int TIMEOUT_OUTBOUND_NODES = 20000; private final IP2pMgr mgr; private final INodeMgr nodeMgr; - private AtomicBoolean start; + private final AtomicBoolean start; - public TaskClear(IP2pMgr _mgr, INodeMgr _nodeMgr, AtomicBoolean _start) { + public TaskClear(final IP2pMgr _mgr, final INodeMgr _nodeMgr, final AtomicBoolean _start) { this.mgr = _mgr; this.nodeMgr = _nodeMgr; this.start = _start; @@ -47,31 +45,9 @@ public void run() { while (start.get()) { try { Thread.sleep(PERIOD_CLEAR); - - nodeMgr.timeoutInbound(this.mgr); - - Iterator outboundIt = nodeMgr.getOutboundNodes().keySet().iterator(); - while (outboundIt.hasNext()) { - - Object obj = outboundIt.next(); - - if (obj == null) { continue; } - - int nodeIdHash = (int) obj; - INode node = nodeMgr.getOutboundNodes().get(nodeIdHash); - - if (node == null) { continue; } - - if (System.currentTimeMillis() - node.getTimestamp() > TIMEOUT_OUTBOUND_NODES) { - this.mgr.closeSocket( - node.getChannel(), "outbound-timeout node=" + node.getIdShort()); - outboundIt.remove(); - } - } - - nodeMgr.timeoutActive(this.mgr); - + nodeMgr.timeoutCheck(this.mgr); } catch (Exception e) { + e.printStackTrace(); } } } diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskConnectPeers.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskConnectPeers.java index fea763fd0d..46b7937b67 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskConnectPeers.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskConnectPeers.java @@ -36,25 +36,26 @@ import org.aion.p2p.impl1.P2pMgr.Dest; public class TaskConnectPeers implements Runnable { + private static final int PERIOD_CONNECT_OUTBOUND = 1000; private static final int TIMEOUT_OUTBOUND_CONNECT = 10000; private final INodeMgr nodeMgr; private final int maxActiveNodes; private final IP2pMgr mgr; - private AtomicBoolean start; - private BlockingQueue sendMsgQue; - private Selector selector; - private ReqHandshake1 cachedReqHandshake1; + private final AtomicBoolean start; + private final BlockingQueue sendMsgQue; + private final Selector selector; + private final ReqHandshake1 cachedReqHS; public TaskConnectPeers( - IP2pMgr _mgr, - AtomicBoolean _start, - INodeMgr _nodeMgr, - int _maxActiveNodes, - Selector _selector, - BlockingQueue _sendMsgQue, - ReqHandshake1 _cachedReqHandshake1) { + final IP2pMgr _mgr, + final AtomicBoolean _start, + final INodeMgr _nodeMgr, + final int _maxActiveNodes, + final Selector _selector, + final BlockingQueue _sendMsgQue, + final ReqHandshake1 _cachedReqHS) { this.start = _start; this.nodeMgr = _nodeMgr; @@ -62,7 +63,7 @@ public TaskConnectPeers( this.mgr = _mgr; this.selector = _selector; this.sendMsgQue = _sendMsgQue; - this.cachedReqHandshake1 = _cachedReqHandshake1; + this.cachedReqHS = _cachedReqHS; } @Override @@ -72,7 +73,9 @@ public void run() { try { Thread.sleep(PERIOD_CONNECT_OUTBOUND); } catch (InterruptedException e) { - if (this.mgr.isShowLog()) { System.out.println(getTcpInterruptedMsg()); } + if (this.mgr.isShowLog()) { + System.out.println(getTcpInterruptedMsg()); + } } if (this.nodeMgr.activeNodesSize() >= this.maxActiveNodes) { @@ -85,29 +88,37 @@ public void run() { INode node; try { node = this.nodeMgr.tempNodesTake(); - if (this.nodeMgr.isSeedIp(node.getIpStr())) { node.setFromBootList(true); } - if (node.getIfFromBootList()) { this.nodeMgr.addTempNode(node); } + if (this.nodeMgr.isSeedIp(node.getIpStr())) { + node.setFromBootList(true); + } + if (node.getIfFromBootList()) { + this.nodeMgr.addTempNode(node); + } // if (node.peerMetric.shouldNotConn()) { // continue; // } } catch (InterruptedException e) { - if (this.mgr.isShowLog()) { System.out.println(getTcpInterruptedMsg()); } + if (this.mgr.isShowLog()) { + System.out.println(getTcpInterruptedMsg()); + } return; } catch (Exception e) { - if (this.mgr.isShowLog()) { e.printStackTrace(); } + if (this.mgr.isShowLog()) { + e.printStackTrace(); + } continue; } int nodeIdHash = node.getIdHash(); if (!this.nodeMgr.getOutboundNodes().containsKey(nodeIdHash) - && !this.nodeMgr.hasActiveNode(nodeIdHash)) { + && this.nodeMgr.notActiveNode(nodeIdHash)) { int _port = node.getPort(); try { SocketChannel channel = SocketChannel.open(); channel.socket() - .connect( - new InetSocketAddress(node.getIpStr(), _port), - TIMEOUT_OUTBOUND_CONNECT); + .connect( + new InetSocketAddress(node.getIpStr(), _port), + TIMEOUT_OUTBOUND_CONNECT); this.mgr.configChannel(channel); if (channel.finishConnect() && channel.isConnected()) { @@ -129,11 +140,11 @@ public void run() { System.out.println(getPrepRqstMsg(node.getIdShort(), node.getIpStr())); } this.sendMsgQue.offer( - new MsgOut( - node.getIdHash(), - node.getIdShort(), - this.cachedReqHandshake1, - Dest.OUTBOUND)); + new MsgOut( + node.getIdHash(), + node.getIdShort(), + this.cachedReqHS, + Dest.OUTBOUND)); // node.peerMetric.decFailedCount(); } else { @@ -149,7 +160,9 @@ public void run() { } // node.peerMetric.incFailedCount(); } catch (Exception e) { - if (this.mgr.isShowLog()) e.printStackTrace(); + if (this.mgr.isShowLog()) { + e.printStackTrace(); + } } } } diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskInbound.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskInbound.java index ddcfe03876..02ef274bcb 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskInbound.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskInbound.java @@ -22,11 +22,13 @@ */ package org.aion.p2p.impl1.tasks; +import static org.aion.p2p.impl1.P2pMgr.txBroadCastRoute; + import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -55,26 +57,27 @@ import org.aion.p2p.impl1.P2pMgr.Dest; public class TaskInbound implements Runnable { + private final IP2pMgr mgr; private final Selector selector; private final INodeMgr nodeMgr; private final Map> handlers; - private AtomicBoolean start; - private ServerSocketChannel tcpServer; - private BlockingQueue sendMsgQue; - private ResHandshake1 cachedResHandshake1; - private BlockingQueue receiveMsgQue; + private final AtomicBoolean start; + private final ServerSocketChannel tcpServer; + private final BlockingQueue sendMsgQue; + private final ResHandshake1 cachedResHandshake1; + private final BlockingQueue receiveMsgQue; public TaskInbound( - IP2pMgr _mgr, - Selector _selector, - AtomicBoolean _start, - INodeMgr _nodeMgr, - ServerSocketChannel _tcpServer, - Map> _handlers, - BlockingQueue _sendMsgQue, - ResHandshake1 _cachedResHandshake1, - BlockingQueue _receiveMsgQue) { + final IP2pMgr _mgr, + final Selector _selector, + final AtomicBoolean _start, + final INodeMgr _nodeMgr, + final ServerSocketChannel _tcpServer, + final Map> _handlers, + final BlockingQueue _sendMsgQue, + final ResHandshake1 _cachedResHandshake1, + final BlockingQueue _receiveMsgQue) { this.mgr = _mgr; this.selector = _selector; @@ -90,313 +93,327 @@ public TaskInbound( @Override public void run() { - // read buffer pre-alloc. @ max_body_size + // readBuffer buffer pre-alloc. @ max_body_size ByteBuffer readBuf = ByteBuffer.allocate(P2pConstant.MAX_BODY_SIZE); while (start.get()) { - try { - Thread.sleep(0, 1); - } catch (Exception e) { - } + if (this.selector.selectNow() == 0) { + Thread.sleep(0, 1); + continue; + } - int num; - try { - num = this.selector.selectNow(); } catch (IOException e) { - if (this.mgr.isShowLog()) System.out.println(""); + if (this.mgr.isShowLog()) { + System.out.println(""); + } continue; - } - - if (num == 0) { + } catch (ClosedSelectorException e) { + if (this.mgr.isShowLog()) { + System.out.println(""); + } continue; + } catch (InterruptedException e) { + e.printStackTrace(); + return; } - Iterator keys = this.selector.selectedKeys().iterator(); - - while (keys.hasNext() && (num-- > 0)) { - - final SelectionKey sk = keys.next(); - keys.remove(); - - try { - - if (!sk.isValid()) continue; - - if (sk.isAcceptable()) accept(); - - if (sk.isReadable()) { - - readBuf.rewind(); - - ChannelBuffer chanBuf = (ChannelBuffer) (sk.attachment()); - try { - - int ret; - int cnt = 0; - - while ((ret = ((SocketChannel) sk.channel()).read(readBuf)) > 0) { - cnt += ret; - } - - // read empty select key, continue. - if (cnt <= 0) { - continue; - } - - int prevCnt = cnt + chanBuf.buffRemain; - ByteBuffer forRead; - - if (chanBuf.buffRemain != 0) { - byte[] alreadyRead = new byte[cnt]; - - readBuf.position(0); - readBuf.get(alreadyRead); - forRead = ByteBuffer.allocate(prevCnt); - forRead.put(chanBuf.remainBuffer); - forRead.put(alreadyRead); - } else { - forRead = readBuf; - } - - do { - cnt = read(sk, forRead, prevCnt); - - if (prevCnt == cnt) { - break; - } else prevCnt = cnt; - - } while (cnt > 0); + Iterator keys = null; + try { + keys = this.selector.selectedKeys().iterator(); + while (keys.hasNext()) { + SelectionKey sk = null; + try { + sk = keys.next(); + if (!sk.isValid()) { + continue; + } - // check if really read data. - if (cnt > prevCnt) { - chanBuf.buffRemain = 0; - throw new P2pException(getReadOverflowMsg(prevCnt, cnt)); + if (sk.isAcceptable()) { + accept(); + } + if (sk.isReadable()) { + ChannelBuffer cb = (ChannelBuffer) sk.attachment(); + if (cb == null) { + throw new P2pException("attachment is null"); } + try { + readBuffer(sk, cb, readBuf); + } catch (NullPointerException e) { + mgr.closeSocket((SocketChannel) sk.channel(), + cb.displayId + "-read-msg-null-exception"); + cb.isClosed.set(true); + } catch (P2pException e) { + mgr.closeSocket((SocketChannel) sk.channel(), + cb.displayId + "-read-msg-p2p-exception"); + cb.isClosed.set(true); + } catch (ClosedChannelException e) { + mgr.closeSocket((SocketChannel) sk.channel(), + cb.displayId + "-read-msg-closed-channel-exception"); - chanBuf.buffRemain = cnt; - - if (cnt == 0) { - readBuf.rewind(); - } else { - // there are no perfect cycling buffer in jdk - // yet. - // simply just buff move for now. - // @TODO: looking for more efficient way. - - int currPos = forRead.position(); - chanBuf.remainBuffer = new byte[cnt]; - forRead.position(currPos - cnt); - forRead.get(chanBuf.remainBuffer); - readBuf.rewind(); } - - } catch (NullPointerException e) { - this.mgr.closeSocket( - (SocketChannel) sk.channel(), - chanBuf.displayId + "-read-msg-null-exception"); - chanBuf.isClosed.set(true); - } catch (P2pException e) { - this.mgr.closeSocket( - (SocketChannel) sk.channel(), - chanBuf.displayId + "-read-msg-p2p-exception"); - chanBuf.isClosed.set(true); - - } catch (ClosedChannelException e) { - this.mgr.closeSocket( - (SocketChannel) sk.channel(), - chanBuf.displayId + "-read-msg-closed-channel-exception"); - - } catch (IOException e) { - this.mgr.closeSocket( - (SocketChannel) sk.channel(), - chanBuf.displayId - + "-read-msg-io-exception: " - + e.getMessage()); - chanBuf.isClosed.set(true); - - } catch (CancelledKeyException e) { - chanBuf.isClosed.set(true); - this.mgr.closeSocket( - (SocketChannel) sk.channel(), - chanBuf.displayId + "-read-msg-key-cancelled-exception"); - } catch (Exception e) { - if (this.mgr.isShowLog()) - System.out.println(""); } - } - } catch (Exception ex) { - if (this.mgr.isShowLog()) { - System.out.println(""); - ex.printStackTrace(); + } catch (IOException e) { + this.mgr.closeSocket((SocketChannel) sk.channel(), + "inbound-io-exception=" + e.getMessage()); + } catch (Exception e) { + e.printStackTrace(); } } + + } catch (Exception ex) { + if (mgr.isShowLog()) { + System.out.println(""); + } + } finally { + if (keys != null) { + keys.remove(); + } } } - if (this.mgr.isShowLog()) System.out.println(""); + if (this.mgr.isShowLog()) { + System.out.println(""); + } } private void accept() { SocketChannel channel; try { - if (this.nodeMgr.activeNodesSize() >= this.mgr.getMaxActiveNodes()) return; + if (this.nodeMgr.activeNodesSize() >= this.mgr.getMaxActiveNodes()) { + return; + } channel = this.tcpServer.accept(); - this.mgr.configChannel(channel); + if (channel != null) { + this.mgr.configChannel(channel); - SelectionKey sk = channel.register(this.selector, SelectionKey.OP_READ); - sk.attach(new ChannelBuffer(this.mgr.isShowLog())); + String ip = channel.socket().getInetAddress().getHostAddress(); - String ip = channel.socket().getInetAddress().getHostAddress(); - int port = channel.socket().getPort(); + if (this.mgr.isSyncSeedsOnly() && this.nodeMgr.isSeedIp(ip)) { + channel.close(); + return; + } - if (this.mgr.isSyncSeedsOnly() && this.nodeMgr.isSeedIp(ip)) { - channel.close(); - return; - } + int port = channel.socket().getPort(); + INode node = this.nodeMgr.allocNode(ip, port); + node.setChannel(channel); - INode node = this.nodeMgr.allocNode(ip, 0); - node.setChannel(channel); - this.nodeMgr.addInboundNode(node); + SelectionKey sk = channel.register(this.selector, SelectionKey.OP_READ); + sk.attach(new ChannelBuffer(this.mgr.isShowLog())); + this.nodeMgr.addInboundNode(node); - if (this.mgr.isShowLog()) - System.out.println(""); + if (this.mgr.isShowLog()) { + System.out.println(""); + } + } } catch (IOException e) { - if (this.mgr.isShowLog()) System.out.println(""); + if (this.mgr.isShowLog()) { + System.out.println(""); + } + } catch (Exception e) { + e.printStackTrace(); } } - /** - * SocketChannel - */ - private int readHeader(final ChannelBuffer _cb, ByteBuffer readBuffer, int cnt) { + private int readHeader(final ChannelBuffer _cb, + final ByteBuffer _readBuf, int cnt) { - if (cnt < Header.LEN) return cnt; + if (cnt < Header.LEN) { + return cnt; + } + + int origPos = _readBuf.position(); - int origPos = readBuffer.position(); int startP = origPos - cnt; - readBuffer.position(startP); - _cb.readHead(readBuffer); - readBuffer.position(origPos); + + _readBuf.position(startP); + + _cb.readHead(_readBuf); + + _readBuf.position(origPos); + return cnt - Header.LEN; } - /** - * SocketChannel - */ - private int readBody(final ChannelBuffer _cb, ByteBuffer readBuffer, int cnt) { + private int readBody(final ChannelBuffer _cb, ByteBuffer _readBuf, int _cnt) { int bodyLen = _cb.header.getLen(); // some msg have nobody. if (bodyLen == 0) { _cb.body = new byte[0]; - return cnt; + return _cnt; } - if (cnt < bodyLen) { return cnt; } + if (_cnt < bodyLen) { + return _cnt; + } - int origPos = readBuffer.position(); - int startP = origPos - cnt; - readBuffer.position(startP); - _cb.readBody(readBuffer); - readBuffer.position(origPos); - return cnt - bodyLen; + int origPos = _readBuf.position(); + int startP = origPos - _cnt; + _readBuf.position(startP); + _cb.readBody(_readBuf); + _readBuf.position(origPos); + return _cnt - bodyLen; } - /** - * @param _sk SelectionKey - * @throws IOException IOException - */ - private int read(final SelectionKey _sk, ByteBuffer _readBuffer, int _cnt) throws IOException { + private void readBuffer(final SelectionKey _sk, final ChannelBuffer _cb, + final ByteBuffer _readBuf) throws Exception { + + _readBuf.rewind(); + + SocketChannel sc = (SocketChannel) _sk.channel(); + + int r; + int cnt = 0; + do { + r = sc.read(_readBuf); + cnt += r; + } while (r > 0); + + if (cnt < 1) { + return; + } + + int remainBufAll = _cb.buffRemain + cnt; + ByteBuffer bufferAll = calBuffer(_cb, _readBuf, cnt); + + do { + r = readMsg(_sk, bufferAll, remainBufAll); + if (remainBufAll == r) { + break; + } else { + remainBufAll = r; + } + } while (r > 0); - int currCnt = 0; + _cb.buffRemain = r; - if (_sk.attachment() == null) { + if (r != 0) { + // there are no perfect cycling buffer in jdk + // yet. + // simply just buff move for now. + // @TODO: looking for more efficient way. + + int currPos = bufferAll.position(); + _cb.remainBuffer = new byte[r]; + bufferAll.position(currPos - r); + bufferAll.get(_cb.remainBuffer); + + } + + _readBuf.rewind(); + } + + private int readMsg(SelectionKey _sk, ByteBuffer _readBuf, int _cnt) throws IOException { + ChannelBuffer cb = (ChannelBuffer) _sk.attachment(); + if (cb == null) { throw new P2pException("attachment is null"); } - ChannelBuffer rb = (ChannelBuffer) _sk.attachment(); - // read header - if (!rb.isHeaderCompleted()) { - currCnt = readHeader(rb, _readBuffer, _cnt); + int readCnt; + if (cb.isHeaderNotCompleted()) { + readCnt = readHeader(cb, _readBuf, _cnt); } else { - currCnt = _cnt; + readCnt = _cnt; } - // read body - if (rb.isHeaderCompleted() && !rb.isBodyCompleted()) { - currCnt = readBody(rb, _readBuffer, currCnt); + if (cb.isBodyNotCompleted()) { + readCnt = readBody(cb, _readBuf, readCnt); } - if (!rb.isBodyCompleted()) { return currCnt; } + if (cb.isBodyNotCompleted()) { + return readCnt; + } + + handleMsg(_sk, cb); + + return readCnt; + } - Header h = rb.header; + private void handleMsg(SelectionKey _sk, ChannelBuffer _cb) { - byte[] bodyBytes = rb.body; - rb.refreshHeader(); - rb.refreshBody(); + Header h = _cb.header; + byte[] bodyBytes = _cb.body; - short ver = h.getVer(); - byte ctrl = h.getCtrl(); - byte act = h.getAction(); - int route = h.getRoute(); + _cb.refreshHeader(); + _cb.refreshBody(); - boolean underRC = - rb.shouldRoute( - route, - ((route == this.mgr.txBroadCastRoute) - ? P2pConstant.READ_MAX_RATE_TXBC - : P2pConstant.READ_MAX_RATE)); + boolean underRC = _cb.shouldRoute(h.getRoute(), + ((h.getRoute() == txBroadCastRoute) ? P2pConstant.READ_MAX_RATE_TXBC + : P2pConstant.READ_MAX_RATE)); if (!underRC) { if (this.mgr.isShowLog()) { System.out.println( - getRouteMsg(ver, ctrl, act, rb.getRouteCount(route).count, rb.displayId)); + ""); } - return currCnt; + return; } - switch (ver) { + switch (h.getVer()) { case Ver.V0: - switch (ctrl) { + switch (h.getCtrl()) { case Ctrl.NET: try { - handleP2pMsg(_sk, act, bodyBytes); + handleP2pMsg(_sk, h.getAction(), bodyBytes); } catch (Exception ex) { if (this.mgr.isShowLog()) { - System.out.println( - ""); + System.out + .println(""); } } break; case Ctrl.SYNC: - if (!this.handlers.containsKey(route)) { + + if (!handlers.containsKey(h.getRoute())) { if (this.mgr.isShowLog()) { - System.out.println(getUnregRouteMsg(ver, ctrl, act, rb.displayId)); + System.out.println( + ""); } - return currCnt; + return; } - this.handleKernelMsg(rb.nodeIdHash, route, bodyBytes); + handleKernelMsg(_cb.nodeIdHash, h.getRoute(), bodyBytes); break; default: if (this.mgr.isShowLog()) { - System.out.println(getInvalRouteMsg(ver, ctrl, act, rb.displayId)); + System.out.println( + ""); } break; } break; default: if (this.mgr.isShowLog()) { - System.out.println(""); + System.out.println( + ""); } break; } + } + + private ByteBuffer calBuffer(ChannelBuffer _cb, ByteBuffer _readBuf, int _cnt) { + ByteBuffer r; + if (_cb.buffRemain != 0) { + byte[] alreadyRead = new byte[_cnt]; + _readBuf.position(0); + _readBuf.get(alreadyRead); + r = ByteBuffer.allocate(_cb.buffRemain + _cnt); + r.put(_cb.remainBuffer); + r.put(alreadyRead); + } else { + r = _readBuf; + } - return currCnt; + return r; } /** @@ -414,18 +431,20 @@ private void handleP2pMsg(final SelectionKey _sk, byte _act, final byte[] _msgBy ReqHandshake1 reqHandshake1 = ReqHandshake1.decode(_msgBytes); if (reqHandshake1 != null) { handleReqHandshake( - rb, - _sk.channel().hashCode(), - reqHandshake1.getNodeId(), - reqHandshake1.getNetId(), - reqHandshake1.getPort(), - reqHandshake1.getRevision()); + rb, + _sk.channel().hashCode(), + reqHandshake1.getNodeId(), + reqHandshake1.getNetId(), + reqHandshake1.getPort(), + reqHandshake1.getRevision()); } } break; case Act.RES_HANDSHAKE: - if (rb.nodeIdHash == 0) { return; } + if (rb.nodeIdHash == 0) { + return; + } if (_msgBytes.length > ResHandshake.LEN) { ResHandshake1 resHandshake1 = ResHandshake1.decode(_msgBytes); @@ -450,7 +469,9 @@ private void handleP2pMsg(final SelectionKey _sk, byte _act, final byte[] _msgBy break; case Act.RES_ACTIVE_NODES: - if (this.mgr.isSyncSeedsOnly()) { break; } + if (this.mgr.isSyncSeedsOnly()) { + break; + } if (rb.nodeIdHash != 0) { INode node = nodeMgr.getActiveNode(rb.nodeIdHash); @@ -460,7 +481,10 @@ private void handleP2pMsg(final SelectionKey _sk, byte _act, final byte[] _msgBy if (resActiveNodes != null) { List incomingNodes = resActiveNodes.getNodes(); for (INode incomingNode : incomingNodes) { - if (nodeMgr.tempNodesSize() >= this.mgr.getMaxTempNodes()) { return; } + if (nodeMgr.tempNodesSize() >= this.mgr.getMaxTempNodes()) { + return; + } + if (this.mgr.validateNode(incomingNode)) { nodeMgr.addTempNode(incomingNode); } @@ -484,15 +508,15 @@ private void handleP2pMsg(final SelectionKey _sk, byte _act, final byte[] _msgBy * @param _netId int * @param _port int * @param _revision byte[] - *

Construct node info after handshake request success + *

Construct node info after handshake request success */ private void handleReqHandshake( - final ChannelBuffer _buffer, - int _channelHash, - final byte[] _nodeId, - int _netId, - int _port, - final byte[] _revision) { + final ChannelBuffer _buffer, + int _channelHash, + final byte[] _nodeId, + int _netId, + int _port, + final byte[] _revision) { INode node = nodeMgr.getInboundNode(_channelHash); if (node != null && node.getPeerMetric().notBan()) { if (handshakeRuleCheck(_netId)) { @@ -512,15 +536,17 @@ private void handleReqHandshake( node.setBinaryVersion(binaryVersion); nodeMgr.moveInboundToActive(_channelHash, this.mgr); this.sendMsgQue.offer( - new MsgOut( - node.getIdHash(), - node.getIdShort(), - this.cachedResHandshake1, - Dest.ACTIVE)); + new MsgOut( + node.getIdHash(), + node.getIdShort(), + this.cachedResHandshake1, + Dest.ACTIVE)); } } else { - if (this.mgr.isShowLog()) { System.out.println(""); } + if (this.mgr.isShowLog()) { + System.out.println(""); + } } } } @@ -549,16 +575,16 @@ private void handleKernelMsg(int _nodeIdHash, int _route, final byte[] _msgBytes } } - /** @return boolean TODO: implementation */ + /** + * @return boolean TODO: implementation + */ private boolean handshakeRuleCheck(int netId) { // check net id - if (netId != this.mgr.getSelfNetId()) { return false; } - // check supported protocol versions - return true; + return netId == this.mgr.getSelfNetId(); } private String getReadOverflowMsg(int prevCnt, int cnt) { - return "IO read overflow! suppose read:" + prevCnt + " real left:" + cnt; + return "IO readBuffer overflow! suppose readBuffer:" + prevCnt + " real left:" + cnt; } private String getRouteMsg(short ver, byte ctrl, byte act, int count, String idStr) { diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskReceive.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskReceive.java index b5af5e5e0d..14d20b358b 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskReceive.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskReceive.java @@ -29,16 +29,17 @@ import org.aion.p2p.Handler; public class TaskReceive implements Runnable { + private final AtomicBoolean start; private final BlockingQueue receiveMsgQue; private final Map> handlers; private final boolean showLog; public TaskReceive( - AtomicBoolean _start, - BlockingQueue _receiveMsgQue, - Map> _handlers, - boolean _showLog) { + final AtomicBoolean _start, + final BlockingQueue _receiveMsgQue, + final Map> _handlers, + final boolean _showLog) { this.start = _start; this.receiveMsgQue = _receiveMsgQue; this.handlers = _handlers; @@ -52,21 +53,31 @@ public void run() { MsgIn mi = this.receiveMsgQue.take(); List hs = this.handlers.get(mi.getRoute()); - if (hs == null) { continue; } + if (hs == null) { + continue; + } for (Handler hlr : hs) { - if (hlr == null) { continue; } + if (hlr == null) { + continue; + } try { hlr.receive(mi.getNodeId(), mi.getDisplayId(), mi.getMsg()); } catch (Exception e) { - if (this.showLog) { e.printStackTrace(); } + if (this.showLog) { + e.printStackTrace(); + } } } } catch (InterruptedException e) { - if (this.showLog) { System.out.println(""); } + if (this.showLog) { + System.out.println(""); + } return; } catch (Exception e) { - if (this.showLog) { e.printStackTrace(); } + if (this.showLog) { + e.printStackTrace(); + } } } } diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskSend.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskSend.java index 904467b305..3329ac2ae1 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskSend.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskSend.java @@ -25,6 +25,11 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.aion.p2p.INode; import org.aion.p2p.INodeMgr; @@ -32,7 +37,10 @@ import org.aion.p2p.P2pConstant; public class TaskSend implements Runnable { - public static final int TOTAL_LANE = (1 << 5) - 1; + + private static final int TOTAL_LANE = Math + .min(Runtime.getRuntime().availableProcessors() << 1, 32); + private static final int THREAD_Q_LIMIT = 20000; private final IP2pMgr mgr; private final AtomicBoolean start; @@ -41,13 +49,15 @@ public class TaskSend implements Runnable { private final Selector selector; private final int lane; + private static ThreadPoolExecutor tpe; + public TaskSend( - IP2pMgr _mgr, - int _lane, - BlockingQueue _sendMsgQue, - AtomicBoolean _start, - INodeMgr _nodeMgr, - Selector _selector) { + final IP2pMgr _mgr, + final int _lane, + final BlockingQueue _sendMsgQue, + final AtomicBoolean _start, + final INodeMgr _nodeMgr, + final Selector _selector) { this.mgr = _mgr; this.lane = _lane; @@ -55,6 +65,15 @@ public TaskSend( this.start = _start; this.nodeMgr = _nodeMgr; this.selector = _selector; + + if (tpe == null) { + tpe = new ThreadPoolExecutor(TOTAL_LANE + , TOTAL_LANE + , 0 + , TimeUnit.MILLISECONDS + , new LinkedBlockingQueue<>(THREAD_Q_LIMIT) + , Executors.defaultThreadFactory()); + } } @Override @@ -62,6 +81,7 @@ public void run() { while (start.get()) { try { MsgOut mo = sendMsgQue.take(); + // if timeout , throw away this msg. long now = System.currentTimeMillis(); if (now - mo.getTimestamp() > P2pConstant.WRITE_MSG_TIMEOUT) { @@ -72,8 +92,8 @@ public void run() { } // if not belong to current lane, put it back. - int targetLane = hash2Lane(mo.getNodeId()); - if (targetLane != lane) { + long t1 = System.nanoTime(); + if (mo.getLane() != lane) { sendMsgQue.offer(mo); continue; } @@ -96,15 +116,13 @@ public void run() { if (sk != null) { Object attachment = sk.attachment(); if (attachment != null) { - TaskWrite tw = - new TaskWrite( - this.mgr.isShowLog(), - node.getIdShort(), - node.getChannel(), - mo.getMsg(), - (ChannelBuffer) attachment, - this.mgr); - tw.run(); + tpe.execute(new TaskWrite( + this.mgr.isShowLog(), + node.getIdShort(), + node.getChannel(), + mo.getMsg(), + (ChannelBuffer) attachment, + this.mgr)); } } } else { @@ -114,22 +132,30 @@ public void run() { } } } catch (InterruptedException e) { - if (this.mgr.isShowLog()) { System.out.println(""); } + if (this.mgr.isShowLog()) { + System.out.println(""); + } return; + } catch (RejectedExecutionException e) { + if (this.mgr.isShowLog()) { + System.out.println(""); + } } catch (Exception e) { - if (this.mgr.isShowLog()) { e.printStackTrace(); } + if (this.mgr.isShowLog()) { + e.printStackTrace(); + } } } } // hash mapping channel id to write thread. - private int hash2Lane(int in) { + static int hash2Lane(int in) { in ^= in >> (32 - 5); in ^= in >> (32 - 10); in ^= in >> (32 - 15); in ^= in >> (32 - 20); in ^= in >> (32 - 25); - return in & 0b11111; + return (in & 0b11111) * TOTAL_LANE / 32; } private String getTimeoutMsg(String id, long now) { diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskStatus.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskStatus.java index 38a16ea850..b62a4dae20 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskStatus.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskStatus.java @@ -26,16 +26,17 @@ import org.aion.p2p.INodeMgr; public class TaskStatus implements Runnable { + private final INodeMgr nodeMgr; private final String selfShortId; - private BlockingQueue sendMsgQue; - private BlockingQueue receiveMsgQue; + private final BlockingQueue sendMsgQue; + private final BlockingQueue receiveMsgQue; public TaskStatus( - INodeMgr _nodeMgr, - String _selfShortId, - BlockingQueue _sendMsgQue, - BlockingQueue _receiveMsgQue) { + final INodeMgr _nodeMgr, + final String _selfShortId, + final BlockingQueue _sendMsgQue, + final BlockingQueue _receiveMsgQue) { this.nodeMgr = _nodeMgr; this.selfShortId = _selfShortId; this.sendMsgQue = _sendMsgQue; @@ -48,13 +49,13 @@ public void run() { String status = this.nodeMgr.dumpNodeInfo(this.selfShortId); System.out.println(status); System.out.println("--------------------------------------------------------------------" + - "-------------------------------------------------------------------------------" + - "-----------------"); + "-------------------------------------------------------------------------------" + + "-----------------"); System.out.println( - "recv queue [" - + this.receiveMsgQue.size() - + "] send queue [" - + this.sendMsgQue.size() - + "]\n"); + "recv queue [" + + this.receiveMsgQue.size() + + "] send queue [" + + this.sendMsgQue.size() + + "]\n"); } } diff --git a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskWrite.java b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskWrite.java index 38e3dacbd4..729027c448 100644 --- a/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskWrite.java +++ b/modP2pImpl/src/org/aion/p2p/impl1/tasks/TaskWrite.java @@ -30,23 +30,25 @@ import org.aion.p2p.IP2pMgr; import org.aion.p2p.Msg; -/** @author chris */ +/** + * @author chris + */ public class TaskWrite implements Runnable { - private boolean showLog; - private String nodeShortId; - private SocketChannel sc; - private Msg msg; - private ChannelBuffer channelBuffer; - private IP2pMgr p2pMgr; + private final boolean showLog; + private final String nodeShortId; + private final SocketChannel sc; + private final Msg msg; + private final ChannelBuffer channelBuffer; + private final IP2pMgr p2pMgr; TaskWrite( - boolean _showLog, - String _nodeShortId, - final SocketChannel _sc, - final Msg _msg, - final ChannelBuffer _cb, - final IP2pMgr _p2pMgr) { + final boolean _showLog, + final String _nodeShortId, + final SocketChannel _sc, + final Msg _msg, + final ChannelBuffer _cb, + final IP2pMgr _p2pMgr) { this.showLog = _showLog; this.nodeShortId = _nodeShortId; this.sc = _sc; @@ -81,7 +83,9 @@ public void run() { // System.out.println("write " + h.getVer() + "-" + h.getCtrl() + "-" + h.getAction()); ByteBuffer buf = ByteBuffer.allocate(headerBytes.length + bodyLen); buf.put(headerBytes); - if (bodyBytes != null) { buf.put(bodyBytes); } + if (bodyBytes != null) { + buf.put(bodyBytes); + } buf.flip(); try { @@ -94,18 +98,18 @@ public void run() { } catch (ClosedChannelException ex1) { if (showLog) { System.out.println( - ""); + ""); } channelBuffer.isClosed.set(true); } catch (IOException ex2) { String reason = ex2.getMessage(); if (showLog) { System.out.println( - ""); + ""); } if (reason.equals("Broken pipe")) { channelBuffer.isClosed.set(true); diff --git a/modP2pImpl/test/org/aion/p2p/impl/Test.java b/modP2pImpl/test/org/aion/p2p/impl/Test.java index 8bedcba68b..293061c8d8 100644 --- a/modP2pImpl/test/org/aion/p2p/impl/Test.java +++ b/modP2pImpl/test/org/aion/p2p/impl/Test.java @@ -1,11 +1,11 @@ package org.aion.p2p.impl; -import java.util.HashSet; -import java.util.Set; - import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertFalse; +import java.util.HashSet; +import java.util.Set; + public class Test { @org.junit.Test