From 68f6a1a1dcf34e20adc4bd16e8255a3bbe92e676 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 23 Oct 2025 14:23:03 +1030 Subject: [PATCH] common/jsonrpc_io: helper routines for reading JSON from sockets. The efficient way to do this is to use membuf, which handles the buffer control (only using memmove when necessary). We have multiple places where we opencoded this, some of which did not use membuf at all. So now we create common infrastructure. I tried making it a single function but the various users are quite different, so instead I opted for a toolbox approach. Signed-off-by: Rusty Russell --- common/Makefile | 1 + common/jsonrpc_io.c | 119 +++++++++++++ common/jsonrpc_io.h | 71 ++++++++ common/test/Makefile | 2 + common/test/run-jsonrpc_io.c | 317 +++++++++++++++++++++++++++++++++++ 5 files changed, 510 insertions(+) create mode 100644 common/jsonrpc_io.c create mode 100644 common/jsonrpc_io.h create mode 100644 common/test/run-jsonrpc_io.c diff --git a/common/Makefile b/common/Makefile index 2c407e7d8..cf203bf8f 100644 --- a/common/Makefile +++ b/common/Makefile @@ -62,6 +62,7 @@ COMMON_SRC_NOGEN := \ common/json_parse.c \ common/json_parse_simple.c \ common/json_stream.c \ + common/jsonrpc_io.c \ common/key_derive.c \ common/keyset.c \ common/lease_rates.c \ diff --git a/common/jsonrpc_io.c b/common/jsonrpc_io.c new file mode 100644 index 000000000..b127b6772 --- /dev/null +++ b/common/jsonrpc_io.c @@ -0,0 +1,119 @@ +#include "config.h" + +#include +#include +#include +#include +#include + +#define READ_CHUNKSIZE 64 + +struct jsonrpc_io { + MEMBUF(char) membuf; + jsmn_parser parser; + jsmntok_t *toks; + + /* Amount just read by io_read_partial */ + size_t bytes_read; +}; + +struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx) +{ + struct jsonrpc_io *json_in; + + json_in = tal(ctx, struct jsonrpc_io); + json_in->bytes_read = 0; + + membuf_init(&json_in->membuf, + tal_arr(json_in, char, READ_CHUNKSIZE), + READ_CHUNKSIZE, membuf_tal_resize); + json_in->toks = toks_alloc(json_in); + jsmn_init(&json_in->parser); + + return json_in; +} + +/* Empty new bytes read into our unparsed buffer */ +static void add_newly_read(struct jsonrpc_io *json_in) +{ + /* Now added it to our unparsed buffer */ + assert(json_in->bytes_read <= membuf_num_space(&json_in->membuf)); + membuf_added(&json_in->membuf, json_in->bytes_read); + json_in->bytes_read = 0; +} + +const char *jsonrpc_newly_read(struct jsonrpc_io *json_in, + size_t *len) +{ + *len = json_in->bytes_read; + + add_newly_read(json_in); + + return membuf_space(&json_in->membuf) - *len; +} + +const char *jsonrpc_io_parse(const tal_t *ctx, + struct jsonrpc_io *json_in, + const jsmntok_t **toks, + const char **buf) +{ + bool complete; + + /* If we're read any more, add that */ + add_newly_read(json_in); + *toks = NULL; + *buf = NULL; + + if (!json_parse_input(&json_in->parser, &json_in->toks, + membuf_elems(&json_in->membuf), + membuf_num_elems(&json_in->membuf), + &complete)) { + return tal_fmt(ctx, "Failed to parse RPC JSON response '%.*s'", + (int)membuf_num_elems(&json_in->membuf), + membuf_elems(&json_in->membuf)); + } + + if (!complete) + return NULL; + + /* Must have jsonrpc to be valid! */ + if (!json_get_member(membuf_elems(&json_in->membuf), + json_in->toks, + "jsonrpc")) { + return tal_fmt(ctx, + "JSON-RPC message does not contain \"jsonrpc\" field: '%.*s'", + (int)membuf_num_elems(&json_in->membuf), + membuf_elems(&json_in->membuf)); + } + + *toks = json_in->toks; + *buf = membuf_elems(&json_in->membuf); + return NULL; +} + +void jsonrpc_io_parse_done(struct jsonrpc_io *json_in) +{ + size_t bytes_parsed = json_in->toks[0].end; + membuf_consume(&json_in->membuf, bytes_parsed); + + jsmn_init(&json_in->parser); + toks_reset(json_in->toks); +} + +struct io_plan *jsonrpc_io_read_(struct io_conn *conn, + struct jsonrpc_io *json_in, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg) +{ + /* Make sure there's more room */ + membuf_prepare_space(&json_in->membuf, READ_CHUNKSIZE); + + /* Try to read more. */ + json_in->bytes_read = 0; + return io_read_partial(conn, + membuf_space(&json_in->membuf), + membuf_num_space(&json_in->membuf), + &json_in->bytes_read, + next, arg); +} diff --git a/common/jsonrpc_io.h b/common/jsonrpc_io.h new file mode 100644 index 000000000..9032aecc4 --- /dev/null +++ b/common/jsonrpc_io.h @@ -0,0 +1,71 @@ +/* Low-level helper library for C plugins using ccan/io and jsonrpc socket. */ +#ifndef LIGHTNING_COMMON_JSONRPC_IO_H +#define LIGHTNING_COMMON_JSONRPC_IO_H +#include "config.h" +#include +#include +#include + +struct io_conn; +struct plugin; + +/** + * jsonrpc_io_new: allocate a fresh jsonrpc_io + */ +struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx); + + +/** + * jsonrpc_io_read: set io_plan for reading more into buffer. + * @conn: the io_conn to read. + * @json_in: the jsonrpc_io. + * @next: the callback once a read is done. + * @arg: the argument for @next (typesafe). + */ +struct io_plan *jsonrpc_io_read_(struct io_conn *conn, + struct jsonrpc_io *json_in, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg); +#define jsonrpc_io_read(ctx, json_in, next, arg) \ + jsonrpc_io_read_((ctx), (json_in), \ + typesafe_cb_preargs(struct io_plan *, void *, \ + (next), (arg), \ + struct io_conn *), \ + (arg)) + +/** + * jsonrpc_newly_read: how much did we read into the buffer? + * + * Returns the buffer and sets *len to the bytes just read. After + * that it will return *len == 0. + */ +const char *jsonrpc_newly_read(struct jsonrpc_io *json_in, + size_t *len); + +/** + * jsonrpc_io_parse: try to parse more of the buffer. + * @ctx: context to allocate error message off. + * @json_in: json_in after jsonrpc_io_read. + * @toks: returned non-NULL if there's a whole valid json object. + * @buf: returned non-NULL as above. + * + * On error, a message is returned. On incomplete, *@toks and *@buf + * are NULL. Usually you call this, the use the result and call + * jsonrpc_io_parse_done(), then call it again. + */ +const char *jsonrpc_io_parse(const tal_t *ctx, + struct jsonrpc_io *json_in, + const jsmntok_t **toks, + const char **buf); + +/** + * jsonrpc_io_parse_done: call aftr using toks from jsonrpc_io_parse. + * @json_in: json_in after jsonrpc_io_parse. + * + * You must call this if jsonrpc_io_parse() sets *toks non-NULL + * (i.e. complete, and no error). + */ +void jsonrpc_io_parse_done(struct jsonrpc_io *json_in); + +#endif /* LIGHTNING_COMMON_JSONRPC_IO_H */ diff --git a/common/test/Makefile b/common/test/Makefile index 5c57de91f..8ae6a4136 100644 --- a/common/test/Makefile +++ b/common/test/Makefile @@ -130,4 +130,6 @@ common/test/run-shutdown_scriptpubkey: wire/towire.o wire/fromwire.o common/test/run-wireaddr: wire/towire.o wire/fromwire.o +common/test/run-jsonrpc_io: common/json_parse_simple.o + check-units: $(COMMON_TEST_PROGRAMS:%=unittest/%) diff --git a/common/test/run-jsonrpc_io.c b/common/test/run-jsonrpc_io.c new file mode 100644 index 000000000..4d5e604bb --- /dev/null +++ b/common/test/run-jsonrpc_io.c @@ -0,0 +1,317 @@ +/* Body of tests written by ChatGPT 5 */ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include + +#undef io_read_partial +#define io_read_partial io_read_partial_test + +struct jsonrpc_io; + +static struct io_plan *io_read_partial_test(struct io_conn *conn, + void *data, + size_t maxlen, + size_t *lenp, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg); + + +#include "../jsonrpc_io.c" + +/* AUTOGENERATED MOCKS START */ +/* Generated stub for amount_asset_is_main */ +bool amount_asset_is_main(struct amount_asset *asset UNNEEDED) +{ fprintf(stderr, "amount_asset_is_main called!\n"); abort(); } +/* Generated stub for amount_asset_to_sat */ +struct amount_sat amount_asset_to_sat(struct amount_asset *asset UNNEEDED) +{ fprintf(stderr, "amount_asset_to_sat called!\n"); abort(); } +/* Generated stub for amount_feerate */ + bool amount_feerate(u32 *feerate UNNEEDED, struct amount_sat fee UNNEEDED, size_t weight UNNEEDED) +{ fprintf(stderr, "amount_feerate called!\n"); abort(); } +/* Generated stub for amount_sat */ +struct amount_sat amount_sat(u64 satoshis UNNEEDED) +{ fprintf(stderr, "amount_sat called!\n"); abort(); } +/* Generated stub for amount_sat_add */ + bool amount_sat_add(struct amount_sat *val UNNEEDED, + struct amount_sat a UNNEEDED, + struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_add called!\n"); abort(); } +/* Generated stub for amount_sat_eq */ +bool amount_sat_eq(struct amount_sat a UNNEEDED, struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_eq called!\n"); abort(); } +/* Generated stub for amount_sat_greater_eq */ +bool amount_sat_greater_eq(struct amount_sat a UNNEEDED, struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_greater_eq called!\n"); abort(); } +/* Generated stub for amount_sat_sub */ + bool amount_sat_sub(struct amount_sat *val UNNEEDED, + struct amount_sat a UNNEEDED, + struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_sub called!\n"); abort(); } +/* Generated stub for amount_sat_to_asset */ +struct amount_asset amount_sat_to_asset(struct amount_sat *sat UNNEEDED, const u8 *asset UNNEEDED) +{ fprintf(stderr, "amount_sat_to_asset called!\n"); abort(); } +/* Generated stub for amount_tx_fee */ +struct amount_sat amount_tx_fee(u32 fee_per_kw UNNEEDED, size_t weight UNNEEDED) +{ fprintf(stderr, "amount_tx_fee called!\n"); abort(); } +/* Generated stub for fromwire */ +const u8 *fromwire(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, void *copy UNNEEDED, size_t n UNNEEDED) +{ fprintf(stderr, "fromwire called!\n"); abort(); } +/* Generated stub for fromwire_bool */ +bool fromwire_bool(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_bool called!\n"); abort(); } +/* Generated stub for fromwire_fail */ +void *fromwire_fail(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_fail called!\n"); abort(); } +/* Generated stub for fromwire_secp256k1_ecdsa_signature */ +void fromwire_secp256k1_ecdsa_signature(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, + secp256k1_ecdsa_signature *signature UNNEEDED) +{ fprintf(stderr, "fromwire_secp256k1_ecdsa_signature called!\n"); abort(); } +/* Generated stub for fromwire_sha256 */ +void fromwire_sha256(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, struct sha256 *sha256 UNNEEDED) +{ fprintf(stderr, "fromwire_sha256 called!\n"); abort(); } +/* Generated stub for fromwire_tal_arrn */ +u8 *fromwire_tal_arrn(const tal_t *ctx UNNEEDED, + const u8 **cursor UNNEEDED, size_t *max UNNEEDED, size_t num UNNEEDED) +{ fprintf(stderr, "fromwire_tal_arrn called!\n"); abort(); } +/* Generated stub for fromwire_u32 */ +u32 fromwire_u32(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_u32 called!\n"); abort(); } +/* Generated stub for fromwire_u64 */ +u64 fromwire_u64(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_u64 called!\n"); abort(); } +/* Generated stub for fromwire_u8 */ +u8 fromwire_u8(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_u8 called!\n"); abort(); } +/* Generated stub for fromwire_u8_array */ +void fromwire_u8_array(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, u8 *arr UNNEEDED, size_t num UNNEEDED) +{ fprintf(stderr, "fromwire_u8_array called!\n"); abort(); } +/* Generated stub for towire */ +void towire(u8 **pptr UNNEEDED, const void *data UNNEEDED, size_t len UNNEEDED) +{ fprintf(stderr, "towire called!\n"); abort(); } +/* Generated stub for towire_bool */ +void towire_bool(u8 **pptr UNNEEDED, bool v UNNEEDED) +{ fprintf(stderr, "towire_bool called!\n"); abort(); } +/* Generated stub for towire_secp256k1_ecdsa_signature */ +void towire_secp256k1_ecdsa_signature(u8 **pptr UNNEEDED, + const secp256k1_ecdsa_signature *signature UNNEEDED) +{ fprintf(stderr, "towire_secp256k1_ecdsa_signature called!\n"); abort(); } +/* Generated stub for towire_sha256 */ +void towire_sha256(u8 **pptr UNNEEDED, const struct sha256 *sha256 UNNEEDED) +{ fprintf(stderr, "towire_sha256 called!\n"); abort(); } +/* Generated stub for towire_u32 */ +void towire_u32(u8 **pptr UNNEEDED, u32 v UNNEEDED) +{ fprintf(stderr, "towire_u32 called!\n"); abort(); } +/* Generated stub for towire_u64 */ +void towire_u64(u8 **pptr UNNEEDED, u64 v UNNEEDED) +{ fprintf(stderr, "towire_u64 called!\n"); abort(); } +/* Generated stub for towire_u8 */ +void towire_u8(u8 **pptr UNNEEDED, u8 v UNNEEDED) +{ fprintf(stderr, "towire_u8 called!\n"); abort(); } +/* Generated stub for towire_u8_array */ +void towire_u8_array(u8 **pptr UNNEEDED, const u8 *arr UNNEEDED, size_t num UNNEEDED) +{ fprintf(stderr, "towire_u8_array called!\n"); abort(); } +/* AUTOGENERATED MOCKS END */ + +struct test_feed { + const char *data; + size_t len, off; + size_t max_chunk; /* 0 => no artificial limit */ + unsigned calls_to_io_read; +}; +static struct test_feed FEED; + +static void feed_set(const char *s, size_t max_chunk) +{ + FEED.data = s; + FEED.len = strlen(s); + FEED.off = 0; + FEED.max_chunk = max_chunk; + FEED.calls_to_io_read = 0; +} + +static size_t feed_next_chunk(size_t want) +{ + size_t remain = FEED.len - FEED.off; + size_t cap = (FEED.max_chunk && FEED.max_chunk < want) ? FEED.max_chunk : want; + return (remain < cap) ? remain : cap; +} + +static struct io_plan *io_read_partial_test(struct io_conn *conn, + void *data, + size_t maxlen, + size_t *lenp, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg) +{ + char *out = (char *)data; + size_t n = feed_next_chunk(maxlen); + + FEED.calls_to_io_read++; + if (n) { + memcpy(out, FEED.data + FEED.off, n); + FEED.off += n; + } + *lenp = n; + + /* No more input -> end the chain */ + if (n == 0) + return NULL; + + return next(conn, arg); +} + +/* ---------- minimal “handler” to count parsed messages ---------- */ + +struct handler_ctx { + unsigned called; + char last_buf[512]; + size_t last_len; +}; + +static void record_message(const char *buf, const jsmntok_t *toks, struct handler_ctx *hc) +{ + size_t obj_len = (size_t)(toks[0].end - toks[0].start); + if (obj_len > sizeof(hc->last_buf)) obj_len = sizeof(hc->last_buf); + memcpy(hc->last_buf, buf + toks[0].start, obj_len); + hc->last_len = obj_len; + hc->called++; +} + +/* ---------- pump that drives read -> parse -> (maybe) read again ---------- */ + +struct pump_ctx { + struct jsonrpc_io *jin; + struct handler_ctx *hc; +}; + +static struct io_plan *pump_next(struct io_conn *conn, struct pump_ctx *pc) +{ + for (;;) { + const jsmntok_t *toks; + const char *buf; + const char *err = jsonrpc_io_parse(tmpctx, pc->jin, &toks, &buf); + + assert(!err); + if (!toks) { + /* Need more bytes */ + return jsonrpc_io_read(conn, pc->jin, pump_next, pc); + } + + /* Got a full JSON-RPC message */ + record_message(buf, toks, pc->hc); + jsonrpc_io_parse_done(pc->jin); + + /* Loop to consume any additional buffered messages + * without asking for more input yet. */ + } +} + +/* ---------- helpers ---------- */ + +static struct jsonrpc_io *mk_reader(const tal_t *ctx) +{ + return jsonrpc_io_new(ctx); +} + +static void run_once(struct jsonrpc_io *jin, struct handler_ctx *hc) +{ + struct pump_ctx pc = { .jin = jin, .hc = hc }; + jsonrpc_io_read(NULL, jin, pump_next, &pc); +} + +/* ---------- tests ---------- */ + +static size_t test_single_message_chunked(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *msg = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":true}\n"; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(msg, chunksize); + run_once(jin, &hc); + + assert(hc.called == 1); + assert(FEED.off == FEED.len); + assert(hc.last_len > 0 && hc.last_buf[0] == '{' && hc.last_buf[hc.last_len-1] == '}'); + + tal_free(jin); + return strlen(msg); +} + +static size_t test_two_messages_back_to_back(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *msgs = + "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"ping\"}\n" + "{\"jsonrpc\":\"2.0\",\"id\":2,\"result\":42}\n"; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(msgs, chunksize); + run_once(jin, &hc); + + assert(hc.called == 2); + assert(FEED.off == FEED.len); + + tal_free(jin); + return strlen(msgs); +} + +static size_t test_whitespace_only(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *ws = " \t \n \r\n "; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(ws, chunksize); + run_once(jin, &hc); + + assert(hc.called == 0); + assert(FEED.off == FEED.len); + + tal_free(jin); + return strlen(ws); +} + +static size_t test_message_then_whitespace_then_message(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *msgs = + "{\"jsonrpc\":\"2.0\",\"id\":7,\"result\":true}\n" + " \n \t" + "{\"jsonrpc\":\"2.0\",\"id\":8,\"result\":false}\n"; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(msgs, chunksize); + run_once(jin, &hc); + + assert(hc.called == 2); + assert(FEED.off == FEED.len); + + tal_free(jin); + return strlen(msgs); +} + +/* ---------- main ---------- */ + +int main(int argc, char *argv[]) +{ + size_t max = 1; + + common_setup(argv[0]); + + for (size_t i = 0; i < max + 10; i++) { + max = max_u64(max, test_single_message_chunked(i)); + max = max_u64(max, test_two_messages_back_to_back(i)); + max = max_u64(max, test_whitespace_only(i)); + max = max_u64(max, test_message_then_whitespace_then_message(i)); + } + + common_shutdown(); + return 0; +}