diff --git a/apps/desktop/desktop/src/main/java/bisq/desktop/main/content/bisq_easy/open_trades/trade_state/states/BuyerState2a.java b/apps/desktop/desktop/src/main/java/bisq/desktop/main/content/bisq_easy/open_trades/trade_state/states/BuyerState2a.java index 81cafef656..c54770b0c5 100644 --- a/apps/desktop/desktop/src/main/java/bisq/desktop/main/content/bisq_easy/open_trades/trade_state/states/BuyerState2a.java +++ b/apps/desktop/desktop/src/main/java/bisq/desktop/main/content/bisq_easy/open_trades/trade_state/states/BuyerState2a.java @@ -27,8 +27,7 @@ import bisq.desktop.components.controls.validator.SettableErrorValidator; import bisq.desktop.components.overlay.Popup; import bisq.i18n.Res; -import bisq.support.mediation.MediationRequestService; -import bisq.support.moderator.ModeratorService; +import bisq.support.moderator.ModerationRequestService; import bisq.trade.bisq_easy.BisqEasyTrade; import bisq.user.profile.UserProfile; import javafx.beans.property.BooleanProperty; @@ -55,8 +54,7 @@ public View getView() { private static class Controller extends BaseState.Controller { private final BisqEasyService bisqEasyService; - private final MediationRequestService mediationRequestService; - private final ModeratorService moderatorService; + private final ModerationRequestService moderationRequestService; private Controller(ServiceProvider serviceProvider, BisqEasyTrade bisqEasyTrade, @@ -64,8 +62,7 @@ private Controller(ServiceProvider serviceProvider, super(serviceProvider, bisqEasyTrade, channel); bisqEasyService = serviceProvider.getBisqEasyService(); - mediationRequestService = serviceProvider.getSupportService().getMediationRequestService(); - moderatorService = serviceProvider.getSupportService().getModeratorService(); + moderationRequestService = serviceProvider.getSupportService().getModerationRequestService(); } @Override @@ -92,7 +89,7 @@ public void onActivate() { // Report to moderator String message = "Account data of " + peerUserName + " is banned: " + sellersAccountData; - moderatorService.reportUserProfile(peer, message); + moderationRequestService.reportUserProfile(peer, message); // We reject the trade to avoid the banned user can continue bisqEasyTradeService.cancelTrade(bisqEasyTrade); diff --git a/apps/desktop/desktop/src/main/java/bisq/desktop/main/content/components/ReportToModeratorWindow.java b/apps/desktop/desktop/src/main/java/bisq/desktop/main/content/components/ReportToModeratorWindow.java index 9c762e4174..e01082aeff 100644 --- a/apps/desktop/desktop/src/main/java/bisq/desktop/main/content/components/ReportToModeratorWindow.java +++ b/apps/desktop/desktop/src/main/java/bisq/desktop/main/content/components/ReportToModeratorWindow.java @@ -24,7 +24,7 @@ import bisq.desktop.overlay.OverlayController; import bisq.desktop.overlay.OverlayModel; import bisq.i18n.Res; -import bisq.support.moderator.ModeratorService; +import bisq.support.moderator.ModerationRequestService; import bisq.support.moderator.ReportToModeratorMessage; import bisq.user.profile.UserProfile; import javafx.beans.property.BooleanProperty; @@ -67,11 +67,11 @@ private static class Controller implements InitWithDataController { @Getter private final View view; private final Model model; - private final ModeratorService moderatorService; + private final ModerationRequestService moderationRequestService; private final ServiceProvider serviceProvider; private Controller(ServiceProvider serviceProvider) { - moderatorService = serviceProvider.getSupportService().getModeratorService(); + moderationRequestService = serviceProvider.getSupportService().getModerationRequestService(); this.serviceProvider = serviceProvider; model = new Model(); view = new View(model, this); @@ -101,7 +101,7 @@ void onReport() { return; } - moderatorService.reportUserProfile(model.getAccusedUserProfile(), message); + moderationRequestService.reportUserProfile(model.getAccusedUserProfile(), message); onCancel(); } diff --git a/chat/src/main/java/bisq/chat/ChatChannelService.java b/chat/src/main/java/bisq/chat/ChatChannelService.java index 910edf970d..ae8eefaed5 100644 --- a/chat/src/main/java/bisq/chat/ChatChannelService.java +++ b/chat/src/main/java/bisq/chat/ChatChannelService.java @@ -62,19 +62,32 @@ public void setChatChannelNotificationType(ChatChannel ch } public void addMessage(M message, C channel) { - if (bannedUserService.isUserProfileBanned(message.getAuthorUserProfileId())) { + String authorUserProfileId = message.getAuthorUserProfileId(); + if (bannedUserService.isUserProfileBanned(authorUserProfileId)) { log.warn("Message ignored as sender is banned"); return; } + if (bannedUserService.isRateLimitExceeding(authorUserProfileId)) { + log.warn("Message ignored as sender exceeded rate limit"); + return; + } synchronized (getPersistableStore()) { - channel.addChatMessage(message); + boolean changed = channel.addChatMessage(message); + if (changed) { + checkRateLimit(authorUserProfileId, message.getDate()); + } } persist(); } protected boolean isValid(M message) { - if (bannedUserService.isUserProfileBanned(message.getAuthorUserProfileId())) { - log.warn("Message invalid as sender is banned. AuthorUserProfileId={}",message.getAuthorUserProfileId()); + String authorUserProfileId = message.getAuthorUserProfileId(); + if (bannedUserService.isUserProfileBanned(authorUserProfileId)) { + log.warn("Message invalid as sender is banned. AuthorUserProfileId={}", authorUserProfileId); + return false; + } + if (bannedUserService.isRateLimitExceeding(authorUserProfileId)) { + log.warn("Message ignored as sender exceeded rate limit"); return false; } return true; @@ -123,12 +136,20 @@ public Optional getDefaultChannel() { protected abstract String getChannelTitlePostFix(ChatChannel chatChannel); protected void addMessageReaction(ChatMessageReaction chatMessageReaction, M message) { - if (bannedUserService.isUserProfileBanned(chatMessageReaction.getUserProfileId())) { - log.warn("Reaction ignored as sender is banned."); + String authorUserProfileId = chatMessageReaction.getUserProfileId(); + if (bannedUserService.isUserProfileBanned(authorUserProfileId)) { + log.warn("ChatMessageReaction ignored as sender is banned."); + return; + } + if (bannedUserService.isRateLimitExceeding(authorUserProfileId)) { + log.warn("ChatMessageReaction ignored as sender exceeded rate limit"); return; } synchronized (getPersistableStore()) { - message.addChatMessageReaction(chatMessageReaction); + boolean changed = message.addChatMessageReaction(chatMessageReaction); + if (changed) { + checkRateLimit(authorUserProfileId, chatMessageReaction.getDate()); + } } persist(); } @@ -139,4 +160,6 @@ protected void removeMessageReaction(ChatMessageReaction chatMessageReaction, M } persist(); } + + protected abstract void checkRateLimit(String authorUserProfileId, long messageDate); } diff --git a/chat/src/main/java/bisq/chat/ChatMessage.java b/chat/src/main/java/bisq/chat/ChatMessage.java index 92f7c0aa0e..08596caeca 100644 --- a/chat/src/main/java/bisq/chat/ChatMessage.java +++ b/chat/src/main/java/bisq/chat/ChatMessage.java @@ -184,5 +184,5 @@ public boolean canShowReactions() { return false; } - public abstract void addChatMessageReaction(ChatMessageReaction reaction); + public abstract boolean addChatMessageReaction(ChatMessageReaction reaction); } diff --git a/chat/src/main/java/bisq/chat/bisq_easy/open_trades/BisqEasyOpenTradeMessage.java b/chat/src/main/java/bisq/chat/bisq_easy/open_trades/BisqEasyOpenTradeMessage.java index bb8145c9ca..82d3dd0e1d 100644 --- a/chat/src/main/java/bisq/chat/bisq_easy/open_trades/BisqEasyOpenTradeMessage.java +++ b/chat/src/main/java/bisq/chat/bisq_easy/open_trades/BisqEasyOpenTradeMessage.java @@ -104,20 +104,20 @@ public BisqEasyOpenTradeMessage(String tradeId, } public BisqEasyOpenTradeMessage(String tradeId, - String messageId, - ChatChannelDomain chatChannelDomain, - String channelId, - UserProfile senderUserProfile, - String receiverUserProfileId, - NetworkId receiverNetworkId, - @Nullable String text, - Optional citation, - long date, - boolean wasEdited, - Optional mediator, - ChatMessageType chatMessageType, - Optional bisqEasyOffer, - Set reactions) { + String messageId, + ChatChannelDomain chatChannelDomain, + String channelId, + UserProfile senderUserProfile, + String receiverUserProfileId, + NetworkId receiverNetworkId, + @Nullable String text, + Optional citation, + long date, + boolean wasEdited, + Optional mediator, + ChatMessageType chatMessageType, + Optional bisqEasyOffer, + Set reactions) { super(messageId, chatChannelDomain, channelId, senderUserProfile, receiverUserProfileId, receiverNetworkId, text, citation, date, wasEdited, chatMessageType, reactions); this.tradeId = tradeId; @@ -222,8 +222,7 @@ public boolean canShowReactions() { } @Override - public void addChatMessageReaction(ChatMessageReaction newReaction) { - BisqEasyOpenTradeMessageReaction newBisqEasyOpenTradeReaction = (BisqEasyOpenTradeMessageReaction) newReaction; - addPrivateChatMessageReaction(newBisqEasyOpenTradeReaction); + public boolean addChatMessageReaction(ChatMessageReaction chatMessageReaction) { + return addPrivateChatMessageReaction((BisqEasyOpenTradeMessageReaction) chatMessageReaction); } } diff --git a/chat/src/main/java/bisq/chat/priv/PrivateChatChannelService.java b/chat/src/main/java/bisq/chat/priv/PrivateChatChannelService.java index 7c12002a15..e1e3560e63 100644 --- a/chat/src/main/java/bisq/chat/priv/PrivateChatChannelService.java +++ b/chat/src/main/java/bisq/chat/priv/PrivateChatChannelService.java @@ -261,4 +261,9 @@ protected abstract R createAndGetNewPrivateChatMessageReaction(M message, Reaction reaction, String messageReactionId, boolean isRemoved); + + @Override + protected void checkRateLimit(String authorUserProfileId, long messageDate) { + // For private messages we don't check, as user can ignore anyway the peer. + } } diff --git a/chat/src/main/java/bisq/chat/priv/PrivateChatMessage.java b/chat/src/main/java/bisq/chat/priv/PrivateChatMessage.java index 06035e4ea4..be37847fec 100644 --- a/chat/src/main/java/bisq/chat/priv/PrivateChatMessage.java +++ b/chat/src/main/java/bisq/chat/priv/PrivateChatMessage.java @@ -130,19 +130,22 @@ public NetworkId getReceiver() { return receiverNetworkId; } - public void addPrivateChatMessageReaction(R newReaction) { - getChatMessageReactions().stream() + public boolean addPrivateChatMessageReaction(R newReaction) { + Optional existingReaction = getChatMessageReactions().stream() .filter(privateChatReaction -> privateChatReaction.matches(newReaction)) - .findFirst() - .ifPresentOrElse( - existingPrivateChatReaction -> { - if (newReaction.getDate() > existingPrivateChatReaction.getDate()) { - // only update if more recent - getChatMessageReactions().remove(existingPrivateChatReaction); - getChatMessageReactions().add(newReaction); - } - }, - () -> getChatMessageReactions().add(newReaction) - ); + .findFirst(); + if (existingReaction.isPresent()) { + R existingPrivateChatReaction = existingReaction.get(); + if (newReaction.getDate() > existingPrivateChatReaction.getDate()) { + // only update if more recent + getChatMessageReactions().remove(existingPrivateChatReaction); + return getChatMessageReactions().add(newReaction); + } else { + // Ignore older reaction + return false; + } + } else { + return getChatMessageReactions().add(newReaction); + } } } diff --git a/chat/src/main/java/bisq/chat/pub/PublicChatChannelService.java b/chat/src/main/java/bisq/chat/pub/PublicChatChannelService.java index a416f4e19a..7bc7a2c39b 100644 --- a/chat/src/main/java/bisq/chat/pub/PublicChatChannelService.java +++ b/chat/src/main/java/bisq/chat/pub/PublicChatChannelService.java @@ -20,10 +20,13 @@ import bisq.chat.*; import bisq.chat.reactions.ChatMessageReaction; import bisq.chat.reactions.Reaction; +import bisq.common.observable.Pin; import bisq.network.NetworkService; import bisq.network.identity.NetworkIdWithKeyPair; +import bisq.network.p2p.ServiceNode; import bisq.network.p2p.services.data.BroadcastResult; import bisq.network.p2p.services.data.DataService; +import bisq.network.p2p.services.data.inventory.InventoryService; import bisq.network.p2p.services.data.storage.DistributedData; import bisq.network.p2p.services.data.storage.auth.AuthenticatedData; import bisq.persistence.PersistableStore; @@ -33,7 +36,9 @@ import lombok.extern.slf4j.Slf4j; import java.security.KeyPair; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static com.google.common.base.Preconditions.checkArgument; @@ -42,6 +47,10 @@ public abstract class PublicChatChannelService, S extends PersistableStore, R extends ChatMessageReaction> extends ChatChannelService implements DataService.Listener { + private boolean initialized = false; + private boolean allInventoryDataReceived = false; + private final Set allInventoryDataReceivedPins = new HashSet<>(); + public PublicChatChannelService(NetworkService networkService, UserService userService, ChatChannelDomain chatChannelDomain) { @@ -61,13 +70,26 @@ public CompletableFuture initialize() { networkService.getDataService().ifPresent(dataService -> dataService.getAuthenticatedData().forEach(this::handleAuthenticatedDataAdded)); + networkService.getSupportedTransportTypes().forEach(type -> + networkService.getServiceNodesByTransport().findServiceNode(type) + .flatMap(ServiceNode::getInventoryService).stream() + .map(InventoryService::getInventoryRequestService) + .forEach(inventoryRequestService -> { + Pin pin = inventoryRequestService.getAllDataReceived().addObserver(allDataReceived -> { + if (allDataReceived) { + allInventoryDataReceived = true; + } + }); + allInventoryDataReceivedPins.add(pin); + })); + initialized= true; return CompletableFuture.completedFuture(true); } - protected abstract void handleAuthenticatedDataAdded(AuthenticatedData authenticatedData); - @Override public CompletableFuture shutdown() { + allInventoryDataReceivedPins.forEach(Pin::unbind); + allInventoryDataReceivedPins.clear(); networkService.removeDataServiceListener(this); return CompletableFuture.completedFuture(true); } @@ -87,12 +109,21 @@ public CompletableFuture publishChatMessage(String text, public CompletableFuture publishChatMessage(M message, UserIdentity userIdentity) { - if (bannedUserService.isUserProfileBanned(message.getAuthorUserProfileId())) { + String authorUserProfileId = message.getAuthorUserProfileId(); + + // For rate limit violation we let the user know that his message was not sent, by not inserting the message. + if (bannedUserService.isRateLimitExceeding(authorUserProfileId)) { return CompletableFuture.failedFuture(new RuntimeException()); } + // Sender adds the message at sending to avoid the delayed display if using the received message from the network. findChannel(message.getChannelId()).ifPresent(channel -> addMessage(message, channel)); + // For banned users we hide that their message is not published by inserting it to their local message list. + if (bannedUserService.isUserProfileBanned(authorUserProfileId)) { + return CompletableFuture.failedFuture(new RuntimeException()); + } + KeyPair keyPair = userIdentity.getNetworkIdWithKeyPair().getKeyPair(); return networkService.publishAuthenticatedData(message, keyPair); } @@ -108,7 +139,8 @@ public CompletableFuture publishEditedChatMessage(M originalCha }); } - public CompletableFuture deleteChatMessage(M chatMessage, NetworkIdWithKeyPair networkIdWithKeyPair) { + public CompletableFuture deleteChatMessage(M chatMessage, + NetworkIdWithKeyPair networkIdWithKeyPair) { return networkService.removeAuthenticatedData(chatMessage, networkIdWithKeyPair.getKeyPair()); } @@ -129,7 +161,8 @@ public CompletableFuture publishChatMessageReaction(M message, return networkService.publishAuthenticatedData((DistributedData) chatMessageReaction, keyPair); } - public CompletableFuture deleteChatMessageReaction(R chatMessageReaction, NetworkIdWithKeyPair networkIdWithKeyPair) { + public CompletableFuture deleteChatMessageReaction(R chatMessageReaction, + NetworkIdWithKeyPair networkIdWithKeyPair) { checkArgument(chatMessageReaction instanceof DistributedData, "A public chat message reaction needs to implement DistributedData."); return networkService.removeAuthenticatedData((DistributedData) chatMessageReaction, networkIdWithKeyPair.getKeyPair()); } @@ -163,6 +196,8 @@ protected void removeMessage(M message, C channel) { persist(); } + protected abstract void handleAuthenticatedDataAdded(AuthenticatedData authenticatedData); + protected abstract M createChatMessage(String text, Optional citation, C publicChannel, @@ -189,4 +224,13 @@ protected void processRemovedReaction(R chatMessageReaction) { } protected abstract R createChatMessageReaction(M message, Reaction reaction, UserIdentity userIdentity); + + + protected void checkRateLimit(String authorUserProfileId, long messageDate) { + // If we receive the message from the network after inventory requests are completed, we use our local receive time. + // Otherwise, at batch processing inventory data we use the senders date. + // Using the senders date for all cases would add risk for abuse by manipulating the date. + long timestamp = allInventoryDataReceived && initialized ? System.currentTimeMillis() : messageDate; + bannedUserService.checkRateLimit(authorUserProfileId, timestamp); + } } diff --git a/chat/src/main/java/bisq/chat/pub/PublicChatMessage.java b/chat/src/main/java/bisq/chat/pub/PublicChatMessage.java index cc57faa7bb..9fe2772ce8 100644 --- a/chat/src/main/java/bisq/chat/pub/PublicChatMessage.java +++ b/chat/src/main/java/bisq/chat/pub/PublicChatMessage.java @@ -71,7 +71,7 @@ public boolean isDataInvalid(byte[] pubKeyHash) { } @Override - public void addChatMessageReaction(ChatMessageReaction reaction) { - getChatMessageReactions().add(reaction); + public boolean addChatMessageReaction(ChatMessageReaction reaction) { + return getChatMessageReactions().add(reaction); } } diff --git a/chat/src/main/java/bisq/chat/two_party/TwoPartyPrivateChatMessage.java b/chat/src/main/java/bisq/chat/two_party/TwoPartyPrivateChatMessage.java index b389d165d4..2d92fe4523 100644 --- a/chat/src/main/java/bisq/chat/two_party/TwoPartyPrivateChatMessage.java +++ b/chat/src/main/java/bisq/chat/two_party/TwoPartyPrivateChatMessage.java @@ -136,8 +136,7 @@ public boolean canShowReactions() { } @Override - public void addChatMessageReaction(ChatMessageReaction newReaction) { - TwoPartyPrivateChatMessageReaction newTwoPartyReaction = (TwoPartyPrivateChatMessageReaction) newReaction; - addPrivateChatMessageReaction(newTwoPartyReaction); + public boolean addChatMessageReaction(ChatMessageReaction chatMessageReaction) { + return addPrivateChatMessageReaction((TwoPartyPrivateChatMessageReaction) chatMessageReaction); } } diff --git a/common/src/main/java/bisq/common/timer/Clock.java b/common/src/main/java/bisq/common/timer/Clock.java new file mode 100644 index 0000000000..f3ba2b5ac3 --- /dev/null +++ b/common/src/main/java/bisq/common/timer/Clock.java @@ -0,0 +1,22 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.common.timer; + +public interface Clock { + long now(); +} diff --git a/common/src/main/java/bisq/common/timer/RateLimiter.java b/common/src/main/java/bisq/common/timer/RateLimiter.java new file mode 100644 index 0000000000..4935c56b1c --- /dev/null +++ b/common/src/main/java/bisq/common/timer/RateLimiter.java @@ -0,0 +1,93 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.common.timer; + +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +public class RateLimiter { + private final static long SECOND = 1000L; + private final static long MINUTE = 60 * SECOND; + private final static long HOUR = 60 * MINUTE; + private final static long DAY = 24 * HOUR; + + private final static int DEFAULT_MAX_PER_SECOND = 3; + private final static int DEFAULT_MAX_PER_MINUTE = 30; + private final static int DEFAULT_MAX_PER_HOUR = 120; + private final static int DEFAULT_MAX_PER_DAY = 300; + + private final Clock clock; + private final int maxPerSecond; + private final int maxPerMinute; + private final int maxPerHour; + private final int maxPerDay; + + private final Map> timestampsByUserProfileId = new ConcurrentHashMap<>(); + + public RateLimiter() { + this(DEFAULT_MAX_PER_SECOND, DEFAULT_MAX_PER_MINUTE, DEFAULT_MAX_PER_HOUR, DEFAULT_MAX_PER_DAY); + } + + public RateLimiter(int maxPerSecond, int maxPerMinute, int maxPerHour, int maxPerDay) { + this(new SystemClock(), maxPerSecond, maxPerMinute, maxPerHour, maxPerDay); + } + + public RateLimiter(Clock clock, int maxPerSecond, int maxPerMinute, int maxPerHour, int maxPerDay) { + this.clock = clock; + this.maxPerSecond = maxPerSecond; + this.maxPerMinute = maxPerMinute; + this.maxPerHour = maxPerHour; + this.maxPerDay = maxPerDay; + } + + public boolean exceedsLimit(String userProfileId) { + return exceedsLimit(userProfileId, clock.now()); + } + + public boolean exceedsLimit(String userProfileId, long timeStamp) { + timestampsByUserProfileId.putIfAbsent(userProfileId, new ArrayDeque<>()); + Deque timestamps = timestampsByUserProfileId.get(userProfileId); + synchronized (timestamps) { + timestamps.addLast(timeStamp); + + while (!timestamps.isEmpty() && timeStamp - timestamps.peekFirst() > DAY) { + timestamps.pollFirst(); + } + + long now = clock.now(); + long countLastSecond = timestamps.stream().filter(t -> now - t <= SECOND).count(); + long countLastMinute = timestamps.stream().filter(t -> now - t <= MINUTE).count(); + long countLastHour = timestamps.stream().filter(t -> now - t <= HOUR).count(); + long countLastDay = timestamps.stream().filter(t -> now - t <= DAY).count(); + if (countLastSecond > maxPerSecond || + countLastMinute > maxPerMinute || + countLastHour > maxPerHour || + countLastDay > maxPerDay) { + log.info("Rate limit exceeded: countLastSecond {}, countLastMinute {}, countLastHour {}, countLastDay={}", countLastSecond, countLastMinute, countLastHour, countLastDay); + return true; + } + } + + return false; + } +} diff --git a/common/src/main/java/bisq/common/timer/SystemClock.java b/common/src/main/java/bisq/common/timer/SystemClock.java new file mode 100644 index 0000000000..70c18992f9 --- /dev/null +++ b/common/src/main/java/bisq/common/timer/SystemClock.java @@ -0,0 +1,26 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.common.timer; + +public class SystemClock implements Clock { + @Override + public long now() { + return System.currentTimeMillis(); + } +} + diff --git a/common/src/test/java/bisq/common/timer/MockClock.java b/common/src/test/java/bisq/common/timer/MockClock.java new file mode 100644 index 0000000000..6c9028ec0e --- /dev/null +++ b/common/src/test/java/bisq/common/timer/MockClock.java @@ -0,0 +1,36 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.common.timer; + +public class MockClock implements Clock { + private long currentTime; + + public void setTime(long currentTime) { + this.currentTime = currentTime; + } + + @Override + public long now() { + return currentTime; + } + + public void advanceTime(long millis) { + this.currentTime += millis; + } +} + diff --git a/common/src/test/java/bisq/common/timer/RateLimiterTest.java b/common/src/test/java/bisq/common/timer/RateLimiterTest.java new file mode 100644 index 0000000000..aad7e63086 --- /dev/null +++ b/common/src/test/java/bisq/common/timer/RateLimiterTest.java @@ -0,0 +1,94 @@ +package bisq.common.timer; + +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +public class RateLimiterTest { + @SneakyThrows + @Test + public void testRateLimiter() { + MockClock clock = new MockClock(); + clock.setTime(0); + RateLimiter rateLimiter = new RateLimiter(clock, 5, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE); + String userId = "user123"; + for (int i = 0; i < 5; i++) { + clock.advanceTime(200); + assertFalse(rateLimiter.exceedsLimit(userId)); + } + // Time is now 1000 ms, so 6th message exceeds limit + assertTrue(rateLimiter.exceedsLimit(userId)); + + // We pass another second. This cleans up the first 4 entries. We have only 2 now + clock.advanceTime(1000); + assertFalse(rateLimiter.exceedsLimit(userId)); + // We added the 3rd item for the last second + assertFalse(rateLimiter.exceedsLimit(userId)); + assertFalse(rateLimiter.exceedsLimit(userId)); + // 6th fails again + assertTrue(rateLimiter.exceedsLimit(userId)); + + // Reset + clock.setTime(0); + rateLimiter = new RateLimiter(clock, Integer.MAX_VALUE, 20, Integer.MAX_VALUE, Integer.MAX_VALUE); + // Simulate 20 messages in 1 minute + for (int i = 0; i < 20; i++) { + clock.advanceTime(3000); + assertFalse(rateLimiter.exceedsLimit(userId)); + } + + // 21st message within the same minute should be spam + assertTrue(rateLimiter.exceedsLimit(userId)); + + // Reset + clock.setTime(0); + rateLimiter = new RateLimiter(clock, Integer.MAX_VALUE, Integer.MAX_VALUE, 100, Integer.MAX_VALUE); + for (int i = 0; i < 100; i++) { + clock.advanceTime(36); + assertFalse(rateLimiter.exceedsLimit(userId)); + } + + // 101st message within the same minute should be spam + assertTrue(rateLimiter.exceedsLimit(userId)); + + // Reset + clock.setTime(0); + rateLimiter = new RateLimiter(clock, 5, 20, 100, Integer.MAX_VALUE); + for (int i = 0; i < 100; i++) { + clock.advanceTime(3000); + if (i < 20) { + assertFalse(rateLimiter.exceedsLimit(userId)); + } else { + assertTrue(rateLimiter.exceedsLimit(userId)); + } + } + + // Different users or each message + clock.setTime(0); + rateLimiter = new RateLimiter(clock, 5, 20, 100, Integer.MAX_VALUE); + for (int i = 0; i < 100; i++) { + clock.advanceTime(3000); + assertFalse(rateLimiter.exceedsLimit("user" + i)); + } + + // One user with 20 messages, then diff. users + clock.setTime(0); + rateLimiter = new RateLimiter(clock, 5, 20, 100, Integer.MAX_VALUE); + for (int i = 0; i < 100; i++) { + clock.advanceTime(3000); + int userIndex = i < 20 ? 1 : i; + assertFalse(rateLimiter.exceedsLimit("user" + i)); + } + + // Using system clock, only the first 5 messages pass. + rateLimiter = new RateLimiter(); + for (int i = 0; i < 100; i++) { + if (i < 5) { + assertFalse(rateLimiter.exceedsLimit(userId)); + } else { + assertTrue(rateLimiter.exceedsLimit(userId)); + } + } + } +} \ No newline at end of file diff --git a/support/src/main/java/bisq/support/SupportService.java b/support/src/main/java/bisq/support/SupportService.java index ca6e0ca6fe..d41bd628c7 100644 --- a/support/src/main/java/bisq/support/SupportService.java +++ b/support/src/main/java/bisq/support/SupportService.java @@ -24,6 +24,7 @@ import bisq.persistence.PersistenceService; import bisq.support.mediation.MediationRequestService; import bisq.support.mediation.MediatorService; +import bisq.support.moderator.ModerationRequestService; import bisq.support.moderator.ModeratorService; import bisq.support.release_manager.ReleaseManagerService; import bisq.support.security_manager.SecurityManagerService; @@ -42,6 +43,7 @@ public class SupportService implements Service { private final ModeratorService moderatorService; private final ReleaseManagerService releaseManagerService; private final MediatorService mediatorService; + private final ModerationRequestService moderationRequestService; @Getter @ToString @@ -94,6 +96,9 @@ public SupportService(SupportService.Config config, userService, bondedRolesService, chatService); + moderationRequestService = new ModerationRequestService(networkService, + userService, + bondedRolesService); } @@ -106,6 +111,7 @@ public CompletableFuture initialize() { log.info("initialize"); return mediationRequestService.initialize() .thenCompose(result -> mediatorService.initialize()) + .thenCompose(result -> moderationRequestService.initialize()) .thenCompose(result -> moderatorService.initialize()) .thenCompose(result -> releaseManagerService.initialize()) .thenCompose(result -> securityManagerService.initialize()); @@ -116,6 +122,7 @@ public CompletableFuture shutdown() { log.info("shutdown"); return mediationRequestService.shutdown() .thenCompose(result -> mediatorService.shutdown()) + .thenCompose(result -> moderationRequestService.shutdown()) .thenCompose(result -> moderatorService.shutdown()) .thenCompose(result -> releaseManagerService.shutdown()) .thenCompose(result -> securityManagerService.shutdown()); diff --git a/support/src/main/java/bisq/support/moderator/ModerationRequestService.java b/support/src/main/java/bisq/support/moderator/ModerationRequestService.java new file mode 100644 index 0000000000..d673e49da8 --- /dev/null +++ b/support/src/main/java/bisq/support/moderator/ModerationRequestService.java @@ -0,0 +1,69 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.support.moderator; + +import bisq.bonded_roles.BondedRoleType; +import bisq.bonded_roles.BondedRolesService; +import bisq.bonded_roles.bonded_role.AuthorizedBondedRolesService; +import bisq.chat.ChatChannelDomain; +import bisq.common.application.Service; +import bisq.network.NetworkService; +import bisq.network.identity.NetworkIdWithKeyPair; +import bisq.user.UserService; +import bisq.user.banned.BannedUserService; +import bisq.user.identity.UserIdentity; +import bisq.user.identity.UserIdentityService; +import bisq.user.profile.UserProfile; +import lombok.extern.slf4j.Slf4j; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class ModerationRequestService implements Service { + private final NetworkService networkService; + private final UserIdentityService userIdentityService; + private final AuthorizedBondedRolesService authorizedBondedRolesService; + private final BannedUserService bannedUserService; + + public ModerationRequestService(NetworkService networkService, + UserService userService, + BondedRolesService bondedRolesService) { + this.networkService = networkService; + userIdentityService = userService.getUserIdentityService(); + authorizedBondedRolesService = bondedRolesService.getAuthorizedBondedRolesService(); + bannedUserService = userService.getBannedUserService(); + } + + public void reportUserProfile(UserProfile accusedUserProfile, String message) { + UserIdentity myUserIdentity = userIdentityService.getSelectedUserIdentity(); + checkArgument(!bannedUserService.isUserProfileBanned(myUserIdentity.getUserProfile())); + String reportSenderUserProfileId = myUserIdentity.getUserProfile().getId(); + NetworkIdWithKeyPair senderNetworkIdWithKeyPair = myUserIdentity.getNetworkIdWithKeyPair(); + long date = System.currentTimeMillis(); + authorizedBondedRolesService.getAuthorizedBondedRoleStream() + .filter(e -> e.getBondedRoleType() == BondedRoleType.MODERATOR) + .forEach(bondedRole -> { + ReportToModeratorMessage report = new ReportToModeratorMessage(date, + reportSenderUserProfileId, + accusedUserProfile, + message, + ChatChannelDomain.DISCUSSION); + networkService.confidentialSend(report, bondedRole.getNetworkId(), senderNetworkIdWithKeyPair); + }); + } +} \ No newline at end of file diff --git a/support/src/main/java/bisq/support/moderator/ModeratorService.java b/support/src/main/java/bisq/support/moderator/ModeratorService.java index 18a3ba8bfa..d7ab8824be 100644 --- a/support/src/main/java/bisq/support/moderator/ModeratorService.java +++ b/support/src/main/java/bisq/support/moderator/ModeratorService.java @@ -19,7 +19,9 @@ import bisq.bonded_roles.BondedRoleType; import bisq.bonded_roles.BondedRolesService; +import bisq.bonded_roles.bonded_role.AuthorizedBondedRole; import bisq.bonded_roles.bonded_role.AuthorizedBondedRolesService; +import bisq.bonded_roles.bonded_role.BondedRole; import bisq.chat.ChatChannelDomain; import bisq.chat.ChatChannelSelectionService; import bisq.chat.ChatService; @@ -27,11 +29,12 @@ import bisq.chat.two_party.TwoPartyPrivateChatChannelService; import bisq.common.application.Service; import bisq.common.observable.Observable; +import bisq.common.observable.Pin; +import bisq.common.observable.collection.CollectionObserver; import bisq.common.observable.collection.ObservableSet; import bisq.i18n.Res; import bisq.network.NetworkService; import bisq.network.SendMessageResult; -import bisq.network.identity.NetworkIdWithKeyPair; import bisq.network.p2p.message.EnvelopePayloadMessage; import bisq.network.p2p.services.confidential.ConfidentialMessageService; import bisq.network.p2p.services.data.BroadcastResult; @@ -45,15 +48,15 @@ import bisq.user.identity.UserIdentity; import bisq.user.identity.UserIdentityService; import bisq.user.profile.UserProfile; +import bisq.user.profile.UserProfileService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.security.KeyPair; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; -import static com.google.common.base.Preconditions.checkArgument; - @Slf4j public class ModeratorService implements PersistenceClient, Service, ConfidentialMessageService.Listener { @Getter @@ -69,20 +72,22 @@ public static ModeratorService.Config from(com.typesafe.config.Config config) { } } + private final NetworkService networkService; + private final ChatService chatService; + private final UserIdentityService userIdentityService; + private final TwoPartyPrivateChatChannelService twoPartyPrivateChatChannelService; + private final BannedUserService bannedUserService; + private final UserProfileService userProfileService; + private final AuthorizedBondedRolesService authorizedBondedRolesService; + private final boolean staticPublicKeysProvided; + @Getter private final ModeratorStore persistableStore = new ModeratorStore(); @Getter private final Persistence persistence; - private final NetworkService networkService; - @Getter private final Observable hasNotificationSenderIdentity = new Observable<>(); - private final AuthorizedBondedRolesService authorizedBondedRolesService; - private final UserIdentityService userIdentityService; - private final TwoPartyPrivateChatChannelService twoPartyPrivateChatChannelService; - private final BannedUserService bannedUserService; - private final boolean staticPublicKeysProvided; - private final ChatService chatService; + private Pin rateLimitExceedingUserProfileIdMapPin; public ModeratorService(ModeratorService.Config config, PersistenceService persistenceService, @@ -90,14 +95,16 @@ public ModeratorService(ModeratorService.Config config, UserService userService, BondedRolesService bondedRolesService, ChatService chatService) { - persistence = persistenceService.getOrCreatePersistence(this, DbSubDirectory.PRIVATE, persistableStore); this.networkService = networkService; + this.chatService = chatService; userIdentityService = userService.getUserIdentityService(); - authorizedBondedRolesService = bondedRolesService.getAuthorizedBondedRolesService(); twoPartyPrivateChatChannelService = chatService.getTwoPartyPrivateChatChannelService(); bannedUserService = userService.getBannedUserService(); + userProfileService = userService.getUserProfileService(); + authorizedBondedRolesService = bondedRolesService.getAuthorizedBondedRolesService(); staticPublicKeysProvided = config.isStaticPublicKeysProvided(); - this.chatService = chatService; + + persistence = persistenceService.getOrCreatePersistence(this, DbSubDirectory.PRIVATE, persistableStore); } @@ -108,11 +115,18 @@ public ModeratorService(ModeratorService.Config config, @Override public CompletableFuture initialize() { networkService.addConfidentialMessageListener(this); + + addObserverIfModerator(); + return CompletableFuture.completedFuture(true); } @Override public CompletableFuture shutdown() { + if (rateLimitExceedingUserProfileIdMapPin != null) { + rateLimitExceedingUserProfileIdMapPin.unbind(); + rateLimitExceedingUserProfileIdMapPin = null; + } networkService.removeConfidentialMessageListener(this); return CompletableFuture.completedFuture(true); } @@ -134,29 +148,6 @@ public void onMessage(EnvelopePayloadMessage envelopePayloadMessage) { // API /* --------------------------------------------------------------------- */ - public ObservableSet getReportToModeratorMessages() { - return persistableStore.getReportToModeratorMessages(); - } - - public void reportUserProfile(UserProfile accusedUserProfile, String message) { - ChatChannelDomain chatChannelDomain= ChatChannelDomain.DISCUSSION; - UserIdentity myUserIdentity = userIdentityService.getSelectedUserIdentity(); - checkArgument(!bannedUserService.isUserProfileBanned(myUserIdentity.getUserProfile())); - - NetworkIdWithKeyPair senderNetworkIdWithKeyPair = myUserIdentity.getNetworkIdWithKeyPair(); - long date = System.currentTimeMillis(); - authorizedBondedRolesService.getAuthorizedBondedRoleStream().filter(e -> e.getBondedRoleType() == BondedRoleType.MODERATOR) - .forEach(bondedRole -> { - String reportSenderUserProfileId = myUserIdentity.getUserProfile().getId(); - ReportToModeratorMessage report = new ReportToModeratorMessage(date, - reportSenderUserProfileId, - accusedUserProfile, - message, - chatChannelDomain); - networkService.confidentialSend(report, bondedRole.getNetworkId(), senderNetworkIdWithKeyPair); - }); - } - public void deleteReportToModeratorMessage(ReportToModeratorMessage reportToModeratorMessage) { getReportToModeratorMessages().remove(reportToModeratorMessage); persist(); @@ -205,6 +196,10 @@ public CompletableFuture contactUser(UserProfile userProfile, .orElse(CompletableFuture.failedFuture(new RuntimeException("No channel found"))); } + public ObservableSet getReportToModeratorMessages() { + return persistableStore.getReportToModeratorMessages(); + } + private void processReportToModeratorMessage(ReportToModeratorMessage message) { if (bannedUserService.isUserProfileBanned(message.getReporterUserProfileId())) { log.warn("Message ignored as sender is banned"); @@ -213,4 +208,60 @@ private void processReportToModeratorMessage(ReportToModeratorMessage message) { getReportToModeratorMessages().add(message); persist(); } + + private void addObserverIfModerator() { + Set myUserProfileIds = userIdentityService.getMyUserProfileIds(); + authorizedBondedRolesService.getBondedRoles().addObserver(new CollectionObserver<>() { + @Override + public void add(BondedRole bondedRole) { + AuthorizedBondedRole authorizedBondedRole = bondedRole.getAuthorizedBondedRole(); + boolean isMyProfile = myUserProfileIds.contains(authorizedBondedRole.getProfileId()); + if (authorizedBondedRole.getBondedRoleType() == BondedRoleType.MODERATOR && + isMyProfile && + rateLimitExceedingUserProfileIdMapPin == null) { + rateLimitExceedingUserProfileIdMapPin = bannedUserService.getRateLimitExceedingUserProfiles().addObserver(new CollectionObserver<>() { + @Override + public void add(String userProfileId) { + selfReportRateLimitExceedingUserProfileId(userProfileId); + } + + @Override + public void remove(Object element) { + } + + @Override + public void clear() { + } + }); + } + } + + @Override + public void remove(Object element) { + } + + @Override + public void clear() { + } + }); + } + + private void selfReportRateLimitExceedingUserProfileId(String userProfileId) { + ObservableSet reportToModeratorMessages = getReportToModeratorMessages(); + boolean notYetReported = reportToModeratorMessages.stream() + .noneMatch(message -> message.getAccusedUserProfile().getId().equals(userProfileId)); + if (notYetReported) { + userProfileService.findUserProfile(userProfileId) + .ifPresent(accusedUserProfile -> { + String myUserProfileId = userIdentityService.getSelectedUserIdentity().getUserProfile().getId(); + ReportToModeratorMessage report = new ReportToModeratorMessage(System.currentTimeMillis(), + myUserProfileId, + accusedUserProfile, + "Moderator self-reported user who exceeded message rate limit", // Only for moderator, thus not translated + ChatChannelDomain.DISCUSSION); + reportToModeratorMessages.add(report); + persist(); + }); + } + } } \ No newline at end of file diff --git a/user/src/main/java/bisq/user/banned/BannedUserService.java b/user/src/main/java/bisq/user/banned/BannedUserService.java index 4110404e8b..40a8b851ff 100644 --- a/user/src/main/java/bisq/user/banned/BannedUserService.java +++ b/user/src/main/java/bisq/user/banned/BannedUserService.java @@ -21,6 +21,7 @@ import bisq.bonded_roles.bonded_role.AuthorizedBondedRolesService; import bisq.common.application.Service; import bisq.common.observable.collection.ObservableSet; +import bisq.common.timer.RateLimiter; import bisq.network.identity.NetworkId; import bisq.network.p2p.services.data.storage.auth.authorized.AuthorizedData; import bisq.persistence.DbSubDirectory; @@ -31,15 +32,20 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CompletableFuture; @Slf4j public class BannedUserService implements PersistenceClient, Service, AuthorizedBondedRolesService.Listener { + private final AuthorizedBondedRolesService authorizedBondedRolesService; @Getter private final BannedUserStore persistableStore = new BannedUserStore(); @Getter private final Persistence persistence; - private final AuthorizedBondedRolesService authorizedBondedRolesService; + @Getter + private final ObservableSet rateLimitExceedingUserProfiles = new ObservableSet<>(); + private final RateLimiter rateLimiter = new RateLimiter(); public BannedUserService(PersistenceService persistenceService, AuthorizedBondedRolesService authorizedBondedRolesService) { @@ -113,11 +119,34 @@ public boolean isNetworkIdBanned(NetworkId networkId) { .anyMatch(e -> e.getUserProfile().getNetworkId().equals(networkId)); } + public void checkRateLimit(String userProfileId, long timeStamp) { + boolean exceedsLimit = rateLimiter.exceedsLimit(userProfileId, timeStamp); + if (exceedsLimit) { + log.warn("User with profile ID {} exceeded rate limit.", userProfileId); + rateLimitExceedingUserProfiles.remove(userProfileId); // For triggering observable update + rateLimitExceedingUserProfiles.add(userProfileId); + persist(); + } + } + + public boolean isRateLimitExceeding(String userProfileId) { + refresh(userProfileId); + return rateLimitExceedingUserProfiles.contains(userProfileId); + } + /* --------------------------------------------------------------------- */ // Private /* --------------------------------------------------------------------- */ + private void refresh(String userProfileId) { + Set clone = new HashSet<>(rateLimitExceedingUserProfiles); + clone.stream().filter(e -> e.equals(userProfileId)) + .filter(e -> !rateLimiter.exceedsLimit(e)) // Time window has moved so we are not exceeding anymore + .findAny() + .ifPresent(rateLimitExceedingUserProfiles::remove); + } + private boolean isAuthorized(AuthorizedData authorizedData) { return authorizedBondedRolesService.hasAuthorizedPubKey(authorizedData, BondedRoleType.MODERATOR); }