#include "config.h" #include #include #include #include #include #include
/* NOTE(review): the #include directives after "config.h" carry no header
 * name in this copy of the file; restore them before building. */

/* Struct containing all the information needed to deserialize and
 * dispatch an eventual plugin_hook response. */
struct plugin_hook_request {
	/* Copied from the caller's cmd_id (may be NULL). */
	const char *cmd_id;
	/* The hook being called. */
	struct plugin_hook *hook;
	/* Opaque argument handed to the hook's callbacks. */
	void *cb_arg;
	/* db_hook doesn't have ld yet */
	struct db *db;
	struct lightningd *ld;

	/* Only one of these can be non-NULL */
	const char *strfilterfield;
	u64 intfilterfield;

	/* Where are we up to in the hook->hooks[] array */
	size_t hook_index;
};

/* One plugin's registration for one hook. */
struct hook_instance {
	/* What plugin registered */
	struct plugin *plugin;

	/* Dependencies it asked for. */
	const char **before, **after;

	/* Optional filter fields. */
	const char **strfilters;
	const u64 *intfilters;
};

/* Returns the array of all known hooks, and stores its length in *num.
 * The array is fetched once via autodata_get() and cached in statics. */
static struct plugin_hook **get_hooks(size_t *num)
{
	static struct plugin_hook **hooks = NULL;
	static size_t num_hooks;

	if (!hooks)
		hooks = autodata_get(hooks, &num_hooks);
	*num = num_hooks;
	return hooks;
}

/* Looks up a hook by exact name; returns NULL if there is no such hook. */
static struct plugin_hook *plugin_hook_by_name(const char *name)
{
	size_t num_hooks;
	struct plugin_hook **hooks = get_hooks(&num_hooks);

	for (size_t i = 0; i < num_hooks; i++) {
		if (streq(hooks[i]->name, name))
			return hooks[i];
	}
	return NULL;
}

/* When we destroy a plugin, we NULL out any hooks it registered.
 * Aborts if h is not present in hookarr. */
static void remove_hook_instance(const struct hook_instance *h,
				 struct hook_instance **hookarr)
{
	for (size_t i = 0; i < tal_count(hookarr); i++) {
		if (h == hookarr[i]) {
			hookarr[i] = NULL;
			return;
		}
	}
	abort();
}

/* tal destructor for a hook_instance: clears every slot that points at it. */
static void destroy_hook_instance(struct hook_instance *h,
				  struct plugin_hook *hook)
{
	/* NULL it out. */
	remove_hook_instance(h, hook->hooks);

	/* If there's a pending set of hooks, remove ourselves there too! */
	if (hook->new_hooks)
		remove_hook_instance(h, hook->new_hooks);
}

/* Filters in an array of strings.
 *
 * On success returns NULL and sets *filters (to NULL if filterstok is
 * NULL, otherwise to an array allocated off ctx).  On failure returns an
 * error message allocated off ctx. */
static const char *parse_str_filters(const tal_t *ctx,
				     const char *buffer,
				     const jsmntok_t *filterstok,
				     const char ***filters)
{
	size_t i;
	const jsmntok_t *t;

	if (!filterstok) {
		*filters = NULL;
		return NULL;
	}

	if (filterstok->type != JSMN_ARRAY)
		return tal_fmt(ctx, "filters token must be an array");

	*filters = tal_arr(ctx, const char *, filterstok->size);
	json_for_each_arr(i, t, filterstok) {
		if (t->type != JSMN_STRING)
			return tal_fmt(ctx, "filters must be array of strings, not '%.*s'",
				       json_tok_full_len(t),
				       json_tok_full(buffer, t));
		(*filters)[i] = json_strdup(*filters, buffer, t);
	}
	return NULL;
}

/* Filters in an array of ints.
 *
 * Same contract as parse_str_filters(), but each element must parse with
 * json_to_u64(). */
