This is exactly what membuf is for: it handles expansion much more neatly. Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
1733 lines · 49 KiB · C
/*~ This contains all the code to shuffle data between the socket to the
 * peer itself, and the subdaemons. */
|
|
#include "config.h"
|
|
#include <ccan/tal/str/str.h>
|
|
#include <common/cryptomsg.h>
|
|
#include <common/daemon_conn.h>
|
|
#include <common/dev_disconnect.h>
|
|
#include <common/gossmap.h>
|
|
#include <common/memleak.h>
|
|
#include <common/ping.h>
|
|
#include <common/status.h>
|
|
#include <common/timeout.h>
|
|
#include <common/utils.h>
|
|
#include <common/wire_error.h>
|
|
#include <connectd/connectd.h>
|
|
#include <connectd/connectd_gossipd_wiregen.h>
|
|
#include <connectd/connectd_wiregen.h>
|
|
#include <connectd/gossip_rcvd_filter.h>
|
|
#include <connectd/multiplex.h>
|
|
#include <connectd/onion_message.h>
|
|
#include <connectd/queries.h>
|
|
#include <errno.h>
|
|
#include <inttypes.h>
|
|
#include <netinet/in.h>
|
|
#include <wire/peer_wire.h>
|
|
#include <wire/wire_io.h>
|
|
|
|
/* Size of write(), to create uniform size packets. */
|
|
#define UNIFORM_MESSAGE_SIZE 1460
|
|
|
|
/* Per-channel subdaemon attached to this peer: each channel is serviced
 * by its own subdaemon, multiplexed over the single peer connection and
 * demultiplexed by channel_id. */
struct subd {
	/* Owner: we are in peer->subds[] */
	struct peer *peer;

	/* The temporary or permanent channel_id */
	struct channel_id channel_id;

	/* In passing, we can have a temporary one, too. */
	struct channel_id *temporary_channel_id;

	/* The opening revocation basepoint, for v2 channel_id. */
	struct pubkey *opener_revocation_basepoint;

	/* The actual connection to talk to it (NULL if it's not connected yet) */
	struct io_conn *conn;

	/* Input buffer */
	u8 *in;

	/* Output buffer */
	struct msg_queue *outq;

