gossipd: dev-compact-gossip-store to manually invoke compaction.
And tests! Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
@@ -542,6 +542,12 @@ static struct io_plan *recv_req(struct io_conn *conn,
|
||||
goto done;
|
||||
}
|
||||
/* fall thru */
|
||||
case WIRE_GOSSIPD_DEV_COMPACT_STORE:
|
||||
if (daemon->developer) {
|
||||
gossmap_manage_handle_dev_compact_store(daemon->gm, msg);
|
||||
goto done;
|
||||
}
|
||||
/* fall thru */
|
||||
|
||||
/* We send these, we don't receive them */
|
||||
case WIRE_GOSSIPD_INIT_CUPDATE:
|
||||
@@ -549,6 +555,7 @@ static struct io_plan *recv_req(struct io_conn *conn,
|
||||
case WIRE_GOSSIPD_INIT_REPLY:
|
||||
case WIRE_GOSSIPD_GET_TXOUT:
|
||||
case WIRE_GOSSIPD_DEV_MEMLEAK_REPLY:
|
||||
case WIRE_GOSSIPD_DEV_COMPACT_STORE_REPLY:
|
||||
case WIRE_GOSSIPD_ADDGOSSIP_REPLY:
|
||||
case WIRE_GOSSIPD_NEW_BLOCKHEIGHT_REPLY:
|
||||
case WIRE_GOSSIPD_REMOTE_CHANNEL_UPDATE:
|
||||
|
||||
@@ -50,6 +50,13 @@ msgtype,gossipd_dev_memleak,3033
|
||||
msgtype,gossipd_dev_memleak_reply,3133
|
||||
msgdata,gossipd_dev_memleak_reply,leak,bool,
|
||||
|
||||
# master -> gossipd: please rewrite the gossip_store
|
||||
msgtype,gossipd_dev_compact_store,3034
|
||||
|
||||
# gossipd -> master: empty string means no problem.
|
||||
msgtype,gossipd_dev_compact_store_reply,3134
|
||||
msgdata,gossipd_dev_compact_store_reply,result,wirestring,
|
||||
|
||||
# master -> gossipd: blockheight increased.
|
||||
msgtype,gossipd_new_blockheight,3026
|
||||
msgdata,gossipd_new_blockheight,blockheight,u32,
|
||||
|
||||
|
@@ -70,6 +70,7 @@ struct cannounce_map {
|
||||
struct compactd {
|
||||
struct io_conn *in_conn;
|
||||
u64 old_size;
|
||||
bool dev_compact;
|
||||
u8 ignored;
|
||||
int outfd;
|
||||
pid_t pid;
|
||||
@@ -1582,6 +1583,24 @@ bool gossmap_manage_populated(const struct gossmap_manage *gm)
|
||||
return gm->gossip_store_populated;
|
||||
}
|
||||
|
||||
static void compactd_broken(const struct gossmap_manage *gm,
|
||||
const char *fmt, ...)
|
||||
{
|
||||
va_list ap;
|
||||
|
||||
va_start(ap, fmt);
|
||||
status_vfmt(LOG_BROKEN, NULL, fmt, ap);
|
||||
va_end(ap);
|
||||
|
||||
if (gm->compactd->dev_compact) {
|
||||
va_start(ap, fmt);
|
||||
daemon_conn_send(gm->daemon->master,
|
||||
take(towire_gossipd_dev_compact_store_reply(NULL,
|
||||
tal_vfmt(tmpctx, fmt, ap))));
|
||||
va_end(ap);
|
||||
}
|
||||
}
|
||||
|
||||
static void compactd_done(struct io_conn *unused, struct gossmap_manage *gm)
|
||||
{
|
||||
int status;
|
||||
@@ -1594,18 +1613,18 @@ static void compactd_done(struct io_conn *unused, struct gossmap_manage *gm)
|
||||
strerror(errno));
|
||||
|
||||
if (!WIFEXITED(status)) {
|
||||
status_broken("compactd failed with signal %u",
|
||||
compactd_broken(gm, "compactd failed with signal %u",
|
||||
WTERMSIG(status));
|
||||
goto failed;
|
||||
}
|
||||
if (WEXITSTATUS(status) != 0) {
|
||||
status_broken("compactd exited with status %u",
|
||||
compactd_broken(gm, "compactd exited with status %u",
|
||||
WEXITSTATUS(status));
|
||||
goto failed;
|
||||
}
|
||||
|
||||
if (stat(GOSSIP_STORE_COMPACT_FILENAME, &st) != 0) {
|
||||
status_broken("compactd did not create file? %s",
|
||||
compactd_broken(gm, "compactd did not create file? %s",
|
||||
strerror(errno));
|
||||
goto failed;
|
||||
}
|
||||
@@ -1639,6 +1658,9 @@ static void compactd_done(struct io_conn *unused, struct gossmap_manage *gm)
|
||||
towire_gossip_store_ended(tmpctx, st.st_size, gm->compactd->uuid),
|
||||
0);
|
||||
gossip_store_reopen(gm->gs);
|
||||
if (gm->compactd->dev_compact)
|
||||
daemon_conn_send(gm->daemon->master,
|
||||
take(towire_gossipd_dev_compact_store_reply(NULL, "")));
|
||||
|
||||
failed:
|
||||
gm->compactd = tal_free(gm->compactd);
|
||||
@@ -1666,7 +1688,7 @@ static struct io_plan *init_compactd_conn_in(struct io_conn *conn,
|
||||
compactd_read_done, gm);
|
||||
}
|
||||
/* Returns false if already running */
|
||||
static bool gossmap_compact(struct gossmap_manage *gm)
|
||||
static bool gossmap_compact(struct gossmap_manage *gm, bool dev_compact)
|
||||
{
|
||||
int childin[2], execfail[2], childout[2];
|
||||
int saved_errno;
|
||||
@@ -1753,6 +1775,7 @@ static bool gossmap_compact(struct gossmap_manage *gm)
|
||||
}
|
||||
close(execfail[0]);
|
||||
|
||||
gm->compactd->dev_compact = dev_compact;
|
||||
gm->compactd->outfd = childout[1];
|
||||
gm->compactd->in_conn = io_new_conn(gm->compactd, childin[0],
|
||||
init_compactd_conn_in, gm);
|
||||
@@ -1776,7 +1799,7 @@ void gossmap_manage_maybe_compact(struct gossmap_manage *gm)
|
||||
if (num_dead < 4 * num_live)
|
||||
return;
|
||||
|
||||
compact_started = gossmap_compact(gm);
|
||||
compact_started = gossmap_compact(gm, false);
|
||||
status_debug("%s gossmap compaction:"
|
||||
" %"PRIu64" with"
|
||||
" %"PRIu64" live records and %"PRIu64" dead records",
|
||||
@@ -1784,3 +1807,15 @@ void gossmap_manage_maybe_compact(struct gossmap_manage *gm)
|
||||
gossip_store_len_written(gm->gs),
|
||||
num_live, num_dead);
|
||||
}
|
||||
|
||||
void gossmap_manage_handle_dev_compact_store(struct gossmap_manage *gm, const u8 *msg)
|
||||
{
|
||||
if (!fromwire_gossipd_dev_compact_store(msg))
|
||||
master_badmsg(WIRE_GOSSIPD_DEV_COMPACT_STORE, msg);
|
||||
|
||||
if (!gossmap_compact(gm, true))
|
||||
daemon_conn_send(gm->daemon->master,
|
||||
take(towire_gossipd_dev_compact_store_reply(NULL,
|
||||
"Already compacting")));
|
||||
}
|
||||
|
||||
|
||||
@@ -118,6 +118,10 @@ bool gossmap_manage_populated(const struct gossmap_manage *gm);
|
||||
*/
|
||||
void gossmap_manage_maybe_compact(struct gossmap_manage *gm);
|
||||
|
||||
/* For testing */
|
||||
void gossmap_manage_handle_dev_compact_store(struct gossmap_manage *gm,
|
||||
const u8 *msg);
|
||||
|
||||
/* For memleak to see inside of maps */
|
||||
void gossmap_manage_memleak(struct htable *memtable, const struct gossmap_manage *gm);
|
||||
#endif /* LIGHTNING_GOSSIPD_GOSSMAP_MANAGE_H */
|
||||
|
||||
@@ -196,11 +196,13 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds)
|
||||
case WIRE_GOSSIPD_GET_TXOUT_REPLY:
|
||||
case WIRE_GOSSIPD_OUTPOINTS_SPENT:
|
||||
case WIRE_GOSSIPD_DEV_MEMLEAK:
|
||||
case WIRE_GOSSIPD_DEV_COMPACT_STORE:
|
||||
case WIRE_GOSSIPD_NEW_BLOCKHEIGHT:
|
||||
case WIRE_GOSSIPD_ADDGOSSIP:
|
||||
/* This is a reply, so never gets through to here. */
|
||||
case WIRE_GOSSIPD_INIT_REPLY:
|
||||
case WIRE_GOSSIPD_DEV_MEMLEAK_REPLY:
|
||||
case WIRE_GOSSIPD_DEV_COMPACT_STORE_REPLY:
|
||||
case WIRE_GOSSIPD_ADDGOSSIP_REPLY:
|
||||
case WIRE_GOSSIPD_NEW_BLOCKHEIGHT_REPLY:
|
||||
break;
|
||||
@@ -488,3 +490,46 @@ static const struct json_command dev_set_max_scids_encode_size = {
|
||||
.dev_only = true,
|
||||
};
|
||||
AUTODATA(json_command, &dev_set_max_scids_encode_size);
|
||||
|
||||
/* Callback for gossipd's answer to gossipd_dev_compact_store.
 *
 * The reply carries a single string: empty means the compaction succeeded,
 * anything else is an error description.  The pending JSON command is
 * completed accordingly (failing it too if the reply cannot be parsed).
 */
static void dev_compact_gossip_store_reply(struct subd *gossip UNUSED,
					   const u8 *reply,
					   const int *fds UNUSED,
					   struct command *cmd)
{
	char *result;
	const struct command_result *res;

	if (!fromwire_gossipd_dev_compact_store_reply(cmd, reply, &result))
		res = command_fail(cmd, LIGHTNINGD,
				   "Gossip gave bad dev_gossip_compact_store_reply");
	else if (!streq(result, ""))
		res = command_fail(cmd, LIGHTNINGD,
				   "gossip_compact_store failed: %s", result);
	else
		res = command_success(cmd, json_stream_success(cmd));

	was_pending(res);
}
|
||||
|
||||
static struct command_result *json_dev_compact_gossip_store(struct command *cmd,
|
||||
const char *buffer,
|
||||
const jsmntok_t *obj UNNEEDED,
|
||||
const jsmntok_t *params)
|
||||
{
|
||||
u8 *msg;
|
||||
|
||||
if (!param(cmd, buffer, params, NULL))
|
||||
return command_param_failed();
|
||||
|
||||
msg = towire_gossipd_dev_compact_store(NULL);
|
||||
subd_req(cmd->ld->gossip, cmd->ld->gossip,
|
||||
take(msg), -1, 0, dev_compact_gossip_store_reply, cmd);
|
||||
return command_still_pending(cmd);
|
||||
}
|
||||
|
||||
static const struct json_command dev_compact_gossip_store = {
|
||||
"dev-compact-gossip-store",
|
||||
json_dev_compact_gossip_store,
|
||||
.dev_only = true,
|
||||
};
|
||||
AUTODATA(json_command, &dev_compact_gossip_store);
|
||||
|
||||
11
tests/plugins/compacter-slow.sh
Executable file
11
tests/plugins/compacter-slow.sh
Executable file
@@ -0,0 +1,11 @@
|
||||
#! /bin/sh -e
# Stand-in for lightning_gossip_compactd, used to test race conditions.
# Unless invoked with "--version", it blocks until a file named
# "compactd-continue" exists in the current directory, then hands over to
# the real lightning_gossip_compactd with the original arguments.

case "$1" in
    --version)
        ;;
    *)
        until [ -f "compactd-continue" ]; do
            sleep 1
        done
        ;;
