The class TxBatcher handles the creation, broadcast and replacement
of replaceable transactions. Callers (LNWatcher, SwapManager) use
methods add_payment_output and add_sweep_info. Transactions
created by TxBatcher may combine sweeps and outgoing payments.
Transactions created by TxBatcher will have their fee bumped
automatically (this was only the case for sweeps before).
TxBatcher manages several TxBatches. TxBatches are created
dynamically when needed.
The GUI does not touch txbatcher transactions:
- wallet.get_candidates_for_batching excludes txbatcher
transactions
- RBF dialogs do not work with txbatcher transactions
wallet:
- instead of reading config variables, make_unsigned_transaction
takes new parameters: base_tx, send_change_to_lighting
tests:
- unit tests in test_txbatcher.py (replaces test_sswaps.py)
- force all regtests to use MPP, so that we sweep transactions
with several HTLCs. This forces the payment manager to aggregate
first-stage HTLC tx inputs. second-stage are not batched for now.
275 lines
11 KiB
Python
275 lines
11 KiB
Python
# Copyright (C) 2018 The Electrum developers
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
|
|
|
|
from typing import TYPE_CHECKING
|
|
from enum import IntEnum, auto
|
|
|
|
from .util import log_exceptions, ignore_exceptions, TxMinedInfo, BelowDustLimit
|
|
from .util import EventListener, event_listener
|
|
from .address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL, TX_HEIGHT_UNCONF_PARENT, TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_FUTURE
|
|
from .transaction import Transaction, TxOutpoint
|
|
from .logging import Logger
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from .network import Network
|
|
from .lnsweep import SweepInfo
|
|
from .lnworker import LNWallet
|
|
from .lnchannel import AbstractChannel
|
|
|
|
class TxMinedDepth(IntEnum):
|
|
""" IntEnum because we call min() in get_deepest_tx_mined_depth_for_txids """
|
|
DEEP = auto()
|
|
SHALLOW = auto()
|
|
MEMPOOL = auto()
|
|
FREE = auto()
|
|
|
|
|
|
class LNWatcher(Logger, EventListener):
|
|
|
|
LOGGING_SHORTCUT = 'W'
|
|
|
|
def __init__(self, adb: 'AddressSynchronizer', network: 'Network'):
|
|
|
|
Logger.__init__(self)
|
|
self.adb = adb
|
|
self.config = network.config
|
|
self.callbacks = {} # address -> lambda: coroutine
|
|
self.network = network
|
|
self.register_callbacks()
|
|
# status gets populated when we run
|
|
self.channel_status = {}
|
|
|
|
async def stop(self):
|
|
self.unregister_callbacks()
|
|
|
|
def get_channel_status(self, outpoint):
|
|
return self.channel_status.get(outpoint, 'unknown')
|
|
|
|
def add_channel(self, outpoint: str, address: str) -> None:
|
|
assert isinstance(outpoint, str)
|
|
assert isinstance(address, str)
|
|
cb = lambda: self.check_onchain_situation(address, outpoint)
|
|
self.add_callback(address, cb)
|
|
|
|
async def unwatch_channel(self, address, funding_outpoint):
|
|
self.logger.info(f'unwatching {funding_outpoint}')
|
|
self.remove_callback(address)
|
|
|
|
def remove_callback(self, address):
|
|
self.callbacks.pop(address, None)
|
|
|
|
def add_callback(self, address, callback):
|
|
self.adb.add_address(address)
|
|
self.callbacks[address] = callback
|
|
|
|
@event_listener
|
|
async def on_event_blockchain_updated(self, *args):
|
|
await self.trigger_callbacks()
|
|
|
|
@event_listener
|
|
async def on_event_wallet_updated(self, wallet):
|
|
# called if we add local tx
|
|
if wallet.adb != self.adb:
|
|
return
|
|
await self.trigger_callbacks()
|
|
|
|
@event_listener
|
|
async def on_event_adb_added_verified_tx(self, adb, tx_hash):
|
|
if adb != self.adb:
|
|
return
|
|
await self.trigger_callbacks()
|
|
|
|
@event_listener
|
|
async def on_event_adb_set_up_to_date(self, adb):
|
|
if adb != self.adb:
|
|
return
|
|
await self.trigger_callbacks()
|
|
|
|
@log_exceptions
|
|
async def trigger_callbacks(self):
|
|
if not self.adb.synchronizer:
|
|
self.logger.info("synchronizer not set yet")
|
|
return
|
|
for address, callback in list(self.callbacks.items()):
|
|
await callback()
|
|
|
|
async def check_onchain_situation(self, address, funding_outpoint):
|
|
# early return if address has not been added yet
|
|
if not self.adb.is_mine(address):
|
|
return
|
|
# inspect_tx_candidate might have added new addresses, in which case we return early
|
|
if not self.adb.is_up_to_date():
|
|
return
|
|
funding_txid = funding_outpoint.split(':')[0]
|
|
funding_height = self.adb.get_tx_height(funding_txid)
|
|
closing_txid = self.get_spender(funding_outpoint)
|
|
closing_height = self.adb.get_tx_height(closing_txid)
|
|
if closing_txid:
|
|
closing_tx = self.adb.get_transaction(closing_txid)
|
|
if closing_tx:
|
|
keep_watching = await self.sweep_commitment_transaction(funding_outpoint, closing_tx)
|
|
else:
|
|
self.logger.info(f"channel {funding_outpoint} closed by {closing_txid}. still waiting for tx itself...")
|
|
keep_watching = True
|
|
else:
|
|
keep_watching = True
|
|
await self.update_channel_state(
|
|
funding_outpoint=funding_outpoint,
|
|
funding_txid=funding_txid,
|
|
funding_height=funding_height,
|
|
closing_txid=closing_txid,
|
|
closing_height=closing_height,
|
|
keep_watching=keep_watching)
|
|
if not keep_watching:
|
|
await self.unwatch_channel(address, funding_outpoint)
|
|
|
|
async def sweep_commitment_transaction(self, funding_outpoint: str, closing_tx: Transaction) -> bool:
|
|
raise NotImplementedError() # implemented by subclasses
|
|
|
|
async def update_channel_state(self, *, funding_outpoint: str, funding_txid: str,
|
|
funding_height: TxMinedInfo, closing_txid: str,
|
|
closing_height: TxMinedInfo, keep_watching: bool) -> None:
|
|
raise NotImplementedError() # implemented by subclasses
|
|
|
|
|
|
def get_spender(self, outpoint) -> str:
|
|
"""
|
|
returns txid spending outpoint.
|
|
subscribes to addresses as a side effect.
|
|
"""
|
|
prev_txid, index = outpoint.split(':')
|
|
spender_txid = self.adb.db.get_spent_outpoint(prev_txid, int(index))
|
|
# discard local spenders
|
|
tx_mined_status = self.adb.get_tx_height(spender_txid)
|
|
if tx_mined_status.height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]:
|
|
spender_txid = None
|
|
if not spender_txid:
|
|
return
|
|
spender_tx = self.adb.get_transaction(spender_txid)
|
|
for i, o in enumerate(spender_tx.outputs()):
|
|
if o.address is None:
|
|
continue
|
|
if not self.adb.is_mine(o.address):
|
|
self.adb.add_address(o.address)
|
|
return spender_txid
|
|
|
|
def get_tx_mined_depth(self, txid: str):
|
|
if not txid:
|
|
return TxMinedDepth.FREE
|
|
tx_mined_depth = self.adb.get_tx_height(txid)
|
|
height, conf = tx_mined_depth.height, tx_mined_depth.conf
|
|
if conf > 20:
|
|
return TxMinedDepth.DEEP
|
|
elif conf > 0:
|
|
return TxMinedDepth.SHALLOW
|
|
elif height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
|
|
return TxMinedDepth.MEMPOOL
|
|
elif height in (TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE):
|
|
return TxMinedDepth.FREE
|
|
elif height > 0 and conf == 0:
|
|
# unverified but claimed to be mined
|
|
return TxMinedDepth.MEMPOOL
|
|
else:
|
|
raise NotImplementedError()
|
|
|
|
def is_deeply_mined(self, txid):
|
|
return self.get_tx_mined_depth(txid) == TxMinedDepth.DEEP
|
|
|
|
|
|
|
|
class LNWalletWatcher(LNWatcher):
|
|
|
|
def __init__(self, lnworker: 'LNWallet', network: 'Network'):
|
|
self.network = network
|
|
self.lnworker = lnworker
|
|
LNWatcher.__init__(self, lnworker.wallet.adb, network)
|
|
|
|
@event_listener
|
|
async def on_event_blockchain_updated(self, *args):
|
|
# overload parent method with cache invalidation
|
|
# we invalidate the cache on each new block because
|
|
# some processes affect the list of sweep transactions
|
|
# (hold invoice preimage revealed, MPP completed, etc)
|
|
for chan in self.lnworker.channels.values():
|
|
chan._sweep_info.clear()
|
|
await self.trigger_callbacks()
|
|
|
|
def diagnostic_name(self):
|
|
return f"{self.lnworker.wallet.diagnostic_name()}-LNW"
|
|
|
|
@ignore_exceptions
|
|
@log_exceptions
|
|
async def update_channel_state(self, *, funding_outpoint: str, funding_txid: str,
|
|
funding_height: TxMinedInfo, closing_txid: str,
|
|
closing_height: TxMinedInfo, keep_watching: bool) -> None:
|
|
chan = self.lnworker.channel_by_txo(funding_outpoint)
|
|
if not chan:
|
|
return
|
|
chan.update_onchain_state(
|
|
funding_txid=funding_txid,
|
|
funding_height=funding_height,
|
|
closing_txid=closing_txid,
|
|
closing_height=closing_height,
|
|
keep_watching=keep_watching)
|
|
await self.lnworker.handle_onchain_state(chan)
|
|
|
|
@log_exceptions
|
|
async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bool:
|
|
"""This function is called when a channel was closed. In this case
|
|
we need to check for redeemable outputs of the commitment transaction
|
|
or spenders down the line (HTLC-timeout/success transactions).
|
|
|
|
Returns whether we should continue to monitor."""
|
|
chan = self.lnworker.channel_by_txo(funding_outpoint)
|
|
if not chan:
|
|
return False
|
|
# detect who closed and get information about how to claim outputs
|
|
sweep_info_dict = chan.sweep_ctx(closing_tx)
|
|
#self.logger.info(f"do_breach_remedy: {[x.name for x in sweep_info_dict.values()]}")
|
|
keep_watching = False if sweep_info_dict else not self.is_deeply_mined(closing_tx.txid())
|
|
# create and broadcast transactions
|
|
for prevout, sweep_info in sweep_info_dict.items():
|
|
prev_txid, prev_index = prevout.split(':')
|
|
name = sweep_info.name + ' ' + chan.get_id_for_log()
|
|
self.lnworker.wallet.set_default_label(prevout, name)
|
|
if not self.adb.get_transaction(prev_txid):
|
|
# do not keep watching if prevout does not exist
|
|
self.logger.info(f'prevout does not exist for {name}: {prevout}')
|
|
continue
|
|
spender_txid = self.get_spender(prevout)
|
|
spender_tx = self.adb.get_transaction(spender_txid) if spender_txid else None
|
|
if spender_tx:
|
|
# the spender might be the remote, revoked or not
|
|
htlc_sweepinfo = chan.maybe_sweep_htlcs(closing_tx, spender_tx)
|
|
for prevout2, htlc_sweep_info in htlc_sweepinfo.items():
|
|
htlc_tx_spender = self.get_spender(prevout2)
|
|
self.lnworker.wallet.set_default_label(prevout2, htlc_sweep_info.name)
|
|
if htlc_tx_spender:
|
|
keep_watching |= not self.is_deeply_mined(htlc_tx_spender)
|
|
else:
|
|
keep_watching |= self.maybe_redeem(htlc_sweep_info)
|
|
keep_watching |= not self.is_deeply_mined(spender_txid)
|
|
self.maybe_extract_preimage(chan, spender_tx, prevout)
|
|
else:
|
|
keep_watching |= self.maybe_redeem(sweep_info)
|
|
return keep_watching
|
|
|
|
def maybe_redeem(self, sweep_info: 'SweepInfo') -> bool:
|
|
""" returns False if it was dust """
|
|
try:
|
|
self.lnworker.wallet.txbatcher.add_sweep_input('lnwatcher', sweep_info, self.config.FEE_POLICY_LIGHTNING)
|
|
except BelowDustLimit:
|
|
return False
|
|
return True
|
|
|
|
def maybe_extract_preimage(self, chan: 'AbstractChannel', spender_tx: Transaction, prevout: str):
|
|
txin_idx = spender_tx.get_input_idx_that_spent_prevout(TxOutpoint.from_str(prevout))
|
|
assert txin_idx is not None
|
|
spender_txin = spender_tx.inputs()[txin_idx]
|
|
chan.extract_preimage_from_htlc_txin(
|
|
spender_txin,
|
|
is_deeply_mined=self.is_deeply_mined(spender_tx.txid()),
|
|
)
|