Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: ANALYZE messages query should be performed regularly #2986

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions waku/waku_archive/driver/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ proc new*(
## Hence, this should be run after the migration is completed.
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)

driver.startAnalyzeTableLoop()

info "waiting for a partition to be created"
for i in 0 ..< 100:
if driver.containsAnyPartition():
Expand Down
42 changes: 41 additions & 1 deletion waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type PostgresDriver* = ref object of ArchiveDriver
partitionMngr: PartitionManager
futLoopPartitionFactory: Future[void]

futLoopAnalyzeTable: Future[void]

const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition =
"""INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload,
Expand Down Expand Up @@ -997,6 +999,10 @@ method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
## Cancel the partition factory loop
s.futLoopPartitionFactory.cancelSoon()

## Cancel analyze table loop
if not s.futLoopAnalyzeTable.isNil():
s.futLoopAnalyzeTable.cancelSoon()

## Close the database connection
let writeCloseRes = await s.writeConnPool.close()
let readCloseRes = await s.readConnPool.close()
Expand Down Expand Up @@ -1030,6 +1036,7 @@ proc sleep*(

return ok()

const EXPECTED_LOCK_ERROR* = "another waku instance is currently executing a migration"
proc acquireDatabaseLock*(
s: PostgresDriver, lockId: int = 841886
): Future[ArchiveDriverResult[void]] {.async.} =
Expand All @@ -1052,7 +1059,7 @@ proc acquireDatabaseLock*(
return err("error acquiring a lock: " & error)

if locked == "f":
return err("another waku instance is currently executing a migration")
return err(EXPECTED_LOCK_ERROR)

return ok()

Expand Down Expand Up @@ -1508,3 +1515,36 @@ method deleteMessagesOlderThanTimestamp*(
return err("error in deleteMessagesOlderThanTimestamp: " & $error)

return ok()

############################################
## TODO: start splitting code better

const AnalyzeQuery = "ANALYZE messages"
const AnalyzeTableLockId = 111111 ## An arbitrary and different lock id
const RunAnalyzeInterval = timer.days(1)

proc analyzeTableLoop(self: PostgresDriver) {.async.} =
## The database stats should be calculated regularly so that the planner
## picks up the proper indexes and we have better query performance.
while true:
debug "analyzeTableLoop lock db"
(await self.acquireDatabaseLock(AnalyzeTableLockId)).isOkOr:
if error != EXPECTED_LOCK_ERROR:
error "failed to acquire lock in analyzeTableLoop", error = error
await sleepAsync(RunAnalyzeInterval)
continue

debug "analyzeTableLoop start analysis"
(await self.performWriteQuery(AnalyzeQuery)).isOkOr:
error "failed to run ANALYZE messages", error = error

debug "analyzeTableLoop unlock db"
(await self.releaseDatabaseLock(AnalyzeTableLockId)).isOkOr:
error "failed to release lock analyzeTableLoop", error = error

debug "analyzeTableLoop analysis completed"

await sleepAsync(RunAnalyzeInterval)

proc startAnalyzeTableLoop*(self: PostgresDriver) =
self.futLoopAnalyzeTable = self.analyzeTableLoop
Loading