chore: Better timing and requestId detail for slower store db queries (#2994)

* Better timing and requestId detail for store db queries slower than two seconds
* Adapt tests and client to allow sending a custom store requestId
Ivansete-status authored Aug 29, 2024
1 parent fd84363 commit e8a49b7
Showing 20 changed files with 111 additions and 27 deletions.
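
In practice, the client-facing change is small: the legacy HistoryQuery (and the archive-layer ArchiveQuery) gains a requestId field that is threaded down to the database drivers, so slow-query log lines can be correlated with the store request that produced them. A minimal sketch of a query carrying a caller-chosen id, mirroring the adapted tests below ("my-trace-001" is an arbitrary example value):

# Sketch only: a legacy store query with a custom requestId, constructed
# the same way the updated tests below do it.
let req = HistoryQuery(
  pubsubTopic: some(DefaultPubsubTopic),
  contentTopics: @[DefaultContentTopic],
  direction: PagingDirection.FORWARD,
  requestId: "my-trace-001",
)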
7 changes: 7 additions & 0 deletions tests/waku_store_legacy/test_client.nim
@@ -44,6 +44,7 @@ suite "Store Client":
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
direction: PagingDirection.FORWARD,
requestId: "customRequestId",
)

serverSwitch = newTestSwitch()
@@ -93,33 +94,39 @@ suite "Store Client":
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[],
direction: PagingDirection.FORWARD,
requestId: "reqId1",
)
invalidQuery2 = HistoryQuery(
pubsubTopic: PubsubTopic.none(),
contentTopics: @[DefaultContentTopic],
direction: PagingDirection.FORWARD,
requestId: "reqId2",
)
invalidQuery3 = HistoryQuery(
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
pageSize: 0,
requestId: "reqId3",
)
invalidQuery4 = HistoryQuery(
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
pageSize: 0,
requestId: "reqId4",
)
invalidQuery5 = HistoryQuery(
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
startTime: some(0.Timestamp),
endTime: some(0.Timestamp),
requestId: "reqId5",
)
invalidQuery6 = HistoryQuery(
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
startTime: some(0.Timestamp),
endTime: some(-1.Timestamp),
requestId: "reqId6",
)

# When the query is sent to the server
8 changes: 6 additions & 2 deletions tests/waku_store_legacy/test_waku_store.nim
@@ -44,7 +44,9 @@ suite "Waku Store - query handler legacy":
client = newTestWakuStoreClient(clientSwitch)

let req = HistoryQuery(
contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD
contentTopics: @[DefaultContentTopic],
direction: PagingDirection.FORWARD,
requestId: "reqId",
)

## When
@@ -96,7 +98,9 @@ suite "Waku Store - query handler legacy":
client = newTestWakuStoreClient(clientSwitch)

let req = HistoryQuery(
contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD
contentTopics: @[DefaultContentTopic],
direction: PagingDirection.FORWARD,
requestId: "reqId",
)

info "check point" # log added to track flaky test
21 changes: 19 additions & 2 deletions waku/common/databases/db_postgres/pgasyncpool.nim
@@ -2,8 +2,8 @@
# Inspired by: https://github.com/treeform/pg/
{.push raises: [].}

import std/[sequtils, nre, strformat, sets], results, chronos
import ./dbconn, ../common
import std/[sequtils, nre, strformat, sets], results, chronos, chronicles
import ./dbconn, ../common, ../../../waku_core/time

type PgAsyncPoolState {.pure.} = enum
Closed
@@ -149,18 +149,26 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
if pool.conns[i].dbConn == conn:
pool.conns[i].busy = false

const SlowQueryThresholdInNanoSeconds = 2_000_000_000

proc pgQuery*(
pool: PgAsyncPool,
query: string,
args: seq[string] = newSeq[string](0),
rowCallback: DataProc = nil,
requestId: string = "",
): Future[DatabaseResult[void]] {.async.} =
let connIndex = (await pool.getConnIndex()).valueOr:
return err("connRes.isErr in query: " & $error)

let queryStartTime = getNowInNanosecondTime()
let conn = pool.conns[connIndex].dbConn
defer:
pool.releaseConn(conn)
let queryDuration = getNowInNanosecondTime() - queryStartTime
if queryDuration > SlowQueryThresholdInNanoSeconds:
debug "pgQuery slow query",
query_duration_secs = (queryDuration / 1_000_000_000), query, requestId

(await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr:
return err("error in asyncpool query: " & $error)
@@ -175,6 +183,7 @@ proc runStmt*(
paramLengths: seq[int32],
paramFormats: seq[int32],
rowCallback: DataProc = nil,
requestId: string = "",
): Future[DatabaseResult[void]] {.async.} =
## Runs a stored statement, for performance purposes.
## The stored statements are connection specific and are a technique for caching a very common
@@ -187,8 +196,16 @@
return err("Error in runStmt: " & $error)

let conn = pool.conns[connIndex].dbConn
let queryStartTime = getNowInNanosecondTime()

defer:
pool.releaseConn(conn)
let queryDuration = getNowInNanosecondTime() - queryStartTime
if queryDuration > SlowQueryThresholdInNanoSeconds:
debug "runStmt slow query",
query_duration = queryDuration / 1_000_000_000,
query = stmtDefinition,
requestId

if not pool.conns[connIndex].preparedStmts.contains(stmtName):
# The connection doesn't have that statement yet. Let's create it.
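The timing logic above wraps the query in a defer block, so the duration is measured and the slow-query line is emitted on every exit path, including early error returns. A standalone sketch of the same pattern follows; as an assumption for portability it uses std/monotimes in place of waku_core/time.getNowInNanosecondTime, and timedQuery is a hypothetical stand-in for pgQuery/runStmt:

import std/monotimes
import chronicles

const SlowQueryThresholdInNanoSeconds = 2_000_000_000

proc timedQuery(query: string, requestId: string = "") =
  let queryStartTime = getMonoTime().ticks # monotonic clock, in nanoseconds
  defer:
    # Runs on every exit path: normal return, early return, or raise.
    let queryDuration = getMonoTime().ticks - queryStartTime
    if queryDuration > SlowQueryThresholdInNanoSeconds:
      debug "slow query",
        query_duration_secs = queryDuration.float / 1e9, query, requestId
  discard # the actual database work would happen here
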
2 changes: 2 additions & 0 deletions waku/node/waku_node.nim
@@ -769,6 +769,7 @@ proc toArchiveQuery(
endTime: request.endTime,
pageSize: request.pageSize.uint,
direction: request.direction,
requestId: request.requestId,
)

# TODO: Review this mapping logic. Maybe move it to the application code
@@ -910,6 +911,7 @@ proc toArchiveQuery(request: StoreQueryRequest): waku_archive.ArchiveQuery =
query.hashes = request.messageHashes
query.cursor = request.paginationCursor
query.direction = request.paginationForward
query.requestId = request.requestId

if request.paginationLimit.isSome():
query.pageSize = uint(request.paginationLimit.get())
1 change: 1 addition & 0 deletions waku/waku_archive/archive.nim
@@ -145,6 +145,7 @@ proc findMessages*(
hashes = query.hashes,
maxPageSize = maxPageSize + 1,
ascendingOrder = isAscendingOrder,
requestId = query.requestId,
)
).valueOr:
return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error))
1 change: 1 addition & 0 deletions waku/waku_archive/common.nim
@@ -18,6 +18,7 @@ type
hashes*: seq[WakuMessageHash]
pageSize*: uint
direction*: PagingDirection
requestId*: string

ArchiveResponse* = object
hashes*: seq[WakuMessageHash]
1 change: 1 addition & 0 deletions waku/waku_archive/driver.nim
@@ -37,6 +37,7 @@ method getMessages*(
hashes = newSeq[WakuMessageHash](0),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId = "",
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} =
discard

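Because requestId is a defaulted trailing parameter, existing call sites keep compiling unchanged. A sketch of a driver call that passes it explicitly, assuming an async context and some concrete ArchiveDriver instance named driver:

# Sketch: forwarding a requestId into the archive driver; the other
# parameters keep the defaults from the method signature above.
let rowsRes = await driver.getMessages(
  hashes = newSeq[WakuMessageHash](0),
  maxPageSize = DefaultPageSize,
  ascendingOrder = true,
  requestId = "reqId-from-store-request",
)
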
32 changes: 24 additions & 8 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
@@ -425,6 +425,7 @@ proc getMessagesArbitraryQuery(
hexHashes: seq[string] = @[],
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId: string,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc handles atypical queries. We don't use prepared statements for those.

@@ -489,7 +490,7 @@ proc getMessagesArbitraryQuery(
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)

(await s.readConnPool.pgQuery(query, args, rowCallback)).isOkOr:
(await s.readConnPool.pgQuery(query, args, rowCallback, requestId)).isOkOr:
return err("failed to run query: " & $error)

return ok(rows)
@@ -504,6 +505,7 @@ proc getMessageHashesArbitraryQuery(
hexHashes: seq[string] = @[],
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId: string,
): Future[ArchiveDriverResult[seq[(WakuMessageHash, PubsubTopic, WakuMessage)]]] {.
async
.} =
@@ -571,7 +573,7 @@ proc getMessageHashesArbitraryQuery(
proc rowCallback(pqResult: ptr PGresult) =
hashCallbackImpl(pqResult, rows)

(await s.readConnPool.pgQuery(query, args, rowCallback)).isOkOr:
(await s.readConnPool.pgQuery(query, args, rowCallback, requestId)).isOkOr:
return err("failed to run query: " & $error)

return ok(rows)
@@ -586,6 +588,7 @@ proc getMessagesPreparedStmt(
hashes: string,
maxPageSize = DefaultPageSize,
ascOrder = true,
requestId: string,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc aims to run the most typical queries in a more performant way,
## i.e. by means of prepared statements.
@@ -619,6 +622,7 @@
],
@[int32(0), int32(0), int32(0), int32(0), int32(0)],
rowCallback,
requestId,
)
).isOkOr:
return err(stmtName & ": " & $error)
@@ -659,6 +663,7 @@
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
rowCallback,
requestId,
)
).isOkOr:
return err(stmtName & ": " & $error)
@@ -675,6 +680,7 @@ proc getMessageHashesPreparedStmt(
hashes: string,
maxPageSize = DefaultPageSize,
ascOrder = true,
requestId: string,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc aims to run the most typical queries in a more performant way,
## i.e. by means of prepared statements.
@@ -710,6 +716,7 @@
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
rowCallback,
requestId,
)
).isOkOr:
return err(stmtName & ": " & $error)
@@ -753,14 +760,15 @@ proc getMessageHashesPreparedStmt(
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
rowCallback,
requestId,
)
).isOkOr:
return err(stmtName & ": " & $error)

return ok(rows)

proc getMessagesByMessageHashes(
s: PostgresDriver, hashes: string, maxPageSize: uint
s: PostgresDriver, hashes: string, maxPageSize: uint, requestId: string
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieves information filtering only by a given messageHashes list.
## This proc leverages the messages_lookup table for better query performance
@@ -797,7 +805,11 @@ proc getMessagesByMessageHashes(
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)

(await s.readConnPool.pgQuery(query = query, rowCallback = rowCallback)).isOkOr:
(
await s.readConnPool.pgQuery(
query = query, rowCallback = rowCallback, requestId = requestId
)
).isOkOr:
return err("failed to run query: " & $error)

debug "end of getMessagesByMessageHashes"
@@ -814,6 +826,7 @@ method getMessages*(
hashes = newSeq[WakuMessageHash](0),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId = "",
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
debug "beginning of getMessages"

@@ -825,8 +838,9 @@

if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and
startTime.isNone() and endTime.isNone() and hexHashes.len > 0:
return
await s.getMessagesByMessageHashes("'" & hexHashes.join("','") & "'", maxPageSize)
return await s.getMessagesByMessageHashes(
"'" & hexHashes.join("','") & "'", maxPageSize, requestId
)

if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and
startTime.isSome() and endTime.isSome():
@@ -841,6 +855,7 @@ method getMessages*(
hexHashes.join(","),
maxPageSize,
ascendingOrder,
requestId,
)
else:
return await s.getMessageHashesPreparedStmt(
@@ -852,18 +867,19 @@
hexHashes.join(","),
maxPageSize,
ascendingOrder,
requestId,
)
else:
if includeData:
## We will run an atypical query. In this case we don't use prepared statements
return await s.getMessagesArbitraryQuery(
contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
ascendingOrder,
ascendingOrder, requestId,
)
else:
return await s.getMessageHashesArbitraryQuery(
contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
ascendingOrder,
ascendingOrder, requestId,
)

proc getStr(
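Taken together, the hunks above cover all three query paths in the Postgres driver: the messages_lookup fast path (getMessagesByMessageHashes) when only hashes are given, prepared statements for the most common filter combinations, and ad-hoc SQL (the ArbitraryQuery procs) for everything else. Threading requestId through each path means a slow-query log line can be attributed to its originating store request regardless of which path served it.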
1 change: 1 addition & 0 deletions waku/waku_archive/driver/queue_driver/queue_driver.nim
@@ -256,6 +256,7 @@ method getMessages*(
hashes: seq[WakuMessageHash] = @[],
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId = "",
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
var index = none(Index)

1 change: 1 addition & 0 deletions waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
@@ -83,6 +83,7 @@ method getMessages*(
hashes = newSeq[WakuMessageHash](0),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId = "",
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
if not includeData:
return s.db.selectMessageHashesByStoreQueryWithLimit(
2 changes: 2 additions & 0 deletions waku/waku_archive_legacy/archive.nim
@@ -151,6 +151,7 @@ proc findMessages*(
hashes = query.hashes,
maxPageSize = maxPageSize + 1,
ascendingOrder = isAscendingOrder,
requestId = query.requestId,
)
).valueOr:
return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error))
@@ -230,6 +231,7 @@ proc findMessagesV2*(
endTime = query.endTime,
maxPageSize = maxPageSize + 1,
ascendingOrder = isAscendingOrder,
requestId = query.requestId,
)
).valueOr:
return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error))
1 change: 1 addition & 0 deletions waku/waku_archive_legacy/common.nim
@@ -52,6 +52,7 @@ type
hashes*: seq[WakuMessageHash]
pageSize*: uint
direction*: PagingDirection
requestId*: string

ArchiveResponse* = object
hashes*: seq[WakuMessageHash]
2 changes: 2 additions & 0 deletions waku/waku_archive_legacy/driver.nim
@@ -41,6 +41,7 @@ method getMessagesV2*(
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId: string,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, deprecated, async.} =
discard

@@ -55,6 +56,7 @@ method getMessages*(
hashes = newSeq[WakuMessageHash](0),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId = "",
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} =
discard
