diff --git a/gossipd/gossipd.c b/gossipd/gossipd.c index 5cb2541c6..6c0d07396 100644 --- a/gossipd/gossipd.c +++ b/gossipd/gossipd.c @@ -542,6 +542,12 @@ static struct io_plan *recv_req(struct io_conn *conn, goto done; } /* fall thru */ + case WIRE_GOSSIPD_DEV_COMPACT_STORE: + if (daemon->developer) { + gossmap_manage_handle_dev_compact_store(daemon->gm, msg); + goto done; + } + /* fall thru */ /* We send these, we don't receive them */ case WIRE_GOSSIPD_INIT_CUPDATE: @@ -549,6 +555,7 @@ static struct io_plan *recv_req(struct io_conn *conn, case WIRE_GOSSIPD_INIT_REPLY: case WIRE_GOSSIPD_GET_TXOUT: case WIRE_GOSSIPD_DEV_MEMLEAK_REPLY: + case WIRE_GOSSIPD_DEV_COMPACT_STORE_REPLY: case WIRE_GOSSIPD_ADDGOSSIP_REPLY: case WIRE_GOSSIPD_NEW_BLOCKHEIGHT_REPLY: case WIRE_GOSSIPD_REMOTE_CHANNEL_UPDATE: diff --git a/gossipd/gossipd_wire.csv b/gossipd/gossipd_wire.csv index 79de7512d..8d407aa75 100644 --- a/gossipd/gossipd_wire.csv +++ b/gossipd/gossipd_wire.csv @@ -50,6 +50,13 @@ msgtype,gossipd_dev_memleak,3033 msgtype,gossipd_dev_memleak_reply,3133 msgdata,gossipd_dev_memleak_reply,leak,bool, +# master -> gossipd: please rewrite the gossip_store +msgtype,gossipd_dev_compact_store,3034 + +# gossipd -> master: empty string means no problem. +msgtype,gossipd_dev_compact_store_reply,3134 +msgdata,gossipd_dev_compact_store_reply,result,wirestring, + # master -> gossipd: blockheight increased. 
msgtype,gossipd_new_blockheight,3026 msgdata,gossipd_new_blockheight,blockheight,u32, diff --git a/gossipd/gossmap_manage.c b/gossipd/gossmap_manage.c index ed11bafff..385cd66db 100644 --- a/gossipd/gossmap_manage.c +++ b/gossipd/gossmap_manage.c @@ -70,6 +70,7 @@ struct cannounce_map { struct compactd { struct io_conn *in_conn; u64 old_size; + bool dev_compact; u8 ignored; int outfd; pid_t pid; @@ -1582,6 +1583,24 @@ bool gossmap_manage_populated(const struct gossmap_manage *gm) return gm->gossip_store_populated; } +static void compactd_broken(const struct gossmap_manage *gm, + const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + status_vfmt(LOG_BROKEN, NULL, fmt, ap); + va_end(ap); + + if (gm->compactd->dev_compact) { + va_start(ap, fmt); + daemon_conn_send(gm->daemon->master, + take(towire_gossipd_dev_compact_store_reply(NULL, + tal_vfmt(tmpctx, fmt, ap)))); + va_end(ap); + } +} + static void compactd_done(struct io_conn *unused, struct gossmap_manage *gm) { int status; @@ -1594,18 +1613,18 @@ static void compactd_done(struct io_conn *unused, struct gossmap_manage *gm) strerror(errno)); if (!WIFEXITED(status)) { - status_broken("compactd failed with signal %u", - WTERMSIG(status)); + compactd_broken(gm, "compactd failed with signal %u", + WTERMSIG(status)); goto failed; } if (WEXITSTATUS(status) != 0) { - status_broken("compactd exited with status %u", + compactd_broken(gm, "compactd exited with status %u", WEXITSTATUS(status)); goto failed; } if (stat(GOSSIP_STORE_COMPACT_FILENAME, &st) != 0) { - status_broken("compactd did not create file? %s", + compactd_broken(gm, "compactd did not create file? 
%s", strerror(errno)); goto failed; } @@ -1639,6 +1658,9 @@ static void compactd_done(struct io_conn *unused, struct gossmap_manage *gm) towire_gossip_store_ended(tmpctx, st.st_size, gm->compactd->uuid), 0); gossip_store_reopen(gm->gs); + if (gm->compactd->dev_compact) + daemon_conn_send(gm->daemon->master, + take(towire_gossipd_dev_compact_store_reply(NULL, ""))); failed: gm->compactd = tal_free(gm->compactd); @@ -1666,7 +1688,7 @@ static struct io_plan *init_compactd_conn_in(struct io_conn *conn, compactd_read_done, gm); } /* Returns false if already running */ -static bool gossmap_compact(struct gossmap_manage *gm) +static bool gossmap_compact(struct gossmap_manage *gm, bool dev_compact) { int childin[2], execfail[2], childout[2]; int saved_errno; @@ -1753,6 +1775,7 @@ static bool gossmap_compact(struct gossmap_manage *gm) } close(execfail[0]); + gm->compactd->dev_compact = dev_compact; gm->compactd->outfd = childout[1]; gm->compactd->in_conn = io_new_conn(gm->compactd, childin[0], init_compactd_conn_in, gm); @@ -1776,7 +1799,7 @@ void gossmap_manage_maybe_compact(struct gossmap_manage *gm) if (num_dead < 4 * num_live) return; - compact_started = gossmap_compact(gm); + compact_started = gossmap_compact(gm, false); status_debug("%s gossmap compaction:" " %"PRIu64" with" " %"PRIu64" live records and %"PRIu64" dead records", @@ -1784,3 +1807,15 @@ void gossmap_manage_maybe_compact(struct gossmap_manage *gm) gossip_store_len_written(gm->gs), num_live, num_dead); } + +void gossmap_manage_handle_dev_compact_store(struct gossmap_manage *gm, const u8 *msg) +{ + if (!fromwire_gossipd_dev_compact_store(msg)) + master_badmsg(WIRE_GOSSIPD_DEV_COMPACT_STORE, msg); + + if (!gossmap_compact(gm, true)) + daemon_conn_send(gm->daemon->master, + take(towire_gossipd_dev_compact_store_reply(NULL, + "Already compacting"))); +} + diff --git a/gossipd/gossmap_manage.h b/gossipd/gossmap_manage.h index c13572271..464c61f4c 100644 --- a/gossipd/gossmap_manage.h +++ 
b/gossipd/gossmap_manage.h @@ -118,6 +118,10 @@ bool gossmap_manage_populated(const struct gossmap_manage *gm); */ void gossmap_manage_maybe_compact(struct gossmap_manage *gm); +/* For testing: handle a WIRE_GOSSIPD_DEV_COMPACT_STORE request from the master by forcing a gossip_store compaction */ +void gossmap_manage_handle_dev_compact_store(struct gossmap_manage *gm, + const u8 *msg); + /* For memleak to see inside of maps */ void gossmap_manage_memleak(struct htable *memtable, const struct gossmap_manage *gm); #endif /* LIGHTNING_GOSSIPD_GOSSMAP_MANAGE_H */ diff --git a/lightningd/gossip_control.c b/lightningd/gossip_control.c index 038eb1f7a..9c698cd32 100644 --- a/lightningd/gossip_control.c +++ b/lightningd/gossip_control.c @@ -196,11 +196,13 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds) case WIRE_GOSSIPD_GET_TXOUT_REPLY: case WIRE_GOSSIPD_OUTPOINTS_SPENT: case WIRE_GOSSIPD_DEV_MEMLEAK: + case WIRE_GOSSIPD_DEV_COMPACT_STORE: case WIRE_GOSSIPD_NEW_BLOCKHEIGHT: case WIRE_GOSSIPD_ADDGOSSIP: /* This is a reply, so never gets through to here. */ case WIRE_GOSSIPD_INIT_REPLY: case WIRE_GOSSIPD_DEV_MEMLEAK_REPLY: + case WIRE_GOSSIPD_DEV_COMPACT_STORE_REPLY: case WIRE_GOSSIPD_ADDGOSSIP_REPLY: case WIRE_GOSSIPD_NEW_BLOCKHEIGHT_REPLY: break; @@ -488,3 +490,46 @@ static const struct json_command dev_set_max_scids_encode_size = { .dev_only = true, }; AUTODATA(json_command, &dev_set_max_scids_encode_size); + +static void dev_compact_gossip_store_reply(struct subd *gossip UNUSED, + const u8 *reply, + const int *fds UNUSED, + struct command *cmd) +{ + char *result; + + if (!fromwire_gossipd_dev_compact_store_reply(cmd, reply, &result)) { + was_pending(command_fail(cmd, LIGHTNINGD, + "Gossip gave bad dev_gossip_compact_store_reply")); + return; + } + + if (streq(result, "")) + was_pending(command_success(cmd, json_stream_success(cmd))); + else + was_pending(command_fail(cmd, LIGHTNINGD, + "gossip_compact_store failed: %s", result)); +} + +static struct command_result *json_dev_compact_gossip_store(struct command *cmd, + const char *buffer, 
+ const jsmntok_t *obj UNNEEDED, + const jsmntok_t *params) +{ + u8 *msg; + + if (!param(cmd, buffer, params, NULL)) + return command_param_failed(); + + msg = towire_gossipd_dev_compact_store(NULL); + subd_req(cmd->ld->gossip, cmd->ld->gossip, + take(msg), -1, 0, dev_compact_gossip_store_reply, cmd); + return command_still_pending(cmd); +} + +static const struct json_command dev_compact_gossip_store = { + "dev-compact-gossip-store", + json_dev_compact_gossip_store, + .dev_only = true, +}; +AUTODATA(json_command, &dev_compact_gossip_store); diff --git a/tests/plugins/compacter-slow.sh b/tests/plugins/compacter-slow.sh new file mode 100755 index 000000000..0812c62e7 --- /dev/null +++ b/tests/plugins/compacter-slow.sh @@ -0,0 +1,11 @@ +#! /bin/sh -e +# This pretends to be lightning_gossip_compactd, but waits until the file "compactd-continue" +# exists. This lets us test race conditions. + +if [ x"$1" != x"--version" ]; then + while [ ! -f "compactd-continue" ]; do + sleep 1 + done +fi + +exec "$(dirname "$0")"/../../lightningd/lightning_gossip_compactd "$@" diff --git a/tests/test_gossip.py b/tests/test_gossip.py index 440a9a7c1..c17c7bf28 100644 --- a/tests/test_gossip.py +++ b/tests/test_gossip.py @@ -1281,6 +1281,9 @@ def test_gossip_store_load_announce_before_update(node_factory): l1.daemon.wait_for_log("gossipd: gossip_store: 3 live records, 0 deleted") assert not l1.daemon.is_in_log('gossip_store.*corrupt') + # Extra sanity check if we can. + l1.rpc.call('dev-compact-gossip-store') + def test_gossip_store_load_amount_truncated(node_factory): """Make sure we can read canned gossip store with truncated amount""" @@ -1300,6 +1303,11 @@ def test_gossip_store_load_amount_truncated(node_factory): wait_for(lambda: l1.daemon.is_in_log('gossip_store: 0 live records, 0 deleted')) assert os.path.exists(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'gossip_store.corrupt')) + # Extra sanity check if we can. 
+ l1.rpc.call('dev-compact-gossip-store') + l1.restart() + l1.rpc.call('dev-compact-gossip-store') + @pytest.mark.openchannel('v1') @pytest.mark.openchannel('v2') @@ -1628,10 +1636,105 @@ def test_gossip_store_corrupt(node_factory, bitcoind): def test_gossip_store_load_complex(node_factory, bitcoind): l2 = setup_gossip_store_test(node_factory, bitcoind) + l2.rpc.call('dev-compact-gossip-store') l2.restart() + wait_for(lambda: l2.daemon.is_in_log('gossip_store: 9 live records, 0 deleted')) + + +def test_gossip_store_compact(node_factory, bitcoind): + l2 = setup_gossip_store_test(node_factory, bitcoind) + + # Now compact store. + l2.rpc.call('dev-compact-gossip-store') + # Splicing changes features, making this size 2365 bytes -> 2065 bytes. + l2.daemon.wait_for_logs(['gossipd: compaction done: 236[25] -> 206[25] bytes', + 'connectd: Reopened gossip_store, reduced to offset 206[25]']) + + # Should still be connected. + time.sleep(1) + assert len(l2.rpc.listpeers()['peers']) == 2 + + # Should restart ok. + l2.restart() + wait_for(lambda: l2.daemon.is_in_log('gossipd: gossip_store: 9 live records, 0 deleted')) + + +def test_gossip_store_compact_while_extending(node_factory, bitcoind, executor): + """We change gossip_store file (additions, deletion) during compaction""" + l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True, + opts=[{'subdaemon': 'gossip_compactd:../tests/plugins/compacter-slow.sh'}, {}, {}]) + l2.rpc.setchannel(l3.info['id'], 20, 1000) + l3.rpc.setchannel(l2.info['id'], 21, 1001) + + # Wait for it to hit l1's gossip store. + wait_for(lambda: sorted([c['fee_per_millionth'] for c in l1.rpc.listchannels()['channels']]) == [10, 10, 1000, 1001]) + + # We start compaction, but gossip store continues + fut = executor.submit(l1.rpc.call, 'dev-compact-gossip-store') + # Make sure it started! 
+ l1.daemon.wait_for_log('Executing lightning_gossip_compactd') + + # We make another channel, remove old one + l4 = node_factory.get_node() + node_factory.join_nodes([l3, l4], wait_for_announce=True) + scid34 = only_one(l4.rpc.listpeerchannels()['channels'])['short_channel_id'] + wait_for(lambda: len(l1.rpc.listchannels(scid34)['channels']) == 2) + + scid23 = only_one(l2.rpc.listpeerchannels(l3.info['id'])['channels'])['short_channel_id'] + l2.rpc.close(l3.info['id']) + bitcoind.generate_block(13, wait_for_mempool=1) + wait_for(lambda: l1.rpc.listchannels(scid23) == {'channels': []}) + + l1.rpc.setchannel(l2.info['id'], 41, 1004) + scid12 = only_one(l1.rpc.listpeerchannels()['channels'])['short_channel_id'] + wait_for(lambda: sorted([c['fee_per_millionth'] for c in l1.rpc.listchannels(scid12)['channels']]) == [10, 1004]) + + pre_channels = l1.rpc.listchannels() + pre_nodes = sorted(l1.rpc.listnodes()['nodes'], key=lambda n: n['nodeid']) + + # Compaction "continues". + with open(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'compactd-continue'), 'wb') as f: + f.write(b'') + + fut.result(TIMEOUT) + # Exact gossip size varies with EXPERIMENTAL_SPLICING. + l1.daemon.wait_for_logs(['gossipd: compaction done', + 'connectd: Reopened gossip_store, reduced to offset 224[59]']) + + post_channels = l1.rpc.listchannels() + post_nodes = sorted(l1.rpc.listnodes()['nodes'], key=lambda n: n['nodeid']) + l1.daemon.wait_for_log('topology: Reopened gossip_store, reduced to offset 224[59]') + + assert post_channels == pre_channels + assert post_nodes == pre_nodes + + +def test_gossip_store_compact_miss_update(node_factory, bitcoind, executor): + """If we compact twice, you should notice the UUID difference.""" + l2 = setup_gossip_store_test(node_factory, bitcoind) + + pre_channels = l2.rpc.listchannels() + + # Now compact store twice. 
+ l2.rpc.call('dev-compact-gossip-store') + l2.rpc.call('dev-compact-gossip-store') + + post_channels = l2.rpc.listchannels() + l2.daemon.wait_for_log('topology: Reopened gossip_store, but we missed some') + assert pre_channels == post_channels + + +def test_gossip_store_compact_restart(node_factory, bitcoind): + l2 = setup_gossip_store_test(node_factory, bitcoind) + + # Should restart ok. + l2.restart() wait_for(lambda: l2.daemon.is_in_log('gossip_store: 9 live records, 2 deleted')) + # Now compact store. + l2.rpc.call('dev-compact-gossip-store') + def test_gossip_store_load_no_channel_update(node_factory): """Make sure we can read truncated gossip store with a channel_announcement and no channel_update""" diff --git a/tools/bench-gossipd.sh b/tools/bench-gossipd.sh index 4bea1552c..b98295b58 100755 --- a/tools/bench-gossipd.sh +++ b/tools/bench-gossipd.sh @@ -5,7 +5,7 @@ set -e DIR="" TARGETS="" -DEFAULT_TARGETS=" store_load_msec vsz_kb listnodes_sec listchannels_sec routing_sec peer_write_all_sec peer_read_all_sec " +DEFAULT_TARGETS=" store_load_msec vsz_kb store_rewrite_sec listnodes_sec listchannels_sec routing_sec peer_write_all_sec peer_read_all_sec " MCP_DIR=../million-channels-project/data/1M/gossip/ CSV=false @@ -111,6 +111,12 @@ if [ -z "${TARGETS##* vsz_kb *}" ]; then ps -o vsz= -p "$(pidof lightning_gossipd)" | print_stat vsz_kb fi +# How long does rewriting the store take? +if [ -z "${TARGETS##* store_rewrite_sec *}" ]; then + # shellcheck disable=SC2086 + /usr/bin/time --append -f %e $LCLI1 dev-compact-gossip-store 2>&1 > /dev/null | print_stat store_rewrite_sec +fi + # Now, how long does listnodes take? if [ -z "${TARGETS##* listnodes_sec *}" ]; then # shellcheck disable=SC2086