Skip to content

Commit

Permalink
Better handling of stop/kill sync processes
Browse files Browse the repository at this point in the history
-The SIGINT and SIGTERM signals are now being caught and handled in the sync.js file so that most options for syncing blocks, markets, peers, masternodes, etc. are now being gracefully shut down instead of killed in the middle of the process. This should help prevent data anomalies when you need to stop or kill a sync process
-The update_tx_db function was moved from the database.js file into the sync.js file so that block syncs can now be gracefully stopped. The update_tx_db function was also copied to the benchmark.js
-The save_tx function was moved into the module.exports for the database.js file so that it can now be called from outside the database.js file
  • Loading branch information
joeuhren committed Jul 17, 2022
1 parent bfbc50b commit 1178038
Show file tree
Hide file tree
Showing 3 changed files with 410 additions and 220 deletions.
241 changes: 77 additions & 164 deletions lib/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ var mongoose = require('mongoose'),
settings = require('./settings'),
locale = require('./locale'),
fs = require('fs'),
coingecko = require('./apis/coingecko'),
async = require('async');
coingecko = require('./apis/coingecko');

function find_address(hash, caseSensitive, cb) {
if (caseSensitive) {
Expand Down Expand Up @@ -114,82 +113,6 @@ function find_tx(txid, cb) {
});
}

function save_tx(txid, blockheight, cb) {
lib.get_rawtransaction(txid, function(tx) {
if (tx && tx != 'There was an error. Check your console.') {
lib.prepare_vin(tx, function(vin, tx_type_vin) {
lib.prepare_vout(tx.vout, txid, vin, ((!settings.blockchain_specific.zksnarks.enabled || typeof tx.vjoinsplit === 'undefined' || tx.vjoinsplit == null) ? [] : tx.vjoinsplit), function(vout, nvin, tx_type_vout) {
lib.syncLoop(vin.length, function (loop) {
var i = loop.iteration();

// check if address is inside an array
if (Array.isArray(nvin[i].addresses)) {
// extract the address
nvin[i].addresses = nvin[i].addresses[0];
}

update_address(nvin[i].addresses, blockheight, txid, nvin[i].amount, 'vin', function() {
loop.next();
});
}, function() {
lib.syncLoop(vout.length, function (subloop) {
var t = subloop.iteration();

// check if address is inside an array
if (Array.isArray(vout[t].addresses)) {
// extract the address
vout[t].addresses = vout[t].addresses[0];
}

if (vout[t].addresses) {
update_address(vout[t].addresses, blockheight, txid, vout[t].amount, 'vout', function() {
subloop.next();
});
} else
subloop.next();
}, function() {
lib.calculate_total(vout, function(total) {
var op_return = null;
// check if the op_return value should be decoded and saved
if (settings.transaction_page.show_op_return) {
// loop through vout to find the op_return value
tx.vout.forEach(function (vout_data) {
// check if the op_return value exists
if (vout_data.scriptPubKey != null && vout_data.scriptPubKey.asm != null && vout_data.scriptPubKey.asm.indexOf('OP_RETURN') > -1) {
// decode the op_return value
op_return = hex_to_ascii(vout_data.scriptPubKey.asm.replace('OP_RETURN', '').trim());
}
});
}

var newTx = new Tx({
txid: tx.txid,
vin: nvin,
vout: vout,
total: total.toFixed(8),
timestamp: tx.time,
blockhash: tx.blockhash,
blockindex: blockheight,
tx_type: (tx_type_vout == null ? tx_type_vin : tx_type_vout),
op_return: op_return
});

newTx.save(function(err) {
if (err)
return cb(err, false);
else
return cb(null, vout.length > 0);
});
});
});
});
});
});
} else
return cb('tx not found: ' + txid, false);
});
}

function get_market_data(market, coin_symbol, pair_symbol, cb) {
if (fs.existsSync('./lib/markets/' + market + '.js')) {
exMarket = require('./markets/' + market);
Expand Down Expand Up @@ -1156,92 +1079,6 @@ module.exports = {
});
},

