If `_fail_swap()` gets called multiple times (e.g. from callbacks) this could raise a `KeyError`, as the swap may already have been popped from `self._swaps`. In theory `_fail_swap` unregisters itself from the lnwatcher callback, but the callback may be scheduled multiple times before it has had the chance to unregister itself.
1737 lines
74 KiB
Python
1737 lines
74 KiB
Python
import asyncio
|
|
import json
|
|
import os
|
|
import ssl
|
|
import threading
|
|
from typing import TYPE_CHECKING, Optional, Dict, Sequence, Tuple, Iterable, List
|
|
from decimal import Decimal
|
|
import math
|
|
import time
|
|
|
|
import attr
|
|
import aiohttp
|
|
|
|
from electrum_ecc import ECPrivkey
|
|
|
|
import electrum_aionostr as aionostr
|
|
import electrum_aionostr.key
|
|
from electrum_aionostr.event import Event
|
|
from electrum_aionostr.util import to_nip19
|
|
|
|
from collections import defaultdict
|
|
|
|
|
|
from .i18n import _
|
|
from .logging import Logger
|
|
from .crypto import sha256, ripemd
|
|
from .bitcoin import script_to_p2wsh, opcodes, dust_threshold, DummyAddress, construct_witness, construct_script
|
|
from .transaction import (
|
|
PartialTxInput, PartialTxOutput, PartialTransaction, Transaction, TxInput, TxOutpoint, script_GetOp,
|
|
match_script_against_template, OPPushDataGeneric, OPPushDataPubkey
|
|
)
|
|
from .util import (
|
|
log_exceptions, ignore_exceptions, BelowDustLimit, OldTaskGroup, ca_path, gen_nostr_ann_pow,
|
|
get_nostr_ann_pow_amount, make_aiohttp_proxy_connector, get_running_loop, get_asyncio_loop, wait_for2,
|
|
run_sync_function_on_asyncio_thread
|
|
)
|
|
from . import lnutil
|
|
from .lnutil import hex_to_bytes, REDEEM_AFTER_DOUBLE_SPENT_DELAY, Keypair
|
|
from .lnaddr import lndecode
|
|
from .json_db import StoredObject, stored_in
|
|
from . import constants
|
|
from .address_synchronizer import TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE
|
|
from .fee_policy import FeePolicy
|
|
from .invoices import Invoice
|
|
from .lnonion import OnionRoutingFailure, OnionFailureCode
|
|
from .lnsweep import SweepInfo
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from .network import Network
|
|
from .wallet import Abstract_Wallet
|
|
from .lnwatcher import LNWatcher
|
|
from .lnworker import LNWallet
|
|
from .lnchannel import Channel
|
|
from .simple_config import SimpleConfig
|
|
from aiohttp_socks import ProxyConnector
|
|
|
|
|
|
SWAP_TX_SIZE = 150  # default tx size, used for mining fee estimation

# Locktime safety margins, in blocks, relative to the current chain height.
MIN_LOCKTIME_DELTA = 60  # once the remaining delta drops to this, we stop trying to pay the swap invoice (see _claim_swap)
LOCKTIME_DELTA_REFUND = 70  # delta used when creating a new swap's refund locktime (see create_normal_swap/create_reverse_swap)
MAX_LOCKTIME_DELTA = 100  # client rejects server-proposed locktimes further in the future than this
MIN_FINAL_CLTV_DELTA_FOR_CLIENT = 3 * 144  # note: put in invoice, but is not enforced by receiver in lnpeer.py
# the refund delta must lie between the min and max margins
assert MIN_LOCKTIME_DELTA <= LOCKTIME_DELTA_REFUND <= MAX_LOCKTIME_DELTA
# the onchain locktime must be able to expire before the corresponding LN htlcs do
assert MAX_LOCKTIME_DELTA < lnutil.MIN_FINAL_CLTV_DELTA_ACCEPTED
assert MAX_LOCKTIME_DELTA < lnutil.MIN_FINAL_CLTV_DELTA_FOR_INVOICE
assert MAX_LOCKTIME_DELTA < MIN_FINAL_CLTV_DELTA_FOR_CLIENT
|
|
|
|
|
|
# The script of the reverse swaps has one extra check in it to verify
# that the length of the preimage is 32. This is required because in
# the reverse swaps the preimage is generated by the user and to
# settle the hold invoice, you need a preimage with 32 bytes . If that
# check wasn't there the user could generate a preimage with a
# different length which would still allow for claiming the onchain
# coins but the invoice couldn't be settled

# Slot indices below match the `values` dicts passed to construct_script()
# (see create_normal_swap/create_reverse_swap) and the parsed_script indices
# read in check_reverse_redeem_script().
WITNESS_TEMPLATE_REVERSE_SWAP = [
    opcodes.OP_SIZE,
    OPPushDataGeneric(None),               # [1] expected preimage length (32)
    opcodes.OP_EQUAL,
    opcodes.OP_IF,
    opcodes.OP_HASH160,
    OPPushDataGeneric(lambda x: x == 20),  # [5] ripemd160 of the payment hash
    opcodes.OP_EQUALVERIFY,
    OPPushDataPubkey,                      # [7] claim pubkey
    opcodes.OP_ELSE,
    opcodes.OP_DROP,
    OPPushDataGeneric(None),               # [10] CLTV locktime (absolute block height)
    opcodes.OP_CHECKLOCKTIMEVERIFY,
    opcodes.OP_DROP,
    OPPushDataPubkey,                      # [13] refund pubkey
    opcodes.OP_ENDIF,
    opcodes.OP_CHECKSIG
]
|
|
|
|
|
|
def check_reverse_redeem_script(
    *,
    redeem_script: bytes,
    lockup_address: str,
    payment_hash: bytes,
    locktime: int,
    refund_pubkey: Optional[bytes] = None,  # fixed annotation: default is None
    claim_pubkey: Optional[bytes] = None,   # fixed annotation: default is None
) -> None:
    """Validate a server-provided redeem script against what we expect.

    Checks that the script matches WITNESS_TEMPLATE_REVERSE_SWAP, that it hashes
    to *lockup_address*, and that the embedded preimage hash, pubkeys and
    locktime match our own values. Pubkey checks are skipped when the
    corresponding argument is not given.

    Raises:
        Exception: on the first check that fails.
    """
    parsed_script = list(script_GetOp(redeem_script))
    if not match_script_against_template(redeem_script, WITNESS_TEMPLATE_REVERSE_SWAP):
        raise Exception("rswap check failed: scriptcode does not match template")
    if script_to_p2wsh(redeem_script) != lockup_address:
        raise Exception("rswap check failed: inconsistent scriptcode and address")
    # slot [5] of the template holds ripemd160(payment_hash)
    if ripemd(payment_hash) != parsed_script[5][1]:
        raise Exception("rswap check failed: our preimage not in script")
    if claim_pubkey and claim_pubkey != parsed_script[7][1]:
        raise Exception("rswap check failed: our pubkey not in script")
    if refund_pubkey and refund_pubkey != parsed_script[13][1]:
        raise Exception("rswap check failed: our pubkey not in script")
    # CScriptNum locktime is encoded little-endian in slot [10]
    if locktime != int.from_bytes(parsed_script[10][1], byteorder='little'):
        raise Exception("rswap check failed: inconsistent locktime and script")
|
|
|
|
|
|
class SwapServerError(Exception):
    """Raised when the swap provider returns an error or cannot be reached."""

    def __init__(self, message=None):
        super().__init__(message)
        self.message = message

    def __str__(self):
        if not self.message:
            # generic fallback when the server gave no error text
            return _("The swap server errored or is unreachable.")
        return self.message
|
|
|
|
|
|
def now():
    """Current unix time, truncated to whole seconds."""
    timestamp = time.time()
    return int(timestamp)
|
|
|
|
|
|
@attr.s(frozen=True)
class SwapFees:
    """Fee and limit parameters of a swap pair, as offered by a swap server."""
    # relative fee charged on the swap amount, in percent
    percentage = attr.ib(type=int)
    # flat mining-fee component (presumably in sats — TODO confirm units)
    mining_fee = attr.ib(type=int)
    # smallest swap amount the server accepts
    min_amount = attr.ib(type=int)
    # largest forward-swap amount the server can handle
    max_forward = attr.ib(type=int)
    # largest reverse-swap amount the server can handle
    max_reverse = attr.ib(type=int)
|
|
|
|
|
|
@attr.frozen
class SwapOffer:
    """A swap server's announcement (offer) as received from the transport."""
    # fee/limit parameters offered by the server
    pairs = attr.ib(type=SwapFees)
    # relays the server can be reached on
    relays = attr.ib(type=list[str])
    # proof-of-work bits attached to the announcement (anti-spam)
    pow_bits = attr.ib(type=int)
    # server's nostr pubkey (presumably hex — to_nip19 converts it below)
    server_pubkey = attr.ib(type=str)
    # unix timestamp of the announcement
    timestamp = attr.ib(type=int)

    @property
    def server_npub(self):
        """Server pubkey in NIP-19 bech32 ('npub...') form."""
        return to_nip19('npub', self.server_pubkey)
|
|
|
|
|
|
@stored_in('submarine_swaps')
@attr.s
class SwapData(StoredObject):
    """Persisted state of a single submarine swap (stored in the wallet db
    under 'submarine_swaps', keyed by payment hash hex)."""
    is_reverse = attr.ib(type=bool)  # for whoever is running code (PoV of client or server)
    locktime = attr.ib(type=int)  # absolute block height used in the script's OP_CLTV
    onchain_amount = attr.ib(type=int)  # in sats
    lightning_amount = attr.ib(type=int)  # in sats
    redeem_script = attr.ib(type=bytes, converter=hex_to_bytes)
    preimage = attr.ib(type=Optional[bytes], converter=hex_to_bytes)  # None until known
    prepay_hash = attr.ib(type=Optional[bytes], converter=hex_to_bytes)  # rhash of the fee-prepay invoice, if any
    privkey = attr.ib(type=bytes, converter=hex_to_bytes)  # our claim/refund key
    lockup_address = attr.ib(type=str)  # p2wsh address of redeem_script
    receive_address = attr.ib(type=str)
    funding_txid = attr.ib(type=Optional[str])
    spending_txid = attr.ib(type=Optional[str])
    is_redeemed = attr.ib(type=bool)

    # volatile (not persisted) state:
    _funding_prevout = None  # type: Optional[TxOutpoint]  # for RBF
    _payment_hash = None  # set right after load/creation; exposed via the property below
    _payment_pending = False  # for forward swaps

    @property
    def payment_hash(self) -> bytes:
        """Payment hash of the swap (sha256 of the preimage)."""
        return self._payment_hash

    def is_funded(self) -> bool:
        """True once a funding tx is known, or paying the funding output has been queued."""
        return self._payment_pending or bool(self.funding_txid)
