diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 820181fb9..fc5d99b87 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -77,6 +77,7 @@ class Peer(Logger, EventListener): 'query_short_channel_ids', 'reply_short_channel_ids', 'reply_short_channel_ids_end') DELAY_INC_MSG_PROCESSING_SLEEP = 0.01 + MIN_TIME_BETWEEN_SENDING_COMMITSIGS = 0.05 RECV_GOSSIP_QUEUE_SOFT_MAXSIZE = 2000 RECV_GOSSIP_QUEUE_HARD_MAXSIZE = 5000 @@ -132,6 +133,7 @@ class Peer(Logger, EventListener): self.register_callbacks() self._num_gossip_messages_forwarded = 0 self._processed_onion_cache = LRUCache(maxsize=100) # type: LRUCache[bytes, ProcessedOnionPacket] + self._last_commitsig_sent_time = time.monotonic() def send_message(self, message_name: str, **kwargs): assert util.get_running_loop() == util.get_asyncio_loop(), f"this must be run on the asyncio thread!" @@ -1859,7 +1861,6 @@ class Peer(Logger, EventListener): f"chan={chan.get_id_for_log()}. {htlc_id=}. {chan.get_state()=!r}. {chan.peer_state=!r}") return chan.receive_fail_htlc(htlc_id, error_bytes=reason) # TODO handle exc and maybe fail channel (e.g. bad htlc_id) - self.maybe_send_commitment(chan) def maybe_send_commitment(self, chan: Channel) -> bool: assert util.get_running_loop() == util.get_asyncio_loop(), f"this must be run on the asyncio thread!" @@ -1871,6 +1872,12 @@ class Peer(Logger, EventListener): # if there are no changes, we will not (and must not) send a new commitment if not chan.has_pending_changes(REMOTE): return False + now = time.monotonic() + if now - self._last_commitsig_sent_time < self.MIN_TIME_BETWEEN_SENDING_COMMITSIGS: + # We recently sent "commitment_signed". Delay sending again, to allow batching updates. + # No need to set a timer, htlc_switch polling will call us again. + return False + self._last_commitsig_sent_time = now self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.') sig_64, htlc_sigs = chan.sign_next_commitment() self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs)) @@ -1983,7 +1990,6 @@ class Peer(Logger, EventListener): f"chan={chan.get_id_for_log()}. {htlc_id=}. {chan.get_state()=!r}. {chan.peer_state=!r}") return chan.receive_htlc_settle(preimage, htlc_id) # TODO handle exc and maybe fail channel (e.g. bad htlc_id) - self.maybe_send_commitment(chan) def on_update_fail_malformed_htlc(self, chan: Channel, payload): htlc_id = payload["id"] @@ -2000,7 +2006,6 @@ class Peer(Logger, EventListener): raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {failure_code}") reason = OnionRoutingFailure(code=failure_code, data=payload["sha256_of_onion"]) chan.receive_fail_htlc(htlc_id, error_bytes=None, reason=reason) - self.maybe_send_commitment(chan) def on_update_add_htlc(self, chan: Channel, payload): payment_hash = payload["payment_hash"] @@ -2316,6 +2321,7 @@ class Peer(Logger, EventListener): id=htlc_id, len=len(error_bytes), reason=error_bytes) + self.maybe_send_commitment(chan) def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionParsingError): self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.") @@ -2330,6 +2336,7 @@ class Peer(Logger, EventListener): id=htlc_id, sha256_of_onion=reason.data, failure_code=reason.code) + self.maybe_send_commitment(chan) def on_revoke_and_ack(self, chan: Channel, payload) -> None: self.logger.info(f'on_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(REMOTE)}') @@ -2341,7 +2348,6 @@ class Peer(Logger, EventListener): rev = RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"]) chan.receive_revocation(rev) self.lnworker.save_channel(chan) - self.maybe_send_commitment(chan) self._received_revack_event.set() self._received_revack_event.clear() diff --git a/tests/test_lnpeer.py b/tests/test_lnpeer.py index af0716adb..897a4b39e 100644 --- a/tests/test_lnpeer.py +++ b/tests/test_lnpeer.py @@ -734,6 +734,8 @@ class TestPeerDirect(TestPeer): # note: we don't start peer.htlc_switch() so that the fake htlcs are left alone. async def f(): p1, p2, w1, w2 = self.prepare_peers(chan_AB, chan_BA) + p1.MIN_TIME_BETWEEN_SENDING_COMMITSIGS = 0 + p2.MIN_TIME_BETWEEN_SENDING_COMMITSIGS = 0 async with OldTaskGroup() as group: await group.spawn(p1._message_loop()) await group.spawn(p2._message_loop()) @@ -753,6 +755,8 @@ class TestPeerDirect(TestPeer): # simulating disconnection. recreate transports. self.logger.info("simulating disconnection. recreating transports.") p1, p2, w1, w2 = self.prepare_peers(chan_AB, chan_BA) + p1.MIN_TIME_BETWEEN_SENDING_COMMITSIGS = 0 + p2.MIN_TIME_BETWEEN_SENDING_COMMITSIGS = 0 for chan in (chan_AB, chan_BA): chan.peer_state = PeerState.DISCONNECTED async with OldTaskGroup() as group: @@ -1607,6 +1611,7 @@ class TestPeerDirect(TestPeer): payment_hash=lnaddr.paymenthash, min_final_cltv_delta=lnaddr.get_min_final_cltv_delta(), payment_secret=lnaddr.payment_secret) + await p2.received_commitsig_event.wait() # alice closes await p1.close_channel(alice_channel.channel_id) gath.cancel()