Files
purple-electrumwallet/electrum/onion_message.py
T

861 lines
38 KiB
Python

# Electrum - Lightweight Bitcoin Client
# Copyright (c) 2023-2024 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import copy
import io
import os
import threading
import time
from typing import TYPE_CHECKING, Optional, Sequence, NamedTuple, Tuple, Union
import electrum_ecc as ecc
from electrum.channel_db import get_mychannel_policy
from electrum.lnrouter import PathEdge
from electrum.logging import get_logger, Logger
from electrum.crypto import sha256, get_ecdh
from electrum.lnmsg import OnionWireSerializer
from electrum.lnonion import (get_bolt04_onion_key, OnionPacket, process_onion_packet, blinding_privkey,
OnionHopsDataSingle, decrypt_onionmsg_data_tlv, encrypt_onionmsg_data_tlv,
get_shared_secrets_along_route, new_onion_packet, encrypt_hops_recipient_data)
from electrum.lnutil import LnFeatures, MIN_FINAL_CLTV_DELTA_ACCEPTED, MAXIMUM_REMOTE_TO_SELF_DELAY_ACCEPTED
from electrum.util import OldTaskGroup, log_exceptions, random_shuffled_copy
def now():
    """Return the current time as given by ``time.time()``."""
    current = time.time()
    return current
if TYPE_CHECKING:
from electrum.lnworker import LNWallet
from electrum.network import Network
from electrum.lnrouter import NodeInfo
from electrum.lntransport import LNPeerAddr
from electrum.lnchannel import Channel
from asyncio import Task
# module-level logger, named after this module
logger = get_logger(__name__)
# default `max_paths` of get_blinded_reply_paths()
REQUEST_REPLY_PATHS_MAX = 3
# default `max_paths` of get_blinded_paths_to_me()
PAYMENT_PATHS_MAX = 3
class NoRouteFound(Exception):
    """Raised when no route for an onion message could be constructed.

    The optional keyword-only ``peer_address`` is stored on the instance so that
    whoever catches the exception can inspect it; it defaults to None.
    """

    def __init__(self, *args, peer_address: 'LNPeerAddr' = None):
        super().__init__(*args)
        self.peer_address = peer_address
def create_blinded_path(
        session_key: bytes,
        path: Sequence[bytes],
        final_recipient_data: dict,
        *,
        hop_extras: Optional[Sequence[dict]] = None,
        dummy_hops: Optional[int] = 0,
        channels: Optional[Sequence['Channel']] = None,
) -> dict:
    """Construct a blinded path along `path`, ending at the last node of `path`.

    :param session_key: private key the path is derived from; its public key becomes 'first_path_key'
    :param path: node ids of the hops; path[0] is the introduction point. Not modified.
    :param final_recipient_data: recipient data encrypted for the final hop
    :param hop_extras: optional per-hop dicts merged into the recipient data of non-final hop i
    :param dummy_hops: number of times the last node is repeated at the end of the path
    :param channels: if given, non-final hop i is told its next hop through the short channel id
        of channels[i] (remote scid alias preferred) instead of through the next node id
    :return: dict with keys 'first_node_id', 'first_path_key', 'num_hops' (a single byte) and
        'path' (one dict per hop: 'blinded_node_id', 'enclen', 'encrypted_recipient_data')
    """
    # Work on a private copy. Extending `path` itself with `+=` would append the dummy hops
    # to the caller's list, and would raise TypeError if the caller passed a tuple.
    path = list(path)

    # dummy hops could be inserted anywhere in the path, but for compatibility just add them at the end
    # because blinded paths are usually constructed towards ourselves, and we know we can handle dummy hops.
    if dummy_hops:
        logger.debug(f'adding {dummy_hops} dummy hops at the end')
        path += [path[-1]] * dummy_hops

    introduction_point = path[0]
    blinding = ecc.ECPrivkey(session_key).get_public_key_bytes()

    onionmsg_hops = []
    shared_secrets, blinded_node_ids = get_shared_secrets_along_route(path, session_key)
    final_index = len(path) - 1
    for i, _node_id in enumerate(path):
        if i < final_index:
            # spec: alt: short_channel_id instead of next_node_id
            if channels:  # use short_channel_id for payments
                scid = channels[i].get_remote_scid_alias() or channels[i].short_channel_id
                recipient_data = {
                    # TODO: SHOULD add padding data to ensure all encrypted_data_tlv(i) have the same length
                    'short_channel_id': {'short_channel_id': scid}
                }
            else:
                recipient_data = {
                    # TODO: SHOULD add padding data to ensure all encrypted_data_tlv(i) have the same length
                    'next_node_id': {'node_id': path[i+1]}
                }
            if hop_extras and i < len(hop_extras):  # extra hop data for debugging for now
                recipient_data.update(hop_extras[i])
        else:
            # TODO: SHOULD add padding data to ensure all encrypted_data_tlv(i) have the same length
            recipient_data = final_recipient_data

        encrypted_recipient_data = encrypt_onionmsg_data_tlv(shared_secret=shared_secrets[i], **recipient_data)
        onionmsg_hops.append({
            'blinded_node_id': blinded_node_ids[i],
            'enclen': len(encrypted_recipient_data),
            'encrypted_recipient_data': encrypted_recipient_data
        })

    return {
        'first_node_id': introduction_point,
        'first_path_key': blinding,
        'num_hops': bytes([len(onionmsg_hops)]),
        'path': onionmsg_hops
    }
