Skip to content

Commit

Permalink
refactor: enhance elastic control of connections (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxcsoccer authored Feb 26, 2019
1 parent 4373f2c commit 61142bb
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 12 deletions.
33 changes: 30 additions & 3 deletions lib/client/address_group.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,28 @@ class AddressGroup extends Base {
}

_refresh() {
this.addressList = this._allAddressList;
if (this._allAddressList) {
this.addressList = this._allAddressList;
}
}

_chooseAddresses(addressList) {
const newSet = new Set();
const oldSet = new Set();

// 只有地址真正改变的时候才 shuffle 一次
if (this._allAddressList !== addressList) {
// 找出推送过来新增加的地址(扩容的)
if (this._allAddressList) {
for (const addr of this._allAddressList) {
oldSet.add(addr.host);
}
}
for (const addr of addressList) {
if (!oldSet.has(addr.host)) {
newSet.add(addr.host);
}
}
this._allAddressList = utils.shuffle(addressList);
}
this._totalSize = this._allAddressList.length;
Expand All @@ -178,7 +194,12 @@ class AddressGroup extends Base {
const connections = this.connectionManager.connections;
for (const address of this._allAddressList) {
const unHealthy = this._faultAddressMap.has(address.host) || this._weightMap.get(address.host) < DEFAULT_WEIGHT;
if (connections.has(address.host) && !unHealthy && leftCount) {
const isHealthy = connections.has(address.host) && !unHealthy;
// 需求:新推送的地址或已经连接上的地址,优先被选中
//
// 因为当客户端集群很大,服务端很小的场景,客户端这边的分摊到一台机器的 qps 可能不高,
// 但是服务端确可能很高,这个时候服务端扩容,如果按照老的逻辑,客户端这边认为不需要扩容,则不会去尝试连接扩容的机器(连上的优先)
if ((isHealthy || newSet.has(address.host)) && leftCount > 0) {
choosedAddressList.push(address);
leftCount--;
} else if (unHealthy) {
Expand Down Expand Up @@ -322,6 +343,7 @@ class AddressGroup extends Base {
.then(conn => {
if (conn) {
this._faultAddressMap.delete(address.host);

} else if (this._weightMap.has(address.host)) {
this._weightMap.set(address.host, 0);
this._faultAddressMap.set(address.host, address);
Expand Down Expand Up @@ -392,7 +414,12 @@ class AddressGroup extends Base {
}

function printAddresses(addressList) {
return '\n' + addressList.map(addr => ' - ' + addr.href).join('\n');
let list = addressList.map(addr => ' - ' + addr.href);
if (list.length > 20) {
list = list.slice(0, 20);
list.push('... only 20 first addresses will be shown here!');
}
return '\n' + list.join('\n');
}

module.exports = AddressGroup;
13 changes: 9 additions & 4 deletions lib/client/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,8 @@ class RpcConsumer extends Base {
}
return res.appResponse;
} catch (err) {
if (req.meta.resultCode === '00') {
req.meta.resultCode = err.resultCode || '01';
}
this._wrapError(err, req);
if (this.options.errorAsNull !== true) throw err;
this.logger.warn(err);
return null;
} finally {
if (req.meta.connectionGroup) {
Expand All @@ -146,6 +143,14 @@ class RpcConsumer extends Base {
}
}

_wrapError(err, req) {
if (req.meta.resultCode === '00') {
req.meta.resultCode = err.resultCode || '01';
}
err.resultCode = req.meta.resultCode;
req.meta.error = err;
}

parseUrl(url) {
assert(typeof url === 'string', 'parseUrl(url) url should be string');
const address = urlparse(url.indexOf('://') >= 0 ? url : `bolt://${url}`, true);
Expand Down
1 change: 1 addition & 0 deletions lib/client/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class RpcRequest {
responseDecodeRT: 0,
resSize: 0,
rt: null,
error: null,
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/server/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class RpcConnection extends Base {
}

_handleSocketError(err) {
// antvip | vipserver 心跳检查可能频繁的建连和断连,所以如果是 ECONNRESET 就忽略,避免打印很多无用的日志
// 心跳检查可能频繁的建连和断连,所以如果是 ECONNRESET 就忽略,避免打印很多无用的日志
if (err.code !== 'ECONNRESET') {
this.logger.warn('[RpcConnection] error occured on socket: %s, errName: %s, errMsg: %s', this.remoteAddress, err.name, err.message);
}
Expand Down
10 changes: 7 additions & 3 deletions lib/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const defaultOptions = {
protocol: require('sofa-bolt-node'),
serviceClass: RpcService,
responseClass: RpcResponse,
connectionClass: RpcConnection,
idleTime: 5000,
killTimeout: 30000,
maxIdleTime: 90 * 1000,
Expand Down Expand Up @@ -57,6 +58,9 @@ class RpcServer extends Base {
this.proto = options.proto;
this.publishAddress = this.options.publishAddress || this._localIp;
this.publishPort = this.options.port;
this.serviceClass = this.options.serviceClass;
this.responseClass = this.options.responseClass;
this.connectionClass = this.options.connectionClass;
}

// 给单元测试用
Expand Down Expand Up @@ -137,7 +141,7 @@ class RpcServer extends Base {
if (typeof info === 'string') {
info = { interfaceName: info };
}
const service = new this.options.serviceClass(Object.assign({
const service = new this.serviceClass(Object.assign({
registry: this.registry,
logger: this.logger,
group: this.options.group,
Expand Down Expand Up @@ -244,7 +248,7 @@ class RpcServer extends Base {
// 每一个 connection 实例化一个 classCache
options.classCache = new this.options.classCacheClass();
}
const conn = new RpcConnection(options);
const conn = new this.connectionClass(options);
const key = conn.remoteAddress;
this._connections.set(key, conn);
conn.on('request', req => {
Expand All @@ -262,7 +266,7 @@ class RpcServer extends Base {
const id = req.data.serverSignature;
req.data.interfaceName = req.data.interfaceName || req.data.serverSignature.split(':')[0];
const service = this._services.get(id);
const res = new this.options.responseClass(req, conn);
const res = new this.responseClass(req, conn);
const ctx = this.createContext(req, res);
this.emit('request', { req, ctx });
try {
Expand Down
2 changes: 1 addition & 1 deletion test/client/address_group.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ describe('test/address_group.test.js', () => {
}

addressGroup = new AddressGroup({
key: 'com.alipay.cif.user.UserInfoQueryService:1.0@SOFA@rz00a',
key: 'com.alipay.TestQueryService:1.0@SOFA@xxxx',
logger,
connectionManager,
connectionClass: MockConnection,
Expand Down
64 changes: 64 additions & 0 deletions test/client/elastic_control.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
'use strict';

const mm = require('mm');
const assert = require('assert');
const urlparse = require('url').parse;
const utils = require('../../lib/client/utils');
const MockConnection = require('../fixtures/mock_connection');
const AddressGroup = require('../../lib/client/address_group');
const ConnectionManager = require('../../lib/client/connection_mgr');

const logger = console;

describe('test/client/elastic_control.test.js', () => {
let connectionManager;
let addressGroup;
const count = 10;

const addressList = [];

before(async () => {
connectionManager = new ConnectionManager({ logger });
await connectionManager.ready();

for (let i = 0; i < count; i++) {
const address = urlparse(`tr://127.0.0.${i}:12200`);
addressList.push(address);
MockConnection.addAvailableAddress(address);
}

addressGroup = new AddressGroup({
key: 'xxx',
logger,
connectionManager,
connectionClass: MockConnection,
retryFaultInterval: 5000,
});
addressGroup.connectionPoolSize = 2;
addressGroup.addressList = addressList;

await addressGroup.ready();
});

afterEach(mm.restore);

after(async () => {
MockConnection.clearAvailableAddress();
addressGroup.close();
await connectionManager.closeAllConnections();
});

it('should use new address', () => {
assert(addressGroup.addressList.length === 2);
mm(utils, 'shuffle', arr => arr);

const newAddress = urlparse('rpc://127.0.0.11:12200');
MockConnection.addAvailableAddress(newAddress);

const newAddressList = [ newAddress ].concat(addressList);
addressGroup.addressList = newAddressList;

assert(addressGroup.addressList.length === 2);
assert(addressGroup.addressList[0].href === 'rpc://127.0.0.11:12200');
});
});

0 comments on commit 61142bb

Please sign in to comment.