Improved block sync speed

-A number of functions have been rewritten to be more optimized and faster: calculate_total, is_unique, convert_to_satoshi, get_input_addresses, processVoutAddresses, prepare_vout, prepare_vin
-Txes are now written to database via bulk writes which helps improve the sync speed and also controls memory usage with batching to write data once a certain threshold is reached
-update_address function changed to update_addresses since it now bulk writes the addresses in batches to improve sync speed and also controls memory usage with batching to write data once a certain threshold is reached
-The syncLoop function has been completely removed from the project and replaced with async library loops or even normal "for" loops in some cases which greatly improves sync speeds over large batches of data
-Fixed an issue with the flattened count of txes that is saved to the coinstats collection which could save incorrectly when using more than 1 thread
-Fixed an issue with the block sync which caused an unwanted delay when syncing less blocks than the amount of threads used to sync the data
-Fixed an issue with vout data processing that could sometimes populate data out of order
-Added a new sync.batch_size setting used to determine how many records (txes, addresses, addresstxes) should be saved in a single database transaction
-Added a new wait_for_bulk_database_save setting used to increase the block sync speed at the cost of not returning any error msgs for data that failed to save
-get_input_addresses function no longer returns in the exports section of the explorer.js file since it is only referenced in that file
-Updated explorerspec tests to use the newest function changes for any tests that needed to be updated