static const char *parse_int_filters(const tal_t *ctx,
				     const char *buffer,
				     const jsmntok_t *filterstok,
				     u64 **filters)
{
	size_t i;
	const jsmntok_t *t;

	if (!filterstok) {
		*filters = NULL;
		return NULL;
	}

	if (filterstok->type != JSMN_ARRAY)
		return tal_fmt(ctx, "filters token must be an array");

	*filters = tal_arr(ctx, u64, filterstok->size);
	json_for_each_arr(i, t, filterstok) {
		if (!json_to_u64(buffer, t, &(*filters)[i]))
			return tal_fmt(ctx, "filters must be array of unsigned integers, not '%.*s'",
				       json_tok_full_len(t),
				       json_tok_full(buffer, t));
	}
	return NULL;
}

/* Registers `plugin` for the hook called `method`.
 *
 * filterstok (may be NULL) is parsed according to the hook's filter_type.
 * On success returns NULL and stores the hook in *plugin_hook; otherwise
 * returns an error string allocated off plugin (unknown hook, filters not
 * allowed or malformed, or plugin already registered for this hook). */
const char *plugin_hook_register(struct plugin *plugin,
				 const char *method,
				 const char *buf,
				 const jsmntok_t *filterstok,
				 struct plugin_hook **plugin_hook)
{
	struct hook_instance *h;
	struct plugin_hook *hook;
	const char *err;
	const char **strfilters;
	u64 *intfilters;

	hook = plugin_hook_by_name(method);
	if (!hook)
		return tal_fmt(plugin, "Unknown hook name %s", method);

	switch (hook->filter_type) {
	case JSMN_UNDEFINED:
		if (filterstok)
			return tal_fmt(plugin, "Hook %s does not allow filters", method);
		intfilters = NULL;
		strfilters = NULL;
		break;
	case JSMN_PRIMITIVE:
		strfilters = NULL;
		err = parse_int_filters(plugin, buf, filterstok, &intfilters);
		if (err)
			return err;
		break;
	case JSMN_STRING:
		intfilters = NULL;
		err = parse_str_filters(plugin, buf, filterstok, &strfilters);
		if (err)
			return err;
		break;
	/* Nothing else is valid (yet?) */
	default:
		abort();
	}

	/* Make sure the hook_elements array is initialized. */
	if (hook->hooks == NULL) {
		hook->hooks = notleak(tal_arr(NULL, struct hook_instance *, 0));
		hook->new_hooks = NULL;
		hook->num_users = 0;
	}

	/* Ensure we don't register the same plugin multiple times. */
	for (size_t i = 0; i < tal_count(hook->hooks); i++) {
		/* Slots of destroyed instances are NULL: skip them. */
		if (!hook->hooks[i])
			continue;
		if (hook->hooks[i]->plugin == plugin)
			return tal_fmt(plugin, "Registered for hook %s multiple times", method);
	}

	/* Ok, we're sure they can register and they aren't yet registered, so
	 * register them. */
	h = tal(plugin, struct hook_instance);
	h->plugin = plugin;
	h->before = tal_arr(h, const char *, 0);
	h->after = tal_arr(h, const char *, 0);
	h->strfilters = tal_steal(h, strfilters);
	h->intfilters = tal_steal(h, intfilters);
	tal_add_destructor2(h, destroy_hook_instance, hook);

	tal_arr_expand(&hook->hooks, h);
	*plugin_hook = hook;
	return NULL;
}

/* Mutual recursion */
static bool plugin_hook_call_next(struct plugin_hook_request *ph_req);
static void plugin_hook_callback(const char *buffer, const jsmntok_t *toks,
				 const jsmntok_t *idtok,
				 struct plugin_hook_request *r);

/* Returns true iff the object at toks has a "result" member equal to the
 * string "continue".  The first argument is ignored. */
