diff --git a/connectd/connectd.c b/connectd/connectd.c index 44f81b6c2..792b472f5 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -127,7 +127,9 @@ static struct peer *new_peer(struct daemon *daemon, peer->cs = *cs; peer->subds = tal_arr(peer, struct subd *, 0); peer->peer_in = NULL; - peer->sent_to_peer = NULL; + peer->encrypted_peer_out = NULL; + peer->encrypted_peer_out_off = 0; + peer->encrypted_peer_out_sent = 0; peer->urgent = false; peer->draining_state = NOT_DRAINING; peer->peer_in_lastmsg = -1; diff --git a/connectd/connectd.h b/connectd/connectd.h index 41a86cd10..451abf7a1 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -88,8 +88,10 @@ struct peer { /* Output buffer. */ struct msg_queue *peer_outq; - /* Peer sent buffer (for freeing after sending) */ - const u8 *sent_to_peer; + /* Encrypted peer sending buffer */ + const u8 *encrypted_peer_out; + size_t encrypted_peer_out_off; + size_t encrypted_peer_out_sent; /* We stream from the gossip_store for them, when idle */ struct gossip_state gs; diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 40abb5082..b19bffe99 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -26,6 +26,9 @@ #include #include +/* Maximum write(), to create uniform size packets. */ +#define MAX_MESSAGE_SIZE 1460 + struct subd { /* Owner: we are in peer->subds[] */ struct peer *peer; @@ -490,6 +493,19 @@ static struct io_plan *msg_out_dev_disconnect(struct peer *peer, const u8 *msg) abort(); } +/* (Continue) writing the encrypted_peer_out array */ +static struct io_plan *write_encrypted_to_peer(struct peer *peer) +{ + size_t max = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off; + if (max > MAX_MESSAGE_SIZE) + max = MAX_MESSAGE_SIZE; + return io_write_partial(peer->to_peer, + peer->encrypted_peer_out + peer->encrypted_peer_out_off, + max, + &peer->encrypted_peer_out_sent, + write_to_peer, peer); +} + static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES) { int type = fromwire_peektype(msg); @@ -498,19 +514,15 @@ static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES) /* Special message type directing us to process batch items. */ if (type == WIRE_PROTOCOL_BATCH_ELEMENT) { - peer->sent_to_peer = process_batch_elements(peer, msg); - if (!peer->sent_to_peer) + peer->encrypted_peer_out = process_batch_elements(peer, msg); + if (!peer->encrypted_peer_out) return io_close(peer->to_peer); } else { - peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg); + peer->encrypted_peer_out = cryptomsg_encrypt_msg(peer, &peer->cs, msg); } - /* We free this and the encrypted version in next write_to_peer */ - return io_write(peer->to_peer, - peer->sent_to_peer, - tal_bytelen(peer->sent_to_peer), - write_to_peer, peer); + return write_encrypted_to_peer(peer); } /* Kicks off write_to_peer() to look for more gossip to send from store */ @@ -1123,8 +1135,14 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, assert(peer->to_peer == peer_conn); + /* Write any remainder. */ + peer->encrypted_peer_out_off += peer->encrypted_peer_out_sent; + if (peer->encrypted_peer_out_off < tal_bytelen(peer->encrypted_peer_out)) + return write_encrypted_to_peer(peer); + /* Free last sent one (if any) */ - peer->sent_to_peer = tal_free(peer->sent_to_peer); + peer->encrypted_peer_out = tal_free(peer->encrypted_peer_out); + peer->encrypted_peer_out_off = 0; /* Pop tail of send queue (or gossip) */ msg = next_msg_for_peer(peer);