Skip to content

Commit

Permalink
fix: don't use WakuMessageSize in req/resp protocols (#2601)
Browse files Browse the repository at this point in the history
* fix: don't use WakuMessageSize in req/resp protocols
  • Loading branch information
chaitanyaprem authored Apr 20, 2024
1 parent 1ba9df4 commit e61e4ff
Show file tree
Hide file tree
Showing 18 changed files with 50 additions and 43 deletions.
15 changes: 9 additions & 6 deletions tests/waku_filter_v2/test_waku_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ suite "Waku Filter - End to End":

asyncTest "Subscribing to an empty content topic":
# When subscribing to an empty content topic
let subscribeResponse =
await wakuFilterClient.subscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]())
let subscribeResponse = await wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]()
)

# Then the subscription is not successful
check:
Expand Down Expand Up @@ -1838,8 +1839,9 @@ suite "Waku Filter - End to End":
wakuFilter.subscriptions.isSubscribed(clientPeerId)

# When unsubscribing from an empty content topic
let unsubscribeResponse =
await wakuFilterClient.unsubscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]())
let unsubscribeResponse = await wakuFilterClient.unsubscribe(
serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]()
)

# Then the unsubscription is not successful
check:
Expand Down Expand Up @@ -2076,10 +2078,11 @@ suite "Waku Filter - End to End":
contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
) # 100KiB
msg4 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(MaxPushSize - 1024)
contentTopic = contentTopic,
payload = getByteSequence(DefaultMaxPushSize - 1024),
) # Max Size (Inclusive Limit)
msg5 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(MaxPushSize)
contentTopic = contentTopic, payload = getByteSequence(DefaultMaxPushSize)
) # Max Size (Exclusive Limit)

# When sending the 1KiB message
Expand Down
7 changes: 5 additions & 2 deletions tests/waku_lightpush/test_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ suite "Waku Lightpush Client":
handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
let msgLen = message.encode().buffer.len
if msgLen > int(DefaultMaxWakuMessageSize) + 64 * 1024:
return err("length greater than maxMessageSize")
handlerFuture.complete((pubsubTopic, message))
return ok()

Expand Down Expand Up @@ -209,11 +212,11 @@ suite "Waku Lightpush Client":
) # 100KiB
message4 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(MaxRpcSize - overheadBytes - 1),
payload = getByteSequence(DefaultMaxWakuMessageSize - overheadBytes - 1),
) # Inclusive Limit
message5 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(MaxRpcSize - overheadBytes),
payload = getByteSequence(DefaultMaxWakuMessageSize + 64 * 1024),
) # Exclusive Limit

# When publishing the 1KiB payload
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_peer_exchange/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ suite "Waku Peer Exchange":

var buffer: seq[byte]
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)
buffer = await conn.readLp(DefaultMaxRpcSize.int)

# Decode the response
let decodedBuff = PeerExchangeRpc.decode(buffer)
Expand Down
13 changes: 7 additions & 6 deletions tests/waku_relay/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1042,14 +1042,15 @@ suite "Waku Relay":
) # 100KiB
msg4 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(MaxWakuMessageSize - sizeEmptyMsg - 38),
payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 38),
) # Max Size (Inclusive Limit)
msg5 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(MaxWakuMessageSize - sizeEmptyMsg - 37),
payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 37),
) # Max Size (Exclusive Limit)
msg6 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(MaxWakuMessageSize)
contentTopic = contentTopic,
payload = getByteSequence(DefaultMaxWakuMessageSize),
) # MaxWakuMessageSize -> Out of Max Size

# Notice that the message is wrapped with more data in https://github.com/status-im/nim-libp2p/blob/3011ba4326fa55220a758838835797ff322619fc/libp2p/protocols/pubsub/gossipsub.nim#L627-L632
Expand Down Expand Up @@ -1092,7 +1093,7 @@ suite "Waku Relay":
(pubsubTopic, msg3) == handlerFuture.read()
(pubsubTopic, msg3) == otherHandlerFuture.read()

# When sending the 'MaxWakuMessageSize - sizeEmptyMsg - 38' message
# When sending the 'DefaultMaxWakuMessageSize - sizeEmptyMsg - 38' message
handlerFuture = newPushHandlerFuture()
otherHandlerFuture = newPushHandlerFuture()
discard await node.publish(pubsubTopic, msg4)
Expand All @@ -1104,7 +1105,7 @@ suite "Waku Relay":
(pubsubTopic, msg4) == handlerFuture.read()
(pubsubTopic, msg4) == otherHandlerFuture.read()

# When sending the 'MaxWakuMessageSize - sizeEmptyMsg - 37' message
# When sending the 'DefaultMaxWakuMessageSize - sizeEmptyMsg - 37' message
handlerFuture = newPushHandlerFuture()
otherHandlerFuture = newPushHandlerFuture()
discard await node.publish(pubsubTopic, msg5)
Expand All @@ -1115,7 +1116,7 @@ suite "Waku Relay":
not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
(pubsubTopic, msg5) == handlerFuture.read()

