From f95a7f5d3f04ba5d61afb32fb245479f5a383df6 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Tue, 27 Aug 2019 22:03:23 +0200 Subject: [PATCH 1/7] Add missing listener notification and clean up class --- .../bisq/network/p2p/network/Connection.java | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index b8bf05465a3..a60de75dd88 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -121,8 +121,8 @@ public enum PeerType { private static ConnectionConfig connectionConfig; // Leaving some constants package-private for tests to know limits. - static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb - static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb. + private static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb + private static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb. //TODO decrease limits again after testing private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(120); @@ -172,7 +172,7 @@ public static int getPermittedMessageSize() { private RuleViolation ruleViolation; private final ConcurrentHashMap ruleViolations = new ConcurrentHashMap<>(); - private Capabilities capabilities = new Capabilities(); + private final Capabilities capabilities = new Capabilities(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -233,9 +233,9 @@ public Capabilities getCapabilities() { return capabilities; } - Object lock = new Object(); - Queue queueOfBundles = new ConcurrentLinkedQueue<>(); - ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor(); + private final Object lock = new Object(); + private final Queue queueOfBundles = new ConcurrentLinkedQueue<>(); + private final ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor(); // Called from various threads public void sendMessage(NetworkEnvelope networkEnvelope) { @@ -250,7 +250,7 @@ public void sendMessage(NetworkEnvelope networkEnvelope) { log.debug("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000)); if (networkEnvelope instanceof Ping | networkEnvelope instanceof RefreshOfferMessage) { - // pings and offer refresh msg we dont want to log in production + // pings and offer refresh msg we don't want to log in production log.trace("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" + "Sending direct message to peer" + "Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" + @@ -298,10 +298,11 @@ public void sendMessage(NetworkEnvelope networkEnvelope) { if (!stopped) { synchronized (lock) { BundleOfEnvelopes current = queueOfBundles.poll(); - if(current.getEnvelopes().size() == 1) + if (current != null && current.getEnvelopes().size() == 1) { protoOutputStream.writeEnvelope(current.getEnvelopes().get(0)); - else + } else { protoOutputStream.writeEnvelope(current); + } } } }, lastSendTimeStamp - now, TimeUnit.MILLISECONDS); @@ -386,10 +387,10 @@ private boolean violatesThrottleLimit() { messageTimeStamps.add(now); // clean list - while(messageTimeStamps.size() > msgThrottlePer10Sec) + while (messageTimeStamps.size() > msgThrottlePer10Sec) messageTimeStamps.remove(0); - return violatesThrottleLimit(now,1, msgThrottlePerSec) || violatesThrottleLimit(now,10, msgThrottlePer10Sec); + return violatesThrottleLimit(now, 1, msgThrottlePerSec) || violatesThrottleLimit(now, 10, msgThrottlePer10Sec); } private boolean violatesThrottleLimit(long now, int seconds, int messageCountLimit) { @@ -399,7 +400,7 @@ private boolean violatesThrottleLimit(long now, int seconds, int messageCountLim long compareValue = messageTimeStamps.get(messageTimeStamps.size() - messageCountLimit); // if duration < seconds sec we received too much network_messages - if(now - compareValue < TimeUnit.SECONDS.toMillis(seconds)) { + if (now - compareValue < TimeUnit.SECONDS.toMillis(seconds)) { log.error("violatesThrottleLimit {}/{} second(s)", messageCountLimit, seconds); return true; @@ -436,7 +437,7 @@ public void setPeerType(PeerType peerType) { this.peerType = peerType; } - public void setPeersNodeAddress(NodeAddress peerNodeAddress) { + private void setPeersNodeAddress(NodeAddress peerNodeAddress) { checkNotNull(peerNodeAddress, "peerAddress must not be null"); peersNodeAddressOptional = Optional.of(peerNodeAddress); @@ -494,6 +495,7 @@ public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runn stopped = true; + //noinspection UnstableApiUsage Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } catch (Throwable t) { log.error(t.getMessage()); @@ -534,6 +536,7 @@ private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable R e.printStackTrace(); } + //noinspection UnstableApiUsage MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS); log.debug("Connection shutdown complete " + this.toString()); @@ -705,7 +708,7 @@ public void run() { Thread.sleep(20); } - // Reading the protobuffer message from the inputstream + // Reading the protobuffer message from the inputStream protobuf.NetworkEnvelope proto = protobuf.NetworkEnvelope.parseDelimitedFrom(protoInputStream); if (proto == null) { @@ -794,6 +797,12 @@ && reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) { Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities(); if (supportedCapabilities != null) { capabilities.set(supportedCapabilities); + capabilitiesListeners.forEach(weakListener -> { + SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get(); + if (supportedCapabilitiesListener != null) { + supportedCapabilitiesListener.onChanged(supportedCapabilities); + } + }); } } From 79c08b6e049cc2c5634e65c43cfa68588df0f203 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 28 Aug 2019 01:20:10 +0200 Subject: [PATCH 2/7] Remove Capability.SIGNED_ACCOUNT_AGE_WITNESS --- .../bisq/core/setup/CoreNetworkCapabilities.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/bisq/core/setup/CoreNetworkCapabilities.java b/core/src/main/java/bisq/core/setup/CoreNetworkCapabilities.java index 0a84eee15f3..b590b3d447f 100644 --- a/core/src/main/java/bisq/core/setup/CoreNetworkCapabilities.java +++ b/core/src/main/java/bisq/core/setup/CoreNetworkCapabilities.java @@ -28,11 +28,21 @@ @Slf4j public class CoreNetworkCapabilities { public static void setSupportedCapabilities(BisqEnvironment bisqEnvironment) { - Capabilities.app.addAll(Capability.TRADE_STATISTICS, Capability.TRADE_STATISTICS_2, Capability.ACCOUNT_AGE_WITNESS, Capability.ACK_MSG); - Capabilities.app.addAll(Capability.BUNDLE_OF_ENVELOPES,Capability.SIGNED_ACCOUNT_AGE_WITNESS); + Capabilities.app.addAll( + Capability.TRADE_STATISTICS, + Capability.TRADE_STATISTICS_2, + Capability.ACCOUNT_AGE_WITNESS, + Capability.ACK_MSG, + Capability.BUNDLE_OF_ENVELOPES + ); if (BisqEnvironment.isDaoActivated(bisqEnvironment)) { - Capabilities.app.addAll(Capability.PROPOSAL, Capability.BLIND_VOTE, Capability.BSQ_BLOCK, Capability.DAO_STATE); + Capabilities.app.addAll( + Capability.PROPOSAL, + Capability.BLIND_VOTE, + Capability.BSQ_BLOCK, + Capability.DAO_STATE + ); maybeApplyDaoFullMode(bisqEnvironment); } From 72e291cf9c80066611b600fed049b54a67260971 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 28 Aug 2019 01:21:16 +0200 Subject: [PATCH 3/7] Call supportedCapabilitiesListener on Userthread --- .../bisq/network/p2p/network/Connection.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index a60de75dd88..269d775215b 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -298,10 +298,12 @@ public void sendMessage(NetworkEnvelope networkEnvelope) { if (!stopped) { synchronized (lock) { BundleOfEnvelopes current = queueOfBundles.poll(); - if (current != null && current.getEnvelopes().size() == 1) { - protoOutputStream.writeEnvelope(current.getEnvelopes().get(0)); - } else { - protoOutputStream.writeEnvelope(current); + if (current != null) { + if (current.getEnvelopes().size() == 1) { + protoOutputStream.writeEnvelope(current.getEnvelopes().get(0)); + } else { + protoOutputStream.writeEnvelope(current); + } } } } @@ -796,13 +798,15 @@ && reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) { if (networkEnvelope instanceof SupportedCapabilitiesMessage) { Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities(); if (supportedCapabilities != null) { - capabilities.set(supportedCapabilities); - capabilitiesListeners.forEach(weakListener -> { - SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get(); - if (supportedCapabilitiesListener != null) { - supportedCapabilitiesListener.onChanged(supportedCapabilities); - } - }); + if (!capabilities.equals(supportedCapabilities)) { + capabilities.set(supportedCapabilities); + capabilitiesListeners.forEach(weakListener -> { + SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get(); + if (supportedCapabilitiesListener != null) { + UserThread.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities)); + } + }); + } } } From ca0a42ba42d3e150a58fc21fd69a9c425d2f59fb Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 28 Aug 2019 01:21:43 +0200 Subject: [PATCH 4/7] Use node address for thread name; cleanup --- .../java/bisq/network/p2p/network/NetworkNode.java | 13 ++++++++----- .../bisq/network/p2p/peers/peerexchange/Peer.java | 3 ++- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java index caac29de1db..ac1ff69256b 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -96,7 +96,8 @@ public abstract class NetworkNode implements MessageListener { // when the events happen. abstract public void start(@Nullable SetupListener setupListener); - public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddress, NetworkEnvelope networkEnvelope) { + public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddress, + NetworkEnvelope networkEnvelope) { log.debug("sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelope)); checkNotNull(peersNodeAddress, "peerAddress must not be null"); @@ -112,9 +113,9 @@ public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddr final SettableFuture resultFuture = SettableFuture.create(); ListenableFuture future = executorService.submit(() -> { - Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress); + Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress.getFullAddress()); - if(peersNodeAddress.equals(getNodeAddress())){ + if (peersNodeAddress.equals(getNodeAddress())) { throw new ConnectException("We do not send a message to ourselves"); } @@ -162,7 +163,8 @@ public void onConnection(Connection connection) { } @Override - public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { + public void onDisconnect(CloseConnectionReason closeConnectionReason, + Connection connection) { log.trace("onDisconnect connectionListener\n\tconnection={}" + connection); //noinspection SuspiciousMethodCalls outBoundConnections.remove(connection); @@ -264,7 +266,8 @@ public Socks5Proxy getSocksProxy() { public SettableFuture sendMessage(Connection connection, NetworkEnvelope networkEnvelope) { // connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block ListenableFuture future = executorService.submit(() -> { - Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid()); + String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid(); + Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id); connection.sendMessage(networkEnvelope); return connection; }); diff --git a/p2p/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java b/p2p/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java index d08e1dbe78f..cb3f8d64e7e 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java @@ -97,8 +97,9 @@ public Date getDate() { @Override public void onChanged(Capabilities supportedCapabilities) { - if (!supportedCapabilities.isEmpty()) + if (!supportedCapabilities.isEmpty()) { capabilities.set(supportedCapabilities); + } } From bb628dc8515d9d675229624e55fbe954633bb591 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 28 Aug 2019 20:31:06 +0200 Subject: [PATCH 5/7] Avoid that removed mailbox messages get added again --- .../network/p2p/storage/P2PDataStorage.java | 68 ++++++++++++++----- .../p2p/storage/messages/AddOncePayload.java | 24 +++++++ .../payload/MailboxStoragePayload.java | 3 +- 3 files changed, 78 insertions(+), 17 deletions(-) create mode 100644 p2p/src/main/java/bisq/network/p2p/storage/messages/AddOncePayload.java diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 9a35caefbf7..11066b22ca6 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -26,6 +26,7 @@ import bisq.network.p2p.peers.BroadcastHandler; import bisq.network.p2p.peers.Broadcaster; import bisq.network.p2p.storage.messages.AddDataMessage; +import bisq.network.p2p.storage.messages.AddOncePayload; import bisq.network.p2p.storage.messages.AddPersistableNetworkPayloadMessage; import bisq.network.p2p.storage.messages.BroadcastMessage; import bisq.network.p2p.storage.messages.RefreshOfferMessage; @@ -112,6 +113,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers @Getter private final Map map = new ConcurrentHashMap<>(); + private final Set removedAddOncePayloads = new HashSet<>(); private final Set hashMapChangedListeners = new CopyOnWriteArraySet<>(); private Timer removeExpiredEntriesTimer; @@ -366,18 +368,30 @@ public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEn return addProtectedStorageEntry(protectedStorageEntry, sender, listener, isDataOwner, true); } - public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, - @Nullable BroadcastHandler.Listener listener, boolean isDataOwner, boolean allowBroadcast) { - final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry, + @Nullable NodeAddress sender, + @Nullable BroadcastHandler.Listener listener, + boolean isDataOwner, + boolean allowBroadcast) { + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); + + if (protectedStoragePayload instanceof AddOncePayload && + removedAddOncePayloads.contains(hashOfPayload)) { + log.warn("We have already removed that AddOncePayload by a previous removeDataMessage. " + + "We ignore that message. ProtectedStoragePayload: {}", protectedStoragePayload.toString()); + return false; + } + boolean sequenceNrValid = isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload); boolean result = checkPublicKeys(protectedStorageEntry, true) && checkSignature(protectedStorageEntry) && sequenceNrValid; boolean containsKey = map.containsKey(hashOfPayload); - if (containsKey) + if (containsKey) { result = result && checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload); + } // printData("before add"); if (result) { @@ -422,7 +436,9 @@ public void broadcastProtectedStorageEntry(ProtectedStorageEntry protectedStorag broadcast(new AddDataMessage(protectedStorageEntry), sender, broadcastListener, isDataOwner); } - public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage, @Nullable NodeAddress sender, boolean isDataOwner) { + public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage, + @Nullable NodeAddress sender, + boolean isDataOwner) { byte[] hashOfDataAndSeqNr = refreshTTLMessage.getHashOfDataAndSeqNr(); byte[] signature = refreshTTLMessage.getSignature(); ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload()); @@ -464,7 +480,9 @@ public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage, @Nullable NodeA } } - public boolean remove(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, boolean isDataOwner) { + public boolean remove(ProtectedStorageEntry protectedStorageEntry, + @Nullable NodeAddress sender, + boolean isDataOwner) { final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); boolean containsKey = map.containsKey(hashOfPayload); @@ -483,6 +501,8 @@ && checkSignature(protectedStorageEntry) sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), System.currentTimeMillis())); sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300); + maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload); + broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null, isDataOwner); removeFromProtectedDataStore(protectedStorageEntry); @@ -506,25 +526,31 @@ private void removeFromProtectedDataStore(ProtectedStorageEntry protectedStorage } @SuppressWarnings("UnusedReturnValue") - public boolean removeMailboxData(ProtectedMailboxStorageEntry protectedMailboxStorageEntry, @Nullable NodeAddress sender, boolean isDataOwner) { - ByteArray hashOfData = get32ByteHashAsByteArray(protectedMailboxStorageEntry.getProtectedStoragePayload()); - boolean containsKey = map.containsKey(hashOfData); + public boolean removeMailboxData(ProtectedMailboxStorageEntry protectedMailboxStorageEntry, + @Nullable NodeAddress sender, + boolean isDataOwner) { + ProtectedStoragePayload protectedStoragePayload = protectedMailboxStorageEntry.getProtectedStoragePayload(); + ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); + boolean containsKey = map.containsKey(hashOfPayload); if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data."); + boolean result = containsKey && checkPublicKeys(protectedMailboxStorageEntry, false) - && isSequenceNrValid(protectedMailboxStorageEntry.getSequenceNumber(), hashOfData) + && isSequenceNrValid(protectedMailboxStorageEntry.getSequenceNumber(), hashOfPayload) && protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(protectedMailboxStorageEntry.getReceiversPubKey()) // at remove both keys are the same (only receiver is able to remove data) && checkSignature(protectedMailboxStorageEntry) - && checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxStorageEntry.getReceiversPubKey(), hashOfData); + && checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxStorageEntry.getReceiversPubKey(), hashOfPayload); // printData("before removeMailboxData"); if (result) { - doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfData); + doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfPayload); printData("after removeMailboxData"); - sequenceNumberMap.put(hashOfData, new MapValue(protectedMailboxStorageEntry.getSequenceNumber(), System.currentTimeMillis())); + sequenceNumberMap.put(hashOfPayload, new MapValue(protectedMailboxStorageEntry.getSequenceNumber(), System.currentTimeMillis())); sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300); + maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload); + broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null, isDataOwner); } else { log.debug("removeMailboxData failed"); @@ -532,7 +558,15 @@ && checkSignature(protectedMailboxStorageEntry) return result; } - public ProtectedStorageEntry getProtectedStorageEntry(ProtectedStoragePayload protectedStoragePayload, KeyPair ownerStoragePubKey) + private void maybeAddToRemoveAddOncePayloads(ProtectedStoragePayload protectedStoragePayload, + ByteArray hashOfData) { + if (protectedStoragePayload instanceof AddOncePayload) { + removedAddOncePayloads.add(hashOfData); + } + } + + public ProtectedStorageEntry getProtectedStorageEntry(ProtectedStoragePayload protectedStoragePayload, + KeyPair ownerStoragePubKey) throws CryptoException { ByteArray hashOfData = get32ByteHashAsByteArray(protectedStoragePayload); int sequenceNumber; @@ -546,7 +580,8 @@ public ProtectedStorageEntry getProtectedStorageEntry(ProtectedStoragePayload pr return new ProtectedStorageEntry(protectedStoragePayload, ownerStoragePubKey.getPublic(), sequenceNumber, signature); } - public RefreshOfferMessage getRefreshTTLMessage(ProtectedStoragePayload protectedStoragePayload, KeyPair ownerStoragePubKey) + public RefreshOfferMessage getRefreshTTLMessage(ProtectedStoragePayload protectedStoragePayload, + KeyPair ownerStoragePubKey) throws CryptoException { ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); int sequenceNumber; @@ -561,7 +596,8 @@ public RefreshOfferMessage getRefreshTTLMessage(ProtectedStoragePayload protecte } public ProtectedMailboxStorageEntry getMailboxDataWithSignedSeqNr(MailboxStoragePayload expirableMailboxStoragePayload, - KeyPair storageSignaturePubKey, PublicKey receiversPublicKey) + KeyPair storageSignaturePubKey, + PublicKey receiversPublicKey) throws CryptoException { ByteArray hashOfData = get32ByteHashAsByteArray(expirableMailboxStoragePayload); int sequenceNumber; diff --git a/p2p/src/main/java/bisq/network/p2p/storage/messages/AddOncePayload.java b/p2p/src/main/java/bisq/network/p2p/storage/messages/AddOncePayload.java new file mode 100644 index 00000000000..5cd5b74864d --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/storage/messages/AddOncePayload.java @@ -0,0 +1,24 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.storage.messages; + +/** + * Marker interface for messages which must not be added again after a remove message has been received (e.g. MailboxMessages). + */ +public interface AddOncePayload { +} diff --git a/p2p/src/main/java/bisq/network/p2p/storage/payload/MailboxStoragePayload.java b/p2p/src/main/java/bisq/network/p2p/storage/payload/MailboxStoragePayload.java index 64d94caaacb..7f40c8e9def 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/payload/MailboxStoragePayload.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/payload/MailboxStoragePayload.java @@ -18,6 +18,7 @@ package bisq.network.p2p.storage.payload; import bisq.network.p2p.PrefixedSealedAndSignedMessage; +import bisq.network.p2p.storage.messages.AddOncePayload; import bisq.common.crypto.Sig; import bisq.common.util.ExtraDataMapValidator; @@ -49,7 +50,7 @@ @Getter @EqualsAndHashCode @Slf4j -public final class MailboxStoragePayload implements ProtectedStoragePayload, ExpirablePayload { +public final class MailboxStoragePayload implements ProtectedStoragePayload, ExpirablePayload, AddOncePayload { private final PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage; private PublicKey senderPubKeyForAddOperation; private final byte[] senderPubKeyForAddOperationBytes; From ab4aa64424a9c42d7081af563fb9f7644555f590 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 28 Aug 2019 20:51:01 +0200 Subject: [PATCH 6/7] Smal performance optimisations, cleanups --- .../network/p2p/storage/P2PDataStorage.java | 108 +++++++++--------- 1 file changed, 56 insertions(+), 52 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 11066b22ca6..c2327be472a 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -76,6 +76,7 @@ import java.time.Clock; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -264,7 +265,8 @@ public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection NetworkPayload networkPayload = protectedStorageEntry.getProtectedStoragePayload(); if (networkPayload instanceof ExpirablePayload && networkPayload instanceof RequiresOwnerIsOnlinePayload) { NodeAddress ownerNodeAddress = ((RequiresOwnerIsOnlinePayload) networkPayload).getOwnerNodeAddress(); - if (ownerNodeAddress.equals(connection.getPeersNodeAddressOptional().get())) { + if (connection.getPeersNodeAddressOptional().isPresent() && + ownerNodeAddress.equals(connection.getPeersNodeAddressOptional().get())) { // We have a RequiresLiveOwnerData data object with the node address of the // disconnected peer. We remove that data from our map. @@ -316,9 +318,9 @@ public boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, boolean reBroadcast, boolean checkDate) { log.debug("addPersistableNetworkPayload payload={}", payload); - final byte[] hash = payload.getHash(); + byte[] hash = payload.getHash(); if (payload.verifyHashSize()) { - final ByteArray hashAsByteArray = new ByteArray(hash); + ByteArray hashAsByteArray = new ByteArray(hash); boolean containsKey = getAppendOnlyDataStoreMap().containsKey(hashAsByteArray); if (!containsKey || reBroadcast) { if (!(payload instanceof DateTolerantPayload) || !checkDate || ((DateTolerantPayload) payload).isDateInTolerance(clock)) { @@ -352,9 +354,9 @@ public boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, // might be added then but as we have the data already added calling them would be irrelevant as well. // TODO find a way to avoid the second call... public boolean addPersistableNetworkPayloadFromInitialRequest(PersistableNetworkPayload payload) { - final byte[] hash = payload.getHash(); + byte[] hash = payload.getHash(); if (payload.verifyHashSize()) { - final ByteArray hashAsByteArray = new ByteArray(hash); + ByteArray hashAsByteArray = new ByteArray(hash); appendOnlyDataStoreService.put(hashAsByteArray, payload); return true; } else { @@ -384,9 +386,9 @@ public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEn } boolean sequenceNrValid = isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload); - boolean result = checkPublicKeys(protectedStorageEntry, true) - && checkSignature(protectedStorageEntry) - && sequenceNrValid; + boolean result = sequenceNrValid && + checkPublicKeys(protectedStorageEntry, true) + && checkSignature(protectedStorageEntry); boolean containsKey = map.containsKey(hashOfPayload); if (containsKey) { @@ -395,7 +397,7 @@ && checkSignature(protectedStorageEntry) // printData("before add"); if (result) { - final boolean hasSequenceNrIncreased = hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload); + boolean hasSequenceNrIncreased = hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload); if (!containsKey || hasSequenceNrIncreased) { // At startup we don't have the item so we store it. At updates of the seq nr we store as well. @@ -429,39 +431,32 @@ && checkSignature(protectedStorageEntry) return result; } - public void broadcastProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry, - @Nullable NodeAddress sender, - @Nullable BroadcastHandler.Listener broadcastListener, - boolean isDataOwner) { + private void broadcastProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry, + @Nullable NodeAddress sender, + @Nullable BroadcastHandler.Listener broadcastListener, + boolean isDataOwner) { broadcast(new AddDataMessage(protectedStorageEntry), sender, broadcastListener, isDataOwner); } public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage, @Nullable NodeAddress sender, boolean isDataOwner) { - byte[] hashOfDataAndSeqNr = refreshTTLMessage.getHashOfDataAndSeqNr(); - byte[] signature = refreshTTLMessage.getSignature(); ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload()); - int sequenceNumber = refreshTTLMessage.getSequenceNumber(); - if (map.containsKey(hashOfPayload)) { ProtectedStorageEntry storedData = map.get(hashOfPayload); + int sequenceNumber = refreshTTLMessage.getSequenceNumber(); if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).sequenceNr == sequenceNumber) { log.trace("We got that message with that seq nr already from another peer. We ignore that message."); return true; } else { PublicKey ownerPubKey = storedData.getProtectedStoragePayload().getOwnerPubKey(); - final boolean checkSignature = checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature); - final boolean hasSequenceNrIncreased = hasSequenceNrIncreased(sequenceNumber, hashOfPayload); - final boolean checkIfStoredDataPubKeyMatchesNewDataPubKey = checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, - hashOfPayload); - boolean allValid = checkSignature && - hasSequenceNrIncreased && - checkIfStoredDataPubKeyMatchesNewDataPubKey; - + byte[] hashOfDataAndSeqNr = refreshTTLMessage.getHashOfDataAndSeqNr(); + byte[] signature = refreshTTLMessage.getSignature(); // printData("before refreshTTL"); - if (allValid) { + if (hasSequenceNrIncreased(sequenceNumber, hashOfPayload) && + checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload) && + checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature)) { log.debug("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100)); storedData.refreshTTL(); storedData.updateSequenceNumber(sequenceNumber); @@ -471,8 +466,10 @@ public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage, sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 1000); broadcast(refreshTTLMessage, sender, null, isDataOwner); + return true; } - return allValid; + + return false; } } else { log.debug("We don't have data for that refresh message in our map. That is expected if we missed the data publishing."); @@ -483,7 +480,7 @@ public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage, public boolean remove(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, boolean isDataOwner) { - final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); boolean containsKey = map.containsKey(hashOfPayload); if (!containsKey) @@ -535,18 +532,20 @@ public boolean removeMailboxData(ProtectedMailboxStorageEntry protectedMailboxSt if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data."); - boolean result = containsKey - && checkPublicKeys(protectedMailboxStorageEntry, false) - && isSequenceNrValid(protectedMailboxStorageEntry.getSequenceNumber(), hashOfPayload) - && protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(protectedMailboxStorageEntry.getReceiversPubKey()) // at remove both keys are the same (only receiver is able to remove data) - && checkSignature(protectedMailboxStorageEntry) - && checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxStorageEntry.getReceiversPubKey(), hashOfPayload); + int sequenceNumber = protectedMailboxStorageEntry.getSequenceNumber(); + PublicKey receiversPubKey = protectedMailboxStorageEntry.getReceiversPubKey(); + boolean result = containsKey && + isSequenceNrValid(sequenceNumber, hashOfPayload) && + checkPublicKeys(protectedMailboxStorageEntry, false) && + protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(receiversPubKey) && // at remove both keys are the same (only receiver is able to remove data) + checkSignature(protectedMailboxStorageEntry) && + checkIfStoredMailboxDataMatchesNewMailboxData(receiversPubKey, hashOfPayload); // printData("before removeMailboxData"); if (result) { doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfPayload); printData("after removeMailboxData"); - sequenceNumberMap.put(hashOfPayload, new MapValue(protectedMailboxStorageEntry.getSequenceNumber(), System.currentTimeMillis())); + sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis())); sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300); maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload); @@ -624,14 +623,17 @@ public void addAppendOnlyDataStoreListener(AppendOnlyDataStoreListener listener) appendOnlyDataStoreListeners.add(listener); } + @SuppressWarnings("unused") public void removeAppendOnlyDataStoreListener(AppendOnlyDataStoreListener listener) { appendOnlyDataStoreListeners.remove(listener); } + @SuppressWarnings("unused") public void addProtectedDataStoreListener(ProtectedDataStoreListener listener) { protectedDataStoreListeners.add(listener); } + @SuppressWarnings("unused") public void removeProtectedDataStoreListener(ProtectedDataStoreListener listener) { protectedDataStoreListeners.remove(listener); } @@ -644,7 +646,7 @@ public void removeProtectedDataStoreListener(ProtectedDataStoreListener listener private void doRemoveProtectedExpirableData(ProtectedStorageEntry protectedStorageEntry, ByteArray hashOfPayload) { map.remove(hashOfPayload); log.trace("Data removed from our map. We broadcast the message to our peers."); - hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedStorageEntry)); + hashMapChangedListeners.forEach(e -> e.onRemoved(protectedStorageEntry)); } private boolean isSequenceNrValid(int newSequenceNumber, ByteArray hashOfData) { @@ -717,7 +719,7 @@ private boolean checkSignature(ProtectedStorageEntry protectedStorageEntry) { // in the contained mailbox message, or the pubKey of other kinds of network_messages. private boolean checkPublicKeys(ProtectedStorageEntry protectedStorageEntry, boolean isAddOperation) { boolean result; - final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); if (protectedStoragePayload instanceof MailboxStoragePayload) { MailboxStoragePayload payload = (MailboxStoragePayload) protectedStoragePayload; if (isAddOperation) @@ -787,7 +789,7 @@ public static ByteArray getCompactHashAsByteArray(ProtectedStoragePayload protec return new ByteArray(getCompactHash(protectedStoragePayload)); } - public static byte[] getCompactHash(ProtectedStoragePayload protectedStoragePayload) { + private static byte[] getCompactHash(ProtectedStoragePayload protectedStoragePayload) { return Hash.getSha256Ripemd160hash(protectedStoragePayload.toProtoMessage().toByteArray()); } @@ -795,9 +797,9 @@ public static byte[] getCompactHash(ProtectedStoragePayload protectedStoragePayl private Map getPurgedSequenceNumberMap(Map persisted) { Map purged = new HashMap<>(); long maxAgeTs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(PURGE_AGE_DAYS); - persisted.entrySet().stream().forEach(entry -> { - if (entry.getValue().timeStamp > maxAgeTs) - purged.put(entry.getKey(), entry.getValue()); + persisted.forEach((key, value) -> { + if (value.timeStamp > maxAgeTs) + purged.put(key, value); }); return purged; } @@ -809,12 +811,12 @@ private void printData(String info) { // We print the items sorted by hash with the payload class name and id List> tempList = map.values().stream() .map(e -> new Tuple2<>(org.bitcoinj.core.Utils.HEX.encode(get32ByteHashAsByteArray(e.getProtectedStoragePayload()).bytes), e)) + .sorted(Comparator.comparing(o -> o.first)) .collect(Collectors.toList()); - tempList.sort((o1, o2) -> o1.first.compareTo(o2.first)); - tempList.stream().forEach(e -> { - final ProtectedStorageEntry storageEntry = e.second; - final ProtectedStoragePayload protectedStoragePayload = storageEntry.getProtectedStoragePayload(); - final MapValue mapValue = sequenceNumberMap.get(get32ByteHashAsByteArray(protectedStoragePayload)); + tempList.forEach(e -> { + ProtectedStorageEntry storageEntry = e.second; + ProtectedStoragePayload protectedStoragePayload = storageEntry.getProtectedStoragePayload(); + MapValue mapValue = sequenceNumberMap.get(get32ByteHashAsByteArray(protectedStoragePayload)); sb.append("\n") .append("Hash=") .append(e.first) @@ -838,7 +840,7 @@ private void printData(String info) { } /** - * @param data + * @param data Network payload * @return Hash of data */ public static byte[] get32ByteHash(NetworkPayload data) { @@ -901,10 +903,6 @@ public ByteArray(byte[] bytes) { // Protobuffer /////////////////////////////////////////////////////////////////////////////////////////// - public ByteArray(String hex) { - this.bytes = Utilities.decodeFromHex(hex); - } - @Override public protobuf.ByteArray toProtoMessage() { return protobuf.ByteArray.newBuilder().setBytes(ByteString.copyFrom(bytes)).build(); @@ -914,6 +912,12 @@ public static ByteArray fromProto(protobuf.ByteArray proto) { return new ByteArray(proto.getBytes().toByteArray()); } + + /////////////////////////////////////////////////////////////////////////////////////////// + // Util + /////////////////////////////////////////////////////////////////////////////////////////// + + @SuppressWarnings("unused") public String getHex() { return Utilities.encodeToHex(bytes); } @@ -937,7 +941,7 @@ public static final class MapValue implements PersistablePayload { final public int sequenceNr; final public long timeStamp; - public MapValue(int sequenceNr, long timeStamp) { + MapValue(int sequenceNr, long timeStamp) { this.sequenceNr = sequenceNr; this.timeStamp = timeStamp; } From 55092edb6c5b3fea5b917eadc8d4c21dc33e614b Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 28 Aug 2019 21:06:17 +0200 Subject: [PATCH 7/7] Disconnect nodes which do not have the mandatory capability --- common/src/main/java/bisq/common/app/Capabilities.java | 10 +++++++++- .../main/java/bisq/network/p2p/network/Connection.java | 5 +++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/bisq/common/app/Capabilities.java b/common/src/main/java/bisq/common/app/Capabilities.java index 841b9d1c494..943174a5315 100644 --- a/common/src/main/java/bisq/common/app/Capabilities.java +++ b/common/src/main/java/bisq/common/app/Capabilities.java @@ -39,6 +39,10 @@ public class Capabilities { */ public static final Capabilities app = new Capabilities(); + // Defines which most recent capability any node need to support. + // This helps to clean network from very old inactive but still running nodes. + private static final Capability mandatoryCapability = Capability.DAO_STATE; + protected final Set capabilities = new HashSet<>(); public Capabilities(Capability... capabilities) { @@ -71,7 +75,7 @@ public void addAll(Capability... capabilities) { } public void addAll(Capabilities capabilities) { - if(capabilities != null) + if (capabilities != null) this.capabilities.addAll(capabilities.capabilities); } @@ -111,6 +115,10 @@ public static Capabilities fromIntList(List capabilities) { .collect(Collectors.toSet())); } + public static boolean hasMandatoryCapability(Capabilities capabilities) { + return capabilities.capabilities.stream().anyMatch(c -> c == mandatoryCapability); + } + @Override public String toString() { return Arrays.toString(Capabilities.toIntList(this).toArray()); diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index 269d775215b..9981e1af378 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -799,6 +799,11 @@ && reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) { Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities(); if (supportedCapabilities != null) { if (!capabilities.equals(supportedCapabilities)) { + if (!Capabilities.hasMandatoryCapability(capabilities)) { + shutDown(CloseConnectionReason.RULE_VIOLATION); + return; + } + capabilities.set(supportedCapabilities); capabilitiesListeners.forEach(weakListener -> { SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();