Files
Rusty Russell 3be25d48d3 pytest: don't rely on sleep to ensure wait commands have been submitted.
Seems like sleep(1) isn't always enough.  Give in and put a log
message there, and wait for that instead.  Example failure:

```
	waitfut = executor.submit(l2.rpc.wait, subsystem='forwards', indexname='deleted', nextvalue=1)
        time.sleep(1)
    
        l2.rpc.delforward(scid12, 1, 'failed')
    
        waitres = waitfut.result(TIMEOUT)
>       assert waitres == {'subsystem': 'forwards',
                           'deleted': 1,
                           'forwards': {'in_channel': scid12,
                                        'in_htlc_id': 1,
                                        'status': 'failed'}}
E       AssertionError: assert {'subsystem': 'forwards', 'deleted': 1} == {'subsystem': 'forwards', 'deleted': 1, 'forwards': {'in_channel': '103x2x0', 'in_htlc_id': 1, 'status': 'failed'}}
E         
E         Common items:
E         {'deleted': 1, 'subsystem': 'forwards'}
E         Right contains 1 more item:
E         {'forwards': {'in_channel': '103x2x0', 'in_htlc_id': 1, 'status': 'failed'}}
E         
E         Full diff:
E           {
E               'deleted': 1,
E         -     'forwards': {
E         -         'in_channel': '103x2x0',
E         -         'in_htlc_id': 1,
E         -         'status': 'failed',
E         -     },
E               'subsystem': 'forwards',
E           }

tests/test_misc.py:3599: AssertionError
```

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
2026-01-14 15:41:45 +10:30

293 lines
7.0 KiB
C

/* Code to be notified when various standardized events happen. */
#include "config.h"
#include <ccan/array_size/array_size.h>
#include <ccan/tal/str/str.h>
#include <common/json_command.h>
#include <common/overflows.h>
#include <db/exec.h>
#include <inttypes.h>
#include <lightningd/jsonrpc.h>
#include <lightningd/lightningd.h>
#include <lightningd/wait.h>
/* One queued "wait" command.  json_wait() appends it to ld->wait_commands
 * when the requested index has not yet reached the requested value, and
 * wait_index_bump() completes and unlinks it once it has. */
struct waiter {
/* Linkage in ld->wait_commands. */
struct list_node list;
/* The command to answer when the wait is satisfied. */
struct command *cmd;
/* These are pointers because of how param_ works */
enum wait_subsystem *subsystem;
enum wait_index *index;
/* The wait is satisfied once the index value is >= *nextval. */
u64 *nextval;
};
/* JSON names of the subsystems, indexed by enum wait_subsystem value:
 * param_subsystem() stores the matching array position as the enum, and
 * wait_subsystem_name() looks the name up by enum value, so the order here
 * must stay in step with the enum. */
static const char *subsystem_names[] = {
"forwards",
"sendpays",
"invoices",
"htlcs",
"chainmoves",
"channelmoves",
"networkevents",
};
/* JSON names of the indexes, indexed by enum wait_index value:
 * param_index() stores the matching array position as the enum, and
 * wait_index_name() looks the name up by enum value, so the order here
 * must stay in step with the enum. */
static const char *index_names[] = {
"created",
"updated",
"deleted",
};
/* This is part of the API, so no changing! */
/* Returns the JSON name for @index; aborts if @index is not one of the
 * enum values handled below. */
const char *wait_index_name(enum wait_index index)
{
	const char *name = NULL;

	/* Every known index maps straight to its slot in index_names. */
	switch (index) {
	case WAIT_INDEX_CREATED:
	case WAIT_INDEX_UPDATED:
	case WAIT_INDEX_DELETED:
		name = index_names[index];
		break;
	}

	/* Anything else is not a valid index. */
	if (name == NULL)
		abort();

	return name;
}
/* Returns the JSON name for @subsystem; aborts if @subsystem is not one of
 * the enum values handled below. */
const char *wait_subsystem_name(enum wait_subsystem subsystem)
{
	const char *name = NULL;

	/* Every known subsystem maps straight to its slot in subsystem_names. */
	switch (subsystem) {
	case WAIT_SUBSYSTEM_FORWARD:
	case WAIT_SUBSYSTEM_SENDPAY:
	case WAIT_SUBSYSTEM_INVOICE:
	case WAIT_SUBSYSTEM_HTLCS:
	case WAIT_SUBSYSTEM_CHAINMOVES:
	case WAIT_SUBSYSTEM_CHANNELMOVES:
	case WAIT_SUBSYSTEM_NETWORKEVENTS:
		name = subsystem_names[subsystem];
		break;
	}

	/* Anything else is not a valid subsystem. */
	if (name == NULL)
		abort();

	return name;
}
/* Returns the address of the counter in ld->indexes for @subsystem and
 * @index, asserting that both are within the bounds of their arrays. */
