Skip to content

Commit

Permalink
feat(message-cache): make the topic cache generic
Browse files Browse the repository at this point in the history
  • Loading branch information
Lorenzo Delgado committed Aug 30, 2022
1 parent 791ce6d commit ed90534
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 105 deletions.
2 changes: 1 addition & 1 deletion tests/all_tests_v2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import
./v2/test_waku_payload,
./v2/test_waku_swap,
./v2/test_utils_pagination,
./v2/test_message_cache,
./v2/test_message_store_queue,
./v2/test_message_store_queue_pagination,
./v2/test_message_store_sqlite_query,
Expand All @@ -18,7 +19,6 @@ import
./v2/test_rest_debug_api_serdes,
./v2/test_rest_debug_api,
./v2/test_rest_relay_api_serdes,
./v2/test_rest_relay_api_topic_cache,
./v2/test_rest_relay_api,
./v2/test_peer_manager,
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
{.used.}

import
std/tables,
stew/byteutils,
stew/shims/net,
chronicles,
stew/[results, byteutils],
testutils/unittests,
presto,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub
chronicles
import
../../waku/v2/protocol/waku_message,
../../waku/v2/node/rest/relay/topic_cache
../../waku/v2/node/message_cache


proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage =
Expand All @@ -22,12 +17,16 @@ proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMess
timestamp: 2022
)

type PubsubTopicString = string

suite "TopicCache":
type TestMessageCache = MessageCache[(PubsubTopicString, ContentTopic)]


suite "MessageCache":
test "subscribe to topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

## When
cache.subscribe(testTopic)
Expand All @@ -39,8 +38,8 @@ suite "TopicCache":

test "unsubscribe from topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

# Init cache content
cache.subscribe(testTopic)
Expand All @@ -55,9 +54,9 @@ suite "TopicCache":

test "get messages of a subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()

# Init cache content
cache.subscribe(testTopic)
Expand All @@ -74,9 +73,9 @@ suite "TopicCache":

test "get messages with clean flag shoud clear the messages cache":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()

# Init cache content
cache.subscribe(testTopic)
Expand All @@ -96,8 +95,8 @@ suite "TopicCache":

test "get messages of a non-subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

## When
let res = cache.getMessages(testTopic)
Expand All @@ -110,9 +109,9 @@ suite "TopicCache":

test "add messages to subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()

cache.subscribe(testTopic)

Expand All @@ -127,9 +126,9 @@ suite "TopicCache":

test "add messages to non-subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()

## When
cache.addMessage(testTopic, testMessage)
Expand All @@ -143,14 +142,14 @@ suite "TopicCache":

test "add messages beyond the capacity":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessages = @[
fakeWakuMessage(toBytes("MSG-1")),
fakeWakuMessage(toBytes("MSG-2")),
fakeWakuMessage(toBytes("MSG-3"))
]

let cache = TopicCache.init(conf=TopicCacheConfig(capacity: 2))
let cache = TestMessageCache.init(capacity = 2)
cache.subscribe(testTopic)

## When
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ type
defaultValue: 8645
name: "rest-port" }: uint16

restRelayCacheCapaciy* {.
restRelayCacheCapacity* {.
desc: "Capacity of the Relay REST API message cache.",
defaultValue: 30
name: "rest-relay-cache-capacity" }: uint32
Expand Down
76 changes: 76 additions & 0 deletions waku/v2/node/message_cache.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{.push raises: [Defect].}

import
std/[tables, sequtils],
stew/results,
chronicles,
chronos,
libp2p/protocols/pubsub
import
../protocol/waku_message

logScope: topics = "message_cache"

const DefaultMessageCacheCapacity*: uint = 30 # Max number of messages cached per topic @TODO make this configurable


type MessageCacheResult*[T] = Result[T, cstring]

type MessageCache*[K] = ref object
capacity: uint
table: Table[K, seq[WakuMessage]]

func init*[K](T: type MessageCache[K], capacity=DefaultMessageCacheCapacity): T =
MessageCache[K](
capacity: capacity,
table: initTable[K, seq[WakuMessage]]()
)


proc isSubscribed*[K](t: MessageCache[K], topic: K): bool =
t.table.hasKey(topic)

proc subscribe*[K](t: MessageCache[K], topic: K) =
if t.isSubscribed(topic):
return
t.table[topic] = @[]

proc unsubscribe*[K](t: MessageCache[K], topic: K) =
if not t.isSubscribed(topic):
return
t.table.del(topic)


proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) =
if not t.isSubscribed(topic):
return

# Make a copy of msgs for this topic to modify
var messages = t.table.getOrDefault(topic, @[])