bool plugin_hook_continue(void *unused, const char *buffer, const jsmntok_t *toks)
{
	const jsmntok_t *resrestok = json_get_member(buffer, toks, "result");
	return resrestok && json_tok_streq(buffer, resrestok, "continue");
}

/* Marks the hook as having one more call in progress. */
static void hook_start(struct plugin_hook *hook)
{
	hook->num_users++;
}

/* Marks one call of the hook as finished, swaps in any pending hook
 * array once no call is in progress, then invokes final_cb(cb_arg). */
static void hook_done(struct lightningd *ld,
		      struct plugin_hook *hook,
		      void *cb_arg)
{
	/* If we're the last one out, we can update hooks */
	if (--hook->num_users == 0) {
		if (hook->new_hooks) {
			log_unusual(ld->log,
				    "Updating hooks for %s now usage is done.",
				    hook->name);
			/* Free this later (after final_cb) if not
			 * already done */
			tal_steal(tmpctx, hook->hooks);
			hook->hooks = hook->new_hooks;
			hook->new_hooks = NULL;
		}
	}

	hook->final_cb(cb_arg);
}

/**
 * Callback to be passed to the jsonrpc_request.
 *
 * Unbundles the arguments, deserializes the response and dispatches
 * it to the hook callback.
 *
 * If deserialize_cb returns false, the request and its cb_arg are freed
 * and no further plugin is called; otherwise the chain continues with
 * plugin_hook_call_next().
 */
static void plugin_hook_callback(const char *buffer, const jsmntok_t *toks,
				 const jsmntok_t *idtok,
				 struct plugin_hook_request *ph_req)
{
	const jsmntok_t *resulttok;
	const struct hook_instance *h;
	enum jsonrpc_errcode ecode;

	assert(ph_req->hook_index < tal_count(ph_req->hook->hooks));
	/* NULL if it vanished */
	h = ph_req->hook->hooks[ph_req->hook_index];

	/* destructor NULLs out hooks[], but we get called first at the moment.
	 * We handle either */
	ecode = 0;
	json_scan(tmpctx, buffer, toks, "{error:{code:%}}",
		  JSON_SCAN(json_to_jsonrpc_errcode, &ecode));
	if (ecode == PLUGIN_TERMINATED)
		h = NULL;

	/* We really only handle plugins dying: other errors are fatal. */
	if (h) {
		log_trace(ph_req->ld->log,
			  "Plugin %s returned from %s hook call",
			  h->plugin->shortname, ph_req->hook->name);
		resulttok = json_get_member(buffer, toks, "result");
		if (!resulttok)
			fatal("Plugin %s for %s returned non-result response %.*s",
			      h->plugin->shortname,
			      ph_req->hook->name, toks->end - toks->start,
			      buffer + toks->start);

		dev_save_plugin_io_in(h->plugin->plugins, "hook_in",
				      ph_req->hook->name,
				      buffer, toks);
		if (!ph_req->hook->deserialize_cb(ph_req->cb_arg,
						  buffer, resulttok)) {
			tal_free(ph_req->cb_arg);
			tal_free(ph_req);
			return;
		}
	} else {
		log_debug(ph_req->ld->log, "Plugin died from %s hook call",
			  ph_req->hook->name);
	}

	plugin_hook_call_next(ph_req);
}

/* Decides whether this hook instance should be called for the given
 * filter field values. */
static bool hook_callable(const struct hook_instance *hook,
			  const char *strfilterfield,
			  u64 intfilterfield)
{
	/* NULL? Skip */
	if (!hook)
		return false;

	/* String filters?  If there are some we must match one.
	 * NOTE(review): assumes strfilterfield is non-NULL whenever an
	 * instance has string filters -- confirm against callers. */
	if (hook->strfilters) {
		for (size_t i = 0; i < tal_count(hook->strfilters); i++) {
			if (streq(strfilterfield, hook->strfilters[i]))
				return true;
		}
		return false;
	}

	/* Integer filters? */
	if (hook->intfilters) {
		for (size_t i = 0; i < tal_count(hook->intfilters); i++) {
			if (intfilterfield == hook->intfilters[i])
				return true;
		}
		return false;
	}

	/* No filters: always call. */
	return true;
}

