Skip to content

Commit

Permalink
fix(lightpush): waku lightpush rpc codec support optional fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Lorenzo Delgado committed Nov 18, 2022
1 parent f89e686 commit 572243b
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 33 deletions.
10 changes: 5 additions & 5 deletions waku/v2/protocol/waku_lightpush/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem
return err(dialFailure)
let connection = connOpt.get()

let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req)
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(req))
await connection.writeLP(rpc.encode().buffer)

var buffer = await connection.readLp(MaxRpcSize.int)
Expand All @@ -53,14 +53,14 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem
return err(decodeRpcFailure)

let pushResponseRes = decodeRespRes.get()
if pushResponseRes.response == PushResponse():
if pushResponseRes.response.isNone():
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
return err(emptyResponseBodyFailure)

let response = pushResponseRes.response
let response = pushResponseRes.response.get()
if not response.isSuccess:
if response.info != "":
return err(response.info)
if response.info.isSome():
return err(response.info.get())
else:
return err("unknown failure")

Expand Down
12 changes: 6 additions & 6 deletions waku/v2/protocol/waku_lightpush/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,27 @@ proc initProtocolHandler*(wl: WakuLightPush) =
return

let req = reqDecodeRes.get()
if req.request == PushRequest():
if req.request.isNone():
error "invalid lightpush rpc received", error=emptyRequestBodyFailure
waku_lightpush_errors.inc(labelValues = [emptyRequestBodyFailure])
return

waku_lightpush_messages.inc(labelValues = ["PushRequest"])
let
pubSubTopic = req.request.pubSubTopic
message = req.request.message
pubSubTopic = req.request.get().pubSubTopic
message = req.request.get().message
debug "push request", peerId=conn.peerId, requestId=req.requestId, pubsubTopic=pubsubTopic

var response: PushResponse
let handleRes = await wl.pushHandler(conn.peerId, pubsubTopic, message)
if handleRes.isOk():
response = PushResponse(is_success: true, info: "OK")
response = PushResponse(is_success: true, info: some("OK"))
else:
response = PushResponse(is_success: false, info: handleRes.error)
response = PushResponse(is_success: false, info: some(handleRes.error))
waku_lightpush_errors.inc(labelValues = [messagePushFailure])
error "pushed message handling failed", error=handleRes.error

let rpc = PushRPC(requestId: req.requestId, response: response)
let rpc = PushRPC(requestId: req.requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)

wl.handler = handle
Expand Down
8 changes: 5 additions & 3 deletions waku/v2/protocol/waku_lightpush/rpc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import
std/options
import
../waku_message

Expand All @@ -13,9 +15,9 @@ type

PushResponse* = object
isSuccess*: bool
info*: string
info*: Option[string]

PushRPC* = object
requestId*: string
request*: PushRequest
response*: PushResponse
request*: Option[PushRequest]
response*: Option[PushResponse]
56 changes: 37 additions & 19 deletions waku/v2/protocol/waku_lightpush/rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ else:
{.push raises: [].}


import
std/options
import
../../../common/protobuf,
../waku_message,
Expand All @@ -27,12 +29,16 @@ proc decode*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
var rpc = PushRequest()

var pubSubTopic: PubsubTopic
discard ?pb.getField(1, pubSubTopic)
rpc.pubSubTopic = pubSubTopic
if not ?pb.getField(1, pubSubTopic):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.pubSubTopic = pubSubTopic

var buf: seq[byte]
discard ?pb.getField(2, buf)
rpc.message = ?WakuMessage.decode(buf)
var messageBuf: seq[byte]
if not ?pb.getField(2, messageBuf):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.message = ?WakuMessage.decode(messageBuf)

ok(rpc)

Expand All @@ -48,15 +54,19 @@ proc encode*(rpc: PushResponse): ProtoBuffer =

proc decode*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PushResponse(isSuccess: false, info: "")
var rpc = PushResponse()

var isSuccess: uint64
if ?pb.getField(1, isSuccess):
if not ?pb.getField(1, isSuccess):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.isSuccess = bool(isSuccess)

var info: string
discard ?pb.getField(2, info)
rpc.info = info
if not ?pb.getField(2, info):
rpc.info = none(string)
else:
rpc.info = some(info)

ok(rpc)

Expand All @@ -65,8 +75,8 @@ proc encode*(rpc: PushRPC): ProtoBuffer =
var pb = initProtoBuffer()

pb.write3(1, rpc.requestId)
pb.write3(2, rpc.request.encode())
pb.write3(3, rpc.response.encode())
pb.write3(2, rpc.request.map(encode))
pb.write3(3, rpc.response.map(encode))
pb.finish3()

pb
Expand All @@ -76,15 +86,23 @@ proc decode*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = PushRPC()

var requestId: string
discard ?pb.getField(1, requestId)
rpc.requestId = requestId
if not ?pb.getField(1, requestId):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.requestId = requestId

var requestBuffer: seq[byte]
discard ?pb.getField(2, requestBuffer)
rpc.request = ?PushRequest.decode(requestBuffer)

var pushBuffer: seq[byte]
discard ?pb.getField(3, pushBuffer)
rpc.response = ?PushResponse.decode(pushBuffer)
if not ?pb.getField(2, requestBuffer):
rpc.request = none(PushRequest)
else:
let request = ?PushRequest.decode(requestBuffer)
rpc.request = some(request)

var responseBuffer: seq[byte]
if not ?pb.getField(3, responseBuffer):
rpc.response = none(PushResponse)
else:
let response = ?PushResponse.decode(responseBuffer)
rpc.response = some(response)

ok(rpc)

0 comments on commit 572243b

Please sign in to comment.