gossipd: code to invoke compactd and reopen store.
This isn't called anywhere yet. Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
@@ -218,6 +218,25 @@ static int make_new_gossip_store(u64 *total_len)
|
||||
return new_fd;
|
||||
}
|
||||
|
||||
static int gossip_store_open(u64 *total_len, bool *recent)
|
||||
{
|
||||
struct stat st;
|
||||
int fd = open(GOSSIP_STORE_FILENAME, O_RDWR);
|
||||
if (fd == -1)
|
||||
return -1;
|
||||
|
||||
if (fstat(fd, &st) != 0) {
|
||||
close_noerr(fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (recent)
|
||||
*recent = (st.st_mtime > clock_time().ts.tv_sec - 3600);
|
||||
|
||||
*total_len = st.st_size;
|
||||
return fd;
|
||||
}
|
||||
|
||||
/* If this returns -1, we cannot upgrade. */
|
||||
static int gossip_store_upgrade(struct daemon *daemon,
|
||||
u64 *total_len,
|
||||
@@ -227,12 +246,12 @@ static int gossip_store_upgrade(struct daemon *daemon,
|
||||
u64 old_len, cur_off;
|
||||
struct gossip_hdr hdr;
|
||||
u8 oldversion, version = GOSSIP_STORE_VER;
|
||||
struct stat st;
|
||||
struct timemono start = time_mono();
|
||||
const char *bad;
|
||||
bool recent;
|
||||
u8 *uuid;
|
||||
|
||||
old_fd = open(GOSSIP_STORE_FILENAME, O_RDWR);
|
||||
old_fd = gossip_store_open(total_len, &recent);
|
||||
if (old_fd == -1) {
|
||||
if (errno == ENOENT) {
|
||||
*populated = false;
|
||||
@@ -249,22 +268,13 @@ static int gossip_store_upgrade(struct daemon *daemon,
|
||||
goto upgrade_failed;
|
||||
}
|
||||
|
||||
if (fstat(old_fd, &st) != 0) {
|
||||
status_broken("Could not stat gossip_store: %s",
|
||||
strerror(errno));
|
||||
goto upgrade_failed;
|
||||
}
|
||||
|
||||
/* If we have any contents (beyond uuid), and the file is less
|
||||
* than 1 hour old, say "seems good" */
|
||||
*populated = (st.st_mtime > time_now().ts.tv_sec - 3600
|
||||
&& st.st_size > 1 + sizeof(hdr) + 2 + 32);
|
||||
*populated = recent && *total_len > 1 + sizeof(hdr) + 2 + 32;
|
||||
|
||||
/* No upgrade necessary? We're done. */
|
||||
if (oldversion == GOSSIP_STORE_VER) {
|
||||
*total_len = st.st_size;
|
||||
if (oldversion == GOSSIP_STORE_VER)
|
||||
return old_fd;
|
||||
}
|
||||
|
||||
if (!can_upgrade(oldversion)) {
|
||||
status_unusual("Cannot upgrade gossip_store version %u",
|
||||
@@ -423,7 +433,16 @@ struct gossip_store *gossip_store_new(const tal_t *ctx,
|
||||
return gs;
|
||||
}
|
||||
|
||||
static void gossip_store_fsync(const struct gossip_store *gs)
|
||||
void gossip_store_reopen(struct gossip_store *gs)
|
||||
{
|
||||
close(gs->fd);
|
||||
gs->fd = gossip_store_open(&gs->len, NULL);
|
||||
if (gs->fd < 0)
|
||||
status_failed(STATUS_FAIL_INTERNAL_ERROR,
|
||||
"gossmap reopen failed: %s", strerror(errno));
|
||||
}
|
||||
|
||||
void gossip_store_fsync(const struct gossip_store *gs)
|
||||
{
|
||||
if (fsync(gs->fd) != 0)
|
||||
status_failed(STATUS_FAIL_INTERNAL_ERROR,
|
||||
|
||||
@@ -34,11 +34,24 @@ struct gossip_store *gossip_store_new(const tal_t *ctx,
|
||||
struct daemon *daemon,
|
||||
bool *populated);
|
||||
|
||||
/**
|
||||
* Reopen the gossip_store
|
||||
* @gs: the gossip store.
|
||||
*
|
||||
* We've rewritten it, so reopen the file.
|
||||
*/
|
||||
void gossip_store_reopen(struct gossip_store *gs);
|
||||
|
||||
/**
|
||||
* Move the old gossip store out the way. Log a broken message about it.
|
||||
*/
|
||||
void gossip_store_corrupt(void);
|
||||
|
||||
/**
|
||||
* Extra paranoia: make sure it's on disk. Don't call often!\
|
||||
*/
|
||||
void gossip_store_fsync(const struct gossip_store *gs);
|
||||
|
||||
/**
|
||||
* Append a gossip message to the gossip_store
|
||||
* @gs: gossip store
|
||||
|
||||
@@ -372,12 +372,13 @@ static void master_or_connectd_gone(struct daemon_conn *dc UNUSED)
|
||||
static void gossip_init(struct daemon *daemon, const u8 *msg)
|
||||
{
|
||||
if (!fromwire_gossipd_init(daemon, msg,
|
||||
&chainparams,
|
||||
&daemon->our_features,
|
||||
&daemon->id,
|
||||
&daemon->dev_fast_gossip,
|
||||
&daemon->dev_fast_gossip_prune,
|
||||
&daemon->autoconnect_seeker_peers)) {
|
||||
&chainparams,
|
||||
&daemon->our_features,
|
||||
&daemon->id,
|
||||
&daemon->autoconnect_seeker_peers,
|
||||
&daemon->compactd_helper,
|
||||
&daemon->dev_fast_gossip,
|
||||
&daemon->dev_fast_gossip_prune)) {
|
||||
master_badmsg(WIRE_GOSSIPD_INIT, msg);
|
||||
}
|
||||
|
||||
|
||||
@@ -63,6 +63,9 @@ struct daemon {
|
||||
/* Features lightningd told us to set. */
|
||||
struct feature_set *our_features;
|
||||
|
||||
/* Program to run to compact the datastore */
|
||||
char *compactd_helper;
|
||||
|
||||
/* Speed up gossip. */
|
||||
bool dev_fast_gossip;
|
||||
|
||||
|
||||
@@ -9,9 +9,10 @@ msgtype,gossipd_init,3000
|
||||
msgdata,gossipd_init,chainparams,chainparams,
|
||||
msgdata,gossipd_init,our_features,feature_set,
|
||||
msgdata,gossipd_init,id,node_id,
|
||||
msgdata,gossipd_init,autoconnect_seeker_peers,u32,
|
||||
msgdata,gossipd_init,compactd_helper,wirestring,
|
||||
msgdata,gossipd_init,dev_fast_gossip,bool,
|
||||
msgdata,gossipd_init,dev_fast_gossip_prune,bool,
|
||||
msgdata,gossipd_init,autoconnect_seeker_peers,u32,
|
||||
|
||||
# Gossipd tells us all our public channel_updates before init_reply.
|
||||
msgtype,gossipd_init_cupdate,3101
|
||||
|
||||
|
@@ -1,6 +1,9 @@
|
||||
#include "config.h"
|
||||
#include <bitcoin/script.h>
|
||||
#include <ccan/array_size/array_size.h>
|
||||
#include <ccan/closefrom/closefrom.h>
|
||||
#include <ccan/err/err.h>
|
||||
#include <ccan/read_write_all/read_write_all.h>
|
||||
#include <ccan/tal/str/str.h>
|
||||
#include <common/clock_time.h>
|
||||
#include <common/daemon_conn.h>
|
||||
@@ -12,6 +15,8 @@
|
||||
#include <common/timeout.h>
|
||||
#include <common/utils.h>
|
||||
#include <common/wire_error.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <gossipd/gossip_store.h>
|
||||
#include <gossipd/gossipd.h>
|
||||
#include <gossipd/gossipd_wiregen.h>
|
||||
@@ -19,6 +24,12 @@
|
||||
#include <gossipd/seeker.h>
|
||||
#include <gossipd/sigcheck.h>
|
||||
#include <gossipd/txout_failures.h>
|
||||
#include <stdio.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/wait.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#define GOSSIP_STORE_COMPACT_FILENAME "gossip_store.compact"
|
||||
|
||||
struct pending_cannounce {
|
||||
const u8 *scriptpubkey;
|
||||
@@ -56,6 +67,15 @@ struct cannounce_map {
|
||||
bool flood_reported;
|
||||
};
|
||||
|
||||
struct compactd {
|
||||
struct io_conn *in_conn;
|
||||
u64 old_size;
|
||||
u8 ignored;
|
||||
int outfd;
|
||||
pid_t pid;
|
||||
u8 uuid[32];
|
||||
};
|
||||
|
||||
struct gossmap_manage {
|
||||
struct daemon *daemon;
|
||||
|
||||
@@ -92,6 +112,9 @@ struct gossmap_manage {
|
||||
|
||||
/* Are we populated yet? */
|
||||
bool gossip_store_populated;
|
||||
|
||||
/* Non-NULL if a compactd is running. */
|
||||
struct compactd *compactd;
|
||||
};
|
||||
|
||||
/* Timer recursion */
|
||||
@@ -506,6 +529,7 @@ struct gossmap_manage *gossmap_manage_new(const tal_t *ctx,
|
||||
assert(gm->gs);
|
||||
assert(gm->raw_gossmap);
|
||||
gm->daemon = daemon;
|
||||
gm->compactd = NULL;
|
||||
|
||||
map_init(&gm->pending_ann_map, "pending announcements");
|
||||
gm->pending_cupdates = tal_arr(gm, struct pending_cupdate *, 0);
|
||||
@@ -1557,3 +1581,177 @@ bool gossmap_manage_populated(const struct gossmap_manage *gm)
|
||||
{
|
||||
return gm->gossip_store_populated;
|
||||
}
|
||||
|
||||
static void compactd_done(struct io_conn *unused, struct gossmap_manage *gm)
|
||||
{
|
||||
int status;
|
||||
struct stat st;
|
||||
|
||||
if (waitpid(gm->compactd->pid, &status, 0) < 0)
|
||||
status_failed(STATUS_FAIL_INTERNAL_ERROR,
|
||||
"Waiting for %u: %s",
|
||||
(unsigned int)gm->compactd->pid,
|
||||
strerror(errno));
|
||||
|
||||
if (!WIFEXITED(status)) {
|
||||
status_broken("compactd failed with signal %u",
|
||||
WTERMSIG(status));
|
||||
goto failed;
|
||||
}
|
||||
if (WEXITSTATUS(status) != 0) {
|
||||
status_broken("compactd exited with status %u",
|
||||
WEXITSTATUS(status));
|
||||
goto failed;
|
||||
}
|
||||
|
||||
if (stat(GOSSIP_STORE_COMPACT_FILENAME, &st) != 0) {
|
||||
status_broken("compactd did not create file? %s",
|
||||
strerror(errno));
|
||||
goto failed;
|
||||
}
|
||||
|
||||
status_debug("compaction done: %"PRIu64" -> %"PRIu64" bytes",
|
||||
gm->compactd->old_size, (u64)st.st_size);
|
||||
|
||||
/* Switch gossmap to new one, as a sanity check (rather than
|
||||
* writing end marker and letting it reopen) */
|
||||
tal_free(gm->raw_gossmap);
|
||||
gm->raw_gossmap = gossmap_load_initial(gm, GOSSIP_STORE_COMPACT_FILENAME,
|
||||
st.st_size,
|
||||
gossmap_logcb,
|
||||
NULL,
|
||||
gm);
|
||||
if (!gm->raw_gossmap)
|
||||
status_failed(STATUS_FAIL_INTERNAL_ERROR,
|
||||
"compacted gossip_store is invalid");
|
||||
|
||||
if (rename(GOSSIP_STORE_COMPACT_FILENAME, GOSSIP_STORE_FILENAME) != 0)
|
||||
status_failed(STATUS_FAIL_INTERNAL_ERROR,
|
||||
"Error renaming gossip store: %s",
|
||||
strerror(errno));
|
||||
|
||||
/* Now append record to old one, so everyone will switch */
|
||||
gossip_store_add(gm->gs,
|
||||
towire_gossip_store_ended(tmpctx, st.st_size, gm->compactd->uuid),
|
||||
0);
|
||||
gossip_store_reopen(gm->gs);
|
||||
|
||||
failed:
|
||||
gm->compactd = tal_free(gm->compactd);
|
||||
}
|
||||
|
||||
/* When it's caught up to where we were, we wait. */
|
||||
static struct io_plan *compactd_read_done(struct io_conn *conn,
|
||||
struct gossmap_manage *gm)
|
||||
{
|
||||
status_debug("compactd caught up, waiting for final bytes.");
|
||||
|
||||
/* Make sure everything has hit storage in the current version. */
|
||||
gossip_store_fsync(gm->gs);
|
||||
gossmap_manage_get_gossmap(gm);
|
||||
|
||||
/* Tell it to do the remainder, then we wait for it to exit in destructor. */
|
||||
write_all(gm->compactd->outfd, "", 1);
|
||||
return io_close(conn);
|
||||
}
|
||||
|
||||
static struct io_plan *init_compactd_conn_in(struct io_conn *conn,
|
||||
struct gossmap_manage *gm)
|
||||
{
|
||||
return io_read(conn, &gm->compactd->ignored, sizeof(gm->compactd->ignored),
|
||||
compactd_read_done, gm);
|
||||
}
|
||||
/* Returns false if already running */
|
||||
static UNNEEDED bool gossmap_compact(struct gossmap_manage *gm)
|
||||
{
|
||||
int childin[2], execfail[2], childout[2];
|
||||
int saved_errno;
|
||||
|
||||
/* Only one at a time please! */
|
||||
if (gm->compactd)
|
||||
return false;
|
||||
|
||||
/* This checks contents: we want to make sure compactd sees an
|
||||
* up-to-date version. */
|
||||
gossmap_manage_get_gossmap(gm);
|
||||
|
||||
gm->compactd = tal(gm, struct compactd);
|
||||
for (size_t i = 0; i < ARRAY_SIZE(gm->compactd->uuid); i++)
|
||||
gm->compactd->uuid[i] = pseudorand(256);
|
||||
|
||||
gm->compactd->old_size = gossip_store_len_written(gm->gs);
|
||||
status_debug("Executing lightning_gossip_compactd %s %s %s %s",
|
||||
GOSSIP_STORE_FILENAME,
|
||||
GOSSIP_STORE_COMPACT_FILENAME,
|
||||
tal_fmt(tmpctx, "%"PRIu64, gm->compactd->old_size),
|
||||
tal_hexstr(tmpctx, gm->compactd->uuid, sizeof(gm->compactd->uuid)));
|
||||
|
||||
if (pipe(childin) != 0 || pipe(childout) != 0 || pipe(execfail) != 0)
|
||||
status_failed(STATUS_FAIL_INTERNAL_ERROR,
|
||||
"Could not create pipes for compactd: %s",
|
||||
strerror(errno));
|
||||
|
||||
if (fcntl(execfail[1], F_SETFD, fcntl(execfail[1], F_GETFD)
|
||||
| FD_CLOEXEC) < 0)
|
||||
status_failed(STATUS_FAIL_INTERNAL_ERROR,
|
||||
"Could not set cloexec on compactd fd: %s",
|
||||
strerror(errno));
|
||||
|
||||
gm->compactd->pid = fork();
|
||||
if (gm->compactd->pid < 0)
|
||||
status_failed(STATUS_FAIL_INTERNAL_ERROR,
|
||||
"Could not fork for compactd: %s",
|
||||
strerror(errno));
|
||||
|
||||
if (gm->compactd->pid == 0) {
|
||||
close(childin[0]);
|
||||
close(childout[1]);
|
||||
close(execfail[0]);
|
||||
|
||||
/* In practice, low fds are all open, so we don't have
|
||||
* to handle those horrible cases */
|
||||
assert(childin[1] > 2);
|
||||
assert(childout[0] > 2);
|
||||
if (dup2(childin[1], STDOUT_FILENO) == -1)
|
||||
err(1, "Failed to duplicate fd to stdout");
|
||||
close(childin[1]);
|
||||
if (dup2(childout[0], STDIN_FILENO) == -1)
|
||||
err(1, "Failed to duplicate fd to stdin");
|
||||
close(childin[2]);
|
||||
closefrom_limit(0);
|
||||
closefrom(3);
|
||||
/* Tell compactd helper what we read so far. */
|
||||
execlp(gm->daemon->compactd_helper,
|
||||
gm->daemon->compactd_helper,
|
||||
GOSSIP_STORE_FILENAME,
|
||||
GOSSIP_STORE_COMPACT_FILENAME,
|
||||
tal_fmt(tmpctx, "%"PRIu64, gm->compactd->old_size),
|
||||
tal_hexstr(tmpctx, gm->compactd->uuid, sizeof(gm->compactd->uuid)),
|
||||
NULL);
|
||||
saved_errno = errno;
|
||||
/* Gcc's warn-unused-result fail. */
|
||||
if (write(execfail[1], &saved_errno, sizeof(saved_errno))) {
|
||||
;
|
||||
}
|
||||
exit(127);
|
||||
}
|
||||
close(childin[1]);
|
||||
close(childout[0]);
|
||||
close(execfail[1]);
|
||||
|
||||
/* Child will close this without writing on successful exec. */
|
||||
if (read(execfail[0], &saved_errno, sizeof(saved_errno)) == sizeof(saved_errno)) {
|
||||
close(execfail[0]);
|
||||
waitpid(gm->compactd->pid, NULL, 0);
|
||||
status_failed(STATUS_FAIL_INTERNAL_ERROR,
|
||||
"Exec of %s failed: %s",
|
||||
gm->daemon->compactd_helper, strerror(saved_errno));
|
||||
}
|
||||
close(execfail[0]);
|
||||
|
||||
gm->compactd->outfd = childout[1];
|
||||
gm->compactd->in_conn = io_new_conn(gm->compactd, childin[0],
|
||||
init_compactd_conn_in, gm);
|
||||
io_set_finish(gm->compactd->in_conn, compactd_done, gm);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -319,9 +319,10 @@ void gossip_init(struct lightningd *ld, int connectd_fd)
|
||||
chainparams,
|
||||
ld->our_features,
|
||||
&ld->our_nodeid,
|
||||
ld->autoconnect_seeker_peers,
|
||||
subdaemon_path(tmpctx, ld, "lightning_gossip_compactd"),
|
||||
ld->dev_fast_gossip,
|
||||
ld->dev_fast_gossip_prune,
|
||||
ld->autoconnect_seeker_peers);
|
||||
ld->dev_fast_gossip_prune);
|
||||
|
||||
subd_req(ld->gossip, ld->gossip, take(msg), -1, 0,
|
||||
gossipd_init_done, NULL);
|
||||
|
||||
@@ -381,9 +381,10 @@ static const char *subdaemons[] = {
|
||||
"lightning_closingd",
|
||||
"lightning_connectd",
|
||||
"lightning_gossipd",
|
||||
"lightning_gossip_compactd",
|
||||
"lightning_hsmd",
|
||||
"lightning_onchaind",
|
||||
"lightning_openingd"
|
||||
"lightning_openingd",
|
||||
};
|
||||
|
||||
/* Return true if called with a recognized subdaemon e.g. "hsmd" */
|
||||
|
||||
Reference in New Issue
Block a user