Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #35 from LiskHQ/26-exclusive_sync_tasks
Browse files Browse the repository at this point in the history
Ensure forge and sync process exclusiveness - Closes #26
  • Loading branch information
Oliver Beddows authored Jan 23, 2018
2 parents a504361 + 957e514 commit d305c88
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 42 deletions.
2 changes: 0 additions & 2 deletions logic/transactionPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,6 @@ TransactionPool.prototype.expireTransactions = function (cb) {
* @returns {setImmediateCallback|applyUnconfirmedList}
*/
TransactionPool.prototype.fillPool = function (cb) {
if (modules.loader.syncing()) { return setImmediate(cb); }

var unconfirmedCount = self.countUnconfirmed();
library.logger.debug('Transaction pool size: ' + unconfirmedCount);

Expand Down
77 changes: 38 additions & 39 deletions modules/delegates.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ __private.loaded = false;
__private.blockReward = new BlockReward();
__private.keypairs = {};
__private.tmpKeypairs = {};
__private.forgeInterval = 1000;

/**
* Initializes library with scope content and generates a Delegate instance.
Expand Down Expand Up @@ -61,7 +62,7 @@ function Delegates (cb, scope) {
self = this;

__private.assetTypes[transactionTypes.DELEGATE] = library.logic.transaction.attachAssetType(
transactionTypes.DELEGATE,
transactionTypes.DELEGATE,
new Delegate(
scope.logger,
scope.schema
Expand All @@ -76,7 +77,7 @@ function Delegates (cb, scope) {
* Gets delegate public keys sorted by vote descending.
* @private
* @param {function} cb - Callback function.
* @returns {setImmediateCallback}
* @returns {setImmediateCallback}
*/
__private.getKeysSortByVote = function (cb) {
modules.accounts.getAccounts({
Expand All @@ -97,7 +98,7 @@ __private.getKeysSortByVote = function (cb) {
* Gets delegate public keys from previous round, sorted by vote descending.
* @private
* @param {function} cb - Callback function.
* @returns {setImmediateCallback}
* @returns {setImmediateCallback}
*/
__private.getDelegatesFromPreviousRound = function (cb) {
library.db.query(sql.getDelegatesSnapshot, {limit: slots.delegates}).then(function (rows) {
Expand Down Expand Up @@ -169,11 +170,11 @@ __private.getBlockSlotData = function (slot, height, cb) {
};

/**
* Gets peers, checks consensus and generates new block, once delegates
* Gets peers, checks consensus and generates new block, once delegates
* are enabled, client is ready to forge and is the correct slot.
* @private
* @param {function} cb - Callback function.
* @returns {setImmediateCallback}
* @returns {setImmediateCallback}
*/
__private.forge = function (cb) {
if (!Object.keys(__private.keypairs).length) {
Expand Down Expand Up @@ -207,26 +208,20 @@ __private.forge = function (cb) {
return setImmediate(cb);
}

library.sequence.add(function (cb) {
async.series({
getPeers: function (seriesCb) {
return modules.transport.getPeers({limit: constants.maxPeers}, seriesCb);
},
checkBroadhash: function (seriesCb) {
if (modules.transport.poorConsensus()) {
return setImmediate(seriesCb, ['Inadequate broadhash consensus', modules.transport.consensus(), '%'].join(' '));
} else {
return setImmediate(seriesCb);
}
}
}, function (err) {
if (err) {
library.logger.warn(err);
return setImmediate(cb, err);
async.series({
getPeers: function (seriesCb) {
return modules.transport.getPeers({limit: constants.maxPeers}, seriesCb);
},
checkBroadhash: function (seriesCb) {
if (modules.transport.poorConsensus()) {
return setImmediate(seriesCb, ['Inadequate broadhash consensus', modules.transport.consensus(), '%'].join(' '));
} else {
return modules.blocks.process.generateBlock(currentBlockData.keypair, currentBlockData.time, cb);
return setImmediate(seriesCb);
}
});
},
generateBlock: function (seriesCb) {
return modules.blocks.process.generateBlock(currentBlockData.keypair, currentBlockData.time, seriesCb);
}
}, function (err) {
if (err) {
library.logger.error('Failed to generate block within delegate slot', err);
Expand Down Expand Up @@ -338,7 +333,7 @@ __private.checkDelegates = function (publicKey, votes, state, cb) {
* Loads delegates from config and stores in private `keypairs`.
* @private
* @param {function} cb - Callback function.
* @returns {setImmediateCallback}
* @returns {setImmediateCallback}
*/
__private.loadDelegates = function (cb) {
var secrets;
Expand Down Expand Up @@ -584,29 +579,33 @@ Delegates.prototype.onBind = function (scope) {
);
};

__private.nextForge = function (sequenceCb) {
async.series([
__private.forge,
modules.transactions.fillPool
], sequenceCb);
};

__private.logLoadDelegatesError = function (err, cb) {
library.logger.error('Failed to load delegates', err);
return setImmediate(cb);
};

/**
* Loads delegates.
* @implements module:transactions#Transactions~fillPool
*/
Delegates.prototype.onBlockchainReady = function () {
__private.loaded = true;

__private.loadDelegates(function (err) {

function nextForge (cb) {
if (err) {
library.logger.error('Failed to load delegates', err);
}

async.series([
__private.forge,
modules.transactions.fillPool
], function () {
return setImmediate(cb);
});
if (err) {
jobsQueue.register('logLoadDelegatesError', function (jobsQueueCb) {
__private.logLoadDelegatesError(err, jobsQueueCb);
}, __private.forgeInterval);
}

jobsQueue.register('delegatesNextForge', nextForge, 1000);
jobsQueue.register('delegatesNextForge', function (jobsQueueCb) {
library.sequence.add(__private.nextForge, jobsQueueCb);
}, __private.forgeInterval);
});
};

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"z-schema": "=3.18.2"
},
"devDependencies": {
"rewire": "=2.5.2",
"bitcore-mnemonic": "=1.1.1",
"bluebird": "=3.5.0",
"browserify-bignum": "=1.3.0-2",
Expand All @@ -77,6 +76,8 @@
"lisk-js": "=0.5.0",
"mocha": "=3.2.0",
"moment": "=2.17.1",
"rewire": "=2.5.2",
"rx-node": "=1.0.2",
"sinon": "=1.17.7",
"supertest": "=3.0.0"
}
Expand Down
97 changes: 97 additions & 0 deletions test/system/synchronousTasks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
'use strict';

var expect = require('chai').expect;
var Rx = require('rx');
var application = require('./../common/application');

describe('synchronousTasks', function () {

var library;

before('init sandboxed application', function (done) {
application.init({sandbox: {name: 'lisk_test_synchronous_tasks'}}, function (scope) {
library = scope;
done();
});
});

after('cleanup sandboxed application', function (done) {
application.cleanup(done);
});

describe('when events are emitted after any of synchronous task starts', function () {

var intervalMs;
var durationMs;
var attemptToForgeRunningSubject;
var synchronizeBlockchainRunningSubject;

var synchronousTaskMock = function (isTaskRunningSubject, nextCb) {
isTaskRunningSubject.onNext(true);
setTimeout(function () {
isTaskRunningSubject.onNext(false);
nextCb();
}, durationMs);
};

before(function () {
attemptToForgeRunningSubject = new Rx.BehaviorSubject();
synchronizeBlockchainRunningSubject = new Rx.BehaviorSubject();
});

after(function () {
attemptToForgeRunningSubject.dispose();
synchronizeBlockchainRunningSubject.dispose();
});

describe('when "attempt to forge" synchronous task runs every 100 ms and takes 101 ms', function () {

intervalMs = 100;
durationMs = intervalMs + 1;

before(function () {
library.modules.delegates.onBlockchainReady = library.rewiredModules.delegates.prototype.onBlockchainReady;
library.rewiredModules.delegates.__set__('__private.forgeInterval', intervalMs);
library.rewiredModules.delegates.__set__('__private.nextForge', synchronousTaskMock.bind(null, attemptToForgeRunningSubject));
library.modules.delegates.onBlockchainReady();
});

describe('when "blockchain synchronization" synchronous task runs every 100 ms and takes 101 ms', function () {

before(function () {
library.rewiredModules.loader.__set__('__private.syncInterval', intervalMs);
library.rewiredModules.loader.__set__('__private.sync', synchronousTaskMock.bind(null, synchronizeBlockchainRunningSubject));
var jobsQueue = require('../../helpers/jobsQueue');
var originalLoaderSyncTimerJob = jobsQueue.jobs['loaderSyncTimer'];
clearTimeout(originalLoaderSyncTimerJob); // Terminate original job
jobsQueue.jobs['loaderSyncTimer'] = null; // Remove original job
library.modules.loader.onPeersReady(); // Execute the mocked blockchain synchronization process
});

describe('within 5000 ms', function () {

beforeEach(function () {
setTimeout(function () {
attemptToForgeRunningSubject.onCompleted();
synchronizeBlockchainRunningSubject.onCompleted();
attemptToForgeRunningSubject = new Rx.BehaviorSubject();
synchronizeBlockchainRunningSubject = new Rx.BehaviorSubject();
}, 5000);
});

it('"attempt to forge" task should never start when "blockchain synchronization" task is running', function (done) {
attemptToForgeRunningSubject
.filter(function (isStarting) {return isStarting;})
.subscribe(function () {expect(synchronizeBlockchainRunningSubject.getValue()).to.be.false;}, done, done);
});

it('"blockchain synchronization" task should never start when "attempt to forge" task is running', function (done) {
synchronizeBlockchainRunningSubject
.filter(function (isStarting) {return isStarting;})
.subscribe(function () {expect(attemptToForgeRunningSubject.getValue()).to.be.false;}, done, done);
});
});
});
});
});
});

0 comments on commit d305c88

Please sign in to comment.