Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't use WakuMessageSize in req/resp protocols #2601

Merged
merged 3 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions tests/waku_filter_v2/test_waku_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ suite "Waku Filter - End to End":

asyncTest "Subscribing to an empty content topic":
# When subscribing to an empty content topic
let subscribeResponse =
await wakuFilterClient.subscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]())
let subscribeResponse = await wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]()
)

# Then the subscription is not successful
check:
Expand Down Expand Up @@ -1838,8 +1839,9 @@ suite "Waku Filter - End to End":
wakuFilter.subscriptions.isSubscribed(clientPeerId)

# When unsubscribing from an empty content topic
let unsubscribeResponse =
await wakuFilterClient.unsubscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]())
let unsubscribeResponse = await wakuFilterClient.unsubscribe(
serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]()
)

# Then the unsubscription is not successful
check:
Expand Down Expand Up @@ -2076,10 +2078,11 @@ suite "Waku Filter - End to End":
contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
) # 100KiB
msg4 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(MaxPushSize - 1024)
contentTopic = contentTopic,
payload = getByteSequence(DefaultMaxPushSize - 1024),
) # Max Size (Inclusive Limit)
msg5 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(MaxPushSize)
contentTopic = contentTopic, payload = getByteSequence(DefaultMaxPushSize)
) # Max Size (Exclusive Limit)

# When sending the 1KiB message
Expand Down
7 changes: 5 additions & 2 deletions tests/waku_lightpush/test_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ suite "Waku Lightpush Client":
handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
let msgLen = message.encode().buffer.len
if msgLen > int(DefaultMaxWakuMessageSize) + 64 * 1024:
return err("length greater than maxMessageSize")
handlerFuture.complete((pubsubTopic, message))
return ok()

Expand Down Expand Up @@ -209,11 +212,11 @@ suite "Waku Lightpush Client":
) # 100KiB
message4 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(MaxRpcSize - overheadBytes - 1),
payload = getByteSequence(DefaultMaxWakuMessageSize - overheadBytes - 1),
) # Inclusive Limit
message5 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(MaxRpcSize - overheadBytes),
payload = getByteSequence(DefaultMaxWakuMessageSize + 64 * 1024),
) # Exclusive Limit

# When publishing the 1KiB payload
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_peer_exchange/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ suite "Waku Peer Exchange":

var buffer: seq[byte]
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)
buffer = await conn.readLp(DefaultMaxRpcSize.int)

# Decode the response
let decodedBuff = PeerExchangeRpc.decode(buffer)
Expand Down
13 changes: 7 additions & 6 deletions tests/waku_relay/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1042,14 +1042,15 @@ suite "Waku Relay":
) # 100KiB
msg4 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(MaxWakuMessageSize - sizeEmptyMsg - 38),
payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 38),
) # Max Size (Inclusive Limit)
msg5 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(MaxWakuMessageSize - sizeEmptyMsg - 37),
payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 37),
) # Max Size (Exclusive Limit)
msg6 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(MaxWakuMessageSize)
contentTopic = contentTopic,
payload = getByteSequence(DefaultMaxWakuMessageSize),
) # MaxWakuMessageSize -> Out of Max Size

# Notice that the message is wrapped with more data in https://github.com/status-im/nim-libp2p/blob/3011ba4326fa55220a758838835797ff322619fc/libp2p/protocols/pubsub/gossipsub.nim#L627-L632
Expand Down Expand Up @@ -1092,7 +1093,7 @@ suite "Waku Relay":
(pubsubTopic, msg3) == handlerFuture.read()
(pubsubTopic, msg3) == otherHandlerFuture.read()

# When sending the 'MaxWakuMessageSize - sizeEmptyMsg - 38' message
# When sending the 'DefaultMaxWakuMessageSize - sizeEmptyMsg - 38' message
handlerFuture = newPushHandlerFuture()
otherHandlerFuture = newPushHandlerFuture()
discard await node.publish(pubsubTopic, msg4)
Expand All @@ -1104,7 +1105,7 @@ suite "Waku Relay":
(pubsubTopic, msg4) == handlerFuture.read()
(pubsubTopic, msg4) == otherHandlerFuture.read()

# When sending the 'MaxWakuMessageSize - sizeEmptyMsg - 37' message
# When sending the 'DefaultMaxWakuMessageSize - sizeEmptyMsg - 37' message
handlerFuture = newPushHandlerFuture()
otherHandlerFuture = newPushHandlerFuture()
discard await node.publish(pubsubTopic, msg5)
Expand All @@ -1115,7 +1116,7 @@ suite "Waku Relay":
not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
(pubsubTopic, msg5) == handlerFuture.read()

