askrene: actually run children in parallel.

Changelog-Changed: Plugins: `askrene` now runs routing in parallel.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2026-02-16 17:37:33 +10:30
parent 68b30b1e5d
commit fb4232dff8
2 changed files with 162 additions and 53 deletions

View File

@@ -28,8 +28,29 @@
#include <plugins/askrene/layer.h>
#include <plugins/askrene/reserve.h>
#include <sys/wait.h>
#include <wire/wire_io.h>
#include <wire/wire_sync.h>
struct router_child {
/* Inside askrene->children */
struct list_node list;
struct command *cmd;
struct timemono start;
int pid;
struct io_conn *log_conn;
struct io_conn *reply_conn;
/* A whole msg read in for logging */
u8 *log_msg;
/* How much we've read so far */
char *reply_buf;
size_t reply_bytes;
/* How much we just read (populated by io_read_partial) */
size_t this_reply_len;
};
static bool have_layer(const char **layers, const char *name)
{
for (size_t i = 0; i < tal_count(layers); i++) {
@@ -375,17 +396,119 @@ static const struct layer **apply_layers(const tal_t *ctx,
return layers;
}
static void process_child_logs(struct command *cmd,
int log_fd)
static struct command_result *reap_child(struct router_child *child)
{
u8 *msg;
while ((msg = wire_sync_read(tmpctx, log_fd)) != NULL) {
enum log_level level;
char *entry;
struct node_id *peer;
if (fromwire_status_log(tmpctx, msg, &level, &peer, &entry))
cmd_log(tmpctx, cmd, level, "%s", entry);
int child_status;
struct timerel time_delta;
const char *err;
waitpid(child->pid, &child_status, 0);
time_delta = timemono_between(time_mono(), child->start);
/* log the time of computation */
cmd_log(tmpctx, child->cmd, LOG_DBG, "get_routes %s %" PRIu64 " ms",
WEXITSTATUS(child_status) != 0 ? "failed after" : "completed in",
time_to_msec(time_delta));
if (WIFSIGNALED(child_status)) {
err = tal_fmt(tmpctx, "child died with signal %u",
WTERMSIG(child_status));
goto fail_broken;
}
/* This is how it indicates an error message */
if (WEXITSTATUS(child_status) != 0 && child->reply_bytes) {
err = tal_strndup(child, child->reply_buf, child->reply_bytes);
goto fail;
}
if (child->reply_bytes == 0) {
err = tal_fmt(child, "child produced no output (exited %i)?",
WEXITSTATUS(child_status));
goto fail_broken;
}
/* Frees child, since it's a child of cmd */
return command_finish_rawstr(child->cmd,
child->reply_buf, child->reply_bytes);
fail_broken:
plugin_log(child->cmd->plugin, LOG_BROKEN, "%s", err);
fail:
assert(err);
/* Frees child, since it's a child of cmd */
return command_fail(child->cmd, PAY_ROUTE_NOT_FOUND, "%s", err);
}
/* Last one out finalizes */
static void log_closed(struct io_conn *conn, struct router_child *child)
{
child->log_conn = NULL;
if (child->reply_conn == NULL)
reap_child(child);
}
static void reply_closed(struct io_conn *conn, struct router_child *child)
{
child->reply_conn = NULL;
if (child->log_conn == NULL)
reap_child(child);
}
static struct io_plan *log_msg_in(struct io_conn *conn,
struct router_child *child)
{
enum log_level level;
char *entry;
struct node_id *peer;
if (fromwire_status_log(tmpctx, child->log_msg, &level, &peer, &entry))
cmd_log(tmpctx, child->cmd, level, "%s", entry);
else {
cmd_log(tmpctx, child->cmd, LOG_BROKEN,
"unexpected non-log message %s",
tal_hex(tmpctx, child->log_msg));
}
return io_read_wire(conn, child, &child->log_msg, log_msg_in, child);
}
static struct io_plan *child_log_init(struct io_conn *conn,
struct router_child *child)
{
io_set_finish(conn, log_closed, child);
return io_read_wire(conn, child, &child->log_msg, log_msg_in, child);
}
static size_t remaining_read_len(const struct router_child *child)
{
return tal_bytelen(child->reply_buf) - child->reply_bytes;
}
static struct io_plan *child_reply_in(struct io_conn *conn,
struct router_child *child)
{
child->reply_bytes += child->this_reply_len;
if (remaining_read_len(child) < 64)
tal_resize(&child->reply_buf, tal_bytelen(child->reply_buf) * 2);
return io_read_partial(conn,
child->reply_buf + child->reply_bytes,
remaining_read_len(child),
&child->this_reply_len,
child_reply_in, child);
}
static struct io_plan *child_reply_init(struct io_conn *conn,
struct router_child *child)
{
io_set_finish(conn, reply_closed, child);
child->reply_buf = tal_arr(child, char, 64);
child->reply_bytes = 0;
child->this_reply_len = 0;
return child_reply_in(conn, child);
}
static void destroy_router_child(struct router_child *child)
{
list_del(&child->list);
}
static struct command_result *do_getroutes(struct command *cmd,
@@ -395,12 +518,14 @@ static struct command_result *do_getroutes(struct command *cmd,
struct askrene *askrene = get_askrene(cmd->plugin);
const struct gossmap_node *me;
bool include_fees;
const char *err, *json;
struct timemono time_start, deadline;
int child_fd, log_fd, child_pid, child_status;
const char *err;
struct timemono deadline;
int child_fd, log_fd;
struct router_child *child;
const struct layer **layers;
s8 *biases;
fp16_t *capacities;
int ecode;
/* update the gossmap */
if (gossmap_refresh(askrene->gossmap)) {
@@ -494,8 +619,9 @@ static struct command_result *do_getroutes(struct command *cmd,
include_fees = have_layer(info->layers, "auto.include_fees");
time_start = time_mono();
deadline = timemono_add(time_start,
child = tal(cmd, struct router_child);
child->start = time_mono();
deadline = timemono_add(child->start,
time_from_sec(askrene->route_seconds));
child_fd = fork_router_child(askrene->gossmap,
layers,
@@ -507,47 +633,27 @@ static struct command_result *do_getroutes(struct command *cmd,
deadline, srcnode, dstnode, info->amount,
info->maxfee, info->finalcltv, info->maxdelay, info->maxparts,
include_fees,
cmd->id, cmd->filter, &log_fd, &child_pid);
if (child_fd == -1) {
err = tal_fmt(tmpctx, "failed to fork: %s", strerror(errno));
goto fail_broken;
}
/* FIXME: Go async! */
process_child_logs(cmd, log_fd);
close(log_fd);
json = grab_fd_str(cmd, child_fd);
close(child_fd);
waitpid(child_pid, &child_status, 0);
struct timerel time_delta = timemono_between(time_mono(), time_start);
/* log the time of computation */
cmd_log(tmpctx, cmd, LOG_DBG, "get_routes %s %" PRIu64 " ms",
WEXITSTATUS(child_status) != 0 ? "failed after" : "completed in",
time_to_msec(time_delta));
if (WIFSIGNALED(child_status)) {
err = tal_fmt(tmpctx, "child died with signal %u",
WTERMSIG(child_status));
goto fail_broken;
}
/* This is how it indicates an error message */
if (WEXITSTATUS(child_status) != 0 && json) {
err = json;
goto fail;
}
if (!json) {
err = tal_fmt(tmpctx, "child produced no output (exited %i)?",
WEXITSTATUS(child_status));
goto fail_broken;
}
/* At last we remove the localmods from the gossmap. */
cmd->id, cmd->filter, &log_fd, &child->pid);
/* Save this, as remove_localmods won't preserve it. */
ecode = errno;
/* We don't need this any more. */
gossmap_remove_localmods(askrene->gossmap, localmods);
/* Child already created this fully formed. We just paste it */
return command_finish_rawstr(cmd, json, strlen(json));
if (child_fd == -1) {
err = tal_fmt(tmpctx, "failed to fork: %s", strerror(ecode));
gossmap_remove_localmods(askrene->gossmap, localmods);
goto fail_broken;
}
child->reply_conn = io_new_conn(child, child_fd,
child_reply_init, child);
child->log_conn = io_new_conn(child, log_fd, child_log_init, child);
child->cmd = cmd;
/* FIXME: limit parallelism! */
list_add_tail(&askrene->children, &child->list);
tal_add_destructor(child, destroy_router_child);
return command_still_pending(cmd);
fail_broken:
plugin_log(cmd->plugin, LOG_BROKEN, "%s", err);
@@ -1248,6 +1354,7 @@ static const char *init(struct command *init_cmd,
askrene->plugin = plugin;
askrene->layers = new_layer_name_hash(askrene);
list_head_init(&askrene->children);
askrene->reserved = new_reserve_htable(askrene);
askrene->gossmap = gossmap_load(askrene, GOSSIP_STORE_FILENAME,
plugin_gossmap_logcb, plugin);

View File

@@ -28,6 +28,8 @@ struct askrene {
struct command *layer_cmd;
/* How long before we abort trying to find a route? */
u32 route_seconds;
/* Routing children currently in flight. */
struct list_head children;
};
/* Useful plugin->askrene mapping */