From c38ce3240822549f59fe9aea3c17ff24375a6b34 Mon Sep 17 00:00:00 2001 From: Alexandra Roatis Date: Tue, 1 Oct 2019 16:36:09 -0400 Subject: [PATCH] Refactored header requests and block imports to use new manager: - simplified import block functionality; - removed sync-related constants from p2p module. --- .../impl/blockchain/AionBlockchainImpl.java | 10 - .../zero/impl/blockchain/IAionBlockchain.java | 11 - .../aion/zero/impl/db/PendingBlockStore.java | 82 ---- .../org/aion/zero/impl/sync/PeerState.java | 202 -------- .../src/org/aion/zero/impl/sync/SyncMgr.java | 31 +- .../aion/zero/impl/sync/TaskGetHeaders.java | 181 ------- .../aion/zero/impl/sync/TaskImportBlocks.java | 405 ++------------- .../zero/impl/db/PendingBlockStoreTest.java | 111 ----- .../zero/impl/sync/TaskImportBlocksTest.java | 464 +----------------- modP2p/src/org/aion/p2p/P2pConstant.java | 23 +- 10 files changed, 55 insertions(+), 1465 deletions(-) delete mode 100644 modAionImpl/src/org/aion/zero/impl/sync/PeerState.java delete mode 100644 modAionImpl/src/org/aion/zero/impl/sync/TaskGetHeaders.java diff --git a/modAionImpl/src/org/aion/zero/impl/blockchain/AionBlockchainImpl.java b/modAionImpl/src/org/aion/zero/impl/blockchain/AionBlockchainImpl.java index cae46cd4c8..8ba3cefafa 100644 --- a/modAionImpl/src/org/aion/zero/impl/blockchain/AionBlockchainImpl.java +++ b/modAionImpl/src/org/aion/zero/impl/blockchain/AionBlockchainImpl.java @@ -2009,16 +2009,6 @@ public Map> loadPendingBlocksAtLevel(long level) { } } - @Override - public long nextBase(long current, long knownStatus) { - try { - return repository.getPendingBlockStore().nextBase(current, knownStatus); - } catch (Exception e) { - LOG.error("Unable to generate next LIGHTNING request base due to: ", e); - return current; - } - } - @Override public void dropImported( long level, diff --git a/modAionImpl/src/org/aion/zero/impl/blockchain/IAionBlockchain.java b/modAionImpl/src/org/aion/zero/impl/blockchain/IAionBlockchain.java index d6cd42f9f0..49103c26f7 100644 --- a/modAionImpl/src/org/aion/zero/impl/blockchain/IAionBlockchain.java +++ b/modAionImpl/src/org/aion/zero/impl/blockchain/IAionBlockchain.java @@ -120,17 +120,6 @@ Map getReferencedTrieNodes( */ Map> loadPendingBlocksAtLevel(long level); - /** - * Returns a number greater or equal to the given {@code current} number representing the base - * value for a subsequent LIGHTNING request. - * - * @param current the starting point value for the next base - * @param knownStatus value retrieved from the last best block status update for the peer - * requesting a base value for a subsequent LIGHTNING request. - * @return the next generated base value for the request. - */ - long nextBase(long current, long knownStatus); - /** * Deletes the given blocks from the pending block storage. * diff --git a/modAionImpl/src/org/aion/zero/impl/db/PendingBlockStore.java b/modAionImpl/src/org/aion/zero/impl/db/PendingBlockStore.java index 5b8cc9603f..96fb9c14fb 100644 --- a/modAionImpl/src/org/aion/zero/impl/db/PendingBlockStore.java +++ b/modAionImpl/src/org/aion/zero/impl/db/PendingBlockStore.java @@ -2,8 +2,6 @@ import static org.aion.zero.impl.db.DatabaseUtils.connectAndOpen; import static org.aion.zero.impl.db.DatabaseUtils.verifyAndBuildPath; -import static org.aion.p2p.P2pConstant.LARGE_REQUEST_SIZE; -import static org.aion.p2p.P2pConstant.STEP_COUNT; import com.google.common.annotations.VisibleForTesting; import java.io.Closeable; @@ -63,7 +61,6 @@ public class PendingBlockStore implements Closeable { private static final Logger LOG = AionLoggerFactory.getLogger(LogEnum.DB.name()); - private static final Logger LOG_SYNC = AionLoggerFactory.getLogger(LogEnum.SYNC.name()); private final ReadWriteLock databaseLock = new ReentrantReadWriteLock(); private final Lock internalLock = new ReentrantLock(); @@ -88,11 +85,6 @@ public class PendingBlockStore implements Closeable { /** Used to maps a block hash to its current queue identifier. */ private ByteArrayKeyValueDatabase indexSource; - // tracking the status: with access managed by the `internalLock` - private long maxRequest = 0L, maxHeight = 0L; - - private static final int FORWARD_SKIP = STEP_COUNT * LARGE_REQUEST_SIZE; - /** * Constructor. Initializes the databases used for storage. If the database configuration used * requires persistence, the constructor ensures the path can be accessed or throws an exception @@ -511,85 +503,11 @@ public void dropPendingQueues( levelSource.flushBatch(); } catch (Exception e) { LOG.error("Unable to delete used blocks due to: ", e); - return; } finally { databaseLock.writeLock().unlock(); } } - /** - * Generates a number greater or equal to the given {@code current} number representing the base - * value for a subsequent LIGHTNING request. The returned base is generated taking into - * consideration the current {@code knownBest} value for the peer for which this functionality - * is requested and the maximum recorded best throughout calls. - * - *

The bases are generated in an optimistic continuous manner based on the following - * assumptions: - * - *

    - *
  1. the majority of peers are on the same (main) chain; - *
  2. bases that cannot be used are recycled in the functionality calling this method; - *
  3. chain continuity is ensured by employing NORMAL requests in addition to LIGHTNING ones. - *
- * - * @param current the starting point value for the next base - * @param knownBest value retrieved from the last best block status update for the peer - * requesting a base value for a subsequent LIGHTNING request. - * @return the next generated base value for the request. - */ - public long nextBase(long current, long knownBest) { - internalLock.lock(); - - try { - long base = -1; - - if (knownBest > maxHeight) { - maxHeight = knownBest; - } - - if (maxHeight == 0) { - // optimistic jump forward - base = current > maxRequest ? current : maxRequest; - base += FORWARD_SKIP; - } else { - // same as initialization => no change from gap fill functionality - if (base == -1) { - if (maxHeight < current + LARGE_REQUEST_SIZE) { - // signal to switch back to / stay in NORMAL mode - base = current; - } else { - // regular jump forward - base = current > maxRequest ? current : maxRequest; - base += FORWARD_SKIP; - } - } - } - - if (LOG_SYNC.isDebugEnabled()) { - LOG_SYNC.debug( - "max status = {}, max requested = {}, known best = {}, current = {}, returned base = {}", - maxHeight, - knownBest, - maxRequest, - current, - base); - } - - // keep track of base - if (base > maxRequest) { - maxRequest = base; - } - - // return new base - return base; - } catch (Exception e) { - LOG.error("Unable to generate next LIGHTNING request base due to: ", e); - return current; - } finally { - internalLock.unlock(); - } - } - @Override public void close() { databaseLock.writeLock().lock(); diff --git a/modAionImpl/src/org/aion/zero/impl/sync/PeerState.java b/modAionImpl/src/org/aion/zero/impl/sync/PeerState.java deleted file mode 100644 index 7041f2765d..0000000000 --- a/modAionImpl/src/org/aion/zero/impl/sync/PeerState.java +++ /dev/null @@ -1,202 +0,0 @@ -package org.aion.zero.impl.sync; - -import static org.aion.p2p.P2pConstant.STEP_COUNT; - -import java.util.Objects; - -public class PeerState { - - public enum Mode { - /** - * The peer is in main-chain. Use normal syncing strategy. - * - * @implNote When switching to this mode it is not necessary to set the base value. The base - * will automatically be set to the current best block. - */ - NORMAL, - - /** The peer is in side-chain. Sync backward to find the fork point. */ - BACKWARD, - - /** The peer is in side-chain. Sync forward to catch up. */ - FORWARD, - - /** - * The peer is far ahead of the local chain. Use lightning sync strategy of jumping forward - * to request blocks out-of-order ahead of import time. Continue by filling the gap to the - * next jump step. - */ - LIGHTNING, - - /** - * The peer was far ahead of the local chain and made a sync jump. Gradually return to - * normal syncing strategy, allowing time for old lightning sync requests to come in. - * - * @implNote When switching to this mode it is not necessary to set the base value. The base - * will automatically be set to the current best block. - */ - THUNDER - } - - // TODO: enforce rules on this - public enum State { - /** The initial state. */ - INITIAL, - - /** Status request, waiting for response. */ - STATUS_REQUESTED, - - /** Block headers request, waiting for response. */ - HEADERS_REQUESTED, - - /** Block bodies request, waiting for response. */ - BODIES_REQUESTED, - } - - // The syncing mode and the base block number - private Mode mode; - private long base; - - // used in FORWARD mode to prevent endlessly importing EXISTing blocks - // compute how many times to go forward without importing a new block - private int repeated; - - // The syncing status - private State state; - private long lastBestBlock = 0; - private long lastHeaderRequest; - - /** Creates a new peer state. */ - public PeerState(Mode mode, long base) { - this.mode = mode; - this.base = base; - - this.state = State.INITIAL; - } - - /** Copy constructor. */ - public PeerState(PeerState _state) { - this.copy(_state); - } - - public void copy(PeerState _state) { - this.mode = _state.mode; - this.base = _state.base; - this.repeated = _state.repeated; - this.state = _state.state; - this.lastBestBlock = _state.lastBestBlock; - this.lastHeaderRequest = _state.lastHeaderRequest; - } - - public long getLastBestBlock() { - return lastBestBlock; - } - - public void setLastBestBlock(long lastBestBlock) { - this.lastBestBlock = lastBestBlock; - } - - public Mode getMode() { - return mode; - } - - public void setMode(Mode mode) { - this.mode = mode; - this.resetRepeated(); - } - - /** - * Method for checking if the state is in one of the fast modes, namely {@link Mode#LIGHTNING} - * and {@link Mode#THUNDER}. - * - * @return {@code true} when the state is in one of the fast modes, {@code false} otherwise. - */ - public boolean isInFastMode() { - return mode == Mode.LIGHTNING || mode == Mode.THUNDER; - } - - public long getBase() { - return base; - } - - public void setBase(long base) { - this.base = base; - } - - public State getState() { - return state; - } - - public void setState(State state) { - this.state = state; - } - - public long getLastHeaderRequest() { - return lastHeaderRequest; - } - - public void setLastHeaderRequest(long lastStatusRequest) { - this.state = State.HEADERS_REQUESTED; - this.lastHeaderRequest = lastStatusRequest; - } - - public void resetLastHeaderRequest() { - this.state = State.INITIAL; - this.lastHeaderRequest = 0; - } - - public boolean isUnderRepeatThreshold() { - return repeated < STEP_COUNT; - } - - private void resetRepeated() { - this.repeated = 0; - } - - public void incRepeated() { - this.repeated++; - } - - public int getRepeated() { - return repeated; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PeerState peerState = (PeerState) o; - return base == peerState.base - && repeated == peerState.repeated - && lastBestBlock == peerState.lastBestBlock - && lastHeaderRequest == peerState.lastHeaderRequest - && mode == peerState.mode - && state == peerState.state; - } - - @Override - public int hashCode() { - return Objects.hash(mode, base, repeated, state, lastBestBlock, lastHeaderRequest); - } - - @Override - public String toString() { - return "{" - + mode.toString().charAt(0) - + ", " - + state.toString().substring(0, 2) - + ", " - + base - + ", " - + repeated - + ", " - + lastBestBlock - + ", " - + lastHeaderRequest - + '}'; - } -} diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncMgr.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncMgr.java index 9983ed1d5c..24a4a83f49 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncMgr.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncMgr.java @@ -6,7 +6,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -55,8 +54,6 @@ public final class SyncMgr { private SyncHeaderRequestManager syncHeaderRequestManager; - // peer syncing states - private final Map peerStates = new ConcurrentHashMap<>(); // store the downloaded headers from network private final BlockingQueue downloadedHeaders = new LinkedBlockingQueue<>(); // store the downloaded blocks that are ready to import @@ -70,18 +67,6 @@ public final class SyncMgr { private IEventMgr evtMgr; private SyncStats stats; private AtomicBoolean start = new AtomicBoolean(true); - // private ExecutorService workers = Executors.newFixedThreadPool(5); - private ExecutorService workers = - Executors.newCachedThreadPool( - new ThreadFactory() { - - private AtomicInteger cnt = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "sync-gh-" + cnt.incrementAndGet()); - } - }); private Thread syncGb = null; private Thread syncIb = null; @@ -212,7 +197,7 @@ public void init( stats, downloadedBlocks, importedBlockHashes, - peerStates, + syncHeaderRequestManager, _slowImportTime, _compactFrequency), "sync-ib"); @@ -251,17 +236,8 @@ private void getHeaders(BigInteger _selfTd) { log.debug("Downloaded blocks queue is full. Stop requesting headers"); } } else { - if (!workers.isShutdown()) { - workers.submit( - new TaskGetHeaders( - p2pMgr, - chain.getBestBlock().getNumber(), - _selfTd, - peerStates, - stats, - log)); - queueFull.set(false); - } + syncHeaderRequestManager.sendHeadersRequests(chain.getBestBlock().getNumber(), _selfTd, p2pMgr, stats); + queueFull.set(false); } } @@ -398,7 +374,6 @@ public long getNetworkBestBlockNumber() { public synchronized void shutdown() { start.set(false); - workers.shutdown(); interruptAndWait(syncGb, 10000); interruptAndWait(syncIb, 10000); diff --git a/modAionImpl/src/org/aion/zero/impl/sync/TaskGetHeaders.java b/modAionImpl/src/org/aion/zero/impl/sync/TaskGetHeaders.java deleted file mode 100644 index a0c72d211d..0000000000 --- a/modAionImpl/src/org/aion/zero/impl/sync/TaskGetHeaders.java +++ /dev/null @@ -1,181 +0,0 @@ -package org.aion.zero.impl.sync; - -import static org.aion.p2p.P2pConstant.BACKWARD_SYNC_STEP; -import static org.aion.p2p.P2pConstant.CLOSE_OVERLAPPING_BLOCKS; -import static org.aion.p2p.P2pConstant.FAR_OVERLAPPING_BLOCKS; -import static org.aion.p2p.P2pConstant.LARGE_REQUEST_SIZE; -import static org.aion.p2p.P2pConstant.REQUEST_SIZE; -import static org.aion.zero.impl.sync.PeerState.Mode.NORMAL; -import static org.aion.zero.impl.sync.PeerState.Mode.THUNDER; - -import java.math.BigInteger; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.stream.Collectors; -import org.aion.p2p.INode; -import org.aion.p2p.IP2pMgr; -import org.aion.zero.impl.sync.msg.ReqBlocksHeaders; -import org.aion.zero.impl.sync.statistics.RequestType; -import org.slf4j.Logger; - -/** @author chris */ -final class TaskGetHeaders implements Runnable { - - private final IP2pMgr p2p; - - private final long selfNumber; - - private final BigInteger selfTd; - - private final Map peerStates; - - private final SyncStats stats; - - private final Logger log; - - private final Random random = new Random(System.currentTimeMillis()); - - TaskGetHeaders( - IP2pMgr p2p, - long selfNumber, - BigInteger selfTd, - Map peerStates, - final SyncStats _stats, - Logger log) { - this.p2p = p2p; - this.selfNumber = selfNumber; - this.selfTd = selfTd; - this.peerStates = peerStates; - this.stats = _stats; - this.log = log; - } - - /** Checks that the peer's total difficulty is higher than or equal to the local chain. */ - private boolean isAdequateTotalDifficulty(INode n) { - return n.getTotalDifficulty() != null && n.getTotalDifficulty().compareTo(this.selfTd) >= 0; - } - - /** Checks that the required time has passed since the last request. */ - private boolean isTimelyRequest(long now, INode n) { - return (now - 5000) - > peerStates - .computeIfAbsent(n.getIdHash(), k -> new PeerState(NORMAL, selfNumber)) - .getLastHeaderRequest(); - } - - @Override - public void run() { - // get all active nodes - Collection nodes = this.p2p.getActiveNodes().values(); - - // filter nodes by total difficulty - long now = System.currentTimeMillis(); - List nodesFiltered = - nodes.stream() - .filter(n -> isAdequateTotalDifficulty(n) && isTimelyRequest(now, n)) - .collect(Collectors.toList()); - - if (nodesFiltered.isEmpty()) { - return; - } - - // pick one random node - INode node = nodesFiltered.get(random.nextInt(nodesFiltered.size())); - - // fetch the peer state - PeerState state = peerStates.get(node.getIdHash()); - - // decide the start block number - long from = 0; - int size = REQUEST_SIZE; - - state.setLastBestBlock(node.getBestBlockNumber()); - - switch (state.getMode()) { - case LIGHTNING: - { - // request far forward blocks - if (state.getBase() > selfNumber + LARGE_REQUEST_SIZE - // there have not been STEP_COUNT sequential requests - && state.isUnderRepeatThreshold()) { - size = LARGE_REQUEST_SIZE; - from = state.getBase(); - break; - } else { - // transition to ramp down strategy - state.setMode(THUNDER); - } - } - case THUNDER: - { - // there have not been STEP_COUNT sequential requests - if (state.isUnderRepeatThreshold()) { - state.setBase(selfNumber); - size = LARGE_REQUEST_SIZE; - from = Math.max(1, selfNumber - FAR_OVERLAPPING_BLOCKS); - break; - } else { - // behave as normal - state.setMode(NORMAL); - } - } - case NORMAL: - { - // update base block - state.setBase(selfNumber); - - // normal mode - long nodeNumber = node.getBestBlockNumber(); - if (nodeNumber >= selfNumber + BACKWARD_SYNC_STEP) { - from = Math.max(1, selfNumber - FAR_OVERLAPPING_BLOCKS); - } else if (nodeNumber >= selfNumber - BACKWARD_SYNC_STEP) { - from = Math.max(1, selfNumber - CLOSE_OVERLAPPING_BLOCKS); - } else { - // no need to request from this node. His TD is probably corrupted. - return; - } - break; - } - case BACKWARD: - { - int backwardStep; - // the randomness improves performance when - // multiple peers are on the side-chain - if (random.nextBoolean()) { - // step back by REQUEST_SIZE to BACKWARD_SYNC_STEP blocks - backwardStep = size * (random.nextInt(BACKWARD_SYNC_STEP / size) + 1); - } else { - // step back by BACKWARD_SYNC_STEP blocks - backwardStep = BACKWARD_SYNC_STEP; - } - from = Math.max(1, state.getBase() - backwardStep); - break; - } - case FORWARD: - { - // start from base block - from = state.getBase() + 1; - break; - } - } - - // send request - if (log.isDebugEnabled()) { - log.debug( - "", - state.getMode(), - from, - size, - node.getIdShort()); - } - ReqBlocksHeaders rbh = new ReqBlocksHeaders(from, size); - this.p2p.send(node.getIdHash(), node.getIdShort(), rbh); - stats.updateTotalRequestsToPeer(node.getIdShort(), RequestType.STATUS); - stats.updateRequestTime(node.getIdShort(), System.nanoTime(), RequestType.HEADERS); - - // update timestamp - state.setLastHeaderRequest(now); - } -} diff --git a/modAionImpl/src/org/aion/zero/impl/sync/TaskImportBlocks.java b/modAionImpl/src/org/aion/zero/impl/sync/TaskImportBlocks.java index d0064720c3..7869334391 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/TaskImportBlocks.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/TaskImportBlocks.java @@ -1,35 +1,24 @@ package org.aion.zero.impl.sync; -import static org.aion.p2p.P2pConstant.COEFFICIENT_NORMAL_PEERS; -import static org.aion.p2p.P2pConstant.LARGE_REQUEST_SIZE; -import static org.aion.p2p.P2pConstant.MAX_NORMAL_PEERS; -import static org.aion.zero.impl.sync.PeerState.Mode.BACKWARD; -import static org.aion.zero.impl.sync.PeerState.Mode.FORWARD; -import static org.aion.zero.impl.sync.PeerState.Mode.LIGHTNING; -import static org.aion.zero.impl.sync.PeerState.Mode.NORMAL; -import static org.aion.zero.impl.sync.PeerState.Mode.THUNDER; +import static org.aion.zero.impl.sync.SyncHeaderRequestManager.SyncMode.BACKWARD; +import static org.aion.zero.impl.sync.SyncHeaderRequestManager.SyncMode.FORWARD; +import static org.aion.zero.impl.sync.SyncHeaderRequestManager.SyncMode.NORMAL; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; - import org.aion.mcf.blockchain.Block; import org.aion.zero.impl.core.ImportResult; -import org.aion.p2p.P2pConstant; import org.aion.util.types.ByteArrayWrapper; import org.aion.zero.impl.SystemExitCodes; import org.aion.zero.impl.blockchain.AionBlockchainImpl; import org.aion.zero.impl.db.AionBlockStore; -import org.aion.zero.impl.sync.PeerState.Mode; +import org.aion.zero.impl.sync.SyncHeaderRequestManager.SyncMode; import org.aion.zero.impl.sync.statistics.BlockType; import org.slf4j.Logger; @@ -52,14 +41,11 @@ final class TaskImportBlocks implements Runnable { private final Map importedBlockHashes; - private final Map peerStates; + private final SyncHeaderRequestManager syncHeaderRequestManager; private final Logger log; private final Logger surveyLog; - private SortedSet baseList; - private PeerState state; - private final int slowImportTime; private final int compactFrequency; @@ -73,7 +59,7 @@ final class TaskImportBlocks implements Runnable { final SyncStats _syncStats, final BlockingQueue _downloadedBlocks, final Map _importedBlockHashes, - final Map _peerStates, + final SyncHeaderRequestManager syncHeaderRequestManager, final int _slowImportTime, final int _compactFrequency) { this.log = syncLog; @@ -83,9 +69,7 @@ final class TaskImportBlocks implements Runnable { this.syncStats = _syncStats; this.downloadedBlocks = _downloadedBlocks; this.importedBlockHashes = _importedBlockHashes; - this.peerStates = _peerStates; - this.baseList = new TreeSet<>(); - this.state = new PeerState(NORMAL, 0L); + this.syncHeaderRequestManager = syncHeaderRequestManager; this.slowImportTime = _slowImportTime; this.compactFrequency = _compactFrequency; this.lastCompactTime = System.currentTimeMillis(); @@ -112,11 +96,11 @@ public void run() { } startTime = System.nanoTime(); - PeerState peerState = peerStates.get(bw.getNodeIdHash()); + SyncMode syncMode = syncHeaderRequestManager.getSyncMode(bw.getNodeIdHash()); duration = System.nanoTime() - startTime; surveyLog.info("Import Stage 2: wait for peer state, duration = {} ns.", duration); - if (peerState == null) { + if (syncMode == null) { // ignoring these blocks log.warn("Peer {} sent blocks that were not requested.", bw.getDisplayId()); } else { // the peerState is not null after this @@ -125,29 +109,15 @@ public void run() { duration = System.nanoTime() - startTime; surveyLog.info("Import Stage 3: filter batch, duration = {} ns.", duration); - if (log.isDebugEnabled()) { - log.debug( - "", - bw.getDisplayId(), - peerState.getMode(), - peerState.getBase()); - } - startTime = System.nanoTime(); // process batch and update the peer state - peerState.copy(processBatch(peerState, batch, bw.getDisplayId())); + SyncMode newMode = processBatch(syncMode, batch, bw.getDisplayId()); duration = System.nanoTime() - startTime; surveyLog.info("Import Stage 4: process received and disk batches, duration = {} ns.", duration); - // so we can continue immediately - peerState.resetLastHeaderRequest(); - - if (log.isDebugEnabled()) { - log.debug( - "", - bw.getDisplayId(), - peerState.getMode(), - peerState.getBase()); + // transition to recommended sync mode + if (syncMode != newMode) { + syncHeaderRequestManager.runInMode(bw.getNodeIdHash(), newMode); } syncStats.update(getBestBlockNumber()); @@ -199,44 +169,25 @@ private static boolean isNotRestricted(Block b, AionBlockchainImpl chain) { } /** @implNote This method is called only when state is not null. */ - private PeerState processBatch(PeerState givenState, List batch, String displayId) { + private SyncMode processBatch(SyncMode syncMode, List batch, String displayId) { // for runtime survey information long startTime, duration; - // make a copy of the original state - state.copy(Objects.requireNonNull(givenState)); - - // new batch received -> add another iteration to the count - state.incRepeated(); - // all blocks were filtered out // interpreted as repeated work if (batch.isEmpty()) { - if (log.isDebugEnabled()) { - log.debug( - "Empty batch received from node = {} in mode = {} with base = {}.", - displayId, - givenState.getMode(), - givenState.getBase()); - } + log.debug("Empty batch received from node = {} in mode = {}.", displayId, syncMode); - if (state.getMode() == BACKWARD || state.getMode() == FORWARD) { - // multiple peers are doing the same BACKWARD/FORWARD pass - // TODO: verify that this improves efficiency - // TODO: impact of allowing the LIGHTNING jump instead? - state.setMode(NORMAL); - return state; - } else { - return attemptLightningJump( - getBestBlockNumber(), state, peerStates.values(), baseList, chain); - } + // this transition is useful regardless of the given syncMode + // the responses from multiple peers overlapped (because the batch was empty) + // we therefore reset this peer to (possibly) do something other than its previous mode + return NORMAL; } // the batch cannot be empty henceforth // check last block in batch to see if we can skip batch - if (givenState.getMode() != BACKWARD) { + if (syncMode != BACKWARD) { Block b = batch.get(batch.size() - 1); - Mode mode = givenState.getMode(); // last block already exists // implies the full batch was already imported (but not filtered by the queue) @@ -245,23 +196,13 @@ private PeerState processBatch(PeerState givenState, List batch, String d importedBlockHashes.put(ByteArrayWrapper.wrap(b.getHash()), true); // skipping the batch - if (log.isDebugEnabled()) { - log.debug( - "Skip {} blocks from node = {} in mode = {} with base = {}.", - batch.size(), - displayId, - givenState.getMode(), - givenState.getBase()); - } + log.debug("Skip {} blocks from node = {} in mode = {}.", batch.size(), displayId, syncMode); batch.clear(); - // updating the state - if (mode == FORWARD) { - return forwardModeUpdate(state, b.getNumber(), ImportResult.EXIST); + if (syncMode == FORWARD) { + return FORWARD; } else { - // mode in { NORMAL, LIGHTNING, THUNDER } - return attemptLightningJump( - getBestBlockNumber(), state, peerStates.values(), baseList, chain); + return NORMAL; } } } @@ -269,11 +210,12 @@ private PeerState processBatch(PeerState givenState, List batch, String d // remembering imported range long first = -1L, last = -1L; ImportResult importResult; + SyncMode returnMode = syncMode; startTime = System.nanoTime(); for (Block b : batch) { try { - importResult = importBlock(b, displayId, givenState); + importResult = importBlock(b, displayId, syncMode); if (importResult.isStored()) { importedBlockHashes.put(ByteArrayWrapper.wrap(b.getHash()), true); @@ -296,91 +238,22 @@ private PeerState processBatch(PeerState givenState, List batch, String d // decide whether to change mode based on the first if (b == batch.get(0)) { first = b.getNumber(); - Mode mode = givenState.getMode(); // if any block results in NO_PARENT, all subsequent blocks will too if (importResult == ImportResult.NO_PARENT) { storePendingBlocks(batch, displayId); - if (log.isDebugEnabled()) { - log.debug( - "Stopped importing batch due to NO_PARENT result. " - + "Batch of {} blocks starting at hash = {}, number = {} from node = {} delegated to storage.", - batch.size(), - b.getShortHash(), - b.getNumber(), - displayId); - } else { - // message used instead of import NO_PARENT ones - if (state.isInFastMode()) { - log.info( - "", - batch.size(), - displayId, - b.getShortHash(), - b.getNumber(), - b.getTransactionsList().size()); - } - } - - switch (mode) { - case FORWARD: - { - // switch to backward mode - state.setMode(BACKWARD); - state.setBase(b.getNumber()); - break; - } - case NORMAL: - { - // switch to backward mode - state.setMode(BACKWARD); - state.setBase(b.getNumber()); - break; - } - case BACKWARD: - { - // update base - state.setBase(b.getNumber()); - break; - } - case LIGHTNING: - { - state.setBase(b.getNumber() + batch.size()); - break; - } - case THUNDER: - break; + // check if it is below the current importable blocks + if (b.getNumber() <= getBestBlockNumber() + 1) { + duration = System.nanoTime() - startTime; + surveyLog.info("Import Stage 4.A: import received batch, duration = {} ns.", duration); + return BACKWARD; } - // exit loop after NO_PARENT result - break; } else if (importResult.isStored()) { - // assuming the remaining blocks will be imported. if not, the state - // and base will be corrected by the next cycle - long lastBlock = batch.get(batch.size() - 1).getNumber(); - - switch (mode) { - case BACKWARD: - // we found the fork point - state.setMode(FORWARD); - state.setBase(lastBlock); - break; - case FORWARD: - state = forwardModeUpdate(state, lastBlock, importResult); - break; - case LIGHTNING: - case THUNDER: - state = - attemptLightningJump( - getBestBlockNumber(), - state, - peerStates.values(), - baseList, - chain); - break; - case NORMAL: - default: - break; + if (syncMode == BACKWARD) { + returnMode = FORWARD; + } else if (syncMode == FORWARD && importResult.isBest()) { + returnMode = NORMAL; } } } @@ -391,163 +264,12 @@ private PeerState processBatch(PeerState givenState, List batch, String d startTime = System.nanoTime(); // check for stored blocks if (first < last) { - int imported = importFromStorage(state, first, last); - if (imported > 0) { - // TODO: may have already updated torrent mode - if (state.getMode() == LIGHTNING) { - if (state.getBase() == givenState.getBase() // was not already updated - || state.getBase() <= getBestBlockNumber() + P2pConstant.REQUEST_SIZE) { - state = - attemptLightningJump( - getBestBlockNumber(), - state, - peerStates.values(), - baseList, - chain); - } // else already updated to a correct request - duration = System.nanoTime() - startTime; - surveyLog.info("Import Stage 4.B: process all disk batches, duration = {} ns.", duration); - return state; - } else if (state.getMode() == BACKWARD || state.getMode() == FORWARD) { - // TODO: verify that this improves efficiency - // TODO: impact of allowing the LIGHTNING jump instead? - state.setMode(NORMAL); - duration = System.nanoTime() - startTime; - surveyLog.info("Import Stage 4.B: process all disk batches, duration = {} ns.", duration); - return state; - } - } + returnMode = importFromStorage(returnMode, first, last); } duration = System.nanoTime() - startTime; surveyLog.info("Import Stage 4.B: process all disk batches, duration = {} ns.", duration); - return state; - } - - /** - * Utility method that updates the given state to a LIGHTNING jump when the jump conditions - * (balancing the number of fast and normal states) are met. If a jump is not possible (due to - * the requirement of having a best block status larger than the selected base value) for a - * state that is already in LIGHTNING mode, the state is changed to THUNDER mode. - * - * @param best the starting point value for the attempted jump - * @param state the state to be modified for the jump or ramp down - * @param states all the existing peer states are the time of the method call used for checking - * if the jump conditions are met - * @param baseSet sorted set of generated values that can be used as base for the jump - * @param chain the blockchain where the blocks will be imported which can be used to expand the - * set of base value options - * @return a state modified for a LIGHTNING when possible, otherwise a state in THUNDER (ramp - * down) mode if the state was previously in LIGHTNING mode, or an unchanged state when none - * of the before mentioned conditions are met. - * @implNote Typically called when {@link PeerState#getMode()} in { {@link - * PeerState.Mode#NORMAL}, {@link PeerState.Mode#LIGHTNING}, {@link PeerState.Mode#THUNDER} - * }, but the same behaviour of jumping ahead will be applied if the give state mode is - * {@link PeerState.Mode#BACKWARD} or {@link PeerState.Mode#FORWARD}. - */ - static PeerState attemptLightningJump( - long best, - PeerState state, - Collection states, - SortedSet baseSet, - AionBlockchainImpl chain) { - - // no need to count states if already in LIGHTNING - if (state.getMode() == LIGHTNING) { - // select the base to be used - long nextBase = selectBase(best, state.getLastBestBlock(), baseSet, chain); - - // determine if base is future block - if (nextBase > best) { - // determine if a jump is possible - if (state.getLastBestBlock() > nextBase + LARGE_REQUEST_SIZE) { - // new jump resets the repeated count - state.setMode(LIGHTNING); - state.setBase(nextBase); - } else { - // can't jump so ramp down - state.setMode(THUNDER); - // recycle unused base - baseSet.add(nextBase); - } - } else { - // can't jump so ramp down - state.setMode(THUNDER); - } - } else { - // compute the relevant state count - long normalStates = - countStates(best, NORMAL, states) + countStates(best, THUNDER, states); - long fastStates = countStates(best, LIGHTNING, states); - - // the fast vs normal states balance depends on the give coefficient - if (fastStates < COEFFICIENT_NORMAL_PEERS * normalStates - // with a maximum number of normal states - || normalStates > MAX_NORMAL_PEERS) { - - // select the base to be used - long nextBase = selectBase(best, state.getLastBestBlock(), baseSet, chain); - - // determine if base is future block - if (nextBase > best) { - // determine if a jump is possible - if (state.getLastBestBlock() > nextBase + LARGE_REQUEST_SIZE) { - state.setMode(LIGHTNING); - state.setBase(nextBase); - } else { - // recycle unused base - baseSet.add(nextBase); - } - } - } - } - return state; - } - - /** - * Utility method that computes the number of states from the given ones that have the give mode - * and a last best block status larger than the given number. - * - * @param states the list of peer states to be explored - * @param mode the state mode we are searching for - * @param best the minimum accepted last best block status for the peer - * @return the number of states that satisfy the condition above. - */ - static long countStates(long best, Mode mode, Collection states) { - return states.stream() - .filter(s -> s.getLastBestBlock() > best) - .filter(s -> s.getMode() == mode) - .count(); - } - - /** - * Utility method that selects a number greater or equal to the given best representing the base - * value for the next LIGHTNING request. The returned base will be either retrieved from the set - * of previously generated values that have not yet been used or a new value generated by - * calling the given chain's {@link AionBlockchainImpl#nextBase(long, long)} method. - * - * @param best the starting point value for the next base - * @param baseSet list of already generated values - * @param chain the blockchain where the blocks will be imported which can be used to expand the - * set of base value options - * @return the next base from the set or the given best value when the set does not contain any - * values greater than it. - */ - static long selectBase( - long best, long knownStatus, SortedSet baseSet, AionBlockchainImpl chain) { - // remove bases that are no longer relevant - while (!baseSet.isEmpty() && baseSet.first() <= best) { - baseSet.remove(baseSet.first()); - } - - if (baseSet.isEmpty()) { - // generate new possible base value - return chain.nextBase(best, knownStatus); - } else { - Long first = baseSet.first(); - baseSet.remove(first); - return first; - } + return returnMode; } /** @@ -558,13 +280,13 @@ static long selectBase( * @param block the block for which we need to determine if it is already stored or not * @return {@code true} if the given block exists in the block store, {@code false} otherwise. * @apiNote Should be used when we aim to bypass any recovery methods set in place for importing - * old blocks, for example when blocks are imported in {@link PeerState.Mode#FORWARD} mode. + * old blocks, for example when blocks are imported in {@link SyncMode#FORWARD} mode. */ static boolean isAlreadyStored(AionBlockStore store, Block block) { return store.getMaxNumber() >= block.getNumber() && store.isBlockStored(block.getHash(), block.getNumber()); } - private ImportResult importBlock(Block b, String displayId, PeerState state) { + private ImportResult importBlock(Block b, String displayId, SyncMode mode) { ImportResult importResult; long t1 = System.currentTimeMillis(); importResult = this.chain.tryToConnect(b); @@ -574,7 +296,7 @@ private ImportResult importBlock(Block b, String displayId, PeerState state) { log.debug( "", displayId, - (state != null ? state.getMode() : NORMAL), + mode, b.getShortHash(), b.getNumber(), b.getTransactionsList().size(), @@ -585,8 +307,7 @@ private ImportResult importBlock(Block b, String displayId, PeerState state) { } else { // not printing this message when the state is in fast mode with no parent result // a different message will be printed to indicate the storage of blocks - if (log.isInfoEnabled() - && (!state.isInFastMode() || importResult != ImportResult.NO_PARENT)) { + if (log.isInfoEnabled() && (importResult != ImportResult.NO_PARENT)) { log.info( "", displayId, @@ -615,42 +336,12 @@ private ImportResult importBlock(Block b, String displayId, PeerState state) { return importResult; } - /** - * Utility method that sets the base for the next FORWARD request OR switches to NORMAL mode - * when (1) a block import resulted in an IMPORTED_BEST result or (2) the maximum number of - * repetitions has been reached. - * - * @implNote Reaching the maximum number of repetitions allowed means that the FORWARD requests - * have covered the scope of blocks between the BACKWARD request that has had a NO_PARENT - * result and the subsequent BACKWARD request that got an EXIST / IMPORTED_BEST / - * IMPORTED_NOT_BEST result. Effectively covering this space without storing the blocks - * means that either an error has occurred or that another peer has already imported these - * blocks. The second scenario is the most likely which makes switching to NORMAL mode the - * natural consequence. - * @param state the peer state to be updated - * @param lastBlock the last imported block number - * @param importResult the result for the last imported block - * @return an updated state according to the description above. - */ - static PeerState forwardModeUpdate(PeerState state, long lastBlock, ImportResult importResult) { - // when the maximum number of repeats has passed - // the peer is stuck behind other peers importing the same (old) blocks - if (importResult.isBest() || !state.isUnderRepeatThreshold()) { - state.setMode(NORMAL); - } else { - // in case we continue as FORWARD - state.setBase(lastBlock); - } - - return state; - } - /** * Imports blocks from storage as long as there are blocks to import. * * @return the total number of imported blocks from all iterations */ - private int importFromStorage(PeerState state, long first, long last) { + private SyncMode importFromStorage(SyncMode givenMode, long first, long last) { // for runtime survey information long startTime, duration; @@ -714,7 +405,7 @@ private int importFromStorage(PeerState state, long first, long last) { startTime = System.nanoTime(); for (Block b : batchFromDisk) { try { - importResult = importBlock(b, "STORAGE", state); + importResult = importBlock(b, "STORAGE", givenMode); if (importResult.isStored()) { importedBlockHashes.put(ByteArrayWrapper.wrap(b.getHash()), true); @@ -753,12 +444,16 @@ private int importFromStorage(PeerState state, long first, long last) { level++; } + log.debug("Imported {} blocks from storage.", imported); + // switch to NORMAL if in FORWARD mode - if (importResult.isBest() && state.getMode() == FORWARD) { - state.setMode(NORMAL); + if (importResult.isBest()) { + return NORMAL; + } else if (importResult.isStored() && givenMode == BACKWARD) { + return FORWARD; } - return imported; + return givenMode; } private long getBestBlockNumber() { diff --git a/modAionImpl/test/org/aion/zero/impl/db/PendingBlockStoreTest.java b/modAionImpl/test/org/aion/zero/impl/db/PendingBlockStoreTest.java index ea4d7377e6..30fe53ed91 100644 --- a/modAionImpl/test/org/aion/zero/impl/db/PendingBlockStoreTest.java +++ b/modAionImpl/test/org/aion/zero/impl/db/PendingBlockStoreTest.java @@ -2,8 +2,6 @@ import static com.google.common.truth.Truth.assertThat; import static org.aion.zero.impl.db.DatabaseUtils.deleteRecursively; -import static org.aion.p2p.P2pConstant.LARGE_REQUEST_SIZE; -import static org.aion.p2p.P2pConstant.STEP_COUNT; import java.io.File; import java.util.ArrayList; @@ -399,113 +397,4 @@ public void testDropPendingQueues_wSingleQueue() { assertThat(pb.getLevelSize()).isEqualTo(1); assertThat(pb.getQueueSize()).isEqualTo(1); } - - @Test - public void testNextBase_wException() { - Properties props = new Properties(); - props.setProperty(Props.DB_TYPE, DBVendor.MOCKDB.toValue()); - - PendingBlockStore pb = null; - try { - pb = new PendingBlockStore(props); - } catch (InvalidFilePathException e) { - e.printStackTrace(); - } - assertThat(pb.isOpen()).isTrue(); - - Block block = TestResources.consecutiveBlocks(1).get(0); - - long current = block.getNumber() + 1; // above last status - long knownBest = block.getNumber(); // will define the max status - long expected = current; - - // closing the pending block store to cause exception - pb.close(); - - assertThat(pb.nextBase(current, knownBest)).isEqualTo(expected); - } - - @Test - public void testNextBase_woStatus() { - Properties props = new Properties(); - props.setProperty(Props.DB_TYPE, DBVendor.MOCKDB.toValue()); - - PendingBlockStore pb = null; - try { - pb = new PendingBlockStore(props); - } catch (InvalidFilePathException e) { - e.printStackTrace(); - } - assertThat(pb.isOpen()).isTrue(); - - long current = 100L; - long knownBest = 0L; // not yet known - long expected = current + STEP_COUNT * LARGE_REQUEST_SIZE; - assertThat(pb.nextBase(current, knownBest)).isEqualTo(expected); - } - - @Test - public void testNextBase_wSmallKnownBest() { - Properties props = new Properties(); - props.setProperty(Props.DB_TYPE, DBVendor.MOCKDB.toValue()); - - PendingBlockStore pb = null; - try { - pb = new PendingBlockStore(props); - } catch (InvalidFilePathException e) { - e.printStackTrace(); - } - assertThat(pb.isOpen()).isTrue(); - - long current = 100L; - long knownBest = current + LARGE_REQUEST_SIZE - 1; - long expected = current; - assertThat(pb.nextBase(current, knownBest)).isEqualTo(expected); - } - - @Test - public void testNextBase_wSufficientKnownBest() { - Properties props = new Properties(); - props.setProperty(Props.DB_TYPE, DBVendor.MOCKDB.toValue()); - - PendingBlockStore pb = null; - try { - pb = new PendingBlockStore(props); - } catch (InvalidFilePathException e) { - e.printStackTrace(); - } - assertThat(pb.isOpen()).isTrue(); - - long current = 100L; - long knownBest = current + LARGE_REQUEST_SIZE; - long expected = current + STEP_COUNT * LARGE_REQUEST_SIZE; - // current will be chosen - assertThat(pb.nextBase(current, knownBest)).isEqualTo(expected); - - knownBest = expected + LARGE_REQUEST_SIZE; - expected = expected + STEP_COUNT * LARGE_REQUEST_SIZE; - // maxRequest will be chosen - assertThat(pb.nextBase(current, knownBest)).isEqualTo(expected); - } - - @Test - public void testNextBase_wFailedStatusSearch() { - Properties props = new Properties(); - props.setProperty(Props.DB_TYPE, DBVendor.MOCKDB.toValue()); - - PendingBlockStore pb = null; - try { - pb = new PendingBlockStore(props); - } catch (InvalidFilePathException e) { - e.printStackTrace(); - } - assertThat(pb.isOpen()).isTrue(); - - Block block = TestResources.consecutiveBlocks(1).get(0); - - long current = block.getNumber() + 1; // above last status - long knownBest = block.getNumber(); // will define the max status - long expected = current; - assertThat(pb.nextBase(current, knownBest)).isEqualTo(expected); - } } diff --git a/modAionImpl/test/org/aion/zero/impl/sync/TaskImportBlocksTest.java b/modAionImpl/test/org/aion/zero/impl/sync/TaskImportBlocksTest.java index 143eab40e7..2a3e7400c4 100644 --- a/modAionImpl/test/org/aion/zero/impl/sync/TaskImportBlocksTest.java +++ b/modAionImpl/test/org/aion/zero/impl/sync/TaskImportBlocksTest.java @@ -1,43 +1,20 @@ package org.aion.zero.impl.sync; import static com.google.common.truth.Truth.assertThat; -import static org.aion.zero.impl.core.ImportResult.CONSENSUS_BREAK; -import static org.aion.zero.impl.core.ImportResult.EXIST; -import static org.aion.zero.impl.core.ImportResult.IMPORTED_BEST; -import static org.aion.zero.impl.core.ImportResult.IMPORTED_NOT_BEST; -import static org.aion.zero.impl.core.ImportResult.INVALID_BLOCK; -import static org.aion.zero.impl.core.ImportResult.NO_PARENT; -import static org.aion.p2p.P2pConstant.COEFFICIENT_NORMAL_PEERS; -import static org.aion.p2p.P2pConstant.LARGE_REQUEST_SIZE; -import static org.aion.p2p.P2pConstant.MAX_NORMAL_PEERS; import static org.aion.zero.impl.blockchain.BlockchainTestUtils.generateAccounts; import static org.aion.zero.impl.blockchain.BlockchainTestUtils.generateNewBlock; import static org.aion.zero.impl.blockchain.BlockchainTestUtils.generateNextBlock; import static org.aion.zero.impl.blockchain.BlockchainTestUtils.generateRandomChain; -import static org.aion.zero.impl.sync.PeerState.Mode.BACKWARD; -import static org.aion.zero.impl.sync.PeerState.Mode.FORWARD; -import static org.aion.zero.impl.sync.PeerState.Mode.LIGHTNING; -import static org.aion.zero.impl.sync.PeerState.Mode.NORMAL; -import static org.aion.zero.impl.sync.PeerState.Mode.THUNDER; -import static org.aion.zero.impl.sync.TaskImportBlocks.attemptLightningJump; +import static org.aion.zero.impl.core.ImportResult.IMPORTED_NOT_BEST; import static org.aion.zero.impl.sync.TaskImportBlocks.filterBatch; -import static org.aion.zero.impl.sync.TaskImportBlocks.forwardModeUpdate; import static org.aion.zero.impl.sync.TaskImportBlocks.isAlreadyStored; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.stream.Collectors; import junitparams.JUnitParamsRunner; -import junitparams.Parameters; import org.aion.crypto.ECKey; import org.aion.db.impl.DBVendor; import org.aion.db.impl.DatabaseFactory; @@ -46,9 +23,7 @@ import org.aion.zero.impl.db.RepositoryConfig; import org.aion.mcf.config.PruneConfig; import org.aion.util.types.ByteArrayWrapper; -import org.aion.zero.impl.blockchain.AionBlockchainImpl; import org.aion.zero.impl.blockchain.StandaloneBlockchain; -import org.aion.zero.impl.sync.PeerState.Mode; import org.aion.zero.impl.vm.AvmTestConfig; import org.junit.After; import org.junit.Before; @@ -72,103 +47,6 @@ public void tearDown() { AvmTestConfig.clearConfigurations(); } - /** @return parameters for {@link #testCountStates(long, long, Mode, Collection)} */ - @SuppressWarnings("unused") - private Object parametersForTestCountStates() { - List parameters = new ArrayList<>(); - - PeerState state; - List set1 = new ArrayList<>(); - for (Mode mode : Mode.values()) { - state = new PeerState(mode, 10L); - state.setLastBestBlock(100L); - set1.add(state); - } - - List set2 = new ArrayList<>(set1); - for (Mode mode : Mode.values()) { - state = new PeerState(mode, 10L); - state.setLastBestBlock(200L); - set2.add(state); - } - - for (Mode mode : Mode.values()) { - parameters.add(new Object[] {0L, -1L, mode, Collections.emptySet()}); - parameters.add(new Object[] {1L, 50L, mode, set1}); - parameters.add(new Object[] {0L, 100L, mode, set1}); - parameters.add(new Object[] {2L, 99L, mode, set2}); - parameters.add(new Object[] {1L, 100L, mode, set2}); - parameters.add(new Object[] {1L, 199L, mode, set2}); - parameters.add(new Object[] {0L, 200L, mode, set2}); - List set3 = - new ArrayList<>(set2) - .stream().filter(s -> s.getMode() != mode).collect(Collectors.toList()); - parameters.add(new Object[] {0L, -1L, mode, set3}); - } - - return parameters.toArray(); - } - - @Test - @Parameters(method = "parametersForTestCountStates") - public void testCountStates(long expected, long best, Mode mode, Collection set) { - long actual = TaskImportBlocks.countStates(best, mode, set); - assertThat(actual).isEqualTo(expected); - } - - /** @return parameters for {@link #testSelectBase(long, long, SortedSet, SortedSet)} */ - @SuppressWarnings("unused") - private Object parametersForTestSelectBase() { - List parameters = new ArrayList<>(); - - SortedSet emptySet = new TreeSet<>(); - parameters.add(new Object[] {100L, 100L, new TreeSet(), new TreeSet()}); - parameters.add(new Object[] {200L, 100L, new TreeSet(), new TreeSet()}); - - SortedSet set1 = new TreeSet<>(); - set1.add(200L); - parameters.add(new Object[] {200L, 100L, set1, new TreeSet()}); - - SortedSet set2 = new TreeSet<>(); - set2.add(10L); - set2.add(50L); - set2.add(100L); - set2.add(200L); - set2.add(300L); - SortedSet expectedSet = new TreeSet<>(); - expectedSet.add(300L); - parameters.add(new Object[] {200L, 100L, set2, expectedSet}); - - SortedSet set3 = new TreeSet<>(); - set3.addAll(set2); - parameters.add(new Object[] {300L, 300L, set3, new TreeSet()}); - - set3 = new TreeSet<>(); - set3.addAll(set2); - parameters.add(new Object[] {400L, 300L, set3, new TreeSet()}); - - SortedSet set4 = new TreeSet<>(); - set3.addAll(set2); - parameters.add(new Object[] {310L, 310L, set4, new TreeSet()}); - - set4 = new TreeSet<>(); - set3.addAll(set2); - parameters.add(new Object[] {400L, 350L, set4, new TreeSet()}); - - return parameters.toArray(); - } - - @Test - @Parameters(method = "parametersForTestSelectBase") - public void testSelectBase( - long expected, long best, SortedSet set, SortedSet expectedSet) { - AionBlockchainImpl chain = mock(AionBlockchainImpl.class); - when(chain.nextBase(best, 0L)).thenReturn(expected); - - assertThat(TaskImportBlocks.selectBase(best, 0L, set, chain)).isEqualTo(expected); - assertThat(set).isEqualTo(expectedSet); - } - @Test public void testIsAlreadyStored() { StandaloneBlockchain.Bundle bundle = @@ -314,344 +192,4 @@ public Properties getDatabaseConfig(String db_name) { // will filter out the prune restricted blocks assertThat(filterBatch(allBlocks, chain, new HashMap<>())).isEqualTo(unrestrictedBlocks); } - - @Test - public void testAttemptLightningJump_wLightningState_wJump() { - long returnedBase = 60L; - long knownStatus = returnedBase + LARGE_REQUEST_SIZE + 1; - - AionBlockchainImpl chain = mock(AionBlockchainImpl.class); - when(chain.nextBase(-1L, knownStatus)).thenReturn(returnedBase); - - PeerState input = new PeerState(LIGHTNING, -1L); - input.setLastBestBlock(knownStatus); - - // with new jump base - PeerState expected = new PeerState(input); - expected.setBase(returnedBase); - - SortedSet baseSet = new TreeSet<>(); - assertThat(attemptLightningJump(-1L, input, null, baseSet, chain)).isEqualTo(expected); - assertThat(baseSet.size()).isEqualTo(0); - } - - @Test - public void testAttemptLightningJump_wLightningState_wRampDown_andBaseRecycling() { - long returnedBase = 60L; - long knownStatus = returnedBase + LARGE_REQUEST_SIZE; - - AionBlockchainImpl chain = mock(AionBlockchainImpl.class); - when(chain.nextBase(-1L, knownStatus)).thenReturn(returnedBase); - - PeerState input = new PeerState(LIGHTNING, -1L); - input.setLastBestBlock(knownStatus); - - // with new jump base - PeerState expected = new PeerState(input); - expected.setMode(THUNDER); - - // expecting no change - SortedSet baseSet = new TreeSet<>(); - assertThat(attemptLightningJump(-1L, input, null, baseSet, chain)).isEqualTo(expected); - assertThat(baseSet.size()).isEqualTo(1); - assertThat(baseSet.contains(returnedBase)).isTrue(); - } - - @Test - public void testAttemptLightningJump_wLightningState_wRampDown() { - long returnedBase = -1L; - long knownStatus = returnedBase + LARGE_REQUEST_SIZE; - - AionBlockchainImpl chain = mock(AionBlockchainImpl.class); - when(chain.nextBase(-1L, knownStatus)).thenReturn(returnedBase); - - PeerState input = new PeerState(LIGHTNING, -1L); - input.setLastBestBlock(knownStatus); - - // with new jump base - PeerState expected = new PeerState(input); - expected.setMode(THUNDER); - - // expecting no change - SortedSet baseSet = new TreeSet<>(); - assertThat(attemptLightningJump(-1L, input, null, baseSet, chain)).isEqualTo(expected); - assertThat(baseSet.size()).isEqualTo(0); - } - - /** - * Used as parameters for: - * - *
    - *
  • {@link #testAttemptLightningJump_wMinNormalPeers(Mode)} - *
  • {@link #testAttemptLightningJump_wManyFastStates_wMaxNormalStates(Mode)} - *
  • {@link #testAttemptLightningJump_wFewFastStates_wJump(Mode)} - *
  • {@link #testAttemptLightningJump_wFewFastStates_wRampDown_andBaseRecycling(Mode)} - *
  • {@link #testAttemptLightningJump_wFewFastStates_wRampDown(Mode)} - *
  • {@link #testAttemptLightningJump_wManyNormalStates_wJump(Mode)} - *
  • {@link #testAttemptLightningJump_wManyNormalStates_wRampDown_andBaseRecycling(Mode)} - *
  • {@link #testAttemptLightningJump_wManyNormalStates_wRampDown(Mode)} - *
- */ - @SuppressWarnings("unused") - private Object allModesExceptLightning() { - return new Object[] {NORMAL, THUNDER, BACKWARD, FORWARD}; - } - - @Test - @Parameters(method = "allModesExceptLightning") - public void testAttemptLightningJump_wManyFastStates_wMaxNormalStates(Mode mode) { - List states = new ArrayList<>(); - - // exactly max normal states - long normalStates = MAX_NORMAL_PEERS; - addStates(states, normalStates, NORMAL, 100L); - - // more than balanced fast states - long fastStates = COEFFICIENT_NORMAL_PEERS * normalStates; - addStates(states, fastStates, LIGHTNING, 100L); - - PeerState input = new PeerState(mode, 100L); - PeerState expected = new PeerState(input); - - // expecting no change in the input value - SortedSet baseSet = new TreeSet<>(); - assertThat(attemptLightningJump(-1L, input, states, baseSet, null)).isEqualTo(expected); - assertThat(baseSet.size()).isEqualTo(0); - } - - @Test - @Parameters(method = "allModesExceptLightning") - public void testAttemptLightningJump_wFewFastStates_wJump(Mode mode) { - long returnedBase = 60L; - long knownStatus = returnedBase + LARGE_REQUEST_SIZE + 1; - - AionBlockchainImpl chain = mock(AionBlockchainImpl.class); - when(chain.nextBase(-1L, knownStatus)).thenReturn(returnedBase); - - List states = new ArrayList<>(); - - // exactly max normal states - long normalStates = MAX_NORMAL_PEERS; - addStates(states, normalStates, NORMAL, 100L); - - // less than balanced fast states - long fastStates = COEFFICIENT_NORMAL_PEERS * normalStates - 1; - addStates(states, fastStates, LIGHTNING, 100L); - - PeerState input = new PeerState(mode, 100L); - input.setLastBestBlock(knownStatus); - - PeerState expected = new PeerState(input); - expected.setMode(LIGHTNING); - expected.setBase(returnedBase); - - // expecting correct jump state - SortedSet baseSet = new TreeSet<>(); - assertThat(attemptLightningJump(-1L, input, states, baseSet, chain)).isEqualTo(expected); - assertThat(baseSet.size()).isEqualTo(0); - } - - @Test - @Parameters(method = "allModesExceptLightning") - public void testAttemptLightningJump_wFewFastStates_wRampDown_andBaseRecycling(Mode mode) { - long returnedBase = 60L; - long knownStatus = returnedBase + LARGE_REQUEST_SIZE; - - AionBlockchainImpl chain = mock(AionBlockchainImpl.class); - when(chain.nextBase(-1L, knownStatus)).thenReturn(returnedBase); - - List states = new ArrayList<>(); - - // exactly max normal states - long normalStates = MAX_NORMAL_PEERS; - addStates(states, normalStates, NORMAL, 100L); - - // less than balanced fast states - long fastStates = COEFFICIENT_NORMAL_PEERS * normalStates - 1; - addStates(states, fastStates, LIGHTNING, 100L); - - PeerState input = new PeerState(mode, 100L); - input.setLastBestBlock(knownStatus); - - PeerState expected = new PeerState(input); - - // expecting no change - SortedSet baseSet = new TreeSet<>(); - assertThat(attemptLightningJump(-1L, input, states, baseSet, chain)).isEqualTo(expected); - assertThat(baseSet.size()).isEqualTo(1); - assertThat(baseSet.contains(returnedBase)).isTrue(); - } - - @Test - @Parameters(method = "allModesExceptLightning") - public void testAttemptLightningJump_wFewFastStates_wRampDown(Mode mode) { - long returnedBase = -1L; - long knownStatus = returnedBase + LARGE_REQUEST_SIZE; - - AionBlockchainImpl chain = mock(AionBlockchainImpl.class); - when(chain.nextBase(-1L, knownStatus)).thenReturn(returnedBase); - - List states = new ArrayList<>(); - - // exactly max normal states - long normalStates = MAX_NORMAL_PEERS; - addStates(states, normalStates, NORMAL, 100L); - - // less than balanced fast states - long fastStates = COEFFICIENT_NORMAL_PEERS * normalStates - 1; - addStates(states, fastStates, LIGHTNING, 100L); - - PeerState input = new PeerState(mode, 100L); - input.setLastBestBlock(knownStatus); - - PeerState expected = new PeerState(input); - - // expecting no change - SortedSet baseSet = new TreeSet<>(); - assertThat(attemptLightningJump(-1L, input, states, baseSet, chain)).isEqualTo(expected); - assertThat(baseSet.size()).isEqualTo(0); - } - - @Test - @Parameters(method = "allModesExceptLightning") - public void testAttemptLightningJump_wManyNormalStates_wJump(Mode mode) { - long returnedBase = 60L; - long knownStatus = returnedBase + LARGE_REQUEST_SIZE + 1; - - AionBlockchainImpl chain = mock(AionBlockchainImpl.class); - when(chain.nextBase(-1L, knownStatus)).thenReturn(returnedBase); - - List states = new ArrayList<>(); - - // more than max normal states - long normalStates = MAX_NORMAL_PEERS + 1; - addStates(states, normalStates, NORMAL, 100L); - - // more than balanced fast states - long fastStates = COEFFICIENT_NORMAL_PEERS * normalStates; - addStates(states, fastStates, LIGHTNING, 100L); - - PeerState input = new PeerState(mode, 100L); - input.setLastBestBlock(knownStatus); - - PeerState expected = new PeerState(input); - expected.setMode(LIGHTNING); - expected.setBase(returnedBase); - - // expecting correct jump state - SortedSet baseSet = new TreeSet<>(); - assertThat(attemptLightningJump(-1L, input, states, baseSet, chain)).isEqualTo(expected); - assertThat(baseSet.size()).isEqualTo(0); - } - - @Test - @Parameters(method = "allModesExceptLightning") - public void testAttemptLightningJump_wManyNormalStates_wRampDown_andBaseRecycling(Mode mode) { - long returnedBase = 60L; - long knownStatus = returnedBase + LARGE_REQUEST_SIZE; - - AionBlockchainImpl chain = mock(AionBlockchainImpl.class); - when(chain.nextBase(-1L, knownStatus)).thenReturn(returnedBase); - - List states = new ArrayList<>(); - - // more than max normal states - long normalStates = MAX_NORMAL_PEERS + 1; - addStates(states, normalStates, NORMAL, 100L); - - // more than balanced fast states - long fastStates = COEFFICIENT_NORMAL_PEERS * normalStates; - addStates(states, fastStates, LIGHTNING, 100L); - - PeerState input = new PeerState(mode, 100L); - input.setLastBestBlock(knownStatus); - - PeerState expected = new PeerState(input); - - // expecting no change - SortedSet baseSet = new TreeSet<>(); - assertThat(attemptLightningJump(-1L, input, states, baseSet, chain)).isEqualTo(expected); - assertThat(baseSet.size()).isEqualTo(1); - assertThat(baseSet.contains(returnedBase)).isTrue(); - } - - @Test - @Parameters(method = "allModesExceptLightning") - public void testAttemptLightningJump_wManyNormalStates_wRampDown(Mode mode) { - long returnedBase = -1L; - long knownStatus = returnedBase + LARGE_REQUEST_SIZE; - - AionBlockchainImpl chain = mock(AionBlockchainImpl.class); - when(chain.nextBase(-1L, knownStatus)).thenReturn(returnedBase); - - List states = new ArrayList<>(); - - // more than max normal states - long normalStates = MAX_NORMAL_PEERS + 1; - addStates(states, normalStates, NORMAL, 100L); - - // more than balanced fast states - long fastStates = COEFFICIENT_NORMAL_PEERS * normalStates; - addStates(states, fastStates, LIGHTNING, 100L); - - PeerState input = new PeerState(mode, 100L); - input.setLastBestBlock(knownStatus); - - PeerState expected = new PeerState(input); - - // expecting no change - SortedSet baseSet = new TreeSet<>(); - assertThat(attemptLightningJump(-1L, input, states, baseSet, chain)).isEqualTo(expected); - assertThat(baseSet.size()).isEqualTo(0); - } - - /** Utility method that generates states and adds them to the given list. */ - private static void addStates(List states, long count, Mode mode, long base) { - for (long i = 0; i < count; i++) { - states.add(new PeerState(mode, base)); - } - } - - /** - * Used as parameters for: - * - *
    - *
  • {@link #testForwardModeUpdate(ImportResult)} - *
- */ - @SuppressWarnings("unused") - private Object allImportResultsExceptImportedBest() { - return new Object[] {IMPORTED_NOT_BEST, EXIST, NO_PARENT, INVALID_BLOCK, CONSENSUS_BREAK}; - } - - @Test - @Parameters(method = "allImportResultsExceptImportedBest") - public void testForwardModeUpdate(ImportResult result) { - long initialBase = 100L, newBase = 200L; - PeerState input = new PeerState(FORWARD, initialBase); - PeerState expected = new PeerState(FORWARD, newBase); - - // check when base gets updated - assertThat(forwardModeUpdate(input, newBase, result)).isEqualTo(expected); - - input = new PeerState(FORWARD, initialBase); - while (input.isUnderRepeatThreshold()) { - input.incRepeated(); - } - - expected = new PeerState(NORMAL, initialBase); - - // check with switch to NORMAL mode - assertThat(forwardModeUpdate(input, newBase, result)).isEqualTo(expected); - } - - @Test - public void testForwardModeUpdate() { - long initialBase = 100L, newBase = 200L; - PeerState input = new PeerState(FORWARD, initialBase); - PeerState expected = new PeerState(NORMAL, initialBase); - - // check with switch to NORMAL mode due to IMPORTED_BEST result - assertThat(forwardModeUpdate(input, newBase, IMPORTED_BEST)).isEqualTo(expected); - } } diff --git a/modP2p/src/org/aion/p2p/P2pConstant.java b/modP2p/src/org/aion/p2p/P2pConstant.java index f151a6a298..9316bbc652 100644 --- a/modP2p/src/org/aion/p2p/P2pConstant.java +++ b/modP2p/src/org/aion/p2p/P2pConstant.java @@ -17,26 +17,5 @@ public class P2pConstant { READ_MAX_RATE_TXBC = 20, // write queue timeout - WRITE_MSG_TIMEOUT = 5000, - REQUEST_SIZE = 24, - LARGE_REQUEST_SIZE = 40, - - /** - * The number of blocks overlapping with the current chain requested at import when the - * local best block is far from the top block in the peer's chain. - */ - FAR_OVERLAPPING_BLOCKS = 3, - - /** - * The number of blocks overlapping with the current chain requested at import when the - * local best block is close to the top block in the peer's chain. - */ - CLOSE_OVERLAPPING_BLOCKS = 15, - STEP_COUNT = 6, - MAX_NORMAL_PEERS = 16, - COEFFICIENT_NORMAL_PEERS = 3, - - // NOTE: the 3 values below are interdependent - // do not change one without considering the impact to the others - BACKWARD_SYNC_STEP = REQUEST_SIZE * STEP_COUNT - 1; + WRITE_MSG_TIMEOUT = 5000; }