diff --git a/lib/client/address_group.js b/lib/client/address_group.js index 336f68b..d13790a 100644 --- a/lib/client/address_group.js +++ b/lib/client/address_group.js @@ -155,6 +155,22 @@ class AddressGroup extends Base { } } + _needElasticControl(addressCount) { + const connectionPoolConfig = this.connectionPoolConfig; + if (!connectionPoolConfig) return false; + + // 如果地址不够,禁用弹性控制 + if (addressCount < connectionPoolConfig.minAddressCount) { + return false; + } + + const enableThreshold = connectionPoolConfig.enableThreshold || 50; + // 开启弹性控制有两个条件 + // 1. 配置 elasticControl = true + // 2. 当前分组的地址数量要大于开启的阈值(enableThreshold) + return connectionPoolConfig.elasticControl && addressCount > enableThreshold; + } + _chooseAddresses(addressList) { const newSet = new Set(); const oldSet = new Set(); @@ -177,7 +193,7 @@ class AddressGroup extends Base { this._totalSize = this._allAddressList.length; this._unChoosedAddressList = []; // 禁用弹性控制直接返回 或者 地址太少,直接返回 - if (!this.connectionPoolConfig.elasticControl || this._totalSize < this.connectionPoolConfig.minAddressCount) { + if (!this._needElasticControl(this._totalSize)) { return this._allAddressList; } if (this._connectionPoolSize > this._totalSize) { diff --git a/lib/client/consumer.js b/lib/client/consumer.js index 01d5efb..a119668 100644 --- a/lib/client/consumer.js +++ b/lib/client/consumer.js @@ -92,8 +92,8 @@ class RpcConsumer extends Base { await this._addressGroup.ready(); } - createAddressGroup(key) { - return new AddressGroup(Object.assign({ key }, this.options)); + createAddressGroup(key, options) { + return new AddressGroup(Object.assign({ key }, this.options, options)); } createRequest(method, args, options) { diff --git a/lib/client/dynamic_config.js b/lib/client/dynamic_config.js index 48da57e..9274ae7 100644 --- a/lib/client/dynamic_config.js +++ b/lib/client/dynamic_config.js @@ -56,6 +56,7 @@ module.exports = { // 连接池配置 connectionPoolConfig: { + enableThreshold: 50, minAddressCount: 5, maxAddressCount: 50, initConnectionSize: 6, diff --git a/package.json b/package.json index 2ec216f..626af5f 100644 --- a/package.json +++ b/package.json @@ -56,10 +56,10 @@ "coffee": "^5.2.1", "contributors": "^0.5.1", "dubbo-remoting": "^2.1.4", - "egg-bin": "^4.11.0", + "egg-bin": "^4.11.1", "eslint": "^5.15.1", - "eslint-config-egg": "^7.1.0", - "mm": "^2.4.1", + "eslint-config-egg": "^7.2.0", + "mm": "^2.5.0", "node-zookeeper-client": "^0.2.2", "pedding": "^1.1.0" }, @@ -68,6 +68,6 @@ }, "ci": { "type": "travis", - "version": "8, 10" + "version": "8, 10, 11" } } diff --git a/test/client/address_group.test.js b/test/client/address_group.test.js index 09bfdc1..33679ef 100644 --- a/test/client/address_group.test.js +++ b/test/client/address_group.test.js @@ -1520,5 +1520,23 @@ describe('test/address_group.test.js', () => { assert(addressGroup.addressList.length === 50); assert(addressGroup._choosedSize === 50); }); + + it('弹性控制', () => { + mm(addressGroup, 'connectionPoolConfig', null); + assert(!addressGroup._needElasticControl(100)); + assert(!addressGroup._needElasticControl(10)); + + mm.restore(); + + mm(addressGroup.connectionPoolConfig, 'minAddressCount', 10); + assert(!addressGroup._needElasticControl(9)); + assert(!addressGroup._needElasticControl(2)); + + mm(addressGroup.connectionPoolConfig, 'enableThreshold', 50); + + assert(addressGroup._needElasticControl(51)); + assert(!addressGroup._needElasticControl(50)); + assert(!addressGroup._needElasticControl(49)); + }); }); }); diff --git a/test/client/elastic_control.test.js b/test/client/elastic_control.test.js index e73875d..10f1c0f 100644 --- a/test/client/elastic_control.test.js +++ b/test/client/elastic_control.test.js @@ -13,7 +13,7 @@ const logger = console; describe('test/client/elastic_control.test.js', () => { let connectionManager; let addressGroup; - const count = 10; + const count = 51; const addressList = []; @@ -52,13 +52,13 @@ describe('test/client/elastic_control.test.js', () => { assert(addressGroup.addressList.length === 2); mm(utils, 'shuffle', arr => arr); - const newAddress = urlparse('rpc://127.0.0.11:12200'); + const newAddress = urlparse('rpc://127.0.0.52:12200'); MockConnection.addAvailableAddress(newAddress); const newAddressList = [ newAddress ].concat(addressList); addressGroup.addressList = newAddressList; assert(addressGroup.addressList.length === 2); - assert(addressGroup.addressList[0].href === 'rpc://127.0.0.11:12200'); + assert(addressGroup.addressList[0].href === 'rpc://127.0.0.52:12200'); }); });