From d046fc8ec3c95caac3c8ca72fc837022ee1469e3 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 19 Aug 2024 19:03:46 +0200 Subject: [PATCH 1/2] ANALYZE messages query should be performed regularly for better performance --- waku/waku_archive/driver/builder.nim | 2 + .../postgres_driver/postgres_driver.nim | 42 ++++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/waku/waku_archive/driver/builder.nim b/waku/waku_archive/driver/builder.nim index 1825477b5f..ddc447685f 100644 --- a/waku/waku_archive/driver/builder.nim +++ b/waku/waku_archive/driver/builder.nim @@ -102,6 +102,8 @@ proc new*( ## Hence, this should be run after the migration is completed. asyncSpawn driver.startPartitionFactory(onFatalErrorAction) + driver.startAnalyzeTableLoop() + info "waiting for a partition to be created" for i in 0 ..< 100: if driver.containsAnyPartition(): diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index e76806ae2a..7e14751a3f 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -25,6 +25,8 @@ type PostgresDriver* = ref object of ArchiveDriver partitionMngr: PartitionManager futLoopPartitionFactory: Future[void] + futLoopAnalyzeTable: Future[void] + const InsertRowStmtName = "InsertRow" const InsertRowStmtDefinition = """INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload, @@ -997,6 +999,10 @@ method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = ## Cancel the partition factory loop s.futLoopPartitionFactory.cancelSoon() + ## Cancel analyze table loop + if not s.futLoopAnalyzeTable.isNil(): + s.futLoopAnalyzeTable.cancelSoon() + ## Close the database connection let writeCloseRes = await s.writeConnPool.close() let readCloseRes = await s.readConnPool.close() @@ -1030,6 +1036,7 @@ proc sleep*( return ok() +const EXPECTED_LOCK_ERROR* = "another waku instance is currently executing a migration" proc acquireDatabaseLock*( s: PostgresDriver, lockId: int = 841886 ): Future[ArchiveDriverResult[void]] {.async.} = @@ -1052,7 +1059,7 @@ proc acquireDatabaseLock*( return err("error acquiring a lock: " & error) if locked == "f": - return err("another waku instance is currently executing a migration") + return err(EXPECTED_LOCK_ERROR) return ok() @@ -1508,3 +1515,36 @@ method deleteMessagesOlderThanTimestamp*( return err("error in deleteMessagesOlderThanTimestamp: " & $error) return ok() + +############################################ +## TODO: start splitting code better + +const AnalyzeQuery = "ANALYZE messages" +const AnalyzeTableLockId = 111111 ## An arbitrary and different lock id +const RunAnalyzeInterval = timer.days(1) + +proc analyzeTableLoop(self: PostgresDriver) {.async.} = + ## The database stats should be calculated regularly so that the planner + ## picks up the proper indexes and we have better query performance. + while true: + debug "analyzeTable lock db" + (await self.acquireDatabaseLock(AnalyzeTableLockId)).isOkOr: + if error != EXPECTED_LOCK_ERROR: + error "failed to acquire lock in analyzeTable", error = error + await sleepAsync(RunAnalyzeInterval) + continue + + debug "analyzeTable start" + (await self.performWriteQuery(AnalyzeQuery)).isOkOr: + error "failed to run ANALYZE messages", error = error + + debug "analyzeTable unlock db" + (await self.releaseDatabaseLock(AnalyzeTableLockId)).isOkOr: + error "failed to release lock analyzeTable", error = error + + debug "analyzeTable completed" + + await sleepAsync(RunAnalyzeInterval) + +proc startAnalyzeTableLoop*(self: PostgresDriver) = + self.futLoopAnalyzeTable = self.analyzeTableLoop From c7349b281f56c6a7dc3a0108e5f757596159d8b3 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 19 Aug 2024 19:11:49 +0200 Subject: [PATCH 2/2] postgres_driver: some better logs --- .../driver/postgres_driver/postgres_driver.nim | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 7e14751a3f..c15941b213 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -1527,22 +1527,22 @@ proc analyzeTableLoop(self: PostgresDriver) {.async.} = ## The database stats should be calculated regularly so that the planner ## picks up the proper indexes and we have better query performance. while true: - debug "analyzeTable lock db" + debug "analyzeTableLoop lock db" (await self.acquireDatabaseLock(AnalyzeTableLockId)).isOkOr: if error != EXPECTED_LOCK_ERROR: - error "failed to acquire lock in analyzeTable", error = error + error "failed to acquire lock in analyzeTableLoop", error = error await sleepAsync(RunAnalyzeInterval) continue - debug "analyzeTable start" + debug "analyzeTableLoop start analysis" (await self.performWriteQuery(AnalyzeQuery)).isOkOr: error "failed to run ANALYZE messages", error = error - debug "analyzeTable unlock db" + debug "analyzeTableLoop unlock db" (await self.releaseDatabaseLock(AnalyzeTableLockId)).isOkOr: - error "failed to release lock analyzeTable", error = error + error "failed to release lock analyzeTableLoop", error = error - debug "analyzeTable completed" + debug "analyzeTableLoop analysis completed" await sleepAsync(RunAnalyzeInterval)