From 4d0c527f0025bd2ea6ec47b1bd724d19b9aadb1e Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Fri, 7 Aug 2020 23:54:57 -0500 Subject: [PATCH 1/2] Add pruning support for trade statistics We add an interface for prunable PersistableNetworkPayloads so we can handle it generically in the p2p module. We check at startup if our persisted data has old elements and remove those. Data we receive from the seed node at startup will get checked as well for old objects as well data we receive from the network. --- .../trade/statistics/TradeStatistics2.java | 13 ++++- .../TradeStatistics2StorageService.java | 33 +++++++++++-- .../statistics/TradeStatisticsManager.java | 48 ++++++++++--------- .../network/p2p/storage/P2PDataStorage.java | 30 +++++++++--- .../PrunablePersistableNetworkPayload.java | 27 +++++++++++ .../AppendOnlyDataStoreService.java | 7 +++ .../persistence/PrunableStoreService.java | 25 ++++++++++ 7 files changed, 148 insertions(+), 35 deletions(-) create mode 100644 p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java create mode 100644 p2p/src/main/java/bisq/network/p2p/storage/persistence/PrunableStoreService.java diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java index bb31911d2bd..49abada260c 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java @@ -27,6 +27,7 @@ import bisq.network.p2p.storage.payload.CapabilityRequiringPayload; import bisq.network.p2p.storage.payload.PersistableNetworkPayload; import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload; +import bisq.network.p2p.storage.payload.PrunablePersistableNetworkPayload; import bisq.common.app.Capabilities; import bisq.common.app.Capability; @@ -48,6 +49,7 @@ import java.util.Date; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import lombok.Value; import lombok.extern.slf4j.Slf4j; @@ -65,11 +67,15 @@ @Slf4j @Value public final class TradeStatistics2 implements ProcessOncePersistableNetworkPayload, PersistableNetworkPayload, - CapabilityRequiringPayload, Comparable { + CapabilityRequiringPayload, Comparable, PrunablePersistableNetworkPayload { public static final String MEDIATOR_ADDRESS = "medAddr"; public static final String REFUND_AGENT_ADDRESS = "refAddr"; + // We keep only recent trade statistics data (max. 60 days old). Older data gets removed and will be + // handled by the historical trade statistics data framework. + private static final long INCLUSION_PERIOD = TimeUnit.DAYS.toMillis(60); + private final OfferPayload.Direction direction; private final String baseCurrency; private final String counterCurrency; @@ -246,6 +252,11 @@ public Capabilities getRequiredCapabilities() { return new Capabilities(Capability.TRADE_STATISTICS_HASH_UPDATE); } + @Override + public boolean doExclude() { + return System.currentTimeMillis() - tradeDate > INCLUSION_PERIOD; + } + /////////////////////////////////////////////////////////////////////////////////////////// // Getters diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java index c732a077f2c..2ae9b6deb76 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java @@ -19,23 +19,27 @@ import bisq.network.p2p.storage.P2PDataStorage; import bisq.network.p2p.storage.payload.PersistableNetworkPayload; +import bisq.network.p2p.storage.payload.PrunablePersistableNetworkPayload; import bisq.network.p2p.storage.persistence.MapStoreService; +import bisq.network.p2p.storage.persistence.PrunableStoreService; import bisq.common.config.Config; import bisq.common.storage.Storage; -import javax.inject.Named; - import javax.inject.Inject; +import javax.inject.Named; import java.io.File; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @Slf4j -public class TradeStatistics2StorageService extends MapStoreService { +public class TradeStatistics2StorageService extends MapStoreService + implements PrunableStoreService { private static final String FILE_NAME = "TradeStatistics2Store"; @@ -79,8 +83,27 @@ protected TradeStatistics2Store createStore() { return new TradeStatistics2Store(); } + // At startup we check our persisted data if it contains too old entries and remove those. + // This method is called from a non user thread. @Override - protected void readStore() { - super.readStore(); + public synchronized void prune() { + AtomicBoolean hasExcludedElements = new AtomicBoolean(false); + Map map = getMap(); + Map newMap = map.entrySet().stream() + .filter(e -> { + if (((PrunablePersistableNetworkPayload) e.getValue()).doExclude()) { + hasExcludedElements.set(true); + return false; + } else { + return true; + } + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (hasExcludedElements.get()) { + map.clear(); + map.putAll(newMap); + persist(); + } } } diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java index 4d90fc143c9..4f06d33f682 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java @@ -91,7 +91,7 @@ public void onAllServicesInitialized() { priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); - dump(); + maybeDump(); } public ObservableSet getObservableTradeStatisticsSet() { @@ -99,35 +99,37 @@ public ObservableSet getObservableTradeStatisticsSet() { } private void addToSet(TradeStatistics2 tradeStatistics) { + if (observableTradeStatisticsSet.contains(tradeStatistics)) { + return; + } + + if (!tradeStatistics.isValid()) { + return; + } - if (!observableTradeStatisticsSet.contains(tradeStatistics)) { - Optional duplicate = observableTradeStatisticsSet.stream().filter( - e -> e.getOfferId().equals(tradeStatistics.getOfferId())).findAny(); - - if (duplicate.isPresent()) { - // TODO: Can be removed as soon as everyone uses v1.2.6+ - // Removes an existing object with a trade id if the new one matches the existing except - // for the deposit tx id - if (tradeStatistics.getDepositTxId() == null && - tradeStatistics.isValid() && - duplicate.get().compareTo(tradeStatistics) == 0) { - observableTradeStatisticsSet.remove(duplicate.get()); - } else { - return; - } - } - if (!tradeStatistics.isValid()) { + // TODO remove that part + Optional duplicate = observableTradeStatisticsSet.stream().filter( + e -> e.getOfferId().equals(tradeStatistics.getOfferId())).findAny(); + if (duplicate.isPresent()) { + // TODO: Can be removed as soon as everyone uses v1.2.6+ + // Removes an existing object with a trade id if the new one matches the existing except + // for the deposit tx id + if (tradeStatistics.getDepositTxId() == null && + tradeStatistics.isValid() && + duplicate.get().compareTo(tradeStatistics) == 0) { + observableTradeStatisticsSet.remove(duplicate.get()); + } else { return; } - - observableTradeStatisticsSet.add(tradeStatistics); - priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); - dump(); } + + observableTradeStatisticsSet.add(tradeStatistics); + priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); + maybeDump(); } - private void dump() { + private void maybeDump() { if (dumpStatistics) { ArrayList fiatCurrencyList = CurrencyUtil.getAllSortedFiatCurrencies().stream() .map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8)) diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 34e29f3f5fa..1cf93b6743c 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -44,6 +44,7 @@ import bisq.network.p2p.storage.payload.ProtectedMailboxStorageEntry; import bisq.network.p2p.storage.payload.ProtectedStorageEntry; import bisq.network.p2p.storage.payload.ProtectedStoragePayload; +import bisq.network.p2p.storage.payload.PrunablePersistableNetworkPayload; import bisq.network.p2p.storage.payload.RequiresOwnerIsOnlinePayload; import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreListener; import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService; @@ -185,8 +186,12 @@ public synchronized void readFromResources(String postFix) { resourceDataStoreService.readFromResources(postFix); map.putAll(protectedDataStoreService.getMap()); + + // Prune in case we have a PrunableStoreService + appendOnlyDataStoreService.prune(); } + /////////////////////////////////////////////////////////////////////////////////////////// // RequestData API /////////////////////////////////////////////////////////////////////////////////////////// @@ -521,6 +526,12 @@ private boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, return false; } + // If we receive data which are considered pruned from the seed node we ignore it + if (payload instanceof PrunablePersistableNetworkPayload && + ((PrunablePersistableNetworkPayload) payload).doExclude()) { + return false; + } + // Add the payload and publish the state update to the appendOnlyDataStoreListeners if (!payloadHashAlreadyInStore) { appendOnlyDataStoreService.put(hashAsByteArray, payload); @@ -540,13 +551,20 @@ private boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, // is ready so no listeners are set anyway. We might get called twice from a redundant call later, so listeners // might be added then but as we have the data already added calling them would be irrelevant as well. private void addPersistableNetworkPayloadFromInitialRequest(PersistableNetworkPayload payload) { - byte[] hash = payload.getHash(); - if (payload.verifyHashSize()) { - ByteArray hashAsByteArray = new ByteArray(hash); - appendOnlyDataStoreService.put(hashAsByteArray, payload); - } else { - log.warn("We got a hash exceeding our permitted size"); + if (!payload.verifyHashSize()) { + log.warn("We got a hash not matching our defined size"); + return; } + + // If we receive data which are considered pruned from the seed node we ignore it + if (payload instanceof PrunablePersistableNetworkPayload && + ((PrunablePersistableNetworkPayload) payload).doExclude()) { + return; + } + + byte[] hash = payload.getHash(); + ByteArray hashAsByteArray = new ByteArray(hash); + appendOnlyDataStoreService.put(hashAsByteArray, payload); } /** diff --git a/p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java b/p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java new file mode 100644 index 00000000000..c679b232674 --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java @@ -0,0 +1,27 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.storage.payload; + +import bisq.common.Payload; + +/** + * Interface for PersistableNetworkPayloads which can be pruned (e.g. old objects will be removed from data store). + */ +public interface PrunablePersistableNetworkPayload extends Payload { + boolean doExclude(); +} diff --git a/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java b/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java index c7ea20c9438..b64a40e258f 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java @@ -66,4 +66,11 @@ public void put(P2PDataStorage.ByteArray hashAsByteArray, PersistableNetworkPayl .filter(service -> service.canHandle(payload)) .forEach(service -> service.putIfAbsent(hashAsByteArray, payload)); } + + public void prune() { + services.stream() + .filter(e -> e instanceof PrunableStoreService) + .map((e -> (PrunableStoreService) e)) + .forEach(PrunableStoreService::prune); + } } diff --git a/p2p/src/main/java/bisq/network/p2p/storage/persistence/PrunableStoreService.java b/p2p/src/main/java/bisq/network/p2p/storage/persistence/PrunableStoreService.java new file mode 100644 index 00000000000..07af3fbf51c --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/storage/persistence/PrunableStoreService.java @@ -0,0 +1,25 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.storage.persistence; + +/** + * Interface for StoreService which can be pruned (e.g. old objects will be removed from data store). + */ +public interface PrunableStoreService { + void prune(); +} From dee596e5e176f589379cd568f0a5e5b752b683c0 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Sat, 8 Aug 2020 20:31:06 -0500 Subject: [PATCH 2/2] Let PrunablePersistableNetworkPayload extend PersistableNetworkPayload --- .../storage/payload/PrunablePersistableNetworkPayload.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java b/p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java index c679b232674..82b268d800f 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java @@ -17,11 +17,9 @@ package bisq.network.p2p.storage.payload; -import bisq.common.Payload; - /** * Interface for PersistableNetworkPayloads which can be pruned (e.g. old objects will be removed from data store). */ -public interface PrunablePersistableNetworkPayload extends Payload { +public interface PrunablePersistableNetworkPayload extends PersistableNetworkPayload { boolean doExclude(); }