diff --git a/common/json_stream.h b/common/json_stream.h index 05e2df74c..7756c013d 100644 --- a/common/json_stream.h +++ b/common/json_stream.h @@ -9,6 +9,7 @@ # include #include +#include #include #include #include @@ -36,6 +37,8 @@ struct wireaddr; struct wireaddr_internal; struct json_stream { + struct list_node list; + struct json_out *jout; /* Who is writing to this buffer now; NULL if nobody is. */ diff --git a/lightningd/jsonrpc.c b/lightningd/jsonrpc.c index 76baaa2d9..6e2d5f7a5 100644 --- a/lightningd/jsonrpc.c +++ b/lightningd/jsonrpc.c @@ -96,8 +96,8 @@ struct json_connection { /* Our json_streams (owned by the commands themselves while running). * Since multiple streams could start returning data at once, we - * always service these in order, freeing once empty. */ - struct json_stream **js_arr; + * always service these in order. */ + struct list_head jsouts; }; /** @@ -125,25 +125,10 @@ static struct json_stream *jcon_new_json_stream(const tal_t *ctx, /* Wake writer to start streaming, in case it's not already. */ io_wake(jcon); - - /* FIXME: Keep streams around for recycling. */ - tal_arr_expand(&jcon->js_arr, js); + list_add_tail(&jcon->jsouts, &js->list); return js; } -static void jcon_remove_json_stream(struct json_connection *jcon, - struct json_stream *js) -{ - for (size_t i = 0; i < tal_count(jcon->js_arr); i++) { - if (js != jcon->js_arr[i]) - continue; - - tal_arr_remove(&jcon->js_arr, i); - return; - } - abort(); -} - /* jcon and cmd have separate lifetimes: we detach them on either destruction */ static void destroy_jcon(struct json_connection *jcon) { @@ -1156,13 +1141,16 @@ static struct io_plan *stream_out_complete(struct io_conn *conn, static struct io_plan *start_json_stream(struct io_conn *conn, struct json_connection *jcon) { + struct json_stream *js; + /* If something has created an output buffer, start streaming. */ - if (tal_count(jcon->js_arr)) { + js = list_top(&jcon->jsouts, struct json_stream, list); + if (js) { size_t len; - const char *p = json_out_contents(jcon->js_arr[0]->jout, &len); + const char *p = json_out_contents(js->jout, &len); if (len) log_io(jcon->log, LOG_IO_OUT, NULL, "", p, len); - return json_stream_output(jcon->js_arr[0], conn, + return json_stream_output(js, conn, stream_out_complete, jcon); } @@ -1186,7 +1174,7 @@ static struct io_plan *stream_out_complete(struct io_conn *conn, struct json_stream *js, struct json_connection *jcon) { - jcon_remove_json_stream(jcon, js); + list_del_from(&jcon->jsouts, &js->list); tal_free(js); /* Wait for more output. */ @@ -1207,7 +1195,7 @@ static struct io_plan *read_json(struct io_conn *conn, log_io(jcon->log, LOG_IO_IN, NULL, "", buffer, len_read); /* We wait for pending output to be consumed, to avoid DoS */ - if (tal_count(jcon->js_arr) != 0) { + if (!list_empty(&jcon->jsouts)) { return io_wait(conn, conn, read_json, jcon); } @@ -1262,7 +1250,7 @@ static struct io_plan *jcon_connected(struct io_conn *conn, jcon = notleak(tal(conn, struct json_connection)); jcon->conn = conn; jcon->ld = ld; - jcon->js_arr = tal_arr(jcon, struct json_stream *, 0); + list_head_init(&jcon->jsouts); jcon->json_in = jsonrpc_io_new(jcon); jcon->notifications_enabled = false; jcon->db_batching = false; diff --git a/lightningd/plugin.c b/lightningd/plugin.c index 17e4a1922..e18fbb63e 100644 --- a/lightningd/plugin.c +++ b/lightningd/plugin.c @@ -372,7 +372,7 @@ struct plugin *plugin_register(struct plugins *plugins, const char* path TAKES, p->can_check = false; p->plugin_state = UNCONFIGURED; - p->js_arr = tal_arr(p, struct json_stream *, 0); + list_head_init(&p->jsouts); p->notification_topics = tal_arr(p, const char *, 0); p->subscriptions = NULL; p->dynamic = false; @@ -474,8 +474,8 @@ void plugin_kill(struct plugin *plugin, enum log_level loglevel, */ static void plugin_send(struct plugin *plugin, struct json_stream *stream) { - tal_steal(plugin->js_arr, stream); - tal_arr_expand(&plugin->js_arr, stream); + tal_steal(plugin, stream); + list_add_tail(&plugin->jsouts, &stream->list); io_wake(plugin); } @@ -816,10 +816,7 @@ static struct io_plan *plugin_write_json(struct io_conn *conn, static struct io_plan *plugin_stream_complete(struct io_conn *conn, struct json_stream *js, struct plugin *plugin) { - assert(tal_count(plugin->js_arr) > 0); - /* Remove js and shift all remainig over */ - tal_arr_remove(&plugin->js_arr, 0); - + list_del_from(&plugin->jsouts, &js->list); /* It got dropped off the queue, free it. */ tal_free(js); @@ -829,8 +826,11 @@ static struct io_plan *plugin_stream_complete(struct io_conn *conn, struct json_ static struct io_plan *plugin_write_json(struct io_conn *conn, struct plugin *plugin) { - if (tal_count(plugin->js_arr)) { - return json_stream_output(plugin->js_arr[0], plugin->stdin_conn, plugin_stream_complete, plugin); + struct json_stream *js; + + js = list_top(&plugin->jsouts, struct json_stream, list); + if (js) { + return json_stream_output(js, plugin->stdin_conn, plugin_stream_complete, plugin); } return io_out_wait(conn, plugin, plugin_write_json, plugin); diff --git a/lightningd/plugin.h b/lightningd/plugin.h index fb16d4b25..23b7554b3 100644 --- a/lightningd/plugin.h +++ b/lightningd/plugin.h @@ -78,10 +78,9 @@ struct plugin { /* Stuff we read */ struct jsonrpc_io *json_in; - /* Our json_streams. Since multiple streams could start - * returning data at once, we always service these in order, - * freeing once empty. */ - struct json_stream **js_arr; + /* Our plugin_jstream_out list. Since multiple streams could start + * returning data at once, we always service these in order. */ + struct list_head jsouts; struct logger *log;