From 572243b9fccf906586c80070ebb0b2cf72a2a4d8 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Thu, 17 Nov 2022 13:27:36 +0100 Subject: [PATCH] fix(lightpush): waku lightpush rpc codec support optional fields --- waku/v2/protocol/waku_lightpush/client.nim | 10 ++-- waku/v2/protocol/waku_lightpush/protocol.nim | 12 ++-- waku/v2/protocol/waku_lightpush/rpc.nim | 8 ++- waku/v2/protocol/waku_lightpush/rpc_codec.nim | 56 ++++++++++++------- 4 files changed, 53 insertions(+), 33 deletions(-) diff --git a/waku/v2/protocol/waku_lightpush/client.nim b/waku/v2/protocol/waku_lightpush/client.nim index 32b895dd2d..db66e9d978 100644 --- a/waku/v2/protocol/waku_lightpush/client.nim +++ b/waku/v2/protocol/waku_lightpush/client.nim @@ -42,7 +42,7 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem return err(dialFailure) let connection = connOpt.get() - let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req) + let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(req)) await connection.writeLP(rpc.encode().buffer) var buffer = await connection.readLp(MaxRpcSize.int) @@ -53,14 +53,14 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem return err(decodeRpcFailure) let pushResponseRes = decodeRespRes.get() - if pushResponseRes.response == PushResponse(): + if pushResponseRes.response.isNone(): waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure]) return err(emptyResponseBodyFailure) - let response = pushResponseRes.response + let response = pushResponseRes.response.get() if not response.isSuccess: - if response.info != "": - return err(response.info) + if response.info.isSome(): + return err(response.info.get()) else: return err("unknown failure") diff --git a/waku/v2/protocol/waku_lightpush/protocol.nim b/waku/v2/protocol/waku_lightpush/protocol.nim index f4d1ada712..0379b87291 100644 --- a/waku/v2/protocol/waku_lightpush/protocol.nim +++ b/waku/v2/protocol/waku_lightpush/protocol.nim @@ -45,27 +45,27 @@ proc initProtocolHandler*(wl: WakuLightPush) = return let req = reqDecodeRes.get() - if req.request == PushRequest(): + if req.request.isNone(): error "invalid lightpush rpc received", error=emptyRequestBodyFailure waku_lightpush_errors.inc(labelValues = [emptyRequestBodyFailure]) return waku_lightpush_messages.inc(labelValues = ["PushRequest"]) let - pubSubTopic = req.request.pubSubTopic - message = req.request.message + pubSubTopic = req.request.get().pubSubTopic + message = req.request.get().message debug "push request", peerId=conn.peerId, requestId=req.requestId, pubsubTopic=pubsubTopic var response: PushResponse let handleRes = await wl.pushHandler(conn.peerId, pubsubTopic, message) if handleRes.isOk(): - response = PushResponse(is_success: true, info: "OK") + response = PushResponse(is_success: true, info: some("OK")) else: - response = PushResponse(is_success: false, info: handleRes.error) + response = PushResponse(is_success: false, info: some(handleRes.error)) waku_lightpush_errors.inc(labelValues = [messagePushFailure]) error "pushed message handling failed", error=handleRes.error - let rpc = PushRPC(requestId: req.requestId, response: response) + let rpc = PushRPC(requestId: req.requestId, response: some(response)) await conn.writeLp(rpc.encode().buffer) wl.handler = handle diff --git a/waku/v2/protocol/waku_lightpush/rpc.nim b/waku/v2/protocol/waku_lightpush/rpc.nim index 0bb05746e7..b276867eed 100644 --- a/waku/v2/protocol/waku_lightpush/rpc.nim +++ b/waku/v2/protocol/waku_lightpush/rpc.nim @@ -3,6 +3,8 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} +import + std/options import ../waku_message @@ -13,9 +15,9 @@ type PushResponse* = object isSuccess*: bool - info*: string + info*: Option[string] PushRPC* = object requestId*: string - request*: PushRequest - response*: PushResponse \ No newline at end of file + request*: Option[PushRequest] + response*: Option[PushResponse] diff --git a/waku/v2/protocol/waku_lightpush/rpc_codec.nim b/waku/v2/protocol/waku_lightpush/rpc_codec.nim index ceb1ffa397..b71a9c06e8 100644 --- a/waku/v2/protocol/waku_lightpush/rpc_codec.nim +++ b/waku/v2/protocol/waku_lightpush/rpc_codec.nim @@ -4,6 +4,8 @@ else: {.push raises: [].} +import + std/options import ../../../common/protobuf, ../waku_message, @@ -27,12 +29,16 @@ proc decode*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] = var rpc = PushRequest() var pubSubTopic: PubsubTopic - discard ?pb.getField(1, pubSubTopic) - rpc.pubSubTopic = pubSubTopic + if not ?pb.getField(1, pubSubTopic): + return err(ProtoError.RequiredFieldMissing) + else: + rpc.pubSubTopic = pubSubTopic - var buf: seq[byte] - discard ?pb.getField(2, buf) - rpc.message = ?WakuMessage.decode(buf) + var messageBuf: seq[byte] + if not ?pb.getField(2, messageBuf): + return err(ProtoError.RequiredFieldMissing) + else: + rpc.message = ?WakuMessage.decode(messageBuf) ok(rpc) @@ -48,15 +54,19 @@ proc encode*(rpc: PushResponse): ProtoBuffer = proc decode*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var rpc = PushResponse(isSuccess: false, info: "") + var rpc = PushResponse() var isSuccess: uint64 - if ?pb.getField(1, isSuccess): + if not ?pb.getField(1, isSuccess): + return err(ProtoError.RequiredFieldMissing) + else: rpc.isSuccess = bool(isSuccess) var info: string - discard ?pb.getField(2, info) - rpc.info = info + if not ?pb.getField(2, info): + rpc.info = none(string) + else: + rpc.info = some(info) ok(rpc) @@ -65,8 +75,8 @@ proc encode*(rpc: PushRPC): ProtoBuffer = var pb = initProtoBuffer() pb.write3(1, rpc.requestId) - pb.write3(2, rpc.request.encode()) - pb.write3(3, rpc.response.encode()) + pb.write3(2, rpc.request.map(encode)) + pb.write3(3, rpc.response.map(encode)) pb.finish3() pb @@ -76,15 +86,23 @@ proc decode*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] = var rpc = PushRPC() var requestId: string - discard ?pb.getField(1, requestId) - rpc.requestId = requestId + if not ?pb.getField(1, requestId): + return err(ProtoError.RequiredFieldMissing) + else: + rpc.requestId = requestId var requestBuffer: seq[byte] - discard ?pb.getField(2, requestBuffer) - rpc.request = ?PushRequest.decode(requestBuffer) - - var pushBuffer: seq[byte] - discard ?pb.getField(3, pushBuffer) - rpc.response = ?PushResponse.decode(pushBuffer) + if not ?pb.getField(2, requestBuffer): + rpc.request = none(PushRequest) + else: + let request = ?PushRequest.decode(requestBuffer) + rpc.request = some(request) + + var responseBuffer: seq[byte] + if not ?pb.getField(3, responseBuffer): + rpc.response = none(PushResponse) + else: + let response = ?PushResponse.decode(responseBuffer) + rpc.response = some(response) ok(rpc) \ No newline at end of file