From bc4bb2b0ef7261a82cb6ff84e635dadf4f283e89 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 23 Oct 2025 14:23:05 +1030 Subject: [PATCH] libplugin: use jsonrpc_io logic for sync requests too. It's a little overkill, but it's clear. Signed-off-by: Rusty Russell --- common/jsonrpc_io.c | 23 ++++++ common/jsonrpc_io.h | 9 +++ plugins/libplugin.c | 184 ++++++++++++++++++-------------------------- 3 files changed, 108 insertions(+), 108 deletions(-) diff --git a/common/jsonrpc_io.c b/common/jsonrpc_io.c index 179b3c1f4..48c46c42c 100644 --- a/common/jsonrpc_io.c +++ b/common/jsonrpc_io.c @@ -5,6 +5,8 @@ #include #include #include +#include +#include #define READ_CHUNKSIZE 64 @@ -126,3 +128,24 @@ struct io_plan *jsonrpc_io_read_(struct io_conn *conn, &json_in->bytes_read, next, arg); } + +bool jsonrpc_sync_read(struct jsonrpc_io *json_in, int infd) +{ + int r; + + /* Make sure there's more room */ + membuf_prepare_space(&json_in->membuf, READ_CHUNKSIZE); + + /* Try to read more. */ + r = read(infd, + membuf_space(&json_in->membuf), + membuf_num_space(&json_in->membuf)); + if (r < 0) + return false; + if (r == 0) { + errno = 0; + return false; + } + json_in->bytes_read = r; + return true; +} diff --git a/common/jsonrpc_io.h b/common/jsonrpc_io.h index 9032aecc4..fd0babb9d 100644 --- a/common/jsonrpc_io.h +++ b/common/jsonrpc_io.h @@ -43,6 +43,15 @@ struct io_plan *jsonrpc_io_read_(struct io_conn *conn, const char *jsonrpc_newly_read(struct jsonrpc_io *json_in, size_t *len); +/** + * jsonrpc_sync_read: read from fd into buffer. + * @json_in: buffer to read into. + * @infd: file descriptort to read. + * + * Returns false on error or EOF; for EOF errno will be 0. + */ +bool jsonrpc_sync_read(struct jsonrpc_io *json_in, int infd); + /** * jsonrpc_io_parse: try to parse more of the buffer. * @ctx: context to allocate error message off. diff --git a/plugins/libplugin.c b/plugins/libplugin.c index 166d0404c..db49590d6 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -1,7 +1,6 @@ #include "config.h" #include #include -#include #include #include #include @@ -31,11 +30,6 @@ struct plugin_timer { void *cb_arg; }; -struct rpc_conn { - int fd; - MEMBUF(char) mb; -}; - /* We can have more than one of these pending at once. */ struct jstream { struct list_node list; @@ -93,7 +87,7 @@ struct plugin { /* To write to lightningd */ struct list_head js_list; - /* Asynchronous RPC interaction */ + /* Asynchronous RPC interaction. */ struct io_conn *io_rpc_conn; struct list_head rpc_js_list; struct jsonrpc_io *jsonrpc_in; @@ -102,8 +96,9 @@ struct plugin { STRMAP(struct out_req *) out_reqs; u64 next_outreq_id; - /* Synchronous RPC interaction */ - struct rpc_conn *rpc_conn; + /* Synchronous RPC interaction: sync_io is NULL if they didn't want it. */ + int sync_fd; + struct jsonrpc_io *sync_io; /* Plugin information details */ enum plugin_restartability restartability; @@ -534,32 +529,6 @@ struct json_out *json_out_obj(const tal_t *ctx, return jout; } -static int read_json_from_rpc(struct plugin *p) -{ - char *end; - - /* We rely on the double-\n marker which only terminates JSON top - * levels. Thanks lightningd! */ - while ((end = memmem(membuf_elems(&p->rpc_conn->mb), - membuf_num_elems(&p->rpc_conn->mb), "\n\n", 2)) - == NULL) { - ssize_t r; - - /* Make sure we've room for at least READ_CHUNKSIZE. */ - membuf_prepare_space(&p->rpc_conn->mb, READ_CHUNKSIZE); - r = read(p->rpc_conn->fd, membuf_space(&p->rpc_conn->mb), - membuf_num_space(&p->rpc_conn->mb)); - /* lightningd goes away, we go away. */ - if (r == 0) - exit(0); - if (r < 0) - plugin_err(p, "Reading JSON input: %s", strerror(errno)); - membuf_added(&p->rpc_conn->mb, r); - } - - return end + 2 - membuf_elems(&p->rpc_conn->mb); -} - /* This closes a JSON response and writes it out. */ static void finish_and_send_json(int fd, struct json_out *jout) { @@ -719,40 +688,63 @@ void command_set_usage(struct command *cmd, const char *usage TAKES) cmd->methodname); } -/* Reads rpc reply and returns tokens, setting contents to 'error' or -- * 'result' (depending on *error). */ -static jsmntok_t *read_rpc_reply(const tal_t *ctx, - struct plugin *plugin, - const jsmntok_t **contents, - bool *error, - int *reqlen) +static const char *read_one_json_sync(struct plugin *p, const jsmntok_t **toks) { - jsmntok_t *toks; + for (;;) { + const char *buf, *error; - do { - *reqlen = read_json_from_rpc(plugin); + error = jsonrpc_io_parse(tmpctx, p->sync_io, toks, &buf); + if (error) + plugin_err(p, "Parsing sync lightningd: %s", error); + if (*toks) + return buf; - toks = json_parse_simple(ctx, - membuf_elems(&plugin->rpc_conn->mb), - *reqlen); - if (!toks) - plugin_err(plugin, "Malformed JSON reply '%.*s'", - *reqlen, membuf_elems(&plugin->rpc_conn->mb)); - /* FIXME: Don't simply ignore notifications here! */ - } while (!json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks, - "id")); - - *contents = json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks, "error"); - if (*contents) - *error = true; - else { - *contents = json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks, - "result"); - if (!*contents) - plugin_err(plugin, "JSON reply with no 'result' nor 'error'? '%.*s'", - *reqlen, membuf_elems(&plugin->rpc_conn->mb)); - *error = false; + /* lightningd goes away, we go away. */ + if (!jsonrpc_sync_read(p->sync_io, p->sync_fd)) { + if (errno == 0) + exit(0); + else + plugin_err(p, "Reading sync lightningd: %s", + strerror(errno)); + } } +} + +/* Reads rpc reply and returns result tokens */ +static const jsmntok_t *read_sync_rpc_reply(const tal_t *ctx, + struct plugin *plugin, + const char *method, + const char **final_buffer) +{ + const jsmntok_t *errtok, *resulttok, *toks; + const char *buffer; + + for (;;) { + buffer = read_one_json_sync(plugin, &toks); + /* FIXME: Don't simply ignore notifications here! */ + if (json_get_member(buffer, toks, "id")) + break; + jsonrpc_io_parse_done(plugin->sync_io); + } + + errtok = json_get_member(buffer, toks, "error"); + if (errtok) { + plugin_err(plugin, "Got error result to %s: '%.*s'", + method, + json_tok_full_len(toks), + json_tok_full(buffer, toks)); + } + resulttok = json_get_member(buffer, toks, "result"); + if (!resulttok) { + plugin_err(plugin, "JSON reply with no 'result' nor 'error'? '%.*s'", + json_tok_full_len(toks), + json_tok_full(buffer, toks)); + } + + /* Make the returned pointers valid tal object */ + json_dup_contents(ctx, buffer, resulttok, final_buffer, &toks); + jsonrpc_io_parse_done(plugin->sync_io); + return toks; } @@ -763,13 +755,8 @@ static const jsmntok_t *sync_req(const tal_t *ctx, const struct json_out *params TAKES, const char **resp) { - bool error; - jsmntok_t *toks; - const jsmntok_t *contents; - int reqlen; struct json_out *jout = json_out_new(tmpctx); const char *id = json_id(tmpctx, plugin, "init/", method); - size_t num_toks; json_out_start(jout, NULL, '{'); json_out_addstr(jout, "jsonrpc", "2.0"); @@ -787,23 +774,15 @@ static const jsmntok_t *sync_req(const tal_t *ctx, /* If we're past init, we may need a new fd (the old one * is being used for async comms). */ - if (plugin->rpc_conn->fd == -1) - plugin->rpc_conn->fd = rpc_open(plugin); + if (plugin->sync_fd == -1) { + plugin->sync_fd = rpc_open(plugin); + if (!plugin->sync_io) + plugin->sync_io = jsonrpc_io_new(plugin); + } - finish_and_send_json(plugin->rpc_conn->fd, jout); + finish_and_send_json(plugin->sync_fd, jout); - toks = read_rpc_reply(ctx, plugin, &contents, &error, &reqlen); - if (error) - plugin_err(plugin, "Got error reply to %s: '%.*s'", - method, reqlen, membuf_elems(&plugin->rpc_conn->mb)); - - *resp = membuf_consume(&plugin->rpc_conn->mb, reqlen); - - /* Make the returned pointer the valid tal object of minimal length */ - num_toks = json_next(contents) - contents; - memmove(toks, contents, num_toks * sizeof(*toks)); - tal_resize(&toks, num_toks); - return toks; + return read_sync_rpc_reply(ctx, plugin, method, resp); } const jsmntok_t *jsonrpc_request_sync(const tal_t *ctx, @@ -812,7 +791,6 @@ const jsmntok_t *jsonrpc_request_sync(const tal_t *ctx, const struct json_out *params TAKES, const char **resp) { - return sync_req(ctx, cmd->plugin, method, params, resp); } @@ -1508,7 +1486,6 @@ static struct command_result *handle_init(struct command *cmd, size_t i; char *dir, *network; struct plugin *p = cmd->plugin; - bool with_rpc; const char *err; configtok = json_get_member(buf, params, "configuration"); @@ -1535,17 +1512,10 @@ static struct command_result *handle_init(struct command *cmd, /* Only attempt to connect if the plugin has configured the rpc_conn * already, if that's not the case we were told to run without an RPC * connection, so don't even log an error. */ - if (p->rpc_conn != NULL) { - p->rpc_conn->fd = rpc_open(p); - if (p->rpc_conn->fd == -1) - with_rpc = false; - else - with_rpc = true; - - membuf_init(&p->rpc_conn->mb, tal_arr(p, char, READ_CHUNKSIZE), - READ_CHUNKSIZE, membuf_tal_resize); - } else - with_rpc = false; + if (p->sync_io) + p->sync_fd = rpc_open(p); + else + p->sync_fd = -1; opttok = json_get_member(buf, params, "options"); json_for_each_obj(i, t, opttok) { @@ -1569,11 +1539,12 @@ static struct command_result *handle_init(struct command *cmd, disable)); } - if (with_rpc) { + /* Now set up async. */ + if (p->sync_fd != -1) { struct out_req *req; struct command *aux_cmd = aux_command(cmd); - io_new_conn(p, p->rpc_conn->fd, rpc_conn_init, p); + io_new_conn(p, p->sync_fd, rpc_conn_init, p); /* In case they intercept rpc_command, we can't do this sync. */ req = jsonrpc_request_start(aux_cmd, "listconfigs", get_beglist, plugin_broken_cb, NULL); @@ -1581,7 +1552,7 @@ static struct command_result *handle_init(struct command *cmd, send_outreq(req); /* We will open a new one if we want to be sync. */ - p->rpc_conn->fd = -1; + p->sync_fd = -1; } return command_success(cmd, json_out_obj(cmd, NULL, NULL)); @@ -2341,13 +2312,10 @@ static struct plugin *new_plugin(const tal_t *ctx, p->beglist = NULL; p->desired_features = tal_steal(p, features); - if (init_rpc) { - /* Sync RPC FIXME: maybe go full async ? */ - p->rpc_conn = tal(p, struct rpc_conn); - } else { - p->rpc_conn = NULL; - } - + if (init_rpc) + p->sync_io = jsonrpc_io_new(p); + else + p->sync_io = NULL; p->init = init; p->manifested = p->initialized = p->exiting = false; p->restartability = restartability;