From 4958cd3289bc0c8398aec3c6862cea739857a36a Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 23 Oct 2025 14:23:05 +1030 Subject: [PATCH] lightningd: use jsonrpc_io for plugin JSON commands. Signed-off-by: Rusty Russell --- lightningd/plugin.c | 320 +++++++++++++++++--------------------------- lightningd/plugin.h | 5 +- 2 files changed, 123 insertions(+), 202 deletions(-) diff --git a/lightningd/plugin.c b/lightningd/plugin.c index 9e7a8acd1..41a2d61e7 100644 --- a/lightningd/plugin.c +++ b/lightningd/plugin.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -221,36 +222,36 @@ static bool request_add(const char *reqid, struct jsonrpc_request *req, } /* FIXME: reorder */ -static const char *plugin_read_json_one(struct plugin *plugin, - bool want_transaction, - bool *complete, - bool *destroyed); +static void plugin_response_handle(struct plugin *plugin, + const char *buffer, + const jsmntok_t *toks, + const jsmntok_t *idtok); /* We act as if the plugin itself said "I'm dead!" */ static void plugin_terminated_fail_req(struct plugin *plugin, struct jsonrpc_request *req) { - bool complete, destroyed; - const char *err; + jsmntok_t *toks = toks_alloc(plugin); + const jsmntok_t *idtok; + const char *buf; + jsmn_parser parser; + bool complete; - jsmn_init(&plugin->parser); - toks_reset(plugin->toks); - tal_free(plugin->buffer); - plugin->buffer = tal_fmt(plugin, - "{\"jsonrpc\": \"2.0\"," - "\"id\": %s," - "\"error\":" - " {\"code\":%i, \"message\":\"%s\"}" - "}\n\n", - req->id, - PLUGIN_TERMINATED, - "Plugin terminated before replying to RPC call."); - plugin->used = strlen(plugin->buffer); - - /* We're already in a transaction, don't do it again! */ - err = plugin_read_json_one(plugin, false, &complete, &destroyed); - assert(!err); + buf = tal_fmt(plugin, + "{\"jsonrpc\": \"2.0\"," + "\"id\": %s," + "\"error\":" + " {\"code\":%i, \"message\":\"%s\"}" + "}\n\n", + req->id, + PLUGIN_TERMINATED, + "Plugin terminated before replying to RPC call."); + jsmn_init(&parser); + if (!json_parse_input(&parser, &toks, buf, strlen(buf), &complete)) + abort(); assert(complete); + idtok = json_get_member(buf, toks, "id"); + plugin_response_handle(plugin, buf, toks, idtok); } static void destroy_plugin(struct plugin *p) @@ -371,7 +372,6 @@ struct plugin *plugin_register(struct plugins *plugins, const char* path TAKES, p->plugin_state = UNCONFIGURED; p->js_arr = tal_arr(p, struct json_stream *, 0); - p->used = 0; p->notification_topics = tal_arr(p, const char *, 0); p->subscriptions = NULL; p->dynamic = false; @@ -701,186 +701,112 @@ static void plugin_response_handle(struct plugin *plugin, tal_free(ctx); } -/** - * Try to parse a complete message from the plugin's buffer. - * - * Returns NULL if there was no error. - * If it can parse a JSON message, sets *@complete, and returns any error - * from the callback. - * - * If @destroyed was set, it means the plugin called plugin stop on itself. - */ -static const char *plugin_read_json_one(struct plugin *plugin, - bool want_transaction, - bool *complete, - bool *destroyed) -{ - const jsmntok_t *jrtok, *idtok; - struct plugin_destroyed *pd; - const char *err; - struct wallet *wallet = plugin->plugins->ld->wallet; - - *destroyed = false; - /* Note that in the case of 'plugin stop' this can free request (since - * plugin is parent), so detect that case */ - - if (!json_parse_input(&plugin->parser, &plugin->toks, - plugin->buffer, plugin->used, - complete)) { - return tal_fmt(plugin, - "Failed to parse JSON response '%.*s'", - (int)plugin->used, plugin->buffer); - } - - if (!*complete) { - /* We need more. */ - return NULL; - } - - /* Empty buffer? (eg. just whitespace). */ - if (tal_count(plugin->toks) == 1) { - plugin->used = 0; - jsmn_init(&plugin->parser); - toks_reset(plugin->toks); - /* We need more. */ - *complete = false; - return NULL; - } - - if (plugin->toks->type != JSMN_OBJECT) - return tal_fmt( - plugin, - "JSON-RPC message is not a valid JSON object type"); - - jrtok = json_get_member(plugin->buffer, plugin->toks, "jsonrpc"); - idtok = json_get_member(plugin->buffer, plugin->toks, "id"); - - if (!jrtok) { - return tal_fmt( - plugin, - "JSON-RPC message does not contain \"jsonrpc\" field"); - } - - /* We can be called extremely early, or as db hook, or for - * fake "terminated" request. */ - if (want_transaction) - db_begin_transaction(wallet->db); - - pd = plugin_detect_destruction(plugin); - if (!idtok) { - /* A Notification is a Request object without an "id" - * member. A Request object that is a Notification - * signifies the Client's lack of interest in the - * corresponding Response object, and as such no - * Response object needs to be returned to the - * client. The Server MUST NOT reply to a - * Notification, including those that are within a - * batch request. - * - * https://www.jsonrpc.org/specification#notification - */ - err = plugin_notification_handle(plugin, plugin->buffer, plugin->toks); - - } else { - /* When a rpc call is made, the Server MUST reply with - * a Response, except for in the case of - * Notifications. The Response is expressed as a - * single JSON Object, with the following members: - * - * - jsonrpc: A String specifying the version of the - * JSON-RPC protocol. MUST be exactly "2.0". - * - * - result: This member is REQUIRED on success. This - * member MUST NOT exist if there was an error - * invoking the method. The value of this member is - * determined by the method invoked on the Server. - * - * - error: This member is REQUIRED on error. This - * member MUST NOT exist if there was no error - * triggered during invocation. - * - * - id: This member is REQUIRED. It MUST be the same - * as the value of the id member in the Request - * Object. If there was an error in detecting the id - * in the Request object (e.g. Parse error/Invalid - * Request), it MUST be Null. Either the result - * member or error member MUST be included, but both - * members MUST NOT be included. - * - * https://www.jsonrpc.org/specification#response_object - */ - plugin_response_handle(plugin, plugin->buffer, plugin->toks, idtok); - err = NULL; - } - if (want_transaction) - db_commit_transaction(wallet->db); - - /* Corner case: rpc_command hook can destroy plugin for 'plugin - * stop'! */ - if (was_plugin_destroyed(pd)) { - *destroyed = true; - } else { - /* Move this object out of the buffer */ - memmove(plugin->buffer, plugin->buffer + plugin->toks[0].end, - tal_count(plugin->buffer) - plugin->toks[0].end); - plugin->used -= plugin->toks[0].end; - jsmn_init(&plugin->parser); - toks_reset(plugin->toks); - } - return err; -} - +/* Try to parse complete messages from the plugin's buffer. */ static struct io_plan *plugin_read_json(struct io_conn *conn, struct plugin *plugin) { - bool success; - bool have_full; + struct wallet *wallet = plugin->plugins->ld->wallet; + const char *new_bytes, *buffer; + const jsmntok_t *toks; + size_t new_bytes_len; /* wallet is NULL in really early code */ bool want_transaction = (plugin->plugins->want_db_transaction - && plugin->plugins->ld->wallet != NULL); + && wallet != NULL); - log_io(plugin->log, LOG_IO_IN, NULL, "", - plugin->buffer + plugin->used, plugin->len_read); + new_bytes = jsonrpc_newly_read(plugin->json_in, &new_bytes_len); + if (new_bytes_len) { + log_io(plugin->log, LOG_IO_IN, NULL, "", + new_bytes, new_bytes_len); + } - /* Our JSON parser is pretty good at incremental parsing, but - * `getrawblock` gives a giant 2MB token, which forces it to re-parse - * every time until we have all of it. However, we can't complete a - * JSON object without a '}', so we do a cheaper check here. - */ - have_full = memchr(plugin->buffer + plugin->used, '}', - plugin->len_read); + /* Parse until we get incomplete JSON */ + for (;;) { + const char *error; + const jsmntok_t *idtok; + struct plugin_destroyed *pd; - plugin->used += plugin->len_read; - if (plugin->used == tal_count(plugin->buffer)) - tal_resize(&plugin->buffer, plugin->used * 2); + error = jsonrpc_io_parse(tmpctx, plugin->json_in, + &toks, &buffer); + if (error) { + plugin_kill(plugin, LOG_UNUSUAL, "%s", error); + /* plugin_kill frees plugin */ + return io_close(NULL); + } + /* Incomplete? */ + if (!toks) + break; - /* Read and process all messages from the connection */ - if (have_full) { - do { - bool destroyed; - const char *err; + idtok = json_get_member(buffer, toks, "id"); - err = plugin_read_json_one(plugin, want_transaction, - &success, &destroyed); + /* We can be called extremely early, or as db hook, or for + * fake "terminated" request. */ + if (want_transaction) + db_begin_transaction(wallet->db); - /* If it's destroyed, conn is already freed! */ - if (destroyed) - return io_close(NULL); + pd = plugin_detect_destruction(plugin); + if (!idtok) { + /* A Notification is a Request object without an "id" + * member. A Request object that is a Notification + * signifies the Client's lack of interest in the + * corresponding Response object, and as such no + * Response object needs to be returned to the + * client. The Server MUST NOT reply to a + * Notification, including those that are within a + * batch request. + * + * https://www.jsonrpc.org/specification#notification + */ + error = plugin_notification_handle(plugin, buffer, toks); + } else { + /* When a rpc call is made, the Server MUST reply with + * a Response, except for in the case of + * Notifications. The Response is expressed as a + * single JSON Object, with the following members: + * + * - jsonrpc: A String specifying the version of the + * JSON-RPC protocol. MUST be exactly "2.0". + * + * - result: This member is REQUIRED on success. This + * member MUST NOT exist if there was an error + * invoking the method. The value of this member is + * determined by the method invoked on the Server. + * + * - error: This member is REQUIRED on error. This + * member MUST NOT exist if there was no error + * triggered during invocation. + * + * - id: This member is REQUIRED. It MUST be the same + * as the value of the id member in the Request + * Object. If there was an error in detecting the id + * in the Request object (e.g. Parse error/Invalid + * Request), it MUST be Null. Either the result + * member or error member MUST be included, but both + * members MUST NOT be included. + * + * https://www.jsonrpc.org/specification#response_object + */ + plugin_response_handle(plugin, buffer, toks, idtok); + error = NULL; + } + if (want_transaction) + db_commit_transaction(wallet->db); - if (err) { - plugin_kill(plugin, LOG_UNUSUAL, - "%s", err); - /* plugin_kill frees plugin */ - return io_close(NULL); - } - } while (success); + /* If it's destroyed, conn is already freed! */ + if (was_plugin_destroyed(pd)) + return io_close(NULL); + + if (error) { + plugin_kill(plugin, LOG_UNUSUAL, "%s", error); + /* plugin_kill frees plugin */ + return io_close(NULL); + } + + jsonrpc_io_parse_done(plugin->json_in); } /* Now read more from the connection */ - return io_read_partial(plugin->stdout_conn, - plugin->buffer + plugin->used, - tal_count(plugin->buffer) - plugin->used, - &plugin->len_read, plugin_read_json, plugin); + return jsonrpc_io_read(plugin->stdout_conn, plugin->json_in, + plugin_read_json, plugin); } /* Mutual recursion */ @@ -935,6 +861,7 @@ static void plugin_conn_finish(struct io_conn *conn, struct plugin *plugin) struct io_plan *plugin_stdin_conn_init(struct io_conn *conn, struct plugin *plugin) { + plugin->stdin_conn = conn; /* We write to their stdin */ /* We don't have anything queued yet, wait for notification */ return io_wait(conn, plugin, plugin_write_json, plugin); @@ -944,10 +871,9 @@ struct io_plan *plugin_stdout_conn_init(struct io_conn *conn, struct plugin *plugin) { /* We read from their stdout */ + plugin->stdout_conn = conn; io_set_finish(conn, plugin_conn_finish, plugin); - return io_read_partial(conn, plugin->buffer, - tal_bytelen(plugin->buffer), &plugin->len_read, - plugin_read_json, plugin); + return plugin_read_json(conn, plugin); } static char *plugin_opt_check(struct plugin_opt *popt) @@ -2043,8 +1969,8 @@ static void plugin_set_timeout(struct plugin *p) time_from_sec(PLUGIN_STARTUP_TIMEOUT), plugin_manifest_timeout, p); } -} +} const char *plugin_send_getmanifest(struct plugin *p, const char *cmd_id) { char **cmd; @@ -2063,14 +1989,12 @@ const char *plugin_send_getmanifest(struct plugin *p, const char *cmd_id) return tal_fmt(p, "opening pipe: %s", strerror(errno)); log_debug(p->plugins->log, "started(%u) %s", p->pid, p->cmd); - p->buffer = tal_arr(p, char, 64); - jsmn_init(&p->parser); - p->toks = toks_alloc(p); + p->json_in = jsonrpc_io_new(p); /* Create two connections, one read-only on top of p->stdout, and one * write-only on p->stdin */ - p->stdout_conn = io_new_conn(p, stdoutfd, plugin_stdout_conn_init, p); - p->stdin_conn = io_new_conn(p, stdinfd, plugin_stdin_conn_init, p); + io_new_conn(p, stdoutfd, plugin_stdout_conn_init, p); + io_new_conn(p, stdinfd, plugin_stdin_conn_init, p); req = jsonrpc_request_start(p, "getmanifest", cmd_id, p->log, NULL, plugin_manifest_cb, p); json_add_bool(req->stream, "allow-deprecated-apis", diff --git a/lightningd/plugin.h b/lightningd/plugin.h index 3481d3565..fb16d4b25 100644 --- a/lightningd/plugin.h +++ b/lightningd/plugin.h @@ -76,10 +76,7 @@ struct plugin { bool dynamic; /* Stuff we read */ - char *buffer; - size_t used, len_read; - jsmn_parser parser; - jsmntok_t *toks; + struct jsonrpc_io *json_in; /* Our json_streams. Since multiple streams could start * returning data at once, we always service these in order,