connectd: close connection properly after dev-disconnect +.

We need to drain subds too, otherwise if timing is correct we never close.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2025-10-09 14:06:02 +10:30
parent af340a6817
commit fc6bee8950

View File

@@ -67,6 +67,8 @@ struct subd {
/* FIXME: reorder! */
static void destroy_connected_subd(struct subd *subd);
static struct io_plan *write_to_peer(struct io_conn *peer_conn,
struct peer *peer);
static struct subd *find_subd(struct peer *peer,
const struct channel_id *channel_id)
@@ -160,6 +162,21 @@ void disconnect_peer(struct peer *peer)
}
}
static void free_all_subds(struct peer *peer)
{
for (size_t i = 0; i < tal_count(peer->subds); i++) {
/* Once conn exists, subd is a child of the conn. Free conn, free subd. */
if (peer->subds[i]->conn) {
tal_del_destructor(peer->subds[i], destroy_connected_subd);
tal_free(peer->subds[i]->conn);
} else {
/* We told lightningd that peer spoke, but it hasn't returned yet. */
tal_free(peer->subds[i]);
}
}
tal_resize(&peer->subds, 0);
}
/* Send warning, close connection to peer */
static void send_warning(struct peer *peer, const char *fmt, ...)
{
@@ -177,17 +194,7 @@ static void send_warning(struct peer *peer, const char *fmt, ...)
inject_peer_msg(peer, take(msg));
/* Free all the subds immediately */
for (size_t i = 0; i < tal_count(peer->subds); i++) {
/* Once conn exists, subd is a child of the conn. Free conn, free subd. */
if (peer->subds[i]->conn) {
tal_del_destructor(peer->subds[i], destroy_connected_subd);
tal_free(peer->subds[i]->conn);
} else {
/* We told lightningd that peer spoke, but it hasn't returned yet. */
tal_free(peer->subds[i]);
}
}
tal_resize(&peer->subds, 0);
free_all_subds(peer);
disconnect_peer(peer);
}
@@ -397,12 +404,6 @@ static bool is_urgent(enum peer_wire type)
return false;
}
/* io_sock_shutdown, but in format suitable for an io_plan callback */
static struct io_plan *io_sock_shutdown_cb(struct io_conn *conn, struct peer *unused)
{
return io_sock_shutdown(conn);
}
/* Process and eat protocol_batch_element messages, encrypt each element message
* and return the encrypted messages as one long byte array. */
static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES)
@@ -466,11 +467,7 @@ static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES)
return ret;
}
static struct io_plan *encrypt_and_send(struct peer *peer,
const u8 *msg TAKES,
struct io_plan *(*next)
(struct io_conn *peer_conn,
struct peer *peer))
static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES)
{
int type = fromwire_peektype(msg);
@@ -482,8 +479,8 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
case DEV_DISCONNECT_OUT_AFTER:
/* Disallow reads from now on */
peer->dev_read_enabled = false;
/* Using io_close here can lose the data we're about to send! */
next = io_sock_shutdown_cb;
free_all_subds(peer);
drain_peer(peer);
break;
case DEV_DISCONNECT_OUT_BLACKHOLE:
/* Disable both reads and writes from now on */
@@ -499,7 +496,7 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
/* Tell them to read again, */
io_wake(&peer->subds);
return msg_queue_wait(peer->to_peer, peer->peer_outq,
next, peer);
write_to_peer, peer);
case DEV_DISCONNECT_OUT_DISABLE_AFTER:
peer->dev_read_enabled = false;
peer->dev_writes_enabled = tal(peer, u32);
@@ -523,7 +520,7 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
return io_write(peer->to_peer,
peer->sent_to_peer,
tal_bytelen(peer->sent_to_peer),
next, peer);
write_to_peer, peer);
}
/* Kicks off write_to_peer() to look for more gossip to send from store */
@@ -1120,7 +1117,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
(*peer->dev_writes_enabled)--;
}
return encrypt_and_send(peer, take(msg), write_to_peer);
return encrypt_and_send(peer, take(msg));
}
static struct io_plan *read_from_subd(struct io_conn *subd_conn,