Skip to content

Commit

Permalink
[1.x] backpack circuit-breaker PR (#215)
Browse files Browse the repository at this point in the history
Backport PR: #207

Signed-off-by: Anan Zhuang <[email protected]>
  • Loading branch information
ananzh authored Apr 26, 2022
1 parent bf265ac commit 343d8ce
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 3 deletions.
4 changes: 4 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ interface ClientOptions {
password?: string;
};
disablePrototypePoisoningProtection?: boolean | 'proto' | 'constructor';
memoryCircuitBreaker?: {
enabled: boolean;
maxPercentage: number;
}
}

declare class Client {
Expand Down
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ class Client extends OpenSearchAPI {
generateRequestId: options.generateRequestId,
name: options.name,
opaqueIdPrefix: options.opaqueIdPrefix,
context: options.context
context: options.context,
memoryCircuitBreaker: options.memoryCircuitBreaker
})

this.helpers = new Helpers({
Expand Down
22 changes: 21 additions & 1 deletion lib/Transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const clientVersion = require('../package.json').version
const userAgent = `opensearch-js/${clientVersion} (${os.platform()} ${os.release()}-${os.arch()}; Node.js ${process.version})`
const MAX_BUFFER_LENGTH = buffer.constants.MAX_LENGTH
const MAX_STRING_LENGTH = buffer.constants.MAX_STRING_LENGTH
const HEAP_SIZE_LIMIT = require('v8').getHeapStatistics().heap_size_limit
const kCompatibleCheck = Symbol('compatible check')
const kApiVersioning = Symbol('api versioning')

Expand Down Expand Up @@ -81,6 +82,7 @@ class Transport {
this.opaqueIdPrefix = opts.opaqueIdPrefix
this[kCompatibleCheck] = 0 // 0 = to be checked, 1 = checking, 2 = checked-ok, 3 checked-notok
this[kApiVersioning] = process.env.OPENSEARCH_CLIENT_APIVERSIONING === 'true'
this.memoryCircuitBreaker = opts.memoryCircuitBreaker

this.nodeFilter = opts.nodeFilter || defaultNodeFilter
if (typeof opts.nodeSelector === 'function') {
Expand Down Expand Up @@ -247,10 +249,10 @@ class Transport {

const contentEncoding = (result.headers['content-encoding'] || '').toLowerCase()
const isCompressed = contentEncoding.indexOf('gzip') > -1 || contentEncoding.indexOf('deflate') > -1

/* istanbul ignore else */
if (result.headers['content-length'] !== undefined) {
const contentLength = Number(result.headers['content-length'])
// nodeJS data type limit check
if (isCompressed && contentLength > MAX_BUFFER_LENGTH) {
response.destroy()
return onConnectionError(
Expand All @@ -261,6 +263,12 @@ class Transport {
return onConnectionError(
new RequestAbortedError(`The content length (${contentLength}) is bigger than the maximum allowed string (${MAX_STRING_LENGTH})`, result)
)
} else if (shouldApplyCircuitBreaker(contentLength)) {
// Abort this response to avoid OOM crash of dashboards.
response.destroy()
return onConnectionError(
new RequestAbortedError(`The content length (${contentLength}) is bigger than the maximum allowed heap memory limit.`, result)
)
}
}
// if the response is compressed, we must handle it
Expand Down Expand Up @@ -301,6 +309,13 @@ class Transport {
response.on('end', onEnd)
response.on('aborted', onAbort)
}
// Helper function to check if memory circuit breaker enabled and the response payload is too large to fit into available heap memory.
const shouldApplyCircuitBreaker = (contentLength) => {
if (!this.memoryCircuitBreaker || !this.memoryCircuitBreaker.enabled) return false
const maxPercentage = validateMemoryPercentage(this.memoryCircuitBreaker.maxPercentage)
const heapUsed = process.memoryUsage().heapUsed
return contentLength + heapUsed > HEAP_SIZE_LIMIT * maxPercentage
}

const onBody = (err, payload) => {
if (err) {
Expand Down Expand Up @@ -636,6 +651,11 @@ function lowerCaseHeaders (oldHeaders) {
return newHeaders
}

function validateMemoryPercentage (percentage) {
if (percentage < 0 || percentage > 1) return 1.0
return percentage
}

module.exports = Transport
module.exports.internals = {
defaultNodeFilter,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"./": "./"
},
"homepage": "https://www.opensearch.org/",
"version": "1.0.0",
"version": "1.1.0",
"versionCanary": "7.10.0-canary.6",
"keywords": [
"opensearch",
Expand Down
46 changes: 46 additions & 0 deletions test/unit/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,52 @@ test('Content length too big (string)', t => {
})
})

test('Content length exceeds max heap limit', t => {
t.plan(4)
const percentage = 0.8
const HEAP_SIZE_LIMIT_WITH_BUFFER = Number(require('v8').getHeapStatistics().heap_size_limit * percentage)
const contentLength = buffer.constants.MAX_STRING_LENGTH - 1
const memoryCircuitBreaker = {
enabled: true,
maxPercentage: percentage
}
// Simulate allocation of bytes
const memoryAllocations = []
while (process.memoryUsage().heapUsed + contentLength <= HEAP_SIZE_LIMIT_WITH_BUFFER) {
const allocation = 50 * 1024 * 1024 // 50MB
const numbers = allocation / 8
const arr = []
arr.length = numbers
for (let i = 0; i < numbers; i++) {
arr[i] = i
}
memoryAllocations.push(arr)
}

class MockConnection extends Connection {
request (params, callback) {
const stream = intoStream(JSON.stringify({ hello: 'world' }))
stream.statusCode = 200
stream.headers = {
'content-type': 'application/json;utf=8',
'content-length': contentLength,
connection: 'keep-alive',
date: new Date().toISOString()
}
stream.on('close', () => t.pass('Stream destroyed'))
process.nextTick(callback, null, stream)
return { abort () { } }
}
}

const client = new Client({ node: 'http://localhost:9200', Connection: MockConnection, memoryCircuitBreaker })
client.info((err, result) => {
t.ok(err instanceof errors.RequestAbortedError)
t.equal(err.message, `The content length (${contentLength}) is bigger than the maximum allowed heap memory limit.`)
t.equal(result.meta.attempts, 0)
})
})

test('Prototype poisoning protection enabled by default', t => {
t.plan(1)

Expand Down

0 comments on commit 343d8ce

Please sign in to comment.