Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(message-cache): make the topic cache generic #1097

Merged
merged 1 commit into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/all_tests_v2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import
./v2/test_waku_payload,
./v2/test_waku_swap,
./v2/test_utils_pagination,
./v2/test_message_cache,
./v2/test_message_store_queue,
./v2/test_message_store_queue_pagination,
./v2/test_message_store_sqlite_query,
Expand All @@ -18,7 +19,6 @@ import
./v2/test_rest_debug_api_serdes,
./v2/test_rest_debug_api,
./v2/test_rest_relay_api_serdes,
./v2/test_rest_relay_api_topic_cache,
./v2/test_rest_relay_api,
./v2/test_peer_manager,
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
{.used.}

import
std/tables,
stew/byteutils,
stew/shims/net,
chronicles,
stew/[results, byteutils],
testutils/unittests,
presto,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub
chronicles
import
../../waku/v2/protocol/waku_message,
../../waku/v2/node/rest/relay/topic_cache
../../waku/v2/node/message_cache


proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage =
Expand All @@ -22,12 +17,16 @@ proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMess
timestamp: 2022
)

type PubsubTopicString = string

suite "TopicCache":
type TestMessageCache = MessageCache[(PubsubTopicString, ContentTopic)]


suite "MessageCache":
test "subscribe to topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

## When
cache.subscribe(testTopic)
Expand All @@ -39,8 +38,8 @@ suite "TopicCache":

test "unsubscribe from topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

# Init cache content
cache.subscribe(testTopic)
Expand All @@ -55,9 +54,9 @@ suite "TopicCache":

test "get messages of a subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()

# Init cache content
cache.subscribe(testTopic)
Expand All @@ -74,9 +73,9 @@ suite "TopicCache":

test "get messages with clean flag shoud clear the messages cache":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()

# Init cache content
cache.subscribe(testTopic)
Expand All @@ -96,8 +95,8 @@ suite "TopicCache":

test "get messages of a non-subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

## When
let res = cache.getMessages(testTopic)
Expand All @@ -110,9 +109,9 @@ suite "TopicCache":

test "add messages to subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()

cache.subscribe(testTopic)

Expand All @@ -127,9 +126,9 @@ suite "TopicCache":

test "add messages to non-subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
let cache = TestMessageCache.init()

## When
cache.addMessage(testTopic, testMessage)
Expand All @@ -143,14 +142,14 @@ suite "TopicCache":

test "add messages beyond the capacity":
## Given
let testTopic = "test-pubsub-topic"
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testMessages = @[
fakeWakuMessage(toBytes("MSG-1")),
fakeWakuMessage(toBytes("MSG-2")),
fakeWakuMessage(toBytes("MSG-3"))
]

let cache = TopicCache.init(conf=TopicCacheConfig(capacity: 2))
let cache = TestMessageCache.init(capacity = 2)
cache.subscribe(testTopic)

## When
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ type
defaultValue: 8645
name: "rest-port" }: uint16

restRelayCacheCapaciy* {.
restRelayCacheCapacity* {.
desc: "Capacity of the Relay REST API message cache.",
defaultValue: 30
name: "rest-relay-cache-capacity" }: uint32
Expand Down
76 changes: 76 additions & 0 deletions waku/v2/node/message_cache.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{.push raises: [Defect].}

import
std/[tables, sequtils],
stew/results,
chronicles,
chronos,
libp2p/protocols/pubsub
import
../protocol/waku_message

logScope: topics = "message_cache"

const DefaultMessageCacheCapacity*: uint = 30 # Max number of messages cached per topic @TODO make this configurable


type MessageCacheResult*[T] = Result[T, cstring]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Typically, cstring is only used when interfacing via FFI.
Sometimes, our code uses cstring sometimes string. Imo, we should be consistent here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be worth noting that the Nim style guide recommends using cstring with Result: https://status-im.github.io/nim-style-guide/libraries.results.html

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding @kaiserd suggestions above: No strong opinions, but I agree with adding a comment to indicate that the MessageCache is designed to be a cache of messages against subscribed topics (pubsub- or contentTopic) and perhaps renaming TopicCache to PubsubTopicCache. Don't think it's necessary to be even more generic now, at least until we have identified a need for a very generic MessageCache.

Copy link
Contributor

@kaiserd kaiserd Aug 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be worth noting that the Nim style guide recommends using cstring with Result: https://status-im.github.io/nim-style-guide/libraries.results.html

Good to know :). Thanks. (In one of my first PRs, I was asked why I don't use string as the error type because the context was Nim-only. I just copied and did not know this is in the style guide 😅 .)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah! Makes sense. You're absolutely right that we haven't been consistent with this (and it's only been recently that I started using cstring with Result after a closer reading of the style guide :) )


type MessageCache*[K] = ref object
capacity: uint
table: Table[K, seq[WakuMessage]]

func init*[K](T: type MessageCache[K], capacity=DefaultMessageCacheCapacity): T =
MessageCache[K](
capacity: capacity,
table: initTable[K, seq[WakuMessage]]()
)


proc isSubscribed*[K](t: MessageCache[K], topic: K): bool =
t.table.hasKey(topic)