/* Returns true if it finished all the hooks (and thus didn't call anything) */
static bool plugin_hook_call_next(struct plugin_hook_request *ph_req)
{
	struct jsonrpc_request *req;
	const struct plugin_hook *hook = ph_req->hook;
	struct plugin *plugin;

	/* Find next non-NULL hook: call final if we're done */
	do {
		ph_req->hook_index++;
		if (ph_req->hook_index >= tal_count(hook->hooks)) {
			hook_done(ph_req->ld, ph_req->hook, ph_req->cb_arg);
			tal_free(ph_req);
			return true;
		}
	} while (!hook_callable(hook->hooks[ph_req->hook_index],
				ph_req->strfilterfield,
				ph_req->intfilterfield));

	plugin = hook->hooks[ph_req->hook_index]->plugin;
	log_trace(ph_req->ld->log, "Calling %s hook of plugin %s",
		  ph_req->hook->name, plugin->shortname);
	req = jsonrpc_request_start(NULL, hook->name, ph_req->cmd_id,
				    plugin_get_logger(plugin),
				    NULL,
				    plugin_hook_callback, ph_req);

	hook->serialize_payload(ph_req->cb_arg, req->stream, plugin);
	jsonrpc_request_end(req);
	dev_save_plugin_io_out(plugin->plugins, "hook_out",
			       hook->name, req->stream);
	plugin_request_send(plugin, req);
	return false;
}

/* Starts calling `hook` for every registered, matching plugin in order.
 *
 * Returns true if the hook finished without calling any plugin (final_cb
 * has then already run via hook_done()); returns false if a request was
 * sent to a plugin and the chain continues in plugin_hook_callback(). */
bool plugin_hook_call_(struct lightningd *ld,
		       struct plugin_hook *hook,
		       const char *strfilterfield TAKES,
		       u64 intfilterfield,
		       const char *cmd_id TAKES,
		       tal_t *cb_arg STEALS)
{
	hook_start(hook);
	if (tal_count(hook->hooks)) {
		/* If we have a plugin that has registered for this
		 * hook, serialize and call it */
		/* FIXME: technically this is a leak, but we don't
		 * currently have a list to store these. We might want
		 * to eventually to inspect in-flight requests. */
		struct plugin_hook_request *ph_req;

		ph_req = notleak(tal(hook->hooks, struct plugin_hook_request));
		ph_req->hook = hook;
		ph_req->cb_arg = tal_steal(ph_req, cb_arg);
		ph_req->db = ld->wallet->db;
		ph_req->ld = ld;
		ph_req->cmd_id = tal_strdup_or_null(ph_req, cmd_id);
		/* plugin_hook_call_next() increments before use, so this
		 * wraps around to index 0 on the first step. */
		ph_req->hook_index = -1;
		ph_req->strfilterfield = tal_strdup_or_null(ph_req, strfilterfield);
		ph_req->intfilterfield = intfilterfield;
		return plugin_hook_call_next(ph_req);
	} else {
		/* If no plugin has registered for this hook, just
		 * call the callback with a NULL result. Saves us the
		 * roundtrip to the serializer and deserializer. If we
		 * were expecting a default response it should have
		 * been part of the `cb_arg`. */
		hook_done(ld, hook, cb_arg);
		return true;
	}
}

/* We open-code this, because it's just different and special enough to be
 * annoying, and to make it clear that it's totally synchronous. */

/* Special synchronous hook for db */
static struct plugin_hook db_write_hook = {"db_write", NULL, NULL};
AUTODATA(hooks, &db_write_hook);

