# Some parts of this code are adapted from bitcoin-core/HWI: # https://github.com/bitcoin-core/HWI/blob/e731395bde13362950e9f13e01689c475545e4dc/hwilib/devices/ledger.py from abc import ABC, abstractmethod import base64 import hashlib from typing import Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union import electrum_ecc as ecc from electrum import bip32, constants from electrum import descriptor from electrum.bip32 import BIP32Node, convert_bip32_intpath_to_strpath, normalize_bip32_derivation from electrum.bitcoin import EncodeBase58Check, is_b58_address, is_segwit_script_type, var_int from electrum.crypto import hash_160 from electrum.i18n import _ from electrum.keystore import Hardware_KeyStore from electrum.logging import get_logger from electrum.plugin import Device, runs_in_hwd_thread from electrum.transaction import PartialTransaction, Transaction, PartialTxInput from electrum.util import bfh, UserFacingException, versiontuple from electrum.wallet import Standard_Wallet from electrum.hw_wallet import HardwareClientBase, HW_PluginBase from electrum.hw_wallet.plugin import is_any_tx_output_on_change_branch, validate_op_return_output, LibraryFoundButUnusable from electrum.hw_wallet.plugin import HardwareClientDummy if TYPE_CHECKING: from electrum.plugin import DeviceInfo from electrum.wizard import NewWalletWizard _logger = get_logger(__name__) try: import ledger_bitcoin from ledger_bitcoin import WalletPolicy, MultisigWallet, AddressType, Chain from ledger_bitcoin.exception.errors import DenyError, NotSupportedError, SecurityStatusNotSatisfiedError from ledger_bitcoin.key import KeyOriginInfo from ledgercomm.interfaces.hid_device import HID # legacy imports import hid from ledger_bitcoin.btchip.btchipComm import HIDDongleHIDAPI from ledger_bitcoin.btchip.btchip import btchip from ledger_bitcoin.btchip.btchipUtils import compress_public_key from ledger_bitcoin.btchip.bitcoinTransaction import bitcoinTransaction from ledger_bitcoin.btchip.btchipException import BTChipException LEDGER_BITCOIN = True except ImportError as e: if not (isinstance(e, ModuleNotFoundError) and e.name == 'ledger_bitcoin'): _logger.exception('error importing ledger plugin deps') LEDGER_BITCOIN = False MSG_NEEDS_FW_UPDATE_GENERIC = _('Firmware version too old. Please update at') + \ ' https://www.ledger.com' MSG_NEEDS_FW_UPDATE_SEGWIT = _('Firmware version (or "Bitcoin" app) too old for Segwit support. Please update at') + \ ' https://www.ledger.com' MULTI_OUTPUT_SUPPORT = '1.1.4' SEGWIT_SUPPORT = '1.1.10' SEGWIT_SUPPORT_SPECIAL = '1.0.4' SEGWIT_TRUSTEDINPUTS = '1.4.0' def is_policy_standard(wp: 'WalletPolicy', fpr: bytes, exp_coin_type: int) -> bool: """Returns True if the wallet policy can be used without registration.""" if wp.name != "" or wp.n_keys != 1: return False key_info = wp.keys_info[0] if key_info[0] != '[': # no key origin info return False try: key_orig_end = key_info.index(']') except ValueError: # invalid key_info return False key_fpr, key_path = key_info[1:key_orig_end].split('/', maxsplit=1) if key_fpr != fpr.hex(): # not an internal key return False key_path_parts = key_path.split('/') # Account key should be exactly 3 hardened derivation steps if len(key_path_parts) != 3 or any(part[-1] != "'" for part in key_path_parts): return False purpose, coin_type, account_index = key_path_parts if coin_type != f"{exp_coin_type}'" or int(account_index[:-1]) > 100: return False if wp.descriptor_template == "pkh(@0/**)": # BIP-44 return purpose == "44'" elif wp.descriptor_template == "sh(wpkh(@0/**))": # BIP-49, nested SegWit return purpose == "49'" elif wp.descriptor_template == "wpkh(@0/**)": # BIP-84, native SegWit return purpose == "84'" elif wp.descriptor_template == "tr(@0/**)": # BIP-86, taproot single key return purpose == "86'" else: # unknown return False def convert_xpub(xpub: str, xtype='standard') -> str: bip32node = BIP32Node.from_xkey(xpub) return BIP32Node( xtype=xtype, eckey=bip32node.eckey, chaincode=bip32node.chaincode, depth=bip32node.depth, fingerprint=bip32node.fingerprint, child_number=bip32node.child_number).to_xpub() def test_pin_unlocked(func): """Function decorator to test the Ledger for being unlocked, and if not, raise a human-readable exception. """ def catch_exception(self, *args, **kwargs): try: return func(self, *args, **kwargs) except SecurityStatusNotSatisfiedError: raise UserFacingException(_('Your Ledger is locked. Please unlock it.')) return catch_exception # from HWI def is_witness(script: bytes) -> Tuple[bool, int, bytes]: """ Determine whether a script is a segwit output script. If so, also returns the witness version and witness program. :param script: The script :returns: A tuple of a bool indicating whether the script is a segwit output script, an int representing the witness version, and the bytes of the witness program. """ if len(script) < 4 or len(script) > 42: return (False, 0, b"") if script[0] != 0 and (script[0] < 81 or script[0] > 96): return (False, 0, b"") if script[1] + 2 == len(script): return (True, script[0] - 0x50 if script[0] else 0, script[2:]) return (False, 0, b"") # from HWI # Only handles up to 15 of 15. Returns None if this script is not a # multisig script. Returns (m, pubkeys) otherwise. def parse_multisig(script: bytes) -> Optional[Tuple[int, Sequence[bytes]]]: """ Determine whether a script is a multisig script. If so, determine the parameters of that multisig. :param script: The script :returns: ``None`` if the script is not multisig. If multisig, returns a tuple of the number of signers required, and a sequence of public key bytes. """ # Get m m = script[0] - 80 if m < 1 or m > 15: return None # Get pubkeys pubkeys = [] offset = 1 while True: pubkey_len = script[offset] if pubkey_len != 33: break offset += 1 pubkeys.append(script[offset:offset + 33]) offset += 33 # Check things at the end n = script[offset] - 80 if n != len(pubkeys): return None offset += 1 op_cms = script[offset] if op_cms != 174: return None return (m, pubkeys) HARDENED_FLAG = 1 << 31 def H_(x: int) -> int: """ Shortcut function that "hardens" a number in a BIP44 path. """ return x | HARDENED_FLAG def is_hardened(i: int) -> bool: """ Returns whether an index is hardened """ return i & HARDENED_FLAG != 0 def get_bip44_purpose(addrtype: 'AddressType') -> int: """ Determine the BIP 44 purpose based on the given :class:`~hwilib.common.AddressType`. :param addrtype: The address type """ if addrtype == AddressType.LEGACY: return 44 elif addrtype == AddressType.SH_WIT: return 49 elif addrtype == AddressType.WIT: return 84 elif addrtype == AddressType.TAP: return 86 else: raise ValueError("Unknown address type") def get_bip44_chain(chain: 'Chain') -> int: """ Determine the BIP 44 coin type based on the Bitcoin chain type. For the Bitcoin mainnet chain, this returns 0. For the other chains, this returns 1. :param chain: The chain """ if chain == Chain.MAIN: return 0 else: return 1 def get_addrtype_from_bip44_purpose(index: int) -> Optional['AddressType']: purpose = index & ~HARDENED_FLAG if purpose == 44: return AddressType.LEGACY elif purpose == 49: return AddressType.SH_WIT elif purpose == 84: return AddressType.WIT elif purpose == 86: return AddressType.TAP else: return None def is_standard_path( path: Sequence[int], addrtype: 'AddressType', chain: 'Chain', ) -> bool: if len(path) != 5: return False if not is_hardened(path[0]) or not is_hardened(path[1]) or not is_hardened(path[2]): return False if is_hardened(path[3]) or is_hardened(path[4]): return False computed_addrtype = get_addrtype_from_bip44_purpose(path[0]) if computed_addrtype is None: return False if computed_addrtype != addrtype: return False if path[1] != H_(get_bip44_chain(chain)): return False if path[3] not in [0, 1]: return False return True def get_chain() -> 'Chain': if constants.net.NET_NAME == "mainnet": return Chain.MAIN elif constants.net.NET_NAME == "testnet": return Chain.TEST elif constants.net.NET_NAME == "regtest": return Chain.REGTEST else: raise ValueError("Unsupported network") class Ledger_Client(HardwareClientBase, ABC): is_legacy: bool @staticmethod def construct_new( *args, device: Device, plugin: 'LedgerPlugin', **kwargs, ) -> Union['Ledger_Client', HardwareClientDummy]: """The 'real' constructor, that automatically decides which subclass to use.""" if LedgerPlugin.is_hw1(device.product_key): return HardwareClientDummy( plugin=plugin, error_text="ledger hw.1 devices are no longer supported", ) # for nano S or newer hw, decide which client impl to use based on software/firmware version: hid_device = HID() hid_device.path = device.path hid_device.open() transport = ledger_bitcoin.TransportClient('hid', hid=hid_device) try: cl = ledger_bitcoin.createClient(transport, chain=get_chain()) except (ledger_bitcoin.exception.errors.InsNotSupportedError, ledger_bitcoin.exception.errors.ClaNotSupportedError) as e: # This can happen on very old versions. # E.g. with a "nano s", with bitcoin app 1.1.10, SE 1.3.1, MCU 1.0, # - on machine one, ghost43 got InsNotSupportedError # - on machine two, thomasv got ClaNotSupportedError # unclear why the different exceptions, ledger_bitcoin version 0.2.1 in both cases _logger.info(f"ledger_bitcoin.createClient() got exc: {e}. falling back to old plugin.") cl = None if isinstance(cl, ledger_bitcoin.client.NewClient): _logger.debug(f"Ledger_Client.construct_new(). creating NewClient for {device=}.") return Ledger_Client_New(hid_device, *args, plugin=plugin, **kwargs) else: _logger.debug(f"Ledger_Client.construct_new(). creating LegacyClient for {device=}.") return Ledger_Client_Legacy(hid_device, *args, plugin=plugin, **kwargs) def __init__(self, *, plugin: HW_PluginBase): HardwareClientBase.__init__(self, plugin=plugin) def get_master_fingerprint(self) -> bytes: return self.request_root_fingerprint_from_device() @abstractmethod def show_address(self, address_path: str, txin_type: str): pass @abstractmethod def sign_transaction(self, keystore: Hardware_KeyStore, tx: PartialTransaction, password: str): pass @abstractmethod def sign_message( self, address_path: str, message: str, password, *, script_type: Optional[str] = None, ) -> bytes: pass class Ledger_Client_Legacy(Ledger_Client): """Client based on the bitchip library, targeting versions 2.0.* and below.""" is_legacy = True def __init__(self, hidDevice: 'HID', *, product_key: Tuple[int, int], plugin: HW_PluginBase): Ledger_Client.__init__(self, plugin=plugin) # Hack, we close the old object and instantiate a new one hidDevice.close() dev = hid.device() dev.open_path(hidDevice.path) dev.set_nonblocking(True) self.dongleObject = btchip(HIDDongleHIDAPI(dev, True, False)) self.signing = False self._product_key = product_key self._soft_device_id = None def is_pairable(self): return True def set_and_unset_signing(func): """Function decorator to set and unset self.signing.""" def wrapper(self, *args, **kwargs): try: self.signing = True return func(self, *args, **kwargs) finally: self.signing = False return wrapper def give_error(self, message): _logger.info(message) if not self.signing: self.handler.show_error(message) else: self.signing = False raise UserFacingException(message) @runs_in_hwd_thread def close(self): self.dongleObject.dongle.close() def is_initialized(self): return True @runs_in_hwd_thread def get_soft_device_id(self): if self._soft_device_id is None: # modern ledger can provide xpub without user interaction # (hw1 would prompt for PIN) if not self.is_hw1(): self._soft_device_id = self.request_root_fingerprint_from_device() return self._soft_device_id def is_hw1(self) -> bool: return LedgerPlugin.is_hw1(self._product_key) def device_model_name(self): return LedgerPlugin.device_name_from_product_key(self._product_key) @runs_in_hwd_thread def has_usable_connection_with_device(self): try: self.dongleObject.getFirmwareVersion() except BaseException: return False return True @runs_in_hwd_thread @test_pin_unlocked def get_xpub(self, bip32_path, xtype): self.checkDevice() # bip32_path is of the form 44'/0'/1' # S-L-O-W - we don't handle the fingerprint directly, so compute # it manually from the previous node # This only happens once so it's bearable # self.get_client() # prompt for the PIN before displaying the dialog if necessary # self.handler.show_message("Computing master public key") if xtype in ['p2wpkh', 'p2wsh'] and not self.supports_native_segwit(): raise UserFacingException(MSG_NEEDS_FW_UPDATE_SEGWIT) if xtype in ['p2wpkh-p2sh', 'p2wsh-p2sh'] and not self.supports_segwit(): raise UserFacingException(MSG_NEEDS_FW_UPDATE_SEGWIT) bip32_path = bip32.normalize_bip32_derivation(bip32_path, hardened_char="'") bip32_intpath = bip32.convert_bip32_strpath_to_intpath(bip32_path) bip32_path = bip32_path[2:] # cut off "m/" if len(bip32_intpath) >= 1: prevPath = bip32.convert_bip32_intpath_to_strpath(bip32_intpath[:-1])[2:] nodeData = self.dongleObject.getWalletPublicKey(prevPath) publicKey = compress_public_key(nodeData['publicKey']) fingerprint_bytes = hash_160(publicKey)[0:4] childnum_bytes = bip32_intpath[-1].to_bytes(length=4, byteorder="big") else: fingerprint_bytes = bytes(4) childnum_bytes = bytes(4) nodeData = self.dongleObject.getWalletPublicKey(bip32_path) publicKey = compress_public_key(nodeData['publicKey']) depth = len(bip32_intpath) return BIP32Node(xtype=xtype, eckey=ecc.ECPubkey(bytes(publicKey)), chaincode=nodeData['chainCode'], depth=depth, fingerprint=fingerprint_bytes, child_number=childnum_bytes).to_xpub() def has_detached_pin_support(self, client: 'btchip'): try: client.getVerifyPinRemainingAttempts() return True except BTChipException as e: if e.sw == 0x6d00: return False raise e def is_pin_validated(self, client: 'btchip'): try: # Invalid SET OPERATION MODE to verify the PIN status client.dongle.exchange(bytearray([0xe0, 0x26, 0x00, 0x00, 0x01, 0xAB])) except BTChipException as e: if (e.sw == 0x6982): return False if (e.sw == 0x6A80): return True raise e def supports_multi_output(self): return self.multiOutputSupported def supports_segwit(self): return self.segwitSupported def supports_native_segwit(self): return self.nativeSegwitSupported def supports_segwit_trustedInputs(self): return self.segwitTrustedInputs @runs_in_hwd_thread def checkDevice(self): firmwareInfo = self.dongleObject.getFirmwareVersion() firmware = firmwareInfo['version'] self.multiOutputSupported = versiontuple(firmware) >= versiontuple(MULTI_OUTPUT_SUPPORT) self.nativeSegwitSupported = versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT) self.segwitSupported = self.nativeSegwitSupported or (firmwareInfo['specialVersion'] == 0x20 and versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT_SPECIAL)) self.segwitTrustedInputs = versiontuple(firmware) >= versiontuple(SEGWIT_TRUSTEDINPUTS) def password_dialog(self, msg=None): response = self.handler.get_word(msg) if response is None: return False, None, None return True, response, response @runs_in_hwd_thread @test_pin_unlocked @set_and_unset_signing def show_address(self, address_path: str, txin_type: str): self.handler.show_message(_("Showing address ...")) segwit = is_segwit_script_type(txin_type) segwitNative = txin_type == 'p2wpkh' try: self.dongleObject.getWalletPublicKey(address_path, showOnScreen=True, segwit=segwit, segwitNative=segwitNative) except BTChipException as e: if e.sw == 0x6985: # cancelled by user pass elif e.sw == 0x6982: raise # pin lock. decorator will catch it elif e.sw == 0x6b00: # hw.1 raises this self.handler.show_error('{}\n{}\n{}'.format( _('Error showing address') + ':', e, _('Your device might not have support for this functionality.'))) else: _logger.exception('') self.handler.show_error(e) except BaseException as e: _logger.exception('') self.handler.show_error(e) finally: self.handler.finished() @runs_in_hwd_thread @test_pin_unlocked @set_and_unset_signing def sign_transaction(self, keystore: Hardware_KeyStore, tx: PartialTransaction, password: str): if tx.is_complete(): return inputs = [] inputsPaths = [] chipInputs = [] redeemScripts = [] changePath = "" p2shTransaction = False segwitTransaction = False pin = "" # prompt for the PIN before displaying the dialog if necessary def is_txin_legacy_multisig(txin: PartialTxInput) -> bool: desc = txin.script_descriptor return (isinstance(desc, descriptor.SHDescriptor) and isinstance(desc.subdescriptors[0], descriptor.MultisigDescriptor)) # Fetch inputs of the transaction to sign for txin in tx.inputs(): if txin.is_coinbase_input(): self.give_error("Coinbase not supported") # should never happen if is_txin_legacy_multisig(txin): p2shTransaction = True if txin.is_p2sh_segwit(): if not self.supports_segwit(): self.give_error(MSG_NEEDS_FW_UPDATE_SEGWIT) segwitTransaction = True if txin.is_native_segwit(): if not self.supports_native_segwit(): self.give_error(MSG_NEEDS_FW_UPDATE_SEGWIT) segwitTransaction = True my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txin) if not full_path: self.give_error("No matching pubkey for sign_transaction") # should never happen full_path = convert_bip32_intpath_to_strpath(full_path)[2:] redeemScript = txin.get_scriptcode_for_sighash().hex() txin_prev_tx = txin.utxo if txin_prev_tx is None and not txin.is_segwit(): raise UserFacingException(_('Missing previous tx for legacy input.')) txin_prev_tx_raw = txin_prev_tx.serialize() if txin_prev_tx else None inputs.append([txin_prev_tx_raw, txin.prevout.out_idx, redeemScript, txin.prevout.txid.hex(), my_pubkey, txin.nsequence, txin.value_sats()]) inputsPaths.append(full_path) # Sanity check if p2shTransaction: for txin in tx.inputs(): if not is_txin_legacy_multisig(txin): self.give_error("P2SH / regular input mixed in same transaction not supported") # should never happen if not self.supports_multi_output(): if len(tx.outputs()) > 2: self.give_error("Transaction with more than 2 outputs not supported") for txout in tx.outputs(): if not txout.address: # note: max_size based on https://github.com/LedgerHQ/ledger-app-btc/commit/3a78dee9c0484821df58975803e40d58fbfc2c38#diff-c61ccd96a6d8b54d48f54a3bc4dfa7e2R26 validate_op_return_output(txout, max_size=190) # Output "change" detection # - at most one output can bypass confirmation (~change) if not p2shTransaction: has_change = False any_output_on_change_branch = is_any_tx_output_on_change_branch(tx) for txout in tx.outputs(): if txout.is_mine and len(tx.outputs()) > 1 \ and not has_change: # prioritise hiding outputs on the 'change' branch from user # because no more than one change address allowed if txout.is_change == any_output_on_change_branch: my_pubkey, changePath = keystore.find_my_pubkey_in_txinout(txout) assert changePath changePath = convert_bip32_intpath_to_strpath(changePath)[2:] has_change = True try: # Get trusted inputs from the original transactions for input_idx, utxo in enumerate(inputs): self.handler.show_message(_("Preparing transaction inputs...") + f" (phase1, {input_idx}/{len(inputs)})") sequence = int.to_bytes(utxo[5], length=4, byteorder="little", signed=False).hex() if segwitTransaction and not self.supports_segwit_trustedInputs(): tmp = bfh(utxo[3])[::-1] tmp += int.to_bytes(utxo[1], length=4, byteorder="little", signed=False) tmp += int.to_bytes(utxo[6], length=8, byteorder="little", signed=False) # txin['value'] chipInputs.append({'value': tmp, 'witness': True, 'sequence': sequence}) redeemScripts.append(bfh(utxo[2])) elif (not p2shTransaction) or self.supports_multi_output(): txtmp = bitcoinTransaction(bfh(utxo[0])) trustedInput = self.dongleObject.getTrustedInput(txtmp, utxo[1]) trustedInput['sequence'] = sequence if segwitTransaction: trustedInput['witness'] = True chipInputs.append(trustedInput) if p2shTransaction or segwitTransaction: redeemScripts.append(bfh(utxo[2])) else: redeemScripts.append(txtmp.outputs[utxo[1]].script) else: tmp = bfh(utxo[3])[::-1] tmp += int.to_bytes(utxo[1], length=4, byteorder="little", signed=False) chipInputs.append({'value': tmp, 'sequence': sequence}) redeemScripts.append(bfh(utxo[2])) self.handler.show_message(_("Confirm Transaction on your Ledger device...")) # Sign all inputs firstTransaction = True inputIndex = 0 rawTx = tx.serialize_to_network(include_sigs=False) if segwitTransaction: self.dongleObject.startUntrustedTransaction(True, inputIndex, chipInputs, redeemScripts[inputIndex], version=tx.version) # we don't set meaningful outputAddress, amount and fees # as we only care about the alternateEncoding==True branch outputData = self.dongleObject.finalizeInput(b'', 0, 0, changePath, bfh(rawTx)) while inputIndex < len(inputs): self.handler.show_message(_("Signing transaction...") + f" (phase2, {inputIndex}/{len(inputs)})") singleInput = [chipInputs[inputIndex]] self.dongleObject.startUntrustedTransaction(False, 0, singleInput, redeemScripts[inputIndex], version=tx.version) inputSignature = self.dongleObject.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime) inputSignature[0] = 0x30 # force for 1.4.9+ my_pubkey = inputs[inputIndex][4] tx.add_signature_to_txin(txin_idx=inputIndex, signing_pubkey=my_pubkey, sig=inputSignature) inputIndex = inputIndex + 1 else: while inputIndex < len(inputs): self.handler.show_message(_("Signing transaction...") + f" (phase2, {inputIndex}/{len(inputs)})") self.dongleObject.startUntrustedTransaction(firstTransaction, inputIndex, chipInputs, redeemScripts[inputIndex], version=tx.version) # we don't set meaningful outputAddress, amount and fees # as we only care about the alternateEncoding==True branch outputData = self.dongleObject.finalizeInput(b'', 0, 0, changePath, bfh(rawTx)) # Sign input with the provided PIN inputSignature = self.dongleObject.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime) inputSignature[0] = 0x30 # force for 1.4.9+ my_pubkey = inputs[inputIndex][4] tx.add_signature_to_txin(txin_idx=inputIndex, signing_pubkey=my_pubkey, sig=inputSignature) inputIndex = inputIndex + 1 firstTransaction = False except UserWarning: self.handler.show_error(_('Cancelled by user')) return except BTChipException as e: if e.sw in (0x6985, 0x6d00): # cancelled by user return elif e.sw == 0x6982: raise # pin lock. decorator will catch it else: _logger.exception('') self.give_error(e) except BaseException as e: _logger.exception('') self.give_error(e) finally: self.handler.finished() @runs_in_hwd_thread @test_pin_unlocked @set_and_unset_signing def sign_message( self, address_path: str, message: str, password, *, script_type: Optional[str] = None, ) -> bytes: message = message.encode('utf8') message_hash = hashlib.sha256(message).hexdigest().upper() self.handler.show_message("Signing message ...\r\nMessage hash: " + message_hash) try: info = self.dongleObject.signMessagePrepare(address_path, message) pin = "" signature = self.dongleObject.signMessageSign(pin) except BTChipException as e: if e.sw == 0x6a80: self.give_error("Unfortunately, this message cannot be signed by the Ledger wallet. " "Only alphanumerical messages shorter than 140 characters are supported. " "Please remove any extra characters (tab, carriage return) and retry.") elif e.sw == 0x6985: # cancelled by user return b'' elif e.sw == 0x6982: raise # pin lock. decorator will catch it else: self.give_error(e) except UserWarning: self.handler.show_error(_('Cancelled by user')) return b'' except Exception as e: self.give_error(e) finally: self.handler.finished() # Parse the ASN.1 signature rLength = signature[3] r = signature[4: 4 + rLength] sLength = signature[4 + rLength + 1] s = signature[4 + rLength + 2:] if rLength == 33: r = r[1:] if sLength == 33: s = s[1:] # And convert it # Pad r and s points with 0x00 bytes when the point is small to get valid signature. r_padded = bytes([0x00]) * (32 - len(r)) + r s_padded = bytes([0x00]) * (32 - len(s)) + s return bytes([27 + 4 + (signature[0] & 0x01)]) + r_padded + s_padded class Ledger_Client_New(Ledger_Client): """Client based on the ledger_bitcoin library, targeting versions 2.1.* and above.""" is_legacy = False def __init__(self, hidDevice: 'HID', *, product_key: Tuple[int, int], plugin: HW_PluginBase): Ledger_Client.__init__(self, plugin=plugin) transport = ledger_bitcoin.TransportClient('hid', hid=hidDevice) self.client = ledger_bitcoin.client.NewClient(transport, get_chain()) self._product_key = product_key self._soft_device_id = None self.master_fingerprint = None self._known_xpubs: Dict[str, str] = {} # path ==> xpub self._registered_policies: Dict[bytes, bytes] = {} # wallet id => wallet hmac def is_pairable(self): return True @runs_in_hwd_thread def close(self): self.client.stop() def is_initialized(self): return True @runs_in_hwd_thread def get_soft_device_id(self): if self._soft_device_id is None: self._soft_device_id = self.request_root_fingerprint_from_device() return self._soft_device_id def device_model_name(self): return LedgerPlugin.device_name_from_product_key(self._product_key) @runs_in_hwd_thread def has_usable_connection_with_device(self): try: self.client.get_version() except BaseException: return False return True @runs_in_hwd_thread @test_pin_unlocked def get_xpub(self, bip32_path: str, xtype): # try silently first; if not a standard path, repeat with on-screen display bip32_path = normalize_bip32_derivation(bip32_path, hardened_char="'") # cache known path/xpubs combinations in order to avoid requesting them many times if bip32_path in self._known_xpubs: xpub = self._known_xpubs[bip32_path] else: try: xpub = self.client.get_extended_pubkey(bip32_path) except NotSupportedError: xpub = self.client.get_extended_pubkey(bip32_path, True) self._known_xpubs[bip32_path] = xpub # Ledger always returns 'standard' xpubs; convert to the right xtype return convert_xpub(xpub, xtype) @runs_in_hwd_thread def request_root_fingerprint_from_device(self) -> str: return self.client.get_master_fingerprint().hex() @runs_in_hwd_thread @test_pin_unlocked def get_master_fingerprint(self) -> bytes: if self.master_fingerprint is None: self.master_fingerprint = self.client.get_master_fingerprint() return self.master_fingerprint @runs_in_hwd_thread @test_pin_unlocked def get_singlesig_default_wallet_policy(self, addr_type: 'AddressType', account: int) -> 'WalletPolicy': assert account >= HARDENED_FLAG if addr_type == AddressType.LEGACY: template = "pkh(@0/**)" elif addr_type == AddressType.WIT: template = "wpkh(@0/**)" elif addr_type == AddressType.SH_WIT: template = "sh(wpkh(@0/**))" elif addr_type == AddressType.TAP: template = "tr(@0/**)" else: raise ValueError("Unknown address type") fpr = self.get_master_fingerprint() key_origin_steps = f"{get_bip44_purpose(addr_type)}'/{get_bip44_chain(self.client.chain)}'/{account & ~HARDENED_FLAG}'" xpub = self.get_xpub(f"m/{key_origin_steps}", 'standard') key_str = f"[{fpr.hex()}/{key_origin_steps}]{xpub}" # Make the Wallet object return WalletPolicy(name="", descriptor_template=template, keys_info=[key_str]) @runs_in_hwd_thread @test_pin_unlocked def get_singlesig_policy_for_path(self, path: str, xtype: str, master_fp: bytes) -> Optional['WalletPolicy']: path = path.replace("h", "'") path_parts = path.split("/") if not 5 <= len(path_parts) <= 6: raise UserFacingException(_('Unsupported derivation path: {}').format(path)) path_root = "/".join(path_parts[:-2]) fpr = self.get_master_fingerprint() # Ledger always uses standard xpubs in wallet policies xpub = self.get_xpub(f"m/{path_root}", 'standard') key_info = f"[{fpr.hex()}/{path_root}]{xpub}" if xtype == 'p2pkh': name = "Legacy P2PKH" descriptor_template = "pkh(@0/**)" elif xtype == 'p2wpkh-p2sh': name = "Nested SegWit" descriptor_template = "sh(wpkh(@0/**))" elif xtype == 'p2wpkh': name = "SegWit" descriptor_template = "wpkh(@0/**)" elif xtype == 'p2tr': name = "Taproot" descriptor_template = "tr(@0/**)" else: return None policy = WalletPolicy("", descriptor_template, [key_info]) if is_policy_standard(policy, master_fp, constants.net.BIP44_COIN_TYPE): return policy # Non standard policy, so give it a name return WalletPolicy(name, descriptor_template, [key_info]) def password_dialog(self, msg=None): response = self.handler.get_word(msg) if response is None: return False, None, None return True, response, response def _register_policy_if_needed(self, wallet_policy: 'WalletPolicy') -> Tuple[bytes, bytes]: # If the policy is not register, registers it and saves its hmac on success # Returns the pair of wallet id and wallet hmac if wallet_policy.id not in self._registered_policies: wallet_id, wallet_hmac = self.client.register_wallet(wallet_policy) assert wallet_id == wallet_policy.id self._registered_policies[wallet_id] = wallet_hmac return wallet_policy.id, self._registered_policies[wallet_policy.id] @runs_in_hwd_thread @test_pin_unlocked def show_address(self, address_path: str, txin_type: str): client_ledger = self.client self.handler.show_message(_("Showing address ...")) # TODO: generalize for multisignature try: master_fp = client_ledger.get_master_fingerprint() wallet_policy = self.get_singlesig_policy_for_path(address_path, txin_type, master_fp) change, addr_index = [int(i) for i in address_path.split("/")[-2:]] wallet_hmac = None if not is_policy_standard(wallet_policy, master_fp, constants.net.BIP44_COIN_TYPE): wallet_id, wallet_hmac = self._register_policy_if_needed(wallet_policy) self.client.get_wallet_address(wallet_policy, wallet_hmac, change, addr_index, True) except DenyError: pass # cancelled by user except BaseException as e: _logger.exception('Error while showing an address') self.handler.show_error(e) finally: self.handler.finished() @runs_in_hwd_thread @test_pin_unlocked def sign_transaction(self, keystore: Hardware_KeyStore, tx: PartialTransaction, password: str): if tx.is_complete(): return # mostly adapted from HWI psbt_bytes = tx.serialize_as_bytes() psbt = ledger_bitcoin.client.PSBT() psbt.deserialize(base64.b64encode(psbt_bytes).decode('ascii')) try: master_fp = self.client.get_master_fingerprint() # Figure out which wallets are signing wallets: Dict[bytes, Tuple[AddressType, WalletPolicy, Optional[bytes]]] = {} for input_num, (electrum_txin, psbt_in) in enumerate(zip(tx.inputs(), psbt.inputs)): if electrum_txin.is_coinbase_input(): raise UserFacingException(_('Coinbase not supported')) # should never happen utxo = None if psbt_in.witness_utxo: utxo = psbt_in.witness_utxo if psbt_in.non_witness_utxo: if psbt_in.prev_txid != psbt_in.non_witness_utxo.hash: raise UserFacingException(_('Input {} has a non_witness_utxo with the wrong hash').format(input_num)) assert psbt_in.prev_out is not None utxo = psbt_in.non_witness_utxo.vout[psbt_in.prev_out] if utxo is None: continue if (desc := electrum_txin.script_descriptor) is None: raise Exception("script_descriptor missing for txin ") scriptcode = desc.expand().scriptcode_for_sighash is_wit, wit_ver, __ = is_witness(psbt_in.redeem_script or utxo.scriptPubKey) script_addrtype = AddressType.LEGACY if is_wit: # if it's a segwit spend (any version), make sure the witness_utxo is also present psbt_in.witness_utxo = utxo if electrum_txin.is_p2sh_segwit(): if wit_ver == 0: script_addrtype = AddressType.SH_WIT else: raise UserFacingException(_('Cannot have witness v1+ in p2sh')) else: if wit_ver == 0: script_addrtype = AddressType.WIT elif wit_ver == 1: script_addrtype = AddressType.TAP else: continue multisig = parse_multisig(scriptcode) if multisig is not None: k, ms_pubkeys = multisig # Figure out the parent xpubs key_exprs: List[str] = [] ok = True our_keys = 0 for pub in ms_pubkeys: if pub in psbt_in.hd_keypaths: pk_origin = psbt_in.hd_keypaths[pub] if pk_origin.fingerprint == master_fp: our_keys += 1 for xpub_bytes, xpub_origin in psbt.xpub.items(): xpub_str = EncodeBase58Check(xpub_bytes) if (xpub_origin.fingerprint == pk_origin.fingerprint) and (xpub_origin.path == pk_origin.path[:len(xpub_origin.path)]): key_origin_full = pk_origin.to_string().replace('h', '\'') # strip last two steps of derivation key_origin_parts = key_origin_full.split('/') if len(key_origin_parts) < 3: raise UserFacingException(_('Unable to sign this transaction')) key_origin = '/'.join(key_origin_parts[:-2]) key_exprs.append(f"[{key_origin}]{xpub_str}") break else: # No xpub, Ledger will not accept this multisig ok = False if not ok: continue # Electrum uses sortedmulti; we make sure that the array of key information is normalized in a consistent order key_exprs = list(sorted(key_exprs)) # Make and register the MultisigWallet msw = MultisigWallet(f"{k} of {len(key_exprs)} Multisig", script_addrtype, k, key_exprs) msw_id = msw.id if msw_id not in wallets: __, registered_hmac = self._register_policy_if_needed(msw) wallets[msw_id] = ( script_addrtype, msw, registered_hmac, ) else: def process_origin(origin: KeyOriginInfo, *, script_addrtype=script_addrtype) -> None: if is_standard_path(origin.path, script_addrtype, get_chain()): # these policies do not need to be registered policy = self.get_singlesig_default_wallet_policy(script_addrtype, origin.path[2]) wallets[policy.id] = ( script_addrtype, self.get_singlesig_default_wallet_policy(script_addrtype, origin.path[2]), None, # Wallet hmac ) else: # register the policy if script_addrtype == AddressType.LEGACY: name = "Legacy" template = "pkh(@0/**)" elif script_addrtype == AddressType.WIT: name = "Native SegWit" template = "wpkh(@0/**)" elif script_addrtype == AddressType.SH_WIT: name = "Nested SegWit" template = "sh(wpkh(@0/**))" elif script_addrtype == AddressType.TAP: name = "Taproot" template = "tr(@0/**)" else: raise ValueError("Unknown address type") key_origin_info = origin.to_string() key_origin_steps = key_origin_info.replace('h', '\'').split('/')[1:] if len(key_origin_steps) < 3: # Skip this input, not able to sign return # remove the last two steps account_key_origin = "/".join(key_origin_steps[:-2]) # get the account-level xpub xpub = self.get_xpub(f"m/{account_key_origin}", 'standard') key_str = f"[{master_fp.hex()}/{account_key_origin}]{xpub}" policy = WalletPolicy(name, template, [key_str]) __, registered_hmac = self.client.register_wallet(policy) wallets[policy.id] = ( script_addrtype, policy, registered_hmac, ) for key, origin in psbt_in.hd_keypaths.items(): if origin.fingerprint == master_fp: process_origin(origin) for key, (__, origin) in psbt_in.tap_bip32_paths.items(): # TODO: Support script path signing if key == psbt_in.tap_internal_key and origin.fingerprint == master_fp: process_origin(origin) self.handler.show_message(_("Confirm Transaction on your Ledger device...")) if len(wallets) == 0: # Could not find a WalletPolicy to sign with raise UserFacingException(_('Unable to sign this transaction')) # For each wallet, sign for __, (__, wallet, wallet_hmac) in wallets.items(): input_sigs = self.client.sign_psbt(psbt, wallet, wallet_hmac) for idx, part_sig in input_sigs: tx.add_signature_to_txin( txin_idx=idx, signing_pubkey=part_sig.pubkey, sig=part_sig.signature) except DenyError: pass # cancelled by user except BaseException as e: _logger.exception('Error while signing') self.handler.show_error(e) finally: self.handler.finished() @runs_in_hwd_thread @test_pin_unlocked def sign_message( self, address_path: str, message: str, password, *, script_type: Optional[str] = None, ) -> bytes: message = message.encode('utf8') message_hash = hashlib.sha256(message).hexdigest().upper() # prompt for the PIN before displaying the dialog if necessary self.handler.show_message("Signing message ...\r\nMessage hash: " + message_hash) result = b'' try: sig_str = self.client.sign_message(message, address_path) result = base64.b64decode(sig_str, validate=True) except DenyError: pass # cancelled by user except BaseException as e: _logger.exception('') self.handler.show_error(e) finally: self.handler.finished() return result class Ledger_KeyStore(Hardware_KeyStore): """Ledger keystore. Targets all versions, will have different behavior with different clients.""" hw_type = 'ledger' device = 'Ledger' plugin: 'LedgerPlugin' def __init__(self, d): Hardware_KeyStore.__init__(self, d) self.cfg = d.get('cfg', {'mode': 0}) def dump(self): obj = Hardware_KeyStore.dump(self) obj['cfg'] = self.cfg return obj def get_client_dongle_object(self, *, client: Optional[Ledger_Client] = None) -> Ledger_Client: if client is None: client = self.get_client() return client def decrypt_message(self, pubkey, message, password): raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device)) def sign_message(self, sequence, *args, **kwargs): address_path = self.get_derivation_prefix() + "/%d/%d" % sequence address_path = normalize_bip32_derivation(address_path, hardened_char="'") address_path = address_path[2:] # cut m/ return self.get_client_dongle_object().sign_message(address_path, *args, **kwargs) def sign_transaction(self, *args, **kwargs): return self.get_client_dongle_object().sign_transaction(self, *args, **kwargs) def show_address(self, sequence, *args, **kwargs): address_path = self.get_derivation_prefix() + "/%d/%d" % sequence address_path = normalize_bip32_derivation(address_path, hardened_char="'") address_path = address_path[2:] # cut m/ return self.get_client_dongle_object().show_address(address_path, *args, **kwargs) class LedgerPlugin(HW_PluginBase): keystore_class = Ledger_KeyStore minimum_library = (0, 2, 0) maximum_library = (1, 0) DEVICE_IDS = [(0x2581, 0x1807), # HW.1 legacy btchip # not supported anymore (but we log an exception) (0x2581, 0x2b7c), # HW.1 transitional production # not supported anymore (0x2581, 0x3b7c), # HW.1 ledger production # not supported anymore (0x2581, 0x4b7c), # HW.1 ledger test # not supported anymore (0x2c97, 0x0000), # Blue (0x2c97, 0x0001), # Nano-S (0x2c97, 0x0004), # Nano-X (0x2c97, 0x0005), # Nano-S Plus (0x2c97, 0x0006), # Stax (0x2c97, 0x0007), # Flex (0x2c97, 0x0008), # RFU (0x2c97, 0x0009), # RFU (0x2c97, 0x000a)] # RFU VENDOR_IDS = (0x2c97,) LEDGER_MODEL_IDS = { 0x10: "Ledger Nano S", 0x40: "Ledger Nano X", 0x50: "Ledger Nano S Plus", 0x60: "Ledger Stax", 0x70: "Ledger Flex", } SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh') def __init__(self, parent, config, name): HW_PluginBase.__init__(self, parent, config, name) self.libraries_available = self.check_libraries_available() if not self.libraries_available: _logger.info("Library unavailable") return # to support legacy devices and legacy firmwares self.device_manager().register_devices(self.DEVICE_IDS, plugin=self) # to support modern firmware self.device_manager().register_vendor_ids(self.VENDOR_IDS, plugin=self) def get_library_version(self): try: import ledger_bitcoin version = ledger_bitcoin.__version__ except ImportError: raise except Exception: version = "unknown" if LEDGER_BITCOIN: return version else: raise LibraryFoundButUnusable(library_version=version) @classmethod def is_hw1(cls, product_key) -> bool: return product_key[0] == 0x2581 @classmethod def _recognize_device(cls, product_key) -> Tuple[bool, Optional[str]]: """Returns (can_recognize, model_name) tuple.""" # legacy product_keys if product_key in cls.DEVICE_IDS: if cls.is_hw1(product_key): return True, "Ledger HW.1" if product_key == (0x2c97, 0x0000): return True, "Ledger Blue" if product_key == (0x2c97, 0x0001): return True, "Ledger Nano S" if product_key == (0x2c97, 0x0004): return True, "Ledger Nano X" if product_key == (0x2c97, 0x0005): return True, "Ledger Nano S Plus" if product_key == (0x2c97, 0x0006): return True, "Ledger Stax" if product_key == (0x2c97, 0x0007): return True, "Ledger Flex" return True, None # modern product_keys if product_key[0] == 0x2c97: product_id = product_key[1] model_id = product_id >> 8 if model_id in cls.LEDGER_MODEL_IDS: model_name = cls.LEDGER_MODEL_IDS[model_id] return True, model_name # give up return False, None def can_recognize_device(self, device: Device) -> bool: can_recognize = self._recognize_device(device.product_key)[0] if can_recognize: # Do a further check, duplicated from: # https://github.com/LedgerHQ/ledgercomm/blob/bc5ada865980cb63c2b9b71a916e01f2f8e53716/ledgercomm/interfaces/hid_device.py#L79-L82 # Modern ledger devices can have multiple interfaces picked up by hid, only one of which is usable by us. # If we try communicating with the wrong one, we might not get a reply and block forever. if device.product_key[0] == 0x2c97: if not (device.interface_number == 0 or device.usage_page == 0xffa0): return False return can_recognize @classmethod def device_name_from_product_key(cls, product_key) -> Optional[str]: return cls._recognize_device(product_key)[1] def create_device_from_hid_enumeration(self, d, *, product_key): device = super().create_device_from_hid_enumeration(d, product_key=product_key) if not self.can_recognize_device(device): return None return device @runs_in_hwd_thread def create_client(self, device, handler) -> Union[Ledger_Client, None, HardwareClientDummy]: try: return Ledger_Client.construct_new(device=device, product_key=device.product_key, plugin=self) except Exception as e: self.logger.info(f"cannot connect at {device.path} {e}", exc_info=e) return None @runs_in_hwd_thread def show_address(self, wallet, address, keystore=None): if keystore is None: keystore = wallet.get_keystore() if not self.show_address_helper(wallet, address, keystore): return if type(wallet) is not Standard_Wallet: keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device)) return sequence = wallet.get_address_index(address) txin_type = wallet.get_txin_type(address) keystore.show_address(sequence, txin_type) def wizard_entry_for_device(self, device_info: 'DeviceInfo', *, new_wallet=True) -> str: if new_wallet: return 'ledger_start' if device_info.initialized else 'ledger_not_initialized' else: return 'ledger_unlock' # insert ledger pages in new wallet wizard def extend_wizard(self, wizard: 'NewWalletWizard'): views = { 'ledger_start': { 'next': 'ledger_xpub', }, 'ledger_xpub': { 'next': lambda d: wizard.wallet_password_view(d) if wizard.last_cosigner(d) else 'multisig_cosigner_keystore', 'accept': wizard.maybe_master_pubkey, 'last': lambda d: wizard.is_single_password() and wizard.last_cosigner(d) }, 'ledger_not_initialized': {}, 'ledger_unlock': { 'last': True }, } wizard.navmap_merge(views)