proc subscribe*[K](t: MessageCache[K], topic: K) =
if t.isSubscribed(topic):
return
t.table[topic] = @[]

proc unsubscribe*[K](t: MessageCache[K], topic: K) =
if not t.isSubscribed(topic):
return
t.table.del(topic)


proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) =
if not t.isSubscribed(topic):
return

# Make a copy of msgs for this topic to modify
var messages = t.table.getOrDefault(topic, @[])

if messages.len >= t.capacity.int:
debug "Topic cache capacity reached", topic=topic
# Message cache on this topic exceeds maximum. Delete oldest.
# TODO: this may become a bottle neck if called as the norm rather than
# exception when adding messages. Performance profile needed.
messages.delete(0,0)

messages.add(msg)

# Replace indexed entry with copy
t.table[topic] = messages

proc clearMessages*[K](t: MessageCache[K], topic: K) =
if not t.isSubscribed(topic):
return
t.table[topic] = @[]

proc getMessages*[K](t: MessageCache[K], topic: K, clear=false): MessageCacheResult[seq[WakuMessage]] =
if not t.isSubscribed(topic):
return err("Not subscribed to topic")

let messages = t.table.getOrDefault(topic, @[])
if clear:
t.clearMessages(topic)

ok(messages)
83 changes: 10 additions & 73 deletions waku/v2/node/rest/relay/topic_cache.nim
Original file line number Diff line number Diff line change
@@ -1,98 +1,35 @@
{.push raises: [Defect].}

import
std/[tables, sequtils],
stew/results,
chronicles,
chronos,
libp2p/protocols/pubsub
import
../../../protocol/waku_message
../../../protocol/waku_message,
../../message_cache

logScope: topics = "rest_api_relay_topiccache"

const DEFAULT_TOPICCACHE_CAPACITY* = 30 # Max number of messages cached per topic @TODO make this configurable
export message_cache


type PubSubTopicString = string

type TopicCacheResult*[T] = Result[T, cstring]

type TopicCacheMessageHandler* = Topichandler


type TopicCacheConfig* = object
capacity*: int

proc default*(T: type TopicCacheConfig): T =
TopicCacheConfig(
capacity: DEFAULT_TOPICCACHE_CAPACITY
)


type TopicCache* = ref object
conf: TopicCacheConfig
table: Table[PubSubTopicString, seq[WakuMessage]]

func init*(T: type TopicCache, conf=TopicCacheConfig.default()): T =
TopicCache(
conf: conf,
table: initTable[PubSubTopicString, seq[WakuMessage]]()
)

##### TopicCache

proc isSubscribed*(t: TopicCache, topic: PubSubTopicString): bool =
t.table.hasKey(topic)

proc subscribe*(t: TopicCache, topic: PubSubTopicString) =
if t.isSubscribed(topic):
return
t.table[topic] = @[]

proc unsubscribe*(t: TopicCache, topic: PubSubTopicString) =
if not t.isSubscribed(topic):
return
t.table.del(topic)


proc addMessage*(t: TopicCache, topic: PubSubTopicString, msg: WakuMessage) =
if not t.isSubscribed(topic):
return

# Make a copy of msgs for this topic to modify
var messages = t.table.getOrDefault(topic, @[])

if messages.len >= t.conf.capacity:
debug "Topic cache capacity reached", topic=topic
# Message cache on this topic exceeds maximum. Delete oldest.
# TODO: this may become a bottle neck if called as the norm rather than
# exception when adding messages. Performance profile needed.
messages.delete(0,0)

messages.add(msg)

# Replace indexed entry with copy
t.table[topic] = messages
type PubSubTopicString = string

proc clearMessages*(t: TopicCache, topic: PubSubTopicString) =
if not t.isSubscribed(topic):
return
t.table[topic] = @[]
type TopicCacheResult*[T] = MessageCacheResult[T]

proc getMessages*(t: TopicCache, topic: PubSubTopicString, clear=false): TopicCacheResult[seq[WakuMessage]] =
if not t.isSubscribed(topic):
return err("Not subscribed to topic")
type TopicCache* = MessageCache[PubSubTopicString]

let messages = t.table.getOrDefault(topic, @[])
if clear:
t.clearMessages(topic)

ok(messages)
##### Message handler

type TopicCacheMessageHandler* = Topichandler

proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =

proc handler(topic: string, data: seq[byte]): Future[void] {.async, raises: [Defect].} =
let handler = proc(topic: string, data: seq[byte]): Future[void] {.async, closure.} =
trace "Topic handler triggered", topic=topic

# Add message to current cache
Expand Down
7 changes: 2 additions & 5 deletions waku/v2/node/wakunode2_setup_rest.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import
./wakunode2,
./rest/server,
./rest/debug/debug_api,
./rest/relay/[relay_api,
topic_cache]
./rest/relay/[relay_api, topic_cache]


logScope:
Expand All @@ -30,9 +29,7 @@ proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf:

## Relay REST API
if conf.relay:
# TODO: Simplify topic cache object initialization
let relayCacheConfig = TopicCacheConfig(capacity: int(conf.restRelayCacheCapaciy))
let relayCache = TopicCache.init(conf=relayCacheConfig)
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
installRelayApiHandlers(server.router, node, relayCache)

server.start()
Expand Down