lightningd: use jsonrpc_io for plugin JSON commands.
Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
#include <common/configvar.h>
|
||||
#include <common/deprecation.h>
|
||||
#include <common/json_command.h>
|
||||
#include <common/jsonrpc_io.h>
|
||||
#include <common/memleak.h>
|
||||
#include <common/plugin.h>
|
||||
#include <common/timeout.h>
|
||||
@@ -221,36 +222,36 @@ static bool request_add(const char *reqid, struct jsonrpc_request *req,
|
||||
}
|
||||
|
||||
/* FIXME: reorder */
|
||||
static const char *plugin_read_json_one(struct plugin *plugin,
|
||||
bool want_transaction,
|
||||
bool *complete,
|
||||
bool *destroyed);
|
||||
static void plugin_response_handle(struct plugin *plugin,
|
||||
const char *buffer,
|
||||
const jsmntok_t *toks,
|
||||
const jsmntok_t *idtok);
|
||||
|
||||
/* We act as if the plugin itself said "I'm dead!" */
|
||||
static void plugin_terminated_fail_req(struct plugin *plugin,
|
||||
struct jsonrpc_request *req)
|
||||
{
|
||||
bool complete, destroyed;
|
||||
const char *err;
|
||||
jsmntok_t *toks = toks_alloc(plugin);
|
||||
const jsmntok_t *idtok;
|
||||
const char *buf;
|
||||
jsmn_parser parser;
|
||||
bool complete;
|
||||
|
||||
jsmn_init(&plugin->parser);
|
||||
toks_reset(plugin->toks);
|
||||
tal_free(plugin->buffer);
|
||||
plugin->buffer = tal_fmt(plugin,
|
||||
"{\"jsonrpc\": \"2.0\","
|
||||
"\"id\": %s,"
|
||||
"\"error\":"
|
||||
" {\"code\":%i, \"message\":\"%s\"}"
|
||||
"}\n\n",
|
||||
req->id,
|
||||
PLUGIN_TERMINATED,
|
||||
"Plugin terminated before replying to RPC call.");
|
||||
plugin->used = strlen(plugin->buffer);
|
||||
|
||||
/* We're already in a transaction, don't do it again! */
|
||||
err = plugin_read_json_one(plugin, false, &complete, &destroyed);
|
||||
assert(!err);
|
||||
buf = tal_fmt(plugin,
|
||||
"{\"jsonrpc\": \"2.0\","
|
||||
"\"id\": %s,"
|
||||
"\"error\":"
|
||||
" {\"code\":%i, \"message\":\"%s\"}"
|
||||
"}\n\n",
|
||||
req->id,
|
||||
PLUGIN_TERMINATED,
|
||||
"Plugin terminated before replying to RPC call.");
|
||||
jsmn_init(&parser);
|
||||
if (!json_parse_input(&parser, &toks, buf, strlen(buf), &complete))
|
||||
abort();
|
||||
assert(complete);
|
||||
idtok = json_get_member(buf, toks, "id");
|
||||
plugin_response_handle(plugin, buf, toks, idtok);
|
||||
}
|
||||
|
||||
static void destroy_plugin(struct plugin *p)
|
||||
@@ -371,7 +372,6 @@ struct plugin *plugin_register(struct plugins *plugins, const char* path TAKES,
|
||||
|
||||
p->plugin_state = UNCONFIGURED;
|
||||
p->js_arr = tal_arr(p, struct json_stream *, 0);
|
||||
p->used = 0;
|
||||
p->notification_topics = tal_arr(p, const char *, 0);
|
||||
p->subscriptions = NULL;
|
||||
p->dynamic = false;
|
||||
@@ -701,186 +701,112 @@ static void plugin_response_handle(struct plugin *plugin,
|
||||
tal_free(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to parse a complete message from the plugin's buffer.
|
||||
*
|
||||
* Returns NULL if there was no error.
|
||||
* If it can parse a JSON message, sets *@complete, and returns any error
|
||||
* from the callback.
|
||||
*
|
||||
* If @destroyed was set, it means the plugin called plugin stop on itself.
|
||||
*/
|
||||
static const char *plugin_read_json_one(struct plugin *plugin,
|
||||
bool want_transaction,
|
||||
bool *complete,
|
||||
bool *destroyed)
|
||||
{
|
||||
const jsmntok_t *jrtok, *idtok;
|
||||
struct plugin_destroyed *pd;
|
||||
const char *err;
|
||||
struct wallet *wallet = plugin->plugins->ld->wallet;
|
||||
|
||||
*destroyed = false;
|
||||
/* Note that in the case of 'plugin stop' this can free request (since
|
||||
* plugin is parent), so detect that case */
|
||||
|
||||
if (!json_parse_input(&plugin->parser, &plugin->toks,
|
||||
plugin->buffer, plugin->used,
|
||||
complete)) {
|
||||
return tal_fmt(plugin,
|
||||
"Failed to parse JSON response '%.*s'",
|
||||
(int)plugin->used, plugin->buffer);
|
||||
}
|
||||
|
||||
if (!*complete) {
|
||||
/* We need more. */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Empty buffer? (eg. just whitespace). */
|
||||
if (tal_count(plugin->toks) == 1) {
|
||||
plugin->used = 0;
|
||||
jsmn_init(&plugin->parser);
|
||||
toks_reset(plugin->toks);
|
||||
/* We need more. */
|
||||
*complete = false;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (plugin->toks->type != JSMN_OBJECT)
|
||||
return tal_fmt(
|
||||
plugin,
|
||||
"JSON-RPC message is not a valid JSON object type");
|
||||
|
||||
jrtok = json_get_member(plugin->buffer, plugin->toks, "jsonrpc");
|
||||
idtok = json_get_member(plugin->buffer, plugin->toks, "id");
|
||||
|
||||
if (!jrtok) {
|
||||
return tal_fmt(
|
||||
plugin,
|
||||
"JSON-RPC message does not contain \"jsonrpc\" field");
|
||||
}
|
||||
|
||||
/* We can be called extremely early, or as db hook, or for
|
||||
* fake "terminated" request. */
|
||||
if (want_transaction)
|
||||
db_begin_transaction(wallet->db);
|
||||
|
||||
pd = plugin_detect_destruction(plugin);
|
||||
if (!idtok) {
|
||||
/* A Notification is a Request object without an "id"
|
||||
* member. A Request object that is a Notification
|
||||
* signifies the Client's lack of interest in the
|
||||
* corresponding Response object, and as such no
|
||||
* Response object needs to be returned to the
|
||||
* client. The Server MUST NOT reply to a
|
||||
* Notification, including those that are within a
|
||||
* batch request.
|
||||
*
|
||||
* https://www.jsonrpc.org/specification#notification
|
||||
*/
|
||||
err = plugin_notification_handle(plugin, plugin->buffer, plugin->toks);
|
||||
|
||||
} else {
|
||||
/* When a rpc call is made, the Server MUST reply with
|
||||
* a Response, except for in the case of
|
||||
* Notifications. The Response is expressed as a
|
||||
* single JSON Object, with the following members:
|
||||
*
|
||||
* - jsonrpc: A String specifying the version of the
|
||||
* JSON-RPC protocol. MUST be exactly "2.0".
|
||||
*
|
||||
* - result: This member is REQUIRED on success. This
|
||||
* member MUST NOT exist if there was an error
|
||||
* invoking the method. The value of this member is
|
||||
* determined by the method invoked on the Server.
|
||||
*
|
||||
* - error: This member is REQUIRED on error. This
|
||||
* member MUST NOT exist if there was no error
|
||||
* triggered during invocation.
|
||||
*
|
||||
* - id: This member is REQUIRED. It MUST be the same
|
||||
* as the value of the id member in the Request
|
||||
* Object. If there was an error in detecting the id
|
||||
* in the Request object (e.g. Parse error/Invalid
|
||||
* Request), it MUST be Null. Either the result
|
||||
* member or error member MUST be included, but both
|
||||
* members MUST NOT be included.
|
||||
*
|
||||
* https://www.jsonrpc.org/specification#response_object
|
||||
*/
|
||||
plugin_response_handle(plugin, plugin->buffer, plugin->toks, idtok);
|
||||
err = NULL;
|
||||
}
|
||||
if (want_transaction)
|
||||
db_commit_transaction(wallet->db);
|
||||
|
||||
/* Corner case: rpc_command hook can destroy plugin for 'plugin
|
||||
* stop'! */
|
||||
if (was_plugin_destroyed(pd)) {
|
||||
*destroyed = true;
|
||||
} else {
|
||||
/* Move this object out of the buffer */
|
||||
memmove(plugin->buffer, plugin->buffer + plugin->toks[0].end,
|
||||
tal_count(plugin->buffer) - plugin->toks[0].end);
|
||||
plugin->used -= plugin->toks[0].end;
|
||||
jsmn_init(&plugin->parser);
|
||||
toks_reset(plugin->toks);
|
||||
}
|
||||
return err;
|
||||
}
|
||||
|
||||
/* Try to parse complete messages from the plugin's buffer. */
|
||||
static struct io_plan *plugin_read_json(struct io_conn *conn,
|
||||
struct plugin *plugin)
|
||||
{
|
||||
bool success;
|
||||
bool have_full;
|
||||
struct wallet *wallet = plugin->plugins->ld->wallet;
|
||||
const char *new_bytes, *buffer;
|
||||
const jsmntok_t *toks;
|
||||
size_t new_bytes_len;
|
||||
/* wallet is NULL in really early code */
|
||||
bool want_transaction = (plugin->plugins->want_db_transaction
|
||||
&& plugin->plugins->ld->wallet != NULL);
|
||||
&& wallet != NULL);
|
||||
|
||||
log_io(plugin->log, LOG_IO_IN, NULL, "",
|
||||
plugin->buffer + plugin->used, plugin->len_read);
|
||||
new_bytes = jsonrpc_newly_read(plugin->json_in, &new_bytes_len);
|
||||
if (new_bytes_len) {
|
||||
log_io(plugin->log, LOG_IO_IN, NULL, "",
|
||||
new_bytes, new_bytes_len);
|
||||
}
|
||||
|
||||
/* Our JSON parser is pretty good at incremental parsing, but
|
||||
* `getrawblock` gives a giant 2MB token, which forces it to re-parse
|
||||
* every time until we have all of it. However, we can't complete a
|
||||
* JSON object without a '}', so we do a cheaper check here.
|
||||
*/
|
||||
have_full = memchr(plugin->buffer + plugin->used, '}',
|
||||
plugin->len_read);
|
||||
/* Parse until we get incomplete JSON */
|
||||
for (;;) {
|
||||
const char *error;
|
||||
const jsmntok_t *idtok;
|
||||
struct plugin_destroyed *pd;
|
||||
|
||||
plugin->used += plugin->len_read;
|
||||
if (plugin->used == tal_count(plugin->buffer))
|
||||
tal_resize(&plugin->buffer, plugin->used * 2);
|
||||
error = jsonrpc_io_parse(tmpctx, plugin->json_in,
|
||||
&toks, &buffer);
|
||||
if (error) {
|
||||
plugin_kill(plugin, LOG_UNUSUAL, "%s", error);
|
||||
/* plugin_kill frees plugin */
|
||||
return io_close(NULL);
|
||||
}
|
||||
/* Incomplete? */
|
||||
if (!toks)
|
||||
break;
|
||||
|
||||
/* Read and process all messages from the connection */
|
||||
if (have_full) {
|
||||
do {
|
||||
bool destroyed;
|
||||
const char *err;
|
||||
idtok = json_get_member(buffer, toks, "id");
|
||||
|
||||
err = plugin_read_json_one(plugin, want_transaction,
|
||||
&success, &destroyed);
|
||||
/* We can be called extremely early, or as db hook, or for
|
||||
* fake "terminated" request. */
|
||||
if (want_transaction)
|
||||
db_begin_transaction(wallet->db);
|
||||
|
||||
/* If it's destroyed, conn is already freed! */
|
||||
if (destroyed)
|
||||
return io_close(NULL);
|
||||
pd = plugin_detect_destruction(plugin);
|
||||
if (!idtok) {
|
||||
/* A Notification is a Request object without an "id"
|
||||
* member. A Request object that is a Notification
|
||||
* signifies the Client's lack of interest in the
|
||||
* corresponding Response object, and as such no
|
||||
* Response object needs to be returned to the
|
||||
* client. The Server MUST NOT reply to a
|
||||
* Notification, including those that are within a
|
||||
* batch request.
|
||||
*
|
||||
* https://www.jsonrpc.org/specification#notification
|
||||
*/
|
||||
error = plugin_notification_handle(plugin, buffer, toks);
|
||||
} else {
|
||||
/* When a rpc call is made, the Server MUST reply with
|
||||
* a Response, except for in the case of
|
||||
* Notifications. The Response is expressed as a
|
||||
* single JSON Object, with the following members:
|
||||
*
|
||||
* - jsonrpc: A String specifying the version of the
|
||||
* JSON-RPC protocol. MUST be exactly "2.0".
|
||||
*
|
||||
* - result: This member is REQUIRED on success. This
|
||||
* member MUST NOT exist if there was an error
|
||||
* invoking the method. The value of this member is
|
||||
* determined by the method invoked on the Server.
|
||||
*
|
||||
* - error: This member is REQUIRED on error. This
|
||||
* member MUST NOT exist if there was no error
|
||||
* triggered during invocation.
|
||||
*
|
||||
* - id: This member is REQUIRED. It MUST be the same
|
||||
* as the value of the id member in the Request
|
||||
* Object. If there was an error in detecting the id
|
||||
* in the Request object (e.g. Parse error/Invalid
|
||||
* Request), it MUST be Null. Either the result
|
||||
* member or error member MUST be included, but both
|
||||
* members MUST NOT be included.
|
||||
*
|
||||
* https://www.jsonrpc.org/specification#response_object
|
||||
*/
|
||||
plugin_response_handle(plugin, buffer, toks, idtok);
|
||||
error = NULL;
|
||||
}
|
||||
if (want_transaction)
|
||||
db_commit_transaction(wallet->db);
|
||||
|
||||
if (err) {
|
||||
plugin_kill(plugin, LOG_UNUSUAL,
|
||||
"%s", err);
|
||||
/* plugin_kill frees plugin */
|
||||
return io_close(NULL);
|
||||
}
|
||||
} while (success);
|
||||
/* If it's destroyed, conn is already freed! */
|
||||
if (was_plugin_destroyed(pd))
|
||||
return io_close(NULL);
|
||||
|
||||
if (error) {
|
||||
plugin_kill(plugin, LOG_UNUSUAL, "%s", error);
|
||||
/* plugin_kill frees plugin */
|
||||
return io_close(NULL);
|
||||
}
|
||||
|
||||
jsonrpc_io_parse_done(plugin->json_in);
|
||||
}
|
||||
|
||||
/* Now read more from the connection */
|
||||
return io_read_partial(plugin->stdout_conn,
|
||||
plugin->buffer + plugin->used,
|
||||
tal_count(plugin->buffer) - plugin->used,
|
||||
&plugin->len_read, plugin_read_json, plugin);
|
||||
return jsonrpc_io_read(plugin->stdout_conn, plugin->json_in,
|
||||
plugin_read_json, plugin);
|
||||
}
|
||||
|
||||
/* Mutual recursion */
|
||||
@@ -935,6 +861,7 @@ static void plugin_conn_finish(struct io_conn *conn, struct plugin *plugin)
|
||||
struct io_plan *plugin_stdin_conn_init(struct io_conn *conn,
|
||||
struct plugin *plugin)
|
||||
{
|
||||
plugin->stdin_conn = conn;
|
||||
/* We write to their stdin */
|
||||
/* We don't have anything queued yet, wait for notification */
|
||||
return io_wait(conn, plugin, plugin_write_json, plugin);
|
||||
@@ -944,10 +871,9 @@ struct io_plan *plugin_stdout_conn_init(struct io_conn *conn,
|
||||
struct plugin *plugin)
|
||||
{
|
||||
/* We read from their stdout */
|
||||
plugin->stdout_conn = conn;
|
||||
io_set_finish(conn, plugin_conn_finish, plugin);
|
||||
return io_read_partial(conn, plugin->buffer,
|
||||
tal_bytelen(plugin->buffer), &plugin->len_read,
|
||||
plugin_read_json, plugin);
|
||||
return plugin_read_json(conn, plugin);
|
||||
}
|
||||
|
||||
static char *plugin_opt_check(struct plugin_opt *popt)
|
||||
@@ -2043,8 +1969,8 @@ static void plugin_set_timeout(struct plugin *p)
|
||||
time_from_sec(PLUGIN_STARTUP_TIMEOUT),
|
||||
plugin_manifest_timeout, p);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
const char *plugin_send_getmanifest(struct plugin *p, const char *cmd_id)
|
||||
{
|
||||
char **cmd;
|
||||
@@ -2063,14 +1989,12 @@ const char *plugin_send_getmanifest(struct plugin *p, const char *cmd_id)
|
||||
return tal_fmt(p, "opening pipe: %s", strerror(errno));
|
||||
|
||||
log_debug(p->plugins->log, "started(%u) %s", p->pid, p->cmd);
|
||||
p->buffer = tal_arr(p, char, 64);
|
||||
jsmn_init(&p->parser);
|
||||
p->toks = toks_alloc(p);
|
||||
p->json_in = jsonrpc_io_new(p);
|
||||
|
||||
/* Create two connections, one read-only on top of p->stdout, and one
|
||||
* write-only on p->stdin */
|
||||
p->stdout_conn = io_new_conn(p, stdoutfd, plugin_stdout_conn_init, p);
|
||||
p->stdin_conn = io_new_conn(p, stdinfd, plugin_stdin_conn_init, p);
|
||||
io_new_conn(p, stdoutfd, plugin_stdout_conn_init, p);
|
||||
io_new_conn(p, stdinfd, plugin_stdin_conn_init, p);
|
||||
req = jsonrpc_request_start(p, "getmanifest", cmd_id,
|
||||
p->log, NULL, plugin_manifest_cb, p);
|
||||
json_add_bool(req->stream, "allow-deprecated-apis",
|
||||
|
||||
@@ -76,10 +76,7 @@ struct plugin {
|
||||
bool dynamic;
|
||||
|
||||
/* Stuff we read */
|
||||
char *buffer;
|
||||
size_t used, len_read;
|
||||
jsmn_parser parser;
|
||||
jsmntok_t *toks;
|
||||
struct jsonrpc_io *json_in;
|
||||
|
||||
/* Our json_streams. Since multiple streams could start
|
||||
* returning data at once, we always service these in order,
|
||||
|
||||
Reference in New Issue
Block a user