Skip to content

Commit

Permalink
make channel promise more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
vita-dounai committed Sep 20, 2019
1 parent 14ea949 commit 385ec32
Showing 1 changed file with 104 additions and 30 deletions.
134 changes: 104 additions & 30 deletions packages/caliper-fisco-bcos/lib/channelPromise.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,46 @@

'use strict';

const CaliperUtils = require('caliper-core').CaliperUtils;
const tls = require('tls');
const fs = require('fs');
const net = require('net');
const uuidv4 = require('uuid/v4');
const events = require('events');
const Color = require('./constant').Color;
const commLogger = CaliperUtils.getLogger('fiscoBcosApi.js');
const { NetworkError } = require('./exceptions').NetworkError;

let sockets = new Map();
let emitters = new Map();
let buffers = new Map();
let sockets = new Map();
let lastBytesRead = new Map();
/**
* Parse response returned by node
* @param {Buffer} response Node's response
*/

function parseResponse(response) {
let seq = response.slice(6, 38).toString();
let result = JSON.parse(response.slice(42).toString());
let emitter = emitters.get(seq).emitter;

if (emitter) {
let readOnly = Object.getOwnPropertyDescriptor(emitter, 'readOnly').value;
if (readOnly) {
if (result.error || result.result !== undefined ) {
emitter.emit('gotresult', result);
}
} else {
if (result.error || result.status || (result.result && result.result.status)) {
emitter.emit('gotresult', result);
} else {
if (!result.result) {
throw new NetworkError(`unknown message receieved, seq=${seq}, data=${response.toString()}`);
}
}
}
} else {
throw new NetworkError(`unknown owner message receieved, seq=${seq}, data=${response.toString()}`);
}
}

/**
* Create a new TLS socket
Expand Down Expand Up @@ -57,6 +86,10 @@ function createNewSocket(ip, port, authentication) {
tlsSocket.on('error', function (error) {
throw new Error(error);
});

let socketID = `${ip}:${port}`;

lastBytesRead.set(socketID, 0);

tlsSocket.on('data', function (data) {
let response = null;
Expand All @@ -67,22 +100,35 @@ function createNewSocket(ip, port, authentication) {
response = Buffer.from(data, 'ascii');
}

let seq = response.slice(6, 38).toString();
let result = JSON.parse(response.slice(42).toString());

if (result.error || result.status || (result.result && result.result.status)) {
let emitter = emitters.get(seq).emitter;
if (emitter) {
emitter.emit('gotconsensus', result);
clearTimeout(emitters.get(seq).timer);
emitters.delete(seq);
if (!buffers.has(socketID)) {
// First time to read data from this socket
let expectedLength = null;
if (tlsSocket.bytesRead - lastBytesRead.get(socketID) >= 4) {
expectedLength = response.readUIntBE(0, 4);
}
else {
commLogger.error(Color.error(`Unknown owner message receieved, seq=${seq}, data=${data}`));

if (!expectedLength || tlsSocket.bytesRead < lastBytesRead.get(socketID) + expectedLength) {
buffers.set(socketID, {
expectedLength: expectedLength,
buffer: response
});
} else {
lastBytesRead.set(socketID, lastBytesRead.get(socketID) + expectedLength);
parseResponse(response);
buffers.delete(socketID);
}
} else {
if (!result.result) {
commLogger.error(Color.error(`Unknown message receieved, seq=${seq}, data=${data}`));
// Multiple reading
let cache = buffers.get(socketID);
cache.buffer = Buffer.concat([cache.buffer, response]);
if (!cache.expectedLength && tlsSocket.bytesRead - lastBytesRead.get(socketID) >= 4) {
cache.expectedLength = cache.buffer.readUIntBE(0, 4);
}

if (cache.expectedLength && tlsSocket.bytesRead - lastBytesRead.get(socketID) >= cache.expectedLength) {
lastBytesRead.set(socketID, lastBytesRead.get(socketID) + cache.expectedLength);
parseResponse(buffers.get(socketID).buffer);
buffers.delete(socketID);
}
}
});
Expand Down Expand Up @@ -115,36 +161,64 @@ function packageData(data) {
};
}

/**
* Clear context when a message got response or timeout
* @param {Socket} socket The socket who sends the message
*/
function clearContext(uuid) {
clearTimeout(emitters.get(uuid).timer);
emitters.delete(uuid);
buffers.delete(uuid);
}

/**
* Return channel promise for a request
* @param {Object} node A JSON object which contains IP and port configuration of channel server
* @param {Object} authentication A JSON object contains certificate file path, private key file path and CA file path
* @param {String} data JSON string of load
* @param {Number} timeout Timeout to wait response
* @return {Promise} a promise which will be resovled when the request is satisfied
* @return {Promise} a promise which will be resolved when the request is satisfied
*/
function channelPromise(node, authentication, data, timeout) {
function channelPromise(node, authentication, data, timeout, readOnly = false) {
let ip = node.ip;
let port = node.channelPort;
let nodeKey = `${ip}:${port}`;
if (!sockets.has(nodeKey)) {
let tlsSocket = createNewSocket(ip, port, authentication);
sockets.set(nodeKey, tlsSocket);
let port = node.port;

let connectionID = `${ip}${port}`;
if (!sockets.has(connectionID)) {
let newSocket = createNewSocket(ip, port, authentication);
newSocket.unref();
sockets.set(connectionID, newSocket);
}
let tlsSocket = sockets.get(connectionID);

let socket = sockets.get(nodeKey);
let dataPackage = packageData(JSON.stringify(data));
let uuid = dataPackage.uuid;

tlsSocket.socketID = uuid;
let packagedData = dataPackage.packagedData;
let channelPromise = new Promise(async (resolve) => {
let channelPromise = new Promise(async (resolve, reject) => {
let eventEmitter = new events.EventEmitter();
Object.defineProperty(eventEmitter, "readOnly", {
value: readOnly,
writable: false,
configurable: false,
enumerable: false
});

eventEmitter.on('gotconsensus', (result) => {
resolve(result);
eventEmitter.on('gotresult', (result) => {
clearContext(uuid);
if (result.error) {
reject(result);
} else {
resolve(result);
}
return; // This `return` is not necessary, but it may can avoid future trap
});

eventEmitter.on('timeout', () => {
resolve({ 'error': 'timeout' });
clearContext(uuid);
reject({ 'error': 'timeout' });
return; // This `return` is not necessary, but it may can avoid future trap
});

emitters.set(uuid, {
Expand All @@ -154,7 +228,7 @@ function channelPromise(node, authentication, data, timeout) {
}, timeout)
});

socket.write(packagedData);
tlsSocket.write(packagedData);
});
return channelPromise;
}
Expand Down

0 comments on commit 385ec32

Please sign in to comment.