Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Split trade statistics between recent data and historical data #4405

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.PrunablePersistableNetworkPayload;

import bisq.common.app.Capabilities;
import bisq.common.app.Capability;
Expand All @@ -48,6 +49,7 @@
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import lombok.Value;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -65,11 +67,15 @@
@Slf4j
@Value
public final class TradeStatistics2 implements ProcessOncePersistableNetworkPayload, PersistableNetworkPayload,
CapabilityRequiringPayload, Comparable<TradeStatistics2> {
CapabilityRequiringPayload, Comparable<TradeStatistics2>, PrunablePersistableNetworkPayload {

public static final String MEDIATOR_ADDRESS = "medAddr";
public static final String REFUND_AGENT_ADDRESS = "refAddr";

// We keep only recent trade statistics data (max. 60 days old). Older data gets removed and will be
// handled by the historical trade statistics data framework.
private static final long INCLUSION_PERIOD = TimeUnit.DAYS.toMillis(60);

private final OfferPayload.Direction direction;
private final String baseCurrency;
private final String counterCurrency;
Expand Down Expand Up @@ -246,6 +252,11 @@ public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.TRADE_STATISTICS_HASH_UPDATE);
}

@Override
public boolean doExclude() {
return System.currentTimeMillis() - tradeDate > INCLUSION_PERIOD;
}


///////////////////////////////////////////////////////////////////////////////////////////
// Getters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,27 @@

import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.PrunablePersistableNetworkPayload;
import bisq.network.p2p.storage.persistence.MapStoreService;
import bisq.network.p2p.storage.persistence.PrunableStoreService;

import bisq.common.config.Config;
import bisq.common.storage.Storage;

import javax.inject.Named;

import javax.inject.Inject;
import javax.inject.Named;