/* A `db_write` for one particular plugin hook. */
struct db_write_hook_req {
	struct plugin *plugin;
	struct plugin_hook_request *ph_req;
	/* Shared count of responses still outstanding. */
	size_t *num_hooks;
};

/* Handles one plugin's reply to db_write.  Anything other than
 * {"result": {"result": "continue"}} is fatal.  When the last outstanding
 * reply arrives, breaks out of the exclusive loop with ph_req as value. */
static void db_hook_response(const char *buffer, const jsmntok_t *toks,
			     const jsmntok_t *idtok,
			     struct db_write_hook_req *dwh_req)
{
	const jsmntok_t *resulttok;

	resulttok = json_get_member(buffer, toks, "result");
	if (!resulttok)
		fatal("Plugin '%s' returned an invalid response to the "
		      "db_write hook: %.*s",
		      dwh_req->plugin->cmd,
		      json_tok_full_len(toks),
		      json_tok_full(buffer, toks));

	/* We expect result: { 'result' : 'continue' }.
	 * Anything else we abort.
	 */
	resulttok = json_get_member(buffer, resulttok, "result");
	if (resulttok) {
		if (!json_tok_streq(buffer, resulttok, "continue"))
			fatal("Plugin '%s' returned failed db_write: %.*s.",
			      dwh_req->plugin->cmd,
			      json_tok_full_len(toks),
			      json_tok_full(buffer, toks));
	} else
		fatal("Plugin '%s' returned an invalid result to the db_write "
		      "hook: %.*s",
		      dwh_req->plugin->cmd,
		      json_tok_full_len(toks),
		      json_tok_full(buffer, toks));

	assert((*dwh_req->num_hooks) != 0);
	--(*dwh_req->num_hooks);
	/* If there are other runners, do not exit yet. */
	if ((*dwh_req->num_hooks) != 0)
		return;

	/* We're done, exit exclusive loop. */
	log_debug(dwh_req->plugin->plugins->ld->log, "io_break: %s", __func__);
	io_break(dwh_req->ph_req);
}

/* Sends the pending db changes to every plugin registered for db_write
 * and runs plugins_exclusive_loop() until all of them have replied. */
void plugin_hook_db_sync(struct db *db)
{
	struct plugin_hook *hook = &db_write_hook;
	struct jsonrpc_request *req;
	struct plugin_hook_request *ph_req;
	void *ret;
	struct plugin **plugin_arr;
	struct plugins *plugins;
	size_t i;
	size_t num_live_hooks;

	const char **changes = db_changes(db);

	/* Common fast path */
	if (tal_count(hook->hooks) == 0)
		return;

	/* Could still have no non-NULL ones, if a plugin was removed. */
	plugin_arr = tal_arr(NULL, struct plugin *, 0);
	for (i = 0; i < tal_count(hook->hooks); ++i) {
		if (hook->hooks[i])
			tal_arr_expand(&plugin_arr, hook->hooks[i]->plugin);
	}

	num_live_hooks = tal_count(plugin_arr);
	if (num_live_hooks == 0) {
		tal_free(plugin_arr);
		return;
	}

	plugins = plugin_arr[0]->plugins;

	ph_req = notleak(tal(hook->hooks, struct plugin_hook_request));
	ph_req->hook = hook;
	ph_req->db = db;
	ph_req->cb_arg = &num_live_hooks;

	for (i = 0; i < num_live_hooks; ++i) {
		/* Create an object for this plugin. */
		struct db_write_hook_req *dwh_req;

		dwh_req = tal(ph_req, struct db_write_hook_req);
		dwh_req->plugin = plugin_arr[i];
		dwh_req->ph_req = ph_req;
		dwh_req->num_hooks = &num_live_hooks;

		/* FIXME: id_prefix from caller? */
		/* FIXME: do IO logging for this! */
		req = jsonrpc_request_start(NULL, hook->name, NULL, NULL, NULL,
					    db_hook_response,
					    dwh_req);

		json_add_num(req->stream, "data_version",
			     db_data_version_get(db));

		json_array_start(req->stream, "writes");
		for (size_t j = 0; j < tal_count(changes); j++)
			json_add_string(req->stream, NULL, changes[j]);
		json_array_end(req->stream);
		jsonrpc_request_end(req);

		plugin_request_send(plugin_arr[i], req);
	}

	ret = plugins_exclusive_loop(plugin_arr);

	/* We can be called on way out of an io_loop, which is already breaking.
	 * That will make this immediately return; save the break value and call
	 * again, then hand it onwards. */
	if (ret != ph_req) {
		void *ret2 = plugins_exclusive_loop(plugin_arr);
		assert(ret2 == ph_req);
		log_debug(plugins->ld->log, "io_break: %s", __func__);
		io_break(ret);
	}
	assert(num_live_hooks == 0);
	tal_free(plugin_arr);
	tal_free(ph_req);
}

