diff --git a/connectd/connectd.c b/connectd/connectd.c index aec89d240..952f94391 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -131,7 +131,9 @@ static struct peer *new_peer(struct daemon *daemon, peer->encrypted_peer_out = tal_arr(peer, u8, 0); peer->encrypted_peer_out_off = 0; peer->encrypted_peer_out_sent = 0; - peer->urgent = false; + peer->nonurgent_flush_timer = NULL; + peer->peer_out_urgent = 0; + peer->flushing_nonurgent = false; peer->draining_state = NOT_DRAINING; peer->peer_in_lastmsg = -1; peer->peer_outq = msg_queue_new(peer, false); diff --git a/connectd/connectd.h b/connectd/connectd.h index 1fdc95c4b..55f7719a4 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -79,9 +79,6 @@ struct peer { /* Connections to the subdaemons */ struct subd **subds; - /* When socket has Nagle overridden */ - bool urgent; - /* Input buffer. */ u8 *peer_in; @@ -92,6 +89,9 @@ struct peer { u8 *encrypted_peer_out; size_t encrypted_peer_out_off; size_t encrypted_peer_out_sent; + size_t peer_out_urgent; + bool flushing_nonurgent; + struct oneshot *nonurgent_flush_timer; /* We stream from the gossip_store for them, when idle */ struct gossip_state gs; diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 0ac2f80df..26c99814b 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -55,6 +55,7 @@ struct subd { }; /* FIXME: reorder! */ +static bool is_urgent(enum peer_wire type); static void destroy_connected_subd(struct subd *subd); static struct io_plan *write_to_peer(struct io_conn *peer_conn, struct peer *peer); @@ -98,10 +99,18 @@ static void close_subd_timeout(struct subd *subd) io_close(subd->conn); } +static void msg_to_peer_outq(struct peer *peer, const u8 *msg TAKES) +{ + if (is_urgent(fromwire_peektype(msg))) + peer->peer_out_urgent++; + + msg_enqueue(peer->peer_outq, msg); +} + void inject_peer_msg(struct peer *peer, const u8 *msg TAKES) { status_peer_io(LOG_IO_OUT, &peer->id, msg); - msg_enqueue(peer->peer_outq, msg); + msg_to_peer_outq(peer, msg); } static void drain_peer(struct peer *peer) @@ -293,7 +302,7 @@ void setup_peer_gossip_store(struct peer *peer, return; } -static bool UNNEEDED is_urgent(enum peer_wire type) +static bool is_urgent(enum peer_wire type) { switch (type) { case WIRE_INIT: @@ -629,7 +638,7 @@ static const u8 *maybe_gossip_msg(const tal_t *ctx, struct peer *peer) peer->gs.bytes_this_second += tal_bytelen(msgs[i]); status_peer_io(LOG_IO_OUT, &peer->id, msgs[i]); if (i > 0) - msg_enqueue(peer->peer_outq, take(msgs[i])); + msg_to_peer_outq(peer, take(msgs[i])); } return msgs[0]; } @@ -1145,7 +1154,14 @@ static const u8 *next_msg_for_peer(struct peer *peer) const u8 *msg; msg = msg_dequeue(peer->peer_outq); - if (!msg) { + if (msg) { + if (is_urgent(fromwire_peektype(msg))) { + peer->peer_out_urgent--; + } + } else { + /* Nothing in queue means nothing urgent in queue, surely! */ + assert(peer->peer_out_urgent == 0); + /* Draining? Don't send gossip. */ if (peer->draining_state == WRITING_TO_PEER) return NULL; @@ -1167,14 +1183,31 @@ static const u8 *next_msg_for_peer(struct peer *peer) return msg; } +static void nonurgent_flush(struct peer *peer) +{ + peer->nonurgent_flush_timer = NULL; + peer->flushing_nonurgent = true; + io_wake(peer->peer_outq); +} + static struct io_plan *write_to_peer(struct io_conn *peer_conn, struct peer *peer) { + bool do_flush; assert(peer->to_peer == peer_conn); + /* We always pad and send if we have an urgent msg or our + * non-urgent has gone off, or we're trying to close. */ + do_flush = (peer->peer_out_urgent != 0 + || peer->flushing_nonurgent + || peer->draining_state == WRITING_TO_PEER); + + peer->flushing_nonurgent = false; + /* Write any remainder. */ peer->encrypted_peer_out_off += peer->encrypted_peer_out_sent; peer->encrypted_peer_out_sent = 0; + /* If all used, clean up */ if (peer->encrypted_peer_out_off == tal_bytelen(peer->encrypted_peer_out)) { peer->encrypted_peer_out_off = 0; @@ -1188,19 +1221,33 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, /* Pop tail of send queue (or gossip) */ msg = next_msg_for_peer(peer); if (!msg) { - /* Nothing to send at all? We're done */ - if (have_empty_encrypted_queue(peer)) { - /* Draining? Shutdown socket (to avoid losing msgs) */ - if (peer->draining_state == WRITING_TO_PEER) { - status_peer_debug(&peer->id, "draining done, shutting down"); - io_wake(&peer->peer_in); - return io_sock_shutdown(peer_conn); - } + /* Draining? Shutdown socket (to avoid losing msgs) */ + if (have_empty_encrypted_queue(peer) + && peer->draining_state == WRITING_TO_PEER) { + status_peer_debug(&peer->id, "draining done, shutting down"); + io_wake(&peer->peer_in); + return io_sock_shutdown(peer_conn); + } + /* If no urgent message, and not draining, we wait. */ + if (!do_flush) { /* Tell them to read again, */ io_wake(&peer->subds); io_wake(&peer->peer_in); + /* Set up a timer if not already set */ + if (!have_empty_encrypted_queue(peer) + && !peer->nonurgent_flush_timer) { + /* Bias towards larger values, but don't be too predictable */ + u32 max = pseudorand(1000); + u32 msec = 1000 - pseudorand(1 + max); + peer->nonurgent_flush_timer + = new_reltimer(&peer->daemon->timers, + peer, + time_from_msec(msec), + nonurgent_flush, peer); + } + /* Wait for them to wake us */ return msg_queue_wait(peer_conn, peer->peer_outq, write_to_peer, peer); } @@ -1221,6 +1268,8 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, return io_close(peer->to_peer); } } + + peer->nonurgent_flush_timer = tal_free(peer->nonurgent_flush_timer); return write_encrypted_to_peer(peer); } @@ -1232,7 +1281,7 @@ static struct io_plan *read_from_subd_done(struct io_conn *subd_conn, maybe_update_channelid(subd, subd->in); /* Tell them to encrypt & write. */ - msg_enqueue(subd->peer->peer_outq, take(subd->in)); + msg_to_peer_outq(subd->peer, take(subd->in)); subd->in = NULL; /* Wait for them to wake us */