import java.io.File;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TradeStatistics2StorageService extends MapStoreService<TradeStatistics2Store, PersistableNetworkPayload> {
public class TradeStatistics2StorageService extends MapStoreService<TradeStatistics2Store, PersistableNetworkPayload>
implements PrunableStoreService {
private static final String FILE_NAME = "TradeStatistics2Store";


Expand Down Expand Up @@ -79,8 +83,27 @@ protected TradeStatistics2Store createStore() {
return new TradeStatistics2Store();
}

// At startup we check our persisted data if it contains too old entries and remove those.
// This method is called from a non user thread.
@Override
protected void readStore() {
super.readStore();
public synchronized void prune() {
AtomicBoolean hasExcludedElements = new AtomicBoolean(false);
Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = getMap();
Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> newMap = map.entrySet().stream()
.filter(e -> {
if (((PrunablePersistableNetworkPayload) e.getValue()).doExclude()) {
hasExcludedElements.set(true);
return false;
} else {
return true;
}
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (hasExcludedElements.get()) {
map.clear();
map.putAll(newMap);
persist();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,43 +91,45 @@ public void onAllServicesInitialized() {

priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);

dump();
maybeDump();
}

public ObservableSet<TradeStatistics2> getObservableTradeStatisticsSet() {
return observableTradeStatisticsSet;
}

private void addToSet(TradeStatistics2 tradeStatistics) {
if (observableTradeStatisticsSet.contains(tradeStatistics)) {
return;
}

if (!tradeStatistics.isValid()) {
return;
}

if (!observableTradeStatisticsSet.contains(tradeStatistics)) {
Optional<TradeStatistics2> duplicate = observableTradeStatisticsSet.stream().filter(
e -> e.getOfferId().equals(tradeStatistics.getOfferId())).findAny();

if (duplicate.isPresent()) {
// TODO: Can be removed as soon as everyone uses v1.2.6+
// Removes an existing object with a trade id if the new one matches the existing except
// for the deposit tx id
if (tradeStatistics.getDepositTxId() == null &&
tradeStatistics.isValid() &&
duplicate.get().compareTo(tradeStatistics) == 0) {
observableTradeStatisticsSet.remove(duplicate.get());
} else {
return;
}
}

if (!tradeStatistics.isValid()) {
// TODO remove that part
Optional<TradeStatistics2> duplicate = observableTradeStatisticsSet.stream().filter(
e -> e.getOfferId().equals(tradeStatistics.getOfferId())).findAny();
if (duplicate.isPresent()) {
// TODO: Can be removed as soon as everyone uses v1.2.6+
// Removes an existing object with a trade id if the new one matches the existing except
// for the deposit tx id
if (tradeStatistics.getDepositTxId() == null &&
tradeStatistics.isValid() &&
duplicate.get().compareTo(tradeStatistics) == 0) {
observableTradeStatisticsSet.remove(duplicate.get());
} else {
return;
}

observableTradeStatisticsSet.add(tradeStatistics);
priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);
dump();
}

observableTradeStatisticsSet.add(tradeStatistics);
priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);
maybeDump();
}

private void dump() {
private void maybeDump() {
if (dumpStatistics) {
ArrayList<CurrencyTuple> fiatCurrencyList = CurrencyUtil.getAllSortedFiatCurrencies().stream()
.map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8))
Expand Down
30 changes: 24 additions & 6 deletions p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import bisq.network.p2p.storage.payload.ProtectedMailboxStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.network.p2p.storage.payload.PrunablePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.RequiresOwnerIsOnlinePayload;
import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreListener;
import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService;
Expand Down Expand Up @@ -185,8 +186,12 @@ public synchronized void readFromResources(String postFix) {
resourceDataStoreService.readFromResources(postFix);

map.putAll(protectedDataStoreService.getMap());

// Prune in case we have a PrunableStoreService
appendOnlyDataStoreService.prune();
}


///////////////////////////////////////////////////////////////////////////////////////////
// RequestData API
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -521,6 +526,12 @@ private boolean addPersistableNetworkPayload(PersistableNetworkPayload payload,
return false;
}

// If we receive data which are considered pruned from the seed node we ignore it
if (payload instanceof PrunablePersistableNetworkPayload &&
((PrunablePersistableNetworkPayload) payload).doExclude()) {
return false;
}

// Add the payload and publish the state update to the appendOnlyDataStoreListeners
if (!payloadHashAlreadyInStore) {
appendOnlyDataStoreService.put(hashAsByteArray, payload);
Expand All @@ -540,13 +551,20 @@ private boolean addPersistableNetworkPayload(PersistableNetworkPayload payload,
// is ready so no listeners are set anyway. We might get called twice from a redundant call later, so listeners
// might be added then but as we have the data already added calling them would be irrelevant as well.
private void addPersistableNetworkPayloadFromInitialRequest(PersistableNetworkPayload payload) {
byte[] hash = payload.getHash();
if (payload.verifyHashSize()) {
ByteArray hashAsByteArray = new ByteArray(hash);
appendOnlyDataStoreService.put(hashAsByteArray, payload);
} else {
log.warn("We got a hash exceeding our permitted size");
if (!payload.verifyHashSize()) {
log.warn("We got a hash not matching our defined size");
return;
}

// If we receive data which are considered pruned from the seed node we ignore it
if (payload instanceof PrunablePersistableNetworkPayload &&
((PrunablePersistableNetworkPayload) payload).doExclude()) {
return;
}

byte[] hash = payload.getHash();
ByteArray hashAsByteArray = new ByteArray(hash);
appendOnlyDataStoreService.put(hashAsByteArray, payload);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.network.p2p.storage.payload;

/**
* Interface for PersistableNetworkPayloads which can be pruned (e.g. old objects will be removed from data store).
*/
public interface PrunablePersistableNetworkPayload extends PersistableNetworkPayload {
boolean doExclude();
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,11 @@ public void put(P2PDataStorage.ByteArray hashAsByteArray, PersistableNetworkPayl
.filter(service -> service.canHandle(payload))
.forEach(service -> service.putIfAbsent(hashAsByteArray, payload));
}

public void prune() {
services.stream()
.filter(e -> e instanceof PrunableStoreService)
.map((e -> (PrunableStoreService) e))
.forEach(PrunableStoreService::prune);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.network.p2p.storage.persistence;

/**
* Interface for StoreService which can be pruned (e.g. old objects will be removed from data store).
*/
public interface PrunableStoreService {
void prune();
}