static u64 *wait_index_ptr(struct lightningd *ld,
			   enum wait_subsystem subsystem,
			   enum wait_index index)
{
	assert(subsystem < ARRAY_SIZE(ld->indexes));
	assert(index < ARRAY_SIZE(ld->indexes[subsystem].i));

	return &ld->indexes[subsystem].i[index];
}
/* Writes the fields of a "wait" result into @response: "subsystem", and the
 * index name mapped to @val.
 *
 * If @ap is NULL, that is all.  Otherwise *@ap is a NULL-terminated list of
 * (const char *name, const char *value) pairs; pairs whose value is NULL are
 * skipped.  The pairs are emitted inside an object named after the subsystem
 * and, for subsystems before WAIT_SUBSYSTEM_HTLCS, also inside a "details"
 * object when command_deprecated_out_ok() permits it.
 *
 * A name starting with '=' means the value is inserted as literal JSON under
 * the name with the '=' removed.  In the subsystem object, an empty value for
 * such a name means "emit @val as a number".
 *
 * *@ap is consumed when the "details" object is written; the caller still
 * owns it and is responsible for va_end() on it.
 */
static void json_add_index(struct command *cmd,
			   struct json_stream *response,
			   enum wait_subsystem subsystem,
			   enum wait_index index,
			   u64 val,
			   va_list *ap)
{
	const char *name, *value;
	va_list ap2;

	json_add_string(response, "subsystem", wait_subsystem_name(subsystem));
	json_add_u64(response, wait_index_name(index), val);

	if (!ap)
		return;

	/* The list may be walked twice, so keep an untouched copy for the
	 * second pass. */
	va_copy(ap2, *ap);

	/* "htlcs" etc never had details field: they came after! */
	if (subsystem < WAIT_SUBSYSTEM_HTLCS
	    && command_deprecated_out_ok(cmd, "details", "v25.05", "v26.06")) {
		json_object_start(response, "details");
		while ((name = va_arg(*ap, const char *)) != NULL) {
			value = va_arg(*ap, const char *);
			if (!value)
				continue;

			/* This is a hack! */
			/* NOTE(review): unlike the subsystem object below, an
			 * empty value is not treated as "current value" here;
			 * presumably no subsystem before HTLCS passes one --
			 * confirm against callers. */
			if (name[0] == '=') {
				/* Copy in literally! */
				json_add_jsonstr(response, name + 1, value, strlen(value));
			} else {
				json_add_string(response, name, value);
			}
		}
		json_object_end(response);
	}

	json_object_start(response, wait_subsystem_name(subsystem));
	while ((name = va_arg(ap2, const char *)) != NULL) {
		value = va_arg(ap2, const char *);
		if (!value)
			continue;

		/* This is a hack!  Means "copy in literally". */
		if (name[0] == '=') {
			/* This is also a hack!  Means "current value"*/
			if (streq(value, ""))
				json_add_u64(response, name + 1, val);
			else
				json_add_jsonstr(response, name + 1, value, strlen(value));
		} else {
			json_add_string(response, name, value);
		}
	}
	json_object_end(response);

	/* Every va_copy() must be paired with a va_end() in the same function. */
	va_end(ap2);
}
/* Adds @num to the counter for @subsystem / @index, stores the new value in
 * the db under "last_<subsystem>_<index>_index", then answers every queued
 * "wait" command for this subsystem and index whose nextvalue has now been
 * reached.  @ap_master is the NULL-terminated name/value list passed on to
 * json_add_index() for each reply.  Returns the new counter value. */
static u64 wait_index_bump(struct lightningd *ld,
			   struct db *db,
			   enum wait_subsystem subsystem,
			   enum wait_index index,
			   u64 num,
			   va_list ap_master)
{
	u64 *counter = wait_index_ptr(ld, subsystem, index);
	struct waiter *w, *next;
	const char *varname;

	assert(!add_overflows_u64(*counter, num));
	*counter += num;

	/* FIXME: We can optimize this!  It's always the max of the fields in
	 * the table, *unless* we delete one.  So we can lazily write this on
	 * delete, and fix it up to MAX() when we startup. */
	varname = tal_fmt(tmpctx, "last_%s_%s_index",
			  wait_subsystem_name(subsystem),
			  wait_index_name(index));
	db_set_intvar(db, varname, *counter);

	list_for_each_safe(&ld->wait_commands, w, next, list) {
		struct json_stream *response;
		va_list ap;

		/* Skip waiters on something else, or not yet satisfied. */
		if (*w->subsystem != subsystem
		    || *w->index != index
		    || *counter < *w->nextval)
			continue;

		response = json_stream_success(w->cmd);

		/* Each reply walks the arguments from the start. */
		va_copy(ap, ap_master);
		json_add_index(w->cmd, response, subsystem, index, *counter, &ap);
		va_end(ap);

		/* Delete before freeing */
		list_del_from(&ld->wait_commands, &w->list);
		was_pending(command_success(w->cmd, response));
	}

	return *counter;
}
/* Bumps the counter for @subsystem / @index by one via wait_index_bump(),
 * handing it the trailing variadic arguments, and returns the new value. */