|
|
|
|
|
|
class SwapManager(Logger):
|
|
|
|
network: Optional['Network'] = None
|
|
lnwatcher: Optional['LNWatcher'] = None
|
|
|
|
def __init__(self, *, wallet: 'Abstract_Wallet', lnworker: 'LNWallet'):
    """Create the swap manager for *wallet*, re-indexing swaps persisted in the wallet db.

    Fix: `self.config` used to be assigned twice from `wallet.config`; the
    redundant second assignment was removed.
    """
    Logger.__init__(self)
    # pair parameters; populated from the server's offer (client) or computed locally (server)
    self.mining_fee = None
    self.percentage = None
    self._min_amount = None
    self._max_forward = None
    self._max_reverse = None

    self.wallet = wallet
    self.config = wallet.config
    self.lnworker = lnworker
    self.lnwatcher = self.lnworker.lnwatcher
    self.taskgroup = OldTaskGroup()
    self.dummy_address = DummyAddress.SWAP

    # note: accessing swaps dicts (besides simple lookup) needs swaps_lock
    self.swaps_lock = threading.Lock()
    self._swaps = self.wallet.db.get_dict('submarine_swaps')  # type: Dict[str, SwapData]
    self._swaps_by_funding_outpoint = {}  # type: Dict[TxOutpoint, SwapData]
    self._swaps_by_lockup_address = {}  # type: Dict[str, SwapData]
    for payment_hash_hex, swap in self._swaps.items():
        payment_hash = bytes.fromhex(payment_hash_hex)
        swap._payment_hash = payment_hash
        self._add_or_reindex_swap(swap)
        # pending forward swaps need their hold-invoice callback re-registered after restart
        if not swap.is_reverse and not swap.is_redeemed:
            self.lnworker.register_hold_invoice(payment_hash, self.hold_invoice_callback)

    self._prepayments = {}  # type: Dict[bytes, bytes]  # fee_rhash -> rhash
    for k, swap in self._swaps.items():
        if swap.prepay_hash is not None:
            self._prepayments[swap.prepay_hash] = bytes.fromhex(k)
    self.is_server = False  # overridden by swapserver plugin if enabled
    self.is_initialized = asyncio.Event()
    self.pairs_updated = asyncio.Event()
|
|
|
|
def start_network(self, network: 'Network'):
    """Attach the manager to *network*, re-arm lnwatcher callbacks for
    unredeemed swaps, and schedule main_loop() on the asyncio thread."""
    assert network
    if self.network is not None:
        self.logger.info('start_network: already started')
        return
    self.logger.info('start_network: starting main loop')
    self.network = network
    # snapshot under the lock; callbacks are registered outside it
    with self.swaps_lock:
        pending = [s for s in self._swaps.values() if not s.is_redeemed]
    for swap in pending:
        self.add_lnwatcher_callback(swap)
    asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
|
|
|
|
@log_exceptions
async def run_nostr_server(self):
    """Server task: announce our swap offer over nostr and keep it up to date."""
    await self.set_nostr_proof_of_work()
    with NostrTransport(self.config, self, self.lnworker.nostr_keypair) as transport:
        await transport.is_connected.wait()
        self.logger.info(f'nostr is connected')
        # will publish a new announcement if liquidity changed or every OFFER_UPDATE_INTERVAL_SEC
        last_update = time.time()
        while True:
            await asyncio.sleep(transport.LIQUIDITY_UPDATE_INTERVAL_SEC)

            # remember current values so we can detect changes below
            previous_max_forward = self._max_forward
            previous_max_reverse = self._max_reverse
            previous_mining_fee = self.mining_fee
            try:
                self.server_update_pairs()
            except Exception:
                self.logger.exception("server_update_pairs failed")
                continue

            liquidity_changed = self._max_forward != previous_max_forward \
                or self._max_reverse != previous_max_reverse
            mining_fees_changed = self.mining_fee != previous_mining_fee
            if liquidity_changed or mining_fees_changed:
                self.logger.debug(f"updating announcement: {liquidity_changed=}, {mining_fees_changed=}")
            elif time.time() - last_update < transport.OFFER_UPDATE_INTERVAL_SEC:
                # nothing changed and the periodic re-announce is not due yet
                continue

            await transport.publish_offer(self)
            last_update = time.time()
|
|
|
|
@log_exceptions
async def main_loop(self):
    """Spawn this manager's long-running tasks and wait on the task group."""
    coros = [self.pay_pending_invoices()]
    if self.is_server:
        # nostr and http are not mutually exclusive
        if self.config.SWAPSERVER_PORT:
            coros.append(self.http_server.run())
        if self.config.NOSTR_RELAYS:
            coros.append(self.run_nostr_server())

    async with self.taskgroup as group:
        for coro in coros:
            await group.spawn(coro)
|
|
|
|
async def stop(self):
    """Cancel all tasks spawned by this manager's task group."""
    await self.taskgroup.cancel_remaining()
|
|
|
|
def create_transport(self) -> 'SwapServerTransport':
    """Return a transport for talking to the swap server: HTTP when a server
    URL is configured, otherwise nostr."""
    from .lnutil import generate_random_keypair
    if self.config.SWAPSERVER_URL:
        return HttpTransport(self.config, self)
    # nostr: a server announces under its persistent key, a client uses a throwaway one
    if self.is_server:
        keypair = self.lnworker.nostr_keypair
    else:
        keypair = generate_random_keypair()
    return NostrTransport(self.config, self, keypair)
|
|
|
|
async def set_nostr_proof_of_work(self) -> None:
    """Ensure the config holds a PoW nonce meeting SWAPSERVER_POW_TARGET for
    our nostr announcement; reuse the stored nonce if it is still sufficient."""
    current_pow = get_nostr_ann_pow_amount(
        self.lnworker.nostr_keypair.pubkey[1:],  # pubkey without prefix byte
        self.config.SWAPSERVER_ANN_POW_NONCE
    )
    if current_pow >= self.config.SWAPSERVER_POW_TARGET:
        self.logger.debug(f"Reusing existing PoW nonce for nostr announcement.")
        return

    self.logger.info(f"Generating PoW for nostr announcement. Target: {self.config.SWAPSERVER_POW_TARGET}")
    nonce, pow_amount = await gen_nostr_ann_pow(
        self.lnworker.nostr_keypair.pubkey[1:],  # pubkey without prefix
        self.config.SWAPSERVER_POW_TARGET,
    )
    self.logger.debug(f"Found {pow_amount} bits of work for Nostr announcement.")
    # persist the nonce so it can be reused on the next startup
    self.config.SWAPSERVER_ANN_POW_NONCE = nonce
|
|
|
|
async def pay_invoice(self, key):
    """Try to pay the swap invoice identified by *key* (payment hash hex).

    While the attempt is in flight, `invoices_to_pay[key]` is set to a
    far-future timestamp so pay_pending_invoices() does not spawn a second,
    concurrent attempt for the same invoice.

    Fix: the caught exception used to be bound but never logged; its repr is
    now included in the log line so failures are diagnosable.
    """
    self.logger.info(f'trying to pay invoice {key}')
    self.invoices_to_pay[key] = 1000000000000  # lock: effectively "never retry" until updated below
    try:
        invoice = self.wallet.get_invoice(key)
        success, log = await self.lnworker.pay_invoice(invoice)
    except Exception as e:
        # unexpected failure (e.g. invoice missing/invalid): give up on this invoice
        self.logger.info(f'exception paying {key}, will not retry: {e!r}')
        self.invoices_to_pay.pop(key, None)
        return
    if not success:
        self.logger.info(f'failed to pay {key}, will retry in 10 minutes')
        self.invoices_to_pay[key] = now() + 600
    else:
        self.logger.info(f'paid invoice {key}')
        self.invoices_to_pay.pop(key, None)
|
|
|
|
async def pay_pending_invoices(self):
    """Periodically scan invoices_to_pay and spawn a payment attempt for each
    entry whose retry time has arrived."""
    self.invoices_to_pay = {}
    while True:
        await asyncio.sleep(5)
        # snapshot: pay_invoice() mutates the dict while we iterate
        pending = list(self.invoices_to_pay.items())
        for key, not_before in pending:
            if now() >= not_before:
                await self.taskgroup.spawn(self.pay_invoice(key))
|
|
|
|
def cancel_normal_swap(self, swap: SwapData):
    """Abort a forward swap at the user's request.

    Only valid while the funding tx has not been broadcast; otherwise this is
    a no-op (logged)."""
    if swap is None:
        return
    if not swap.is_funded():
        self._fail_swap(swap, 'user cancelled')
    else:
        self.logger.info(f'cannot cancel swap {swap.payment_hash.hex()}: already funded')
|
|
|
|
def _fail_swap(self, swap: SwapData, reason: str):
    """Tear down *swap*: fail any held htlcs, stop watching its lockup address,
    and, if it never got funded, forget it entirely.

    May run more than once for the same swap (e.g. a scheduled lnwatcher
    callback can fire again before it unregisters itself), so every step is
    idempotent.

    Fix: the secondary indexes built by _add_or_reindex_swap() are now cleaned
    up as well (resolves the previous TODO).
    """
    self.logger.info(f'failing swap {swap.payment_hash.hex()}: {reason}')
    if not swap.is_reverse and swap.payment_hash in self.lnworker.hold_invoice_callbacks:
        # forward swap: fail the held htlcs back to the sender
        self.lnworker.unregister_hold_invoice(swap.payment_hash)
        payment_secret = self.lnworker.get_payment_secret(swap.payment_hash)
        payment_key = swap.payment_hash + payment_secret
        e = OnionRoutingFailure(code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS, data=b'')
        self.lnworker.save_forwarding_failure(payment_key.hex(), failure_message=e)
    self.lnwatcher.remove_callback(swap.lockup_address)
    if not swap.is_funded():
        with self.swaps_lock:
            if self._swaps.pop(swap.payment_hash.hex(), None) is None:
                self.logger.debug(f"swap {swap.payment_hash.hex()} has already been deleted.")
            # undo _add_or_reindex_swap(): drop the secondary indexes too.
            # Only pop entries that actually point at this swap object.
            if self._swaps_by_lockup_address.get(swap.lockup_address) is swap:
                self._swaps_by_lockup_address.pop(swap.lockup_address, None)
            if swap._funding_prevout is not None \
                    and self._swaps_by_funding_outpoint.get(swap._funding_prevout) is swap:
                self._swaps_by_funding_outpoint.pop(swap._funding_prevout, None)
|
|
|
|
@classmethod
def extract_preimage(cls, swap: SwapData, claim_tx: Transaction) -> Optional[bytes]:
    """Scan the witnesses of *claim_tx* for a preimage hashing to
    swap.payment_hash; return it, or None if not found."""
    for claim_in in claim_tx.inputs():
        elements = claim_in.witness_elements()
        if not elements:
            # tx may be unsigned
            continue
        candidate = elements[1]
        if sha256(candidate) == swap.payment_hash:
            return candidate
    return None