# When sending the 'MaxWakuMessageSize' message
# When sending the 'DefaultMaxWakuMessageSize' message
handlerFuture = newPushHandlerFuture()
otherHandlerFuture = newPushHandlerFuture()
discard await node.publish(pubsubTopic, msg6)
Expand Down
8 changes: 4 additions & 4 deletions tests/wakunode_rest/test_rest_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ suite "Waku v2 Rest API - Relay":
let response = await client.relayPostMessagesV1(
DefaultPubsubTopic,
RelayWakuMessage(
payload: base64.encode(getByteSequence(MaxWakuMessageSize)),
payload: base64.encode(getByteSequence(DefaultMaxWakuMessageSize)),
# Message will be bigger than the max size
contentTopic: some(DefaultContentTopic),
timestamp: some(int64(2022)),
Expand All @@ -608,7 +608,7 @@ suite "Waku v2 Rest API - Relay":
response.status == 400
$response.contentType == $MIMETYPE_TEXT
response.data ==
fmt"Failed to publish: Message size exceeded maximum of {MaxWakuMessageSize} bytes"
fmt"Failed to publish: Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"

await restServer.stop()
await restServer.closeWait()
Expand Down Expand Up @@ -657,7 +657,7 @@ suite "Waku v2 Rest API - Relay":
# When
let response = await client.relayPostAutoMessagesV1(
RelayWakuMessage(
payload: base64.encode(getByteSequence(MaxWakuMessageSize)),
payload: base64.encode(getByteSequence(DefaultMaxWakuMessageSize)),
# Message will be bigger than the max size
contentTopic: some(DefaultContentTopic),
timestamp: some(int64(2022)),
Expand All @@ -669,7 +669,7 @@ suite "Waku v2 Rest API - Relay":
response.status == 400
$response.contentType == $MIMETYPE_TEXT
response.data ==
fmt"Failed to publish: Message size exceeded maximum of {MaxWakuMessageSize} bytes"
fmt"Failed to publish: Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"

await restServer.stop()
await restServer.closeWait()
Expand Down
2 changes: 1 addition & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ proc mountRelay*(
node: WakuNode,
pubsubTopics: seq[string] = @[],
peerExchangeHandler = none(RoutingRecordsHandler),
maxMessageSize = int(MaxWakuMessageSize),
maxMessageSize = int(DefaultMaxWakuMessageSize),
) {.async, gcsafe.} =
if not node.wakuRelay.isNil():
error "wakuRelay already mounted, skipping"
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_core/message/default_values.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ import ../../common/utils/parse_size_units
const
## https://rfc.vac.dev/spec/64/#message-size
DefaultMaxWakuMessageSizeStr* = "150KiB" # Remember that 1 MiB is the PubSub default
MaxWakuMessageSize* = parseCorrectMsgSize(DefaultMaxWakuMessageSizeStr)
DefaultMaxWakuMessageSize* = parseCorrectMsgSize(DefaultMaxWakuMessageSizeStr)

DefaultSafetyBufferProtocolOverhead* = 64 * 1024 # overhead measured in bytes
12 changes: 6 additions & 6 deletions waku/waku_filter_v2/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ proc sendSubscribeRequest(
# TODO: this can raise an exception
await connection.writeLP(filterSubscribeRequest.encode().buffer)

let respBuf = await connection.readLp(MaxSubscribeResponseSize)
let respBuf = await connection.readLp(DefaultMaxSubscribeResponseSize)
let respDecodeRes = FilterSubscribeResponse.decode(respBuf)
if respDecodeRes.isErr():
trace "Failed to decode filter subscribe response", servicePeer
Expand Down Expand Up @@ -79,7 +79,7 @@ proc subscribe*(
wfc: WakuFilterClient,
servicePeer: RemotePeerInfo,
pubsubTopic: PubsubTopic,
contentTopics: ContentTopic|seq[ContentTopic],
contentTopics: ContentTopic | seq[ContentTopic],
): Future[FilterSubscribeResult] {.async.} =
var contentTopicSeq: seq[ContentTopic]
when contentTopics is seq[ContentTopic]:
Expand All @@ -98,14 +98,14 @@ proc unsubscribe*(
wfc: WakuFilterClient,
servicePeer: RemotePeerInfo,
pubsubTopic: PubsubTopic,
contentTopics: ContentTopic|seq[ContentTopic],
contentTopics: ContentTopic | seq[ContentTopic],
): Future[FilterSubscribeResult] {.async.} =
var contentTopicSeq: seq[ContentTopic]
when contentTopics is seq[ContentTopic]:
contentTopicSeq = contentTopics
else:
contentTopicSeq = @[contentTopics]

let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe(
requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq
Expand All @@ -127,7 +127,7 @@ proc registerPushHandler*(wfc: WakuFilterClient, handler: FilterPushHandler) =

proc initProtocolHandler(wfc: WakuFilterClient) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(int(MaxPushSize))
let buf = await conn.readLp(int(DefaultMaxPushSize))

let decodeRes = MessagePush.decode(buf)
if decodeRes.isErr():
Expand All @@ -152,4 +152,4 @@ proc new*(
): T =
let wfc = WakuFilterClient(rng: rng, peerManager: peerManager, pushHandlers: @[])
wfc.initProtocolHandler()
wfc
wfc
2 changes: 1 addition & 1 deletion waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ proc initProtocolHandler(wf: WakuFilter) =
proc handler(conn: Connection, proto: string) {.async.} =
trace "filter subscribe request handler triggered", peerId = conn.peerId

let buf = await conn.readLp(int(MaxSubscribeSize))
let buf = await conn.readLp(int(DefaultMaxSubscribeSize))

let decodeRes = FilterSubscribeRequest.decode(buf)
if decodeRes.isErr():
Expand Down
6 changes: 3 additions & 3 deletions waku/waku_filter_v2/rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import std/options
import ../common/protobuf, ../waku_core, ./rpc

const
MaxSubscribeSize* = 10 * MaxWakuMessageSize + 64 * 1024
DefaultMaxSubscribeSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
# We add a 64kB safety buffer for protocol overhead
MaxSubscribeResponseSize* = 64 * 1024 # Responses are small. 64kB safety buffer.
MaxPushSize* = 10 * MaxWakuMessageSize + 64 * 1024
DefaultMaxSubscribeResponseSize* = 64 * 1024 # Responses are small. 64kB safety buffer.
DefaultMaxPushSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
# We add a 64kB safety buffer for protocol overhead

proc encode*(rpc: FilterSubscribeRequest): ProtoBuffer =
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_lightpush/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ proc sendPushRequest(

var buffer: seq[byte]
try:
buffer = await connection.readLp(MaxRpcSize.int)
buffer = await connection.readLp(DefaultMaxRpcSize.int)
except LPStreamRemoteClosedError:
return err("Exception reading: " & getCurrentExceptionMsg())

Expand Down
2 changes: 1 addition & 1 deletion waku/waku_lightpush/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ proc handleRequest*(

proc initProtocolHandler(wl: WakuLightPush) =
proc handle(conn: Connection, proto: string) {.async.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let buffer = await conn.readLp(DefaultMaxRpcSize)
let rpc = await handleRequest(wl, conn.peerId, buffer)
await conn.writeLp(rpc.encode().buffer)

Expand Down
3 changes: 1 addition & 2 deletions waku/waku_lightpush/rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ else:
import std/options
import ../common/protobuf, ../waku_core, ./rpc

const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024
# We add a 64kB safety buffer for protocol overhead
const DefaultMaxRpcSize* = -1

proc encode*(rpc: PushRequest): ProtoBuffer =
var pb = initProtoBuffer()
Expand Down
6 changes: 3 additions & 3 deletions waku/waku_peer_exchange/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ logScope:
const
# We add a 64kB safety buffer for protocol overhead.
# 10x-multiplier also for safety
MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024
DefaultMaxRpcSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
# TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs...
MaxPeersCacheSize = 60
CacheRefreshInterval = 15.minutes
Expand Down Expand Up @@ -61,7 +61,7 @@ proc request*(
var error: string
try:
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)
buffer = await conn.readLp(DefaultMaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
error = $exc.msg
Expand Down Expand Up @@ -153,7 +153,7 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var buffer: seq[byte]
try:
buffer = await conn.readLp(MaxRpcSize.int)
buffer = await conn.readLp(DefaultMaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
return
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ proc initProtocolHandler(w: WakuRelay) =
w.codec = WakuRelayCodec

proc new*(
T: type WakuRelay, switch: Switch, maxMessageSize = int(MaxWakuMessageSize)
T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
): WakuRelayResult[T] =
## maxMessageSize: max num bytes that are allowed for the WakuMessage

Expand Down
4 changes: 3 additions & 1 deletion waku/waku_store/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ proc sendHistoryQueryRPC(
let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: some(req.toRPC()))
await connection.writeLP(reqRpc.encode().buffer)

let buf = await connection.readLp(MaxRpcSize.int)
#TODO: I see a challenge here, if storeNode uses a different MaxRPCSize this read will fail.
# Need to find a workaround for this.
let buf = await connection.readLp(DefaultMaxRpcSize.int)
let respDecodeRes = HistoryRPC.decode(buf)
if respDecodeRes.isErr():
waku_store_errors.inc(labelValues = [decodeRpcFailure])
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_store/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type WakuStore* = ref object of LPProtocol

proc initProtocolHandler(ws: WakuStore) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(MaxRpcSize.int)
let buf = await conn.readLp(DefaultMaxRpcSize.int)

let decodeRes = HistoryRPC.decode(buf)
if decodeRes.isErr():
Expand Down
3 changes: 1 addition & 2 deletions waku/waku_store/rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ else:
import std/options, nimcrypto/hash
import ../common/[protobuf, paging], ../waku_core, ./common, ./rpc

const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64 * 1024
# We add a 64kB safety buffer for protocol overhead
const DefaultMaxRpcSize* = -1

## Pagination

Expand Down

0 comments on commit e61e4ff

Please sign in to comment.