Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: continue pubsub/content types started in #1352 #1362

Merged
merged 2 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions tests/v2/test_message_cache.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,12 @@ import
./testlib/common



type PubsubTopicString = string

type TestMessageCache = MessageCache[(PubsubTopicString, ContentTopic)]

type TestMessageCache = MessageCache[(PubsubTopic, ContentTopic)]

suite "MessageCache":
test "subscribe to topic":
## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

## When
Expand All @@ -32,7 +28,7 @@ suite "MessageCache":

test "unsubscribe from topic":
## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

# Init cache content
Expand All @@ -48,7 +44,7 @@ suite "MessageCache":

test "get messages of a subscribed topic":
## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()

Expand All @@ -67,7 +63,7 @@ suite "MessageCache":

test "get messages with clean flag shoud clear the messages cache":
## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()

Expand All @@ -89,7 +85,7 @@ suite "MessageCache":

test "get messages of a non-subscribed topic":
## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let cache = TestMessageCache.init()

## When
Expand All @@ -103,7 +99,7 @@ suite "MessageCache":

test "add messages to subscribed topic":
## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()

Expand All @@ -120,7 +116,7 @@ suite "MessageCache":

test "add messages to non-subscribed topic":
## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessage = fakeWakuMessage()
let cache = TestMessageCache.init()

Expand All @@ -136,7 +132,7 @@ suite "MessageCache":

test "add messages beyond the capacity":
## Given
let testTopic = ("test-pubsub-topic", ContentTopic("test-content-topic"))
let testTopic = (PubsubTopic("test-pubsub-topic"), ContentTopic("test-content-topic"))
let testMessages = @[
fakeWakuMessage(toBytes("MSG-1")),
fakeWakuMessage(toBytes("MSG-2")),
Expand Down
23 changes: 11 additions & 12 deletions tests/v2/test_rest_relay_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ suite "REST API - Relay":
restServer.start()

let pubSubTopics = @[
PubSubTopicString("pubsub-topic-1"),
PubSubTopicString("pubsub-topic-2"),
PubSubTopicString("pubsub-topic-3")
PubSubTopic("pubsub-topic-1"),
PubSubTopic("pubsub-topic-2"),
PubSubTopic("pubsub-topic-3")
]

# When
Expand Down Expand Up @@ -94,10 +94,10 @@ suite "REST API - Relay":
restServer.start()

let pubSubTopics = @[
PubSubTopicString("pubsub-topic-1"),
PubSubTopicString("pubsub-topic-2"),
PubSubTopicString("pubsub-topic-3"),
PubSubTopicString("pubsub-topic-y")
PubSubTopic("pubsub-topic-1"),
PubSubTopic("pubsub-topic-2"),
PubSubTopic("pubsub-topic-3"),
PubSubTopic("pubsub-topic-y")
]

# When
Expand Down Expand Up @@ -190,23 +190,22 @@ suite "REST API - Relay":
restServer.start()

let client = newRestHttpClient(initTAddress(restAddress, restPort))
const defaultContentTopic = ContentTopic("/waku/2/default-content/proto")

# At this stage the node is only subscribed to the default topic
require(PubSub(node.wakuRelay).topics.len == 1)


# When
let newTopics = @[
PubSubTopicString("pubsub-topic-1"),
PubSubTopicString("pubsub-topic-2"),
PubSubTopicString("pubsub-topic-3")
PubSubTopic("pubsub-topic-1"),
PubSubTopic("pubsub-topic-2"),
PubSubTopic("pubsub-topic-3")
]
discard await client.relayPostSubscriptionsV1(newTopics)

let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage(
payload: Base64String.encode("TEST-PAYLOAD"),
contentTopic: some(ContentTopicString(defaultContentTopic)),
contentTopic: some(DefaultContentTopic),
timestamp: some(int64(2022))
))

Expand Down
7 changes: 4 additions & 3 deletions tests/v2/test_rest_relay_api_serdes.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import
import
../../waku/v2/node/rest/serdes,
../../waku/v2/node/rest/base64,
../../waku/v2/node/rest/relay/api_types
../../waku/v2/node/rest/relay/api_types,
../../waku/v2/protocol/waku_message


suite "Relay API - serialization":
Expand Down Expand Up @@ -36,8 +37,8 @@ suite "Relay API - serialization":
# Given
let payload = Base64String.encode("MESSAGE")
let data = RelayWakuMessage(
payload: payload,
contentTopic: none(ContentTopicString),
payload: payload,
contentTopic: none(ContentTopic),
version: none(Natural),
timestamp: none(int64)
)
Expand Down
10 changes: 2 additions & 8 deletions waku/v2/node/jsonrpc/jsonrpc_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,18 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))

proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
const defaultCT = ContentTopic("/waku/2/default-content/proto")
var t: Timestamp
if relayMessage.timestamp.isSome:
t = relayMessage.timestamp.get
else:
# incoming WakuRelayMessages with no timestamp will get 0 timestamp
t = Timestamp(0)
WakuMessage(payload: relayMessage.payload,
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
contentTopic: relayMessage.contentTopic.get(DefaultContentTopic),
version: version,
timestamp: t)

proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref HmacDrbgContext, symkey: Option[SymKey], pubKey: Option[keys.PublicKey]): WakuMessage =
# @TODO global definition for default content topic
const defaultCT = ContentTopic("/waku/2/default-content/proto")

let payload = Payload(payload: relayMessage.payload,
dst: pubKey,
symkey: symkey)
Expand All @@ -72,13 +68,11 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref Hm
t = Timestamp(0)

WakuMessage(payload: payload.encode(version, rng[]).get(),
contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT,
contentTopic: relayMessage.contentTopic.get(DefaultContentTopic),
version: version,
timestamp: t)

proc toWakuRelayMessage*(message: WakuMessage, symkey: Option[SymKey], privateKey: Option[keys.PrivateKey]): WakuRelayMessage =
# @TODO global definition for default content topic

let
keyInfo = if symkey.isSome(): KeyInfo(kind: Symmetric, symKey: symkey.get())
elif privateKey.isSome(): KeyInfo(kind: Asymmetric, privKey: privateKey.get())
Expand Down
35 changes: 13 additions & 22 deletions waku/v2/node/rest/relay/api_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@ import

#### Types

type
PubSubTopicString* = distinct string
ContentTopicString* = distinct string

type RelayWakuMessage* = object
payload*: Base64String
contentTopic*: Option[ContentTopicString]
contentTopic*: Option[ContentTopic]
version*: Option[Natural]
timestamp*: Option[int64]

Expand All @@ -33,25 +29,24 @@ type
RelayPostMessagesRequest* = RelayWakuMessage

type
RelayPostSubscriptionsRequest* = seq[PubSubTopicString]
RelayDeleteSubscriptionsRequest* = seq[PubSubTopicString]
RelayPostSubscriptionsRequest* = seq[PubSubTopic]
RelayDeleteSubscriptionsRequest* = seq[PubSubTopic]


#### Type conversion

proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage =
RelayWakuMessage(
payload: base64.encode(Base64String, msg.payload),
contentTopic: some(ContentTopicString(msg.contentTopic)),
contentTopic: some(msg.contentTopic),
version: some(Natural(msg.version)),
timestamp: some(msg.timestamp)
)

proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, cstring] =
const defaultContentTopic = ContentTopicString("/waku/2/default-content/proto")
let
payload = ?msg.payload.decode()
contentTopic = ContentTopic(msg.contentTopic.get(defaultContentTopic))
contentTopic = msg.contentTopic.get(DefaultContentTopic)
version = uint32(msg.version.get(version))
timestamp = msg.timestamp.get(0)

Expand All @@ -64,13 +59,9 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String)
{.raises: [IOError, Defect].} =
writer.writeValue(string(value))

proc writeValue*(writer: var JsonWriter[RestJson], value: PubSubTopicString)
proc writeValue*(writer: var JsonWriter[RestJson], topic: PubSubTopic|ContentTopic)
{.raises: [IOError, Defect].} =
writer.writeValue(string(value))

proc writeValue*(writer: var JsonWriter[RestJson], value: ContentTopicString)
{.raises: [IOError, Defect].} =
writer.writeValue(string(value))
writer.writeValue(string(topic))

proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
{.raises: [IOError, Defect].} =
Expand All @@ -88,19 +79,19 @@ proc readValue*(reader: var JsonReader[RestJson], value: var Base64String)
{.raises: [SerializationError, IOError, Defect].} =
value = Base64String(reader.readValue(string))

proc readValue*(reader: var JsonReader[RestJson], value: var PubSubTopicString)
proc readValue*(reader: var JsonReader[RestJson], pubsubTopic: var PubSubTopic)
{.raises: [SerializationError, IOError, Defect].} =
value = PubSubTopicString(reader.readValue(string))
pubsubTopic = PubSubTopic(reader.readValue(string))

