Skip to content

Commit

Permalink
feat: bytesPerSecond function generated by request, allow async byteP…
Browse files Browse the repository at this point in the history
…erSecond functions (#8)

* feat: bytesPerSecond function generated by request, allow async bytesPerSecond functions

* run test sequential

* fix remarks

* make tests less flaky
  • Loading branch information
Uzlopak authored Oct 1, 2023
1 parent 7064a47 commit fb7e42c
Show file tree
Hide file tree
Showing 12 changed files with 525 additions and 40 deletions.
2 changes: 2 additions & 0 deletions .taprc
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
files:
- test/**/*.test.js

jobs: 1
52 changes: 42 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ The throttle options per route are the same as the plugin options.

| Header | Description | Default |
|--------|-------------|---------|
|`bytesPerSecond` | The allowed bytes per second, number or a function | 16384 |
|`streamPayloads` | Throttle the payload if it is a stream | true |
|`bufferPayloads` | Throttle the payload if it is a Buffer | false |
|`stringPayloads` | Throttle the payload if it is a string | false |
| `bytesPerSecond` | The allowed bytes per second, number or a function | 16384 |
| `streamPayloads` | Throttle the payload if it is a stream | true |
| `bufferPayloads` | Throttle the payload if it is a Buffer | false |
| `stringPayloads` | Throttle the payload if it is a string | false |
| `async` | Set to true if bytesPerSecond is a function returning a Promise | false |

Example for setting throttling globally:

Expand Down Expand Up @@ -101,9 +102,11 @@ Example for setting the throttling per route:
The `bytesPerSecond` option can be a number or a function. The function for `bytesPerSecond` has the following TypeScript definition:

```typescript
(elapsedTime: number, bytes: number) => number
type BytesPerSecond = (request: FastifyRequest) => ((elapsedTime: number, bytes: number) => number) | Promise<((elapsedTime: number, bytes: number) => number)>
```
`request` is the Fastify request object.
`elapsedTime` is the time since the streaming started in seconds.
`bytes` are the bytes already sent.
Expand All @@ -120,11 +123,12 @@ the `bytesPerSecond` like this:
fastify.get('/', {
config: {
throttle: {
bytesPerSecond: function (elapsedTime, bytes) {
if (elapsedTime < 2) {
return 0
} else {
return Infinity
bytesPerSecond: function bytesPerSecondfactory(request) {
// this function runs for every request
const client = request.headers['customer-id']

return function (elapsedTime, bytes) {
return CONFIG[client] * 2 // return a number of xyz
}
}
}
Expand All @@ -136,6 +140,34 @@ the `bytesPerSecond` like this:
fastify.listen({ port: 3000 })
```

The `bytesPerSecond` function can be a sync function or an async function. If you provide an async function then it will be detected by the plugin. If it is a sync function returning a Promise, you must set the `async` option to `true`, so that the plugin knows that it should await the Promise.

```js
const fastify = require('fastify')()

await fastify.register(import('@fastify/throttle'))

fastify.get('/', {
config: {
throttle: {
async: true,
bytesPerSecond: function bytesPerSecondfactory(request) {
// this function runs for every request
const client = request.headers['customer-id']

return Promise.resolve(function (elapsedTime, bytes) {
return CONFIG[client] * 2 // return a number of xyz
})
}
}
}
}, (req, reply) => {
reply.send(createReadStream(resolve(__dirname, __filename)))
})

fastify.listen({ port: 3000 })
```

<a name="license"></a>
## License
**[MIT](https://github.com/fastify/fastify-throttle/blob/master/LICENSE)**
97 changes: 74 additions & 23 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const fp = require('fastify-plugin')

const { isAsyncFunction } = require('./lib/is-async-function')
const { ThrottleStream } = require('./lib/throttle-stream')
const { Readable, Stream, pipeline } = require('node:stream')

Expand All @@ -11,6 +12,7 @@ function fastifyThrottle (fastify, options, done) {
options.streamPayloads = options.streamPayloads ?? true
options.bufferPayloads = options.bufferPayloads || false
options.stringPayloads = options.stringPayloads || false
options.async = options.async || false

fastify.addHook('onRoute', (routeOptions) => {
const opts = Object.assign({}, options, routeOptions.config?.throttle)
Expand All @@ -36,32 +38,81 @@ async function addRouteThrottleHook (fastify, routeOptions, throttleOptions) {
function throttleOnSendHandler (fastify, throttleOpts) {
const bytesPerSecond = throttleOpts.bytesPerSecond

return function onSendHandler (request, reply, payload, done) {
if (throttleOpts.streamPayloads && payload instanceof Stream) {
done(null, pipeline(
payload,
new ThrottleStream({ bytesPerSecond }),
err => { fastify.log.error(err) }
))
return
if (typeof bytesPerSecond === 'number') {
return async function onSendHandler (request, reply, payload, done) {
if (throttleOpts.streamPayloads && payload instanceof Stream) {
return pipeline(
payload,
new ThrottleStream({ bytesPerSecond }),
err => { fastify.log.error(err) }
)
}
if (throttleOpts.bufferPayloads && Buffer.isBuffer(payload)) {
return pipeline(
Readable.from(payload),
new ThrottleStream({ bytesPerSecond }),
err => { fastify.log.error(err) }
)
}
if (throttleOpts.stringPayloads && typeof payload === 'string') {
return pipeline(
Readable.from(Buffer.from(payload)),
new ThrottleStream({ bytesPerSecond }),
err => { fastify.log.error(err) }
)
}
return payload
}
if (throttleOpts.bufferPayloads && Buffer.isBuffer(payload)) {
done(null, pipeline(
Readable.from(payload),
new ThrottleStream({ bytesPerSecond }),
err => { fastify.log.error(err) }
))
return
} else if (throttleOpts.async || isAsyncFunction(bytesPerSecond)) {
return async function onSendHandler (request, reply, payload) {
if (throttleOpts.streamPayloads && payload instanceof Stream) {
return pipeline(
payload,
new ThrottleStream({ bytesPerSecond: await bytesPerSecond(request) }),
err => { fastify.log.error(err) }
)
}
if (throttleOpts.bufferPayloads && Buffer.isBuffer(payload)) {
return pipeline(
Readable.from(payload),
new ThrottleStream({ bytesPerSecond: await bytesPerSecond(request) }),
err => { fastify.log.error(err) }
)
}
if (throttleOpts.stringPayloads && typeof payload === 'string') {
return pipeline(
Readable.from(Buffer.from(payload)),
new ThrottleStream({ bytesPerSecond: await bytesPerSecond(request) }),
err => { fastify.log.error(err) }
)
}
return payload
}
if (throttleOpts.stringPayloads && typeof payload === 'string') {
done(null, pipeline(
Readable.from(Buffer.from(payload)),
new ThrottleStream({ bytesPerSecond }),
err => { fastify.log.error(err) }
))
return
} else {
return async function onSendHandler (request, reply, payload) {
if (throttleOpts.streamPayloads && payload instanceof Stream) {
return pipeline(
payload,
new ThrottleStream({ bytesPerSecond: bytesPerSecond(request) }),
err => { fastify.log.error(err) }
)
}
if (throttleOpts.bufferPayloads && Buffer.isBuffer(payload)) {
return pipeline(
Readable.from(payload),
new ThrottleStream({ bytesPerSecond: bytesPerSecond(request) }),
err => { fastify.log.error(err) }
)
}
if (throttleOpts.stringPayloads && typeof payload === 'string') {
return pipeline(
Readable.from(Buffer.from(payload)),
new ThrottleStream({ bytesPerSecond: bytesPerSecond(request) }),
err => { fastify.log.error(err) }
)
}
return payload
}
done(null, payload)
}
}

Expand Down
13 changes: 13 additions & 0 deletions lib/is-async-function.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
'use strict'

/* istanbul ignore next */
const AsyncFunctionConstructor = (async () => {}).constructor

/**
* A function that returns true if the given function is an async function.
*/
const isAsyncFunction = (fn) => fn.constructor === AsyncFunctionConstructor

module.exports = {
isAsyncFunction
}
54 changes: 54 additions & 0 deletions test/global-throttle.throttling.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,57 @@ test('should throttle globally and set the bytesPerSecond', async t => {
await fastify.inject('/throttled')
assertTimespan(t, startTime, Date.now(), 2000)
})

test('should throttle globally and set the bytesPerSecond function', async t => {
t.plan(2)
const fastify = Fastify()

await fastify.register(fastifyThrottle, {
bytesPerSecond: (request) => {
t.equal(request.headers['x-throttle-speed'], '10000')
const bps = parseInt(request.headers['x-throttle-speed'], 10)
return (elapsedTime, bytes) => {
return bps
}
}
})

fastify.get('/throttled', (req, reply) => { reply.send(new RandomStream(30000)) })

const startTime = Date.now()

await fastify.inject({
url: '/throttled',
headers: {
'x-throttle-speed': '10000'
}
})
assertTimespan(t, startTime, Date.now(), 2000)
})

test('should throttle globally and set the bytesPerSecond async function', async t => {
t.plan(2)
const fastify = Fastify()

await fastify.register(fastifyThrottle, {
bytesPerSecond: async (request) => {
t.equal(request.headers['x-throttle-speed'], '10000')
const bps = parseInt(request.headers['x-throttle-speed'], 10)
return (elapsedTime, bytes) => {
return bps
}
}
})

fastify.get('/throttled', (req, reply) => { reply.send(new RandomStream(30000)) })

const startTime = Date.now()

await fastify.inject({
url: '/throttled',
headers: {
'x-throttle-speed': '10000'
}
})
assertTimespan(t, startTime, Date.now(), 2000)
})
14 changes: 14 additions & 0 deletions test/lib/is-async-function.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict'

const { test } = require('tap')
const { isAsyncFunction } = require('../../lib/is-async-function.js')

test('isAsyncFunction returns true for async functions', (t) => {
t.plan(1)
t.ok(isAsyncFunction(async () => { }))
})

test('isAsyncFunction returns false for non-async functions', (t) => {
t.plan(1)
t.notOk(isAsyncFunction(() => { }))
})
44 changes: 44 additions & 0 deletions test/route-throttle.buffer-payloads.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,47 @@ test('should throttle Buffer payloads if bufferPayloads is set to true', async t
assertTimespan(t, startTime, Date.now(), 2000)
t.equal(response.body.length, 3000)
})

test('should throttle Buffer payloads if bufferPayloads is set to true and bytesPerSecond is a function', async t => {
t.plan(2)
const fastify = Fastify()

await fastify.register(fastifyThrottle)

fastify.get('/throttled', {
config: {
throttle: {
bytesPerSecond: (request) => (elapsedTime, bytes) => 1000,
bufferPayloads: true
}
}
}, (req, reply) => { reply.send(Buffer.alloc(3000)) })

const startTime = Date.now()

const response = await fastify.inject('/throttled')
assertTimespan(t, startTime, Date.now(), 2000)
t.equal(response.body.length, 3000)
})

test('should throttle Buffer payloads if bufferPayloads is set to true and bytesPerSecond is an async function', async t => {
t.plan(2)
const fastify = Fastify()

await fastify.register(fastifyThrottle)

fastify.get('/throttled', {
config: {
throttle: {
bytesPerSecond: async (request) => (elapsedTime, bytes) => 1000,
bufferPayloads: true
}
}
}, (req, reply) => { reply.send(Buffer.alloc(3000)) })

const startTime = Date.now()

const response = await fastify.inject('/throttled')
assertTimespan(t, startTime, Date.now(), 2000)
t.equal(response.body.length, 3000)
})
Loading

0 comments on commit fb7e42c

Please sign in to comment.