From 848be9d5c7278e9be32b2f2114ea2a499cc46fb6 Mon Sep 17 00:00:00 2001 From: KaKa <23028015+climba03003@users.noreply.github.com> Date: Sat, 30 Mar 2024 02:05:45 +0800 Subject: [PATCH] feat: replace `into-stream` to `Readable.from` (#290) * feat: replace into-stream to Readable.from * test: regression test of issue 288 * fixup * feat: support more types * chore: fix deps * fixup * fixup * fixup * fixup * refactor: update checking condition * fixup * chore: apply suggestion Co-authored-by: Aras Abbasi Signed-off-by: KaKa <23028015+climba03003@users.noreply.github.com> * fixup * fixup --------- Signed-off-by: KaKa <23028015+climba03003@users.noreply.github.com> Co-authored-by: Aras Abbasi --- index.js | 10 ++++----- lib/utils.js | 43 ++++++++++++++++++++++++++++++++++++- package.json | 7 +++--- test/issue-288.test.js | 48 ++++++++++++++++++++++++++++++++++++++++++ test/utils.test.js | 44 +++++++++++++++++++++++++++++++++++++- types/index.d.ts | 21 ++++++++++++------ 6 files changed, 156 insertions(+), 17 deletions(-) create mode 100644 test/issue-288.test.js diff --git a/index.js b/index.js index 7343f98..cd17201 100644 --- a/index.js +++ b/index.js @@ -7,12 +7,12 @@ const fp = require('fastify-plugin') const encodingNegotiator = require('@fastify/accept-negotiator') const pump = require('pump') const mimedb = require('mime-db') -const intoStream = require('into-stream') const peek = require('peek-stream') const { Minipass } = require('minipass') const pumpify = require('pumpify') +const { Readable } = require('readable-stream') -const { isStream, isGzip, isDeflate } = require('./lib/utils') +const { isStream, isGzip, isDeflate, intoAsyncIterator } = require('./lib/utils') const InvalidRequestEncodingError = createError('FST_CP_ERR_INVALID_CONTENT_ENCODING', 'Unsupported Content-Encoding: %s', 415) const InvalidRequestCompressedPayloadError = createError('FST_CP_ERR_INVALID_CONTENT', 'Could not decompress the request payload using the provided encoding', 400) @@ -276,7 +276,7 @@ function buildRouteCompress (fastify, params, routeOptions, decorateOnly) { if (Buffer.byteLength(payload) < params.threshold) { return next() } - payload = intoStream(payload) + payload = Readable.from(intoAsyncIterator(payload)) } setVaryHeader(reply) @@ -400,7 +400,7 @@ function compress (params) { if (Buffer.byteLength(payload) < params.threshold) { return this.send(payload) } - payload = intoStream(payload) + payload = Readable.from(intoAsyncIterator(payload)) } setVaryHeader(this) @@ -509,7 +509,7 @@ function maybeUnzip (payload, serialize) { // handle case where serialize doesn't return a string or Buffer if (!Buffer.isBuffer(buf)) return result if (isCompressed(buf) === 0) return result - return intoStream(result) + return Readable.from(intoAsyncIterator(result)) } function zipStream (deflate, encoding) { diff --git a/lib/utils.js b/lib/utils.js index 599d4c8..2f265fe 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -35,4 +35,45 @@ function isStream (stream) { return stream !== null && typeof stream === 'object' && typeof stream.pipe === 'function' } -module.exports = { isGzip, isDeflate, isStream } +/** + * Provide a async iteratable for Readable.from + */ +async function * intoAsyncIterator (payload) { + if (typeof payload === 'object') { + if (Buffer.isBuffer(payload)) { + yield payload + return + } + + if ( + // ArrayBuffer + payload instanceof ArrayBuffer || + // NodeJS.TypedArray + ArrayBuffer.isView(payload) + ) { + yield Buffer.from(payload) + return + } + + // Iterator + if (Symbol.iterator in payload) { + for (const chunk of payload) { + yield chunk + } + return + } + + // Async Iterator + if (Symbol.asyncIterator in payload) { + for await (const chunk of payload) { + yield chunk + } + return + } + } + + // string + yield payload +} + +module.exports = { isGzip, isDeflate, isStream, intoAsyncIterator } diff --git a/package.json b/package.json index cd02d9c..7d78ba4 100644 --- a/package.json +++ b/package.json @@ -8,12 +8,12 @@ "dependencies": { "@fastify/accept-negotiator": "^1.1.0", "fastify-plugin": "^4.5.0", - "into-stream": "^6.0.0", "mime-db": "^1.52.0", "minipass": "^7.0.2", "peek-stream": "^1.1.3", "pump": "^3.0.0", - "pumpify": "^2.0.1" + "pumpify": "^2.0.1", + "readable-stream": "^4.5.2" }, "devDependencies": { "@fastify/pre-commit": "^2.0.2", @@ -26,7 +26,8 @@ "standard": "^17.1.0", "tap": "^16.3.7", "tsd": "^0.30.0", - "typescript": "^5.1.6" + "typescript": "^5.1.6", + "undici": "^5.28.3" }, "scripts": { "coverage": "npm run test:unit -- --coverage-report=html", diff --git a/test/issue-288.test.js b/test/issue-288.test.js new file mode 100644 index 0000000..8c53ea5 --- /dev/null +++ b/test/issue-288.test.js @@ -0,0 +1,48 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const fastifyCompress = require('..') +const { request, setGlobalDispatcher, Agent } = require('undici') + +setGlobalDispatcher(new Agent({ + keepAliveTimeout: 10, + keepAliveMaxTimeout: 10 +})) + +test('should not corrupt the file content', async (t) => { + // provide 2 byte unicode content + const twoByteUnicodeContent = new Array(5_000) + .fill('0') + .map(() => { + const random = new Array(10).fill('A').join('🍃') + return random + '- FASTIFY COMPRESS,🍃 FASTIFY COMPRESS' + }) + .join('\n') + const fastify = new Fastify() + t.teardown(() => fastify.close()) + + fastify.register(async (instance, opts) => { + await fastify.register(fastifyCompress) + // compression + instance.get('/issue', async (req, reply) => { + return twoByteUnicodeContent + }) + }) + + // no compression + fastify.get('/good', async (req, reply) => { + return twoByteUnicodeContent + }) + + await fastify.listen({ port: 0 }) + + const { port } = fastify.server.address() + const url = `http://localhost:${port}` + + const response = await request(`${url}/issue`) + const response2 = await request(`${url}/good`) + const body = await response.body.text() + const body2 = await response2.body.text() + t.equal(body, body2) +}) diff --git a/test/utils.test.js b/test/utils.test.js index 6507c2e..6c37465 100644 --- a/test/utils.test.js +++ b/test/utils.test.js @@ -4,7 +4,7 @@ const { createReadStream } = require('node:fs') const { Socket } = require('node:net') const { Duplex, PassThrough, Readable, Stream, Transform, Writable } = require('node:stream') const { test } = require('tap') -const { isStream, isDeflate, isGzip } = require('../lib/utils') +const { isStream, isDeflate, isGzip, intoAsyncIterator } = require('../lib/utils') test('isStream() utility should be able to detect Streams', async (t) => { t.plan(12) @@ -61,3 +61,45 @@ test('isGzip() utility should be able to detect gzip compressed Buffer', async ( t.equal(isGzip(undefined), false) t.equal(isGzip(''), false) }) + +test('intoAsyncIterator() utility should handle different data', async (t) => { + t.plan(8) + + const buf = Buffer.from('foo') + const str = 'foo' + const arr = [str, str] + const arrayBuffer = new ArrayBuffer(8) + const typedArray = new Int32Array(arrayBuffer) + const asyncIterator = (async function * () { + yield str + })() + const obj = {} + + for await (const buffer of intoAsyncIterator(buf)) { + t.equal(buffer, buf) + } + + for await (const string of intoAsyncIterator(str)) { + t.equal(string, str) + } + + for await (const chunk of intoAsyncIterator(arr)) { + t.equal(chunk, str) + } + + for await (const chunk of intoAsyncIterator(arrayBuffer)) { + t.equal(chunk.toString(), Buffer.from(arrayBuffer).toString()) + } + + for await (const chunk of intoAsyncIterator(typedArray)) { + t.equal(chunk.toString(), Buffer.from(typedArray).toString()) + } + + for await (const chunk of intoAsyncIterator(asyncIterator)) { + t.equal(chunk, str) + } + + for await (const chunk of intoAsyncIterator(obj)) { + t.equal(chunk, obj) + } +}) diff --git a/types/index.d.ts b/types/index.d.ts index fd1fe6b..24bcbe0 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -2,13 +2,12 @@ import { FastifyPluginCallback, FastifyReply, FastifyRequest, + RouteOptions as FastifyRouteOptions, RawServerBase, - RawServerDefault, - RouteOptions as FastifyRouteOptions -} from 'fastify' -import { Input, InputObject } from 'into-stream' -import { Stream } from 'stream' -import { BrotliOptions, ZlibOptions } from 'zlib' + RawServerDefault +} from 'fastify'; +import { Stream } from 'stream'; +import { BrotliOptions, ZlibOptions } from 'zlib'; declare module 'fastify' { export interface FastifyContextConfig { @@ -26,7 +25,7 @@ declare module 'fastify' { } interface FastifyReply { - compress(input: Stream | Input | InputObject): void; + compress(input: Stream | Input): void; } export interface RouteOptions { @@ -61,6 +60,14 @@ type EncodingToken = 'br' | 'deflate' | 'gzip' | 'identity'; type CompressibleContentTypeFunction = (contentType: string) => boolean; +type Input = + | Buffer + | NodeJS.TypedArray + | ArrayBuffer + | string + | Iterable + | AsyncIterable; + declare namespace fastifyCompress { export interface FastifyCompressOptions {