esac

exec "$(dirname "$0")"/../../lightningd/lightning_gossip_compactd "$@"
|
||||
@@ -1281,6 +1281,9 @@ def test_gossip_store_load_announce_before_update(node_factory):
|
||||
l1.daemon.wait_for_log("gossipd: gossip_store: 3 live records, 0 deleted")
|
||||
assert not l1.daemon.is_in_log('gossip_store.*corrupt')
|
||||
|
||||
# Extra sanity check if we can.
|
||||
l1.rpc.call('dev-compact-gossip-store')
|
||||
|
||||
|
||||
def test_gossip_store_load_amount_truncated(node_factory):
|
||||
"""Make sure we can read canned gossip store with truncated amount"""
|
||||
@@ -1300,6 +1303,11 @@ def test_gossip_store_load_amount_truncated(node_factory):
|
||||
wait_for(lambda: l1.daemon.is_in_log('gossip_store: 0 live records, 0 deleted'))
|
||||
assert os.path.exists(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'gossip_store.corrupt'))
|
||||
|
||||
# Extra sanity check if we can.
|
||||
l1.rpc.call('dev-compact-gossip-store')
|
||||
l1.restart()
|
||||
l1.rpc.call('dev-compact-gossip-store')
|
||||
|
||||
|
||||
@pytest.mark.openchannel('v1')
|
||||
@pytest.mark.openchannel('v2')
|
||||
@@ -1628,10 +1636,105 @@ def test_gossip_store_corrupt(node_factory, bitcoind):
|
||||
def test_gossip_store_load_complex(node_factory, bitcoind):
    """Compact the prepared gossip store, restart, and check the load summary."""
    node = setup_gossip_store_test(node_factory, bitcoind)

    node.rpc.call('dev-compact-gossip-store')
    node.restart()

    def store_loaded():
        return node.daemon.is_in_log('gossip_store: 9 live records, 0 deleted')

    wait_for(store_loaded)
