From 8268df9a4bf83effb0f7c8e182a4a2b78885a349 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Tue, 14 May 2024 14:01:44 +0930 Subject: [PATCH] connectd: implement "transient" connections. Currently, any connection to a peer which doesn't have a live channel is considered transient. We free these first under stress, including any that are still in the process of connecting. Signed-off-by: Rusty Russell --- connectd/connectd.c | 101 ++++++++++++++++++++++++++--------- connectd/connectd.h | 17 +++++- connectd/connectd_wire.csv | 1 + lightningd/connect_control.c | 9 +++- 4 files changed, 99 insertions(+), 29 deletions(-) diff --git a/connectd/connectd.c b/connectd/connectd.c index 19d3633d2..61a430fcb 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -152,7 +152,7 @@ static struct peer *new_peer(struct daemon *daemon, const u8 *their_features, enum is_websocket is_websocket, struct io_conn *conn STEALS, - bool deliberate_connection, + enum connection_prio prio, int *fd_for_subd) { struct peer *peer = tal(daemon, struct peer); @@ -169,7 +169,7 @@ static struct peer *new_peer(struct daemon *daemon, peer->peer_outq = msg_queue_new(peer, false); peer->last_recv_time = time_now(); peer->is_websocket = is_websocket; - peer->deliberate_connection = deliberate_connection; + peer->prio = prio; peer->dev_writes_enabled = NULL; peer->dev_read_enabled = true; @@ -202,7 +202,7 @@ struct io_plan *peer_connected(struct io_conn *conn, int subd_fd; bool option_gossip_queries; struct connecting *connect; - bool deliberate_connection; + enum connection_prio prio; /* We remove any previous connection immediately, on the assumption it's dead */ peer = peer_htable_get(daemon->peers, id); @@ -257,18 +257,22 @@ struct io_plan *peer_connected(struct io_conn *conn, } if (connect) { - deliberate_connection = true; + if (connect->transient) + prio = PRIO_TRANSIENT; + else + prio = PRIO_DELIBERATE; + /*~ Now we've connected, disable the callback which would * cause us to to try the next address on failure. 
*/ io_set_finish(connect->conn, NULL, NULL); tal_free(connect); } else { - deliberate_connection = false; + prio = PRIO_UNSOLICITED; } /* This contains the per-peer state info; gossipd fills in pps->gs */ peer = new_peer(daemon, id, cs, their_features, is_websocket, conn, - deliberate_connection, &subd_fd); + prio, &subd_fd); /* Only takes over conn if it succeeds. */ if (!peer) return io_close(conn); @@ -386,23 +390,31 @@ static struct io_plan *conn_in(struct io_conn *conn, * Note, again, the notleak() to avoid our simplistic leak detection * code from thinking `conn` (which we don't keep a pointer to) is * leaked */ - return responder_handshake(notleak(conn), &daemon->mykey, + return responder_handshake(notleak_with_children(conn), &daemon->mykey, &conn_in_arg->addr, timeout, conn_in_arg->is_websocket, handshake_in_success, daemon); } -/* How much is peer worth (when considering disconnet)? */ -static size_t peer_score(const struct peer *peer) +/* How much is peer worth (when considering disconnect)? */ +static size_t peer_score(enum connection_prio prio, + struct subd **subds) { -#define PEER_SCORE_MAX 2 +#define PEER_SCORE_MAX 3 - /* We deliberately connected to it? Highest prio */ - if (peer->deliberate_connection) - return 2; - /* It has subds now? Higher prio */ - if (tal_count(peer->subds)) + switch (prio) { + case PRIO_DELIBERATE: + /* We definitely want this one */ + return 3; + case PRIO_TRANSIENT: + /* We're explicitly told to dispose of these! */ + return 0; + case PRIO_UNSOLICITED: + /* It has subds now? 
Higher prio */ + if (tal_count(subds)) + return 2; return 1; + } return 0; } @@ -413,13 +425,31 @@ void close_random_connection(struct daemon *daemon) struct peer *peer, *best_peer = NULL; size_t best_peer_score = PEER_SCORE_MAX + 1; struct peer_htable_iter it; + struct connecting *c; + struct connecting_htable_iter cit; + bool closed_connect_attempt = false; + + /* First, close all transient connection attempts in-flight */ + for (c = connecting_htable_first(daemon->connecting, &cit); + c; + c = connecting_htable_next(daemon->connecting, &cit)) { + if (!c->transient) + continue; + + status_debug("due to stress, closing transient connect attempt to %s", + fmt_node_id(tmpctx, &c->id)); + /* This tells destructor why it was closed */ + errno = EMFILE; + tal_free(c); + closed_connect_attempt = true; + } /* Prefer ones with no subds (just chatting), or failing that, * ones we didn't deliberately connect to. */ peer = peer_htable_pick(daemon->peers, pseudorand_u64(), &it); for (size_t i = 0; i < peer_htable_count(daemon->peers); i++) { - size_t score = peer_score(peer); + size_t score = peer_score(peer->prio, peer->subds); if (score < best_peer_score) { best_peer = peer; best_peer_score = score; @@ -432,11 +462,17 @@ void close_random_connection(struct daemon *daemon) peer = peer_htable_first(daemon->peers, &it); } - if (best_peer) { - status_debug("due to stress, randomly closing peer %s (score %zu)", - fmt_node_id(tmpctx, &best_peer->id), best_peer_score); - io_close(best_peer->to_peer); - } + if (!best_peer) + return; + + /* Don't close active peer if we closed an attempt */ + if (closed_connect_attempt + && best_peer_score > peer_score(PRIO_UNSOLICITED, NULL)) + return; + + status_debug("due to stress, randomly closing peer %s (score %zu)", + fmt_node_id(tmpctx, &best_peer->id), best_peer_score); + io_close(best_peer->to_peer); } /*~ When we get a direct connection in we set up its network address @@ -667,6 +703,8 @@ static void destroy_io_conn(struct io_conn *conn, 
struct connecting *connect) errstr = "peer closed connection"; if (streq(connect->connstate, "Cryptographic handshake")) errstr = "peer closed connection (wrong key?)"; + } else if (errno == EMFILE) { + errstr = "Terminated due to too many connections"; } add_errors_to_error_list(connect, @@ -891,6 +929,12 @@ static void try_connect_one_addr(struct connecting *connect) } fd = socket(af, SOCK_STREAM, 0); + /* If we're out of fds, and can drop one, re-try */ + if (fd < 0 && errno == EMFILE) { + close_random_connection(connect->daemon); + fd = socket(af, SOCK_STREAM, 0); + } + if (fd < 0) { tal_append_fmt(&connect->errors, "%s: opening %i socket gave %s. ", @@ -1631,7 +1675,8 @@ static void try_connect_peer(struct daemon *daemon, const struct node_id *id, struct wireaddr *gossip_addrs, struct wireaddr_internal *addrhint STEALS, - bool dns_fallback) + bool dns_fallback, + bool transient) { struct wireaddr_internal *addrs; bool use_proxy = daemon->always_use_proxy; @@ -1641,8 +1686,9 @@ static void try_connect_peer(struct daemon *daemon, /* Already existing? Must have crossed over, it'll know soon. 
*/ peer = peer_htable_get(daemon->peers, id); if (peer) { - /* Note now that we explicitly tried to connect */ - peer->deliberate_connection = true; + /* Note if we explicitly tried to connect non-transiently */ + if (!transient) + peer->prio = PRIO_DELIBERATE; return; } @@ -1716,6 +1762,7 @@ static void try_connect_peer(struct daemon *daemon, connect->addrhint = tal_steal(connect, addrhint); connect->errors = tal_strdup(connect, ""); connect->conn = NULL; + connect->transient = transient; connecting_htable_add(daemon->connecting, connect); tal_add_destructor(connect, destroy_connecting); @@ -1730,13 +1777,15 @@ static void connect_to_peer(struct daemon *daemon, const u8 *msg) struct wireaddr_internal *addrhint; struct wireaddr *addrs; bool dns_fallback; + bool transient; if (!fromwire_connectd_connect_to_peer(tmpctx, msg, &id, &addrs, &addrhint, - &dns_fallback)) + &dns_fallback, + &transient)) master_badmsg(WIRE_CONNECTD_CONNECT_TO_PEER, msg); - try_connect_peer(daemon, &id, addrs, addrhint, dns_fallback); + try_connect_peer(daemon, &id, addrs, addrhint, dns_fallback, transient); } /* lightningd tells us a peer should be disconnected. */ diff --git a/connectd/connectd.h b/connectd/connectd.h index e2b6a200e..41b2c12e5 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -40,6 +40,16 @@ enum pong_expect_type { PONG_EXPECTED_PROBING = 2, }; +/*~ We classify connections by loose priority */ +enum connection_prio { + /* We deliberately connected to them. */ + PRIO_DELIBERATE, + /* They connected to us, unsolicited. */ + PRIO_UNSOLICITED, + /* We connected, but transiently. */ + PRIO_TRANSIENT, +}; + /*~ We keep a hash table (ccan/htable) of peers, which tells us what peers are * already connected (by peer->id). */ struct peer { @@ -90,8 +100,8 @@ struct peer { /* Last time we received traffic */ struct timeabs last_recv_time; - /* Were we explicitly told to connect to this peer? */ - bool deliberate_connection; + /* How important does this peer seem to be? 
*/ + enum connection_prio prio; bool dev_read_enabled; /* If non-NULL, this counts down; 0 means disable */ @@ -144,6 +154,9 @@ struct connecting { /* Accumulated errors */ char *errors; + + /* Is this a transient connection? */ + bool transient; }; static const struct node_id *connecting_keyof(const struct connecting *connecting) diff --git a/connectd/connectd_wire.csv b/connectd/connectd_wire.csv index f5fb279e5..0efad17af 100644 --- a/connectd/connectd_wire.csv +++ b/connectd/connectd_wire.csv @@ -55,6 +55,7 @@ msgdata,connectd_connect_to_peer,len,u32, msgdata,connectd_connect_to_peer,addrs,wireaddr,len msgdata,connectd_connect_to_peer,addrhint,?wireaddr_internal, msgdata,connectd_connect_to_peer,dns_fallback,bool, +msgdata,connectd_connect_to_peer,transient,bool, # Connectd->master: connect failed. msgtype,connectd_connect_failed,2020 diff --git a/lightningd/connect_control.c b/lightningd/connect_control.c index 31662b540..8cf469212 100644 --- a/lightningd/connect_control.c +++ b/lightningd/connect_control.c @@ -292,16 +292,23 @@ static void gossipd_got_addrs(struct subd *subd, { struct wireaddr *addrs; u8 *connectmsg; + struct peer *peer; + bool transient; if (!fromwire_gossipd_get_addrs_reply(tmpctx, msg, &addrs)) fatal("Gossipd gave bad GOSSIPD_GET_ADDRS_REPLY %s", tal_hex(msg, msg)); + /* We consider this transient unless we have a channel */ + peer = peer_by_id(d->ld, &d->id); + transient = !peer || !peer_any_channel(peer, channel_state_wants_peercomms, NULL); + connectmsg = towire_connectd_connect_to_peer(NULL, &d->id, addrs, d->addrhint, - d->dns_fallback); + d->dns_fallback, + transient); subd_send_msg(d->ld->connectd, take(connectmsg)); tal_free(d); }