|
|
|
|
@log_exceptions
async def _claim_swap(self, swap: SwapData) -> None:
    """lnwatcher callback for a swap's lockup address.

    Drives the on-chain side of the swap state machine: detects funding,
    detects/extracts the preimage from a spend, fails expired swaps, queues
    the invoice payment (server side of reverse swaps), and hands a
    claim/refund input to the txbatcher when appropriate.
    Safe to call repeatedly; it re-derives state from the address history.
    """
    assert self.network
    assert self.lnwatcher
    if not self.lnwatcher.adb.is_up_to_date():
        # wait until the address history is fully synced before acting
        return
    current_height = self.network.get_local_height()
    remaining_time = swap.locktime - current_height  # blocks until the refund path unlocks
    txos = self.lnwatcher.adb.get_addr_outputs(swap.lockup_address)

    # pick the first acceptable funding output (for-else: None if none found)
    for txin in txos.values():
        if swap.is_reverse and txin.value_sats() < swap.onchain_amount:
            # amount too low, we must not reveal the preimage
            continue
        break
    else:
        # swap not funded.
        txin = None
        # if it is a normal swap, we might have double spent the funding tx
        # in that case we need to fail the HTLCs
        if remaining_time <= 0:
            self._fail_swap(swap, 'expired')

    if txin:
        # the swap is funded
        # note: swap.funding_txid can change due to RBF, it will get updated here:
        swap.funding_txid = txin.prevout.txid.hex()
        swap._funding_prevout = txin.prevout
        self._add_or_reindex_swap(swap)  # to update _swaps_by_funding_outpoint
        funding_height = self.lnwatcher.adb.get_tx_height(txin.prevout.txid.hex())
        spent_height = txin.spent_height
        # set spending_txid (even if tx is local), for GUI grouping
        swap.spending_txid = txin.spent_txid
        # discard local spenders
        if spent_height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]:
            spent_height = None
        if spent_height is not None:
            if spent_height > 0:
                # deeply-confirmed spend: we can stop watching this swap
                if current_height - spent_height > REDEEM_AFTER_DOUBLE_SPENT_DELAY:
                    self.logger.info(f'stop watching swap {swap.lockup_address}')
                    self.lnwatcher.remove_callback(swap.lockup_address)
                    swap.is_redeemed = True

        if not swap.is_reverse:
            # forward swap: the counterparty claims with the preimage, or we refund
            if swap.preimage is None and spent_height is not None:
                # extract the preimage, add it to lnwatcher
                claim_tx = self.lnwatcher.adb.get_transaction(txin.spent_txid)
                preimage = self.extract_preimage(swap, claim_tx)
                if preimage:
                    swap.preimage = preimage
                    self.logger.info(f'found preimage: {preimage.hex()}')
                    self.lnworker.save_preimage(swap.payment_hash, preimage)
                else:
                    # this is our refund tx
                    if spent_height > 0:
                        self.logger.info(f'refund tx confirmed: {txin.spent_txid} {spent_height}')
                        self._fail_swap(swap, 'refund tx confirmed')
                        return
            if remaining_time > 0:
                # too early for refund
                return
            if swap.preimage:
                # we have been paid. do not try to get refund.
                return
        else:
            # reverse swap: we claim with the preimage
            if swap.preimage is None:
                swap.preimage = self.lnworker.get_preimage(swap.payment_hash)
            if swap.preimage is None:
                # we do not know the preimage yet: queue paying the swap invoice
                # once the funding tx has at least one confirmation
                if funding_height.conf <= 0:
                    return
                key = swap.payment_hash.hex()
                if remaining_time <= MIN_LOCKTIME_DELTA:
                    # too close to the refund locktime to safely pay
                    if key in self.invoices_to_pay:
                        # fixme: should consider cltv of ln payment
                        self.logger.info(f'locktime too close {key} {remaining_time}')
                        self.invoices_to_pay.pop(key, None)
                    return
                if key not in self.invoices_to_pay:
                    self.invoices_to_pay[key] = 0
                return

        if self.network.config.TEST_SWAPSERVER_REFUND:
            # for testing: do not create claim tx
            return

        if spent_height is not None and spent_height > 0:
            # already spent and confirmed: nothing left to sweep
            return
        txin, locktime = self.create_claim_txin(txin=txin, swap=swap)
        # note: there is no csv in the script, we just set this so that txbatcher waits for one confirmation
        name = 'swap claim' if swap.is_reverse else 'swap refund'
        can_be_batched = True
        sweep_info = SweepInfo(
            txin=txin,
            cltv_abs=locktime,
            txout=None,
            name=name,
            can_be_batched=can_be_batched,
        )
        try:
            self.wallet.txbatcher.add_sweep_input('swaps', sweep_info)
        except BelowDustLimit:
            self.logger.info('utxo value below dust threshold')
            return
|
|
|
|
def get_fee_for_txbatcher(self):
    """Estimated fee (for a default-size swap tx) under the swaps fee policy."""
    return self._get_tx_fee(self.config.FEE_POLICY_SWAPS)
|
|
|
|
def _get_tx_fee(self, policy_descriptor: str):
    """Estimate the fee for a SWAP_TX_SIZE-vbyte tx under *policy_descriptor*,
    falling back to static rates if the network gives no estimate."""
    fee_policy = FeePolicy(policy_descriptor)
    return fee_policy.estimate_fee(SWAP_TX_SIZE, network=self.network, allow_fallback_to_static_rates=True)
|
|
|
|
def get_swap(self, payment_hash: bytes) -> Optional[SwapData]:
    """Look up a swap by its payment hash; a fee-prepayment hash is resolved
    to its main swap. Used for history."""
    swap = self._swaps.get(payment_hash.hex())
    if not swap:
        main_hash = self._prepayments.get(payment_hash)
        swap = self._swaps.get(main_hash.hex()) if main_hash else None
    return swap
|
|
|
|
def add_lnwatcher_callback(self, swap: SwapData) -> None:
    """Watch the swap's lockup address; _claim_swap drives the swap forward
    whenever the address history changes."""
    def callback():
        return self._claim_swap(swap)
    self.lnwatcher.add_callback(swap.lockup_address, callback)
|
|
|
|
async def hold_invoice_callback(self, payment_hash: bytes) -> None:
    """Called once the htlcs for a forward-swap hold invoice have arrived:
    queue the on-chain funding output with the txbatcher.

    note: this assumes the wallet has been unlocked."""
    key = payment_hash.hex()
    swap = self._swaps.get(key)
    if swap is None:
        self.logger.info(f'key not in swaps {key}')
        return
    if swap.is_funded():
        return
    funding_output = self.create_funding_output(swap)
    self.wallet.txbatcher.add_payment_output('swaps', funding_output)
    swap._payment_pending = True
|
|
|
|
def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: Optional[bytes] = None):
    """server method: set up a forward swap for a client.

    Builds the redeem script (their key on the refund path, ours on the claim
    path), creates the hold invoice (with fee prepayment) and registers the
    hold-invoice callback that will watch for the client's funding output.

    Fix: `their_pubkey` defaulted to None but was annotated as plain `bytes`;
    annotation corrected to Optional[bytes].

    Returns:
        (swap, invoice, prepay_invoice)
    """
    assert lightning_amount_sat
    locktime = self.network.get_local_height() + LOCKTIME_DELTA_REFUND
    our_privkey = os.urandom(32)
    our_pubkey = ECPrivkey(our_privkey).get_public_key_bytes(compressed=True)
    onchain_amount_sat = self._get_recv_amount(lightning_amount_sat, is_reverse=True)  # what the client is going to receive
    if not onchain_amount_sat:
        raise Exception("no onchain amount")
    # slots: [1] preimage size, [5] hash, [7] claim key (client), [10] locktime, [13] refund key (us)
    redeem_script = construct_script(
        WITNESS_TEMPLATE_REVERSE_SWAP,
        values={1: 32, 5: ripemd(payment_hash), 7: their_pubkey, 10: locktime, 13: our_pubkey}
    )
    swap, invoice, prepay_invoice = self.add_normal_swap(
        redeem_script=redeem_script,
        locktime=locktime,
        onchain_amount_sat=onchain_amount_sat,
        lightning_amount_sat=lightning_amount_sat,
        payment_hash=payment_hash,
        our_privkey=our_privkey,
        prepay=True,
    )
    self.lnworker.register_hold_invoice(payment_hash, self.hold_invoice_callback)
    return swap, invoice, prepay_invoice
|
|
|
|
def add_normal_swap(
    self, *,
    redeem_script: bytes,
    locktime: int,  # onchain
    onchain_amount_sat: int,
    lightning_amount_sat: int,
    payment_hash: bytes,
    our_privkey: bytes,
    prepay: bool,
    channels: Optional[Sequence['Channel']] = None,
    min_final_cltv_expiry_delta: Optional[int] = None,
) -> Tuple[SwapData, str, Optional[str]]:
    """creates a hold invoice

    Builds the bolt11 hold invoice (and, when *prepay* is set, a separate
    mining-fee prepay invoice bundled with it), persists the SwapData and
    registers the lnwatcher callback for the lockup address.

    Returns:
        (swap, invoice, prepay_invoice) — prepay_invoice is None if not prepaying.
    """
    if prepay:
        # the prepay invoice covers our expected claim-tx mining fees;
        # the main invoice amount is reduced accordingly
        prepay_amount_sat = self.get_fee_for_txbatcher() * 2
        invoice_amount_sat = lightning_amount_sat - prepay_amount_sat
    else:
        invoice_amount_sat = lightning_amount_sat

    # NOTE(review): `_` here shadows the i18n gettext alias within this function
    _, invoice = self.lnworker.get_bolt11_invoice(
        payment_hash=payment_hash,
        amount_msat=invoice_amount_sat * 1000,
        message='Submarine swap',
        expiry=300,
        fallback_address=None,
        channels=channels,
        min_final_cltv_expiry_delta=min_final_cltv_expiry_delta,
    )
    # add payment info to lnworker
    self.lnworker.add_payment_info_for_hold_invoice(payment_hash, invoice_amount_sat)

    if prepay:
        prepay_hash = self.lnworker.create_payment_info(amount_msat=prepay_amount_sat*1000)
        _, prepay_invoice = self.lnworker.get_bolt11_invoice(
            payment_hash=prepay_hash,
            amount_msat=prepay_amount_sat * 1000,
            message='Submarine swap mining fees',
            expiry=300,
            fallback_address=None,
            channels=channels,
            min_final_cltv_expiry_delta=min_final_cltv_expiry_delta,
        )
        # both invoices must be paid together (MPP-style bundle)
        self.lnworker.bundle_payments([payment_hash, prepay_hash])
        self._prepayments[prepay_hash] = payment_hash
    else:
        prepay_invoice = None
        prepay_hash = None

    lockup_address = script_to_p2wsh(redeem_script)
    receive_address = self.wallet.get_receiving_address()
    swap = SwapData(
        redeem_script=redeem_script,
        locktime=locktime,
        privkey=our_privkey,
        preimage=None,
        prepay_hash=prepay_hash,
        lockup_address=lockup_address,
        onchain_amount=onchain_amount_sat,
        receive_address=receive_address,
        lightning_amount=lightning_amount_sat,
        is_reverse=False,
        is_redeemed=False,
        funding_txid=None,
        spending_txid=None,
    )
    swap._payment_hash = payment_hash
    self._add_or_reindex_swap(swap)
    self.add_lnwatcher_callback(swap)
    return swap, invoice, prepay_invoice
