libplugin: use jsonrpc_io logic for sync requests too.

It's a little overkill, but it's clear.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2025-10-23 14:23:05 +10:30
parent 4958cd3289
commit bc4bb2b0ef
3 changed files with 108 additions and 108 deletions

View File

@@ -5,6 +5,8 @@
#include <ccan/tal/str/str.h>
#include <common/jsonrpc_io.h>
#include <common/utils.h>
#include <errno.h>
#include <unistd.h>
#define READ_CHUNKSIZE 64
@@ -126,3 +128,24 @@ struct io_plan *jsonrpc_io_read_(struct io_conn *conn,
&json_in->bytes_read,
next, arg);
}
bool jsonrpc_sync_read(struct jsonrpc_io *json_in, int infd)
{
int r;
/* Make sure there's more room */
membuf_prepare_space(&json_in->membuf, READ_CHUNKSIZE);
/* Try to read more. */
r = read(infd,
membuf_space(&json_in->membuf),
membuf_num_space(&json_in->membuf));
if (r < 0)
return false;
if (r == 0) {
errno = 0;
return false;
}
json_in->bytes_read = r;
return true;
}

View File

@@ -43,6 +43,15 @@ struct io_plan *jsonrpc_io_read_(struct io_conn *conn,
const char *jsonrpc_newly_read(struct jsonrpc_io *json_in,
size_t *len);
/**
* jsonrpc_sync_read: read from fd into buffer.
* @json_in: buffer to read into.
* @infd: file descriptort to read.
*
* Returns false on error or EOF; for EOF errno will be 0.
*/
bool jsonrpc_sync_read(struct jsonrpc_io *json_in, int infd);
/**
* jsonrpc_io_parse: try to parse more of the buffer.
* @ctx: context to allocate error message off.

View File

@@ -1,7 +1,6 @@
#include "config.h"
#include <ccan/io/io.h>
#include <ccan/json_out/json_out.h>
#include <ccan/membuf/membuf.h>
#include <ccan/read_write_all/read_write_all.h>
#include <ccan/tal/path/path.h>
#include <ccan/tal/str/str.h>
@@ -31,11 +30,6 @@ struct plugin_timer {
void *cb_arg;
};
struct rpc_conn {
int fd;
MEMBUF(char) mb;
};
/* We can have more than one of these pending at once. */
struct jstream {
struct list_node list;
@@ -93,7 +87,7 @@ struct plugin {
/* To write to lightningd */
struct list_head js_list;
/* Asynchronous RPC interaction */
/* Asynchronous RPC interaction. */
struct io_conn *io_rpc_conn;
struct list_head rpc_js_list;
struct jsonrpc_io *jsonrpc_in;
@@ -102,8 +96,9 @@ struct plugin {
STRMAP(struct out_req *) out_reqs;
u64 next_outreq_id;
/* Synchronous RPC interaction */
struct rpc_conn *rpc_conn;
/* Synchronous RPC interaction: sync_io is NULL if they didn't want it. */
int sync_fd;
struct jsonrpc_io *sync_io;
/* Plugin information details */
enum plugin_restartability restartability;
@@ -534,32 +529,6 @@ struct json_out *json_out_obj(const tal_t *ctx,
return jout;
}
static int read_json_from_rpc(struct plugin *p)
{
char *end;
/* We rely on the double-\n marker which only terminates JSON top
* levels. Thanks lightningd! */
while ((end = memmem(membuf_elems(&p->rpc_conn->mb),
membuf_num_elems(&p->rpc_conn->mb), "\n\n", 2))
== NULL) {
ssize_t r;
/* Make sure we've room for at least READ_CHUNKSIZE. */
membuf_prepare_space(&p->rpc_conn->mb, READ_CHUNKSIZE);
r = read(p->rpc_conn->fd, membuf_space(&p->rpc_conn->mb),
membuf_num_space(&p->rpc_conn->mb));
/* lightningd goes away, we go away. */
if (r == 0)
exit(0);
if (r < 0)
plugin_err(p, "Reading JSON input: %s", strerror(errno));
membuf_added(&p->rpc_conn->mb, r);
}
return end + 2 - membuf_elems(&p->rpc_conn->mb);
}
/* This closes a JSON response and writes it out. */
static void finish_and_send_json(int fd, struct json_out *jout)
{
@@ -719,40 +688,63 @@ void command_set_usage(struct command *cmd, const char *usage TAKES)
cmd->methodname);
}
/* Reads rpc reply and returns tokens, setting contents to 'error' or
- * 'result' (depending on *error). */
static jsmntok_t *read_rpc_reply(const tal_t *ctx,
struct plugin *plugin,
const jsmntok_t **contents,
bool *error,
int *reqlen)
static const char *read_one_json_sync(struct plugin *p, const jsmntok_t **toks)
{
jsmntok_t *toks;
for (;;) {
const char *buf, *error;
do {
*reqlen = read_json_from_rpc(plugin);
error = jsonrpc_io_parse(tmpctx, p->sync_io, toks, &buf);
if (error)
plugin_err(p, "Parsing sync lightningd: %s", error);
if (*toks)
return buf;
toks = json_parse_simple(ctx,
membuf_elems(&plugin->rpc_conn->mb),
*reqlen);
if (!toks)
plugin_err(plugin, "Malformed JSON reply '%.*s'",
*reqlen, membuf_elems(&plugin->rpc_conn->mb));
/* FIXME: Don't simply ignore notifications here! */
} while (!json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks,
"id"));
*contents = json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks, "error");
if (*contents)
*error = true;
else {
*contents = json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks,
"result");
if (!*contents)
plugin_err(plugin, "JSON reply with no 'result' nor 'error'? '%.*s'",
*reqlen, membuf_elems(&plugin->rpc_conn->mb));
*error = false;
/* lightningd goes away, we go away. */
if (!jsonrpc_sync_read(p->sync_io, p->sync_fd)) {
if (errno == 0)
exit(0);
else
plugin_err(p, "Reading sync lightningd: %s",
strerror(errno));
}
}
}
/* Reads rpc reply and returns result tokens */
static const jsmntok_t *read_sync_rpc_reply(const tal_t *ctx,
struct plugin *plugin,
const char *method,
const char **final_buffer)
{
const jsmntok_t *errtok, *resulttok, *toks;
const char *buffer;
for (;;) {
buffer = read_one_json_sync(plugin, &toks);
/* FIXME: Don't simply ignore notifications here! */
if (json_get_member(buffer, toks, "id"))
break;
jsonrpc_io_parse_done(plugin->sync_io);
}
errtok = json_get_member(buffer, toks, "error");
if (errtok) {
plugin_err(plugin, "Got error result to %s: '%.*s'",
method,
json_tok_full_len(toks),
json_tok_full(buffer, toks));
}
resulttok = json_get_member(buffer, toks, "result");
if (!resulttok) {
plugin_err(plugin, "JSON reply with no 'result' nor 'error'? '%.*s'",
json_tok_full_len(toks),
json_tok_full(buffer, toks));
}
/* Make the returned pointers valid tal object */
json_dup_contents(ctx, buffer, resulttok, final_buffer, &toks);
jsonrpc_io_parse_done(plugin->sync_io);
return toks;
}
@@ -763,13 +755,8 @@ static const jsmntok_t *sync_req(const tal_t *ctx,
const struct json_out *params TAKES,
const char **resp)
{
bool error;
jsmntok_t *toks;
const jsmntok_t *contents;
int reqlen;
struct json_out *jout = json_out_new(tmpctx);
const char *id = json_id(tmpctx, plugin, "init/", method);
size_t num_toks;
json_out_start(jout, NULL, '{');
json_out_addstr(jout, "jsonrpc", "2.0");
@@ -787,23 +774,15 @@ static const jsmntok_t *sync_req(const tal_t *ctx,
/* If we're past init, we may need a new fd (the old one
* is being used for async comms). */
if (plugin->rpc_conn->fd == -1)
plugin->rpc_conn->fd = rpc_open(plugin);
if (plugin->sync_fd == -1) {
plugin->sync_fd = rpc_open(plugin);
if (!plugin->sync_io)
plugin->sync_io = jsonrpc_io_new(plugin);
}
finish_and_send_json(plugin->rpc_conn->fd, jout);
finish_and_send_json(plugin->sync_fd, jout);
toks = read_rpc_reply(ctx, plugin, &contents, &error, &reqlen);
if (error)
plugin_err(plugin, "Got error reply to %s: '%.*s'",
method, reqlen, membuf_elems(&plugin->rpc_conn->mb));
*resp = membuf_consume(&plugin->rpc_conn->mb, reqlen);
/* Make the returned pointer the valid tal object of minimal length */
num_toks = json_next(contents) - contents;
memmove(toks, contents, num_toks * sizeof(*toks));
tal_resize(&toks, num_toks);
return toks;
return read_sync_rpc_reply(ctx, plugin, method, resp);
}
const jsmntok_t *jsonrpc_request_sync(const tal_t *ctx,
@@ -812,7 +791,6 @@ const jsmntok_t *jsonrpc_request_sync(const tal_t *ctx,
const struct json_out *params TAKES,
const char **resp)
{
return sync_req(ctx, cmd->plugin, method, params, resp);
}
@@ -1508,7 +1486,6 @@ static struct command_result *handle_init(struct command *cmd,
size_t i;
char *dir, *network;
struct plugin *p = cmd->plugin;
bool with_rpc;
const char *err;
configtok = json_get_member(buf, params, "configuration");
@@ -1535,17 +1512,10 @@ static struct command_result *handle_init(struct command *cmd,
/* Only attempt to connect if the plugin has configured the rpc_conn
* already, if that's not the case we were told to run without an RPC
* connection, so don't even log an error. */
if (p->rpc_conn != NULL) {
p->rpc_conn->fd = rpc_open(p);
if (p->rpc_conn->fd == -1)
with_rpc = false;
else
with_rpc = true;
membuf_init(&p->rpc_conn->mb, tal_arr(p, char, READ_CHUNKSIZE),
READ_CHUNKSIZE, membuf_tal_resize);
} else
with_rpc = false;
if (p->sync_io)
p->sync_fd = rpc_open(p);
else
p->sync_fd = -1;
opttok = json_get_member(buf, params, "options");
json_for_each_obj(i, t, opttok) {
@@ -1569,11 +1539,12 @@ static struct command_result *handle_init(struct command *cmd,
disable));
}
if (with_rpc) {
/* Now set up async. */
if (p->sync_fd != -1) {
struct out_req *req;
struct command *aux_cmd = aux_command(cmd);
io_new_conn(p, p->rpc_conn->fd, rpc_conn_init, p);
io_new_conn(p, p->sync_fd, rpc_conn_init, p);
/* In case they intercept rpc_command, we can't do this sync. */
req = jsonrpc_request_start(aux_cmd, "listconfigs",
get_beglist, plugin_broken_cb, NULL);
@@ -1581,7 +1552,7 @@ static struct command_result *handle_init(struct command *cmd,
send_outreq(req);
/* We will open a new one if we want to be sync. */
p->rpc_conn->fd = -1;
p->sync_fd = -1;
}
return command_success(cmd, json_out_obj(cmd, NULL, NULL));
@@ -2341,13 +2312,10 @@ static struct plugin *new_plugin(const tal_t *ctx,
p->beglist = NULL;
p->desired_features = tal_steal(p, features);
if (init_rpc) {
/* Sync RPC FIXME: maybe go full async ? */
p->rpc_conn = tal(p, struct rpc_conn);
} else {
p->rpc_conn = NULL;
}
if (init_rpc)
p->sync_io = jsonrpc_io_new(p);
else
p->sync_io = NULL;
p->init = init;
p->manifested = p->initialized = p->exiting = false;
p->restartability = restartability;