Skip to content

Commit

Permalink
fix: use pipeline over stream.pipe
Browse files Browse the repository at this point in the history
`pipeline` ends up destroying streams used if there is an error in one of
the streams. Due to this, there's no chance of a memory leak from errored out
streams. There's also now an addition of adding an error handler to the
`StreamableFile` so that stream errors by default return a 400 and can be
customized to return an error message however a developer would like. These
only effect the express adapter because of how Fastify already internally
handles streams.

fix: nestjs#9759
  • Loading branch information
jmcdo29 committed Jun 21, 2022
1 parent 61b11ad commit 13a11bf
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 54 deletions.
3 changes: 3 additions & 0 deletions integration/send-files/e2e/express.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ describe('Express FileSend', () => {
expect(res.text).to.be.eq(readmeString);
});
});
it('should return an error if the file does not exist', async () => {
return request(app.getHttpServer()).get('/file/not/exist').expect(400);
});
});
5 changes: 5 additions & 0 deletions integration/send-files/src/app.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ export class AppController {
getFileWithHeaders(): StreamableFile {
return this.appService.getFileWithHeaders();
}

@Get('file/not/exist')
getNonExistantFile(): StreamableFile {
return this.appService.getFileThatDoesNotExist();
}
}
6 changes: 5 additions & 1 deletion integration/send-files/src/app.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Injectable, StreamableFile } from '@nestjs/common';
import { createReadStream, readFileSync } from 'fs';
import { createReadStream, readFile, readFileSync } from 'fs';
import { join } from 'path';
import { Observable, of } from 'rxjs';
import { NonFile } from './non-file';
Expand Down Expand Up @@ -35,4 +35,8 @@ export class AppService {
},
);
}

getFileThatDoesNotExist(): StreamableFile {
return new StreamableFile(createReadStream('does-not-exist.txt'));
}
}
98 changes: 46 additions & 52 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions packages/common/file-stream/streamable-file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,22 @@ import { types } from 'util';
import { isFunction } from '../utils/shared.utils';
import { StreamableFileOptions } from './streamable-options.interface';

interface StreamableHandlerResponse {
statusCode: number;
send: (msg: string) => void;
}

export class StreamableFile {
private readonly stream: Readable;

private handler: (err: Error, response: StreamableHandlerResponse) => void = (
err: Error,
res,
) => {
res.statusCode = 400;
res.send(err.message);
};

constructor(buffer: Uint8Array, options?: StreamableFileOptions);
constructor(readable: Readable, options?: StreamableFileOptions);
constructor(
Expand Down Expand Up @@ -38,4 +51,18 @@ export class StreamableFile {
length,
};
}

get errorHandler(): (
err: Error,
response: StreamableHandlerResponse,
) => void {
return this.handler;
}

setErrorHandler(
handler: (err: Error, response: StreamableHandlerResponse) => void,
) {
this.handler = handler;
return this;
}
}
13 changes: 12 additions & 1 deletion packages/platform-express/adapters/express-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
BadRequestException,
InternalServerErrorException,
RawBodyRequest,
RequestMethod,
Expand Down Expand Up @@ -33,6 +34,7 @@ import * as cors from 'cors';
import * as express from 'express';
import * as http from 'http';
import * as https from 'https';
import { PassThrough, pipeline } from 'stream';
import { ServeStaticOptions } from '../interfaces/serve-static-options.interface';

type VersionedRoute = <
Expand Down Expand Up @@ -78,7 +80,16 @@ export class ExpressAdapter extends AbstractHttpAdapter {
) {
response.setHeader('Content-Length', streamHeaders.length);
}
return body.getStream().pipe(response);
return pipeline(
body.getStream().on('error', (err: Error) => {
body.errorHandler(err, response);
}),
response,
(err: Error) => {
if (err) {
}
},
);
}
return isObject(body) ? response.json(body) : response.send(String(body));
}
Expand Down

0 comments on commit 13a11bf

Please sign in to comment.