Skip to content

Commit

Permalink
feat: Added simple, configurable rate limit for lightpush and store-q…
Browse files Browse the repository at this point in the history
…uery (#2390)

* feat: Added simple, configurable rate limit for lightpush and store-query
Adjust lightpush rest response to rate limit, added tests ann some fixes
Add rest store query test for rate limit checks and proper error response
Update apps/wakunode2/external_config.nim
Move chronos/tokenbucket to nwaku codebasee with limited and fixed feature set
Add meterics counter to lightpush rate limits

Co-authored-by: gabrielmer <[email protected]>
  • Loading branch information
NagyZoltanPeter and gabrielmer authored Apr 15, 2024
1 parent 4117fe6 commit a00f350
Show file tree
Hide file tree
Showing 22 changed files with 686 additions and 29 deletions.
6 changes: 4 additions & 2 deletions tests/waku_lightpush/lightpush_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import
../testlib/[common, wakucore]

proc newTestWakuLightpushNode*(
switch: Switch, handler: PushMessageHandler
switch: Switch,
handler: PushMessageHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): Future[WakuLightPush] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuLightPush.new(peerManager, rng, handler)
proto = WakuLightPush.new(peerManager, rng, handler, rateLimitSetting)

await proto.start()
switch.mount(proto)
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_lightpush/test_all.nim
Original file line number Diff line number Diff line change
@@ -1 +1 @@
import ./test_client
import ./test_client, ./test_ratelimit
2 changes: 1 addition & 1 deletion tests/waku_lightpush/test_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ suite "Waku Lightpush Client":
# 1KiB
message2 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(10 * 1024)
) # 10KiB
) # 10KiB
message3 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
) # 100KiB
Expand Down
151 changes: 151 additions & 0 deletions tests/waku_lightpush/test_ratelimit.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
{.used.}

import
std/[options, strscans],
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto

import
../../waku/[
node/peer_manager,
common/ratelimit,
waku_core,
waku_lightpush,
waku_lightpush/client,
waku_lightpush/common,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_lightpush/rpc_codec,
],
../testlib/[assertions, wakucore, testasync, futures, testutils],
./lightpush_utils,
../resources/[pubsub_topics, content_topics, payloads]

suite "Rate limited push service":
asyncTest "push message with rate limit not violated":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()

await allFutures(serverSwitch.start(), clientSwitch.start())

## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler: PushMessageHandler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()

let
tokenPeriod = 500.millis
server =
await newTestWakuLightpushNode(serverSwitch, handler, some((3, tokenPeriod)))
client = newTestWakuLightpushClient(clientSwitch)

let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()

let sendMsgProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()

handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)

check await handlerFuture.withTimeout(50.millis)

assert requestRes.isOk(), requestRes.error
check handlerFuture.finished()

let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()

check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message

let waitInBetweenFor = 20.millis

# Test cannot be too explicit about the time when the TokenBucket resets
# the internal timer, although in normal use there is no use case to care about it.
var firstWaitExtend = 300.millis

for runCnt in 0 ..< 3:
let startTime = Moment.now()
for testCnt in 0 ..< 3:
await sendMsgProc()
await sleepAsync(20.millis)

var endTime = Moment.now()
var elapsed: Duration = (endTime - startTime)
await sleepAsync(tokenPeriod - elapsed + firstWaitExtend)
firstWaitEXtend = 100.millis

## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())

asyncTest "push message with rate limit reject":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()

await allFutures(serverSwitch.start(), clientSwitch.start())

## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()

let
server =
await newTestWakuLightpushNode(serverSwitch, handler, some((3, 500.millis)))
client = newTestWakuLightpushClient(clientSwitch)

let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let topic = DefaultPubsubTopic

let successProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)

check:
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message

let rejectProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)

check:
requestRes.isErr()
requestRes.error == "TOO_MANY_REQUESTS"

for testCnt in 0 .. 2:
await successProc()
await sleepAsync(20.millis)

await rejectProc()

await sleepAsync(500.millis)

## next one shall succeed due to the rate limit time window has passed
await successProc()

## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
98 changes: 95 additions & 3 deletions tests/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,7 @@ procSuite "WakuNode - Store":

server.wakuFilterClient.registerPushHandler(filterHandler)
let resp = waitFor server.filterSubscribe(
some(DefaultPubsubTopic),
DefaultContentTopic,
peer = filterSourcePeer,
some(DefaultPubsubTopic), DefaultContentTopic, peer = filterSourcePeer
)

