diff --git a/index.d.ts b/index.d.ts index 57a2368ae..b28b85eec 100644 --- a/index.d.ts +++ b/index.d.ts @@ -126,6 +126,10 @@ interface ClientOptions { password?: string; }; disablePrototypePoisoningProtection?: boolean | 'proto' | 'constructor'; + memoryCircuitBreaker?: { + enabled: boolean; + maxPercentage: number; + } } declare class Client { diff --git a/index.js b/index.js index 6c137649a..c5e244d2c 100644 --- a/index.js +++ b/index.js @@ -178,7 +178,8 @@ class Client extends OpenSearchAPI { generateRequestId: options.generateRequestId, name: options.name, opaqueIdPrefix: options.opaqueIdPrefix, - context: options.context + context: options.context, + memoryCircuitBreaker: options.memoryCircuitBreaker }) this.helpers = new Helpers({ diff --git a/lib/Transport.js b/lib/Transport.js index e4d50ca9c..ce67a5c42 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -51,6 +51,7 @@ const clientVersion = require('../package.json').version const userAgent = `opensearch-js/${clientVersion} (${os.platform()} ${os.release()}-${os.arch()}; Node.js ${process.version})` const MAX_BUFFER_LENGTH = buffer.constants.MAX_LENGTH const MAX_STRING_LENGTH = buffer.constants.MAX_STRING_LENGTH +const HEAP_SIZE_LIMIT = require('v8').getHeapStatistics().heap_size_limit const kCompatibleCheck = Symbol('compatible check') const kApiVersioning = Symbol('api versioning') @@ -81,6 +82,7 @@ class Transport { this.opaqueIdPrefix = opts.opaqueIdPrefix this[kCompatibleCheck] = 0 // 0 = to be checked, 1 = checking, 2 = checked-ok, 3 checked-notok this[kApiVersioning] = process.env.OPENSEARCH_CLIENT_APIVERSIONING === 'true' + this.memoryCircuitBreaker = opts.memoryCircuitBreaker this.nodeFilter = opts.nodeFilter || defaultNodeFilter if (typeof opts.nodeSelector === 'function') { @@ -247,10 +249,10 @@ class Transport { const contentEncoding = (result.headers['content-encoding'] || '').toLowerCase() const isCompressed = contentEncoding.indexOf('gzip') > -1 || contentEncoding.indexOf('deflate') > -1 - /* istanbul ignore else */ if (result.headers['content-length'] !== undefined) { const contentLength = Number(result.headers['content-length']) + // nodeJS data type limit check if (isCompressed && contentLength > MAX_BUFFER_LENGTH) { response.destroy() return onConnectionError( @@ -261,6 +263,12 @@ class Transport { return onConnectionError( new RequestAbortedError(`The content length (${contentLength}) is bigger than the maximum allowed string (${MAX_STRING_LENGTH})`, result) ) + } else if (shouldApplyCircuitBreaker(contentLength)) { + // Abort this response to avoid OOM crash of dashboards. + response.destroy() + return onConnectionError( + new RequestAbortedError(`The content length (${contentLength}) is bigger than the maximum allowed heap memory limit.`, result) + ) } } // if the response is compressed, we must handle it @@ -301,6 +309,13 @@ class Transport { response.on('end', onEnd) response.on('aborted', onAbort) } + // Helper function to check if memory circuit breaker enabled and the response payload is too large to fit into available heap memory. + const shouldApplyCircuitBreaker = (contentLength) => { + if (!this.memoryCircuitBreaker || !this.memoryCircuitBreaker.enabled) return false + const maxPercentage = validateMemoryPercentage(this.memoryCircuitBreaker.maxPercentage) + const heapUsed = process.memoryUsage().heapUsed + return contentLength + heapUsed > HEAP_SIZE_LIMIT * maxPercentage + } const onBody = (err, payload) => { if (err) { @@ -636,6 +651,11 @@ function lowerCaseHeaders (oldHeaders) { return newHeaders } +function validateMemoryPercentage (percentage) { + if (percentage < 0 || percentage > 1) return 1.0 + return percentage +} + module.exports = Transport module.exports.internals = { defaultNodeFilter, diff --git a/package.json b/package.json index f363690f2..f6d646a16 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "./": "./" }, "homepage": "https://www.opensearch.org/", - "version": "1.0.0", + "version": "1.1.0", "versionCanary": "7.10.0-canary.6", "keywords": [ "opensearch", diff --git a/test/unit/client.test.js b/test/unit/client.test.js index 075739d53..cc2e04c83 100644 --- a/test/unit/client.test.js +++ b/test/unit/client.test.js @@ -951,6 +951,52 @@ test('Content length too big (string)', t => { }) }) +test('Content length exceeds max heap limit', t => { + t.plan(4) + const percentage = 0.8 + const HEAP_SIZE_LIMIT_WITH_BUFFER = Number(require('v8').getHeapStatistics().heap_size_limit * percentage) + const contentLength = buffer.constants.MAX_STRING_LENGTH - 1 + const memoryCircuitBreaker = { + enabled: true, + maxPercentage: percentage + } + // Simulate allocation of bytes + const memoryAllocations = [] + while (process.memoryUsage().heapUsed + contentLength <= HEAP_SIZE_LIMIT_WITH_BUFFER) { + const allocation = 50 * 1024 * 1024 // 50MB + const numbers = allocation / 8 + const arr = [] + arr.length = numbers + for (let i = 0; i < numbers; i++) { + arr[i] = i + } + memoryAllocations.push(arr) + } + + class MockConnection extends Connection { + request (params, callback) { + const stream = intoStream(JSON.stringify({ hello: 'world' })) + stream.statusCode = 200 + stream.headers = { + 'content-type': 'application/json;utf=8', + 'content-length': contentLength, + connection: 'keep-alive', + date: new Date().toISOString() + } + stream.on('close', () => t.pass('Stream destroyed')) + process.nextTick(callback, null, stream) + return { abort () { } } + } + } + + const client = new Client({ node: 'http://localhost:9200', Connection: MockConnection, memoryCircuitBreaker }) + client.info((err, result) => { + t.ok(err instanceof errors.RequestAbortedError) + t.equal(err.message, `The content length (${contentLength}) is bigger than the maximum allowed heap memory limit.`) + t.equal(result.meta.attempts, 0) + }) +}) + test('Prototype poisoning protection enabled by default', t => { t.plan(1)