#!/usr/bin/env python # # Electrum - lightweight Bitcoin client # Copyright (C) 2011 Thomas Voegtlin # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation files # (the "Software"), to deal in the Software without restriction, # including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, # and to permit persons to whom the Software is furnished to do so, # subject to the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # Note: The deserialization code originally comes from ABE. import struct import io import base64 from typing import ( Sequence, Union, NamedTuple, Tuple, Optional, Iterable, Callable, List, Dict, Set, TYPE_CHECKING, Mapping, Any ) from collections import defaultdict from enum import IntEnum import itertools import binascii import copy import electrum_ecc as ecc from electrum_ecc.util import bip340_tagged_hash from . import bitcoin, bip32 from .bip32 import BIP32Node from .util import to_bytes, bfh, chunks, is_hex_str, parse_max_spend from .bitcoin import ( TYPE_ADDRESS, TYPE_SCRIPT, hash_160, hash160_to_p2sh, hash160_to_p2pkh, hash_to_segwit_addr, var_int, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC, COIN, opcodes, base_decode, base_encode, construct_witness, construct_script, taproot_tweak_seckey ) from .crypto import sha256d, sha256 from .logging import get_logger from .util import ShortID, OldTaskGroup from .descriptor import Descriptor, MissingSolutionPiece, create_dummy_descriptor_from_address if TYPE_CHECKING: from .wallet import Abstract_Wallet from .network import Network from .simple_config import SimpleConfig _logger = get_logger(__name__) DEBUG_PSBT_PARSING = False _NEEDS_RECALC = ... # sentinel value class SerializationError(Exception): """ Thrown when there's a problem deserializing or serializing """ class UnknownTxinType(Exception): pass class BadHeaderMagic(SerializationError): pass class UnexpectedEndOfStream(SerializationError): pass class PSBTInputConsistencyFailure(SerializationError): pass class MalformedBitcoinScript(Exception): pass class MissingTxInputAmount(Exception): pass class TxinDataFetchProgress(NamedTuple): num_tasks_done: int num_tasks_total: int has_errored: bool has_finished: bool class Sighash(IntEnum): # note: this is not an IntFlag, as ALL|NONE != SINGLE DEFAULT = 0 # taproot only (bip-0341) ALL = 1 NONE = 2 SINGLE = 3 ANYONECANPAY = 0x80 @classmethod def is_valid(cls, sighash: int, *, is_taproot: bool = False) -> bool: valid_flags = { 0x01, 0x02, 0x03, 0x81, 0x82, 0x83, } if is_taproot: valid_flags.add(0x00) return sighash in valid_flags @classmethod def to_sigbytes(cls, sighash: int) -> bytes: if sighash == Sighash.DEFAULT: return b"" return sighash.to_bytes(length=1, byteorder="big") class TxOutput: scriptpubkey: bytes value: Union[int, str] def __init__(self, *, scriptpubkey: bytes, value: Union[int, str]): self.scriptpubkey = scriptpubkey if not (isinstance(value, int) or parse_max_spend(value) is not None): raise ValueError(f"bad txout value: {value!r}") self.value = value # int in satoshis; or spend-max-like str @classmethod def from_address_and_value(cls, address: str, value: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']: return cls(scriptpubkey=bitcoin.address_to_script(address), value=value) def serialize_to_network(self) -> bytes: buf = int.to_bytes(self.value, 8, byteorder="little", signed=False) script = self.scriptpubkey buf += var_int(len(script)) buf += script return buf @classmethod def from_network_bytes(cls, raw: bytes) -> 'TxOutput': vds = BCDataStream() vds.write(raw) txout = parse_output(vds) if vds.can_read_more(): raise SerializationError('extra junk at the end of TxOutput bytes') return txout def to_legacy_tuple(self) -> Tuple[int, str, Union[int, str]]: if self.address: return TYPE_ADDRESS, self.address, self.value return TYPE_SCRIPT, self.scriptpubkey.hex(), self.value @classmethod def from_legacy_tuple(cls, _type: int, addr: str, val: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']: if _type == TYPE_ADDRESS: return cls.from_address_and_value(addr, val) if _type == TYPE_SCRIPT: return cls(scriptpubkey=bfh(addr), value=val) raise Exception(f"unexpected legacy address type: {_type}") @property def scriptpubkey(self) -> bytes: return self._scriptpubkey @scriptpubkey.setter def scriptpubkey(self, scriptpubkey: bytes): self._scriptpubkey = scriptpubkey self._address = _NEEDS_RECALC @property def address(self) -> Optional[str]: if self._address is _NEEDS_RECALC: self._address = get_address_from_output_script(self._scriptpubkey) return self._address def get_ui_address_str(self) -> str: addr = self.address if addr is not None: return addr return f"SCRIPT {self.scriptpubkey.hex()}" def __repr__(self): return f"" def __eq__(self, other): if not isinstance(other, TxOutput): return False return self.scriptpubkey == other.scriptpubkey and self.value == other.value def __ne__(self, other): return not (self == other) def __hash__(self) -> int: return hash((self.scriptpubkey, self.value)) def to_json(self): d = { 'scriptpubkey': self.scriptpubkey.hex(), 'address': self.address, 'value_sats': self.value, } return d class BIP143SharedTxDigestFields(NamedTuple): # witness v0 hashPrevouts: bytes hashSequence: bytes hashOutputs: bytes @classmethod def from_tx(cls, tx: 'Transaction') -> 'BIP143SharedTxDigestFields': inputs = tx.inputs() outputs = tx.outputs() hashPrevouts = sha256d(b''.join(txin.prevout.serialize_to_network() for txin in inputs)) hashSequence = sha256d(b''.join( int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False) for txin in inputs)) hashOutputs = sha256d(b''.join(o.serialize_to_network() for o in outputs)) return BIP143SharedTxDigestFields( hashPrevouts=hashPrevouts, hashSequence=hashSequence, hashOutputs=hashOutputs, ) class BIP341SharedTxDigestFields(NamedTuple): # witness v1 sha_prevouts: bytes sha_amounts: bytes sha_scriptpubkeys: bytes sha_sequences: bytes sha_outputs: bytes @classmethod def from_tx(cls, tx: 'Transaction') -> 'BIP341SharedTxDigestFields': inputs = tx.inputs() outputs = tx.outputs() sha_prevouts = sha256(b''.join(txin.prevout.serialize_to_network() for txin in inputs)) sha_amounts = sha256(b''.join( int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False) for txin in inputs)) sha_scriptpubkeys = sha256(b''.join( var_int(len(txin.scriptpubkey)) + txin.scriptpubkey for txin in inputs)) sha_sequences = sha256(b''.join( int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False) for txin in inputs)) sha_outputs = sha256(b''.join(o.serialize_to_network() for o in outputs)) return BIP341SharedTxDigestFields( sha_prevouts=sha_prevouts, sha_amounts=sha_amounts, sha_scriptpubkeys=sha_scriptpubkeys, sha_sequences=sha_sequences, sha_outputs=sha_outputs, ) class SighashCache: def __init__(self): self._witver0 = None # type: Optional[BIP143SharedTxDigestFields] self._witver1 = None # type: Optional[BIP341SharedTxDigestFields] def get_witver0_data_for_tx(self, tx: 'Transaction') -> BIP143SharedTxDigestFields: if self._witver0 is None: self._witver0 = BIP143SharedTxDigestFields.from_tx(tx) return self._witver0 def get_witver1_data_for_tx(self, tx: 'Transaction') -> BIP341SharedTxDigestFields: if self._witver1 is None: self._witver1 = BIP341SharedTxDigestFields.from_tx(tx) return self._witver1 class TxOutpoint(NamedTuple): txid: bytes # endianness same as hex string displayed; reverse of tx serialization order out_idx: int @classmethod def from_str(cls, s: str) -> 'TxOutpoint': hash_str, idx_str = s.split(':') assert len(hash_str) == 64, f"{hash_str} should be a sha256 hash" return TxOutpoint(txid=bfh(hash_str), out_idx=int(idx_str)) def __str__(self) -> str: return f"""TxOutpoint("{self.to_str()}")""" def __repr__(self): return f"<{str(self)}>" def to_str(self) -> str: return f"{self.txid.hex()}:{self.out_idx}" def to_json(self): return [self.txid.hex(), self.out_idx] def serialize_to_network(self) -> bytes: return self.txid[::-1] + int.to_bytes(self.out_idx, length=4, byteorder="little", signed=False) def is_coinbase(self) -> bool: return self.txid == bytes(32) def short_name(self): return f"{self.txid.hex()[0:10]}:{self.out_idx}" class TxInput: prevout: TxOutpoint script_sig: Optional[bytes] nsequence: int witness: Optional[bytes] _is_coinbase_output: bool def __init__(self, *, prevout: TxOutpoint, script_sig: bytes = None, nsequence: int = 0xffffffff - 1, witness: bytes = None, is_coinbase_output: bool = False): self.prevout = prevout self.script_sig = script_sig self.nsequence = nsequence self.witness = witness self._is_coinbase_output = is_coinbase_output # blockchain fields self.block_height = None # type: Optional[int] # height at which the TXO is mined; None means unknown. not SPV-ed. self.block_txpos = None # type: Optional[int] # position of tx in block, if TXO is mined; otherwise None or -1 self.spent_height = None # type: Optional[int] # height at which the TXO got spent self.spent_txid = None # type: Optional[str] # txid of the spender self._utxo = None # type: Optional[Transaction] self.__scriptpubkey = None # type: Optional[bytes] self.__address = None # type: Optional[str] self.__value_sats = None # type: Optional[int] self._is_taproot = None # type: Optional[bool] # None means unknown def get_time_based_relative_locktime(self) -> Optional[int]: # see bip 68 if self.nsequence & (1<<31): return if self.nsequence & (1<<22): return self.nsequence & 0xffff def get_block_based_relative_locktime(self) -> Optional[int]: if self.nsequence & (1<<31): return if not self.nsequence & (1<<22): return self.nsequence & 0xffff @property def short_id(self): if self.block_txpos is not None and self.block_txpos >= 0: return ShortID.from_components(self.block_height, self.block_txpos, self.prevout.out_idx) else: return self.prevout.short_name() @property def utxo(self): return self._utxo @utxo.setter def utxo(self, tx: Optional['Transaction']): if tx is None: return # note that tx might be a PartialTransaction # serialize and de-serialize tx now. this might e.g. convert a complete PartialTx to a Tx tx = tx_from_any(str(tx)) # 'utxo' field should not be a PSBT: if not tx.is_complete(): return self.validate_data(utxo=tx) self._utxo = tx # update derived fields out_idx = self.prevout.out_idx self.__scriptpubkey = self._utxo.outputs()[out_idx].scriptpubkey self.__address = _NEEDS_RECALC self.__value_sats = self._utxo.outputs()[out_idx].value def validate_data(self, *, utxo: Optional['Transaction'] = None, **kwargs) -> None: utxo = utxo or self.utxo if utxo: if self.prevout.txid.hex() != utxo.txid(): raise PSBTInputConsistencyFailure(f"PSBT input validation: " f"If a non-witness UTXO is provided, its hash must match the hash specified in the prevout") def is_coinbase_input(self) -> bool: """Whether this is the input of a coinbase tx.""" return self.prevout.is_coinbase() def is_coinbase_output(self) -> bool: """Whether the coin being spent is an output of a coinbase tx. This matters for coin maturity (and pretty much only for that!). """ return self._is_coinbase_output def value_sats(self) -> Optional[int]: return self.__value_sats @property def address(self) -> Optional[str]: if self.__address is _NEEDS_RECALC: self.__address = get_address_from_output_script(self.__scriptpubkey) return self.__address @property def scriptpubkey(self) -> Optional[bytes]: return self.__scriptpubkey def to_json(self): d = { 'prevout_hash': self.prevout.txid.hex(), 'prevout_n': self.prevout.out_idx, 'coinbase': self.is_coinbase_output(), 'nsequence': self.nsequence, } if self.script_sig is not None: d['scriptSig'] = self.script_sig.hex() if self.witness is not None: d['witness'] = [x.hex() for x in self.witness_elements()] return d def serialize_to_network(self, *, script_sig: bytes = None) -> bytes: if script_sig is None: script_sig = self.script_sig # Prev hash and index s = self.prevout.serialize_to_network() # Script length, script, sequence s += var_int(len(script_sig)) s += script_sig s += int.to_bytes(self.nsequence, length=4, byteorder="little", signed=False) return s def witness_elements(self) -> Sequence[bytes]: if not self.witness: return [] vds = BCDataStream() vds.write(self.witness) n = vds.read_compact_size() return list(vds.read_bytes(vds.read_compact_size()) for i in range(n)) def is_segwit(self, *, guess_for_address=False) -> bool: if self.witness not in (b'\x00', b'', None): return True return False def is_taproot(self) -> Optional[bool]: if self._is_taproot is None: if self.address: self._is_taproot = bitcoin.is_taproot_address(self.address) return self._is_taproot async def add_info_from_network( self, network: Optional['Network'], *, ignore_network_issues: bool = True, timeout=None, ) -> bool: """Returns True iff successful.""" from .network import NetworkException async def fetch_from_network(txid) -> Optional[Transaction]: tx = None if network and network.has_internet_connection(): try: raw_tx = await network.get_transaction(txid, timeout=timeout) except NetworkException as e: _logger.info(f'got network error getting input txn. err: {repr(e)}. txid: {txid}. ' f'if you are intentionally offline, consider using the --offline flag') if not ignore_network_issues: raise e else: tx = Transaction(raw_tx) if not tx and not ignore_network_issues: raise NetworkException('failed to get prev tx from network') return tx if self.utxo is None: self.utxo = await fetch_from_network(txid=self.prevout.txid.hex()) return self.utxo is not None def get_scriptcode_for_sighash(self) -> bytes: """Reconstructs the scriptcode part of the preimage for OP_CHECKSIG, for an already complete txin, in order to *verify the signature*. """ scriptpubkey = self.scriptpubkey if scriptpubkey is None: raise Exception("missing scriptpubkey. 'utxo' not set?") script_type = get_script_type_from_output_script(scriptpubkey) if script_type == "p2wsh": wit_elems = self.witness_elements() if not wit_elems: raise Exception(f"missing witness for {script_type=}") witness_script = wit_elems[-1] if self.address != bitcoin.script_to_p2wsh(witness_script): raise Exception("witness_script from witness does not match address") if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(witness_script)]: raise Exception('OP_CODESEPARATOR black magic is not supported') return witness_script elif script_type == "p2wpkh": pkh = scriptpubkey[-20:] assert len(pkh) == 20 p2pkh_script = bitcoin.pubkeyhash_to_p2pkh_script(pkh) return p2pkh_script elif script_type == "p2sh": if not self.script_sig: raise Exception(f"missing script_sig for {script_type=}") parsed_ss = list(script_GetOp(self.script_sig)) redeem_script = parsed_ss[-1][1] if self.address != bitcoin.hash160_to_p2sh(hash_160(redeem_script)): raise Exception("redeem_script from script_sig does not match address") if self.is_segwit(): # p2sh-wrapped-segwit inner_script_type = get_script_type_from_output_script(redeem_script) if inner_script_type == "p2wsh": wit_elems = self.witness_elements() witness_script = wit_elems[-1] if redeem_script != bitcoin.p2wsh_nested_script(witness_script): raise Exception("witness_script from witness does not match redeem_script") if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(witness_script)]: raise Exception('OP_CODESEPARATOR black magic is not supported') return witness_script elif inner_script_type == "p2wpkh": pkh = self.script_sig[-20:] assert len(pkh) == 20 p2pkh_script = bitcoin.pubkeyhash_to_p2pkh_script(pkh) return p2pkh_script else: raise Exception(f"unexpected {inner_script_type=} wrapped in p2sh. script_sig={self.script_sig.hex()}") else: if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(redeem_script)]: raise Exception('OP_CODESEPARATOR black magic is not supported') return redeem_script elif script_type in ("p2pkh", "p2pk"): # For "raw scriptpubkey" case, it is usually the scriptPubKey that is signed. # Complications for the general case: # - signatures should be removed ("FindAndDelete") # - OP_CODESEPARATOR black magic return scriptpubkey else: raise Exception(f"cannot handle {script_type=} ({scriptpubkey.hex()=})") raise Exception("should not get here") class BCDataStream(object): """Workalike python implementation of Bitcoin's CDataStream class.""" def __init__(self): self.input = None # type: Optional[bytearray] self.read_cursor = 0 def clear(self): self.input = None self.read_cursor = 0 def write(self, _bytes: Union[bytes, bytearray]): # Initialize with string of _bytes assert isinstance(_bytes, (bytes, bytearray)) if self.input is None: self.input = bytearray(_bytes) else: self.input += bytearray(_bytes) def read_string(self, encoding='ascii'): # Strings are encoded depending on length: # 0 to 252 : 1-byte-length followed by bytes (if any) # 253 to 65,535 : byte'253' 2-byte-length followed by bytes # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes # ... and the Bitcoin client is coded to understand: # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string # ... but I don't think it actually handles any strings that big. if self.input is None: raise SerializationError("call write(bytes) before trying to deserialize") length = self.read_compact_size() return self.read_bytes(length).decode(encoding) def write_string(self, string, encoding='ascii'): string = to_bytes(string, encoding) # Length-encoded as with read-string self.write_compact_size(len(string)) self.write(string) def read_bytes(self, length: int) -> bytes: if self.input is None: raise SerializationError("call write(bytes) before trying to deserialize") assert length >= 0 input_len = len(self.input) read_begin = self.read_cursor read_end = read_begin + length if 0 <= read_begin <= read_end <= input_len: result = self.input[read_begin:read_end] # type: bytearray self.read_cursor += length return bytes(result) else: raise SerializationError('attempt to read past end of buffer') def write_bytes(self, _bytes: Union[bytes, bytearray], length: int): assert len(_bytes) == length, len(_bytes) self.write(_bytes) def can_read_more(self) -> bool: if not self.input: return False return self.read_cursor < len(self.input) def read_boolean(self) -> bool: return self.read_bytes(1) != b'\x00' def read_int16(self): return self._read_num(' len(_bytes): raise MalformedBitcoinScript( f"Push of data element that is larger than remaining data: {nSize} vs {len(_bytes) - i}") vch = _bytes[i:i + nSize] i += nSize yield opcode, vch, i class OPPushDataGeneric: def __init__(self, pushlen: Callable[[int], bool] | None = None): if pushlen is not None: self.check_data_len = pushlen @classmethod def check_data_len(cls, datalen: int) -> bool: # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent. return opcodes.OP_PUSHDATA4 >= datalen >= 0 @classmethod def is_instance(cls, item): # accept objects that are instances of this class # or other classes that are subclasses return isinstance(item, cls) \ or (isinstance(item, type) and issubclass(item, cls)) class OPGeneric: def __init__(self, matcher: Callable[[Any], bool] | None = None): if matcher is not None: self.matcher = matcher def match(self, op) -> bool: return self.matcher(op) @classmethod def is_instance(cls, item) -> bool: # accept objects that are instances of this class # or other classes that are subclasses return isinstance(item, cls) \ or (isinstance(item, type) and issubclass(item, cls)) OPPushDataPubkey = OPPushDataGeneric(lambda x: x in (33, 65)) OP_ANYSEGWIT_VERSION = OPGeneric(lambda x: x in list(range(opcodes.OP_1, opcodes.OP_16 + 1))) SCRIPTPUBKEY_TEMPLATE_P2PK = [OPPushDataGeneric(lambda x: x in (33, 65)), opcodes.OP_CHECKSIG] SCRIPTPUBKEY_TEMPLATE_P2PKH = [opcodes.OP_DUP, opcodes.OP_HASH160, OPPushDataGeneric(lambda x: x == 20), opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG] SCRIPTPUBKEY_TEMPLATE_P2SH = [opcodes.OP_HASH160, OPPushDataGeneric(lambda x: x == 20), opcodes.OP_EQUAL] SCRIPTPUBKEY_TEMPLATE_WITNESS_V0 = [opcodes.OP_0, OPPushDataGeneric(lambda x: x in (20, 32))] SCRIPTPUBKEY_TEMPLATE_P2WPKH = [opcodes.OP_0, OPPushDataGeneric(lambda x: x == 20)] SCRIPTPUBKEY_TEMPLATE_P2WSH = [opcodes.OP_0, OPPushDataGeneric(lambda x: x == 32)] SCRIPTPUBKEY_TEMPLATE_P2TR = [opcodes.OP_1, OPPushDataGeneric(lambda x: x == 32)] SCRIPTPUBKEY_TEMPLATE_ANYSEGWIT = [OP_ANYSEGWIT_VERSION, OPPushDataGeneric(lambda x: x in list(range(2, 40 + 1)))] def check_scriptpubkey_template_and_dust(scriptpubkey, amount: Optional[int]): if match_script_against_template(scriptpubkey, SCRIPTPUBKEY_TEMPLATE_P2PKH): dust_limit = bitcoin.DUST_LIMIT_P2PKH elif match_script_against_template(scriptpubkey, SCRIPTPUBKEY_TEMPLATE_P2SH): dust_limit = bitcoin.DUST_LIMIT_P2SH elif match_script_against_template(scriptpubkey, SCRIPTPUBKEY_TEMPLATE_P2WSH): dust_limit = bitcoin.DUST_LIMIT_P2WSH elif match_script_against_template(scriptpubkey, SCRIPTPUBKEY_TEMPLATE_P2WPKH): dust_limit = bitcoin.DUST_LIMIT_P2WPKH elif match_script_against_template(scriptpubkey, SCRIPTPUBKEY_TEMPLATE_ANYSEGWIT): dust_limit = bitcoin.DUST_LIMIT_UNKNOWN_SEGWIT else: raise Exception(f'scriptpubkey does not conform to any template: {scriptpubkey.hex()}') if amount < dust_limit: raise Exception(f'amount ({amount}) is below dust limit for scriptpubkey type ({dust_limit})') def merge_duplicate_tx_outputs(outputs: Iterable['PartialTxOutput']) -> List['PartialTxOutput']: """Merges outputs that are paying to the same address by replacing them with a single larger output.""" output_dict = {} for output in outputs: assert isinstance(output.value, int), "tx outputs with spend-max-like str cannot be merged" if output.scriptpubkey in output_dict: output_dict[output.scriptpubkey].value += output.value else: output_dict[output.scriptpubkey] = copy.copy(output) return list(output_dict.values()) def match_script_against_template(script, template, debug=False) -> bool: """Returns whether 'script' matches 'template'.""" if script is None: return False # optionally decode script now: if isinstance(script, (bytes, bytearray)): try: script = [x for x in script_GetOp(script)] except MalformedBitcoinScript: if debug: _logger.debug(f"malformed script") return False if debug: _logger.debug(f"match script against template: {script}") if len(script) != len(template): if debug: _logger.debug(f"length mismatch {len(script)} != {len(template)}") return False for i in range(len(script)): template_item = template[i] script_item = script[i] if OPPushDataGeneric.is_instance(template_item) and template_item.check_data_len(script_item[0]): continue if OPGeneric.is_instance(template_item) and template_item.match(script_item[0]): continue if template_item != script_item[0]: if debug: _logger.debug(f"item mismatch at position {i}: {template_item} != {script_item[0]}") return False return True def get_script_type_from_output_script(scriptpubkey: bytes) -> Optional[str]: if scriptpubkey is None: return None try: decoded = [x for x in script_GetOp(scriptpubkey)] except MalformedBitcoinScript: return None if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2PK): return 'p2pk' if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2PKH): return 'p2pkh' if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2SH): return 'p2sh' if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2WPKH): return 'p2wpkh' if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2WSH): return 'p2wsh' if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2TR): return 'p2tr' return None def get_address_from_output_script(_bytes: bytes, *, net=None) -> Optional[str]: try: decoded = [x for x in script_GetOp(_bytes)] except MalformedBitcoinScript: return None # p2pkh if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2PKH): return hash160_to_p2pkh(decoded[2][1], net=net) # p2sh if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2SH): return hash160_to_p2sh(decoded[1][1], net=net) # segwit address (version 0) if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0): return hash_to_segwit_addr(decoded[1][1], witver=0, net=net) # segwit address (version 1-16) future_witness_versions = list(range(opcodes.OP_1, opcodes.OP_16 + 1)) for witver, opcode in enumerate(future_witness_versions, start=1): match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)] if match_script_against_template(decoded, match): return hash_to_segwit_addr(decoded[1][1], witver=witver, net=net) return None def parse_input(vds: BCDataStream) -> TxInput: prevout_hash = vds.read_bytes(32)[::-1] prevout_n = vds.read_uint32() prevout = TxOutpoint(txid=prevout_hash, out_idx=prevout_n) script_sig = vds.read_bytes(vds.read_compact_size()) nsequence = vds.read_uint32() return TxInput(prevout=prevout, script_sig=script_sig, nsequence=nsequence) def parse_witness(vds: BCDataStream, txin: TxInput) -> None: n = vds.read_compact_size() witness_elements = list(vds.read_bytes(vds.read_compact_size()) for i in range(n)) txin.witness = construct_witness(witness_elements) def parse_output(vds: BCDataStream) -> TxOutput: value = vds.read_int64() if value > TOTAL_COIN_SUPPLY_LIMIT_IN_BTC * COIN: raise SerializationError('invalid output amount (too large)') if value < 0: raise SerializationError('invalid output amount (negative)') scriptpubkey = vds.read_bytes(vds.read_compact_size()) return TxOutput(value=value, scriptpubkey=scriptpubkey) # pay & redeem scripts def multisig_script(public_keys: Sequence[str], m: int) -> bytes: n = len(public_keys) assert 1 <= m <= n <= 15, f'm {m}, n {n}' return construct_script([m, *public_keys, n, opcodes.OP_CHECKMULTISIG]) class Transaction: _cached_network_ser: Optional[str] def __str__(self): return self.serialize() def __init__(self, raw): if raw is None: self._cached_network_ser = None elif isinstance(raw, str): self._cached_network_ser = raw.strip() if raw else None assert is_hex_str(self._cached_network_ser) elif isinstance(raw, (bytes, bytearray)): self._cached_network_ser = raw.hex() else: raise Exception(f"cannot initialize transaction from {raw}") self._inputs = None # type: List[TxInput] self._outputs = None # type: List[TxOutput] self._locktime = 0 self._version = 2 self._cached_txid = None # type: Optional[str] @property def locktime(self): self.deserialize() return self._locktime @locktime.setter def locktime(self, value: int): assert isinstance(value, int), f"locktime must be int, not {value!r}" self._locktime = value self.invalidate_ser_cache() @property def version(self): self.deserialize() return self._version @version.setter def version(self, value): self._version = value self.invalidate_ser_cache() def to_json(self) -> dict: d = { 'version': self.version, 'locktime': self.locktime, 'inputs': [txin.to_json() for txin in self.inputs()], 'outputs': [txout.to_json() for txout in self.outputs()], } return d def inputs(self) -> Sequence[TxInput]: if self._inputs is None: self.deserialize() return self._inputs def outputs(self) -> Sequence[TxOutput]: if self._outputs is None: self.deserialize() return self._outputs def deserialize(self) -> None: if self._cached_network_ser is None: return if self._inputs is not None: return raw_bytes = bfh(self._cached_network_ser) vds = BCDataStream() vds.write(raw_bytes) self._version = vds.read_int32() n_vin = vds.read_compact_size() is_segwit = (n_vin == 0) if is_segwit: marker = vds.read_bytes(1) if marker != b'\x01': raise SerializationError('invalid txn marker byte: {}'.format(marker)) n_vin = vds.read_compact_size() if n_vin < 1: raise SerializationError('tx needs to have at least 1 input') txins = [parse_input(vds) for i in range(n_vin)] n_vout = vds.read_compact_size() if n_vout < 1: raise SerializationError('tx needs to have at least 1 output') self._outputs = [parse_output(vds) for i in range(n_vout)] if is_segwit: for txin in txins: parse_witness(vds, txin) self._inputs = txins # only expose field after witness is parsed, for sanity self._locktime = vds.read_uint32() if vds.can_read_more(): raise SerializationError('extra junk at the end') @classmethod def serialize_witness(cls, txin: TxInput, *, estimate_size=False) -> bytes: if txin.witness is not None: return txin.witness if txin.is_coinbase_input(): return b"" assert isinstance(txin, PartialTxInput) if not txin.is_segwit(): return construct_witness([]) if estimate_size and hasattr(txin, 'make_witness'): sig_dummy = b'\x00' * 71 # DER-encoded ECDSA sig, with low S and low R txin.witness_sizehint = len(txin.make_witness(sig_dummy)) if estimate_size and txin.witness_sizehint is not None: return bytes(txin.witness_sizehint) dummy_desc = None if estimate_size: dummy_desc = create_dummy_descriptor_from_address(txin.address) if desc := (txin.script_descriptor or dummy_desc): sol = desc.satisfy(allow_dummy=estimate_size, sigdata=txin.sigs_ecdsa) if sol.witness is not None: return sol.witness return construct_witness([]) raise UnknownTxinType("cannot construct witness") @classmethod def input_script(cls, txin: TxInput, *, estimate_size=False) -> bytes: if txin.script_sig is not None: return txin.script_sig if txin.is_coinbase_input(): return b"" assert isinstance(txin, PartialTxInput) if txin.is_p2sh_segwit() and txin.redeem_script: return construct_script([txin.redeem_script]) if txin.is_native_segwit(): return b"" dummy_desc = None if estimate_size: dummy_desc = create_dummy_descriptor_from_address(txin.address) if desc := (txin.script_descriptor or dummy_desc): if desc.is_segwit(): if redeem_script := desc.expand().redeem_script: return construct_script([redeem_script]) return b"" sol = desc.satisfy(allow_dummy=estimate_size, sigdata=txin.sigs_ecdsa) if sol.script_sig is not None: return sol.script_sig return b"" raise UnknownTxinType("cannot construct scriptSig") def serialize_preimage( self, txin_index: int, *, sighash: Optional[int] = None, sighash_cache: SighashCache = None, ) -> bytes: nVersion = int.to_bytes(self.version, length=4, byteorder="little", signed=True) nLocktime = int.to_bytes(self.locktime, length=4, byteorder="little", signed=False) inputs = self.inputs() outputs = self.outputs() txin = inputs[txin_index] if isinstance(txin, PartialTxInput): if sighash is None: sighash = txin.sighash if sighash is None: sighash = Sighash.DEFAULT if txin.is_taproot() else Sighash.ALL assert sighash is not None if not Sighash.is_valid(sighash, is_taproot=txin.is_taproot()): raise Exception(f"SIGHASH_FLAG ({sighash}) not supported!") if sighash_cache is None: sighash_cache = SighashCache() if txin.is_segwit(): if txin.is_taproot(): scache = sighash_cache.get_witver1_data_for_tx(self) sighash_epoch = b"\x00" hash_type = int.to_bytes(sighash, length=1, byteorder="little", signed=False) # txdata preimage_txdata = bytearray() preimage_txdata += nVersion preimage_txdata += nLocktime if sighash & 0x80 != Sighash.ANYONECANPAY: preimage_txdata += scache.sha_prevouts preimage_txdata += scache.sha_amounts preimage_txdata += scache.sha_scriptpubkeys preimage_txdata += scache.sha_sequences if sighash & 3 not in (Sighash.NONE, Sighash.SINGLE): preimage_txdata += scache.sha_outputs # inputdata preimage_inputdata = bytearray() spend_type = bytes([0]) # (ext_flag * 2) + annex_present preimage_inputdata += spend_type if sighash & 0x80 == Sighash.ANYONECANPAY: preimage_inputdata += txin.prevout.serialize_to_network() preimage_inputdata += int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False) preimage_inputdata += var_int(len(txin.scriptpubkey)) + txin.scriptpubkey preimage_inputdata += int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False) else: preimage_inputdata += int.to_bytes(txin_index, length=4, byteorder="little", signed=False) # TODO sha_annex # outputdata preimage_outputdata = bytearray() if sighash & 3 == Sighash.SINGLE: try: txout = outputs[txin_index] except IndexError: raise Exception("Using SIGHASH_SINGLE without a corresponding output") from None # note: we could cache this to avoid some potential DOS vectors: preimage_outputdata += sha256(txout.serialize_to_network()) return bytes(sighash_epoch + hash_type + preimage_txdata + preimage_inputdata + preimage_outputdata) else: # segwit (witness v0) scache = sighash_cache.get_witver0_data_for_tx(self) if not (sighash & Sighash.ANYONECANPAY): hashPrevouts = scache.hashPrevouts else: hashPrevouts = bytes(32) if not (sighash & Sighash.ANYONECANPAY) and (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE: hashSequence = scache.hashSequence else: hashSequence = bytes(32) if (sighash & 0x1f) != Sighash.SINGLE and (sighash & 0x1f) != Sighash.NONE: hashOutputs = scache.hashOutputs elif (sighash & 0x1f) == Sighash.SINGLE and txin_index < len(outputs): # note: we could cache this to avoid some potential DOS vectors: hashOutputs = sha256d(outputs[txin_index].serialize_to_network()) else: hashOutputs = bytes(32) outpoint = txin.prevout.serialize_to_network() preimage_script = txin.get_scriptcode_for_sighash() scriptCode = var_int(len(preimage_script)) + preimage_script amount = int.to_bytes(txin.value_sats(), length=8, byteorder="little", signed=False) nSequence = int.to_bytes(txin.nsequence, length=4, byteorder="little", signed=False) nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False) preimage = nVersion + hashPrevouts + hashSequence + outpoint + scriptCode + amount + nSequence + hashOutputs + nLocktime + nHashType return preimage else: # legacy sighash (pre-segwit) if sighash != Sighash.ALL: raise Exception(f"SIGHASH_FLAG ({sighash}) not supported! (for legacy sighash)") preimage_script = txin.get_scriptcode_for_sighash() txins = var_int(len(inputs)) + b"".join( txin.serialize_to_network(script_sig=preimage_script if txin_index==k else b"") for k, txin in enumerate(inputs)) txouts = var_int(len(outputs)) + b"".join(o.serialize_to_network() for o in outputs) nHashType = int.to_bytes(sighash, length=4, byteorder="little", signed=False) preimage = nVersion + txins + txouts + nLocktime + nHashType return preimage raise Exception("should not reach this") def verify_sig_for_txin( self, *, txin_index: int, pubkey_bytes: bytes, sig: bytes, sighash_cache: SighashCache = None, ) -> bool: txin = self.inputs()[txin_index] if txin.is_taproot(): raise Exception("not implemented") # TODO else: der_sig, sighash = sig[:-1], sig[-1] pre_hash = self.serialize_preimage(txin_index, sighash=sighash, sighash_cache=sighash_cache) pubkey = ecc.ECPubkey(pubkey_bytes) msg_hash = sha256d(pre_hash) sig64 = ecc.ecdsa_sig64_from_der_sig(der_sig) return pubkey.ecdsa_verify(sig64, msg_hash) def is_segwit(self, *, guess_for_address=False): return any(txin.is_segwit(guess_for_address=guess_for_address) for txin in self.inputs()) def invalidate_ser_cache(self): self._cached_network_ser = None self._cached_txid = None def serialize(self) -> str: if not self._cached_network_ser: self._cached_network_ser = self.serialize_to_network(estimate_size=False, include_sigs=True) return self._cached_network_ser def serialize_as_bytes(self) -> bytes: return bfh(self.serialize()) def serialize_to_network(self, *, estimate_size=False, include_sigs=True, force_legacy=False) -> str: """Serialize the transaction as used on the Bitcoin network, into hex. `include_sigs` signals whether to include scriptSigs and witnesses. `force_legacy` signals to use the pre-segwit format note: (not include_sigs) implies force_legacy """ self.deserialize() nVersion = int.to_bytes(self.version, length=4, byteorder="little", signed=True).hex() nLocktime = int.to_bytes(self.locktime, length=4, byteorder="little", signed=False).hex() inputs = self.inputs() outputs = self.outputs() def create_script_sig(txin: TxInput) -> bytes: if include_sigs: script_sig = self.input_script(txin, estimate_size=estimate_size) return script_sig return b"" txins = var_int(len(inputs)).hex() + ''.join( txin.serialize_to_network(script_sig=create_script_sig(txin)).hex() for txin in inputs) txouts = var_int(len(outputs)).hex() + ''.join(o.serialize_to_network().hex() for o in outputs) use_segwit_ser_for_estimate_size = estimate_size and self.is_segwit(guess_for_address=True) use_segwit_ser_for_actual_use = not estimate_size and self.is_segwit() use_segwit_ser = use_segwit_ser_for_estimate_size or use_segwit_ser_for_actual_use if include_sigs and not force_legacy and use_segwit_ser: marker = '00' flag = '01' witness = ''.join(self.serialize_witness(x, estimate_size=estimate_size).hex() for x in inputs) return nVersion + marker + flag + txins + txouts + witness + nLocktime else: return nVersion + txins + txouts + nLocktime def to_qr_data(self) -> Tuple[str, bool]: """Returns (serialized_tx, is_complete). The tx is serialized to be put inside a QR code. No side-effects. As space in a QR code is limited, some data might have to be omitted. This is signalled via is_complete=False. """ is_complete = True tx = copy.deepcopy(self) # make copy as we mutate tx if isinstance(tx, PartialTransaction): # this makes QR codes a lot smaller (or just possible in the first place!) # note: will not apply if all inputs are taproot, due to new sighash. tx.convert_all_utxos_to_witness_utxos() is_complete = False tx_bytes = tx.serialize_as_bytes() return base_encode(tx_bytes, base=43), is_complete def txid(self) -> Optional[str]: if self._cached_txid is None: self.deserialize() all_segwit = all(txin.is_segwit() for txin in self.inputs()) if not all_segwit and not self.is_complete(): return None try: ser = self.serialize_to_network(force_legacy=True) except UnknownTxinType: # we might not know how to construct scriptSig for some scripts return None self._cached_txid = sha256d(bfh(ser))[::-1].hex() return self._cached_txid def wtxid(self) -> Optional[str]: self.deserialize() if not self.is_complete(): return None try: ser = self.serialize_to_network() except UnknownTxinType: # we might not know how to construct scriptSig/witness for some scripts return None return sha256d(bfh(ser))[::-1].hex() def add_info_from_wallet(self, wallet: 'Abstract_Wallet', **kwargs) -> None: # populate prev_txs for txin in self.inputs(): wallet.add_input_info(txin) async def add_info_from_network( self, network: Optional['Network'], *, ignore_network_issues: bool = True, progress_cb: Callable[[TxinDataFetchProgress], None] = None, timeout=None, ) -> None: """note: it is recommended to call add_info_from_wallet first, as this can save some network requests""" if not self.is_missing_info_from_network(): return if progress_cb is None: progress_cb = lambda *args, **kwargs: None num_tasks_done = 0 num_tasks_total = 0 has_errored = False has_finished = False async def add_info_to_txin(txin: TxInput): nonlocal num_tasks_done, has_errored progress_cb(TxinDataFetchProgress(num_tasks_done, num_tasks_total, has_errored, has_finished)) success = await txin.add_info_from_network( network=network, ignore_network_issues=ignore_network_issues, timeout=timeout, ) if success: num_tasks_done += 1 else: has_errored = True progress_cb(TxinDataFetchProgress(num_tasks_done, num_tasks_total, has_errored, has_finished)) # schedule a network task for each txin try: async with OldTaskGroup() as group: for txin in self.inputs(): if txin.utxo is None: num_tasks_total += 1 await group.spawn(add_info_to_txin(txin=txin)) except Exception as e: has_errored = True _logger.error(f"tx.add_info_from_network() got exc: {e!r}") finally: has_finished = True progress_cb(TxinDataFetchProgress(num_tasks_done, num_tasks_total, has_errored, has_finished)) def is_missing_info_from_network(self) -> bool: return any(txin.utxo is None for txin in self.inputs()) def add_info_from_wallet_and_network( self, *, wallet: 'Abstract_Wallet', show_error: Callable[[str], None], ) -> bool: """Returns whether successful. note: This is sort of a legacy hack... doing network requests in non-async code. Relatedly, this should *not* be called from the network thread. """ # note side-effect: tx is being mutated from .network import NetworkException, Network self.add_info_from_wallet(wallet) try: if self.is_missing_info_from_network(): Network.run_from_another_thread( self.add_info_from_network(wallet.network, ignore_network_issues=False)) except NetworkException as e: show_error(repr(e)) return False return True def get_time_based_relative_locktime(self) -> Optional[int]: if self.version < 2: return locktimes = list(filter(None, [txin.get_time_based_relative_locktime() for txin in self.inputs()])) return max(locktimes) if locktimes else None def get_block_based_relative_locktime(self) -> Optional[int]: if self.version < 2: return locktimes = list(filter(None, [txin.get_block_based_relative_locktime() for txin in self.inputs()])) return max(locktimes) if locktimes else None def is_rbf_enabled(self) -> bool: """Whether the tx explicitly signals BIP-0125 replace-by-fee.""" return any([txin.nsequence < 0xffffffff - 1 for txin in self.inputs()]) def estimated_size(self) -> int: """Return an estimated virtual tx size in vbytes. BIP-0141 defines 'Virtual transaction size' to be weight/4 rounded up. This definition is only for humans, and has little meaning otherwise. If we wanted sub-byte precision, fee calculation should use transaction weights, but for simplicity we approximate that with (virtual_size)x4 """ weight = self.estimated_weight() return self.virtual_size_from_weight(weight) @classmethod def estimated_input_weight(cls, txin: TxInput, is_segwit_tx: bool) -> int: """Return an estimate of serialized input weight in weight units.""" script_sig = cls.input_script(txin, estimate_size=True) input_size = len(txin.serialize_to_network(script_sig=script_sig)) if txin.is_segwit(guess_for_address=True): witness_size = len(cls.serialize_witness(txin, estimate_size=True)) else: witness_size = 1 if is_segwit_tx else 0 return 4 * input_size + witness_size @classmethod def estimated_output_size_for_address(cls, address: str) -> int: """Return an estimate of serialized output size in bytes.""" script = bitcoin.address_to_script(address) return cls.estimated_output_size_for_script(script) @classmethod def estimated_output_size_for_script(cls, script: bytes) -> int: """Return an estimate of serialized output size in bytes.""" # 8 byte value + varint script len + script script_len = len(script) var_int_len = len(var_int(script_len)) return 8 + var_int_len + script_len @classmethod def virtual_size_from_weight(cls, weight: int) -> int: return weight // 4 + (weight % 4 > 0) @classmethod def satperbyte_from_satperkw(cls, feerate_kw): """Converts feerate from sat/kw to sat/vbyte.""" return feerate_kw * 4 / 1000 def estimated_total_size(self): """Return an estimated total transaction size in bytes.""" if not self.is_complete() or self._cached_network_ser is None: return len(self.serialize_to_network(estimate_size=True)) // 2 else: return len(self._cached_network_ser) // 2 # ASCII hex string def estimated_witness_size(self): """Return an estimate of witness size in bytes.""" estimate = not self.is_complete() if not self.is_segwit(guess_for_address=estimate): return 0 inputs = self.inputs() witness = b"".join(self.serialize_witness(x, estimate_size=estimate) for x in inputs) witness_size = len(witness) + 2 # include marker and flag return witness_size def estimated_base_size(self): """Return an estimated base transaction size in bytes.""" return self.estimated_total_size() - self.estimated_witness_size() def estimated_weight(self): """Return an estimate of transaction weight.""" total_tx_size = self.estimated_total_size() base_tx_size = self.estimated_base_size() return 3 * base_tx_size + total_tx_size def is_complete(self) -> bool: return True def get_output_idxs_from_scriptpubkey(self, script: bytes) -> Set[int]: """Returns the set indices of outputs with given script.""" assert isinstance(script, bytes) # build cache if there isn't one yet # note: can become stale and return incorrect data # if the tx is modified later; that's out of scope. if not hasattr(self, '_script_to_output_idx'): d = defaultdict(set) for output_idx, o in enumerate(self.outputs()): o_script = o.scriptpubkey d[o_script].add(output_idx) self._script_to_output_idx = d return set(self._script_to_output_idx[script]) # copy def get_output_idxs_from_address(self, addr: str) -> Set[int]: script = bitcoin.address_to_script(addr) return self.get_output_idxs_from_scriptpubkey(script) def replace_output_address(self, old_address: str, new_address: str) -> None: idx = list(self.get_output_idxs_from_address(old_address)) assert len(idx) == 1 amount = self._outputs[idx[0]].value funding_output = PartialTxOutput.from_address_and_value(new_address, amount) old_output = PartialTxOutput.from_address_and_value(old_address, amount) self._outputs.remove(old_output) self.add_outputs([funding_output]) delattr(self, '_script_to_output_idx') def get_change_outputs(self): return [o for o in self._outputs if o.is_change] def has_change(self): return len(self.get_change_outputs()) > 0 def get_dummy_output(self, dummy_addr: str) -> Optional['PartialTxOutput']: idxs = self.get_output_idxs_from_address(dummy_addr) if not idxs: return assert len(idxs) == 1 for i in idxs: return self.outputs()[i] def output_value_for_address(self, addr): # assumes exactly one output has that address for o in self.outputs(): if o.address == addr: return o.value else: raise Exception('output not found', addr) def input_value(self) -> int: input_values = [txin.value_sats() for txin in self.inputs()] if any([val is None for val in input_values]): raise MissingTxInputAmount() return sum(input_values) def output_value(self) -> int: return sum(o.value for o in self.outputs()) def get_fee(self) -> Optional[int]: try: return self.input_value() - self.output_value() except MissingTxInputAmount: return None def get_input_idx_that_spent_prevout(self, prevout: TxOutpoint) -> Optional[int]: # build cache if there isn't one yet # note: can become stale and return incorrect data # if the tx is modified later; that's out of scope. if not hasattr(self, '_prevout_to_input_idx'): d = {} # type: Dict[TxOutpoint, int] for i, txin in enumerate(self.inputs()): d[txin.prevout] = i self._prevout_to_input_idx = d idx = self._prevout_to_input_idx.get(prevout) if idx is not None: assert self.inputs()[idx].prevout == prevout return idx def convert_raw_tx_to_hex(raw: Union[str, bytes]) -> str: """Sanitizes tx-describing input (hex/base43/base64) into raw tx hex string.""" if not raw: raise ValueError("empty string") raw_unstripped = raw raw = raw.strip() # try hex try: return binascii.unhexlify(raw).hex() except Exception: pass # try base43 try: return base_decode(raw, base=43).hex() except Exception: pass # try base64 if raw[0:6] in ('cHNidP', b'cHNidP'): # base64 psbt try: return base64.b64decode(raw, validate=True).hex() except Exception: pass # raw bytes (do not strip whitespaces in this case) if isinstance(raw_unstripped, bytes): return raw_unstripped.hex() raise ValueError(f"failed to recognize transaction encoding for txt: {raw[:30]}...") def tx_from_any(raw: Union[str, bytes], *, deserialize: bool = True) -> Union['PartialTransaction', 'Transaction']: if isinstance(raw, bytearray): raw = bytes(raw) raw = convert_raw_tx_to_hex(raw) try: return PartialTransaction.from_raw_psbt(raw) except BadHeaderMagic: if raw[:10] == b'EPTF\xff'.hex(): raise SerializationError("Partial transactions generated with old Electrum versions " "(< 4.0) are no longer supported. Please upgrade Electrum on " "the other machine where this transaction was created.") try: tx = Transaction(raw) if deserialize: tx.deserialize() return tx except Exception as e: raise SerializationError(f"Failed to recognise tx encoding, or to parse transaction. " f"raw: {raw[:30]}...") from e class PSBTGlobalType(IntEnum): UNSIGNED_TX = 0 XPUB = 1 VERSION = 0xFB class PSBTInputType(IntEnum): NON_WITNESS_UTXO = 0 WITNESS_UTXO = 1 PARTIAL_SIG = 2 SIGHASH_TYPE = 3 REDEEM_SCRIPT = 4 WITNESS_SCRIPT = 5 BIP32_DERIVATION = 6 FINAL_SCRIPTSIG = 7 FINAL_SCRIPTWITNESS = 8 TAP_KEY_SIG = 0x13 TAP_MERKLE_ROOT = 0x18 SLIP19_OWNERSHIP_PROOF = 0x19 class PSBTOutputType(IntEnum): REDEEM_SCRIPT = 0 WITNESS_SCRIPT = 1 BIP32_DERIVATION = 2 # Serialization/deserialization tools def deser_compact_size(f) -> Optional[int]: # note: ~inverse of bitcoin.var_int try: nit = f.read(1)[0] except IndexError: return None # end of file if nit == 253: nit = struct.unpack(" Tuple[int, bytes, bytes]: key_size = deser_compact_size(fd) if key_size == 0: raise StopIteration() if key_size is None: raise UnexpectedEndOfStream() full_key = fd.read(key_size) key_type, key = cls.get_keytype_and_key_from_fullkey(full_key) val_size = deser_compact_size(fd) if val_size is None: raise UnexpectedEndOfStream() val = fd.read(val_size) return key_type, key, val @classmethod def create_psbt_writer(cls, fd): def wr(key_type: int, val: bytes, key: bytes = b''): full_key = cls.get_fullkey_from_keytype_and_key(key_type, key) fd.write(var_int(len(full_key))) # key_size fd.write(full_key) # key fd.write(var_int(len(val))) # val_size fd.write(val) # val return wr @classmethod def get_keytype_and_key_from_fullkey(cls, full_key: bytes) -> Tuple[int, bytes]: with io.BytesIO(full_key) as key_stream: key_type = deser_compact_size(key_stream) if key_type is None: raise UnexpectedEndOfStream() key = key_stream.read() return key_type, key @classmethod def get_fullkey_from_keytype_and_key(cls, key_type: int, key: bytes) -> bytes: key_type_bytes = var_int(key_type) return key_type_bytes + key def _serialize_psbt_section(self, fd): wr = self.create_psbt_writer(fd) self.serialize_psbt_section_kvs(wr) fd.write(b'\x00') # section-separator def parse_psbt_section_kv(self, kt: int, key: bytes, val: bytes) -> None: raise NotImplementedError() # implemented by subclasses def serialize_psbt_section_kvs(self, wr) -> None: raise NotImplementedError() # implemented by subclasses class PartialTxInput(TxInput, PSBTSection): def __init__(self, *args, **kwargs): TxInput.__init__(self, *args, **kwargs) self._witness_utxo = None # type: Optional[TxOutput] self.sigs_ecdsa = {} # type: Dict[bytes, bytes] # pubkey -> sig self.tap_key_sig = None # type: Optional[bytes] # sig for taproot key-path-spending self.sighash = None # type: Optional[int] # note: wrong abstraction level. should be per-signature self.bip32_paths = {} # type: Dict[bytes, Tuple[bytes, Sequence[int]]] # pubkey -> (xpub_fingerprint, path) self.redeem_script = None # type: Optional[bytes] self.witness_script = None # type: Optional[bytes] self.tap_merkle_root = None # type: Optional[bytes] self.slip_19_ownership_proof = None # type: Optional[bytes] self._unknown = {} # type: Dict[bytes, bytes] self._script_descriptor = None # type: Optional[Descriptor] self.is_mine = False # type: bool # whether the wallet considers the input to be ismine self._trusted_value_sats = None # type: Optional[int] self._trusted_address = None # type: Optional[str] self._is_p2sh_segwit = None # type: Optional[bool] # None means unknown self._is_native_segwit = None # type: Optional[bool] # None means unknown self.witness_sizehint = None # type: Optional[int] # byte size of serialized complete witness, for tx size est @property def witness_utxo(self): return self._witness_utxo @witness_utxo.setter def witness_utxo(self, value: Optional[TxOutput]): self.validate_data(witness_utxo=value) self._witness_utxo = value @property def pubkeys(self) -> Set[bytes]: if desc := self.script_descriptor: return desc.get_all_pubkeys() return set() @property def script_descriptor(self): return self._script_descriptor @script_descriptor.setter def script_descriptor(self, desc: Optional[Descriptor]): self._script_descriptor = desc if desc: if self.redeem_script is None: self.redeem_script = desc.expand().redeem_script if self.witness_script is None: self.witness_script = desc.expand().witness_script def to_json(self): d = super().to_json() d.update({ 'height': self.block_height, 'value_sats': self.value_sats(), 'address': self.address, 'desc': self.script_descriptor.to_string() if self.script_descriptor else None, 'utxo': str(self.utxo) if self.utxo else None, 'witness_utxo': self.witness_utxo.serialize_to_network().hex() if self.witness_utxo else None, 'sighash': self.sighash, 'redeem_script': self.redeem_script.hex() if self.redeem_script else None, 'witness_script': self.witness_script.hex() if self.witness_script else None, 'sigs_ecdsa': {pubkey.hex(): sig.hex() for pubkey, sig in self.sigs_ecdsa.items()}, 'tap_key_sig': self.tap_key_sig.hex() if self.tap_key_sig else None, 'tap_merkle_root': self.tap_merkle_root.hex() if self.tap_merkle_root else None, 'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path)) for pubkey, (xfp, path) in self.bip32_paths.items()}, 'slip_19_ownership_proof': self.slip_19_ownership_proof.hex() if self.slip_19_ownership_proof else None, 'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()}, }) return d @classmethod def from_txin(cls, txin: TxInput, *, strip_witness: bool = True) -> 'PartialTxInput': # FIXME: if strip_witness is True, res.is_segwit() will return False, # and res.estimated_size() will return an incorrect value. These methods # will return the correct values after we call add_input_info(). (see dscancel and bump_fee) # This is very fragile: the value returned by estimate_size() depends on the calling order. res = PartialTxInput(prevout=txin.prevout, script_sig=None if strip_witness else txin.script_sig, nsequence=txin.nsequence, witness=None if strip_witness else txin.witness, is_coinbase_output=txin.is_coinbase_output()) res.utxo = txin.utxo return res def validate_data( self, *, for_signing=False, # allow passing provisional fields for 'self', before setting them: utxo: Optional[Transaction] = None, witness_utxo: Optional[TxOutput] = None, ) -> None: utxo = utxo or self.utxo witness_utxo = witness_utxo or self.witness_utxo if utxo: if self.prevout.txid.hex() != utxo.txid(): raise PSBTInputConsistencyFailure(f"PSBT input validation: " f"If a non-witness UTXO is provided, its hash must match the hash specified in the prevout") if witness_utxo: if utxo.outputs()[self.prevout.out_idx] != witness_utxo: raise PSBTInputConsistencyFailure(f"PSBT input validation: " f"If both non-witness UTXO and witness UTXO are provided, they must be consistent") # The following test is disabled, so we are willing to sign non-segwit inputs # without verifying the input amount. This means, given a maliciously modified PSBT, # for non-segwit inputs, we might end up burning coins as miner fees. if for_signing and False: if not self.is_segwit() and witness_utxo: raise PSBTInputConsistencyFailure(f"PSBT input validation: " f"If a witness UTXO is provided, no non-witness signature may be created") if self.redeem_script and self.address: addr = hash160_to_p2sh(hash_160(self.redeem_script)) if self.address != addr: raise PSBTInputConsistencyFailure(f"PSBT input validation: " f"If a redeemScript is provided, the scriptPubKey must be for that redeemScript") if self.witness_script: if self.redeem_script: if self.redeem_script != bitcoin.p2wsh_nested_script(self.witness_script): raise PSBTInputConsistencyFailure(f"PSBT input validation: " f"If a witnessScript is provided, the redeemScript must be for that witnessScript") elif self.address: if self.address != bitcoin.script_to_p2wsh(self.witness_script): raise PSBTInputConsistencyFailure(f"PSBT input validation: " f"If a witnessScript is provided, the scriptPubKey must be for that witnessScript") def parse_psbt_section_kv(self, kt, key, val): try: kt = PSBTInputType(kt) except ValueError: pass # unknown type if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}") if kt == PSBTInputType.NON_WITNESS_UTXO: if self.utxo is not None: raise SerializationError(f"duplicate key: {repr(kt)}") self.utxo = Transaction(val) self.utxo.deserialize() if key: raise SerializationError(f"key for {repr(kt)} must be empty") elif kt == PSBTInputType.WITNESS_UTXO: if self.witness_utxo is not None: raise SerializationError(f"duplicate key: {repr(kt)}") self.witness_utxo = TxOutput.from_network_bytes(val) if key: raise SerializationError(f"key for {repr(kt)} must be empty") elif kt == PSBTInputType.PARTIAL_SIG: if key in self.sigs_ecdsa: raise SerializationError(f"duplicate key: {repr(kt)}") if len(key) not in (33, 65): raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}") self.sigs_ecdsa[key] = val elif kt == PSBTInputType.TAP_KEY_SIG: if self.tap_key_sig is not None: raise SerializationError(f"duplicate key: {repr(kt)}") if len(val) not in (64, 65): raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}") self.tap_key_sig = val if key: raise SerializationError(f"key for {repr(kt)} must be empty") elif kt == PSBTInputType.TAP_MERKLE_ROOT: if self.tap_merkle_root is not None: raise SerializationError(f"duplicate key: {repr(kt)}") if len(val) != 32: raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}") self.tap_merkle_root = val if key: raise SerializationError(f"key for {repr(kt)} must be empty") elif kt == PSBTInputType.SIGHASH_TYPE: if self.sighash is not None: raise SerializationError(f"duplicate key: {repr(kt)}") if len(val) != 4: raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}") self.sighash = struct.unpack(" Optional[int]: if (val := super().value_sats()) is not None: return val if self._trusted_value_sats is not None: return self._trusted_value_sats if self.witness_utxo: return self.witness_utxo.value return None @property def address(self) -> Optional[str]: if (addr := super().address) is not None: return addr if self._trusted_address is not None: return self._trusted_address if self.witness_utxo: return self.witness_utxo.address return None @property def scriptpubkey(self) -> Optional[bytes]: if (spk := super().scriptpubkey) is not None: return spk if self._trusted_address is not None: return bitcoin.address_to_script(self._trusted_address) if self.witness_utxo: return self.witness_utxo.scriptpubkey return None def is_complete(self) -> bool: if self.script_sig is not None and self.witness is not None: return True if self.is_coinbase_input(): return True if self.script_sig is not None and not self.is_segwit(): return True if desc := self.script_descriptor: try: desc.satisfy(allow_dummy=False, sigdata=self.sigs_ecdsa) except MissingSolutionPiece: pass else: return True return False def get_satisfaction_progress(self) -> Tuple[int, int]: if desc := self.script_descriptor: return desc.get_satisfaction_progress(sigdata=self.sigs_ecdsa) return 0, 0 def finalize(self) -> None: def clear_fields_when_finalized(): # BIP-174: "All other data except the UTXO and unknown fields in the # input key-value map should be cleared from the PSBT" self.sigs_ecdsa = {} self.tap_key_sig = None self.tap_merkle_root = None self.sighash = None self.bip32_paths = {} self.redeem_script = None # FIXME: side effect interferes with make_witness # self.witness_script = None if self.script_sig is not None and self.witness is not None: clear_fields_when_finalized() return # already finalized if self.is_complete(): self.script_sig = Transaction.input_script(self) self.witness = Transaction.serialize_witness(self) clear_fields_when_finalized() def combine_with_other_txin(self, other_txin: 'TxInput') -> None: assert self.prevout == other_txin.prevout if other_txin.script_sig is not None: self.script_sig = other_txin.script_sig if other_txin.witness is not None: self.witness = other_txin.witness if isinstance(other_txin, PartialTxInput): if other_txin.witness_utxo: self.witness_utxo = other_txin.witness_utxo if other_txin.utxo: self.utxo = other_txin.utxo self.sigs_ecdsa.update(other_txin.sigs_ecdsa) if other_txin.sighash is not None: self.sighash = other_txin.sighash if other_txin.tap_key_sig is not None: self.tap_key_sig = other_txin.tap_key_sig if other_txin.tap_merkle_root is not None: self.tap_merkle_root = other_txin.tap_merkle_root self.bip32_paths.update(other_txin.bip32_paths) if other_txin.redeem_script is not None: self.redeem_script = other_txin.redeem_script if other_txin.witness_script is not None: self.witness_script = other_txin.witness_script self._unknown.update(other_txin._unknown) self.validate_data() # try to finalize now self.finalize() def convert_utxo_to_witness_utxo(self) -> None: if self.utxo: self._witness_utxo = self.utxo.outputs()[self.prevout.out_idx] self._utxo = None # type: Optional[Transaction] def is_native_segwit(self) -> Optional[bool]: """Whether this input is native segwit (any witness version). None means inconclusive.""" if self._is_native_segwit is None: if self.address: self._is_native_segwit = bitcoin.is_segwit_address(self.address) return self._is_native_segwit def is_p2sh_segwit(self) -> Optional[bool]: """Whether this input is p2sh-embedded-segwit. None means inconclusive.""" if self._is_p2sh_segwit is None: def calc_if_p2sh_segwit_now(): if not (self.address and self.redeem_script): return None if self.address != bitcoin.hash160_to_p2sh(hash_160(self.redeem_script)): # not p2sh address return False try: decoded = [x for x in script_GetOp(self.redeem_script)] except MalformedBitcoinScript: decoded = None # witness version 0 if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0): return True # witness version 1-16 future_witness_versions = list(range(opcodes.OP_1, opcodes.OP_16 + 1)) for witver, opcode in enumerate(future_witness_versions, start=1): match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)] if match_script_against_template(decoded, match): return True return False self._is_p2sh_segwit = calc_if_p2sh_segwit_now() return self._is_p2sh_segwit def is_segwit(self, *, guess_for_address=False) -> bool: """Whether this input is segwit (any witness version).""" if super().is_segwit(): return True if self.is_native_segwit() or self.is_p2sh_segwit(): return True if self.is_native_segwit() is False and self.is_p2sh_segwit() is False: return False if self.witness_script: return True if desc := self.script_descriptor: return desc.is_segwit() if guess_for_address: dummy_desc = create_dummy_descriptor_from_address(self.address) return dummy_desc.is_segwit() return False # can be false-negative def is_taproot(self) -> Optional[bool]: if (is_taproot := super().is_taproot()) is not None: return is_taproot if desc := self.script_descriptor: return desc.is_taproot() return None def already_has_some_signatures(self) -> bool: """Returns whether progress has been made towards completing this input.""" return (self.sigs_ecdsa or self.tap_key_sig is not None or self.script_sig is not None or self.witness is not None) def get_scriptcode_for_sighash(self) -> bytes: """Constructs the scriptcode part of the preimage for OP_CHECKSIG, for a partial txin, to create a new signature. Note: the base impl works by mimicking a consensus-verifier and extracting the required fields from the already complete witness/scriptSig and the scriptpubkey of the funding utxo. In contrast, here we are the one constructing a spending txin, and can have knowledge beyond what will be revealed onchain. """ if self.witness_script: if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(self.witness_script)]: raise Exception('OP_CODESEPARATOR black magic is not supported') return self.witness_script if not self.is_segwit() and self.redeem_script: if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(self.redeem_script)]: raise Exception('OP_CODESEPARATOR black magic is not supported') return self.redeem_script if desc := self.script_descriptor: sc = desc.expand() if script := sc.scriptcode_for_sighash: return script raise Exception(f"don't know scriptcode for descriptor: {desc.to_string()}") raise UnknownTxinType(f'cannot construct preimage_script') class PartialTxOutput(TxOutput, PSBTSection): def __init__(self, *args, **kwargs): TxOutput.__init__(self, *args, **kwargs) self.redeem_script = None # type: Optional[bytes] self.witness_script = None # type: Optional[bytes] self.bip32_paths = {} # type: Dict[bytes, Tuple[bytes, Sequence[int]]] # pubkey -> (xpub_fingerprint, path) self._unknown = {} # type: Dict[bytes, bytes] self._script_descriptor = None # type: Optional[Descriptor] self.is_mine = False # type: bool # whether the wallet considers the output to be ismine self.is_change = False # type: bool # whether the wallet considers the output to be change self.is_utxo_reserve = False # type: bool # whether this is a change output added to satisfy anchor channel requirements @property def pubkeys(self) -> Set[bytes]: if desc := self.script_descriptor: return desc.get_all_pubkeys() return set() @property def script_descriptor(self): return self._script_descriptor @script_descriptor.setter def script_descriptor(self, desc: Optional[Descriptor]): self._script_descriptor = desc if desc: if self.redeem_script is None: self.redeem_script = desc.expand().redeem_script if self.witness_script is None: self.witness_script = desc.expand().witness_script def to_json(self): d = super().to_json() d.update({ 'desc': self.script_descriptor.to_string() if self.script_descriptor else None, 'redeem_script': self.redeem_script.hex() if self.redeem_script else None, 'witness_script': self.witness_script.hex() if self.witness_script else None, 'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path)) for pubkey, (xfp, path) in self.bip32_paths.items()}, 'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()}, }) return d @classmethod def from_txout(cls, txout: TxOutput) -> 'PartialTxOutput': res = PartialTxOutput(scriptpubkey=txout.scriptpubkey, value=txout.value) return res def parse_psbt_section_kv(self, kt, key, val): try: kt = PSBTOutputType(kt) except ValueError: pass # unknown type if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}") if kt == PSBTOutputType.REDEEM_SCRIPT: if self.redeem_script is not None: raise SerializationError(f"duplicate key: {repr(kt)}") self.redeem_script = val if key: raise SerializationError(f"key for {repr(kt)} must be empty") elif kt == PSBTOutputType.WITNESS_SCRIPT: if self.witness_script is not None: raise SerializationError(f"duplicate key: {repr(kt)}") self.witness_script = val if key: raise SerializationError(f"key for {repr(kt)} must be empty") elif kt == PSBTOutputType.BIP32_DERIVATION: if key in self.bip32_paths: raise SerializationError(f"duplicate key: {repr(kt)}") if len(key) not in (33, 65): raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}") self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val) else: full_key = self.get_fullkey_from_keytype_and_key(kt, key) if full_key in self._unknown: raise SerializationError(f'duplicate key. PSBT output key for unknown type: {full_key}') self._unknown[full_key] = val def serialize_psbt_section_kvs(self, wr): if self.redeem_script is not None: wr(PSBTOutputType.REDEEM_SCRIPT, self.redeem_script) if self.witness_script is not None: wr(PSBTOutputType.WITNESS_SCRIPT, self.witness_script) for k in sorted(self.bip32_paths): packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k]) wr(PSBTOutputType.BIP32_DERIVATION, packed_path, k) for full_key, val in sorted(self._unknown.items()): key_type, key = self.get_keytype_and_key_from_fullkey(full_key) wr(key_type, val, key=key) def combine_with_other_txout(self, other_txout: 'TxOutput') -> None: assert self.scriptpubkey == other_txout.scriptpubkey if not isinstance(other_txout, PartialTxOutput): return if other_txout.redeem_script is not None: self.redeem_script = other_txout.redeem_script if other_txout.witness_script is not None: self.witness_script = other_txout.witness_script self.bip32_paths.update(other_txout.bip32_paths) self._unknown.update(other_txout._unknown) class PartialTransaction(Transaction): def __init__(self): Transaction.__init__(self, None) self.xpubs = {} # type: Dict[BIP32Node, Tuple[bytes, Sequence[int]]] # intermediate bip32node -> (xfp, der_prefix) self._inputs = [] # type: List[PartialTxInput] self._outputs = [] # type: List[PartialTxOutput] self._unknown = {} # type: Dict[bytes, bytes] self.rbf_merge_txid = None def to_json(self) -> dict: d = super().to_json() d.update({ 'xpubs': {bip32node.to_xpub(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path)) for bip32node, (xfp, path) in self.xpubs.items()}, 'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()}, }) return d @classmethod def from_tx(cls, tx: Transaction, *, strip_witness: bool = True) -> 'PartialTransaction': assert tx res = cls() res._inputs = [PartialTxInput.from_txin(txin, strip_witness=strip_witness) for txin in tx.inputs()] res._outputs = [PartialTxOutput.from_txout(txout) for txout in tx.outputs()] res.version = tx.version res.locktime = tx.locktime return res @classmethod def from_raw_psbt(cls, raw) -> 'PartialTransaction': # auto-detect and decode Base64 and Hex. if raw[0:10].lower() in (b'70736274ff', '70736274ff'): # hex raw = bytes.fromhex(raw) elif raw[0:6] in (b'cHNidP', 'cHNidP'): # base64 raw = base64.b64decode(raw, validate=True) if not isinstance(raw, (bytes, bytearray)) or raw[0:5] != b'psbt\xff': raise BadHeaderMagic("bad magic") tx = None # type: Optional[PartialTransaction] # We parse the raw stream twice. The first pass is used to find the # PSBT_GLOBAL_UNSIGNED_TX key in the global section and set 'tx'. # The second pass does everything else. with io.BytesIO(raw[5:]) as fd: # parsing "first pass" while True: try: kt, key, val = PSBTSection.get_next_kv_from_fd(fd) except StopIteration: break try: kt = PSBTGlobalType(kt) except ValueError: pass # unknown type if kt == PSBTGlobalType.UNSIGNED_TX: if tx is not None: raise SerializationError(f"duplicate key: {repr(kt)}") if key: raise SerializationError(f"key for {repr(kt)} must be empty") unsigned_tx = Transaction(val.hex()) for txin in unsigned_tx.inputs(): if txin.script_sig or txin.witness: raise SerializationError(f"PSBT {repr(kt)} must have empty scriptSigs and witnesses") tx = PartialTransaction.from_tx(unsigned_tx) if tx is None: raise SerializationError(f"PSBT missing required global section PSBT_GLOBAL_UNSIGNED_TX") with io.BytesIO(raw[5:]) as fd: # parsing "second pass" # global section while True: try: kt, key, val = PSBTSection.get_next_kv_from_fd(fd) except StopIteration: break try: kt = PSBTGlobalType(kt) except ValueError: pass # unknown type if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}") if kt == PSBTGlobalType.UNSIGNED_TX: pass # already handled during "first" parsing pass elif kt == PSBTGlobalType.XPUB: bip32node = BIP32Node.from_bytes(key) if bip32node in tx.xpubs: raise SerializationError(f"duplicate key: {repr(kt)}") xfp, path = unpack_bip32_root_fingerprint_and_int_path(val) if bip32node.depth != len(path): raise SerializationError(f"PSBT global xpub has mismatching depth ({bip32node.depth}) " f"and derivation prefix len ({len(path)})") child_number_of_xpub = int.from_bytes(bip32node.child_number, 'big') if not ((bip32node.depth == 0 and child_number_of_xpub == 0) or (bip32node.depth != 0 and child_number_of_xpub == path[-1])): raise SerializationError(f"PSBT global xpub has inconsistent child_number and derivation prefix") tx.xpubs[bip32node] = xfp, path elif kt == PSBTGlobalType.VERSION: if len(val) > 4: raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)} > 4") psbt_version = int.from_bytes(val, byteorder='little', signed=False) if psbt_version > 0: raise SerializationError(f"Only PSBTs with version 0 are supported. Found version: {psbt_version}") if key: raise SerializationError(f"key for {repr(kt)} must be empty") else: full_key = PSBTSection.get_fullkey_from_keytype_and_key(kt, key) if full_key in tx._unknown: raise SerializationError(f'duplicate key. PSBT global key for unknown type: {full_key}') tx._unknown[full_key] = val try: # inputs sections for txin in tx.inputs(): if DEBUG_PSBT_PARSING: print("-> new input starts") txin._populate_psbt_fields_from_fd(fd) # outputs sections for txout in tx.outputs(): if DEBUG_PSBT_PARSING: print("-> new output starts") txout._populate_psbt_fields_from_fd(fd) except UnexpectedEndOfStream: raise UnexpectedEndOfStream('Unexpected end of stream. Num input and output maps provided does not match unsigned tx.') from None if fd.read(1) != b'': raise SerializationError("extra junk at the end of PSBT") for txin in tx.inputs(): txin.validate_data() return tx def requires_keystore(self): """ Returns True if signing will require private keys from the keystore Called by txbatcher in order to know if a password is needed """ return not all(hasattr(txin, 'make_witness') for txin in self.inputs()) @classmethod def from_io( cls, inputs: Sequence[PartialTxInput], outputs: Sequence[PartialTxOutput], *, locktime: int = None, version: int = None, BIP69_sort: bool = True ) -> 'PartialTransaction': self = cls() self._inputs = list(inputs) self._outputs = list(outputs) if locktime is not None: self.locktime = locktime if version is not None: self.version = version if BIP69_sort: self.BIP69_sort() return self def _serialize_psbt(self, fd) -> None: wr = PSBTSection.create_psbt_writer(fd) fd.write(b'psbt\xff') # global section wr(PSBTGlobalType.UNSIGNED_TX, bfh(self.serialize_to_network(include_sigs=False))) for bip32node, (xfp, path) in sorted(self.xpubs.items()): val = pack_bip32_root_fingerprint_and_int_path(xfp, path) wr(PSBTGlobalType.XPUB, val, key=bip32node.to_bytes()) for full_key, val in sorted(self._unknown.items()): key_type, key = PSBTSection.get_keytype_and_key_from_fullkey(full_key) wr(key_type, val, key=key) fd.write(b'\x00') # section-separator # input sections for inp in self._inputs: inp._serialize_psbt_section(fd) # output sections for outp in self._outputs: outp._serialize_psbt_section(fd) def finalize_psbt(self) -> None: for txin in self.inputs(): txin.finalize() def combine_with_other_psbt(self, other_tx: 'Transaction') -> None: """Pulls in all data from other_tx we don't yet have (e.g. signatures). other_tx must be concerning the same unsigned tx. """ if self.serialize_to_network(include_sigs=False) != other_tx.serialize_to_network(include_sigs=False): raise Exception('A Combiner must not combine two different PSBTs.') # BIP-174: "The resulting PSBT must contain all of the key-value pairs from each of the PSBTs. # The Combiner must remove any duplicate key-value pairs, in accordance with the specification." # global section if isinstance(other_tx, PartialTransaction): self.xpubs.update(other_tx.xpubs) self._unknown.update(other_tx._unknown) # input sections for txin, other_txin in zip(self.inputs(), other_tx.inputs()): txin.combine_with_other_txin(other_txin) # output sections for txout, other_txout in zip(self.outputs(), other_tx.outputs()): txout.combine_with_other_txout(other_txout) self.invalidate_ser_cache() def join_with_other_psbt(self, other_tx: 'PartialTransaction', *, config: 'SimpleConfig') -> None: """Adds inputs and outputs from other_tx into this one.""" if not isinstance(other_tx, PartialTransaction): raise Exception('Can only join partial transactions.') # make sure there are no duplicate prevouts prevouts = set() for txin in itertools.chain(self.inputs(), other_tx.inputs()): prevout_str = txin.prevout.to_str() if prevout_str in prevouts: raise Exception(f"Duplicate inputs! " f"Transactions that spend the same prevout cannot be joined.") prevouts.add(prevout_str) # copy global PSBT section self.xpubs.update(other_tx.xpubs) self._unknown.update(other_tx._unknown) # copy and add inputs and outputs self.add_inputs(list(other_tx.inputs())) self.add_outputs(list(other_tx.outputs()), merge_duplicates=config.WALLET_MERGE_DUPLICATE_OUTPUTS) self.remove_signatures() self.invalidate_ser_cache() def inputs(self) -> Sequence[PartialTxInput]: return self._inputs def outputs(self) -> Sequence[PartialTxOutput]: return self._outputs def add_inputs(self, inputs: List[PartialTxInput], BIP69_sort=True) -> None: self._inputs.extend(inputs) if BIP69_sort: self.BIP69_sort(outputs=False) self.invalidate_ser_cache() def add_outputs(self, outputs: List[PartialTxOutput], *, merge_duplicates: bool = False, BIP69_sort: bool = True) -> None: self._outputs.extend(outputs) if merge_duplicates: self._outputs = merge_duplicate_tx_outputs(self._outputs) if BIP69_sort: self.BIP69_sort(inputs=False) self.invalidate_ser_cache() def set_rbf(self, rbf: bool) -> None: nSequence = 0xffffffff - (2 if rbf else 1) for txin in self.inputs(): txin.nsequence = nSequence self.invalidate_ser_cache() def BIP69_sort(self, inputs=True, outputs=True): # NOTE: other parts of the code rely on these sorts being *stable* sorts if inputs: self._inputs.sort(key = lambda i: (i.prevout.txid, i.prevout.out_idx)) if outputs: self._outputs.sort(key = lambda o: (o.value, o.scriptpubkey)) self.invalidate_ser_cache() def sign(self, keypairs: Mapping[bytes, bytes]) -> None: # keypairs: pubkey_bytes -> secret_bytes sighash_cache = SighashCache() for i, txin in enumerate(self.inputs()): for pubkey in txin.pubkeys: if txin.is_complete(): break if pubkey not in keypairs: continue _logger.info(f"adding signature for {pubkey.hex()}. spending utxo {txin.prevout.to_str()}") sec = keypairs[pubkey] sig = self.sign_txin(i, sec, sighash_cache=sighash_cache) self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey, sig=sig) _logger.debug(f"tx.sign() finished. is_complete={self.is_complete()}") self.invalidate_ser_cache() def sign_txin( self, txin_index: int, privkey_bytes: bytes, *, sighash_cache: SighashCache = None, ) -> bytes: txin = self.inputs()[txin_index] txin.validate_data(for_signing=True) pre_hash = self.serialize_preimage(txin_index, sighash_cache=sighash_cache) if txin.is_taproot(): # note: privkey_bytes is the internal key merkle_root = txin.tap_merkle_root or bytes() output_privkey_bytes = taproot_tweak_seckey(privkey_bytes, merkle_root) output_privkey = ecc.ECPrivkey(output_privkey_bytes) msg_hash = bip340_tagged_hash(b"TapSighash", pre_hash) sig = output_privkey.schnorr_sign(msg_hash) sighash = txin.sighash if txin.sighash is not None else Sighash.DEFAULT else: privkey = ecc.ECPrivkey(privkey_bytes) msg_hash = sha256d(pre_hash) sig = privkey.ecdsa_sign(msg_hash, sigencode=ecc.ecdsa_der_sig_from_r_and_s) sighash = txin.sighash if txin.sighash is not None else Sighash.ALL return sig + Sighash.to_sigbytes(sighash) def is_complete(self) -> bool: return all([txin.is_complete() for txin in self.inputs()]) def signature_count(self) -> Tuple[int, int]: nhave, nreq = 0, 0 for txin in self.inputs(): a, b = txin.get_satisfaction_progress() nhave += a nreq += b return nhave, nreq def serialize(self) -> str: """Returns PSBT as base64 text, or raw hex of network tx (if complete).""" self.finalize_psbt() # FIXME this side-effects self if self.is_complete(): return Transaction.serialize(self) return self._serialize_as_base64() def serialize_as_bytes(self, *, force_psbt: bool = False) -> bytes: """Returns PSBT as raw bytes, or raw bytes of network tx (if complete).""" self.finalize_psbt() # FIXME this side-effects self if force_psbt or not self.is_complete(): with io.BytesIO() as fd: self._serialize_psbt(fd) return fd.getvalue() else: return Transaction.serialize_as_bytes(self) def _serialize_as_base64(self) -> str: raw_bytes = self.serialize_as_bytes() return base64.b64encode(raw_bytes).decode('ascii') def update_signatures(self, signatures: Sequence[Union[bytes, None]]) -> None: """Add new signatures to a transaction `signatures` is expected to be a list of sigs with signatures[i] intended for self._inputs[i]. This is used by the Trezor, KeepKey and Safe-T plugins. """ if self.is_complete(): return if len(self.inputs()) != len(signatures): raise Exception('expected {} signatures; got {}'.format(len(self.inputs()), len(signatures))) for i, txin in enumerate(self.inputs()): sig = signatures[i] if sig is None: continue if sig in list(txin.sigs_ecdsa.values()): continue msg_hash = sha256d(self.serialize_preimage(i)) sig64 = ecc.ecdsa_sig64_from_der_sig(sig[:-1]) for recid in range(4): try: public_key = ecc.ECPubkey.from_ecdsa_sig64(sig64, recid, msg_hash) except ecc.InvalidECPointException: # the point might not be on the curve for some recid values continue pubkey_bytes = public_key.get_public_key_bytes(compressed=True) if pubkey_bytes in txin.pubkeys: if not public_key.ecdsa_verify(sig64, msg_hash): continue _logger.info(f"adding sig: txin_idx={i}, signing_pubkey={pubkey_bytes.hex()}, sig={sig.hex()}") self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey_bytes, sig=sig) break # redo raw self.invalidate_ser_cache() def add_signature_to_txin(self, *, txin_idx: int, signing_pubkey: bytes, sig: bytes) -> None: txin = self._inputs[txin_idx] txin.sigs_ecdsa[signing_pubkey] = sig # force re-serialization txin.script_sig = None txin.witness = None self.invalidate_ser_cache() def add_info_from_wallet( self, wallet: 'Abstract_Wallet', *, include_xpubs: bool = False, ) -> None: if self.is_complete(): return # only include xpubs for multisig wallets; currently only they need it in practice # note: coldcard fw have a limitation that if they are included then all # inputs are assumed to be multisig... https://github.com/spesmilo/electrum/pull/5440#issuecomment-549504761 # note: trezor plugin needs xpubs included, if there are multisig inputs/change_outputs from .wallet import Multisig_Wallet if include_xpubs and isinstance(wallet, Multisig_Wallet): from .keystore import Xpub for ks in wallet.get_keystores(): if isinstance(ks, Xpub): fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx( der_suffix=[], only_der_suffix=False) xpub = ks.get_xpub_to_be_used_in_partial_tx(only_der_suffix=False) bip32node = BIP32Node.from_xkey(xpub) self.xpubs[bip32node] = (fp_bytes, der_full) for txin in self.inputs(): wallet.add_input_info( txin, only_der_suffix=False, ) for txout in self.outputs(): wallet.add_output_info( txout, only_der_suffix=False, ) def remove_xpubs_and_bip32_paths(self) -> None: self.xpubs.clear() for txin in self.inputs(): txin.bip32_paths.clear() for txout in self.outputs(): txout.bip32_paths.clear() def prepare_for_export_for_coinjoin(self) -> None: """Removes all sensitive details.""" # globals self.xpubs.clear() self._unknown.clear() # inputs for txin in self.inputs(): txin.bip32_paths.clear() # outputs for txout in self.outputs(): txout.redeem_script = None txout.witness_script = None txout.bip32_paths.clear() txout._unknown.clear() async def prepare_for_export_for_hardware_device(self, wallet: 'Abstract_Wallet') -> None: self.add_info_from_wallet(wallet, include_xpubs=True) await self.add_info_from_network(wallet.network) # log warning if PSBT_*_BIP32_DERIVATION fields cannot be filled with full path due to missing info from .keystore import Xpub def is_ks_missing_info(ks): return (isinstance(ks, Xpub) and (ks.get_root_fingerprint() is None or ks.get_derivation_prefix() is None)) if any([is_ks_missing_info(ks) for ks in wallet.get_keystores()]): _logger.warning('PSBT was requested to be filled with full bip32 paths but ' 'some keystores lacked either the derivation prefix or the root fingerprint') def convert_all_utxos_to_witness_utxos(self) -> None: """Replaces all NON-WITNESS-UTXOs with WITNESS-UTXOs. This will likely make an exported PSBT invalid spec-wise, but it makes e.g. QR codes significantly smaller. """ for txin in self.inputs(): txin.convert_utxo_to_witness_utxo() def remove_signatures(self): for txin in self.inputs(): txin.sigs_ecdsa = {} txin.tap_key_sig = None txin.script_sig = None txin.witness = None assert not self.is_complete() self.invalidate_ser_cache() def pack_bip32_root_fingerprint_and_int_path(xfp: bytes, path: Sequence[int]) -> bytes: if len(xfp) != 4: raise Exception(f'unexpected xfp length. xfp={xfp}') return xfp + b''.join(i.to_bytes(4, byteorder='little', signed=False) for i in path) def unpack_bip32_root_fingerprint_and_int_path(path: bytes) -> Tuple[bytes, Sequence[int]]: if len(path) % 4 != 0: raise Exception(f'unexpected packed path length. path={path.hex()}') xfp = path[0:4] int_path = [int.from_bytes(b, byteorder='little', signed=False) for b in chunks(path[4:], 4)] return xfp, int_path