Skip to content

Commit

Permalink
Fix tradestat hash issue (#3529)
Browse files Browse the repository at this point in the history
* Recreate hash from protobuf data

To ensure all data use the new hash method (excluding extraMap) we
do not use the hash field from the protobuf data but pass null, which
causes the hash to be recreated based on the new hash method.

* Add filter.toString method and log filter in case of wrong signature

We currently have an invalid filter (probably some dev pushed a test filter to mainnet)

* Change log level, add log

* Refactor: Move code to dump method

* Add TRADE_STATISTICS_HASH_UPDATE capability

We changed the hash method in 1.2.0 and that requires an update to 1.2.2
to handle it correctly, otherwise the seed nodes have to process too
much data.

* Add logs for size of data exchange messages

* Add more data in log

* Improve logs

* Fix wrong msg in log, change log level

* Add check for depositTxId not empty

* Remove check for duplicates

As we recreate the hash for all trade stat objects we don't need that
check anymore.

* Add logs
  • Loading branch information
chimp1984 authored and ripcurlx committed Oct 31, 2019
1 parent 42fcee4 commit 9f35383
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 80 deletions.
4 changes: 2 additions & 2 deletions common/src/main/java/bisq/common/app/Capability.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ public enum Capability {

SIGNED_ACCOUNT_AGE_WITNESS, // Supports the signed account age witness feature
MEDIATION, // Supports mediation feature
REFUND_AGENT // Supports refund agents
;
REFUND_AGENT, // Supports refund agents
TRADE_STATISTICS_HASH_UPDATE // We changed the hash method in 1.2.0 and that requires update to 1.2.2 for handling it correctly, otherwise the seed nodes have to process too much data.
}
22 changes: 22 additions & 0 deletions core/src/main/java/bisq/core/filter/Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,26 @@ void setSigAndPubKey(String signatureAsBase64, PublicKey ownerPubKey) {

ownerPubKeyBytes = Sig.getPublicKeyBytes(this.ownerPubKey);
}

// Multi-line dump of all filter fields; used for logging invalid filters
// (e.g. when signature verification fails in FilterManager).
@Override
public String toString() {
    StringBuilder sb = new StringBuilder("Filter{");
    sb.append("\n bannedOfferIds=").append(bannedOfferIds);
    sb.append(",\n bannedNodeAddress=").append(bannedNodeAddress);
    sb.append(",\n bannedPaymentAccounts=").append(bannedPaymentAccounts);
    sb.append(",\n bannedCurrencies=").append(bannedCurrencies);
    sb.append(",\n bannedPaymentMethods=").append(bannedPaymentMethods);
    sb.append(",\n arbitrators=").append(arbitrators);
    sb.append(",\n seedNodes=").append(seedNodes);
    sb.append(",\n priceRelayNodes=").append(priceRelayNodes);
    sb.append(",\n preventPublicBtcNetwork=").append(preventPublicBtcNetwork);
    sb.append(",\n btcNodes=").append(btcNodes);
    sb.append(",\n extraDataMap=").append(extraDataMap);
    sb.append(",\n disableDao=").append(disableDao);
    sb.append(",\n disableDaoBelowVersion='").append(disableDaoBelowVersion).append('\'');
    sb.append(",\n disableTradeBelowVersion='").append(disableTradeBelowVersion).append('\'');
    sb.append(",\n mediators=").append(mediators);
    sb.append(",\n refundAgents=").append(refundAgents);
    sb.append("\n}");
    return sb.toString();
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/bisq/core/filter/FilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private boolean verifySignature(Filter filter) {
ECKey.fromPublicOnly(HEX.decode(pubKeyAsHex)).verifyMessage(getHexFromData(filter), filter.getSignatureAsBase64());
return true;
} catch (SignatureException e) {
log.warn("verifySignature failed");
log.warn("verifySignature failed. filter={}", filter);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public static TradeStatistics2 fromProto(protobuf.TradeStatistics2 proto) {
proto.getTradeAmount(),
proto.getTradeDate(),
proto.getDepositTxId(),
proto.getHash().toByteArray(),
null, // We want to clean up the hashes with the changed hash method in v.1.2.0 so we don't use the value from the proto
CollectionUtils.isEmpty(proto.getExtraDataMap()) ? null : proto.getExtraDataMap());
}

Expand All @@ -235,12 +235,13 @@ public boolean verifyHashSize() {
}

// With v1.2.0 we changed the way how the hash is created. To not create too heavy load for seed nodes from
// requests from old nodes we use the SIGNED_ACCOUNT_AGE_WITNESS capability to send trade statistics only to new
// requests from old nodes we use the TRADE_STATISTICS_HASH_UPDATE capability to send trade statistics only to new
// nodes. As trade statistics are only used for informational purpose it will not have any critical issue for the
// old nodes beside that they don't see the latest trades.
// old nodes beside that they don't see the latest trades. We added TRADE_STATISTICS_HASH_UPDATE in v1.2.2 to fix a
// problem of not handling the hashes correctly.
@Override
public Capabilities getRequiredCapabilities() {
return new Capabilities(Capability.SIGNED_ACCOUNT_AGE_WITNESS);
return new Capabilities(Capability.TRADE_STATISTICS_HASH_UPDATE);
}


Expand Down Expand Up @@ -279,10 +280,9 @@ public boolean isValid() {
// Since the trade wasn't executed it's better to filter it out to avoid it having an undue influence on the
// BSQ trade stats.
boolean excludedFailedTrade = offerId.equals("6E5KOI6O-3a06a037-6f03-4bfa-98c2-59f49f73466a-112");
return tradeAmount > 0 && tradePrice > 0 && !excludedFailedTrade;
return tradeAmount > 0 && tradePrice > 0 && !excludedFailedTrade && !depositTxId.isEmpty();
}


@Override
public String toString() {
return "TradeStatistics2{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

import java.io.File;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -70,19 +68,6 @@ public boolean canHandle(PersistableNetworkPayload payload) {
return payload instanceof TradeStatistics2;
}

// Deduplicates trade statistics by their recreated hash and persists the cleaned map.
// Duplicates were introduced by differing extraMap entries across software updates;
// recreating the hash (which excludes extraMap) collapses them to one entry each.
// Returns the deduplicated items.
Collection<TradeStatistics2> cleanupMap(Collection<TradeStatistics2> collection) {
    Map<P2PDataStorage.ByteArray, TradeStatistics2> deduplicated = new HashMap<>();
    for (TradeStatistics2 tradeStatistics : collection) {
        // putIfAbsent keeps the first item seen for each recreated hash
        deduplicated.putIfAbsent(new P2PDataStorage.ByteArray(tradeStatistics.createHash()), tradeStatistics);
    }

    Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> storeMap = getMap();
    storeMap.clear();
    storeMap.putAll(deduplicated);
    persist();

    return deduplicated.values();
}


///////////////////////////////////////////////////////////////////////////////////////////
// Protected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,8 @@
import java.io.File;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -57,7 +54,6 @@ public class TradeStatisticsManager {
private final TradeStatistics2StorageService tradeStatistics2StorageService;
private final boolean dumpStatistics;
private final ObservableSet<TradeStatistics2> observableTradeStatisticsSet = FXCollections.observableSet();
private int duplicates = 0;

@Inject
public TradeStatisticsManager(P2PService p2PService,
Expand All @@ -76,51 +72,17 @@ public TradeStatisticsManager(P2PService p2PService,
}

public void onAllServicesInitialized() {
if (dumpStatistics) {
ArrayList<CurrencyTuple> fiatCurrencyList = CurrencyUtil.getAllSortedFiatCurrencies().stream()
.map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8))
.collect(Collectors.toCollection(ArrayList::new));
jsonFileManager.writeToDisc(Utilities.objectToJson(fiatCurrencyList), "fiat_currency_list");

ArrayList<CurrencyTuple> cryptoCurrencyList = CurrencyUtil.getAllSortedCryptoCurrencies().stream()
.map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8))
.collect(Collectors.toCollection(ArrayList::new));
cryptoCurrencyList.add(0, new CurrencyTuple(Res.getBaseCurrencyCode(), Res.getBaseCurrencyName(), 8));
jsonFileManager.writeToDisc(Utilities.objectToJson(cryptoCurrencyList), "crypto_currency_list");
}

p2PService.getP2PDataStorage().addAppendOnlyDataStoreListener(payload -> {
if (payload instanceof TradeStatistics2)
addToSet((TradeStatistics2) payload);
});

Map<String, TradeStatistics2> map = new HashMap<>();
AtomicInteger origSize = new AtomicInteger();
p2PService.getP2PDataStorage().getAppendOnlyDataStoreMap().values().stream()
Set<TradeStatistics2> collect = tradeStatistics2StorageService.getMap().values().stream()
.filter(e -> e instanceof TradeStatistics2)
.map(e -> (TradeStatistics2) e)
.filter(TradeStatistics2::isValid)
.forEach(tradeStatistics -> {
origSize.getAndIncrement();
TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics);
if (prevValue != null) {
duplicates++;
}
});

Collection<TradeStatistics2> items = map.values();
// At startup we check if we have duplicate entries. This might be the case from software updates when we
// introduced new entries to the extraMap. As that map is for flexibility in updates we keep it excluded from
// json so that it will not cause duplicates anymore. Until all users have updated we keep the cleanup code.
// Should not be needed later anymore, but will also not hurt if no duplicates exist.
if (duplicates > 0) {
long ts = System.currentTimeMillis();
items = tradeStatistics2StorageService.cleanupMap(items);
log.info("We found {} duplicate entries. Size of map entries before and after cleanup: {} / {}. Cleanup took {} ms.",
duplicates, origSize, items.size(), System.currentTimeMillis() - ts);
}

observableTradeStatisticsSet.addAll(items);
.collect(Collectors.toSet());
observableTradeStatisticsSet.addAll(collect);

priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);

