diff --git a/.gitignore b/.gitignore index 7caf5e114e..0ce04e8094 100644 --- a/.gitignore +++ b/.gitignore @@ -38,6 +38,9 @@ node_modules/ # Ignore Jetbrains IDE files .idea/ +# ignore vscode files +.vscode/ + # RLN / keystore rlnKeystore.json *.tar.gz diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 3b96a4b97b..9a97ecf2ca 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -45,6 +45,7 @@ import ../../waku/v2/node/rest/debug/handlers as rest_debug_api, ../../waku/v2/node/rest/relay/handlers as rest_relay_api, ../../waku/v2/node/rest/relay/topic_cache, + ../../waku/v2/node/rest/filter/handlers as rest_filter_api, ../../waku/v2/node/rest/store/handlers as rest_store_api, ../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api, ../../waku/v2/node/jsonrpc/debug/handlers as rpc_debug_api, @@ -548,6 +549,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity) installRelayApiHandlers(server.router, app.node, relayCache) + ## Filter REST API + if conf.filter: + let filterCache = rest_filter_api.MessageCache.init(capacity=30) + installFilterApiHandlers(server.router, app.node, filterCache) + ## Store REST API installStoreApiHandlers(server.router, app.node) diff --git a/tests/v2/wakunode_rest/test_rest_filter.nim b/tests/v2/wakunode_rest/test_rest_filter.nim new file mode 100644 index 0000000000..aaa5ac9233 --- /dev/null +++ b/tests/v2/wakunode_rest/test_rest_filter.nim @@ -0,0 +1,205 @@ +{.used.} + +import + std/sequtils, + stew/byteutils, + stew/shims/net, + testutils/unittests, + presto, presto/client as presto_client, + libp2p/crypto/crypto +import + ../../waku/v2/node/message_cache, + ../../waku/common/base64, + ../../waku/v2/waku_core, + ../../waku/v2/waku_node, + ../../waku/v2/node/peer_manager, + ../../waku/v2/waku_filter, + ../../waku/v2/node/rest/server, + ../../waku/v2/node/rest/client, + ../../waku/v2/node/rest/responses, + ../../waku/v2/node/rest/filter/types, + ../../waku/v2/node/rest/filter/handlers as filter_api, + ../../waku/v2/node/rest/filter/client as filter_api_client, + ../../waku/v2/waku_relay, + ../testlib/wakucore, + ../testlib/wakunode + + +proc testWakuNode(): WakuNode = + let + privkey = generateSecp256k1Key() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(0) + + newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) + + +suite "Waku v2 Rest API - Filter": + asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions": + # Given + let node = testWakuNode() + await node.start() + await node.mountRelay() + + let restPort = Port(58011) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + let messageCache = filter_api.MessageCache.init(capacity=30) + installFilterPostSubscriptionsV1Handler(restServer.router, node, messageCache) + + restServer.start() + + await node.mountFilter() + await node.mountFilterClient() + + # let remotePeerInfo = PeerInfo.init(node.peerId, node.multiaddrs) + # node.peerManager.addServicePeer(remotePeerInfo, WakuStoreCodec) + + + let key = generateEcdsaKey() + var peerSwitch = newStandardSwitch(some(key)) + await peerSwitch.start() + + peerSwitch.mount(node.wakuFilter) + + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + + let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo() + let fullAddr = $remotePeerInfo.addrs[0] & + "/p2p/" & $remotePeerInfo.peerId + + + node.peerManager.addServicePeer(remotePeerInfo, + WakuFilterCodec) + + let contentFilters = @[DefaultContentTopic + ,ContentTopic("2") + ,ContentTopic("3") + ,ContentTopic("4") + ] + + # When + + let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, pubsubTopic: DefaultPubsubTopic) + let response = await client.filterPostSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + messageCache.isSubscribed(DefaultContentTopic) + messageCache.isSubscribed("2") + messageCache.isSubscribed("3") + messageCache.isSubscribed("4") + + #check: + # TODO check wakuFilter subscriptions as count. + # concening that filter cache handles subscriptions by pubsubTopic+seq[contentTopic] conjunction. + + await restServer.stop() + await restServer.closeWait() + await node.stop() + + # asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions": + # # Given + # let node = testWakuNode() + # await node.start() + # await node.mountRelay() + + # let restPort = Port(58012) + # let restAddress = ValidIpAddress.init("0.0.0.0") + # let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + # let messageCache = filter_api.messageCache.init() + # messageCache.subscribe("1") + # messageCache.subscribe("2") + # messageCache.subscribe("3") + # messageCache.subscribe("4") + + # installFilterDeleteSubscriptionsV1Handler(restServer.router, node, messageCache) + # restServer.start() + + # let contentFilters = @[ContentTopic("1") + # ,ContentTopic("2") + # ,ContentTopic("3") + # ,ContentTopic("4") + # ] + + # # When + # let client = newRestHttpClient(initTAddress(restAddress, restPort)) + # let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, pubsubTopic: DefaultPubsubTopic) + # let response = await client.filterDeleteSubscriptionsV1(requestBody) + + # # Then + # check: + # response.status == 200 + # $response.contentType == $MIMETYPE_TEXT + # response.data == "OK" + + # check: + # not messageCache.isSubscribed("1") + # not messageCache.isSubscribed("2") + # not messageCache.isSubscribed("3") + # messageCache.isSubscribed("4") + + # await restServer.stop() + # await restServer.closeWait() + # await node.stop() + + + asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}": + # Given + let node = testWakuNode() + await node.start() + await node.mountRelay() + + let restPort = Port(58013) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + let pubSubTopic = "/waku/2/default-waku/proto" + let contentTopic = ContentTopic( "content-topic-x" ) + + let messages = @[ + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + ] + + let messageCache = filter_api.MessageCache.init(capacity = 30) + + messageCache.subscribe(contentTopic) + for msg in messages: + messageCache.addMessage(contentTopic, msg) + + installFilterGetMessagesV1Handler(restServer.router, node, messageCache) + restServer.start() + + # When + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + let response = await client.filterGetMessagesV1(contentTopic) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.len == 3 + response.data.all do (msg: FilterWakuMessage) -> bool: + msg.payload == base64.encode("TEST-1") and + msg.contentTopic.get().string == "content-topic-x" and + msg.version.get() == 2 and + msg.timestamp.get() != Timestamp(0) + + + check: + messageCache.isSubscribed(contentTopic) + messageCache.getMessages(contentTopic).tryGet().len == 0 + + await restServer.stop() + await restServer.closeWait() + await node.stop() diff --git a/waku/v2/node/rest/filter/client.nim b/waku/v2/node/rest/filter/client.nim new file mode 100644 index 0000000000..8cf022c5ed --- /dev/null +++ b/waku/v2/node/rest/filter/client.nim @@ -0,0 +1,65 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sets, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import + ../../../waku_core, + ../serdes, + ../responses, + ./types + +export types + + +logScope: + topics = "waku node rest client" + + +proc encodeBytes*(value: FilterSubscriptionsRequest, + contentType: string): RestResult[seq[byte]] = + + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc decodeBytes*(t: typedesc[string], value: openarray[byte], + contentType: Opt[ContentTypeData]): RestResult[string] = + + if MediaType.init($contentType) != MIMETYPE_TEXT: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + var res: string + if len(value) > 0: + res = newString(len(value)) + copyMem(addr res[0], unsafeAddr value[0], len(value)) + return ok(res) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.} + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.} + + +proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] = + if MediaType.init($contentType) != MIMETYPE_JSON: + error "Unsupported response contentType value", contentType = contentType + return err("Unsupported response contentType") + + let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data) + return ok(decoded) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterGetMessagesV1*(contentTopic: string): RestResponse[FilterGetMessagesResponse] {.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.} diff --git a/waku/v2/node/rest/filter/handlers.nim b/waku/v2/node/rest/filter/handlers.nim new file mode 100644 index 0000000000..b5a309772d --- /dev/null +++ b/waku/v2/node/rest/filter/handlers.nim @@ -0,0 +1,149 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sequtils, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/route, + presto/common +import + ../../../waku_core, + ../../../waku_filter, + ../../../waku_filter/client, + ../../message_cache, + ../../peer_manager, + ../../waku_node, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest filter_api" + + +##### Topic cache + +const futTimeout* = 5.seconds # Max time to wait for futures + + +#### Request handlers + +const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" + +type + MessageCache* = message_cache.MessageCache[ContentTopic] + + +func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] = + # Check the request body + if contentBody.isNone(): + return err(RestApiResponse.badRequest()) + + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return err(RestApiResponse.badRequest()) + + let reqBodyData = contentBody.get().data + + let reqResult = decodeFromJsonBytes(T, reqBodyData) + if reqResult.isErr(): + return err(RestApiResponse.badRequest()) + + return ok(reqResult.get()) + +proc installFilterPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, cache: MessageCache) = + + router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + # ## Subscribes a node to a list of contentTopics of a pubsubTopic + # debug "post_waku_v2_filter_v1_subscriptions" + + let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) + + if decodedBody.isErr(): + error "Failed to decode body", error=decodedBody.error() + return decodedBody.error() + + let req: FilterSubscriptionsRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + + if peerOpt.isNone(): + raise newException(ValueError, "no suitable remote filter peers") + + let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = + cache.addMessage(msg.contentTopic, msg) + + let subFut = node.filterSubscribe(req.pubsubTopic, req.contentFilters, handler, peerOpt.get()) + if not await subFut.withTimeout(futTimeout): + raise newException(ValueError, "Failed to subscribe to contentFilters") + + # Successfully subscribed to all content filters + for cTopic in req.contentFilters: + cache.subscribe(cTopic) + + return RestApiResponse.ok() + + +proc installFilterDeleteSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, cache: MessageCache) = + router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + # ## Subscribes a node to a list of contentTopics of a PubSub topic + # debug "delete_waku_v2_filter_v1_subscriptions" + + let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error() + + let req: FilterSubscriptionsRequest = decodedBody.value() + + let unsubFut = node.unsubscribe(req.pubsubTopic, req.contentFilters) + if not await unsubFut.withTimeout(futTimeout): + raise newException(ValueError, "Failed to unsubscribe from contentFilters") + + for cTopic in req.contentFilters: + cache.unsubscribe(cTopic) + + # Successfully unsubscribed from all requested contentTopics + return RestApiResponse.ok() + + +const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{contentTopic}" + +proc installFilterGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, cache: MessageCache) = + router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (contentTopic: string) -> RestApiResponse: + # ## Returns all WakuMessages received on a specified content topic since the + # ## last time this method was called + # ## TODO: ability to specify a return message limit + # debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic + + if contentTopic.isErr(): + return RestApiResponse.badRequest() + let contentTopic = contentTopic.get() + + if not cache.isSubscribed(contentTopic): + raise newException(ValueError, "Not subscribed to topic: " & contentTopic) + + let msgRes = cache.getMessages(contentTopic, clear=true) + if msgRes.isErr(): + raise newException(ValueError, "Not subscribed to topic: " & contentTopic) + + let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage)) + let resp = RestApiResponse.jsonResponse(data, status=Http200) + if resp.isErr(): + debug "An error ocurred while building the json respose", error=resp.error + return RestApiResponse.internalServerError() + + return resp.get() + + +proc installFilterApiHandlers*(router: var RestRouter, node: WakuNode, cache: MessageCache) = + installFilterPostSubscriptionsV1Handler(router, node, cache) + installFilterDeleteSubscriptionsV1Handler(router, node, cache) + installFilterGetMessagesV1Handler(router, node, cache) diff --git a/waku/v2/node/rest/filter/openapi.yaml b/waku/v2/node/rest/filter/openapi.yaml index 38b9ae7302..d913eb08a5 100644 --- a/waku/v2/node/rest/filter/openapi.yaml +++ b/waku/v2/node/rest/filter/openapi.yaml @@ -21,7 +21,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterPostSubscriptionsRequest' + $ref: '#/components/schemas/FilterSubscriptionsRequest' responses: '200': description: OK @@ -45,7 +45,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterDeleteSubscriptionsRequest' + $ref: '#/components/schemas/FilterSubscriptionsRequest' responses: '200': description: OK @@ -61,7 +61,8 @@ paths: '5XX': description: Unexpected error. - /filter/v1/messages/{topic}: + # TODO: Review the path of this endpoint due maybe query for list of contentTopics matching + /filter/v1/messages/{contentTopic}: get: # get_waku_v2_filter_v1_messages summary: Get the latest messages on the polled content topic description: Get a list of messages that were received on a subscribed content topic after the last time this method was called. @@ -70,11 +71,11 @@ paths: - filter parameters: - in: path - name: topic # Note the name is the same as in the path + name: contentTopic # Note the name is the same as in the path required: true schema: type: string - description: The user ID + description: Content topic of message responses: '200': description: The latest messages on the polled topic. @@ -112,19 +113,7 @@ components: required: - payload - FilterPostSubscriptionsRequest: - type: object - properties: - contentFilters: - type: array - items: - $ref: '#/components/schemas/ContentTopic' - pubsubTopic: - $ref: "#/components/schemas/PubSubTopic" - required: - - contentFilters - - FilterDeleteSubscriptionsRequest: + FilterSubscriptionsRequest: type: object properties: contentFilters: diff --git a/waku/v2/node/rest/filter/types.nim b/waku/v2/node/rest/filter/types.nim new file mode 100644 index 0000000000..e1d4c7cd84 --- /dev/null +++ b/waku/v2/node/rest/filter/types.nim @@ -0,0 +1,157 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[sets, strformat], + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import + ../../../../common/base64, + ../../../waku_core, + ../serdes + + +#### Types + +type FilterWakuMessage* = object + payload*: Base64String + contentTopic*: Option[ContentTopic] + version*: Option[Natural] + timestamp*: Option[int64] + + +type FilterGetMessagesResponse* = seq[FilterWakuMessage] + +type FilterSubscriptionsRequest* = object + pubsubTopic*: PubSubTopic + contentFilters*: seq[ContentTopic] + + +#### Type conversion + +proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage = + FilterWakuMessage( + payload: base64.encode(msg.payload), + contentTopic: some(msg.contentTopic), + version: some(Natural(msg.version)), + timestamp: some(msg.timestamp) + ) + +proc toWakuMessage*(msg: FilterWakuMessage, version = 0): Result[WakuMessage, string] = + let + payload = ?msg.payload.decode() + contentTopic = msg.contentTopic.get(DefaultContentTopic) + version = uint32(msg.version.get(version)) + timestamp = msg.timestamp.get(0) + + ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version, timestamp: timestamp)) + + +#### Serialization and deserialization + +proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String) + {.raises: [IOError].} = + writer.writeValue(string(value)) + +proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage) + {.raises: [IOError].} = + writer.beginRecord() + writer.writeField("payload", value.payload) + if value.contentTopic.isSome: + writer.writeField("contentTopic", value.contentTopic) + if value.version.isSome: + writer.writeField("version", value.version) + if value.timestamp.isSome: + writer.writeField("timestamp", value.timestamp) + writer.endRecord() + +proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscriptionsRequest) + {.raises: [IOError].} = + writer.beginRecord() + writer.writeField("pubsubTopic", value.pubsubTopic) + writer.writeField("contentFilters", value.contentFilters) + writer.endRecord() + +proc readValue*(reader: var JsonReader[RestJson], value: var Base64String) + {.raises: [SerializationError, IOError].} = + value = Base64String(reader.readValue(string)) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage) + {.raises: [SerializationError, IOError].} = + var + payload = none(Base64String) + contentTopic = none(ContentTopic) + version = none(Natural) + timestamp = none(int64) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterWakuMessage") + + case fieldName + of "payload": + payload = some(reader.readValue(Base64String)) + of "contentTopic": + contentTopic = some(reader.readValue(ContentTopic)) + of "version": + version = some(reader.readValue(Natural)) + of "timestamp": + timestamp = some(reader.readValue(int64)) + else: + unrecognizedFieldWarning() + + if payload.isNone(): + reader.raiseUnexpectedValue("Field `payload` is missing") + + value = FilterWakuMessage( + payload: payload.get(), + contentTopic: contentTopic, + version: version, + timestamp: timestamp + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionsRequest) + {.raises: [SerializationError, IOError].} = + + var + pubsubTopic = none(PubsubTopic) + contentFilters = none(seq[ContentTopic]) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterSubscriptionsRequest") + + case fieldName + of "pubsubTopic": + pubsubTopic = some(reader.readValue(PubsubTopic)) + of "contentFilters": + contentFilters = some(reader.readValue(seq[ContentTopic])) + else: + unrecognizedFieldWarning() + + if pubsubTopic.isNone(): + reader.raiseUnexpectedValue("Field `pubsubTopic` is missing") + + + if contentFilters.isNone(): + reader.raiseUnexpectedValue("Field `contentFilters` is missing") + + if contentFilters.get().len() == 0: + reader.raiseUnexpectedValue("Field `contentFilters` is empty") + + value = FilterSubscriptionsRequest( + pubsubTopic: pubsubTopic.get(), + contentFilters: contentFilters.get() + ) \ No newline at end of file