diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index eff3d0990d..4cad6e53d5 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -31,6 +31,7 @@ type WakuFilter* = ref object of LPProtocol # a mapping of peer ids to a sequence of filter criteria peerManager: PeerManager maintenanceTask: TimerCallback + messageArchive: Table[PeerID, HashSet[string]] proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult = trace "pinging subscriber", peerId = peerId @@ -185,9 +186,21 @@ proc pushToPeers( let bufferToPublish = messagePush.encode().buffer var pushFuts: seq[Future[void]] + var skipMessageToPeers: seq[PeerId] for peerId in peers: - let pushFut = wf.pushToPeer(peerId, bufferToPublish) - pushFuts.add(pushFut) + if not wf.messageArchive.hasKey(peerId): + wf.messageArchive[peerId] = initHashSet[string]() + + if not wf.messageArchive[peerId].contains(msgHash): + wf.messageArchive[peerId].incl(msgHash) + let pushFut = wf.pushToPeer(peerId, bufferToPublish) + pushFuts.add(pushFut) + else: + skipMessageToPeers.add(peerId) + + if skipMessageToPeers.len > 0: + notice "skipping message to these peers: duplicate message detected", + peer_ids = skipMessageToPeers, msg_hash = msgHash await allFutures(pushFuts)