Skip to content

Commit

Permalink
Merge c7349b2 into e8bce67
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status authored Aug 19, 2024
2 parents e8bce67 + c7349b2 commit eea8972
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
2 changes: 2 additions & 0 deletions waku/waku_archive/driver/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ proc new*(
## Hence, this should be run after the migration is completed.
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)

driver.startAnalyzeTableLoop()

info "waiting for a partition to be created"
for i in 0 ..< 100:
if driver.containsAnyPartition():
Expand Down
42 changes: 41 additions & 1 deletion waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type PostgresDriver* = ref object of ArchiveDriver
partitionMngr: PartitionManager
futLoopPartitionFactory: Future[void]

futLoopAnalyzeTable: Future[void]

const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition =
"""INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload,
Expand Down Expand Up @@ -997,6 +999,10 @@ method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
## Cancel the partition factory loop
s.futLoopPartitionFactory.cancelSoon()

## Cancel analyze table loop
if not s.futLoopAnalyzeTable.isNil():
s.futLoopAnalyzeTable.cancelSoon()

## Close the database connection
let writeCloseRes = await s.writeConnPool.close()
let readCloseRes = await s.readConnPool.close()
Expand Down Expand Up @@ -1030,6 +1036,7 @@ proc sleep*(

return ok()

const EXPECTED_LOCK_ERROR* = "another waku instance is currently executing a migration"
proc acquireDatabaseLock*(
s: PostgresDriver, lockId: int = 841886
): Future[ArchiveDriverResult[void]] {.async.} =
Expand All @@ -1052,7 +1059,7 @@ proc acquireDatabaseLock*(
return err("error acquiring a lock: " & error)

if locked == "f":
return err("another waku instance is currently executing a migration")
return err(EXPECTED_LOCK_ERROR)

return ok()

Expand Down Expand Up @@ -1508,3 +1515,36 @@ method deleteMessagesOlderThanTimestamp*(
return err("error in deleteMessagesOlderThanTimestamp: " & $error)

return ok()

############################################
## TODO: start splitting code better

const AnalyzeQuery = "ANALYZE messages"
const AnalyzeTableLockId = 111111 ## An arbitrary and different lock id
const RunAnalyzeInterval = timer.days(1)

proc analyzeTableLoop(self: PostgresDriver) {.async.} =
## The database stats should be calculated regularly so that the planner
## picks up the proper indexes and we have better query performance.
while true:
debug "analyzeTableLoop lock db"
(await self.acquireDatabaseLock(AnalyzeTableLockId)).isOkOr:
if error != EXPECTED_LOCK_ERROR:
error "failed to acquire lock in analyzeTableLoop", error = error
await sleepAsync(RunAnalyzeInterval)
continue

debug "analyzeTableLoop start analysis"
(await self.performWriteQuery(AnalyzeQuery)).isOkOr:
error "failed to run ANALYZE messages", error = error

debug "analyzeTableLoop unlock db"
(await self.releaseDatabaseLock(AnalyzeTableLockId)).isOkOr:
error "failed to release lock analyzeTableLoop", error = error

debug "analyzeTableLoop analysis completed"

await sleepAsync(RunAnalyzeInterval)

proc startAnalyzeTableLoop*(self: PostgresDriver) =
self.futLoopAnalyzeTable = self.analyzeTableLoop

0 comments on commit eea8972

Please sign in to comment.