Files
palladum-lightning/lightningd/wait.c
Rusty Russell 52d69df10f lightningd: migrate events from bookkeeper at startup.
We take over the --bookkeeper-dir and --bookkeeper-db options, and
then if we can find the bookkeeper db we extract the records to
initialize our chain_moves and channel_moves tables.

Of course, bookkeeper now needs to not register those options.

When bookkeeper gets invoked the first time, it will reconstruct
everything from listchannelmoves and listcoinmoves.  It cannot
preserve manually-added descriptions, so we put those in the datastore
for it ready to go.

Note that the order of onchain_fee changes slightly from the original.
But this is fine.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
2025-08-19 13:37:50 +09:30

282 lines
6.6 KiB
C

/* Code to be notified when various standardized events happen. */
#include "config.h"
#include <ccan/array_size/array_size.h>
#include <ccan/tal/str/str.h>
#include <common/json_command.h>
#include <common/overflows.h>
#include <db/exec.h>
#include <lightningd/jsonrpc.h>
#include <lightningd/lightningd.h>
#include <lightningd/wait.h>
/* One queued `wait` command: it is held on ld->wait_commands until the
 * index it names reaches the requested value. */
struct waiter {
/* Links this waiter into ld->wait_commands. */
struct list_node list;
/* The command to complete once the index value is reached. */
struct command *cmd;
/* These are pointers because of how param_ works */
enum wait_subsystem *subsystem;
enum wait_index *index;
/* The waiter is completed once the index value is >= *nextval. */
u64 *nextval;
};
/* JSON names of the wait subsystems.
 *
 * wait_subsystem_name() indexes this table directly with an
 * enum wait_subsystem value, and param_subsystem() turns the position of a
 * matching entry back into that enum, so entries must stay in enum order.
 *
 * Nothing ever writes to the table, so both the strings and the pointers
 * themselves are const. */
static const char *const subsystem_names[] = {
	"forwards",
	"sendpays",
	"invoices",
	"htlcs",
	"chainmoves",
	"channelmoves",
};
/* JSON names of the per-subsystem indexes.
 *
 * wait_index_name() indexes this table directly with an enum wait_index
 * value, and param_index() turns the position of a matching entry back
 * into that enum, so entries must stay in enum order.
 *
 * Nothing ever writes to the table, so both the strings and the pointers
 * themselves are const. */
static const char *const index_names[] = {
	"created",
	"updated",
	"deleted",
};
/* Returns the JSON name of @index.
 *
 * These strings are part of the API, so they must never change!
 * Aborts if @index is not one of the listed enum values. */
const char *wait_index_name(enum wait_index index)
{
	const char *name = NULL;

	/* Every enum member is listed explicitly; there is no default case. */
	switch (index) {
	case WAIT_INDEX_CREATED:
	case WAIT_INDEX_UPDATED:
	case WAIT_INDEX_DELETED:
		name = index_names[index];
		break;
	}

	if (name == NULL)
		abort();
	return name;
}
/* Returns the JSON name of @subsystem.
 *
 * Aborts if @subsystem is not one of the listed enum values. */
const char *wait_subsystem_name(enum wait_subsystem subsystem)
{
	const char *name = NULL;

	/* Every enum member is listed explicitly; there is no default case. */
	switch (subsystem) {
	case WAIT_SUBSYSTEM_FORWARD:
	case WAIT_SUBSYSTEM_SENDPAY:
	case WAIT_SUBSYSTEM_INVOICE:
	case WAIT_SUBSYSTEM_HTLCS:
	case WAIT_SUBSYSTEM_CHAINMOVES:
	case WAIT_SUBSYSTEM_CHANNELMOVES:
		name = subsystem_names[subsystem];
		break;
	}

	if (name == NULL)
		abort();
	return name;
}
/* Returns a pointer to the counter held in @ld for the given
 * @subsystem / @index pair.
 *
 * Both values are asserted to be within the bounds of the respective
 * arrays before they are used. */
static u64 *wait_index_ptr(struct lightningd *ld,
			   enum wait_subsystem subsystem,
			   enum wait_index index)
{
	assert(subsystem < ARRAY_SIZE(ld->indexes));
	assert(index < ARRAY_SIZE(ld->indexes[subsystem].i));