/* Appends a copy of every element of the JSON array arrtok to *arr.
 * Does nothing if arrtok is NULL. */
static void add_deps(const char ***arr,
		     const char *buffer,
		     const jsmntok_t *arrtok)
{
	const jsmntok_t *t;
	size_t i;

	if (!arrtok)
		return;

	json_for_each_arr(i, t, arrtok)
		tal_arr_expand(arr, json_strdup(*arr, buffer, t));
}

/* Records the `before` / `after` dependency lists (either may be NULL)
 * on the instance that `plugin` registered for `hook`.  Asserts that
 * such an instance exists. */
void plugin_hook_add_deps(struct plugin_hook *hook,
			  struct plugin *plugin,
			  const char *buffer,
			  const jsmntok_t *before,
			  const jsmntok_t *after)
{
	struct hook_instance *h = NULL;

	/* We just added this, it must exist */
	for (size_t i = 0; i < tal_count(hook->hooks); i++) {
		if (!hook->hooks[i])
			continue;
		if (hook->hooks[i]->plugin == plugin) {
			h = hook->hooks[i];
			break;
		}
	}
	assert(h);

	add_deps(&h->before, buffer, before);
	add_deps(&h->after, buffer, after);
}

/* Node of the dependency graph built by plugin_hook_make_ordered(). */
struct hook_node {
	/* Is this copied into the ordered array yet? */
	bool finished;
	struct hook_instance *hook;
	/* Number of not-yet-finished nodes which must come before us. */
	size_t num_incoming;
	/* Nodes which must come after us. */
	struct hook_node **outgoing;
};

/* Returns the graph node whose plugin's cmd matches name (according to
 * plugin_paths_match()), or NULL if there is none. */
static struct hook_node *find_hook(struct hook_node *graph, const char *name)
{
	for (size_t i = 0; i < tal_count(graph); i++) {
		if (plugin_paths_match(graph[i].hook->plugin->cmd, name))
			return graph + i;
	}
	return NULL;
}

/* Sometimes naive is best. */
/* Picks, among unfinished nodes with no incoming edges, the one whose
 * plugin has the lowest index; NULL if there is no such node. */
static struct hook_node *get_best_candidate(struct hook_node *graph)
{
	struct hook_node *best = NULL;

	for (size_t i = 0; i < tal_count(graph); i++) {
		if (graph[i].finished)
			continue;
		if (graph[i].num_incoming != 0)
			continue;
		if (!best
		    || best->hook->plugin->index > graph[i].hook->plugin->index)
			best = &graph[i];
	}
	return best;
}

/* Reorders hook->hooks so that every before/after dependency is honoured.
 *
 * Returns NULL on success (or if there are no live instances).  If the
 * dependencies cannot all be satisfied, returns an array (allocated off
 * ctx) of the plugins which could not be placed, and leaves the hook
 * arrays untouched.  If the hook is currently in use, the new order is
 * parked in hook->new_hooks instead of replacing hook->hooks. */
