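Seems like sleep(1) isn't always enough for the `wait` command to be
registered before the test calls delforward. Give in and put a log
message where the waiter is queued, and have the test wait for that instead: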
```
waitfut = executor.submit(l2.rpc.wait, subsystem='forwards', indexname='deleted', nextvalue=1)
time.sleep(1)
l2.rpc.delforward(scid12, 1, 'failed')
waitres = waitfut.result(TIMEOUT)
> assert waitres == {'subsystem': 'forwards',
'deleted': 1,
'forwards': {'in_channel': scid12,
'in_htlc_id': 1,
'status': 'failed'}}
E AssertionError: assert {'subsystem': 'forwards', 'deleted': 1} == {'subsystem': 'forwards', 'deleted': 1, 'forwards': {'in_channel': '103x2x0', 'in_htlc_id': 1, 'status': 'failed'}}
E
E Common items:
E {'deleted': 1, 'subsystem': 'forwards'}
E Right contains 1 more item:
E {'forwards': {'in_channel': '103x2x0', 'in_htlc_id': 1, 'status': 'failed'}}
E
E Full diff:
E {
E 'deleted': 1,
E - 'forwards': {
E - 'in_channel': '103x2x0',
E - 'in_htlc_id': 1,
E - 'status': 'failed',
E - },
E 'subsystem': 'forwards',
E }
tests/test_misc.py:3599: AssertionError
```
Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
293 lines
7.0 KiB
C
293 lines
7.0 KiB
C
/* Code to be notified when various standardized events happen. */
|
|
#include "config.h"
|
|
#include <ccan/array_size/array_size.h>
|
|
#include <ccan/tal/str/str.h>
|
|
#include <common/json_command.h>
|
|
#include <common/overflows.h>
|
|
#include <db/exec.h>
|
|
#include <inttypes.h>
|
|
#include <lightningd/jsonrpc.h>
|
|
#include <lightningd/lightningd.h>
|
|
#include <lightningd/wait.h>
|
|
|
|
struct waiter {
|
|
struct list_node list;
|
|
struct command *cmd;
|
|
/* These are pointers because of how param_ works */
|
|
enum wait_subsystem *subsystem;
|
|
enum wait_index *index;
|
|
u64 *nextval;
|
|
};
|
|
|
|
|
|
/* JSON API names of each wait subsystem.  Entries are looked up directly
 * by enum wait_subsystem value (and param_subsystem maps a position back
 * to the enum), so the order here must match the enum's order.
 * The pointers are const too: nothing ever modifies this table. */
static const char *const subsystem_names[] = {
	"forwards",
	"sendpays",
	"invoices",
	"htlcs",
	"chainmoves",
	"channelmoves",
	"networkevents",
};
|
|
|
|
/* JSON API names of each wait index.  Entries are looked up directly by
 * enum wait_index value (and param_index maps a position back to the
 * enum), so the order here must match the enum's order.
 * The pointers are const too: nothing ever modifies this table. */
static const char *const index_names[] = {
	"created",
	"updated",
	"deleted",
};
|
|
|
|
/* This is part of the API, so no changing! */
|
|
const char *wait_index_name(enum wait_index index)
|
|
{
|
|
switch (index) {
|
|
case WAIT_INDEX_CREATED:
|
|
case WAIT_INDEX_UPDATED:
|
|
case WAIT_INDEX_DELETED:
|
|
return index_names[index];
|
|
}
|
|
abort();
|
|
}
|
|
|
|
const char *wait_subsystem_name(enum wait_subsystem subsystem)
|
|
{
|
|
switch (subsystem) {
|
|
case WAIT_SUBSYSTEM_FORWARD:
|
|
case WAIT_SUBSYSTEM_SENDPAY:
|
|
case WAIT_SUBSYSTEM_INVOICE:
|
|
case WAIT_SUBSYSTEM_HTLCS:
|
|
case WAIT_SUBSYSTEM_CHAINMOVES:
|
|
case WAIT_SUBSYSTEM_CHANNELMOVES:
|
|
case WAIT_SUBSYSTEM_NETWORKEVENTS:
|
|
return subsystem_names[subsystem];
|
|
}
|
|
abort();
|
|
}
|
|
|
|
static u64 *wait_index_ptr(struct lightningd *ld,
|
|
enum wait_subsystem subsystem,
|
|
enum wait_index index)
|
|
{
|
|
struct indexes *indexes;
|
|
|
|
assert(subsystem < ARRAY_SIZE(ld->indexes));
|
|
indexes = &ld->indexes[subsystem];
|
|
|
|
assert(index < ARRAY_SIZE(indexes->i));
|
|
|
|
return &indexes->i[index];
|
|
}
|
|
|
|
static void json_add_index(struct command *cmd,
|
|
struct json_stream *response,
|
|
enum wait_subsystem subsystem,
|
|
enum wait_index index,
|
|
u64 val,
|
|
va_list *ap)
|
|
{
|
|
const char *name, *value;
|
|
va_list ap2;
|
|
json_add_string(response, "subsystem", wait_subsystem_name(subsystem));
|
|
json_add_u64(response, wait_index_name(index), val);
|
|
|
|
if (!ap)
|
|
return;
|
|
|
|
va_copy(ap2, *ap);
|
|
/* "htlcs" etc never had details field: they came after! */
|
|
if (subsystem < WAIT_SUBSYSTEM_HTLCS
|
|
&& command_deprecated_out_ok(cmd, "details", "v25.05", "v26.06")) {
|
|
json_object_start(response, "details");
|
|
while ((name = va_arg(*ap, const char *)) != NULL) {
|
|
value = va_arg(*ap, const char *);
|
|
if (!value)
|
|
continue;
|
|
|
|
/* This is a hack! */
|
|
if (name[0] == '=') {
|
|
/* Copy in literallty! */
|
|
json_add_jsonstr(response, name + 1, value, strlen(value));
|
|
} else {
|
|
json_add_string(response, name, value);
|
|
}
|
|
}
|
|
json_object_end(response);
|
|
}
|
|
|
|
json_object_start(response, wait_subsystem_name(subsystem));
|
|
while ((name = va_arg(ap2, const char *)) != NULL) {
|
|
value = va_arg(ap2, const char *);
|
|
if (!value)
|
|
continue;
|
|
|
|
/* This is a hack! Means "copy in literally". */
|
|
if (name[0] == '=') {
|
|
/* This is also a hack! Means "current value"*/
|
|
if (streq(value, ""))
|
|
json_add_u64(response, name + 1, val);
|
|
else
|
|
json_add_jsonstr(response, name + 1, value, strlen(value));
|
|
} else {
|
|
json_add_string(response, name, value);
|
|
}
|
|
}
|
|
json_object_end(response);
|
|
}
|
|
|
|
/* Advance the given index by @num, persist the new value via
 * db_set_intvar(), and complete every queued "wait" command on that
 * subsystem and index whose nextvalue has now been reached.
 * @ap_master holds the detail pairs passed on to json_add_index().
 * Returns the new index value. */
static u64 wait_index_bump(struct lightningd *ld,
			   struct db *db,
			   enum wait_subsystem subsystem,
			   enum wait_index index,
			   u64 num,
			   va_list ap_master)
{
	struct waiter *w, *next;
	u64 *cur = wait_index_ptr(ld, subsystem, index);
	const char *varname;

	assert(!add_overflows_u64(*cur, num));
	*cur += num;

	/* FIXME: We can optimize this!  It's always the max of the fields in
	 * the table, *unless* we delete one.  So we can lazily write this on
	 * delete, and fix it up to MAX() when we startup. */
	varname = tal_fmt(tmpctx, "last_%s_%s_index",
			  wait_subsystem_name(subsystem),
			  wait_index_name(index));
	db_set_intvar(db, varname, *cur);

	list_for_each_safe(&ld->wait_commands, w, next, list) {
		struct json_stream *response;
		va_list ap;

		/* Skip waiters on something else, or not yet satisfied. */
		if (*w->subsystem != subsystem
		    || *w->index != index
		    || *cur < *w->nextval)
			continue;

		response = json_stream_success(w->cmd);
		/* Each waiter needs its own walk over the details. */
		va_copy(ap, ap_master);
		json_add_index(w->cmd, response, subsystem, index, *cur, &ap);
		va_end(ap);
		/* Delete before freeing */
		list_del_from(&ld->wait_commands, &w->list);
		was_pending(command_success(w->cmd, response));
	}

	return *cur;
}
|
|
|
|
u64 wait_index_increment(struct lightningd *ld,
|
|
struct db *db,
|
|
enum wait_subsystem subsystem,
|
|
enum wait_index index,
|
|
...)
|
|
{
|
|
va_list ap;
|
|
u64 ret;
|
|
|
|
va_start(ap, index);
|
|
ret = wait_index_bump(ld, db, subsystem, index, 1, ap);
|
|
va_end(ap);
|
|
|
|
return ret;
|
|
}
|
|
|
|
void wait_index_increase(struct lightningd *ld,
|
|
struct db *db,
|
|
enum wait_subsystem subsystem,
|
|
enum wait_index index,
|
|
u64 num,
|
|
...)
|
|
{
|
|
va_list ap;
|
|
|
|
if (num == 0)
|
|
return;
|
|
|
|
va_start(ap, num);
|
|
wait_index_bump(ld, db, subsystem, index, num, ap);
|
|
va_end(ap);
|
|
}
|
|
|
|
static struct command_result *param_subsystem(struct command *cmd,
|
|
const char *name,
|
|
const char *buffer,
|
|
const jsmntok_t *tok,
|
|
enum wait_subsystem **subsystem)
|
|
{
|
|
for (size_t i = 0; i < ARRAY_SIZE(subsystem_names); i++) {
|
|
if (json_tok_streq(buffer, tok, subsystem_names[i])) {
|
|
*subsystem = tal(cmd, enum wait_subsystem);
|
|
**subsystem = i;
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
return command_fail_badparam(cmd, name, buffer, tok,
|
|
"unknown subsystem");
|
|
}
|
|
|
|
struct command_result *param_index(struct command *cmd,
|
|
const char *name,
|
|
const char *buffer,
|
|
const jsmntok_t *tok,
|
|
enum wait_index **index)
|
|
{
|
|
for (size_t i = 0; i < ARRAY_SIZE(index_names); i++) {
|
|
if (json_tok_streq(buffer, tok, index_names[i])) {
|
|
*index = tal(cmd, enum wait_index);
|
|
**index = i;
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
return command_fail_badparam(cmd, name, buffer, tok,
|
|
"unknown index");
|
|
}
|
|
|
|
static struct command_result *json_wait(struct command *cmd,
|
|
const char *buffer,
|
|
const jsmntok_t *obj UNNEEDED,
|
|
const jsmntok_t *params)
|
|
{
|
|
struct waiter *waiter = tal(cmd, struct waiter);
|
|
u64 val;
|
|
|
|
if (!param(cmd, buffer, params,
|
|
p_req("subsystem", param_subsystem,
|
|
&waiter->subsystem),
|
|
p_req("indexname", param_index, &waiter->index),
|
|
p_req("nextvalue", param_u64, &waiter->nextval),
|
|
NULL))
|
|
return command_param_failed();
|
|
|
|
/* Are we there already? Return immediately. */
|
|
val = *wait_index_ptr(cmd->ld, *waiter->subsystem, *waiter->index);
|
|
if (val >= *waiter->nextval) {
|
|
struct json_stream *response;
|
|
|
|
response = json_stream_success(cmd);
|
|
json_add_index(cmd, response,
|
|
*waiter->subsystem,
|
|
*waiter->index,
|
|
val, NULL);
|
|
return command_success(cmd, response);
|
|
}
|
|
|
|
waiter->cmd = cmd;
|
|
list_add_tail(&cmd->ld->wait_commands, &waiter->list);
|
|
log_trace(cmd->ld->log, "waiting on %s %s %"PRIu64,
|
|
wait_subsystem_name(*waiter->subsystem),
|
|
wait_index_name(*waiter->index),
|
|
*waiter->nextval);
|
|
return command_still_pending(cmd);
|
|
}
|
|
|
|
static const struct json_command wait_command = {
|
|
"wait",
|
|
json_wait,
|
|
};
|
|
AUTODATA(json_command, &wait_command);
|