def encode_blinded_path(blinded_path: dict):
    """Write `blinded_path` as a single 'blinded_path' field and return the written bytes."""
    with io.BytesIO() as buf:
        OnionWireSerializer.write_field(fd=buf, field_type='blinded_path', count=1, value=blinded_path)
        encoded = buf.getvalue()
    return encoded
def is_onion_message_node(node_id: bytes, node_info: Optional['NodeInfo']) -> bool:
    """Tell whether `node_info` advertises OPTION_ONION_MESSAGE_OPT.

    Returns False when no node info is available. `node_id` is not used for the decision.
    """
    if node_info:
        features = LnFeatures(node_info.features)
        return features.supports(LnFeatures.OPTION_ONION_MESSAGE_OPT)
    return False
def create_onion_message_route_to(lnwallet: 'LNWallet', node_id: bytes) -> Sequence[PathEdge]:
    """Return a route (sequence of path edges) from us to `node_id` for an onion message.

    Tried in order:
    - a path found by the network's path finder over our active channels that are not frozen for
      sending, where every node other than ourselves must pass is_onion_message_node()
    - a single edge to `node_id` if it is one of our current peers

    Raises NoRouteFound otherwise; if channel_db knows a last good address for `node_id`,
    that address is attached to the exception as `peer_address`.
    """
    # TODO: is this the proper way to set up my_sending_channels?
    my_sending_channels = {}
    for chan in lnwallet.channels.values():
        if not chan.is_active() or chan.is_frozen_for_sending():
            continue
        if chan.short_channel_id is not None:
            my_sending_channels[chan.short_channel_id] = chan

    # find route to introduction point over existing channel mesh
    # NOTE: nodes that are in channel_db but are offline are not removed from the set
    path_finder = lnwallet.network.path_finder
    if path_finder:
        path = path_finder.find_path_for_payment(
            nodeA=lnwallet.node_keypair.pubkey,
            nodeB=node_id,
            invoice_amount_msat=10000,  # TODO: do this without amount constraints
            # we ourselves always pass the filter
            node_filter=lambda x, y: x == lnwallet.node_keypair.pubkey or is_onion_message_node(x, y),
            my_sending_channels=my_sending_channels,
        )
        if path:
            # first edge must be to our peer
            first_hop_peer = lnwallet.lnpeermgr.get_peer_by_pubkey(path[0].end_node)
            assert first_hop_peer, 'first hop not a peer'
            return path

    # alt: dest is existing peer?
    if lnwallet.lnpeermgr.get_peer_by_pubkey(node_id):
        return [PathEdge(short_channel_id=None, start_node=None, end_node=node_id)]

    # if we have an address, pass it.
    channel_db = lnwallet.channel_db
    if channel_db:
        peer_addr = channel_db.get_last_good_address(node_id)
        if peer_addr:
            raise NoRouteFound('no path found, peer_addr available', peer_address=peer_addr)

    raise NoRouteFound('no path found')
def create_route_to_introduction_point(
        lnwallet: 'LNWallet',
        blinded_path: dict,
        introduction_point: bytes,
        session_key: bytes
):
    """Prepare the unblinded part of an onion message route, from us to `introduction_point`.

    Returns a tuple (peer, path_key, hops_data, blinded_node_ids):
    - peer: our peer that the onion message is to be handed to
    - path_key: the blinded path's 'first_path_key' if the introduction point is our direct peer,
      otherwise the public key of `session_key`
    - hops_data: hop payloads for the nodes before the introduction point
      (empty if the introduction point is our direct peer)
    - blinded_node_ids: as returned by get_shared_secrets_along_route() for those nodes
      (empty if the introduction point is our direct peer)
    """
    peermgr = lnwallet.lnpeermgr

    # if blinded path introduction point is our direct peer, no need to route-find
    direct_peer = peermgr.get_peer_by_pubkey(introduction_point)
    if direct_peer:
        return direct_peer, blinded_path['first_path_key'], [], []

    route = create_onion_message_route_to(lnwallet, introduction_point)

    # last edge is to introduction point and start of blinded path. remove from route
    assert route[-1].end_node == introduction_point, 'last hop in route must be introduction point'
    assert len(route) > 1, "if we are directly connected to the IP, why didn't we return the peer above?"

    first_peer = peermgr.get_peer_by_pubkey(route[0].end_node)
    assert first_peer, "first hop is not a peer"

    # rm last hop (ip-1 -> ip) as it is added explicitly below, carrying the next_path_key_override
    route = route[:-1]

    # we construct a route to the introduction point
    route_pubkeys = [edge.end_node for edge in route]
    hop_shared_secrets, blinded_node_ids = get_shared_secrets_along_route(route_pubkeys, session_key)

    # exclude first hop (us to first node on path): we don't need a layer for ourselves
    hops_data = [
        OnionHopsDataSingle(
            tlv_stream_name='onionmsg_tlv',
            blind_fields={'next_node_id': {'node_id': edge.end_node}},
        )
        for edge in route[1:]
    ]

    # final hop before the introduction point: also switches over to the blinded path's path key
    hops_data.append(OnionHopsDataSingle(
        tlv_stream_name='onionmsg_tlv',
        blind_fields={
            'next_node_id': {'node_id': introduction_point},
            'next_path_key_override': {'path_key': blinded_path['first_path_key']},
        },
    ))

    encrypt_hops_recipient_data(tlv_stream_name='onionmsg_tlv', hops_data=hops_data, hop_shared_secrets=hop_shared_secrets)

    path_key = ecc.ECPrivkey(session_key).get_public_key_bytes()
    return first_peer, path_key, hops_data, blinded_node_ids