if messages.len >= t.capacity.int:
debug "Topic cache capacity reached", topic=topic
# Message cache on this topic exceeds maximum. Delete oldest.
# TODO: this may become a bottle neck if called as the norm rather than
# exception when adding messages. Performance profile needed.
messages.delete(0,0)

messages.add(msg)

# Replace indexed entry with copy
t.table[topic] = messages

proc clearMessages*[K](t: MessageCache[K], topic: K) =
if not t.isSubscribed(topic):
return
t.table[topic] = @[]

proc getMessages*[K](t: MessageCache[K], topic: K, clear=false): MessageCacheResult[seq[WakuMessage]] =
if not t.isSubscribed(topic):
return err("Not subscribed to topic")

let messages = t.table.getOrDefault(topic, @[])
if clear:
t.clearMessages(topic)

ok(messages)
83 changes: 10 additions & 73 deletions waku/v2/node/rest/relay/topic_cache.nim
Original file line number Diff line number Diff line change
@@ -1,98 +1,35 @@
{.push raises: [Defect].}

import
std/[tables, sequtils],
stew/results,
chronicles,
chronos,
libp2p/protocols/pubsub
import
../../../protocol/waku_message
../../../protocol/waku_message,
../../message_cache

logScope: topics = "rest_api_relay_topiccache"

const DEFAULT_TOPICCACHE_CAPACITY* = 30 # Max number of messages cached per topic @TODO make this configurable
export message_cache


type PubSubTopicString = string

type TopicCacheResult*[T] = Result[T, cstring]

type TopicCacheMessageHandler* = Topichandler


type TopicCacheConfig* = object
capacity*: int

proc default*(T: type TopicCacheConfig): T =
TopicCacheConfig(
capacity: DEFAULT_TOPICCACHE_CAPACITY
)


type TopicCache* = ref object
conf: TopicCacheConfig
table: Table[PubSubTopicString, seq[WakuMessage]]

func init*(T: type TopicCache, conf=TopicCacheConfig.default()): T =
TopicCache(
conf: conf,
table: initTable[PubSubTopicString, seq[WakuMessage]]()
)

##### TopicCache

proc isSubscribed*(t: TopicCache, topic: PubSubTopicString): bool =
t.table.hasKey(topic)

proc subscribe*(t: TopicCache, topic: PubSubTopicString) =
if t.isSubscribed(topic):
return
t.table[topic] = @[]

proc unsubscribe*(t: TopicCache, topic: PubSubTopicString) =
if not t.isSubscribed(topic):
return
t.table.del(topic)


proc addMessage*(t: TopicCache, topic: PubSubTopicString, msg: WakuMessage) =
if not t.isSubscribed(topic):
return

# Make a copy of msgs for this topic to modify
var messages = t.table.getOrDefault(topic, @[])

if messages.len >= t.conf.capacity:
debug "Topic cache capacity reached", topic=topic
# Message cache on this topic exceeds maximum. Delete oldest.
# TODO: this may become a bottle neck if called as the norm rather than
# exception when adding messages. Performance profile needed.
messages.delete(0,0)

messages.add(msg)

# Replace indexed entry with copy
t.table[topic] = messages
type PubSubTopicString = string

proc clearMessages*(t: TopicCache, topic: PubSubTopicString) =
if not t.isSubscribed(topic):
return
t.table[topic] = @[]
type TopicCacheResult*[T] = MessageCacheResult[T]

proc getMessages*(t: TopicCache, topic: PubSubTopicString, clear=false): TopicCacheResult[seq[WakuMessage]] =
if not t.isSubscribed(topic):
return err("Not subscribed to topic")
type TopicCache* = MessageCache[PubSubTopicString]

let messages = t.table.getOrDefault(topic, @[])
if clear:
t.clearMessages(topic)

ok(messages)
##### Message handler

type TopicCacheMessageHandler* = Topichandler

proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =

proc handler(topic: string, data: seq[byte]): Future[void] {.async, raises: [Defect].} =
let handler = proc(topic: string, data: seq[byte]): Future[void] {.async, closure.} =
trace "Topic handler triggered", topic=topic

# Add message to current cache
Expand Down
7 changes: 2 additions & 5 deletions waku/v2/node/wakunode2_setup_rest.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import
./wakunode2,
./rest/server,
./rest/debug/debug_api,
./rest/relay/[relay_api,
topic_cache]
./rest/relay/[relay_api, topic_cache]


logScope:
Expand All @@ -30,9 +29,7 @@ proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf:

## Relay REST API
if conf.relay:
# TODO: Simplify topic cache object initialization
let relayCacheConfig = TopicCacheConfig(capacity: int(conf.restRelayCacheCapaciy))
let relayCache = TopicCache.init(conf=relayCacheConfig)
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
installRelayApiHandlers(server.router, node, relayCache)

server.start()
Expand Down

0 comments on commit ed90534

Please sign in to comment.