From ed90534f1719f78a1582b5df2baa8592b73f4ab3 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Mon, 29 Aug 2022 20:51:12 +0200 Subject: [PATCH] feat(message-cache): make the topic cache generic --- tests/all_tests_v2.nim | 2 +- ...topic_cache.nim => test_message_cache.nim} | 49 ++++++----- waku/v2/node/config.nim | 2 +- waku/v2/node/message_cache.nim | 76 +++++++++++++++++ waku/v2/node/rest/relay/topic_cache.nim | 83 +++---------------- waku/v2/node/wakunode2_setup_rest.nim | 7 +- 6 files changed, 114 insertions(+), 105 deletions(-) rename tests/v2/{test_rest_relay_api_topic_cache.nim => test_message_cache.nim} (69%) create mode 100644 waku/v2/node/message_cache.nim diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 68892b3c07..9141368fd2 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -9,6 +9,7 @@ import ./v2/test_waku_payload, ./v2/test_waku_swap, ./v2/test_utils_pagination, + ./v2/test_message_cache, ./v2/test_message_store_queue, ./v2/test_message_store_queue_pagination, ./v2/test_message_store_sqlite_query, @@ -18,7 +19,6 @@ import ./v2/test_rest_debug_api_serdes, ./v2/test_rest_debug_api, ./v2/test_rest_relay_api_serdes, - ./v2/test_rest_relay_api_topic_cache, ./v2/test_rest_relay_api, ./v2/test_peer_manager, ./v2/test_web3, # TODO remove it when rln-relay tests get finalized diff --git a/tests/v2/test_rest_relay_api_topic_cache.nim b/tests/v2/test_message_cache.nim similarity index 69% rename from tests/v2/test_rest_relay_api_topic_cache.nim rename to tests/v2/test_message_cache.nim index 081dc79c9f..d69f8dbf49 100644 --- a/tests/v2/test_rest_relay_api_topic_cache.nim +++ b/tests/v2/test_message_cache.nim @@ -1,17 +1,12 @@ {.used.} import - std/tables, - stew/byteutils, - stew/shims/net, - chronicles, + stew/[results, byteutils], testutils/unittests, - presto, - libp2p/crypto/crypto, - libp2p/protocols/pubsub/pubsub + chronicles import ../../waku/v2/protocol/waku_message, - ../../waku/v2/node/rest/relay/topic_cache + ../../waku/v2/node/message_cache proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage = @@ -22,12 +17,16 @@ proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMess timestamp: 2022 ) +type PubsubTopicString = string -suite "TopicCache": +type TestMessageCache = MessageCache[(PubsubTopicString, ContentTopic)] + + +suite "MessageCache": test "subscribe to topic": ## Given - let testTopic = "test-pubsub-topic" - let cache = TopicCache.init() + let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let cache = TestMessageCache.init() ## When cache.subscribe(testTopic) @@ -39,8 +38,8 @@ suite "TopicCache": test "unsubscribe from topic": ## Given - let testTopic = "test-pubsub-topic" - let cache = TopicCache.init() + let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let cache = TestMessageCache.init() # Init cache content cache.subscribe(testTopic) @@ -55,9 +54,9 @@ suite "TopicCache": test "get messages of a subscribed topic": ## Given - let testTopic = "test-pubsub-topic" + let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() - let cache = TopicCache.init() + let cache = TestMessageCache.init() # Init cache content cache.subscribe(testTopic) @@ -74,9 +73,9 @@ suite "TopicCache": test "get messages with clean flag shoud clear the messages cache": ## Given - let testTopic = "test-pubsub-topic" + let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() - let cache = TopicCache.init() + let cache = TestMessageCache.init() # Init cache content cache.subscribe(testTopic) @@ -96,8 +95,8 @@ suite "TopicCache": test "get messages of a non-subscribed topic": ## Given - let testTopic = "test-pubsub-topic" - let cache = TopicCache.init() + let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) + let cache = TestMessageCache.init() ## When let res = cache.getMessages(testTopic) @@ -110,9 +109,9 @@ suite "TopicCache": test "add messages to subscribed topic": ## Given - let testTopic = "test-pubsub-topic" + let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() - let cache = TopicCache.init() + let cache = TestMessageCache.init() cache.subscribe(testTopic) @@ -127,9 +126,9 @@ suite "TopicCache": test "add messages to non-subscribed topic": ## Given - let testTopic = "test-pubsub-topic" + let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testMessage = fakeWakuMessage() - let cache = TopicCache.init() + let cache = TestMessageCache.init() ## When cache.addMessage(testTopic, testMessage) @@ -143,14 +142,14 @@ suite "TopicCache": test "add messages beyond the capacity": ## Given - let testTopic = "test-pubsub-topic" + let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic")) let testMessages = @[ fakeWakuMessage(toBytes("MSG-1")), fakeWakuMessage(toBytes("MSG-2")), fakeWakuMessage(toBytes("MSG-3")) ] - let cache = TopicCache.init(conf=TopicCacheConfig(capacity: 2)) + let cache = TestMessageCache.init(capacity = 2) cache.subscribe(testTopic) ## When diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index 198126a94e..a06cff7ad7 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -290,7 +290,7 @@ type defaultValue: 8645 name: "rest-port" }: uint16 - restRelayCacheCapaciy* {. + restRelayCacheCapacity* {. desc: "Capacity of the Relay REST API message cache.", defaultValue: 30 name: "rest-relay-cache-capacity" }: uint32 diff --git a/waku/v2/node/message_cache.nim b/waku/v2/node/message_cache.nim new file mode 100644 index 0000000000..8fff7ab896 --- /dev/null +++ b/waku/v2/node/message_cache.nim @@ -0,0 +1,76 @@ +{.push raises: [Defect].} + +import + std/[tables, sequtils], + stew/results, + chronicles, + chronos, + libp2p/protocols/pubsub +import + ../protocol/waku_message + +logScope: topics = "message_cache" + +const DefaultMessageCacheCapacity*: uint = 30 # Max number of messages cached per topic @TODO make this configurable + + +type MessageCacheResult*[T] = Result[T, cstring] + +type MessageCache*[K] = ref object + capacity: uint + table: Table[K, seq[WakuMessage]] + +func init*[K](T: type MessageCache[K], capacity=DefaultMessageCacheCapacity): T = + MessageCache[K]( + capacity: capacity, + table: initTable[K, seq[WakuMessage]]() + ) + + +proc isSubscribed*[K](t: MessageCache[K], topic: K): bool = + t.table.hasKey(topic) + +proc subscribe*[K](t: MessageCache[K], topic: K) = + if t.isSubscribed(topic): + return + t.table[topic] = @[] + +proc unsubscribe*[K](t: MessageCache[K], topic: K) = + if not t.isSubscribed(topic): + return + t.table.del(topic) + + +proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) = + if not t.isSubscribed(topic): + return + + # Make a copy of msgs for this topic to modify + var messages = t.table.getOrDefault(topic, @[]) + + if messages.len >= t.capacity.int: + debug "Topic cache capacity reached", topic=topic + # Message cache on this topic exceeds maximum. Delete oldest. + # TODO: this may become a bottle neck if called as the norm rather than + # exception when adding messages. Performance profile needed. + messages.delete(0,0) + + messages.add(msg) + + # Replace indexed entry with copy + t.table[topic] = messages + +proc clearMessages*[K](t: MessageCache[K], topic: K) = + if not t.isSubscribed(topic): + return + t.table[topic] = @[] + +proc getMessages*[K](t: MessageCache[K], topic: K, clear=false): MessageCacheResult[seq[WakuMessage]] = + if not t.isSubscribed(topic): + return err("Not subscribed to topic") + + let messages = t.table.getOrDefault(topic, @[]) + if clear: + t.clearMessages(topic) + + ok(messages) diff --git a/waku/v2/node/rest/relay/topic_cache.nim b/waku/v2/node/rest/relay/topic_cache.nim index b224d0850d..3a1a424288 100644 --- a/waku/v2/node/rest/relay/topic_cache.nim +++ b/waku/v2/node/rest/relay/topic_cache.nim @@ -1,98 +1,35 @@ {.push raises: [Defect].} import - std/[tables, sequtils], stew/results, chronicles, chronos, libp2p/protocols/pubsub import - ../../../protocol/waku_message + ../../../protocol/waku_message, + ../../message_cache logScope: topics = "rest_api_relay_topiccache" -const DEFAULT_TOPICCACHE_CAPACITY* = 30 # Max number of messages cached per topic @TODO make this configurable +export message_cache -type PubSubTopicString = string - -type TopicCacheResult*[T] = Result[T, cstring] - -type TopicCacheMessageHandler* = Topichandler - - -type TopicCacheConfig* = object - capacity*: int - -proc default*(T: type TopicCacheConfig): T = - TopicCacheConfig( - capacity: DEFAULT_TOPICCACHE_CAPACITY - ) - - -type TopicCache* = ref object - conf: TopicCacheConfig - table: Table[PubSubTopicString, seq[WakuMessage]] - -func init*(T: type TopicCache, conf=TopicCacheConfig.default()): T = - TopicCache( - conf: conf, - table: initTable[PubSubTopicString, seq[WakuMessage]]() - ) - +##### TopicCache -proc isSubscribed*(t: TopicCache, topic: PubSubTopicString): bool = - t.table.hasKey(topic) - -proc subscribe*(t: TopicCache, topic: PubSubTopicString) = - if t.isSubscribed(topic): - return - t.table[topic] = @[] - -proc unsubscribe*(t: TopicCache, topic: PubSubTopicString) = - if not t.isSubscribed(topic): - return - t.table.del(topic) - - -proc addMessage*(t: TopicCache, topic: PubSubTopicString, msg: WakuMessage) = - if not t.isSubscribed(topic): - return - - # Make a copy of msgs for this topic to modify - var messages = t.table.getOrDefault(topic, @[]) - - if messages.len >= t.conf.capacity: - debug "Topic cache capacity reached", topic=topic - # Message cache on this topic exceeds maximum. Delete oldest. - # TODO: this may become a bottle neck if called as the norm rather than - # exception when adding messages. Performance profile needed. - messages.delete(0,0) - - messages.add(msg) - - # Replace indexed entry with copy - t.table[topic] = messages +type PubSubTopicString = string -proc clearMessages*(t: TopicCache, topic: PubSubTopicString) = - if not t.isSubscribed(topic): - return - t.table[topic] = @[] +type TopicCacheResult*[T] = MessageCacheResult[T] -proc getMessages*(t: TopicCache, topic: PubSubTopicString, clear=false): TopicCacheResult[seq[WakuMessage]] = - if not t.isSubscribed(topic): - return err("Not subscribed to topic") +type TopicCache* = MessageCache[PubSubTopicString] - let messages = t.table.getOrDefault(topic, @[]) - if clear: - t.clearMessages(topic) - ok(messages) +##### Message handler +type TopicCacheMessageHandler* = Topichandler proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler = - proc handler(topic: string, data: seq[byte]): Future[void] {.async, raises: [Defect].} = + let handler = proc(topic: string, data: seq[byte]): Future[void] {.async, closure.} = trace "Topic handler triggered", topic=topic # Add message to current cache diff --git a/waku/v2/node/wakunode2_setup_rest.nim b/waku/v2/node/wakunode2_setup_rest.nim index fc63440eda..387cd28e59 100644 --- a/waku/v2/node/wakunode2_setup_rest.nim +++ b/waku/v2/node/wakunode2_setup_rest.nim @@ -9,8 +9,7 @@ import ./wakunode2, ./rest/server, ./rest/debug/debug_api, - ./rest/relay/[relay_api, - topic_cache] + ./rest/relay/[relay_api, topic_cache] logScope: @@ -30,9 +29,7 @@ proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: ## Relay REST API if conf.relay: - # TODO: Simplify topic cache object initialization - let relayCacheConfig = TopicCacheConfig(capacity: int(conf.restRelayCacheCapaciy)) - let relayCache = TopicCache.init(conf=relayCacheConfig) + let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity) installRelayApiHandlers(server.router, node, relayCache) server.start()