|
|
|
|
def create_reverse_swap(self, *, lightning_amount_sat: int, their_pubkey: bytes) -> SwapData:
    """ server method.

    Set up a reverse swap for a client: we generate the preimage, lock the
    on-chain output to its hash (client's key on the claim path, ours on the
    refund path) and persist the swap.
    """
    assert lightning_amount_sat is not None
    locktime = self.network.get_local_height() + LOCKTIME_DELTA_REFUND
    privkey = os.urandom(32)
    our_pubkey = ECPrivkey(privkey).get_public_key_bytes(compressed=True)
    # NOTE(review): `is_reverse=False` on a reverse swap looks inconsistent
    # with create_normal_swap's `is_reverse=True` — confirm the flag semantics
    # of _get_send_amount/_get_recv_amount.
    onchain_amount_sat = self._get_send_amount(lightning_amount_sat, is_reverse=False)
    if not onchain_amount_sat:
        raise Exception("no onchain amount")
    preimage = os.urandom(32)
    payment_hash = sha256(preimage)
    # slots: [1] preimage size, [5] hash, [7] claim key (us... per template usage — see check_reverse_redeem_script), [10] locktime, [13] refund key
    redeem_script = construct_script(
        WITNESS_TEMPLATE_REVERSE_SWAP,
        values={1: 32, 5: ripemd(payment_hash), 7: our_pubkey, 10: locktime, 13: their_pubkey}
    )
    swap = self.add_reverse_swap(
        redeem_script=redeem_script,
        locktime=locktime,
        privkey=privkey,
        preimage=preimage,
        payment_hash=payment_hash,
        prepay_hash=None,
        onchain_amount_sat=onchain_amount_sat,
        lightning_amount_sat=lightning_amount_sat)
    return swap
|
|
|
|
def add_reverse_swap(
    self,
    *,
    redeem_script: bytes,
    locktime: int,  # onchain
    privkey: bytes,
    lightning_amount_sat: int,
    onchain_amount_sat: int,
    preimage: bytes,
    payment_hash: bytes,
    prepay_hash: Optional[bytes] = None,
) -> SwapData:
    """Persist a reverse swap and start watching its lockup address.

    Records the prepay-hash mapping (if any) so fee-prepay payments can be
    resolved back to this swap, indexes the swap, and registers the lnwatcher
    callback. Returns the new SwapData.
    """
    lockup_address = script_to_p2wsh(redeem_script)
    receive_address = self.wallet.get_receiving_address()
    swap = SwapData(
        redeem_script=redeem_script,
        locktime=locktime,
        privkey=privkey,
        preimage=preimage,
        prepay_hash=prepay_hash,
        lockup_address=lockup_address,
        onchain_amount=onchain_amount_sat,
        receive_address=receive_address,
        lightning_amount=lightning_amount_sat,
        is_reverse=True,
        is_redeemed=False,
        funding_txid=None,
        spending_txid=None,
    )
    if prepay_hash:
        self._prepayments[prepay_hash] = payment_hash
    swap._payment_hash = payment_hash
    self._add_or_reindex_swap(swap)
    self.add_lnwatcher_callback(swap)
    return swap
|
|
|
|
def server_add_swap_invoice(self, request):
    """server method: accept the client's hold invoice for a reverse swap and
    queue it for payment.

    Fix: validation of the (untrusted) client request used `assert`, which is
    stripped under `python -O`; replaced with explicit raises.

    Raises:
        Exception: if the invoice does not match a known swap, has the wrong
            amount, we lack the preimage, or the swap was already spent.
    """
    invoice = request['invoice']
    invoice = Invoice.from_bech32(invoice)
    key = invoice.rhash
    payment_hash = bytes.fromhex(key)
    with self.swaps_lock:
        if key not in self._swaps:
            raise Exception(f'unknown swap for invoice rhash {key}')
        swap = self._swaps[key]
    if swap.lightning_amount != int(invoice.get_amount_sat()):
        raise Exception('invoice amount does not match swap')
    self.wallet.save_invoice(invoice)
    # check that we have the preimage
    if sha256(swap.preimage) != payment_hash:
        raise Exception('preimage for this swap is missing or inconsistent')
    if swap.spending_txid is not None:
        raise Exception('swap already has a spending tx')
    self.invoices_to_pay[key] = 0  # pay as soon as possible
    return {}
|
|
|
|
async def normal_swap(
    self,
    *,
    lightning_amount_sat: int,
    expected_onchain_amount_sat: int,
    password,
    tx: PartialTransaction = None,
    channels=None,
) -> Optional[str]:
    """send on-chain BTC, receive on Lightning

    Old (removed) flow:
    - User generates an LN invoice with RHASH, and knows preimage.
    - User creates on-chain output locked to RHASH.
    - Server pays LN invoice. User reveals preimage.
    - Server spends the on-chain output using preimage.
    cltv safety requirement: (onchain_locktime > LN_locktime), otherwise server is vulnerable

    New flow:
    - User requests swap
    - Server creates preimage, sends RHASH to user
    - User creates hold invoice, sends it to server
    - Server sends HTLC, user holds it
    - User creates on-chain output locked to RHASH
    - Server spends the on-chain output using preimage (revealing the preimage)
    - User fulfills HTLC using preimage
    cltv safety requirement: (onchain_locktime < LN_locktime), otherwise client is vulnerable
    """
    assert self.network
    assert self.lnwatcher
    # Fix: request_normal_swap() and wait_for_htlcs_and_broadcast() take the
    # transport as their first positional argument; previously no transport
    # was passed at all, so this method raised TypeError. A transport is now
    # created here for the duration of the swap.
    with self.create_transport() as transport:
        swap, invoice = await self.request_normal_swap(
            transport,
            lightning_amount_sat=lightning_amount_sat,
            expected_onchain_amount_sat=expected_onchain_amount_sat,
            channels=channels,
        )
        tx = self.create_funding_tx(swap, tx, password=password)
        return await self.wait_for_htlcs_and_broadcast(transport, swap=swap, invoice=invoice, tx=tx)
|
|
|
|
async def request_normal_swap(
    self, transport, *,
    lightning_amount_sat: int,
    expected_onchain_amount_sat: int,
    channels: Optional[Sequence['Channel']] = None,
) -> Tuple[SwapData, str]:
    """client method: negotiate a forward swap with the server over *transport*.

    Asks the server for a payment hash and redeem script, verifies the script
    against our refund pubkey, checks the quoted amount and locktime, and
    persists the swap together with a hold invoice.

    Returns:
        (swap, invoice) — the invoice to hand to the server.
    Raises:
        Exception: if any server-provided value fails validation.
    """
    await self.is_initialized.wait()  # add timeout
    refund_privkey = os.urandom(32)
    refund_pubkey = ECPrivkey(refund_privkey).get_public_key_bytes(compressed=True)
    self.logger.info('requesting preimage hash for swap')
    request_data = {
        "invoiceAmount": lightning_amount_sat,
        "refundPublicKey": refund_pubkey.hex()
    }
    data = await transport.send_request_to_server('createnormalswap', request_data)
    payment_hash = bytes.fromhex(data["preimageHash"])
    onchain_amount = data["expectedAmount"]
    locktime = data["timeoutBlockHeight"]
    lockup_address = data["address"]
    redeem_script = bytes.fromhex(data["redeemScript"])
    # verify redeem_script is built with our pubkey and preimage
    check_reverse_redeem_script(
        redeem_script=redeem_script,
        lockup_address=lockup_address,
        payment_hash=payment_hash,
        locktime=locktime,
        refund_pubkey=refund_pubkey,
    )

    # check that onchain_amount is not more than what we estimated
    if onchain_amount > expected_onchain_amount_sat:
        raise Exception(f"fswap check failed: onchain_amount is more than what we estimated: "
                        f"{onchain_amount} > {expected_onchain_amount_sat}")
    # verify that they are not locking up funds for too long
    if locktime - self.network.get_local_height() > MAX_LOCKTIME_DELTA:
        raise Exception("fswap check failed: locktime too far in future")

    swap, invoice, _ = self.add_normal_swap(
        redeem_script=redeem_script,
        locktime=locktime,
        lightning_amount_sat=lightning_amount_sat,
        onchain_amount_sat=onchain_amount,
        payment_hash=payment_hash,
        our_privkey=refund_privkey,
        prepay=False,
        channels=channels,
        # When the client is doing a normal swap, we create a ln-invoice with larger than usual final_cltv_delta.
        # If the user goes offline after broadcasting the funding tx (but before it is mined and
        # the server claims it), they need to come back online before the held ln-htlc expires (see #8940).
        # If the held ln-htlc expires, and the funding tx got confirmed, the server will have claimed the onchain
        # funds, and the ln-htlc will be timed out onchain (and channel force-closed). i.e. the user loses the swap
        # amount. Increasing the final_cltv_delta the user puts in the invoice extends this critical window.
        min_final_cltv_expiry_delta=MIN_FINAL_CLTV_DELTA_FOR_CLIENT,
    )
    return swap, invoice
|
|
|
|
async def wait_for_htlcs_and_broadcast(
    self, transport, *,
    swap: SwapData,
    invoice: str,
    tx: Transaction,
) -> Optional[str]:
    """client method: send our hold invoice to the server, and broadcast the
    funding tx once the server's htlcs arrive.

    The funding tx is broadcast from the hold-invoice callback; this coroutine
    then polls until the txid is recorded or the invoice expires.

    Returns:
        The funding txid, or None if the invoice expired first.
    """
    await transport.is_connected.wait()
    payment_hash = swap.payment_hash
    refund_pubkey = ECPrivkey(swap.privkey).get_public_key_bytes(compressed=True)

    async def callback(payment_hash):
        # FIXME what if this raises, e.g. TxBroadcastError?
        # We will never retry the hold-invoice-callback.
        await self.broadcast_funding_tx(swap, tx)

    self.lnworker.register_hold_invoice(payment_hash, callback)

    # send invoice to server and wait for htlcs
    request_data = {
        "preimageHash": payment_hash.hex(),
        "invoice": invoice,
        "refundPublicKey": refund_pubkey.hex(),
    }
    data = await transport.send_request_to_server('addswapinvoice', request_data)
    # wait for funding tx
    lnaddr = lndecode(invoice)
    while swap.funding_txid is None and not lnaddr.is_expired():
        await asyncio.sleep(0.1)
    return swap.funding_txid
|
|
|
|
def create_funding_output(self, swap: SwapData) -> PartialTxOutput:
    """The tx output paying swap.onchain_amount sats to the swap's lockup address."""
    return PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount)
|
|
|
|
def create_funding_tx(
    self,
    swap: SwapData,
    tx: Optional[PartialTransaction],
    *,
    password,
) -> PartialTransaction:
    """Build (or adapt) and sign the on-chain tx funding the swap's lockup address.

    Uses the fee policy set by the user (not the txbatcher).
    note: RBF must not decrease the payment; this is taken care of in
    wallet._is_rbf_allowed_to_touch_tx_output.
    """
    policy = FeePolicy(self.config.FEE_POLICY)
    if tx is not None:
        # caller supplied a template containing a dummy swap output: retarget it
        tx.replace_output_address(DummyAddress.SWAP, swap.lockup_address)
        tx.set_rbf(True)
    else:
        tx = self.wallet.make_unsigned_transaction(
            outputs=[self.create_funding_output(swap)],
            rbf=True,
            fee_policy=policy,
        )
    self.wallet.sign_transaction(tx, password)
    return tx
