Files
pallectrum/electrum/descriptor.py
SomberNight 2f1095510c bitcoin.py/transaction.py: API changes: rm most hex usage
Instead of some functions operating with hex strings,
and others using bytes, this consolidates most things to use bytes.

This mainly focuses on bitcoin.py and transaction.py,
and then adapts the API usages in other files.

Notably,
- scripts,
- pubkeys,
- signatures
should be bytes in almost all places now.
2024-04-29 17:10:26 +00:00

1048 lines
39 KiB
Python

# Copyright (c) 2017 Andrew Chow
# Copyright (c) 2023 The Electrum developers
# Distributed under the MIT software license, see the accompanying
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
#
# forked from https://github.com/bitcoin-core/HWI/blob/5f300d3dee7b317a6194680ad293eaa0962a3cc7/hwilib/descriptor.py
#
# Output Script Descriptors
# See https://github.com/bitcoin/bitcoin/blob/master/doc/descriptors.md
#
# TODO allow xprv
# TODO hardened derivation
# TODO allow WIF privkeys
# TODO impl ADDR descriptors
# TODO impl RAW descriptors
import enum
from .bip32 import convert_bip32_strpath_to_intpath, BIP32Node, KeyOriginInfo, BIP32_PRIME
from . import bitcoin
from .bitcoin import construct_script, opcodes, construct_witness
from . import constants
from .crypto import hash_160, sha256
from . import ecc
from . import segwit_addr
from .util import bfh
from binascii import unhexlify
from enum import Enum
from typing import (
List,
NamedTuple,
Optional,
Tuple,
Sequence,
Mapping,
Set,
)
MAX_TAPROOT_NODES = 128
# we guess that signatures will be 72 bytes long
# note: DER-encoded ECDSA signatures are 71 or 72 bytes in practice
# See https://bitcoin.stackexchange.com/questions/77191/what-is-the-maximum-size-of-a-der-encoded-ecdsa-signature
# We assume low S (as that is a bitcoin standardness rule).
# We do not assume low R (even though the sigs we create conform), as external sigs,
# e.g. from a hw signer cannot be expected to have a low R.
DUMMY_DER_SIG = 72 * b"\x00"
class ExpandedScripts:
def __init__(
self,
*,
output_script: bytes, # "scriptPubKey"
redeem_script: Optional[bytes] = None,
witness_script: Optional[bytes] = None,
scriptcode_for_sighash: Optional[bytes] = None
):
self.output_script = output_script
self.redeem_script = redeem_script
self.witness_script = witness_script
self.scriptcode_for_sighash = scriptcode_for_sighash
@property
def scriptcode_for_sighash(self) -> Optional[bytes]:
if self._scriptcode_for_sighash:
return self._scriptcode_for_sighash
return self.witness_script or self.redeem_script or self.output_script
@scriptcode_for_sighash.setter
def scriptcode_for_sighash(self, value: Optional[bytes]):
self._scriptcode_for_sighash = value
def address(self, *, net=None) -> Optional[str]:
return bitcoin.script_to_address(self.output_script, net=net)
class ScriptSolutionInner(NamedTuple):
witness_items: Optional[Sequence] = None
class ScriptSolutionTop(NamedTuple):
witness: Optional[bytes] = None
script_sig: Optional[bytes] = None
class MissingSolutionPiece(Exception): pass
def PolyMod(c: int, val: int) -> int:
"""
:meta private:
Function to compute modulo over the polynomial used for descriptor checksums
From: https://github.com/bitcoin/bitcoin/blob/master/src/script/descriptor.cpp
"""
c0 = c >> 35
c = ((c & 0x7ffffffff) << 5) ^ val
if (c0 & 1):
c ^= 0xf5dee51989
if (c0 & 2):
c ^= 0xa9fdca3312
if (c0 & 4):
c ^= 0x1bab10e32d
if (c0 & 8):
c ^= 0x3706b1677a
if (c0 & 16):
c ^= 0x644d626ffd
return c
_INPUT_CHARSET = "0123456789()[],'/*abcdefgh@:$%{}IJKLMNOPQRSTUVWXYZ&+-.;<=>?!^_|~ijklmnopqrstuvwxyzABCDEFGH`#\"\\ "
_INPUT_CHARSET_INV = {c: i for (i, c) in enumerate(_INPUT_CHARSET)}
_CHECKSUM_CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
def DescriptorChecksum(desc: str) -> str:
"""
Compute the checksum for a descriptor
:param desc: The descriptor string to compute a checksum for
:return: A checksum
"""
c = 1
cls = 0
clscount = 0
for ch in desc:
try:
pos = _INPUT_CHARSET_INV[ch]
except KeyError:
return ""
c = PolyMod(c, pos & 31)
cls = cls * 3 + (pos >> 5)
clscount += 1
if clscount == 3:
c = PolyMod(c, cls)
cls = 0
clscount = 0
if clscount > 0:
c = PolyMod(c, cls)
for j in range(0, 8):
c = PolyMod(c, 0)
c ^= 1
ret = [''] * 8
for j in range(0, 8):
ret[j] = _CHECKSUM_CHARSET[(c >> (5 * (7 - j))) & 31]
return ''.join(ret)
def AddChecksum(desc: str) -> str:
"""
Compute and attach the checksum for a descriptor
:param desc: The descriptor string to add a checksum to
:return: Descriptor with checksum
"""
return desc + "#" + DescriptorChecksum(desc)
class PubkeyProvider(object):
"""
A public key expression in a descriptor.
Can contain the key origin info, the pubkey itself, and subsequent derivation paths for derivation from the pubkey
The pubkey can be a typical pubkey or an extended pubkey.
"""
def __init__(
self,
origin: Optional['KeyOriginInfo'],
pubkey: str,
deriv_path: Optional[str]
) -> None:
"""
:param origin: The key origin if one is available
:param pubkey: The public key. Either a hex string or a serialized extended pubkey
:param deriv_path: Additional derivation path (suffix) if the pubkey is an extended pubkey
"""
self.origin = origin
self.pubkey = pubkey
self.deriv_path = deriv_path
if deriv_path:
wildcard_count = deriv_path.count("*")
if wildcard_count > 1:
raise ValueError("only one wildcard(*) is allowed in a descriptor")
if wildcard_count == 1:
if deriv_path[-1] != "*":
raise ValueError("wildcard in descriptor only allowed in last position")
if deriv_path[0] != "/":
raise ValueError(f"deriv_path suffix must start with a '/'. got {deriv_path!r}")
# Make ExtendedKey from pubkey if it isn't hex
self.extkey = None
try:
unhexlify(self.pubkey)
# Is hex, normal pubkey
except Exception:
# Not hex, maybe xpub (but don't allow ypub/zpub)
self.extkey = BIP32Node.from_xkey(pubkey, allow_custom_headers=False)
if deriv_path and self.extkey is None:
raise ValueError("deriv_path suffix present for simple pubkey")
@classmethod
def parse(cls, s: str) -> 'PubkeyProvider':
"""
Deserialize a key expression from the string into a ``PubkeyProvider``.
:param s: String containing the key expression
:return: A new ``PubkeyProvider`` containing the details given by ``s``
"""
origin = None
deriv_path = None
if s[0] == "[":
end = s.index("]")
origin = KeyOriginInfo.from_string(s[1:end])
s = s[end + 1:]
pubkey = s
slash_idx = s.find("/")
if slash_idx != -1:
pubkey = s[:slash_idx]
deriv_path = s[slash_idx:]
return cls(origin, pubkey, deriv_path)
def to_string(self) -> str:
"""
Serialize the pubkey expression to a string to be used in a descriptor
:return: The pubkey expression as a string
"""
s = ""
if self.origin:
s += "[{}]".format(self.origin.to_string())
s += self.pubkey
if self.deriv_path:
s += self.deriv_path
return s
def get_pubkey_bytes(self, *, pos: Optional[int] = None) -> bytes:
if self.is_range() and pos is None:
raise ValueError("pos must be set for ranged descriptor")
# note: if not ranged, we ignore pos.
if self.extkey is not None:
compressed = True # bip32 implies compressed pubkeys
if self.deriv_path is None:
assert not self.is_range()
return self.extkey.eckey.get_public_key_bytes(compressed=compressed)
else:
path_str = self.deriv_path[1:]
if self.is_range():
assert path_str[-1] == "*"
path_str = path_str[:-1] + str(pos)
path = convert_bip32_strpath_to_intpath(path_str)
child_key = self.extkey.subkey_at_public_derivation(path)
return child_key.eckey.get_public_key_bytes(compressed=compressed)
else:
assert not self.is_range()
return unhexlify(self.pubkey)
def get_full_derivation_path(self, *, pos: Optional[int] = None) -> str:
"""
Returns the full derivation path at the given position, including the origin
"""
if self.is_range() and pos is None:
raise ValueError("pos must be set for ranged descriptor")
path = self.origin.get_derivation_path() if self.origin is not None else "m"
path += self.deriv_path if self.deriv_path is not None else ""
if path[-1] == "*":
path = path[:-1] + str(pos)
return path
def get_full_derivation_int_list(self, *, pos: Optional[int] = None) -> List[int]:
"""
Returns the full derivation path as an integer list at the given position.
Includes the origin and master key fingerprint as an int
"""
if self.is_range() and pos is None:
raise ValueError("pos must be set for ranged descriptor")
path: List[int] = self.origin.get_full_int_list() if self.origin is not None else []
path.extend(self.get_der_suffix_int_list(pos=pos))
return path
def get_der_suffix_int_list(self, *, pos: Optional[int] = None) -> List[int]:
if not self.deriv_path:
return []
der_suffix = self.deriv_path
assert (wc_count := der_suffix.count("*")) <= 1, wc_count
der_suffix = der_suffix.replace("*", str(pos))
return convert_bip32_strpath_to_intpath(der_suffix)
def __lt__(self, other: 'PubkeyProvider') -> bool:
return self.pubkey < other.pubkey
def is_range(self) -> bool:
if not self.deriv_path:
return False
if self.deriv_path[-1] == "*": # TODO hardened
return True
return False
def has_uncompressed_pubkey(self) -> bool:
if self.is_range(): # bip32 implies compressed
return False
return b"\x04" == self.get_pubkey_bytes()[:1]
class Descriptor(object):
r"""
An abstract class for Descriptors themselves.
Descriptors can contain multiple :class:`PubkeyProvider`\ s and multiple ``Descriptor`` as subdescriptors.
Note: a significant portion of input validation logic is in parse_descriptor(),
maybe these checks should be moved to (or also done in) this class?
For example, sh() must be top-level, or segwit mandates compressed pubkeys,
or bare-multisig cannot have >3 pubkeys.
"""
def __init__(
self,
pubkeys: List['PubkeyProvider'],
subdescriptors: List['Descriptor'],
name: str
) -> None:
r"""
:param pubkeys: The :class:`PubkeyProvider`\ s that are part of this descriptor
:param subdescriptor: The ``Descriptor``\ s that are part of this descriptor
:param name: The name of the function for this descriptor
"""
self.pubkeys = pubkeys
self.subdescriptors = subdescriptors
self.name = name
def to_string_no_checksum(self) -> str:
"""
Serializes the descriptor as a string without the descriptor checksum
:return: The descriptor string
"""
return "{}({}{})".format(
self.name,
",".join([p.to_string() for p in self.pubkeys]),
self.subdescriptors[0].to_string_no_checksum() if len(self.subdescriptors) > 0 else ""
)
def to_string(self) -> str:
"""
Serializes the descriptor as a string with the checksum
:return: The descriptor with a checksum
"""
return AddChecksum(self.to_string_no_checksum())
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
"""
Returns the scripts for a descriptor at the given `pos` for ranged descriptors.
"""
raise NotImplementedError("The Descriptor base class does not implement this method")
def _satisfy_inner(
self,
*,
sigdata: Mapping[bytes, bytes] = None, # pubkey -> sig
allow_dummy: bool = False,
) -> ScriptSolutionInner:
raise NotImplementedError("The Descriptor base class does not implement this method")
def satisfy(
self,
*,
sigdata: Mapping[bytes, bytes] = None, # pubkey -> sig
allow_dummy: bool = False,
) -> ScriptSolutionTop:
"""Construct a witness and/or scriptSig to be used in a txin, to satisfy the bitcoin SCRIPT.
Raises MissingSolutionPiece if satisfaction is not yet possible due to e.g. missing a signature,
unless `allow_dummy` is set to True, in which case dummy data is used where needed (e.g. for size estimation).
"""
assert not self.is_range()
sol = self._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy)
witness = None
script_sig = None
if self.is_segwit():
witness = construct_witness(sol.witness_items)
else:
script_sig = construct_script(sol.witness_items)
return ScriptSolutionTop(
witness=witness,
script_sig=script_sig,
)
def get_satisfaction_progress(
self,
*,
sigdata: Mapping[bytes, bytes] = None, # pubkey -> sig
) -> Tuple[int, int]:
"""Returns (num_sigs_we_have, num_sigs_required) towards satisfying this script.
Besides signatures, later this can also consider hash-preimages.
"""
assert not self.is_range()
nhave, nreq = 0, 0
for desc in self.subdescriptors:
a, b = desc.get_satisfaction_progress(sigdata=sigdata)
nhave += a
nreq += b
return nhave, nreq
def is_range(self) -> bool:
for pubkey in self.pubkeys:
if pubkey.is_range():
return True
for desc in self.subdescriptors:
if desc.is_range():
return True
return False
def is_segwit(self) -> bool:
return any([desc.is_segwit() for desc in self.subdescriptors])
def get_all_pubkeys(self) -> Set[bytes]:
"""Returns set of pubkeys that appear at any level in this descriptor."""
assert not self.is_range()
all_pubkeys = set([p.get_pubkey_bytes() for p in self.pubkeys])
for desc in self.subdescriptors:
all_pubkeys |= desc.get_all_pubkeys()
return all_pubkeys
def get_simple_singlesig(self) -> Optional['Descriptor']:
"""Returns innermost pk/pkh/wpkh descriptor, or None if we are not a simple singlesig.
note: besides pk,pkh,sh(wpkh),wpkh, overly complicated stuff such as sh(pk),wsh(sh(pkh),etc is also accepted
"""
if len(self.subdescriptors) == 1:
return self.subdescriptors[0].get_simple_singlesig()
return None
def get_simple_multisig(self) -> Optional['MultisigDescriptor']:
"""Returns innermost multi descriptor, or None if we are not a simple multisig."""
if len(self.subdescriptors) == 1:
return self.subdescriptors[0].get_simple_multisig()
return None
def to_legacy_electrum_script_type(self) -> str:
if isinstance(self, PKDescriptor):
return "p2pk"
elif isinstance(self, PKHDescriptor):
return "p2pkh"
elif isinstance(self, WPKHDescriptor):
return "p2wpkh"
elif isinstance(self, SHDescriptor) and isinstance(self.subdescriptors[0], WPKHDescriptor):
return "p2wpkh-p2sh"
elif isinstance(self, SHDescriptor) and isinstance(self.subdescriptors[0], MultisigDescriptor):
return "p2sh"
elif isinstance(self, WSHDescriptor) and isinstance(self.subdescriptors[0], MultisigDescriptor):
return "p2wsh"
elif (isinstance(self, SHDescriptor) and isinstance(self.subdescriptors[0], WSHDescriptor)
and isinstance(self.subdescriptors[0].subdescriptors[0], MultisigDescriptor)):
return "p2wsh-p2sh"
return "unknown"
class PKDescriptor(Descriptor):
"""
A descriptor for ``pk()`` descriptors
"""
def __init__(
self,
pubkey: 'PubkeyProvider'
) -> None:
"""
:param pubkey: The :class:`PubkeyProvider` for this descriptor
"""
super().__init__([pubkey], [], "pk")
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
pubkey = self.pubkeys[0].get_pubkey_bytes(pos=pos)
script = construct_script([pubkey, opcodes.OP_CHECKSIG])
return ExpandedScripts(output_script=script)
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner:
if sigdata is None: sigdata = {}
assert not self.is_range()
assert not self.subdescriptors
pubkey = self.pubkeys[0].get_pubkey_bytes()
sig = sigdata.get(pubkey)
if sig is None and allow_dummy:
sig = DUMMY_DER_SIG
if sig is None:
raise MissingSolutionPiece(f"no sig for {pubkey.hex()}")
return ScriptSolutionInner(
witness_items=(sig,),
)
def get_satisfaction_progress(self, *, sigdata=None) -> Tuple[int, int]:
if sigdata is None: sigdata = {}
signatures = list(sigdata.values())
return len(signatures), 1
def get_simple_singlesig(self) -> Optional['Descriptor']:
return self
class PKHDescriptor(Descriptor):
"""
A descriptor for ``pkh()`` descriptors
"""
def __init__(
self,
pubkey: 'PubkeyProvider'
) -> None:
"""
:param pubkey: The :class:`PubkeyProvider` for this descriptor
"""
super().__init__([pubkey], [], "pkh")
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
pubkey = self.pubkeys[0].get_pubkey_bytes(pos=pos)
pkh = hash_160(pubkey)
script = bitcoin.pubkeyhash_to_p2pkh_script(pkh)
return ExpandedScripts(output_script=script)
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner:
if sigdata is None: sigdata = {}
assert not self.is_range()
assert not self.subdescriptors
pubkey = self.pubkeys[0].get_pubkey_bytes()
sig = sigdata.get(pubkey)
if sig is None and allow_dummy:
sig = DUMMY_DER_SIG
if sig is None:
raise MissingSolutionPiece(f"no sig for {pubkey.hex()}")
return ScriptSolutionInner(
witness_items=(sig, pubkey),
)
def get_satisfaction_progress(self, *, sigdata=None) -> Tuple[int, int]:
if sigdata is None: sigdata = {}
signatures = list(sigdata.values())
return len(signatures), 1
def get_simple_singlesig(self) -> Optional['Descriptor']:
return self
class WPKHDescriptor(Descriptor):
"""
A descriptor for ``wpkh()`` descriptors
"""
def __init__(
self,
pubkey: 'PubkeyProvider'
) -> None:
"""
:param pubkey: The :class:`PubkeyProvider` for this descriptor
"""
super().__init__([pubkey], [], "wpkh")
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
pkh = hash_160(self.pubkeys[0].get_pubkey_bytes(pos=pos))
output_script = construct_script([0, pkh])
scriptcode = bitcoin.pubkeyhash_to_p2pkh_script(pkh)
return ExpandedScripts(
output_script=output_script,
scriptcode_for_sighash=scriptcode,
)
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner:
if sigdata is None: sigdata = {}
assert not self.is_range()
assert not self.subdescriptors
pubkey = self.pubkeys[0].get_pubkey_bytes()
sig = sigdata.get(pubkey)
if sig is None and allow_dummy:
sig = DUMMY_DER_SIG
if sig is None:
raise MissingSolutionPiece(f"no sig for {pubkey.hex()}")
return ScriptSolutionInner(
witness_items=(sig, pubkey),
)
def get_satisfaction_progress(self, *, sigdata=None) -> Tuple[int, int]:
if sigdata is None: sigdata = {}
signatures = list(sigdata.values())
return len(signatures), 1
def is_segwit(self) -> bool:
return True
def get_simple_singlesig(self) -> Optional['Descriptor']:
return self
class MultisigDescriptor(Descriptor):
"""
A descriptor for ``multi()`` and ``sortedmulti()`` descriptors
"""
def __init__(
self,
pubkeys: List['PubkeyProvider'],
thresh: int,
is_sorted: bool
) -> None:
r"""
:param pubkeys: The :class:`PubkeyProvider`\ s for this descriptor
:param thresh: The number of keys required to sign this multisig
:param is_sorted: Whether this is a ``sortedmulti()`` descriptor
"""
super().__init__(pubkeys, [], "sortedmulti" if is_sorted else "multi")
if not (1 <= thresh <= len(pubkeys) <= 15):
raise ValueError(f'{thresh=}, {len(pubkeys)=}')
self.thresh = thresh
self.is_sorted = is_sorted
if self.is_sorted:
if not self.is_range():
# sort xpubs using the order of pubkeys
der_pks = [p.get_pubkey_bytes() for p in self.pubkeys]
self.pubkeys = [x[1] for x in sorted(zip(der_pks, self.pubkeys))]
else:
# not possible to sort according to final order in expanded scripts,
# but for easier visual comparison, we do a lexicographical sort
self.pubkeys.sort()
def to_string_no_checksum(self) -> str:
return "{}({},{})".format(self.name, self.thresh, ",".join([p.to_string() for p in self.pubkeys]))
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
der_pks = [p.get_pubkey_bytes(pos=pos) for p in self.pubkeys]
if self.is_sorted:
der_pks.sort()
script = construct_script([self.thresh, *der_pks, len(der_pks), opcodes.OP_CHECKMULTISIG])
return ExpandedScripts(output_script=script)
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner:
if sigdata is None: sigdata = {}
assert not self.is_range()
assert not self.subdescriptors
der_pks = [p.get_pubkey_bytes() for p in self.pubkeys]
if self.is_sorted:
der_pks.sort()
signatures = []
for pubkey in der_pks:
if sig := sigdata.get(pubkey):
signatures.append(sig)
if len(signatures) >= self.thresh:
break
if allow_dummy:
dummy_sig = DUMMY_DER_SIG
signatures += (self.thresh - len(signatures)) * [dummy_sig]
if len(signatures) < self.thresh:
raise MissingSolutionPiece(f"not enough sigs")
assert len(signatures) == self.thresh, f"thresh={self.thresh}, but got {len(signatures)} sigs"
return ScriptSolutionInner(
witness_items=(0, *signatures),
)
def get_satisfaction_progress(self, *, sigdata=None) -> Tuple[int, int]:
if sigdata is None: sigdata = {}
signatures = list(sigdata.values())
return len(signatures), self.thresh
def get_simple_multisig(self) -> Optional['MultisigDescriptor']:
return self
class SHDescriptor(Descriptor):
"""
A descriptor for ``sh()`` descriptors
"""
def __init__(
self,
subdescriptor: 'Descriptor'
) -> None:
"""
:param subdescriptor: The :class:`Descriptor` that is a sub-descriptor for this descriptor
"""
super().__init__([], [subdescriptor], "sh")
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
assert len(self.subdescriptors) == 1
sub_scripts = self.subdescriptors[0].expand(pos=pos)
redeem_script = sub_scripts.output_script
witness_script = sub_scripts.witness_script
script = construct_script([opcodes.OP_HASH160, hash_160(redeem_script), opcodes.OP_EQUAL])
return ExpandedScripts(
output_script=script,
redeem_script=redeem_script,
witness_script=witness_script,
scriptcode_for_sighash=sub_scripts.scriptcode_for_sighash,
)
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner:
raise Exception("does not make sense for sh()")
def satisfy(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionTop:
assert not self.is_range()
assert len(self.subdescriptors) == 1
subdesc = self.subdescriptors[0]
redeem_script = self.expand().redeem_script
witness = None
if isinstance(subdesc, (WSHDescriptor, WPKHDescriptor)): # witness_v0 nested in p2sh
witness = subdesc.satisfy(sigdata=sigdata, allow_dummy=allow_dummy).witness
script_sig = construct_script([redeem_script])
else: # legacy p2sh
subsol = subdesc._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy)
script_sig = construct_script([*subsol.witness_items, redeem_script])
return ScriptSolutionTop(
witness=witness,
script_sig=script_sig,
)
class WSHDescriptor(Descriptor):
"""
A descriptor for ``wsh()`` descriptors
"""
def __init__(
self,
subdescriptor: 'Descriptor'
) -> None:
"""
:param subdescriptor: The :class:`Descriptor` that is a sub-descriptor for this descriptor
"""
super().__init__([], [subdescriptor], "wsh")
def expand(self, *, pos: Optional[int] = None) -> "ExpandedScripts":
assert len(self.subdescriptors) == 1
sub_scripts = self.subdescriptors[0].expand(pos=pos)
witness_script = sub_scripts.output_script
output_script = construct_script([0, sha256(witness_script)])
return ExpandedScripts(
output_script=output_script,
witness_script=witness_script,
)
def _satisfy_inner(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionInner:
raise Exception("does not make sense for wsh()")
def satisfy(self, *, sigdata=None, allow_dummy=False) -> ScriptSolutionTop:
assert not self.is_range()
assert len(self.subdescriptors) == 1
subsol = self.subdescriptors[0]._satisfy_inner(sigdata=sigdata, allow_dummy=allow_dummy)
witness_script = self.expand().witness_script
witness = construct_witness([*subsol.witness_items, witness_script])
return ScriptSolutionTop(
witness=witness,
)
def is_segwit(self) -> bool:
return True
class TRDescriptor(Descriptor):
"""
A descriptor for ``tr()`` descriptors
"""
def __init__(
self,
internal_key: 'PubkeyProvider',
subdescriptors: List['Descriptor'] = None,
depths: List[int] = None,
) -> None:
r"""
:param internal_key: The :class:`PubkeyProvider` that is the internal key for this descriptor
:param subdescriptors: The :class:`Descriptor`\ s that are the leaf scripts for this descriptor
:param depths: The depths of the leaf scripts in the same order as `subdescriptors`
"""
if subdescriptors is None:
subdescriptors = []
if depths is None:
depths = []
super().__init__([internal_key], subdescriptors, "tr")
self.depths = depths
def to_string_no_checksum(self) -> str:
r = f"{self.name}({self.pubkeys[0].to_string()}"
path: List[bool] = [] # Track left or right for each depth
for p, depth in enumerate(self.depths):
r += ","
while len(path) <= depth:
if len(path) > 0:
r += "{"
path.append(False)
r += self.subdescriptors[p].to_string_no_checksum()
while len(path) > 0 and path[-1]:
if len(path) > 0:
r += "}"
path.pop()
if len(path) > 0:
path[-1] = True
r += ")"
return r
def is_segwit(self) -> bool:
return True
def _get_func_expr(s: str) -> Tuple[str, str]:
"""
Get the function name and then the expression inside
:param s: The string that begins with a function name
:return: The function name as the first element of the tuple, and the expression contained within the function as the second element
:raises: ValueError: if a matching pair of parentheses cannot be found
"""
start = s.index("(")
end = s.rindex(")")
return s[0:start], s[start + 1:end]
def _get_const(s: str, const: str) -> str:
"""
Get the first character of the string, make sure it is the expected character,
and return the rest of the string
:param s: The string that begins with a constant character
:param const: The constant character
:return: The remainder of the string without the constant character
:raises: ValueError: if the first character is not the constant character
"""
if s[0] != const:
raise ValueError(f"Expected '{const}' but got '{s[0]}'")
return s[1:]
def _get_expr(s: str) -> Tuple[str, str]:
"""
Extract the expression that ``s`` begins with.
This will return the initial part of ``s``, up to the first comma or closing brace,
skipping ones that are surrounded by braces.
:param s: The string to extract the expression from
:return: A pair with the first item being the extracted expression and the second the rest of the string
"""
level: int = 0
for i, c in enumerate(s):
if c in ["(", "{"]:
level += 1
elif level > 0 and c in [")", "}"]:
level -= 1
elif level == 0 and c in [")", "}", ","]:
break
return s[0:i], s[i:]
def parse_pubkey(expr: str, *, ctx: '_ParseDescriptorContext') -> Tuple['PubkeyProvider', str]:
"""
Parses an individual pubkey expression from a string that may contain more than one pubkey expression.
:param expr: The expression to parse a pubkey expression from
:return: The :class:`PubkeyProvider` that is parsed as the first item of a tuple, and the remainder of the expression as the second item.
"""
end = len(expr)
comma_idx = expr.find(",")
next_expr = ""
if comma_idx != -1:
end = comma_idx
next_expr = expr[end + 1:]
pubkey_provider = PubkeyProvider.parse(expr[:end])
permit_uncompressed = ctx in (_ParseDescriptorContext.TOP, _ParseDescriptorContext.P2SH)
if not permit_uncompressed and pubkey_provider.has_uncompressed_pubkey():
raise ValueError("uncompressed pubkeys are not allowed")
return pubkey_provider, next_expr
class _ParseDescriptorContext(Enum):
"""
:meta private:
Enum representing the level that we are in when parsing a descriptor.
Some expressions aren't allowed at certain levels, this helps us track those.
"""
TOP = enum.auto() # The top level, not within any descriptor
P2SH = enum.auto() # Within an sh() descriptor
P2WPKH = enum.auto() # Within wpkh() descriptor
P2WSH = enum.auto() # Within a wsh() descriptor
P2TR = enum.auto() # Within a tr() descriptor
def _parse_descriptor(desc: str, *, ctx: '_ParseDescriptorContext') -> 'Descriptor':
"""
:meta private:
Parse a descriptor given the context level we are in.
Used recursively to parse subdescriptors
:param desc: The descriptor string to parse
:param ctx: The :class:`_ParseDescriptorContext` indicating the level we are in
:return: The parsed descriptor
:raises: ValueError: if the descriptor is malformed
"""
func, expr = _get_func_expr(desc)
if func == "pk":
pubkey, expr = parse_pubkey(expr, ctx=ctx)
if expr:
raise ValueError("more than one pubkey in pk descriptor")
return PKDescriptor(pubkey)
if func == "pkh":
if not (ctx == _ParseDescriptorContext.TOP or ctx == _ParseDescriptorContext.P2SH or ctx == _ParseDescriptorContext.P2WSH):
raise ValueError("Can only have pkh at top level, in sh(), or in wsh()")
pubkey, expr = parse_pubkey(expr, ctx=ctx)
if expr:
raise ValueError("More than one pubkey in pkh descriptor")
return PKHDescriptor(pubkey)
if func == "sortedmulti" or func == "multi":
if not (ctx == _ParseDescriptorContext.TOP or ctx == _ParseDescriptorContext.P2SH or ctx == _ParseDescriptorContext.P2WSH):
raise ValueError("Can only have multi/sortedmulti at top level, in sh(), or in wsh()")
is_sorted = func == "sortedmulti"
comma_idx = expr.index(",")
thresh = int(expr[:comma_idx])
expr = expr[comma_idx + 1:]
pubkeys = []
while expr:
pubkey, expr = parse_pubkey(expr, ctx=ctx)
pubkeys.append(pubkey)
if len(pubkeys) == 0 or len(pubkeys) > 15:
raise ValueError("Cannot have {} keys in a multisig; must have between 1 and 15 keys, inclusive".format(len(pubkeys)))
elif thresh < 1:
raise ValueError("Multisig threshold cannot be {}, must be at least 1".format(thresh))
elif thresh > len(pubkeys):
raise ValueError("Multisig threshold cannot be larger than the number of keys; threshold is {} but only {} keys specified".format(thresh, len(pubkeys)))
if ctx == _ParseDescriptorContext.TOP and len(pubkeys) > 3:
raise ValueError("Cannot have {} pubkeys in bare multisig: only at most 3 pubkeys")
return MultisigDescriptor(pubkeys, thresh, is_sorted)
if func == "wpkh":
if not (ctx == _ParseDescriptorContext.TOP or ctx == _ParseDescriptorContext.P2SH):
raise ValueError("Can only have wpkh() at top level or inside sh()")
pubkey, expr = parse_pubkey(expr, ctx=_ParseDescriptorContext.P2WPKH)
if expr:
raise ValueError("More than one pubkey in pkh descriptor")
return WPKHDescriptor(pubkey)
if func == "sh":
if ctx != _ParseDescriptorContext.TOP:
raise ValueError("Can only have sh() at top level")
subdesc = _parse_descriptor(expr, ctx=_ParseDescriptorContext.P2SH)
return SHDescriptor(subdesc)
if func == "wsh":
if not (ctx == _ParseDescriptorContext.TOP or ctx == _ParseDescriptorContext.P2SH):
raise ValueError("Can only have wsh() at top level or inside sh()")
subdesc = _parse_descriptor(expr, ctx=_ParseDescriptorContext.P2WSH)
return WSHDescriptor(subdesc)
if func == "tr":
if ctx != _ParseDescriptorContext.TOP:
raise ValueError("Can only have tr at top level")
internal_key, expr = parse_pubkey(expr, ctx=ctx)
subscripts = []
depths = []
if expr:
# Path from top of the tree to what we're currently processing.
# branches[i] == False: left branch in the i'th step from the top
# branches[i] == true: right branch
branches = []
while True:
# Process open braces
while True:
try:
expr = _get_const(expr, "{")
branches.append(False)
except ValueError:
break
if len(branches) > MAX_TAPROOT_NODES:
raise ValueError(f"tr() supports at most {MAX_TAPROOT_NODES} nesting levels") # TODO xxxx fixed upstream bug here
# Process script expression
sarg, expr = _get_expr(expr)
subscripts.append(_parse_descriptor(sarg, ctx=_ParseDescriptorContext.P2TR))
depths.append(len(branches))
# Process closing braces
while len(branches) > 0 and branches[-1]:
expr = _get_const(expr, "}")
branches.pop()
# If we're at the end of a left branch, expect a comma
if len(branches) > 0 and not branches[-1]:
expr = _get_const(expr, ",")
branches[-1] = True
if len(branches) == 0:
break
return TRDescriptor(internal_key, subscripts, depths)
if ctx == _ParseDescriptorContext.P2SH:
raise ValueError("A function is needed within P2SH")
elif ctx == _ParseDescriptorContext.P2WSH:
raise ValueError("A function is needed within P2WSH")
raise ValueError("{} is not a valid descriptor function".format(func))
def parse_descriptor(desc: str) -> 'Descriptor':
"""
Parse a descriptor string into a :class:`Descriptor`.
Validates the checksum if one is provided in the string
:param desc: The descriptor string
:return: The parsed :class:`Descriptor`
:raises: ValueError: if the descriptor string is malformed
"""
i = desc.find("#")
if i != -1:
checksum = desc[i + 1:]
desc = desc[:i]
computed = DescriptorChecksum(desc)
if computed != checksum:
raise ValueError("The checksum does not match; Got {}, expected {}".format(checksum, computed))
return _parse_descriptor(desc, ctx=_ParseDescriptorContext.TOP)
#####
def get_singlesig_descriptor_from_legacy_leaf(*, pubkey: str, script_type: str) -> Optional[Descriptor]:
pubkey = PubkeyProvider.parse(pubkey)
if script_type == 'p2pk':
return PKDescriptor(pubkey=pubkey)
elif script_type == 'p2pkh':
return PKHDescriptor(pubkey=pubkey)
elif script_type == 'p2wpkh':
return WPKHDescriptor(pubkey=pubkey)
elif script_type == 'p2wpkh-p2sh':
wpkh = WPKHDescriptor(pubkey=pubkey)
return SHDescriptor(subdescriptor=wpkh)
else:
raise NotImplementedError(f"unexpected {script_type=}")
def create_dummy_descriptor_from_address(addr: Optional[str]) -> 'Descriptor':
# It's not possible to tell the script type in general just from an address.
# - "1" addresses are of course p2pkh
# - "3" addresses are p2sh but we don't know the redeem script...
# - "bc1" addresses (if they are 42-long) are p2wpkh
# - "bc1" addresses that are 62-long are p2wsh but we don't know the script...
# If we don't know the script, we _guess_ it is pubkeyhash.
# As this method is used e.g. for tx size estimation,
# the estimation will not be precise.
def guess_script_type(addr: Optional[str]) -> str:
if addr is None:
return 'p2wpkh' # the default guess
witver, witprog = segwit_addr.decode_segwit_address(constants.net.SEGWIT_HRP, addr)
if witprog is not None:
return 'p2wpkh'
addrtype, hash_160_ = bitcoin.b58_address_to_hash160(addr)
if addrtype == constants.net.ADDRTYPE_P2PKH:
return 'p2pkh'
elif addrtype == constants.net.ADDRTYPE_P2SH:
return 'p2wpkh-p2sh'
raise Exception(f'unrecognized address: {repr(addr)}')
script_type = guess_script_type(addr)
# guess pubkey-len to be 33-bytes:
pubkey = ecc.GENERATOR.get_public_key_bytes(compressed=True).hex()
desc = get_singlesig_descriptor_from_legacy_leaf(pubkey=pubkey, script_type=script_type)
return desc