From fb7e42c93d594d63055da05cdeee6491877c07f9 Mon Sep 17 00:00:00 2001 From: Aras Abbasi Date: Sun, 1 Oct 2023 15:46:29 +0200 Subject: [PATCH] feat: bytesPerSecond function generated by request, allow async bytePerSecond functions (#8) * feat: bytesPerSecond function generated by request, allow async bytesPerSecond functions * run test sequential * fix remarks * make tests less flaky --- .taprc | 2 + README.md | 52 ++++++-- index.js | 97 +++++++++++---- lib/is-async-function.js | 13 ++ test/global-throttle.throttling.test.js | 54 ++++++++ test/lib/is-async-function.test.js | 14 +++ test/route-throttle.buffer-payloads.test.js | 44 +++++++ test/route-throttle.stream-payloads.test.js | 131 ++++++++++++++++++++ test/route-throttle.string-payloads.test.js | 44 +++++++ test/route-throttle.throttling.test.js | 62 +++++++++ types/index.d.ts | 16 ++- types/index.test-d.ts | 36 +++++- 12 files changed, 525 insertions(+), 40 deletions(-) create mode 100644 lib/is-async-function.js create mode 100644 test/lib/is-async-function.test.js diff --git a/.taprc b/.taprc index eb6eb3e..d096314 100644 --- a/.taprc +++ b/.taprc @@ -1,2 +1,4 @@ files: - test/**/*.test.js + +jobs: 1 diff --git a/README.md b/README.md index 0dde531..0060f3d 100644 --- a/README.md +++ b/README.md @@ -55,10 +55,11 @@ The throttle options per route are the same as the plugin options. | Header | Description | Default | |--------|-------------|---------| -|`bytesPerSecond` | The allowed bytes per second, number or a function | 16384 | -|`streamPayloads` | Throttle the payload if it is a stream | true | -|`bufferPayloads` | Throttle the payload if it is a Buffer | false | -|`stringPayloads` | Throttle the payload if it is a string | false | +| `bytesPerSecond` | The allowed bytes per second, number or a function | 16384 | +| `streamPayloads` | Throttle the payload if it is a stream | true | +| `bufferPayloads` | Throttle the payload if it is a Buffer | false | +| `stringPayloads` | Throttle the payload if it is a string | false | +| `async` | Set to true if bytesPerSecond is a function returning a Promise | false | Example for setting throttling globally: @@ -101,9 +102,11 @@ Example for setting the throttling per route: The `bytesPerSecond` option can be a number or a function. The function for `bytesPerSecond` has the following TypeScript definition: ```typescript -(elapsedTime: number, bytes: number) => number +type BytesPerSecond = (request: FastifyRequest) => ((elapsedTime: number, bytes: number) => number) | Promise<((elapsedTime: number, bytes: number) => number)> ``` +`request` is the Fastify request object. + `elapsedTime` is the time since the streaming started in seconds. `bytes` are the bytes already sent. @@ -120,11 +123,12 @@ the `bytesPerSecond` like this: fastify.get('/', { config: { throttle: { - bytesPerSecond: function (elapsedTime, bytes) { - if (elapsedTime < 2) { - return 0 - } else { - return Infinity + bytesPerSecond: function bytesPerSecondfactory(request) { + // this function runs for every request + const client = request.headers['customer-id'] + + return function (elapsedTime, bytes) { + return CONFIG[client] * 2 // return a number of xyz } } } @@ -136,6 +140,34 @@ the `bytesPerSecond` like this: fastify.listen({ port: 3000 }) ``` +The `bytesPerSecond` function can be a sync function or an async function. If you provide an async function then it will be detected by the plugin. If it is a sync function returning a Promise, you must set the `async` option to `true`, so that the plugin knows that it should await the Promise. + +```js + const fastify = require('fastify')() + + await fastify.register(import('@fastify/throttle')) + + fastify.get('/', { + config: { + throttle: { + async: true, + bytesPerSecond: function bytesPerSecondfactory(request) { + // this function runs for every request + const client = request.headers['customer-id'] + + return Promise.resolve(function (elapsedTime, bytes) { + return CONFIG[client] * 2 // return a number of xyz + }) + } + } + } + }, (req, reply) => { + reply.send(createReadStream(resolve(__dirname, __filename))) + }) + + fastify.listen({ port: 3000 }) +``` + ## License **[MIT](https://github.com/fastify/fastify-throttle/blob/master/LICENSE)** diff --git a/index.js b/index.js index 3bd3729..1ec4701 100644 --- a/index.js +++ b/index.js @@ -2,6 +2,7 @@ const fp = require('fastify-plugin') +const { isAsyncFunction } = require('./lib/is-async-function') const { ThrottleStream } = require('./lib/throttle-stream') const { Readable, Stream, pipeline } = require('node:stream') @@ -11,6 +12,7 @@ function fastifyThrottle (fastify, options, done) { options.streamPayloads = options.streamPayloads ?? true options.bufferPayloads = options.bufferPayloads || false options.stringPayloads = options.stringPayloads || false + options.async = options.async || false fastify.addHook('onRoute', (routeOptions) => { const opts = Object.assign({}, options, routeOptions.config?.throttle) @@ -36,32 +38,81 @@ async function addRouteThrottleHook (fastify, routeOptions, throttleOptions) { function throttleOnSendHandler (fastify, throttleOpts) { const bytesPerSecond = throttleOpts.bytesPerSecond - return function onSendHandler (request, reply, payload, done) { - if (throttleOpts.streamPayloads && payload instanceof Stream) { - done(null, pipeline( - payload, - new ThrottleStream({ bytesPerSecond }), - err => { fastify.log.error(err) } - )) - return + if (typeof bytesPerSecond === 'number') { + return async function onSendHandler (request, reply, payload, done) { + if (throttleOpts.streamPayloads && payload instanceof Stream) { + return pipeline( + payload, + new ThrottleStream({ bytesPerSecond }), + err => { fastify.log.error(err) } + ) + } + if (throttleOpts.bufferPayloads && Buffer.isBuffer(payload)) { + return pipeline( + Readable.from(payload), + new ThrottleStream({ bytesPerSecond }), + err => { fastify.log.error(err) } + ) + } + if (throttleOpts.stringPayloads && typeof payload === 'string') { + return pipeline( + Readable.from(Buffer.from(payload)), + new ThrottleStream({ bytesPerSecond }), + err => { fastify.log.error(err) } + ) + } + return payload } - if (throttleOpts.bufferPayloads && Buffer.isBuffer(payload)) { - done(null, pipeline( - Readable.from(payload), - new ThrottleStream({ bytesPerSecond }), - err => { fastify.log.error(err) } - )) - return + } else if (throttleOpts.async || isAsyncFunction(bytesPerSecond)) { + return async function onSendHandler (request, reply, payload) { + if (throttleOpts.streamPayloads && payload instanceof Stream) { + return pipeline( + payload, + new ThrottleStream({ bytesPerSecond: await bytesPerSecond(request) }), + err => { fastify.log.error(err) } + ) + } + if (throttleOpts.bufferPayloads && Buffer.isBuffer(payload)) { + return pipeline( + Readable.from(payload), + new ThrottleStream({ bytesPerSecond: await bytesPerSecond(request) }), + err => { fastify.log.error(err) } + ) + } + if (throttleOpts.stringPayloads && typeof payload === 'string') { + return pipeline( + Readable.from(Buffer.from(payload)), + new ThrottleStream({ bytesPerSecond: await bytesPerSecond(request) }), + err => { fastify.log.error(err) } + ) + } + return payload } - if (throttleOpts.stringPayloads && typeof payload === 'string') { - done(null, pipeline( - Readable.from(Buffer.from(payload)), - new ThrottleStream({ bytesPerSecond }), - err => { fastify.log.error(err) } - )) - return + } else { + return async function onSendHandler (request, reply, payload) { + if (throttleOpts.streamPayloads && payload instanceof Stream) { + return pipeline( + payload, + new ThrottleStream({ bytesPerSecond: bytesPerSecond(request) }), + err => { fastify.log.error(err) } + ) + } + if (throttleOpts.bufferPayloads && Buffer.isBuffer(payload)) { + return pipeline( + Readable.from(payload), + new ThrottleStream({ bytesPerSecond: bytesPerSecond(request) }), + err => { fastify.log.error(err) } + ) + } + if (throttleOpts.stringPayloads && typeof payload === 'string') { + return pipeline( + Readable.from(Buffer.from(payload)), + new ThrottleStream({ bytesPerSecond: bytesPerSecond(request) }), + err => { fastify.log.error(err) } + ) + } + return payload } - done(null, payload) } } diff --git a/lib/is-async-function.js b/lib/is-async-function.js new file mode 100644 index 0000000..da4580b --- /dev/null +++ b/lib/is-async-function.js @@ -0,0 +1,13 @@ +'use strict' + +/* istanbul ignore next */ +const AsyncFunctionConstructor = (async () => {}).constructor + +/** + * A function that returns true if the given function is an async function. + */ +const isAsyncFunction = (fn) => fn.constructor === AsyncFunctionConstructor + +module.exports = { + isAsyncFunction +} diff --git a/test/global-throttle.throttling.test.js b/test/global-throttle.throttling.test.js index fb341d4..390d844 100644 --- a/test/global-throttle.throttling.test.js +++ b/test/global-throttle.throttling.test.js @@ -38,3 +38,57 @@ test('should throttle globally and set the bytesPerSecond', async t => { await fastify.inject('/throttled') assertTimespan(t, startTime, Date.now(), 2000) }) + +test('should throttle globally and set the bytesPerSecond function', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle, { + bytesPerSecond: (request) => { + t.equal(request.headers['x-throttle-speed'], '10000') + const bps = parseInt(request.headers['x-throttle-speed'], 10) + return (elapsedTime, bytes) => { + return bps + } + } + }) + + fastify.get('/throttled', (req, reply) => { reply.send(new RandomStream(30000)) }) + + const startTime = Date.now() + + await fastify.inject({ + url: '/throttled', + headers: { + 'x-throttle-speed': '10000' + } + }) + assertTimespan(t, startTime, Date.now(), 2000) +}) + +test('should throttle globally and set the bytesPerSecond async function', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle, { + bytesPerSecond: async (request) => { + t.equal(request.headers['x-throttle-speed'], '10000') + const bps = parseInt(request.headers['x-throttle-speed'], 10) + return (elapsedTime, bytes) => { + return bps + } + } + }) + + fastify.get('/throttled', (req, reply) => { reply.send(new RandomStream(30000)) }) + + const startTime = Date.now() + + await fastify.inject({ + url: '/throttled', + headers: { + 'x-throttle-speed': '10000' + } + }) + assertTimespan(t, startTime, Date.now(), 2000) +}) diff --git a/test/lib/is-async-function.test.js b/test/lib/is-async-function.test.js new file mode 100644 index 0000000..76d382e --- /dev/null +++ b/test/lib/is-async-function.test.js @@ -0,0 +1,14 @@ +'use strict' + +const { test } = require('tap') +const { isAsyncFunction } = require('../../lib/is-async-function.js') + +test('isAsyncFunction returns true for async functions', (t) => { + t.plan(1) + t.ok(isAsyncFunction(async () => { })) +}) + +test('isAsyncFunction returns false for non-async functions', (t) => { + t.plan(1) + t.notOk(isAsyncFunction(() => { })) +}) diff --git a/test/route-throttle.buffer-payloads.test.js b/test/route-throttle.buffer-payloads.test.js index d61f43c..6fa9a3c 100644 --- a/test/route-throttle.buffer-payloads.test.js +++ b/test/route-throttle.buffer-payloads.test.js @@ -70,3 +70,47 @@ test('should throttle Buffer payloads if bufferPayloads is set to true', async t assertTimespan(t, startTime, Date.now(), 2000) t.equal(response.body.length, 3000) }) + +test('should throttle Buffer payloads if bufferPayloads is set to true and bytesPerSecond is a function', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: (request) => (elapsedTime, bytes) => 1000, + bufferPayloads: true + } + } + }, (req, reply) => { reply.send(Buffer.alloc(3000)) }) + + const startTime = Date.now() + + const response = await fastify.inject('/throttled') + assertTimespan(t, startTime, Date.now(), 2000) + t.equal(response.body.length, 3000) +}) + +test('should throttle Buffer payloads if bufferPayloads is set to true and bytesPerSecond is an async function', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: async (request) => (elapsedTime, bytes) => 1000, + bufferPayloads: true + } + } + }, (req, reply) => { reply.send(Buffer.alloc(3000)) }) + + const startTime = Date.now() + + const response = await fastify.inject('/throttled') + assertTimespan(t, startTime, Date.now(), 2000) + t.equal(response.body.length, 3000) +}) diff --git a/test/route-throttle.stream-payloads.test.js b/test/route-throttle.stream-payloads.test.js index 05cfa9b..c04f7c0 100644 --- a/test/route-throttle.stream-payloads.test.js +++ b/test/route-throttle.stream-payloads.test.js @@ -50,6 +50,73 @@ test('should throttle streams payloads if streamPayloads is set to true', async t.equal(response.body.length, 3000) }) +test('should throttle streams payloads if streamPayloads is set to true and bytesPerSecond is a function', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: (request) => (elapsedTime, bytes) => 1000, + streamPayloads: true + } + } + }, (req, reply) => { reply.send(new RandomStream(3000)) }) + + const startTime = Date.now() + + const response = await fastify.inject('/throttled') + assertTimespan(t, startTime, Date.now(), 2000) + t.equal(response.body.length, 3000) +}) + +test('should throttle streams payloads if streamPayloads is set to true and bytesPerSecond is an async function', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: async (request) => (elapsedTime, bytes) => 1000, + streamPayloads: true + } + } + }, (req, reply) => { reply.send(new RandomStream(3000)) }) + + const startTime = Date.now() + + const response = await fastify.inject('/throttled') + assertTimespan(t, startTime, Date.now(), 2000) + t.equal(response.body.length, 3000) +}) + +test('should throttle streams payloads if streamPayloads is set to true and bytesPerSecond is an sync function and async is set to true', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: (request) => Promise.resolve((elapsedTime, bytes) => 1000), + async: true, + streamPayloads: true + } + } + }, (req, reply) => { reply.send(new RandomStream(3000)) }) + + const startTime = Date.now() + + const response = await fastify.inject('/throttled') + assertTimespan(t, startTime, Date.now(), 2000) + t.equal(response.body.length, 3000) +}) + test('should not throttle streams payloads if streamPayloads is set to false', async t => { t.plan(2) const fastify = Fastify() @@ -71,3 +138,67 @@ test('should not throttle streams payloads if streamPayloads is set to false', a assertTimespan(t, startTime, Date.now(), 50, 100) t.equal(response.body.length, 3000) }) + +test('should not throttle streams payloads if streamPayloads is set to false', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/', { + config: { + throttle: { + bytesPerSecond: (request) => (elapsedTime, bytes) => 1000, + streamPayloads: false + } + } + }, (req, reply) => { reply.send(new RandomStream(3000)) }) + + const startTime = Date.now() + + const response = await fastify.inject('/') + assertTimespan(t, startTime, Date.now(), 50, 100) + t.equal(response.body.length, 3000) +}) + +test('should not throttle streams payloads if streamPayloads is set to false and bytesPerSecond is a function returning a Promise', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: (request) => Promise.resolve((elapsedTime, bytes) => 1000), + streamPayloads: false, + async: true + } + } + }, (req, reply) => { reply.send(new RandomStream(3000)) }) + + const startTime = Date.now() + + const response = await fastify.inject('/throttled') + assertTimespan(t, startTime, Date.now(), 50, 100) + t.equal(response.body.length, 3000) +}) + +test('should not crash if async is set to true and bytesPerSecond is an sync function returning a rejected Promise', async t => { + t.plan(1) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: (request) => Promise.reject(new Error('Arbitrary Error')), + async: true + } + } + }, (req, reply) => { reply.send(new RandomStream(3000)) }) + + const response = await fastify.inject('/throttled') + t.equal(response.statusCode, 500) +}) diff --git a/test/route-throttle.string-payloads.test.js b/test/route-throttle.string-payloads.test.js index 33a14a7..7a35973 100644 --- a/test/route-throttle.string-payloads.test.js +++ b/test/route-throttle.string-payloads.test.js @@ -70,3 +70,47 @@ test('should throttle string payloads when stringPayloads is true', async t => { assertTimespan(t, startTime, Date.now(), 2000) t.equal(response.body.length, 3000) }) + +test('should throttle string payloads when stringPayloads is true and bytesPerSecond is a function', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: (request) => (elapsedTime, bytes) => 1000, + stringPayloads: true + } + } + }, (req, reply) => { reply.send(Buffer.alloc(3000).toString()) }) + + const startTime = Date.now() + + const response = await fastify.inject('/throttled') + assertTimespan(t, startTime, Date.now(), 2000) + t.equal(response.body.length, 3000) +}) + +test('should throttle string payloads when stringPayloads is true and bytesPerSecond is an async function', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: async (request) => (elapsedTime, bytes) => 1000, + stringPayloads: true + } + } + }, (req, reply) => { reply.send(Buffer.alloc(3000).toString()) }) + + const startTime = Date.now() + + const response = await fastify.inject('/throttled') + assertTimespan(t, startTime, Date.now(), 2000) + t.equal(response.body.length, 3000) +}) diff --git a/test/route-throttle.throttling.test.js b/test/route-throttle.throttling.test.js index 1cb1594..b2bdb7d 100644 --- a/test/route-throttle.throttling.test.js +++ b/test/route-throttle.throttling.test.js @@ -46,3 +46,65 @@ test('should throttle per route and set the bytesPerSecond', async t => { await fastify.inject('/throttled') assertTimespan(t, startTime, Date.now(), 2000) }) + +test('should throttle per route and set the bytesPerSecond as function', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: (request) => { + t.equal(request.headers['x-throttle-speed'], '10000') + const bps = parseInt(request.headers['x-throttle-speed'], 10) + return (elapsedTime, bytes) => { + return bps + } + } + } + } + }, (req, reply) => { reply.send(new RandomStream(30000)) }) + + const startTime = Date.now() + + await fastify.inject({ + url: '/throttled', + headers: { + 'x-throttle-speed': '10000' + } + }) + assertTimespan(t, startTime, Date.now(), 2000) +}) + +test('should throttle per route and set the bytesPerSecond as async function', async t => { + t.plan(2) + const fastify = Fastify() + + await fastify.register(fastifyThrottle) + + fastify.get('/throttled', { + config: { + throttle: { + bytesPerSecond: async (request) => { + t.equal(request.headers['x-throttle-speed'], '10000') + const bps = parseInt(request.headers['x-throttle-speed'], 10) + return (elapsedTime, bytes) => { + return bps + } + } + } + } + }, (req, reply) => { reply.send(new RandomStream(30000)) }) + + const startTime = Date.now() + + await fastify.inject({ + url: '/throttled', + headers: { + 'x-throttle-speed': '10000' + } + }) + assertTimespan(t, startTime, Date.now(), 2000) +}) diff --git a/types/index.d.ts b/types/index.d.ts index a160690..feab7cd 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -1,5 +1,5 @@ import { - FastifyPluginCallback, + FastifyPluginCallback, FastifyRequest, } from 'fastify'; declare module 'fastify' { @@ -19,6 +19,11 @@ type FastifyThrottle = FastifyPluginCallback number +/** + * Represents a function that generates a BytesPerSecondFn. + */ +type BytesPerSecondGenerator = (request: FastifyRequest) => BytesPerSecondFn | Promise + /** * Namespace for fastify-throttle plugin options. * @@ -37,7 +42,7 @@ declare namespace fastifyThrottle { * @type {number|BytesPerSecondFn} * @default 16384 */ - bytesPerSecond: number | BytesPerSecondFn + bytesPerSecond: number | BytesPerSecondGenerator /** * Throttle stream payloads. @@ -59,6 +64,13 @@ declare namespace fastifyThrottle { * @default false */ stringPayloads?: boolean + + /** + * The bytesPerSecond function is a sync function returning a Promise. + * @type {boolean} + * @default false + */ + async?: boolean; } export interface FastifyThrottlePluginOptions extends FastifyThrottleOptions { diff --git a/types/index.test-d.ts b/types/index.test-d.ts index 97f8547..c632a95 100644 --- a/types/index.test-d.ts +++ b/types/index.test-d.ts @@ -1,4 +1,4 @@ -import fastify from 'fastify' +import fastify, { FastifyRequest } from 'fastify' import fastifyThrottle from '..' import { expectType } from 'tsd'; @@ -11,10 +11,36 @@ server.register(fastifyThrottle, { stringPayloads: false }) server.register(fastifyThrottle, { - bytesPerSecond: (elapsedTime, bytes) => { - expectType(elapsedTime) - expectType(bytes) - return 200 + bytesPerSecond: (_req) => { + expectType(_req) + return (elapsedTime, bytes) => { + expectType(elapsedTime) + expectType(bytes) + return 200 + } + } +}) + +server.register(fastifyThrottle, { + bytesPerSecond: async (_req) => { + expectType(_req) + return (elapsedTime, bytes) => { + expectType(elapsedTime) + expectType(bytes) + return 200 + } + } +}) + +server.register(fastifyThrottle, { + async: true, + bytesPerSecond: (_req) => { + expectType(_req) + return Promise.resolve((elapsedTime, bytes) => { + expectType(elapsedTime) + expectType(bytes) + return 200 + }) } })