|
|
|
|
@log_exceptions
async def request_swap_for_amount(self, transport, onchain_amount) -> Optional[Tuple[SwapData, str]]:
    """Request a forward swap whose on-chain leg is *onchain_amount* sats,
    raising SwapServerError if the amount is outside the provider's limits."""
    await self.is_initialized.wait()
    ln_amount_sat = self.get_recv_amount(onchain_amount, is_reverse=False)
    if ln_amount_sat is None:
        msg = (_("Swap amount outside of providers limits") + ":\n"
               + _("min") + f": {self.get_min_amount()}\n"
               + _("max") + f": {self.get_provider_max_reverse_amount()}")
        raise SwapServerError(msg)
    return await self.request_normal_swap(
        transport,
        lightning_amount_sat=ln_amount_sat,
        expected_onchain_amount_sat=onchain_amount,
    )
|
|
|
|
@log_exceptions
|
|
async def broadcast_funding_tx(self, swap: SwapData, tx: Transaction) -> None:
|
|
swap.funding_txid = tx.txid()
|
|
await self.network.broadcast_transaction(tx)
|
|
|
|
async def reverse_swap(
        self, transport,
        *,
        lightning_amount_sat: int,
        expected_onchain_amount_sat: int,
        channels: Optional[Sequence['Channel']] = None,
) -> Optional[str]:
    """send on Lightning, receive on-chain

    - User generates preimage, RHASH. Sends RHASH to server.
    - Server creates an LN invoice for RHASH.
    - User pays LN invoice - except server needs to hold the HTLC as preimage is unknown.
    - if the server requested a fee prepayment (using 'minerFeeInvoice'),
      the server will have the preimage for that. The user will send HTLCs for both the main RHASH,
      and for the fee prepayment. Once both MPP sets arrive at the server, the server will fulfill
      the HTLCs for the fee prepayment (before creating the on-chain output).
    - Server creates on-chain output locked to RHASH.
    - User spends on-chain output, revealing preimage.
    - Server fulfills HTLC using preimage.

    Note: expected_onchain_amount_sat is BEFORE deducting the on-chain claim tx fee.

    Returns the funding txid once detected (may be None if the payment path
    completed first without a detected funding tx).
    """
    assert self.network
    assert self.lnwatcher
    # fresh claim key and preimage for this swap
    privkey = os.urandom(32)
    our_pubkey = ECPrivkey(privkey).get_public_key_bytes(compressed=True)
    preimage = os.urandom(32)
    payment_hash = sha256(preimage)
    request_data = {
        "type": "reversesubmarine",
        "pairId": "BTC/BTC",
        "orderSide": "buy",
        "invoiceAmount": lightning_amount_sat,
        "preimageHash": payment_hash.hex(),
        "claimPublicKey": our_pubkey.hex()
    }
    self.logger.debug(f'rswap: sending request for {lightning_amount_sat}')
    data = await transport.send_request_to_server('createswap', request_data)
    invoice = data['invoice']
    fee_invoice = data.get('minerFeeInvoice')
    lockup_address = data['lockupAddress']
    redeem_script = bytes.fromhex(data['redeemScript'])
    locktime = data['timeoutBlockHeight']
    onchain_amount = data["onchainAmount"]
    response_id = data['id']
    self.logger.debug(f'rswap: {response_id=}')
    # verify redeem_script is built with our pubkey and preimage
    check_reverse_redeem_script(
        redeem_script=redeem_script,
        lockup_address=lockup_address,
        payment_hash=payment_hash,
        locktime=locktime,
        refund_pubkey=None,
        claim_pubkey=our_pubkey,
    )
    # check that the onchain amount is what we expected
    if onchain_amount < expected_onchain_amount_sat:
        raise Exception(f"rswap check failed: onchain_amount is less than what we expected: "
                        f"{onchain_amount} < {expected_onchain_amount_sat}")
    # verify that we will have enough time to get our tx confirmed
    if locktime - self.network.get_local_height() <= MIN_LOCKTIME_DELTA:
        raise Exception("rswap check failed: locktime too close")
    # verify invoice payment_hash
    lnaddr = self.lnworker._check_bolt11_invoice(invoice)
    invoice_amount = int(lnaddr.get_amount_sat())
    if lnaddr.paymenthash != payment_hash:
        raise Exception("rswap check failed: inconsistent RHASH and invoice")
    # check that the lightning amount is what we requested
    if fee_invoice:
        fee_lnaddr = self.lnworker._check_bolt11_invoice(fee_invoice)
        invoice_amount += fee_lnaddr.get_amount_sat()
        prepay_hash = fee_lnaddr.paymenthash
    else:
        prepay_hash = None
    if int(invoice_amount) != lightning_amount_sat:
        raise Exception(f"rswap check failed: invoice_amount ({invoice_amount}) "
                        f"not what we requested ({lightning_amount_sat})")
    # save swap data to wallet file
    swap = self.add_reverse_swap(
        redeem_script=redeem_script,
        locktime=locktime,
        privkey=privkey,
        preimage=preimage,
        payment_hash=payment_hash,
        prepay_hash=prepay_hash,
        onchain_amount_sat=onchain_amount,
        lightning_amount_sat=lightning_amount_sat)
    # initiate fee payment.
    if fee_invoice:
        fee_invoice_obj = Invoice.from_bech32(fee_invoice)
        asyncio.ensure_future(self.lnworker.pay_invoice(fee_invoice_obj))
    # we return if we detect funding
    async def wait_for_funding(swap):
        # polls the swap object; funding_txid is set elsewhere once observed
        while swap.funding_txid is None:
            await asyncio.sleep(1)
    # initiate main payment
    invoice_obj = Invoice.from_bech32(invoice)
    tasks = [asyncio.create_task(self.lnworker.pay_invoice(invoice_obj, channels=channels)), asyncio.create_task(wait_for_funding(swap))]
    # NOTE(review): FIRST_COMPLETED leaves the other task pending (neither
    # cancelled nor awaited here) — confirm this is handled/harmless.
    await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    return swap.funding_txid
|
|
|
|
def _add_or_reindex_swap(self, swap: SwapData) -> None:
    """Register *swap* under its payment hash (if new) and refresh the lookup indices."""
    key = swap.payment_hash.hex()
    with self.swaps_lock:
        # primary map: keep the already-stored instance if one exists
        self._swaps.setdefault(key, swap)
        # secondary indices are always (re)built for this swap
        if swap._funding_prevout:
            self._swaps_by_funding_outpoint[swap._funding_prevout] = swap
        self._swaps_by_lockup_address[swap.lockup_address] = swap
|
|
|
|
def server_update_pairs(self) -> None:
    """ for server: recompute the fees and limits we advertise to clients. """
    # configured value is in millionths; stored here as a percentage
    self.percentage = float(self.config.SWAPSERVER_FEE_MILLIONTHS) / 10000  # type: ignore
    self._min_amount = 20000  # fixed server-side minimum, in sat
    oc_balance_sat: int = self.wallet.get_spendable_balance_sat()
    # both limits are capped at 0.1 BTC; forward swaps are additionally
    # bounded by our spendable on-chain balance
    max_forward: int = min(int(self.lnworker.num_sats_can_receive()), oc_balance_sat, 10000000)
    max_reverse: int = min(int(self.lnworker.num_sats_can_send()), 10000000)
    # advertise with reduced precision (only 2 leading digits kept)
    self._max_forward: int = self._keep_leading_digits(max_forward, 2)
    self._max_reverse: int = self._keep_leading_digits(max_reverse, 2)
    new_mining_fee = self.get_fee_for_txbatcher()
    # only adopt a new mining fee when it changed by more than 10%
    if self.mining_fee is None \
            or abs(self.mining_fee - new_mining_fee) / self.mining_fee > 0.1:
        self.mining_fee = new_mining_fee
|
|
|
|
@staticmethod
|
|
def _keep_leading_digits(num: int, digits: int) -> int:
|
|
"""Reduces precision of num to `digits` leading digits."""
|
|
if num <= 0:
|
|
return 0
|
|
num_str = str(num)
|
|
zeroed_num_str = f"{num_str[:digits]}{(len(num_str[digits:])) * '0'}"
|
|
return int(zeroed_num_str)
|
|
|
|
def update_pairs(self, pairs: SwapFees):
    """Apply fees/limits received from a swap server and wake any waiters."""
    self.logger.info(f'updating fees {pairs}')
    self.mining_fee = pairs.mining_fee
    self.percentage = pairs.percentage
    self._min_amount = pairs.min_amount
    self._max_forward = pairs.max_forward
    self._max_reverse = pairs.max_reverse
    self.trigger_pairs_updated_threadsafe()

def trigger_pairs_updated_threadsafe(self):
    """Signal that pairs changed; safe to call from any thread."""
    def trigger():
        self.is_initialized.set()
        # set-then-clear: wakes tasks currently blocked on pairs_updated.wait()
        # while keeping future callers blocked until the next update
        self.pairs_updated.set()
        self.pairs_updated.clear()

    run_sync_function_on_asyncio_thread(trigger, block=True)
|
|
|
|
def get_provider_max_forward_amount(self) -> int:
    """Provider's advertised maximum forward-swap amount, in sat."""
    return self._max_forward

def get_provider_max_reverse_amount(self) -> int:
    """Provider's advertised maximum reverse-swap amount, in sat."""
    return self._max_reverse

def get_min_amount(self) -> int:
    """Provider's advertised minimum swap amount, in satoshis."""
    return self._min_amount
|
|
|
|
def check_invoice_amount(self, x, is_reverse: bool) -> bool:
    """Return True iff amount *x* lies within the provider's advertised limits."""
    upper_limit = (self.get_provider_max_forward_amount() if is_reverse
                   else self.get_provider_max_reverse_amount())
    return self.get_min_amount() <= x <= upper_limit