u64 wait_index_increment(struct lightningd *ld,
			 struct db *db,
			 enum wait_subsystem subsystem,
			 enum wait_index index,
			 ...)
{
	va_list args;
	u64 newval;

	va_start(args, index);
	newval = wait_index_bump(ld, db, subsystem, index, 1, args);
	va_end(args);

	return newval;
}
/* Bumps the counter for @subsystem / @index by @num via wait_index_bump(),
 * handing it the trailing variadic arguments.  Does nothing when @num is 0. */
void wait_index_increase(struct lightningd *ld,
			 struct db *db,
			 enum wait_subsystem subsystem,
			 enum wait_index index,
			 u64 num,
			 ...)
{
	va_list args;

	/* Only touch the counter when there is something to add. */
	if (num != 0) {
		va_start(args, num);
		wait_index_bump(ld, db, subsystem, index, num, args);
		va_end(args);
	}
}
/* param() callback: matches the token against subsystem_names and, on a
 * match, allocates *@subsystem off @cmd holding the matching position.
 * Returns NULL on success, or the bad-parameter failure otherwise. */
static struct command_result *param_subsystem(struct command *cmd,
					      const char *name,
					      const char *buffer,
					      const jsmntok_t *tok,
					      enum wait_subsystem **subsystem)
{
	size_t n = 0;

	/* Stop at the first name equal to the token. */
	while (n < ARRAY_SIZE(subsystem_names)
	       && !json_tok_streq(buffer, tok, subsystem_names[n]))
		n++;

	if (n == ARRAY_SIZE(subsystem_names))
		return command_fail_badparam(cmd, name, buffer, tok,
					     "unknown subsystem");

	*subsystem = tal(cmd, enum wait_subsystem);
	**subsystem = n;
	return NULL;
}
/* param() callback: matches the token against index_names and, on a match,
 * allocates *@index off @cmd holding the matching position.
 * Returns NULL on success, or the bad-parameter failure otherwise. */
struct command_result *param_index(struct command *cmd,
				   const char *name,
				   const char *buffer,
				   const jsmntok_t *tok,
				   enum wait_index **index)
{
	size_t n = 0;

	/* Stop at the first name equal to the token. */
	while (n < ARRAY_SIZE(index_names)
	       && !json_tok_streq(buffer, tok, index_names[n]))
		n++;

	if (n == ARRAY_SIZE(index_names))
		return command_fail_badparam(cmd, name, buffer, tok,
					     "unknown index");

	*index = tal(cmd, enum wait_index);
	**index = n;
	return NULL;
}
/* Handler for the "wait" command.  Takes "subsystem", "indexname" and
 * "nextvalue"; if the named index has already reached nextvalue it replies
 * straight away with the current value, otherwise the command is queued on
 * ld->wait_commands and left pending. */
static struct command_result *json_wait(struct command *cmd,
					const char *buffer,
					const jsmntok_t *obj UNNEEDED,
					const jsmntok_t *params)
{
	struct waiter *waiter = tal(cmd, struct waiter);
	struct json_stream *response;
	u64 current;

	if (!param(cmd, buffer, params,
		   p_req("subsystem", param_subsystem,
			 &waiter->subsystem),
		   p_req("indexname", param_index, &waiter->index),
		   p_req("nextvalue", param_u64, &waiter->nextval),
		   NULL))
		return command_param_failed();

	current = *wait_index_ptr(cmd->ld, *waiter->subsystem, *waiter->index);

	if (current < *waiter->nextval) {
		/* Not there yet: queue up until the index gets bumped. */
		waiter->cmd = cmd;
		list_add_tail(&cmd->ld->wait_commands, &waiter->list);
		log_trace(cmd->ld->log, "waiting on %s %s %"PRIu64,
			  wait_subsystem_name(*waiter->subsystem),
			  wait_index_name(*waiter->index),
			  *waiter->nextval);
		return command_still_pending(cmd);
	}

	/* Already there: answer immediately, with no per-object details. */
	response = json_stream_success(cmd);
	json_add_index(cmd, response,
		       *waiter->subsystem,
		       *waiter->index,
		       current, NULL);
	return command_success(cmd, response);
}
/* Descriptor for the "wait" command, handled by json_wait(). */
static const struct json_command wait_command = {
"wait",
json_wait,
};
/* NOTE(review): AUTODATA presumably makes this descriptor discoverable by the
 * JSON-RPC command table -- confirm against the autodata/jsonrpc code. */
AUTODATA(json_command, &wait_command);