Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace into-stream to Readable.from #290

Merged
merged 14 commits into from
Mar 29, 2024
10 changes: 5 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ const fp = require('fastify-plugin')
const encodingNegotiator = require('@fastify/accept-negotiator')
const pump = require('pump')
const mimedb = require('mime-db')
const intoStream = require('into-stream')
const peek = require('peek-stream')
const { Minipass } = require('minipass')
const pumpify = require('pumpify')
const { Readable } = require('readable-stream')

const { isStream, isGzip, isDeflate } = require('./lib/utils')
const { isStream, isGzip, isDeflate, intoAsyncIterator } = require('./lib/utils')

const InvalidRequestEncodingError = createError('FST_CP_ERR_INVALID_CONTENT_ENCODING', 'Unsupported Content-Encoding: %s', 415)
const InvalidRequestCompressedPayloadError = createError('FST_CP_ERR_INVALID_CONTENT', 'Could not decompress the request payload using the provided encoding', 400)
Expand Down Expand Up @@ -276,7 +276,7 @@ function buildRouteCompress (fastify, params, routeOptions, decorateOnly) {
if (Buffer.byteLength(payload) < params.threshold) {
return next()
}
payload = intoStream(payload)
payload = Readable.from(intoAsyncIterator(payload))
}

setVaryHeader(reply)
Expand Down Expand Up @@ -400,7 +400,7 @@ function compress (params) {
if (Buffer.byteLength(payload) < params.threshold) {
return this.send(payload)
}
payload = intoStream(payload)
payload = Readable.from(intoAsyncIterator(payload))
}

setVaryHeader(this)
Expand Down Expand Up @@ -509,7 +509,7 @@ function maybeUnzip (payload, serialize) {
// handle case where serialize doesn't return a string or Buffer
if (!Buffer.isBuffer(buf)) return result
if (isCompressed(buf) === 0) return result
return intoStream(result)
return Readable.from(intoAsyncIterator(result))
Uzlopak marked this conversation as resolved.
Show resolved Hide resolved
}