Expand Down Expand Up @@ -149,6 +111,17 @@ private void addToSet(TradeStatistics2 tradeStatistics) {

private void dump() {
if (dumpStatistics) {
ArrayList<CurrencyTuple> fiatCurrencyList = CurrencyUtil.getAllSortedFiatCurrencies().stream()
.map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8))
.collect(Collectors.toCollection(ArrayList::new));
jsonFileManager.writeToDisc(Utilities.objectToJson(fiatCurrencyList), "fiat_currency_list");

ArrayList<CurrencyTuple> cryptoCurrencyList = CurrencyUtil.getAllSortedCryptoCurrencies().stream()
.map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8))
.collect(Collectors.toCollection(ArrayList::new));
cryptoCurrencyList.add(0, new CurrencyTuple(Res.getBaseCurrencyCode(), Res.getBaseCurrencyName(), 8));
jsonFileManager.writeToDisc(Utilities.objectToJson(cryptoCurrencyList), "crypto_currency_list");

// We store the statistics as json so it is easy for further processing (e.g. for web based services)
// TODO This is just a quick solution for storing to one file.
// 1 statistic entry has 500 bytes as json.
Expand Down
8 changes: 7 additions & 1 deletion p2p/src/main/java/bisq/network/p2p/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public boolean noCapabilityRequiredOrCapabilityIsSupported(Proto msg) {
data = ((AddDataMessage) msg).getProtectedStorageEntry().getProtectedStoragePayload();
}
// Monitoring nodes have only one capability set, we don't want to log those
log.info("We did not send the message because the peer does not support our required capabilities. " +
log.debug("We did not send the message because the peer does not support our required capabilities. " +
"messageClass={}, peer={}, peers supportedCapabilities={}",
data.getClass().getSimpleName(), peersNodeAddressOptional, capabilities);
}
Expand Down Expand Up @@ -800,6 +800,12 @@ && reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) {

// Capabilities can be empty. We only check for mandatory if we get some capabilities.
if (!capabilities.isEmpty() && !Capabilities.hasMandatoryCapability(capabilities)) {
String senderNodeAddress = networkEnvelope instanceof SendersNodeAddressMessage ?
((SendersNodeAddressMessage) networkEnvelope).getSenderNodeAddress().getFullAddress() :
"[unknown address]";
log.info("We close a connection to old node {}. " +
"Capabilities of old node: {}, networkEnvelope class name={}",
senderNodeAddress, capabilities.prettyPrint(), networkEnvelope.getClass().getSimpleName());
shutDown(CloseConnectionReason.MANDATORY_CAPABILITIES_NOT_SUPPORTED);
return;
}
Expand Down
3 changes: 1 addition & 2 deletions p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,7 @@ private boolean checkMaxConnections() {
connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN, () -> UserThread.runAfter(this::checkMaxConnections, 100, TimeUnit.MILLISECONDS));
return true;
} else {
log.warn("No candidates found to remove (That case should not be possible as we use in the " +
"last case all connections).\n\t" +
log.debug("No candidates found to remove.\n\t" +
"size={}, allConnections={}", size, allConnections);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public GetDataRequestHandler(NetworkNode networkNode, P2PDataStorage dataStorage
///////////////////////////////////////////////////////////////////////////////////////////

public void handle(GetDataRequest getDataRequest, final Connection connection) {
long ts = System.currentTimeMillis();
GetDataResponse getDataResponse = new GetDataResponse(getFilteredProtectedStorageEntries(getDataRequest, connection),
getFilteredPersistableNetworkPayload(getDataRequest, connection),
getDataRequest.getNonce(),
Expand All @@ -105,7 +106,7 @@ public void handle(GetDataRequest getDataRequest, final Connection connection) {
}

SettableFuture<Connection> future = networkNode.sendMessage(connection, getDataResponse);
Futures.addCallback(future, new FutureCallback<Connection>() {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
Expand All @@ -130,9 +131,11 @@ public void onFailure(@NotNull Throwable throwable) {
}
}
});
log.info("handle GetDataRequest took {} ms", System.currentTimeMillis() - ts);
}

private Set<PersistableNetworkPayload> getFilteredPersistableNetworkPayload(GetDataRequest getDataRequest, Connection connection) {
private Set<PersistableNetworkPayload> getFilteredPersistableNetworkPayload(GetDataRequest getDataRequest,
Connection connection) {
final Set<P2PDataStorage.ByteArray> tempLookupSet = new HashSet<>();
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());

Expand All @@ -144,7 +147,8 @@ private Set<PersistableNetworkPayload> getFilteredPersistableNetworkPayload(GetD
.collect(Collectors.toSet());
}

private Set<ProtectedStorageEntry> getFilteredProtectedStorageEntries(GetDataRequest getDataRequest, Connection connection) {
private Set<ProtectedStorageEntry> getFilteredProtectedStorageEntries(GetDataRequest getDataRequest,
Connection connection) {
final Set<ProtectedStorageEntry> filteredDataSet = new HashSet<>();
final Set<Integer> lookupSet = new HashSet<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetDataResponse) {
if (connection.getPeersNodeAddressOptional().isPresent() && connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress)) {
if (!stopped) {
long ts1 = System.currentTimeMillis();
GetDataResponse getDataResponse = (GetDataResponse) networkEnvelope;
final Set<ProtectedStorageEntry> dataSet = getDataResponse.getDataSet();
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = getDataResponse.getPersistableNetworkPayloadSet();
Expand All @@ -219,22 +220,22 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {

final NodeAddress sender = connection.getPeersNodeAddressOptional().get();

long ts = System.currentTimeMillis();
long ts2 = System.currentTimeMillis();
AtomicInteger counter = new AtomicInteger();
dataSet.forEach(e -> {
// We don't broadcast here (last param) as we are only connected to the seed node and would be pointless
dataStorage.addProtectedStorageEntry(e, sender, null, false, false);
counter.getAndIncrement();

});
log.info("Processing {} protectedStorageEntries took {} ms.", counter.get(), System.currentTimeMillis() - ts);
log.info("Processing {} protectedStorageEntries took {} ms.", counter.get(), System.currentTimeMillis() - ts2);

/* // engage the firstRequest logic only if we are a seed node. Normal clients get here twice at most.
if (!Capabilities.app.containsAll(Capability.SEED_NODE))
firstRequest = true;*/

if (persistableNetworkPayloadSet != null /*&& firstRequest*/) {
ts = System.currentTimeMillis();
ts2 = System.currentTimeMillis();
persistableNetworkPayloadSet.forEach(e -> {
if (e instanceof LazyProcessedPayload) {
// We use an optimized method as many checks are not required in that case to avoid
Expand All @@ -259,7 +260,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
initialRequestApplied = true;

log.info("Processing {} persistableNetworkPayloads took {} ms.",
persistableNetworkPayloadSet.size(), System.currentTimeMillis() - ts);
persistableNetworkPayloadSet.size(), System.currentTimeMillis() - ts2);
}

cleanup();
Expand All @@ -272,6 +273,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
"We drop that message. nonce={} / requestNonce={}",
nonce, getDataResponse.getRequestNonce());
}
log.info("Processing GetDataResponse took {} ms", System.currentTimeMillis() - ts1);
} else {
log.warn("We have stopped already. We ignore that onDataRequest call.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,17 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
.map(PersistableNetworkPayload::toProtoMessage)
.collect(Collectors.toList())));

return getNetworkEnvelopeBuilder()
protobuf.NetworkEnvelope proto = getNetworkEnvelopeBuilder()
.setGetDataResponse(builder)
.build();
log.info("Sending a GetDataResponse with size = {} bytes", proto.toByteArray().length);
return proto;
}

public static GetDataResponse fromProto(protobuf.GetDataResponse proto, NetworkProtoResolver resolver, int messageVersion) {
public static GetDataResponse fromProto(protobuf.GetDataResponse proto,
NetworkProtoResolver resolver,
int messageVersion) {
log.info("Received a GetDataResponse with size = {} bytes", proto.toByteArray().length);
Set<ProtectedStorageEntry> dataSet = new HashSet<>(
proto.getDataSetList().stream()
.map(entry -> (ProtectedStorageEntry) resolver.fromProto(entry))
Expand Down
Loading

0 comments on commit 9f35383

Please sign in to comment.