/*~ This contains all the code to shuffle data between socket to the peer * itself, and the subdaemons. */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* Size of write(), to create uniform size packets. */ #define UNIFORM_MESSAGE_SIZE 1460 struct subd { /* Owner: we are in peer->subds[] */ struct peer *peer; /* The temporary or permanant channel_id */ struct channel_id channel_id; /* In passing, we can have a temporary one, too. */ struct channel_id *temporary_channel_id; /* The opening revocation basepoint, for v2 channel_id. */ struct pubkey *opener_revocation_basepoint; /* The actual connection to talk to it (NULL if it's not connected yet) */ struct io_conn *conn; /* Input buffer */ u8 *in; /* Output buffer */ struct msg_queue *outq; /* After we've told it to tx_abort, we don't send anything else. */ bool rcvd_tx_abort; }; /* FIXME: reorder! */ static bool is_urgent(enum peer_wire type); static void destroy_connected_subd(struct subd *subd); static struct io_plan *write_to_peer(struct io_conn *peer_conn, struct peer *peer); static struct subd *find_subd(struct peer *peer, const struct channel_id *channel_id) { for (size_t i = 0; i < tal_count(peer->subds); i++) { struct subd *subd = peer->subds[i]; /* Once we sent it tx_abort, we pretend it doesn't exist */ if (subd->rcvd_tx_abort) continue; /* Once we see a message using the real channel_id, we * clear the temporary_channel_id */ if (channel_id_eq(&subd->channel_id, channel_id)) { subd->temporary_channel_id = tal_free(subd->temporary_channel_id); return subd; } if (subd->temporary_channel_id && channel_id_eq(subd->temporary_channel_id, channel_id)) { return subd; } } return NULL; } /* We try to send the final messages, but if buffer is full and they're * not reading, we have to give up. */ static void close_peer_io_timeout(struct peer *peer) { status_peer_unusual(&peer->id, CI_UNEXPECTED "Peer did not close, forcing close"); io_close(peer->to_peer); } static void close_subd_timeout(struct subd *subd) { status_peer_broken(&subd->peer->id, "Subd did not close, forcing close"); io_close(subd->conn); } static void msg_to_peer_outq(struct peer *peer, const u8 *msg TAKES) { if (is_urgent(fromwire_peektype(msg))) peer->peer_out_urgent++; msg_enqueue(peer->peer_outq, msg); } void inject_peer_msg(struct peer *peer, const u8 *msg TAKES) { status_peer_io(LOG_IO_OUT, &peer->id, msg); msg_to_peer_outq(peer, msg); } static void drain_peer(struct peer *peer) { assert(tal_count(peer->subds) == 0); /* You have five seconds to drain. */ peer->draining_state = WRITING_TO_PEER; status_peer_debug(&peer->id, "disconnect_peer: draining with 5 second timer."); notleak(new_reltimer(&peer->daemon->timers, peer->to_peer, time_from_sec(5), close_peer_io_timeout, peer)); io_wake(peer->peer_outq); /* We will discard what they send us, but listen so we catch closes */ io_wake(&peer->peer_in); } void disconnect_peer(struct peer *peer) { peer->draining_state = READING_FROM_SUBDS; for (size_t i = 0; i < tal_count(peer->subds); i++) { /* Start timer in case it doesn't close by itself */ if (peer->subds[i]->conn) { status_peer_debug(&peer->id, "disconnect_peer: setting 5 second timer for subd %zu/%zu.", i, tal_count(peer->subds)); notleak(new_reltimer(&peer->daemon->timers, peer->subds[i], time_from_sec(5), close_subd_timeout, peer->subds[i])); } else { /* We told lightningd that peer spoke, but it hasn't returned yet. */ tal_arr_remove(&peer->subds, i); i--; } } if (tal_count(peer->subds) != 0) { status_peer_debug(&peer->id, "disconnect_peer: waking %zu subds.", tal_count(peer->subds)); /* Wake them up so we read again */ io_wake(&peer->subds); } else { status_peer_debug(&peer->id, "disconnect_peer: no subds, draining now."); /* No subds left, start draining peer */ drain_peer(peer); } } static void free_all_subds(struct peer *peer) { for (size_t i = 0; i < tal_count(peer->subds); i++) { /* Once conn exists, subd is a child of the conn. Free conn, free subd. */ if (peer->subds[i]->conn) { tal_del_destructor(peer->subds[i], destroy_connected_subd); tal_free(peer->subds[i]->conn); } else { /* We told lightningd that peer spoke, but it hasn't returned yet. */ tal_free(peer->subds[i]); } } tal_resize(&peer->subds, 0); } /* Send warning, close connection to peer */ static void send_warning(struct peer *peer, const char *fmt, ...) { va_list ap; u8 *msg; va_start(ap, fmt); status_vfmt(LOG_UNUSUAL, &peer->id, fmt, ap); va_end(ap); va_start(ap, fmt); msg = towire_warningfmtv(NULL, NULL, fmt, ap); va_end(ap); inject_peer_msg(peer, take(msg)); /* Free all the subds immediately */ free_all_subds(peer); disconnect_peer(peer); } /* Kicks off write_to_peer() to look for more gossip to send from store */ static void wake_gossip(struct peer *peer); static struct oneshot *gossip_stream_timer(struct peer *peer) { u32 next; /* BOLT #7: * * A node: *... * - SHOULD flush outgoing gossip messages once every 60 seconds, * independently of the arrival times of the messages. * - Note: this results in staggered announcements that are unique * (not duplicated). */ /* We shorten this for dev_fast_gossip! */ next = GOSSIP_FLUSH_INTERVAL(peer->daemon->dev_fast_gossip); return new_reltimer(&peer->daemon->timers, peer, time_from_sec(next), wake_gossip, peer); } /* Statistically, how many peers to we tell about each channel? */ #define GOSSIP_SPAM_REDUNDANCY 50 /* BOLT #7: * A node: * - MUST NOT relay any gossip messages it did not generate itself, * unless explicitly requested. */ /* i.e. the strong implication is that we spam our own gossip aggressively! * "Look at me!" "Look at me!!!!". */ static void spam_new_peer(struct peer *peer, struct gossmap *gossmap) { struct gossmap_node *me; const u8 *msg; u64 send_threshold; /* Find ourselves; if no channels, nothing to send */ me = gossmap_find_node(gossmap, &peer->daemon->id); if (!me) return; send_threshold = -1ULL; /* Just in case we have many peers and not all are connecting or * some other corner case, send everything to first few. */ if (peer_htable_count(peer->daemon->peers) > GOSSIP_SPAM_REDUNDANCY && me->num_chans > GOSSIP_SPAM_REDUNDANCY) { send_threshold = -1ULL / me->num_chans * GOSSIP_SPAM_REDUNDANCY; } for (size_t i = 0; i < me->num_chans; i++) { struct gossmap_chan *chan = gossmap_nth_chan(gossmap, me, i, NULL); /* We set this so we'll send a fraction of all our channels */ if (pseudorand_u64() > send_threshold) continue; /* Send channel_announce */ msg = gossmap_chan_get_announce(NULL, gossmap, chan); inject_peer_msg(peer, take(msg)); /* Send both channel_updates (if they exist): both help people * use our channel, so we care! */ for (int dir = 0; dir < 2; dir++) { if (!gossmap_chan_set(chan, dir)) continue; msg = gossmap_chan_get_update(NULL, gossmap, chan, dir); inject_peer_msg(peer, take(msg)); } } /* If we have one, we should send our own node_announcement */ msg = gossmap_node_get_announce(NULL, gossmap, me); if (msg) inject_peer_msg(peer, take(msg)); } void setup_peer_gossip_store(struct peer *peer, const struct feature_set *our_features, const u8 *their_features) { struct gossmap *gossmap = get_gossmap(peer->daemon); peer->gs.grf = new_gossip_rcvd_filter(peer); peer->gs.iter = gossmap_iter_new(peer, gossmap); peer->gs.bytes_this_second = 0; peer->gs.bytes_start_time = time_mono(); /* BOLT #7: * * A node: * - MUST NOT relay any gossip messages it did not generate itself, * unless explicitly requested. */ peer->gs.gossip_timer = NULL; peer->gs.active = false; spam_new_peer(peer, gossmap); return; } static bool is_urgent(enum peer_wire type) { switch (type) { /* We are happy to batch UPDATE_ADD messages: it's the * commitment signed which matters. */ case WIRE_UPDATE_ADD_HTLC: case WIRE_UPDATE_FULFILL_HTLC: case WIRE_UPDATE_FAIL_HTLC: case WIRE_UPDATE_FAIL_MALFORMED_HTLC: case WIRE_UPDATE_FEE: /* Gossip messages are also non-urgent */ case WIRE_CHANNEL_ANNOUNCEMENT: case WIRE_NODE_ANNOUNCEMENT: case WIRE_CHANNEL_UPDATE: return false; /* We don't delay for anything else, but we use a switch * statement to make you think about new cases! */ case WIRE_INIT: case WIRE_ERROR: case WIRE_WARNING: case WIRE_TX_ADD_INPUT: case WIRE_TX_ADD_OUTPUT: case WIRE_TX_REMOVE_INPUT: case WIRE_TX_REMOVE_OUTPUT: case WIRE_TX_COMPLETE: case WIRE_TX_ABORT: case WIRE_TX_SIGNATURES: case WIRE_OPEN_CHANNEL: case WIRE_ACCEPT_CHANNEL: case WIRE_FUNDING_CREATED: case WIRE_FUNDING_SIGNED: case WIRE_CHANNEL_READY: case WIRE_OPEN_CHANNEL2: case WIRE_ACCEPT_CHANNEL2: case WIRE_TX_INIT_RBF: case WIRE_TX_ACK_RBF: case WIRE_SHUTDOWN: case WIRE_CLOSING_SIGNED: case WIRE_CLOSING_COMPLETE: case WIRE_CLOSING_SIG: case WIRE_UPDATE_BLOCKHEIGHT: case WIRE_CHANNEL_REESTABLISH: case WIRE_ANNOUNCEMENT_SIGNATURES: case WIRE_QUERY_SHORT_CHANNEL_IDS: case WIRE_REPLY_SHORT_CHANNEL_IDS_END: case WIRE_QUERY_CHANNEL_RANGE: case WIRE_REPLY_CHANNEL_RANGE: case WIRE_GOSSIP_TIMESTAMP_FILTER: case WIRE_ONION_MESSAGE: case WIRE_PEER_STORAGE: case WIRE_PEER_STORAGE_RETRIEVAL: case WIRE_STFU: case WIRE_SPLICE: case WIRE_SPLICE_ACK: case WIRE_SPLICE_LOCKED: case WIRE_PING: case WIRE_PONG: case WIRE_PROTOCOL_BATCH_ELEMENT: case WIRE_START_BATCH: case WIRE_COMMITMENT_SIGNED: case WIRE_REVOKE_AND_ACK: return true; }; /* plugins can inject other messages. */ return true; } /* Process and eat protocol_batch_element messages, encrypt each element message * and return the encrypted messages as one long byte array. */ static u8 *process_batch_elements(const tal_t *ctx, struct peer *peer, const u8 *msg TAKES) { u8 *ret = tal_arr(ctx, u8, 0); size_t ret_size = 0; const u8 *cursor = msg; size_t plen = tal_count(msg); status_debug("Processing batch elements of %zu bytes. %s", plen, tal_hex(tmpctx, msg)); do { u8 *element_bytes; u16 element_size; struct channel_id channel_id; u8 *enc_msg; if (fromwire_u16(&cursor, &plen) != WIRE_PROTOCOL_BATCH_ELEMENT) { status_broken("process_batch_elements on msg that is" " not WIRE_PROTOCOL_BATCH_ELEMENT. %s", tal_hexstr(tmpctx, cursor, plen)); return tal_free(ret); } fromwire_channel_id(&cursor, &plen, &channel_id); element_size = fromwire_u16(&cursor, &plen); if (!element_size) { status_broken("process_batch_elements cannot have zero" " length elements. %s", tal_hexstr(tmpctx, cursor, plen)); return tal_free(ret); } element_bytes = fromwire_tal_arrn(NULL, &cursor, &plen, element_size); if (!element_bytes) { status_broken("process_batch_elements fromwire_tal_arrn" " %s", tal_hexstr(tmpctx, cursor, plen)); return tal_free(ret); } status_debug("Processing batch extracted item %s. %s", peer_wire_name(fromwire_peektype(element_bytes)), tal_hex(tmpctx, element_bytes)); enc_msg = cryptomsg_encrypt_msg(tmpctx, &peer->cs, take(element_bytes)); tal_resize(&ret, ret_size + tal_bytelen(enc_msg)); memcpy(&ret[ret_size], enc_msg, tal_bytelen(enc_msg)); ret_size += tal_bytelen(enc_msg); } while(plen); if (taken(msg)) tal_free(msg); return ret; } /* --dev-disconnect can do magic things: if this returns non-NULL, free msg and do that */ static struct io_plan *msg_out_dev_disconnect(struct peer *peer, const u8 *msg) { int type = fromwire_peektype(msg); switch (dev_disconnect_out(&peer->id, type)) { case DEV_DISCONNECT_OUT_BEFORE: return io_close(peer->to_peer); case DEV_DISCONNECT_OUT_AFTER: /* Disallow reads from now on */ peer->dev_read_enabled = false; free_all_subds(peer); drain_peer(peer); return NULL; case DEV_DISCONNECT_OUT_BLACKHOLE: /* Disable both reads and writes from now on */ peer->dev_read_enabled = false; peer->dev_writes_enabled = talz(peer, u32); return NULL; case DEV_DISCONNECT_OUT_NORMAL: return NULL; case DEV_DISCONNECT_OUT_DROP: /* Tell them to read again. */ io_wake(&peer->subds); return msg_queue_wait(peer->to_peer, peer->peer_outq, write_to_peer, peer); case DEV_DISCONNECT_OUT_DISABLE_AFTER: peer->dev_read_enabled = false; peer->dev_writes_enabled = tal(peer, u32); *peer->dev_writes_enabled = 1; return NULL; } abort(); } /* Do we have enough bytes without padding? */ static bool have_full_encrypted_queue(const struct peer *peer) { return membuf_num_elems(&peer->encrypted_peer_out) >= UNIFORM_MESSAGE_SIZE; } /* Do we have nothing in queue? */ static bool have_empty_encrypted_queue(const struct peer *peer) { return membuf_num_elems(&peer->encrypted_peer_out) == 0; } /* (Continue) writing the encrypted_peer_out array */ static struct io_plan *write_encrypted_to_peer(struct peer *peer) { assert(membuf_num_elems(&peer->encrypted_peer_out) >= UNIFORM_MESSAGE_SIZE); return io_write_partial(peer->to_peer, membuf_elems(&peer->encrypted_peer_out), UNIFORM_MESSAGE_SIZE, &peer->encrypted_peer_out_sent, write_to_peer, peer); } /* Close the connection if this fails */ static bool encrypt_append(struct peer *peer, const u8 *msg TAKES) { int type = fromwire_peektype(msg); const u8 *enc; size_t enclen; /* Special message type directing us to process batch items. */ if (type == WIRE_PROTOCOL_BATCH_ELEMENT) { enc = process_batch_elements(tmpctx, peer, msg); if (!enc) return false; } else { enc = cryptomsg_encrypt_msg(tmpctx, &peer->cs, msg); } enclen = tal_bytelen(enc); memcpy(membuf_add(&peer->encrypted_peer_out, enclen), enc, enclen); return true; } static void pad_encrypted_queue(struct peer *peer) { size_t needed, pingpad; u8 *ping; /* BOLT #8: * * ``` * +------------------------------- * |2-byte encrypted message length| * +------------------------------- * | 16-byte MAC of the encrypted | * | message length | * +------------------------------- * | | * | | * | encrypted Lightning | * | message | * | | * +------------------------------- * | 16-byte MAC of the | * | Lightning message | * +------------------------------- * ``` * * The prefixed message length is encoded as a 2-byte big-endian integer, * for a total maximum packet length of `2 + 16 + 65535 + 16` = `65569` bytes. */ assert(membuf_num_elems(&peer->encrypted_peer_out) < UNIFORM_MESSAGE_SIZE); needed = UNIFORM_MESSAGE_SIZE - membuf_num_elems(&peer->encrypted_peer_out); /* BOLT #1: * 1. type: 18 (`ping`) * 2. data: * * [`u16`:`num_pong_bytes`] * * [`u16`:`byteslen`] * * [`byteslen*byte`:`ignored`] */ /* So smallest possible ping is 6 bytes (2 byte type field) */ if (needed < 2 + 16 + 16 + 6) needed += UNIFORM_MESSAGE_SIZE; pingpad = needed - (2 + 16 + 16 + 6); /* Note: we don't bother --dev-disconnect here */ /* BOLT #1: * A node receiving a `ping` message: * - if `num_pong_bytes` is less than 65532: * - MUST respond by sending a `pong` message, with `byteslen` equal to `num_pong_bytes`. * - otherwise (`num_pong_bytes` is **not** less than 65532): * - MUST ignore the `ping`. */ ping = make_ping(NULL, 65535, pingpad); if (!encrypt_append(peer, take(ping))) abort(); assert(have_full_encrypted_queue(peer)); assert(membuf_num_elems(&peer->encrypted_peer_out) % UNIFORM_MESSAGE_SIZE == 0); } /* Kicks off write_to_peer() to look for more gossip to send from store */ static void wake_gossip(struct peer *peer) { bool flush_gossip_filter = true; /* With dev-fast-gossip, we clean every 2 seconds, which is too * fast for our slow tests! So we only call this one time in 5 * actually twice that, as it's not per-peer! */ static int gossip_age_count; if (peer->daemon->dev_fast_gossip && gossip_age_count++ % 5 != 0) flush_gossip_filter = false; /* Don't remember sent per-peer gossip forever. */ if (flush_gossip_filter) gossip_rcvd_filter_age(peer->gs.grf); peer->gs.active = !peer->daemon->dev_suppress_gossip; io_wake(peer->peer_outq); /* And go again in 60 seconds (from now, now when we finish!) */ peer->gs.gossip_timer = gossip_stream_timer(peer); } /* Gossip response or something from gossip store */ static const u8 *maybe_gossip_msg(const tal_t *ctx, struct peer *peer) { const u8 *msg; struct timemono now; struct gossmap *gossmap; u32 timestamp; const u8 **msgs; /* If it's been over a second, make a fresh start. */ now = time_mono(); if (time_to_sec(timemono_between(now, peer->gs.bytes_start_time)) > 0) { peer->gs.bytes_start_time = now; peer->gs.bytes_this_second = 0; } /* Sent too much this second? */ if (peer->gs.bytes_this_second > peer->daemon->gossip_stream_limit) { /* Replace normal timer with a timer after throttle. */ peer->gs.active = false; tal_free(peer->gs.gossip_timer); peer->gs.gossip_timer = new_abstimer(&peer->daemon->timers, peer, timemono_add(peer->gs.bytes_start_time, time_from_sec(1)), wake_gossip, peer); return NULL; } gossmap = get_gossmap(peer->daemon); /* This can return more than one. */ msgs = maybe_create_query_responses(tmpctx, peer, gossmap); if (tal_count(msgs) > 0) { /* We return the first one for immediate sending, and queue * others for future. We add all the lengths now though! */ for (size_t i = 0; i < tal_count(msgs); i++) { peer->gs.bytes_this_second += tal_bytelen(msgs[i]); status_peer_io(LOG_IO_OUT, &peer->id, msgs[i]); if (i > 0) msg_to_peer_outq(peer, take(msgs[i])); } return msgs[0]; } /* dev-mode can suppress all gossip */ if (peer->daemon->dev_suppress_gossip) return NULL; /* Not streaming right now? */ if (!peer->gs.active) return NULL; /* This should be around to kick us every 60 seconds */ assert(peer->gs.gossip_timer); again: msg = gossmap_stream_next(ctx, gossmap, peer->gs.iter, ×tamp); if (msg) { /* Don't send back gossip they sent to us! */ if (gossip_rcvd_filter_del(peer->gs.grf, msg)) { msg = tal_free(msg); goto again; } /* Check timestamp (zero for channel_announcement with * no update yet!): FIXME: we could ignore this! */ if (timestamp && (timestamp < peer->gs.timestamp_min || timestamp > peer->gs.timestamp_max)) { msg = tal_free(msg); goto again; } peer->gs.bytes_this_second += tal_bytelen(msg); status_peer_io(LOG_IO_OUT, &peer->id, msg); return msg; } /* No gossip left to send */ peer->gs.active = false; return NULL; } /* Mutual recursion */ static void send_ping(struct peer *peer); static void set_ping_timer(struct peer *peer) { if (peer->daemon->dev_no_ping_timer) { peer->ping_timer = NULL; return; } peer->ping_timer = new_reltimer(&peer->daemon->timers, peer, time_from_sec(15 + pseudorand(30)), send_ping, peer); } static void send_ping(struct peer *peer) { /* If it's still sending us traffic, maybe ping reply is backed up? * That's OK, ping is just to make sure it's still alive, and clearly * it is. */ if (timemono_before(peer->last_recv_time, timemono_sub(time_mono(), time_from_sec(60)))) { /* Already have a ping in flight? */ if (peer->expecting_pong != PONG_UNEXPECTED) { status_peer_debug(&peer->id, "Last ping unreturned: hanging up"); if (peer->to_peer) io_close(peer->to_peer); return; } inject_peer_msg(peer, take(make_ping(NULL, 1, 0))); peer->ping_start = time_mono(); peer->expecting_pong = PONG_EXPECTED_PROBING; } set_ping_timer(peer); } void set_custommsgs(struct daemon *daemon, const u8 *msg) { tal_free(daemon->custom_msgs); if (!fromwire_connectd_set_custommsgs(daemon, msg, &daemon->custom_msgs)) master_badmsg(WIRE_CONNECTD_SET_CUSTOMMSGS, msg); status_debug("Now allowing %zu custom message types", tal_count(daemon->custom_msgs)); } void send_custommsg(struct daemon *daemon, const u8 *msg) { struct node_id id; u8 *custommsg; struct peer *peer; if (!fromwire_connectd_custommsg_out(tmpctx, msg, &id, &custommsg)) master_badmsg(WIRE_CONNECTD_CUSTOMMSG_OUT, msg); /* Races can happen: this might be gone by now. */ peer = peer_htable_get(daemon->peers, &id); if (peer) inject_peer_msg(peer, take(custommsg)); } static void handle_ping_in(struct peer *peer, const u8 *msg) { u8 *pong; if (!check_ping_make_pong(NULL, msg, &pong)) { send_warning(peer, "Invalid ping %s", tal_hex(msg, msg)); return; } if (pong) inject_peer_msg(peer, take(pong)); } static void handle_ping_reply(struct peer *peer, const u8 *msg) { u8 *ignored; size_t i; /* We print this out because we asked for pong, so can't spam us... */ if (!fromwire_pong(msg, msg, &ignored)) status_peer_unusual(&peer->id, "Got malformed ping reply %s", tal_hex(tmpctx, msg)); /* We print this because dev versions of Core Lightning embed * version here: see check_ping_make_pong! */ for (i = 0; i < tal_count(ignored); i++) { if (ignored[i] < ' ' || ignored[i] == 127) break; } status_debug("Got pong %zu bytes (%.*s...)", tal_count(ignored), (int)i, (char *)ignored); daemon_conn_send(peer->daemon->master, take(towire_connectd_ping_done(NULL, peer->ping_reqid, true, tal_bytelen(msg)))); } static void handle_pong_in(struct peer *peer, const u8 *msg) { switch (peer->expecting_pong) { case PONG_EXPECTED_COMMAND: handle_ping_reply(peer, msg); /* fall thru */ case PONG_EXPECTED_PROBING: peer->expecting_pong = PONG_UNEXPECTED; daemon_conn_send(peer->daemon->master, take(towire_connectd_ping_latency(NULL, &peer->id, time_to_nsec(timemono_since(peer->ping_start))))); return; case PONG_UNEXPECTED: status_debug("Unexpected pong?"); return; } abort(); } /* Various cases where we don't send the msg to a gossipd, we want to * do IO logging! */ static void log_peer_io(const struct peer *peer, const u8 *msg) { status_peer_io(LOG_IO_IN, &peer->id, msg); io_wake(peer->peer_outq); } /* Forward to gossipd */ static void handle_gossip_in(struct peer *peer, const u8 *msg) { u8 *gmsg; /* We warn at 250000, drop at 500000 */ if (daemon_conn_queue_length(peer->daemon->gossipd) > 500000) return; gmsg = towire_gossipd_recv_gossip(NULL, &peer->id, msg); /* gossipd doesn't log IO, so we log it here. */ log_peer_io(peer, msg); daemon_conn_send(peer->daemon->gossipd, take(gmsg)); } static void handle_gossip_timestamp_filter_in(struct peer *peer, const u8 *msg) { struct bitcoin_blkid chain_hash; u32 first_timestamp, timestamp_range; struct gossmap *gossmap = get_gossmap(peer->daemon); if (!fromwire_gossip_timestamp_filter(msg, &chain_hash, &first_timestamp, ×tamp_range)) { send_warning(peer, "gossip_timestamp_filter invalid: %s", tal_hex(tmpctx, msg)); return; } if (!bitcoin_blkid_eq(&chainparams->genesis_blockhash, &chain_hash)) { send_warning(peer, "gossip_timestamp_filter for bad chain: %s", tal_hex(tmpctx, msg)); return; } peer->gs.timestamp_min = first_timestamp; peer->gs.timestamp_max = first_timestamp + timestamp_range - 1; /* Make sure we never leave it on an impossible value. */ if (peer->gs.timestamp_max < peer->gs.timestamp_min) peer->gs.timestamp_max = UINT32_MAX; /* BOLT-gossip-filter-simplify #7: * The receiver: *... * - if `first_timestamp` is 0: * - SHOULD send all known gossip messages. * - otherwise, if `first_timestamp` is 0xFFFFFFFF: * - SHOULD NOT send any gossip messages (except its own). * - otherwise: * - SHOULD send gossip messages it receives from now own. */ /* For us, this means we only sweep the gossip store for messages * if the first_timestamp is 0 */ tal_free(peer->gs.iter); if (first_timestamp == 0) { peer->gs.iter = gossmap_iter_new(peer, gossmap); } else if (first_timestamp == 0xFFFFFFFF) { peer->gs.iter = gossmap_iter_new(peer, gossmap); gossmap_iter_end(gossmap, peer->gs.iter); } else { /* We are actually a bit nicer than the spec, and we include * "recent" gossip here. */ update_recent_timestamp(peer->daemon, gossmap); peer->gs.iter = gossmap_iter_dup(peer, peer->daemon->gossmap_iter_recent); } /* BOLT #7: * - MAY wait for the next outgoing gossip flush to send these. */ /* We send immediately the first time, after that we wait. */ if (!peer->gs.gossip_timer) wake_gossip(peer); } static bool find_custom_msg(const u16 *custom_msgs, u16 type) { for (size_t i = 0; i < tal_count(custom_msgs); i++) { if (custom_msgs[i] == type) return true; } return false; } static bool handle_custommsg(struct daemon *daemon, struct peer *peer, const u8 *msg) { enum peer_wire type = fromwire_peektype(msg); /* Messages we expect to handle ourselves. */ if (peer_wire_is_internal(type)) return false; /* We log it, since it's not going to a subdaemon */ status_peer_io(LOG_IO_IN, &peer->id, msg); /* Even unknown messages must be explicitly allowed */ if (type % 2 == 0 && !find_custom_msg(daemon->custom_msgs, type)) { send_warning(peer, "Invalid unknown even msg %s", tal_hex(msg, msg)); /* We "handled" it... */ return true; } /* The message is not part of the messages we know how to * handle. Assuming this is a custommsg, we just forward it to the * master. */ daemon_conn_send(daemon->master, take(towire_connectd_custommsg_in(NULL, &peer->id, msg))); return true; } void custommsg_completed(struct daemon *daemon, const u8 *msg) { struct node_id id; const struct peer *peer; if (!fromwire_connectd_custommsg_in_complete(msg, &id)) master_badmsg(WIRE_CONNECTD_CUSTOMMSG_IN_COMPLETE, msg); /* If it's still around, log it. */ peer = peer_htable_get(daemon->peers, &id); if (peer) { status_peer_debug(&peer->id, "custommsg processing finished"); log_peer_io(peer, msg); } } /* We handle pings and gossip messages. */ static bool handle_message_locally(struct peer *peer, const u8 *msg) { enum peer_wire type = fromwire_peektype(msg); /* We remember these so we don't rexmit them */ gossip_rcvd_filter_add(peer->gs.grf, msg); if (type == WIRE_GOSSIP_TIMESTAMP_FILTER) { log_peer_io(peer, msg); handle_gossip_timestamp_filter_in(peer, msg); return true; } else if (type == WIRE_PING) { log_peer_io(peer, msg); handle_ping_in(peer, msg); return true; } else if (type == WIRE_PONG) { log_peer_io(peer, msg); handle_pong_in(peer, msg); return true; } else if (type == WIRE_ONION_MESSAGE) { log_peer_io(peer, msg); handle_onion_message(peer->daemon, peer, msg); return true; } else if (type == WIRE_QUERY_CHANNEL_RANGE) { handle_query_channel_range(peer, msg); return true; } else if (type == WIRE_QUERY_SHORT_CHANNEL_IDS) { handle_query_short_channel_ids(peer, msg); return true; } else if (handle_custommsg(peer->daemon, peer, msg)) { return true; } /* Do we want to divert to gossipd? */ if (is_msg_for_gossipd(msg)) { handle_gossip_in(peer, msg); return true; } return false; } /* Move "channel_id" to temporary. */ static void move_channel_id_to_temp(struct subd *subd) { tal_free(subd->temporary_channel_id); subd->temporary_channel_id = tal_dup(subd, struct channel_id, &subd->channel_id); } /* Only works for open_channel2 and accept_channel2 */ static struct pubkey *extract_revocation_basepoint(const tal_t *ctx, const u8 *msg) { const u8 *cursor = msg; size_t max = tal_bytelen(msg); enum peer_wire t; struct pubkey pubkey; t = fromwire_u16(&cursor, &max); switch (t) { case WIRE_OPEN_CHANNEL2: /* BOLT #2: * 1. type: 64 (`open_channel2`) * 2. data: * * [`chain_hash`:`chain_hash`] * * [`channel_id`:`temporary_channel_id`] * * [`u32`:`funding_feerate_perkw`] * * [`u32`:`commitment_feerate_perkw`] * * [`u64`:`funding_satoshis`] * * [`u64`:`dust_limit_satoshis`] * * [`u64`:`max_htlc_value_in_flight_msat`] * * [`u64`:`htlc_minimum_msat`] * * [`u16`:`to_self_delay`] * * [`u16`:`max_accepted_htlcs`] * * [`u32`:`locktime`] * * [`point`:`funding_pubkey`] * * [`point`:`revocation_basepoint`] */ fromwire_pad(&cursor, &max, sizeof(struct bitcoin_blkid) + sizeof(struct channel_id) + sizeof(u32) + sizeof(u32) + sizeof(u64) + sizeof(u64) + sizeof(u64) + sizeof(u64) + sizeof(u16) + sizeof(u16) + sizeof(u32) + PUBKEY_CMPR_LEN); break; case WIRE_ACCEPT_CHANNEL2: /* BOLT #2: * 1. type: 65 (`accept_channel2`) * 2. data: * * [`channel_id`:`temporary_channel_id`] * * [`u64`:`funding_satoshis`] * * [`u64`:`dust_limit_satoshis`] * * [`u64`:`max_htlc_value_in_flight_msat`] * * [`u64`:`htlc_minimum_msat`] * * [`u32`:`minimum_depth`] * * [`u16`:`to_self_delay`] * * [`u16`:`max_accepted_htlcs`] * * [`point`:`funding_pubkey`] * * [`point`:`revocation_basepoint`] */ fromwire_pad(&cursor, &max, sizeof(struct channel_id) + sizeof(u64) + sizeof(u64) + sizeof(u64) + sizeof(u64) + sizeof(u32) + sizeof(u16) + sizeof(u16) + PUBKEY_CMPR_LEN); break; default: abort(); } fromwire_pubkey(&cursor, &max, &pubkey); if (!cursor) return NULL; return tal_dup(ctx, struct pubkey, &pubkey); } /* Only works for funding_created */ static bool extract_funding_created_funding(const u8 *funding_created, struct bitcoin_outpoint *outp) { const u8 *cursor = funding_created; size_t max = tal_bytelen(funding_created); enum peer_wire t; t = fromwire_u16(&cursor, &max); switch (t) { case WIRE_FUNDING_CREATED: /* BOLT #2: * 1. type: 34 (`funding_created`) * 2. data: * * [`32*byte`:`temporary_channel_id`] * * [`sha256`:`funding_txid`] * * [`u16`:`funding_output_index`] */ fromwire_pad(&cursor, &max, 32); fromwire_bitcoin_txid(&cursor, &max, &outp->txid); outp->n = fromwire_u16(&cursor, &max); break; default: abort(); } return cursor != NULL; } static void update_v1_channelid(struct subd *subd, const u8 *funding_created) { struct bitcoin_outpoint outp; if (!extract_funding_created_funding(funding_created, &outp)) { status_peer_unusual(&subd->peer->id, "WARNING: funding_created no tx info?"); return; } move_channel_id_to_temp(subd); derive_channel_id(&subd->channel_id, &outp); } static void update_v2_channelid(struct subd *subd, const u8 *accept_channel2) { struct pubkey *acc_basepoint; acc_basepoint = extract_revocation_basepoint(tmpctx, accept_channel2); if (!acc_basepoint) { status_peer_unusual(&subd->peer->id, "WARNING: accept_channel2 no revocation_basepoint?"); return; } if (!subd->opener_revocation_basepoint) { status_peer_unusual(&subd->peer->id, "WARNING: accept_channel2 without open_channel2?"); return; } move_channel_id_to_temp(subd); derive_channel_id_v2(&subd->channel_id, subd->opener_revocation_basepoint, acc_basepoint); } /* We maintain channel_id matching for subds by snooping: we set it manually * for first packet (open_channel or open_channel2). */ static void maybe_update_channelid(struct subd *subd, const u8 *msg) { switch (fromwire_peektype(msg)) { case WIRE_OPEN_CHANNEL: extract_channel_id(msg, &subd->channel_id); break; case WIRE_OPEN_CHANNEL2: subd->opener_revocation_basepoint = extract_revocation_basepoint(subd, msg); break; case WIRE_ACCEPT_CHANNEL2: update_v2_channelid(subd, msg); break; case WIRE_FUNDING_CREATED: update_v1_channelid(subd, msg); break; } } static const u8 *next_msg_for_peer(struct peer *peer) { const u8 *msg; msg = msg_dequeue(peer->peer_outq); if (msg) { if (is_urgent(fromwire_peektype(msg))) { peer->peer_out_urgent--; } } else { /* Nothing in queue means nothing urgent in queue, surely! */ assert(peer->peer_out_urgent == 0); /* Draining? Don't send gossip. */ if (peer->draining_state == WRITING_TO_PEER) return NULL; /* If they want us to send gossip, do so now. */ msg = maybe_gossip_msg(NULL, peer); if (!msg) return NULL; } /* dev_disconnect can disable writes (discard everything) */ if (peer->dev_writes_enabled) { if (*peer->dev_writes_enabled == 0) { return tal_free(msg); } (*peer->dev_writes_enabled)--; } return msg; } static void nonurgent_flush(struct peer *peer) { peer->nonurgent_flush_timer = NULL; peer->flushing_nonurgent = true; io_wake(peer->peer_outq); } static struct io_plan *write_to_peer(struct io_conn *peer_conn, struct peer *peer) { bool do_flush; assert(peer->to_peer == peer_conn); /* We always pad and send if we have an urgent msg or our * non-urgent has gone off, or we're trying to close. */ do_flush = (peer->peer_out_urgent != 0 || peer->flushing_nonurgent || peer->draining_state == WRITING_TO_PEER); peer->flushing_nonurgent = false; /* We wrote out some bytes from membuf. */ membuf_consume(&peer->encrypted_peer_out, peer->encrypted_peer_out_sent); peer->encrypted_peer_out_sent = 0; while (!have_full_encrypted_queue(peer)) { const u8 *msg; struct io_plan *dev_override; /* Pop tail of send queue (or gossip) */ msg = next_msg_for_peer(peer); if (!msg) { /* Draining? Shutdown socket (to avoid losing msgs) */ if (have_empty_encrypted_queue(peer) && peer->draining_state == WRITING_TO_PEER) { status_peer_debug(&peer->id, "draining done, shutting down"); io_wake(&peer->peer_in); return io_sock_shutdown(peer_conn); } /* If no urgent message, and not draining, we wait. */ if (!do_flush) { /* Tell them to read again, */ io_wake(&peer->subds); io_wake(&peer->peer_in); /* Set up a timer if not already set */ if (!have_empty_encrypted_queue(peer) && !peer->nonurgent_flush_timer) { /* Bias towards larger values, but don't be too predictable */ u32 max = pseudorand(1000); u32 msec = 1000 - pseudorand(1 + max); peer->nonurgent_flush_timer = new_reltimer(&peer->daemon->timers, peer, time_from_msec(msec), nonurgent_flush, peer); } /* Wait for them to wake us */ return msg_queue_wait(peer_conn, peer->peer_outq, write_to_peer, peer); } /* OK, add padding. */ pad_encrypted_queue(peer); } else { if (peer->draining_state == WRITING_TO_PEER) status_peer_debug(&peer->id, "draining, but sending %s.", peer_wire_name(fromwire_peektype(msg))); dev_override = msg_out_dev_disconnect(peer, msg); if (dev_override) { tal_free(msg); return dev_override; } if (!encrypt_append(peer, take(msg))) return io_close(peer->to_peer); } } peer->nonurgent_flush_timer = tal_free(peer->nonurgent_flush_timer); return write_encrypted_to_peer(peer); } static struct io_plan *read_from_subd(struct io_conn *subd_conn, struct subd *subd); static struct io_plan *read_from_subd_done(struct io_conn *subd_conn, struct subd *subd) { maybe_update_channelid(subd, subd->in); /* Tell them to encrypt & write. */ msg_to_peer_outq(subd->peer, take(subd->in)); subd->in = NULL; /* Wait for them to wake us */ return io_wait(subd_conn, &subd->peer->subds, read_from_subd, subd); } static struct io_plan *read_from_subd(struct io_conn *subd_conn, struct subd *subd) { return io_read_wire(subd_conn, subd, &subd->in, read_from_subd_done, subd); } /* These four function handle peer->subd */ static struct io_plan *write_to_subd(struct io_conn *subd_conn, struct subd *subd) { const u8 *msg; assert(subd->conn == subd_conn); /* Pop tail of send queue */ msg = msg_dequeue(subd->outq); /* Nothing to send? */ if (!msg) { /* If peer is closed, close this. */ if (!subd->peer->to_peer) return io_close(subd_conn); /* Tell them to read again. */ io_wake(&subd->peer->peer_in); if (subd->peer->peer_in_lastmsg != -1) { u64 msec = time_to_msec(timemono_between(time_mono(), subd->peer->peer_in_lasttime)); if (msec > 5000 && !subd->peer->daemon->dev_lightningd_is_slow) status_peer_broken(&subd->peer->id, "wake delay for %s: %"PRIu64"msec", peer_wire_name(subd->peer->peer_in_lastmsg), msec); subd->peer->peer_in_lastmsg = -1; } /* Wait for them to wake us */ return msg_queue_wait(subd_conn, subd->outq, write_to_subd, subd); } return io_write_wire(subd_conn, take(msg), write_to_subd, subd); } static void destroy_connected_subd(struct subd *subd) { struct peer *peer = subd->peer; size_t pos; for (pos = 0; peer->subds[pos] != subd; pos++) assert(pos < tal_count(peer->subds)); tal_arr_remove(&peer->subds, pos); /* Make sure we try to keep reading from peer (might * have been waiting for write_to_subd) */ io_wake(&peer->peer_in); if (tal_count(peer->subds) == 0) { if (!peer->to_peer) { /* Nothing left */ tal_free(peer); } else if (peer->draining_state == READING_FROM_SUBDS) { /* We've finished draining subds, start draining peer */ drain_peer(peer); } } } static struct subd *new_subd(struct peer *peer, const struct channel_id *channel_id) { struct subd *subd; subd = tal(peer, struct subd); subd->peer = peer; subd->outq = msg_queue_new(subd, false); subd->channel_id = *channel_id; subd->temporary_channel_id = NULL; subd->opener_revocation_basepoint = NULL; subd->conn = NULL; subd->rcvd_tx_abort = false; /* Connect it to the peer */ tal_arr_expand(&peer->subds, subd); return subd; } static struct io_plan *close_peer_dev_disconnect(struct io_conn *peer_conn, struct peer *peer) { assert(peer->to_peer == peer_conn); return io_close_cb(peer_conn, peer); } static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn, struct peer *peer); static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, struct peer *peer) { u8 *decrypted; struct channel_id channel_id; struct subd *subd; enum peer_wire type; struct io_plan *(*next_read)(struct io_conn *peer_conn, struct peer *peer) = read_hdr_from_peer; decrypted = cryptomsg_decrypt_body(tmpctx, &peer->cs, peer->peer_in); if (!decrypted) { status_peer_debug(&peer->id, "Bad encrypted packet len %zu", tal_bytelen(peer->peer_in)); return io_close(peer_conn); } tal_free(peer->peer_in); type = fromwire_peektype(decrypted); /* dev_disconnect can disable read */ if (!peer->dev_read_enabled) return read_hdr_from_peer(peer_conn, peer); switch (dev_disconnect_in(&peer->id, type)) { case DEV_DISCONNECT_IN_NORMAL: break; case DEV_DISCONNECT_IN_AFTER_RECV: next_read = close_peer_dev_disconnect; break; } /* We got something! */ peer->last_recv_time = time_mono(); /* Don't process packets while we're closing */ if (peer->draining_state != NOT_DRAINING) return next_read(peer_conn, peer); /* If we swallow this, just try again. */ if (handle_message_locally(peer, decrypted)) { /* Make sure to update peer->peer_in_lastmsg so we blame correct msg! */ goto out; } /* After this we should be able to match to subd by channel_id */ if (!extract_channel_id(decrypted, &channel_id)) { /* We won't log this anywhere else, so do it here. */ status_peer_io(LOG_IO_IN, &peer->id, decrypted); /* Could be a all-channel error or warning? Log it * more verbose: hang up on error. */ if (type == WIRE_ERROR || type == WIRE_WARNING) { char *desc = sanitize_error(tmpctx, decrypted, NULL); /* FIXME: We should check old gossip, since we get many of * these "no unspent txout" for closed channels which we * somehow missed. */ if (strstr(desc, "no unspent txout")) status_peer_debug(&peer->id, "Received %s: %s", peer_wire_name(type), desc); else status_peer_info(&peer->id, "Received %s: %s", peer_wire_name(type), desc); if (type == WIRE_WARNING) return next_read(peer_conn, peer); return io_close(peer_conn); } /* This sets final_msg: will close after sending warning */ send_warning(peer, "Unexpected message %s: %s", peer_wire_name(type), tal_hex(tmpctx, decrypted)); return next_read(peer_conn, peer); } /* If we don't find a subdaemon for this, create a new one. */ subd = find_subd(peer, &channel_id); if (!subd) { enum peer_wire t = fromwire_peektype(decrypted); /* Simplest to close on them at this point. */ if (peer->daemon->shutting_down) { status_peer_debug(&peer->id, "Shutting down: hanging up for %s", peer_wire_name(t)); return io_close(peer_conn); } status_peer_debug(&peer->id, "Activating for message %s", peer_wire_name(t)); subd = new_subd(peer, &channel_id); /* We tell lightningd to fire up a subdaemon to handle this! */ daemon_conn_send(peer->daemon->master, take(towire_connectd_peer_spoke(NULL, &peer->id, peer->counter, t, &channel_id, is_peer_error(tmpctx, decrypted)))); } /* Even if we just created it, call this to catch open_channel2 */ maybe_update_channelid(subd, decrypted); /* Tell them to write. */ msg_enqueue(subd->outq, take(decrypted)); /* Is this a tx_abort? Ignore from now on, and close after sending! */ if (type == WIRE_TX_ABORT) { subd->rcvd_tx_abort = true; /* In case it doesn't close by itself */ notleak(new_reltimer(&peer->daemon->timers, subd, time_from_sec(5), close_subd_timeout, subd)); } /* Wait for them to wake us */ peer->peer_in_lastmsg = type; out: peer->peer_in_lasttime = time_mono(); return io_wait(peer_conn, &peer->peer_in, next_read, peer); } static struct io_plan *read_body_from_peer(struct io_conn *peer_conn, struct peer *peer) { u16 len; if (!cryptomsg_decrypt_header(&peer->cs, peer->peer_in, &len)) return io_close(peer_conn); tal_resize(&peer->peer_in, (u32)len + CRYPTOMSG_BODY_OVERHEAD); return io_read(peer_conn, peer->peer_in, tal_count(peer->peer_in), read_body_from_peer_done, peer); } static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn, struct peer *peer) { assert(peer->to_peer == peer_conn); /* BOLT #8: * * ### Receiving and Decrypting Messages * * In order to decrypt the _next_ message in the network * stream, the following steps are completed: * * 1. Read _exactly_ 18 bytes from the network buffer. */ peer->peer_in = tal_arr(peer, u8, CRYPTOMSG_HDR_SIZE); return io_read(peer_conn, peer->peer_in, CRYPTOMSG_HDR_SIZE, read_body_from_peer, peer); } static struct io_plan *subd_conn_init(struct io_conn *subd_conn, struct subd *subd) { subd->conn = subd_conn; /* subd is a child of the conn: free when it closes! */ tal_steal(subd->conn, subd); tal_add_destructor(subd, destroy_connected_subd); return io_duplex(subd_conn, read_from_subd(subd_conn, subd), write_to_subd(subd_conn, subd)); } /* Peer disconnected (we remove this if *we* close). */ static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer) { assert(peer->to_peer == peer_conn); /* We are no longer connected. Tell lightningd & gossipd */ peer->to_peer = NULL; send_disconnected(peer->daemon, &peer->id, peer->counter, peer->connect_starttime); /* Wake subds: give them 5 seconds to flush. */ for (size_t i = 0; i < tal_count(peer->subds); i++) { /* Might not be connected yet (no destructor, simple) */ if (!peer->subds[i]->conn) { tal_arr_remove(&peer->subds, i); i--; continue; } /* Wake the writers to subd: you have 5 seconds */ notleak(new_reltimer(&peer->daemon->timers, peer->subds[i], time_from_sec(5), close_subd_timeout, peer->subds[i])); io_wake(peer->subds[i]->outq); } /* If there were no subds, free peer immediately. */ if (tal_count(peer->subds) == 0) tal_free(peer); } /* When peer reconnects, we close the old connection unceremoniously. */ void destroy_peer_immediately(struct peer *peer) { /* Forgo normal destructors which involve timeouts */ if (peer->to_peer) tal_del_destructor2(peer->to_peer, destroy_peer_conn, peer); for (size_t i = 0; i < tal_count(peer->subds); i++) { if (peer->subds[i]->conn) tal_del_destructor(peer->subds[i], destroy_connected_subd); } tal_free(peer); } struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn, struct peer *peer) { /*~ If conn closes, we drain the subd connections and wait for * lightningd to tell us to close with the peer */ tal_add_destructor2(peer_conn, destroy_peer_conn, peer); /* Start keepalives */ peer->expecting_pong = PONG_UNEXPECTED; set_ping_timer(peer); /* This used to be in openingd; don't break tests. */ status_peer_debug(&peer->id, "Handed peer, entering loop"); return io_duplex(peer_conn, read_hdr_from_peer(peer_conn, peer), write_to_peer(peer_conn, peer)); } void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd) { struct node_id id; u64 counter; struct peer *peer; struct channel_id channel_id; struct subd *subd; if (!fromwire_connectd_peer_connect_subd(msg, &id, &counter, &channel_id)) master_badmsg(WIRE_CONNECTD_PEER_CONNECT_SUBD, msg); /* If receiving fd failed, fd will be -1. Log and ignore * (subd will see immediate hangup). */ if (fd == -1) { static bool recvfd_logged = false; if (!recvfd_logged) { status_broken("receiving lightningd fd failed for %s: %s", fmt_node_id(tmpctx, &id), strerror(errno)); recvfd_logged = true; } /* Maybe free up some fds by closing something. */ close_random_connection(daemon); return; } /* Races can happen: this might be gone by now (or reconnected!). */ peer = peer_htable_get(daemon->peers, &id); if (!peer || peer->counter != counter) { close(fd); return; } /* Could be disconnecting now */ if (!peer->to_peer || peer->draining_state != NOT_DRAINING) { close(fd); return; } /* If peer said something, we created this and queued msg. */ subd = find_subd(peer, &channel_id); if (!subd) { subd = new_subd(peer, &channel_id); /* Implies lightningd is ready for another peer. */ release_one_waiting_connection(peer->daemon, tal_fmt(tmpctx, "%s given a subd", fmt_node_id(tmpctx, &id))); } assert(!subd->conn); /* This sets subd->conn inside subd_conn_init, and reparents subd! */ io_new_conn(peer, fd, subd_conn_init, subd); } /* Lightningd says to send a ping */ void send_manual_ping(struct daemon *daemon, const u8 *msg) { u8 *ping; struct node_id id; u64 reqid; u16 len, num_pong_bytes; struct peer *peer; if (!fromwire_connectd_ping(msg, &reqid, &id, &num_pong_bytes, &len)) master_badmsg(WIRE_CONNECTD_PING, msg); peer = peer_htable_get(daemon->peers, &id); if (!peer) { daemon_conn_send(daemon->master, take(towire_connectd_ping_done(NULL, reqid, false, 0))); return; } /* We're not supposed to send another ping until previous replied */ if (peer->expecting_pong != PONG_UNEXPECTED) { daemon_conn_send(daemon->master, take(towire_connectd_ping_done(NULL, reqid, false, 0))); return; } /* It should never ask for an oversize ping. */ ping = make_ping(NULL, num_pong_bytes, len); if (tal_count(ping) > 65535) status_failed(STATUS_FAIL_MASTER_IO, "Oversize ping"); peer->ping_start = time_mono(); inject_peer_msg(peer, take(ping)); status_debug("sending ping expecting %sresponse", num_pong_bytes >= 65532 ? "no " : ""); /* BOLT #1: * * A node receiving a `ping` message: * - if `num_pong_bytes` is less than 65532: * - MUST respond by sending a `pong` message, with `byteslen` equal * to `num_pong_bytes`. * - otherwise (`num_pong_bytes` is **not** less than 65532): * - MUST ignore the `ping`. */ if (num_pong_bytes >= 65532) { daemon_conn_send(daemon->master, take(towire_connectd_ping_done(NULL, reqid, true, 0))); return; } /* We'll respond to lightningd once the pong comes in */ peer->expecting_pong = PONG_EXPECTED_COMMAND; peer->ping_reqid = reqid; /* Since we're doing this manually, kill and restart timer. */ tal_free(peer->ping_timer); set_ping_timer(peer); }