|
||||
|
||||
|
||||
def test_gossip_store_compact(node_factory, bitcoind):
    """Manual compaction shrinks the store, keeps peers connected, and survives a restart."""
    node = setup_gossip_store_test(node_factory, bitcoind)

    # Trigger compaction by hand.
    node.rpc.call('dev-compact-gossip-store')

    # Splicing changes features, so the size goes either 2365 -> 2065 or
    # 2362 -> 2062 bytes; the patterns accept both.
    expected_logs = [
        'gossipd: compaction done: 236[25] -> 206[25] bytes',
        'connectd: Reopened gossip_store, reduced to offset 206[25]',
    ]
    node.daemon.wait_for_logs(expected_logs)

    # Give any disconnect a moment to show up: both peers must remain.
    time.sleep(1)
    peers = node.rpc.listpeers()['peers']
    assert len(peers) == 2

    # The compacted store must load cleanly on restart.
    node.restart()
    wait_for(lambda: node.daemon.is_in_log('gossipd: gossip_store: 9 live records, 0 deleted'))
|
||||
|
||||
|
||||
def test_gossip_store_compact_while_extending(node_factory, bitcoind, executor):
    """The gossip_store gains and loses records while a compaction is in flight.

    l1 runs compacter-slow.sh in place of gossip_compactd, so the compaction
    stalls until the test creates the 'compactd-continue' file.  l1's view of
    channels and nodes must be identical before and after it completes.
    """
    def sorted_fees(listing):
        return sorted([c['fee_per_millionth'] for c in listing['channels']])

    def nodes_by_id(node):
        return sorted(node.rpc.listnodes()['nodes'], key=lambda n: n['nodeid'])

    slow_compactd = {'subdaemon': 'gossip_compactd:../tests/plugins/compacter-slow.sh'}
    l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True,
                                         opts=[slow_compactd, {}, {}])
    l2.rpc.setchannel(l3.info['id'], 20, 1000)
    l3.rpc.setchannel(l2.info['id'], 21, 1001)

    # Wait until l1 has seen both fee updates.
    wait_for(lambda: sorted_fees(l1.rpc.listchannels()) == [10, 10, 1000, 1001])

    # Start compaction in the background; it stalls in the slow wrapper
    # while gossip keeps arriving.
    fut = executor.submit(l1.rpc.call, 'dev-compact-gossip-store')
    # Make sure it really started.
    l1.daemon.wait_for_log('Executing lightning_gossip_compactd')

    # Add a brand new channel (l3 <-> l4)...
    l4 = node_factory.get_node()
    node_factory.join_nodes([l3, l4], wait_for_announce=True)
    scid34 = only_one(l4.rpc.listpeerchannels()['channels'])['short_channel_id']
    wait_for(lambda: len(l1.rpc.listchannels(scid34)['channels']) == 2)

    # ...and close an existing one (l2 <-> l3), waiting for l1 to forget it.
    scid23 = only_one(l2.rpc.listpeerchannels(l3.info['id'])['channels'])['short_channel_id']
    l2.rpc.close(l3.info['id'])
    bitcoind.generate_block(13, wait_for_mempool=1)
    wait_for(lambda: l1.rpc.listchannels(scid23) == {'channels': []})

    # Also update one of l1's own channel's fees.
    l1.rpc.setchannel(l2.info['id'], 41, 1004)
    scid12 = only_one(l1.rpc.listpeerchannels()['channels'])['short_channel_id']
    wait_for(lambda: sorted_fees(l1.rpc.listchannels(scid12)) == [10, 1004])

    pre_channels = l1.rpc.listchannels()
    pre_nodes = nodes_by_id(l1)

    # Let the stalled compaction continue.
    continue_path = os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'compactd-continue')
    with open(continue_path, 'wb') as f:
        f.write(b'')

    fut.result(TIMEOUT)
    # Exact gossip size varies with EXPERIMENTAL_SPLICING.
    l1.daemon.wait_for_logs(['gossipd: compaction done',
                             'connectd: Reopened gossip_store, reduced to offset 224[59]'])

    post_channels = l1.rpc.listchannels()
    post_nodes = nodes_by_id(l1)
    l1.daemon.wait_for_log('topology: Reopened gossip_store, reduced to offset 224[59]')

    assert post_channels == pre_channels
    assert post_nodes == pre_nodes