def send_onion_message_to(
        lnwallet: 'LNWallet',
        node_id_or_blinded_path: bytes,
        destination_payload: dict,
        session_key: Optional[bytes] = None
) -> None:
    """Build an onion message carrying `destination_payload` and send it to the first-hop peer.

    :param node_id_or_blinded_path: a 33-byte node id, or (anything longer) an encoded blinded path
    :param destination_payload: payload for the final hop
    :param session_key: 32 random bytes are used if not given

    For a blinded path, the route is: our route to the introduction point (if any) followed by the
    hops of the blinded path. If we are the introduction point ourselves, our own hop is decrypted
    locally and the message is sent to the next node, which must be a peer.

    Raises NoRouteFound (via create_onion_message_route_to) if no route exists, Exception for an
    unsupported 'first_node_id' or when the destination is ourselves, and re-raises any error from
    decoding the blinded path.
    """
    if session_key is None:
        session_key = os.urandom(32)

    if len(node_id_or_blinded_path) > 33:  # assume blinded path
        with io.BytesIO(node_id_or_blinded_path) as blinded_path_fd:
            try:
                blinded_path = OnionWireSerializer.read_field(
                    fd=blinded_path_fd,
                    field_type='blinded_path',
                    count=1)
                logger.debug(f'blinded path: {blinded_path!r}')
            except Exception as e:
                # the braces were missing here, so the literal text "e!r" got logged
                logger.error(f'{e!r}')
                raise

        introduction_point = blinded_path['first_node_id']
        if len(introduction_point) != 33:
            raise Exception('first_node_id not a nodeid but a sciddir, which is not supported')
            # Note: blinded_path specifies type sciddir_or_nodeid for first_node_id
            # but only nodeid is supported in onion_message context;
            # https://github.com/lightning/bolts/blob/master/04-onion-routing.md
            # "MUST set first_node_id to N0"

        if lnwallet.node_keypair.pubkey == introduction_point:
            # blinded path introduction point is me
            hops_data = []
            blinded_node_ids = []
            our_blinding = blinded_path['first_path_key']
            our_payload = blinded_path['path'][0]
            remaining_blinded_path = blinded_path['path'][1:]
            assert len(remaining_blinded_path) > 0, 'sending to myself?'

            # decrypt our own hop to learn where to send the message
            shared_secret = get_ecdh(lnwallet.node_keypair.privkey, our_blinding)
            recipient_data = decrypt_onionmsg_data_tlv(
                shared_secret=shared_secret,
                encrypted_recipient_data=our_payload['encrypted_recipient_data']
            )

            peer = lnwallet.lnpeermgr.get_peer_by_pubkey(recipient_data['next_node_id']['node_id'])
            assert peer, 'next_node_id not a peer'

            # blinding override?
            next_path_key_override = recipient_data.get('next_path_key_override')
            if next_path_key_override:
                next_path_key = next_path_key_override.get('path_key')
            else:
                # E_i+1=SHA256(E_i||ss_i) * E_i
                blinding_factor = sha256(our_blinding + shared_secret)
                blinding_factor_int = int.from_bytes(blinding_factor, byteorder="big")
                next_public_key_int = ecc.ECPubkey(our_blinding) * blinding_factor_int
                next_path_key = next_public_key_int.get_public_key_bytes()

            path_key = next_path_key
        else:
            # we need a route to introduction point
            r = create_route_to_introduction_point(lnwallet, blinded_path, introduction_point, session_key)
            peer, path_key, hops_data, blinded_node_ids = r
            remaining_blinded_path = blinded_path['path']
            if not isinstance(remaining_blinded_path, list):  # doesn't return list when num items == 1
                remaining_blinded_path = [remaining_blinded_path]

        # append (remaining) blinded path and payload
        blinded_path_blinded_ids = []
        final_index = len(remaining_blinded_path) - 1
        for i, onionmsg_hop in enumerate(remaining_blinded_path):
            blinded_path_blinded_ids.append(onionmsg_hop.get('blinded_node_id'))
            payload = {
                'encrypted_recipient_data': {'encrypted_recipient_data': onionmsg_hop['encrypted_recipient_data']}
            }
            if i == final_index:  # final hop
                payload.update(destination_payload)
            hops_data.append(OnionHopsDataSingle(tlv_stream_name='onionmsg_tlv', payload=payload))

        payment_path_pubkeys = blinded_node_ids + blinded_path_blinded_ids
        packet = new_onion_packet(payment_path_pubkeys, session_key, hops_data, onion_message=True)
        packet_b = packet.to_bytes()

    else:  # node pubkey
        pubkey = node_id_or_blinded_path

        if lnwallet.node_keypair.pubkey == pubkey:
            raise Exception('cannot send to myself')

        peer = lnwallet.lnpeermgr.get_peer_by_pubkey(pubkey)
        if peer:
            # destination is our direct peer, no need to route-find
            path = [PathEdge(short_channel_id=None, start_node=None, end_node=pubkey)]
        else:
            path = create_onion_message_route_to(lnwallet, pubkey)
            # first edge must be to our peer
            peer = lnwallet.lnpeermgr.get_peer_by_pubkey(path[0].end_node)
            assert peer, 'first hop not a peer'

        # one hop payload per node on the path: intermediate nodes are told the next node id,
        # the last node gets the destination payload
        hops_data = [
            OnionHopsDataSingle(
                tlv_stream_name='onionmsg_tlv',
                blind_fields={'next_node_id': {'node_id': x.end_node}},
            ) for x in path[1:]
        ]
        hops_data.append(OnionHopsDataSingle(
            tlv_stream_name='onionmsg_tlv',
            payload=destination_payload,
        ))

        payment_path_pubkeys = [edge.end_node for edge in path]
        hop_shared_secrets, blinded_node_ids = get_shared_secrets_along_route(payment_path_pubkeys, session_key)
        encrypt_hops_recipient_data('onionmsg_tlv', hops_data, hop_shared_secrets)
        # this is an onion message as well: pass onion_message=True like the blinded path branch does
        # NOTE(review): confirm against new_onion_packet() that this flag was not left out on purpose
        packet = new_onion_packet(blinded_node_ids, session_key, hops_data, onion_message=True)
        packet_b = packet.to_bytes()

        path_key = ecc.ECPrivkey(session_key).get_public_key_bytes()

    peer.send_message(
        "onion_message",
        path_key=path_key,
        len=len(packet_b),
        onion_message_packet=packet_b
    )
