Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Postgres partition implementation #2506

Merged
merged 25 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0b4681c
new proc to foster different size retention policy implementations
Ivansete-status Feb 17, 2024
862b8ea
postgres: first step to implement partition management
Ivansete-status Feb 17, 2024
b18c535
postgres_driver: better partition management
Ivansete-status Feb 19, 2024
43b4543
postgres_driver: use of times.now().toTime().toUnix() instead of Mome…
Ivansete-status Feb 21, 2024
24b4a1b
new proc to foster different size retention policy implementations
Ivansete-status Feb 17, 2024
757868c
waku_archive: fix signatures in decreaseDatabaseSize methods
Ivansete-status Feb 21, 2024
c5e0abc
Enhancements around partition management logic
Ivansete-status Feb 22, 2024
d8e0301
queue_driver: remove duplicated method from conflict resolution in re…
Ivansete-status Mar 4, 2024
2113fc7
postgres_driver: add final details, lost in rebase, for partition man…
Ivansete-status Mar 4, 2024
80fc4bd
postgres migrations: set new version to 2
Ivansete-status Mar 4, 2024
d8e25e0
test_driver_postgres: use of assert instead of require and avoid usin…
Ivansete-status Mar 5, 2024
80496c5
postgres_driver: better implementation of the reset method with parti…
Ivansete-status Mar 5, 2024
2f5c24b
waku_archive: wait for a postgres partition to be created before proc…
Ivansete-status Mar 5, 2024
d77f76f
Remove createMessageTable, init, and deleteMessageTable procs
Ivansete-status Mar 5, 2024
1665fa5
postgres: ensure we use the version 15.4 in tests
Ivansete-status Mar 5, 2024
d204a02
postgres_driver.nim: enhance debug logs partition addition
Ivansete-status Mar 5, 2024
39183a0
ci.yml: ensure logs are printed without colors
Ivansete-status Mar 5, 2024
f4d9a81
postgres_driver: create partition only if it doesn't exist
Ivansete-status Mar 5, 2024
58388d5
postgres_driver: log enhancements
Ivansete-status Mar 5, 2024
b003dea
postgres_driver: starting the loop factory in an asynchronous task
Ivansete-status Mar 5, 2024
21785e3
postgres_driver: minor cleanup and minor comments enhancements
Ivansete-status Mar 5, 2024
76973a4
postgres_driver: avoid truncating partition because it suffice with drop
Ivansete-status Mar 6, 2024
c08e37a
postgres_driver: move retention policy log so that it is not misleading
Ivansete-status Mar 6, 2024
dce24e5
postgres_driver: log partition name and size when removing a partition
Ivansete-status Mar 6, 2024
62e7b3e
partitions_manager.nim: fix log typo
Ivansete-status Mar 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ concurrency:
env:
NPROC: 2
MAKEFLAGS: "-j${NPROC}"
NIMFLAGS: "--parallelBuild:${NPROC}"
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none"

jobs:
changes: # changes detection
Expand Down Expand Up @@ -115,7 +115,7 @@ jobs:
fi

if [ ${{ runner.os }} == "Linux" ]; then
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:9.6-alpine
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18
fi

make V=1 LOG_LEVEL=DEBUG QUICK_AND_DIRTY_COMPILER=1 POSTGRES=1 test testwakunode2
Expand Down
2 changes: 1 addition & 1 deletion tests/postgres-docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.8"

services:
db:
image: postgres:9.6-alpine
image: postgres:15.4-alpine3.18
restart: always
environment:
POSTGRES_PASSWORD: test123
Expand Down
52 changes: 24 additions & 28 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[sequtils,times,options],
std/[sequtils,options],
testutils/unittests,
chronos
import
Expand All @@ -13,8 +13,6 @@ import
../testlib/testasync,
../testlib/postgres

proc now():int64 = getTime().toUnix()

proc computeTestCursor(pubsubTopic: PubsubTopic,
message: WakuMessage):
ArchiveCursor =
Expand Down Expand Up @@ -56,7 +54,7 @@ suite "Postgres driver":
# Actually, the diff randomly goes between 1 and 2 seconds.
# although in theory it should spend 1s because we establish 100
# connections and we spawn 100 tasks that spend ~1s each.
require diff < 20
assert diff < 20_000_000_000

