lightningd: don't copy hooks array into hook request, simply don't shrink it.
We make a copy, then attach a destructor to the hook in case that plugin exits, so we can NULL it out in the local copy. When we have 300,000 requests pending, this means we have 300,000 destructors, which don't scale (it's a single-linked list). Simply NULL out (rather than shrink) the array in the `plugin_hook`. Then we can keep using that. tests/test_coinmoves.py::test_generate_coinmoves (100,000, sqlite3): Time (from start to end of l2 node): 34 seconds **WAS 85** Worst latency: 24 seconds **WAS 75** Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
@@ -15,24 +15,10 @@ struct plugin_hook_request {
|
||||
struct db *db;
|
||||
struct lightningd *ld;
|
||||
|
||||
/* Where are we up to in the hooks[] array below */
|
||||
/* Where are we up to in the hook->hooks[] array */
|
||||
size_t hook_index;
|
||||
/* A snapshot taken at the start: destructors may NULL some out! */
|
||||
struct hook_instance **hooks;
|
||||
};
|
||||
|
||||
static void destroy_hook_in_ph_req(struct hook_instance *hook,
|
||||
struct plugin_hook_request *ph_req)
|
||||
{
|
||||
for (size_t i = 0; i < tal_count(ph_req->hooks); i++) {
|
||||
if (ph_req->hooks[i] == hook) {
|
||||
ph_req->hooks[i] = NULL;
|
||||
return;
|
||||
}
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
struct hook_instance {
|
||||
/* What plugin registered */
|
||||
struct plugin *plugin;
|
||||
@@ -62,13 +48,13 @@ static struct plugin_hook *plugin_hook_by_name(const char *name)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* When we destroy a plugin, we remove its hooks */
|
||||
/* When we destroy a plugin, we NULL out any hooks it registered */
|
||||
static void destroy_hook_instance(struct hook_instance *h,
|
||||
struct plugin_hook *hook)
|
||||
{
|
||||
for (size_t i = 0; i < tal_count(hook->hooks); i++) {
|
||||
if (h == hook->hooks[i]) {
|
||||
tal_arr_remove(&hook->hooks, i);
|
||||
hook->hooks[i] = NULL;
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -89,9 +75,12 @@ struct plugin_hook *plugin_hook_register(struct plugin *plugin, const char *meth
|
||||
hook->hooks = notleak(tal_arr(NULL, struct hook_instance *, 0));
|
||||
|
||||
/* Ensure we don't register the same plugin multple times. */
|
||||
for (size_t i=0; i<tal_count(hook->hooks); i++)
|
||||
for (size_t i=0; i<tal_count(hook->hooks); i++) {
|
||||
if (!hook->hooks[i])
|
||||
continue;
|
||||
if (hook->hooks[i]->plugin == plugin)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Ok, we're sure they can register and they aren't yet registered, so
|
||||
* register them. */
|
||||
@@ -117,19 +106,6 @@ bool plugin_hook_continue(void *unused, const char *buffer, const jsmntok_t *tok
|
||||
return resrestok && json_tok_streq(buffer, resrestok, "continue");
|
||||
}
|
||||
|
||||
static void cleanup_ph_req(struct plugin_hook_request *ph_req)
|
||||
{
|
||||
/* We need to remove the destructors from the remaining
|
||||
* call-chain, otherwise they'd still be called when the
|
||||
* plugin dies or we shut down. */
|
||||
for (size_t i=0; i<tal_count(ph_req->hooks); i++) {
|
||||
tal_del_destructor2(ph_req->hooks[i],
|
||||
destroy_hook_in_ph_req, ph_req);
|
||||
}
|
||||
|
||||
tal_free(ph_req);
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback to be passed to the jsonrpc_request.
|
||||
*
|
||||
@@ -144,8 +120,9 @@ static void plugin_hook_callback(const char *buffer, const jsmntok_t *toks,
|
||||
const struct hook_instance *h;
|
||||
enum jsonrpc_errcode ecode;
|
||||
|
||||
assert(ph_req->hook_index < tal_count(ph_req->hooks));
|
||||
h = ph_req->hooks[ph_req->hook_index];
|
||||
assert(ph_req->hook_index < tal_count(ph_req->hook->hooks));
|
||||
/* NULL if it vanished */
|
||||
h = ph_req->hook->hooks[ph_req->hook_index];
|
||||
|
||||
/* destructor NULLs out hooks[], but we get called first at the moment.
|
||||
* We handle either */
|
||||
@@ -173,7 +150,7 @@ static void plugin_hook_callback(const char *buffer, const jsmntok_t *toks,
|
||||
if (!ph_req->hook->deserialize_cb(ph_req->cb_arg,
|
||||
buffer, resulttok)) {
|
||||
tal_free(ph_req->cb_arg);
|
||||
cleanup_ph_req(ph_req);
|
||||
tal_free(ph_req);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
@@ -193,14 +170,14 @@ static void plugin_hook_call_next(struct plugin_hook_request *ph_req)
|
||||
/* Find next non-NULL hook: call final if we're done */
|
||||
do {
|
||||
ph_req->hook_index++;
|
||||
if (ph_req->hook_index >= tal_count(ph_req->hooks)) {
|
||||
if (ph_req->hook_index >= tal_count(hook->hooks)) {
|
||||
ph_req->hook->final_cb(ph_req->cb_arg);
|
||||
cleanup_ph_req(ph_req);
|
||||
tal_free(ph_req);
|
||||
return;
|
||||
}
|
||||
} while (ph_req->hooks[ph_req->hook_index] == NULL);
|
||||
} while (hook->hooks[ph_req->hook_index] == NULL);
|
||||
|
||||
plugin = ph_req->hooks[ph_req->hook_index]->plugin;
|
||||
plugin = hook->hooks[ph_req->hook_index]->plugin;
|
||||
log_trace(ph_req->ld->log, "Calling %s hook of plugin %s",
|
||||
ph_req->hook->name, plugin->shortname);
|
||||
req = jsonrpc_request_start(NULL, hook->name, ph_req->cmd_id,
|
||||
@@ -236,13 +213,6 @@ bool plugin_hook_call_(struct lightningd *ld, const struct plugin_hook *hook,
|
||||
ph_req->db = ld->wallet->db;
|
||||
ph_req->ld = ld;
|
||||
ph_req->cmd_id = tal_strdup_or_null(ph_req, cmd_id);
|
||||
ph_req->hooks = tal_dup_talarr(ph_req,
|
||||
struct hook_instance *,
|
||||
hook->hooks);
|
||||
/* If hook goes away, NULL out our snapshot */
|
||||
for (size_t i=0; i<tal_count(ph_req->hooks); i++)
|
||||
tal_add_destructor2(ph_req->hooks[i],
|
||||
destroy_hook_in_ph_req, ph_req);
|
||||
ph_req->hook_index = -1;
|
||||
plugin_hook_call_next(ph_req);
|
||||
return false;
|
||||
@@ -322,31 +292,38 @@ void plugin_hook_db_sync(struct db *db)
|
||||
struct plugin **plugin_arr;
|
||||
struct plugins *plugins;
|
||||
size_t i;
|
||||
size_t num_hooks;
|
||||
|
||||
size_t num_live_hooks;
|
||||
const char **changes = db_changes(db);
|
||||
num_hooks = tal_count(hook->hooks);
|
||||
if (num_hooks == 0)
|
||||
|
||||
/* Common fast path */
|
||||
if (tal_count(hook->hooks) == 0)
|
||||
return;
|
||||
|
||||
plugin_arr = notleak(tal_arr(NULL, struct plugin *,
|
||||
num_hooks));
|
||||
for (i = 0; i < num_hooks; ++i)
|
||||
plugin_arr[i] = hook->hooks[i]->plugin;
|
||||
/* Could still have no non-NULL ones, if a plugin was removed. */
|
||||
plugin_arr = tal_arr(NULL, struct plugin *, 0);
|
||||
for (i = 0; i < tal_count(hook->hooks); ++i) {
|
||||
if (hook->hooks[i])
|
||||
tal_arr_expand(&plugin_arr, hook->hooks[i]->plugin);
|
||||
}
|
||||
num_live_hooks = tal_count(plugin_arr);
|
||||
if (num_live_hooks == 0) {
|
||||
tal_free(plugin_arr);
|
||||
return;
|
||||
}
|
||||
|
||||
plugins = plugin_arr[0]->plugins;
|
||||
ph_req = notleak(tal(hook->hooks, struct plugin_hook_request));
|
||||
ph_req->hook = hook;
|
||||
ph_req->db = db;
|
||||
ph_req->cb_arg = &num_hooks;
|
||||
ph_req->cb_arg = &num_live_hooks;
|
||||
|
||||
for (i = 0; i < num_hooks; ++i) {
|
||||
for (i = 0; i < num_live_hooks; ++i) {
|
||||
/* Create an object for this plugin. */
|
||||
struct db_write_hook_req *dwh_req;
|
||||
dwh_req = tal(ph_req, struct db_write_hook_req);
|
||||
dwh_req->plugin = plugin_arr[i];
|
||||
dwh_req->ph_req = ph_req;
|
||||
dwh_req->num_hooks = &num_hooks;
|
||||
dwh_req->num_hooks = &num_live_hooks;
|
||||
|
||||
/* FIXME: id_prefix from caller? */
|
||||
/* FIXME: do IO logging for this! */
|
||||
@@ -378,7 +355,7 @@ void plugin_hook_db_sync(struct db *db)
|
||||
log_debug(plugins->ld->log, "io_break: %s", __func__);
|
||||
io_break(ret);
|
||||
}
|
||||
assert(num_hooks == 0);
|
||||
assert(num_live_hooks == 0);
|
||||
tal_free(plugin_arr);
|
||||
tal_free(ph_req);
|
||||
}
|
||||
@@ -407,6 +384,8 @@ void plugin_hook_add_deps(struct plugin_hook *hook,
|
||||
|
||||
/* We just added this, it must exist */
|
||||
for (size_t i = 0; i < tal_count(hook->hooks); i++) {
|
||||
if (!hook->hooks[i])
|
||||
continue;
|
||||
if (hook->hooks[i]->plugin == plugin) {
|
||||
h = hook->hooks[i];
|
||||
break;
|
||||
@@ -459,14 +438,22 @@ static struct plugin **plugin_hook_make_ordered(const tal_t *ctx,
|
||||
struct hook_instance **done;
|
||||
|
||||
/* Populate graph nodes */
|
||||
graph = tal_arr(tmpctx, struct hook_node, tal_count(hook->hooks));
|
||||
for (size_t i = 0; i < tal_count(graph); i++) {
|
||||
graph[i].finished = false;
|
||||
graph[i].hook = hook->hooks[i];
|
||||
graph[i].num_incoming = 0;
|
||||
graph[i].outgoing = tal_arr(graph, struct hook_node *, 0);
|
||||
graph = tal_arr(tmpctx, struct hook_node, 0);
|
||||
for (size_t i = 0; i < tal_count(hook->hooks); i++) {
|
||||
struct hook_node hn;
|
||||
|
||||
if (!hook->hooks[i])
|
||||
continue;
|
||||
hn.finished = false;
|
||||
hn.hook = hook->hooks[i];
|
||||
hn.num_incoming = 0;
|
||||
hn.outgoing = tal_arr(graph, struct hook_node *, 0);
|
||||
tal_arr_expand(&graph, hn);
|
||||
}
|
||||
|
||||
if (tal_count(graph) == 0)
|
||||
return NULL;
|
||||
|
||||
/* Add edges. */
|
||||
for (size_t i = 0; i < tal_count(graph); i++) {
|
||||
for (size_t j = 0; j < tal_count(graph[i].hook->before); j++) {
|
||||
@@ -505,7 +492,7 @@ static struct plugin **plugin_hook_make_ordered(const tal_t *ctx,
|
||||
n->outgoing[i]->num_incoming--;
|
||||
}
|
||||
|
||||
if (tal_count(done) != tal_count(hook->hooks)) {
|
||||
if (tal_count(done) != tal_count(graph)) {
|
||||
struct plugin **ret = tal_arr(ctx, struct plugin *, 0);
|
||||
for (size_t i = 0; i < tal_count(graph); i++) {
|
||||
if (!graph[i].finished)
|
||||
@@ -514,9 +501,10 @@ static struct plugin **plugin_hook_make_ordered(const tal_t *ctx,
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Success! Copy ordered hooks back. */
|
||||
if (hook->hooks)
|
||||
memcpy(hook->hooks, done, tal_bytelen(hook->hooks));
|
||||
/* Success! Replace with sorted hooks. */
|
||||
tal_free(hook->hooks);
|
||||
hook->hooks = notleak(tal_steal(NULL, done));
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user