def get_blinded_reply_paths(
        lnwallet: 'LNWallet',
        path_id: bytes,
        *,
        max_paths: int = REQUEST_REPLY_PATHS_MAX,
) -> Sequence[dict]:
    """Construct up to `max_paths` blinded reply paths to ourselves for an onion message.

    `path_id` is placed in the final recipient data of every returned path.
    """
    # same path_id used in every reply path
    recipient_data = {'path_id': {'data': path_id}}
    reply_paths, _payinfo = get_blinded_paths_to_me(
        lnwallet, recipient_data, max_paths=max_paths, onion_message=True)
    return reply_paths
def get_blinded_paths_to_me(
        lnwallet: 'LNWallet',
        final_recipient_data: dict,
        *,
        max_paths: int = PAYMENT_PATHS_MAX,
        my_channels: Optional[Sequence['Channel']] = None,
        onion_message: bool = False
) -> Tuple[Sequence[dict], Sequence[dict]]:
    """Construct a list of blinded paths that end at ourselves.

    current logic:
    - uses channels peers if not onion_message
    - uses current onion_message capable channel peers if exist and if onion_message
    - otherwise, uses current onion_message capable peers if onion_message
    - reply_path introduction points are direct peers only (TODO: longer paths)

    :param final_recipient_data: recipient data for the final hop (us) of every path
    :param max_paths: maximum number of paths to build; candidates are picked in random order
    :param my_channels: channels to consider; our active channels are used if empty or None
    :param onion_message: build paths for onion messages instead of for payments
    :return: (blinded paths, payinfo); payinfo has one entry per path for payments and
        is empty for onion messages
    """
    # TODO: build longer paths and/or add dummy hops to increase privacy
    if not my_channels:
        my_channels = [chan for chan in lnwallet.channels.values() if chan.is_active()]

    if onion_message:
        def peer_supports_onion_messages(chan: 'Channel') -> bool:
            # Look the peer up once. Looking it up a second time for the feature check could
            # return None if the peer went away in between, and then raise AttributeError.
            peer = lnwallet.lnpeermgr.get_peer_by_pubkey(chan.node_id)
            return bool(peer) and bool(peer.their_features.supports(LnFeatures.OPTION_ONION_MESSAGE_OPT))

        my_channels = [chan for chan in my_channels if peer_supports_onion_messages(chan)]

    result = []
    payinfo = []
    mynodeid = lnwallet.node_keypair.pubkey
    local_height = lnwallet.network.get_local_height()

    if my_channels:
        for chan in random_shuffled_copy(my_channels)[:max_paths]:
            hop_extras = None
            if not onion_message:  # add hop_extras and payinfo, assumption: len(blinded_path) == 2 (us and peer)
                # get policy
                cp = get_mychannel_policy(chan.short_channel_id, chan.node_id, {chan.short_channel_id: chan})

                dest_max_cltv_expiry = local_height + MAXIMUM_REMOTE_TO_SELF_DELAY_ACCEPTED
                # TODO: for longer paths (>2), reverse traverse and calculate max_cltv_expiry at each intermediate hop
                #       and determine the cltv delta sums and fee sums of the hops for the payinfo struct.
                #       current assumption is len(blinded_path) == 2 (us and peer)
                sum_cltv_expiry_delta = cp.cltv_delta
                sum_fee_base_msat = cp.fee_base_msat
                sum_fee_proportional_millionths = cp.fee_proportional_millionths

                # path htlc limits
                blinded_path_min_htlc_msat = cp.htlc_minimum_msat
                blinded_path_max_htlc_msat = cp.htlc_maximum_msat

                hop_extras = [{
                    # spec: MUST include encrypted_data_tlv.payment_relay for each non-final node.
                    'payment_relay': {
                        'cltv_expiry_delta': cp.cltv_delta,
                        'fee_base_msat': cp.fee_base_msat,
                        'fee_proportional_millionths': cp.fee_proportional_millionths,
                    },
                    # spec: MUST set encrypted_data_tlv.payment_constraints for each non-final node and MAY set it for the final node:
                    #
                    #     max_cltv_expiry to the largest block height at which the route is allowed to be used, starting
                    #     from the final node's chosen max_cltv_expiry height at which the route should expire, adding
                    #     the final node's min_final_cltv_expiry_delta and then adding
                    #     encrypted_data_tlv.payment_relay.cltv_expiry_delta at each hop.
                    #
                    #     htlc_minimum_msat to the largest minimum HTLC value the nodes will allow.
                    'payment_constraints': {
                        'max_cltv_expiry': dest_max_cltv_expiry + cp.cltv_delta,
                        'htlc_minimum_msat': blinded_path_min_htlc_msat
                    }
                }]
                payinfo.append({
                    'fee_base_msat': sum_fee_base_msat,
                    'fee_proportional_millionths': sum_fee_proportional_millionths,
                    'cltv_expiry_delta': sum_cltv_expiry_delta + MIN_FINAL_CLTV_DELTA_ACCEPTED,
                    'htlc_minimum_msat': blinded_path_min_htlc_msat,
                    'htlc_maximum_msat': blinded_path_max_htlc_msat,
                    'flen': 0,
                    'features': bytes(0)
                })
            blinded_path = create_blinded_path(
                os.urandom(32), [chan.node_id, mynodeid], final_recipient_data,
                hop_extras=hop_extras, channels=[chan] if not onion_message else None)
            result.append(blinded_path)
    elif onion_message:
        # we can use peers even without channels for onion messages
        my_onionmsg_peers = [peer for peer in lnwallet.lnpeermgr.peers.values() if
                             peer.their_features.supports(LnFeatures.OPTION_ONION_MESSAGE_OPT)]
        if my_onionmsg_peers:
            for peer in random_shuffled_copy(my_onionmsg_peers)[:max_paths]:
                blinded_path = create_blinded_path(os.urandom(32), [peer.pubkey, mynodeid], final_recipient_data)
                result.append(blinded_path)

    return result, payinfo
