From 8e622220c8af437687927d25aeb3f340447aed43 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 7 Jun 2024 10:44:44 -0700 Subject: [PATCH 1/2] grpc-js: Avoid buffering significantly more than max_receive_message_size per received message (1.8.x) --- packages/grpc-js/src/compression-filter.ts | 72 +++++++++++---- packages/grpc-js/src/internal-channel.ts | 2 - .../grpc-js/src/max-message-size-filter.ts | 89 ------------------- packages/grpc-js/src/server-call.ts | 87 +++++++++++------- packages/grpc-js/src/stream-decoder.ts | 5 ++ packages/grpc-js/src/subchannel-call.ts | 14 ++- packages/grpc-js/src/transport.ts | 15 +++- .../grpc-js/test/fixtures/test_service.proto | 1 + packages/grpc-js/test/test-server-errors.ts | 49 +++++++++- 9 files changed, 186 insertions(+), 148 deletions(-) delete mode 100644 packages/grpc-js/src/max-message-size-filter.ts diff --git a/packages/grpc-js/src/compression-filter.ts b/packages/grpc-js/src/compression-filter.ts index b8c2e6d1e..86af3a5f6 100644 --- a/packages/grpc-js/src/compression-filter.ts +++ b/packages/grpc-js/src/compression-filter.ts @@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface'; import { Channel } from './channel'; import { ChannelOptions } from './channel-options'; import { CompressionAlgorithms } from './compression-algorithms'; -import { LogVerbosity } from './constants'; +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, LogVerbosity, Status } from './constants'; import { BaseFilter, Filter, FilterFactory } from './filter'; import * as logging from './logging'; import { Metadata, MetadataValue } from './metadata'; @@ -94,6 +94,10 @@ class IdentityHandler extends CompressionHandler { } class DeflateHandler extends CompressionHandler { + constructor(private maxRecvMessageLength: number) { + super(); + } + compressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.deflate(message, (err, output) => { @@ -108,18 +112,34 @@ class DeflateHandler extends CompressionHandler { decompressMessage(message: Buffer) { return new Promise((resolve, reject) => { - zlib.inflate(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); + let totalLength = 0; + const messageParts: Buffer[] = []; + const decompresser = zlib.createInflate(); + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}` + }); } }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(message); + decompresser.end(); }); } } class GzipHandler extends CompressionHandler { + constructor(private maxRecvMessageLength: number) { + super(); + } + compressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.gzip(message, (err, output) => { @@ -134,13 +154,25 @@ class GzipHandler extends CompressionHandler { decompressMessage(message: Buffer) { return new Promise((resolve, reject) => { - zlib.unzip(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); + let totalLength = 0; + const messageParts: Buffer[] = []; + const decompresser = zlib.createGunzip(); + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}` + }); } }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(message); + decompresser.end(); }); } } @@ -165,14 +197,14 @@ class UnknownHandler extends CompressionHandler { } } -function getCompressionHandler(compressionName: string): CompressionHandler { +function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler { switch (compressionName) { case 'identity': return new IdentityHandler(); case 'deflate': - return new DeflateHandler(); + return new DeflateHandler(maxReceiveMessageSize); case 'gzip': - return new GzipHandler(); + return new GzipHandler(maxReceiveMessageSize); default: return new UnknownHandler(compressionName); } @@ -182,11 +214,14 @@ export class CompressionFilter extends BaseFilter implements Filter { private sendCompression: CompressionHandler = new IdentityHandler(); private receiveCompression: CompressionHandler = new IdentityHandler(); private currentCompressionAlgorithm: CompressionAlgorithm = 'identity'; + private maxReceiveMessageLength: number; constructor(channelOptions: ChannelOptions, private sharedFilterConfig: SharedCompressionFilterConfig) { super(); - const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm']; + const compressionAlgorithmKey = + channelOptions['grpc.default_compression_algorithm']; + this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH if (compressionAlgorithmKey !== undefined) { if (isCompressionAlgorithmKey(compressionAlgorithmKey)) { const clientSelectedEncoding = CompressionAlgorithms[compressionAlgorithmKey] as CompressionAlgorithm; @@ -200,7 +235,10 @@ export class CompressionFilter extends BaseFilter implements Filter { */ if (!serverSupportedEncodings || serverSupportedEncodings.includes(clientSelectedEncoding)) { this.currentCompressionAlgorithm = clientSelectedEncoding; - this.sendCompression = getCompressionHandler(this.currentCompressionAlgorithm); + this.sendCompression = getCompressionHandler( + this.currentCompressionAlgorithm, + -1 + ); } } else { logging.log(LogVerbosity.ERROR, `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`); @@ -228,7 +266,7 @@ export class CompressionFilter extends BaseFilter implements Filter { if (receiveEncoding.length > 0) { const encoding: MetadataValue = receiveEncoding[0]; if (typeof encoding === 'string') { - this.receiveCompression = getCompressionHandler(encoding); + this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength); } } metadata.remove('grpc-encoding'); diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 38646a0cd..242a1cbd8 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -33,7 +33,6 @@ import { } from './resolver'; import { trace } from './logging'; import { SubchannelAddress } from './subchannel-address'; -import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; import { mapProxyName } from './http_proxy'; import { GrpcUri, parseUri, uriToString } from './uri-parser'; import { ServerSurfaceCall } from './server-call'; @@ -310,7 +309,6 @@ export class InternalChannel { } ); this.filterStackFactory = new FilterStackFactory([ - new MaxMessageSizeFilterFactory(this.options), new CompressionFilterFactory(this, this.options), ]); this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2)); diff --git a/packages/grpc-js/src/max-message-size-filter.ts b/packages/grpc-js/src/max-message-size-filter.ts deleted file mode 100644 index 25e4fdc03..000000000 --- a/packages/grpc-js/src/max-message-size-filter.ts +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { BaseFilter, Filter, FilterFactory } from './filter'; -import { WriteObject } from './call-interface'; -import { - Status, - DEFAULT_MAX_SEND_MESSAGE_LENGTH, - DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, -} from './constants'; -import { ChannelOptions } from './channel-options'; -import { Metadata } from './metadata'; - -export class MaxMessageSizeFilter extends BaseFilter implements Filter { - private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH; - private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; - constructor( - options: ChannelOptions - ) { - super(); - if ('grpc.max_send_message_length' in options) { - this.maxSendMessageSize = options['grpc.max_send_message_length']!; - } - if ('grpc.max_receive_message_length' in options) { - this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!; - } - } - - async sendMessage(message: Promise): Promise { - /* A configured size of -1 means that there is no limit, so skip the check - * entirely */ - if (this.maxSendMessageSize === -1) { - return message; - } else { - const concreteMessage = await message; - if (concreteMessage.message.length > this.maxSendMessageSize) { - throw { - code: Status.RESOURCE_EXHAUSTED, - details: `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`, - metadata: new Metadata() - }; - } else { - return concreteMessage; - } - } - } - - async receiveMessage(message: Promise): Promise { - /* A configured size of -1 means that there is no limit, so skip the check - * entirely */ - if (this.maxReceiveMessageSize === -1) { - return message; - } else { - const concreteMessage = await message; - if (concreteMessage.length > this.maxReceiveMessageSize) { - throw { - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`, - metadata: new Metadata() - }; - } else { - return concreteMessage; - } - } - } -} - -export class MaxMessageSizeFilterFactory - implements FilterFactory { - constructor(private readonly options: ChannelOptions) {} - - createFilter(): MaxMessageSizeFilter { - return new MaxMessageSizeFilter(this.options); - } -} diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index be807b959..b06feda14 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -19,7 +19,6 @@ import { EventEmitter } from 'events'; import * as http2 from 'http2'; import { Duplex, Readable, Writable } from 'stream'; import * as zlib from 'zlib'; -import { promisify } from 'util'; import { Status, @@ -38,8 +37,6 @@ import { Deadline } from './deadline'; import { getErrorCode, getErrorMessage } from './error'; const TRACER_NAME = 'server_call'; -const unzip = promisify(zlib.unzip); -const inflate = promisify(zlib.inflate); function trace(text: string): void { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); @@ -478,19 +475,42 @@ export class Http2ServerCallStream< private getDecompressedMessage( message: Buffer, encoding: string - ): Buffer | Promise { - if (encoding === 'deflate') { - return inflate(message.subarray(5)); - } else if (encoding === 'gzip') { - return unzip(message.subarray(5)); - } else if (encoding === 'identity') { - return message.subarray(5); + ): Buffer | Promise { const messageContents = message.subarray(5); + if (encoding === 'identity') { + return messageContents; + } else if (encoding === 'deflate' || encoding === 'gzip') { + let decompresser: zlib.Gunzip | zlib.Deflate; + if (encoding === 'deflate') { + decompresser = zlib.createInflate(); + } else { + decompresser = zlib.createGunzip(); + } + return new Promise((resolve, reject) => { + let totalLength = 0 + const messageParts: Buffer[] = []; + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxReceiveMessageSize !== -1 && totalLength > this.maxReceiveMessageSize) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxReceiveMessageSize}` + }); + } + }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(messageContents); + decompresser.end(); + }); + } else { + return Promise.reject({ + code: Status.UNIMPLEMENTED, + details: `Received message compressed with unsupported encoding "${encoding}"`, + }); } - - return Promise.reject({ - code: Status.UNIMPLEMENTED, - details: `Received message compressed with unsupported encoding "${encoding}"`, - }); } sendMetadata(customMetadata?: Metadata) { @@ -807,7 +827,7 @@ export class Http2ServerCallStream< | ServerDuplexStream, encoding: string ) { - const decoder = new StreamDecoder(); + const decoder = new StreamDecoder(this.maxReceiveMessageSize); let readsDone = false; @@ -823,29 +843,34 @@ export class Http2ServerCallStream< }; this.stream.on('data', async (data: Buffer) => { - const messages = decoder.write(data); + let messages: Buffer[]; + try { + messages = decoder.write(data); + } catch (e) { + this.sendError({ + code: Status.RESOURCE_EXHAUSTED, + details: (e as Error).message + }); + return; + } pendingMessageProcessing = true; this.stream.pause(); for (const message of messages) { - if ( - this.maxReceiveMessageSize !== -1 && - message.length > this.maxReceiveMessageSize - ) { - this.sendError({ - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`, - }); - return; - } this.emit('receiveMessage'); const compressed = message.readUInt8(0) === 1; const compressedMessageEncoding = compressed ? encoding : 'identity'; - const decompressedMessage = await this.getDecompressedMessage( - message, - compressedMessageEncoding - ); + let decompressedMessage: Buffer; + try { + decompressedMessage = await this.getDecompressedMessage( + message, + compressedMessageEncoding + ); + } catch (e) { + this.sendError(e as Partial); + return; + } // Encountered an error with decompression; it'll already have been propogated back // Just return early diff --git a/packages/grpc-js/src/stream-decoder.ts b/packages/grpc-js/src/stream-decoder.ts index 671ad41ae..ea669d14c 100644 --- a/packages/grpc-js/src/stream-decoder.ts +++ b/packages/grpc-js/src/stream-decoder.ts @@ -30,6 +30,8 @@ export class StreamDecoder { private readPartialMessage: Buffer[] = []; private readMessageRemaining = 0; + constructor(private maxReadMessageLength: number) {} + write(data: Buffer): Buffer[] { let readHead = 0; let toRead: number; @@ -60,6 +62,9 @@ export class StreamDecoder { // readSizeRemaining >=0 here if (this.readSizeRemaining === 0) { this.readMessageSize = this.readPartialSize.readUInt32BE(0); + if (this.maxReadMessageLength !== -1 && this.readMessageSize > this.maxReadMessageLength) { + throw new Error(`Received message larger than max (${this.readMessageSize} vs ${this.maxReadMessageLength})`); + } this.readMessageRemaining = this.readMessageSize; if (this.readMessageRemaining > 0) { this.readState = ReadState.READING_MESSAGE; diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts index f9c24f6bd..a210c9ce7 100644 --- a/packages/grpc-js/src/subchannel-call.ts +++ b/packages/grpc-js/src/subchannel-call.ts @@ -18,7 +18,7 @@ import * as http2 from 'http2'; import * as os from 'os'; -import { Status } from './constants'; +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, Status } from './constants'; import { Metadata } from './metadata'; import { StreamDecoder } from './stream-decoder'; import * as logging from './logging'; @@ -76,7 +76,7 @@ export interface SubchannelCallInterceptingListener extends InterceptingListener } export class Http2SubchannelCall implements SubchannelCall { - private decoder = new StreamDecoder(); + private decoder: StreamDecoder; private isReadFilterPending = false; private isPushPending = false; @@ -106,6 +106,8 @@ export class Http2SubchannelCall implements SubchannelCall { private readonly transport: Transport, private readonly callId: number ) { + const maxReceiveMessageLength = transport.getOptions()['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; + this.decoder = new StreamDecoder(maxReceiveMessageLength); http2Stream.on('response', (headers, flags) => { let headersString = ''; for (const header of Object.keys(headers)) { @@ -163,7 +165,13 @@ export class Http2SubchannelCall implements SubchannelCall { return; } this.trace('receive HTTP/2 data frame of length ' + data.length); - const messages = this.decoder.write(data); + let messages: Buffer[]; + try { + messages = this.decoder.write(data); + } catch (e) { + this.cancelWithStatus(Status.RESOURCE_EXHAUSTED, (e as Error).message); + return; + } for (const message of messages) { this.trace('parsed message of length ' + message.length); diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index a9da5db4d..8c0164e9d 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -62,7 +62,14 @@ export interface TransportDisconnectListener { export interface Transport { getChannelzRef(): SocketRef; getPeerName(): string; - createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener, subchannelCallStatsTracker: Partial): SubchannelCall; + getOptions(): ChannelOptions; + createCall( + metadata: Metadata, + host: string, + method: string, + listener: SubchannelCallInterceptingListener, + subchannelCallStatsTracker: Partial + ): SubchannelCall; addDisconnectListener(listener: TransportDisconnectListener): void; shutdown(): void; } @@ -119,7 +126,7 @@ class Http2Transport implements Transport { constructor( private session: http2.ClientHttp2Session, subchannelAddress: SubchannelAddress, - options: ChannelOptions, + private options: ChannelOptions, /** * Name of the remote server, if it is not the same as the subchannel * address, i.e. if connecting through an HTTP CONNECT proxy. @@ -495,6 +502,10 @@ class Http2Transport implements Transport { return this.subchannelAddressString; } + getOptions() { + return this.options; + } + shutdown() { this.session.close(); unregisterChannelzRef(this.channelzRef); diff --git a/packages/grpc-js/test/fixtures/test_service.proto b/packages/grpc-js/test/fixtures/test_service.proto index 64ce0d378..2a7a303f3 100644 --- a/packages/grpc-js/test/fixtures/test_service.proto +++ b/packages/grpc-js/test/fixtures/test_service.proto @@ -21,6 +21,7 @@ message Request { bool error = 1; string message = 2; int32 errorAfter = 3; + int32 responseLength = 4; } message Response { diff --git a/packages/grpc-js/test/test-server-errors.ts b/packages/grpc-js/test/test-server-errors.ts index 91b7c196c..2ad67c6cc 100644 --- a/packages/grpc-js/test/test-server-errors.ts +++ b/packages/grpc-js/test/test-server-errors.ts @@ -33,6 +33,7 @@ import { } from '../src/server-call'; import { loadProtoFile } from './common'; +import { CompressionAlgorithms } from '../src/compression-algorithms'; const protoFile = join(__dirname, 'fixtures', 'test_service.proto'); const testServiceDef = loadProtoFile(protoFile); @@ -309,7 +310,7 @@ describe('Other conditions', () => { trailerMetadata ); } else { - cb(null, { count: 1 }, trailerMetadata); + cb(null, { count: 1, message: 'a'.repeat(req.responseLength) }, trailerMetadata); } }, @@ -319,6 +320,7 @@ describe('Other conditions', () => { ) { let count = 0; let errored = false; + let responseLength = 0; stream.on('data', (data: any) => { if (data.error) { @@ -326,13 +328,14 @@ describe('Other conditions', () => { errored = true; cb(new Error(message) as ServiceError, null, trailerMetadata); } else { + responseLength += data.responseLength; count++; } }); stream.on('end', () => { if (!errored) { - cb(null, { count }, trailerMetadata); + cb(null, { count, message: 'a'.repeat(responseLength) }, trailerMetadata); } }); }, @@ -348,7 +351,7 @@ describe('Other conditions', () => { }); } else { for (let i = 1; i <= 5; i++) { - stream.write({ count: i }); + stream.write({ count: i, message: 'a'.repeat(req.responseLength) }); if (req.errorAfter && req.errorAfter === i) { stream.emit('error', { code: grpc.status.UNKNOWN, @@ -375,7 +378,7 @@ describe('Other conditions', () => { err.metadata.add('count', '' + count); stream.emit('error', err); } else { - stream.write({ count }); + stream.write({ count, message: 'a'.repeat(data.responseLength) }); count++; } }); @@ -739,6 +742,44 @@ describe('Other conditions', () => { }); }); }); + + describe('Max message size', () => { + const largeMessage = 'a'.repeat(10_000_000); + it('Should be enforced on the server', done => { + client.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + done(); + }); + }); + it('Should be enforced on the client', done => { + client.unary({ responseLength: 10_000_000 }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + done(); + }); + }); + describe('Compressed messages', () => { + it('Should be enforced with gzip', done => { + const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.gzip}); + compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + assert.match(error.details, /Received message that decompresses to a size larger/); + done(); + }); + }); + it('Should be enforced with deflate', done => { + const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.deflate}); + compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + assert.match(error.details, /Received message that decompresses to a size larger/); + done(); + }); + }); + }); + }); }); function identity(arg: any): any { From 3b110cddfe4c895d6f642092b99a6667cef5ae00 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 7 Jun 2024 10:58:16 -0700 Subject: [PATCH 2/2] grpc-js: Bump to 1.8.22 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 29c2a00ee..19e37d19b 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.21", + "version": "1.8.22", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",