waitFor sleepAsync(100.millis)
Expand Down Expand Up @@ -319,3 +317,97 @@ procSuite "WakuNode - Store":

# Cleanup
waitFor allFutures(client.stop(), server.stop())

test "Store protocol queries does not violate request rate limitation":
## Setup
let
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))

waitFor allFutures(client.start(), server.start())

let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error

waitFor server.mountStore((4, 500.millis))

client.mountStoreClient()

## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

let requestProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

assert queryRes.isOk(), queryRes.error

let response = queryRes.get()
check:
response.messages == msgListA

for count in 0 ..< 4:
waitFor requestProc()
waitFor sleepAsync(20.millis)

waitFor sleepAsync(500.millis)

for count in 0 ..< 4:
waitFor requestProc()
waitFor sleepAsync(20.millis)

# Cleanup
waitFor allFutures(client.stop(), server.stop())

test "Store protocol queries overrun request rate limitation":
## Setup
let
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))

waitFor allFutures(client.start(), server.start())

let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error

waitFor server.mountStore((3, 500.millis))

client.mountStoreClient()

## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

let successProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

check queryRes.isOk()

let response = queryRes.get()
check:
response.messages == msgListA

let failsProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

check queryRes.isErr()
check queryRes.error == "TOO_MANY_REQUESTS"

for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(20.millis)

waitFor failsProc()

waitFor sleepAsync(500.millis)

for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(20.millis)

# Cleanup
waitFor allFutures(client.stop(), server.stop())
78 changes: 75 additions & 3 deletions tests/wakunode_rest/test_rest_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import
../../waku/waku_api/rest/lightpush/handlers as lightpush_api,
../../waku/waku_api/rest/lightpush/client as lightpush_api_client,
../../waku/waku_relay,
../../waku/common/ratelimit,
../testlib/wakucore,
../testlib/wakunode
../testlib/wakunode,
../testlib/testutils

proc testWakuNode(): WakuNode =
let
Expand All @@ -41,7 +43,9 @@ type RestLightPushTest = object
restServer: WakuRestServerRef
client: RestClientRef

proc init(T: type RestLightPushTest): Future[T] {.async.} =
proc init(
T: type RestLightPushTest, rateLimit: RateLimitSetting = (0, 0.millis)
): Future[T] {.async.} =
var testSetup = RestLightPushTest()
testSetup.serviceNode = testWakuNode()
testSetup.pushNode = testWakuNode()
Expand All @@ -55,7 +59,7 @@ proc init(T: type RestLightPushTest): Future[T] {.async.} =

await testSetup.consumerNode.mountRelay()
await testSetup.serviceNode.mountRelay()
await testSetup.serviceNode.mountLightPush()
await testSetup.serviceNode.mountLightPush(rateLimit)
testSetup.pushNode.mountLightPushClient()

testSetup.serviceNode.peerManager.addServicePeer(
Expand Down Expand Up @@ -178,6 +182,74 @@ suite "Waku v2 Rest API - lightpush":

await restLightPushTest.shutdown()

# disabled due to this bug in nim-chronos https://github.com/status-im/nim-chronos/issues/500
xasyncTest "Request rate limit push message":
# Given
let budgetCap = 3
let tokenPeriod = 500.millis
let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod))

restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1

# When
let pushProc = proc() {.async.} =
let message: RelayWakuMessage = fakeWakuMessage(
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
)
.toRelayWakuMessage()

let requestBody =
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
let response = await restLightPushTest.client.sendPushRequest(requestBody)

echo "response", $response

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT

let pushRejectedProc = proc() {.async.} =
let message: RelayWakuMessage = fakeWakuMessage(
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
)
.toRelayWakuMessage()

let requestBody =
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
let response = await restLightPushTest.client.sendPushRequest(requestBody)

echo "response", $response

# Then
check:
response.status == 429

await pushProc()
await pushProc()
await pushProc()
await pushRejectedProc()

await sleepAsync(tokenPeriod)

for runCnt in 0 ..< 3:
let startTime = Moment.now()
for sendCnt in 0 ..< budgetCap:
await pushProc()

let endTime = Moment.now()
let elapsed: Duration = (endTime - startTime)
await sleepAsync(tokenPeriod - elapsed)

await restLightPushTest.shutdown()

## TODO: Re-work this test when lightpush protocol change is done: https://github.com/waku-org/pm/issues/93
## This test is similar when no available peer exists for publish. Currently it is returning success,
## that makes this test not useful.
Expand Down
Loading

0 comments on commit a00f350

Please sign in to comment.