class Timeout(Exception):
    """Set on a pending request's future when the request expired before it completed."""
class OnionMessageManager(Logger):
    """Handle state around onion message sends and receives.

    - one instance per (ln)wallet
    - association between onion messages and their replies
    - manage re-send attempts while iterating over possible routes. Onion messages are unreliable
      and fail silently if they don't reach their destination (or the reply gets dropped along the route back),
      so the BOLT-4 spec suggests to send multiple messages, each with a different route to the introduction point.
    - forwards are best-effort. They should not need retrying, but a queue is used to limit the pacing of forwarding,
      and limiting the number of outstanding forwards. Any onion message forwards arriving when the forward queue
      is full will be dropped.
    """

    # seconds to sleep after putting back a queue item that was not due yet
    SLEEP_DELAY = 1
    # seconds after submission at which a pending request expires (its future then gets a Timeout)
    REQUEST_REPLY_TIMEOUT = 30
    # seconds after a send attempt until the request is sent again (while no reply has arrived)
    REQUEST_REPLY_RETRY_DELAY = 5
    # seconds after submission at which a queued forward expires and is dropped
    FORWARD_RETRY_TIMEOUT = 4
    # seconds after a failed forward until it is attempted again
    FORWARD_RETRY_DELAY = 2
    # forwards submitted while this many (or more) are queued are dropped
    FORWARD_MAX_QUEUE = 3
class Request:
    """A pending onion message request and the future its outcome is delivered on."""

    def __init__(self, *, payload: dict, node_id_or_blinded_paths: Union[bytes, Sequence[bytes]]):
        # a single destination given as bytes is wrapped, so that destinations are always a list
        if isinstance(node_id_or_blinded_paths, bytes):
            node_id_or_blinded_paths = [node_id_or_blinded_paths]
        self.node_id_or_blinded_paths = node_id_or_blinded_paths
        self.payload = payload
        self.future = asyncio.Future()
        # index of the destination that get_next_destination() returns next
        self.current_index: int = 0

    def get_next_destination(self) -> bytes:
        """Return the next destination, cycling through all destinations (round-robin)."""
        destinations = self.node_id_or_blinded_paths
        chosen = destinations[self.current_index]
        self.current_index = (self.current_index + 1) % len(destinations)
        return chosen
def __init__(self, lnwallet: 'LNWallet'):
    """Set up empty state for `lnwallet`; nothing runs until start_network() is called."""
    Logger.__init__(self)
    self.lnwallet = lnwallet
    # both are set by start_network()
    self.network: Optional['Network'] = None
    self.taskgroup: OldTaskGroup = None
    # requests awaiting completion, by key; guarded by pending_lock when modified
    self.pending: dict[bytes, OnionMessageManager.Request] = {}
    self.pending_lock = threading.Lock()
    # queue items are tuples whose first element is the time at which they are due
    self.send_queue = asyncio.PriorityQueue()
    self.forward_queue = asyncio.PriorityQueue()
def start_network(self, *, network: 'Network') -> None:
    """Remember `network` and schedule main_loop() on its asyncio loop. May only be called once."""
    assert network
    assert self.network is None, "already started"
    self.network = network
    self.taskgroup = OldTaskGroup()
    main_coro = self.main_loop()
    asyncio.run_coroutine_threadsafe(main_coro, self.network.asyncio_loop)
@log_exceptions
async def main_loop(self) -> None:
    """Run the send queue and forward queue processors inside the taskgroup."""
    self.logger.info("starting taskgroup.")
    async with self.taskgroup as tg:
        for processor in (self.process_send_queue, self.process_forward_queue):
            await tg.spawn(processor())
    self.logger.info("taskgroup stopped.")
async def stop(self) -> None:
    """Cancel the remaining tasks of the taskgroup, if a taskgroup exists."""
    if not self.taskgroup:
        return
    await self.taskgroup.cancel_remaining()