// updates tx, address & richlist db's; called by sync.js
update_tx_db: function(coin, start, end, txes, timeout, check_only, cb) {
var complete = false;
var blocks_to_scan = [];
var task_limit_blocks = settings.sync.block_parallel_tasks;
var task_limit_txs = 1;

// fix for invalid block height (skip genesis block as it should not have valid txs)
if (typeof start === 'undefined' || start < 1)
start = 1;

if (task_limit_blocks < 1)
task_limit_blocks = 1;

for (i = start; i < (end + 1); i++)
blocks_to_scan.push(i);

async.eachLimit(blocks_to_scan, task_limit_blocks, function(block_height, next_block) {
if (!check_only && block_height % settings.sync.save_stats_after_sync_blocks === 0) {
Stats.updateOne({coin: coin}, {
last: block_height - 1,
txes: txes
}, function() {});
} else if (check_only) {
console.log('Checking block ' + block_height + '...');
}

lib.get_blockhash(block_height, function(blockhash) {
if (blockhash) {
lib.get_block(blockhash, function(block) {
if (block) {
async.eachLimit(block.tx, task_limit_txs, function(txid, next_tx) {
Tx.findOne({txid: txid}, function(err, tx) {
if (tx) {
setTimeout( function() {
tx = null;
next_tx();
}, timeout);
} else {
save_tx(txid, block_height, function(err, tx_has_vout) {
if (err)
console.log(err);
else
console.log('%s: %s', block_height, txid);

if (tx_has_vout)
txes++;

setTimeout( function() {
tx = null;
next_tx();
}, timeout);
});
}
});
}, function() {
setTimeout( function() {
blockhash = null;
block = null;
next_block();
}, timeout);
});
} else {
console.log('Block not found: %s', blockhash);

setTimeout( function() {
next_block();
}, timeout);
}
});
} else {
setTimeout( function() {
next_block();
}, timeout);
}
});
}, function() {
Stats.updateOne({coin: coin}, {
last: end,
txes: txes
}, function() {
return cb();
});
});
},

create_peer: function(params, cb) {
var newPeer = new Peers(params);

Expand Down Expand Up @@ -1667,5 +1504,81 @@ module.exports = {
}
},

save_tx: function(txid, blockheight, cb) {
lib.get_rawtransaction(txid, function(tx) {
if (tx && tx != 'There was an error. Check your console.') {
lib.prepare_vin(tx, function(vin, tx_type_vin) {
lib.prepare_vout(tx.vout, txid, vin, ((!settings.blockchain_specific.zksnarks.enabled || typeof tx.vjoinsplit === 'undefined' || tx.vjoinsplit == null) ? [] : tx.vjoinsplit), function(vout, nvin, tx_type_vout) {
lib.syncLoop(vin.length, function (loop) {
var i = loop.iteration();

// check if address is inside an array
if (Array.isArray(nvin[i].addresses)) {
// extract the address
nvin[i].addresses = nvin[i].addresses[0];
}

update_address(nvin[i].addresses, blockheight, txid, nvin[i].amount, 'vin', function() {
loop.next();
});
}, function() {
lib.syncLoop(vout.length, function (subloop) {
var t = subloop.iteration();

// check if address is inside an array
if (Array.isArray(vout[t].addresses)) {
// extract the address
vout[t].addresses = vout[t].addresses[0];
}

if (vout[t].addresses) {
update_address(vout[t].addresses, blockheight, txid, vout[t].amount, 'vout', function() {
subloop.next();
});
} else
subloop.next();
}, function() {
lib.calculate_total(vout, function(total) {
var op_return = null;
// check if the op_return value should be decoded and saved
if (settings.transaction_page.show_op_return) {
// loop through vout to find the op_return value
tx.vout.forEach(function (vout_data) {
// check if the op_return value exists
if (vout_data.scriptPubKey != null && vout_data.scriptPubKey.asm != null && vout_data.scriptPubKey.asm.indexOf('OP_RETURN') > -1) {
// decode the op_return value
op_return = hex_to_ascii(vout_data.scriptPubKey.asm.replace('OP_RETURN', '').trim());
}
});
}

var newTx = new Tx({
txid: tx.txid,
vin: nvin,
vout: vout,
total: total.toFixed(8),
timestamp: tx.time,
blockhash: tx.blockhash,
blockindex: blockheight,
tx_type: (tx_type_vout == null ? tx_type_vin : tx_type_vout),
op_return: op_return
});

newTx.save(function(err) {
if (err)
return cb(err, false);
else
return cb(null, vout.length > 0);
});
});
});
});
});
});
} else
return cb('tx not found: ' + txid, false);
});
},

