From a65b13fcb8dfdf7378791cd709c82395d41d9295 Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Mon, 27 May 2024 10:54:10 -0400 Subject: [PATCH] fix: invalid cursor returning messages (#2724) Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --- .../test_driver_postgres_query.nim | 13 ++- .../waku_archive/test_driver_queue_query.nim | 13 ++- .../waku_archive/test_driver_sqlite_query.nim | 17 +++- tests/wakunode_rest/test_rest_store.nim | 87 ++++++++++++++++++- waku/waku_api/rest/store/types.nim | 3 + .../driver/sqlite_driver/queries.nim | 25 +++--- 6 files changed, 137 insertions(+), 21 deletions(-) diff --git a/tests/waku_archive/test_driver_postgres_query.nim b/tests/waku_archive/test_driver_postgres_query.nim index 80e0a231a5..8614b6af1f 100644 --- a/tests/waku_archive/test_driver_postgres_query.nim +++ b/tests/waku_archive/test_driver_postgres_query.nim @@ -675,11 +675,20 @@ suite "Postgres driver - queries": ) ).isOk() - let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage()) + let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage()) + let cursor = ArchiveCursor(hash: fakeCursor) ## When let res = await driver.getMessages( - cursor = some(cursor), maxPageSize = 2, ascendingOrder = false + includeData = true, + contentTopicSeq = @[DefaultContentTopic], + pubsubTopic = none(PubsubTopic), + cursor = some(cursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + hashes = @[], + maxPageSize = 5, + ascendingOrder = true, ) ## Then diff --git a/tests/waku_archive/test_driver_queue_query.nim b/tests/waku_archive/test_driver_queue_query.nim index 4b09932644..60016c771d 100644 --- a/tests/waku_archive/test_driver_queue_query.nim +++ b/tests/waku_archive/test_driver_queue_query.nim @@ -637,11 +637,20 @@ suite "Queue driver - query by cursor": ) require retFut.isOk() - let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage()) + let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage()) + let cursor = ArchiveCursor(hash: fakeCursor) ## When let res = waitFor driver.getMessages( - cursor = some(cursor), maxPageSize = 2, ascendingOrder = false + includeData = true, + contentTopic = @[DefaultContentTopic], + pubsubTopic = none(PubsubTopic), + cursor = some(cursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + hashes = @[], + maxPageSize = 5, + ascendingOrder = true, ) ## Then diff --git a/tests/waku_archive/test_driver_sqlite_query.nim b/tests/waku_archive/test_driver_sqlite_query.nim index 873107f3bb..58968b1b87 100644 --- a/tests/waku_archive/test_driver_sqlite_query.nim +++ b/tests/waku_archive/test_driver_sqlite_query.nim @@ -701,17 +701,26 @@ suite "SQLite driver - query by cursor": ) ).isOk() - let cursor = computeArchiveCursor(DefaultPubsubTopic, fakeWakuMessage()) + let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage()) + let cursor = ArchiveCursor(hash: fakeCursor) ## When let res = await driver.getMessages( - cursor = some(cursor), maxPageSize = 2, ascendingOrder = false + includeData = true, + contentTopic = @[DefaultContentTopic], + pubsubTopic = none(PubsubTopic), + cursor = some(cursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + hashes = @[], + maxPageSize = 5, + ascendingOrder = true, ) ## Then check: - res.isOk() - res.value.len == 0 + res.isErr() + res.error == "cursor not found" ## Cleanup (await driver.close()).expect("driver to close") diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index 3d699c38a8..cb5c0cd87b 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -1,9 +1,10 @@ {.used.} import - std/[options, times, sugar], + std/[options, sugar], stew/shims/net as stewNet, chronicles, + chronos/timer, testutils/unittests, eth/keys, presto, @@ -24,8 +25,10 @@ import ../../../waku/waku_api/rest/store/types, ../../../waku/waku_archive, ../../../waku/waku_archive/driver/queue_driver, + ../../../waku/waku_archive/driver/sqlite_driver, + ../../../waku/common/databases/db_sqlite, + ../../../waku/waku_archive/driver/postgres_driver, ../../../waku/waku_store as waku_store, - ../../../waku/common/base64, ../testlib/wakucore, ../testlib/wakunode @@ -42,7 +45,7 @@ proc put( if message.timestamp > 0: message.timestamp else: - getNanosecondTime(getTime().toUnixFloat()) + getNowInNanosecondTime() store.put(pubsubTopic, message, digest, msgHash, receivedTime) @@ -84,6 +87,84 @@ procSuite "Waku Rest API - Store v3": check: expected.get() == msgHashRes.get().get().toRestStringWakuMessageHash() + asyncTest "invalid cursor": + let node = testWakuNode() + await node.start() + await node.mountRelay() + + let restPort = Port(58011) + let restAddress = parseIpAddress("0.0.0.0") + let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + + installStoreApiHandlers(restServer.router, node) + restServer.start() + + # WakuStore setup + let db: SqliteDatabase = + SqliteDatabase.new(string.none().get(":memory:")).expect("valid DB") + let driver: ArchiveDriver = SqliteDriver.new(db).expect("valid driver") + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + + await node.mountStore() + node.mountStoreClient() + + let key = generateEcdsaKey() + var peerSwitch = newStandardSwitch(some(key)) + await peerSwitch.start() + + peerSwitch.mount(node.wakuStore) + + await sleepAsync(1.seconds()) + + # Now prime it with some history before tests + let msgList = + @[ + fakeWakuMessage(@[byte 0], contentTopic = ContentTopic("ct1"), ts = 0), + fakeWakuMessage(@[byte 1], ts = 1), + fakeWakuMessage(@[byte 1, byte 2], ts = 2), + fakeWakuMessage(@[byte 1], ts = 3), + fakeWakuMessage(@[byte 1], ts = 4), + fakeWakuMessage(@[byte 1], ts = 5), + fakeWakuMessage(@[byte 1], ts = 6), + fakeWakuMessage(@[byte 9], contentTopic = ContentTopic("c2"), ts = 9), + ] + for msg in msgList: + require (await driver.put(DefaultPubsubTopic, msg)).isOk() + + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + + let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo() + let fullAddr = $remotePeerInfo.addrs[0] & "/p2p/" & $remotePeerInfo.peerId + + await sleepAsync(1.seconds()) + + let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage()) + let encodedCursor = fakeCursor.toRestStringWakuMessageHash() + + # Apply filter by start and end timestamps + var response = await client.getStoreMessagesV3( + encodeUrl(fullAddr), + "true", # include data + "", # pubsub topic + "ct1,c2", # empty content topics. + "", # start time + "", # end time + "", # hashes + encodedCursor, # base64-encoded hash + "true", # ascending + "5", # empty implies default page size + ) + + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.messages.len == 0 + + await restServer.stop() + await restServer.closeWait() + await node.stop() + asyncTest "Filter by start and end time": let node = testWakuNode() await node.start() diff --git a/waku/waku_api/rest/store/types.nim b/waku/waku_api/rest/store/types.nim index b899dc0d09..872d0ce596 100644 --- a/waku/waku_api/rest/store/types.nim +++ b/waku/waku_api/rest/store/types.nim @@ -35,6 +35,9 @@ proc parseHash*(input: Option[string]): Result[Option[WakuMessageHash], string] let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr: return err("waku message hash parsing error: " & error) + if decodedBytes.len != 32: + return err("waku message hash parsing error: invalid hash length: " & $decodedBytes.len) + let hash: WakuMessageHash = fromBytes(decodedBytes) return ok(some(hash)) diff --git a/waku/waku_archive/driver/sqlite_driver/queries.nim b/waku/waku_archive/driver/sqlite_driver/queries.nim index 3aeac41cbd..94f323b2de 100644 --- a/waku/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/waku_archive/driver/sqlite_driver/queries.nim @@ -674,16 +674,18 @@ proc selectMessagesByStoreQueryWithLimit*( if cursor.isSome() and cursor.get() != EmptyWakuMessageHash: let hash: WakuMessageHash = cursor.get() - var wakuMessage: WakuMessage + var wakuMessage: Option[WakuMessage] proc queryRowCallback(s: ptr sqlite3_stmt) = - wakuMessage = queryRowWakuMessageCallback( - s, - contentTopicCol = 0, - payloadCol = 1, - versionCol = 2, - senderTimestampCol = 3, - metaCol = 4, + wakuMessage = some( + queryRowWakuMessageCallback( + s, + contentTopicCol = 0, + payloadCol = 1, + versionCol = 2, + senderTimestampCol = 3, + metaCol = 4, + ) ) let query = selectMessageByHashQuery() @@ -691,9 +693,12 @@ proc selectMessagesByStoreQueryWithLimit*( ?dbStmt.execSelectMessageByHash(hash, queryRowCallback) dbStmt.dispose() - let time: Timestamp = wakuMessage.timestamp + if wakuMessage.isSome(): + let time = wakuMessage.get().timestamp - some((time, hash)) + some((time, hash)) + else: + return err("cursor not found") else: none((Timestamp, WakuMessageHash))