connectd: tie gossip query responses into ratelimiting code.
A bit tricky, since we get more than one message at a time. However, this just means we go over quota for a bit, and will get caught when those are sent (we do this for a single message already, so it's not that much worse). Note: this not only limits sending, but it limits the actuall query processing, which is nice. Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
@@ -420,24 +420,14 @@ static void wake_gossip(struct peer *peer)
|
||||
peer->gs.gossip_timer = gossip_stream_timer(peer);
|
||||
}
|
||||
|
||||
/* If we are streaming gossip, get something from gossip store */
|
||||
static const u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
|
||||
/* Gossip response or something from gossip store */
|
||||
static const u8 *maybe_gossip_msg(const tal_t *ctx, struct peer *peer)
|
||||
{
|
||||
const u8 *msg;
|
||||
struct timemono now;
|
||||
struct gossmap *gossmap;
|
||||
u32 timestamp;
|
||||
|
||||
/* dev-mode can suppress all gossip */
|
||||
if (peer->daemon->dev_suppress_gossip)
|
||||
return NULL;
|
||||
|
||||
/* Not streaming right now? */
|
||||
if (!peer->gs.active)
|
||||
return NULL;
|
||||
|
||||
/* This should be around to kick us every 60 seconds */
|
||||
assert(peer->gs.gossip_timer);
|
||||
const u8 **msgs;
|
||||
|
||||
/* If it's been over a second, make a fresh start. */
|
||||
now = time_mono();
|
||||
@@ -462,6 +452,31 @@ static const u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
|
||||
|
||||
gossmap = get_gossmap(peer->daemon);
|
||||
|
||||
/* This can return more than one. */
|
||||
msgs = maybe_create_query_responses(tmpctx, peer, gossmap);
|
||||
if (tal_count(msgs) > 0) {
|
||||
/* We return the first one for immediate sending, and queue
|
||||
* others for future. We add all the lengths now though! */
|
||||
for (size_t i = 0; i < tal_count(msgs); i++) {
|
||||
peer->gs.bytes_this_second += tal_bytelen(msgs[i]);
|
||||
status_peer_io(LOG_IO_OUT, &peer->id, msgs[i]);
|
||||
if (i > 0)
|
||||
msg_enqueue(peer->peer_outq, take(msgs[i]));
|
||||
}
|
||||
return msgs[0];
|
||||
}
|
||||
|
||||
/* dev-mode can suppress all gossip */
|
||||
if (peer->daemon->dev_suppress_gossip)
|
||||
return NULL;
|
||||
|
||||
/* Not streaming right now? */
|
||||
if (!peer->gs.active)
|
||||
return NULL;
|
||||
|
||||
/* This should be around to kick us every 60 seconds */
|
||||
assert(peer->gs.gossip_timer);
|
||||
|
||||
again:
|
||||
msg = gossmap_stream_next(ctx, gossmap, peer->gs.iter, ×tamp);
|
||||
if (msg) {
|
||||
@@ -482,6 +497,7 @@ again:
|
||||
return msg;
|
||||
}
|
||||
|
||||
/* No gossip left to send */
|
||||
peer->gs.active = false;
|
||||
return NULL;
|
||||
}
|
||||
@@ -940,14 +956,9 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
|
||||
return io_sock_shutdown(peer_conn);
|
||||
|
||||
/* If they want us to send gossip, do so now. */
|
||||
if (!peer->draining) {
|
||||
/* FIXME: make it return the message? */
|
||||
if (maybe_send_query_responses(peer, get_gossmap(peer->daemon))) {
|
||||
msg = msg_dequeue(peer->peer_outq);
|
||||
} else {
|
||||
msg = maybe_from_gossip_store(NULL, peer);
|
||||
}
|
||||
}
|
||||
if (!peer->draining)
|
||||
msg = maybe_gossip_msg(NULL, peer);
|
||||
|
||||
if (!msg) {
|
||||
/* Tell them to read again, */
|
||||
io_wake(&peer->subds);
|
||||
|
||||
@@ -108,13 +108,14 @@ static void uniquify_node_ids(struct node_id **ids)
|
||||
}
|
||||
|
||||
/* We are fairly careful to avoid the peer DoSing us with channel queries:
|
||||
* this routine sends information about a single short_channel_id, unless
|
||||
* this routine creates messages about a single short_channel_id, unless
|
||||
* it's finished all of them. */
|
||||
bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
|
||||
const u8 **maybe_create_query_responses(const tal_t *ctx,
|
||||
struct peer *peer,
|
||||
struct gossmap *gossmap)
|
||||
{
|
||||
size_t i, num;
|
||||
bool sent = false;
|
||||
const u8 *msg;
|
||||
const u8 **msgs = tal_arr(ctx, const u8 *, 0);
|
||||
|
||||
/* BOLT #7:
|
||||
*
|
||||
@@ -122,7 +123,7 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
|
||||
*/
|
||||
/* Search for next short_channel_id we know about. */
|
||||
num = tal_count(peer->scid_queries);
|
||||
for (i = peer->scid_query_idx; !sent && i < num; i++) {
|
||||
for (i = peer->scid_query_idx; tal_count(msgs) == 0 && i < num; i++) {
|
||||
struct gossmap_chan *chan;
|
||||
struct gossmap_node *node;
|
||||
struct node_id node_id;
|
||||
@@ -136,9 +137,8 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
|
||||
* - MUST reply with a `channel_announcement`
|
||||
*/
|
||||
if (peer->scid_query_flags[i] & SCID_QF_ANNOUNCE) {
|
||||
msg = gossmap_chan_get_announce(NULL, gossmap, chan);
|
||||
inject_peer_msg(peer, take(msg));
|
||||
sent = true;
|
||||
tal_arr_expand(&msgs,
|
||||
gossmap_chan_get_announce(msgs, gossmap, chan));
|
||||
}
|
||||
|
||||
/* BOLT #7:
|
||||
@@ -152,15 +152,13 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
|
||||
* `node_id_2` */
|
||||
if ((peer->scid_query_flags[i] & SCID_QF_UPDATE1)
|
||||
&& gossmap_chan_set(chan, 0)) {
|
||||
msg = gossmap_chan_get_update(NULL, gossmap, chan, 0);
|
||||
inject_peer_msg(peer, take(msg));
|
||||
sent = true;
|
||||
tal_arr_expand(&msgs,
|
||||
gossmap_chan_get_update(msgs, gossmap, chan, 0));
|
||||
}
|
||||
if ((peer->scid_query_flags[i] & SCID_QF_UPDATE2)
|
||||
&& gossmap_chan_set(chan, 1)) {
|
||||
msg = gossmap_chan_get_update(NULL, gossmap, chan, 1);
|
||||
inject_peer_msg(peer, take(msg));
|
||||
sent = true;
|
||||
tal_arr_expand(&msgs,
|
||||
gossmap_chan_get_update(msgs, gossmap, chan, 1));
|
||||
}
|
||||
|
||||
/* BOLT #7:
|
||||
@@ -212,7 +210,7 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
|
||||
/* If we haven't sent anything above, we look for the next
|
||||
* node_announcement to send. */
|
||||
num = tal_count(peer->scid_query_nodes);
|
||||
for (i = peer->scid_query_nodes_idx; !sent && i < num; i++) {
|
||||
for (i = peer->scid_query_nodes_idx; tal_count(msgs) == 0 && i < num; i++) {
|
||||
const struct gossmap_node *n;
|
||||
|
||||
/* Not every node announces itself (we know it exists because
|
||||
@@ -221,9 +219,8 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
|
||||
if (!n || !gossmap_node_announced(n))
|
||||
continue;
|
||||
|
||||
msg = gossmap_node_get_announce(NULL, gossmap, n);
|
||||
inject_peer_msg(peer, take(msg));
|
||||
sent = true;
|
||||
tal_arr_expand(&msgs,
|
||||
gossmap_node_get_announce(msgs, gossmap, n));
|
||||
}
|
||||
peer->scid_query_nodes_idx = i;
|
||||
|
||||
@@ -245,7 +242,7 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
|
||||
u8 *end = towire_reply_short_channel_ids_end(peer,
|
||||
&chainparams->genesis_blockhash,
|
||||
true);
|
||||
inject_peer_msg(peer, take(end));
|
||||
tal_arr_expand(&msgs, end);
|
||||
|
||||
/* We're done! Clean up so we simply pass-through next time. */
|
||||
peer->scid_queries = tal_free(peer->scid_queries);
|
||||
@@ -254,7 +251,8 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
|
||||
peer->scid_query_nodes = tal_free(peer->scid_query_nodes);
|
||||
peer->scid_query_nodes_idx = 0;
|
||||
}
|
||||
return sent;
|
||||
|
||||
return msgs;
|
||||
}
|
||||
|
||||
/* The peer can ask about an array of short channel ids: we don't assemble the
|
||||
|
||||
@@ -2,8 +2,10 @@
|
||||
#define LIGHTNING_CONNECTD_QUERIES_H
|
||||
#include "config.h"
|
||||
|
||||
/* See if there's a query to respond to, if so, do it and return true */
|
||||
bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap);
|
||||
/* See if there's a query to respond to: if so, return some msgs */
|
||||
const u8 **maybe_create_query_responses(const tal_t *ctx,
|
||||
struct peer *peer,
|
||||
struct gossmap *gossmap);
|
||||
|
||||
void handle_query_short_channel_ids(struct peer *peer, const u8 *msg);
|
||||
void handle_query_channel_range(struct peer *peer, const u8 *msg);
|
||||
|
||||
@@ -2043,7 +2043,7 @@ def test_listchannels_deprecated_local(node_factory, bitcoind):
|
||||
assert vals == [(True, True, l1l2)] * 2 + [(True, False, l2l3)] * 2 or vals == [(True, False, l2l3)] * 2 + [(True, True, l1l2)] * 2
|
||||
|
||||
|
||||
def test_gossip_throttle(node_factory, bitcoind):
|
||||
def test_gossip_throttle(node_factory, bitcoind, chainparams):
|
||||
"""Make some gossip, test it gets throttled"""
|
||||
l1, l2, l3, l4 = node_factory.line_graph(4, wait_for_announce=True,
|
||||
opts=[{}, {}, {}, {'dev-throttle-gossip': None}])
|
||||
@@ -2063,9 +2063,11 @@ def test_gossip_throttle(node_factory, bitcoind):
|
||||
'--max-messages={}'.format(expected),
|
||||
'{}@localhost:{}'.format(l1.info['id'], l1.port)],
|
||||
check=True,
|
||||
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout
|
||||
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
|
||||
time_fast = time.time() - start_fast
|
||||
assert time_fast < 2
|
||||
# Remove timestamp filter, since timestamp will change!
|
||||
out1 = [m for m in out1 if not m.startswith(b'0109')]
|
||||
|
||||
# l4 is throttled
|
||||
start_slow = time.time()
|
||||
@@ -2076,10 +2078,62 @@ def test_gossip_throttle(node_factory, bitcoind):
|
||||
'--max-messages={}'.format(expected),
|
||||
'{}@localhost:{}'.format(l4.info['id'], l4.port)],
|
||||
check=True,
|
||||
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout
|
||||
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
|
||||
time_slow = time.time() - start_slow
|
||||
assert time_slow > 3
|
||||
|
||||
# Remove timestamp filter, since timestamp will change!
|
||||
out2 = [m for m in out2 if not m.startswith(b'0109')]
|
||||
|
||||
# Contents should be identical (once uniquified, since each
|
||||
# doubles-up on its own gossip)
|
||||
assert set(out1.split()) == set(out2.split())
|
||||
assert set(out1) == set(out2)
|
||||
|
||||
encoded = subprocess.run(['devtools/mkencoded', '--scids', '00',
|
||||
first_scid(l1, l2),
|
||||
first_scid(l2, l3),
|
||||
first_scid(l3, l4)],
|
||||
check=True,
|
||||
timeout=TIMEOUT,
|
||||
stdout=subprocess.PIPE).stdout.strip().decode()
|
||||
|
||||
query = subprocess.run(['devtools/mkquery',
|
||||
'query_short_channel_ids',
|
||||
chainparams['chain_hash'],
|
||||
encoded,
|
||||
# We want channel announce, updates and node ann.
|
||||
'00', '1F1F1F'],
|
||||
check=True,
|
||||
timeout=TIMEOUT,
|
||||
stdout=subprocess.PIPE).stdout.strip()
|
||||
|
||||
# Queries should also be ratelimited, so compare l1 vs l4.
|
||||
start_fast = time.time()
|
||||
out3 = subprocess.run(['devtools/gossipwith',
|
||||
'--no-gossip',
|
||||
'--hex',
|
||||
'--network={}'.format(TEST_NETWORK),
|
||||
'--max-messages={}'.format(expected),
|
||||
'{}@localhost:{}'.format(l1.info['id'], l1.port),
|
||||
query],
|
||||
check=True,
|
||||
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
|
||||
time_fast = time.time() - start_fast
|
||||
assert time_fast < 2
|
||||
out3 = [m for m in out3 if not m.startswith(b'0109')]
|
||||
assert set(out1) == set(out3)
|
||||
|
||||
start_slow = time.time()
|
||||
out4 = subprocess.run(['devtools/gossipwith',
|
||||
'--no-gossip',
|
||||
'--hex',
|
||||
'--network={}'.format(TEST_NETWORK),
|
||||
'--max-messages={}'.format(expected),
|
||||
'{}@localhost:{}'.format(l4.info['id'], l4.port),
|
||||
query],
|
||||
check=True,
|
||||
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
|
||||
time_slow = time.time() - start_slow
|
||||
assert time_slow > 3
|
||||
out4 = [m for m in out4 if not m.startswith(b'0109')]
|
||||
assert set(out2) == set(out4)
|
||||
|
||||
Reference in New Issue
Block a user