fs: fs
};
93 changes: 91 additions & 2 deletions scripts/benchmark.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ var mongoose = require('mongoose'),
db = require('../lib/database'),
Tx = require('../models/tx'),
Address = require('../models/address'),
settings = require('../lib/settings');
settings = require('../lib/settings'),
lib = require('../lib/explorer'),
Stats = require('../models/stats'),
async = require('async');

var COUNT = 5000; // number of blocks to index

Expand All @@ -27,7 +30,93 @@ mongoose.connect(dbString, function(err) {
Address.deleteMany({}, function(err2) {
var s_timer = new Date().getTime();

db.update_tx_db(settings.coin.name, 1, COUNT, 0, settings.sync.update_timeout, false, function() {
// updates tx, address & richlist db's
function update_tx_db(coin, start, end, txes, timeout, check_only, cb) {
var complete = false;
var blocks_to_scan = [];
var task_limit_blocks = settings.sync.block_parallel_tasks;
var task_limit_txs = 1;

// fix for invalid block height (skip genesis block as it should not have valid txs)
if (typeof start === 'undefined' || start < 1)
start = 1;

if (task_limit_blocks < 1)
task_limit_blocks = 1;

for (i = start; i < (end + 1); i++)
blocks_to_scan.push(i);

async.eachLimit(blocks_to_scan, task_limit_blocks, function(block_height, next_block) {
if (!check_only && block_height % settings.sync.save_stats_after_sync_blocks === 0) {
Stats.updateOne({coin: coin}, {
last: block_height - 1,
txes: txes
}, function() {});
} else if (check_only) {
console.log('Checking block ' + block_height + '...');
}

lib.get_blockhash(block_height, function(blockhash) {
if (blockhash) {
lib.get_block(blockhash, function(block) {
if (block) {
async.eachLimit(block.tx, task_limit_txs, function(txid, next_tx) {
Tx.findOne({txid: txid}, function(err, tx) {
if (tx) {
setTimeout( function() {
tx = null;
next_tx();
}, timeout);
} else {
db.save_tx(txid, block_height, function(err, tx_has_vout) {
if (err)
console.log(err);
else
console.log('%s: %s', block_height, txid);

if (tx_has_vout)
txes++;

setTimeout( function() {
tx = null;
next_tx();
}, timeout);
});
}
});
}, function() {
setTimeout( function() {
blockhash = null;
block = null;
next_block();
}, timeout);
});
} else {
console.log('Block not found: %s', blockhash);

setTimeout( function() {
next_block();
}, timeout);
}
});
} else {
setTimeout( function() {
next_block();
}, timeout);
}
});
}, function() {
Stats.updateOne({coin: coin}, {
last: end,
txes: txes
}, function() {
return cb();
});
});
}

update_tx_db(settings.coin.name, 1, COUNT, 0, settings.sync.update_timeout, false, function() {
var e_timer = new Date().getTime();

Tx.countDocuments({}, function(txerr, txcount) {
Expand Down
Loading

0 comments on commit 1178038

Please sign in to comment.