	return &ld->indexes[subsystem].i[index];
}
/* Adds each (name, value) string pair read from @ap to the JSON object
 * currently open in @response.
 *
 * The list is terminated by a NULL name.  A pair whose value is NULL is
 * skipped.  A name starting with '=' means the value is already JSON: it
 * is inserted verbatim under the name with the '=' stripped. */
static void json_add_index_fields(struct json_stream *response, va_list *ap)
{
	const char *name, *value;

	while ((name = va_arg(*ap, const char *)) != NULL) {
		value = va_arg(*ap, const char *);
		if (!value)
			continue;
		/* This is a hack! */
		if (name[0] == '=') {
			/* Copy in literally! */
			json_add_jsonstr(response, name + 1, value, strlen(value));
		} else {
			json_add_string(response, name, value);
		}
	}
}

/* Writes the result fields of a `wait` response into @response.
 *
 * Always emits "subsystem" and the index name with its value @val.
 * If @ap is NULL nothing else is added.  Otherwise *@ap is a
 * NULL-terminated list of (name, value) string pairs, emitted inside an
 * object named after the subsystem.  The same pairs are also emitted under
 * the deprecated "details" object, except for "htlcs" and only while
 * command_deprecated_out_ok() allows it.
 *
 * *@ap is consumed only when the deprecated object is written; the
 * subsystem-named object reads from a private copy taken beforehand. */
static void json_add_index(struct command *cmd,
			   struct json_stream *response,
			   enum wait_subsystem subsystem,
			   enum wait_index index,
			   u64 val,
			   va_list *ap)
{
	va_list ap2;

	json_add_string(response, "subsystem", wait_subsystem_name(subsystem));
	json_add_u64(response, wait_index_name(index), val);

	if (!ap)
		return;

	/* Copy first: the deprecated block below may walk *ap to its end. */
	va_copy(ap2, *ap);

	/* "htlcs" never had details field: it came after! */
	if (subsystem != WAIT_SUBSYSTEM_HTLCS
	    && command_deprecated_out_ok(cmd, "details", "v25.05", "v26.06")) {
		json_object_start(response, "details");
		json_add_index_fields(response, ap);
		json_object_end(response);
	}

	json_object_start(response, wait_subsystem_name(subsystem));
	json_add_index_fields(response, &ap2);
	json_object_end(response);

	/* Every va_copy must be matched by a va_end in the same function. */
	va_end(ap2);
}
/* Adds @num to the @subsystem / @index counter, stores the new value in
 * @db, and completes every queued `wait` command that it satisfies.
 *
 * @ap_master carries the NULL-terminated (name, value) detail pairs; each
 * satisfied waiter gets its own copy of it for building its response.
 *
 * Returns the new counter value.  Overflow of the counter is asserted
 * against. */
static u64 wait_index_bump(struct lightningd *ld,
			   struct db *db,
			   enum wait_subsystem subsystem,
			   enum wait_index index,
			   u64 num,
			   va_list ap_master)
{
	struct waiter *w, *next;
	u64 *counter = wait_index_ptr(ld, subsystem, index);
	const char *varname;

	assert(!add_overflows_u64(*counter, num));
	*counter += num;

	/* FIXME: We can optimize this!  It's always the max of the fields in
	 * the table, *unless* we delete one.  So we can lazily write this on
	 * delete, and fix it up to MAX() when we startup. */
	varname = tal_fmt(tmpctx, "last_%s_%s_index",
			  wait_subsystem_name(subsystem),
			  wait_index_name(index));
	db_set_intvar(db, varname, *counter);

	list_for_each_safe(&ld->wait_commands, w, next, list) {
		struct json_stream *response;
		va_list ap;

		/* Skip waiters on something else, or not yet satisfied. */
		if (*w->subsystem != subsystem
		    || *w->index != index
		    || *counter < *w->nextval)
			continue;

		response = json_stream_success(w->cmd);
		va_copy(ap, ap_master);
		json_add_index(w->cmd, response, subsystem, index, *counter, &ap);
		va_end(ap);

		/* Unlink before the command is completed and freed. */
		list_del_from(&ld->wait_commands, &w->list);
		was_pending(command_success(w->cmd, response));
	}

	return *counter;
}
/* Increases the @subsystem / @index counter by one, recording it in @db.
 *
 * The variadic arguments are forwarded unchanged to wait_index_bump().
 * Returns the new counter value. */
