From 91dc602cfc0415bef101df43def9ba962068ae8d Mon Sep 17 00:00:00 2001 From: joeuhren <46763106+joeuhren@users.noreply.github.com> Date: Sun, 22 Nov 2020 15:00:47 -0700 Subject: [PATCH] Index blocks in parallel - dramatic speed-up --- lib/database.js | 177 +++++++++++++++++------------------------ lib/settings.js | 1 + models/addresstx.js | 1 + package.json | 1 + settings.json.template | 1 + 5 files changed, 76 insertions(+), 105 deletions(-) diff --git a/lib/database.js b/lib/database.js index c899b3d..6b196af 100644 --- a/lib/database.js +++ b/lib/database.js @@ -10,6 +10,7 @@ var mongoose = require('mongoose') , lib = require('./explorer') , settings = require('./settings') , fs = require('fs') + , async = require('async') , poloniex = require('./markets/poloniex') , bittrex = require('./markets/bittrex') , bleutrade = require('./markets/bleutrade') @@ -60,90 +61,45 @@ function find_richlist(coin, cb) { }); } -function update_address(hash, txid, amount, type, cb) { - // Check if address exists - find_address(hash, true, function(address) { - if (address) { - // if coinbase (new coins PoW), update sent only and return cb. - if ( hash == 'coinbase' ) { - Address.updateOne({a_id:hash}, { - sent: address.sent + amount, - balance: 0, - }, function() { - return cb(); - }); - } else { - var received = address.received; - var sent = address.sent; - if (type == 'vin') { - sent = sent + amount; - } else { - received = received + amount; - } - Address.updateOne({a_id:hash}, { - received: received, - sent: sent, - balance: received - sent - }, function() { - // ensure tx doesnt already exist in address.txs - find_address_tx(hash, txid, function(address_tx) { - if (typeof address_tx == "undefined") { - var newAddressTx = new AddressTx({ - a_id: hash, - balance: received - sent, - txid: txid - }); - newAddressTx.save(function(err) { - if (err) { - return cb(err); - } else { - return cb(); - } - }); - } else { - AddressTx.updateOne({a_id: hash, txid: txid}, { - a_id: hash, - balance: received - sent, - txid: txid - }, function() { - return cb(); - }); - } - }); - }); - } +function update_address(hash, blockheight, txid, amount, type, cb) { + var to_sent = false; + var to_received = false; + var addr_inc = {} + if ( hash == 'coinbase' ) { + addr_inc.sent = amount; + } else { + if (type == 'vin') { + addr_inc.sent = amount; + addr_inc.balance = -amount; } else { - //new address - if (type == 'vin') { - var newAddress = new Address({ + addr_inc.received = amount; + addr_inc.balance = amount; + } + } + Address.findOneAndUpdate({a_id: hash}, { + $inc: addr_inc + }, { + new: true, + upsert: true + }, function (err, address) { + if (err) { + return cb(err); + } else { + AddressTx.findOneAndUpdate({a_id: hash, txid: txid}, { + $set: { a_id: hash, - sent: amount, - balance: amount, - }); - } else { - var newAddress = new Address({ - a_id: hash, - received: amount, - balance: amount, - }); - } - - newAddress.save(function(err) { + balance: address.balance, + blockindex: blockheight, + txid: txid + } + }, { + new: true, + upsert: true + }, function (err,addresstx) { if (err) { return cb(err); } else { - var newAddressTx = new AddressTx({ - a_id: hash, - balance: amount, - txid: txid - }); - newAddressTx.save(function(err) { - if (err) { - return cb(err); - } else { - return cb(); - } - }); + return cb(); } }); } @@ -168,14 +124,14 @@ function save_tx(txid, blockheight, cb) { lib.prepare_vout(tx.vout, txid, vin, ((typeof tx.vjoinsplit === 'undefined' || tx.vjoinsplit == null) ? [] : tx.vjoinsplit), function(vout, nvin) { lib.syncLoop(vin.length, function (loop) { var i = loop.iteration(); - update_address(nvin[i].addresses, txid, nvin[i].amount, 'vin', function(){ + update_address(nvin[i].addresses, blockheight, txid, nvin[i].amount, 'vin', function(){ loop.next(); }); }, function(){ lib.syncLoop(vout.length, function (subloop) { var t = subloop.iteration(); if (vout[t].addresses) { - update_address(vout[t].addresses, txid, vout[t].amount, 'vout', function(){ + update_address(vout[t].addresses, blockheight, txid, vout[t].amount, 'vout', function(){ subloop.next(); }); } else { @@ -498,7 +454,7 @@ module.exports = { return cb(err); } else { totalCount = count; - AddressTx.find({a_id: hash}).sort({_id: 'desc'}).skip(Number(start)).limit(Number(length)).exec(function (err, address) { + AddressTx.find({a_id: hash}).sort({blockindex: 'desc'}).skip(Number(start)).limit(Number(length)).exec(function (err, address) { if (err) { return cb(err); } else { @@ -820,52 +776,63 @@ module.exports = { } else { create_lock("db_index", function (){ var complete = false; - lib.syncLoop((end - start) + 1, function (loop) { - var x = loop.iteration(); - if (x % 5000 === 0) { - Tx.find({}).where('blockindex').lt(start + x).sort({timestamp: 'desc'}).limit(settings.index.last_txs).exec(function(err, txs){ - Stats.updateOne({coin: coin}, { - last: start + x - 1, - last_txs: '' //not used anymore left to clear out existing objects - }, function() {}); - }); + var blocks_to_scan = []; + var task_limit_blocks = settings.block_parallel_tasks; + if (task_limit_blocks < 1) { task_limit_blocks = 1; } + var task_limit_txs = 1 + for (i=start; i<(end+1); i++) { + blocks_to_scan.push(i); + } + async.eachLimit(blocks_to_scan, task_limit_blocks, function(block_height, next_block) { + if (block_height % 5000 === 0) { + Stats.updateOne({coin: coin}, { + last: block_height - 1, + last_txs: '' //not used anymore left to clear out existing objects + }, function() {}); } - lib.get_blockhash(start + x, function(blockhash){ + lib.get_blockhash(block_height, function(blockhash){ if (blockhash) { lib.get_block(blockhash, function(block) { if (block) { - lib.syncLoop(block.tx.length, function (subloop) { - var i = subloop.iteration(); - Tx.findOne({txid: block.tx[i]}, function(err, tx) { + async.eachLimit(block.tx, task_limit_txs, function(txid, next_tx) { + Tx.findOne({txid: txid}, function(err, tx) { if(tx) { - tx = null; - subloop.next(); + setTimeout( function(){ + tx = null; + next_tx(); + }, timeout); } else { - save_tx(block.tx[i], block.height, function(err){ + save_tx(txid, block_height, function(err){ if (err) { console.log(err); } else { - console.log('%s: %s', block.height, block.tx[i]); + console.log('%s: %s', block_height, txid); } setTimeout( function(){ tx = null; - subloop.next(); + next_tx(); }, timeout); }); } }); }, function(){ - blockhash = null; - block = null; - loop.next(); + setTimeout( function(){ + blockhash = null; + block = null; + next_block(); + }, timeout); }); } else { console.log('block not found: %s', blockhash); - loop.next(); + setTimeout( function(){ + next_block(); + }, timeout); } }); } else { - loop.next(); + setTimeout( function(){ + next_block(); + }, timeout); } }); }, function(){ diff --git a/lib/settings.js b/lib/settings.js index 9fa2ea9..4de0f55 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -138,6 +138,7 @@ exports.confirmations = 6; //timeouts exports.update_timeout = 125; exports.check_timeout = 250; +exports.block_parallel_tasks = 2; //genesis exports.genesis_tx = "dd1d332ad2d8d8f49195056d482ae3c96fd2d16e9d166413b27ca7f19775644c"; diff --git a/models/addresstx.js b/models/addresstx.js index 5c52790..9e873d7 100644 --- a/models/addresstx.js +++ b/models/addresstx.js @@ -3,6 +3,7 @@ var mongoose = require('mongoose') var AddressTXSchema = new Schema({ a_id: { type: String, index: true}, + blockindex: {type: Number, default: 0, index: true}, txid: { type: String, lowercase: true, index: true}, balance: { type: Number, default: 0} }, {id: false}); diff --git a/package.json b/package.json index a704940..dbb7538 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ }, "dependencies": { "express": ">=4.17.1", + "async": "^3.1.0", "intl": "^1.2.5", "serve-favicon": "^2.5.0", "morgan": ">=1.9.1", diff --git a/settings.json.template b/settings.json.template index 403d85f..7c27c35 100644 --- a/settings.json.template +++ b/settings.json.template @@ -49,6 +49,7 @@ //update script settings "update_timeout": 10, "check_timeout": 250, + "block_parallel_tasks": 2, // wallet settings "use_rpc": false,