proc readValue*(reader: var JsonReader[RestJson], value: var ContentTopicString)
proc readValue*(reader: var JsonReader[RestJson], contentTopic: var ContentTopic)
{.raises: [SerializationError, IOError, Defect].} =
value = ContentTopicString(reader.readValue(string))
contentTopic = ContentTopic(reader.readValue(string))

proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
{.raises: [SerializationError, IOError, Defect].} =
var
payload = none(Base64String)
contentTopic = none(ContentTopicString)
contentTopic = none(ContentTopic)
version = none(Natural)
timestamp = none(int64)

Expand All @@ -116,7 +107,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
of "payload":
payload = some(reader.readValue(Base64String))
of "contentTopic":
contentTopic = some(reader.readValue(ContentTopicString))
contentTopic = some(reader.readValue(ContentTopic))
of "version":
version = some(reader.readValue(Natural))
of "timestamp":
Expand Down
3 changes: 2 additions & 1 deletion waku/v2/node/rest/relay/relay_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import
presto/[route, client, common]
import
../../waku_node,
../../../protocol/waku_message,
../serdes,
../utils,
./api_types,
Expand Down Expand Up @@ -157,7 +158,7 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, topicCache

#### Client

proc encodeBytes*(value: seq[PubSubTopicString],
proc encodeBytes*(value: seq[PubSubTopic],
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
Expand Down
14 changes: 6 additions & 8 deletions waku/v2/node/rest/relay/topic_cache.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ export message_cache

##### TopicCache

type PubSubTopicString = string

type TopicCacheResult*[T] = MessageCacheResult[T]

type TopicCache* = MessageCache[PubSubTopicString]
type TopicCache* = MessageCache[PubSubTopic]


##### Message handler
Expand All @@ -33,17 +31,17 @@ type TopicCacheMessageHandler* = Topichandler

proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =

let handler = proc(topic: string, data: seq[byte]): Future[void] {.async, closure.} =
trace "Topic handler triggered", topic=topic
let handler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.async, closure.} =
trace "PubsubTopic handler triggered", pubsubTopic=pubsubTopic

# Add message to current cache
let msg = WakuMessage.decode(data)
if msg.isErr():
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
debug "WakuMessage received but failed to decode", msg=msg, pubsubTopic=pubsubTopic
# TODO: handle message decode failure
return

trace "WakuMessage received", msg=msg, topic=topic
cache.addMessage(PubSubTopicString(topic), msg.get())
trace "WakuMessage received", msg=msg, pubsubTopic=pubsubTopic
cache.addMessage(PubSubTopic(pubsubTopic), msg.get())

handler
4 changes: 2 additions & 2 deletions waku/v2/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ proc startRelay*(node: WakuNode) {.async.} =
info "starting relay"

# PubsubTopic subscriptions
for topic in node.wakuRelay.defaultTopics:
for topic in node.wakuRelay.defaultPubsubTopics:
node.subscribe(topic, none(TopicHandler))

# Resume previous relay connections
Expand Down Expand Up @@ -393,7 +393,7 @@ proc mountRelay*(node: WakuNode,

## The default relay topics is the union of
## all configured topics plus the hard-coded defaultTopic(s)
wakuRelay.defaultTopics = concat(@[DefaultPubsubTopic], topics)
wakuRelay.defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics)

## Add peer exchange handler
if peerExchangeHandler.isSome():
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/waku_message.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type
ContentTopic* = string

const
DefaultPubsubTopic*: PubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic*: ContentTopic = "/waku/2/default-content/proto"
DefaultPubsubTopic*: PubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
DefaultContentTopic*: ContentTopic = ContentTopic("/waku/2/default-content/proto")
Comment on lines +30 to +31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant and not necessary, the types are not distinct string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, thought it was nice for reference, so that people declaring topics they would see this and follow the pattern:

let x = PubsubTopic("x/y/z/a")

instead of let y = "x/y/z/a" which I would say its bad practise?

no strong opinion about it, though :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is redundant since the variable is already type annotated:

 DefaultPubsubTopic*: PubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
 #~~~~~~~~~~~~~~~~~~~~~~~~~^



type WakuMessage* = object
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/waku_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const

type
WakuRelay* = ref object of GossipSub
defaultTopics*: seq[PubsubTopic] # Default configured PubSub topics
defaultPubsubTopics*: seq[PubsubTopic] # Default configured PubSub topics

method init*(w: WakuRelay) =
debug "init WakuRelay"
Expand Down
Loading