connectd: refactor outgoing loop.
Give us a single "next message" function to call. This will be useful when we want to write more than one at a time. Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
@@ -1085,6 +1085,33 @@ static void maybe_update_channelid(struct subd *subd, const u8 *msg)
|
||||
}
|
||||
}
|
||||
|
||||
static const u8 *next_msg_for_peer(struct peer *peer)
|
||||
{
|
||||
const u8 *msg;
|
||||
|
||||
msg = msg_dequeue(peer->peer_outq);
|
||||
if (!msg) {
|
||||
/* Draining? Don't send gossip. */
|
||||
if (peer->draining_state == WRITING_TO_PEER)
|
||||
return NULL;
|
||||
|
||||
/* If they want us to send gossip, do so now. */
|
||||
msg = maybe_gossip_msg(NULL, peer);
|
||||
if (!msg)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* dev_disconnect can disable writes (discard everything) */
|
||||
if (peer->dev_writes_enabled) {
|
||||
if (*peer->dev_writes_enabled == 0) {
|
||||
return tal_free(msg);
|
||||
}
|
||||
(*peer->dev_writes_enabled)--;
|
||||
}
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
static struct io_plan *write_to_peer(struct io_conn *peer_conn,
|
||||
struct peer *peer)
|
||||
{
|
||||
@@ -1094,10 +1121,8 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
|
||||
/* Free last sent one (if any) */
|
||||
peer->sent_to_peer = tal_free(peer->sent_to_peer);
|
||||
|
||||
/* Pop tail of send queue */
|
||||
msg = msg_dequeue(peer->peer_outq);
|
||||
|
||||
/* Still nothing to send? */
|
||||
/* Pop tail of send queue (or gossip) */
|
||||
msg = next_msg_for_peer(peer);
|
||||
if (!msg) {
|
||||
/* Draining? Shutdown socket (to avoid losing msgs) */
|
||||
if (peer->draining_state == WRITING_TO_PEER) {
|
||||
@@ -1106,33 +1131,18 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
|
||||
return io_sock_shutdown(peer_conn);
|
||||
}
|
||||
|
||||
/* If they want us to send gossip, do so now. */
|
||||
msg = maybe_gossip_msg(NULL, peer);
|
||||
if (!msg) {
|
||||
/* Tell them to read again, */
|
||||
io_wake(&peer->subds);
|
||||
io_wake(&peer->peer_in);
|
||||
|
||||
/* Wait for them to wake us */
|
||||
return msg_queue_wait(peer_conn, peer->peer_outq,
|
||||
write_to_peer, peer);
|
||||
}
|
||||
return msg_queue_wait(peer_conn, peer->peer_outq, write_to_peer, peer);
|
||||
}
|
||||
|
||||
if (peer->draining_state == WRITING_TO_PEER)
|
||||
status_peer_debug(&peer->id, "draining, but sending %s.",
|
||||
peer_wire_name(fromwire_peektype(msg)));
|
||||
|
||||
/* dev_disconnect can disable writes */
|
||||
if (peer->dev_writes_enabled) {
|
||||
if (*peer->dev_writes_enabled == 0) {
|
||||
tal_free(msg);
|
||||
/* Continue, to drain queue */
|
||||
return write_to_peer(peer_conn, peer);
|
||||
}
|
||||
(*peer->dev_writes_enabled)--;
|
||||
}
|
||||
|
||||
return encrypt_and_send(peer, take(msg));
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user