async def process_forward_queue(self) -> None:
    """Forward queued onion messages to their next peer, forever.

    Queue items are tuples (scheduled, expires, onion_packet, blinding, node_id).
    - expired items are dropped
    - items that are not due yet are put back, followed by a sleep of SLEEP_DELAY
    - items for a peer that does not support onion messages are dropped
    - if the peer is not connected or sending fails, the item is re-queued with a delay of
      FORWARD_RETRY_DELAY (until it expires)
    """
    while True:
        scheduled, expires, onion_packet, blinding, node_id = await self.forward_queue.get()
        if expires <= now():
            self.logger.debug(f'forward expired {node_id=}')
            continue
        if scheduled > now():
            # return to queue
            self.forward_queue.put_nowait((scheduled, expires, onion_packet, blinding, node_id))
            await asyncio.sleep(self.SLEEP_DELAY)  # sleep here, as the first queue item wasn't due yet
            continue

        retry = False
        try:
            next_peer = self.lnwallet.lnpeermgr.get_peer_by_pubkey(node_id)
            if next_peer is None:
                # Handled explicitly. Previously this surfaced as an AttributeError on None that
                # the blanket except clause turned into a retry; the outcome (retry) is unchanged.
                self.logger.debug(f'next peer not connected, will retry {node_id=}')
                retry = True
            elif not next_peer.their_features.supports(LnFeatures.OPTION_ONION_MESSAGE_OPT):
                self.logger.debug('forward dropped, next peer is not ONION_MESSAGE capable')
            else:
                onion_packet_b = onion_packet.to_bytes()
                next_peer.send_message(
                    "onion_message",
                    path_key=blinding,
                    len=len(onion_packet_b),
                    onion_message_packet=onion_packet_b
                )
        except Exception as e:
            # Exception rather than BaseException: do not swallow KeyboardInterrupt/SystemExit
            self.logger.debug(f'error while sending {node_id=} e={e!r}')
            retry = True

        if retry:
            # TODO: it is debatable whether we want to retry a forward.
            self.forward_queue.put_nowait((now() + self.FORWARD_RETRY_DELAY, expires, onion_packet, blinding, node_id))
def submit_forward(
        self, *,
        onion_packet: OnionPacket,
        blinding: bytes,
        node_id: bytes) -> None:
    """Queue `onion_packet` for forwarding to peer `node_id`, due immediately.

    The packet is dropped instead if FORWARD_MAX_QUEUE or more forwards are already queued.
    The queued item expires FORWARD_RETRY_TIMEOUT seconds from now.
    """
    queue = self.forward_queue
    if queue.qsize() < self.FORWARD_MAX_QUEUE:
        expires = now() + self.FORWARD_RETRY_TIMEOUT
        queue.put_nowait((now(), expires, onion_packet, blinding, node_id))
    else:
        self.logger.debug('forward queue full, dropping packet')
async def process_send_queue(self) -> None:
    """Send queued requests, forever, re-sending each until it completes or expires.

    Queue items are tuples (scheduled, expires, key), where `key` refers to self.pending.
    - items without pending request, or whose future is already done, are dropped
    - expired items get a Timeout set on their future
    - items that are not due yet are put back, followed by a sleep of SLEEP_DELAY
    - after a successful send the item is re-queued with a delay of REQUEST_REPLY_RETRY_DELAY
    - if sending raises, (a copy of) the exception is set on the future and the item is not
      re-queued; for a NoRouteFound that carries a peer address, a connection to that address
      is initiated
    """
    while True:
        scheduled, expires, key = await self.send_queue.get()
        req = self.pending.get(key)
        if req is None:
            self.logger.debug(f'no data for key {key=}')
            continue
        if req.future.done():
            self.logger.debug(f'has result! {key=}')
            continue
        if expires <= now():
            self.logger.debug(f'expired {key=}')
            req.future.set_exception(Timeout())
            continue
        if scheduled > now():
            # return to queue
            self.logger.debug(f'return to queue {key=}, {scheduled - now()}')
            self.send_queue.put_nowait((scheduled, expires, key))
            await asyncio.sleep(self.SLEEP_DELAY)  # sleep here, as the first queue item wasn't due yet
            continue
        try:
            self._send_pending_message(key)
        except Exception as e:
            # Exception rather than BaseException: do not swallow KeyboardInterrupt/SystemExit
            self.logger.debug(f'error while sending {key=}: ', exc_info=True)
            # NOTE: a copy is set on the future on purpose. According to the original note here,
            #       passing the caught exception instance e directly leads to GeneratorExit().
            req.future.set_exception(copy.copy(e))
            if isinstance(e, NoRouteFound) and e.peer_address:
                await self.lnwallet.lnpeermgr.add_peer(str(e.peer_address))
        else:
            self.logger.debug(f'resubmit {key=}')
            self.send_queue.put_nowait((now() + self.REQUEST_REPLY_RETRY_DELAY, expires, key))
def _remove_pending_message(self, key: bytes) -> None:
    """Forget the pending request stored under `key`; does nothing if there is none."""
    with self.pending_lock:
        self.pending.pop(key, None)