|
||||
|
||||
|
||||
def test_gossip_store_compact_miss_update(node_factory, bitcoind, executor):
    """Two back-to-back compactions: the reader should notice it missed one.

    The topology plugin is expected to log that it reopened the store but
    missed some, and the channel listing must be unchanged.
    """
    node = setup_gossip_store_test(node_factory, bitcoind)

    pre_channels = node.rpc.listchannels()

    # Compact the store twice in a row.
    for _ in range(2):
        node.rpc.call('dev-compact-gossip-store')

    post_channels = node.rpc.listchannels()
    node.daemon.wait_for_log('topology: Reopened gossip_store, but we missed some')
    assert pre_channels == post_channels
|
||||
|
||||
|
||||
def test_gossip_store_compact_restart(node_factory, bitcoind):
    """Compaction works on a store that was loaded from disk with deleted records."""
    node = setup_gossip_store_test(node_factory, bitcoind)

    # Restart first, so the store (including its deleted records) is reloaded.
    node.restart()

    def store_loaded():
        return node.daemon.is_in_log('gossip_store: 9 live records, 2 deleted')

    wait_for(store_loaded)

    # Now compact the reloaded store.
    node.rpc.call('dev-compact-gossip-store')
|
||||
|
||||
|
||||
def test_gossip_store_load_no_channel_update(node_factory):
|
||||
"""Make sure we can read truncated gossip store with a channel_announcement and no channel_update"""
|
||||
|
||||
@@ -5,7 +5,7 @@ set -e
|
||||
|
||||
DIR=""
|
||||
TARGETS=""
|
||||
DEFAULT_TARGETS=" store_load_msec vsz_kb listnodes_sec listchannels_sec routing_sec peer_write_all_sec peer_read_all_sec "
|
||||
DEFAULT_TARGETS=" store_load_msec vsz_kb store_rewrite_sec listnodes_sec listchannels_sec routing_sec peer_write_all_sec peer_read_all_sec "
|
||||
MCP_DIR=../million-channels-project/data/1M/gossip/
|
||||
CSV=false
|
||||
|
||||
@@ -111,6 +111,12 @@ if [ -z "${TARGETS##* vsz_kb *}" ]; then
|
||||
ps -o vsz= -p "$(pidof lightning_gossipd)" | print_stat vsz_kb
|
||||
fi
|
||||
|
||||
# How long does rewriting the store take?
|
||||
if [ -z "${TARGETS##* store_rewrite_sec *}" ]; then
|
||||
# shellcheck disable=SC2086
|
||||
/usr/bin/time --append -f %e $LCLI1 dev-compact-gossip-store 2>&1 > /dev/null | print_stat store_rewrite_sec
|
||||
fi
|
||||
|
||||
# Now, how long does listnodes take?
|
||||
if [ -z "${TARGETS##* listnodes_sec *}" ]; then
|
||||
# shellcheck disable=SC2086
|
||||
|
||||
Reference in New Issue
Block a user