u64 wait_index_increment(struct lightningd *ld,
			 struct db *db,
			 enum wait_subsystem subsystem,
			 enum wait_index index,
			 ...)
{
	va_list args;
	u64 newval;

	va_start(args, index);
	newval = wait_index_bump(ld, db, subsystem, index, 1, args);
	va_end(args);

	return newval;
}
/* Increases the @subsystem / @index counter by @num, recording it in
 * ld->wallet->db.
 *
 * A @num of zero is a no-op.  The variadic arguments are forwarded
 * unchanged to wait_index_bump(). */
void wait_index_increase(struct lightningd *ld,
			 enum wait_subsystem subsystem,
			 enum wait_index index,
			 u64 num,
			 ...)
{
	va_list args;

	if (num != 0) {
		va_start(args, num);
		wait_index_bump(ld, ld->wallet->db, subsystem, index, num, args);
		va_end(args);
	}
}
/* param() callback: parses a subsystem name token.
 *
 * On a match, *@subsystem is allocated off @cmd and set to the position
 * of the matching entry in subsystem_names; NULL is returned.  Otherwise
 * a bad-parameter failure is returned. */
static struct command_result *param_subsystem(struct command *cmd,
					      const char *name,
					      const char *buffer,
					      const jsmntok_t *tok,
					      enum wait_subsystem **subsystem)
{
	for (size_t n = 0; n < ARRAY_SIZE(subsystem_names); n++) {
		if (!json_tok_streq(buffer, tok, subsystem_names[n]))
			continue;

		*subsystem = tal(cmd, enum wait_subsystem);
		**subsystem = n;
		return NULL;
	}

	return command_fail_badparam(cmd, name, buffer, tok,
				     "unknown subsystem");
}
/* param() callback: parses an index name token.
 *
 * On a match, *@index is allocated off @cmd and set to the position of
 * the matching entry in index_names; NULL is returned.  Otherwise a
 * bad-parameter failure is returned. */
struct command_result *param_index(struct command *cmd,
				   const char *name,
				   const char *buffer,
				   const jsmntok_t *tok,
				   enum wait_index **index)
{
	for (size_t n = 0; n < ARRAY_SIZE(index_names); n++) {
		if (!json_tok_streq(buffer, tok, index_names[n]))
			continue;

		*index = tal(cmd, enum wait_index);
		**index = n;
		return NULL;
	}

	return command_fail_badparam(cmd, name, buffer, tok,
				     "unknown index");
}
/* Handler for the `wait` command.
 *
 * Takes "subsystem", "indexname" and "nextvalue".  If the chosen index
 * has already reached nextvalue the command succeeds at once, reporting
 * the current value without any detail fields.  Otherwise the command is
 * queued on ld->wait_commands and left pending. */
static struct command_result *json_wait(struct command *cmd,
					const char *buffer,
					const jsmntok_t *obj UNNEEDED,
					const jsmntok_t *params)
{
	struct waiter *w = tal(cmd, struct waiter);
	struct json_stream *response;
	u64 current;

	if (!param(cmd, buffer, params,
		   p_req("subsystem", param_subsystem,
			 &w->subsystem),
		   p_req("indexname", param_index, &w->index),
		   p_req("nextvalue", param_u64, &w->nextval),
		   NULL))
		return command_param_failed();

	current = *wait_index_ptr(cmd->ld, *w->subsystem, *w->index);

	/* Not there yet?  Queue up until the index is bumped far enough. */
	if (current < *w->nextval) {
		w->cmd = cmd;
		list_add_tail(&cmd->ld->wait_commands, &w->list);
		return command_still_pending(cmd);
	}

	/* Already reached: answer immediately. */
	response = json_stream_success(cmd);
	json_add_index(cmd, response, *w->subsystem, *w->index, current, NULL);
	return command_success(cmd, response);
}
/* Command descriptor pairing the name "wait" with the json_wait handler. */
static const struct json_command wait_command = {
"wait",
json_wait,
};
/* Adds the descriptor to the json_command autodata set
 * (presumably how the command gets registered -- confirm against jsonrpc.c). */
AUTODATA(json_command, &wait_command);