From 18c27cd7904d7edf2e6b3715b012663f65e511ba Mon Sep 17 00:00:00 2001 From: Jay McDoniel Date: Tue, 21 Jun 2022 16:15:26 -0700 Subject: [PATCH] fix: use pipeline over stream.pipe `pipeline` ends up destroying streams used if there is an error in one of the streams. Due to this, there's no chance of a memory leak from errored out streams. There's also now an addition of adding an error handler to the `StreamableFile` so that stream errors by default return a 400 and can be customized to return an error message however a developer would like. These only effect the express adapter because of how Fastify already internally handles streams. fix: #9759 --- integration/send-files/e2e/express.spec.ts | 3 +++ integration/send-files/src/app.controller.ts | 5 ++++ integration/send-files/src/app.service.ts | 4 +++ .../common/file-stream/streamable-file.ts | 27 +++++++++++++++++++ .../adapters/express-adapter.ts | 14 +++++++++- 5 files changed, 52 insertions(+), 1 deletion(-) diff --git a/integration/send-files/e2e/express.spec.ts b/integration/send-files/e2e/express.spec.ts index 1657d137c39..f95e923389b 100644 --- a/integration/send-files/e2e/express.spec.ts +++ b/integration/send-files/e2e/express.spec.ts @@ -65,4 +65,7 @@ describe('Express FileSend', () => { expect(res.text).to.be.eq(readmeString); }); }); + it('should return an error if the file does not exist', async () => { + return request(app.getHttpServer()).get('/file/not/exist').expect(400); + }); }); diff --git a/integration/send-files/src/app.controller.ts b/integration/send-files/src/app.controller.ts index 606e2d5980c..9715784bdbf 100644 --- a/integration/send-files/src/app.controller.ts +++ b/integration/send-files/src/app.controller.ts @@ -31,4 +31,9 @@ export class AppController { getFileWithHeaders(): StreamableFile { return this.appService.getFileWithHeaders(); } + + @Get('file/not/exist') + getNonExistantFile(): StreamableFile { + return this.appService.getFileThatDoesNotExist(); + } } diff --git a/integration/send-files/src/app.service.ts b/integration/send-files/src/app.service.ts index ba5849a754f..af94bf5b7f1 100644 --- a/integration/send-files/src/app.service.ts +++ b/integration/send-files/src/app.service.ts @@ -35,4 +35,8 @@ export class AppService { }, ); } + + getFileThatDoesNotExist(): StreamableFile { + return new StreamableFile(createReadStream('does-not-exist.txt')); + } } diff --git a/packages/common/file-stream/streamable-file.ts b/packages/common/file-stream/streamable-file.ts index 7fe05c1d902..f8d79c3af69 100644 --- a/packages/common/file-stream/streamable-file.ts +++ b/packages/common/file-stream/streamable-file.ts @@ -3,9 +3,22 @@ import { types } from 'util'; import { isFunction } from '../utils/shared.utils'; import { StreamableFileOptions } from './streamable-options.interface'; +interface StreamableHandlerResponse { + statusCode: number; + send: (msg: string) => void; +} + export class StreamableFile { private readonly stream: Readable; + private handler: (err: Error, response: StreamableHandlerResponse) => void = ( + err: Error, + res, + ) => { + res.statusCode = 400; + res.send(err.message); + }; + constructor(buffer: Uint8Array, options?: StreamableFileOptions); constructor(readable: Readable, options?: StreamableFileOptions); constructor( @@ -38,4 +51,18 @@ export class StreamableFile { length, }; } + + get errorHandler(): ( + err: Error, + response: StreamableHandlerResponse, + ) => void { + return this.handler; + } + + setErrorHandler( + handler: (err: Error, response: StreamableHandlerResponse) => void, + ) { + this.handler = handler; + return this; + } } diff --git a/packages/platform-express/adapters/express-adapter.ts b/packages/platform-express/adapters/express-adapter.ts index 609e51459ba..23263452401 100644 --- a/packages/platform-express/adapters/express-adapter.ts +++ b/packages/platform-express/adapters/express-adapter.ts @@ -1,5 +1,6 @@ import { InternalServerErrorException, + Logger, RawBodyRequest, RequestMethod, StreamableFile, @@ -33,6 +34,7 @@ import * as cors from 'cors'; import * as express from 'express'; import * as http from 'http'; import * as https from 'https'; +import { PassThrough, pipeline } from 'stream'; import { ServeStaticOptions } from '../interfaces/serve-static-options.interface'; type VersionedRoute = < @@ -78,7 +80,17 @@ export class ExpressAdapter extends AbstractHttpAdapter { ) { response.setHeader('Content-Length', streamHeaders.length); } - return body.getStream().pipe(response); + return pipeline( + body.getStream().on('error', (err: Error) => { + body.errorHandler(err, response); + }), + response, + (err: Error) => { + if (err) { + new Logger('ExpressAdapter').error(err.message, err.stack); + } + }, + ); } return isObject(body) ? response.json(body) : response.send(String(body)); }