connect: switch to using io_write_partial instead of io_write.
This gives us finer control over write sizes: for now we just cap the write size. Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
@@ -127,7 +127,9 @@ static struct peer *new_peer(struct daemon *daemon,
|
||||
peer->cs = *cs;
|
||||
peer->subds = tal_arr(peer, struct subd *, 0);
|
||||
peer->peer_in = NULL;
|
||||
peer->sent_to_peer = NULL;
|
||||
peer->encrypted_peer_out = NULL;
|
||||
peer->encrypted_peer_out_off = 0;
|
||||
peer->encrypted_peer_out_sent = 0;
|
||||
peer->urgent = false;
|
||||
peer->draining_state = NOT_DRAINING;
|
||||
peer->peer_in_lastmsg = -1;
|
||||
|
||||
@@ -88,8 +88,10 @@ struct peer {
|
||||
/* Output buffer. */
|
||||
struct msg_queue *peer_outq;
|
||||
|
||||
/* Peer sent buffer (for freeing after sending) */
|
||||
const u8 *sent_to_peer;
|
||||
/* Encrypted peer sending buffer */
|
||||
const u8 *encrypted_peer_out;
|
||||
size_t encrypted_peer_out_off;
|
||||
size_t encrypted_peer_out_sent;
|
||||
|
||||
/* We stream from the gossip_store for them, when idle */
|
||||
struct gossip_state gs;
|
||||
|
||||
@@ -26,6 +26,9 @@
|
||||
#include <wire/peer_wire.h>
|
||||
#include <wire/wire_io.h>
|
||||
|
||||
/* Maximum write(), to create uniform size packets. */
|
||||
#define MAX_MESSAGE_SIZE 1460
|
||||
|
||||
struct subd {
|
||||
/* Owner: we are in peer->subds[] */
|
||||
struct peer *peer;
|
||||
@@ -490,6 +493,19 @@ static struct io_plan *msg_out_dev_disconnect(struct peer *peer, const u8 *msg)
|
||||
abort();
|
||||
}
|
||||
|
||||
/* (Continue) writing the encrypted_peer_out array */
|
||||
static struct io_plan *write_encrypted_to_peer(struct peer *peer)
|
||||
{
|
||||
size_t max = tal_bytelen(peer->encrypted_peer_out) - peer->encrypted_peer_out_off;
|
||||
if (max > MAX_MESSAGE_SIZE)
|
||||
max = MAX_MESSAGE_SIZE;
|
||||
return io_write_partial(peer->to_peer,
|
||||
peer->encrypted_peer_out + peer->encrypted_peer_out_off,
|
||||
max,
|
||||
&peer->encrypted_peer_out_sent,
|
||||
write_to_peer, peer);
|
||||
}
|
||||
|
||||
static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES)
|
||||
{
|
||||
int type = fromwire_peektype(msg);
|
||||
@@ -498,19 +514,15 @@ static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES)
|
||||
|
||||
/* Special message type directing us to process batch items. */
|
||||
if (type == WIRE_PROTOCOL_BATCH_ELEMENT) {
|
||||
peer->sent_to_peer = process_batch_elements(peer, msg);
|
||||
if (!peer->sent_to_peer)
|
||||
peer->encrypted_peer_out = process_batch_elements(peer, msg);
|
||||
if (!peer->encrypted_peer_out)
|
||||
return io_close(peer->to_peer);
|
||||
}
|
||||
else {
|
||||
peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg);
|
||||
peer->encrypted_peer_out = cryptomsg_encrypt_msg(peer, &peer->cs, msg);
|
||||
}
|
||||
/* We free this and the encrypted version in next write_to_peer */
|
||||
|
||||
return io_write(peer->to_peer,
|
||||
peer->sent_to_peer,
|
||||
tal_bytelen(peer->sent_to_peer),
|
||||
write_to_peer, peer);
|
||||
return write_encrypted_to_peer(peer);
|
||||
}
|
||||
|
||||
/* Kicks off write_to_peer() to look for more gossip to send from store */
|
||||
@@ -1123,8 +1135,14 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
|
||||
|
||||
assert(peer->to_peer == peer_conn);
|
||||
|
||||
/* Write any remainder. */
|
||||
peer->encrypted_peer_out_off += peer->encrypted_peer_out_sent;
|
||||
if (peer->encrypted_peer_out_off < tal_bytelen(peer->encrypted_peer_out))
|
||||
return write_encrypted_to_peer(peer);
|
||||
|
||||
/* Free last sent one (if any) */
|
||||
peer->sent_to_peer = tal_free(peer->sent_to_peer);
|
||||
peer->encrypted_peer_out = tal_free(peer->encrypted_peer_out);
|
||||
peer->encrypted_peer_out_off = 0;
|
||||
|
||||
/* Pop tail of send queue (or gossip) */
|
||||
msg = next_msg_for_peer(peer);
|
||||
|
||||
Reference in New Issue
Block a user