connectd: use membuf for more efficient output queue.

This is exactly what membuf is for: it handles expansion much more
neatly.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2026-02-16 14:06:11 +10:30
parent afdc92fedf
commit 963b353a30
3 changed files with 21 additions and 30 deletions

View File

@@ -128,8 +128,9 @@ static struct peer *new_peer(struct daemon *daemon,
peer->cs = *cs;
peer->subds = tal_arr(peer, struct subd *, 0);
peer->peer_in = NULL;
peer->encrypted_peer_out = tal_arr(peer, u8, 0);
peer->encrypted_peer_out_off = 0;
membuf_init(&peer->encrypted_peer_out,
tal_arr(peer, u8, 0), 0,
membuf_tal_resize);
peer->encrypted_peer_out_sent = 0;
peer->nonurgent_flush_timer = NULL;
peer->peer_out_urgent = 0;

View File

@@ -3,6 +3,7 @@
#include "config.h"
#include <bitcoin/short_channel_id.h>
#include <ccan/htable/htable_type.h>
#include <ccan/membuf/membuf.h>
#include <ccan/timer/timer.h>
#include <common/bigsize.h>
#include <common/crypto_state.h>
@@ -86,8 +87,7 @@ struct peer {
struct msg_queue *peer_outq;
/* Encrypted peer sending buffer */
u8 *encrypted_peer_out;
size_t encrypted_peer_out_off;
MEMBUF(u8) encrypted_peer_out;
size_t encrypted_peer_out_sent;
size_t peer_out_urgent;
bool flushing_nonurgent;

View File

@@ -473,23 +473,21 @@ static struct io_plan *msg_out_dev_disconnect(struct peer *peer, const u8 *msg)
/* Do we have enough bytes without padding? */
static bool have_full_encrypted_queue(const struct peer *peer)
{
size_t bytes = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off;
return bytes >= UNIFORM_MESSAGE_SIZE;
return membuf_num_elems(&peer->encrypted_peer_out) >= UNIFORM_MESSAGE_SIZE;
}
/* Do we have nothing in queue? */
static bool have_empty_encrypted_queue(const struct peer *peer)
{
size_t bytes = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off;
return bytes == 0;
return membuf_num_elems(&peer->encrypted_peer_out) == 0;
}
/* (Continue) writing the encrypted_peer_out array */
static struct io_plan *write_encrypted_to_peer(struct peer *peer)
{
assert(have_full_encrypted_queue(peer));
assert(membuf_num_elems(&peer->encrypted_peer_out) >= UNIFORM_MESSAGE_SIZE);
return io_write_partial(peer->to_peer,
peer->encrypted_peer_out + peer->encrypted_peer_out_off,
membuf_elems(&peer->encrypted_peer_out),
UNIFORM_MESSAGE_SIZE,
&peer->encrypted_peer_out_sent,
write_to_peer, peer);
@@ -499,8 +497,8 @@ static struct io_plan *write_encrypted_to_peer(struct peer *peer)
static bool encrypt_append(struct peer *peer, const u8 *msg TAKES)
{
int type = fromwire_peektype(msg);
u8 *enc;
size_t prev_size;
const u8 *enc;
size_t enclen;
/* Special message type directing us to process batch items. */
if (type == WIRE_PROTOCOL_BATCH_ELEMENT) {
@@ -510,16 +508,16 @@ static bool encrypt_append(struct peer *peer, const u8 *msg TAKES)
} else {
enc = cryptomsg_encrypt_msg(tmpctx, &peer->cs, msg);
}
prev_size = tal_bytelen(peer->encrypted_peer_out);
tal_resize(&peer->encrypted_peer_out, prev_size + tal_bytelen(enc));
memcpy(peer->encrypted_peer_out + prev_size, enc, tal_bytelen(enc));
enclen = tal_bytelen(enc);
memcpy(membuf_add(&peer->encrypted_peer_out, enclen),
enc,
enclen);
return true;
}
static void pad_encrypted_queue(struct peer *peer)
{
size_t needed, pingpad, bytes;
size_t needed, pingpad;
u8 *ping;
/* BOLT #8:
@@ -545,10 +543,8 @@ static void pad_encrypted_queue(struct peer *peer)
* The prefixed message length is encoded as a 2-byte big-endian integer,
* for a total maximum packet length of `2 + 16 + 65535 + 16` = `65569` bytes.
*/
assert(!have_full_encrypted_queue(peer));
bytes = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off;
needed = UNIFORM_MESSAGE_SIZE - bytes;
assert(membuf_num_elems(&peer->encrypted_peer_out) < UNIFORM_MESSAGE_SIZE);
needed = UNIFORM_MESSAGE_SIZE - membuf_num_elems(&peer->encrypted_peer_out);
/* BOLT #1:
* 1. type: 18 (`ping`)
@@ -575,7 +571,7 @@ static void pad_encrypted_queue(struct peer *peer)
abort();
assert(have_full_encrypted_queue(peer));
assert((tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off) % UNIFORM_MESSAGE_SIZE == 0);
assert(membuf_num_elems(&peer->encrypted_peer_out) % UNIFORM_MESSAGE_SIZE == 0);
}
/* Kicks off write_to_peer() to look for more gossip to send from store */
@@ -1208,16 +1204,10 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
peer->flushing_nonurgent = false;
/* Write any remainder. */
peer->encrypted_peer_out_off += peer->encrypted_peer_out_sent;
/* We wrote out some bytes from membuf. */
membuf_consume(&peer->encrypted_peer_out, peer->encrypted_peer_out_sent);
peer->encrypted_peer_out_sent = 0;
/* If all used, clean up */
if (peer->encrypted_peer_out_off == tal_bytelen(peer->encrypted_peer_out)) {
peer->encrypted_peer_out_off = 0;
tal_resize(&peer->encrypted_peer_out, 0);
}
while (!have_full_encrypted_queue(peer)) {
const u8 *msg;
struct io_plan *dev_override;