lightningd: handle large numbers of command outputs gracefully.

Profiling shows us spending all our time in tal_arr_remove when dealing
with a giant number of output streams.  This applies both for RPC output
and plugin output.

Use linked list instead.

tests/test_coinmoves.py::test_generate_coinmoves (2,000,000, sqlite3):
	Time (from start to end of l2 node):	239 seconds **WAS 518**
	Worst latency:				56.9 seconds **WAS 353**

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2025-11-20 12:07:12 +10:30
parent 6006467824
commit 8707b7312a
4 changed files with 27 additions and 37 deletions

View File

@@ -9,6 +9,7 @@
# include <external/jsmn/jsmn.h>
#include <bitcoin/short_channel_id.h>
#include <ccan/list/list.h>
#include <ccan/time/time.h>
#include <common/amount.h>
#include <common/jsonrpc_errors.h>
@@ -36,6 +37,8 @@ struct wireaddr;
struct wireaddr_internal;
struct json_stream {
struct list_node list;
struct json_out *jout;
/* Who is writing to this buffer now; NULL if nobody is. */

View File

@@ -96,8 +96,8 @@ struct json_connection {
/* Our json_streams (owned by the commands themselves while running).
* Since multiple streams could start returning data at once, we
* always service these in order, freeing once empty. */
struct json_stream **js_arr;
* always service these in order. */
struct list_head jsouts;
};
/**
@@ -125,25 +125,10 @@ static struct json_stream *jcon_new_json_stream(const tal_t *ctx,
/* Wake writer to start streaming, in case it's not already. */
io_wake(jcon);
/* FIXME: Keep streams around for recycling. */
tal_arr_expand(&jcon->js_arr, js);
list_add_tail(&jcon->jsouts, &js->list);
return js;
}
static void jcon_remove_json_stream(struct json_connection *jcon,
struct json_stream *js)
{
for (size_t i = 0; i < tal_count(jcon->js_arr); i++) {
if (js != jcon->js_arr[i])
continue;
tal_arr_remove(&jcon->js_arr, i);
return;
}
abort();
}
/* jcon and cmd have separate lifetimes: we detach them on either destruction */
static void destroy_jcon(struct json_connection *jcon)
{
@@ -1156,13 +1141,16 @@ static struct io_plan *stream_out_complete(struct io_conn *conn,
static struct io_plan *start_json_stream(struct io_conn *conn,
struct json_connection *jcon)
{
struct json_stream *js;
/* If something has created an output buffer, start streaming. */
if (tal_count(jcon->js_arr)) {
js = list_top(&jcon->jsouts, struct json_stream, list);
if (js) {
size_t len;
const char *p = json_out_contents(jcon->js_arr[0]->jout, &len);
const char *p = json_out_contents(js->jout, &len);
if (len)
log_io(jcon->log, LOG_IO_OUT, NULL, "", p, len);
return json_stream_output(jcon->js_arr[0], conn,
return json_stream_output(js, conn,
stream_out_complete, jcon);
}
@@ -1186,7 +1174,7 @@ static struct io_plan *stream_out_complete(struct io_conn *conn,
struct json_stream *js,
struct json_connection *jcon)
{
jcon_remove_json_stream(jcon, js);
list_del_from(&jcon->jsouts, &js->list);
tal_free(js);
/* Wait for more output. */
@@ -1207,7 +1195,7 @@ static struct io_plan *read_json(struct io_conn *conn,
log_io(jcon->log, LOG_IO_IN, NULL, "", buffer, len_read);
/* We wait for pending output to be consumed, to avoid DoS */
if (tal_count(jcon->js_arr) != 0) {
if (!list_empty(&jcon->jsouts)) {
return io_wait(conn, conn, read_json, jcon);
}
@@ -1262,7 +1250,7 @@ static struct io_plan *jcon_connected(struct io_conn *conn,
jcon = notleak(tal(conn, struct json_connection));
jcon->conn = conn;
jcon->ld = ld;
jcon->js_arr = tal_arr(jcon, struct json_stream *, 0);
list_head_init(&jcon->jsouts);
jcon->json_in = jsonrpc_io_new(jcon);
jcon->notifications_enabled = false;
jcon->db_batching = false;

View File

@@ -372,7 +372,7 @@ struct plugin *plugin_register(struct plugins *plugins, const char* path TAKES,
p->can_check = false;
p->plugin_state = UNCONFIGURED;
p->js_arr = tal_arr(p, struct json_stream *, 0);
list_head_init(&p->jsouts);
p->notification_topics = tal_arr(p, const char *, 0);
p->subscriptions = NULL;
p->dynamic = false;
@@ -474,8 +474,8 @@ void plugin_kill(struct plugin *plugin, enum log_level loglevel,
*/
static void plugin_send(struct plugin *plugin, struct json_stream *stream)
{
tal_steal(plugin->js_arr, stream);
tal_arr_expand(&plugin->js_arr, stream);
tal_steal(plugin, stream);
list_add_tail(&plugin->jsouts, &stream->list);
io_wake(plugin);
}
@@ -816,10 +816,7 @@ static struct io_plan *plugin_write_json(struct io_conn *conn,
static struct io_plan *plugin_stream_complete(struct io_conn *conn, struct json_stream *js, struct plugin *plugin)
{
assert(tal_count(plugin->js_arr) > 0);
/* Remove js and shift all remainig over */
tal_arr_remove(&plugin->js_arr, 0);
list_del_from(&plugin->jsouts, &js->list);
/* It got dropped off the queue, free it. */
tal_free(js);
@@ -829,8 +826,11 @@ static struct io_plan *plugin_stream_complete(struct io_conn *conn, struct json_
static struct io_plan *plugin_write_json(struct io_conn *conn,
struct plugin *plugin)
{
if (tal_count(plugin->js_arr)) {
return json_stream_output(plugin->js_arr[0], plugin->stdin_conn, plugin_stream_complete, plugin);
struct json_stream *js;
js = list_top(&plugin->jsouts, struct json_stream, list);
if (js) {
return json_stream_output(js, plugin->stdin_conn, plugin_stream_complete, plugin);
}
return io_out_wait(conn, plugin, plugin_write_json, plugin);

View File

@@ -78,10 +78,9 @@ struct plugin {
/* Stuff we read */
struct jsonrpc_io *json_in;
/* Our json_streams. Since multiple streams could start
* returning data at once, we always service these in order,
* freeing once empty. */
struct json_stream **js_arr;
/* Our plugin_jstream_out list. Since multiple streams could start
* returning data at once, we always service these in order. */
struct list_head jsouts;
struct logger *log;