|
|
|
|
def _get_recv_amount(self, send_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
    """For a given swap direction and amount we send, returns how much we will receive.

    Note: in the reverse direction, the mining fee for the on-chain claim tx is NOT accounted for.
    In the reverse direction, the result matches what the swap server returns as response["onchainAmount"].
    Returns None if the amount is outside the provider's limits (or below dust).
    """
    if send_amount is None:
        return None
    x = Decimal(send_amount)
    percentage = Decimal(self.percentage)
    if is_reverse:
        if not self.check_invoice_amount(x, is_reverse):
            return None
        # see/ref:
        # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L948
        percentage_fee = math.ceil(percentage * x / 100)
        base_fee = self.mining_fee
        x -= percentage_fee + base_fee
        x = math.floor(x)
        # resulting on-chain output must not be dust
        if x < dust_threshold():
            return None
    else:
        x -= self.mining_fee
        # percentage fee is applied to the post-fee amount, hence /(100 + p)
        percentage_fee = math.ceil(x * percentage / (100 + percentage))
        x -= percentage_fee
        if not self.check_invoice_amount(x, is_reverse):
            return None
    x = int(x)
    return x

def _get_send_amount(self, recv_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
    """For a given swap direction and amount we want to receive, returns how much we will need to send.

    Note: in the reverse direction, the mining fee for the on-chain claim tx is NOT accounted for.
    In the forward direction, the result matches what the swap server returns as response["expectedAmount"].
    Returns None if the amount is outside the provider's limits.
    """
    if not recv_amount:
        return None
    x = Decimal(recv_amount)
    percentage = Decimal(self.percentage)
    if is_reverse:
        # see/ref:
        # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L928
        # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L958
        base_fee = self.mining_fee
        x += base_fee
        x = math.ceil(x / ((100 - percentage) / 100))
        if not self.check_invoice_amount(x, is_reverse):
            return None
    else:
        # note: the limit is checked on the receive amount, before fees are added
        if not self.check_invoice_amount(x, is_reverse):
            return None
        # see/ref:
        # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L708
        # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/rates/FeeProvider.ts#L90
        percentage_fee = math.ceil(percentage * x / 100)
        x += percentage_fee + self.mining_fee
    x = int(x)
    return x
|
|
|
|
def get_recv_amount(self, send_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
    """Like _get_recv_amount, but also deducts the on-chain claim tx fee for
    reverse swaps, and sanity-checks invertibility of the fee calculation."""
    # first, add percentage fee
    recv_amount = self._get_recv_amount(send_amount, is_reverse=is_reverse)
    # sanity check calculation can be inverted
    if recv_amount is not None:
        inverted_send_amount = self._get_send_amount(recv_amount, is_reverse=is_reverse)
        # accept off-by ones as amt_rcv = recv_amt(send_amt(amt_rcv)) only up to +-1
        if abs(send_amount - inverted_send_amount) > 1:
            raise Exception(f"calc-invert-sanity-check failed. is_reverse={is_reverse}. "
                            f"send_amount={send_amount} -> recv_amount={recv_amount} -> inverted_send_amount={inverted_send_amount}")
    # second, add on-chain claim tx fee
    if is_reverse and recv_amount is not None:
        recv_amount -= self.get_fee_for_txbatcher()
    return recv_amount

def get_send_amount(self, recv_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
    """Like _get_send_amount, but also accounts for the on-chain claim tx fee
    for reverse swaps, and sanity-checks invertibility of the fee calculation."""
    # first, add on-chain claim tx fee
    if is_reverse and recv_amount is not None:
        recv_amount += self.get_fee_for_txbatcher()
    # second, add percentage fee
    send_amount = self._get_send_amount(recv_amount, is_reverse=is_reverse)
    # sanity check calculation can be inverted
    if send_amount is not None:
        inverted_recv_amount = self._get_recv_amount(send_amount, is_reverse=is_reverse)
        if recv_amount != inverted_recv_amount:
            raise Exception(f"calc-invert-sanity-check failed. is_reverse={is_reverse}. "
                            f"recv_amount={recv_amount} -> send_amount={send_amount} -> inverted_recv_amount={inverted_recv_amount}")
    return send_amount
|
|
|
|
def get_swaps_by_funding_tx(self, tx: Transaction) -> Iterable[SwapData]:
    """Return the known swaps whose funding outpoint is among *tx*'s outputs."""
    found = []
    for out_idx, _txo in enumerate(tx.outputs()):
        outpoint = TxOutpoint(txid=bytes.fromhex(tx.txid()), out_idx=out_idx)
        match = self._swaps_by_funding_outpoint.get(outpoint)
        if match:
            found.append(match)
    return found

def get_swaps_by_claim_tx(self, tx: Transaction) -> Iterable[Tuple[int, SwapData]]:
    """Return (input_index, swap) pairs for inputs of *tx* that spend a swap funding output."""
    matches = []
    for idx, txin in enumerate(tx.inputs()):
        candidate = self.get_swap_by_claim_txin(txin)
        if candidate:
            matches.append((idx, candidate))
    return matches
|
|
|
|
def get_swap_by_claim_txin(self, txin: TxInput) -> Optional[SwapData]:
    """Return the swap whose funding output is spent by *txin*, if any."""
    return self._swaps_by_funding_outpoint.get(txin.prevout)
|
|
|
|
def is_lockup_address_for_a_swap(self, addr: str) -> bool:
    """Return whether *addr* is the lockup address of one of our known swaps."""
    # membership test instead of get()+bool(): same result for stored SwapData
    # objects, but does not depend on the value's truthiness
    return addr in self._swaps_by_lockup_address
|
|
|
|
@classmethod
def add_txin_info(cls, swap, txin: PartialTxInput) -> None:
    """Add some info to a claim txin.
    note: even without signing, this is useful for tx size estimation.
    """
    # claim path of a reverse swap reveals the preimage; the forward-swap
    # refund path pushes 0 instead
    preimage = swap.preimage if swap.is_reverse else 0
    witness_script = swap.redeem_script
    txin.script_sig = b''
    txin.witness_script = witness_script
    sig_dummy = b'\x00' * 71  # DER-encoded ECDSA sig, with low S and low R
    witness = [sig_dummy, preimage, witness_script]
    txin.witness_sizehint = len(construct_witness(witness))
    # note: nLockTime is only enforced if nsequence != 0xffffffff
    txin.nsequence = 1 if swap.is_reverse else 0xffffffff - 2

@classmethod
def create_claim_txin(
    cls,
    *,
    txin: PartialTxInput,
    swap: SwapData,
) -> Tuple[PartialTxInput, Optional[int]]:
    """Prepare *txin* to spend the swap's funding output.

    Returns (txin, locktime): locktime is None for a reverse-swap claim and
    swap.locktime for a forward-swap refund.
    """
    if swap.is_reverse:  # successful reverse swap
        locktime = None
        # preimage will be set in sign_tx
    else:  # timing out forward swap
        locktime = swap.locktime
    cls.add_txin_info(swap, txin)
    txin.privkey = swap.privkey
    def make_witness(sig):
        # reverse swap: witness carries the preimage; forward refund pushes 0
        preimage = swap.preimage if swap.is_reverse else 0
        witness_script = swap.redeem_script
        return construct_witness([sig, preimage, witness_script])
    txin.make_witness = make_witness
    return txin, locktime
|
|
|
|
def client_max_amount_forward_swap(self) -> Optional[int]:
    """ returns None if we cannot swap """
    # note: provider limit names are from the announcement; our forward swap
    # is capped by the provider's 'max_reverse' limit (mirrored perspective)
    max_swap_amt_ln = self.get_provider_max_reverse_amount()
    if max_swap_amt_ln is None:
        return None
    # also bounded by what we can actually receive over LN
    max_recv_amt_ln = int(self.lnworker.num_sats_can_receive())
    max_amt_ln = int(min(max_swap_amt_ln, max_recv_amt_ln))
    max_amt_oc = self.get_send_amount(max_amt_ln, is_reverse=False) or 0
    min_amt_oc = self.get_send_amount(self.get_min_amount(), is_reverse=False) or 0
    # not swappable if the achievable maximum is below the provider minimum
    return max_amt_oc if max_amt_oc >= min_amt_oc else None

def client_max_amount_reverse_swap(self) -> Optional[int]:
    """Returns None if swap is not possible"""
    provider_max = self.get_provider_max_forward_amount()
    max_ln_send = int(self.lnworker.num_sats_can_send())
    max_swap_size = min(max_ln_send, provider_max)
    if max_swap_size < self.get_min_amount():
        return None
    return max_swap_size
|
|
|
|
def server_create_normal_swap(self, request):
    """Server side: handle a 'createnormalswap' request.

    normal for client, reverse for server.
    """
    #request = await r.json()
    lightning_amount_sat = request['invoiceAmount']
    their_pubkey = bytes.fromhex(request['refundPublicKey'])
    assert len(their_pubkey) == 33
    swap = self.create_reverse_swap(
        lightning_amount_sat=lightning_amount_sat,
        their_pubkey=their_pubkey,
    )
    response = {
        "id": swap.payment_hash.hex(),
        'preimageHash': swap.payment_hash.hex(),
        "acceptZeroConf": False,
        "expectedAmount": swap.onchain_amount,
        "timeoutBlockHeight": swap.locktime,
        "address": swap.lockup_address,
        "redeemScript": swap.redeem_script.hex(),
    }
    return response

def server_create_swap(self, request):
    """Server side: handle a 'createswap' request.

    reverse for client, forward for server.
    requesting a normal swap (old protocol) will raise an exception.
    """
    #request = await r.json()
    req_type = request['type']
    assert request['pairId'] == 'BTC/BTC'
    if req_type == 'reversesubmarine':
        lightning_amount_sat=request['invoiceAmount']
        payment_hash=bytes.fromhex(request['preimageHash'])
        their_pubkey=bytes.fromhex(request['claimPublicKey'])
        assert len(payment_hash) == 32
        assert len(their_pubkey) == 33
        swap, invoice, prepay_invoice = self.create_normal_swap(
            lightning_amount_sat=lightning_amount_sat,
            payment_hash=payment_hash,
            their_pubkey=their_pubkey
        )
        response = {
            'id': payment_hash.hex(),
            'invoice': invoice,
            'minerFeeInvoice': prepay_invoice,
            'lockupAddress': swap.lockup_address,
            'redeemScript': swap.redeem_script.hex(),
            'timeoutBlockHeight': swap.locktime,
            "onchainAmount": swap.onchain_amount,
        }
    elif req_type == 'submarine':
        raise Exception('Deprecated API. Please upgrade your version of Electrum')
    else:
        raise Exception('unsupported request type:' + req_type)
    return response
|
|
|
|
def get_groups_for_onchain_history(self):
    """Return {txid: group-info} used to group swap-related txs in the wallet history."""
    current_height = self.wallet.adb.get_local_height()
    d = {}
    # add info about submarine swaps
    settled_payments = self.lnworker.get_payments(status='settled')
    with self.swaps_lock:
        # snapshot under the lock; iterate outside it
        swaps_items = list(self._swaps.items())
    for payment_hash_hex, swap in swaps_items:
        # group under the claim tx (reverse swap) or funding tx (forward swap)
        txid = swap.spending_txid if swap.is_reverse else swap.funding_txid
        if txid is None:
            continue
        payment_hash = bytes.fromhex(payment_hash_hex)
        if payment_hash in settled_payments:
            plist = settled_payments[payment_hash]
            info = self.lnworker.get_payment_info(payment_hash)
            direction, amount_msat, fee_msat, timestamp = self.lnworker.get_payment_value(info, plist)
        else:
            amount_msat = 0

        if swap.is_reverse:
            group_label = 'Reverse swap' + ' ' + self.config.format_amount_and_units(swap.lightning_amount)
        else:
            group_label = 'Forward swap' + ' ' + self.config.format_amount_and_units(swap.onchain_amount)

        label = _('Claim transaction') if swap.is_reverse else _('Funding transaction')
        delta = current_height - swap.locktime
        if self.wallet.adb.is_mine(swap.lockup_address):
            tx_height = self.wallet.adb.get_tx_height(swap.funding_txid)
            if swap.is_reverse and tx_height.height <= 0:
                label += ' (%s)' % _('waiting for funding tx confirmation')
            if not swap.is_reverse and not swap.is_redeemed and swap.spending_txid is None and delta < 0:
                label += f' (refundable in {-delta} blocks)'  # fixme: only if unspent
        d[txid] = {
            'group_id': txid,
            'label': label,
            'group_label': group_label,
        }
        if not swap.is_reverse:
            claim_tx = self.lnwatcher.adb.get_transaction(swap.spending_txid)
            if claim_tx and not self.extract_preimage(swap, claim_tx):
                # if the spending_tx is in the wallet, this will add it
                # to the group (see wallet.get_full_history)
                d[swap.spending_txid] = {
                    'group_id': txid,
                    'group_label': group_label,
                    'label': _('Refund transaction'),
                }
        self.wallet._accounting_addresses.add(swap.lockup_address)
    return d
|
|
|
|
def get_group_id_for_payment_hash(self, payment_hash: bytes) -> Optional[str]:
    """Return the txid grouping this swap's transactions in history, if known."""
    # add group_id to swap transactions
    swap = self.get_swap(payment_hash)
    if swap:
        return swap.spending_txid if swap.is_reverse else swap.funding_txid
    return None

def get_pending_swaps(self) -> List[SwapData]:
    """Returns a list of swaps with unconfirmed funding tx (which require us to stay online)."""
    pending_swaps: List[SwapData] = []
    with self.swaps_lock:
        # snapshot under the lock; iterate outside it
        swaps = list(self._swaps.values())
    for swap in swaps:
        if swap.is_redeemed:
            # adb data might have been removed after is_redeemed was set.
            # in that case lnwatcher will no longer fetch the spending tx
            # and adb will return TX_HEIGHT_LOCAL
            continue
        # note: adb.get_tx_height returns TX_HEIGHT_LOCAL if the txid is unknown
        funding_height = self.lnworker.wallet.adb.get_tx_height(swap.funding_txid).height
        spending_height = self.lnworker.wallet.adb.get_tx_height(swap.spending_txid).height
        if funding_height > TX_HEIGHT_LOCAL and spending_height <= TX_HEIGHT_LOCAL:
            pending_swaps.append(swap)
    return pending_swaps
|
|
|
|
|
|
class SwapServerTransport(Logger):
    """Abstract transport between a swap client and a swap server.

    Concrete implementations (HttpTransport, NostrTransport) override the
    context-manager hooks, send_request_to_server() and get_pairs().
    """

    def __init__(self, *, config: 'SimpleConfig', sm: 'SwapManager'):
        Logger.__init__(self)
        self.sm = sm
        self.network = sm.network
        self.config = config
        # set once the transport is ready to send/receive
        self.is_connected = asyncio.Event()
        # proxied connections are slower; allow more time to connect
        self.connect_timeout = 10 if self.uses_proxy else 5

    def __enter__(self):
        pass

    def __exit__(self, ex_type, ex, tb):
        pass

    async def __aenter__(self):
        pass

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def send_request_to_server(self, method: str, request_data: Optional[dict]) -> dict:
        # implemented by subclasses
        pass

    async def get_pairs(self) -> None:
        # implemented by subclasses
        pass

    @property
    def uses_proxy(self):
        return self.network.proxy and self.network.proxy.enabled
|
|
|
|
|
|
class HttpTransport(SwapServerTransport):
    """HTTP(S) transport to a swap server at config.SWAPSERVER_URL."""

    def __init__(self, config, sm):
        SwapServerTransport.__init__(self, config=config, sm=sm)
        self.api_url = config.SWAPSERVER_URL
        # HTTP needs no connection phase: consider ourselves connected
        self.is_connected.set()

    def __enter__(self):
        # sync entry: schedule the initial pairs fetch on the network loop
        asyncio.run_coroutine_threadsafe(self.get_pairs(), self.network.asyncio_loop)
        return self

    def __exit__(self, ex_type, ex, tb):
        pass

    async def __aenter__(self):
        asyncio.create_task(self.get_pairs())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def send_request_to_server(self, method, request_data):
        # GET for parameterless queries, POST with a JSON body otherwise
        response = await self.network.async_send_http_on_proxy(
            'post' if request_data else 'get',
            self.api_url + '/' + method,
            json=request_data,
            timeout=30)
        return json.loads(response)

    async def get_pairs(self) -> None:
        """Might raise SwapServerError."""
        try:
            response = await self.send_request_to_server('getpairs', None)
        except aiohttp.ClientError as e:
            self.logger.error(f"Swap server errored: {e!r}")
            raise SwapServerError() from e
        # reject servers that don't advertise the 'htlcFirst' protocol flag
        assert response.get('htlcFirst') is True
        fees = response['pairs']['BTC/BTC']['fees']
        limits = response['pairs']['BTC/BTC']['limits']
        pairs = SwapFees(
            percentage=fees['percentage'],
            mining_fee=fees['minerFees']['baseAsset']['mining_fee'],
            min_amount=limits['minimal'],
            max_forward=limits['max_forward_amount'],
            max_reverse=limits['max_reverse_amount'],
        )
        self.sm.update_pairs(pairs)
|
|
|
|
|
|
class NostrTransport(SwapServerTransport):
    # uses nostr:
    # - to advertise servers
    # - for client-server RPCs (using DMs)
    # (todo: we should use onion messages for that)

    # event kind used for encrypted request/reply DMs (see check_direct_messages)
    EPHEMERAL_REQUEST = 25582
    # event kind used for server offer announcements (see publish_offer)
    USER_STATUS_NIP38 = 30315
    # bump when the announcement/DM format changes; part of the 'd' tag filter
    NOSTR_EVENT_VERSION = 5
    # determines the 'expiration' tag of published offers
    OFFER_UPDATE_INTERVAL_SEC = 60 * 10
    LIQUIDITY_UPDATE_INTERVAL_SEC = 30
|
|
|
|
def __init__(self, config, sm, keypair: Keypair):
    SwapServerTransport.__init__(self, config=config, sm=sm)
    # offers seen on relays, keyed by server npub
    self._offers = {}  # type: Dict[str, SwapOffer]
    self.private_key = keypair.privkey
    self.nostr_private_key = to_nip19('nsec', keypair.privkey.hex())
    # drop the 1-byte prefix of the compressed pubkey (nostr uses x-only keys)
    self.nostr_pubkey = keypair.pubkey.hex()[2:]
    # request event id -> future resolved with the server's reply
    self.dm_replies = defaultdict(asyncio.Future)  # type: Dict[str, asyncio.Future]
    self.ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
    self.relay_manager = None
    self.taskgroup = OldTaskGroup()
    # relays last announced by the swap server, persisted across restarts
    self._last_swapserver_relays = self._load_last_swapserver_relays()  # type: Optional[Sequence[str]]
|
|
|
|
def __enter__(self):
    # sync context manager: drive main_loop() on the network's event loop
    asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
    return self

def __exit__(self, ex_type, ex, tb):
    fut = asyncio.run_coroutine_threadsafe(self.stop(), self.network.asyncio_loop)
    fut.result(timeout=5)

async def __aenter__(self):
    # async context manager: caller is already on the event loop
    asyncio.create_task(self.main_loop())
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    await wait_for2(self.stop(), timeout=5)
|
|
|
|
@log_exceptions
async def main_loop(self):
    """Connect to relays, then run the client/server tasks until stop()."""
    self.logger.info(f'starting nostr transport with pubkey: {self.nostr_pubkey}')
    self.logger.info(f'nostr relays: {self.relays}')
    self.relay_manager = self.get_relay_manager()
    await self.relay_manager.connect()
    connected_relays = self.relay_manager.relays
    self.logger.info(f'connected relays: {[relay.url for relay in connected_relays]}')
    if connected_relays:
        self.is_connected.set()
    if self.sm.is_server:
        tasks = [
            self.check_direct_messages(),
        ]
    else:
        # clients additionally track server offers and relay announcements
        tasks = [
            self.check_direct_messages(),
            self.get_pairs(),
            self.update_relays()
        ]
    try:
        async with self.taskgroup as group:
            for task in tasks:
                await group.spawn(task)
    except Exception as e:
        self.logger.exception("taskgroup died.")
    finally:
        self.logger.info("taskgroup stopped.")

@log_exceptions
async def stop(self):
    """Cancel the background tasks and close relay connections."""
    self.logger.info("shutting down nostr transport")
    self.sm.is_initialized.clear()
    await self.taskgroup.cancel_remaining()
    await self.relay_manager.close()
    self.logger.info("nostr transport shut down")
|
|
|
|
@property
def relays(self):
    """Relay URLs to use: our configured relays, plus — for clients —
    the relays last announced by the swap server (deduplicated)."""
    our_relays = self.config.NOSTR_RELAYS.split(',') if self.config.NOSTR_RELAYS else []
    if self.sm.is_server:
        return our_relays
    last_swapserver_relays = self._last_swapserver_relays or []
    return list(set(our_relays + last_swapserver_relays))
|
|
|
|
def get_relay_manager(self):
    """Return the aionostr relay manager, creating and caching it on first use.

    Must be called on the asyncio thread.
    """
    assert get_running_loop() == get_asyncio_loop(), f"this must be run on the asyncio thread!"
    if not self.relay_manager:
        if self.uses_proxy:
            proxy = make_aiohttp_proxy_connector(self.network.proxy, self.ssl_context)
        else:
            proxy: Optional['ProxyConnector'] = None
        nostr_logger = self.logger.getChild('aionostr')
        nostr_logger.setLevel('INFO')  # DEBUG is very verbose with aionostr
        # bugfix: cache the manager; previously it was returned without being
        # assigned, so every call built a fresh, unshared Manager instance
        self.relay_manager = aionostr.Manager(
            self.relays,
            private_key=self.nostr_private_key,
            log=nostr_logger,
            ssl_context=self.ssl_context,
            proxy=proxy,
            connect_timeout=self.connect_timeout
        )
    return self.relay_manager
|
|
|
|
def get_offer(self, pubkey: str) -> Optional[SwapOffer]:
    """Look up a previously seen offer by server pubkey, or None."""
    offer = self._offers.get(pubkey)
    return offer

def get_recent_offers(self) -> Sequence[SwapOffer]:
    """Return up to 20 offers seen within the last hour, strongest PoW first."""
    cutoff = int(time.time()) - 3600
    # keep only offers with fresh timestamps
    fresh = [offer for offer in self._offers.values() if offer.timestamp > cutoff]
    # strongest proof-of-work first; cap the list size
    fresh.sort(key=lambda offer: offer.pow_bits, reverse=True)
    return fresh[:20]
|
|
|
|
@ignore_exceptions
@log_exceptions
async def publish_offer(self, sm: 'SwapManager') -> None:
    """Server side: publish our fees/limits as a user-status (NIP-38) event."""
    assert self.sm.is_server
    # nothing to offer if neither direction can serve the minimum amount
    if sm._max_forward < sm._min_amount and sm._max_reverse < sm._min_amount:
        self.logger.warning(f"not publishing swap offer, no liquidity available: {sm._max_forward=}, {sm._max_reverse=}")
        return
    offer = {
        'percentage_fee': sm.percentage,
        'mining_fee': sm.mining_fee,
        'min_amount': sm._min_amount,
        'max_forward_amount': sm._max_forward,
        'max_reverse_amount': sm._max_reverse,
        'relays': sm.config.NOSTR_RELAYS,
        # anti-spam proof-of-work nonce, checked by clients in get_pairs()
        'pow_nonce': hex(sm.config.SWAPSERVER_ANN_POW_NONCE),
    }
    # the first value of a single letter tag is indexed and can be filtered for
    tags = [['d', f'electrum-swapserver-{self.NOSTR_EVENT_VERSION}'],
            ['r', 'net:' + constants.net.NET_NAME],
            ['expiration', str(int(time.time() + self.OFFER_UPDATE_INTERVAL_SEC + 10))]]
    event_id = await aionostr._add_event(
        self.relay_manager,
        kind=self.USER_STATUS_NIP38,
        tags=tags,
        content=json.dumps(offer),
        private_key=self.nostr_private_key)
    self.logger.info(f"published offer {event_id}")
|
|
|
|
async def send_direct_message(self, pubkey: str, content: str) -> str:
    """Send an encrypted DM to *pubkey* (hex or npub); returns the event id."""
    our_private_key = aionostr.key.PrivateKey(self.private_key)
    recv_pubkey_hex = aionostr.util.from_nip19(pubkey)['object'].hex() if pubkey.startswith('npub') else pubkey
    encrypted_msg = our_private_key.encrypt_message(content, recv_pubkey_hex)
    event_id = await aionostr._add_event(
        self.relay_manager,
        kind=self.EPHEMERAL_REQUEST,
        content=encrypted_msg,
        private_key=self.nostr_private_key,
        tags=[['p', recv_pubkey_hex]],
    )
    return event_id

@log_exceptions
async def send_request_to_server(self, method: str, request_data: dict) -> dict:
    """RPC over nostr DMs: send *request_data* to the configured server and
    await the matching reply. Raises SwapServerError on an error reply."""
    self.logger.debug(f"swapserver req: method: {method} relays: {self.relays}")
    request_data['method'] = method
    server_npub = self.config.SWAPSERVER_NPUB
    event_id = await self.send_direct_message(server_npub, json.dumps(request_data))
    # check_direct_messages() resolves this future when a DM referencing our
    # event id arrives
    response = await self.dm_replies[event_id]
    if 'error' in response:
        self.logger.warning(f"error from swap server [DO NOT TRUST THIS MESSAGE]: {response['error']}")
        raise SwapServerError()
    return response
|
|
|
|
async def get_pairs(self):
    """Client side: subscribe to swap-server offer announcements, validate
    them, keep self._offers fresh and adopt our selected server's fees."""
    await self.is_connected.wait()
    query = {
        "kinds": [self.USER_STATUS_NIP38],
        "limit": 10,
        "#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
        "#r": [f"net:{constants.net.NET_NAME}"],
        "since": int(time.time()) - 60 * 60,
        "until": int(time.time()) + 60 * 60,
    }
    async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
        try:
            content = json.loads(event.content)
            tags = {k: v for k, v in event.tags}
        except Exception as e:
            self.logger.debug(f"failed to parse event: {e}")
            continue
        # re-check the filter tags ourselves; don't rely on the relay
        if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
            continue
        if tags.get('r') != f"net:{constants.net.NET_NAME}":
            continue
        # discard events timestamped more than an hour in the past or future
        if (event.created_at > int(time.time()) + 60 * 60
                or event.created_at < int(time.time()) - 60 * 60):
            continue
        # check if this is the most recent event for this pubkey
        pubkey = event.pubkey
        prev_offer = self._offers.get(to_nip19('npub', pubkey))
        if prev_offer and event.created_at <= prev_offer.timestamp:
            continue
        # anti-spam: validate the announcement's proof-of-work nonce
        try:
            pow_bits = get_nostr_ann_pow_amount(
                bytes.fromhex(pubkey),
                int(content.get('pow_nonce', "0"), 16)
            )
        except ValueError:
            continue
        if pow_bits < self.config.SWAPSERVER_POW_TARGET:
            self.logger.debug(f"too low pow: {pubkey}: pow: {pow_bits} nonce: {content.get('pow_nonce', 0)}")
            continue
        server_relays = content['relays'].split(',') if 'relays' in content else []
        try:
            pairs = SwapFees(
                percentage=content['percentage_fee'],
                mining_fee=content['mining_fee'],
                min_amount=content['min_amount'],
                max_forward=content['max_forward_amount'],
                max_reverse=content['max_reverse_amount'],
            )
        except Exception:
            self.logger.debug(f"swap fees couldn't be parsed", exc_info=True)
            continue
        offer = SwapOffer(
            pairs=pairs,
            relays=server_relays[:10],
            timestamp=event.created_at,
            server_pubkey=pubkey,
            pow_bits=pow_bits,
        )
        if self.config.SWAPSERVER_NPUB == offer.server_npub:
            # offer from our selected server: adopt its fees/limits
            self.sm.update_pairs(pairs)
        self._offers[offer.server_npub] = offer
        # mirror event to other relays
        await self.taskgroup.spawn(self.rebroadcast_event(event, server_relays))
|
|
|
|
async def update_relays(self):
    """
    Update the relays when update_pairs is called.
    This ensures we try to connect to the same relays as the ones announced by the swap server.
    """
    while True:
        previous_relays = self._last_swapserver_relays
        await self.sm.pairs_updated.wait()
        # NOTE(review): assumes an offer from SWAPSERVER_NPUB is already in
        # self._offers whenever pairs_updated fires — confirm for all triggers
        latest_known_relays = self._offers[self.config.SWAPSERVER_NPUB].relays
        if latest_known_relays != previous_relays:
            self.logger.debug(f"swapserver relays changed, updating relay list.")
            # store the latest known relays to a file
            self._store_last_swapserver_relays(latest_known_relays)
            # update the relay manager
            await self.relay_manager.update_relays(self.relays)

async def rebroadcast_event(self, event: Event, server_relays: Sequence[str]):
    """If the relays of the origin server are different from our relays we rebroadcast the
    event to our relays so it gets spread more widely."""
    if not server_relays:
        return
    rebroadcast_relays = [relay for relay in self.relay_manager.relays if
                          relay.url not in server_relays]
    for relay in rebroadcast_relays:
        try:
            res = await relay.add_event(event, check_response=True)
        except Exception as e:
            # best effort: a failing relay must not stop the others
            self.logger.debug(f"failed to rebroadcast event to {relay.url}: {e}")
            continue
        self.logger.debug(f"rebroadcasted event to {relay.url}: {res}")
|
|
|
|
@log_exceptions
async def check_direct_messages(self):
    """Long-running task: decrypt and dispatch encrypted direct messages addressed to us.

    Replies to our own requests complete the matching future in
    ``self.dm_replies``; if we act as swap server, incoming method calls are
    forwarded to ``handle_request`` and failures are reported back to the sender.
    """
    privkey = aionostr.key.PrivateKey(self.private_key)
    # subscribe to ephemeral request events tagged with our pubkey;
    # limit=0 so the relay sends no stored history, only new events
    query = {"kinds": [self.EPHEMERAL_REQUEST], "limit":0, "#p": [self.nostr_pubkey]}
    async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
        try:
            content = privkey.decrypt_message(event.content, event.pubkey)
            content = json.loads(content)
        except Exception:
            # not decryptable with our key, or not JSON: not meant for us
            continue
        # attach event metadata so handlers know what to reply to and to whom
        content['event_id'] = event.id
        content['event_pubkey'] = event.pubkey
        if 'reply_to' in content:
            # response to one of our earlier requests: wake up the waiter
            self.dm_replies[content['reply_to']].set_result(content)
        elif self.sm.is_server and 'method' in content:
            try:
                await self.handle_request(content)
            except Exception as e:
                self.logger.exception(f"failed to handle request: {content}")
                # report a truncated error string back to the requester
                error_response = json.dumps({
                    "error": str(e)[:100],
                    "reply_to": event.id,
                })
                # NOTE(review): this call passes an extra positional `[]` compared
                # to the two-argument call in handle_request
                # (send_direct_message(event_pubkey, json.dumps(r))); one of the
                # two call sites likely has the wrong arity — verify against the
                # send_direct_message signature.
                await self.send_direct_message(event.pubkey,[], error_response)
        else:
            self.logger.info(f'unknown message {content}')
|
@log_exceptions
async def handle_request(self, request):
    """Dispatch one decrypted client request to the swap manager and send the
    JSON result back as a direct message.

    Raises Exception(method) for unknown methods; the caller turns exceptions
    into an error reply.
    """
    assert self.sm.is_server
    # todo: remember event_id of already processed requests
    method = request.pop('method')
    event_id = request.pop('event_id')
    event_pubkey = request.pop('event_pubkey')
    self.logger.info(f'handle_request: id={event_id} {method} {request}')
    # table-driven dispatch to the swap-manager server entry points
    handlers = {
        'addswapinvoice': self.sm.server_add_swap_invoice,
        'createswap': self.sm.server_create_swap,
        'createnormalswap': self.sm.server_create_normal_swap,
    }
    if method not in handlers:
        raise Exception(method)
    response = handlers[method](request)
    response['reply_to'] = event_id
    self.logger.debug(f'sending response id={event_id}')
    await self.send_direct_message(event_pubkey, json.dumps(response))
|
def _store_last_swapserver_relays(self, relays: Sequence[str]):
    """Cache the server-announced relay list in memory and persist it to the
    config directory (best effort; write failures are only logged)."""
    self._last_swapserver_relays = relays
    if not (self.config.path and relays):
        # no data directory available, or nothing worth persisting
        return
    target = os.path.join(self.config.path, 'recent_swapserver_relays')
    try:
        with open(target, 'w', encoding="utf-8") as fd:
            json.dump(relays, fd, indent=4, sort_keys=True)  # type: ignore
    except Exception:
        self.logger.exception(f"failed to write last swapserver relays to {target}")
|
def _load_last_swapserver_relays(self) -> Optional[Sequence[str]]:
    """Load the most recently stored swapserver relay list from disk.

    Returns None if there is no config directory, no stored file, or the
    file cannot be read/parsed (the failure is logged).
    """
    # Mirror the guard in _store_last_swapserver_relays: without a data dir
    # there is nothing to read, and os.path.join(None, ...) would raise TypeError.
    if not self.config.path:
        return None
    storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
    if not os.path.exists(storage_path):
        return None
    try:
        with open(storage_path, 'r', encoding="utf-8") as f:
            relays = json.load(f)
    except Exception:
        self.logger.exception(f"failed to read last swapserver relays from {storage_path}")
        return None
    return relays