asyncTest "Insert a message":
const contentTopic = "test-content-topic"
Expand All @@ -69,14 +67,14 @@ suite "Postgres driver":
assert putRes.isOk(), putRes.error

let storedMsg = (await driver.getAllMessages()).tryGet()
require:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, actualMsg, digest, storeTimestamp) = item
actualMsg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic and
toHex(computedDigest.data) == toHex(digest) and
toHex(actualMsg.payload) == toHex(msg.payload)

assert storedMsg.len == 1

let (pubsubTopic, actualMsg, digest, storeTimestamp) = storedMsg[0]
assert actualMsg.contentTopic == contentTopic
assert pubsubTopic == DefaultPubsubTopic
assert toHex(computedDigest.data) == toHex(digest)
assert toHex(actualMsg.payload) == toHex(msg.payload)

asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"
Expand All @@ -96,31 +94,30 @@ suite "Postgres driver":

let countMessagesRes = await driver.getMessagesCount()

require countMessagesRes.isOk() and countMessagesRes.get() == 2
assert countMessagesRes.isOk(), $countMessagesRes.error
assert countMessagesRes.get() == 2

var messagesRes = await driver.getMessages(contentTopic = @[contentTopic1])

require messagesRes.isOk()
require messagesRes.get().len == 1
assert messagesRes.isOk(), $messagesRes.error
assert messagesRes.get().len == 1

# Get both content topics, check ordering
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2])
assert messagesRes.isOk(), messagesRes.error

require:
messagesRes.get().len == 2 and
messagesRes.get()[0][1].contentTopic == contentTopic1
assert messagesRes.get().len == 2
assert messagesRes.get()[0][1].contentTopic == contentTopic1

# Descending order
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2],
ascendingOrder = false)
assert messagesRes.isOk(), messagesRes.error

require:
messagesRes.get().len == 2 and
messagesRes.get()[0][1].contentTopic == contentTopic2
assert messagesRes.get().len == 2
assert messagesRes.get()[0][1].contentTopic == contentTopic2

# cursor
# Get both content topics
Expand All @@ -130,25 +127,24 @@ suite "Postgres driver":
cursor = some(
computeTestCursor(pubsubTopic1,
messagesRes.get()[1][1])))
require messagesRes.isOk()
require messagesRes.get().len == 1
assert messagesRes.isOk()
assert messagesRes.get().len == 1

# Get both content topics but one pubsub topic
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2],
pubsubTopic = some(pubsubTopic1))
assert messagesRes.isOk(), messagesRes.error

require:
messagesRes.get().len == 1 and
messagesRes.get()[0][1].contentTopic == contentTopic1
assert messagesRes.get().len == 1
assert messagesRes.get()[0][1].contentTopic == contentTopic1

# Limit
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2],
maxPageSize = 1)
assert messagesRes.isOk(), messagesRes.error
require messagesRes.get().len == 1
assert messagesRes.get().len == 1

asyncTest "Insert true duplicated messages":
# Validates that two completely equal messages can not be stored.
Expand All @@ -164,5 +160,5 @@ suite "Postgres driver":

putRes = await driver.put(DefaultPubsubTopic,
msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp)
require not putRes.isOk()
assert not putRes.isOk()

3 changes: 2 additions & 1 deletion waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver,
Future[ArchiveDriverResult[void]] {.base, async.} = discard

method decreaseDatabaseSize*(driver: ArchiveDriver,
targetSizeInBytes: int64):
targetSizeInBytes: int64,
forceRemoval: bool = false):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

method close*(driver: ArchiveDriver):
Expand Down
13 changes: 13 additions & 0 deletions waku/waku_archive/driver/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ proc new*(T: type ArchiveDriver,
if migrateRes.isErr():
return err("ArchiveDriver build failed in migration: " & $migrateRes.error)

## This should be started once we make sure the 'messages' table exists
## Hence, this should be run after the migration is completed.
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)
Ivansete-status marked this conversation as resolved.
Show resolved Hide resolved

