Fixed multi-threaded sync + related improvements
-The block_parallel_tasks feature has been improved and fixed so that it is now safe to cancel (Ctrl+C) or kill (kill cmd not kill -9) the task and resume the sync later without missing transactions. The new block_parallel_tasks default is 8 threads which seems to be the sweet spot for any type of cpu -Numerous improvements to the benchamark script to utilize new benchmark settings, auto-add credentials to the benchmark database, reuse the same sync code as the regular block sync instead of using a copy of the code and more -Added a new cmd to run the benchmark script `npm run benchmark` -README updated to include the new benchmark script instrutions + include multi-threaded sync as a feature
This commit is contained in:
+124
-124
@@ -1,147 +1,147 @@
|
||||
var mongoose = require('mongoose'),
|
||||
db = require('../lib/database'),
|
||||
Tx = require('../models/tx'),
|
||||
Address = require('../models/address'),
|
||||
settings = require('../lib/settings'),
|
||||
lib = require('../lib/explorer'),
|
||||
Stats = require('../models/stats'),
|
||||
async = require('async');
|
||||
const mongoose = require('mongoose');
|
||||
const blkSync = require('../lib/block_sync');
|
||||
const settings = require('../lib/settings');
|
||||
|
||||
var COUNT = 5000; // number of blocks to index
|
||||
let dbString = `mongodb://${settings.benchmark.address}:${settings.benchmark.port}/admin`
|
||||
|
||||
// prevent stopping of the sync script to be able to gracefully shut down
|
||||
process.on('SIGINT', () => {
|
||||
console.log(`${settings.localization.stopping_sync_process}.. ${settings.localization.please_wait}..`);
|
||||
blkSync.setStopSync(true);
|
||||
});
|
||||
|
||||
// prevent killing of the sync script to be able to gracefully shut down
|
||||
process.on('SIGTERM', () => {
|
||||
console.log(`${settings.localization.stopping_sync_process}.. ${settings.localization.please_wait}..`);
|
||||
blkSync.setStopSync(true);
|
||||
});
|
||||
|
||||
function exit(exitCode) {
|
||||
mongoose.disconnect();
|
||||
process.exit(exitCode);
|
||||
}
|
||||
|
||||
var dbString = 'mongodb://' + encodeURIComponent(settings.dbsettings.user);
|
||||
dbString = dbString + ':' + encodeURIComponent(settings.dbsettings.password);
|
||||
dbString = dbString + '@' + settings.dbsettings.address;
|
||||
dbString = dbString + ':' + settings.dbsettings.port;
|
||||
dbString = dbString + "/IQUIDUS-BENCHMARK";
|
||||
function check_user_exists(exists, cb) {
|
||||
// check if the user exists
|
||||
if (exists) {
|
||||
// user already exists
|
||||
return cb();
|
||||
} else {
|
||||
// user does not exist so create it now
|
||||
mongoose.connection.client.db(settings.benchmark.database).command({
|
||||
createUser: settings.benchmark.user,
|
||||
pwd: settings.benchmark.password,
|
||||
roles: [
|
||||
{
|
||||
role: 'readWrite',
|
||||
db: settings.benchmark.database
|
||||
}
|
||||
]
|
||||
})
|
||||
.then(() => {
|
||||
console.log('User created successfully');
|
||||
return cb();
|
||||
})
|
||||
.catch((error) => {
|
||||
console.error('Error creating new user:', error);
|
||||
exit(2);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function check_create_user(cb) {
|
||||
// check if the benchmark database should be checked for the correct user
|
||||
if (settings.benchmark.auto_add_user) {
|
||||
// connect to the admin database without a username/password
|
||||
mongoose.connect(dbString).then(() => {
|
||||
// determine if the user already exists in the target database
|
||||
mongoose.connection.db
|
||||
.command({
|
||||
usersInfo: {
|
||||
user: settings.benchmark.user,
|
||||
db: settings.benchmark.database
|
||||
}
|
||||
})
|
||||
.then((userInfo) => {
|
||||
// check if the user already exists
|
||||
check_user_exists(userInfo.users.length > 0, function() {
|
||||
// disconnect the current database connection
|
||||
mongoose.disconnect().then(() => {
|
||||
// finished checking the benchmark database user
|
||||
return cb();
|
||||
})
|
||||
.catch((err) => {
|
||||
console.error('Error disconnecting from database:', err);
|
||||
exit(2);
|
||||
});
|
||||
});
|
||||
})
|
||||
.catch((err) => {
|
||||
console.error('Error checking if user exists:', err);
|
||||
exit(2);
|
||||
});
|
||||
})
|
||||
.catch((err) => {
|
||||
console.log('Error: Unable to connect to database: %s', dbString);
|
||||
exit(999);
|
||||
});
|
||||
} else
|
||||
return cb();
|
||||
}
|
||||
|
||||
console.log(`${settings.localization.script_launched}: ${process.pid}`);
|
||||
|
||||
mongoose.set('strictQuery', true);
|
||||
|
||||
mongoose.connect(dbString).then(() => {
|
||||
Tx.deleteMany({}).then(() => {
|
||||
Address.deleteMany({}).then(() => {
|
||||
var s_timer = new Date().getTime();
|
||||
// ensure the benchmark database user exists
|
||||
check_create_user(function() {
|
||||
// update db string to connect to benchmark database
|
||||
dbString = 'mongodb://' + encodeURIComponent(settings.benchmark.user);
|
||||
dbString = dbString + ':' + encodeURIComponent(settings.benchmark.password);
|
||||
dbString = dbString + '@' + settings.benchmark.address;
|
||||
dbString = dbString + ':' + settings.benchmark.port;
|
||||
dbString = dbString + '/' + settings.benchmark.database;
|
||||
|
||||
// updates tx, address & richlist db's
|
||||
function update_tx_db(coin, start, end, txes, timeout, check_only, cb) {
|
||||
var complete = false;
|
||||
var blocks_to_scan = [];
|
||||
var task_limit_blocks = settings.sync.block_parallel_tasks;
|
||||
var task_limit_txs = 1;
|
||||
// connect to the benchmark database
|
||||
mongoose.connect(dbString).then(() => {
|
||||
const Tx = require('../models/tx');
|
||||
|
||||
// fix for invalid block height (skip genesis block as it should not have valid txs)
|
||||
if (typeof start === 'undefined' || start < 1)
|
||||
start = 1;
|
||||
// delete all previous transaction records from the benchmark database
|
||||
Tx.deleteMany({}).then(() => {
|
||||
const Address = require('../models/address');
|
||||
|
||||
if (task_limit_blocks < 1)
|
||||
task_limit_blocks = 1;
|
||||
// delete all previous address records from the benchmark database
|
||||
Address.deleteMany({}).then(() => {
|
||||
// get starting timestamp
|
||||
const s_timer = new Date().getTime();
|
||||
|
||||
for (i = start; i < (end + 1); i++)
|
||||
blocks_to_scan.push(i);
|
||||
// start the block sync
|
||||
blkSync.update_tx_db(settings.coin.name, 1, settings.benchmark.block_to_sync, 0, settings.sync.update_timeout, false, function() {
|
||||
// get ending timestamp
|
||||
const e_timer = new Date().getTime();
|
||||
|
||||
async.eachLimit(blocks_to_scan, task_limit_blocks, function(block_height, next_block) {
|
||||
if (!check_only && block_height % settings.sync.save_stats_after_sync_blocks === 0) {
|
||||
Stats.updateOne({coin: coin}, {
|
||||
last: block_height - 1,
|
||||
txes: txes
|
||||
}).then(() => {});
|
||||
} else if (check_only) {
|
||||
console.log('Checking block ' + block_height + '...');
|
||||
}
|
||||
// get count of transactions
|
||||
Tx.countDocuments({}).then((txcount) => {
|
||||
// get count of addresses
|
||||
Address.countDocuments({}).then((acount) => {
|
||||
// check if the script stopped prematurely
|
||||
if (blkSync.getStopSync())
|
||||
console.log('Block sync was stopped prematurely');
|
||||
|
||||
lib.get_blockhash(block_height, function(blockhash) {
|
||||
if (blockhash) {
|
||||
lib.get_block(blockhash, function(block) {
|
||||
if (block) {
|
||||
async.eachLimit(block.tx, task_limit_txs, function(txid, next_tx) {
|
||||
Tx.findOne({txid: txid}).then((tx) => {
|
||||
if (tx) {
|
||||
setTimeout( function() {
|
||||
tx = null;
|
||||
next_tx();
|
||||
}, timeout);
|
||||
} else {
|
||||
db.save_tx(txid, block_height, block, function(err, tx_has_vout) {
|
||||
if (err)
|
||||
console.log(err);
|
||||
else
|
||||
console.log('%s: %s', block_height, txid);
|
||||
|
||||
if (tx_has_vout)
|
||||
txes++;
|
||||
|
||||
setTimeout( function() {
|
||||
tx = null;
|
||||
next_tx();
|
||||
}, timeout);
|
||||
});
|
||||
}
|
||||
}).catch((err) => {
|
||||
console.log(err);
|
||||
|
||||
setTimeout( function() {
|
||||
tx = null;
|
||||
next_tx();
|
||||
}, timeout);
|
||||
});
|
||||
}, function() {
|
||||
setTimeout( function() {
|
||||
blockhash = null;
|
||||
block = null;
|
||||
next_block();
|
||||
}, timeout);
|
||||
});
|
||||
} else {
|
||||
console.log('Block not found: %s', blockhash);
|
||||
|
||||
setTimeout( function() {
|
||||
next_block();
|
||||
}, timeout);
|
||||
}
|
||||
// output final benchmark stats
|
||||
console.log({
|
||||
tx_count: txcount,
|
||||
address_count: acount,
|
||||
seconds: (e_timer - s_timer) / 1000,
|
||||
});
|
||||
} else {
|
||||
setTimeout( function() {
|
||||
next_block();
|
||||
}, timeout);
|
||||
}
|
||||
});
|
||||
}, function() {
|
||||
Stats.updateOne({coin: coin}, {
|
||||
last: end,
|
||||
txes: txes
|
||||
}).then(() => {
|
||||
return cb();
|
||||
}).catch((err) => {
|
||||
console.log(err);
|
||||
return cb();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
update_tx_db(settings.coin.name, 1, COUNT, 0, settings.sync.update_timeout, false, function() {
|
||||
var e_timer = new Date().getTime();
|
||||
|
||||
Tx.countDocuments({}).then((txcount) => {
|
||||
Address.countDocuments({}).then((acount) => {
|
||||
var stats = {
|
||||
tx_count: txcount,
|
||||
address_count: acount,
|
||||
seconds: (e_timer - s_timer)/1000,
|
||||
};
|
||||
|
||||
console.log(stats);
|
||||
exit(0);
|
||||
exit(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}).catch((err) => {
|
||||
console.log('Error: Unable to connect to database: %s', dbString);
|
||||
exit(999);
|
||||
});
|
||||
}).catch((err) => {
|
||||
console.log('Error: Unable to connect to database: %s', dbString);
|
||||
exit(999);
|
||||
});
|
||||
+8
-447
@@ -1,5 +1,6 @@
|
||||
var mongoose = require('mongoose'),
|
||||
lib = require('../lib/explorer'),
|
||||
blkSync = require('../lib/block_sync'),
|
||||
db = require('../lib/database'),
|
||||
Tx = require('../models/tx'),
|
||||
Address = require('../models/address'),
|
||||
@@ -18,12 +19,14 @@ var stopSync = false;
|
||||
// prevent stopping of the sync script to be able to gracefully shut down
|
||||
process.on('SIGINT', () => {
|
||||
console.log(`${settings.localization.stopping_sync_process}.. ${settings.localization.please_wait}..`);
|
||||
blkSync.setStopSync(true);
|
||||
stopSync = true;
|
||||
});
|
||||
|
||||
// prevent killing of the sync script to be able to gracefully shut down
|
||||
process.on('SIGTERM', () => {
|
||||
console.log(`${settings.localization.stopping_sync_process}.. ${settings.localization.please_wait}..`);
|
||||
blkSync.setStopSync(true);
|
||||
stopSync = true;
|
||||
});
|
||||
|
||||
@@ -67,170 +70,6 @@ function exit(exitCode) {
|
||||
}
|
||||
}
|
||||
|
||||
// updates tx & address balances
|
||||
function update_tx_db(coin, start, end, txes, timeout, check_only, cb) {
|
||||
var complete = false;
|
||||
var blocks_to_scan = [];
|
||||
var task_limit_blocks = settings.sync.block_parallel_tasks;
|
||||
var task_limit_txs = 1;
|
||||
|
||||
// fix for invalid block height (skip genesis block as it should not have valid txs)
|
||||
if (typeof start === 'undefined' || start < 1)
|
||||
start = 1;
|
||||
|
||||
if (task_limit_blocks < 1)
|
||||
task_limit_blocks = 1;
|
||||
|
||||
for (i = start; i < (end + 1); i++)
|
||||
blocks_to_scan.push(i);
|
||||
|
||||
async.eachLimit(blocks_to_scan, task_limit_blocks, function(block_height, next_block) {
|
||||
if (check_only == 0 && block_height % settings.sync.save_stats_after_sync_blocks === 0) {
|
||||
Stats.updateOne({coin: coin}, {
|
||||
last: block_height - 1,
|
||||
txes: txes
|
||||
}).then(() => {});
|
||||
} else if (check_only == 1) {
|
||||
console.log('Checking block ' + block_height + '...');
|
||||
}
|
||||
|
||||
lib.get_blockhash(block_height, function(blockhash) {
|
||||
if (blockhash) {
|
||||
lib.get_block(blockhash, function(block) {
|
||||
if (block) {
|
||||
async.eachLimit(block.tx, task_limit_txs, function(txid, next_tx) {
|
||||
Tx.findOne({txid: txid}).then((tx) => {
|
||||
if (tx && check_only != 2) {
|
||||
setTimeout(function() {
|
||||
tx = null;
|
||||
|
||||
// check if the script is stopping
|
||||
if (stopSync && check_only != 2) {
|
||||
// stop the loop
|
||||
next_tx({});
|
||||
} else
|
||||
next_tx();
|
||||
}, timeout);
|
||||
} else {
|
||||
// check if the transaction exists but doesn't match the current block height
|
||||
check_delete_tx(tx, block_height, txes, timeout, function(updated_txes, tx_deleted) {
|
||||
// update the running tx count
|
||||
txes = updated_txes;
|
||||
|
||||
// check if this tx should be added to the local database
|
||||
if (tx_deleted || !tx) {
|
||||
// save the transaction to local database
|
||||
db.save_tx(txid, block_height, block, function(err, tx_has_vout) {
|
||||
if (err)
|
||||
console.log(err);
|
||||
else
|
||||
console.log('%s: %s', block_height, txid);
|
||||
|
||||
if (tx_has_vout)
|
||||
txes++;
|
||||
|
||||
setTimeout(function() {
|
||||
tx = null;
|
||||
|
||||
// check if the script is stopping
|
||||
if (stopSync && check_only != 2) {
|
||||
// stop the loop
|
||||
next_tx({});
|
||||
} else
|
||||
next_tx();
|
||||
}, timeout);
|
||||
});
|
||||
} else {
|
||||
// skip adding the current tx
|
||||
setTimeout(function() {
|
||||
tx = null;
|
||||
|
||||
// check if the script is stopping
|
||||
if (stopSync && check_only != 2) {
|
||||
// stop the loop
|
||||
next_tx({});
|
||||
} else
|
||||
next_tx();
|
||||
}, timeout);
|
||||
}
|
||||
});
|
||||
}
|
||||
}).catch((err) => {
|
||||
console.log(err);
|
||||
|
||||
setTimeout(function() {
|
||||
tx = null;
|
||||
|
||||
// check if the script is stopping
|
||||
if (stopSync && check_only != 2) {
|
||||
// stop the loop
|
||||
next_tx({});
|
||||
} else
|
||||
next_tx();
|
||||
}, timeout);
|
||||
});
|
||||
}, function() {
|
||||
setTimeout(function() {
|
||||
blockhash = null;
|
||||
block = null;
|
||||
|
||||
// check if the script is stopping
|
||||
if (stopSync && check_only != 2) {
|
||||
// stop the loop
|
||||
next_block({});
|
||||
} else
|
||||
next_block();
|
||||
}, timeout);
|
||||
});
|
||||
} else {
|
||||
console.log('Block not found: %s', blockhash);
|
||||
|
||||
setTimeout(function() {
|
||||
// check if the script is stopping
|
||||
if (stopSync && check_only != 2) {
|
||||
// stop the loop
|
||||
next_block({});
|
||||
} else
|
||||
next_block();
|
||||
}, timeout);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
setTimeout(function() {
|
||||
// check if the script is stopping
|
||||
if (stopSync && check_only != 2) {
|
||||
// stop the loop
|
||||
next_block({});
|
||||
} else
|
||||
next_block();
|
||||
}, timeout);
|
||||
}
|
||||
});
|
||||
}, function() {
|
||||
var statUpdateObject = {};
|
||||
|
||||
// check what stats data should be updated
|
||||
if (stopSync || check_only == 2) {
|
||||
// only update txes when fixing invalid and missing blocks or when a "normal" sync was stopped prematurely
|
||||
statUpdateObject.txes = txes;
|
||||
} else {
|
||||
// update last and txes values for "normal" sync that finishes without being stopped prematurely
|
||||
statUpdateObject = {
|
||||
txes: txes,
|
||||
last: end
|
||||
};
|
||||
}
|
||||
|
||||
// update local stats
|
||||
Stats.updateOne({coin: coin}, statUpdateObject).then(() => {
|
||||
return cb(txes);
|
||||
}).catch((err) => {
|
||||
console.log(err);
|
||||
return cb(txes);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// fixes data belonging to orphaned blocks
|
||||
function update_orphans(orphan_index, orphan_current, last_blockindex, timeout, cb) {
|
||||
// lookup the earliest orphaned block if this is the first time that orphans are being checked
|
||||
@@ -352,7 +191,7 @@ function update_orphans(orphan_index, orphan_current, last_blockindex, timeout,
|
||||
var t = tx_loop.iteration();
|
||||
|
||||
// remove the orphaned tx and cleanup all associated data
|
||||
delete_and_cleanup_tx(txids[t], current_block, tx_count, timeout, function(updated_tx_count) {
|
||||
blkSync.delete_and_cleanup_tx(txids[t], current_block, tx_count, timeout, function(updated_tx_count) {
|
||||
// update the running tx count
|
||||
tx_count = updated_tx_count;
|
||||
|
||||
@@ -380,7 +219,7 @@ function update_orphans(orphan_index, orphan_current, last_blockindex, timeout,
|
||||
// get the most recent stats
|
||||
Stats.findOne({coin: settings.coin.name}).then((stats) => {
|
||||
// add missing txes for the current block
|
||||
update_tx_db(settings.coin.name, current_block, current_block, (stats.txes + tx_count), timeout, 2, function(updated_tx_count) {
|
||||
blkSync.update_tx_db(settings.coin.name, current_block, current_block, (stats.txes + tx_count), timeout, 2, function(updated_tx_count) {
|
||||
// update the stats collection by removing the orphaned txes in this block from the tx count
|
||||
// and setting the orphan_index and orphan_current values in case the sync is interrupted before finishing
|
||||
Stats.updateOne({coin: settings.coin.name}, {
|
||||
@@ -673,7 +512,7 @@ function check_add_tx(txid, blockhash, tx_count, cb) {
|
||||
// check if the block was found
|
||||
if (block) {
|
||||
// save the tx to the local database
|
||||
db.save_tx(txid, block.height, block, function(save_tx_err, tx_has_vout) {
|
||||
blkSync.save_tx(txid, block.height, block, function(save_tx_err, tx_has_vout) {
|
||||
// check if there were any save errors
|
||||
if (save_tx_err)
|
||||
console.log(save_tx_err);
|
||||
@@ -700,284 +539,6 @@ function check_add_tx(txid, blockhash, tx_count, cb) {
|
||||
});
|
||||
}
|
||||
|
||||
function delete_and_cleanup_tx(txid, block_height, tx_count, timeout, cb) {
|
||||
// lookup all address tx records associated with the current tx
|
||||
AddressTx.find({txid: txid}).exec().then((address_txes) => {
|
||||
if (address_txes.length == 0) {
|
||||
// no vouts for this tx, so just delete the tx without cleaning up addresses
|
||||
delete_tx(txid, block_height, function(tx_err, tx_result) {
|
||||
if (tx_err) {
|
||||
console.log(tx_err);
|
||||
return cb(tx_count);
|
||||
} else {
|
||||
// NOTE: do not subtract from the tx_count here because only txes with vouts are counted
|
||||
return cb(tx_count);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// lookup the current tx in the local database
|
||||
Tx.findOne({txid: txid}).then((tx) => {
|
||||
var addressTxArray = [];
|
||||
var has_vouts = (tx.vout != null && tx.vout.length > 0);
|
||||
|
||||
// check if this is a coinbase tx
|
||||
if (tx.vin == null || tx.vin.length == 0) {
|
||||
// add a coinbase tx into the addressTxArray array
|
||||
addressTxArray.push({
|
||||
txid: txid,
|
||||
a_id: 'coinbase',
|
||||
amount: tx.total
|
||||
});
|
||||
}
|
||||
|
||||
// check if there are any vin addresses
|
||||
if (tx.vin != null && tx.vin.length > 0) {
|
||||
// loop through the vin data
|
||||
for (var vin_tx_counter = tx.vin.length - 1; vin_tx_counter >= 0; vin_tx_counter--) {
|
||||
// loop through the addresstxe data
|
||||
for (var vin_addresstx_counter = address_txes.length - 1; vin_addresstx_counter >= 0; vin_addresstx_counter--) {
|
||||
// check if there is a tx record that exactly matches to the addresstx
|
||||
if (tx.vin[vin_tx_counter].addresses == address_txes[vin_addresstx_counter].a_id && tx.vin[vin_tx_counter].amount == -address_txes[vin_addresstx_counter].amount) {
|
||||
// add the address into the addressTxArray array
|
||||
addressTxArray.push({
|
||||
txid: txid,
|
||||
a_id: tx.vin[vin_tx_counter].addresses,
|
||||
amount: address_txes[vin_addresstx_counter].amount
|
||||
});
|
||||
|
||||
// remove the found records from both arrays
|
||||
tx.vin.splice(vin_tx_counter, 1);
|
||||
address_txes.splice(vin_addresstx_counter, 1);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check if there are any vout addresses
|
||||
if (tx.vout != null && tx.vout.length > 0) {
|
||||
// loop through the vout data
|
||||
for (var vout_tx_counter = tx.vout.length - 1; vout_tx_counter >= 0; vout_tx_counter--) {
|
||||
// loop through the addresstxe data
|
||||
for (var vout_addresstx_counter = address_txes.length - 1; vout_addresstx_counter >= 0; vout_addresstx_counter--) {
|
||||
// check if there is a tx record that exactly matches to the addresstx
|
||||
if (tx.vout[vout_tx_counter].addresses == address_txes[vout_addresstx_counter].a_id && tx.vout[vout_tx_counter].amount == address_txes[vout_addresstx_counter].amount) {
|
||||
// add the address into the addressTxArray array
|
||||
addressTxArray.push({
|
||||
txid: txid,
|
||||
a_id: tx.vout[vout_tx_counter].addresses,
|
||||
amount: address_txes[vout_addresstx_counter].amount
|
||||
});
|
||||
|
||||
// remove the found records from both arrays
|
||||
tx.vout.splice(vout_tx_counter, 1);
|
||||
address_txes.splice(vout_addresstx_counter, 1);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check if there are still more vin/vout records to process
|
||||
if (tx.vin.length > 0 || tx.vout.length > 0 || address_txes.length > 0) {
|
||||
// get all unique remaining addresses
|
||||
var address_list = [];
|
||||
|
||||
// get unique addresses from the tx vin
|
||||
tx.vin.forEach(function(vin) {
|
||||
if (address_list.indexOf(vin.addresses) == -1)
|
||||
address_list.push(vin.addresses);
|
||||
});
|
||||
|
||||
// get unique addresses from the tx vout
|
||||
tx.vout.forEach(function(vout) {
|
||||
if (address_list.indexOf(vout.addresses) == -1)
|
||||
address_list.push(vout.addresses);
|
||||
});
|
||||
|
||||
// get unique addresses from the addresstxes
|
||||
address_txes.forEach(function(address_tx) {
|
||||
if (address_list.indexOf(address_tx.a_id) == -1)
|
||||
address_list.push(address_tx.a_id);
|
||||
});
|
||||
|
||||
// loop through each unique address
|
||||
address_list.forEach(function(address) {
|
||||
var vin_total = 0;
|
||||
var vout_total = 0;
|
||||
var address_tx_total = 0;
|
||||
|
||||
// add up all the vin amounts for this address
|
||||
tx.vin.forEach(function(vin) {
|
||||
// check if this is the correct address
|
||||
if (vin.addresses == address)
|
||||
vin_total += vin.amount;
|
||||
});
|
||||
|
||||
// add up all the vout amounts for this address
|
||||
tx.vout.forEach(function(vout) {
|
||||
// check if this is the correct address
|
||||
if (vout.addresses == address)
|
||||
vout_total += vout.amount;
|
||||
});
|
||||
|
||||
// add up all the addresstx amounts for this address
|
||||
address_txes.forEach(function(address_tx) {
|
||||
// check if this is the correct address
|
||||
if (address_tx.a_id == address)
|
||||
address_tx_total += address_tx.amount;
|
||||
});
|
||||
|
||||
// check if the tx and addresstx totals match
|
||||
if ((vout_total - vin_total) == address_tx_total) {
|
||||
// the values match (this indicates that this address sent coins to themselves)
|
||||
// add a vin record for this address into the addressTxArray array
|
||||
addressTxArray.push({
|
||||
txid: txid,
|
||||
a_id: address,
|
||||
amount: -vin_total
|
||||
});
|
||||
|
||||
// add a vout record for this address into the addressTxArray array
|
||||
addressTxArray.push({
|
||||
txid: txid,
|
||||
a_id: address,
|
||||
amount: vout_total
|
||||
});
|
||||
} else {
|
||||
// the values do not match (this indicates there was a problem saving the data)
|
||||
// output the data for this address as-is, using the addresstx values
|
||||
address_txes.forEach(function(address_tx) {
|
||||
// check if this is the correct address
|
||||
if (address_tx.a_id == address) {
|
||||
// add a record for this address into the addressTxArray array
|
||||
addressTxArray.push({
|
||||
txid: txid,
|
||||
a_id: address,
|
||||
amount: address_tx.amount
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// loop through the address txes
|
||||
lib.syncLoop(addressTxArray.length, function(address_loop) {
|
||||
var a = address_loop.iteration();
|
||||
|
||||
// fix the balance, sent and received data for the current address
|
||||
fix_address_data(addressTxArray[a], function() {
|
||||
setTimeout(function() {
|
||||
// move to the next address record
|
||||
address_loop.next();
|
||||
}, timeout);
|
||||
});
|
||||
}, function() {
|
||||
// delete all AddressTx records from the local collection for this tx
|
||||
AddressTx.deleteMany({txid: txid}).then((address_tx_result) => {
|
||||
// delete the tx from the local database
|
||||
delete_tx(txid, block_height, function(tx_err, tx_result) {
|
||||
if (tx_err) {
|
||||
console.log(tx_err);
|
||||
return cb(tx_count);
|
||||
} else {
|
||||
// check if the deleted tx had vouts
|
||||
if (has_vouts) {
|
||||
// keep a running total of txes that were removed
|
||||
tx_count -= tx_result.deletedCount;
|
||||
}
|
||||
|
||||
return cb(tx_count);
|
||||
}
|
||||
});
|
||||
}).catch((err) => {
|
||||
console.log(err);
|
||||
|
||||
// delete the tx from the local database
|
||||
delete_tx(txid, block_height, function(tx_err, tx_result) {
|
||||
if (tx_err) {
|
||||
console.log(tx_err);
|
||||
return cb(tx_count);
|
||||
} else {
|
||||
// check if the deleted tx had vouts
|
||||
if (has_vouts) {
|
||||
// keep a running total of txes that were removed
|
||||
tx_count -= tx_result.deletedCount;
|
||||
}
|
||||
|
||||
return cb(tx_count);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
}).catch((err) => {
|
||||
console.log(err);
|
||||
return cb(tx_count);
|
||||
});
|
||||
}
|
||||
}).catch((err) => {
|
||||
console.log(err);
|
||||
return cb(tx_count);
|
||||
});
|
||||
}
|
||||
|
||||
function fix_address_data(address_data, cb) {
|
||||
var addr_inc = {};
|
||||
var amount = address_data.amount;
|
||||
|
||||
// determine how to fix the address balances
|
||||
if (address_data.a_id == 'coinbase')
|
||||
addr_inc.sent = -amount;
|
||||
else if (amount < 0) {
|
||||
// vin
|
||||
addr_inc.sent = amount;
|
||||
addr_inc.balance = -amount;
|
||||
} else {
|
||||
// vout
|
||||
addr_inc.received = -amount;
|
||||
addr_inc.balance = -amount;
|
||||
}
|
||||
|
||||
// reverse the amount from the running totals in the Address collection for the current address
|
||||
Address.findOneAndUpdate({a_id: address_data.a_id}, {
|
||||
$inc: addr_inc
|
||||
}, {
|
||||
upsert: false
|
||||
}).then((return_address) => {
|
||||
// finished fixing the address balance data
|
||||
return cb();
|
||||
}).catch((err) => {
|
||||
console.log(err);
|
||||
return cb();
|
||||
});
|
||||
}
|
||||
|
||||
function delete_tx(txid, block_height, cb) {
|
||||
// delete the tx from the local database
|
||||
Tx.deleteOne({txid: txid, blockindex: block_height}).then((tx_result) => {
|
||||
return cb(null, tx_result);
|
||||
}).catch((err) => {
|
||||
return cb(err, null);
|
||||
});
|
||||
}
|
||||
|
||||
function check_delete_tx(tx, block_height, tx_count, timeout, cb) {
|
||||
// check if the tx object exists and does not match the current block height
|
||||
if (tx && tx.blockindex != block_height) {
|
||||
// the transaction exists but does not match the correct block height, therefore it should be deleted
|
||||
delete_and_cleanup_tx(tx.txid, tx.blockindex, tx_count, timeout, function(updated_tx_count) {
|
||||
// finished removing the transaction
|
||||
return cb(updated_tx_count, true);
|
||||
});
|
||||
} else {
|
||||
// tx dosn't exist or block heights match so nothing to do
|
||||
return cb(tx_count, false);
|
||||
}
|
||||
}
|
||||
|
||||
function update_heavy(coin, height, count, heavycoin_enabled, cb) {
|
||||
if (heavycoin_enabled == true) {
|
||||
db.update_heavy(coin, height, count, function() {
|
||||
@@ -1254,7 +815,7 @@ function block_sync(reindex, stats) {
|
||||
// Check if the sync msg should be shown
|
||||
check_show_sync_message(count - last);
|
||||
|
||||
update_tx_db(settings.coin.name, last, count, stats.txes, settings.sync.update_timeout, 0, function() {
|
||||
blkSync.update_tx_db(settings.coin.name, last, count, stats.txes, settings.sync.update_timeout, 0, function() {
|
||||
// check if the script stopped prematurely
|
||||
if (stopSync) {
|
||||
console.log(`${(reindex ? 'Reindex' : 'Block sync')} was stopped prematurely`);
|
||||
@@ -1412,7 +973,7 @@ if (lib.is_locked([database]) == false) {
|
||||
if (stats !== false) {
|
||||
console.log(`${settings.localization.checking_blocks}.. ${settings.localization.please_wait}..`);
|
||||
|
||||
update_tx_db(settings.coin.name, block_start, stats.count, stats.txes, settings.sync.check_timeout, 1, function() {
|
||||
blkSync.update_tx_db(settings.coin.name, block_start, stats.count, stats.txes, settings.sync.check_timeout, 1, function() {
|
||||
// check if the script stopped prematurely
|
||||
if (stopSync) {
|
||||
console.log('Block check was stopped prematurely');
|
||||
|
||||
Reference in New Issue
Block a user