diff --git a/Makefile b/Makefile index 8575e1fc1f..718115f932 100644 --- a/Makefile +++ b/Makefile @@ -37,10 +37,10 @@ endif ########## ## Main ## ########## -.PHONY: all test update clean negentropy +.PHONY: all test update clean # default target, because it's the first one that doesn't start with '.' -all: | negentropy wakunode2 example2 chat2 chat2bridge libwaku +all: | wakunode2 example2 chat2 chat2bridge libwaku test: | testcommon testwaku @@ -51,7 +51,7 @@ update: | update-common rm -rf waku.nims && \ $(MAKE) waku.nims $(HANDLE_OUTPUT) -clean: | negentropy-clean +clean: rm -rf build # must be included after the default target @@ -84,8 +84,6 @@ endif endif ## end of Heaptracker options -## Pass libnegentropy to linker. -NIM_PARAMS := $(NIM_PARAMS) --passL:./libnegentropy.so ################## ## Dependencies ## @@ -412,9 +410,22 @@ release-notes: sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g' # I could not get the tool to replace issue ids with links, so using sed for now, # asked here: https://github.com/bvieira/sv4git/discussions/101 + +###################### +### NEGENTROPY ### +###################### +.PHONY: negentropy + +## Pass libnegentropy to linker. +NIM_PARAMS := $(NIM_PARAMS) --passL:./libnegentropy.so + +all: | negentropy + +clean: | negentropy-clean + negentropy: $(MAKE) -C vendor/negentropy/cpp && \ cp vendor/negentropy/cpp/libnegentropy.so ./ negentropy-clean: $(MAKE) -C vendor/negentropy/cpp clean && \ - rm libnegentropy.so + rm libnegentropy.so \ No newline at end of file diff --git a/tests/node/test_wakunode_sync.nim b/tests/node/test_wakunode_sync.nim new file mode 100644 index 0000000000..ce09ec77c7 --- /dev/null +++ b/tests/node/test_wakunode_sync.nim @@ -0,0 +1,194 @@ +{.used.} + +import std/net, testutils/unittests, chronos, libp2p/crypto/crypto + +import + ../../../waku/ + [node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync], + ../waku_store/store_utils, + ../waku_archive/archive_utils, + ../testlib/[wakucore, wakunode, testasync] + +suite "Store Sync - End to End": + var server {.threadvar.}: WakuNode + var client {.threadvar.}: WakuNode + + asyncSetup: + let timeOrigin = now() + + let messages = + @[ + fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)), + fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)), + fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)), + fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)), + fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)), + fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)), + fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)), + fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)), + fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)), + fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)), + ] + + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, IPv4_any(), Port(0)) + client = newTestWakuNode(clientKey, IPv4_any(), Port(0)) + + let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages) + let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages) + + let mountServerArchiveRes = server.mountArchive(serverArchiveDriver) + let mountClientArchiveRes = client.mountArchive(clientArchiveDriver) + + assert mountServerArchiveRes.isOk() + assert mountClientArchiveRes.isOk() + + await server.mountStore() + await client.mountStore() + + client.mountStoreClient() + server.mountStoreClient() + + let mountServerSync = await server.mountWakuSync( + maxFrameSize = 0, + syncInterval = 1.hours, + relayJitter = 0.seconds, + enablePruning = false, + ) + let mountClientSync = await client.mountWakuSync( + maxFrameSize = 0, + syncInterval = 2.milliseconds, + relayJitter = 0.seconds, + enablePruning = false, + ) + + assert mountServerSync.isOk(), mountServerSync.error + assert mountClientSync.isOk(), mountClientSync.error + + # messages are retreived when mounting Waku sync + # but based on interval so this is needed for client only + for msg in messages: + client.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + + await allFutures(server.start(), client.start()) + + let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo() + + client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec) + server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec) + + client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec) + server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec) + + asyncTeardown: + # prevent premature channel shutdown + await sleepAsync(10.milliseconds) + + await allFutures(client.stop(), server.stop()) + + asyncTest "no message set differences": + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + await sleepAsync(10.milliseconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + asyncTest "client message set differences": + let msg = fakeWakuMessage(@[byte 10]) + + client.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + check: + client.wakuSync.storageSize() != server.wakuSync.storageSize() + + await sleepAsync(10.milliseconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + asyncTest "server message set differences": + let msg = fakeWakuMessage(@[byte 10]) + + server.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + check: + client.wakuSync.storageSize() != server.wakuSync.storageSize() + + await sleepAsync(10.milliseconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + +suite "Waku Sync - Pruning": + var server {.threadvar.}: WakuNode + var client {.threadvar.}: WakuNode + + asyncSetup: + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, IPv4_any(), Port(0)) + client = newTestWakuNode(clientKey, IPv4_any(), Port(0)) + + let serverArchiveDriver = newSqliteArchiveDriver() + let clientArchiveDriver = newSqliteArchiveDriver() + + let mountServerArchiveRes = server.mountArchive(serverArchiveDriver) + let mountClientArchiveRes = client.mountArchive(clientArchiveDriver) + + assert mountServerArchiveRes.isOk() + assert mountClientArchiveRes.isOk() + + await server.mountStore() + await client.mountStore() + + client.mountStoreClient() + server.mountStoreClient() + + let mountServerSync = await server.mountWakuSync( + maxFrameSize = 0, + relayJitter = 0.seconds, + syncInterval = 1.hours, + enablePruning = false, + ) + let mountClientSync = await client.mountWakuSync( + maxFrameSize = 0, + syncInterval = 10.milliseconds, + relayJitter = 0.seconds, + enablePruning = true, + ) + + assert mountServerSync.isOk(), mountServerSync.error + assert mountClientSync.isOk(), mountClientSync.error + + await allFutures(server.start(), client.start()) + + asyncTeardown: + await sleepAsync(10.milliseconds) + + await allFutures(client.stop(), server.stop()) + + asyncTest "pruning": + for _ in 0 ..< 4: + for _ in 0 ..< 10: + let msg = fakeWakuMessage() + client.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + server.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + await sleepAsync(10.milliseconds) + + check: + client.wakuSync.storageSize() == 10 + server.wakuSync.storageSize() == 40 diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim index 0d01265aa1..236bc987b9 100644 --- a/tests/waku_sync/sync_utils.nim +++ b/tests/waku_sync/sync_utils.nim @@ -5,14 +5,23 @@ import std/options, chronos, chronicles, libp2p/crypto/crypto import ../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore proc newTestWakuSync*( - switch: Switch, handler: SyncCallback + switch: Switch, + transfer: Option[TransferCallback] = none(TransferCallback), + prune: Option[PruneCallback] = none(PruneCallback), + interval: Duration = DefaultSyncInterval, ): Future[WakuSync] {.async.} = - let - peerManager = PeerManager.new(switch) - proto = WakuSync.new( - peerManager = peerManager, relayJitter = 0.seconds, syncCB = some(handler) - ) - assert proto != nil + let peerManager = PeerManager.new(switch) + + let res = await WakuSync.new( + peerManager = peerManager, + relayJitter = 0.seconds, + syncInterval = interval, + pruning = false, + wakuArchive = nil, + wakuStoreClient = nil, + ) + + let proto = res.get() proto.start() switch.mount(proto) diff --git a/tests/waku_sync/test_all.nim b/tests/waku_sync/test_all.nim index 178e9277ea..b5801e4acb 100644 --- a/tests/waku_sync/test_all.nim +++ b/tests/waku_sync/test_all.nim @@ -1,4 +1,3 @@ {.used.} -import - ./test_protocol +import ./test_protocol, ./test_bindings diff --git a/tests/waku_sync/test_bindings b/tests/waku_sync/test_bindings new file mode 100755 index 0000000000..575bb67889 Binary files /dev/null and b/tests/waku_sync/test_bindings differ diff --git a/tests/waku_sync/test_bindings.nim b/tests/waku_sync/test_bindings.nim new file mode 100644 index 0000000000..f2099ea50a --- /dev/null +++ b/tests/waku_sync/test_bindings.nim @@ -0,0 +1,141 @@ +{.used.} + +import std/[options], testutils/unittests, chronos, libp2p/crypto/crypto, std/random + +import + ../../waku/ + [node/peer_manager, waku_core, waku_core/message/digest, waku_sync/raw_bindings], + ../testlib/[wakucore], + ./sync_utils + +random.randomize() + +#TODO clean this up + +suite "Bindings": + var storage {.threadvar.}: NegentropyStorage + var messages {.threadvar.}: seq[(WakuMessageHash, WakuMessage)] + + setup: + let storageRes = NegentropyStorage.new() + assert storageRes.isOk(), $storageRes.error + storage = storageRes.get() + + messages = @[] + for _ in 0 ..< 10: + let msg = fakeWakuMessage() + let hash = computeMessageHash(DefaultPubsubTopic, msg) + messages.add((hash, msg)) + + teardown: + storage.delete() + + test "storage insert": + check: + storage.len() == 0 + + let insRes = storage.insert(messages[0][1].timestamp, messages[0][0]) + + assert insRes.isOk(), $insRes.error + + check: + storage.len() == 1 + + test "storage erase": + let insRes = storage.insert(messages[0][1].timestamp, messages[0][0]) + assert insRes.isOk(), $insRes.error + + check: + storage.len() == 1 + + var delRes = storage.erase(messages[0][1].timestamp, messages[0][0]) + assert delRes.isOk() + + check: + storage.len() == 0 + + delRes = storage.erase(messages[0][1].timestamp, messages[0][0]) + assert delRes.isErr() + + check: + storage.len() == 0 + + test "subrange": + for (hash, msg) in messages: + let insRes = storage.insert(msg.timestamp, hash) + assert insRes.isOk(), $insRes.error + + check: + storage.len() == 10 + + let subrangeRes = NegentropySubRangeStorage.new(storage) + assert subrangeRes.isOk(), subrangeRes.error + let subrange = subrangeRes.get() + + check: + subrange.len() == 10 + + #[ test "storage memory size": + for (hash, msg) in messages: + let insRes = storage.insert(msg.timestamp, hash) + assert insRes.isOk(), $insRes.error + + check: + storage.len() == 10 + + for (hash, msg) in messages: + let delRes = storage.erase(msg.timestamp, hash) + assert delRes.isOk(), $delRes.error + + check: + storage.len() == 0 + + #TODO validate that the occupied memory didn't grow. ]# + + test "reconcile server differences": + for (hash, msg) in messages: + let insRes = storage.insert(msg.timestamp, hash) + assert insRes.isOk(), $insRes.error + + let clientNegentropyRes = Negentropy.new(storage, 0) + + let storageRes = NegentropyStorage.new() + assert storageRes.isOk(), $storageRes.error + let serverStorage = storageRes.get() + + for (hash, msg) in messages: + let insRes = serverStorage.insert(msg.timestamp, hash) + assert insRes.isOk(), $insRes.error + + # the extra msg + let msg = fakeWakuMessage() + let hash = computeMessageHash(DefaultPubsubTopic, msg) + let insRes = serverStorage.insert(msg.timestamp, hash) + assert insRes.isOk(), $insRes.error + + let serverNegentropyRes = Negentropy.new(serverStorage, 0) + + assert clientNegentropyRes.isOk(), $clientNegentropyRes.error + assert serverNegentropyRes.isOk(), $serverNegentropyRes.error + + let clientNegentropy = clientNegentropyRes.get() + let serverNegentropy = serverNegentropyRes.get() + + let initRes = clientNegentropy.initiate() + assert initRes.isOk(), $initRes.error + let init = initRes.get() + + let reconRes = serverNegentropy.serverReconcile(init) + assert reconRes.isOk(), $reconRes.error + let srecon = reconRes.get() + + var + haves: seq[WakuMessageHash] + needs: seq[WakuMessageHash] + let creconRes = clientNegentropy.clientReconcile(srecon, haves, needs) + assert creconRes.isOk(), $creconRes.error + let reconOpt = creconRes.get() + + check: + reconOpt.isNone() + needs[0] == hash diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim index cd43cde602..1d89ae6b43 100644 --- a/tests/waku_sync/test_protocol.nim +++ b/tests/waku_sync/test_protocol.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, times], + std/[options, sets], testutils/unittests, chronos, chronicles, @@ -9,18 +9,15 @@ import stew/byteutils, std/random -from std/os import sleep - import ../../waku/[ - common/paging, node/peer_manager, waku_core, waku_core/message/digest, waku_sync, waku_sync/raw_bindings, ], - ../testlib/[common, wakucore, testasync], + ../testlib/[wakucore, testasync], ./sync_utils random.randomize() @@ -29,12 +26,10 @@ suite "Waku Sync": var serverSwitch {.threadvar.}: Switch var clientSwitch {.threadvar.}: Switch - var protoHandler {.threadvar.}: SyncCallback - var server {.threadvar.}: WakuSync var client {.threadvar.}: WakuSync - var serverPeerInfo {.threadvar.}: RemotePeerInfo + var serverPeerInfo {.threadvar.}: Option[RemotePeerInfo] asyncSetup: serverSwitch = newTestSwitch() @@ -42,28 +37,23 @@ suite "Waku Sync": await allFutures(serverSwitch.start(), clientSwitch.start()) - protoHandler = proc( - hashes: seq[WakuMessageHash], peer: RemotePeerInfo - ) {.async: (raises: []), closure, gcsafe.} = - debug "Received needHashes from peer:", len = hashes.len - for hash in hashes: - debug "Hash received from peer:", hash = hash.to0xHex() + server = await newTestWakuSync(serverSwitch) + client = await newTestWakuSync(clientSwitch) - server = await newTestWakuSync(serverSwitch, handler = protoHandler) - client = await newTestWakuSync(clientSwitch, handler = protoHandler) - - serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + serverPeerInfo = some(serverSwitch.peerInfo.toRemotePeerInfo()) asyncTeardown: + await sleepAsync(10.milliseconds) + await allFutures(server.stop(), client.stop()) await allFutures(serverSwitch.stop(), clientSwitch.stop()) suite "Protocol": asyncTest "sync 2 nodes both empty": - var hashes = await client.sync(serverPeerInfo) - require (hashes.isOk()) + let hashes = await client.sync(serverPeerInfo) + assert hashes.isOk(), hashes.error check: - hashes.value.len == 0 + hashes.value[0].len == 0 asyncTest "sync 2 nodes empty client full server": let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) @@ -75,13 +65,13 @@ suite "Waku Sync": server.ingessMessage(DefaultPubsubTopic, msg3) var hashes = await client.sync(serverPeerInfo) - await sleepAsync(1) # to ensure graceful shutdown - require (hashes.isOk()) + + assert hashes.isOk(), hashes.error check: - hashes.value.len == 3 - computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) in hashes.value - computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) in hashes.value - computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) in hashes.value + hashes.value[0].len == 3 + computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) in hashes.value[0] + computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) in hashes.value[0] + computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) in hashes.value[0] asyncTest "sync 2 nodes full client empty server": let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) @@ -93,9 +83,9 @@ suite "Waku Sync": client.ingessMessage(DefaultPubsubTopic, msg3) var hashes = await client.sync(serverPeerInfo) - require (hashes.isOk()) + assert hashes.isOk(), hashes.error check: - hashes.value.len == 0 + hashes.value[0].len == 0 asyncTest "sync 2 nodes different hashes": let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) @@ -105,40 +95,30 @@ suite "Waku Sync": client.ingessMessage(DefaultPubsubTopic, msg1) server.ingessMessage(DefaultPubsubTopic, msg2) - var hashes = await client.sync(serverPeerInfo) - require (hashes.isOk()) - check: - hashes.value.len == 1 - hashes.value[0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) - #Assuming message is fetched from peer - client.ingessMessage(DefaultPubsubTopic, msg2) - sleep(1000) - hashes = await client.sync(serverPeerInfo) - require (hashes.isOk()) - check: - hashes.value.len == 0 + var syncRes = await client.sync(serverPeerInfo) - #[ asyncTest "sync 2 nodes duplicate hashes": - let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) + check: + syncRes.isOk() - server.ingessMessage(DefaultPubsubTopic, msg1) - server.ingessMessage(DefaultPubsubTopic, msg1) - client.ingessMessage(DefaultPubsubTopic, msg1) - server.ingessMessage(DefaultPubsubTopic, msg2) + var hashes = syncRes.get() - var hashes = await client.sync(serverPeerInfo) - require (hashes.isOk()) check: - hashes.value.len == 1 - #hashes.value[0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + hashes[0].len == 1 + hashes[0][0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + #Assuming message is fetched from peer client.ingessMessage(DefaultPubsubTopic, msg2) - sleep(1000) - hashes = await client.sync(serverPeerInfo) - require (hashes.isOk()) + + syncRes = await client.sync(serverPeerInfo) + + check: + syncRes.isOk() + + hashes = syncRes.get() + check: - hashes.value.len == 0 ]# + hashes[0].len == 0 + asyncTest "sync 2 nodes same hashes": let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) @@ -151,13 +131,12 @@ suite "Waku Sync": let hashes = await client.sync(serverPeerInfo) assert hashes.isOk(), $hashes.error check: - hashes.value.len == 0 + hashes.value[0].len == 0 asyncTest "sync 2 nodes 100K msgs": var i = 0 let msgCount = 100000 var diffIndex = rand(msgCount) - debug "diffIndex is ", diffIndex = diffIndex var diffMsg: WakuMessage while i < msgCount: let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) @@ -166,18 +145,20 @@ suite "Waku Sync": else: diffMsg = msg server.ingessMessage(DefaultPubsubTopic, msg) - i = i + 1 + i += 1 let hashes = await client.sync(serverPeerInfo) assert hashes.isOk(), $hashes.error + check: - hashes.value.len == 1 - hashes.value[0] == computeMessageHash(DefaultPubsubTopic, diffMsg) + hashes.value[0].len == 1 + hashes.value[0][0] == computeMessageHash(DefaultPubsubTopic, diffMsg) asyncTest "sync 2 nodes 100K msgs 10K diffs": var i = 0 let msgCount = 100000 var diffCount = 10000 + var diffMsgHashes: seq[WakuMessageHash] var randIndexes: seq[int] while i < diffCount: @@ -185,38 +166,39 @@ suite "Waku Sync": if randInt in randIndexes: continue randIndexes.add(randInt) - i = i + 1 + i += 1 i = 0 var tmpDiffCnt = diffCount while i < msgCount: let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) if tmpDiffCnt > 0 and i in randIndexes: - #info "not ingessing in client", i=i diffMsgHashes.add(computeMessageHash(DefaultPubsubTopic, msg)) tmpDiffCnt = tmpDiffCnt - 1 else: client.ingessMessage(DefaultPubsubTopic, msg) server.ingessMessage(DefaultPubsubTopic, msg) - i = i + 1 + i += 1 + let hashes = await client.sync(serverPeerInfo) assert hashes.isOk(), $hashes.error + check: - hashes.value.len == diffCount - #TODO: Check if all diffHashes are there in needHashes + hashes.value[0].len == diffCount + toHashSet(hashes.value[0]) == toHashSet(diffMsgHashes) asyncTest "sync 3 nodes 2 client 1 server": ## Setup let client2Switch = newTestSwitch() await client2Switch.start() - let client2 = await newTestWakuSync(client2Switch, handler = protoHandler) + let client2 = await newTestWakuSync(client2Switch) let msgCount = 10000 var i = 0 while i < msgCount: - i = i + 1 + i += 1 let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) if i mod 2 == 0: client2.ingessMessage(DefaultPubsubTopic, msg) @@ -235,10 +217,8 @@ suite "Waku Sync": assert hashes2.isOk(), $hashes2.error check: - hashes1.value.len == int(msgCount / 2) - hashes2.value.len == int(msgCount / 2) - - #TODO: Check if all diffHashes are there in needHashes + hashes1.value[0].len == int(msgCount / 2) + hashes2.value[0].len == int(msgCount / 2) await client2.stop() await client2Switch.stop() @@ -259,10 +239,10 @@ suite "Waku Sync": ) let - client2 = await newTestWakuSync(client2Switch, handler = protoHandler) - client3 = await newTestWakuSync(client3Switch, handler = protoHandler) - client4 = await newTestWakuSync(client4Switch, handler = protoHandler) - client5 = await newTestWakuSync(client5Switch, handler = protoHandler) + client2 = await newTestWakuSync(client2Switch) + client3 = await newTestWakuSync(client3Switch) + client4 = await newTestWakuSync(client4Switch) + client5 = await newTestWakuSync(client5Switch) let msgCount = 100000 var i = 0 @@ -280,60 +260,55 @@ suite "Waku Sync": if i < msgCount - 10000: client5.ingessMessage(DefaultPubsubTopic, msg) server.ingessMessage(DefaultPubsubTopic, msg) - i = i + 1 - #info "client2 storage size", size = client2.storageSize() + i += 1 - var timeBefore = cpuTime() + var timeBefore = getNowInNanosecondTime() let hashes1 = await client.sync(serverPeerInfo) - var timeAfter = cpuTime() + var timeAfter = getNowInNanosecondTime() var syncTime = (timeAfter - timeBefore) - info "sync time in seconds", msgsTotal = msgCount, diff = 1, syncTime = syncTime + debug "sync time in seconds", msgsTotal = msgCount, diff = 1, syncTime = syncTime assert hashes1.isOk(), $hashes1.error check: - hashes1.value.len == 1 - #TODO: Check if all diffHashes are there in needHashes + hashes1.value[0].len == 1 - timeBefore = cpuTime() + timeBefore = getNowInNanosecondTime() let hashes2 = await client2.sync(serverPeerInfo) - timeAfter = cpuTime() + timeAfter = getNowInNanosecondTime() syncTime = (timeAfter - timeBefore) - info "sync time in seconds", msgsTotal = msgCount, diff = 10, syncTime = syncTime + debug "sync time in seconds", msgsTotal = msgCount, diff = 10, syncTime = syncTime assert hashes2.isOk(), $hashes2.error check: - hashes2.value.len == 10 - #TODO: Check if all diffHashes are there in needHashes + hashes2.value[0].len == 10 - timeBefore = cpuTime() + timeBefore = getNowInNanosecondTime() let hashes3 = await client3.sync(serverPeerInfo) - timeAfter = cpuTime() + timeAfter = getNowInNanosecondTime() syncTime = (timeAfter - timeBefore) - info "sync time in seconds", msgsTotal = msgCount, diff = 100, syncTime = syncTime + debug "sync time in seconds", + msgsTotal = msgCount, diff = 100, syncTime = syncTime assert hashes3.isOk(), $hashes3.error check: - hashes3.value.len == 100 - #TODO: Check if all diffHashes are there in needHashes + hashes3.value[0].len == 100 - timeBefore = cpuTime() + timeBefore = getNowInNanosecondTime() let hashes4 = await client4.sync(serverPeerInfo) - timeAfter = cpuTime() + timeAfter = getNowInNanosecondTime() syncTime = (timeAfter - timeBefore) - info "sync time in seconds", + debug "sync time in seconds", msgsTotal = msgCount, diff = 1000, syncTime = syncTime assert hashes4.isOk(), $hashes4.error check: - hashes4.value.len == 1000 - #TODO: Check if all diffHashes are there in needHashes + hashes4.value[0].len == 1000 - timeBefore = cpuTime() + timeBefore = getNowInNanosecondTime() let hashes5 = await client5.sync(serverPeerInfo) - timeAfter = cpuTime() + timeAfter = getNowInNanosecondTime() syncTime = (timeAfter - timeBefore) - info "sync time in seconds", + debug "sync time in seconds", msgsTotal = msgCount, diff = 10000, syncTime = syncTime assert hashes5.isOk(), $hashes5.error check: - hashes5.value.len == 10000 - #TODO: Check if all diffHashes are there in needHashes + hashes5.value[0].len == 10000 await allFutures(client2.stop(), client3.stop(), client4.stop(), client5.stop()) await allFutures( @@ -351,9 +326,9 @@ suite "Waku Sync": await allFutures(node1Switch.start(), node2Switch.start(), node3Switch.start()) - let node1PeerInfo = node1Switch.peerInfo.toRemotePeerInfo() - let node2PeerInfo = node2Switch.peerInfo.toRemotePeerInfo() - let node3PeerInfo = node3Switch.peerInfo.toRemotePeerInfo() + let node1PeerInfo = some(node1Switch.peerInfo.toRemotePeerInfo()) + let node2PeerInfo = some(node2Switch.peerInfo.toRemotePeerInfo()) + let node3PeerInfo = some(node3Switch.peerInfo.toRemotePeerInfo()) let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) let hash1 = computeMessageHash(DefaultPubsubTopic, msg1) @@ -362,17 +337,10 @@ suite "Waku Sync": let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) let hash3 = computeMessageHash(DefaultPubsubTopic, msg3) - let protoHandler: SyncCallback = proc( - hashes: seq[WakuMessageHash], peer: RemotePeerInfo - ) {.async: (raises: []), closure, gcsafe.} = - debug "Received needHashes from peer:", len = hashes.len - for hash in hashes: - debug "Hash received from peer:", hash = hash.to0xHex() - let - node1 = await newTestWakuSync(node1Switch, handler = protoHandler) - node2 = await newTestWakuSync(node2Switch, handler = protoHandler) - node3 = await newTestWakuSync(node3Switch, handler = protoHandler) + node1 = await newTestWakuSync(node1Switch) + node2 = await newTestWakuSync(node2Switch) + node3 = await newTestWakuSync(node3Switch) node1.ingessMessage(DefaultPubsubTopic, msg1) node2.ingessMessage(DefaultPubsubTopic, msg1) @@ -394,74 +362,13 @@ suite "Waku Sync": assert hashes3.isOk(), hashes3.error check: - hashes1.get().len == 1 - hashes2.get().len == 1 - hashes3.get().len == 1 + hashes1.get()[0].len == 1 + hashes2.get()[0].len == 1 + hashes3.get()[0].len == 1 - hashes1.get()[0] == hash2 - hashes2.get()[0] == hash3 - hashes3.get()[0] == hash1 + hashes1.get()[0][0] == hash2 + hashes2.get()[0][0] == hash3 + hashes3.get()[0][0] == hash1 await allFutures(node1.stop(), node2.stop(), node3.stop()) await allFutures(node1Switch.stop(), node2Switch.stop(), node3Switch.stop()) - - suite "Bindings": - asyncTest "test c integration": - let s1Res = Storage.new() - let s1 = s1Res.value - assert s1Res.isOk(), $s1Res.error - let s2Res = Storage.new() - let s2 = s2Res.value - assert s2Res.isOk(), $s2Res.error - - let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let msgHash: WakuMessageHash = - computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) - - check: - s1.insert(msg1.timestamp, msgHash).isOk() - s2.insert(msg1.timestamp, msgHash).isOk() - - let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let msgHash2: WakuMessageHash = - computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) - - check: - s2.insert(msg2.timestamp, msgHash2).isOk() - - let subrange1Res = SubRange.new(s1, 0, uint64.high) - assert subrange1Res.isOk(), $subrange1Res.error - let subrange1 = subrange1Res.value - let subrange2Res = SubRange.new(s2, 0, uint64.high) - assert subrange2Res.isOk(), $subrange2Res.error - - let subrange2 = subrange2Res.value - - let ng1Res = NegentropySubRange.new(subrange1, 10000) - assert ng1Res.isOk(), $ng1Res.error - let ng1 = ng1Res.value - let ng2Res = NegentropySubRange.new(subrange2, 10000) - assert ng2Res.isOk(), $ng2Res.error - let ng2 = ng2Res.value - - let ng1_q1 = ng1.initiate() - check: - ng1_q1.isOk() - - let ng2_q1 = ng2.serverReconcile(ng1_q1.get()) - check: - ng2_q1.isOk() - - var - haveHashes: seq[WakuMessageHash] - needHashes: seq[WakuMessageHash] - let ng1_q2 = ng1.clientReconcile(ng2_q1.get(), haveHashes, needHashes) - - check: - needHashes.len() == 1 - haveHashes.len() == 0 - ng1_q2.isOk() - needHashes[0] == msgHash2 - - check: - s1.erase(msg1.timestamp, msgHash).isOk() diff --git a/vendor/negentropy b/vendor/negentropy index 028dbf3470..311a21a22b 160000 --- a/vendor/negentropy +++ b/vendor/negentropy @@ -1 +1 @@ -Subproject commit 028dbf34706fe490d75ce0f6dfc32ded35c10d4e +Subproject commit 311a21a22bdb6d80e5c4ba5e3d2f550e0062b2cb diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 439f07bc90..13c2845a9d 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -355,6 +355,34 @@ type WakuNodeConf* = object name: "store-max-num-db-connections" .}: int + ## Sync config + storeSync* {. + desc: "Enable store sync protocol: true|false", + defaultValue: false, + name: "store-sync" + .}: bool + + storeSyncInterval* {. + desc: "Interval between store sync attempts. In seconds.", + defaultValue: 3600, # 1 hours + name: "store-sync-interval" + .}: int64 + + storeSyncRelayJitter* {. + hidden, + desc: "Time offset to account for message propagation jitter. In seconds.", + defaultValue: 20, + name: "store-sync-relay-jitter" + .}: int64 + + storeSyncMaxPayloadSize* {. + hidden, + desc: + "Max size in bytes of the inner negentropy payload. Cannot be less than 5K, 0 is unlimited.", + defaultValue: 0, + name: "store-sync-max-payload-size" + .}: int64 + ## Filter config filter* {. desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter" diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index f3cbb7fe74..a81c794db8 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -275,6 +275,16 @@ proc setupProtocols( else: return err("failed to set node waku legacy store peer: " & storeNode.error) + if conf.storeSync: + ( + await node.mountWakuSync( + int(conf.storeSyncMaxPayloadSize), + conf.storeSyncInterval.seconds(), + conf.storeSyncRelayJitter.seconds(), + ) + ).isOkOr: + return err("failed to mount waku sync protocol: " & $error) + # NOTE Must be mounted after relay if conf.lightpush: try: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 6a3268fbef..992f466455 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -196,59 +196,37 @@ proc connectToNodes*( ## Waku Sync -proc mountWakuSync*(node: WakuNode): Result[void, string] = +proc mountWakuSync*( + node: WakuNode, + maxFrameSize: int = DefaultMaxFrameSize, + syncInterval: timer.Duration = DefaultSyncInterval, + relayJitter: Duration = DefaultGossipSubJitter, + enablePruning: bool = true, # For testing purposes +): Future[Result[void, string]] {.async.} = if not node.wakuSync.isNil(): - return err("Waku sync already mounted, skipping") - - let prune: PruneCallback = proc( - pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] - ): Future[ - Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] - ] {.async: (raises: []), closure, gcsafe.} = - let archiveCursor = - if cursor.isSome(): - some(ArchiveCursor(hash: cursor.get())) - else: - none(ArchiveCursor) - - let query = ArchiveQuery( - cursor: archiveCursor, - startTime: some(pruneStart), - endTime: some(pruneStop), - pageSize: 100, + return err("already mounted") + + node.wakuSync = ( + await WakuSync.new( + peerManager = node.peerManager, + maxFrameSize = maxFrameSize, + syncInterval = syncInterval, + relayJitter = relayJitter, + pruning = enablePruning, + wakuArchive = node.wakuArchive, + wakuStoreClient = node.wakuStoreClient, ) + ).valueOr: + return err("initialization failed: " & error) - let catchable = catch: - await node.wakuArchive.findMessages(query) - - let res = - if catchable.isErr(): - return err(catchable.error.msg) - else: - catchable.get() - - let response = res.valueOr: - return err($error) - - let elements = collect(newSeq): - for (hash, msg) in response.hashes.zip(response.messages): - (hash, msg.timestamp) - - let cursor = - if response.cursor.isNone(): - none(WakuMessageHash) - else: - some(response.cursor.get().hash) - - return ok((elements, cursor)) + let catchable = catch: + node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec)) - #TODO add sync callback and options - node.wakuSync = WakuSync.new(peerManager = node.peerManager, pruneCB = some(prune)) + if catchable.isErr(): + return err("switch mounting failed: " & catchable.error.msg) - let catchRes = catch: - node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec)) - if catchRes.isErr(): - return err(catchRes.error.msg) + if node.started: + node.wakuSync.start() return ok() @@ -1324,6 +1302,9 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuMetadata.isNil(): node.wakuMetadata.start() + if not node.wakuSync.isNil(): + node.wakuSync.start() + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index 788757ac2d..c53722084b 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -99,7 +99,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = tuplesToWakuPeers(peers, lightpushPeers) if not node.wakuSync.isNil(): - # Map WakuStore peers to WakuPeers and add to return list + # Map WakuSync peers to WakuPeers and add to return list let syncPeers = node.peerManager.peerStore.peers(WakuSyncCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), diff --git a/waku/waku_core/time.nim b/waku/waku_core/time.nim index a87d4f3c11..94af9af21c 100644 --- a/waku/waku_core/time.nim +++ b/waku/waku_core/time.nim @@ -37,7 +37,8 @@ template nanosecondTime*(collector: Gauge, body: untyped) = else: body -proc timestampInSeconds*(time: Timestamp): Timestamp = +# Unused yet +#[ proc timestampInSeconds*(time: Timestamp): Timestamp = let timeStr = $time var timestamp: Timestamp = time @@ -47,4 +48,4 @@ proc timestampInSeconds*(time: Timestamp): Timestamp = timestamp = Timestamp(time div Timestamp(1_000_000)) elif timeStr.len() > 10: timestamp = Timestamp(time div Timestamp(1000)) - return timestamp + return timestamp ]# diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 4f378ddbcf..48e35a543c 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -51,7 +51,7 @@ proc sendStoreRequest( return ok(res) proc query*( - self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo + self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo | PeerId ): Future[StoreQueryResult] {.async, gcsafe.} = if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor: return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor")) diff --git a/waku/waku_sync/common.nim b/waku/waku_sync/common.nim index 528d0e1855..e0fbdea209 100644 --- a/waku/waku_sync/common.nim +++ b/waku/waku_sync/common.nim @@ -3,25 +3,25 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/[options], chronos +import std/[options], chronos, libp2p/peerId import ../waku_core const DefaultSyncInterval*: Duration = 1.hours - DefaultPruneInterval*: Duration = 30.minutes WakuSyncCodec* = "/vac/waku/sync/1.0.0" - DefaultFrameSize* = 153600 + DefaultMaxFrameSize* = 1048576 # 1 MiB DefaultGossipSubJitter*: Duration = 20.seconds type - SyncCallback* = - proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {.async: (raises: []).} + TransferCallback* = proc( + hashes: seq[WakuMessageHash], peerId: PeerId + ): Future[Result[void, string]] {.async: (raises: []), closure.} PruneCallback* = proc( startTime: Timestamp, endTime: Timestamp, cursor = none(WakuMessageHash) ): Future[ Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] - ] {.async: (raises: []).} + ] {.async: (raises: []), closure.} SyncPayload* = object rangeStart*: Option[uint64] diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 6ea78736bf..4ff99fcbd8 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -4,8 +4,8 @@ else: {.push raises: [].} import - std/[options], - stew/results, + std/[options, sugar, sequtils], + stew/[results, byteutils], chronicles, chronos, metrics, @@ -17,42 +17,47 @@ import ../common/nimchronos, ../common/enr, ../waku_core, + ../waku_archive, + ../waku_store/[client, common], ../waku_enr, ../node/peer_manager/peer_manager, ./raw_bindings, ./common, - ./session, - ./storage_manager + ./session logScope: topics = "waku sync" type WakuSync* = ref object of LPProtocol - storage: Storage # Negentropy protocol storage + storage: NegentropyStorage peerManager: PeerManager maxFrameSize: int + syncInterval: timer.Duration # Time between each syncronisation attempt relayJitter: Duration # Time delay until all messages are mostly received network wide - syncCallBack: Option[SyncCallBack] # Callback with the result of the syncronisation + transferCallBack: Option[TransferCallback] # Callback for message transfer. + pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query - pruneStart: Timestamp # Last pruning start timestamp - pruneInterval: Duration # Time between each pruning attempt + pruneStart: Timestamp # Last pruning start timestamp + pruneOffset: timer.Duration # Offset to prune a bit more than necessary. + periodicSyncFut: Future[void] periodicPruneFut: Future[void] +proc storageSize*(self: WakuSync): int = + self.storage.len + proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) = if msg.ephemeral: return - #TODO: Do we need to check if this message has already been ingessed? - # because what if messages is received via gossip and sync as well? - # Might 2 entries to be inserted into storage which is inefficient. + let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg) - trace "inserting message into storage ", - hash = msgHash.toHex(), timestamp = msg.timestamp + trace "inserting message into waku sync storage ", + msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp if self.storage.insert(msg.timestamp, msgHash).isErr(): - debug "failed to insert message ", hash = msgHash.toHex() + error "failed to insert message ", msg_hash = msgHash.to0xHex() proc calculateRange(jitter: Duration, syncRange: Duration = 1.hours): (int64, int64) = var now = getNowInNanosecondTime() @@ -111,39 +116,29 @@ proc request( return ok(hashes) proc sync*( - self: WakuSync + self: WakuSync, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo) ): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async, gcsafe.} = - let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr: - return err("No suitable peer found for sync") - let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec) - let conn: Connection = connOpt.valueOr: - return err("Cannot establish sync connection") + let peer = + if peerInfo.isSome(): + peerInfo.get() + else: + let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr: + return err("No suitable peer found for sync") - let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: - debug "sync session ended", - server = self.peerManager.switch.peerInfo.peerId, - client = conn.peerId, - error = $error - return err("Sync request error: " & error) + peer - return ok((hashes, peer)) - -proc sync*( - self: WakuSync, peer: RemotePeerInfo -): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec) + let conn: Connection = connOpt.valueOr: return err("Cannot establish sync connection") let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: debug "sync session ended", - server = self.peerManager.switch.peerInfo.peerId, - client = conn.peerId, - error = $error + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error return err("Sync request error: " & error) - return ok(hashes) + return ok((hashes, peer)) proc handleLoop( self: WakuSync, conn: Connection @@ -184,14 +179,16 @@ proc initProtocolHandler(self: WakuSync) = let hashes = (await self.handleLoop(conn)).valueOr: debug "sync session ended", - server = self.peerManager.switch.peerInfo.peerId, - client = conn.peerId, - error = $error + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error #TODO send error code and desc to client return - #TODO handle the hashes that the server need from the client + if hashes.len > 0 and self.transferCallBack.isSome(): + let callback = self.transferCallBack.get() + + (await callback(hashes, conn.peerId)).isOkOr: + error "transfer callback failed", error = $error debug "sync session ended gracefully", server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId @@ -199,20 +196,141 @@ proc initProtocolHandler(self: WakuSync) = self.handler = handle self.codec = WakuSyncCodec +proc initPruningHandler(self: WakuSync, wakuArchive: WakuArchive) = + if wakuArchive.isNil(): + return + + self.pruneCallBack = some( + proc( + pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] + ): Future[ + Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] + ] {.async: (raises: []), closure.} = + let archiveCursor = + if cursor.isSome(): + some(ArchiveCursor(hash: cursor.get())) + else: + none(ArchiveCursor) + + let query = ArchiveQuery( + includeData: true, + cursor: archiveCursor, + startTime: some(pruneStart), + endTime: some(pruneStop), + pageSize: 100, + ) + + let catchable = catch: + await wakuArchive.findMessages(query) + + if catchable.isErr(): + return err("archive error: " & catchable.error.msg) + + let res = catchable.get() + let response = res.valueOr: + return err("archive error: " & $error) + + let elements = collect(newSeq): + for (hash, msg) in response.hashes.zip(response.messages): + (hash, msg.timestamp) + + let cursor = + if response.cursor.isNone(): + none(WakuMessageHash) + else: + some(response.cursor.get().hash) + + return ok((elements, cursor)) + ) + +proc initTransferHandler( + self: WakuSync, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient +) = + if wakuArchive.isNil() or wakuStoreClient.isNil(): + return + + self.transferCallBack = some( + proc( + hashes: seq[WakuMessageHash], peerId: PeerId + ): Future[Result[void, string]] {.async: (raises: []), closure.} = + var query = StoreQueryRequest() + query.includeData = true + query.messageHashes = hashes + query.paginationLimit = some(uint64(100)) + + while true: + let catchable = catch: + await wakuStoreClient.query(query, peerId) + + if catchable.isErr(): + return err("store client error: " & catchable.error.msg) + + let res = catchable.get() + let response = res.valueOr: + return err("store client error: " & $error) + + query.paginationCursor = response.paginationCursor + + for kv in response.messages: + let handleRes = catch: + await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get()) + + if handleRes.isErr(): + error "message transfer failed", error = handleRes.error.msg + # Messages can be synced next time since they are not added to storage yet. + continue + + self.ingessMessage(kv.pubsubTopic.get(), kv.message.get()) + + if query.paginationCursor.isNone(): + break + + return ok() + ) + +proc initFillStorage( + self: WakuSync, wakuArchive: WakuArchive +): Future[Result[void, string]] {.async.} = + if wakuArchive.isNil(): + return ok() + + let endTime = getNowInNanosecondTime() + let starTime = endTime - self.syncInterval.nanos + + var query = ArchiveQuery( + includeData: true, + cursor: none(ArchiveCursor), + startTime: some(starTime), + endTime: some(endTime), + pageSize: 100, + ) + + while true: + let response = (await wakuArchive.findMessages(query)).valueOr: + return err($error) + + for (topic, msg) in response.topics.zip(response.messages): + self.ingessMessage(topic, msg) + + if response.cursor.isNone(): + break + + query.cursor = response.cursor + + return ok() + proc new*( T: type WakuSync, peerManager: PeerManager, - maxFrameSize: int = DefaultFrameSize, + maxFrameSize: int = DefaultMaxFrameSize, syncInterval: timer.Duration = DefaultSyncInterval, relayJitter: Duration = DefaultGossipSubJitter, - #Default gossipsub jitter in network. - syncCB: Option[SyncCallback] = none(SyncCallback), - pruneInterval: Duration = DefaultPruneInterval, - pruneCB: Option[PruneCallBack] = none(PruneCallback), -): T = - let storage = Storage.new().valueOr: - debug "storage creation failed" - return + pruning: bool, + wakuArchive: WakuArchive, + wakuStoreClient: WakuStoreClient, +): Future[Result[T, string]] {.async.} = + let storage = NegentropyStorage.new().valueOr: + return err("negentropy storage creation failed") let sync = WakuSync( storage: storage, @@ -220,73 +338,132 @@ proc new*( maxFrameSize: maxFrameSize, syncInterval: syncInterval, relayJitter: relayJitter, - syncCallBack: syncCB, - pruneCallBack: pruneCB, - pruneStart: getNowInNanosecondTime(), - pruneInterval: pruneInterval, + pruneOffset: syncInterval div 2, ) sync.initProtocolHandler() + if pruning: + sync.initPruningHandler(wakuArchive) + + sync.initTransferHandler(wakuArchive, wakuStoreClient) + + let res = await sync.initFillStorage(wakuArchive) + if res.isErr(): + return err("initial storage filling error: " & res.error) + info "WakuSync protocol initialized" - return sync + return ok(sync) + +proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} = + debug "periodic sync initialized", interval = $self.syncInterval -proc periodicSync(self: WakuSync) {.async.} = while true: await sleepAsync(self.syncInterval) - let (hashes, peer) = (await self.sync()).valueOr: - debug "periodic sync failed", error = error - continue + debug "periodic sync started" + + var + hashes: seq[WakuMessageHash] + peer: RemotePeerInfo + tries = 3 - let callback = self.syncCallBack.valueOr: - continue + while true: + let res = (await self.sync()).valueOr: + error "sync failed", error = $error + if tries > 0: + tries -= 1 + await sleepAsync(30.seconds) + continue + else: + break + + hashes = res[0] + peer = res[1] + break + + if hashes.len > 0: + tries = 3 + while true: + (await callback(hashes, peer.peerId)).isOkOr: + error "transfer callback failed", error = $error + if tries > 0: + tries -= 1 + await sleepAsync(30.seconds) + continue + else: + break - await callback(hashes, peer) + break -proc periodicPrune(self: WakuSync) {.async.} = - let callback = self.pruneCallBack.valueOr: - return + debug "periodic sync done", hashSynced = hashes.len + +proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} = + debug "periodic prune initialized", interval = $self.syncInterval + + await sleepAsync(self.syncInterval) + + var pruneStop = getNowInNanosecondTime() while true: - await sleepAsync(self.pruneInterval) + await sleepAsync(self.syncInterval) - let pruneStop = getNowInNanosecondTime() + debug "periodic prune started", + startTime = self.pruneStart - self.pruneOffset.nanos, + endTime = pruneStop, + storageSize = self.storage.len var (elements, cursor) = (newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash)) + var tries = 3 while true: - (elements, cursor) = (await callback(self.pruneStart, pruneStop, cursor)).valueOr: - debug "storage pruning failed", error = $error - break + (elements, cursor) = ( + await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, cursor) + ).valueOr: + error "pruning callback failed", error = $error + if tries > 0: + tries -= 1 + await sleepAsync(30.seconds) + continue + else: + break if elements.len == 0: break for (hash, timestamp) in elements: self.storage.erase(timestamp, hash).isOkOr: - trace "element pruning failed", time = timestamp, hash = hash, error = error + error "storage erase failed", + timestamp = timestamp, msg_hash = hash, error = $error continue if cursor.isNone(): break self.pruneStart = pruneStop + pruneStop = getNowInNanosecondTime() + + debug "periodic prune done", storageSize = self.storage.len proc start*(self: WakuSync) = self.started = true + self.pruneStart = getNowInNanosecondTime() - if self.syncCallBack.isSome() and self.syncInterval > ZeroDuration: - self.periodicSyncFut = self.periodicSync() + if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration: + self.periodicSyncFut = self.periodicSync(self.transferCallBack.get()) - if self.pruneCallBack.isSome() and self.pruneInterval > ZeroDuration: - self.periodicPruneFut = self.periodicPrune() + if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration: + self.periodicPruneFut = self.periodicPrune(self.pruneCallBack.get()) info "WakuSync protocol started" proc stopWait*(self: WakuSync) {.async.} = - await self.periodicSyncFut.cancelAndWait() + if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration: + await self.periodicSyncFut.cancelAndWait() + + if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration: + await self.periodicPruneFut.cancelAndWait() info "WakuSync protocol stopped" diff --git a/waku/waku_sync/raw_bindings.nim b/waku/waku_sync/raw_bindings.nim index df20f30715..5aa57ae8d7 100644 --- a/waku/waku_sync/raw_bindings.nim +++ b/waku/waku_sync/raw_bindings.nim @@ -19,11 +19,11 @@ const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h" logScope: topics = "waku sync" -type Buffer* = object +type Buffer = object len*: uint64 `ptr`*: ptr uint8 -type BindingResult* = object +type BindingResult = object output: Buffer have_ids_len: uint need_ids_len: uint @@ -40,7 +40,7 @@ proc toWakuMessageHash(buffer: Buffer): WakuMessageHash = return hash -proc toBuffer*(x: openArray[byte]): Buffer = +proc toBuffer(x: openArray[byte]): Buffer = ## converts the input to a Buffer object ## the Buffer object is used to communicate data with the rln lib var temp = @x @@ -48,7 +48,7 @@ proc toBuffer*(x: openArray[byte]): Buffer = let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(temp.len)) return output -proc BufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)): seq[byte] = +proc bufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)): seq[byte] = var bufLen: uint64 if isNone(len): bufLen = buffer.len @@ -70,63 +70,65 @@ proc toBufferSeq(buffLen: uint, buffPtr: ptr Buffer): seq[Buffer] = ### Storage ### -type Storage* = distinct pointer +type NegentropyStorage* = distinct pointer # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L27 proc storage_init( db_path: cstring, name: cstring -): Storage {.header: NEGENTROPY_HEADER, importc: "storage_new".} +): NegentropyStorage {.header: NEGENTROPY_HEADER, importc: "storage_new".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L41 proc raw_insert( - storage: Storage, timestamp: uint64, id: ptr Buffer + storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer ): bool {.header: NEGENTROPY_HEADER, importc: "storage_insert".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L43 proc raw_erase( - storage: Storage, timestamp: uint64, id: ptr Buffer + storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer ): bool {.header: NEGENTROPY_HEADER, importc: "storage_erase".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L29 -proc free(storage: Storage) {.header: NEGENTROPY_HEADER, importc: "storage_delete".} +proc free( + storage: NegentropyStorage +) {.header: NEGENTROPY_HEADER, importc: "storage_delete".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31 -proc size*( - storage: Storage +proc size( + storage: NegentropyStorage ): cint {.header: NEGENTROPY_HEADER, importc: "storage_size".} ### Negentropy ### -type Negentropy* = distinct pointer +type RawNegentropy* = distinct pointer # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L33 proc constructNegentropy( - storage: Storage, frameSizeLimit: uint64 -): Negentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".} + storage: NegentropyStorage, frameSizeLimit: uint64 +): RawNegentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L37 proc raw_initiate( - negentropy: Negentropy, r: ptr BindingResult + negentropy: RawNegentropy, r: ptr BindingResult ): int {.header: NEGENTROPY_HEADER, importc: "negentropy_initiate".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L39 proc raw_setInitiator( - negentropy: Negentropy + negentropy: RawNegentropy ) {.header: NEGENTROPY_HEADER, importc: "negentropy_setinitiator".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L45 proc raw_reconcile( - negentropy: Negentropy, query: ptr Buffer, r: ptr BindingResult + negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult ): int {.header: NEGENTROPY_HEADER, importc: "reconcile".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L51 proc raw_reconcile_with_ids( - negentropy: Negentropy, query: ptr Buffer, r: ptr BindingResult + negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult ): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_no_cbk".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L35 proc free( - negentropy: Negentropy + negentropy: RawNegentropy ) {.header: NEGENTROPY_HEADER, importc: "negentropy_delete".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L53 @@ -136,57 +138,61 @@ proc free_result( ### SubRange ### -type SubRange* = distinct pointer +type NegentropySubRangeStorage* = distinct pointer # https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L57 proc subrange_init( - storage: Storage, startTimestamp: uint64, endTimestamp: uint64 -): SubRange {.header: NEGENTROPY_HEADER, importc: "subrange_new".} + storage: NegentropyStorage, startTimestamp: uint64, endTimestamp: uint64 +): NegentropySubRangeStorage {.header: NEGENTROPY_HEADER, importc: "subrange_new".} # https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L59 -proc free(subrange: SubRange) {.header: NEGENTROPY_HEADER, importc: "subrange_delete".} +proc free( + subrange: NegentropySubRangeStorage +) {.header: NEGENTROPY_HEADER, importc: "subrange_delete".} # https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31 -proc size*( - subrange: SubRange +proc size( + subrange: NegentropySubRangeStorage ): cint {.header: NEGENTROPY_HEADER, importc: "subrange_size".} -### Negentropy with SubRange ### +### Negentropy with NegentropySubRangeStorage ### -type NegentropySubRange* = distinct pointer +type RawNegentropySubRange = distinct pointer # https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L61 proc constructNegentropyWithSubRange( - subrange: SubRange, frameSizeLimit: uint64 -): NegentropySubRange {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_new".} + subrange: NegentropySubRangeStorage, frameSizeLimit: uint64 +): RawNegentropySubRange {. + header: NEGENTROPY_HEADER, importc: "negentropy_subrange_new" +.} # https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L65 proc raw_initiate_subrange( - negentropy: NegentropySubRange, r: ptr BindingResult + negentropy: RawNegentropySubRange, r: ptr BindingResult ): int {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_initiate".} # https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L67 proc raw_reconcile_subrange( - negentropy: NegentropySubRange, query: ptr Buffer, r: ptr BindingResult + negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult ): int {.header: NEGENTROPY_HEADER, importc: "reconcile_subrange".} # https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L69 proc raw_reconcile_with_ids_subrange( - negentropy: NegentropySubRange, query: ptr Buffer, r: ptr BindingResult + negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult ): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_subrange_no_cbk".} # https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L63 proc free( - negentropy: NegentropySubRange + negentropy: RawNegentropySubRange ) {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_delete".} ### Wrappings ### -type NegentropyPayload* = distinct seq[byte] +### Storage ### -proc `==`*(a: Storage, b: pointer): bool {.borrow.} +proc `==`*(a: NegentropyStorage, b: pointer): bool {.borrow.} -proc new*(T: type Storage): Result[T, string] = +proc new*(T: type NegentropyStorage): Result[T, string] = #TODO db name and path let storage = storage_init("", "") @@ -195,13 +201,15 @@ proc new*(T: type Storage): Result[T, string] = return err("storage initialization failed") ]# return ok(storage) -proc delete*(storage: Storage) = - free(storage) - -proc erase*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, string] = - let cString = toBuffer(hash) +proc delete*(storage: NegentropyStorage) = + storage.free() - let res = raw_erase(storage, uint64(id), cString.unsafeAddr) +proc erase*( + storage: NegentropyStorage, id: int64, hash: WakuMessageHash +): Result[void, string] = + var buffer = toBuffer(hash) + var bufPtr = addr(buffer) + let res = raw_erase(storage, uint64(id), bufPtr) #TODO error handling once we move to lmdb @@ -210,7 +218,9 @@ proc erase*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, st else: return err("erase error") -proc insert*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, string] = +proc insert*( + storage: NegentropyStorage, id: int64, hash: WakuMessageHash +): Result[void, string] = var buffer = toBuffer(hash) var bufPtr = addr(buffer) let res = raw_insert(storage, uint64(id), bufPtr) @@ -222,39 +232,107 @@ proc insert*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, s else: return err("insert error") -proc `==`*(a: NegentropySubRange, b: pointer): bool {.borrow.} +proc len*(storage: NegentropyStorage): int = + int(storage.size) + +### SubRange ### + +proc `==`*(a: NegentropySubRangeStorage, b: pointer): bool {.borrow.} + +proc new*( + T: type NegentropySubRangeStorage, + storage: NegentropyStorage, + startTime: uint64 = uint64.low, + endTime: uint64 = uint64.high, +): Result[T, string] = + let subrange = subrange_init(storage, startTime, endTime) + + #[ TODO: Uncomment once we move to lmdb + if storage == nil: + return err("storage initialization failed") ]# + return ok(subrange) + +proc delete*(subrange: NegentropySubRangeStorage) = + subrange.free() + +proc len*(subrange: NegentropySubRangeStorage): int = + int(subrange.size) + +### Interface ### + +type + Negentropy* = ref object of RootObj + + NegentropyWithSubRange = ref object of Negentropy + inner: RawNegentropySubRange + + NegentropyWithStorage = ref object of Negentropy + inner: RawNegentropy + + NegentropyPayload* = distinct seq[byte] + +method delete*(self: Negentropy) {.base.} = + discard + +method initiate*(self: Negentropy): Result[NegentropyPayload, string] {.base.} = + discard + +method serverReconcile*( + self: Negentropy, query: NegentropyPayload +): Result[NegentropyPayload, string] {.base.} = + discard + +method clientReconcile*( + self: Negentropy, + query: NegentropyPayload, + haves: var seq[WakuMessageHash], + needs: var seq[WakuMessageHash], +): Result[Option[NegentropyPayload], string] {.base.} = + discard + +### Impl. ### proc new*( - T: type NegentropySubrange, subrange: SubRange, frameSizeLimit: int + T: type Negentropy, + storage: NegentropyStorage | NegentropySubRangeStorage, + frameSizeLimit: int, ): Result[T, string] = - let negentropy = constructNegentropyWithSubRange(subrange, uint64(frameSizeLimit)) - if negentropy == nil: - return err("negentropy initialization failed due to lower framesize") - return ok(negentropy) + if storage is NegentropyStorage: + let raw_negentropy = + constructNegentropy(NegentropyStorage(storage), uint64(frameSizeLimit)) + + let negentropy = NegentropyWithStorage(inner: raw_negentropy) -proc delete*(negentropy: NegentropySubRange) = - free(negentropy) + return ok(negentropy) + elif storage is NegentropySubRangeStorage: + let raw_negentropy = constructNegentropyWithSubRange( + NegentropySubRangeStorage(storage), uint64(frameSizeLimit) + ) -proc initiate*(negentropy: NegentropySubrange): Result[NegentropyPayload, string] = + let negentropy = NegentropyWithSubRange(inner: raw_negentropy) + + return ok(negentropy) + +method delete*(self: NegentropyWithSubRange) = + self.inner.free() + +method initiate*(self: NegentropyWithSubRange): Result[NegentropyPayload, string] = ## Client inititate a sync session with a server by sending a payload var myResult {.noinit.}: BindingResult = BindingResult() var myResultPtr = addr myResult - let ret = raw_initiate_subrange(negentropy, myResultPtr) + let ret = self.inner.raw_initiate_subrange(myResultPtr) if ret < 0 or myResultPtr == nil: error "negentropy initiate failed with code ", code = ret return err("negentropy already initiated!") - let bytes: seq[byte] = BufferToBytes(addr(myResultPtr.output)) + let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) free_result(myResultPtr) trace "received return from initiate", len = myResultPtr.output.len return ok(NegentropyPayload(bytes)) -proc setInitiator*(negentropy: Negentropy) = - raw_setInitiator(negentropy) - -proc serverReconcile*( - negentropy: NegentropySubrange, query: NegentropyPayload +method serverReconcile*( + self: NegentropyWithSubRange, query: NegentropyPayload ): Result[NegentropyPayload, string] = ## Server response to a negentropy payload. ## Always return an answer. @@ -264,23 +342,23 @@ proc serverReconcile*( var myResult {.noinit.}: BindingResult = BindingResult() var myResultPtr = addr myResult - let ret = raw_reconcile_subrange(negentropy, queryBufPtr, myResultPtr) + let ret = self.inner.raw_reconcile_subrange(queryBufPtr, myResultPtr) if ret < 0: error "raw_reconcile failed with code ", code = ret return err($myResultPtr.error) trace "received return from raw_reconcile", len = myResultPtr.output.len - let outputBytes: seq[byte] = BufferToBytes(addr(myResultPtr.output)) + let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) trace "outputBytes len", len = outputBytes.len free_result(myResultPtr) return ok(NegentropyPayload(outputBytes)) -proc clientReconcile*( - negentropy: NegentropySubrange, +method clientReconcile*( + self: NegentropyWithSubRange, query: NegentropyPayload, - haveIds: var seq[WakuMessageHash], - needIds: var seq[WakuMessageHash], + haves: var seq[WakuMessageHash], + needs: var seq[WakuMessageHash], ): Result[Option[NegentropyPayload], string] = ## Client response to a negentropy payload. ## May return an answer, if not the sync session done. @@ -292,12 +370,12 @@ proc clientReconcile*( myResult.need_ids_len = 0 var myResultPtr = addr myResult - let ret = raw_reconcile_with_ids_subrange(negentropy, cQuery.unsafeAddr, myResultPtr) + let ret = self.inner.raw_reconcile_with_ids_subrange(cQuery.unsafeAddr, myResultPtr) if ret < 0: error "raw_reconcile failed with code ", code = ret return err($myResultPtr.error) - let output = BufferToBytes(addr myResult.output) + let output = bufferToBytes(addr myResult.output) var have_hashes: seq[Buffer] @@ -313,13 +391,13 @@ proc clientReconcile*( for i in 0 .. have_hashes.len - 1: var hash = toWakuMessageHash(have_hashes[i]) - trace "have hashes ", index = i, hash = hash.to0xHex() - haveIds.add(hash) + trace "have hashes ", index = i, msg_hash = hash.to0xHex() + haves.add(hash) for i in 0 .. need_hashes.len - 1: var hash = toWakuMessageHash(need_hashes[i]) - trace "need hashes ", index = i, hash = hash.to0xHex() - needIds.add(hash) + trace "need hashes ", index = i, msg_hash = hash.to0xHex() + needs.add(hash) trace "return ", output = output, len = output.len @@ -330,20 +408,97 @@ proc clientReconcile*( return ok(some(NegentropyPayload(output))) -### Subrange specific methods +method delete*(self: NegentropyWithStorage) = + self.inner.free() -proc new*( - T: type SubRange, - storage: Storage, - startTime: uint64 = uint64.low, - endTime: uint64 = uint64.high, -): Result[T, string] = - let subrange = subrange_init(storage, startTime, endTime) +method initiate*(self: NegentropyWithStorage): Result[NegentropyPayload, string] = + ## Client inititate a sync session with a server by sending a payload + var myResult {.noinit.}: BindingResult = BindingResult() + var myResultPtr = addr myResult - #[ TODO: Uncomment once we move to lmdb - if storage == nil: - return err("storage initialization failed") ]# - return ok(subrange) + let ret = self.inner.raw_initiate(myResultPtr) + if ret < 0 or myResultPtr == nil: + error "negentropy initiate failed with code ", code = ret + return err("negentropy already initiated!") + let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) + free_result(myResultPtr) + trace "received return from initiate", len = myResultPtr.output.len + + return ok(NegentropyPayload(bytes)) -proc delete*(subrange: SubRange) = - free(subrange) +method serverReconcile*( + self: NegentropyWithStorage, query: NegentropyPayload +): Result[NegentropyPayload, string] = + ## Server response to a negentropy payload. + ## Always return an answer. + + let queryBuf = toBuffer(seq[byte](query)) + var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error + var myResult {.noinit.}: BindingResult = BindingResult() + var myResultPtr = addr myResult + + let ret = self.inner.raw_reconcile(queryBufPtr, myResultPtr) + if ret < 0: + error "raw_reconcile failed with code ", code = ret + return err($myResultPtr.error) + trace "received return from raw_reconcile", len = myResultPtr.output.len + + let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) + trace "outputBytes len", len = outputBytes.len + free_result(myResultPtr) + + return ok(NegentropyPayload(outputBytes)) + +method clientReconcile*( + self: NegentropyWithStorage, + query: NegentropyPayload, + haves: var seq[WakuMessageHash], + needs: var seq[WakuMessageHash], +): Result[Option[NegentropyPayload], string] = + ## Client response to a negentropy payload. + ## May return an answer, if not the sync session done. + + let cQuery = toBuffer(seq[byte](query)) + + var myResult {.noinit.}: BindingResult = BindingResult() + myResult.have_ids_len = 0 + myResult.need_ids_len = 0 + var myResultPtr = addr myResult + + let ret = self.inner.raw_reconcile_with_ids(cQuery.unsafeAddr, myResultPtr) + if ret < 0: + error "raw_reconcile failed with code ", code = ret + return err($myResultPtr.error) + + let output = bufferToBytes(addr myResult.output) + + var + have_hashes: seq[Buffer] + need_hashes: seq[Buffer] + + if myResult.have_ids_len > 0: + have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids) + if myResult.need_ids_len > 0: + need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids) + + trace "have and need hashes ", + have_count = have_hashes.len, need_count = need_hashes.len + + for i in 0 .. have_hashes.len - 1: + var hash = toWakuMessageHash(have_hashes[i]) + trace "have hashes ", index = i, msg_hash = hash.to0xHex() + haves.add(hash) + + for i in 0 .. need_hashes.len - 1: + var hash = toWakuMessageHash(need_hashes[i]) + trace "need hashes ", index = i, msg_hash = hash.to0xHex() + needs.add(hash) + + trace "return ", output = output, len = output.len + + free_result(myResultPtr) + + if output.len < 1: + return ok(none(NegentropyPayload)) + + return ok(some(NegentropyPayload(output))) diff --git a/waku/waku_sync/session.nim b/waku/waku_sync/session.nim index 7ebe9e708d..9f65973233 100644 --- a/waku/waku_sync/session.nim +++ b/waku/waku_sync/session.nim @@ -24,42 +24,42 @@ type ServerSync* = object type Reconciled*[T] = object sync: T - negentropy: NegentropySubRange + negentropy: Negentropy connection: Connection frameSize: int payload*: SyncPayload type Sent*[T] = object sync: T - negentropy: NegentropySubRange + negentropy: Negentropy connection: Connection frameSize: int type Received*[T] = object sync: T - negentropy: NegentropySubRange + negentropy: Negentropy connection: Connection frameSize: int payload*: SyncPayload type Completed*[T] = object sync: T - negentropy: NegentropySubRange + negentropy: Negentropy connection: Connection haveHashes: seq[WakuMessageHash] ### State Transition ### proc clientInitialize*( - store: Storage, + store: NegentropyStorage, conn: Connection, - frameSize = DefaultFrameSize, + frameSize = DefaultMaxFrameSize, start = int64.low, `end` = int64.high, ): Result[Reconciled[ClientSync], string] = - let subrange = ?SubRange.new(store, uint64(start), uint64(`end`)) + let subrange = ?NegentropySubRangeStorage.new(store, uint64(start), uint64(`end`)) - let negentropy = ?NegentropySubrange.new(subrange, frameSize) + let negentropy = ?Negentropy.new(subrange, frameSize) let negentropyPayload = ?negentropy.initiate() @@ -78,15 +78,15 @@ proc clientInitialize*( ) proc serverInitialize*( - store: Storage, + store: NegentropyStorage, conn: Connection, - frameSize = DefaultFrameSize, + frameSize = DefaultMaxFrameSize, start = int64.low, `end` = int64.high, ): Result[Sent[ServerSync], string] = - let subrange = ?SubRange.new(store, uint64(start), uint64(`end`)) + let subrange = ?NegentropySubRangeStorage.new(store, uint64(start), uint64(`end`)) - let negentropy = ?NegentropySubrange.new(subrange, frameSize) + let negentropy = ?Negentropy.new(subrange, frameSize) let sync = ServerSync() diff --git a/waku/waku_sync/storage_manager.nim b/waku/waku_sync/storage_manager.nim index 64fea97842..b6a64e6b2d 100644 --- a/waku/waku_sync/storage_manager.nim +++ b/waku/waku_sync/storage_manager.nim @@ -1,4 +1,5 @@ -when (NimMajor, NimMinor) < (1, 4): +# Unused yet. +#[ when (NimMajor, NimMinor) < (1, 4): {.push raises: [Defect].} else: {.push raises: [].} @@ -72,3 +73,4 @@ proc retrieveStorage*( self.storages[dateTime] = storage return ok(some(storage)) + ]#