diff --git a/common/src/main/java/bisq/common/storage/FileManager.java b/common/src/main/java/bisq/common/storage/FileManager.java index c6c0e1537e7..c66d08803d7 100644 --- a/common/src/main/java/bisq/common/storage/FileManager.java +++ b/common/src/main/java/bisq/common/storage/FileManager.java @@ -190,7 +190,7 @@ public synchronized void backupFile(String fileName, int numMaxBackupFiles) { private void saveNowInternal(T persistable) { long now = System.currentTimeMillis(); saveToFile(persistable, dir, storageFile); - log.trace("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now); + log.debug("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now); } private synchronized void saveToFile(T persistable, File dir, File storageFile) { diff --git a/core/src/main/java/bisq/core/app/BisqExecutable.java b/core/src/main/java/bisq/core/app/BisqExecutable.java index b88423c7bfb..7f514326dda 100644 --- a/core/src/main/java/bisq/core/app/BisqExecutable.java +++ b/core/src/main/java/bisq/core/app/BisqExecutable.java @@ -281,12 +281,14 @@ public void gracefulShutDown(ResultHandler resultHandler) { injector.getInstance(TradeManager.class).shutDown(); injector.getInstance(DaoSetup.class).shutDown(); injector.getInstance(OpenOfferManager.class).shutDown(() -> { + log.info("OpenOfferManager shutdown completed"); injector.getInstance(P2PService.class).shutDown(() -> { + log.info("P2PService shutdown completed"); injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> { + log.info("WalletsSetup shutdown completed"); module.close(injector); - log.debug("Graceful shutdown completed"); resultHandler.handleResult(); - + log.info("Graceful shutdown completed. Exiting now."); System.exit(0); }); injector.getInstance(WalletsSetup.class).shutDown(); diff --git a/core/src/main/java/bisq/core/app/BisqSetup.java b/core/src/main/java/bisq/core/app/BisqSetup.java index f1d9d9193fc..44e0d15245f 100644 --- a/core/src/main/java/bisq/core/app/BisqSetup.java +++ b/core/src/main/java/bisq/core/app/BisqSetup.java @@ -289,6 +289,11 @@ public void addBisqSetupCompleteListener(BisqSetupCompleteListener listener) { } public void start() { + if (log.isDebugEnabled()) { + UserThread.runPeriodically(() -> { + log.debug("1 second heartbeat"); + }, 1); + } maybeReSyncSPVChain(); maybeShowTac(); } diff --git a/core/src/main/java/bisq/core/arbitration/DisputeList.java b/core/src/main/java/bisq/core/arbitration/DisputeList.java index 411c053bc82..d1060bd019a 100644 --- a/core/src/main/java/bisq/core/arbitration/DisputeList.java +++ b/core/src/main/java/bisq/core/arbitration/DisputeList.java @@ -79,8 +79,6 @@ public Message toProtoMessage() { public static DisputeList fromProto(protobuf.DisputeList proto, CoreProtoResolver coreProtoResolver, Storage storage) { - log.debug("DisputeList fromProto of {} ", proto); - List list = proto.getDisputeList().stream() .map(disputeProto -> Dispute.fromProto(disputeProto, coreProtoResolver)) .collect(Collectors.toList()); diff --git a/core/src/main/java/bisq/core/dao/governance/proposal/ProposalValidator.java b/core/src/main/java/bisq/core/dao/governance/proposal/ProposalValidator.java index e0f0118b1cc..2f7d97a7f5a 100644 --- a/core/src/main/java/bisq/core/dao/governance/proposal/ProposalValidator.java +++ b/core/src/main/java/bisq/core/dao/governance/proposal/ProposalValidator.java @@ -114,7 +114,7 @@ private boolean isValid(Proposal proposal, boolean allowUnconfirmed) { if (isTxConfirmed) { int txHeight = optionalTx.get().getBlockHeight(); if (!periodService.isTxInCorrectCycle(txHeight, chainHeight)) { - log.debug("Tx is not in current cycle. proposal.getTxId()={}", proposal.getTxId()); + log.trace("Tx is not in current cycle. proposal.getTxId()={}", proposal.getTxId()); return false; } if (!periodService.isInPhase(txHeight, DaoPhase.Phase.PROPOSAL)) { diff --git a/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java index 797a6ebf833..01ae89c8521 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java @@ -157,7 +157,14 @@ public void onDaoStateChanged(Block block) { System.currentTimeMillis() - ts); } } - maybeUpdateHashChain(blockHeight); + + long ts = System.currentTimeMillis(); + boolean updated = maybeUpdateHashChain(blockHeight); + if (updated) { + log.info("updateHashChain for block {} took {} ms", + blockHeight, + System.currentTimeMillis() - ts); + } } @SuppressWarnings("Duplicates") @@ -241,11 +248,11 @@ public void removeListener(Listener listener) { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void maybeUpdateHashChain(int blockHeight) { + private boolean maybeUpdateHashChain(int blockHeight) { // We use first block in blind vote phase to create the hash of our blindVotes. We prefer to wait as long as // possible to increase the chance that we have received all blindVotes. if (!isFirstBlockOfBlindVotePhase(blockHeight)) { - return; + return false; } periodService.getCycle(blockHeight).ifPresent(cycle -> { @@ -281,9 +288,12 @@ private void maybeUpdateHashChain(int blockHeight) { UserThread.runAfter(() -> blindVoteStateNetworkService.broadcastMyStateHash(myBlindVoteStateHash), delayInSec); } }); + return true; } - private boolean processPeersBlindVoteStateHash(BlindVoteStateHash blindVoteStateHash, Optional peersNodeAddress, boolean notifyListeners) { + private boolean processPeersBlindVoteStateHash(BlindVoteStateHash blindVoteStateHash, + Optional peersNodeAddress, + boolean notifyListeners) { AtomicBoolean changed = new AtomicBoolean(false); AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode); AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode); diff --git a/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java index d0846e11b54..843b2f570d0 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java @@ -117,6 +117,8 @@ public interface Listener { ); private boolean checkpointFailed; private boolean ignoreDevMsg; + private int numCalls; + private long accumulatedDuration; private final File storageDir; @@ -176,6 +178,12 @@ public void onParseBlockChainComplete() { if (!ignoreDevMsg) { verifyCheckpoints(); } + + log.info("ParseBlockChainComplete: Accumulated updateHashChain() calls for {} block took {} ms " + + "({} ms in average / block)", + numCalls, + accumulatedDuration, + (int) ((double) accumulatedDuration / (double) numCalls)); } @Override @@ -277,6 +285,7 @@ public void removeListener(Listener listener) { /////////////////////////////////////////////////////////////////////////////////////////// private void updateHashChain(Block block) { + long ts = System.currentTimeMillis(); byte[] prevHash; int height = block.getHeight(); if (daoStateBlockChain.isEmpty()) { @@ -316,6 +325,13 @@ private void updateHashChain(Block block) { int delayInSec = 5 + new Random().nextInt(10); UserThread.runAfter(() -> daoStateNetworkService.broadcastMyStateHash(myDaoStateHash), delayInSec); } + long duration = System.currentTimeMillis() - ts; + // We don't want to spam the output. We log accumulated time after parsing is completed. + log.trace("updateHashChain for block {} took {} ms", + block.getHeight(), + duration); + accumulatedDuration += duration; + numCalls++; } private boolean processPeersDaoStateHash(DaoStateHash daoStateHash, Optional peersNodeAddress, diff --git a/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java index 1e610d5a21f..9b7863cf734 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java @@ -160,7 +160,13 @@ public void onDaoStateChanged(Block block) { System.currentTimeMillis() - ts); } } - maybeUpdateHashChain(blockHeight); + long ts = System.currentTimeMillis(); + boolean updated = maybeUpdateHashChain(blockHeight); + if (updated) { + log.info("updateHashChain for block {} took {} ms", + blockHeight, + System.currentTimeMillis() - ts); + } } @SuppressWarnings("Duplicates") diff --git a/core/src/main/java/bisq/core/dao/node/BsqNodeProvider.java b/core/src/main/java/bisq/core/dao/node/BsqNodeProvider.java index c5c2302db4b..ad03966af3c 100644 --- a/core/src/main/java/bisq/core/dao/node/BsqNodeProvider.java +++ b/core/src/main/java/bisq/core/dao/node/BsqNodeProvider.java @@ -45,9 +45,10 @@ public BsqNodeProvider(LiteNode bsqLiteNode, !preferences.getRpcPw().isEmpty() && preferences.getBlockNotifyPort() > 0; boolean daoFullNode = preferences.isDaoFullNode(); - if (daoFullNode && !rpcDataSet) - log.warn("daoFullNode is set but RPC user and pw are missing"); - + if (daoFullNode && !rpcDataSet) { + log.warn("daoFullNode is set in preferences but RPC user and pw are missing. We reset daoFullNode in preferences to false."); + preferences.setDaoFullNode(false); + } bsqNode = daoFullNode && rpcDataSet ? bsqFullNode : bsqLiteNode; } } diff --git a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java index df6e4e1b604..a51c5a11235 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java +++ b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java @@ -88,9 +88,6 @@ public void start() { liteNodeNetworkService.start(); bsqWalletService.addNewBestBlockListener(block -> { - int height = block.getHeight(); - log.info("New block at height {} from bsqWalletService", height); - // Check if we are done with parsing if (!daoStateService.isParseBlockChainComplete()) return; @@ -100,6 +97,9 @@ public void start() { checkForBlockReceivedTimer.stop(); } + int height = block.getHeight(); + log.info("New block at height {} from bsqWalletService", height); + // We expect to receive the new BSQ block from the network shortly after BitcoinJ has been aware of it. // If we don't receive it we request it manually from seed nodes checkForBlockReceivedTimer = UserThread.runAfter(() -> { diff --git a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java index c6720a79e2d..2cb081834f6 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java @@ -143,6 +143,10 @@ public void addListener(Listener listener) { listeners.add(listener); } + /** + * + * @param startBlockHeight Block height from where we expect new blocks (current block height in bsqState + 1) + */ public void requestBlocks(int startBlockHeight) { lastRequestedBlockHeight = startBlockHeight; Optional connectionToSeedNodeOptional = networkNode.getConfirmedConnections().stream() diff --git a/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java b/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java index 5cb3d8e17fb..a1b7409bf2b 100644 --- a/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java +++ b/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java @@ -80,7 +80,7 @@ public BlockParser(TxParser txParser, */ public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingException, BlockHeightNotConnectingException { int blockHeight = rawBlock.getHeight(); - log.debug("Parse block at height={} ", blockHeight); + log.trace("Parse block at height={} ", blockHeight); validateIfBlockIsConnecting(rawBlock); diff --git a/core/src/main/java/bisq/core/dao/node/parser/TxParser.java b/core/src/main/java/bisq/core/dao/node/parser/TxParser.java index 57c0cf11816..42be945a632 100644 --- a/core/src/main/java/bisq/core/dao/node/parser/TxParser.java +++ b/core/src/main/java/bisq/core/dao/node/parser/TxParser.java @@ -385,7 +385,7 @@ static TxType evaluateTxType(TempTx tempTx, Optional optionalOpRet } // TRANSFER_BSQ has no fee, no opReturn and no UNLOCK_OUTPUT at first output - log.debug("No burned fee and no OP_RETURN, so this is a TRANSFER_BSQ tx."); + log.trace("No burned fee and no OP_RETURN, so this is a TRANSFER_BSQ tx."); return TxType.TRANSFER_BSQ; } diff --git a/core/src/main/java/bisq/core/offer/Offer.java b/core/src/main/java/bisq/core/offer/Offer.java index b6672f8dc8b..d0c505edf8c 100644 --- a/core/src/main/java/bisq/core/offer/Offer.java +++ b/core/src/main/java/bisq/core/offer/Offer.java @@ -187,7 +187,7 @@ public Price getPrice() { return null; } } else { - log.debug("We don't have a market price.\n" + + log.trace("We don't have a market price. " + "That case could only happen if you don't have a price feed."); return null; } diff --git a/core/src/main/java/bisq/core/offer/OpenOfferManager.java b/core/src/main/java/bisq/core/offer/OpenOfferManager.java index 89169919398..02361d49aeb 100644 --- a/core/src/main/java/bisq/core/offer/OpenOfferManager.java +++ b/core/src/main/java/bisq/core/offer/OpenOfferManager.java @@ -196,15 +196,14 @@ public void shutDown(@Nullable Runnable completeHandler) { stopPeriodicRepublishOffersTimer(); stopRetryRepublishOffersTimer(); - log.debug("remove all open offers at shutDown"); + log.info("Remove open offers at shutDown. Number of open offers: {}", openOffers.size()); // we remove own offers from offerbook when we go offline // Normally we use a delay for broadcasting to the peers, but at shut down we want to get it fast out - - final int size = openOffers != null ? openOffers.size() : 0; + int size = openOffers != null ? openOffers.size() : 0; if (offerBookService.isBootstrapped() && size > 0) { openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload())); if (completeHandler != null) - UserThread.runAfter(completeHandler::run, size * 200 + 500, TimeUnit.MILLISECONDS); + UserThread.runAfter(completeHandler, size * 200 + 500, TimeUnit.MILLISECONDS); } else { if (completeHandler != null) completeHandler.run(); @@ -223,7 +222,7 @@ public void removeOpenOffers(List openOffers, @Nullable Runnable comp }, errorMessage -> { })); if (completeHandler != null) - UserThread.runAfter(completeHandler::run, size * 200 + 500, TimeUnit.MILLISECONDS); + UserThread.runAfter(completeHandler, size * 200 + 500, TimeUnit.MILLISECONDS); } diff --git a/core/src/main/java/bisq/core/provider/price/PriceRequest.java b/core/src/main/java/bisq/core/provider/price/PriceRequest.java index 04718345d9f..e1d3cc91f7d 100644 --- a/core/src/main/java/bisq/core/provider/price/PriceRequest.java +++ b/core/src/main/java/bisq/core/provider/price/PriceRequest.java @@ -49,7 +49,7 @@ public SettableFuture, Map>> reque Futures.addCallback(future, new FutureCallback, Map>>() { public void onSuccess(Tuple2, Map> marketPriceTuple) { - log.debug("Received marketPriceTuple of {}\nfrom provider {}", marketPriceTuple, provider); + log.trace("Received marketPriceTuple of {}\nfrom provider {}", marketPriceTuple, provider); resultFuture.set(marketPriceTuple); } diff --git a/core/src/main/java/bisq/core/trade/TradableList.java b/core/src/main/java/bisq/core/trade/TradableList.java index bc4a6890950..0c823de42f6 100644 --- a/core/src/main/java/bisq/core/trade/TradableList.java +++ b/core/src/main/java/bisq/core/trade/TradableList.java @@ -40,8 +40,6 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import javax.annotation.Nullable; - @Slf4j public final class TradableList implements PersistableEnvelope { transient final private Storage> storage; @@ -80,13 +78,10 @@ public Message toProtoMessage() { .build(); } - @Nullable public static TradableList fromProto(protobuf.TradableList proto, CoreProtoResolver coreProtoResolver, Storage> storage, BtcWalletService btcWalletService) { - log.debug("TradableList fromProto of {} ", proto); - List list = proto.getTradableList().stream() .map(tradable -> { switch (tradable.getMessageCase()) { diff --git a/core/src/main/java/bisq/core/trade/statistics/AssetTradeActivityCheck.java b/core/src/main/java/bisq/core/trade/statistics/AssetTradeActivityCheck.java index a53fa51ea70..25caffeccb7 100644 --- a/core/src/main/java/bisq/core/trade/statistics/AssetTradeActivityCheck.java +++ b/core/src/main/java/bisq/core/trade/statistics/AssetTradeActivityCheck.java @@ -141,7 +141,7 @@ public void onAllServicesInitialized() { "\n\n" + newAssets.toString() + "\n\n" + sufficientlyTraded.toString(); // Utilities.copyToClipboard(result); - log.debug(result); + log.trace(result); } private boolean isWarmingUp(String code) { diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java index 6410032375d..3f40cd660bd 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java @@ -170,7 +170,7 @@ private void addToMap(TradeStatistics2 tradeStatistics, boolean storeLocally) { private void addToMap(TradeStatistics2 tradeStatistics, Map map) { TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics); if (prevValue != null) - log.debug("We have already an item with the same offer ID. That might happen if both the maker and the taker published the tradeStatistics"); + log.trace("We have already an item with the same offer ID. That might happen if both the maker and the taker published the tradeStatistics"); } private void dump() { diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index 5ae9033cf2f..0e7a566f295 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -537,7 +537,7 @@ private void processMailboxEntry(ProtectedMailboxStorageEntry protectedMailboxSt log.error("Protobuffer data could not be processed: {}", e.toString()); } } else { - log.debug("Wrong blurredAddressHash. The message is not intended for us."); + log.trace("Wrong blurredAddressHash. The message is not intended for us."); } } } diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index 09d676ba3c0..d3db7d72657 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -247,7 +247,7 @@ public void sendMessage(NetworkEnvelope networkEnvelope) { String peersNodeAddress = peersNodeAddressOptional.map(NodeAddress::toString).orElse("null"); protobuf.NetworkEnvelope proto = networkEnvelope.toProtoNetworkEnvelope(); - log.debug("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000)); + log.trace("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000)); if (networkEnvelope instanceof Ping | networkEnvelope instanceof RefreshOfferMessage) { // pings and offer refresh msg we don't want to log in production diff --git a/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java b/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java index dbea7462c99..6e5f4d98b61 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java @@ -483,7 +483,7 @@ private void printReportedPeers() { List reportedPeersClone = new ArrayList<>(reportedPeers); reportedPeersClone.stream().forEach(e -> result.append("\n").append(e)); result.append("\n------------------------------------------------------------\n"); - log.debug(result.toString()); + log.trace(result.toString()); } log.debug("Number of reported peers: {}", reportedPeers.size()); } @@ -495,7 +495,7 @@ private void printNewReportedPeers(Set reportedPeers) { StringBuilder result = new StringBuilder("We received new reportedPeers:"); List reportedPeersClone = new ArrayList<>(reportedPeers); reportedPeersClone.stream().forEach(e -> result.append("\n\t").append(e)); - log.debug(result.toString()); + log.trace(result.toString()); } log.debug("Number of new arrived reported peers: {}", reportedPeers.size()); } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java index e524076d3d6..f3304f4fa83 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java @@ -42,13 +42,12 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -56,8 +55,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static com.google.common.base.Preconditions.checkArgument; - @Slf4j class RequestDataHandler implements MessageListener { private static final long TIMEOUT = 90; @@ -93,10 +90,10 @@ public interface Listener { // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public RequestDataHandler(NetworkNode networkNode, - P2PDataStorage dataStorage, - PeerManager peerManager, - Listener listener) { + RequestDataHandler(NetworkNode networkNode, + P2PDataStorage dataStorage, + PeerManager peerManager, + Listener listener) { this.networkNode = networkNode; this.dataStorage = dataStorage; this.peerManager = peerManager; @@ -112,7 +109,7 @@ public void cancel() { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void requestData(NodeAddress nodeAddress, boolean isPreliminaryDataRequest) { + void requestData(NodeAddress nodeAddress, boolean isPreliminaryDataRequest) { peersNodeAddress = nodeAddress; if (!stopped) { GetDataRequest getDataRequest; @@ -155,6 +152,7 @@ public void requestData(NodeAddress nodeAddress, boolean isPreliminaryDataReques log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress); networkNode.addMessageListener(this); SettableFuture future = networkNode.sendMessage(nodeAddress, getDataRequest); + //noinspection UnstableApiUsage Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { @@ -198,7 +196,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { GetDataResponse getDataResponse = (GetDataResponse) networkEnvelope; Map> payloadByClassName = new HashMap<>(); final Set dataSet = getDataResponse.getDataSet(); - dataSet.stream().forEach(e -> { + dataSet.forEach(e -> { final ProtectedStoragePayload protectedStoragePayload = e.getProtectedStoragePayload(); if (protectedStoragePayload == null) { log.warn("StoragePayload was null: {}", networkEnvelope.toString()); @@ -216,7 +214,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { Set persistableNetworkPayloadSet = getDataResponse.getPersistableNetworkPayloadSet(); if (persistableNetworkPayloadSet != null) { - persistableNetworkPayloadSet.stream().forEach(persistableNetworkPayload -> { + persistableNetworkPayloadSet.forEach(persistableNetworkPayload -> { // For logging different data types String className = persistableNetworkPayload.getClass().getSimpleName(); if (!payloadByClassName.containsKey(className)) @@ -233,62 +231,57 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { final int items = dataSet.size() + (persistableNetworkPayloadSet != null ? persistableNetworkPayloadSet.size() : 0); sb.append("Received ").append(items).append(" instances\n"); - payloadByClassName.entrySet().stream().forEach(e -> sb.append(e.getKey()) + payloadByClassName.forEach((key, value) -> sb.append(key) .append(": ") - .append(e.getValue().size()) + .append(value.size()) .append("\n")); sb.append("#################################################################"); log.info(sb.toString()); if (getDataResponse.getRequestNonce() == nonce) { stopTimeoutTimer(); - checkArgument(connection.getPeersNodeAddressOptional().isPresent(), - "RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " + - "at that moment"); + if (!connection.getPeersNodeAddressOptional().isPresent()) { + log.error("RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " + + "at that moment"); + return; + } final NodeAddress sender = connection.getPeersNodeAddressOptional().get(); - List processDelayedItems = new ArrayList<>(); - dataSet.stream().forEach(e -> { - if (e.getProtectedStoragePayload() instanceof LazyProcessedPayload) { - processDelayedItems.add(e); - } else { - // We dont broadcast here (last param) as we are only connected to the seed node and would be pointless - dataStorage.addProtectedStorageEntry(e, sender, null, false, false); - } + long ts = System.currentTimeMillis(); + AtomicInteger counter = new AtomicInteger(); + dataSet.forEach(e -> { + // We don't broadcast here (last param) as we are only connected to the seed node and would be pointless + dataStorage.addProtectedStorageEntry(e, sender, null, false, false); + counter.getAndIncrement(); + }); + log.info("Processing {} protectedStorageEntries took {} ms.", counter.get(), System.currentTimeMillis() - ts); if (persistableNetworkPayloadSet != null) { - persistableNetworkPayloadSet.stream().forEach(e -> { + ts = System.currentTimeMillis(); + persistableNetworkPayloadSet.forEach(e -> { if (e instanceof LazyProcessedPayload) { - processDelayedItems.add(e); + // We use an optimized method as many checks are not required in that case to avoid + // performance issues. + // Processing 82645 items took now 61 ms compared to earlier version where it took ages (> 2min). + // Usually we only get about a few hundred or max. a few 1000 items. 82645 is all + // trade stats stats and all account age witness data. + dataStorage.addPersistableNetworkPayloadFromInitialRequest(e); } else { - // We dont broadcast here as we are only connected to the seed node and would be pointless - dataStorage.addPersistableNetworkPayload(e, sender, false, false, false, false); + // We don't broadcast here as we are only connected to the seed node and would be pointless + dataStorage.addPersistableNetworkPayload(e, sender, false, + false, false, false); } }); + log.info("Processing {} persistableNetworkPayloads took {} ms.", + persistableNetworkPayloadSet.size(), System.currentTimeMillis() - ts); } - long ts = System.currentTimeMillis(); - processDelayedItems.forEach(item -> { - if (item instanceof ProtectedStorageEntry) - dataStorage.addProtectedStorageEntry((ProtectedStorageEntry) item, sender, null, - false, false); - else if (item instanceof PersistableNetworkPayload) { - // We use an optimized method as many checks are not required in that case to avoid - // performance issues. - // Processing 82645 items took now 61 ms compared to earlier version where it took ages (> 2min). - // Usually we only get about a few hundred or max. a few 1000 items. 82645 is all - // trade stats stats and all account age witness data. - dataStorage.addPersistableNetworkPayloadFromInitialRequest((PersistableNetworkPayload) item); - } - }); - log.info("Processing {} items took {} ms.", processDelayedItems.size(), System.currentTimeMillis() - ts); - cleanup(); listener.onComplete(); } else { - log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled " + + log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " + "handshake (timeout causes connection close but peer might have sent a msg before " + "connection was closed).\n\t" + "We drop that message. nonce={} / requestNonce={}", @@ -313,7 +306,9 @@ public void stop() { @SuppressWarnings("UnusedParameters") - private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) { + private void handleFault(String errorMessage, + NodeAddress nodeAddress, + CloseConnectionReason closeConnectionReason) { cleanup(); log.info(errorMessage); //peerManager.shutDownConnection(nodeAddress, closeConnectionReason); diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index c2327be472a..6170d9efc11 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -317,7 +317,7 @@ public boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, boolean allowBroadcast, boolean reBroadcast, boolean checkDate) { - log.debug("addPersistableNetworkPayload payload={}", payload); + log.trace("addPersistableNetworkPayload payload={}", payload); byte[] hash = payload.getHash(); if (payload.verifyHashSize()) { ByteArray hashAsByteArray = new ByteArray(hash); @@ -683,7 +683,7 @@ private boolean hasSequenceNrIncreased(int newSequenceNumber, ByteArray hashOfDa msg = "Sequence number is equal to the stored one. sequenceNumber = " + newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber; } - log.debug(msg); + log.trace(msg); return false; } else { log.debug("Sequence number is invalid. sequenceNumber = " @@ -834,7 +834,7 @@ private void printData(String info) { .append(Utilities.toTruncatedString(protectedStoragePayload)); }); sb.append("\n------------------------------------------------------------\n"); - log.debug(sb.toString()); + log.trace(sb.toString()); //log.debug("Data set " + info + " operation: size=" + map.values().size()); } }