diff --git a/plugins/askrene/askrene.c b/plugins/askrene/askrene.c index d69dc426b..94fe5d715 100644 --- a/plugins/askrene/askrene.c +++ b/plugins/askrene/askrene.c @@ -28,8 +28,29 @@ #include #include #include +#include #include +struct router_child { + /* Inside askrene->children */ + struct list_node list; + struct command *cmd; + struct timemono start; + int pid; + struct io_conn *log_conn; + struct io_conn *reply_conn; + + /* A whole msg read in for logging */ + u8 *log_msg; + + /* How much we've read so far */ + char *reply_buf; + size_t reply_bytes; + + /* How much we just read (populated by io_read_partial) */ + size_t this_reply_len; +}; + static bool have_layer(const char **layers, const char *name) { for (size_t i = 0; i < tal_count(layers); i++) { @@ -375,17 +396,119 @@ static const struct layer **apply_layers(const tal_t *ctx, return layers; } -static void process_child_logs(struct command *cmd, - int log_fd) +static struct command_result *reap_child(struct router_child *child) { - u8 *msg; - while ((msg = wire_sync_read(tmpctx, log_fd)) != NULL) { - enum log_level level; - char *entry; - struct node_id *peer; - if (fromwire_status_log(tmpctx, msg, &level, &peer, &entry)) - cmd_log(tmpctx, cmd, level, "%s", entry); + int child_status; + struct timerel time_delta; + const char *err; + + waitpid(child->pid, &child_status, 0); + time_delta = timemono_between(time_mono(), child->start); + + /* log the time of computation */ + cmd_log(tmpctx, child->cmd, LOG_DBG, "get_routes %s %" PRIu64 " ms", + WEXITSTATUS(child_status) != 0 ? "failed after" : "completed in", + time_to_msec(time_delta)); + + if (WIFSIGNALED(child_status)) { + err = tal_fmt(tmpctx, "child died with signal %u", + WTERMSIG(child_status)); + goto fail_broken; } + + /* This is how it indicates an error message */ + if (WEXITSTATUS(child_status) != 0 && child->reply_bytes) { + err = tal_strndup(child, child->reply_buf, child->reply_bytes); + goto fail; + } + if (child->reply_bytes == 0) { + err = tal_fmt(child, "child produced no output (exited %i)?", + WEXITSTATUS(child_status)); + goto fail_broken; + } + + /* Frees child, since it's a child of cmd */ + return command_finish_rawstr(child->cmd, + child->reply_buf, child->reply_bytes); + +fail_broken: + plugin_log(child->cmd->plugin, LOG_BROKEN, "%s", err); +fail: + assert(err); + /* Frees child, since it's a child of cmd */ + return command_fail(child->cmd, PAY_ROUTE_NOT_FOUND, "%s", err); +} + +/* Last one out finalizes */ +static void log_closed(struct io_conn *conn, struct router_child *child) +{ + child->log_conn = NULL; + if (child->reply_conn == NULL) + reap_child(child); +} + +static void reply_closed(struct io_conn *conn, struct router_child *child) +{ + child->reply_conn = NULL; + if (child->log_conn == NULL) + reap_child(child); +} + +static struct io_plan *log_msg_in(struct io_conn *conn, + struct router_child *child) +{ + enum log_level level; + char *entry; + struct node_id *peer; + + if (fromwire_status_log(tmpctx, child->log_msg, &level, &peer, &entry)) + cmd_log(tmpctx, child->cmd, level, "%s", entry); + else { + cmd_log(tmpctx, child->cmd, LOG_BROKEN, + "unexpected non-log message %s", + tal_hex(tmpctx, child->log_msg)); + } + return io_read_wire(conn, child, &child->log_msg, log_msg_in, child); +} + +static struct io_plan *child_log_init(struct io_conn *conn, + struct router_child *child) +{ + io_set_finish(conn, log_closed, child); + return io_read_wire(conn, child, &child->log_msg, log_msg_in, child); +} + +static size_t remaining_read_len(const struct router_child *child) +{ + return tal_bytelen(child->reply_buf) - child->reply_bytes; +} + +static struct io_plan *child_reply_in(struct io_conn *conn, + struct router_child *child) +{ + child->reply_bytes += child->this_reply_len; + if (remaining_read_len(child) < 64) + tal_resize(&child->reply_buf, tal_bytelen(child->reply_buf) * 2); + return io_read_partial(conn, + child->reply_buf + child->reply_bytes, + remaining_read_len(child), + &child->this_reply_len, + child_reply_in, child); +} + +static struct io_plan *child_reply_init(struct io_conn *conn, + struct router_child *child) +{ + io_set_finish(conn, reply_closed, child); + child->reply_buf = tal_arr(child, char, 64); + child->reply_bytes = 0; + child->this_reply_len = 0; + return child_reply_in(conn, child); +} + +static void destroy_router_child(struct router_child *child) +{ + list_del(&child->list); } static struct command_result *do_getroutes(struct command *cmd, @@ -395,12 +518,14 @@ static struct command_result *do_getroutes(struct command *cmd, struct askrene *askrene = get_askrene(cmd->plugin); const struct gossmap_node *me; bool include_fees; - const char *err, *json; - struct timemono time_start, deadline; - int child_fd, log_fd, child_pid, child_status; + const char *err; + struct timemono deadline; + int child_fd, log_fd; + struct router_child *child; const struct layer **layers; s8 *biases; fp16_t *capacities; + int ecode; /* update the gossmap */ if (gossmap_refresh(askrene->gossmap)) { @@ -494,8 +619,9 @@ static struct command_result *do_getroutes(struct command *cmd, include_fees = have_layer(info->layers, "auto.include_fees"); - time_start = time_mono(); - deadline = timemono_add(time_start, + child = tal(cmd, struct router_child); + child->start = time_mono(); + deadline = timemono_add(child->start, time_from_sec(askrene->route_seconds)); child_fd = fork_router_child(askrene->gossmap, layers, @@ -507,47 +633,27 @@ static struct command_result *do_getroutes(struct command *cmd, deadline, srcnode, dstnode, info->amount, info->maxfee, info->finalcltv, info->maxdelay, info->maxparts, include_fees, - cmd->id, cmd->filter, &log_fd, &child_pid); - if (child_fd == -1) { - err = tal_fmt(tmpctx, "failed to fork: %s", strerror(errno)); - goto fail_broken; - } - - /* FIXME: Go async! */ - process_child_logs(cmd, log_fd); - close(log_fd); - json = grab_fd_str(cmd, child_fd); - close(child_fd); - waitpid(child_pid, &child_status, 0); - - struct timerel time_delta = timemono_between(time_mono(), time_start); - - /* log the time of computation */ - cmd_log(tmpctx, cmd, LOG_DBG, "get_routes %s %" PRIu64 " ms", - WEXITSTATUS(child_status) != 0 ? "failed after" : "completed in", - time_to_msec(time_delta)); - - if (WIFSIGNALED(child_status)) { - err = tal_fmt(tmpctx, "child died with signal %u", - WTERMSIG(child_status)); - goto fail_broken; - } - /* This is how it indicates an error message */ - if (WEXITSTATUS(child_status) != 0 && json) { - err = json; - goto fail; - } - if (!json) { - err = tal_fmt(tmpctx, "child produced no output (exited %i)?", - WEXITSTATUS(child_status)); - goto fail_broken; - } - - /* At last we remove the localmods from the gossmap. */ + cmd->id, cmd->filter, &log_fd, &child->pid); + /* Save this, as remove_localmods won't preserve it. */ + ecode = errno; + /* We don't need this any more. */ gossmap_remove_localmods(askrene->gossmap, localmods); - /* Child already created this fully formed. We just paste it */ - return command_finish_rawstr(cmd, json, strlen(json)); + if (child_fd == -1) { + err = tal_fmt(tmpctx, "failed to fork: %s", strerror(ecode)); + gossmap_remove_localmods(askrene->gossmap, localmods); + goto fail_broken; + } + + child->reply_conn = io_new_conn(child, child_fd, + child_reply_init, child); + child->log_conn = io_new_conn(child, log_fd, child_log_init, child); + child->cmd = cmd; + + /* FIXME: limit parallelism! */ + list_add_tail(&askrene->children, &child->list); + tal_add_destructor(child, destroy_router_child); + return command_still_pending(cmd); fail_broken: plugin_log(cmd->plugin, LOG_BROKEN, "%s", err); @@ -1248,6 +1354,7 @@ static const char *init(struct command *init_cmd, askrene->plugin = plugin; askrene->layers = new_layer_name_hash(askrene); + list_head_init(&askrene->children); askrene->reserved = new_reserve_htable(askrene); askrene->gossmap = gossmap_load(askrene, GOSSIP_STORE_FILENAME, plugin_gossmap_logcb, plugin); diff --git a/plugins/askrene/askrene.h b/plugins/askrene/askrene.h index 4dec403a1..a125102fc 100644 --- a/plugins/askrene/askrene.h +++ b/plugins/askrene/askrene.h @@ -28,6 +28,8 @@ struct askrene { struct command *layer_cmd; /* How long before we abort trying to find a route? */ u32 route_seconds; + /* Routing children currently in flight. */ + struct list_head children; }; /* Useful plugin->askrene mapping */