fix: replace put_nowait+sleep polling with call_later in onion_message queues

The send_queue and forward_queue loops were doing put_nowait + sleep(SLEEP_DELAY)
+ get() to re-schedule not-yet-due messages. Under asyncio scheduler pressure the
get() can stall after the item was already put back, causing test_request_and_reply
to time out. Replaced with call_later(remaining, queue.put_nowait, item) which
schedules the re-insertion at the exact due time without any polling.
This commit is contained in:
2026-05-05 09:42:10 +02:00
parent 7d433d0b44
commit 9a93bfda83
+9 -7
View File
@@ -562,9 +562,9 @@ class OnionMessageManager(Logger):
self.logger.debug(f'forward expired {node_id=}')
continue
if scheduled > now():
# return to queue
self.forward_queue.put_nowait((scheduled, expires, onion_packet, blinding, node_id))
await asyncio.sleep(self.SLEEP_DELAY) # sleep here, as the first queue item wasn't due yet
remaining = max(0.0, scheduled - now())
item = (scheduled, expires, onion_packet, blinding, node_id)
asyncio.get_running_loop().call_later(remaining, self.forward_queue.put_nowait, item)
continue
try:
@@ -613,10 +613,12 @@ class OnionMessageManager(Logger):
req.future.set_exception(Timeout())
continue
if scheduled > now():
# return to queue
self.logger.debug(f'return to queue {key=}, {scheduled - now()}')
self.send_queue.put_nowait((scheduled, expires, key))
await asyncio.sleep(self.SLEEP_DELAY) # sleep here, as the first queue item wasn't due yet
remaining = max(0.0, scheduled - now())
self.logger.debug(f'return to queue {key=}, {remaining}')
# Schedule the item to be re-added to the queue when it's due.
# Using call_later avoids a busy-poll loop (put_nowait + sleep + get)
# that can stall under asyncio scheduler pressure.
asyncio.get_running_loop().call_later(remaining, self.send_queue.put_nowait, (scheduled, expires, key))
continue
try:
self._send_pending_message(key)