1. Do not pass request_reply_timeout and request_reply_retry_delay to OnionMessageManager. Arguments passed to a function are useful only if their value might change during the execution of a program. If that is not the case, it is preferable to define them as constants. That makes the code easier to read, because the reader can focus on arguments that are actually relevant. 2 . Multiply all time constants by the same factor, instead of doing so incompletely and on a per parameter basis. (note that before this commit, the speedup factor was not consistent) 3. Do not use util.now(), because it is rounded as integer. With these changes, the tests can effectively run with a 100x speedup
759 lines
32 KiB
Python
759 lines
32 KiB
Python
# Electrum - Lightweight Bitcoin Client
|
|
# Copyright (c) 2023-2024 Thomas Voegtlin
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person
|
|
# obtaining a copy of this software and associated documentation files
|
|
# (the "Software"), to deal in the Software without restriction,
|
|
# including without limitation the rights to use, copy, modify, merge,
|
|
# publish, distribute, sublicense, and/or sell copies of the Software,
|
|
# and to permit persons to whom the Software is furnished to do so,
|
|
# subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be
|
|
# included in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
|
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
|
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
|
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
|
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
|
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
# SOFTWARE.
|
|
import asyncio
|
|
import copy
|
|
import io
|
|
import os
|
|
import queue
|
|
import threading
|
|
import time
|
|
from random import random
|
|
|
|
from typing import TYPE_CHECKING, Optional, List, Sequence
|
|
|
|
import electrum_ecc as ecc
|
|
|
|
from electrum.lnrouter import PathEdge
|
|
from electrum.logging import get_logger, Logger
|
|
from electrum.crypto import sha256, get_ecdh
|
|
from electrum.lnmsg import OnionWireSerializer
|
|
from electrum.lnonion import (get_bolt04_onion_key, OnionPacket, process_onion_packet,
|
|
OnionHopsDataSingle, decrypt_onionmsg_data_tlv, encrypt_onionmsg_data_tlv,
|
|
get_shared_secrets_along_route, new_onion_packet)
|
|
from electrum.lnutil import LnFeatures
|
|
from electrum.util import OldTaskGroup
|
|
|
|
# do not use util.now, because it rounds to integers
|
|
def now():
|
|
return time.time()
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from electrum.lnworker import LNWallet
|
|
from electrum.network import Network
|
|
from electrum.lnrouter import NodeInfo
|
|
from electrum.lntransport import LNPeerAddr
|
|
from asyncio import Task
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
REQUEST_REPLY_PATHS_MAX = 3
|
|
|
|
|
|
class NoRouteFound(Exception):
|
|
def __init__(self, *args, peer_address: 'LNPeerAddr' = None):
|
|
Exception.__init__(self, *args)
|
|
self.peer_address = peer_address
|
|
|
|
|
|
def create_blinded_path(session_key: bytes, path: List[bytes], final_recipient_data: dict, *,
|
|
hop_extras: Optional[Sequence[dict]] = None,
|
|
dummy_hops: Optional[int] = 0) -> dict:
|
|
# dummy hops could be inserted anywhere in the path, but for compatibility just add them at the end
|
|
# because blinded paths are usually constructed towards ourselves, and we know we can handle dummy hops.
|
|
if dummy_hops:
|
|
logger.debug(f'adding {dummy_hops} dummy hops at the end')
|
|
path += [path[-1]] * dummy_hops
|
|
|
|
introduction_point = path[0]
|
|
|
|
blinding = ecc.ECPrivkey(session_key).get_public_key_bytes()
|
|
|
|
onionmsg_hops = []
|
|
shared_secrets, blinded_node_ids = get_shared_secrets_along_route(path, session_key)
|
|
for i, node_id in enumerate(path):
|
|
is_non_final_node = i < len(path) - 1
|
|
|
|
if is_non_final_node:
|
|
recipient_data = {
|
|
# TODO: SHOULD add padding data to ensure all encrypted_data_tlv(i) have the same length
|
|
'next_node_id': {'node_id': path[i+1]}
|
|
}
|
|
if hop_extras and i < len(hop_extras): # extra hop data for debugging for now
|
|
recipient_data.update(hop_extras[i])
|
|
else:
|
|
# TODO: SHOULD add padding data to ensure all encrypted_data_tlv(i) have the same length
|
|
recipient_data = final_recipient_data
|
|
|
|
encrypted_recipient_data = encrypt_onionmsg_data_tlv(shared_secret=shared_secrets[i], **recipient_data)
|
|
|
|
hopdata = {
|
|
'blinded_node_id': blinded_node_ids[i],
|
|
'enclen': len(encrypted_recipient_data),
|
|
'encrypted_recipient_data': encrypted_recipient_data
|
|
}
|
|
onionmsg_hops.append(hopdata)
|
|
|
|
blinded_path = {
|
|
'first_node_id': introduction_point,
|
|
'blinding': blinding,
|
|
'num_hops': len(onionmsg_hops),
|
|
'path': onionmsg_hops
|
|
}
|
|
|
|
return blinded_path
|
|
|
|
|
|
def blinding_privkey(privkey, blinding):
|
|
shared_secret = get_ecdh(privkey, blinding)
|
|
b_hmac = get_bolt04_onion_key(b'blinded_node_id', shared_secret)
|
|
b_hmac_int = int.from_bytes(b_hmac, byteorder="big")
|
|
|
|
our_privkey_int = int.from_bytes(privkey, byteorder="big")
|
|
our_privkey_int = our_privkey_int * b_hmac_int % ecc.CURVE_ORDER
|
|
our_privkey = our_privkey_int.to_bytes(32, byteorder="big")
|
|
|
|
return our_privkey
|
|
|
|
|
|
def is_onion_message_node(node_id: bytes, node_info: Optional['NodeInfo']):
|
|
if not node_info:
|
|
return False
|
|
return LnFeatures(node_info.features).supports(LnFeatures.OPTION_ONION_MESSAGE_OPT)
|
|
|
|
|
|
def encrypt_onionmsg_tlv_hops_data(hops_data, hop_shared_secrets):
|
|
"""encrypt unencrypted onionmsg_tlv.encrypted_recipient_data for hops with blind_fields"""
|
|
num_hops = len(hops_data)
|
|
for i in range(num_hops):
|
|
if hops_data[i].tlv_stream_name == 'onionmsg_tlv' and 'encrypted_recipient_data' not in hops_data[i].payload:
|
|
# construct encrypted_recipient_data from blind_fields
|
|
encrypted_recipient_data = encrypt_onionmsg_data_tlv(shared_secret=hop_shared_secrets[i], **hops_data[i].blind_fields)
|
|
hops_data[i].payload['encrypted_recipient_data'] = {'encrypted_recipient_data': encrypted_recipient_data}
|
|
|
|
|
|
def create_onion_message_route_to(lnwallet: 'LNWallet', node_id: bytes) -> List[PathEdge]:
|
|
"""Constructs a route to the destination node_id, first by starting with peers with existing channels,
|
|
and if no route found, opening a direct peer connection if node_id is found with an address in
|
|
channel_db."""
|
|
# TODO: is this the proper way to set up my_sending_channels?
|
|
my_active_channels = [
|
|
chan for chan in lnwallet.channels.values() if
|
|
chan.is_active() and not chan.is_frozen_for_sending()]
|
|
my_sending_channels = {chan.short_channel_id: chan for chan in my_active_channels
|
|
if chan.short_channel_id is not None}
|
|
# find route to introduction point over existing channel mesh
|
|
# NOTE: nodes that are in channel_db but are offline are not removed from the set
|
|
if lnwallet.network.path_finder:
|
|
if path := lnwallet.network.path_finder.find_path_for_payment(
|
|
nodeA=lnwallet.node_keypair.pubkey,
|
|
nodeB=node_id,
|
|
invoice_amount_msat=10000, # TODO: do this without amount constraints
|
|
node_filter=lambda x, y: True if x == lnwallet.node_keypair.pubkey else is_onion_message_node(x, y),
|
|
my_sending_channels=my_sending_channels
|
|
): return path
|
|
|
|
# alt: dest is existing peer?
|
|
if lnwallet.peers.get(node_id):
|
|
return [PathEdge(short_channel_id=None, start_node=None, end_node=node_id)]
|
|
|
|
# if we have an address, pass it.
|
|
if lnwallet.channel_db:
|
|
if peer_addr := lnwallet.channel_db.get_last_good_address(node_id):
|
|
raise NoRouteFound('no path found, peer_addr available', peer_address=peer_addr)
|
|
|
|
raise NoRouteFound('no path found')
|
|
|
|
|
|
def send_onion_message_to(lnwallet: 'LNWallet', node_id_or_blinded_path: bytes, destination_payload: dict, session_key: bytes = None):
|
|
if session_key is None:
|
|
session_key = os.urandom(32)
|
|
|
|
if len(node_id_or_blinded_path) > 33: # assume blinded path
|
|
with io.BytesIO(node_id_or_blinded_path) as blinded_path_fd:
|
|
try:
|
|
blinded_path = OnionWireSerializer._read_complex_field(
|
|
fd=blinded_path_fd,
|
|
field_type='blinded_path',
|
|
count=1)
|
|
logger.debug(f'blinded path: {blinded_path!r}')
|
|
except Exception as e:
|
|
logger.error(f'e!r')
|
|
raise
|
|
|
|
introduction_point = blinded_path['first_node_id']
|
|
|
|
hops_data = []
|
|
blinded_node_ids = []
|
|
|
|
if lnwallet.node_keypair.pubkey == introduction_point:
|
|
# blinded path introduction point is me
|
|
our_blinding = blinded_path['blinding']
|
|
our_payload = blinded_path['path'][0]
|
|
remaining_blinded_path = blinded_path['path'][1:]
|
|
assert len(remaining_blinded_path) > 0, 'sending to myself?'
|
|
|
|
# decrypt
|
|
shared_secret = get_ecdh(lnwallet.node_keypair.privkey, our_blinding)
|
|
recipient_data = decrypt_onionmsg_data_tlv(
|
|
shared_secret=shared_secret,
|
|
encrypted_recipient_data=our_payload['encrypted_recipient_data']
|
|
)
|
|
|
|
peer = lnwallet.peers.get(recipient_data['next_node_id']['node_id'])
|
|
assert peer, 'next_node_id not a peer'
|
|
|
|
# blinding override?
|
|
next_blinding_override = recipient_data.get('next_blinding_override')
|
|
if next_blinding_override:
|
|
next_blinding = next_blinding_override.get('blinding')
|
|
else:
|
|
# E_i+1=SHA256(E_i||ss_i) * E_i
|
|
blinding_factor = sha256(our_blinding + shared_secret)
|
|
blinding_factor_int = int.from_bytes(blinding_factor, byteorder="big")
|
|
next_public_key_int = ecc.ECPubkey(our_blinding) * blinding_factor_int
|
|
next_blinding = next_public_key_int.get_public_key_bytes()
|
|
|
|
blinding = next_blinding
|
|
|
|
else:
|
|
# we need a route to introduction point
|
|
remaining_blinded_path = blinded_path['path']
|
|
peer = lnwallet.peers.get(introduction_point)
|
|
# if blinded path introduction point is our direct peer, no need to route-find
|
|
if peer:
|
|
# start of blinded path is our peer
|
|
blinding = blinded_path['blinding']
|
|
else:
|
|
path = create_onion_message_route_to(lnwallet, introduction_point)
|
|
|
|
# first edge must be to our peer
|
|
peer = lnwallet.peers.get(path[0].end_node)
|
|
assert peer, 'first hop not a peer'
|
|
|
|
# last edge is to introduction point and start of blinded path. remove from route
|
|
assert path[-1].end_node == introduction_point, 'last hop in route must be introduction point'
|
|
|
|
path = path[:-1]
|
|
|
|
if len(path) == 0:
|
|
blinding = blinded_path['blinding']
|
|
else:
|
|
payment_path_pubkeys = [edge.end_node for edge in path]
|
|
hop_shared_secrets, blinded_node_ids = get_shared_secrets_along_route(
|
|
payment_path_pubkeys,
|
|
session_key)
|
|
|
|
hops_data = [
|
|
OnionHopsDataSingle(
|
|
tlv_stream_name='onionmsg_tlv',
|
|
blind_fields={'next_node_id': {'node_id': x.end_node}}
|
|
) for x in path[:-1]
|
|
]
|
|
|
|
# final hop pre-ip, add next_blinding_override
|
|
final_hop_pre_ip = OnionHopsDataSingle(
|
|
tlv_stream_name='onionmsg_tlv',
|
|
blind_fields={
|
|
'next_node_id': {'node_id': introduction_point},
|
|
'next_blinding_override': {'blinding': blinded_path['blinding']},
|
|
}
|
|
)
|
|
hops_data.append(final_hop_pre_ip)
|
|
|
|
# encrypt encrypted_data_tlv here
|
|
for i in range(len(hops_data)):
|
|
encrypted_recipient_data = encrypt_onionmsg_data_tlv(
|
|
shared_secret=hop_shared_secrets[i],
|
|
**hops_data[i].blind_fields)
|
|
hops_data[i].payload['encrypted_recipient_data'] = {
|
|
'encrypted_recipient_data': encrypted_recipient_data
|
|
}
|
|
|
|
blinding = ecc.ECPrivkey(session_key).get_public_key_bytes()
|
|
|
|
# append (remaining) blinded path and payload
|
|
blinded_path_blinded_ids = []
|
|
for i, onionmsg_hop in enumerate(remaining_blinded_path):
|
|
blinded_path_blinded_ids.append(onionmsg_hop.get('blinded_node_id'))
|
|
payload = {
|
|
'encrypted_recipient_data': {'encrypted_recipient_data': onionmsg_hop['encrypted_recipient_data']}
|
|
}
|
|
if i == len(remaining_blinded_path) - 1: # final hop
|
|
payload.update(destination_payload)
|
|
hop = OnionHopsDataSingle(tlv_stream_name='onionmsg_tlv', payload=payload)
|
|
hops_data.append(hop)
|
|
|
|
payment_path_pubkeys = blinded_node_ids + blinded_path_blinded_ids
|
|
hop_shared_secrets, _ = get_shared_secrets_along_route(payment_path_pubkeys, session_key)
|
|
encrypt_onionmsg_tlv_hops_data(hops_data, hop_shared_secrets)
|
|
packet = new_onion_packet(payment_path_pubkeys, session_key, hops_data)
|
|
packet_b = packet.to_bytes()
|
|
|
|
else: # node pubkey
|
|
pubkey = node_id_or_blinded_path
|
|
|
|
if lnwallet.node_keypair.pubkey == pubkey:
|
|
raise Exception('cannot send to myself')
|
|
|
|
hops_data = []
|
|
peer = lnwallet.peers.get(pubkey)
|
|
|
|
if peer:
|
|
# destination is our direct peer, no need to route-find
|
|
path = [PathEdge(short_channel_id=None, start_node=None, end_node=pubkey)]
|
|
else:
|
|
path = create_onion_message_route_to(lnwallet, pubkey)
|
|
|
|
# first edge must be to our peer
|
|
peer = lnwallet.peers.get(path[0].end_node)
|
|
assert peer, 'first hop not a peer'
|
|
|
|
hops_data = [
|
|
OnionHopsDataSingle(
|
|
tlv_stream_name='onionmsg_tlv',
|
|
blind_fields={'next_node_id': {'node_id': x.end_node}}
|
|
) for x in path[1:]
|
|
]
|
|
|
|
final_hop = OnionHopsDataSingle(
|
|
tlv_stream_name='onionmsg_tlv',
|
|
payload=destination_payload
|
|
)
|
|
|
|
hops_data.append(final_hop)
|
|
|
|
payment_path_pubkeys = [edge.end_node for edge in path]
|
|
|
|
hop_shared_secrets, blinded_node_ids = get_shared_secrets_along_route(payment_path_pubkeys, session_key)
|
|
encrypt_onionmsg_tlv_hops_data(hops_data, hop_shared_secrets)
|
|
packet = new_onion_packet(blinded_node_ids, session_key, hops_data)
|
|
packet_b = packet.to_bytes()
|
|
|
|
blinding = ecc.ECPrivkey(session_key).get_public_key_bytes()
|
|
|
|
peer.send_message(
|
|
"onion_message",
|
|
blinding=blinding,
|
|
len=len(packet_b),
|
|
onion_message_packet=packet_b
|
|
)
|
|
|
|
|
|
def get_blinded_reply_paths(lnwallet: 'LNWallet', path_id: bytes, *,
|
|
max_paths: int = REQUEST_REPLY_PATHS_MAX,
|
|
preferred_node_id: bytes = None) -> List[dict]:
|
|
# TODO: build longer paths and/or add dummy hops to increase privacy
|
|
my_active_channels = [chan for chan in lnwallet.channels.values() if chan.is_active()]
|
|
my_onionmsg_channels = [chan for chan in my_active_channels if lnwallet.peers.get(chan.node_id) and
|
|
lnwallet.peers.get(chan.node_id).their_features.supports(LnFeatures.OPTION_ONION_MESSAGE_OPT)]
|
|
my_onionmsg_peers = [peer for peer in lnwallet.peers.values() if peer.their_features.supports(LnFeatures.OPTION_ONION_MESSAGE_OPT)]
|
|
|
|
result = []
|
|
mynodeid = lnwallet.node_keypair.pubkey
|
|
mydata = {'path_id': {'data': path_id}} # same used in every path
|
|
if len(my_onionmsg_channels):
|
|
# randomize list, but prefer preferred_node_id
|
|
rchans = sorted(my_onionmsg_channels, key=lambda x: random() if x.node_id != preferred_node_id else 0)
|
|
for chan in rchans[:max_paths]:
|
|
blinded_path = create_blinded_path(os.urandom(32), [chan.node_id, mynodeid], mydata)
|
|
result.append(blinded_path)
|
|
elif len(my_onionmsg_peers):
|
|
# randomize list, but prefer preferred_node_id
|
|
rpeers = sorted(my_onionmsg_peers, key=lambda x: random() if x.pubkey != preferred_node_id else 0)
|
|
for peer in rpeers[:max_paths]:
|
|
blinded_path = create_blinded_path(os.urandom(32), [peer.pubkey, mynodeid], mydata)
|
|
result.append(blinded_path)
|
|
|
|
return result
|
|
|
|
|
|
class Timeout(Exception): pass
|
|
|
|
|
|
class OnionMessageManager(Logger):
|
|
"""handle state around onion message sends and receives
|
|
- association between onion message and their replies
|
|
- manage re-send attempts, TODO: iterate through routes (both directions)"""
|
|
|
|
SLEEP_DELAY = 1
|
|
REQUEST_REPLY_TIMEOUT = 30
|
|
REQUEST_REPLY_RETRY_DELAY = 5
|
|
FORWARD_RETRY_TIMEOUT = 4
|
|
FORWARD_RETRY_DELAY = 2
|
|
FORWARD_MAX_QUEUE = 3
|
|
|
|
def __init__(self, lnwallet: 'LNWallet'):
|
|
Logger.__init__(self)
|
|
self.network = None # type: Optional['Network']
|
|
self.taskgroup = None # type: OldTaskGroup
|
|
self.lnwallet = lnwallet
|
|
|
|
self.pending = {}
|
|
self.pending_lock = threading.Lock()
|
|
self.requestreply_queue = queue.PriorityQueue()
|
|
self.requestreply_queue_notempty = asyncio.Event()
|
|
self.forwardqueue = queue.PriorityQueue()
|
|
self.forwardqueue_notempty = asyncio.Event()
|
|
|
|
def start_network(self, *, network: 'Network'):
|
|
assert network
|
|
assert self.network is None, "already started"
|
|
self.network = network
|
|
self.taskgroup = OldTaskGroup()
|
|
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
|
|
|
|
async def main_loop(self):
|
|
self.logger.info("starting taskgroup.")
|
|
try:
|
|
async with self.taskgroup as group:
|
|
await group.spawn(self.process_request_reply_queue())
|
|
await group.spawn(self.process_forward_queue())
|
|
except Exception as e:
|
|
self.logger.exception("taskgroup died.")
|
|
else:
|
|
self.logger.info("taskgroup stopped.")
|
|
|
|
async def stop(self):
|
|
await self.taskgroup.cancel_remaining()
|
|
|
|
async def process_forward_queue(self):
|
|
while True:
|
|
try:
|
|
scheduled, expires, onion_packet, blinding, node_id = self.forwardqueue.get_nowait()
|
|
except queue.Empty:
|
|
self.logger.info(f'forward queue empty')
|
|
self.forwardqueue_notempty.clear()
|
|
await self.forwardqueue_notempty.wait()
|
|
continue
|
|
|
|
if expires <= now():
|
|
self.logger.debug(f'forward expired {node_id=}')
|
|
continue
|
|
if scheduled > now():
|
|
# return to queue
|
|
self.forwardqueue.put_nowait((scheduled, expires, onion_packet, blinding, node_id))
|
|
await asyncio.sleep(self.SLEEP_DELAY) # sleep here, as the first queue item wasn't due yet
|
|
continue
|
|
|
|
try:
|
|
onion_packet_b = onion_packet.to_bytes()
|
|
next_peer = self.lnwallet.peers.get(node_id)
|
|
|
|
next_peer.send_message(
|
|
"onion_message",
|
|
blinding=blinding,
|
|
len=len(onion_packet_b),
|
|
onion_message_packet=onion_packet_b
|
|
)
|
|
except BaseException as e:
|
|
self.logger.debug(f'error while sending {node_id=} e={e!r}')
|
|
self.forwardqueue.put_nowait((now() + self.FORWARD_RETRY_DELAY, expires, onion_packet, blinding, node_id))
|
|
|
|
def submit_forward(
|
|
self, *,
|
|
onion_packet: OnionPacket,
|
|
blinding: bytes,
|
|
node_id: bytes):
|
|
if self.forwardqueue.qsize() >= self.FORWARD_MAX_QUEUE:
|
|
self.logger.debug('forward queue full, dropping packet')
|
|
return
|
|
expires = now() + self.FORWARD_RETRY_TIMEOUT
|
|
queueitem = (now(), expires, onion_packet, blinding, node_id)
|
|
self.forwardqueue.put_nowait(queueitem)
|
|
self.forwardqueue_notempty.set()
|
|
|
|
async def process_request_reply_queue(self):
|
|
while True:
|
|
try:
|
|
scheduled, expires, key = self.requestreply_queue.get_nowait()
|
|
except queue.Empty:
|
|
self.logger.info(f'requestreply queue empty')
|
|
self.requestreply_queue_notempty.clear()
|
|
try:
|
|
self.requestreply_queue_notempty.clear()
|
|
await self.requestreply_queue_notempty.wait() # NOTE: quirk, see note below
|
|
except Exception as e:
|
|
self.logger.info(f'Exception e={e!r}')
|
|
continue
|
|
|
|
requestreply = self.get_requestreply(key)
|
|
if requestreply is None:
|
|
self.logger.debug(f'no data for key {key=}')
|
|
continue
|
|
if requestreply.get('result') is not None:
|
|
self.logger.debug(f'has result! {key=}')
|
|
continue
|
|
if expires <= now():
|
|
self.logger.debug(f'expired {key=}')
|
|
self._set_requestreply_result(key, Timeout())
|
|
continue
|
|
if scheduled > now():
|
|
# return to queue
|
|
self.logger.debug(f'return to queue {key=}, {scheduled - now()}')
|
|
self.requestreply_queue.put_nowait((scheduled, expires, key))
|
|
await asyncio.sleep(self.SLEEP_DELAY) # sleep here, as the first queue item wasn't due yet
|
|
continue
|
|
|
|
try:
|
|
self._send_pending_requestreply(key)
|
|
except BaseException as e:
|
|
self.logger.debug(f'error while sending {key=} {e!r}')
|
|
self._set_requestreply_result(key, copy.copy(e))
|
|
# NOTE: above, when passing the caught exception instance e directly it leads to GeneratorExit() in
|
|
# queue_notempty.wait() later (??). pass a copy instead.
|
|
if isinstance(e, NoRouteFound) and e.peer_address:
|
|
await self.lnwallet.add_peer(str(e.peer_address))
|
|
else:
|
|
self.logger.debug(f'resubmit {key=}')
|
|
self.requestreply_queue.put_nowait((now() + self.REQUEST_REPLY_RETRY_DELAY, expires, key))
|
|
|
|
def get_requestreply(self, key):
|
|
with self.pending_lock:
|
|
return self.pending.get(key)
|
|
|
|
def _set_requestreply_result(self, key, result):
|
|
with self.pending_lock:
|
|
requestreply = self.pending.get(key)
|
|
if requestreply is None:
|
|
self.logger.error(f'requestreply with {key=} not found!')
|
|
return
|
|
self.pending[key]['result'] = result
|
|
requestreply['ev'].set()
|
|
|
|
def _remove_requestreply(self, key):
|
|
with self.pending_lock:
|
|
requestreply = self.pending.get(key)
|
|
if requestreply is None:
|
|
return
|
|
requestreply['ev'].set()
|
|
del self.pending[key]
|
|
|
|
def submit_requestreply(
|
|
self, *,
|
|
payload: dict,
|
|
node_id_or_blinded_path: bytes,
|
|
key: bytes = None) -> 'Task':
|
|
"""Add onion message to queue for sending. Queued onion message payloads
|
|
are supplied with a path_id and a reply_path to determine which request
|
|
corresponds with arriving replies.
|
|
|
|
If caller has provided 'reply_path' in payload, caller should also provide associating key.
|
|
|
|
:return: returns awaitable task"""
|
|
if not key:
|
|
key = os.urandom(8)
|
|
assert type(key) is bytes and len(key) >= 8
|
|
|
|
self.logger.debug(f'submit_requestreply {key=} {payload=} {node_id_or_blinded_path=}')
|
|
|
|
with self.pending_lock:
|
|
if key in self.pending:
|
|
raise Exception(f'{key=} already exists!')
|
|
self.pending[key] = {
|
|
'ev': asyncio.Event(),
|
|
'payload': payload,
|
|
'node_id_or_blinded_path': node_id_or_blinded_path
|
|
}
|
|
|
|
# tuple = (when to process, when it expires, key)
|
|
expires = now() + self.REQUEST_REPLY_TIMEOUT
|
|
queueitem = (now(), expires, key)
|
|
self.requestreply_queue.put_nowait(queueitem)
|
|
task = asyncio.create_task(self._requestreply_task(key))
|
|
self.requestreply_queue_notempty.set()
|
|
return task
|
|
|
|
async def _requestreply_task(self, key):
|
|
requestreply = self.get_requestreply(key)
|
|
assert requestreply
|
|
if requestreply is None:
|
|
return
|
|
try:
|
|
self.logger.debug(f'wait task start {key}')
|
|
await requestreply['ev'].wait()
|
|
finally:
|
|
self.logger.debug(f'wait task end {key}')
|
|
|
|
try:
|
|
requestreply = self.get_requestreply(key)
|
|
assert requestreply
|
|
result = requestreply.get('result')
|
|
if isinstance(result, Exception):
|
|
raise result # raising in the task requires caller to explicitly extract exception.
|
|
return result
|
|
finally:
|
|
self._remove_requestreply(key)
|
|
|
|
def _send_pending_requestreply(self, key):
|
|
"""adds reply_path to payload"""
|
|
data = self.get_requestreply(key)
|
|
payload = data.get('payload')
|
|
node_id_or_blinded_path = data.get('node_id_or_blinded_path')
|
|
self.logger.debug(f'send_requestreply {key=} {payload=} {node_id_or_blinded_path=}')
|
|
|
|
final_payload = copy.deepcopy(payload)
|
|
|
|
if 'reply_path' not in final_payload:
|
|
# unless explicitly set in payload, generate reply_path here
|
|
path_id = self._path_id_from_payload_and_key(payload, key)
|
|
reply_paths = get_blinded_reply_paths(self.lnwallet, path_id, max_paths=1)
|
|
if not reply_paths:
|
|
raise Exception(f'Could not create a reply_path for {key=}')
|
|
|
|
final_payload['reply_path'] = {'path': reply_paths}
|
|
|
|
# TODO: we should try alternate paths when retrying, this is currently not done.
|
|
# (send_onion_message_to decides path, without knowledge of prev attempts)
|
|
send_onion_message_to(self.lnwallet, node_id_or_blinded_path, final_payload)
|
|
|
|
def _path_id_from_payload_and_key(self, payload: dict, key: bytes) -> bytes:
|
|
# TODO: use payload to determine prefix?
|
|
return b'electrum' + key
|
|
|
|
def on_onion_message_received(self, recipient_data, payload):
|
|
# we are destination, sanity checks
|
|
# - if `encrypted_data_tlv` contains `allowed_features`:
|
|
# - MUST ignore the message if:
|
|
# - `encrypted_data_tlv.allowed_features.features` contains an unknown feature bit (even if it is odd).
|
|
# - the message uses a feature not included in `encrypted_data_tlv.allowed_features.features`.
|
|
if 'allowed_features' in recipient_data:
|
|
pass # TODO
|
|
|
|
# - if `path_id` is set and corresponds to a path the reader has previously published in a `reply_path`:
|
|
# - if the onion message is not a reply to that previous onion:
|
|
# - MUST ignore the onion message
|
|
# TODO: store path_id and lookup here
|
|
if 'path_id' not in recipient_data:
|
|
# unsolicited onion_message
|
|
self.on_onion_message_received_unsolicited(recipient_data, payload)
|
|
else:
|
|
self.on_onion_message_received_reply(recipient_data, payload)
|
|
|
|
def on_onion_message_received_reply(self, recipient_data, payload):
|
|
# check if this reply is associated with a known request
|
|
correl_data = recipient_data['path_id'].get('data')
|
|
if not correl_data[:8] == b'electrum':
|
|
self.logger.warning('not a reply to our request (unknown path_id prefix)')
|
|
return
|
|
key = correl_data[8:]
|
|
requestreply = self.get_requestreply(key)
|
|
if requestreply is None:
|
|
self.logger.warning('not a reply to our request (unknown request)')
|
|
return
|
|
|
|
self._set_requestreply_result(key, (recipient_data, payload))
|
|
|
|
def on_onion_message_received_unsolicited(self, recipient_data, payload):
|
|
self.logger.debug('unsolicited onion_message received')
|
|
self.logger.debug(f'payload: {payload!r}')
|
|
|
|
# TODO: currently only accepts simple text 'message' payload.
|
|
|
|
if 'message' not in payload:
|
|
self.logger.error('Unsupported onion message payload')
|
|
return
|
|
|
|
if 'text' not in payload['message'] or not isinstance(payload['message']['text'], bytes):
|
|
self.logger.error('Malformed \'message\' payload')
|
|
return
|
|
|
|
try:
|
|
text = payload['message']['text'].decode('utf-8')
|
|
except Exception as e:
|
|
self.logger.error(f'Malformed \'message\' payload: {e!r}')
|
|
return
|
|
|
|
self.logger.info(f'onion message with text received: {text}')
|
|
|
|
def on_onion_message_forward(self, recipient_data, onion_packet, blinding, shared_secret):
|
|
if recipient_data.get('path_id'):
|
|
self.logger.error('cannot forward onion_message, path_id in encrypted_data_tlv')
|
|
return
|
|
|
|
next_node_id = recipient_data.get('next_node_id')
|
|
if not next_node_id:
|
|
self.logger.error('cannot forward onion_message, next_node_id missing in encrypted_data_tlv')
|
|
return
|
|
next_node_id = next_node_id['node_id']
|
|
|
|
is_dummy_hop = False
|
|
if next_node_id == self.lnwallet.node_keypair.pubkey:
|
|
self.logger.debug('dummy hop')
|
|
is_dummy_hop = True
|
|
else:
|
|
# is next_node one of our peers?
|
|
next_peer = self.lnwallet.peers.get(next_node_id)
|
|
if not next_peer:
|
|
self.logger.info(f'next node {next_node_id.hex()} not a peer, dropping message')
|
|
return
|
|
|
|
# blinding override?
|
|
next_blinding_override = recipient_data.get('next_blinding_override')
|
|
if next_blinding_override:
|
|
next_blinding = next_blinding_override.get('blinding')
|
|
else:
|
|
# E_i+1=SHA256(E_i||ss_i) * E_i
|
|
blinding_factor = sha256(blinding + shared_secret)
|
|
blinding_factor_int = int.from_bytes(blinding_factor, byteorder="big")
|
|
next_public_key_int = ecc.ECPubkey(blinding) * blinding_factor_int
|
|
next_blinding = next_public_key_int.get_public_key_bytes()
|
|
|
|
if is_dummy_hop:
|
|
self.process_onion_message_packet(next_blinding, onion_packet)
|
|
return
|
|
|
|
self.submit_forward(onion_packet=onion_packet, blinding=next_blinding, node_id=next_node_id)
|
|
|
|
def on_onion_message(self, payload):
|
|
blinding = payload.get('blinding')
|
|
if not blinding:
|
|
self.logger.error('missing blinding')
|
|
return
|
|
packet = payload.get('onion_message_packet')
|
|
if payload.get('len', 0) != len(packet):
|
|
self.logger.error('invalid/missing length')
|
|
return
|
|
|
|
self.logger.debug('handling onion message')
|
|
|
|
onion_packet = OnionPacket.from_bytes(packet)
|
|
self.process_onion_message_packet(blinding, onion_packet)
|
|
|
|
def process_onion_message_packet(self, blinding: bytes, onion_packet: OnionPacket):
|
|
our_privkey = blinding_privkey(self.lnwallet.node_keypair.privkey, blinding)
|
|
processed_onion_packet = process_onion_packet(onion_packet, our_privkey, tlv_stream_name='onionmsg_tlv')
|
|
payload = processed_onion_packet.hop_data.payload
|
|
|
|
self.logger.debug(f'onion peeled: {processed_onion_packet!r}')
|
|
|
|
if not processed_onion_packet.are_we_final:
|
|
if any([x not in ['encrypted_recipient_data'] for x in payload.keys()]):
|
|
self.logger.error('unexpected data in payload') # non-final nodes only encrypted_recipient_data
|
|
return
|
|
|
|
# decrypt
|
|
shared_secret = get_ecdh(self.lnwallet.node_keypair.privkey, blinding)
|
|
recipient_data = decrypt_onionmsg_data_tlv(
|
|
shared_secret=shared_secret,
|
|
encrypted_recipient_data=payload['encrypted_recipient_data']['encrypted_recipient_data']
|
|
)
|
|
|
|
self.logger.debug(f'parsed recipient_data: {recipient_data!r}')
|
|
|
|
if processed_onion_packet.are_we_final:
|
|
self.on_onion_message_received(recipient_data, payload)
|
|
else:
|
|
self.on_onion_message_forward(recipient_data, processed_onion_packet.next_packet, blinding, shared_secret)
|