bkpr: convert all the local db sql queries into calls to sql plugin.

With some help (and hinderance!) from ChatGPT: the field names
differ slightly from our internal db.

The particilar wrinkle is that we have to restrict all queries to
limit them to entries we've seen already.  Our code expects this (we
used to only enter it into the db when we processed it), and it would
otherwise be confusing if a sql query returned inconsistent results
because an event occurred while bookkeeper was processing.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2025-08-19 10:30:52 +09:30
parent 2a191479a3
commit b70f4f6184
8 changed files with 347 additions and 657 deletions

View File

@@ -219,7 +219,7 @@ getblockheight_done(struct command *cmd,
/* Get the income events */
db_begin_transaction(bkpr->db);
apys = compute_channel_apys(cmd, bkpr,
apys = compute_channel_apys(cmd, bkpr, cmd,
*req->start_time,
*req->end_time,
blockheight);
@@ -314,7 +314,7 @@ static struct command_result *do_dump_income(struct command *cmd,
/* Ok, go find me some income events! */
db_begin_transaction(bkpr->db);
evs = list_income_events(cmd, bkpr, *info->start_time, *info->end_time,
evs = list_income_events(cmd, bkpr, cmd, *info->start_time, *info->end_time,
*info->consolidate_fees);
db_commit_transaction(bkpr->db);
@@ -366,7 +366,7 @@ static struct command_result *do_list_income(struct command *cmd,
/* Ok, go find me some income events! */
db_begin_transaction(bkpr->db);
evs = list_income_events(cmd, bkpr, *info->start_time, *info->end_time,
evs = list_income_events(cmd, bkpr, cmd, *info->start_time, *info->end_time,
*info->consolidate_fees);
db_commit_transaction(bkpr->db);
@@ -418,7 +418,7 @@ static struct command_result *do_inspect(struct command *cmd,
acct_name);
db_begin_transaction(bkpr->db);
find_txo_chain(cmd, bkpr, acct, &txos);
find_txo_chain(cmd, bkpr, cmd, acct, &txos);
fee_sums = find_account_onchain_fees(cmd, bkpr, acct);
db_commit_transaction(bkpr->db);
@@ -625,22 +625,22 @@ static struct command_result *do_account_events(struct command *cmd,
db_begin_transaction(bkpr->db);
if (acct) {
channel_events = account_get_channel_events(cmd, bkpr->db, acct);
chain_events = account_get_chain_events(cmd, bkpr, acct);
channel_events = account_get_channel_events(cmd, bkpr, cmd, acct);
chain_events = account_get_chain_events(cmd, bkpr, cmd, acct);
onchain_fees = account_get_chain_fees(tmpctx, bkpr, acct->name);
} else if (info->payment_id != NULL) {
channel_events = get_channel_events_by_id(cmd, bkpr->db, info->payment_id);
channel_events = get_channel_events_by_id(cmd, bkpr, cmd, info->payment_id);
tx_id = tal(cmd, struct bitcoin_txid);
tx_id->shad.sha = *info->payment_id;
/* Transaction ids are stored as big-endian in the database */
reverse_bytes(tx_id->shad.sha.u.u8, sizeof(tx_id->shad.sha.u.u8));
chain_events = find_chain_events_bytxid(cmd, bkpr, tx_id);
chain_events = find_chain_events_bytxid(cmd, bkpr, cmd, tx_id);
onchain_fees = get_chain_fees_by_txid(cmd, bkpr, tx_id);
} else {
channel_events = list_channel_events(cmd, bkpr->db);
chain_events = list_chain_events(cmd, bkpr);
channel_events = list_channel_events(cmd, bkpr, cmd);
chain_events = list_chain_events(cmd, bkpr, cmd);
onchain_fees = list_chain_fees(cmd, bkpr);
}
db_commit_transaction(bkpr->db);
@@ -682,7 +682,7 @@ static struct command_result *do_edit_desc(struct command *cmd,
db_begin_transaction(bkpr->db);
add_utxo_description(cmd, bkpr, info->outpoint, info->new_desc);
chain_events = get_chain_events_by_outpoint(cmd, bkpr, info->outpoint, true);
chain_events = get_chain_events_by_outpoint(cmd, bkpr, cmd, info->outpoint);
db_commit_transaction(bkpr->db);
res = jsonrpc_stream_success(cmd);
@@ -724,8 +724,8 @@ static struct command_result *do_edit_desc_payment(struct command *cmd,
db_begin_transaction(bkpr->db);
add_payment_hash_description(cmd, bkpr, info->identifier, info->new_desc);
chain_events = get_chain_events_by_id(cmd, bkpr, info->identifier);
channel_events = get_channel_events_by_id(cmd, bkpr->db, info->identifier);
chain_events = get_chain_events_by_id(cmd, bkpr, cmd, info->identifier);
channel_events = get_channel_events_by_id(cmd, bkpr, cmd, info->identifier);
db_commit_transaction(bkpr->db);
res = jsonrpc_stream_success(cmd);
@@ -768,7 +768,7 @@ static struct command_result *do_list_balances(struct command *cmd,
struct amount_msat credit, debit, balance;
bool has_events;
has_events = account_get_credit_debit(cmd->plugin, bkpr->db,
has_events = account_get_credit_debit(bkpr, cmd,
accts[i]->name,
&credit, &debit);
if (!amount_msat_sub(&balance, credit, debit)) {
@@ -845,7 +845,7 @@ static void try_update_open_fees(struct command *cmd,
struct bkpr *bkpr = bkpr_of(cmd->plugin);
assert(acct->closed_event_db_id);
ev = find_chain_event_by_id(cmd, bkpr, *acct->closed_event_db_id);
ev = find_chain_event_by_id(cmd, bkpr, cmd, *acct->closed_event_db_id);
assert(ev);
err = maybe_update_onchain_fees(cmd, cmd, bkpr, ev->spending_txid);
@@ -1163,7 +1163,7 @@ static char *do_account_close_checks(struct command *cmd,
} else if (!is_channel_account(acct->name) && !e->spending_txid) {
const char *acctname;
acctname = find_close_account_name(tmpctx, bkpr->db, &e->outpoint.txid);
acctname = find_close_account_name(tmpctx, bkpr, cmd, &e->outpoint.txid);
if (acctname) {
closed_acct = find_account(bkpr, acctname);
} else {
@@ -1175,7 +1175,7 @@ static char *do_account_close_checks(struct command *cmd,
if (closed_acct && closed_acct->closed_event_db_id) {
u64 closeheight = account_onchain_closeheight(bkpr, closed_acct);
u64 closeheight = account_onchain_closeheight(bkpr, cmd, closed_acct);
if (closeheight != 0) {
char *err;
account_update_closeheight(cmd, closed_acct, closeheight);
@@ -1398,7 +1398,7 @@ listpeerchannels_done(struct command *cmd,
info->acct,
info->ev->timestamp)) {
db_begin_transaction(bkpr->db);
account_get_credit_debit(cmd->plugin, bkpr->db, info->acct->name,
account_get_credit_debit(bkpr, cmd, info->acct->name,
&credit, &debit);
db_commit_transaction(bkpr->db);

View File

@@ -97,6 +97,7 @@ static struct account *search_account(struct account **accts, const char *acctna
}
static void fillin_apy_acct_details(const struct bkpr *bkpr,
struct command *cmd,
const struct account *acct,
u32 current_blockheight,
struct channel_apy *apy)
@@ -107,7 +108,7 @@ static void fillin_apy_acct_details(const struct bkpr *bkpr,
apy->acct_name = tal_strdup(apy, acct->name);
assert(acct->open_event_db_id);
ev = find_chain_event_by_id(tmpctx, bkpr, *acct->open_event_db_id);
ev = find_chain_event_by_id(tmpctx, bkpr, cmd, *acct->open_event_db_id);
assert(ev);
apy->start_blockheight = ev->blockheight;
@@ -116,7 +117,7 @@ static void fillin_apy_acct_details(const struct bkpr *bkpr,
/* if this account is closed, add closing blockheight */
if (acct->closed_event_db_id) {
ev = find_chain_event_by_id(acct, bkpr,
ev = find_chain_event_by_id(acct, bkpr, cmd,
*acct->closed_event_db_id);
assert(ev);
apy->end_blockheight = ev->blockheight;
@@ -140,7 +141,8 @@ static void fillin_apy_acct_details(const struct bkpr *bkpr,
}
struct channel_apy **compute_channel_apys(const tal_t *ctx,
struct bkpr *bkpr,
const struct bkpr *bkpr,
struct command *cmd,
u64 start_time,
u64 end_time,
u32 current_blockheight)
@@ -149,7 +151,7 @@ struct channel_apy **compute_channel_apys(const tal_t *ctx,
struct channel_apy *apy, **apys;
struct account *acct, **accts;
evs = list_channel_events_timebox(ctx, bkpr->db, start_time, end_time);
evs = list_channel_events_timebox(ctx, bkpr, cmd, start_time, end_time);
accts = list_accounts(ctx, bkpr);
apys = tal_arr(ctx, struct channel_apy *, 0);
@@ -167,7 +169,7 @@ struct channel_apy **compute_channel_apys(const tal_t *ctx,
if (!acct || !streq(acct->name, ev->acct_name)) {
if (acct && is_channel_account(acct->name)) {
fillin_apy_acct_details(bkpr, acct,
fillin_apy_acct_details(bkpr, cmd, acct,
current_blockheight,
apy);
/* Save current apy, make new */
@@ -225,7 +227,7 @@ struct channel_apy **compute_channel_apys(const tal_t *ctx,
}
if (acct && is_channel_account(acct->name)) {
fillin_apy_acct_details(bkpr, acct,
fillin_apy_acct_details(bkpr, cmd, acct,
current_blockheight,
apy);
/* Save current apy, make new */

View File

@@ -34,7 +34,8 @@ WARN_UNUSED_RESULT bool channel_apy_sum(struct channel_apy *sum_apy,
const struct channel_apy *entry);
struct channel_apy **compute_channel_apys(const tal_t *ctx,
struct bkpr *bkpr,
const struct bkpr *bkpr,
struct command *cmd,
u64 start_time,
u64 end_time,
u32 current_blockheight);

View File

@@ -1,14 +1,12 @@
#include "config.h"
#include <assert.h>
#include <bitcoin/chainparams.h>
#include <ccan/array_size/array_size.h>
#include <ccan/json_escape/json_escape.h>
#include <ccan/tal/str/str.h>
#include <common/coin_mvt.h>
#include <common/json_parse_simple.h>
#include <common/json_stream.h>
#include <db/bindings.h>
#include <db/common.h>
#include <db/exec.h>
#include <db/utils.h>
#include <inttypes.h>
#include <plugins/bkpr/account.h>
#include <plugins/bkpr/account_entry.h>
@@ -20,6 +18,7 @@
#include <plugins/bkpr/onchain_fee.h>
#include <plugins/bkpr/rebalances.h>
#include <plugins/bkpr/recorder.h>
#include <plugins/bkpr/sql.h>
#include <time.h>
#define ONCHAIN_FEE "onchain_fee"
@@ -111,6 +110,7 @@ static char *csv_safe_str(const tal_t *ctx, const char *input TAKES)
static struct income_event *maybe_chain_income(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd,
struct account *acct,
struct chain_event *ev)
{
@@ -141,7 +141,8 @@ static struct income_event *maybe_chain_income(const tal_t *ctx,
/* income */
if (streq(ev->tag, "deposit")) {
struct db_stmt *stmt;
const jsmntok_t *toks;
const char *buf;
/* deposit to external is cost to us */
if (is_external_account(ev->acct_name)) {
@@ -168,16 +169,16 @@ static struct income_event *maybe_chain_income(const tal_t *ctx,
* into a tx that included funds from a 3rd party
* coming to us... eg. a splice out from the peer
* to our onchain wallet */
stmt = db_prepare_v2(bkpr->db, SQL("SELECT"
" 1"
" FROM chain_events e"
" WHERE "
" e.spending_txid = ?"));
db_bind_txid(stmt, &ev->outpoint.txid);
db_query_prepared(stmt);
if (!db_step(stmt)) {
tal_free(stmt);
toks = sql_req(tmpctx, cmd, &buf,
"SELECT"
" 1"
" FROM chainmoves"
" WHERE "
" spending_txid = X'%s'"
" AND created_index <= %"PRIu64,
fmt_bitcoin_txid(tmpctx, &ev->outpoint.txid),
bkpr->chainmoves_index);
if (json_get_member(buf, toks, "rows")->size == 0) {
/* no matching withdrawal from internal,
* so must be new deposit (external) */
return chain_to_income(ctx, bkpr, ev,
@@ -185,9 +186,6 @@ static struct income_event *maybe_chain_income(const tal_t *ctx,
ev->credit,
ev->debit);
}
db_col_ignore(stmt, "1");
tal_free(stmt);
return NULL;
}
@@ -306,6 +304,7 @@ static struct onchain_fee **find_consolidated_fees(const tal_t *ctx,
struct income_event **list_income_events(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd,
u64 start_time,
u64 end_time,
bool consolidate_fees)
@@ -315,9 +314,9 @@ struct income_event **list_income_events(const tal_t *ctx,
struct onchain_fee **onchain_fees;
struct income_event **evs;
channel_events = list_channel_events_timebox(ctx, bkpr->db,
channel_events = list_channel_events_timebox(ctx, bkpr, cmd,
start_time, end_time);
chain_events = list_chain_events_timebox(ctx, bkpr, start_time, end_time);
chain_events = list_chain_events_timebox(ctx, bkpr, cmd, start_time, end_time);
if (consolidate_fees) {
onchain_fees = find_consolidated_fees(ctx, bkpr,
@@ -369,7 +368,7 @@ struct income_event **list_income_events(const tal_t *ctx,
struct account *acct =
find_account(bkpr, chain->acct_name);
ev = maybe_chain_income(evs, bkpr, acct, chain);
ev = maybe_chain_income(evs, bkpr, cmd, acct, chain);
if (ev)
tal_arr_expand(&evs, ev);
i++;
@@ -407,9 +406,10 @@ struct income_event **list_income_events(const tal_t *ctx,
struct income_event **list_income_events_all(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd,
bool consolidate_fees)
{
return list_income_events(ctx, bkpr, 0, SQLITE_MAX_UINT,
return list_income_events(ctx, bkpr, cmd, 0, SQLITE_MAX_UINT,
consolidate_fees);
}

View File

@@ -31,12 +31,14 @@ struct csv_fmt {
/* List all the events that are income related (gain/loss) */
struct income_event **list_income_events_all(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd,
bool consolidate_fees);
/* List all the events that are income related (gain/loss),
* by a start and end date */
struct income_event **list_income_events(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd,
u64 start_time,
u64 end_time,
bool consolidate_fees);

View File

@@ -397,9 +397,9 @@ char *update_channel_onchain_fees(const tal_t *ctx,
struct amount_msat onchain_amt;
assert(acct->onchain_resolved_block);
close_ev = find_chain_event_by_id(ctx, bkpr,
close_ev = find_chain_event_by_id(ctx, bkpr, cmd,
*acct->closed_event_db_id);
events = find_chain_events_bytxid(ctx, bkpr,
events = find_chain_events_bytxid(ctx, bkpr, cmd,
close_ev->spending_txid);
/* Starting balance is close-ev's debit amount */
@@ -453,6 +453,7 @@ char *update_channel_onchain_fees(const tal_t *ctx,
static char *is_closed_channel_txid(const tal_t *ctx,
struct bkpr *bkpr,
struct command *cmd,
struct chain_event *ev,
struct bitcoin_txid *txid,
bool *is_channel_close_tx)
@@ -475,7 +476,7 @@ static char *is_closed_channel_txid(const tal_t *ctx,
/* is the closed utxo the same as the one
* we're trying to find fees for now */
closed = find_chain_event_by_id(inner_ctx, bkpr,
closed = find_chain_event_by_id(inner_ctx, bkpr, cmd,
*acct->closed_event_db_id);
if (!closed) {
*is_channel_close_tx = false;
@@ -516,7 +517,7 @@ char *maybe_update_onchain_fees(const tal_t *ctx,
u8 *inner_ctx = tal(NULL, u8);
/* Find all the deposits/withdrawals for this txid */
events = find_chain_events_bytxid(inner_ctx, bkpr, txid);
events = find_chain_events_bytxid(inner_ctx, bkpr, cmd, txid);
/* If we don't even have two events, skip */
if (tal_count(events) < 2)
@@ -524,7 +525,7 @@ char *maybe_update_onchain_fees(const tal_t *ctx,
for (size_t i = 0; i < tal_count(events); i++) {
bool is_channel_close_tx;
err = is_closed_channel_txid(ctx, bkpr,
err = is_closed_channel_txid(ctx, bkpr, cmd,
events[i], txid,
&is_channel_close_tx);

File diff suppressed because it is too large Load Diff

View File

@@ -33,16 +33,20 @@ struct txo_set {
/* Get all channel events for this account */
struct channel_event **account_get_channel_events(const tal_t *ctx,
struct db *db,
const struct bkpr *bkpr,
struct command *cmd,
struct account *acct);
/* Get all channel events for a payment id, order by timestamp */
struct channel_event **get_channel_events_by_id(const tal_t *ctx,
struct db *db,
struct sha256 *id);
const struct bkpr *bkpr,
struct command *cmd,
const struct sha256 *id);
/* Get all channel events, ordered by timestamp */
struct channel_event **list_channel_events(const tal_t *ctx, struct db *db);
struct channel_event **list_channel_events(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd);
/* Get all channel events, order by timestamp.
*
@@ -52,50 +56,56 @@ struct channel_event **list_channel_events(const tal_t *ctx, struct db *db);
* @end_time - UNIX timestamp to query until (inclusive)
*/
struct channel_event **list_channel_events_timebox(const tal_t *ctx,
struct db *db,
const struct bkpr *bkpr,
struct command *cmd,
u64 start_time,
u64 end_time);
/* Get all chain events for this account */
struct chain_event **account_get_chain_events(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd,
struct account *acct);
/* Get all chain events for a transaction id, order by timestamp */
struct chain_event **find_chain_events_bytxid(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd,
const struct bitcoin_txid *txid);
/* Get all chain events, order by timestamp. */
struct chain_event **list_chain_events(const tal_t *ctx, const struct bkpr *bkpr);
struct chain_event **list_chain_events(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd);
/* Get all chain events, order by timestamp.
*
* @ctx - context to allocate from
* @db - database to query
* @start_time - UNIX timestamp to query after (exclusive)
* @end_time - UNIX timestamp to query until (inclusive)
*/
struct chain_event **list_chain_events_timebox(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd,
u64 start_time,
u64 end_time);
/* Get all chain events for a payment hash */
struct chain_event **get_chain_events_by_id(const tal_t *ctx,
const struct bkpr *bkpr,
const struct sha256 *id);
const struct bkpr *bkpr,
struct command *cmd,
const struct sha256 *id);
/* Get all chain events for a utxo */
struct chain_event **get_chain_events_by_outpoint(const tal_t *ctx,
const struct bkpr *bkpr,
const struct bitcoin_outpoint *outpoint,
bool credits_only);
struct command *cmd,
const struct bitcoin_outpoint *outpoint);
/* Get total credits and debits for this account: returns false if no entries at all
* (in which case, credit and debit will both be AMOUNT_MSAT(0)). */
bool account_get_credit_debit(struct plugin *plugin,
struct db *db,
bool account_get_credit_debit(const struct bkpr *bkpr,
struct command *cmd,
const char *acct_name,
struct amount_msat *credit,
struct amount_msat *debit);
@@ -104,6 +114,7 @@ bool account_get_credit_debit(struct plugin *plugin,
/* Find a chain event by its database id */
struct chain_event *find_chain_event_by_id(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd,
u64 event_db_id);
/* Find the utxos for this account.
@@ -113,13 +124,15 @@ struct chain_event *find_chain_event_by_id(const tal_t *ctx,
*/
bool find_txo_chain(const tal_t *ctx,
const struct bkpr *bkpr,
struct command *cmd,
const struct account *acct,
struct txo_set ***sets);
/* Find the account that was closed by this txid.
* Returns NULL if none */
const char *find_close_account_name(const tal_t *ctx,
struct db *db,
const struct bkpr *bkpr,
struct command *cmd,
const struct bitcoin_txid *txid);
/* Have all the outputs for this account's close tx
@@ -127,7 +140,9 @@ const char *find_close_account_name(const tal_t *ctx,
* highest blockheight that has a resolving tx in it.
*
* The point of this is to allow us to prune data, eventually */
u64 account_onchain_closeheight(const struct bkpr *bkpr, const struct account *acct);
u64 account_onchain_closeheight(const struct bkpr *bkpr,
struct command *cmd,
const struct account *acct);
/* When we make external deposits from the wallet, we don't
* count them until any output that was spent *into* them is