Special thanks to Karzo from Pepecoin for help with the bulkwrite code changes!
This commit is contained in:
Joe Uhren
2025-02-02 19:10:17 -07:00
parent 0b0ef817f1
commit 3a2f679201
10 changed files with 966 additions and 867 deletions
+89 -85
View File
@@ -1,20 +1,20 @@
var mongoose = require('mongoose'),
lib = require('../lib/explorer'),
blkSync = require('../lib/block_sync'),
db = require('../lib/database'),
Tx = require('../models/tx'),
Address = require('../models/address'),
AddressTx = require('../models/addresstx'),
Orphans = require('../models/orphans'),
Richlist = require('../models/richlist'),
Stats = require('../models/stats'),
settings = require('../lib/settings'),
async = require('async');
var mode = 'update';
var database = 'index';
var block_start = 1;
var lockCreated = false;
var stopSync = false;
const mongoose = require('mongoose');
const lib = require('../lib/explorer');
const blkSync = require('../lib/block_sync');
const db = require('../lib/database');
const Tx = require('../models/tx');
const Address = require('../models/address');
const AddressTx = require('../models/addresstx');
const Orphans = require('../models/orphans');
const Richlist = require('../models/richlist');
const Stats = require('../models/stats');
const settings = require('../lib/settings');
const async = require('async');
let mode = 'update';
let database = 'index';
let block_start = 1;
let lockCreated = false;
let stopSync = false;
// prevent stopping of the sync script to be able to gracefully shut down
process.on('SIGINT', () => {
@@ -172,7 +172,7 @@ function update_orphans(orphan_index, orphan_current, last_blockindex, timeout,
// this fork needs to be resolved
// lookup the current block data from the wallet
lib.get_block(correct_block_data.prev_hash, function (block_data) {
var tx_count = 0;
let tx_count = 0;
// check if the good block hash is in the list of blockhashes
if (blockhashes.indexOf(correct_block_data.prev_hash) > -1) {
@@ -181,9 +181,7 @@ function update_orphans(orphan_index, orphan_current, last_blockindex, timeout,
}
// loop through the remaining orphaned block hashes
lib.syncLoop(blockhashes.length, function(block_loop) {
var i = block_loop.iteration();
async.timesSeries(blockhashes.length, function(i, block_loop) {
console.log('Resolving orphaned block [' + (i + 1).toString() + '/' + blockhashes.length.toString() + ']: ' + blockhashes[i]);
// find all orphaned txid's from the current orphan block hash
@@ -191,17 +189,15 @@ function update_orphans(orphan_index, orphan_current, last_blockindex, timeout,
// save the orphan block data to the orphan collection
create_orphan(current_block, blockhashes[i], correct_block_data.prev_hash, block_data.previousblockhash, correct_block_data.next_hash, function() {
// loop through the remaining orphaned block hashes
lib.syncLoop(txids.length, function(tx_loop) {
var t = tx_loop.iteration();
async.eachSeries(txids, function(current_txid, tx_loop) {
// remove the orphaned tx and cleanup all associated data
blkSync.delete_and_cleanup_tx(txids[t], current_block, tx_count, timeout, function(updated_tx_count) {
blkSync.delete_and_cleanup_tx(current_txid, current_block, timeout, function(updated_tx_count) {
// update the running tx count
tx_count = updated_tx_count;
tx_count += updated_tx_count;
// some blockchains will reuse the same orphaned transaction ids
// lookup the transaction that was just deleted to ensure it doesn't belong to another block
check_add_tx(txids[t], blockhashes[i], tx_count, function(updated_tx_count2) {
check_add_tx(current_txid, blockhashes[i], tx_count, function(updated_tx_count2) {
// update the running tx count
tx_count = updated_tx_count2;
@@ -209,11 +205,11 @@ function update_orphans(orphan_index, orphan_current, last_blockindex, timeout,
// check if there was a memory error
if (blkSync.getStackSizeErrorId() != null) {
// stop the loop
tx_loop.break(true);
tx_loop({});
} else {
// move to the next tx record
tx_loop();
}
// move to the next tx record
tx_loop.next();
}, timeout);
});
});
@@ -222,11 +218,11 @@ function update_orphans(orphan_index, orphan_current, last_blockindex, timeout,
// check if there was a memory error
if (blkSync.getStackSizeErrorId() != null) {
// stop the loop
block_loop.break(true);
block_loop({});
} else {
// move to the next block record
block_loop();
}
// move to the next block record
block_loop.next();
}, timeout);
});
});
@@ -326,33 +322,31 @@ function update_orphans(orphan_index, orphan_current, last_blockindex, timeout,
// get the list of orphans with a null next_blockhash
Orphans.find({next_blockhash: null}).exec().then((orphans) => {
// loop through the list of orphans
lib.syncLoop(orphans.length, function(orphan_loop) {
var o = orphan_loop.iteration();
async.eachSeries(orphans, function(current_orphan, orphan_loop) {
// lookup the block data from the wallet
lib.get_block(orphans[o].good_blockhash, function (good_block_data) {
lib.get_block(current_orphan.good_blockhash, function (good_block_data) {
// check if the next block hash is known
if (good_block_data.nextblockhash != null) {
// update the next blockhash for this orphan record
Orphans.updateOne({blockindex: orphans[o].blockindex, orphan_blockhash: orphans[o].orphan_blockhash}, {
Orphans.updateOne({blockindex: current_orphan.blockindex, orphan_blockhash: current_orphan.orphan_blockhash}, {
next_blockhash: good_block_data.nextblockhash
}).then(() => {
setTimeout(function() {
// move to the next orphan record
orphan_loop.next();
orphan_loop();
}, timeout);
}).catch((err) => {
console.log(err);
setTimeout(function() {
// move to the next orphan record
orphan_loop.next();
orphan_loop();
}, timeout);
});
} else {
setTimeout(function() {
// move to the next orphan record
orphan_loop.next();
orphan_loop();
}, timeout);
}
});
@@ -542,8 +536,8 @@ function check_add_tx(txid, blockhash, tx_count, cb) {
// check if the block was found
if (block) {
// save the tx to the local database
blkSync.save_tx(txid, block.height, block, function(save_tx_err, tx_has_vout) {
// check if there were any save errors
blkSync.save_tx(txid, block.height, block, function(save_tx_err, tx_has_vout, newTx) {
// check for errors
if (save_tx_err) {
// check the error code
if (save_tx_err.code == 'StackSizeError') {
@@ -551,16 +545,25 @@ function check_add_tx(txid, blockhash, tx_count, cb) {
blkSync.setStackSizeErrorId(txid);
} else
console.log(save_tx_err);
} else
console.log('%s: %s', block.height, txid);
// check if the tx was saved correctly
if (tx_has_vout) {
// keep a running total of txes that were added
tx_count++;
return cb(tx_count);
} else {
// save the tx
newTx.save().then(() => {
console.log('%s: %s', block.height, txid);
// check if the tx has vouts
if (tx_has_vout) {
// keep a running total of txes that were added
tx_count++;
}
return cb(tx_count);
}).catch((err) => {
console.log(err);
return cb(tx_count);
});
}
return cb(tx_count);
});
} else {
// block not found so there is nothing to fix
@@ -842,10 +845,11 @@ function occurrences(string, subString, allowOverlapping) {
}
function block_sync(reindex, stats) {
// Get the last synced block index value
var last = (stats.last ? stats.last : 0);
// Get the total number of blocks
var count = (stats.count ? stats.count : 0);
// get the last synced block index value
let last = (stats.last ? stats.last : 0);
// get the total number of blocks
let count = (stats.count ? stats.count : 0);
// Check if the sync msg should be shown
check_show_sync_message(count - last);
@@ -1148,20 +1152,19 @@ if (lib.is_locked([database]) == false) {
} else if (database == 'peers') {
lib.get_peerinfo(function(body) {
if (body != null) {
lib.syncLoop(body.length, function(loop) {
var i = loop.iteration();
var address = body[i].addr;
var port = null;
async.timesSeries(body.length, function(i, loop) {
let address = body[i].addr;
let port = null;
if (occurrences(address, ':') == 1 || occurrences(address, ']:') == 1) {
// Separate the port # from the IP address
address = address.substring(0, address.lastIndexOf(":")).replace("[", "").replace("]", "");
port = body[i].addr.substring(body[i].addr.lastIndexOf(":") + 1);
// separate the port # from the IP address
address = address.substring(0, address.lastIndexOf(':')).replace('[', '').replace(']', '');
port = body[i].addr.substring(body[i].addr.lastIndexOf(':') + 1);
}
if (address.indexOf("]") > -1) {
// Remove [] characters from IPv6 addresses
address = address.replace("[", "").replace("]", "");
if (address.indexOf(']') > -1) {
// remove [] characters from IPv6 addresses
address = address.replace('[', '').replace(']', '');
}
db.find_peer(address, port, function(peer) {
@@ -1190,10 +1193,11 @@ if (lib.is_locked([database]) == false) {
// check if the script is stopping
if (stopSync) {
// stop the loop
loop.break(true);
loop({});
} else {
// move to next peer
loop();
}
loop.next();
});
});
} else {
@@ -1224,10 +1228,11 @@ if (lib.is_locked([database]) == false) {
// check if the script is stopping
if (stopSync) {
// stop the loop
loop.break(true);
loop({});
} else {
// move to next peer
loop();
}
loop.next();
});
}
});
@@ -1255,36 +1260,35 @@ if (lib.is_locked([database]) == false) {
} else if (database == 'masternodes') {
lib.get_masternodelist(function(body) {
if (body != null) {
var isObject = false;
var objectKeys = null;
let isObject = false;
let objectKeys = null;
// Check if the masternode data is an array or an object
// check if the masternode data is an array or an object
if (body.length == null) {
// Process data as an object
// process data as an object
objectKeys = Object.keys(body);
isObject = true;
}
lib.syncLoop((isObject ? objectKeys : body).length, function(loop) {
var i = loop.iteration();
async.timesSeries((isObject ? objectKeys : body).length, function(i, loop) {
db.save_masternode((isObject ? body[objectKeys[i]] : body[i]), (isObject ? objectKeys[i] : null), function(success) {
if (success) {
// check if the script is stopping
if (stopSync) {
// stop the loop
loop.break(true);
loop({});
} else {
// move to next masternode
loop();
}
loop.next();
} else {
console.log('Error: Cannot save masternode %s.', (isObject ? (body[objectKeys[i]].payee ? body[objectKeys[i]].payee : 'UNKNOWN') : (body[i].addr ? body[i].addr : 'UNKNOWN')));
exit(1);
}
});
}, function() {
db.remove_old_masternodes(function(cb) {
db.update_last_updated_stats(settings.coin.name, { masternodes_last_updated: Math.floor(new Date() / 1000) }, function(cb) {
db.remove_old_masternodes(function() {
db.update_last_updated_stats(settings.coin.name, { masternodes_last_updated: Math.floor(new Date() / 1000) }, function(update_success) {
// check if the script stopped prematurely
if (stopSync) {
console.log('Masternode sync was stopped prematurely');