Index blocks in parallel - dramatic speed-up

This commit is contained in:
joeuhren
2020-11-22 15:00:47 -07:00
parent a66f9cb7e4
commit 91dc602cfc
5 changed files with 76 additions and 105 deletions
+72 -105
View File
@@ -10,6 +10,7 @@ var mongoose = require('mongoose')
, lib = require('./explorer')
, settings = require('./settings')
, fs = require('fs')
, async = require('async')
, poloniex = require('./markets/poloniex')
, bittrex = require('./markets/bittrex')
, bleutrade = require('./markets/bleutrade')
@@ -60,90 +61,45 @@ function find_richlist(coin, cb) {
});
}
function update_address(hash, txid, amount, type, cb) {
// Check if address exists
find_address(hash, true, function(address) {
if (address) {
// if coinbase (new coins PoW), update sent only and return cb.
if ( hash == 'coinbase' ) {
Address.updateOne({a_id:hash}, {
sent: address.sent + amount,
balance: 0,
}, function() {
return cb();
});
} else {
var received = address.received;
var sent = address.sent;
if (type == 'vin') {
sent = sent + amount;
} else {
received = received + amount;
}
Address.updateOne({a_id:hash}, {
received: received,
sent: sent,
balance: received - sent
}, function() {
// ensure tx doesnt already exist in address.txs
find_address_tx(hash, txid, function(address_tx) {
if (typeof address_tx == "undefined") {
var newAddressTx = new AddressTx({
a_id: hash,
balance: received - sent,
txid: txid
});
newAddressTx.save(function(err) {
if (err) {
return cb(err);
} else {
return cb();
}
});
} else {
AddressTx.updateOne({a_id: hash, txid: txid}, {
a_id: hash,
balance: received - sent,
txid: txid
}, function() {
return cb();
});
}
});
});
}
function update_address(hash, blockheight, txid, amount, type, cb) {
var to_sent = false;
var to_received = false;
var addr_inc = {}
if ( hash == 'coinbase' ) {
addr_inc.sent = amount;
} else {
if (type == 'vin') {
addr_inc.sent = amount;
addr_inc.balance = -amount;
} else {
//new address
if (type == 'vin') {
var newAddress = new Address({
addr_inc.received = amount;
addr_inc.balance = amount;
}
}
Address.findOneAndUpdate({a_id: hash}, {
$inc: addr_inc
}, {
new: true,
upsert: true
}, function (err, address) {
if (err) {
return cb(err);
} else {
AddressTx.findOneAndUpdate({a_id: hash, txid: txid}, {
$set: {
a_id: hash,
sent: amount,
balance: amount,
});
} else {
var newAddress = new Address({
a_id: hash,
received: amount,
balance: amount,
});
}
newAddress.save(function(err) {
balance: address.balance,
blockindex: blockheight,
txid: txid
}
}, {
new: true,
upsert: true
}, function (err,addresstx) {
if (err) {
return cb(err);
} else {
var newAddressTx = new AddressTx({
a_id: hash,
balance: amount,
txid: txid
});
newAddressTx.save(function(err) {
if (err) {
return cb(err);
} else {
return cb();
}
});
return cb();
}
});
}
@@ -168,14 +124,14 @@ function save_tx(txid, blockheight, cb) {
lib.prepare_vout(tx.vout, txid, vin, ((typeof tx.vjoinsplit === 'undefined' || tx.vjoinsplit == null) ? [] : tx.vjoinsplit), function(vout, nvin) {
lib.syncLoop(vin.length, function (loop) {
var i = loop.iteration();
update_address(nvin[i].addresses, txid, nvin[i].amount, 'vin', function(){
update_address(nvin[i].addresses, blockheight, txid, nvin[i].amount, 'vin', function(){
loop.next();
});
}, function(){
lib.syncLoop(vout.length, function (subloop) {
var t = subloop.iteration();
if (vout[t].addresses) {
update_address(vout[t].addresses, txid, vout[t].amount, 'vout', function(){
update_address(vout[t].addresses, blockheight, txid, vout[t].amount, 'vout', function(){
subloop.next();
});
} else {
@@ -498,7 +454,7 @@ module.exports = {
return cb(err);
} else {
totalCount = count;
AddressTx.find({a_id: hash}).sort({_id: 'desc'}).skip(Number(start)).limit(Number(length)).exec(function (err, address) {
AddressTx.find({a_id: hash}).sort({blockindex: 'desc'}).skip(Number(start)).limit(Number(length)).exec(function (err, address) {
if (err) {
return cb(err);
} else {
@@ -820,52 +776,63 @@ module.exports = {
} else {
create_lock("db_index", function (){
var complete = false;
lib.syncLoop((end - start) + 1, function (loop) {
var x = loop.iteration();
if (x % 5000 === 0) {
Tx.find({}).where('blockindex').lt(start + x).sort({timestamp: 'desc'}).limit(settings.index.last_txs).exec(function(err, txs){
Stats.updateOne({coin: coin}, {
last: start + x - 1,
last_txs: '' //not used anymore left to clear out existing objects
}, function() {});
});
var blocks_to_scan = [];
var task_limit_blocks = settings.block_parallel_tasks;
if (task_limit_blocks < 1) { task_limit_blocks = 1; }
var task_limit_txs = 1
for (i=start; i<(end+1); i++) {
blocks_to_scan.push(i);
}
async.eachLimit(blocks_to_scan, task_limit_blocks, function(block_height, next_block) {
if (block_height % 5000 === 0) {
Stats.updateOne({coin: coin}, {
last: block_height - 1,
last_txs: '' //not used anymore left to clear out existing objects
}, function() {});
}
lib.get_blockhash(start + x, function(blockhash){
lib.get_blockhash(block_height, function(blockhash){
if (blockhash) {
lib.get_block(blockhash, function(block) {
if (block) {
lib.syncLoop(block.tx.length, function (subloop) {
var i = subloop.iteration();
Tx.findOne({txid: block.tx[i]}, function(err, tx) {
async.eachLimit(block.tx, task_limit_txs, function(txid, next_tx) {
Tx.findOne({txid: txid}, function(err, tx) {
if(tx) {
tx = null;
subloop.next();
setTimeout( function(){
tx = null;
next_tx();
}, timeout);
} else {
save_tx(block.tx[i], block.height, function(err){
save_tx(txid, block_height, function(err){
if (err) {
console.log(err);
} else {
console.log('%s: %s', block.height, block.tx[i]);
console.log('%s: %s', block_height, txid);
}
setTimeout( function(){
tx = null;
subloop.next();
next_tx();
}, timeout);
});
}
});
}, function(){
blockhash = null;
block = null;
loop.next();
setTimeout( function(){
blockhash = null;
block = null;
next_block();
}, timeout);
});
} else {
console.log('block not found: %s', blockhash);
loop.next();
setTimeout( function(){
next_block();
}, timeout);
}
});
} else {
loop.next();
setTimeout( function(){
next_block();
}, timeout);
}
});
}, function(){