info "waiting for a partition to be created"
for i in 0..<100:
if driver.containsAnyPartition():
break
await sleepAsync(chronos.milliseconds(100))

if not driver.containsAnyPartition():
onFatalErrorAction("a partition could not be created")

return ok(driver)

else:
Expand Down
11 changes: 9 additions & 2 deletions waku/waku_archive/driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import ./postgres_driver/postgres_driver
import
./postgres_driver/postgres_driver,
./postgres_driver/partitions_manager,
./postgres_driver/postgres_healthcheck

export
postgres_driver,
partitions_manager,
postgres_healthcheck

export postgres_driver
2 changes: 1 addition & 1 deletion waku/waku_archive/driver/postgres_driver/migrations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import
logScope:
topics = "waku archive migration"

const SchemaVersion* = 1 # increase this when there is an update in the database schema
const SchemaVersion* = 2 # increase this when there is an update in the database schema

proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list
Expand Down
105 changes: 105 additions & 0 deletions waku/waku_archive/driver/postgres_driver/partitions_manager.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@

## This module is aimed to handle the creation and truncation of partition tables
## in order to limit the space occupied in disk by the database.
##
## The created partitions are referenced by the 'storedAt' field.
##

import
std/deques
import
chronos,
chronicles

logScope:
topics = "waku archive partitions_manager"

## The time range has seconds resolution
type TimeRange* = tuple[beginning: int64, `end`: int64]

type
Partition = object
name: string
timeRange: TimeRange

PartitionManager* = ref object
partitions: Deque[Partition] # FIFO of partition table names. The first is the oldest partition

proc new*(T: type PartitionManager): T =
return PartitionManager()

proc getPartitionFromDateTime*(self: PartitionManager,
targetMoment: int64):
Result[Partition, string] =
## Returns the partition name that might store a message containing the passed timestamp.
## In order words, it simply returns the partition name which contains the given timestamp.
## targetMoment - represents the time of interest, measured in seconds since epoch.

if self.partitions.len == 0:
return err("There are no partitions")

for partition in self.partitions:
let timeRange = partition.timeRange

let beginning = timeRange.beginning
let `end` = timeRange.`end`

if beginning <= targetMoment and targetMoment < `end`:
return ok(partition)

return err("Couldn't find a partition table for given time: " & $targetMoment)

proc getNewestPartition*(self: PartitionManager): Result[Partition, string] =
if self.partitions.len == 0:
return err("there are no partitions allocated")

let newestPartition = self.partitions.peekLast
return ok(newestPartition)

proc getOldestPartition*(self: PartitionManager): Result[Partition, string] =
if self.partitions.len == 0:
return err("there are no partitions allocated")

let oldestPartition = self.partitions.peekFirst
return ok(oldestPartition)

proc addPartitionInfo*(self: PartitionManager,
partitionName: string,
beginning: int64,
`end`: int64) =
## The given partition range has seconds resolution.
## We just store information of the new added partition merely to keep track of it.
let partitionInfo = Partition(name: partitionName, timeRange: (beginning, `end`))
trace "Adding partition info"
self.partitions.addLast(partitionInfo)

proc removeOldestPartitionName*(self: PartitionManager) =
## Simply removed the partition from the tracked/known partitions queue.
## Just remove it and ignore it.
discard self.partitions.popFirst()

proc isEmpty*(self: PartitionManager): bool =
return self.partitions.len == 0

proc getLastMoment*(partition: Partition): int64 =
## Considering the time range covered by the partition, this
## returns the `end` time (number of seconds since epoch) of such range.
let lastTimeInSec = partition.timeRange.`end`
return lastTimeInSec

proc containsMoment*(partition: Partition, time: int64): bool =
## Returns true if the given moment is contained within the partition window,
## 'false' otherwise.
## time - number of seconds since epoch
if partition.timeRange.beginning <= time and
time < partition.timeRange.`end`:
return true

return false

proc getName*(partition: Partition): string =
return partition.name

func `==`*(a, b: Partition): bool {.inline.} =
return a.name == b.name

Loading
Loading