	/* After we've told it to tx_abort, we don't send anything else. */
	bool rcvd_tx_abort;
};
|
|
|
|
/* FIXME: reorder! */
|
|
static bool is_urgent(enum peer_wire type);
|
|
static void destroy_connected_subd(struct subd *subd);
|
|
static struct io_plan *write_to_peer(struct io_conn *peer_conn,
|
|
struct peer *peer);
|
|
|
|
static struct subd *find_subd(struct peer *peer,
|
|
const struct channel_id *channel_id)
|
|
{
|
|
for (size_t i = 0; i < tal_count(peer->subds); i++) {
|
|
struct subd *subd = peer->subds[i];
|
|
|
|
/* Once we sent it tx_abort, we pretend it doesn't exist */
|
|
if (subd->rcvd_tx_abort)
|
|
continue;
|
|
|
|
/* Once we see a message using the real channel_id, we
|
|
* clear the temporary_channel_id */
|
|
if (channel_id_eq(&subd->channel_id, channel_id)) {
|
|
subd->temporary_channel_id
|
|
= tal_free(subd->temporary_channel_id);
|
|
return subd;
|
|
}
|
|
if (subd->temporary_channel_id
|
|
&& channel_id_eq(subd->temporary_channel_id, channel_id)) {
|
|
return subd;
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
/* We try to send the final messages, but if buffer is full and they're
|
|
* not reading, we have to give up. */
|
|
static void close_peer_io_timeout(struct peer *peer)
|
|
{
|
|
status_peer_unusual(&peer->id, CI_UNEXPECTED "Peer did not close, forcing close");
|
|
io_close(peer->to_peer);
|
|
}
|
|
|
|
static void close_subd_timeout(struct subd *subd)
|
|
{
|
|
status_peer_broken(&subd->peer->id, "Subd did not close, forcing close");
|
|
io_close(subd->conn);
|
|
}
|
|
|
|
/* Enqueue @msg for the peer, counting urgent messages so the write path
 * knows not to hold them back for batching. */
static void msg_to_peer_outq(struct peer *peer, const u8 *msg TAKES)
{
	const enum peer_wire type = fromwire_peektype(msg);

	if (is_urgent(type))
		peer->peer_out_urgent++;

	msg_enqueue(peer->peer_outq, msg);
}
|
|
|
|
/* Public entry: log outgoing @msg then queue it for the peer.
 * Log first: the queue may consume (free) a taken msg. */
void inject_peer_msg(struct peer *peer, const u8 *msg TAKES)
{
	status_peer_io(LOG_IO_OUT, &peer->id, msg);
	msg_to_peer_outq(peer, msg);
}
|
|
|
|
/* All subds are gone: flush remaining output to the peer, with a hard
 * 5-second deadline before we force the socket closed. */
static void drain_peer(struct peer *peer)
{
	/* Caller must have detached every subd first. */
	assert(tal_count(peer->subds) == 0);

	/* You have five seconds to drain. */
	peer->draining_state = WRITING_TO_PEER;
	status_peer_debug(&peer->id, "disconnect_peer: draining with 5 second timer.");
	/* Timer is parented on to_peer so it dies with the conn; notleak()
	 * just silences the memleak detector. */
	notleak(new_reltimer(&peer->daemon->timers,
			     peer->to_peer, time_from_sec(5),
			     close_peer_io_timeout, peer));
	/* Kick the write side so queued output starts flowing. */
	io_wake(peer->peer_outq);

	/* We will discard what they send us, but listen so we catch closes */
	io_wake(&peer->peer_in);
}
|
|
|
|
/* Begin orderly disconnect: let connected subds finish (bounded by a
 * 5-second timer each), drop unconnected ones, then drain to the peer. */
void disconnect_peer(struct peer *peer)
{
	peer->draining_state = READING_FROM_SUBDS;

	for (size_t i = 0; i < tal_count(peer->subds); i++) {
		/* Start timer in case it doesn't close by itself */
		if (peer->subds[i]->conn) {
			status_peer_debug(&peer->id, "disconnect_peer: setting 5 second timer for subd %zu/%zu.",
					  i, tal_count(peer->subds));
			notleak(new_reltimer(&peer->daemon->timers, peer->subds[i],
					     time_from_sec(5),
					     close_subd_timeout, peer->subds[i]));
		} else {
			/* We told lightningd that peer spoke, but it hasn't returned yet. */
			/* Removal shifts the array down, so step back one
			 * to re-examine this index on the next iteration. */
			tal_arr_remove(&peer->subds, i);
			i--;
		}
	}

	if (tal_count(peer->subds) != 0) {
		status_peer_debug(&peer->id, "disconnect_peer: waking %zu subds.",
				  tal_count(peer->subds));
		/* Wake them up so we read again */
		io_wake(&peer->subds);
	} else {
		status_peer_debug(&peer->id, "disconnect_peer: no subds, draining now.");
		/* No subds left, start draining peer */
		drain_peer(peer);
	}
}
|
|
|
|
/* Tear down every subd immediately (no drain).  Ownership differs by
 * state: a connected subd is a tal child of its conn, so we free the
 * conn (removing the destructor first so it doesn't fire); an
 * unconnected one we free directly. */
static void free_all_subds(struct peer *peer)
{
	for (size_t i = 0; i < tal_count(peer->subds); i++) {
		/* Once conn exists, subd is a child of the conn. Free conn, free subd. */
		if (peer->subds[i]->conn) {
			tal_del_destructor(peer->subds[i], destroy_connected_subd);
			tal_free(peer->subds[i]->conn);
		} else {
			/* We told lightningd that peer spoke, but it hasn't returned yet. */
			tal_free(peer->subds[i]);
		}
	}
	/* All entries are now dangling; empty the array in one go. */
	tal_resize(&peer->subds, 0);
}
|
|
|
|
/* Send warning, close connection to peer.
 * @fmt/... format both the local log line and the wire warning message. */
static void send_warning(struct peer *peer, const char *fmt, ...)
{
	va_list ap;
	u8 *msg;

	/* va_list is consumed by use, so we start it twice: once for the
	 * log, once for the wire message. */
	va_start(ap, fmt);
	status_vfmt(LOG_UNUSUAL, &peer->id, fmt, ap);
	va_end(ap);

	va_start(ap, fmt);
	msg = towire_warningfmtv(NULL, NULL, fmt, ap);
	va_end(ap);

	inject_peer_msg(peer, take(msg));

	/* Free all the subds immediately */
	free_all_subds(peer);
	/* ...then drain the warning out to the peer. */
	disconnect_peer(peer);
}
|
|
|
|
/* Kicks off write_to_peer() to look for more gossip to send from store */
|
|
static void wake_gossip(struct peer *peer);
|
|
|
|
/* Create the periodic timer that re-arms gossip streaming to this peer. */
static struct oneshot *gossip_stream_timer(struct peer *peer)
{
	u32 next;

	/* BOLT #7:
	 *
	 * A node:
	 *...
	 *   - SHOULD flush outgoing gossip messages once every 60 seconds,
	 *     independently of the arrival times of the messages.
	 *     - Note: this results in staggered announcements that are unique
	 *       (not duplicated).
	 */
	/* We shorten this for dev_fast_gossip! */
	next = GOSSIP_FLUSH_INTERVAL(peer->daemon->dev_fast_gossip);

	/* Timer is owned by the peer: freed automatically on disconnect. */
	return new_reltimer(&peer->daemon->timers,
			    peer, time_from_sec(next),
			    wake_gossip, peer);
}
|
|
|
|
/* Statistically, how many peers to we tell about each channel? */
|
|
#define GOSSIP_SPAM_REDUNDANCY 50
|
|
|
|
/* BOLT #7:
|
|
* A node:
|
|
* - MUST NOT relay any gossip messages it did not generate itself,
|
|
* unless explicitly requested.
|
|
*/
|
|
/* i.e. the strong implication is that we spam our own gossip aggressively!
|
|
* "Look at me!" "Look at me!!!!".
|
|
*/
|
|
/* Push our own channel/node gossip at a freshly-connected peer.
 * With many peers and many channels we only send a random fraction of
 * channels to each peer, aiming at ~GOSSIP_SPAM_REDUNDANCY peers per
 * channel overall. */
static void spam_new_peer(struct peer *peer, struct gossmap *gossmap)
{
	struct gossmap_node *me;
	const u8 *msg;
	u64 send_threshold;

	/* Find ourselves; if no channels, nothing to send */
	me = gossmap_find_node(gossmap, &peer->daemon->id);
	if (!me)
		return;

	/* Default: threshold at u64 max, i.e. send everything. */
	send_threshold = -1ULL;

	/* Just in case we have many peers and not all are connecting or
	 * some other corner case, send everything to first few. */
	if (peer_htable_count(peer->daemon->peers) > GOSSIP_SPAM_REDUNDANCY
	    && me->num_chans > GOSSIP_SPAM_REDUNDANCY) {
		/* Scale threshold so each channel passes with probability
		 * ~REDUNDANCY/num_chans. */
		send_threshold = -1ULL / me->num_chans * GOSSIP_SPAM_REDUNDANCY;
	}

	for (size_t i = 0; i < me->num_chans; i++) {
		struct gossmap_chan *chan = gossmap_nth_chan(gossmap, me, i, NULL);

		/* We set this so we'll send a fraction of all our channels */
		if (pseudorand_u64() > send_threshold)
			continue;

		/* Send channel_announce */
		msg = gossmap_chan_get_announce(NULL, gossmap, chan);
		inject_peer_msg(peer, take(msg));

		/* Send both channel_updates (if they exist): both help people
		 * use our channel, so we care! */
		for (int dir = 0; dir < 2; dir++) {
			if (!gossmap_chan_set(chan, dir))
				continue;
			msg = gossmap_chan_get_update(NULL, gossmap, chan, dir);
			inject_peer_msg(peer, take(msg));
		}
	}

	/* If we have one, we should send our own node_announcement */
	msg = gossmap_node_get_announce(NULL, gossmap, me);
	if (msg)
		inject_peer_msg(peer, take(msg));
}
|
|
|
|
void setup_peer_gossip_store(struct peer *peer,
|
|
const struct feature_set *our_features,
|
|
const u8 *their_features)
|
|
{
|
|
struct gossmap *gossmap = get_gossmap(peer->daemon);
|
|
|
|
peer->gs.grf = new_gossip_rcvd_filter(peer);
|
|
peer->gs.iter = gossmap_iter_new(peer, gossmap);
|
|
peer->gs.bytes_this_second = 0;
|
|
peer->gs.bytes_start_time = time_mono();
|
|
|
|
/* BOLT #7:
|
|
*
|
|
* A node:
|
|
* - MUST NOT relay any gossip messages it did not generate itself,
|
|
* unless explicitly requested.
|
|
*/
|
|
peer->gs.gossip_timer = NULL;
|
|
peer->gs.active = false;
|
|
|
|
spam_new_peer(peer, gossmap);
|
|
return;
|
|
}
|
|
|
|
/* Should @type be written out promptly, rather than batched with other
 * traffic?  Exhaustive switch: adding a wire message forces a decision
 * here at compile time. */
static bool is_urgent(enum peer_wire type)
{
	switch (type) {
	/* We are happy to batch UPDATE_ADD messages: it's the
	 * commitment signed which matters. */
	case WIRE_UPDATE_ADD_HTLC:
	case WIRE_UPDATE_FULFILL_HTLC:
	case WIRE_UPDATE_FAIL_HTLC:
	case WIRE_UPDATE_FAIL_MALFORMED_HTLC:
	case WIRE_UPDATE_FEE:
	/* Gossip messages are also non-urgent */
	case WIRE_CHANNEL_ANNOUNCEMENT:
	case WIRE_NODE_ANNOUNCEMENT:
	case WIRE_CHANNEL_UPDATE:
		return false;

	/* We don't delay for anything else, but we use a switch
	 * statement to make you think about new cases! */
	case WIRE_INIT:
	case WIRE_ERROR:
	case WIRE_WARNING:
	case WIRE_TX_ADD_INPUT:
	case WIRE_TX_ADD_OUTPUT:
	case WIRE_TX_REMOVE_INPUT:
	case WIRE_TX_REMOVE_OUTPUT:
	case WIRE_TX_COMPLETE:
	case WIRE_TX_ABORT:
	case WIRE_TX_SIGNATURES:
	case WIRE_OPEN_CHANNEL:
	case WIRE_ACCEPT_CHANNEL:
	case WIRE_FUNDING_CREATED:
	case WIRE_FUNDING_SIGNED:
	case WIRE_CHANNEL_READY:
	case WIRE_OPEN_CHANNEL2:
	case WIRE_ACCEPT_CHANNEL2:
	case WIRE_TX_INIT_RBF:
	case WIRE_TX_ACK_RBF:
	case WIRE_SHUTDOWN:
	case WIRE_CLOSING_SIGNED:
	case WIRE_CLOSING_COMPLETE:
	case WIRE_CLOSING_SIG:
	case WIRE_UPDATE_BLOCKHEIGHT:
	case WIRE_CHANNEL_REESTABLISH:
	case WIRE_ANNOUNCEMENT_SIGNATURES:
	case WIRE_QUERY_SHORT_CHANNEL_IDS:
	case WIRE_REPLY_SHORT_CHANNEL_IDS_END:
	case WIRE_QUERY_CHANNEL_RANGE:
	case WIRE_REPLY_CHANNEL_RANGE:
	case WIRE_GOSSIP_TIMESTAMP_FILTER:
	case WIRE_ONION_MESSAGE:
	case WIRE_PEER_STORAGE:
	case WIRE_PEER_STORAGE_RETRIEVAL:
	case WIRE_STFU:
	case WIRE_SPLICE:
	case WIRE_SPLICE_ACK:
	case WIRE_SPLICE_LOCKED:
	case WIRE_PING:
	case WIRE_PONG:
	case WIRE_PROTOCOL_BATCH_ELEMENT:
	case WIRE_START_BATCH:
	case WIRE_COMMITMENT_SIGNED:
	case WIRE_REVOKE_AND_ACK:
		return true;
	};

	/* plugins can inject other messages. */
	return true;
}
|
|
|
|
/* Process and eat protocol_batch_element messages, encrypt each element message
|
|
* and return the encrypted messages as one long byte array. */
|
|
static u8 *process_batch_elements(const tal_t *ctx, struct peer *peer, const u8 *msg TAKES)
|
|
{
|
|
u8 *ret = tal_arr(ctx, u8, 0);
|
|
size_t ret_size = 0;
|
|
const u8 *cursor = msg;
|
|
size_t plen = tal_count(msg);
|
|
|
|
status_debug("Processing batch elements of %zu bytes. %s", plen,
|
|
tal_hex(tmpctx, msg));
|
|
|
|
do {
|
|
u8 *element_bytes;
|
|
u16 element_size;
|
|
struct channel_id channel_id;
|
|
u8 *enc_msg;
|
|
|
|
if (fromwire_u16(&cursor, &plen) != WIRE_PROTOCOL_BATCH_ELEMENT) {
|
|
status_broken("process_batch_elements on msg that is"
|
|
" not WIRE_PROTOCOL_BATCH_ELEMENT. %s",
|
|
tal_hexstr(tmpctx, cursor, plen));
|
|
return tal_free(ret);
|
|
}
|
|
|
|
fromwire_channel_id(&cursor, &plen, &channel_id);
|
|
|
|
element_size = fromwire_u16(&cursor, &plen);
|
|
if (!element_size) {
|
|
status_broken("process_batch_elements cannot have zero"
|
|
" length elements. %s",
|
|
tal_hexstr(tmpctx, cursor, plen));
|
|
return tal_free(ret);
|
|
}
|
|
|
|
element_bytes = fromwire_tal_arrn(NULL, &cursor, &plen,
|
|
element_size);
|
|
if (!element_bytes) {
|
|
status_broken("process_batch_elements fromwire_tal_arrn"
|
|
" %s",
|
|
tal_hexstr(tmpctx, cursor, plen));
|
|
return tal_free(ret);
|
|
}
|
|
|
|
status_debug("Processing batch extracted item %s. %s",
|
|
peer_wire_name(fromwire_peektype(element_bytes)),
|
|
tal_hex(tmpctx, element_bytes));
|
|
|
|
enc_msg = cryptomsg_encrypt_msg(tmpctx, &peer->cs,
|
|
take(element_bytes));
|
|
|
|
tal_resize(&ret, ret_size + tal_bytelen(enc_msg));
|
|
memcpy(&ret[ret_size], enc_msg, tal_bytelen(enc_msg));
|
|
ret_size += tal_bytelen(enc_msg);
|
|
|
|
} while(plen);
|
|
|
|
if (taken(msg))
|
|
tal_free(msg);
|
|
|
|
return ret;
|
|
}
|
|
|
|
/* --dev-disconnect can do magic things: if this returns non-NULL,
   free msg and do that */
static struct io_plan *msg_out_dev_disconnect(struct peer *peer, const u8 *msg)
{
	int type = fromwire_peektype(msg);

	switch (dev_disconnect_out(&peer->id, type)) {
	case DEV_DISCONNECT_OUT_BEFORE:
		/* Drop connection before this message goes out. */
		return io_close(peer->to_peer);
	case DEV_DISCONNECT_OUT_AFTER:
		/* Disallow reads from now on */
		peer->dev_read_enabled = false;
		free_all_subds(peer);
		/* Send this last message, then drain and close. */
		drain_peer(peer);
		return NULL;
	case DEV_DISCONNECT_OUT_BLACKHOLE:
		/* Disable both reads and writes from now on */
		peer->dev_read_enabled = false;
		/* talz: zero remaining write budget. */
		peer->dev_writes_enabled = talz(peer, u32);
		return NULL;
	case DEV_DISCONNECT_OUT_NORMAL:
		return NULL;
	case DEV_DISCONNECT_OUT_DROP:
		/* Tell them to read again. */
		io_wake(&peer->subds);
		/* Skip this message: go back to waiting on the queue. */
		return msg_queue_wait(peer->to_peer, peer->peer_outq,
				      write_to_peer, peer);
	case DEV_DISCONNECT_OUT_DISABLE_AFTER:
		/* Allow exactly one more write, then stop. */
		peer->dev_read_enabled = false;
		peer->dev_writes_enabled = tal(peer, u32);
		*peer->dev_writes_enabled = 1;
		return NULL;
	}
	abort();
}
|
|
|
|
/* Do we have enough bytes without padding? */
|
|
static bool have_full_encrypted_queue(const struct peer *peer)
|
|
{
|
|
return membuf_num_elems(&peer->encrypted_peer_out) >= UNIFORM_MESSAGE_SIZE;
|
|
}
|
|
|
|
/* Do we have nothing in queue? */
|
|
static bool have_empty_encrypted_queue(const struct peer *peer)
|
|
{
|
|
return membuf_num_elems(&peer->encrypted_peer_out) == 0;
|
|
}
|
|
|
|
/* (Continue) writing the encrypted_peer_out array */
static struct io_plan *write_encrypted_to_peer(struct peer *peer)
{
	/* We only ever write whole uniform-sized packets. */
	assert(membuf_num_elems(&peer->encrypted_peer_out) >= UNIFORM_MESSAGE_SIZE);
	/* Partial write: encrypted_peer_out_sent records progress;
	 * write_to_peer resumes when the write completes. */
	return io_write_partial(peer->to_peer,
				membuf_elems(&peer->encrypted_peer_out),
				UNIFORM_MESSAGE_SIZE,
				&peer->encrypted_peer_out_sent,
				write_to_peer, peer);
}
|
|
|
|
/* Encrypt @msg and append to the outgoing membuf.
 * Returns false on malformed batch input; caller should close the
 * connection if this fails. */
static bool encrypt_append(struct peer *peer, const u8 *msg TAKES)
{
	int type = fromwire_peektype(msg);
	const u8 *enc;
	size_t enclen;

	/* Special message type directing us to process batch items. */
	if (type == WIRE_PROTOCOL_BATCH_ELEMENT) {
		/* Expands into one encrypted frame per batch element. */
		enc = process_batch_elements(tmpctx, peer, msg);
		if (!enc)
			return false;
	} else {
		enc = cryptomsg_encrypt_msg(tmpctx, &peer->cs, msg);
	}
	enclen = tal_bytelen(enc);
	/* membuf_add grows the buffer and returns the write position. */
	memcpy(membuf_add(&peer->encrypted_peer_out, enclen),
	       enc,
	       enclen);
	return true;
}
|
|
|
|
/* Pad the outgoing buffer up to a multiple of UNIFORM_MESSAGE_SIZE by
 * appending a ping nobody will answer, so all our packets are the same
 * size on the wire (traffic-analysis resistance). */
static void pad_encrypted_queue(struct peer *peer)
{
	size_t needed, pingpad;
	u8 *ping;

	/* BOLT #8:
	 *
	 * ```
	 * +-------------------------------
	 * |2-byte encrypted message length|
	 * +-------------------------------
	 * |  16-byte MAC of the encrypted |
	 * |        message length         |
	 * +-------------------------------
	 * |                               |
	 * |                               |
	 * |     encrypted Lightning       |
	 * |            message            |
	 * |                               |
	 * +-------------------------------
	 * |     16-byte MAC of the        |
	 * |      Lightning message        |
	 * +-------------------------------
	 * ```
	 *
	 * The prefixed message length is encoded as a 2-byte big-endian integer,
	 * for a total maximum packet length of `2 + 16 + 65535 + 16` = `65569` bytes.
	 */
	assert(membuf_num_elems(&peer->encrypted_peer_out) < UNIFORM_MESSAGE_SIZE);
	needed = UNIFORM_MESSAGE_SIZE - membuf_num_elems(&peer->encrypted_peer_out);

	/* BOLT #1:
	 * 1. type: 18 (`ping`)
	 * 2. data:
	 *    * [`u16`:`num_pong_bytes`]
	 *    * [`u16`:`byteslen`]
	 *    * [`byteslen*byte`:`ignored`]
	 */
	/* So smallest possible ping is 6 bytes (2 byte type field) */
	/* If the gap is too small even for a minimal ping frame, pad a
	 * whole extra packet instead. */
	if (needed < 2 + 16 + 16 + 6)
		needed += UNIFORM_MESSAGE_SIZE;

	/* Ping ignored-bytes length = gap minus framing + minimal ping. */
	pingpad = needed - (2 + 16 + 16 + 6);
	/* Note: we don't bother --dev-disconnect here */
	/* BOLT #1:
	 * A node receiving a `ping` message:
	 *  - if `num_pong_bytes` is less than 65532:
	 *    - MUST respond by sending a `pong` message, with `byteslen` equal to `num_pong_bytes`.
	 *  - otherwise (`num_pong_bytes` is **not** less than 65532):
	 *    - MUST ignore the `ping`.
	 */
	/* num_pong_bytes = 65535 guarantees the peer ignores this ping. */
	ping = make_ping(NULL, 65535, pingpad);
	if (!encrypt_append(peer, take(ping)))
		abort();

	assert(have_full_encrypted_queue(peer));
	assert(membuf_num_elems(&peer->encrypted_peer_out) % UNIFORM_MESSAGE_SIZE == 0);
}
|
|
|
|
/* Kicks off write_to_peer() to look for more gossip to send from store */
static void wake_gossip(struct peer *peer)
{
	bool flush_gossip_filter = true;
	/* With dev-fast-gossip, we clean every 2 seconds, which is too
	 * fast for our slow tests!  So we only call this one time in 5
	 * actually twice that, as it's not per-peer! */
	/* NOTE: static, so this counter is shared across ALL peers. */
	static int gossip_age_count;

	if (peer->daemon->dev_fast_gossip && gossip_age_count++ % 5 != 0)
		flush_gossip_filter = false;

	/* Don't remember sent per-peer gossip forever. */
	if (flush_gossip_filter)
		gossip_rcvd_filter_age(peer->gs.grf);

	/* Resume streaming (unless suppressed in dev mode) and kick the
	 * write path. */
	peer->gs.active = !peer->daemon->dev_suppress_gossip;
	io_wake(peer->peer_outq);

	/* And go again in 60 seconds (from now, now when we finish!) */
	peer->gs.gossip_timer = gossip_stream_timer(peer);
}
|
|
|
|
/* Gossip response or something from gossip store.
 * Returns the next gossip-related message to send the peer (allocated
 * off @ctx), or NULL if there is nothing to send right now.  Applies a
 * per-second byte throttle, then query responses, then store stream. */
static const u8 *maybe_gossip_msg(const tal_t *ctx, struct peer *peer)
{
	const u8 *msg;
	struct timemono now;
	struct gossmap *gossmap;
	u32 timestamp;
	const u8 **msgs;

	/* If it's been over a second, make a fresh start. */
	now = time_mono();
	if (time_to_sec(timemono_between(now, peer->gs.bytes_start_time)) > 0) {
		peer->gs.bytes_start_time = now;
		peer->gs.bytes_this_second = 0;
	}

	/* Sent too much this second? */
	if (peer->gs.bytes_this_second > peer->daemon->gossip_stream_limit) {
		/* Replace normal timer with a timer after throttle. */
		peer->gs.active = false;
		tal_free(peer->gs.gossip_timer);
		peer->gs.gossip_timer
			= new_abstimer(&peer->daemon->timers,
				       peer,
				       timemono_add(peer->gs.bytes_start_time,
						    time_from_sec(1)),
				       wake_gossip, peer);
		return NULL;
	}

	gossmap = get_gossmap(peer->daemon);

	/* This can return more than one. */
	msgs = maybe_create_query_responses(tmpctx, peer, gossmap);
	if (tal_count(msgs) > 0) {
		/* We return the first one for immediate sending, and queue
		 * others for future.  We add all the lengths now though! */
		for (size_t i = 0; i < tal_count(msgs); i++) {
			peer->gs.bytes_this_second += tal_bytelen(msgs[i]);
			status_peer_io(LOG_IO_OUT, &peer->id, msgs[i]);
			if (i > 0)
				msg_to_peer_outq(peer, take(msgs[i]));
		}
		return msgs[0];
	}

	/* dev-mode can suppress all gossip */
	if (peer->daemon->dev_suppress_gossip)
		return NULL;

	/* Not streaming right now? */
	if (!peer->gs.active)
		return NULL;

	/* This should be around to kick us every 60 seconds */
	assert(peer->gs.gossip_timer);

again:
	msg = gossmap_stream_next(ctx, gossmap, peer->gs.iter, &timestamp);
	if (msg) {
		/* Don't send back gossip they sent to us! */
		if (gossip_rcvd_filter_del(peer->gs.grf, msg)) {
			msg = tal_free(msg);
			goto again;
		}
		/* Check timestamp (zero for channel_announcement with
		 * no update yet!): FIXME: we could ignore this! */
		if (timestamp
		    && (timestamp < peer->gs.timestamp_min || timestamp > peer->gs.timestamp_max)) {
			msg = tal_free(msg);
			goto again;
		}
		peer->gs.bytes_this_second += tal_bytelen(msg);
		status_peer_io(LOG_IO_OUT, &peer->id, msg);
		return msg;
	}

	/* No gossip left to send */
	peer->gs.active = false;
	return NULL;
}
|
|
|
|
/* Mutual recursion */
|
|
static void send_ping(struct peer *peer);
|
|
|
|
static void set_ping_timer(struct peer *peer)
|
|
{
|
|
if (peer->daemon->dev_no_ping_timer) {
|
|
peer->ping_timer = NULL;
|
|
return;
|
|
}
|
|
peer->ping_timer = new_reltimer(&peer->daemon->timers, peer,
|
|
time_from_sec(15 + pseudorand(30)),
|
|
send_ping, peer);
|
|
}
|
|
|
|
/* Keepalive: ping the peer if it has been quiet, and hang up if the
 * previous ping was never answered.  Always re-arms the timer. */
static void send_ping(struct peer *peer)
{
	/* If it's still sending us traffic, maybe ping reply is backed up?
	 * That's OK, ping is just to make sure it's still alive, and clearly
	 * it is. */
	/* Only probe if we've heard nothing for 60 seconds. */
	if (timemono_before(peer->last_recv_time,
			    timemono_sub(time_mono(), time_from_sec(60)))) {
		/* Already have a ping in flight? */
		if (peer->expecting_pong != PONG_UNEXPECTED) {
			status_peer_debug(&peer->id, "Last ping unreturned: hanging up");
			if (peer->to_peer)
				io_close(peer->to_peer);
			return;
		}

		/* Minimal ping (1 pong byte, no padding), timestamped for
		 * latency measurement. */
		inject_peer_msg(peer, take(make_ping(NULL, 1, 0)));
		peer->ping_start = time_mono();
		peer->expecting_pong = PONG_EXPECTED_PROBING;
	}

	set_ping_timer(peer);
}
|
|
|
|
/* lightningd tells us the allowlist of custom (plugin) message types;
 * replaces any previous list. */
void set_custommsgs(struct daemon *daemon, const u8 *msg)
{
	tal_free(daemon->custom_msgs);
	if (!fromwire_connectd_set_custommsgs(daemon, msg, &daemon->custom_msgs))
		master_badmsg(WIRE_CONNECTD_SET_CUSTOMMSGS, msg);
	status_debug("Now allowing %zu custom message types",
		     tal_count(daemon->custom_msgs));
}
|
|
|
|
/* lightningd asks us to send a raw custom message to a peer. */
void send_custommsg(struct daemon *daemon, const u8 *msg)
{
	struct node_id id;
	u8 *custommsg;
	struct peer *peer;

	if (!fromwire_connectd_custommsg_out(tmpctx, msg, &id, &custommsg))
		master_badmsg(WIRE_CONNECTD_CUSTOMMSG_OUT, msg);

	/* Races can happen: this might be gone by now. */
	/* If the peer disconnected, silently drop the message. */
	peer = peer_htable_get(daemon->peers, &id);
	if (peer)
		inject_peer_msg(peer, take(custommsg));
}
|
|
|
|
static void handle_ping_in(struct peer *peer, const u8 *msg)
|
|
{
|
|
u8 *pong;
|
|
|
|
if (!check_ping_make_pong(NULL, msg, &pong)) {
|
|
send_warning(peer, "Invalid ping %s", tal_hex(msg, msg));
|
|
return;
|
|
}
|
|
|
|
if (pong)
|
|
inject_peer_msg(peer, take(pong));
|
|
}
|
|
|
|
/* A pong arrived for a user-requested ping: log it and report success
 * back to lightningd. */
static void handle_ping_reply(struct peer *peer, const u8 *msg)
{
	u8 *ignored;
	size_t i;

	/* We print this out because we asked for pong, so can't spam us... */
	/* NOTE(review): on malformed pong we log but continue; presumably
	 * fromwire_pong leaves ignored NULL so the loop below is a no-op
	 * — confirm against fromwire semantics. */
	if (!fromwire_pong(msg, msg, &ignored))
		status_peer_unusual(&peer->id, "Got malformed ping reply %s",
				    tal_hex(tmpctx, msg));

	/* We print this because dev versions of Core Lightning embed
	 * version here: see check_ping_make_pong! */
	/* Find the printable ASCII prefix of the ignored bytes. */
	for (i = 0; i < tal_count(ignored); i++) {
		if (ignored[i] < ' ' || ignored[i] == 127)
			break;
	}
	status_debug("Got pong %zu bytes (%.*s...)",
		     tal_count(ignored), (int)i, (char *)ignored);
	daemon_conn_send(peer->daemon->master,
			 take(towire_connectd_ping_done(NULL, peer->ping_reqid, true,
							tal_bytelen(msg))));
}
|
|
|
|
/* Dispatch an incoming pong according to why we were expecting one. */
static void handle_pong_in(struct peer *peer, const u8 *msg)
{
	switch (peer->expecting_pong) {
	case PONG_EXPECTED_COMMAND:
		/* User-requested ping: report completion... */
		handle_ping_reply(peer, msg);
		/* fall thru */
	case PONG_EXPECTED_PROBING:
		/* ...and in both cases, record round-trip latency. */
		peer->expecting_pong = PONG_UNEXPECTED;
		daemon_conn_send(peer->daemon->master,
				 take(towire_connectd_ping_latency(NULL,
								   &peer->id,
								   time_to_nsec(timemono_since(peer->ping_start)))));
		return;
	case PONG_UNEXPECTED:
		status_debug("Unexpected pong?");
		return;
	}
	abort();
}
|
|
|
|
/* Various cases where we don't send the msg to a gossipd, we want to
 * do IO logging! */
static void log_peer_io(const struct peer *peer, const u8 *msg)
{
	status_peer_io(LOG_IO_IN, &peer->id, msg);
	/* NOTE(review): also wakes the output queue — presumably so any
	 * reply produced by the handler gets flushed; confirm this side
	 * effect is intended at every call site. */
	io_wake(peer->peer_outq);
}
|
|
|
|
/* Forward to gossipd */
static void handle_gossip_in(struct peer *peer, const u8 *msg)
{
	u8 *gmsg;

	/* We warn at 250000, drop at 500000 */
	/* NOTE(review): only the drop is visible here; the 250000 warning
	 * presumably lives in daemon_conn — confirm. */
	if (daemon_conn_queue_length(peer->daemon->gossipd) > 500000)
		return;

	gmsg = towire_gossipd_recv_gossip(NULL, &peer->id, msg);

	/* gossipd doesn't log IO, so we log it here. */
	log_peer_io(peer, msg);
	daemon_conn_send(peer->daemon->gossipd, take(gmsg));
}
|
|
|
|
/* Peer sent gossip_timestamp_filter: set its gossip window and reset
 * the store iterator accordingly. */
static void handle_gossip_timestamp_filter_in(struct peer *peer, const u8 *msg)
{
	struct bitcoin_blkid chain_hash;
	u32 first_timestamp, timestamp_range;
	struct gossmap *gossmap = get_gossmap(peer->daemon);

	if (!fromwire_gossip_timestamp_filter(msg, &chain_hash,
					      &first_timestamp,
					      &timestamp_range)) {
		send_warning(peer, "gossip_timestamp_filter invalid: %s",
			     tal_hex(tmpctx, msg));
		return;
	}

	/* Filter must be for our chain. */
	if (!bitcoin_blkid_eq(&chainparams->genesis_blockhash, &chain_hash)) {
		send_warning(peer, "gossip_timestamp_filter for bad chain: %s",
			     tal_hex(tmpctx, msg));
		return;
	}

	peer->gs.timestamp_min = first_timestamp;
	/* Note: this can wrap on first_timestamp + range overflow. */
	peer->gs.timestamp_max = first_timestamp + timestamp_range - 1;
	/* Make sure we never leave it on an impossible value. */
	if (peer->gs.timestamp_max < peer->gs.timestamp_min)
		peer->gs.timestamp_max = UINT32_MAX;

	/* BOLT-gossip-filter-simplify #7:
	 * The receiver:
	 *...
	 *  - if `first_timestamp` is 0:
	 *    - SHOULD send all known gossip messages.
	 *  - otherwise, if `first_timestamp` is 0xFFFFFFFF:
	 *    - SHOULD NOT send any gossip messages (except its own).
	 *  - otherwise:
	 *    - SHOULD send gossip messages it receives from now own.
	 */
	/* For us, this means we only sweep the gossip store for messages
	 * if the first_timestamp is 0 */
	tal_free(peer->gs.iter);
	if (first_timestamp == 0) {
		/* Iterate from the very start of the store. */
		peer->gs.iter = gossmap_iter_new(peer, gossmap);
	} else if (first_timestamp == 0xFFFFFFFF) {
		/* Iterator positioned at the end: nothing historical. */
		peer->gs.iter = gossmap_iter_new(peer, gossmap);
		gossmap_iter_end(gossmap, peer->gs.iter);
	} else {
		/* We are actually a bit nicer than the spec, and we include
		 * "recent" gossip here. */
		update_recent_timestamp(peer->daemon, gossmap);
		peer->gs.iter = gossmap_iter_dup(peer,
						 peer->daemon->gossmap_iter_recent);
	}

	/* BOLT #7:
	 *   - MAY wait for the next outgoing gossip flush to send these.
	 */
	/* We send immediately the first time, after that we wait. */
	if (!peer->gs.gossip_timer)
		wake_gossip(peer);
}
|
|
|
|
/* Is @type in the lightningd-provided allowlist of custom messages? */
static bool find_custom_msg(const u16 *custom_msgs, u16 type)
{
	size_t n = tal_count(custom_msgs);

	for (size_t i = 0; i < n; i++) {
		if (custom_msgs[i] == type)
			return true;
	}
	return false;
}
|
|
|
|
/* If @msg is not a message we handle internally, treat it as a custom
 * message: forward to lightningd (if allowed).  Returns true if the
 * message was consumed here. */
static bool handle_custommsg(struct daemon *daemon,
			     struct peer *peer,
			     const u8 *msg)
{
	enum peer_wire type = fromwire_peektype(msg);

	/* Messages we expect to handle ourselves. */
	if (peer_wire_is_internal(type))
		return false;

	/* We log it, since it's not going to a subdaemon */
	status_peer_io(LOG_IO_IN, &peer->id, msg);

	/* Even unknown messages must be explicitly allowed */
	/* Even type = "it's OK to be odd" rule: unknown even types are
	 * fatal unless explicitly allowlisted. */
	if (type % 2 == 0 && !find_custom_msg(daemon->custom_msgs, type)) {
		send_warning(peer, "Invalid unknown even msg %s",
			     tal_hex(msg, msg));
		/* We "handled" it... */
		return true;
	}

	/* The message is not part of the messages we know how to
	 * handle. Assuming this is a custommsg, we just forward it to the
	 * master. */
	daemon_conn_send(daemon->master,
			 take(towire_connectd_custommsg_in(NULL,
							   &peer->id,
							   msg)));
	return true;
}
|
|
|
|
/* lightningd acknowledges a forwarded custom message has been fully
 * processed. */
void custommsg_completed(struct daemon *daemon, const u8 *msg)
{
	struct node_id id;
	const struct peer *peer;

	if (!fromwire_connectd_custommsg_in_complete(msg, &id))
		master_badmsg(WIRE_CONNECTD_CUSTOMMSG_IN_COMPLETE, msg);

	/* If it's still around, log it. */
	peer = peer_htable_get(daemon->peers, &id);
	if (peer) {
		status_peer_debug(&peer->id, "custommsg processing finished");
		/* Also wakes the peer's output queue (see log_peer_io). */
		log_peer_io(peer, msg);
	}
}
|
|
|
|
/* We handle pings and gossip messages. */
/* Returns true if @msg was fully handled here (so it must NOT be
 * forwarded to a subdaemon). */
static bool handle_message_locally(struct peer *peer, const u8 *msg)
{
	enum peer_wire type = fromwire_peektype(msg);

	/* We remember these so we don't rexmit them */
	gossip_rcvd_filter_add(peer->gs.grf, msg);

	if (type == WIRE_GOSSIP_TIMESTAMP_FILTER) {
		log_peer_io(peer, msg);
		handle_gossip_timestamp_filter_in(peer, msg);
		return true;
	} else if (type == WIRE_PING) {
		log_peer_io(peer, msg);
		handle_ping_in(peer, msg);
		return true;
	} else if (type == WIRE_PONG) {
		log_peer_io(peer, msg);
		handle_pong_in(peer, msg);
		return true;
	} else if (type == WIRE_ONION_MESSAGE) {
		log_peer_io(peer, msg);
		handle_onion_message(peer->daemon, peer, msg);
		return true;
	} else if (type == WIRE_QUERY_CHANNEL_RANGE) {
		/* Query handlers do their own logging. */
		handle_query_channel_range(peer, msg);
		return true;
	} else if (type == WIRE_QUERY_SHORT_CHANNEL_IDS) {
		handle_query_short_channel_ids(peer, msg);
		return true;
	} else if (handle_custommsg(peer->daemon, peer, msg)) {
		return true;
	}

	/* Do we want to divert to gossipd? */
	if (is_msg_for_gossipd(msg)) {
		handle_gossip_in(peer, msg);
		return true;
	}

	/* Not ours: caller forwards to the owning subdaemon. */
	return false;
}
|
|
|
|
/* Move "channel_id" to temporary. */
|
|
static void move_channel_id_to_temp(struct subd *subd)
|
|
{
|
|
tal_free(subd->temporary_channel_id);
|
|
subd->temporary_channel_id
|
|
= tal_dup(subd, struct channel_id, &subd->channel_id);
|
|
}
|
|
|
|
/* Only works for open_channel2 and accept_channel2 */
/* Skips over the fixed-size fields preceding revocation_basepoint and
 * parses it; returns NULL if the message is truncated.  Aborts on any
 * other message type. */
static struct pubkey *extract_revocation_basepoint(const tal_t *ctx,
						   const u8 *msg)
{
	const u8 *cursor = msg;
	size_t max = tal_bytelen(msg);
	enum peer_wire t;
	struct pubkey pubkey;

	t = fromwire_u16(&cursor, &max);

	switch (t) {
	case WIRE_OPEN_CHANNEL2:
		/* BOLT #2:
		 * 1. type: 64 (`open_channel2`)
		 * 2. data:
		 *    * [`chain_hash`:`chain_hash`]
		 *    * [`channel_id`:`temporary_channel_id`]
		 *    * [`u32`:`funding_feerate_perkw`]
		 *    * [`u32`:`commitment_feerate_perkw`]
		 *    * [`u64`:`funding_satoshis`]
		 *    * [`u64`:`dust_limit_satoshis`]
		 *    * [`u64`:`max_htlc_value_in_flight_msat`]
		 *    * [`u64`:`htlc_minimum_msat`]
		 *    * [`u16`:`to_self_delay`]
		 *    * [`u16`:`max_accepted_htlcs`]
		 *    * [`u32`:`locktime`]
		 *    * [`point`:`funding_pubkey`]
		 *    * [`point`:`revocation_basepoint`]
		 */
		/* Skip everything up to (and including) funding_pubkey. */
		fromwire_pad(&cursor, &max,
			     sizeof(struct bitcoin_blkid)
			     + sizeof(struct channel_id)
			     + sizeof(u32)
			     + sizeof(u32)
			     + sizeof(u64)
			     + sizeof(u64)
			     + sizeof(u64)
			     + sizeof(u64)
			     + sizeof(u16)
			     + sizeof(u16)
			     + sizeof(u32)
			     + PUBKEY_CMPR_LEN);
		break;
	case WIRE_ACCEPT_CHANNEL2:
		/* BOLT #2:
		 * 1. type: 65 (`accept_channel2`)
		 * 2. data:
		 *    * [`channel_id`:`temporary_channel_id`]
		 *    * [`u64`:`funding_satoshis`]
		 *    * [`u64`:`dust_limit_satoshis`]
		 *    * [`u64`:`max_htlc_value_in_flight_msat`]
		 *    * [`u64`:`htlc_minimum_msat`]
		 *    * [`u32`:`minimum_depth`]
		 *    * [`u16`:`to_self_delay`]
		 *    * [`u16`:`max_accepted_htlcs`]
		 *    * [`point`:`funding_pubkey`]
		 *    * [`point`:`revocation_basepoint`]
		 */
		fromwire_pad(&cursor, &max,
			     sizeof(struct channel_id)
			     + sizeof(u64)
			     + sizeof(u64)
			     + sizeof(u64)
			     + sizeof(u64)
			     + sizeof(u32)
			     + sizeof(u16)
			     + sizeof(u16)
			     + PUBKEY_CMPR_LEN);
		break;
	default:
		/* Contract violation: caller checked the type. */
		abort();
	}

	/* fromwire sets cursor to NULL on underflow. */
	fromwire_pubkey(&cursor, &max, &pubkey);
	if (!cursor)
		return NULL;
	return tal_dup(ctx, struct pubkey, &pubkey);
}
|
|
|
|
/* Only works for funding_created */
/* Extracts the funding outpoint; returns false on truncated input.
 * Aborts on any other message type. */
static bool extract_funding_created_funding(const u8 *funding_created,
					    struct bitcoin_outpoint *outp)
{
	const u8 *cursor = funding_created;
	size_t max = tal_bytelen(funding_created);
	enum peer_wire t;

	t = fromwire_u16(&cursor, &max);

	switch (t) {
	case WIRE_FUNDING_CREATED:
		/* BOLT #2:
		 * 1. type: 34 (`funding_created`)
		 * 2. data:
		 *     * [`32*byte`:`temporary_channel_id`]
		 *     * [`sha256`:`funding_txid`]
		 *     * [`u16`:`funding_output_index`]
		 */
		/* Skip the 32-byte temporary_channel_id. */
		fromwire_pad(&cursor, &max, 32);
		fromwire_bitcoin_txid(&cursor, &max, &outp->txid);
		outp->n = fromwire_u16(&cursor, &max);
		break;
	default:
		/* Contract violation: caller checked the type. */
		abort();
	}

	/* fromwire sets cursor to NULL on underflow. */
	return cursor != NULL;
}
|
|
|
|
static void update_v1_channelid(struct subd *subd, const u8 *funding_created)
|
|
{
|
|
struct bitcoin_outpoint outp;
|
|
|
|
if (!extract_funding_created_funding(funding_created, &outp)) {
|
|
status_peer_unusual(&subd->peer->id, "WARNING: funding_created no tx info?");
|
|
return;
|
|
}
|
|
move_channel_id_to_temp(subd);
|
|
derive_channel_id(&subd->channel_id, &outp);
|
|
}
|
|
|
|
static void update_v2_channelid(struct subd *subd, const u8 *accept_channel2)
|
|
{
|
|
struct pubkey *acc_basepoint;
|
|
|
|
acc_basepoint = extract_revocation_basepoint(tmpctx, accept_channel2);
|
|
if (!acc_basepoint) {
|
|
status_peer_unusual(&subd->peer->id, "WARNING: accept_channel2 no revocation_basepoint?");
|
|
return;
|
|
}
|
|
if (!subd->opener_revocation_basepoint) {
|
|
status_peer_unusual(&subd->peer->id, "WARNING: accept_channel2 without open_channel2?");
|
|
return;
|
|
}
|
|
|
|
move_channel_id_to_temp(subd);
|
|
derive_channel_id_v2(&subd->channel_id,
|
|
subd->opener_revocation_basepoint, acc_basepoint);
|
|
}
|
|
|
|
/* We maintain channel_id matching for subds by snooping: we set it manually
|
|
* for first packet (open_channel or open_channel2). */
|
|
static void maybe_update_channelid(struct subd *subd, const u8 *msg)
|
|
{
|
|
switch (fromwire_peektype(msg)) {
|
|
case WIRE_OPEN_CHANNEL:
|
|
extract_channel_id(msg, &subd->channel_id);
|
|
break;
|
|
case WIRE_OPEN_CHANNEL2:
|
|
subd->opener_revocation_basepoint
|
|
= extract_revocation_basepoint(subd, msg);
|
|
break;
|
|
case WIRE_ACCEPT_CHANNEL2:
|
|
update_v2_channelid(subd, msg);
|
|
break;
|
|
case WIRE_FUNDING_CREATED:
|
|
update_v1_channelid(subd, msg);
|
|
break;
|
|
}
|
|
}
|
|
|
|
/* Get the next message destined for this peer: either something a
 * subdaemon queued, or (if the queue is empty and we're not draining)
 * a gossip message.  Returns NULL if there's nothing to send right
 * now; caller consumes the returned message. */
static const u8 *next_msg_for_peer(struct peer *peer)
{
	const u8 *msg;

	msg = msg_dequeue(peer->peer_outq);
	if (msg) {
		/* Keep the urgent-message counter in sync with the queue. */
		if (is_urgent(fromwire_peektype(msg))) {
			peer->peer_out_urgent--;
		}
	} else {
		/* Nothing in queue means nothing urgent in queue, surely! */
		assert(peer->peer_out_urgent == 0);

		/* Draining? Don't send gossip. */
		if (peer->draining_state == WRITING_TO_PEER)
			return NULL;

		/* If they want us to send gossip, do so now. */
		msg = maybe_gossip_msg(NULL, peer);
		if (!msg)
			return NULL;
	}

	/* dev_disconnect can disable writes (discard everything) */
	if (peer->dev_writes_enabled) {
		if (*peer->dev_writes_enabled == 0) {
			/* tal_free returns NULL: discard msg, report "nothing". */
			return tal_free(msg);
		}
		(*peer->dev_writes_enabled)--;
	}

	return msg;
}
|
|
|
|
static void nonurgent_flush(struct peer *peer)
|
|
{
|
|
peer->nonurgent_flush_timer = NULL;
|
|
peer->flushing_nonurgent = true;
|
|
io_wake(peer->peer_outq);
|
|
}
|
|
|
|
/* Writer side of the peer socket: fills the encrypted output membuf
 * from subd/gossip messages, pads to uniform size when flushing, and
 * handles draining/shutdown.  Re-invoked by ccan/io after each write. */
static struct io_plan *write_to_peer(struct io_conn *peer_conn,
				     struct peer *peer)
{
	bool do_flush;
	assert(peer->to_peer == peer_conn);

	/* We always pad and send if we have an urgent msg or our
	 * non-urgent has gone off, or we're trying to close. */
	do_flush = (peer->peer_out_urgent != 0
		    || peer->flushing_nonurgent
		    || peer->draining_state == WRITING_TO_PEER);

	peer->flushing_nonurgent = false;

	/* We wrote out some bytes from membuf. */
	membuf_consume(&peer->encrypted_peer_out, peer->encrypted_peer_out_sent);
	peer->encrypted_peer_out_sent = 0;

	/* Accumulate until we have a full uniform-size packet's worth. */
	while (!have_full_encrypted_queue(peer)) {
		const u8 *msg;
		struct io_plan *dev_override;

		/* Pop tail of send queue (or gossip) */
		msg = next_msg_for_peer(peer);
		if (!msg) {
			/* Draining? Shutdown socket (to avoid losing msgs) */
			if (have_empty_encrypted_queue(peer)
			    && peer->draining_state == WRITING_TO_PEER) {
				status_peer_debug(&peer->id, "draining done, shutting down");
				io_wake(&peer->peer_in);
				return io_sock_shutdown(peer_conn);
			}

			/* If no urgent message, and not draining, we wait. */
			if (!do_flush) {
				/* Tell them to read again, */
				io_wake(&peer->subds);
				io_wake(&peer->peer_in);

				/* Set up a timer if not already set */
				if (!have_empty_encrypted_queue(peer)
				    && !peer->nonurgent_flush_timer) {
					/* Bias towards larger values, but don't be too predictable */
					u32 max = pseudorand(1000);
					u32 msec = 1000 - pseudorand(1 + max);
					peer->nonurgent_flush_timer
						= new_reltimer(&peer->daemon->timers,
							       peer,
							       time_from_msec(msec),
							       nonurgent_flush, peer);
				}

				/* Wait for them to wake us */
				return msg_queue_wait(peer_conn, peer->peer_outq, write_to_peer, peer);
			}
			/* OK, add padding. */
			pad_encrypted_queue(peer);
		} else {
			if (peer->draining_state == WRITING_TO_PEER)
				status_peer_debug(&peer->id, "draining, but sending %s.",
						  peer_wire_name(fromwire_peektype(msg)));

			/* dev_disconnect may hijack the plan for this msg. */
			dev_override = msg_out_dev_disconnect(peer, msg);
			if (dev_override) {
				tal_free(msg);
				return dev_override;
			}

			if (!encrypt_append(peer, take(msg)))
				return io_close(peer->to_peer);
		}
	}

	/* We're flushing now, so any pending non-urgent timer is moot. */
	peer->nonurgent_flush_timer = tal_free(peer->nonurgent_flush_timer);
	return write_encrypted_to_peer(peer);
}
|
|
|
|
static struct io_plan *read_from_subd(struct io_conn *subd_conn,
|
|
struct subd *subd);
|
|
static struct io_plan *read_from_subd_done(struct io_conn *subd_conn,
|
|
struct subd *subd)
|
|
{
|
|
maybe_update_channelid(subd, subd->in);
|
|
|
|
/* Tell them to encrypt & write. */
|
|
msg_to_peer_outq(subd->peer, take(subd->in));
|
|
subd->in = NULL;
|
|
|
|
/* Wait for them to wake us */
|
|
return io_wait(subd_conn, &subd->peer->subds, read_from_subd, subd);
|
|
}
|
|
|
|
/* Read one complete wire message from the subdaemon into subd->in,
 * then hand it to read_from_subd_done. */
static struct io_plan *read_from_subd(struct io_conn *subd_conn,
				      struct subd *subd)
{
	return io_read_wire(subd_conn, subd, &subd->in,
			    read_from_subd_done, subd);
}
|
|
|
|
/* These four functions handle peer->subd */
|
|
/* Writer side of the subdaemon socket: forwards queued peer messages.
 * When the queue empties we wake the peer reader and (if a message was
 * pending) log how long the subd kept us waiting. */
static struct io_plan *write_to_subd(struct io_conn *subd_conn,
				     struct subd *subd)
{
	const u8 *msg;
	assert(subd->conn == subd_conn);

	/* Pop tail of send queue */
	msg = msg_dequeue(subd->outq);

	/* Nothing to send? */
	if (!msg) {
		/* If peer is closed, close this. */
		if (!subd->peer->to_peer)
			return io_close(subd_conn);

		/* Tell them to read again. */
		io_wake(&subd->peer->peer_in);
		/* -1 means "no message pending": otherwise, measure how long
		 * since the peer reader handed us that message. */
		if (subd->peer->peer_in_lastmsg != -1) {
			u64 msec = time_to_msec(timemono_between(time_mono(),
								 subd->peer->peer_in_lasttime));
			/* A slow subd stalls the whole peer pipeline: complain
			 * loudly (unless tests deliberately slow lightningd). */
			if (msec > 5000 && !subd->peer->daemon->dev_lightningd_is_slow)
				status_peer_broken(&subd->peer->id,
						   "wake delay for %s: %"PRIu64"msec",
						   peer_wire_name(subd->peer->peer_in_lastmsg),
						   msec);
			subd->peer->peer_in_lastmsg = -1;
		}

		/* Wait for them to wake us */
		return msg_queue_wait(subd_conn, subd->outq,
				      write_to_subd, subd);
	}

	return io_write_wire(subd_conn, take(msg), write_to_subd, subd);
}
|
|
|
|
static void destroy_connected_subd(struct subd *subd)
|
|
{
|
|
struct peer *peer = subd->peer;
|
|
size_t pos;
|
|
|
|
for (pos = 0; peer->subds[pos] != subd; pos++)
|
|
assert(pos < tal_count(peer->subds));
|
|
|
|
tal_arr_remove(&peer->subds, pos);
|
|
|
|
/* Make sure we try to keep reading from peer (might
|
|
* have been waiting for write_to_subd) */
|
|
io_wake(&peer->peer_in);
|
|
|
|
if (tal_count(peer->subds) == 0) {
|
|
if (!peer->to_peer) {
|
|
/* Nothing left */
|
|
tal_free(peer);
|
|
} else if (peer->draining_state == READING_FROM_SUBDS) {
|
|
/* We've finished draining subds, start draining peer */
|
|
drain_peer(peer);
|
|
}
|
|
}
|
|
}
|
|
|
|
static struct subd *new_subd(struct peer *peer,
|
|
const struct channel_id *channel_id)
|
|
{
|
|
struct subd *subd;
|
|
|
|
subd = tal(peer, struct subd);
|
|
subd->peer = peer;
|
|
subd->outq = msg_queue_new(subd, false);
|
|
subd->channel_id = *channel_id;
|
|
subd->temporary_channel_id = NULL;
|
|
subd->opener_revocation_basepoint = NULL;
|
|
subd->conn = NULL;
|
|
subd->rcvd_tx_abort = false;
|
|
|
|
/* Connect it to the peer */
|
|
tal_arr_expand(&peer->subds, subd);
|
|
return subd;
|
|
}
|
|
|
|
/* Used as next_read when dev_disconnect_in told us to close after
 * receiving a message: just closes the peer connection. */
static struct io_plan *close_peer_dev_disconnect(struct io_conn *peer_conn,
						 struct peer *peer)
{
	assert(peer->to_peer == peer_conn);
	return io_close_cb(peer_conn, peer);
}
|
|
|
|
static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn,
|
|
struct peer *peer);
|
|
/* We have the complete encrypted body from the peer: decrypt it, then
 * dispatch — handle locally (pings, gossip etc.), log channel-less
 * errors/warnings, or route to (possibly newly-created) subd by
 * channel_id.  Finally wait for the subd writer to wake us. */
static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
						struct peer *peer)
{
	u8 *decrypted;
	struct channel_id channel_id;
	struct subd *subd;
	enum peer_wire type;
	/* dev_disconnect may swap this for close_peer_dev_disconnect. */
	struct io_plan *(*next_read)(struct io_conn *peer_conn,
				     struct peer *peer) = read_hdr_from_peer;

	decrypted = cryptomsg_decrypt_body(tmpctx, &peer->cs,
					   peer->peer_in);
	if (!decrypted) {
		status_peer_debug(&peer->id, "Bad encrypted packet len %zu",
				  tal_bytelen(peer->peer_in));
		return io_close(peer_conn);
	}
	/* read_hdr_from_peer reallocates peer->peer_in afresh. */
	tal_free(peer->peer_in);

	type = fromwire_peektype(decrypted);

	/* dev_disconnect can disable read */
	if (!peer->dev_read_enabled)
		return read_hdr_from_peer(peer_conn, peer);

	switch (dev_disconnect_in(&peer->id, type)) {
	case DEV_DISCONNECT_IN_NORMAL:
		break;
	case DEV_DISCONNECT_IN_AFTER_RECV:
		next_read = close_peer_dev_disconnect;
		break;
	}

	/* We got something! */
	peer->last_recv_time = time_mono();

	/* Don't process packets while we're closing */
	if (peer->draining_state != NOT_DRAINING)
		return next_read(peer_conn, peer);

	/* If we swallow this, just try again. */
	if (handle_message_locally(peer, decrypted)) {
		/* Make sure to update peer->peer_in_lastmsg so we blame correct msg! */
		goto out;
	}

	/* After this we should be able to match to subd by channel_id */
	if (!extract_channel_id(decrypted, &channel_id)) {
		/* We won't log this anywhere else, so do it here. */
		status_peer_io(LOG_IO_IN, &peer->id, decrypted);

		/* Could be a all-channel error or warning? Log it
		 * more verbose: hang up on error. */
		if (type == WIRE_ERROR || type == WIRE_WARNING) {
			char *desc = sanitize_error(tmpctx, decrypted, NULL);
			/* FIXME: We should check old gossip, since we get many of
			 * these "no unspent txout" for closed channels which we
			 * somehow missed. */
			if (strstr(desc, "no unspent txout"))
				status_peer_debug(&peer->id,
						  "Received %s: %s",
						  peer_wire_name(type), desc);
			else
				status_peer_info(&peer->id,
						 "Received %s: %s",
						 peer_wire_name(type), desc);
			if (type == WIRE_WARNING)
				return next_read(peer_conn, peer);
			return io_close(peer_conn);
		}

		/* This sets final_msg: will close after sending warning */
		send_warning(peer, "Unexpected message %s: %s",
			     peer_wire_name(type),
			     tal_hex(tmpctx, decrypted));
		return next_read(peer_conn, peer);
	}

	/* If we don't find a subdaemon for this, create a new one. */
	subd = find_subd(peer, &channel_id);
	if (!subd) {
		enum peer_wire t = fromwire_peektype(decrypted);

		/* Simplest to close on them at this point. */
		if (peer->daemon->shutting_down) {
			status_peer_debug(&peer->id,
					  "Shutting down: hanging up for %s",
					  peer_wire_name(t));
			return io_close(peer_conn);
		}
		status_peer_debug(&peer->id, "Activating for message %s",
				  peer_wire_name(t));
		subd = new_subd(peer, &channel_id);
		/* We tell lightningd to fire up a subdaemon to handle this! */
		daemon_conn_send(peer->daemon->master,
				 take(towire_connectd_peer_spoke(NULL, &peer->id,
								 peer->counter,
								 t,
								 &channel_id,
								 is_peer_error(tmpctx, decrypted))));
	}

	/* Even if we just created it, call this to catch open_channel2 */
	maybe_update_channelid(subd, decrypted);

	/* Tell them to write. */
	msg_enqueue(subd->outq, take(decrypted));

	/* Is this a tx_abort? Ignore from now on, and close after sending! */
	if (type == WIRE_TX_ABORT) {
		subd->rcvd_tx_abort = true;
		/* In case it doesn't close by itself */
		notleak(new_reltimer(&peer->daemon->timers, subd,
				     time_from_sec(5),
				     close_subd_timeout, subd));
	}

	/* Wait for them to wake us */
	peer->peer_in_lastmsg = type;
out:
	peer->peer_in_lasttime = time_mono();

	return io_wait(peer_conn, &peer->peer_in, next_read, peer);
}
|
|
|
|
static struct io_plan *read_body_from_peer(struct io_conn *peer_conn,
|
|
struct peer *peer)
|
|
{
|
|
u16 len;
|
|
|
|
if (!cryptomsg_decrypt_header(&peer->cs, peer->peer_in, &len))
|
|
return io_close(peer_conn);
|
|
|
|
tal_resize(&peer->peer_in, (u32)len + CRYPTOMSG_BODY_OVERHEAD);
|
|
return io_read(peer_conn, peer->peer_in, tal_count(peer->peer_in),
|
|
read_body_from_peer_done, peer);
|
|
}
|
|
|
|
/* Start of the peer-read cycle: allocate a fresh buffer and read the
 * fixed-size encrypted header. */
static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn,
					  struct peer *peer)
{
	assert(peer->to_peer == peer_conn);

