libplugin: use jsonrpc_io for reading replies to our async commands.

This will also be more efficient than doing memmove every time.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2025-10-23 14:23:04 +10:30
parent 68f6a1a1dc
commit 690f95ff1f

View File

@@ -11,6 +11,7 @@
#include <common/json_filter.h>
#include <common/json_param.h>
#include <common/json_stream.h>
#include <common/jsonrpc_io.h>
#include <common/memleak.h>
#include <common/plugin.h>
#include <common/route.h>
@@ -98,10 +99,8 @@ struct plugin {
/* Asynchronous RPC interaction */
struct io_conn *io_rpc_conn;
struct list_head rpc_js_list;
char *rpc_buffer;
size_t rpc_used, rpc_len_read, rpc_read_offset;
jsmn_parser rpc_parser;
jsmntok_t *rpc_toks;
struct jsonrpc_io *jsonrpc_in;
/* Tracking async RPC requests */
STRMAP(struct out_req *) out_reqs;
u64 next_outreq_id;
@@ -1365,73 +1364,33 @@ static void rpc_conn_finished(struct io_conn *conn,
plugin_err(plugin, "Lost connection to the RPC socket.");
}
static bool rpc_read_response_one(struct plugin *plugin)
{
const jsmntok_t *jrtok;
bool complete;
if (!json_parse_input(&plugin->rpc_parser, &plugin->rpc_toks,
plugin->rpc_buffer + plugin->rpc_read_offset,
plugin->rpc_used - plugin->rpc_read_offset,
&complete)) {
plugin_err(plugin, "Failed to parse RPC JSON response '%.*s'",
(int)(plugin->rpc_used - plugin->rpc_read_offset),
plugin->rpc_buffer + plugin->rpc_read_offset);
}
if (!complete) {
/* We need more. */
goto compact;
}
/* Empty buffer? (eg. just whitespace). */
if (tal_count(plugin->rpc_toks) == 1) {
jsmn_init(&plugin->rpc_parser);
toks_reset(plugin->rpc_toks);
goto compact;
}
jrtok = json_get_member(plugin->rpc_buffer + plugin->rpc_read_offset,
plugin->rpc_toks, "jsonrpc");
if (!jrtok) {
plugin_err(plugin, "JSON-RPC message does not contain \"jsonrpc\" field: '%.*s'",
(int)(plugin->rpc_used - plugin->rpc_read_offset),
plugin->rpc_buffer + plugin->rpc_read_offset);
}
handle_rpc_reply(plugin, plugin->rpc_buffer + plugin->rpc_read_offset, plugin->rpc_toks);
/* Move this object out of the buffer */
plugin->rpc_read_offset += plugin->rpc_toks[0].end;
jsmn_init(&plugin->rpc_parser);
toks_reset(plugin->rpc_toks);
return true;
compact:
memmove(plugin->rpc_buffer, plugin->rpc_buffer + plugin->rpc_read_offset,
plugin->rpc_used - plugin->rpc_read_offset);
plugin->rpc_used -= plugin->rpc_read_offset;
plugin->rpc_read_offset = 0;
return false;
}
static struct io_plan *rpc_conn_read_response(struct io_conn *conn,
struct plugin *plugin)
{
plugin->rpc_used += plugin->rpc_len_read;
if (plugin->rpc_used == tal_count(plugin->rpc_buffer))
tal_resize(&plugin->rpc_buffer, plugin->rpc_used * 2);
/* Gather an parse any new bytes */
for (;;) {
const jsmntok_t *toks;
const char *buf;
const char *err;
/* Read and process all messages from the connection */
while (rpc_read_response_one(plugin))
;
err = jsonrpc_io_parse(tmpctx,
plugin->jsonrpc_in,
&toks, &buf);
if (err)
plugin_err(plugin, "%s", err);
/* Read more, if there is. */
return io_read_partial(plugin->io_rpc_conn,
plugin->rpc_buffer + plugin->rpc_used,
tal_bytelen(plugin->rpc_buffer) - plugin->rpc_used,
&plugin->rpc_len_read,
rpc_conn_read_response, plugin);
if (!toks)
break;
handle_rpc_reply(plugin, buf, toks);
jsonrpc_io_parse_done(plugin->jsonrpc_in);
}
/* Read more */
return jsonrpc_io_read(conn, plugin->jsonrpc_in,
rpc_conn_read_response,
plugin);
}
static struct io_plan *rpc_conn_write_request(struct io_conn *conn,
@@ -2416,14 +2375,9 @@ static struct plugin *new_plugin(const tal_t *ctx,
jsmn_init(&p->parser);
p->toks = toks_alloc(p);
/* Async RPC */
p->rpc_buffer = tal_arr(p, char, 64);
p->jsonrpc_in = jsonrpc_io_new(p);
list_head_init(&p->rpc_js_list);
p->io_rpc_conn = NULL;
p->rpc_used = 0;
p->rpc_read_offset = 0;
p->rpc_len_read = 0;
jsmn_init(&p->rpc_parser);
p->rpc_toks = toks_alloc(p);
p->next_outreq_id = 0;
strmap_init(&p->out_reqs);
p->beglist = NULL;