Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve broadcast handler #6500

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public String getAddress() {
// cappedBurnAmountShare is a % value represented as double. Smallest supported value is 0.01% -> 0.0001.
// By multiplying it with 10000 and using Math.floor we limit the candidate to 0.01%.
// Entries with 0 will be ignored in the selection method, so we do not need to filter them out.
// List<BurningManCandidate> burningManCandidates = new ArrayList<>(burningManCandidatesByName.values());
int ceiling = 10000;
List<Long> amountList = activeBurningManCandidates.stream()
.map(BurningManCandidate::getCappedBurnAmountShare)
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/bisq/core/provider/fee/FeeRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ public void onSuccess(Tuple2<Map<String, Long>, Map<String, Long>> feeData) {
}

public void onFailure(@NotNull Throwable throwable) {
resultFuture.setException(throwable);
if (!resultFuture.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
}
}, MoreExecutors.directExecutor());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ public void onSuccess(String mempoolData) {
}

public void onFailure(@NotNull Throwable throwable) {
mempoolServiceCallback.setException(throwable);
if (!mempoolServiceCallback.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
mempoolServiceCallback.cancel(true);
}
}
}, MoreExecutors.directExecutor());
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/bisq/core/provider/price/PriceRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ public void onSuccess(Tuple2<Map<String, Long>, Map<String, MarketPrice>> market
if (!shutDownRequested) {
resultFuture.set(marketPriceTuple);
}

}

