Improved syncing features

This commit is contained in:
joeuhren
2020-12-04 10:59:58 -07:00
parent 4b208b74a4
commit b6808da454
5 changed files with 385 additions and 300 deletions
+20 -20
View File
@@ -59,34 +59,34 @@ To stop the cluster you can use
### Syncing databases with the blockchain
sync.js (located in scripts/) is used for updating the local databases. This script must be called from the explorers root directory.
sync.sh (located in scripts/) is used for updating the local databases. This script must be called from the explorers root directory.
Usage: node scripts/sync.js [database] [mode]
Usage: scripts/sync.sh /path/to/nodejs [mode]
database: (required)
index [mode] Main index: coin info/stats, transactions & addresses
market Market data: summaries, orderbooks, trade history & chartdata
Mode: (required)
update Updates index from last sync to current block
check Checks index for (and adds) any missing transactions/addresses
reindex Clears index then resyncs from genesis to current block
reindex-rich Clears and recreates the richlist data
reindex-txcount Rescan and flatten the tx count value for faster access
market Updates market summaries, orderbooks, trade history + charts
peers Updates peer info based on local wallet connections
mode: (required for index database only)
update Updates index from last sync to current block
check checks index for (and adds) any missing transactions/addresses
reindex Clears index then resyncs from genesis to current block
notes:
* 'current block' is the latest created block when script is executed.
* The market database only supports (& defaults to) reindex mode.
* If check mode finds missing data(ignoring new data since last sync),
Notes:
- 'current block' is the latest created block when script is executed.
- The market + peers databases only support (& defaults to) reindex mode.
- If check mode finds missing data(ignoring new data since last sync),
index_timeout in settings.json is set too low.
*It is recommended to have this script launched via a cronjob at 1+ min intervals.*
**crontab**
*Example crontab; update index every minute and market data every 2 minutes*
*Example crontab; update index every minute, market data every 2 minutes and peers every 5 minutes*
*/1 * * * * /path/to/explorer/scripts/index.sh /path/to/nodejs > /dev/null 2>&1
*/2 * * * * cd /path/to/explorer && /path/to/nodejs scripts/sync.js market > /dev/null 2>&1
*/5 * * * * cd /path/to/explorer && /path/to/nodejs scripts/peers.js > /dev/null 2>&1
*/1 * * * * /path/to/explorer/scripts/sync.sh /path/to/nodejs update > /dev/null 2>&1
*/2 * * * * /path/to/explorer/scripts/sync.sh /path/to/nodejs market > /dev/null 2>&1
*/5 * * * * /path/to/explorer/scripts/sync.sh /path/to/nodejs peers > /dev/null 2>&1
### Wallet
@@ -200,13 +200,13 @@ A pair of scripts to backup or restore the internal mongo database are included.
RangeError: Maximum call stack size exceeded
Nodes default stack size may be too small to index addresses with many tx's. If you experience the above error while running sync.js the stack size needs to be increased.
Nodes default stack size may be too small to index addresses with many tx's. If you experience the above error while running sync.sh the stack size needs to be increased.
To determine the default setting run
node --v8-options | grep -B0 -A1 stack_size
To run sync.js with a larger stack size launch with
To run a sync with a larger stack size launch with
node --stack-size=[SIZE] scripts/sync.js index update
-16
View File
@@ -1,16 +0,0 @@
#!/bin/bash
EXPLORER_PATH=$(dirname $(dirname $(readlink -f "$0")))
if [ -f "${EXPLORER_PATH}/tmp/index.pid" ];
then
ps -p `cat ${EXPLORER_PATH}/tmp/index.pid` > /dev/null
if [ $? -eq 0 ]; then
exit 1
else
rm "${EXPLORER_PATH}/tmp/index.pid"
fi
fi
if [ -z "${1}" ]; then
eval "cd ${EXPLORER_PATH} && $(which node) scripts/sync.js index update"
else
eval "cd ${EXPLORER_PATH} && ${1} scripts/sync.js index update"
fi
-113
View File
@@ -1,113 +0,0 @@
var mongoose = require('mongoose')
, lib = require('../lib/explorer')
, db = require('../lib/database')
, settings = require('../lib/settings')
, request = require('postman-request');
var COUNT = 5000; //number of blocks to index
function exit() {
mongoose.disconnect();
process.exit(0);
}
var dbString = 'mongodb://' + settings.dbsettings.user;
dbString = dbString + ':' + settings.dbsettings.password;
dbString = dbString + '@' + settings.dbsettings.address;
dbString = dbString + ':' + settings.dbsettings.port;
dbString = dbString + '/' + settings.dbsettings.database;
mongoose.connect(dbString, { useNewUrlParser: true, useCreateIndex: true, useUnifiedTopology: true, useFindAndModify: false }, function(err) {
if (err) {
console.log('Unable to connect to database: %s', dbString);
console.log('Aborting');
exit();
} else {
request({uri: 'http://127.0.0.1:' + settings.port + '/api/getpeerinfo', json: true, headers: {'User-Agent': 'eiquidus'}}, function (error, response, body) {
lib.syncLoop(body.length, function (loop) {
var i = loop.iteration();
var address = body[i].addr.substring(0, body[i].addr.lastIndexOf(":")).replace("[","").replace("]","");
var port = body[i].addr.substring(body[i].addr.lastIndexOf(":") + 1);
var rateLimit = new RateLimit(1, 2000, false);
db.find_peer(address, function(peer) {
if (peer) {
if (isNaN(peer['port']) || peer['port'].length < 2 || peer['country'].length < 1 || peer['country_code'].length < 1) {
db.drop_peers(function() {
console.log('Saved peers missing ports or country, dropping peers. Re-run this script afterwards.');
exit();
});
}
// peer already exists
loop.next();
} else {
rateLimit.schedule(function() {
request({uri: 'https://freegeoip.app/json/' + address, json: true, headers: {'User-Agent': 'eiquidus'}}, function (error, response, geo) {
db.create_peer({
address: address,
port: port,
protocol: body[i].version,
version: body[i].subver.replace('/', '').replace('/', ''),
country: geo.country_name,
country_code: geo.country_code
}, function(){
loop.next();
});
});
});
}
});
}, function() {
exit();
});
});
}
});
// rate limiting class from Matteo Agosti via https://www.matteoagosti.com/blog/2013/01/22/rate-limiting-function-calls-in-javascript/
var RateLimit = (function() {
var RateLimit = function(maxOps, interval, allowBursts) {
this._maxRate = allowBursts ? maxOps : maxOps / interval;
this._interval = interval;
this._allowBursts = allowBursts;
this._numOps = 0;
this._start = new Date().getTime();
this._queue = [];
};
RateLimit.prototype.schedule = function(fn) {
var that = this,
rate = 0,
now = new Date().getTime(),
elapsed = now - this._start;
if (elapsed > this._interval) {
this._numOps = 0;
this._start = now;
}
rate = this._numOps / (this._allowBursts ? 1 : elapsed);
if (rate < this._maxRate) {
if (this._queue.length === 0) {
this._numOps++;
fn();
}
else {
if (fn) this._queue.push(fn);
this._numOps++;
this._queue.shift()();
}
}
else {
if (fn) this._queue.push(fn);
setTimeout(function() {
that.schedule();
}, 1 / this._maxRate);
}
};
return RateLimit;
})();
+245 -151
View File
@@ -1,4 +1,5 @@
var mongoose = require('mongoose')
, lib = require('../lib/explorer')
, db = require('../lib/database')
, Tx = require('../models/tx')
, Address = require('../models/address')
@@ -6,6 +7,7 @@ var mongoose = require('mongoose')
, Richlist = require('../models/richlist')
, Stats = require('../models/stats')
, settings = require('../lib/settings')
, request = require('postman-request')
, fs = require('fs');
var mode = 'update';
@@ -13,22 +15,21 @@ var database = 'index';
// displays usage and exits
function usage() {
console.log('Usage: node scripts/sync.js [database] [mode]');
console.log('Usage: scripts/sync.sh /path/to/nodejs [mode]');
console.log('');
console.log('database: (required)');
console.log('index [mode] Main index: coin info/stats, transactions & addresses');
console.log('market Market data: summaries, orderbooks, trade history & chartdata')
console.log('');
console.log('mode: (required for index database only)');
console.log('Mode: (required)');
console.log('update Updates index from last sync to current block');
console.log('check Checks index for (and adds) any missing transactions/addresses');
console.log('reindex Clears index then resyncs from genesis to current block');
console.log('reindex-rich Clears and recreates the richlist data');
console.log('reindex-txcount Rescan and flatten the tx count value for faster access');
console.log('market Updates market summaries, orderbooks, trade history + charts');
console.log('peers Updates peer info based on local wallet connections');
console.log('');
console.log('notes:');
console.log('* \'current block\' is the latest created block when script is executed.');
console.log('* The market database only supports (& defaults to) reindex mode.');
console.log('* If check mode finds missing data(ignoring new data since last sync),');
console.log('Notes:');
console.log('- \'current block\' is the latest created block when script is executed.');
console.log('- The market + peers databases only support (& defaults to) reindex mode.');
console.log('- If check mode finds missing data (ignoring new data since last sync),');
console.log(' index_timeout in settings.json is set too low.')
console.log('');
process.exit(0);
@@ -52,6 +53,7 @@ if (process.argv[2] == 'index') {
break;
case 'reindex-rich':
mode = 'reindex-rich';
break;
case 'reindex-txcount':
mode = 'reindex-txcount';
break;
@@ -59,8 +61,10 @@ if (process.argv[2] == 'index') {
usage();
}
}
} else if (process.argv[2] == 'market'){
} else if (process.argv[2] == 'market') {
database = 'market';
} else if (process.argv[2] == 'peers') {
database = 'peers';
} else {
usage();
}
@@ -125,54 +129,153 @@ dbString = dbString + '@' + settings.dbsettings.address;
dbString = dbString + ':' + settings.dbsettings.port;
dbString = dbString + '/' + settings.dbsettings.database;
is_locked(function (exists) {
if (exists) {
console.log("Script already running..");
process.exit(0);
} else {
create_lock(function (){
console.log("script launched with pid: " + process.pid);
mongoose.connect(dbString, { useNewUrlParser: true, useCreateIndex: true, useUnifiedTopology: true, useFindAndModify: false }, function(err) {
if (err) {
console.log('Unable to connect to database: %s', dbString);
console.log('Aborting');
exit();
} else if (database == 'index') {
db.check_stats(settings.coin, function(exists) {
if (exists == false) {
console.log('Run \'npm start\' to create database structures before running this script.');
exit();
} else {
db.update_db(settings.coin, function(stats){
if (settings.heavy == true) {
db.update_heavy(settings.coin, stats.count, 20, function(){
if (database == 'peers') {
console.log('syncing peers.. please wait..');
// Initialize the rate limiting class from Matteo Agosti via https://www.matteoagosti.com/blog/2013/01/22/rate-limiting-function-calls-in-javascript/
var RateLimit = (function() {
var RateLimit = function(maxOps, interval, allowBursts) {
this._maxRate = allowBursts ? maxOps : maxOps / interval;
this._interval = interval;
this._allowBursts = allowBursts;
});
}
if (mode == 'reindex') {
Tx.deleteMany({}, function(err) {
console.log('TXs cleared.');
Address.deleteMany({}, function(err2) {
console.log('Addresses cleared.');
AddressTx.deleteMany({}, function(err3) {
console.log('Address TXs cleared.');
Richlist.updateOne({coin: settings.coin}, {
received: [],
balance: [],
}, function(err3) {
Stats.updateOne({coin: settings.coin}, {
last: 0,
count: 0,
supply: 0
}, function() {
console.log('index cleared (reindex)');
});
db.update_tx_db(settings.coin, 1, stats.count, stats.txes, settings.update_timeout, function(){
db.update_richlist('received', function(){
db.update_richlist('balance', function(){
db.get_stats(settings.coin, function(nstats){
console.log('reindex complete (block: %s)', nstats.last);
exit();
this._numOps = 0;
this._start = new Date().getTime();
this._queue = [];
};
RateLimit.prototype.schedule = function(fn) {
var that = this,
rate = 0,
now = new Date().getTime(),
elapsed = now - this._start;
if (elapsed > this._interval) {
this._numOps = 0;
this._start = now;
}
rate = this._numOps / (this._allowBursts ? 1 : elapsed);
if (rate < this._maxRate) {
if (this._queue.length === 0) {
this._numOps++;
fn();
}
else {
if (fn) this._queue.push(fn);
this._numOps++;
this._queue.shift()();
}
}
else {
if (fn) this._queue.push(fn);
setTimeout(function() {
that.schedule();
}, 1 / this._maxRate);
}
};
return RateLimit;
})();
// syncing peers does not require a lock
mongoose.connect(dbString, { useNewUrlParser: true, useCreateIndex: true, useUnifiedTopology: true, useFindAndModify: false }, function(err) {
if (err) {
console.log('Unable to connect to database: %s', dbString);
console.log('Aborting');
exit();
} else {
request({uri: 'http://127.0.0.1:' + settings.port + '/api/getpeerinfo', json: true, headers: {'User-Agent': 'eiquidus'}}, function (error, response, body) {
lib.syncLoop(body.length, function (loop) {
var i = loop.iteration();
var address = body[i].addr.substring(0, body[i].addr.lastIndexOf(":")).replace("[","").replace("]","");
var port = body[i].addr.substring(body[i].addr.lastIndexOf(":") + 1);
var rateLimit = new RateLimit(1, 2000, false);
db.find_peer(address, function(peer) {
if (peer) {
if (isNaN(peer['port']) || peer['port'].length < 2 || peer['country'].length < 1 || peer['country_code'].length < 1) {
db.drop_peers(function() {
console.log('Saved peers missing ports or country, dropping peers. Re-run this script afterwards.');
exit();
});
}
// peer already exists
loop.next();
} else {
rateLimit.schedule(function() {
request({uri: 'https://freegeoip.app/json/' + address, json: true, headers: {'User-Agent': 'eiquidus'}}, function (error, response, geo) {
db.create_peer({
address: address,
port: port,
protocol: body[i].version,
version: body[i].subver.replace('/', '').replace('/', ''),
country: geo.country_name,
country_code: geo.country_code
}, function(){
loop.next();
});
});
});
}
});
}, function() {
console.log('peer sync complete');
exit();
});
});
}
});
} else {
// index and market sync requires locking
is_locked(function (exists) {
if (exists) {
console.log("Script already running..");
process.exit(0);
} else {
create_lock(function (){
console.log("script launched with pid: " + process.pid);
mongoose.connect(dbString, { useNewUrlParser: true, useCreateIndex: true, useUnifiedTopology: true, useFindAndModify: false }, function(err) {
if (err) {
console.log('Unable to connect to database: %s', dbString);
console.log('Aborting');
exit();
} else if (database == 'index') {
db.check_stats(settings.coin, function(exists) {
if (exists == false) {
console.log('Run \'npm start\' to create database structures before running this script.');
exit();
} else {
db.update_db(settings.coin, function(stats){
if (settings.heavy == true) {
db.update_heavy(settings.coin, stats.count, 20, function(){
});
}
if (mode == 'reindex') {
Tx.deleteMany({}, function(err) {
console.log('TXs cleared.');
Address.deleteMany({}, function(err2) {
console.log('Addresses cleared.');
AddressTx.deleteMany({}, function(err3) {
console.log('Address TXs cleared.');
Richlist.updateOne({coin: settings.coin}, {
received: [],
balance: [],
}, function(err3) {
Stats.updateOne({coin: settings.coin}, {
last: 0,
count: 0,
supply: 0
}, function() {
console.log('index cleared (reindex)');
});
db.update_tx_db(settings.coin, 1, stats.count, stats.txes, settings.update_timeout, function(){
db.update_richlist('received', function(){
db.update_richlist('balance', function(){
db.get_stats(settings.coin, function(nstats){
console.log('reindex complete (block: %s)', nstats.last);
exit();
});
});
});
});
@@ -180,112 +283,103 @@ is_locked(function (exists) {
});
});
});
});
} else if (mode == 'check') {
db.update_tx_db(settings.coin, 1, stats.count, stats.txes, settings.check_timeout, function(){
db.get_stats(settings.coin, function(nstats){
console.log('check complete (block: %s)', nstats.last);
exit();
});
});
} else if (mode == 'update') {
// Lookup the last block index
Tx.findOne({}, {blockindex:1}).sort({blockindex:-1}).limit(1).exec(function(err, data){
var nLast = stats.last;
if (!err && data) {
// start from the last block index
nLast = data.blockindex;
}
db.update_tx_db(settings.coin, nLast, stats.count, stats.txes, settings.update_timeout, function(){
db.update_richlist('received', function(){
db.update_richlist('balance', function(){
db.get_stats(settings.coin, function(nstats){
console.log('update complete (block: %s)', nstats.last);
exit();
});
});
} else if (mode == 'check') {
console.log('starting check.. please wait..');
db.update_tx_db(settings.coin, 1, stats.count, stats.txes, settings.check_timeout, function(){
db.get_stats(settings.coin, function(nstats){
console.log('check complete (block: %s)', nstats.last);
exit();
});
});
});
} else if (mode == 'reindex-rich') {
console.log('update started');
db.update_tx_db(settings.coin, stats.last, stats.count, stats.txes, settings.check_timeout, function(){
console.log('update finished');
db.check_richlist(settings.coin, function(exists){
if (exists == true) {
console.log('richlist entry found, deleting now..');
} else if (mode == 'update') {
// Lookup the last block index
Tx.findOne({}, {blockindex:1}).sort({blockindex:-1}).limit(1).exec(function(err, data){
var nLast = stats.last;
if (!err && data) {
// start from the last block index
nLast = data.blockindex;
}
db.delete_richlist(settings.coin, function(deleted) {
if (deleted == true) {
console.log('richlist entry deleted');
}
db.create_richlist(settings.coin, function() {
console.log('richlist created.');
db.update_richlist('received', function(){
console.log('richlist updated received.');
db.update_richlist('balance', function(){
console.log('richlist updated balance.');
db.get_stats(settings.coin, function(nstats){
console.log('update complete (block: %s)', nstats.last);
exit();
});
db.update_tx_db(settings.coin, nLast, stats.count, stats.txes, settings.update_timeout, function(){
db.update_richlist('received', function(){
db.update_richlist('balance', function(){
db.get_stats(settings.coin, function(nstats){
console.log('update complete (block: %s)', nstats.last);
exit();
});
});
});
});
});
});
} else if (mode == 'reindex-txcount') {
console.log('calculating tx count.. please wait..');
// Resetting the transaction counter requires a single lookup on the txes collection to find all txes that have a positive or zero total and 1 or more vout
Tx.find({'total': {$gte: 0}, 'vout': { $gte: { $size: 1 }}}).countDocuments(function(err, count) {
console.log('found tx count: ' + count.toString());
Stats.updateOne({coin: settings.coin}, {
txes: count
}, function() {
console.log('tx count update complete');
exit();
});
});
} else if (mode == 'reindex-rich') {
console.log('check richlist');
db.check_richlist(settings.coin, function(exists) {
if (exists) console.log('richlist entry found, deleting now..');
db.delete_richlist(settings.coin, function(deleted) {
if (deleted) console.log('richlist entry deleted');
db.create_richlist(settings.coin, function() {
console.log('richlist created.');
db.update_richlist('received', function() {
console.log('richlist updated received.');
db.update_richlist('balance', function() {
console.log('richlist update complete');
exit();
});
});
});
});
});
} else if (mode == 'reindex-txcount') {
console.log('calculating tx count.. please wait..');
// Resetting the transaction counter requires a single lookup on the txes collection to find all txes that have a positive or zero total and 1 or more vout
Tx.find({'total': {$gte: 0}, 'vout': { $gte: { $size: 1 }}}).countDocuments(function(err, count) {
console.log('found tx count: ' + count.toString());
Stats.updateOne({coin: settings.coin}, {
txes: count
}, function() {
console.log('tx count update complete');
exit();
});
});
}
});
}
});
} else {
//update markets
var markets = settings.markets.enabled;
var complete = 0;
for (var x = 0; x < markets.length; x++) {
var market = markets[x];
db.check_market(market, function(mkt, exists) {
if (exists) {
db.update_markets_db(mkt, function(err) {
if (!err) {
console.log('%s market data updated successfully.', mkt);
complete++;
if (complete == markets.length)
get_last_usd_price();
} else {
console.log('%s: %s', mkt, err);
complete++;
if (complete == markets.length)
get_last_usd_price();
}
});
} else {
console.log('error: entry for %s does not exists in markets db.', mkt);
complete++;
if (complete == markets.length)
get_last_usd_price();
}
});
}
});
} else {
//update markets
var markets = settings.markets.enabled;
var complete = 0;
for (var x = 0; x < markets.length; x++) {
var market = markets[x];
db.check_market(market, function(mkt, exists) {
if (exists) {
db.update_markets_db(mkt, function(err) {
if (!err) {
console.log('%s market data updated successfully.', mkt);
complete++;
if (complete == markets.length)
get_last_usd_price();
} else {
console.log('%s: %s', mkt, err);
complete++;
if (complete == markets.length)
get_last_usd_price();
}
});
} else {
console.log('error: entry for %s does not exists in markets db.', mkt);
complete++;
if (complete == markets.length)
get_last_usd_price();
}
});
}
}
});
});
});
}
});
}
});
}
function get_last_usd_price() {
// Get the last usd price for coinstats
+120
View File
@@ -0,0 +1,120 @@
#!/bin/bash
readonly EXPLORER_PATH=$(dirname $(dirname $(readlink -f "$0")))
NODE_PATH=""
MODE=""
# Check if parameters were passed into the script
if [ -n "${1}" ]; then
# At least one parameter has been passed in
case ${1} in
"update")
# Index update
MODE="index update"
;;
"check")
# Index check
MODE="index check"
;;
"reindex")
# Index reindex
MODE="index reindex"
;;
"reindex-rich")
# Index reindex-rich
MODE="index reindex-rich"
;;
"reindex-txcount")
# Index reindex-txcount
MODE="index reindex-txcount"
;;
"market")
# Market update
MODE="market"
;;
"peers")
# Peers update
MODE="peers"
;;
*)
# Check if this is a file that exists on the filesystem
if [ -f ${1} ]; then
# The file exists. Assume this is the path to node
NODE_PATH="${1}"
fi
;;
esac
# Check if the mode is already set
if [ -z "${MODE}" ]; then
# Mode is not set so check if the next parameter exists
if [ -n "${2}" ]; then
# Determine which mode this last parameter is
case ${2} in
"update")
# Index update
MODE="index update"
;;
"check")
# Index check
MODE="index check"
;;
"reindex")
# Index reindex
MODE="index reindex"
;;
"reindex-rich")
# Index reindex-rich
MODE="index reindex-rich"
;;
"reindex-txcount")
# Index reindex-txcount
MODE="index reindex-txcount"
;;
"market")
# Market update
MODE="market"
;;
"peers")
# Peers update
MODE="peers"
;;
esac
elif [ -n "${NODE_PATH}" ]; then
# Node path was specified but no mode, so default to 'index update' mode
MODE="index update"
fi
fi
else
# No parameters specified so default to 'index update' mode
MODE="index update"
fi
# Check if the mode is set
if [ -n "${MODE}" ]; then
# Mode is set
# Check if the desired mode requires a lock
if [ "${MODE}" != "peers" ]; then
# A lock is required
# Check if the script is already running (tmp/index.pid file already exists)
if [ -f "${EXPLORER_PATH}/tmp/index.pid" ]; then
# The tmp/index.pid file exists. Check if the process is actually still running
ps -p `cat ${EXPLORER_PATH}/tmp/index.pid` > /dev/null
if [ $? -eq 0 ]; then
# Script is running so the data is locked and we must exit now and try again later
echo "Script already running.."
exit 1
else
# Script is not actually running so we can delete the lock file
rm "${EXPLORER_PATH}/tmp/index.pid"
fi
fi
fi
# Check if the node path was specified
if [ -z "${NODE_PATH}" ]; then
# Node path not specified so lookup using the 'which' cmd
eval "cd ${EXPLORER_PATH} && $(which node) scripts/sync.js ${MODE}"
else
# Node path specified
eval "cd ${EXPLORER_PATH} && ${NODE_PATH} scripts/sync.js ${MODE}"
fi
else
# Mode not set so load the sync script without specifying the mode to return the usage options
eval "cd ${EXPLORER_PATH} && $(which node) scripts/sync.js"
fi