import asyncio import json import os import ssl import threading from typing import TYPE_CHECKING, Optional, Dict, Sequence, Tuple, Iterable, List from decimal import Decimal import math import time import attr import aiohttp from electrum_ecc import ECPrivkey import electrum_aionostr as aionostr import electrum_aionostr.key from electrum_aionostr.event import Event from electrum_aionostr.util import to_nip19 from collections import defaultdict from .i18n import _ from .logging import Logger from .crypto import sha256, ripemd from .bitcoin import script_to_p2wsh, opcodes, dust_threshold, DummyAddress, construct_witness, construct_script from . import bitcoin from .transaction import ( PartialTxInput, PartialTxOutput, PartialTransaction, Transaction, TxInput, TxOutpoint, script_GetOp, match_script_against_template, OPPushDataGeneric, OPPushDataPubkey ) from .util import ( log_exceptions, ignore_exceptions, BelowDustLimit, OldTaskGroup, ca_path, gen_nostr_ann_pow, get_nostr_ann_pow_amount, make_aiohttp_proxy_connector, get_running_loop, get_asyncio_loop, wait_for2, run_sync_function_on_asyncio_thread, trigger_callback, NoDynamicFeeEstimates, UserFacingException, ) from . import lnutil from .lnutil import (hex_to_bytes, REDEEM_AFTER_DOUBLE_SPENT_DELAY, Keypair, MIN_FINAL_CLTV_DELTA_ACCEPTED) from .lnaddr import lndecode from .json_db import StoredObject, stored_in from . import constants from .address_synchronizer import TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE from .fee_policy import FeePolicy from .invoices import Invoice, PR_PAID from .lnonion import OnionRoutingFailure, OnionFailureCode from .lnsweep import SweepInfo if TYPE_CHECKING: from .network import Network from .wallet import Abstract_Wallet from .lnwatcher import LNWatcher from .lnworker import LNWallet from .lnchannel import Channel from .simple_config import SimpleConfig from aiohttp_socks import ProxyConnector SWAP_TX_SIZE = 150 # default tx size, used for mining fee estimation MIN_SWAP_AMOUNT_SAT = 20_000 MIN_LOCKTIME_DELTA = 60 LOCKTIME_DELTA_REFUND = 70 MAX_LOCKTIME_DELTA = 100 MIN_FINAL_CLTV_DELTA_FOR_CLIENT = 3 * 144 # note: put in invoice, but is not enforced by receiver in lnpeer.py assert MIN_LOCKTIME_DELTA <= LOCKTIME_DELTA_REFUND <= MAX_LOCKTIME_DELTA assert MAX_LOCKTIME_DELTA < lnutil.MIN_FINAL_CLTV_DELTA_ACCEPTED assert MAX_LOCKTIME_DELTA < MIN_FINAL_CLTV_DELTA_FOR_CLIENT # The script of the reverse swaps has one extra check in it to verify # that the length of the preimage is 32. This is required because in # the reverse swaps the preimage is generated by the user and to # settle the hold invoice, you need a preimage with 32 bytes . If that # check wasn't there the user could generate a preimage with a # different length which would still allow for claiming the onchain # coins but the invoice couldn't be settled # Unified witness-script for all swaps. Historically with Boltz-backend, this was the reverse-swap script. WITNESS_TEMPLATE_SWAP = [ opcodes.OP_SIZE, OPPushDataGeneric(None), # idx 1. length of preimage opcodes.OP_EQUAL, opcodes.OP_IF, opcodes.OP_HASH160, OPPushDataGeneric(lambda x: x == 20), # idx 5. payment_hash opcodes.OP_EQUALVERIFY, OPPushDataPubkey, # idx 7. claim_pubkey opcodes.OP_ELSE, opcodes.OP_DROP, OPPushDataGeneric(None), # idx 10. locktime opcodes.OP_CHECKLOCKTIMEVERIFY, opcodes.OP_DROP, OPPushDataPubkey, # idx 13. refund_pubkey opcodes.OP_ENDIF, opcodes.OP_CHECKSIG ] def _check_swap_scriptcode( *, redeem_script: bytes, lockup_address: str, payment_hash: bytes, locktime: int, refund_pubkey: Optional[bytes], # note: We don't need to check the counterparty's key. claim_pubkey: Optional[bytes], # Can use None in that case. ) -> None: assert (refund_pubkey is not None) or (claim_pubkey is not None), "at least one pubkey must be set" parsed_script = [x for x in script_GetOp(redeem_script)] if not match_script_against_template(redeem_script, WITNESS_TEMPLATE_SWAP): raise Exception("rswap check failed: scriptcode does not match template") if script_to_p2wsh(redeem_script) != lockup_address: raise Exception("rswap check failed: inconsistent scriptcode and address") if ripemd(payment_hash) != parsed_script[5][1]: raise Exception("rswap check failed: our preimage not in script") claim_pubkey = claim_pubkey or parsed_script[7][1] if claim_pubkey != parsed_script[7][1]: raise Exception("rswap check failed: our pubkey not in script") refund_pubkey = refund_pubkey or parsed_script[13][1] if refund_pubkey != parsed_script[13][1]: raise Exception("rswap check failed: our pubkey not in script") if locktime != int.from_bytes(parsed_script[10][1], byteorder='little'): raise Exception("rswap check failed: inconsistent locktime and script") # let's just rebuild the full script from scratch... if redeem_script != _construct_swap_scriptcode( payment_hash=payment_hash, locktime=locktime, refund_pubkey=refund_pubkey, claim_pubkey=claim_pubkey, ): raise Exception("failed to rebuild swap script from scratch") def _construct_swap_scriptcode( payment_hash: bytes, locktime: int, refund_pubkey: bytes, claim_pubkey: bytes, ) -> bytes: assert isinstance(payment_hash, bytes) and len(payment_hash) == 32 assert isinstance(locktime, int) and (0 <= locktime <= bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX) assert isinstance(refund_pubkey, bytes) and len(refund_pubkey) == 33 assert isinstance(claim_pubkey, bytes) and len(claim_pubkey) == 33 return construct_script( WITNESS_TEMPLATE_SWAP, values={1: 32, 5: ripemd(payment_hash), 7: claim_pubkey, 10: locktime, 13: refund_pubkey} ) class SwapServerError(Exception): def __init__(self, message=None): self.message = message super().__init__(message) def __str__(self): if self.message: return self.message return _("The swap server errored or is unreachable.") def now(): return int(time.time()) @attr.s(frozen=True) class SwapFees: percentage = attr.ib(type=int) mining_fee = attr.ib(type=int) min_amount = attr.ib(type=int) max_forward = attr.ib(type=int) max_reverse = attr.ib(type=int) @attr.frozen class SwapOffer: pairs = attr.ib(type=SwapFees) relays = attr.ib(type=list[str]) pow_bits = attr.ib(type=int) server_pubkey = attr.ib(type=str) timestamp = attr.ib(type=int) @property def server_npub(self): return to_nip19('npub', self.server_pubkey) @stored_in('submarine_swaps') @attr.s class SwapData(StoredObject): is_reverse = attr.ib(type=bool) # for whoever is running code (PoV of client or server) locktime = attr.ib(type=int) # onchain, abs onchain_amount = attr.ib(type=int) # in sats lightning_amount = attr.ib(type=int) # in sats redeem_script = attr.ib(type=bytes, converter=hex_to_bytes) preimage = attr.ib(type=Optional[bytes], converter=hex_to_bytes) prepay_hash = attr.ib(type=Optional[bytes], converter=hex_to_bytes) privkey = attr.ib(type=bytes, converter=hex_to_bytes) lockup_address = attr.ib(type=str) receive_address = attr.ib(type=str) funding_txid = attr.ib(type=Optional[str]) spending_txid = attr.ib(type=Optional[str]) is_redeemed = attr.ib(type=bool) _funding_prevout = None # type: Optional[TxOutpoint] # for RBF _payment_hash = None _payment_pending = False # for forward swaps @property def payment_hash(self) -> bytes: return self._payment_hash def is_funded(self) -> bool: return self._payment_pending or bool(self.funding_txid) def pubkey_to_rgb_color(swapserver_pubkey: str) -> Tuple[int, int, int]: assert isinstance(swapserver_pubkey, str), type(swapserver_pubkey) assert len(swapserver_pubkey) == 64, len(swapserver_pubkey) input_hash = int.from_bytes(sha256(swapserver_pubkey), byteorder="big") r = (input_hash & 0xFF0000) >> 16 g = (input_hash & 0x00FF00) >> 8 b = input_hash & 0x0000FF return r, g, b class SwapManager(Logger): network: Optional['Network'] = None lnwatcher: Optional['LNWatcher'] = None def __init__(self, *, wallet: 'Abstract_Wallet', lnworker: 'LNWallet'): Logger.__init__(self) self.mining_fee = None self.percentage = None self._min_amount = None self._max_forward = None self._max_reverse = None self.wallet = wallet self.config = wallet.config self.lnworker = lnworker self.lnwatcher = self.lnworker.lnwatcher self.config = wallet.config self.taskgroup = OldTaskGroup() self.dummy_address = DummyAddress.SWAP # note: accessing swaps dicts (besides simple lookup) needs swaps_lock self.swaps_lock = threading.Lock() self._swaps = self.wallet.db.get_dict('submarine_swaps') # type: Dict[str, SwapData] self._swaps_by_funding_outpoint = {} # type: Dict[TxOutpoint, SwapData] self._swaps_by_lockup_address = {} # type: Dict[str, SwapData] for payment_hash_hex, swap in self._swaps.items(): payment_hash = bytes.fromhex(payment_hash_hex) swap._payment_hash = payment_hash self._add_or_reindex_swap(swap, is_new=False) if not swap.is_reverse and not swap.is_redeemed: self.lnworker.register_hold_invoice(payment_hash, self.hold_invoice_callback) self._prepayments = {} # type: Dict[bytes, bytes] # fee_rhash -> rhash for k, swap in self._swaps.items(): if swap.prepay_hash is not None: self._prepayments[swap.prepay_hash] = bytes.fromhex(k) self.is_server = False # overridden by swapserver plugin if enabled self.is_initialized = asyncio.Event() self.pairs_updated = asyncio.Event() def start_network(self, network: 'Network'): assert network if self.network is not None: self.logger.info('start_network: already started') return self.logger.info('start_network: starting main loop') self.network = network with self.swaps_lock: swaps_items = list(self._swaps.items()) for k, swap in swaps_items: if swap.is_redeemed: continue self.add_lnwatcher_callback(swap) asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop) @log_exceptions async def run_nostr_server(self): await self.set_nostr_proof_of_work() while self.wallet.has_password() and self.wallet.get_unlocked_password() is None: self.logger.info("This wallet is password-protected. Please unlock it to start the swapserver plugin") await asyncio.sleep(10) with NostrTransport(self.config, self, self.lnworker.nostr_keypair) as transport: await transport.is_connected.wait() self.logger.info(f'nostr is connected') # will publish a new announcement if liquidity changed or every OFFER_UPDATE_INTERVAL_SEC last_update = time.time() while True: await asyncio.sleep(transport.LIQUIDITY_UPDATE_INTERVAL_SEC) previous_max_forward = self._max_forward previous_max_reverse = self._max_reverse previous_mining_fee = self.mining_fee try: self.server_update_pairs() except Exception: self.logger.exception("server_update_pairs failed") continue liquidity_changed = self._max_forward != previous_max_forward \ or self._max_reverse != previous_max_reverse mining_fees_changed = self.mining_fee != previous_mining_fee if liquidity_changed or mining_fees_changed: self.logger.debug(f"updating announcement: {liquidity_changed=}, {mining_fees_changed=}") elif time.time() - last_update < transport.OFFER_UPDATE_INTERVAL_SEC: continue await transport.publish_offer(self) last_update = time.time() @log_exceptions async def main_loop(self): tasks = [self.pay_pending_invoices()] if self.is_server: # nostr and http are not mutually exclusive if self.config.SWAPSERVER_PORT: tasks.append(self.http_server.run()) if self.config.NOSTR_RELAYS: tasks.append(self.run_nostr_server()) async with self.taskgroup as group: for task in tasks: await group.spawn(task) async def stop(self): await self.taskgroup.cancel_remaining() def create_transport(self) -> 'SwapServerTransport': from .lnutil import generate_random_keypair if self.config.SWAPSERVER_URL: return HttpTransport(self.config, self) else: keypair = self.lnworker.nostr_keypair if self.is_server else generate_random_keypair() return NostrTransport(self.config, self, keypair) async def set_nostr_proof_of_work(self) -> None: current_pow = get_nostr_ann_pow_amount( self.lnworker.nostr_keypair.pubkey[1:], self.config.SWAPSERVER_ANN_POW_NONCE ) if current_pow >= self.config.SWAPSERVER_POW_TARGET: self.logger.debug(f"Reusing existing PoW nonce for nostr announcement.") return self.logger.info(f"Generating PoW for nostr announcement. Target: {self.config.SWAPSERVER_POW_TARGET}") nonce, pow_amount = await gen_nostr_ann_pow( self.lnworker.nostr_keypair.pubkey[1:], # pubkey without prefix self.config.SWAPSERVER_POW_TARGET, ) self.logger.debug(f"Found {pow_amount} bits of work for Nostr announcement.") self.config.SWAPSERVER_ANN_POW_NONCE = nonce async def pay_invoice(self, key): self.logger.info(f'trying to pay invoice {key}') self.invoices_to_pay[key] = 1000000000000 # lock try: invoice = self.wallet.get_invoice(key) success, log = await self.lnworker.pay_invoice(invoice) except Exception as e: self.logger.info(f'exception paying {key}, will not retry') self.invoices_to_pay.pop(key, None) return if not success: self.logger.info(f'failed to pay {key}, will retry in 10 minutes') self.invoices_to_pay[key] = now() + 600 else: self.logger.info(f'paid invoice {key}') self.invoices_to_pay.pop(key, None) async def pay_pending_invoices(self): self.invoices_to_pay = {} while True: await asyncio.sleep(5) for key, not_before in list(self.invoices_to_pay.items()): if now() < not_before: continue await self.taskgroup.spawn(self.pay_invoice(key)) def cancel_normal_swap(self, swap: SwapData): """ we must not have broadcast the funding tx """ if swap is None: return if swap.is_funded(): self.logger.info(f'cannot cancel swap {swap.payment_hash.hex()}: already funded') return self._fail_swap(swap, 'user cancelled') def _fail_swap(self, swap: SwapData, reason: str): self.logger.info(f'failing swap {swap.payment_hash.hex()}: {reason}') if not swap.is_reverse and swap.payment_hash in self.lnworker.hold_invoice_callbacks: self.lnworker.unregister_hold_invoice(swap.payment_hash) # Peer.maybe_fulfill_htlc will fail incoming htlcs if there is no payment info self.lnworker.delete_payment_info(swap.payment_hash.hex()) self.lnworker.clear_invoices_cache() self.lnwatcher.remove_callback(swap.lockup_address) if not swap.is_funded(): with self.swaps_lock: if self._swaps.pop(swap.payment_hash.hex(), None) is None: self.logger.debug(f"swap {swap.payment_hash.hex()} has already been deleted.") if swap._funding_prevout is not None: self._swaps_by_funding_outpoint.pop(swap._funding_prevout, None) self._swaps_by_lockup_address.pop(swap.lockup_address, None) if swap.prepay_hash is not None: self._prepayments.pop(swap.prepay_hash, None) if self.lnworker.get_payment_status(swap.prepay_hash) != PR_PAID: self.lnworker.delete_payment_info(swap.prepay_hash.hex()) self.lnworker.delete_payment_bundle(swap.payment_hash) @classmethod def extract_preimage(cls, swap: SwapData, claim_tx: Transaction) -> Optional[bytes]: for txin in claim_tx.inputs(): witness = txin.witness_elements() if not witness or len(witness) < 2: # tx may be unsigned continue preimage = witness[1] if sha256(preimage) == swap.payment_hash: return preimage return None @log_exceptions async def _claim_swap(self, swap: SwapData) -> None: assert self.network assert self.lnwatcher if not self.lnwatcher.adb.is_up_to_date(): return current_height = self.network.get_local_height() remaining_time = swap.locktime - current_height txos = self.lnwatcher.adb.get_addr_outputs(swap.lockup_address) for txin in txos.values(): if swap.is_reverse and txin.value_sats() < swap.onchain_amount: # amount too low, we must not reveal the preimage continue break else: # swap not funded. txin = None # if it is a normal swap, we might have double spent the funding tx # in that case we need to fail the HTLCs if remaining_time <= 0: self._fail_swap(swap, 'expired') if txin: # the swap is funded # note: swap.funding_txid can change due to RBF, it will get updated here: swap.funding_txid = txin.prevout.txid.hex() swap._funding_prevout = txin.prevout self._add_or_reindex_swap(swap, is_new=False) # to update _swaps_by_funding_outpoint funding_height = self.lnwatcher.adb.get_tx_height(txin.prevout.txid.hex()) spent_height = txin.spent_height # set spending_txid (even if tx is local), for GUI grouping swap.spending_txid = txin.spent_txid # discard local spenders if spent_height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: spent_height = None if spent_height is not None: if spent_height > 0 and swap.preimage: if current_height - spent_height > REDEEM_AFTER_DOUBLE_SPENT_DELAY: self.logger.info(f'stop watching swap {swap.lockup_address}') swap.is_redeemed = True # cleanup self.lnwatcher.remove_callback(swap.lockup_address) if not swap.is_reverse: self.lnworker.delete_payment_bundle(swap.payment_hash) self.lnworker.unregister_hold_invoice(swap.payment_hash) if not swap.is_reverse: if swap.preimage is None and spent_height is not None: # extract the preimage, add it to lnwatcher claim_tx = self.lnwatcher.adb.get_transaction(txin.spent_txid) preimage = self.extract_preimage(swap, claim_tx) if preimage: swap.preimage = preimage self.logger.info(f'found preimage: {preimage.hex()}') self.lnworker.save_preimage(swap.payment_hash, preimage) else: # this is our refund tx if spent_height > 0: self.logger.info(f'refund tx confirmed: {txin.spent_txid} {spent_height}') self._fail_swap(swap, 'refund tx confirmed') return if remaining_time > 0: # too early for refund return if swap.preimage: # we have been paid. do not try to get refund. return else: if swap.preimage is None: swap.preimage = self.lnworker.get_preimage(swap.payment_hash) if swap.preimage is None: if funding_height.conf <= 0: return key = swap.payment_hash.hex() if remaining_time <= MIN_LOCKTIME_DELTA: if key in self.invoices_to_pay: # fixme: should consider cltv of ln payment self.logger.info(f'locktime too close {key} {remaining_time}') self.invoices_to_pay.pop(key, None) return if key not in self.invoices_to_pay: self.invoices_to_pay[key] = 0 return if self.network.config.TEST_SWAPSERVER_REFUND: # for testing: do not create claim tx return if spent_height is not None and spent_height > 0: return txin, locktime = self.create_claim_txin(txin=txin, swap=swap) # note: there is no csv in the script, we just set this so that txbatcher waits for one confirmation name = 'swap claim' if swap.is_reverse else 'swap refund' can_be_batched = True sweep_info = SweepInfo( txin=txin, cltv_abs=locktime, txout=None, name=name, can_be_batched=can_be_batched, dust_override=False, ) try: self.wallet.txbatcher.add_sweep_input('swaps', sweep_info) except BelowDustLimit: self.logger.info('utxo value below dust threshold') return except NoDynamicFeeEstimates: self.logger.info('got NoDynamicFeeEstimates') return def get_fee_for_txbatcher(self): return self._get_tx_fee(self.config.FEE_POLICY_SWAPS) def _get_tx_fee(self, policy_descriptor: str): fee_policy = FeePolicy(policy_descriptor) return fee_policy.estimate_fee(SWAP_TX_SIZE, network=self.network, allow_fallback_to_static_rates=True) def _sanity_check_swap_costs( self, *, incoming_sat: int, outgoing_sat: int, ) -> None: """The user should have already seen the swap amounts, and hence the cost. These are just some last-minute sanity checks that the cost of the swap is not insane. """ costs_abs = outgoing_sat - incoming_sat costs_ratio = 1 - incoming_sat / outgoing_sat if costs_abs < 10_000: # "small" amounts are exempt from checks return exc = UserFacingException(_("Total swap costs are insane.") + f"\n({costs_ratio=}, {costs_abs=} sat)") if costs_ratio > 0.25: raise exc if costs_abs > 1_000_000: if costs_ratio > 0.15: raise exc def get_swap(self, payment_hash: bytes) -> Optional[SwapData]: # for history swap = self._swaps.get(payment_hash.hex()) if swap: return swap payment_hash = self._prepayments.get(payment_hash) if payment_hash: return self._swaps.get(payment_hash.hex()) return None def add_lnwatcher_callback(self, swap: SwapData) -> None: callback = lambda: self._claim_swap(swap) self.lnwatcher.add_callback(swap.lockup_address, callback) async def hold_invoice_callback(self, payment_hash: bytes) -> None: # note: this assumes the wallet has been unlocked key = payment_hash.hex() if swap := self._swaps.get(key): if not swap.is_funded(): output = self.create_funding_output(swap) self.wallet.txbatcher.add_payment_output('swaps', output) swap._payment_pending = True else: self.logger.info(f'key not in swaps {key}') def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: bytes = None): """ server method """ assert lightning_amount_sat if payment_hash.hex() in self._swaps: raise Exception("payment_hash already in use") locktime = self.network.get_local_height() + LOCKTIME_DELTA_REFUND if self.network.blockchain().is_tip_stale(): raise Exception("our blockchain tip is stale") our_privkey = os.urandom(32) our_pubkey = ECPrivkey(our_privkey).get_public_key_bytes(compressed=True) onchain_amount_sat = self._get_recv_amount(lightning_amount_sat, is_reverse=True) # what the client is going to receive if not onchain_amount_sat: raise Exception("no onchain amount") redeem_script = _construct_swap_scriptcode( payment_hash=payment_hash, locktime=locktime, refund_pubkey=our_pubkey, claim_pubkey=their_pubkey, ) swap, invoice, prepay_invoice = self.add_normal_swap( redeem_script=redeem_script, locktime=locktime, onchain_amount_sat=onchain_amount_sat, lightning_amount_sat=lightning_amount_sat, payment_hash=payment_hash, our_privkey=our_privkey, prepay=True, ) self.lnworker.register_hold_invoice(payment_hash, self.hold_invoice_callback) return swap, invoice, prepay_invoice def add_normal_swap( self, *, redeem_script: bytes, locktime: int, # onchain, abs onchain_amount_sat: int, lightning_amount_sat: int, payment_hash: bytes, our_privkey: bytes, prepay: bool, channels: Optional[Sequence['Channel']] = None, min_final_cltv_expiry_delta: Optional[int] = None, ) -> Tuple[SwapData, str, Optional[str]]: """creates a hold invoice""" if payment_hash.hex() in self._swaps: raise Exception("payment_hash already in use") if prepay: # server requests 2 * the mining fee as instantly settled prepayment so that the mining # fees of the funding tx and potential timeout refund tx are always covered prepay_amount_sat = self.mining_fee * 2 invoice_amount_sat = lightning_amount_sat - prepay_amount_sat else: invoice_amount_sat = lightning_amount_sat # add payment info to lnworker self.lnworker.add_payment_info_for_hold_invoice( payment_hash, lightning_amount_sat=invoice_amount_sat, min_final_cltv_delta=min_final_cltv_expiry_delta or MIN_FINAL_CLTV_DELTA_ACCEPTED, exp_delay=300, ) info = self.lnworker.get_payment_info(payment_hash) lnaddr1, invoice = self.lnworker.get_bolt11_invoice( payment_info=info, message='Submarine swap', fallback_address=None, channels=channels, ) margin_to_get_refund_tx_mined = MIN_LOCKTIME_DELTA if not (locktime + margin_to_get_refund_tx_mined < self.network.get_local_height() + lnaddr1.get_min_final_cltv_delta()): raise Exception( f"onchain locktime ({locktime}+{margin_to_get_refund_tx_mined}) " f"too close to LN-htlc-expiry ({self.network.get_local_height()+lnaddr1.get_min_final_cltv_delta()})") if prepay: prepay_hash = self.lnworker.create_payment_info( amount_msat=prepay_amount_sat*1000, min_final_cltv_delta=min_final_cltv_expiry_delta or MIN_FINAL_CLTV_DELTA_ACCEPTED, exp_delay=300, ) info = self.lnworker.get_payment_info(prepay_hash) lnaddr2, prepay_invoice = self.lnworker.get_bolt11_invoice( payment_info=info, message='Submarine swap prepayment', fallback_address=None, channels=channels, ) self.lnworker.bundle_payments([payment_hash, prepay_hash]) self._prepayments[prepay_hash] = payment_hash assert lnaddr1.get_min_final_cltv_delta() == lnaddr2.get_min_final_cltv_delta() else: prepay_invoice = None prepay_hash = None lockup_address = script_to_p2wsh(redeem_script) receive_address = self.wallet.get_receiving_address() swap = SwapData( redeem_script=redeem_script, locktime=locktime, privkey=our_privkey, preimage=None, prepay_hash=prepay_hash, lockup_address=lockup_address, onchain_amount=onchain_amount_sat, receive_address=receive_address, lightning_amount=lightning_amount_sat, is_reverse=False, is_redeemed=False, funding_txid=None, spending_txid=None, ) swap._payment_hash = payment_hash self._add_or_reindex_swap(swap, is_new=True) self.add_lnwatcher_callback(swap) return swap, invoice, prepay_invoice def create_reverse_swap(self, *, lightning_amount_sat: int, their_pubkey: bytes) -> SwapData: """ server method. """ assert lightning_amount_sat is not None locktime = self.network.get_local_height() + LOCKTIME_DELTA_REFUND if self.network.blockchain().is_tip_stale(): raise Exception("our blockchain tip is stale") privkey = os.urandom(32) our_pubkey = ECPrivkey(privkey).get_public_key_bytes(compressed=True) onchain_amount_sat = self._get_send_amount(lightning_amount_sat, is_reverse=False) if not onchain_amount_sat: raise Exception("no onchain amount") preimage = os.urandom(32) payment_hash = sha256(preimage) redeem_script = _construct_swap_scriptcode( payment_hash=payment_hash, locktime=locktime, refund_pubkey=their_pubkey, claim_pubkey=our_pubkey, ) swap = self.add_reverse_swap( redeem_script=redeem_script, locktime=locktime, privkey=privkey, preimage=preimage, payment_hash=payment_hash, prepay_hash=None, onchain_amount_sat=onchain_amount_sat, lightning_amount_sat=lightning_amount_sat) return swap def add_reverse_swap( self, *, redeem_script: bytes, locktime: int, # onchain privkey: bytes, lightning_amount_sat: int, onchain_amount_sat: int, preimage: bytes, payment_hash: bytes, prepay_hash: Optional[bytes] = None, ) -> SwapData: if payment_hash.hex() in self._swaps: raise Exception("payment_hash already in use") assert sha256(preimage) == payment_hash lockup_address = script_to_p2wsh(redeem_script) receive_address = self.wallet.get_receiving_address() swap = SwapData( redeem_script=redeem_script, locktime=locktime, privkey=privkey, preimage=preimage, prepay_hash=prepay_hash, lockup_address=lockup_address, onchain_amount=onchain_amount_sat, receive_address=receive_address, lightning_amount=lightning_amount_sat, is_reverse=True, is_redeemed=False, funding_txid=None, spending_txid=None, ) if prepay_hash: if prepay_hash in self._prepayments: raise Exception("prepay_hash already in use") self._prepayments[prepay_hash] = payment_hash swap._payment_hash = payment_hash self._add_or_reindex_swap(swap, is_new=True) self.add_lnwatcher_callback(swap) return swap def server_add_swap_invoice(self, request: dict) -> dict: """ server method. (client-forward-swap phase2) """ invoice = request['invoice'] invoice = Invoice.from_bech32(invoice) key = invoice.rhash payment_hash = bytes.fromhex(key) their_pubkey = bytes.fromhex(request['refundPublicKey']) with self.swaps_lock: assert key in self._swaps swap = self._swaps[key] assert swap.lightning_amount == int(invoice.get_amount_sat()) assert swap.is_reverse is True # check that we have the preimage assert sha256(swap.preimage) == payment_hash assert swap.spending_txid is None # check their_pubkey by recalculating redeem_script our_pubkey = ECPrivkey(swap.privkey).get_public_key_bytes(compressed=True) redeem_script = _construct_swap_scriptcode( payment_hash=payment_hash, locktime=swap.locktime, refund_pubkey=their_pubkey, claim_pubkey=our_pubkey, ) assert swap.redeem_script == redeem_script assert key not in self.invoices_to_pay self.invoices_to_pay[key] = 0 assert self.wallet.get_invoice(invoice.get_id()) is None self.wallet.save_invoice(invoice) return {} async def normal_swap( self, *, transport: 'SwapServerTransport', lightning_amount_sat: int, expected_onchain_amount_sat: int, password, tx: PartialTransaction = None, channels = None, ) -> Optional[str]: """send on-chain BTC, receive on Lightning Old (removed) flow: - User generates an LN invoice with RHASH, and knows preimage. - User creates on-chain output locked to RHASH. - Server pays LN invoice. User reveals preimage. - Server spends the on-chain output using preimage. cltv safety requirement: (onchain_locktime > LN_locktime), otherwise server is vulnerable New flow: - User requests swap (RPC 'createnormalswap') - Server creates preimage, sends RHASH to user - User creates hold invoice, sends it to server (RPC 'addswapinvoice') - Server sends HTLC, user holds it - User creates on-chain output locked to RHASH - Server spends the on-chain output using preimage (revealing the preimage) - User fulfills HTLC using preimage cltv safety requirement: (onchain_locktime < LN_locktime), otherwise client is vulnerable """ assert self.network assert self.lnwatcher swap, invoice = await self.request_normal_swap( transport=transport, lightning_amount_sat=lightning_amount_sat, expected_onchain_amount_sat=expected_onchain_amount_sat, channels=channels, ) tx = self.create_funding_tx(swap, tx, password=password) return await self.wait_for_htlcs_and_broadcast(transport=transport, swap=swap, invoice=invoice, tx=tx) async def request_normal_swap( self, *, transport: 'SwapServerTransport', lightning_amount_sat: int, expected_onchain_amount_sat: int, channels: Optional[Sequence['Channel']] = None, ) -> Tuple[SwapData, str]: self._sanity_check_swap_costs( incoming_sat=lightning_amount_sat, outgoing_sat=expected_onchain_amount_sat) await self.is_initialized.wait() # add timeout refund_privkey = os.urandom(32) refund_pubkey = ECPrivkey(refund_privkey).get_public_key_bytes(compressed=True) self.logger.info('requesting preimage hash for swap') request_data = { "invoiceAmount": lightning_amount_sat, "refundPublicKey": refund_pubkey.hex() } data = await transport.send_request_to_server('createnormalswap', request_data) try: payment_hash = bytes.fromhex(data["preimageHash"]) assert len(payment_hash) == 32, len(payment_hash) onchain_amount = data["expectedAmount"] assert isinstance(onchain_amount, int), type(onchain_amount) locktime = data["timeoutBlockHeight"] assert isinstance(locktime, int), type(locktime) lockup_address = data["address"] assert isinstance(lockup_address, str), type(lockup_address) assert bitcoin.is_address(lockup_address), lockup_address redeem_script = bytes.fromhex(data["redeemScript"]) except Exception as e: self.logger.error(f"failed to parse response from swapserver for createnormalswap: {e!r}") raise SwapServerError("failed to parse response from swapserver for createnormalswap") from e del data # parsing done # verify redeem_script is built with our pubkey and preimage _check_swap_scriptcode( redeem_script=redeem_script, lockup_address=lockup_address, payment_hash=payment_hash, locktime=locktime, refund_pubkey=refund_pubkey, claim_pubkey=None, ) # check that onchain_amount is not more than what we estimated if onchain_amount > expected_onchain_amount_sat: raise Exception(f"fswap check failed: onchain_amount is more than what we estimated: " f"{onchain_amount} > {expected_onchain_amount_sat}") # verify that they are not locking up funds for too long if locktime - self.network.get_local_height() > MAX_LOCKTIME_DELTA: raise Exception("fswap check failed: locktime too far in future") if self.network.blockchain().is_tip_stale(): raise Exception("our blockchain tip is stale") swap, invoice, _ = self.add_normal_swap( redeem_script=redeem_script, locktime=locktime, lightning_amount_sat=lightning_amount_sat, onchain_amount_sat=onchain_amount, payment_hash=payment_hash, our_privkey=refund_privkey, prepay=False, channels=channels, # When the client is doing a normal swap, we create a ln-invoice with larger than usual final_cltv_delta. # If the user goes offline after broadcasting the funding tx (but before it is mined and # the server claims it), they need to come back online before the held ln-htlc expires (see #8940). # If the held ln-htlc expires, and the funding tx got confirmed, the server will have claimed the onchain # funds, and the ln-htlc will be timed out onchain (and channel force-closed). i.e. the user loses the swap # amount. Increasing the final_cltv_delta the user puts in the invoice extends this critical window. min_final_cltv_expiry_delta=MIN_FINAL_CLTV_DELTA_FOR_CLIENT, ) return swap, invoice async def wait_for_htlcs_and_broadcast( self, *, transport: 'SwapServerTransport', swap: SwapData, invoice: str, tx: Transaction, ) -> Optional[str]: await transport.is_connected.wait() payment_hash = swap.payment_hash refund_pubkey = ECPrivkey(swap.privkey).get_public_key_bytes(compressed=True) async def callback(payment_hash): # FIXME what if this raises, e.g. TxBroadcastError? # We will never retry the hold-invoice-callback. await self.broadcast_funding_tx(swap, tx) self.lnworker.register_hold_invoice(payment_hash, callback) # send invoice to server and wait for htlcs # note: server will link this RPC to our previous 'createnormalswap' RPC # - using the RHASH from invoice, and using refundPublicKey # - FIXME it would be safer to use a proper session-secret?! request_data = { "invoice": invoice, "refundPublicKey": refund_pubkey.hex(), } await transport.send_request_to_server('addswapinvoice', request_data) # wait for funding tx lnaddr = lndecode(invoice) while swap.funding_txid is None and not lnaddr.is_expired(): await asyncio.sleep(0.1) return swap.funding_txid def create_funding_output(self, swap: SwapData) -> PartialTxOutput: return PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount) def create_funding_tx( self, swap: SwapData, tx: Optional[PartialTransaction], *, password, ) -> PartialTransaction: # create funding tx # use fee policy set by user (not using txbatcher) fee_policy = FeePolicy(self.config.FEE_POLICY) # note: rbf must not decrease payment # this is taken care of in wallet._is_rbf_allowed_to_touch_tx_output if tx is None: funding_output = self.create_funding_output(swap) tx = self.wallet.make_unsigned_transaction( outputs=[funding_output], rbf=True, fee_policy=fee_policy, ) else: tx.replace_output_address(DummyAddress.SWAP, swap.lockup_address) tx.set_rbf(True) self.wallet.sign_transaction(tx, password) return tx @log_exceptions async def request_swap_for_amount( self, *, transport: 'SwapServerTransport', onchain_amount: int, ) -> Optional[Tuple[SwapData, str]]: await self.is_initialized.wait() lightning_amount_sat = self.get_recv_amount(onchain_amount, is_reverse=False) if lightning_amount_sat is None: raise SwapServerError(_("Swap amount outside of providers limits") + ":\n" + _("min") + f": {self.get_min_amount()}\n" + _("max") + f": {self.get_provider_max_reverse_amount()}") swap, invoice = await self.request_normal_swap( transport=transport, lightning_amount_sat=lightning_amount_sat, expected_onchain_amount_sat=onchain_amount) return swap, invoice @log_exceptions async def broadcast_funding_tx(self, swap: SwapData, tx: Transaction) -> None: swap.funding_txid = tx.txid() await self.network.broadcast_transaction(tx) async def reverse_swap( self, *, transport: 'SwapServerTransport', lightning_amount_sat: int, expected_onchain_amount_sat: int, prepayment_sat: int, channels: Optional[Sequence['Channel']] = None, ) -> Optional[str]: """send on Lightning, receive on-chain - User generates preimage, RHASH. Sends RHASH to server. (RPC 'createswap') - Server creates an LN invoice for RHASH. - User pays LN invoice - except server needs to hold the HTLC as preimage is unknown. - if the server requested a fee prepayment (using 'minerFeeInvoice'), the server will have the preimage for that. The user will send HTLCs for both the main RHASH, and for the fee prepayment. Once both MPP sets arrive at the server, the server will fulfill the HTLCs for the fee prepayment (before creating the on-chain output). - Server creates on-chain output locked to RHASH. - User spends on-chain output, revealing preimage. - Server fulfills HTLC using preimage. cltv safety requirement: (onchain_locktime < LN_locktime), otherwise server is vulnerable Note: expected_onchain_amount_sat is BEFORE deducting the on-chain claim tx fee. Note: prepayment_sat is passed as argument instead of accessing self.mining_fee to ensure the mining fees the user sees in the GUI are also the values used for the checks performed here. We commit to prepayment_sat as it limits the max fee pre-payment amt, which the server is trusted with. """ assert self.network assert self.lnwatcher self._sanity_check_swap_costs( incoming_sat=expected_onchain_amount_sat, outgoing_sat=lightning_amount_sat) privkey = os.urandom(32) our_pubkey = ECPrivkey(privkey).get_public_key_bytes(compressed=True) preimage = os.urandom(32) payment_hash = sha256(preimage) request_data = { "type": "reversesubmarine", "pairId": "BTC/BTC", "invoiceAmount": lightning_amount_sat, "preimageHash": payment_hash.hex(), "claimPublicKey": our_pubkey.hex(), } self.logger.debug(f'rswap: sending request for {lightning_amount_sat}') data = await transport.send_request_to_server('createswap', request_data) try: invoice = data['invoice'] assert isinstance(invoice, str), type(invoice) fee_invoice = data.get('minerFeeInvoice') assert fee_invoice is None or isinstance(fee_invoice, str), type(fee_invoice) lockup_address = data['lockupAddress'] assert isinstance(lockup_address, str), type(lockup_address) assert bitcoin.is_address(lockup_address), lockup_address redeem_script = bytes.fromhex(data['redeemScript']) locktime = data['timeoutBlockHeight'] assert isinstance(locktime, int), type(locktime) onchain_amount = data["onchainAmount"] assert isinstance(onchain_amount, int), type(onchain_amount) response_id = data['id'] except Exception as e: self.logger.error(f"failed to parse response from swapserver for createswap: {e!r}") raise SwapServerError("failed to parse response from swapserver for createswap") from e del data # parsing done self.logger.debug(f'rswap: {response_id=}') # verify redeem_script is built with our pubkey and preimage _check_swap_scriptcode( redeem_script=redeem_script, lockup_address=lockup_address, payment_hash=payment_hash, locktime=locktime, refund_pubkey=None, claim_pubkey=our_pubkey, ) # check that the onchain amount is what we expected if onchain_amount < expected_onchain_amount_sat: raise Exception(f"rswap check failed: onchain_amount is less than what we expected: " f"{onchain_amount} < {expected_onchain_amount_sat}") # verify that we will have enough time to get our tx confirmed if locktime - self.network.get_local_height() <= MIN_LOCKTIME_DELTA: raise Exception("rswap check failed: locktime too close") if self.network.blockchain().is_tip_stale(): raise Exception("our blockchain tip is stale") # verify invoice payment_hash lnaddr = self.lnworker._check_bolt11_invoice(invoice) invoice_amount = int(lnaddr.get_amount_sat()) if lnaddr.paymenthash != payment_hash: raise Exception("rswap check failed: inconsistent RHASH and invoice") if fee_invoice: fee_lnaddr = self.lnworker._check_bolt11_invoice(fee_invoice) if fee_lnaddr.get_amount_sat() > prepayment_sat: raise SwapServerError(_("Mining fee requested by swap-server larger " "than what was announced in their offer.")) invoice_amount += fee_lnaddr.get_amount_sat() prepay_hash = fee_lnaddr.paymenthash else: prepay_hash = None # check that the lightning amount is what we requested if int(invoice_amount) != lightning_amount_sat: raise Exception(f"rswap check failed: invoice_amount ({invoice_amount}) " f"not what we requested ({lightning_amount_sat})") # save swap data to wallet file swap = self.add_reverse_swap( redeem_script=redeem_script, locktime=locktime, privkey=privkey, preimage=preimage, payment_hash=payment_hash, prepay_hash=prepay_hash, onchain_amount_sat=onchain_amount, lightning_amount_sat=lightning_amount_sat) # initiate fee payment. if fee_invoice: fee_invoice_obj = Invoice.from_bech32(fee_invoice) asyncio.ensure_future(self.lnworker.pay_invoice(fee_invoice_obj)) # we return if we detect funding async def wait_for_funding(swap): while swap.funding_txid is None: await asyncio.sleep(1) # initiate main payment invoice_obj = Invoice.from_bech32(invoice) tasks = [asyncio.create_task(self.lnworker.pay_invoice(invoice_obj, channels=channels)), asyncio.create_task(wait_for_funding(swap))] await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) return swap.funding_txid def _add_or_reindex_swap(self, swap: SwapData, *, is_new: bool) -> None: with self.swaps_lock: assert is_new == (swap.payment_hash.hex() not in self._swaps), is_new if swap.payment_hash.hex() not in self._swaps: self._swaps[swap.payment_hash.hex()] = swap if swap._funding_prevout: self._swaps_by_funding_outpoint[swap._funding_prevout] = swap self._swaps_by_lockup_address[swap.lockup_address] = swap def server_update_pairs(self) -> None: """ for server """ self.percentage = float(self.config.SWAPSERVER_FEE_MILLIONTHS) / 10000 # type: ignore self._min_amount = MIN_SWAP_AMOUNT_SAT oc_balance_sat: int = self.wallet.get_spendable_balance_sat() max_forward: int = min(int(self.lnworker.num_sats_can_receive()), oc_balance_sat, 10000000) max_reverse: int = min(int(self.lnworker.num_sats_can_send()), 10000000) self._max_forward: int = self._keep_leading_digits(max_forward, 2) self._max_reverse: int = self._keep_leading_digits(max_reverse, 2) new_mining_fee = self.get_fee_for_txbatcher() if self.mining_fee is None \ or abs(self.mining_fee - new_mining_fee) / self.mining_fee > 0.1: self.mining_fee = new_mining_fee @staticmethod def _keep_leading_digits(num: int, digits: int) -> int: """Reduces precision of num to `digits` leading digits.""" if num <= 0: return 0 num_str = str(num) zeroed_num_str = f"{num_str[:digits]}{(len(num_str[digits:])) * '0'}" return int(zeroed_num_str) def update_pairs(self, pairs: SwapFees): self.logger.info(f'updating fees {pairs}') self.mining_fee = pairs.mining_fee self.percentage = pairs.percentage self._min_amount = pairs.min_amount self._max_forward = pairs.max_forward self._max_reverse = pairs.max_reverse self.trigger_pairs_updated_threadsafe() def trigger_pairs_updated_threadsafe(self): def trigger(): self.is_initialized.set() self.pairs_updated.set() self.pairs_updated.clear() run_sync_function_on_asyncio_thread(trigger, block=True) def get_provider_max_forward_amount(self) -> int: """in sat""" return self._max_forward def get_provider_max_reverse_amount(self) -> int: """in sat""" return self._max_reverse def get_min_amount(self) -> int: """in satoshis""" return self._min_amount def check_invoice_amount(self, x, is_reverse: bool) -> bool: if is_reverse: max_amount = self.get_provider_max_forward_amount() else: max_amount = self.get_provider_max_reverse_amount() return self.get_min_amount() <= x <= max_amount def _get_recv_amount(self, send_amount: Optional[int], *, is_reverse: bool) -> Optional[int]: """For a given swap direction and amount we send, returns how much we will receive. Note: in the reverse direction, the mining fee for the on-chain claim tx is NOT accounted for. In the reverse direction, the result matches what the swap server returns as response["onchainAmount"]. """ if send_amount is None: return None x = Decimal(send_amount) percentage = Decimal(self.percentage) if is_reverse: if not self.check_invoice_amount(x, is_reverse): return None # see/ref: # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L948 percentage_fee = math.ceil(percentage * x / 100) base_fee = self.mining_fee x -= percentage_fee + base_fee x = math.floor(x) if x < dust_threshold(): return None else: x -= self.mining_fee percentage_fee = math.ceil(x * percentage / (100 + percentage)) x -= percentage_fee if not self.check_invoice_amount(x, is_reverse): return None x = int(x) return x def _get_send_amount(self, recv_amount: Optional[int], *, is_reverse: bool) -> Optional[int]: """For a given swap direction and amount we want to receive, returns how much we will need to send. Note: in the reverse direction, the mining fee for the on-chain claim tx is NOT accounted for. In the forward direction, the result matches what the swap server returns as response["expectedAmount"]. """ if not recv_amount: return None x = Decimal(recv_amount) percentage = Decimal(self.percentage) if is_reverse: # see/ref: # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L928 # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L958 base_fee = self.mining_fee x += base_fee x = math.ceil(x / ((100 - percentage) / 100)) if not self.check_invoice_amount(x, is_reverse): return None else: if not self.check_invoice_amount(x, is_reverse): return None # see/ref: # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L708 # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/rates/FeeProvider.ts#L90 percentage_fee = math.ceil(percentage * x / 100) x += percentage_fee + self.mining_fee x = int(x) return x def get_recv_amount(self, send_amount: Optional[int], *, is_reverse: bool) -> Optional[int]: # first, add percentage fee recv_amount = self._get_recv_amount(send_amount, is_reverse=is_reverse) # sanity check calculation can be inverted if recv_amount is not None: inverted_send_amount = self._get_send_amount(recv_amount, is_reverse=is_reverse) # accept off-by ones as amt_rcv = recv_amt(send_amt(amt_rcv)) only up to +-1 if abs(send_amount - inverted_send_amount) > 1: raise Exception(f"calc-invert-sanity-check failed. is_reverse={is_reverse}. " f"send_amount={send_amount} -> recv_amount={recv_amount} -> inverted_send_amount={inverted_send_amount}") # second, add on-chain claim tx fee if is_reverse and recv_amount is not None: recv_amount -= self.get_fee_for_txbatcher() return recv_amount def get_send_amount(self, recv_amount: Optional[int], *, is_reverse: bool) -> Optional[int]: # first, add on-chain claim tx fee if is_reverse and recv_amount is not None: recv_amount += self.get_fee_for_txbatcher() # second, add percentage fee send_amount = self._get_send_amount(recv_amount, is_reverse=is_reverse) # sanity check calculation can be inverted if send_amount is not None: inverted_recv_amount = self._get_recv_amount(send_amount, is_reverse=is_reverse) if recv_amount != inverted_recv_amount: raise Exception(f"calc-invert-sanity-check failed. is_reverse={is_reverse}. " f"recv_amount={recv_amount} -> send_amount={send_amount} -> inverted_recv_amount={inverted_recv_amount}") return send_amount def get_swaps_by_funding_tx(self, tx: Transaction) -> Iterable[SwapData]: swaps = [] for txout_idx, _txo in enumerate(tx.outputs()): prevout = TxOutpoint(txid=bytes.fromhex(tx.txid()), out_idx=txout_idx) if swap := self._swaps_by_funding_outpoint.get(prevout): swaps.append(swap) return swaps def get_swaps_by_claim_tx(self, tx: Transaction) -> Iterable[Tuple[int, SwapData]]: swaps = [] for i, txin in enumerate(tx.inputs()): if swap := self.get_swap_by_claim_txin(txin): swaps.append((i, swap)) return swaps def get_swap_by_claim_txin(self, txin: TxInput) -> Optional[SwapData]: return self._swaps_by_funding_outpoint.get(txin.prevout) def is_lockup_address_for_a_swap(self, addr: str) -> bool: return bool(self._swaps_by_lockup_address.get(addr)) @classmethod def add_txin_info(cls, swap, txin: PartialTxInput) -> None: """Add some info to a claim txin. note: even without signing, this is useful for tx size estimation. """ preimage = swap.preimage if swap.is_reverse else 0 witness_script = swap.redeem_script txin.script_sig = b'' txin.witness_script = witness_script sig_dummy = b'\x00' * 71 # DER-encoded ECDSA sig, with low S and low R witness = [sig_dummy, preimage, witness_script] txin.witness_sizehint = len(construct_witness(witness)) txin.nsequence = 1 if swap.is_reverse else 0xffffffff - 2 @classmethod def create_claim_txin( cls, *, txin: PartialTxInput, swap: SwapData, ) -> Tuple[PartialTxInput, Optional[int]]: if swap.is_reverse: # successful reverse swap locktime = None # preimage will be set in sign_tx else: # timing out forward swap locktime = swap.locktime cls.add_txin_info(swap, txin) txin.privkey = swap.privkey def make_witness(sig): # preimae not known yet preimage = swap.preimage if swap.is_reverse else 0 witness_script = swap.redeem_script return construct_witness([sig, preimage, witness_script]) txin.make_witness = make_witness return txin, locktime def client_max_amount_forward_swap(self) -> Optional[int]: """ returns None if we cannot swap """ max_swap_amt_ln = self.get_provider_max_reverse_amount() if max_swap_amt_ln is None: return None max_recv_amt_ln = int(self.lnworker.num_sats_can_receive()) max_amt_ln = int(min(max_swap_amt_ln, max_recv_amt_ln)) max_amt_oc = self.get_send_amount(max_amt_ln, is_reverse=False) or 0 min_amt_oc = self.get_send_amount(self.get_min_amount(), is_reverse=False) or 0 return max_amt_oc if max_amt_oc >= min_amt_oc else None def client_max_amount_reverse_swap(self) -> Optional[int]: """Returns None if swap is not possible""" provider_max = self.get_provider_max_forward_amount() max_ln_send = int(self.lnworker.num_sats_can_send()) max_swap_size = min(max_ln_send, provider_max) if max_swap_size < self.get_min_amount(): return None return max_swap_size def server_create_normal_swap(self, request): # normal for client, reverse for server #request = await r.json() lightning_amount_sat = request['invoiceAmount'] their_pubkey = bytes.fromhex(request['refundPublicKey']) assert len(their_pubkey) == 33 swap = self.create_reverse_swap( lightning_amount_sat=lightning_amount_sat, their_pubkey=their_pubkey, ) response = { "id": swap.payment_hash.hex(), 'preimageHash': swap.payment_hash.hex(), "acceptZeroConf": False, "expectedAmount": swap.onchain_amount, "timeoutBlockHeight": swap.locktime, "address": swap.lockup_address, "redeemScript": swap.redeem_script.hex(), } return response def server_create_swap(self, request): # reverse for client, forward for server # requesting a normal swap (old protocol) will raise an exception #request = await r.json() req_type = request['type'] assert request['pairId'] == 'BTC/BTC' if req_type == 'reversesubmarine': lightning_amount_sat=request['invoiceAmount'] payment_hash=bytes.fromhex(request['preimageHash']) their_pubkey=bytes.fromhex(request['claimPublicKey']) assert len(payment_hash) == 32 assert len(their_pubkey) == 33 swap, invoice, prepay_invoice = self.create_normal_swap( lightning_amount_sat=lightning_amount_sat, payment_hash=payment_hash, their_pubkey=their_pubkey ) response = { 'id': payment_hash.hex(), 'invoice': invoice, 'minerFeeInvoice': prepay_invoice, 'lockupAddress': swap.lockup_address, 'redeemScript': swap.redeem_script.hex(), 'timeoutBlockHeight': swap.locktime, "onchainAmount": swap.onchain_amount, } elif req_type == 'submarine': raise Exception('Deprecated API. Please upgrade your version of Electrum') else: raise Exception('unsupported request type:' + req_type) return response def get_groups_for_onchain_history(self): current_height = self.wallet.adb.get_local_height() d = {} # add info about submarine swaps settled_payments = self.lnworker.get_payments(status='settled') with self.swaps_lock: swaps_items = list(self._swaps.items()) for payment_hash_hex, swap in swaps_items: txid = swap.spending_txid if swap.is_reverse else swap.funding_txid if txid is None: continue payment_hash = bytes.fromhex(payment_hash_hex) if payment_hash in settled_payments: plist = settled_payments[payment_hash] info = self.lnworker.get_payment_info(payment_hash) direction, amount_msat, fee_msat, timestamp = self.lnworker.get_payment_value(info, plist) else: amount_msat = 0 if swap.is_reverse: group_label = 'Reverse swap' + ' ' + self.config.format_amount_and_units(swap.lightning_amount) else: group_label = 'Forward swap' + ' ' + self.config.format_amount_and_units(swap.onchain_amount) label = _('Claim transaction') if swap.is_reverse else _('Funding transaction') delta = current_height - swap.locktime if self.wallet.adb.is_mine(swap.lockup_address): tx_height = self.wallet.adb.get_tx_height(swap.funding_txid) if swap.is_reverse and tx_height.height() <= 0: label += ' (%s)' % _('waiting for funding tx confirmation') if not swap.is_reverse and not swap.is_redeemed and swap.spending_txid is None and delta < 0: label += f' (refundable in {-delta} blocks)' # fixme: only if unspent d[txid] = { 'group_id': txid, 'label': label, 'group_label': group_label, } if not swap.is_reverse: claim_tx = self.lnwatcher.adb.get_transaction(swap.spending_txid) if claim_tx and not self.extract_preimage(swap, claim_tx): # if the spending_tx is in the wallet, this will add it # to the group (see wallet.get_full_history) d[swap.spending_txid] = { 'group_id': txid, 'group_label': group_label, 'label': _('Refund transaction'), } self.wallet._accounting_addresses.add(swap.lockup_address) return d def get_group_id_for_payment_hash(self, payment_hash: bytes) -> Optional[str]: # add group_id to swap transactions swap = self.get_swap(payment_hash) if swap: return swap.spending_txid if swap.is_reverse else swap.funding_txid return None def get_pending_swaps(self) -> List[SwapData]: """Returns a list of swaps with unconfirmed funding tx (which require us to stay online).""" pending_swaps: List[SwapData] = [] with self.swaps_lock: swaps = list(self._swaps.values()) for swap in swaps: if swap.is_redeemed: # adb data might have been removed after is_redeemed was set. # in that case lnwatcher will no longer fetch the spending tx # and adb will return TX_HEIGHT_LOCAL continue # note: adb.get_tx_height returns TX_HEIGHT_LOCAL if the txid is unknown funding_height = self.lnworker.wallet.adb.get_tx_height(swap.funding_txid).height() spending_height = self.lnworker.wallet.adb.get_tx_height(swap.spending_txid).height() if funding_height > TX_HEIGHT_LOCAL and spending_height <= TX_HEIGHT_LOCAL: pending_swaps.append(swap) return pending_swaps class SwapServerTransport(Logger): def __init__(self, *, config: 'SimpleConfig', sm: 'SwapManager'): Logger.__init__(self) self.sm = sm self.network = sm.network self.config = config self.is_connected = asyncio.Event() self.connect_timeout = 10 if self.uses_proxy else 5 def __enter__(self): pass def __exit__(self, ex_type, ex, tb): pass async def __aenter__(self): pass async def __aexit__(self, exc_type, exc_val, exc_tb): pass async def send_request_to_server(self, method: str, request_data: Optional[dict]) -> dict: """Might raise SwapServerError.""" pass @property def uses_proxy(self): return self.network.proxy and self.network.proxy.enabled class HttpTransport(SwapServerTransport): def __init__(self, config, sm): SwapServerTransport.__init__(self, config=config, sm=sm) self.api_url = config.SWAPSERVER_URL self.is_connected.set() def __enter__(self): asyncio.run_coroutine_threadsafe(self.get_pairs_just_once(), self.network.asyncio_loop) return self def __exit__(self, ex_type, ex, tb): pass async def __aenter__(self): asyncio.create_task(self.get_pairs_just_once()) return self async def __aexit__(self, exc_type, exc_val, exc_tb): pass async def send_request_to_server(self, method, request_data): try: response = await self.network.async_send_http_on_proxy( 'post' if request_data else 'get', self.api_url + '/' + method, json=request_data, timeout=30) except aiohttp.ClientError as e: self.logger.info(f"Swap server errored: {e!r}") raise SwapServerError() from e try: parsed_json = json.loads(response) if not isinstance(parsed_json, dict): raise Exception("malformed response, not dict") except Exception as e: self.logger.error(f"failed to parse response from swapserver for {method=}: {e!r}") raise SwapServerError(f"failed to parse response from swapserver for {method=}") from e return parsed_json async def get_pairs_just_once(self) -> None: """Might raise SwapServerError.""" response = await self.send_request_to_server('getpairs', None) try: assert response.get('htlcFirst') is True fees = response['pairs']['BTC/BTC']['fees'] limits = response['pairs']['BTC/BTC']['limits'] pairs = SwapFees( percentage=fees['percentage'], mining_fee=fees['minerFees']['baseAsset']['mining_fee'], min_amount=limits['minimal'], max_forward=limits['max_forward_amount'], max_reverse=limits['max_reverse_amount'], ) except Exception as e: self.logger.error(f"failed to parse response from swapserver for getpairs: {e!r}") raise SwapServerError("failed to parse response from swapserver for getpairs") from e self.sm.update_pairs(pairs) class NostrTransport(SwapServerTransport): # uses nostr: # - to advertise servers # - for client-server RPCs (using DMs) # (todo: we should use onion messages for that) EPHEMERAL_REQUEST = 25582 USER_STATUS_NIP38 = 30315 NOSTR_EVENT_VERSION = 5 OFFER_UPDATE_INTERVAL_SEC = 60 * 10 LIQUIDITY_UPDATE_INTERVAL_SEC = 30 def __init__(self, config, sm, keypair: Keypair): SwapServerTransport.__init__(self, config=config, sm=sm) self._offers = {} # type: Dict[str, SwapOffer] self.private_key = keypair.privkey self.nostr_private_key = to_nip19('nsec', keypair.privkey.hex()) self.nostr_pubkey = keypair.pubkey.hex()[2:] self.dm_replies = {} # type: Dict[tuple[str, str], asyncio.Future[dict]] self.ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path) self.relay_manager = None # type: Optional[aionostr.Manager] self.taskgroup = OldTaskGroup() self._last_swapserver_relays = self._load_last_swapserver_relays() # type: Optional[Sequence[str]] self._swap_server_requests = asyncio.Queue(maxsize=5) # type: asyncio.Queue[dict] def __enter__(self): asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop) return self def __exit__(self, ex_type, ex, tb): fut = asyncio.run_coroutine_threadsafe(self.stop(), self.network.asyncio_loop) fut.result(timeout=5) async def __aenter__(self): asyncio.create_task(self.main_loop()) return self async def __aexit__(self, exc_type, exc_val, exc_tb): await wait_for2(self.stop(), timeout=5) @log_exceptions async def main_loop(self): self.logger.info(f'starting nostr transport with pubkey: {self.nostr_pubkey}') self.logger.info(f'nostr relays: {self.relays}') self.relay_manager = self.get_relay_manager() await self.relay_manager.connect() connected_relays = self.relay_manager.relays self.logger.info(f'connected relays: {[relay.url for relay in connected_relays]}') if connected_relays: self.is_connected.set() if self.sm.is_server: tasks = [ self.check_direct_messages(), self._handle_requests(), ] else: tasks = [ self.check_direct_messages(), self._get_pairs_loop(), self.update_relays() ] try: async with self.taskgroup as group: for task in tasks: await group.spawn(task) except Exception as e: self.logger.exception("taskgroup died.") finally: self.logger.info("taskgroup stopped.") @log_exceptions async def stop(self): self.logger.info("shutting down nostr transport") self.sm.is_initialized.clear() await self.taskgroup.cancel_remaining() await self.relay_manager.close() self.logger.info("nostr transport shut down") @property def relays(self): our_relays = self.config.NOSTR_RELAYS.split(',') if self.config.NOSTR_RELAYS else [] if self.sm.is_server: return our_relays last_swapserver_relays = self._last_swapserver_relays or [] return list(set(our_relays + last_swapserver_relays)) def get_relay_manager(self) -> aionostr.Manager: assert get_running_loop() == get_asyncio_loop(), f"this must be run on the asyncio thread!" if not self.relay_manager: if self.uses_proxy: proxy = make_aiohttp_proxy_connector(self.network.proxy, self.ssl_context) else: proxy: Optional['ProxyConnector'] = None nostr_logger = self.logger.getChild('aionostr') nostr_logger.setLevel('INFO') # DEBUG is very verbose with aionostr return aionostr.Manager( self.relays, private_key=self.nostr_private_key, log=nostr_logger, ssl_context=self.ssl_context, proxy=proxy, connect_timeout=self.connect_timeout ) return self.relay_manager def get_offer(self, pubkey: str) -> Optional[SwapOffer]: return self._offers.get(pubkey) def get_recent_offers(self) -> Sequence[SwapOffer]: # filter to fresh timestamps now = int(time.time()) recent_offers = [x for x in self._offers.values() if now - x.timestamp < 3600] # sort by proof-of-work recent_offers = sorted(recent_offers, key=lambda x: x.pow_bits, reverse=True) # cap list size recent_offers = recent_offers[:20] return recent_offers @ignore_exceptions @log_exceptions async def publish_offer(self, sm: 'SwapManager') -> None: assert self.sm.is_server if sm._max_forward < sm._min_amount and sm._max_reverse < sm._min_amount: self.logger.warning(f"not publishing swap offer, no liquidity available: {sm._max_forward=}, {sm._max_reverse=}") return offer = { 'percentage_fee': sm.percentage, 'mining_fee': sm.mining_fee, 'min_amount': sm._min_amount, 'max_forward_amount': sm._max_forward, 'max_reverse_amount': sm._max_reverse, 'relays': sm.config.NOSTR_RELAYS, 'pow_nonce': hex(sm.config.SWAPSERVER_ANN_POW_NONCE), } # the first value of a single letter tag is indexed and can be filtered for tags = [['d', f'electrum-swapserver-{self.NOSTR_EVENT_VERSION}'], ['r', 'net:' + constants.net.NET_NAME], ['expiration', str(int(time.time() + self.OFFER_UPDATE_INTERVAL_SEC + 10))]] try: event_id = await aionostr._add_event( self.relay_manager, kind=self.USER_STATUS_NIP38, tags=tags, content=json.dumps(offer), private_key=self.nostr_private_key) self.logger.info(f"published offer {event_id}") except asyncio.TimeoutError as e: self.logger.warning(f"failed to publish swap offer: {str(e)}") @ignore_exceptions @log_exceptions async def send_direct_message(self, pubkey: str, content: str, *, retries: int = 0) -> Optional[str]: assert retries < 25, "Use a sane retry amount" our_private_key = aionostr.key.PrivateKey(self.private_key) recv_pubkey_hex = aionostr.util.from_nip19(pubkey)['object'].hex() if pubkey.startswith('npub') else pubkey encrypted_msg = our_private_key.encrypt_message(content, recv_pubkey_hex) try: event_id = await aionostr._add_event( self.relay_manager, kind=self.EPHEMERAL_REQUEST, content=encrypted_msg, private_key=self.nostr_private_key, tags=[['p', recv_pubkey_hex]], ) except asyncio.TimeoutError: self.logger.warning(f"sending message to {pubkey} failed: timeout. {retries=}") if retries > 0: return await self.send_direct_message(pubkey, content, retries=retries-1) return None return event_id @log_exceptions async def send_request_to_server(self, method: str, request_data: dict) -> dict: self.logger.debug(f"swapserver req: method: {method} relays: {self.relays}") request_data['method'] = method server_npub = self.config.SWAPSERVER_NPUB server_pubkey = aionostr.util.from_nip19(server_npub)['object'].hex() event_id = await self.send_direct_message(server_pubkey, json.dumps(request_data), retries=1) if not event_id: raise SwapServerError() self.dm_replies[(server_pubkey, event_id)] = fut = asyncio.Future() response = await fut assert isinstance(response, dict) if 'error' in response: self.logger.warning(f"error from swap server [DO NOT TRUST THIS MESSAGE]: {response['error']}") raise SwapServerError() return response async def _get_pairs_loop(self): await self.is_connected.wait() query = { "kinds": [self.USER_STATUS_NIP38], "limit": 10, "#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"], "#r": [f"net:{constants.net.NET_NAME}"], "since": int(time.time()) - 60 * 60, "until": int(time.time()) + 60 * 60, } async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False): try: content = json.loads(event.content) if not isinstance(content, dict): raise Exception("malformed content, not dict") tags = {k: v for k, v in event.tags} except Exception as e: self.logger.debug(f"failed to parse event: {e}") continue if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}": continue if tags.get('r') != f"net:{constants.net.NET_NAME}": continue if (event.created_at > int(time.time()) + 60 * 60 or event.created_at < int(time.time()) - 60 * 60): continue # check if this is the most recent event for this pubkey pubkey = event.pubkey prev_offer = self._offers.get(to_nip19('npub', pubkey)) if prev_offer and event.created_at <= prev_offer.timestamp: continue try: pow_nonce = int(content.get('pow_nonce', "0"), 16) # type: int except Exception: continue pow_bits = get_nostr_ann_pow_amount(bytes.fromhex(pubkey), pow_nonce) if pow_bits < self.config.SWAPSERVER_POW_TARGET: self.logger.debug(f"too low pow: {pubkey}: pow: {pow_bits} nonce: {pow_nonce}") continue try: server_relays = content['relays'].split(',') except Exception: continue try: pairs = SwapFees( percentage=content['percentage_fee'], mining_fee=content['mining_fee'], min_amount=content['min_amount'], max_forward=content['max_forward_amount'], max_reverse=content['max_reverse_amount'], ) except Exception: self.logger.debug(f"swap fees couldn't be parsed", exc_info=True) continue offer = SwapOffer( pairs=pairs, relays=server_relays[:10], timestamp=event.created_at, server_pubkey=pubkey, pow_bits=pow_bits, ) self._offers[offer.server_npub] = offer if self.config.SWAPSERVER_NPUB == offer.server_npub: self.sm.update_pairs(pairs) trigger_callback('swap_offers_changed', self.get_recent_offers()) # mirror event to other relays await self.taskgroup.spawn(self.rebroadcast_event(event, server_relays)) async def update_relays(self): """ Update the relays when update_pairs is called. This ensures we try to connect to the same relays as the ones announced by the swap server. """ while True: previous_relays = self._last_swapserver_relays await self.sm.pairs_updated.wait() if (conf_swapserver_offer := self._offers.get(self.config.SWAPSERVER_NPUB)) is None: self.logger.debug( f"pairs updated but no pair for {self.config.SWAPSERVER_NPUB=} available? {self._offers=}", stack_info=True, ) continue latest_known_relays = conf_swapserver_offer.relays if latest_known_relays != previous_relays: self.logger.debug(f"swapserver relays changed, updating relay list.") # store the latest known relays to a file self._store_last_swapserver_relays(latest_known_relays) # update the relay manager await self.relay_manager.update_relays(self.relays) async def rebroadcast_event(self, event: Event, server_relays: Sequence[str]): """If the relays of the origin server are different from our relays we rebroadcast the event to our relays so it gets spread more widely.""" if not server_relays: return rebroadcast_relays = [relay for relay in self.relay_manager.relays if relay.url not in server_relays] for relay in rebroadcast_relays: try: res = await relay.add_event(event, check_response=True) except Exception as e: self.logger.debug(f"failed to rebroadcast event to {relay.url}: {e}") continue self.logger.debug(f"rebroadcasted event to {relay.url}: {res}") @log_exceptions async def check_direct_messages(self): privkey = aionostr.key.PrivateKey(self.private_key) query = {"kinds": [self.EPHEMERAL_REQUEST], "limit":0, "#p": [self.nostr_pubkey]} async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False): try: content = privkey.decrypt_message(event.content, event.pubkey) content = json.loads(content) if not isinstance(content, dict): raise Exception("malformed content, not dict") except Exception: continue content['event_id'] = event.id content['event_pubkey'] = event.pubkey if not self.sm.is_server and 'reply_to' in content: prev_event_id = content['reply_to'] server_pubkey = event.pubkey fut = self.dm_replies.get((server_pubkey, prev_event_id)) if fut: fut.set_result(content) elif self.sm.is_server and 'method' in content: if self._swap_server_requests.full(): self.logger.warning(f"too many swap requests, dropping incoming request: {event.id[:10]}...") continue await self._swap_server_requests.put(content) else: self.logger.info(f'unknown message {content}') @log_exceptions async def _handle_requests(self) -> None: assert self.sm.is_server while True: await asyncio.sleep(5) request = await self._swap_server_requests.get() event_id = request.pop('event_id') event_pubkey = request.pop('event_pubkey') try: method = request.pop('method') self.logger.info(f'handle_request: id={event_id} {method} {request}') if method == 'addswapinvoice': # client-forward-swap phase2 r = self.sm.server_add_swap_invoice(request) elif method == 'createswap': # client-reverse-swap r = self.sm.server_create_swap(request) elif method == 'createnormalswap': # client-forward-swap phase1 r = self.sm.server_create_normal_swap(request) else: raise Exception(method) r['reply_to'] = event_id self.logger.debug(f'sending response id={event_id}') await self.taskgroup.spawn(self.send_direct_message(event_pubkey, json.dumps(r), retries=2)) except Exception as e: self.logger.exception(f"failed to handle {request=}") error_response = json.dumps({ "error": f"Internal Server Error: {str(type(e))}", "reply_to": event_id, }) await self.taskgroup.spawn(self.send_direct_message(event_pubkey, error_response)) def _store_last_swapserver_relays(self, relays: Sequence[str]): self._last_swapserver_relays = relays if not self.config.path or not relays: return storage_path = os.path.join(self.config.path, 'recent_swapserver_relays') try: with open(storage_path, 'w', encoding="utf-8") as f: json.dump(relays, f, indent=4, sort_keys=True) # type: ignore except Exception: self.logger.exception(f"failed to write last swapserver relays to {storage_path}") def _load_last_swapserver_relays(self) -> Optional[Sequence[str]]: storage_path = os.path.join(self.config.path, 'recent_swapserver_relays') if not os.path.exists(storage_path): return None try: with open(storage_path, 'r', encoding="utf-8") as f: relays = json.load(f) except Exception: self.logger.exception(f"failed to read last swapserver relays from {storage_path}") return None return relays