static struct plugin **plugin_hook_make_ordered(const tal_t *ctx,
						struct logger *log,
						struct plugin_hook *hook)
{
	struct hook_node *graph, *n;
	struct hook_instance **done;

	/* Populate graph nodes */
	graph = tal_arr(tmpctx, struct hook_node, 0);
	for (size_t i = 0; i < tal_count(hook->hooks); i++) {
		struct hook_node hn;

		if (!hook->hooks[i])
			continue;

		hn.finished = false;
		hn.hook = hook->hooks[i];
		hn.num_incoming = 0;
		hn.outgoing = tal_arr(graph, struct hook_node *, 0);
		tal_arr_expand(&graph, hn);
	}

	if (tal_count(graph) == 0)
		return NULL;

	/* Add edges. */
	for (size_t i = 0; i < tal_count(graph); i++) {
		for (size_t j = 0; j < tal_count(graph[i].hook->before); j++) {
			n = find_hook(graph, graph[i].hook->before[j]);
			if (!n) {
				/* This is useful for typos! */
				log_debug(graph[i].hook->plugin->log,
					  "hook %s before unknown plugin %s",
					  hook->name,
					  graph[i].hook->before[j]);
				continue;
			}
			tal_arr_expand(&graph[i].outgoing, n);
			n->num_incoming++;
		}
		for (size_t j = 0; j < tal_count(graph[i].hook->after); j++) {
			n = find_hook(graph, graph[i].hook->after[j]);
			if (!n) {
				/* This is useful for typos! */
				log_debug(graph[i].hook->plugin->log,
					  "hook %s after unknown plugin %s",
					  hook->name,
					  graph[i].hook->after[j]);
				continue;
			}
			tal_arr_expand(&n->outgoing, &graph[i]);
			graph[i].num_incoming++;
		}
	}

	/* Repeatedly emit a node with no unmet predecessors. */
	done = tal_arr(tmpctx, struct hook_instance *, 0);
	while ((n = get_best_candidate(graph)) != NULL) {
		tal_arr_expand(&done, n->hook);
		n->finished = true;
		for (size_t i = 0; i < tal_count(n->outgoing); i++)
			n->outgoing[i]->num_incoming--;
	}

	/* Anything left over never reached zero incoming edges. */
	if (tal_count(done) != tal_count(graph)) {
		struct plugin **ret = tal_arr(ctx, struct plugin *, 0);
		for (size_t i = 0; i < tal_count(graph); i++) {
			if (!graph[i].finished)
				tal_arr_expand(&ret, graph[i].hook->plugin);
		}
		return ret;
	}

	/* If we had previous update pending, this subsumes it */
	tal_free(hook->new_hooks);
	hook->new_hooks = notleak(tal_steal(NULL, done));

	/* If nobody is using it now, we can just replace the hooks array.
	 * Otherwise defer. */
	if (hook->num_users == 0) {
		tal_free(hook->hooks);
		hook->hooks = hook->new_hooks;
		hook->new_hooks = NULL;
	} else
		/* If this ever live locks, we will see this in the log! */
		log_unusual(log, "Deferring registration of hook %s until it's not in use.",
			    hook->name);
	return NULL;
}

/* Plugins could fail due to multiple hooks, but only add once. */
static void append_plugin_once(struct plugin ***ret, struct plugin *p)
{
	for (size_t i = 0; i < tal_count(*ret); i++) {
		if ((*ret)[i] == p)
			return;
	}
	tal_arr_expand(ret, p);
}

struct plugin **plugin_hooks_make_ordered(const tal_t *ctx,
					  struct logger *log)
{
	size_t num_hooks;
	struct plugin_hook **hooks = get_hooks(&num_hooks);
	struct plugin **ret = tal_arr(ctx, struct plugin *, 0);

	for (size_t i=0; i