	/* BOLT #8:
	 *
	 * ### Receiving and Decrypting Messages
	 *
	 * In order to decrypt the _next_ message in the network
	 * stream, the following steps are completed:
	 *
	 *  1. Read _exactly_ 18 bytes from the network buffer.
	 */
	peer->peer_in = tal_arr(peer, u8, CRYPTOMSG_HDR_SIZE);
	return io_read(peer_conn, peer->peer_in, CRYPTOMSG_HDR_SIZE,
		       read_body_from_peer, peer);
}
|
|
|
|
static struct io_plan *subd_conn_init(struct io_conn *subd_conn,
|
|
struct subd *subd)
|
|
{
|
|
subd->conn = subd_conn;
|
|
|
|
/* subd is a child of the conn: free when it closes! */
|
|
tal_steal(subd->conn, subd);
|
|
tal_add_destructor(subd, destroy_connected_subd);
|
|
return io_duplex(subd_conn,
|
|
read_from_subd(subd_conn, subd),
|
|
write_to_subd(subd_conn, subd));
|
|
}
|
|
|
|
/* Peer disconnected (we remove this if *we* close). */
|
|
static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer)
{
	assert(peer->to_peer == peer_conn);

	/* We are no longer connected. Tell lightningd & gossipd */
	peer->to_peer = NULL;
	send_disconnected(peer->daemon, &peer->id, peer->counter,
			  peer->connect_starttime);

	/* Wake subds: give them 5 seconds to flush. */
	for (size_t i = 0; i < tal_count(peer->subds); i++) {
		/* Might not be connected yet (no destructor, simple) */
		if (!peer->subds[i]->conn) {
			/* tal_arr_remove shifts the rest down, so revisit
			 * this same index on the next iteration. */
			tal_arr_remove(&peer->subds, i);
			i--;
			continue;
		}
		/* Wake the writers to subd: you have 5 seconds */
		notleak(new_reltimer(&peer->daemon->timers,
				     peer->subds[i], time_from_sec(5),
				     close_subd_timeout, peer->subds[i]));
		io_wake(peer->subds[i]->outq);
	}

	/* If there were no subds, free peer immediately. */
	if (tal_count(peer->subds) == 0)
		tal_free(peer);
}
|
|
|
|
/* When peer reconnects, we close the old connection unceremoniously. */
|
|
void destroy_peer_immediately(struct peer *peer)
|
|
{
|
|
/* Forgo normal destructors which involve timeouts */
|
|
if (peer->to_peer)
|
|
tal_del_destructor2(peer->to_peer, destroy_peer_conn, peer);
|
|
|
|
for (size_t i = 0; i < tal_count(peer->subds); i++) {
|
|
if (peer->subds[i]->conn)
|
|
tal_del_destructor(peer->subds[i], destroy_connected_subd);
|
|
}
|
|
tal_free(peer);
|
|
}
|
|
|
|
/* Entry point once the peer handshake is done: arrange cleanup, start
 * ping keepalives, and begin the read/write loops on the socket. */
struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn,
				     struct peer *peer)
{
	/*~ If conn closes, we drain the subd connections and wait for
	 * lightningd to tell us to close with the peer */
	tal_add_destructor2(peer_conn, destroy_peer_conn, peer);

	/* Start keepalives */
	peer->expecting_pong = PONG_UNEXPECTED;
	set_ping_timer(peer);

	/* This used to be in openingd; don't break tests. */
	status_peer_debug(&peer->id, "Handed peer, entering loop");

	return io_duplex(peer_conn,
			 read_hdr_from_peer(peer_conn, peer),
			 write_to_peer(peer_conn, peer));
}
|
|
|
|
/* lightningd gave us an fd for a subdaemon to attach to this peer's
 * channel.  Guard against races (peer gone, reconnected, draining),
 * create the subd record if the peer hasn't already spoken, and wire
 * the fd up. */
void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd)
{
	struct node_id id;
	u64 counter;
	struct peer *peer;
	struct channel_id channel_id;
	struct subd *subd;

	if (!fromwire_connectd_peer_connect_subd(msg, &id, &counter, &channel_id))
		master_badmsg(WIRE_CONNECTD_PEER_CONNECT_SUBD, msg);

	/* If receiving fd failed, fd will be -1. Log and ignore
	 * (subd will see immediate hangup). */
	if (fd == -1) {
		/* Only log the first failure, to avoid spamming. */
		static bool recvfd_logged = false;
		if (!recvfd_logged) {
			status_broken("receiving lightningd fd failed for %s: %s",
				      fmt_node_id(tmpctx, &id),
				      strerror(errno));
			recvfd_logged = true;
		}
		/* Maybe free up some fds by closing something. */
		close_random_connection(daemon);
		return;
	}

	/* Races can happen: this might be gone by now (or reconnected!). */
	peer = peer_htable_get(daemon->peers, &id);
	if (!peer || peer->counter != counter) {
		close(fd);
		return;
	}

	/* Could be disconnecting now */
	if (!peer->to_peer || peer->draining_state != NOT_DRAINING) {
		close(fd);
		return;
	}

	/* If peer said something, we created this and queued msg. */
	subd = find_subd(peer, &channel_id);
	if (!subd) {
		subd = new_subd(peer, &channel_id);
		/* Implies lightningd is ready for another peer. */
		release_one_waiting_connection(peer->daemon,
					       tal_fmt(tmpctx, "%s given a subd",
						       fmt_node_id(tmpctx, &id)));
	}

	assert(!subd->conn);

	/* This sets subd->conn inside subd_conn_init, and reparents subd! */
	io_new_conn(peer, fd, subd_conn_init, subd);
}
|
|
|
|
/* Lightningd says to send a ping */
|
|
/* Handle lightningd's explicit ping request: inject a ping to the peer
 * and either report completion now (peer missing, ping in flight, or
 * no pong expected) or remember the request id so the eventual pong
 * can answer it. */