function zipStream (deflate, encoding) {
Expand Down
40 changes: 39 additions & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,42 @@ function isStream (stream) {
return stream !== null && typeof stream === 'object' && typeof stream.pipe === 'function'
}

module.exports = { isGzip, isDeflate, isStream }
// It is a helper used to provide a async iteratable for
// Readable.from
async function * intoAsyncIterator (payload) {
climba03003 marked this conversation as resolved.
Show resolved Hide resolved
const isBuffer = Buffer.isBuffer(payload)

if (
(
// ArrayBuffer
payload instanceof ArrayBuffer ||
// NodeJS.TypedArray
ArrayBuffer.isView(payload)
) &&
// Exclude Buffer to prevent double cast
!isBuffer
climba03003 marked this conversation as resolved.
Show resolved Hide resolved
) {
payload = Buffer.from(payload)
}

// Iterator
if (typeof payload === 'object' && typeof payload[Symbol.iterator] === 'function' && !isBuffer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mayba add fast path if Buffer...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't provides any goods but one more branch and === check.

for (const chunk of payload) {
yield chunk
}
return
}

// Async Iterator
if (typeof payload === 'object' && typeof payload[Symbol.asyncIterator] === 'function' && !isBuffer) {
climba03003 marked this conversation as resolved.
Show resolved Hide resolved
for await (const chunk of payload) {
yield chunk
}
return
}

// string | Buffer
yield payload
climba03003 marked this conversation as resolved.
Show resolved Hide resolved
}

module.exports = { isGzip, isDeflate, isStream, intoAsyncIterator }
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
"dependencies": {
"@fastify/accept-negotiator": "^1.1.0",
"fastify-plugin": "^4.5.0",
"into-stream": "^6.0.0",
"mime-db": "^1.52.0",
"minipass": "^7.0.2",
"peek-stream": "^1.1.3",
"pump": "^3.0.0",
"pumpify": "^2.0.1"
"pumpify": "^2.0.1",
"readable-stream": "^4.5.2"
Copy link
Member

@gurgunday gurgunday Mar 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not blocking but I just don't see the reason for adding readable-stream if the test suit is passing without it, I remember PRs that removed that package being merged as well

I get the stability argument, however with the same point of view one could say that any part of node's API can change in a breaking manner between major versions, no? Then why not install every single built-in module as a package?

But maybe readable-stream is special, so just asking to learn

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can ensure that bugs in streams not solved in older node versions, like in node14, are not affecting our implementation.

},
"devDependencies": {
"@fastify/pre-commit": "^2.0.2",
Expand All @@ -26,7 +26,8 @@
"standard": "^17.1.0",
"tap": "^16.3.7",
"tsd": "^0.30.0",
"typescript": "^5.1.6"
"typescript": "^5.1.6",
"undici": "^5.28.3"
},
"scripts": {
"coverage": "npm run test:unit -- --coverage-report=html",
Expand Down
42 changes: 42 additions & 0 deletions test/issue-288.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
'use strict'

const { test } = require('tap')
const Fastify = require('fastify')
const fastifyCompress = require('..')
const { fetch } = require('undici')

test('should not corrupt the file content', async (t) => {
// provide 2 byte unicode content
const twoByteUnicodeContent = new Array(5_000)
.fill('0')
.map(() => {
const random = new Array(10).fill('A').join('🍃')
return random + '- FASTIFY COMPRESS,🍃 FASTIFY COMPRESS'
})
.join('\n')
const fastify = new Fastify()
t.teardown(() => fastify.close())

fastify.register(async (instance, opts) => {
await fastify.register(fastifyCompress)
// compression
instance.get('/issue', async (req, reply) => {
return twoByteUnicodeContent
})
})

// no compression
fastify.get('/good', async (req, reply) => {
return twoByteUnicodeContent
})

await fastify.listen({ port: 0 })

const { port } = fastify.server.address()
const url = `http://localhost:${port}`
const response = await fetch(`${url}/issue`)
const response2 = await fetch(`${url}/good`)
const body = await response.text()
const body2 = await response2.text()
t.equal(body, body2)
})
39 changes: 38 additions & 1 deletion test/utils.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { createReadStream } = require('node:fs')
const { Socket } = require('node:net')
const { Duplex, PassThrough, Readable, Stream, Transform, Writable } = require('node:stream')
const { test } = require('tap')
const { isStream, isDeflate, isGzip } = require('../lib/utils')
const { isStream, isDeflate, isGzip, intoAsyncIterator } = require('../lib/utils')

test('isStream() utility should be able to detect Streams', async (t) => {
t.plan(12)
Expand Down Expand Up @@ -61,3 +61,40 @@ test('isGzip() utility should be able to detect gzip compressed Buffer', async (
t.equal(isGzip(undefined), false)
t.equal(isGzip(''), false)
})

test('intoAsyncIterator() utility should handle different data', async (t) => {
t.plan(15)

const buf = Buffer.from('foo')
const str = 'foo'
const arr = [str, str]
const arrayBuffer = new ArrayBuffer(8)
const typedArray = new Int32Array(arrayBuffer)
const asyncIterator = (async function * () {
yield str
})()

for await (const buffer of intoAsyncIterator(buf)) {
t.equal(buffer, buf)
}

for await (const string of intoAsyncIterator(str)) {
t.equal(string, str)
}

for await (const chunk of intoAsyncIterator(arr)) {
t.equal(chunk, str)
}

for await (const chunk of intoAsyncIterator(arrayBuffer)) {
t.equal(chunk, 0)
}

for await (const chunk of intoAsyncIterator(typedArray)) {
t.equal(chunk, 0)
}

for await (const chunk of intoAsyncIterator(asyncIterator)) {
t.equal(chunk, str)
}
})
21 changes: 14 additions & 7 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ import {
FastifyPluginCallback,
FastifyReply,
FastifyRequest,
RouteOptions as FastifyRouteOptions,
RawServerBase,
RawServerDefault,
RouteOptions as FastifyRouteOptions
} from 'fastify'
import { Input, InputObject } from 'into-stream'
import { Stream } from 'stream'
import { BrotliOptions, ZlibOptions } from 'zlib'
RawServerDefault
} from 'fastify';
import { Stream } from 'stream';
import { BrotliOptions, ZlibOptions } from 'zlib';

declare module 'fastify' {
export interface FastifyContextConfig {
Expand All @@ -26,7 +25,7 @@ declare module 'fastify' {
}

interface FastifyReply {
compress(input: Stream | Input | InputObject): void;
compress(input: Stream | Input): void;
}

export interface RouteOptions {
Expand Down Expand Up @@ -61,6 +60,14 @@ type EncodingToken = 'br' | 'deflate' | 'gzip' | 'identity';

type CompressibleContentTypeFunction = (contentType: string) => boolean;

type Input =
| Buffer
| NodeJS.TypedArray
| ArrayBuffer
| string
| Iterable<Buffer | string>
| AsyncIterable<Buffer | string>;

declare namespace fastifyCompress {

export interface FastifyCompressOptions {
Expand Down
Loading