Files
pallectrum/electrum/txbatcher.py
ThomasV 5f30f2a0c0 txbatcher: be careful when removing local transactions
1. Do not remove local transaction in find_base_tx.

This logic was intended to cleanup claim transactions that are
never broadcast (for example, if the counterparty gets a refund)
(see 1bf1de36cb)

However, this code is too unspecific and may result in fund loss,
because the transaction being removed may contain outgoing payments.
For example, if the electrum server is not responsive, the tx will
be seen as local and deleted. In that case, another payment will
be attempted, thus paying twice.

2. Do not remove tx after try_broadcasting returns False.

The server might be lying to us. We can only remove the local tx
if there is a base_tx, because the next tx we create will try to
spend the same output.
2025-08-21 10:26:35 +02:00

612 lines
27 KiB
Python

# This class batches outgoing payments and incoming utxo sweeps.
# It ensures that we do not send a payment twice.
#
# Explanation of the problem:
# Suppose we are asked to send two payments: first o1, and then o2.
# We replace tx1(o1) (that pays to o1) with tx1'(o1,o2), that pays to o1 and o2.
# tx1 and tx1' use the same inputs, so they cannot both be mined in the same blockchain.
# If tx1 is mined instead of tx1', we now need to pay o2, so we will broadcast a new transaction tx2(o2).
# However, tx1 may be removed from the blockchain, due to a reorg, and a chain with tx1' can become the valid one.
# In that case, we might pay o2 twice: with tx1' and with tx2
#
# The following code prevents that by making tx2 a child of tx1.
# This is denoted by tx2(tx1|o2).
#
# Example:
#
# output 1: tx1(o1) ---------------
# \
# output 2: tx1'(o1,o2)------- ----> tx2(tx1|o2) ------
# \ \ \
# output 3: tx1''(o1,o2,o3) \ ---> tx2'(tx1|o2,o3) ----> tx3(tx2|o3) (if tx2 is mined)
# \
# --------------------------------> tx3(tx1'|o3) (if tx1' is mined)
#
# In the above example, we have to make 3 payments.
# Suppose we have broadcast tx1, tx1' and tx1''
# - if tx1 gets mined, we broadcast: tx2'(tx1|o2,o3)
# - if tx1' gets mined, we broadcast tx3(tx1'|o3)
#
# Note that there are two possible execution paths that may lead to the creation of tx3:
# - as a child of tx2
# - as a child of tx1'
#
# A batch is a set of incompatible txs, such as [tx1, tx1', tx1''].
# Note that we do not persist older batches. We only persist the current batch in self.batch_txids.
# Thus, if we need to broadcast tx2 or tx2', then self.batch_txids is reset, and the old batch is forgotten.
#
# If we cannot RBF a transaction (because the server returns an error), then we create a new batch,
# as if the transaction had been mined.
# if cannot_rbf(tx1) -> broadcast tx2(tx1,o2). The new base is now tx2(tx,o2)
# if cannot_rbf(tx1') -> broadcast tx3(tx1'|o3)
#
#
# Notes:
#
# 1. CPFP:
# When a batch is forgotten but not mined (because the server returned an error), we no longer bump its fee.
# However, the current code does not theat the next batch as a CPFP when computing the fee.
#
# 2. Reorgs:
# This code does not guarantee that a payment or a sweep will happen.
# This is fine for sweeps; it is the responsibility of the caller (lnwatcher) to add them again.
# To make payments reorg-safe, we would need to persist more data and redo failed payments.
#
# 3. batch_payments and batch_inputs are not persisted.
# In the case of sweeps, lnwatcher ensures that SweepInfo is added again after a client restart.
# In order to generalize that logic to payments, callers would need to pass a unique ID along with
# the payment output, so that we can prevent paying twice.
import asyncio
import threading
import copy
from typing import Dict, Sequence, Optional, TYPE_CHECKING, Mapping, Set, List, Tuple
from . import util
from .bitcoin import dust_threshold
from .logging import Logger
from .util import log_exceptions, NotEnoughFunds, BelowDustLimit, NoDynamicFeeEstimates, OldTaskGroup
from .transaction import PartialTransaction, PartialTxOutput, Transaction, TxOutpoint, PartialTxInput
from .address_synchronizer import TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE
from .lnsweep import SweepInfo
from .json_db import locked, StoredDict
from .fee_policy import FeePolicy
if TYPE_CHECKING:
from .wallet import Abstract_Wallet
class TxBatcher(Logger):
SLEEP_INTERVAL = 1
def __init__(self, wallet: 'Abstract_Wallet'):
Logger.__init__(self)
self.lock = threading.RLock()
self.storage = wallet.db.get_stored_item("tx_batches", {})
self.tx_batches = {} # type: Dict[str, TxBatch]
self.wallet = wallet
for key, item_storage in self.storage.items():
self.tx_batches[key] = TxBatch(self.wallet, item_storage)
self._legacy_htlcs = {} # type: Dict[TxOutpoint, SweepInfo]
self.taskgroup = None # type: Optional[OldTaskGroup]
self.password_future = None # type: Optional[asyncio.Future[Optional[str]]]
@locked
def add_payment_output(self, key: str, output: 'PartialTxOutput') -> None:
batch = self._maybe_create_new_batch(key, fee_policy_name=key)
batch.add_payment_output(output)
@locked
def add_sweep_input(self, key: str, sweep_info: 'SweepInfo') -> None:
"""Can raise BelowDustLimit or NoDynamicFeeEstimates."""
if sweep_info.txin and sweep_info.txout:
# detect legacy htlc using name and csv delay
if sweep_info.name in ['received-htlc', 'offered-htlc'] and sweep_info.csv_delay == 0:
if sweep_info.txin.prevout not in self._legacy_htlcs:
self.logger.info(f'received {sweep_info.name}')
self._legacy_htlcs[sweep_info.txin.prevout] = sweep_info
return
fee_policy_name = key
if not sweep_info.can_be_batched:
# create a batch only for that input
key = sweep_info.txin.prevout.to_str()
batch = self._maybe_create_new_batch(key, fee_policy_name)
batch.add_sweep_input(sweep_info)
def _maybe_create_new_batch(self, key: str, fee_policy_name: str) -> 'TxBatch':
assert util.get_running_loop() == util.get_asyncio_loop(), f"this must be run on the asyncio thread!"
if key not in self.storage:
self.logger.info(f'creating new batch: {key}')
self.storage[key] = { 'fee_policy_name': fee_policy_name, 'txids': [], 'prevout': None }
self.tx_batches[key] = batch = TxBatch(self.wallet, self.storage[key])
if self.taskgroup:
asyncio.ensure_future(self.taskgroup.spawn(self.run_batch(key, batch)))
return self.tx_batches[key]
@locked
def delete_batch(self, key: str) -> None:
self.logger.info(f'deleting TxBatch {key}')
self.storage.pop(key)
self.tx_batches.pop(key)
def find_batch_by_prevout(self, prevout: str) -> Optional['TxBatch']:
for k, v in self.tx_batches.items():
if v._prevout == prevout:
return v
return None
def find_batch_of_txid(self, txid: str) -> Optional[str]:
for k, v in self.tx_batches.items():
if v.is_mine(txid):
return k
return None
def is_mine(self, txid: str) -> bool:
# used to prevent GUI from interfering
return bool(self.find_batch_of_txid(txid))
async def run_batch(self, key: str, batch: 'TxBatch') -> None:
await batch.run()
self.delete_batch(key)
@log_exceptions
async def run(self):
self.taskgroup = OldTaskGroup()
for key, batch in self.tx_batches.items():
await self.taskgroup.spawn(self.run_batch(key, batch))
async with self.taskgroup as group:
await group.spawn(self.redeem_legacy_htlcs())
async def redeem_legacy_htlcs(self) -> None:
while True:
await asyncio.sleep(self.SLEEP_INTERVAL)
for sweep_info in self._legacy_htlcs.values():
await self._maybe_redeem_legacy_htlcs(sweep_info)
async def _maybe_redeem_legacy_htlcs(self, sweep_info: 'SweepInfo') -> None:
assert sweep_info.csv_delay == 0
local_height = self.wallet.network.get_local_height()
wanted_height = sweep_info.cltv_abs
if wanted_height - local_height > 0:
return
outpoint = sweep_info.txin.prevout.to_str()
prev_txid, index = outpoint.split(':')
if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)):
tx_mined_status = self.wallet.adb.get_tx_height(spender_txid)
if tx_mined_status.height > 0:
return
if tx_mined_status.height not in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]:
return
self.logger.info(f'will broadcast standalone tx {sweep_info.name}')
tx = PartialTransaction.from_io([sweep_info.txin], [sweep_info.txout], locktime=sweep_info.cltv_abs, version=2)
self.wallet.sign_transaction(tx, password=None, ignore_warnings=True)
if await self.wallet.network.try_broadcasting(tx, sweep_info.name):
self.wallet.adb.add_transaction(tx)
async def get_password(self, txid: str) -> Optional[str]:
# daemon, android have password in memory
password = self.wallet.get_unlocked_password()
if password:
return password
future = self.get_password_future(txid)
try:
await future
except asyncio.CancelledError as e:
return None
password = future.result()
return password
@locked
def set_password_future(self, password: Optional[str]) -> None:
if self.password_future is not None:
if password is not None:
self.password_future.set_result(password)
else:
self.password_future.cancel()
self.password_future = None
util.trigger_callback('password_not_required', self.wallet)
@locked
def get_password_future(self, txid: str):
if self.password_future is None:
self.password_future = asyncio.Future()
self.password_future.txids = []
self.logger.info(f'password required: {txid}')
self.password_future.txids.append(txid)
util.trigger_callback('password_required', self.wallet)
return self.password_future
class TxBatch(Logger):
def __init__(self, wallet: 'Abstract_Wallet', storage: StoredDict):
Logger.__init__(self)
self.wallet = wallet
self.storage = storage
self.lock = threading.RLock()
self.batch_payments = [] # type: List[PartialTxOutput] # payments we need to make
self.batch_inputs = {} # type: Dict[TxOutpoint, SweepInfo] # inputs we need to sweep
# list of tx that were broadcast. Each tx is a RBF replacement of the previous one. Ony one can get mined.
self._prevout = storage.get('prevout') # type: Optional[str]
self._batch_txids = storage['txids'] # type: List[str]
self._fee_policy_name = storage.get('fee_policy_name', 'default') # type: str
self._base_tx = None # type: Optional[PartialTransaction] # current batch tx. last element of batch_txids
self._parent_tx = None # type: Optional[PartialTransaction]
self._unconfirmed_sweeps = set() # type: Set[TxOutpoint] # inputs we are sweeping (until spending tx is confirmed)
@property
def fee_policy(self) -> FeePolicy:
# this assumes the descriptor is in config.fee_policy
cv_name = 'fee_policy' + '.' + self._fee_policy_name
descriptor = self.wallet.config.get(cv_name, 'eta:2')
return FeePolicy(descriptor)
@log_exceptions
async def run(self) -> None:
while not self.is_done():
await asyncio.sleep(self.wallet.txbatcher.SLEEP_INTERVAL)
if not (self.wallet.network and self.wallet.network.is_connected()):
continue
try:
await self.run_iteration()
except Exception as e:
self.logger.exception(f'TxBatch error: {repr(e)}')
break
def is_mine(self, txid: str) -> bool:
return txid in self._batch_txids
@locked
def add_payment_output(self, output: 'PartialTxOutput') -> None:
# todo: maybe we should raise NotEnoughFunds here
self.batch_payments.append(output)
def is_dust(self, sweep_info: SweepInfo) -> bool:
"""Can raise NoDynamicFeeEstimates."""
if sweep_info.is_anchor():
return False
if sweep_info.txout is not None:
return False
value = sweep_info.txin.value_sats()
witness_size = len(sweep_info.txin.make_witness(71*b'\x00'))
tx_size_vbytes = 84 + witness_size//4 # assumes no batching, sweep to p2wpkh
self.logger.info(f'{sweep_info.name} size = {tx_size_vbytes}')
fee = self.fee_policy.estimate_fee(tx_size_vbytes, network=self.wallet.network)
return value - fee <= dust_threshold()
@locked
def add_sweep_input(self, sweep_info: 'SweepInfo') -> None:
"""Can raise BelowDustLimit or NoDynamicFeeEstimates."""
if self.is_dust(sweep_info):
# note: this uses the current fee estimates. Just because something is dust
# at the current fee levels, if fees go down, it might still become
# worthwhile to sweep. So callers might want to retry later.
raise BelowDustLimit
txin = sweep_info.txin
if txin.prevout in self._unconfirmed_sweeps:
return
# early return if the spending tx is confirmed
# if its block is orphaned, the txin will be added again
prevout = txin.prevout.to_str()
prev_txid, index = prevout.split(':')
if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)):
tx_mined_status = self.wallet.adb.get_tx_height(spender_txid)
if tx_mined_status.height > 0:
return
self._unconfirmed_sweeps.add(txin.prevout)
self.logger.info(f'add_sweep_info: {sweep_info.name} {sweep_info.txin.prevout.to_str()}')
self.batch_inputs[txin.prevout] = sweep_info
@locked
def _to_pay_after(self, tx: Optional[PartialTransaction]) -> Sequence[PartialTxOutput]:
if not tx:
return self.batch_payments
to_pay = []
outputs = copy.deepcopy(tx.outputs())
for x in self.batch_payments:
if x not in outputs:
to_pay.append(x)
else:
outputs.remove(x)
return to_pay
@locked
def _to_sweep_after(self, tx: Optional[PartialTransaction]) -> Dict[str, SweepInfo]:
tx_prevouts = set(txin.prevout for txin in tx.inputs()) if tx else set()
result = []
for k, v in list(self.batch_inputs.items()):
prevout = v.txin.prevout
prev_txid, index = prevout.to_str().split(':')
if not self.wallet.adb.db.get_transaction(prev_txid):
continue
if v.is_anchor():
prev_tx_mined_status = self.wallet.adb.get_tx_height(prev_txid)
if prev_tx_mined_status.conf > 0:
self.logger.info(f"anchor not needed {k}")
self.batch_inputs.pop(k) # note: if the input is already in a batch tx, this will trigger assert error
continue
if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)):
tx_mined_status = self.wallet.adb.get_tx_height(spender_txid)
if tx_mined_status.height not in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]:
continue
if prevout in tx_prevouts:
continue
result.append((k,v))
return dict(result)
def _should_bump_fee(self, base_tx: Optional[PartialTransaction]) -> bool:
if base_tx is None:
return False
if not self.is_mine(base_tx.txid()):
return False
base_tx_fee = base_tx.get_fee()
recommended_fee = self.fee_policy.estimate_fee(base_tx.estimated_size(), network=self.wallet.network)
should_bump_fee = base_tx_fee * 1.1 < recommended_fee
if should_bump_fee:
self.logger.info(f'base tx fee too low {base_tx_fee} < {recommended_fee}. we will bump the fee')
return should_bump_fee
def is_done(self):
# todo: require more than one confirmation
return len(self.batch_inputs) == 0 and len(self.batch_payments) == 0 and len(self._batch_txids) == 0
async def find_base_tx(self) -> Optional[PartialTransaction]:
if not self._prevout:
return None
prev_txid, index = self._prevout.split(':')
txid = self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index))
tx = self.wallet.adb.get_transaction(txid) if txid else None
if not tx:
return None
tx = PartialTransaction.from_tx(tx)
tx.add_info_from_wallet(self.wallet) # this sets is_change
if self.is_mine(txid):
if self._base_tx is None:
self.logger.info(f'found base_tx {txid}')
self._base_tx = tx
else:
self.logger.info(f'base tx was replaced by {tx.txid()}')
self._new_base_tx(tx)
# if tx is confirmed or local, we will start a new batch
tx_mined_status = self.wallet.adb.get_tx_height(txid)
if tx_mined_status.conf > 0:
self.logger.info(f'base tx confirmed {txid}')
self._clear_unconfirmed_sweeps(tx)
self._start_new_batch(tx)
if tx_mined_status.height in [TX_HEIGHT_LOCAL]:
# this may happen if our Electrum server is unresponsive
# server could also be lying to us. Rebroadcasting might
# help, if we have switched to another server.
await self.wallet.network.try_broadcasting(tx, 'batch')
return self._base_tx
async def run_iteration(self) -> None:
base_tx = await self.find_base_tx()
try:
tx = self.create_next_transaction(base_tx)
except NoDynamicFeeEstimates:
self.logger.debug('no dynamic fee estimates available')
return
except Exception as e:
if base_tx:
self.logger.exception(f'Cannot create batch transaction: {repr(e)}')
self._start_new_batch(base_tx)
return
else:
# will be caught by txBatcher
raise
if tx is None:
# nothing to do
return
# add tx to wallet, in order to reserve utxos
# note: This saves the tx as local *unsigned*.
# It will transition to local and signed, after we broadcast
# the signed tx and get it back via the Synchronizer dance.
self.wallet.adb.add_transaction(tx)
# await password
if not await self.sign_transaction(tx):
self.wallet.adb.remove_transaction(tx.txid())
return
# save local base_tx
self._new_base_tx(tx)
if not await self.wallet.network.try_broadcasting(tx, 'batch'):
self.logger.info(f'cannot broadcast tx {tx.txid()}')
if base_tx:
# The most likely cause is that base_tx is not
# replaceable. This may be the case if it has children
# (because we don't pay enough fees to replace them)
# or if we are trying to sweep unconfirmed inputs
# (replacement-adds-unconfirmed error)
# it is OK to remove the transaction, because
# create_next_transaction will create a new tx that is
# incompatible with the one we remove here, so we
# cannot double pay.
self.wallet.adb.remove_transaction(tx.txid())
self.logger.info(f'starting new batch because could not broadcast')
self._start_new_batch(base_tx)
else:
# it is dangerous to remove the transaction if there
# is no base_tx. Indeed, the transaction might have
# been broadcast. So, we just keep the transaction as
# local, and we will try to rebroadcast it later (see
# above).
#
# FIXME: it should be possible to ensure that
# create_next_transaction creates transactions that
# spend the same coins, using self._prevout. This
# would make them incompatible, and safe to broadcast.
pass
async def sign_transaction(self, tx: PartialTransaction) -> Optional[PartialTransaction]:
tx.add_info_from_wallet(self.wallet) # this adds input amounts
self.add_sweep_info_to_tx(tx)
pw_required = self.wallet.has_keystore_encryption() and tx.requires_keystore()
password = await self.wallet.txbatcher.get_password(tx.txid()) if pw_required else None
if password is None and pw_required:
return None
self.wallet.sign_transaction(tx, password)
assert tx.is_complete()
return tx
def create_next_transaction(self, base_tx: Optional[PartialTransaction]) -> Optional[PartialTransaction]:
to_pay = self._to_pay_after(base_tx)
to_sweep = self._to_sweep_after(base_tx)
to_sweep_now = {}
for k, v in to_sweep.items():
can_broadcast, wanted_height = self._can_broadcast(v, base_tx)
if can_broadcast:
to_sweep_now[k] = v
else:
self.wallet.add_future_tx(v, wanted_height)
while True:
if not to_pay and not to_sweep_now and not self._should_bump_fee(base_tx):
return None
try:
tx = self._create_batch_tx(base_tx=base_tx, to_sweep=to_sweep_now, to_pay=to_pay)
except NotEnoughFunds:
if to_pay:
k = max(to_pay, key=lambda x: x.value)
self.logger.info(f'Not enough funds, removing output {k}')
to_pay.remove(k)
continue
else:
self.logger.info(f'Not enough funds, waiting')
return None
# 100 kb max standardness rule
if tx.estimated_size() < 100_000:
break
to_sweep_now = to_sweep_now[0:len(to_sweep_now)//2]
to_pay = to_pay[0:len(to_pay)//2]
self.logger.info(f'created tx {tx.txid()} with {len(tx.inputs())} inputs and {len(tx.outputs())} outputs')
return tx
def add_sweep_info_to_tx(self, base_tx: PartialTransaction) -> None:
for txin in base_tx.inputs():
if sweep_info := self.batch_inputs.get(txin.prevout):
if hasattr(sweep_info.txin, 'make_witness'):
txin.make_witness = sweep_info.txin.make_witness
txin.privkey = sweep_info.txin.privkey
txin.witness_script = sweep_info.txin.witness_script
txin.script_sig = sweep_info.txin.script_sig
def _create_batch_tx(
self,
*,
base_tx: Optional[PartialTransaction],
to_sweep: Mapping[str, SweepInfo],
to_pay: Sequence[PartialTxOutput],
) -> PartialTransaction:
self.logger.info(f'to_sweep: {list(to_sweep.keys())}')
self.logger.info(f'to_pay: {to_pay}')
inputs = [] # type: List[PartialTxInput]
outputs = [] # type: List[PartialTxOutput]
locktime = base_tx.locktime if base_tx else None
# sort inputs so that txin-txout pairs are first
for sweep_info in sorted(to_sweep.values(), key=lambda x: not bool(x.txout)):
if sweep_info.cltv_abs is not None:
if locktime is None or locktime < sweep_info.cltv_abs:
# nLockTime must be greater than or equal to the stack operand.
locktime = sweep_info.cltv_abs
inputs.append(copy.deepcopy(sweep_info.txin))
if sweep_info.txout:
outputs.append(sweep_info.txout)
self.logger.info(f'locktime: {locktime}')
outputs += to_pay
inputs += self._create_inputs_from_tx_change(self._parent_tx) if self._parent_tx else []
# create tx
coins = self.wallet.get_spendable_coins(nonlocal_only=True)
tx = self.wallet.make_unsigned_transaction(
coins=coins,
fee_policy=self.fee_policy,
base_tx=base_tx,
inputs=inputs,
outputs=outputs,
locktime=locktime,
BIP69_sort=False,
merge_duplicate_outputs=False,
)
# this assert will fail if we merge duplicate outputs
for o in outputs: assert o in tx.outputs()
return tx
def _clear_unconfirmed_sweeps(self, tx: PartialTransaction) -> None:
# this ensures that we can accept an input again,
# in case the sweeping tx has been removed from the blockchain after a reorg
for txin in tx.inputs():
if txin.prevout in self._unconfirmed_sweeps:
self._unconfirmed_sweeps.remove(txin.prevout)
@locked
def _start_new_batch(self, tx: Optional[PartialTransaction]) -> None:
use_change = tx and tx.has_change() and any([txout in self.batch_payments for txout in tx.outputs()])
self.batch_payments = self._to_pay_after(tx)
self.batch_inputs = self._to_sweep_after(tx)
self._batch_txids.clear()
self._base_tx = None
self._parent_tx = tx if use_change else None
self._prevout = None
@locked
def _new_base_tx(self, tx: Transaction) -> None:
self._prevout = tx.inputs()[0].prevout.to_str()
self.storage['prevout'] = self._prevout
if tx.has_change():
self._batch_txids.append(tx.txid())
self._base_tx = tx
else:
self.logger.info(f'starting new batch because current base tx does not have change')
self._start_new_batch(tx)
def _create_inputs_from_tx_change(self, parent_tx: PartialTransaction) -> List[PartialTxInput]:
inputs = []
for o in parent_tx.get_change_outputs():
coins = self.wallet.adb.get_addr_utxo(o.address)
inputs += list(coins.values())
for txin in inputs:
txin.nsequence = 0xffffffff - 2
return inputs
def _can_broadcast(self, sweep_info: 'SweepInfo', base_tx: 'Transaction') -> Tuple[bool, Optional[int]]:
prevout = sweep_info.txin.prevout.to_str()
name = sweep_info.name
prev_txid, index = prevout.split(':')
can_broadcast = True
wanted_height_cltv = None
wanted_height_csv = None
local_height = self.wallet.network.get_local_height()
if sweep_info.cltv_abs:
wanted_height_cltv = sweep_info.cltv_abs
if wanted_height_cltv - local_height > 0:
can_broadcast = False
prev_height = self.wallet.adb.get_tx_height(prev_txid).height
if sweep_info.csv_delay:
if prev_height > 0:
wanted_height_csv = prev_height + sweep_info.csv_delay - 1
if wanted_height_csv - local_height > 0:
can_broadcast = False
else:
can_broadcast = False
wanted_height_csv = local_height + sweep_info.csv_delay
if not can_broadcast:
wanted_height = max((wanted_height_csv or 0), (wanted_height_cltv or 0))
else:
wanted_height = None
if base_tx and prev_height <= 0:
# we cannot add unconfirmed inputs to existing base_tx (per RBF rules)
# thus, we will wait until the current batch is confirmed
if can_broadcast:
can_broadcast = False
wanted_height = local_height + 1
return can_broadcast, wanted_height