void send_manual_ping(struct daemon *daemon, const u8 *msg)
{
	u8 *ping;
	struct node_id id;
	u64 reqid;
	u16 len, num_pong_bytes;
	struct peer *peer;

	if (!fromwire_connectd_ping(msg, &reqid, &id, &num_pong_bytes, &len))
		master_badmsg(WIRE_CONNECTD_PING, msg);

	peer = peer_htable_get(daemon->peers, &id);
	if (!peer) {
		/* No such peer: tell lightningd it failed. */
		daemon_conn_send(daemon->master,
				 take(towire_connectd_ping_done(NULL, reqid,
								false, 0)));
		return;
	}

	/* We're not supposed to send another ping until previous replied */
	if (peer->expecting_pong != PONG_UNEXPECTED) {
		daemon_conn_send(daemon->master,
				 take(towire_connectd_ping_done(NULL, reqid,
								false, 0)));
		return;
	}

	/* It should never ask for an oversize ping. */
	ping = make_ping(NULL, num_pong_bytes, len);
	if (tal_count(ping) > 65535)
		status_failed(STATUS_FAIL_MASTER_IO, "Oversize ping");

	peer->ping_start = time_mono();
	inject_peer_msg(peer, take(ping));

	status_debug("sending ping expecting %sresponse",
		     num_pong_bytes >= 65532 ? "no " : "");

	/* BOLT #1:
	 *
	 * A node receiving a `ping` message:
	 * - if `num_pong_bytes` is less than 65532:
	 *   - MUST respond by sending a `pong` message, with `byteslen` equal
	 *    to `num_pong_bytes`.
	 * - otherwise (`num_pong_bytes` is **not** less than 65532):
	 *   - MUST ignore the `ping`.
	 */
	if (num_pong_bytes >= 65532) {
		/* No pong will come: report success immediately. */
		daemon_conn_send(daemon->master,
				 take(towire_connectd_ping_done(NULL, reqid,
								true, 0)));
		return;
	}

	/* We'll respond to lightningd once the pong comes in */
	peer->expecting_pong = PONG_EXPECTED_COMMAND;
	peer->ping_reqid = reqid;

	/* Since we're doing this manually, kill and restart timer. */
	tal_free(peer->ping_timer);
	set_ping_timer(peer);
}
|