From 368bc2329c24f13715661aec8ca06e7e6ea01cea Mon Sep 17 00:00:00 2001 From: Davide Grilli Date: Tue, 5 May 2026 13:54:10 +0200 Subject: [PATCH] script per ripulire indirizzi su electrum --- temp/sweep_p2wpkh.py | 443 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 443 insertions(+) create mode 100644 temp/sweep_p2wpkh.py diff --git a/temp/sweep_p2wpkh.py b/temp/sweep_p2wpkh.py new file mode 100644 index 000000000..13b5828ae --- /dev/null +++ b/temp/sweep_p2wpkh.py @@ -0,0 +1,443 @@ +#!/usr/bin/env python3 +""" +Sweep the addresses associated with a BitcoinPurple WIF. + +Examples: + python temp/sweep_p2wpkh.py + python temp/sweep_p2wpkh.py "p2wpkh:WIF..." btcp1destination... all + python temp/sweep_p2wpkh.py "WIF..." btcp1destination... 3 --fee-rate 2 + +By default the script creates and prints a signed raw transaction. It only +broadcasts if --broadcast is passed. +""" + +import argparse +import asyncio +import json +import os +import ssl +import sys +from dataclasses import dataclass +from decimal import Decimal, InvalidOperation, ROUND_CEILING +from typing import Iterable, Optional + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +import electrum.constants as constants +from electrum.constants import BitcoinPurple + +BitcoinPurple.set_as_network() + +from electrum import bitcoin +from electrum.bitcoin import ( + address_to_script, + deserialize_privkey, + dust_threshold, + is_address, + pubkey_to_address, +) +from electrum.descriptor import get_singlesig_descriptor_from_legacy_leaf +from electrum.transaction import ( + PartialTransaction, + PartialTxInput, + PartialTxOutput, + TxOutput, + TxOutpoint, +) +from electrum_ecc import ECPrivkey + + +DEFAULT_FEE_RATE = Decimal("2") +CLIENT_NAME = "btcp_sweep_tool" +MAX_RPC_RESPONSE_BYTES = 64 * 1024 * 1024 +UTXO_PREVIEW_LIMIT = 50 + + +@dataclass(frozen=True) +class DerivedAddress: + script_type: str + address: str + pubkey: bytes + + +@dataclass(frozen=True) +class SpendableUtxo: + tx_hash: str + tx_pos: int + value: int + height: int + derived: DerivedAddress + + @property + def outpoint(self) -> str: + return f"{self.tx_hash}:{self.tx_pos}" + + +class ElectrumXClient: + def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + self.reader = reader + self.writer = writer + self.request_id = 0 + + @classmethod + async def connect(cls, servers: Iterable[tuple[str, int]]) -> "ElectrumXClient": + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + last_error: Optional[BaseException] = None + for host, port in servers: + try: + print(f"Connecting to {host}:{port} ... ", end="", flush=True) + reader, writer = await asyncio.wait_for( + asyncio.open_connection( + host, + port, + ssl=ctx, + limit=MAX_RPC_RESPONSE_BYTES, + ), + timeout=10, + ) + client = cls(reader, writer) + await client.rpc("server.version", [CLIENT_NAME, "1.4"]) + print("OK") + return client + except Exception as e: + last_error = e + print(f"failed ({e})") + + raise RuntimeError(f"no ElectrumX server reachable: {last_error}") + + async def rpc(self, method: str, params: list): + self.request_id += 1 + payload = {"id": self.request_id, "method": method, "params": params} + self.writer.write((json.dumps(payload) + "\n").encode("ascii")) + await self.writer.drain() + + line = await asyncio.wait_for(self.reader.readline(), timeout=30) + if not line: + raise RuntimeError("ElectrumX connection closed") + response = json.loads(line) + if response.get("error"): + raise RuntimeError(f"ElectrumX error for {method}: {response['error']}") + return response["result"] + + async def close(self) -> None: + self.writer.close() + try: + await self.writer.wait_closed() + except Exception: + pass + + +def decimal_fee_rate(value: str) -> Decimal: + try: + fee_rate = Decimal(value) + except InvalidOperation as e: + raise argparse.ArgumentTypeError("fee rate must be a number") from e + if fee_rate <= 0: + raise argparse.ArgumentTypeError("fee rate must be greater than zero") + return fee_rate + + +def load_servers() -> list[tuple[str, int]]: + servers = [] + for host, data in constants.net.DEFAULT_SERVERS.items(): + ssl_port = data.get("s") + if ssl_port: + servers.append((host, int(ssl_port))) + if not servers: + raise RuntimeError("no SSL ElectrumX servers configured") + return servers + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Create a signed sweep transaction for addresses derived from a BTCP WIF.", + ) + parser.add_argument("wif", nargs="?", help="BTCP WIF private key") + parser.add_argument("destination", nargs="?", help="destination BTCP address") + parser.add_argument( + "utxo_count", + nargs="?", + help="number of UTXOs to spend, or 'all' (default: prompt, Enter = all)", + ) + parser.add_argument( + "--fee-rate", + type=decimal_fee_rate, + default=DEFAULT_FEE_RATE, + help=f"fee rate in sat/vbyte (default: {DEFAULT_FEE_RATE})", + ) + parser.add_argument( + "--script-types", + default=None, + help="comma-separated script types to scan (default: p2wpkh,p2wpkh-p2sh,p2pkh)", + ) + parser.add_argument( + "--broadcast", + action="store_true", + help="broadcast the signed transaction after creating it", + ) + return parser.parse_args() + + +def prompt_missing_args(args: argparse.Namespace) -> argparse.Namespace: + if not args.wif: + args.wif = input("Private key WIF: ").strip() + if not args.destination: + args.destination = input("Destination address: ").strip() + return args + + +def get_script_types(args_value: Optional[str], compressed: bool) -> list[str]: + if args_value: + script_types = [item.strip() for item in args_value.split(",") if item.strip()] + elif compressed: + script_types = ["p2wpkh", "p2wpkh-p2sh", "p2pkh"] + else: + script_types = ["p2pkh"] + + unsupported = sorted(set(script_types) - {"p2wpkh", "p2wpkh-p2sh", "p2pkh"}) + if unsupported: + raise ValueError(f"unsupported script type(s): {', '.join(unsupported)}") + if not compressed and any(t in {"p2wpkh", "p2wpkh-p2sh"} for t in script_types): + raise ValueError("segwit script types require a compressed WIF") + return script_types + + +def derive_addresses(wif: str, script_types_arg: Optional[str]) -> tuple[bytes, list[DerivedAddress]]: + _txin_type, privkey, compressed = deserialize_privkey(wif) + ec_privkey = ECPrivkey(privkey) + + addresses = [] + for script_type in get_script_types(script_types_arg, compressed): + use_compressed = script_type in {"p2wpkh", "p2wpkh-p2sh"} or compressed + pubkey = ec_privkey.get_public_key_bytes(compressed=use_compressed) + address = pubkey_to_address(script_type, pubkey.hex()) + addresses.append(DerivedAddress(script_type=script_type, address=address, pubkey=pubkey)) + + return privkey, addresses + + +async def fetch_utxos(client: ElectrumXClient, derived: DerivedAddress) -> list[SpendableUtxo]: + script_hash = bitcoin.address_to_scripthash(derived.address) + rows = await client.rpc("blockchain.scripthash.listunspent", [script_hash]) + utxos = [] + for row in rows: + utxos.append( + SpendableUtxo( + tx_hash=row["tx_hash"], + tx_pos=int(row["tx_pos"]), + value=int(row["value"]), + height=int(row.get("height", 0)), + derived=derived, + ) + ) + return utxos + + +def sort_utxos(utxos: list[SpendableUtxo]) -> list[SpendableUtxo]: + return sorted( + utxos, + key=lambda u: ( + u.height <= 0, + -u.value, + u.derived.script_type, + u.tx_hash, + u.tx_pos, + ), + ) + + +def print_addresses(addresses: list[DerivedAddress]) -> None: + print("\nDerived addresses:") + for derived in addresses: + print(f" {derived.script_type:<12} {derived.address}") + + +def print_utxos(utxos: list[SpendableUtxo]) -> None: + print("\nSpendable UTXOs:") + print(f"{'#':>4} {'type':<12} {'outpoint':<69} {'sat':>14} height") + print("-" * 112) + for index, utxo in enumerate(utxos[:UTXO_PREVIEW_LIMIT], start=1): + print( + f"{index:>4} {utxo.derived.script_type:<12} " + f"{utxo.outpoint:<69} {utxo.value:>14,} {utxo.height}" + ) + if len(utxos) > UTXO_PREVIEW_LIMIT: + print(f"... {len(utxos) - UTXO_PREVIEW_LIMIT:,} more UTXOs not shown") + print("-" * 112) + print(f"Total: {len(utxos)} UTXO, {sum(u.value for u in utxos):,} sat") + + +def parse_utxo_count(raw_count: Optional[str], max_count: int) -> int: + raw = raw_count + if raw is None: + raw = input(f"How many UTXOs to spend? (1-{max_count}, Enter = all): ").strip() + if raw == "" or raw.lower() == "all": + return max_count + try: + count = int(raw) + except ValueError as e: + raise ValueError("UTXO count must be an integer or 'all'") from e + if not 1 <= count <= max_count: + raise ValueError(f"UTXO count must be between 1 and {max_count}") + return count + + +def make_inputs_and_keypairs( + selected_utxos: list[SpendableUtxo], + privkey: bytes, +) -> tuple[list[PartialTxInput], dict[bytes, bytes]]: + inputs = [] + keypairs = {} + + for utxo in selected_utxos: + desc = get_singlesig_descriptor_from_legacy_leaf( + pubkey=utxo.derived.pubkey.hex(), + script_type=utxo.derived.script_type, + ) + txin = PartialTxInput(prevout=TxOutpoint.from_str(utxo.outpoint)) + txin.script_descriptor = desc + txin.witness_utxo = TxOutput( + value=utxo.value, + scriptpubkey=address_to_script(utxo.derived.address), + ) + txin._trusted_value_sats = utxo.value + txin._trusted_address = utxo.derived.address + txin.block_height = utxo.height + inputs.append(txin) + keypairs[utxo.derived.pubkey] = privkey + + return inputs, keypairs + + +def fee_from_vbytes(vbytes: int, fee_rate: Decimal) -> int: + return int((Decimal(vbytes) * fee_rate).to_integral_value(rounding=ROUND_CEILING)) + + +def build_signed_transaction( + *, + selected_utxos: list[SpendableUtxo], + privkey: bytes, + destination: str, + fee_rate: Decimal, +) -> tuple[PartialTransaction, str, int, int]: + total_in = sum(utxo.value for utxo in selected_utxos) + fee = 0 + + for _attempt in range(5): + inputs, keypairs = make_inputs_and_keypairs(selected_utxos, privkey) + output_value = total_in - fee + output = PartialTxOutput.from_address_and_value(destination, output_value) + tx = PartialTransaction.from_io(inputs, [output], locktime=0) + + estimated_vbytes = tx.estimated_size() + next_fee = fee_from_vbytes(estimated_vbytes, fee_rate) + if next_fee != fee: + fee = next_fee + if total_in - fee < dust_threshold(): + raise ValueError( + f"not enough funds: total={total_in} sat, fee={fee} sat, " + f"dust={dust_threshold()} sat" + ) + continue + + tx.sign(keypairs) + if not tx.is_complete(): + raise RuntimeError("transaction is incomplete after signing") + raw = tx.serialize() + actual_vbytes = tx.estimated_size() + return tx, raw, fee, actual_vbytes + + raise RuntimeError("fee calculation did not converge") + + +async def broadcast(raw_tx: str, servers: list[tuple[str, int]]) -> str: + last_error: Optional[BaseException] = None + for host, port in servers: + client = None + try: + client = await ElectrumXClient.connect([(host, port)]) + txid = await client.rpc("blockchain.transaction.broadcast", [raw_tx]) + return txid + except Exception as e: + last_error = e + print(f"Broadcast via {host}:{port} failed: {e}") + finally: + if client: + await client.close() + raise RuntimeError(f"broadcast failed on all servers: {last_error}") + + +async def main() -> int: + args = prompt_missing_args(parse_args()) + + if not is_address(args.destination): + raise ValueError("destination is not a valid BitcoinPurple address") + + privkey, addresses = derive_addresses(args.wif, args.script_types) + servers = load_servers() + + print("\nBitcoinPurple WIF sweep") + print(f"Fee rate: {args.fee_rate} sat/vbyte") + print_addresses(addresses) + + client = await ElectrumXClient.connect(servers) + try: + utxos: list[SpendableUtxo] = [] + for derived in addresses: + found = await fetch_utxos(client, derived) + print(f"Found {len(found)} UTXO for {derived.script_type} {derived.address}") + utxos.extend(found) + finally: + await client.close() + + utxos = sort_utxos(utxos) + if not utxos: + print("\nNo spendable UTXOs found for the derived addresses.") + return 1 + + print_utxos(utxos) + count = parse_utxo_count(args.utxo_count, len(utxos)) + selected = utxos[:count] + total_in = sum(utxo.value for utxo in selected) + + tx, raw, fee, actual_vbytes = build_signed_transaction( + selected_utxos=selected, + privkey=privkey, + destination=args.destination, + fee_rate=args.fee_rate, + ) + + print("\nTransaction created") + print(f"Inputs : {len(selected)}") + print(f"Total : {total_in:,} sat") + print(f"Fee : {fee:,} sat ({actual_vbytes} vbytes)") + print(f"Output : {total_in - fee:,} sat") + print(f"TxID : {tx.txid()}") + print(f"\nRaw TX:\n{raw}") + + should_broadcast = args.broadcast + if not should_broadcast: + answer = input("\nBroadcast transaction? Type 'yes' to send, or Enter/no to keep raw tx only: ").strip().lower() + should_broadcast = answer == "yes" + + if should_broadcast: + print("\nBroadcasting...") + txid = await broadcast(raw, servers) + print(f"Broadcast OK: {txid}") + else: + print("\nBroadcast skipped. Raw transaction only.") + + return 0 + + +if __name__ == "__main__": + try: + raise SystemExit(asyncio.run(main())) + except KeyboardInterrupt: + raise SystemExit(130) + except Exception as e: + print(f"ERROR: {e}", file=sys.stderr) + raise SystemExit(1)