Skip to content

Commit

Permalink
fix: invalid cursor returning messages (#2724)
Browse files Browse the repository at this point in the history
Co-authored-by: Ivan FB <[email protected]>
  • Loading branch information
SionoiS and Ivansete-status authored May 27, 2024
1 parent 6fbab63 commit a65b13f
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 21 deletions.
13 changes: 11 additions & 2 deletions tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -675,11 +675,20 @@ suite "Postgres driver - queries":
)
).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage())
let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
let cursor = ArchiveCursor(hash: fakeCursor)

## When
let res = await driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
includeData = true,
contentTopicSeq = @[DefaultContentTopic],
pubsubTopic = none(PubsubTopic),
cursor = some(cursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = @[],
maxPageSize = 5,
ascendingOrder = true,
)

## Then
Expand Down
13 changes: 11 additions & 2 deletions tests/waku_archive/test_driver_queue_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -637,11 +637,20 @@ suite "Queue driver - query by cursor":
)
require retFut.isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage())
let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
let cursor = ArchiveCursor(hash: fakeCursor)

## When
let res = waitFor driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
includeData = true,
contentTopic = @[DefaultContentTopic],
pubsubTopic = none(PubsubTopic),
cursor = some(cursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = @[],
maxPageSize = 5,
ascendingOrder = true,
)

## Then
Expand Down
17 changes: 13 additions & 4 deletions tests/waku_archive/test_driver_sqlite_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -701,17 +701,26 @@ suite "SQLite driver - query by cursor":
)
).isOk()

let cursor = computeArchiveCursor(DefaultPubsubTopic, fakeWakuMessage())
let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
let cursor = ArchiveCursor(hash: fakeCursor)

## When
let res = await driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
includeData = true,
contentTopic = @[DefaultContentTopic],
pubsubTopic = none(PubsubTopic),
cursor = some(cursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = @[],
maxPageSize = 5,
ascendingOrder = true,
)

## Then
check:
res.isOk()
res.value.len == 0
res.isErr()
res.error == "cursor not found"

## Cleanup
(await driver.close()).expect("driver to close")
Expand Down
87 changes: 84 additions & 3 deletions tests/wakunode_rest/test_rest_store.nim
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{.used.}

import
std/[options, times, sugar],
std/[options, sugar],
stew/shims/net as stewNet,
chronicles,
chronos/timer,
testutils/unittests,
eth/keys,
presto,
Expand All @@ -24,8 +25,10 @@ import
../../../waku/waku_api/rest/store/types,
../../../waku/waku_archive,
../../../waku/waku_archive/driver/queue_driver,
../../../waku/waku_archive/driver/sqlite_driver,
../../../waku/common/databases/db_sqlite,
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_store as waku_store,
../../../waku/common/base64,
../testlib/wakucore,
../testlib/wakunode

Expand All @@ -42,7 +45,7 @@ proc put(
if message.timestamp > 0:
message.timestamp
else:
getNanosecondTime(getTime().toUnixFloat())
getNowInNanosecondTime()

store.put(pubsubTopic, message, digest, msgHash, receivedTime)

Expand Down Expand Up @@ -84,6 +87,84 @@ procSuite "Waku Rest API - Store v3":
check:
expected.get() == msgHashRes.get().get().toRestStringWakuMessageHash()

asyncTest "invalid cursor":
let node = testWakuNode()
await node.start()
await node.mountRelay()

let restPort = Port(58011)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()

installStoreApiHandlers(restServer.router, node)
restServer.start()

# WakuStore setup
let db: SqliteDatabase =
SqliteDatabase.new(string.none().get(":memory:")).expect("valid DB")
let driver: ArchiveDriver = SqliteDriver.new(db).expect("valid driver")
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error

await node.mountStore()
node.mountStoreClient()

let key = generateEcdsaKey()
var peerSwitch = newStandardSwitch(some(key))
await peerSwitch.start()

peerSwitch.mount(node.wakuStore)

await sleepAsync(1.seconds())

# Now prime it with some history before tests
let msgList =
@[
fakeWakuMessage(@[byte 0], contentTopic = ContentTopic("ct1"), ts = 0),
fakeWakuMessage(@[byte 1], ts = 1),
fakeWakuMessage(@[byte 1, byte 2], ts = 2),
fakeWakuMessage(@[byte 1], ts = 3),
fakeWakuMessage(@[byte 1], ts = 4),
fakeWakuMessage(@[byte 1], ts = 5),
fakeWakuMessage(@[byte 1], ts = 6),
fakeWakuMessage(@[byte 9], contentTopic = ContentTopic("c2"), ts = 9),
]
for msg in msgList:
require (await driver.put(DefaultPubsubTopic, msg)).isOk()

let client = newRestHttpClient(initTAddress(restAddress, restPort))

let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
let fullAddr = $remotePeerInfo.addrs[0] & "/p2p/" & $remotePeerInfo.peerId

await sleepAsync(1.seconds())

let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
let encodedCursor = fakeCursor.toRestStringWakuMessageHash()

# Apply filter by start and end timestamps
var response = await client.getStoreMessagesV3(
encodeUrl(fullAddr),
"true", # include data
"", # pubsub topic
"ct1,c2", # empty content topics.
"", # start time
"", # end time
"", # hashes
encodedCursor, # base64-encoded hash
"true", # ascending
"5", # empty implies default page size
)

check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.messages.len == 0

await restServer.stop()
await restServer.closeWait()
await node.stop()

asyncTest "Filter by start and end time":
let node = testWakuNode()
await node.start()
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_api/rest/store/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ proc parseHash*(input: Option[string]): Result[Option[WakuMessageHash], string]
let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr:
return err("waku message hash parsing error: " & error)

if decodedBytes.len != 32:
return err("waku message hash parsing error: invalid hash length: " & $decodedBytes.len)

let hash: WakuMessageHash = fromBytes(decodedBytes)

return ok(some(hash))
Expand Down
25 changes: 15 additions & 10 deletions waku/waku_archive/driver/sqlite_driver/queries.nim
Original file line number Diff line number Diff line change
Expand Up @@ -674,26 +674,31 @@ proc selectMessagesByStoreQueryWithLimit*(
if cursor.isSome() and cursor.get() != EmptyWakuMessageHash:
let hash: WakuMessageHash = cursor.get()

var wakuMessage: WakuMessage
var wakuMessage: Option[WakuMessage]

proc queryRowCallback(s: ptr sqlite3_stmt) =
wakuMessage = queryRowWakuMessageCallback(
s,
contentTopicCol = 0,
payloadCol = 1,
versionCol = 2,
senderTimestampCol = 3,
metaCol = 4,
wakuMessage = some(
queryRowWakuMessageCallback(
s,
contentTopicCol = 0,
payloadCol = 1,
versionCol = 2,
senderTimestampCol = 3,
metaCol = 4,
)
)

let query = selectMessageByHashQuery()
let dbStmt = ?db.prepareStmt(query)
?dbStmt.execSelectMessageByHash(hash, queryRowCallback)
dbStmt.dispose()

let time: Timestamp = wakuMessage.timestamp
if wakuMessage.isSome():
let time = wakuMessage.get().timestamp

some((time, hash))
some((time, hash))
else:
return err("cursor not found")
else:
none((Timestamp, WakuMessageHash))

Expand Down

0 comments on commit a65b13f

Please sign in to comment.