# When sending the 'MaxWakuMessageSize' message
# When sending the 'DefaultMaxWakuMessageSize' message
handlerFuture = newPushHandlerFuture()
otherHandlerFuture = newPushHandlerFuture()
discard await node.publish(pubsubTopic, msg6)
Expand Down
8 changes: 4 additions & 4 deletions tests/wakunode_rest/test_rest_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ suite "Waku v2 Rest API - Relay":
let response = await client.relayPostMessagesV1(
DefaultPubsubTopic,
RelayWakuMessage(
payload: base64.encode(getByteSequence(MaxWakuMessageSize)),
payload: base64.encode(getByteSequence(DefaultMaxWakuMessageSize)),
# Message will be bigger than the max size
contentTopic: some(DefaultContentTopic),
timestamp: some(int64(2022)),
Expand All @@ -608,7 +608,7 @@ suite "Waku v2 Rest API - Relay":
response.status == 400
$response.contentType == $MIMETYPE_TEXT
response.data ==
fmt"Failed to publish: Message size exceeded maximum of {MaxWakuMessageSize} bytes"
fmt"Failed to publish: Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"

await restServer.stop()
await restServer.closeWait()
Expand Down Expand Up @@ -657,7 +657,7 @@ suite "Waku v2 Rest API - Relay":
# When
let response = await client.relayPostAutoMessagesV1(
RelayWakuMessage(
payload: base64.encode(getByteSequence(MaxWakuMessageSize)),
payload: base64.encode(getByteSequence(DefaultMaxWakuMessageSize)),
# Message will be bigger than the max size
contentTopic: some(DefaultContentTopic),
timestamp: some(int64(2022)),
Expand All @@ -669,7 +669,7 @@ suite "Waku v2 Rest API - Relay":
response.status == 400
$response.contentType == $MIMETYPE_TEXT
response.data ==
fmt"Failed to publish: Message size exceeded maximum of {MaxWakuMessageSize} bytes"
fmt"Failed to publish: Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"

await restServer.stop()
await restServer.closeWait()
Expand Down
2 changes: 1 addition & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ proc mountRelay*(
node: WakuNode,
pubsubTopics: seq[string] = @[],
peerExchangeHandler = none(RoutingRecordsHandler),
maxMessageSize = int(MaxWakuMessageSize),
maxMessageSize = int(DefaultMaxWakuMessageSize),
) {.async, gcsafe.} =
if not node.wakuRelay.isNil():
error "wakuRelay already mounted, skipping"
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_core/message/default_values.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ import ../../common/utils/parse_size_units
const
## https://rfc.vac.dev/spec/64/#message-size
DefaultMaxWakuMessageSizeStr* = "150KiB" # Remember that 1 MiB is the PubSub default
MaxWakuMessageSize* = parseCorrectMsgSize(DefaultMaxWakuMessageSizeStr)
DefaultMaxWakuMessageSize* = parseCorrectMsgSize(DefaultMaxWakuMessageSizeStr)

DefaultSafetyBufferProtocolOverhead* = 64 * 1024 # overhead measured in bytes
12 changes: 6 additions & 6 deletions waku/waku_filter_v2/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ proc sendSubscribeRequest(
# TODO: this can raise an exception
await connection.writeLP(filterSubscribeRequest.encode().buffer)

let respBuf = await connection.readLp(MaxSubscribeResponseSize)
let respBuf = await connection.readLp(DefaultMaxSubscribeResponseSize)
let respDecodeRes = FilterSubscribeResponse.decode(respBuf)
if respDecodeRes.isErr():
trace "Failed to decode filter subscribe response", servicePeer
Expand Down Expand Up @@ -79,7 +79,7 @@ proc subscribe*(
wfc: WakuFilterClient,
servicePeer: RemotePeerInfo,
pubsubTopic: PubsubTopic,
contentTopics: ContentTopic|seq[ContentTopic],
contentTopics: ContentTopic | seq[ContentTopic],
): Future[FilterSubscribeResult] {.async.} =
var contentTopicSeq: seq[ContentTopic]
when contentTopics is seq[ContentTopic]:
Expand All @@ -98,14 +98,14 @@ proc unsubscribe*(
wfc: WakuFilterClient,
servicePeer: RemotePeerInfo,
pubsubTopic: PubsubTopic,
contentTopics: ContentTopic|seq[ContentTopic],
contentTopics: ContentTopic | seq[ContentTopic],
): Future[FilterSubscribeResult] {.async.} =
var contentTopicSeq: seq[ContentTopic]
when contentTopics is seq[ContentTopic]:
contentTopicSeq = contentTopics
else:
contentTopicSeq = @[contentTopics]

let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe(
requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq
Expand All @@ -127,7 +127,7 @@ proc registerPushHandler*(wfc: WakuFilterClient, handler: FilterPushHandler) =

proc initProtocolHandler(wfc: WakuFilterClient) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(int(MaxPushSize))
let buf = await conn.readLp(int(DefaultMaxPushSize))

let decodeRes = MessagePush.decode(buf)
if decodeRes.isErr():
Expand All @@ -152,4 +152,4 @@ proc new*(
): T =
let wfc = WakuFilterClient(rng: rng, peerManager: peerManager, pushHandlers: @[])
wfc.initProtocolHandler()
wfc
wfc
2 changes: 1 addition & 1 deletion waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ proc initProtocolHandler(wf: WakuFilter) =
proc handler(conn: Connection, proto: string) {.async.} =
trace "filter subscribe request handler triggered", peerId = conn.peerId

let buf = await conn.readLp(int(MaxSubscribeSize))
let buf = await conn.readLp(int(DefaultMaxSubscribeSize))

let decodeRes = FilterSubscribeRequest.decode(buf)
if decodeRes.isErr():
Expand Down
6 changes: 3 additions & 3 deletions waku/waku_filter_v2/rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import std/options
import ../common/protobuf, ../waku_core, ./rpc

const
MaxSubscribeSize* = 10 * MaxWakuMessageSize + 64 * 1024
DefaultMaxSubscribeSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
# We add a 64kB safety buffer for protocol overhead
MaxSubscribeResponseSize* = 64 * 1024 # Responses are small. 64kB safety buffer.
MaxPushSize* = 10 * MaxWakuMessageSize + 64 * 1024
DefaultMaxSubscribeResponseSize* = 64 * 1024 # Responses are small. 64kB safety buffer.
DefaultMaxPushSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
# We add a 64kB safety buffer for protocol overhead

proc encode*(rpc: FilterSubscribeRequest): ProtoBuffer =
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_lightpush/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ proc sendPushRequest(

var buffer: seq[byte]
try:
buffer = await connection.readLp(MaxRpcSize.int)
buffer = await connection.readLp(DefaultMaxRpcSize.int)
except LPStreamRemoteClosedError:
return err("Exception reading: " & getCurrentExceptionMsg())

Expand Down
2 changes: 1 addition & 1 deletion waku/waku_lightpush/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ proc handleRequest*(

proc initProtocolHandler(wl: WakuLightPush) =
proc handle(conn: Connection, proto: string) {.async.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let buffer = await conn.readLp(DefaultMaxRpcSize)
let rpc = await handleRequest(wl, conn.peerId, buffer)
await conn.writeLp(rpc.encode().buffer)

Expand Down
3 changes: 1 addition & 2 deletions waku/waku_lightpush/rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ else:
import std/options
import ../common/protobuf, ../waku_core, ./rpc

const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024
# We add a 64kB safety buffer for protocol overhead
const DefaultMaxRpcSize* = -1

proc encode*(rpc: PushRequest): ProtoBuffer =
var pb = initProtoBuffer()
Expand Down
6 changes: 3 additions & 3 deletions waku/waku_peer_exchange/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ logScope:
const
# We add a 64kB safety buffer for protocol overhead.
# 10x-multiplier also for safety
MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024
DefaultMaxRpcSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024
# TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs...
MaxPeersCacheSize = 60
CacheRefreshInterval = 15.minutes
Expand Down Expand Up @@ -61,7 +61,7 @@ proc request*(
var error: string
try:
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)
buffer = await conn.readLp(DefaultMaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
error = $exc.msg
Expand Down Expand Up @@ -153,7 +153,7 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var buffer: seq[byte]
try:
buffer = await conn.readLp(MaxRpcSize.int)
buffer = await conn.readLp(DefaultMaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
return
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ proc initProtocolHandler(w: WakuRelay) =
w.codec = WakuRelayCodec

proc new*(
T: type WakuRelay, switch: Switch, maxMessageSize = int(MaxWakuMessageSize)
T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
): WakuRelayResult[T] =
## maxMessageSize: max num bytes that are allowed for the WakuMessage

Expand Down
4 changes: 3 additions & 1 deletion waku/waku_store/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ proc sendHistoryQueryRPC(
let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: some(req.toRPC()))
await connection.writeLP(reqRpc.encode().buffer)

let buf = await connection.readLp(MaxRpcSize.int)
#TODO: I see a challenge here, if storeNode uses a different MaxRPCSize this read will fail.
# Need to find a workaround for this.
let buf = await connection.readLp(DefaultMaxRpcSize.int)
let respDecodeRes = HistoryRPC.decode(buf)
if respDecodeRes.isErr():
waku_store_errors.inc(labelValues = [decodeRpcFailure])
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_store/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type WakuStore* = ref object of LPProtocol

proc initProtocolHandler(ws: WakuStore) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(MaxRpcSize.int)
let buf = await conn.readLp(DefaultMaxRpcSize.int)

let decodeRes = HistoryRPC.decode(buf)
if decodeRes.isErr():
Expand Down
3 changes: 1 addition & 2 deletions waku/waku_store/rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ else:
import std/options, nimcrypto/hash
import ../common/[protobuf, paging], ../waku_core, ./common, ./rpc

const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64 * 1024
# We add a 64kB safety buffer for protocol overhead
const DefaultMaxRpcSize* = -1

## Pagination

Expand Down
Loading