clean-up hw-wallet "get_password_for_storage_encryption"-related code
This commit is contained in:
@@ -37,8 +37,7 @@ from .bip32 import is_bip32_derivation, xpub_type, normalize_bip32_derivation, B
|
|||||||
from .keystore import bip44_derivation, purpose48_derivation, Hardware_KeyStore, KeyStore
|
from .keystore import bip44_derivation, purpose48_derivation, Hardware_KeyStore, KeyStore
|
||||||
from .wallet import (Imported_Wallet, Standard_Wallet, Multisig_Wallet,
|
from .wallet import (Imported_Wallet, Standard_Wallet, Multisig_Wallet,
|
||||||
wallet_types, Wallet, Abstract_Wallet)
|
wallet_types, Wallet, Abstract_Wallet)
|
||||||
from .storage import (WalletStorage, StorageEncryptionVersion,
|
from .storage import WalletStorage, StorageEncryptionVersion
|
||||||
get_derivation_used_for_hw_device_encryption)
|
|
||||||
from .wallet_db import WalletDB
|
from .wallet_db import WalletDB
|
||||||
from .i18n import _
|
from .i18n import _
|
||||||
from .util import UserCancelled, InvalidPassword, WalletFileException
|
from .util import UserCancelled, InvalidPassword, WalletFileException
|
||||||
@@ -334,7 +333,9 @@ class BaseWizard(Logger):
|
|||||||
run_next=lambda *args: self.on_device(*args, purpose=purpose, storage=storage))
|
run_next=lambda *args: self.on_device(*args, purpose=purpose, storage=storage))
|
||||||
|
|
||||||
def on_device(self, name, device_info, *, purpose, storage=None):
|
def on_device(self, name, device_info, *, purpose, storage=None):
|
||||||
self.plugin = self.plugins.get_plugin(name) # type: HW_PluginBase
|
self.plugin = self.plugins.get_plugin(name)
|
||||||
|
assert isinstance(self.plugin, HW_PluginBase)
|
||||||
|
devmgr = self.plugins.device_manager
|
||||||
try:
|
try:
|
||||||
self.plugin.setup_device(device_info, self, purpose)
|
self.plugin.setup_device(device_info, self, purpose)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
@@ -342,7 +343,6 @@ class BaseWizard(Logger):
|
|||||||
+ '\n' + str(e) + '\n'
|
+ '\n' + str(e) + '\n'
|
||||||
+ _('To try to fix this, we will now re-pair with your device.') + '\n'
|
+ _('To try to fix this, we will now re-pair with your device.') + '\n'
|
||||||
+ _('Please try again.'))
|
+ _('Please try again.'))
|
||||||
devmgr = self.plugins.device_manager
|
|
||||||
devmgr.unpair_id(device_info.device.id_)
|
devmgr.unpair_id(device_info.device.id_)
|
||||||
self.choose_hw_device(purpose, storage=storage)
|
self.choose_hw_device(purpose, storage=storage)
|
||||||
return
|
return
|
||||||
@@ -350,7 +350,6 @@ class BaseWizard(Logger):
|
|||||||
if self.question(e.text_ignore_old_fw_and_continue(), title=_("Outdated device firmware")):
|
if self.question(e.text_ignore_old_fw_and_continue(), title=_("Outdated device firmware")):
|
||||||
self.plugin.set_ignore_outdated_fw()
|
self.plugin.set_ignore_outdated_fw()
|
||||||
# will need to re-pair
|
# will need to re-pair
|
||||||
devmgr = self.plugins.device_manager
|
|
||||||
devmgr.unpair_id(device_info.device.id_)
|
devmgr.unpair_id(device_info.device.id_)
|
||||||
self.choose_hw_device(purpose, storage=storage)
|
self.choose_hw_device(purpose, storage=storage)
|
||||||
return
|
return
|
||||||
@@ -368,14 +367,12 @@ class BaseWizard(Logger):
|
|||||||
self.run('on_hw_derivation', name, device_info, derivation, script_type)
|
self.run('on_hw_derivation', name, device_info, derivation, script_type)
|
||||||
self.derivation_and_script_type_dialog(f)
|
self.derivation_and_script_type_dialog(f)
|
||||||
elif purpose == HWD_SETUP_DECRYPT_WALLET:
|
elif purpose == HWD_SETUP_DECRYPT_WALLET:
|
||||||
derivation = get_derivation_used_for_hw_device_encryption()
|
client = devmgr.client_by_id(device_info.device.id_)
|
||||||
xpub = self.plugin.get_xpub(device_info.device.id_, derivation, 'standard', self)
|
password = client.get_password_for_storage_encryption()
|
||||||
password = keystore.Xpub.get_pubkey_from_xpub(xpub, ()).hex()
|
|
||||||
try:
|
try:
|
||||||
storage.decrypt(password)
|
storage.decrypt(password)
|
||||||
except InvalidPassword:
|
except InvalidPassword:
|
||||||
# try to clear session so that user can type another passphrase
|
# try to clear session so that user can type another passphrase
|
||||||
devmgr = self.plugins.device_manager
|
|
||||||
client = devmgr.client_by_id(device_info.device.id_)
|
client = devmgr.client_by_id(device_info.device.id_)
|
||||||
if hasattr(client, 'clear_session'): # FIXME not all hw wallet plugins have this
|
if hasattr(client, 'clear_session'): # FIXME not all hw wallet plugins have this
|
||||||
client.clear_session()
|
client.clear_session()
|
||||||
|
|||||||
@@ -765,12 +765,8 @@ class Hardware_KeyStore(Xpub, KeyStore):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def get_password_for_storage_encryption(self) -> str:
|
def get_password_for_storage_encryption(self) -> str:
|
||||||
from .storage import get_derivation_used_for_hw_device_encryption
|
|
||||||
client = self.plugin.get_client(self)
|
client = self.plugin.get_client(self)
|
||||||
derivation = get_derivation_used_for_hw_device_encryption()
|
return client.get_password_for_storage_encryption()
|
||||||
xpub = client.get_xpub(derivation, "standard")
|
|
||||||
password = self.get_pubkey_from_xpub(xpub, ()).hex()
|
|
||||||
return password
|
|
||||||
|
|
||||||
def has_usable_connection_with_device(self) -> bool:
|
def has_usable_connection_with_device(self) -> bool:
|
||||||
if not hasattr(self, 'plugin'):
|
if not hasattr(self, 'plugin'):
|
||||||
|
|||||||
@@ -60,7 +60,7 @@ class Plugins(DaemonThread):
|
|||||||
self.pkgpath = os.path.dirname(plugins.__file__)
|
self.pkgpath = os.path.dirname(plugins.__file__)
|
||||||
self.config = config
|
self.config = config
|
||||||
self.hw_wallets = {}
|
self.hw_wallets = {}
|
||||||
self.plugins = {}
|
self.plugins = {} # type: Dict[str, BasePlugin]
|
||||||
self.gui_name = gui_name
|
self.gui_name = gui_name
|
||||||
self.descriptions = {}
|
self.descriptions = {}
|
||||||
self.device_manager = DeviceMgr(config)
|
self.device_manager = DeviceMgr(config)
|
||||||
@@ -198,8 +198,8 @@ class Plugins(DaemonThread):
|
|||||||
self.logger.info(f"registering hardware {name}: {details}")
|
self.logger.info(f"registering hardware {name}: {details}")
|
||||||
register_keystore(details[1], dynamic_constructor)
|
register_keystore(details[1], dynamic_constructor)
|
||||||
|
|
||||||
def get_plugin(self, name):
|
def get_plugin(self, name: str) -> 'BasePlugin':
|
||||||
if not name in self.plugins:
|
if name not in self.plugins:
|
||||||
self.load_plugin(name)
|
self.load_plugin(name)
|
||||||
return self.plugins[name]
|
return self.plugins[name]
|
||||||
|
|
||||||
|
|||||||
@@ -32,10 +32,11 @@ from electrum.bitcoin import is_address, opcodes
|
|||||||
from electrum.util import bfh, versiontuple, UserFacingException
|
from electrum.util import bfh, versiontuple, UserFacingException
|
||||||
from electrum.transaction import TxOutput, Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
|
from electrum.transaction import TxOutput, Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
|
||||||
from electrum.bip32 import BIP32Node
|
from electrum.bip32 import BIP32Node
|
||||||
|
from electrum.storage import get_derivation_used_for_hw_device_encryption
|
||||||
|
from electrum.keystore import Xpub, Hardware_KeyStore
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from electrum.wallet import Abstract_Wallet
|
from electrum.wallet import Abstract_Wallet
|
||||||
from electrum.keystore import Hardware_KeyStore
|
|
||||||
|
|
||||||
|
|
||||||
class HW_PluginBase(BasePlugin):
|
class HW_PluginBase(BasePlugin):
|
||||||
@@ -69,7 +70,7 @@ class HW_PluginBase(BasePlugin):
|
|||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def get_client(self, keystore: 'Hardware_KeyStore', force_pair: bool = True):
|
def get_client(self, keystore: 'Hardware_KeyStore', force_pair: bool = True) -> Optional['HardwareClientBase']:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def show_address(self, wallet: 'Abstract_Wallet', address, keystore: 'Hardware_KeyStore' = None):
|
def show_address(self, wallet: 'Abstract_Wallet', address, keystore: 'Hardware_KeyStore' = None):
|
||||||
@@ -182,6 +183,13 @@ class HardwareClientBase:
|
|||||||
root_fingerprint = BIP32Node.from_xkey(child_of_root_xpub).fingerprint.hex().lower()
|
root_fingerprint = BIP32Node.from_xkey(child_of_root_xpub).fingerprint.hex().lower()
|
||||||
return root_fingerprint
|
return root_fingerprint
|
||||||
|
|
||||||
|
def get_password_for_storage_encryption(self) -> str:
|
||||||
|
# note: using a different password based on hw device type is highly undesirable! see #5993
|
||||||
|
derivation = get_derivation_used_for_hw_device_encryption()
|
||||||
|
xpub = self.get_xpub(derivation, "standard")
|
||||||
|
password = Xpub.get_pubkey_from_xpub(xpub, ()).hex()
|
||||||
|
return password
|
||||||
|
|
||||||
|
|
||||||
def is_any_tx_output_on_change_branch(tx: PartialTransaction) -> bool:
|
def is_any_tx_output_on_change_branch(tx: PartialTransaction) -> bool:
|
||||||
return any([txout.is_change for txout in tx.outputs()])
|
return any([txout.is_change for txout in tx.outputs()])
|
||||||
|
|||||||
27
run_electrum
27
run_electrum
@@ -25,8 +25,7 @@
|
|||||||
# SOFTWARE.
|
# SOFTWARE.
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import warnings
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
MIN_PYTHON_VERSION = "3.6.1" # FIXME duplicated from setup.py
|
MIN_PYTHON_VERSION = "3.6.1" # FIXME duplicated from setup.py
|
||||||
_min_python_version_tuple = tuple(map(int, (MIN_PYTHON_VERSION.split("."))))
|
_min_python_version_tuple = tuple(map(int, (MIN_PYTHON_VERSION.split("."))))
|
||||||
@@ -36,6 +35,11 @@ if sys.version_info[:3] < _min_python_version_tuple:
|
|||||||
sys.exit("Error: Electrum requires Python version >= %s..." % MIN_PYTHON_VERSION)
|
sys.exit("Error: Electrum requires Python version >= %s..." % MIN_PYTHON_VERSION)
|
||||||
|
|
||||||
|
|
||||||
|
import warnings
|
||||||
|
import asyncio
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
|
||||||
script_dir = os.path.dirname(os.path.realpath(__file__))
|
script_dir = os.path.dirname(os.path.realpath(__file__))
|
||||||
is_bundle = getattr(sys, 'frozen', False)
|
is_bundle = getattr(sys, 'frozen', False)
|
||||||
is_local = not is_bundle and os.path.exists(os.path.join(script_dir, "electrum.desktop"))
|
is_local = not is_bundle and os.path.exists(os.path.join(script_dir, "electrum.desktop"))
|
||||||
@@ -83,7 +87,7 @@ from electrum import constants
|
|||||||
from electrum import SimpleConfig
|
from electrum import SimpleConfig
|
||||||
from electrum.wallet_db import WalletDB
|
from electrum.wallet_db import WalletDB
|
||||||
from electrum.wallet import Wallet
|
from electrum.wallet import Wallet
|
||||||
from electrum.storage import WalletStorage, get_derivation_used_for_hw_device_encryption
|
from electrum.storage import WalletStorage
|
||||||
from electrum.util import print_msg, print_stderr, json_encode, json_decode, UserCancelled
|
from electrum.util import print_msg, print_stderr, json_encode, json_decode, UserCancelled
|
||||||
from electrum.util import InvalidPassword
|
from electrum.util import InvalidPassword
|
||||||
from electrum.commands import get_parser, known_commands, Commands, config_variables
|
from electrum.commands import get_parser, known_commands, Commands, config_variables
|
||||||
@@ -91,6 +95,9 @@ from electrum import daemon
|
|||||||
from electrum import keystore
|
from electrum import keystore
|
||||||
from electrum.util import create_and_start_event_loop
|
from electrum.util import create_and_start_event_loop
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from electrum.plugin import Plugins
|
||||||
|
|
||||||
_logger = get_logger(__name__)
|
_logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@@ -166,7 +173,7 @@ def init_cmdline(config_options, wallet_path, server):
|
|||||||
config_options['new_password'] = new_password
|
config_options['new_password'] = new_password
|
||||||
|
|
||||||
|
|
||||||
def get_connected_hw_devices(plugins):
|
def get_connected_hw_devices(plugins: 'Plugins'):
|
||||||
supported_plugins = plugins.get_hardware_support()
|
supported_plugins = plugins.get_hardware_support()
|
||||||
# scan devices
|
# scan devices
|
||||||
devices = []
|
devices = []
|
||||||
@@ -186,7 +193,7 @@ def get_connected_hw_devices(plugins):
|
|||||||
return devices
|
return devices
|
||||||
|
|
||||||
|
|
||||||
def get_password_for_hw_device_encrypted_storage(plugins) -> str:
|
def get_password_for_hw_device_encrypted_storage(plugins: 'Plugins') -> str:
|
||||||
devices = get_connected_hw_devices(plugins)
|
devices = get_connected_hw_devices(plugins)
|
||||||
if len(devices) == 0:
|
if len(devices) == 0:
|
||||||
print_msg("Error: No connected hw device found. Cannot decrypt this wallet.")
|
print_msg("Error: No connected hw device found. Cannot decrypt this wallet.")
|
||||||
@@ -196,17 +203,15 @@ def get_password_for_hw_device_encrypted_storage(plugins) -> str:
|
|||||||
"The first one will be used to decrypt the wallet.")
|
"The first one will be used to decrypt the wallet.")
|
||||||
# FIXME we use the "first" device, in case of multiple ones
|
# FIXME we use the "first" device, in case of multiple ones
|
||||||
name, device_info = devices[0]
|
name, device_info = devices[0]
|
||||||
plugin = plugins.get_plugin(name)
|
devmgr = plugins.device_manager
|
||||||
derivation = get_derivation_used_for_hw_device_encryption()
|
|
||||||
try:
|
try:
|
||||||
xpub = plugin.get_xpub(device_info.device.id_, derivation, 'standard', plugin.handler)
|
client = devmgr.client_by_id(device_info.device.id_)
|
||||||
|
return client.get_password_for_storage_encryption()
|
||||||
except UserCancelled:
|
except UserCancelled:
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
password = keystore.Xpub.get_pubkey_from_xpub(xpub, ()).hex()
|
|
||||||
return password
|
|
||||||
|
|
||||||
|
|
||||||
async def run_offline_command(config, config_options, plugins):
|
async def run_offline_command(config, config_options, plugins: 'Plugins'):
|
||||||
cmdname = config.get('cmd')
|
cmdname = config.get('cmd')
|
||||||
cmd = known_commands[cmdname]
|
cmd = known_commands[cmdname]
|
||||||
password = config_options.get('password')
|
password = config_options.get('password')
|
||||||
|
|||||||
Reference in New Issue
Block a user