public void onFailure(@NotNull Throwable throwable) {
if (!shutDownRequested) {
resultFuture.setException(new PriceRequestException(throwable, baseUrl));
if (!shutDownRequested && !resultFuture.setException(new PriceRequestException(throwable, baseUrl))) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
}
}, MoreExecutors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public enum CloseConnectionReason {

// illegal requests
RULE_VIOLATION(true, false),
PEER_BANNED(true, false),
PEER_BANNED(false, false),
INVALID_CLASS_RECEIVED(false, false),
MANDATORY_CAPABILITIES_NOT_SUPPORTED(false, false);

Expand Down
21 changes: 18 additions & 3 deletions p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,12 @@ public void onSuccess(Connection connection) {

public void onFailure(@NotNull Throwable throwable) {
log.debug("onFailure at sendMessage: peersNodeAddress={}\n\tmessage={}\n\tthrowable={}", peersNodeAddress, networkEnvelope.getClass().getSimpleName(), throwable.toString());
UserThread.execute(() -> resultFuture.setException(throwable));
UserThread.execute(() -> {
if (!resultFuture.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
}
}, MoreExecutors.directExecutor());

Expand Down Expand Up @@ -311,13 +316,23 @@ public void onSuccess(Connection connection) {
}

public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> resultFuture.setException(throwable));
UserThread.execute(() -> {
if (!resultFuture.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
}
}, MoreExecutors.directExecutor());

} catch (RejectedExecutionException exception) {
log.error("RejectedExecutionException at sendMessage: ", exception);
resultFuture.setException(exception);
UserThread.execute(() -> {
if (!resultFuture.setException(exception)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
}
return resultFuture;
}
Expand Down
95 changes: 63 additions & 32 deletions p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,20 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@Slf4j
public class BroadcastHandler implements PeerManager.Listener {
Expand Down Expand Up @@ -72,9 +79,14 @@ public interface Listener {
private final ResultHandler resultHandler;
private final String uid;

private boolean stopped, timeoutTriggered;
private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeersForBroadcast;
private final AtomicBoolean stopped = new AtomicBoolean();
private final AtomicBoolean timeoutTriggered = new AtomicBoolean();
private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger();
private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger();
private final AtomicInteger numPeersForBroadcast = new AtomicInteger();
@Nullable
private Timer timeoutTimer;
private final Set<SettableFuture<Connection>> sendMessageFutures = new CopyOnWriteArraySet<>();


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -98,36 +110,40 @@ public interface Listener {
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
boolean shutDownRequested,
ListeningExecutorService executor) {
if (broadcastRequests.isEmpty()) {
return;
}

List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections());
Collections.shuffle(confirmedConnections);

int delay;
if (shutDownRequested) {
delay = 1;
// We sent to all peers as in case we had offers we want that it gets removed with higher reliability
numPeersForBroadcast = confirmedConnections.size();
numPeersForBroadcast.set(confirmedConnections.size());
} else {
if (requestsContainOwnMessage(broadcastRequests)) {
// The broadcastRequests contains at least 1 message we have originated, so we send to all peers and
// with shorter delay
numPeersForBroadcast = confirmedConnections.size();
numPeersForBroadcast.set(confirmedConnections.size());
delay = 50;
} else {
// Relay nodes only send to max 7 peers and with longer delay
numPeersForBroadcast = Math.min(7, confirmedConnections.size());
numPeersForBroadcast.set(Math.min(7, confirmedConnections.size()));
delay = 100;
}
}

setupTimeoutHandler(broadcastRequests, delay, shutDownRequested);

int iterations = numPeersForBroadcast;
int iterations = numPeersForBroadcast.get();
for (int i = 0; i < iterations; i++) {
long minDelay = (i + 1) * delay;
long maxDelay = (i + 2) * delay;
Connection connection = confirmedConnections.get(i);
UserThread.runAfterRandomDelay(() -> {
if (stopped) {
if (stopped.get()) {
return;
}

Expand All @@ -139,8 +155,8 @@ public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
// Could be empty list...
if (broadcastRequestsForConnection.isEmpty()) {
// We decrease numPeers in that case for making completion checks correct.
if (numPeersForBroadcast > 0) {
numPeersForBroadcast--;
if (numPeersForBroadcast.get() > 0) {
numPeersForBroadcast.decrementAndGet();
}
checkForCompletion();
return;
Expand All @@ -149,20 +165,24 @@ public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
if (connection.isStopped()) {
// Connection has died in the meantime. We skip it.
// We decrease numPeers in that case for making completion checks correct.
if (numPeersForBroadcast > 0) {
numPeersForBroadcast--;
if (numPeersForBroadcast.get() > 0) {
numPeersForBroadcast.decrementAndGet();
}
checkForCompletion();
return;
}

sendToPeer(connection, broadcastRequestsForConnection, executor);
try {
sendToPeer(connection, broadcastRequestsForConnection, executor);
} catch (RejectedExecutionException e) {
log.error("RejectedExecutionException at broadcast ", e);
cleanup();
}
}, minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
}

public void cancel() {
stopped = true;
cleanup();
}

Expand Down Expand Up @@ -203,13 +223,14 @@ private void setupTimeoutHandler(List<Broadcaster.BroadcastRequest> broadcastReq
boolean shutDownRequested) {
// In case of shutdown we try to complete fast and set a short 1 second timeout
long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS;
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast + 1); // We added 1 in the loop
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast.get() + 1); // We added 1 in the loop
timeoutTimer = UserThread.runAfter(() -> {
if (stopped) {
if (stopped.get()) {
return;
}

timeoutTriggered = true;
timeoutTriggered.set(true);
numOfFailedBroadcasts.incrementAndGet();

log.warn("Broadcast did not complete after {} sec.\n" +
"numPeersForBroadcast={}\n" +
Expand Down Expand Up @@ -244,13 +265,13 @@ private void sendToPeer(Connection connection,
// Can be BundleOfEnvelopes or a single BroadcastMessage
BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage, executor);

sendMessageFutures.add(future);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
numOfCompletedBroadcasts++;
numOfCompletedBroadcasts.incrementAndGet();

if (stopped) {
if (stopped.get()) {
return;
}

Expand All @@ -260,11 +281,10 @@ public void onSuccess(Connection connection) {

@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(),
throwable.getMessage());
numOfFailedBroadcasts++;
log.warn("Broadcast to " + connection.getPeersNodeAddressOptional() + " failed. ", throwable);
numOfFailedBroadcasts.incrementAndGet();

if (stopped) {
if (stopped.get()) {
return;
}

Expand All @@ -286,43 +306,54 @@ private BroadcastMessage getMessage(List<Broadcaster.BroadcastRequest> broadcast
}

private void maybeNotifyListeners(List<Broadcaster.BroadcastRequest> broadcastRequests) {
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast, 3));
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast.get(), 3));
// We use equal checks to avoid duplicated listener calls as it would be the case with >= checks.
if (numOfCompletedBroadcasts == numOfCompletedBroadcastsTarget) {
if (numOfCompletedBroadcasts.get() == numOfCompletedBroadcastsTarget) {
// We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast.
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener)
.filter(Objects::nonNull)
.forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests));
} else {
// We check if number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget.
// Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred.
int maxPossibleSuccessCases = numPeersForBroadcast - numOfFailedBroadcasts;
int maxPossibleSuccessCases = numPeersForBroadcast.get() - numOfFailedBroadcasts.get();
// We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly.
boolean notEnoughSucceededOrOpen = maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1;
// We did not reach resilience level and timeout prevents to reach it later
boolean timeoutAndNotEnoughSucceeded = timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget;
boolean timeoutAndNotEnoughSucceeded = timeoutTriggered.get() && numOfCompletedBroadcasts.get() < numOfCompletedBroadcastsTarget;
if (notEnoughSucceededOrOpen || timeoutAndNotEnoughSucceeded) {
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener)
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts));
.filter(Objects::nonNull)
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts.get(), numOfFailedBroadcasts.get()));
}
}
}

private void checkForCompletion() {
if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeersForBroadcast) {
if (numOfCompletedBroadcasts.get() + numOfFailedBroadcasts.get() == numPeersForBroadcast.get()) {
cleanup();
}
}

private void cleanup() {
stopped = true;
if (stopped.get()) {
return;
}

stopped.set(true);

if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}

sendMessageFutures.stream()
.filter(future -> !future.isCancelled() && !future.isDone())
.forEach(future -> future.cancel(true));
sendMessageFutures.clear();

peerManager.removeListener(this);
resultHandler.onCompleted(this);
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public Broadcaster(NetworkNode networkNode,
this.peerManager = peerManager;

ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("Broadcaster",
maxConnections * 2,
maxConnections * 3,
maxConnections * 4,
30,
30);
executor = MoreExecutors.listeningDecorator(threadPoolExecutor);
Expand Down