def submit_send(
        self, *,
        payload: dict,
        node_id_or_blinded_paths: Union[bytes, Sequence[bytes]],
        key: Optional[bytes] = None) -> 'Task':
    """Add onion message to queue for sending. Queued onion message payloads
    are supplied with a path_id and a reply_path to determine which request
    corresponds with arriving replies.

    If caller has provided 'reply_path' in payload, caller should also provide associating key.
    A random 8-byte key is generated if none is given. Raises if `key` is already pending.

    :return: returns awaitable task"""
    key = key or os.urandom(8)
    assert type(key) is bytes and len(key) >= 8

    self.logger.debug(f'submit_send {key=} {payload=} {node_id_or_blinded_paths=}')

    request = OnionMessageManager.Request(payload=payload, node_id_or_blinded_paths=node_id_or_blinded_paths)
    with self.pending_lock:
        if key in self.pending:
            raise Exception(f'{key=} already exists!')
        self.pending[key] = request

    # queue item: (when to process, when it expires, key)
    expires = now() + self.REQUEST_REPLY_TIMEOUT
    self.send_queue.put_nowait((now(), expires, key))

    return asyncio.create_task(self._wait_task(key, request.future))
async def _wait_task(self, key: bytes, future: asyncio.Future):
    """Await `future` and return its result; the pending request for `key` is removed in any case."""
    try:
        outcome = await future
    finally:
        self._remove_pending_message(key)
    return outcome
def _send_pending_message(self, key: bytes) -> None:
    """Send the pending request stored under `key` to its next destination.

    The payload is sent as a deep copy. If it has no 'reply_path', one blinded reply path
    (with a path_id derived from `key`) is generated and added to the copy.
    Raises if no reply path could be created.
    """
    req = self.pending[key]
    payload = req.payload
    dest = req.get_next_destination()
    self.logger.debug(f'send_pending_message {key=} {payload=} {dest=}')

    final_payload = copy.deepcopy(payload)

    needs_reply_path = 'reply_path' not in final_payload
    if needs_reply_path:
        # unless explicitly set in payload, generate reply_path here
        reply_paths = get_blinded_reply_paths(
            self.lnwallet, self._path_id_from_payload_and_key(payload, key), max_paths=1)
        if not reply_paths:
            raise Exception(f'Could not create a reply_path for {key=}. No active peers?')
        final_payload['reply_path'] = {'path': reply_paths}

    # NOTE: we could also try alternate paths to introduction point (the non-blinded part of the route)
    # when retrying, this is currently not done.
    # (send_onion_message_to decides path, without knowledge of prev attempts)
    send_onion_message_to(self.lnwallet, dest, final_payload)
def _path_id_from_payload_and_key(self, payload: dict, key: bytes) -> bytes:
    """Return the path_id for `key`: the key prefixed with b'electrum'. `payload` is not used."""
    # TODO: use payload to determine prefix?
    prefix = b'electrum'
    return prefix + key
def _get_request_for_path_id(self, recipient_data: dict) -> Optional[Request]:
    """Return the pending request that the path_id in `recipient_data` refers to.

    Returns None if there is no path_id, if it lacks the b'electrum' prefix,
    or if no request is pending under the key that follows the prefix.
    """
    path_id = recipient_data.get('path_id', {}).get('data')
    if not path_id:
        return None

    prefix, key = path_id[:8], path_id[8:]
    if prefix != b'electrum':
        self.logger.warning('not a reply to our request (unknown path_id prefix)')
        return None

    req = self.pending.get(key)
    if req is None:
        self.logger.warning('not a reply to our request (unknown request)')
    return req
def on_onion_message_received(self, recipient_data: dict, payload: dict) -> None:
    """Handle an onion message we are the destination of.

    The message is handled as a reply if its path_id refers to a pending request,
    and as an unsolicited message otherwise.
    """
    # we are destination, sanity checks
    # - if `encrypted_data_tlv` contains `allowed_features`:
    #   - MUST ignore the message if:
    #     - `encrypted_data_tlv.allowed_features.features` contains an unknown feature bit (even if it is odd).
    #     - the message uses a feature not included in `encrypted_data_tlv.allowed_features.features`.
    if 'allowed_features' in recipient_data:
        # Note: These checks will be usecase specific (e.g. BOLT12) and probably should be checked
        # by consumers of the message. Here, the features are only logged.
        features_hex = recipient_data["allowed_features"].get("features", b"").hex()
        self.logger.debug(f'allowed_features={features_hex}')

    # - if `path_id` is set and corresponds to a path the reader has previously published in a `reply_path`:
    #   - if the onion message is not a reply to that previous onion:
    #     - MUST ignore the onion message
    req = self._get_request_for_path_id(recipient_data)
    if req is not None:
        self.on_onion_message_received_reply(req, recipient_data, payload)
    else:
        # unsolicited onion_message
        self.on_onion_message_received_unsolicited(recipient_data, payload)
def on_onion_message_received_reply(self, request: Request, recipient_data: dict, payload: dict) -> None:
    """Complete `request` with the reply: its future's result becomes (recipient_data, payload).

    A reply for a request whose future is already done is ignored. A request can be sent more
    than once, so more than one reply may arrive for it, and a reply may arrive after the
    request timed out; calling set_result() on a done future would raise InvalidStateError.
    """
    assert request is not None, 'Request is mandatory'
    if request.future.done():
        self.logger.debug('ignoring reply, request already completed')
        return
    request.future.set_result((recipient_data, payload))
def on_onion_message_received_unsolicited(self, recipient_data: dict, payload: dict) -> None:
    """Handle an onion message that is not a reply to one of our requests.

    Only a 'message' payload with a UTF-8 encoded bytes 'text' is accepted; its text is logged.
    Anything else is logged as an error and ignored.
    """
    self.logger.debug('unsolicited onion_message received')
    self.logger.debug(f'payload: {payload!r}')

    # This func currently only accepts simple text 'message' payload, a.k.a 'unknown_tag_1'
    # in the bolt-4 test vectors.
    #
    # TODO: for BOLT-12, handle invoice_request here, which should correspond with a previously generated Offer.
    #       as this is not strictly part of BOLT-4, we should probably create a registration mechanism
    #       for various types of payloads, so we can let external code plug into onion messages
    #       e.g. via a decorator, something like
    #       @onion_message_request_handler(payload_key='invoice_request') for BOLT12 invoice requests.

    if 'message' not in payload:
        self.logger.error('Unsupported onion message payload')
        return

    message = payload['message']
    has_bytes_text = 'text' in message and isinstance(message['text'], bytes)
    if not has_bytes_text:
        self.logger.error("Malformed 'message' payload")
        return

    try:
        text = message['text'].decode('utf-8')
    except Exception as e:
        self.logger.error(f"Malformed 'message' payload: {e!r}")
        return

    self.logger.info(f'onion message with text received: {text}')
def on_onion_message_forward(
        self,
        recipient_data: dict,
        onion_packet: OnionPacket,
        blinding: bytes,
        shared_secret: bytes
) -> None:
    """Pass on an onion message that we are not the final destination of.

    - the message is dropped if `recipient_data` has a path_id or lacks next_node_id
    - if the next node is ourselves (dummy hop), the packet is processed by us again
    - otherwise it is queued for forwarding, provided that forwarding is enabled in the config
      and the next node is one of our peers
    The path key for the next hop is taken from next_path_key_override if present,
    and derived from `blinding` and `shared_secret` otherwise.
    """
    if recipient_data.get('path_id'):
        self.logger.error('cannot forward onion_message, path_id in encrypted_data_tlv')
        return

    next_node_id = recipient_data.get('next_node_id')
    if not next_node_id:
        self.logger.error('cannot forward onion_message, next_node_id missing in encrypted_data_tlv')
        return
    next_node_id = next_node_id['node_id']

    is_dummy_hop = next_node_id == self.lnwallet.node_keypair.pubkey
    if is_dummy_hop:
        self.logger.debug('dummy hop')
    else:
        # not a dummy hop, check if we are configured to forward
        if not self.network.config.EXPERIMENTAL_LN_FORWARD_PAYMENTS:
            self.logger.info(
                'onion_message dropped (not forwarding due to lightning_forward_payments config option disabled')
            return

        # is next_node one of our peers?
        if not self.lnwallet.lnpeermgr.get_peer_by_pubkey(next_node_id):
            self.logger.info(f'next node {next_node_id.hex()} not a peer, dropping message')
            return

    # blinding override?
    override = recipient_data.get('next_path_key_override')
    if override:
        next_path_key = override.get('path_key')
    else:
        # E_i+1=SHA256(E_i||ss_i) * E_i
        factor = int.from_bytes(sha256(blinding + shared_secret), byteorder="big")
        next_path_key = (ecc.ECPubkey(blinding) * factor).get_public_key_bytes()

    if is_dummy_hop:
        self.process_onion_message_packet(next_path_key, onion_packet)
    else:
        self.submit_forward(onion_packet=onion_packet, blinding=next_path_key, node_id=next_node_id)
def on_onion_message(self, payload: dict) -> None:
    """Handle an arriving onion_message.

    `payload` is read for the keys 'path_key', 'onion_message_packet' and 'len'.
    The message is logged as an error and ignored if 'path_key' or 'onion_message_packet' is
    missing, or if 'len' does not match the length of the packet.
    """
    path_key = payload.get('path_key')
    if not path_key:
        self.logger.error('missing path_key')
        return

    packet = payload.get('onion_message_packet')
    if packet is None:
        # without this check, len(None) below raises TypeError for a message without packet
        self.logger.error('missing onion_message_packet')
        return
    if payload.get('len', 0) != len(packet):
        self.logger.error('invalid/missing length')
        return

    self.logger.debug('handling onion message')

    onion_packet = OnionPacket.from_bytes(packet)
    self.process_onion_message_packet(path_key, onion_packet)
def process_onion_message_packet(self, blinding: bytes, onion_packet: OnionPacket) -> None:
    """Peel our layer off `onion_packet`, decrypt our recipient data, and dispatch the message.

    The message is handed to on_onion_message_received() if we are the final node, and to
    on_onion_message_forward() otherwise. For a non-final node, a payload with anything
    other than 'encrypted_recipient_data' is logged as an error and ignored.
    """
    our_privkey = blinding_privkey(self.lnwallet.node_keypair.privkey, blinding)
    peeled = process_onion_packet(onion_packet, our_privkey, is_onion_message=True, tlv_stream_name='onionmsg_tlv')
    payload = peeled.hop_data.payload

    self.logger.debug(f'onion peeled: {peeled!r}')

    we_are_final = peeled.are_we_final
    # non-final nodes only encrypted_recipient_data
    if not we_are_final and any(field != 'encrypted_recipient_data' for field in payload):
        self.logger.error('unexpected data in payload')
        return

    # decrypt
    shared_secret = get_ecdh(self.lnwallet.node_keypair.privkey, blinding)
    recipient_data = decrypt_onionmsg_data_tlv(
        shared_secret=shared_secret,
        encrypted_recipient_data=payload['encrypted_recipient_data']['encrypted_recipient_data']
    )

    self.logger.debug(f'parsed recipient_data: {recipient_data!r}')

    if we_are_final:
        self.on_onion_message_received(recipient_data, payload)
    else:
        self.on_onion_message_forward(recipient_data, peeled.next_packet, blinding, shared_secret)