Files
pallectrum/electrum/base_wizard.py
SomberNight 24980feab7 config: introduce ConfigVars
A new config API is introduced, and ~all of the codebase is adapted to it.
The old API is kept but mainly only for dynamic usage where its extra flexibility is needed.

Using examples, the old config API looked like this:
```
>>> config.get("request_expiry", 86400)
604800
>>> config.set_key("request_expiry", 86400)
>>>
```

The new config API instead:
```
>>> config.WALLET_PAYREQ_EXPIRY_SECONDS
604800
>>> config.WALLET_PAYREQ_EXPIRY_SECONDS = 86400
>>>
```

The old API operated on arbitrary string keys, the new one uses
a static ~enum-like list of variables.

With the new API:
- there is a single centralised list of config variables, as opposed to
  these being scattered all over
- no more duplication of default values (in the getters)
- there is now some (minimal for now) type-validation/conversion for
  the config values

closes https://github.com/spesmilo/electrum/pull/5640
closes https://github.com/spesmilo/electrum/pull/5649

Note: there is yet a third API added here, for certain niche/abstract use-cases,
where we need a reference to the config variable itself.
It should only be used when needed:
```
>>> var = config.cv.WALLET_PAYREQ_EXPIRY_SECONDS
>>> var
<ConfigVarWithConfig key='request_expiry'>
>>> var.get()
604800
>>> var.set(3600)
>>> var.get_default_value()
86400
>>> var.is_set()
True
>>> var.is_modifiable()
True
```
2023-05-25 17:39:48 +00:00

742 lines
32 KiB
Python

# -*- coding: utf-8 -*-
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2016 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import sys
import copy
import traceback
from functools import partial
from typing import List, TYPE_CHECKING, Tuple, NamedTuple, Any, Dict, Optional, Union
from . import bitcoin
from . import keystore
from . import mnemonic
from .bip32 import is_bip32_derivation, xpub_type, normalize_bip32_derivation, BIP32Node
from .keystore import bip44_derivation, purpose48_derivation, Hardware_KeyStore, KeyStore, bip39_to_seed
from .wallet import (Imported_Wallet, Standard_Wallet, Multisig_Wallet,
wallet_types, Wallet, Abstract_Wallet)
from .storage import WalletStorage, StorageEncryptionVersion
from .wallet_db import WalletDB
from .i18n import _
from .util import UserCancelled, InvalidPassword, WalletFileException, UserFacingException
from .simple_config import SimpleConfig
from .plugin import Plugins, HardwarePluginLibraryUnavailable
from .logging import Logger
from .plugins.hw_wallet.plugin import OutdatedHwFirmwareException, HW_PluginBase
if TYPE_CHECKING:
from .plugin import DeviceInfo, BasePlugin
# Purpose codes for the hardware device setup flow (passed around as `purpose`).
HWD_SETUP_NEW_WALLET = 0
HWD_SETUP_DECRYPT_WALLET = 1
class ScriptTypeNotSupported(Exception):
    """Signals that the chosen script type cannot be used; the derivation dialog catches it and asks again."""
class GoBack(Exception):
    """Control-flow exception; on_device() turns it into ChooseHwDeviceAgain."""
class ReRunDialog(Exception):
    """Control-flow exception; on_device() lets it propagate unchanged."""
class ChooseHwDeviceAgain(Exception):
    """Raised to make choose_hw_device() restart the hardware device selection."""
class WizardStackItem(NamedTuple):
    """One frame of the wizard's navigation stack, as pushed by BaseWizard.run()."""

    # the action that was requested (first positional argument of run())
    action: Any
    # remaining positional arguments of that run() call
    args: Any
    # keyword arguments of that run() call
    kwargs: Dict[str, Any]
    # deep copy of the wizard's `data` taken when the frame was entered
    db_data: dict
class WizardWalletPasswordSetting(NamedTuple):
    """Password/encryption choices collected by on_password() and consumed by create_storage()."""

    # password chosen for the wallet (may be None)
    password: Optional[str]
    # whether the storage file gets a password set on it
    encrypt_storage: bool
    # encryption version handed to the storage when it is encrypted
    storage_enc_version: StorageEncryptionVersion
    # whether keystore encryption is requested (only effective with a non-empty password)
    encrypt_keystore: bool
class BaseWizard(Logger):
def __init__(self, config: SimpleConfig, plugins: Plugins):
    """Store the config and plugin registry and reset all wizard state."""
    super(BaseWizard, self).__init__()
    Logger.__init__(self)
    # collaborators handed in by the caller
    self.config = config
    self.plugins = plugins
    self.is_kivy = config.GUI_NAME == 'kivy'
    # state accumulated while the wizard runs
    self.data = {}
    self.keystores: List[KeyStore] = []
    self.seed_type = None
    self.pw_args: Optional[WizardWalletPasswordSetting] = None
    self.plugin: Optional[BasePlugin] = None
    # navigation history, one entry per run() call
    self._stack: List[WizardStackItem] = []
def set_icon(self, icon):
    """Do nothing; the base implementation ignores `icon`."""
    return None
def run(self, *args, **kwargs):
    """Record a navigation frame and dispatch to the named action.

    The first positional argument is the action: a method name, a
    ``(plugin, method_name)`` tuple, or a falsy value (frame is recorded,
    nothing is called). The action is looked up on the active plugin first
    (and called with the wizard as first argument), then on the wizard.
    Raises Exception if neither has an attribute of that name.
    """
    action, args = args[0], args[1:]
    snapshot = copy.deepcopy(self.data)
    self._stack.append(WizardStackItem(action, args, kwargs, snapshot))
    if not action:
        return
    if type(action) is tuple:
        # a tuple action also selects the active plugin
        self.plugin, action = action
    plugin = self.plugin
    if plugin and hasattr(plugin, action):
        handler = getattr(plugin, action)
        handler(self, *args, **kwargs)
        return
    if not hasattr(self, action):
        raise Exception("unknown action", action)
    handler = getattr(self, action)
    handler(*args, **kwargs)
def can_go_back(self):
    """Return True when the stack holds at least two frames."""
    return len(self._stack) >= 2
def go_back(self, *, rerun_previous: bool = True) -> None:
    """Step back one frame, restoring `data` to the previous frame's snapshot.

    Does nothing when can_go_back() is false. With `rerun_previous` the
    previous frame is popped too and re-entered through run(), which pushes
    it again.
    """
    if not self.can_go_back():
        return
    frames = self._stack
    # discard the frame we are leaving
    frames.pop()
    target = frames[-1]
    # try to undo side effects since we last entered the target frame
    # FIXME only self.data is properly restored
    self.data = copy.deepcopy(target.db_data)
    if not rerun_previous:
        return
    # run() will push the target frame again
    frames.pop()
    self.run(target.action, *target.args, **target.kwargs)
def reset_stack(self):
    """Forget all navigation frames by installing a fresh empty stack."""
    self._stack = list()
def new(self):
    """Ask which kind of wallet to create; the answer goes to on_wallet_type()."""
    title = _("Create new wallet")
    message = _("What kind of wallet do you want to create?")
    wallet_kinds = [
        ('standard', _("Standard wallet")),
        ('2fa', _("Wallet with two-factor authentication")),
        ('multisig', _("Multi-signature wallet")),
        ('imported', _("Import Bitcoin addresses or private keys")),
    ]
    # only offer the kinds that are present in wallet_types
    choices = [(kind, label) for kind, label in wallet_kinds if kind in wallet_types]
    self.choice_dialog(title=title, message=message, choices=choices,
                       run_next=self.on_wallet_type)
def upgrade_db(self, storage, db):
    """Run db.upgrade() through waiting_dialog(), then terminate the wizard.

    An exception raised by the upgrade is captured and re-raised from the
    `on_finished` callback instead of the upgrade task itself.
    """
    error = None  # type: Optional[Exception]

    def attempt_upgrade():
        nonlocal error
        try:
            db.upgrade()
        except Exception as exc:
            error = exc

    def finish():
        if error is not None:
            raise error
        self.terminate(storage=storage, db=db)

    self.waiting_dialog(attempt_upgrade, _('Upgrading wallet format...'), on_finished=finish)
def run_task_without_blocking_gui(self, task, *, msg: str = None) -> Any:
    """Run `task` in a thread so the GUI stays responsive.

    Blocks until `task` has finished, then returns its result or raises the
    exception it raised. The base class provides no implementation.
    """
    raise NotImplementedError()
def load_2fa(self):
    """Mark the wallet as '2fa' and make the 'trustedcoin' plugin the active plugin."""
    self.data.update(wallet_type='2fa', use_trustedcoin=True)
    self.plugin = self.plugins.load_plugin('trustedcoin')
def on_wallet_type(self, choice):
    """Record the chosen wallet type and run the first step of its flow.

    `choice` is one of 'standard', 'multisig', '2fa' or 'imported'. For
    '2fa' the trustedcoin plugin is loaded and asked for the next action.

    Raises Exception for any other `choice` (previously this surfaced as an
    UnboundLocalError because no action had been selected).
    """
    self.data['wallet_type'] = self.wallet_type = choice
    if choice == 'standard':
        action = 'choose_keystore'
    elif choice == 'multisig':
        action = 'choose_multisig'
    elif choice == '2fa':
        self.load_2fa()
        action = self.plugin.get_action(self.data)
    elif choice == 'imported':
        action = 'import_addresses_or_keys'
    else:
        raise Exception('Unknown wallet type', choice)
    self.run(action)
def choose_multisig(self):
    """Open the multisig dialog; its (m, n) answer is stored and the keystore step follows."""
    def store_and_continue(m, n):
        # the wallet type is recorded as e.g. "2of3"
        self.data['wallet_type'] = "%dof%d" % (m, n)
        self.n = n
        self.run('choose_keystore')

    self.multisig_dialog(run_next=store_and_continue)
def choose_keystore(self):
    """Ask how the next keystore should be obtained and run the chosen action.

    The first keystore (or the only one of a standard wallet) may be newly
    created; further multisig cosigners can only be added from a key, a seed
    or a hardware device. The hardware option is not offered on kivy.
    """
    assert self.wallet_type in ['standard', 'multisig']
    num_existing = len(self.keystores)
    if self.wallet_type == 'multisig':
        title = _('Add cosigner') + ' (%d of %d)' % (num_existing + 1, self.n)
    else:
        title = _('Keystore')
    is_first_keystore = self.wallet_type == 'standard' or num_existing == 0
    if is_first_keystore:
        message = _('Do you want to create a new seed, or to restore a wallet using an existing seed?')
        choices = [
            ('choose_seed_type', _('Create a new seed')),
            ('restore_from_seed', _('I already have a seed')),
            ('restore_from_key', _('Use a master key')),
        ]
        if not self.is_kivy:
            choices += [('choose_hw_device', _('Use a hardware device'))]
    else:
        message = _('Add a cosigner to your multi-sig wallet')
        choices = [
            ('restore_from_key', _('Enter cosigner key')),
            ('restore_from_seed', _('Enter cosigner seed')),
        ]
        if not self.is_kivy:
            choices += [('choose_hw_device', _('Cosign with hardware device'))]
    # the selected entry's first element is an action name understood by run()
    self.choice_dialog(title=title, message=message, choices=choices, run_next=self.run)
def import_addresses_or_keys(self):
    """Prompt for a list of addresses or private keys; the text goes to on_import()."""
    def is_valid_input(x):
        return keystore.is_address_list(x) or keystore.is_private_key_list(x, raise_on_error=True)

    title = _("Import Bitcoin Addresses")
    message = _("Enter a list of Bitcoin addresses (this will create a watching-only wallet), or a list of private keys.")
    self.add_xpub_dialog(
        title=title,
        message=message,
        run_next=self.on_import,
        is_valid=is_valid_input,
        allow_multi=True,
        show_wif_help=True,
    )
def on_import(self, text):
    """Fill `data['addresses']` from the imported text and continue to wallet creation.

    An address list produces empty per-address entries. A private-key list
    produces type/pubkey entries and adds an imported keystore. Anything
    else aborts the wizard.
    """
    # text is already sanitized by is_address_list and is_private_key_list
    if keystore.is_address_list(text):
        addresses = self.data['addresses'] = {}
        for addr in text.split():
            assert bitcoin.is_address(addr)
            addresses[addr] = {}
        return self.run('create_wallet')
    if not keystore.is_private_key_list(text):
        return self.terminate(aborted=True)
    addresses = self.data['addresses'] = {}
    imported = keystore.Imported_KeyStore({})
    for pk in keystore.get_private_keys(text):
        assert bitcoin.is_private_key(pk)
        txin_type, pubkey = imported.import_privkey(pk, None)
        addr = bitcoin.pubkey_to_address(txin_type, pubkey)
        addresses[addr] = {'type': txin_type, 'pubkey': pubkey}
    self.keystores.append(imported)
    return self.run('create_wallet')
def restore_from_key(self):
    """Prompt for a master key (standard wallet) or a cosigner key (otherwise)."""
    if self.wallet_type != 'standard':
        cosigner_index = len(self.keystores) + 1
        self.add_cosigner_dialog(index=cosigner_index, run_next=self.on_restore_from_key,
                                 is_valid=keystore.is_bip32_key)
        return
    validator = keystore.is_master_key
    title = _("Create keystore from a master key")
    watching_only_hint = _("To create a watching-only wallet, please enter your master public key (xpub/ypub/zpub).")
    spending_hint = _("To create a spending wallet, please enter a master private key (xprv/yprv/zprv).")
    message = ' '.join([watching_only_hint, spending_hint])
    self.add_xpub_dialog(title=title, message=message, run_next=self.on_restore_from_key,
                         is_valid=validator)
def on_restore_from_key(self, text):
    """Build a keystore from the entered master key and hand it to on_keystore()."""
    self.on_keystore(keystore.from_master_key(text))
def choose_hw_device(self, purpose=HWD_SETUP_NEW_WALLET, *, storage: WalletStorage = None):
    """Run the hardware device selection, repeating while ChooseHwDeviceAgain is raised."""
    while True:
        try:
            self._choose_hw_device(purpose=purpose, storage=storage)
        except ChooseHwDeviceAgain:
            continue
        return
def _choose_hw_device(self, *, purpose, storage: WalletStorage = None):
    """Scan for hardware devices and let the user pick one.

    Collects per-plugin errors into a debug message. When no usable device
    is found, shows that message in a confirm dialog and raises
    ChooseHwDeviceAgain. Otherwise opens a choice dialog whose answer is
    forwarded to on_device() together with `purpose` and `storage`.
    """
    title = _('Hardware Keystore')
    # check available plugins
    supported_plugins = self.plugins.get_hardware_support()
    devices = []  # type: List[Tuple[str, DeviceInfo]]
    devmgr = self.plugins.device_manager
    debug_msg = ''

    def failed_getting_device_infos(name, e):
        # log a one-line version, and append an indented version to the debug message
        nonlocal debug_msg
        error_lines = str(e).splitlines()
        err_str_oneline = ' // '.join(error_lines)
        self.logger.warning(f'error getting device infos for {name}: {err_str_oneline}')
        indented_error_msg = ' '.join([''] + str(e).splitlines(keepends=True))
        debug_msg += f' {name}: (error getting device infos)\n{indented_error_msg}\n'

    # scan devices
    try:
        scanned_devices = self.run_task_without_blocking_gui(
            task=devmgr.scan_devices, msg=_("Scanning devices..."))
    except BaseException as e:
        self.logger.info(f'error scanning devices: {e!r}')
        debug_msg = ' {}:\n {}'.format(_('Error scanning devices'), e)
    else:
        for splugin in supported_plugins:
            name = splugin.name
            plugin = splugin.plugin
            # plugin init errored?
            if not plugin:
                e = splugin.exception
                indented_error_msg = ' '.join([''] + str(e).splitlines(keepends=True))
                debug_msg += f' {name}: (error during plugin init)\n'
                debug_msg += ' {}\n'.format(_('You might have an incompatible library.'))
                debug_msg += f'{indented_error_msg}\n'
                continue
            # see if plugin recognizes 'scanned_devices'
            try:
                # FIXME: side-effect: this sets client.handler
                device_infos = devmgr.list_pairable_device_infos(
                    handler=None, plugin=plugin, devices=scanned_devices,
                    include_failing_clients=True)
            except HardwarePluginLibraryUnavailable as e:
                failed_getting_device_infos(name, e)
                continue
            except BaseException as e:
                self.logger.exception('')
                failed_getting_device_infos(name, e)
                continue
            # report the clients that failed, keep the ones that work
            failing_infos = [di for di in device_infos if di.exception is not None]
            for di in failing_infos:
                failed_getting_device_infos(name, di.exception)
            working_infos = [di for di in device_infos if di.exception is None]
            devices.extend([(name, di) for di in working_infos])
    if not debug_msg:
        debug_msg = ' {}'.format(_('No exceptions encountered.'))
    if not devices:
        msg = (_('No hardware device detected.') + '\n' +
               _('To trigger a rescan, press \'Next\'.') + '\n\n')
        if sys.platform == 'win32':
            msg += _('If your device is not detected on Windows, go to "Settings", "Devices", "Connected devices", '
                     'and do "Remove device". Then, plug your device again.') + '\n'
            msg += _('While this is less than ideal, it might help if you run Electrum as Administrator.') + '\n'
        else:
            msg += _('On Linux, you might have to add a new permission to your udev rules.') + '\n'
        msg += '\n\n'
        msg += _('Debug message') + '\n' + debug_msg
        self.confirm_dialog(title=title, message=msg,
                            run_next=lambda x: None)
        raise ChooseHwDeviceAgain()
    # select device
    self.devices = devices
    choices = []
    for name, info in devices:
        state = _("initialized") if info.initialized else _("wiped")
        label = info.label or _("An unnamed {}").format(name)
        try:
            transport_str = info.device.transport_ui_string[:20]
        except Exception:
            transport_str = 'unknown transport'
        descr = f"{label} [{info.model_name or name}, {state}, {transport_str}]"
        choices.append(((name, info), descr))
    msg = _('Select a device') + ':'
    self.choice_dialog(
        title=title, message=msg, choices=choices,
        run_next=lambda *args: self.on_device(*args, purpose=purpose, storage=storage))
def on_device(self, name, device_info: 'DeviceInfo', *, purpose, storage: WalletStorage = None):
    """Set up the selected hardware device and continue according to `purpose`.

    The plugin called `name` becomes the active plugin and is asked to set
    up the device. For HWD_SETUP_NEW_WALLET the derivation/script-type
    dialog follows; for HWD_SETUP_DECRYPT_WALLET `storage` is decrypted with
    a password obtained from the device client.

    Raises ChooseHwDeviceAgain for the setup failures handled below.
    UserCancelled and ReRunDialog propagate unchanged. InvalidPassword from
    storage.decrypt() is re-raised. Any other `purpose` raises Exception.
    """
    self.plugin = self.plugins.get_plugin(name)
    assert isinstance(self.plugin, HW_PluginBase)
    devmgr = self.plugins.device_manager
    try:
        client = self.plugin.setup_device(device_info, self, purpose)
    except OSError as e:
        self.show_error(_('We encountered an error while connecting to your device:')
                        + '\n' + str(e) + '\n'
                        + _('To try to fix this, we will now re-pair with your device.') + '\n'
                        + _('Please try again.'))
        devmgr.unpair_id(device_info.device.id_)
        raise ChooseHwDeviceAgain()
    except OutdatedHwFirmwareException as e:
        # the user may opt to ignore the outdated firmware; either way selection restarts
        if self.question(e.text_ignore_old_fw_and_continue(), title=_("Outdated device firmware")):
            self.plugin.set_ignore_outdated_fw()
            # will need to re-pair
            devmgr.unpair_id(device_info.device.id_)
        raise ChooseHwDeviceAgain()
    except GoBack:
        raise ChooseHwDeviceAgain()
    except (UserCancelled, ReRunDialog):
        # must come before the generic handlers below so these propagate untouched
        raise
    except UserFacingException as e:
        self.show_error(str(e))
        raise ChooseHwDeviceAgain()
    except BaseException as e:
        # anything unexpected: log the traceback, tell the user, restart selection
        self.logger.exception('')
        self.show_error(str(e))
        raise ChooseHwDeviceAgain()
    if purpose == HWD_SETUP_NEW_WALLET:
        def f(derivation, script_type):
            derivation = normalize_bip32_derivation(derivation)
            self.run('on_hw_derivation', name, device_info, derivation, script_type)
        self.derivation_and_script_type_dialog(f)
    elif purpose == HWD_SETUP_DECRYPT_WALLET:
        password = client.get_password_for_storage_encryption()
        try:
            storage.decrypt(password)
        except InvalidPassword:
            # try to clear session so that user can type another passphrase
            if hasattr(client, 'clear_session'):  # FIXME not all hw wallet plugins have this
                client.clear_session()
            raise
    else:
        raise Exception('unknown purpose: %s' % purpose)
def derivation_and_script_type_dialog(self, f, *, get_account_xpub=None):
    """Ask for a script type and derivation path, then call `f` with the answer.

    The GUI-specific dialog is shown again for as long as it raises
    ScriptTypeNotSupported (the error is shown to the user each time).
    """
    message1 = _('Choose the type of addresses in your wallet.')
    message2 = ' '.join([
        _('You can override the suggested derivation path.'),
        _('If you are not sure what this is, leave this field unchanged.')
    ])
    hide_choices = False
    # in both lists below the third entry is pre-selected by default
    default_choice_idx = 2
    if self.wallet_type == 'multisig':
        # There is no general standard for HD multisig.
        # For legacy, this is partially compatible with BIP45; assumes index=0
        # For segwit, a custom path is used, as there is no standard at all.
        choices = [
            ('standard', 'legacy multisig (p2sh)', normalize_bip32_derivation("m/45'/0")),
            ('p2wsh-p2sh', 'p2sh-segwit multisig (p2wsh-p2sh)', purpose48_derivation(0, xtype='p2wsh-p2sh')),
            ('p2wsh', 'native segwit multisig (p2wsh)', purpose48_derivation(0, xtype='p2wsh')),
        ]
        # if this is not the first cosigner, pre-select the expected script type,
        # and hide the choices
        script_type = self.get_script_type_of_wallet()
        if script_type is not None:
            known_script_types = tuple(entry[0] for entry in choices)
            default_choice_idx = known_script_types.index(script_type)
            hide_choices = True
    else:
        choices = [
            ('standard', 'legacy (p2pkh)', bip44_derivation(0, bip43_purpose=44)),
            ('p2wpkh-p2sh', 'p2sh-segwit (p2wpkh-p2sh)', bip44_derivation(0, bip43_purpose=49)),
            ('p2wpkh', 'native segwit (p2wpkh)', bip44_derivation(0, bip43_purpose=84)),
        ]
    while True:
        try:
            self.derivation_and_script_type_gui_specific_dialog(
                run_next=f,
                title=_('Script type and Derivation path'),
                message1=message1,
                message2=message2,
                choices=choices,
                test_text=is_bip32_derivation,
                default_choice_idx=default_choice_idx,
                get_account_xpub=get_account_xpub,
                hide_choices=hide_choices,
            )
        except ScriptTypeNotSupported as e:
            self.show_error(e)
            # let the user choose again
            continue
        return
def on_hw_derivation(self, name, device_info: 'DeviceInfo', derivation, xtype):
    """Build a hardware keystore for the chosen derivation and pass it to on_keystore().

    ScriptTypeNotSupported propagates to the caller. Any other failure while
    talking to the device is logged, shown to the user, and converted into
    ChooseHwDeviceAgain.
    """
    from .keystore import hardware_keystore
    devmgr = self.plugins.device_manager
    assert isinstance(self.plugin, HW_PluginBase)
    try:
        device_id = device_info.device.id_
        xpub = self.plugin.get_xpub(device_id, derivation, xtype, self)
        client = devmgr.client_by_id(device_id, scan_now=False)
        if not client:
            raise Exception("failed to find client for device id")
        root_fingerprint = client.request_root_fingerprint_from_device()
        # use the client's values, as device_info.label / device_info.device_id might be outdated!
        label = client.label()
        soft_device_id = client.get_soft_device_id()
    except ScriptTypeNotSupported:
        raise  # this is handled in derivation_dialog
    except BaseException as e:
        self.logger.exception('')
        self.show_error(e)
        raise ChooseHwDeviceAgain()
    keystore_dict = {
        'type': 'hardware',
        'hw_type': name,
        'derivation': derivation,
        'root_fingerprint': root_fingerprint,
        'xpub': xpub,
        'label': label,
        'soft_device_id': soft_device_id,
    }
    # give the device client a chance to adjust the dict before the keystore is built
    try:
        client.manipulate_keystore_dict_during_wizard_setup(keystore_dict)
    except Exception as e:
        self.logger.exception('')
        self.show_error(e)
        raise ChooseHwDeviceAgain()
    self.on_keystore(hardware_keystore(keystore_dict))
def passphrase_dialog(self, run_next, is_restoring=False):
    """Ask for an optional seed extension; the entered text goes to `run_next`."""
    def accept_anything(x):
        return True

    title = _('Seed extension')
    message = '\n'.join((
        _('You may extend your seed with custom words.'),
        _('Your seed extension must be saved together with your seed.'),
    ))
    warning = '\n'.join((
        _('Note that this is NOT your encryption password.'),
        _('If you do not know what this is, leave this field empty.'),
    ))
    # the extra warning only applies when restoring a bip39 seed
    warn_issue4566 = is_restoring and self.seed_type == 'bip39'
    self.line_dialog(
        title=title,
        message=message,
        warning=warning,
        default='',
        test=accept_anything,
        run_next=run_next,
        warn_issue4566=warn_issue4566,
    )
def restore_from_seed(self):
    """Open the restore-seed dialog with the bip39, slip39 and extension options enabled.

    Standard wallets accept whatever mnemonic.is_seed accepts; other wallet
    types only accept 'standard' and 'segwit' seeds.
    """
    self.opt_bip39 = True
    self.opt_slip39 = True
    self.opt_ext = True

    def is_cosigning_seed(x):
        return mnemonic.seed_type(x) in ['standard', 'segwit']

    def forward_to_on_restore_seed(*args):
        return self.run('on_restore_seed', *args)

    if self.wallet_type == 'standard':
        test = mnemonic.is_seed
    else:
        test = is_cosigning_seed
    self.restore_seed_dialog(run_next=forward_to_on_restore_seed, test=test)
def on_restore_seed(self, seed, seed_type, is_ext):
    """Continue the restore flow according to the type of the entered seed.

    A `seed_type` of 'electrum' is resolved through mnemonic.seed_type().
    When `is_ext` is set the user is asked for a seed extension first;
    otherwise an empty extension is used. Raises Exception for an
    unrecognised seed type.
    """
    if seed_type == 'electrum':
        self.seed_type = mnemonic.seed_type(seed)
    else:
        self.seed_type = seed_type

    def with_optional_extension(callback):
        # ask for the extension when requested, otherwise proceed with ''
        if is_ext:
            self.passphrase_dialog(run_next=callback, is_restoring=True)
        else:
            callback('')

    if self.seed_type == 'bip39':
        def from_bip39(passphrase):
            root_seed = bip39_to_seed(seed, passphrase)
            self.on_restore_bip43(root_seed)
        with_optional_extension(from_bip39)
    elif self.seed_type == 'slip39':
        def from_slip39(passphrase):
            root_seed = seed.decrypt(passphrase)
            self.on_restore_bip43(root_seed)
        with_optional_extension(from_slip39)
    elif self.seed_type in ['standard', 'segwit']:
        def from_electrum_seed(passphrase):
            return self.run('create_keystore', seed, passphrase)
        with_optional_extension(from_electrum_seed)
    elif self.seed_type == 'old':
        self.run('create_keystore', seed, '')
    elif mnemonic.is_any_2fa_seed_type(self.seed_type):
        # after load_2fa() the plugin is active, so run() dispatches to the plugin
        self.load_2fa()
        self.run('on_restore_seed', seed, is_ext)
    else:
        raise Exception('Unknown seed type', self.seed_type)
def on_restore_bip43(self, root_seed):
    """Ask for derivation and script type for `root_seed`, then run 'on_bip43'.

    For standard wallets the dialog also receives a helper that derives the
    account xpub for a given path; other wallet types pass None.
    """
    def continue_with_derivation(derivation, script_type):
        derivation = normalize_bip32_derivation(derivation)
        self.run('on_bip43', root_seed, derivation, script_type)

    get_account_xpub = None
    if self.wallet_type == 'standard':
        def derive_account_xpub(account_path):
            root_node = BIP32Node.from_rootseed(root_seed, xtype="standard")
            account_node = root_node.subkey_at_private_derivation(account_path)
            return account_node.to_xpub()
        get_account_xpub = derive_account_xpub
    self.derivation_and_script_type_dialog(continue_with_derivation,
                                           get_account_xpub=get_account_xpub)
def create_keystore(self, seed, passphrase):
    """Create a keystore from a seed and extension and pass it to on_keystore().

    When the keystore supports it, its lightning xprv is stored in `data`.
    """
    is_multisig = self.wallet_type == 'multisig'
    new_keystore = keystore.from_seed(seed, passphrase, is_multisig)
    if new_keystore.can_have_deterministic_lightning_xprv():
        self.data['lightning_xprv'] = new_keystore.get_lightning_xprv(None)
    self.on_keystore(new_keystore)
def on_bip43(self, root_seed, derivation, script_type):
    """Create a keystore from a root seed and derivation and pass it to on_keystore()."""
    self.on_keystore(
        keystore.from_bip43_rootseed(root_seed, derivation, xtype=script_type))
def get_script_type_of_wallet(self) -> Optional[str]:
    """Return the xpub type of the first keystore, or None if there is none or it has no xpub."""
    if not len(self.keystores) > 0:
        return None
    first = self.keystores[0]
    if not isinstance(first, keystore.Xpub):
        return None
    return xpub_type(first.xpub)
def on_keystore(self, k: KeyStore):
    """Validate a newly obtained keystore, add it, and run the next wizard step.

    A rejected keystore produces an error message and a return to
    'choose_keystore'. Standard wallets proceed to 'create_wallet'. Multisig
    wallets show the first cosigner's xpub, then keep asking for cosigners
    until `self.n` keystores are present.
    """
    def reject(message):
        self.show_error(message)
        self.run('choose_keystore')

    has_xpub = isinstance(k, keystore.Xpub)
    if has_xpub:
        t1 = xpub_type(k.xpub)
    if self.wallet_type == 'standard':
        if has_xpub and t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:
            reject(_('Wrong key type') + ' %s' % t1)
            return
        self.keystores.append(k)
        self.run('create_wallet')
    elif self.wallet_type == 'multisig':
        assert has_xpub
        if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:
            reject(_('Wrong key type') + ' %s' % t1)
            return
        if k.xpub in [existing.xpub for existing in self.keystores]:
            reject(_('Error: duplicate master public key'))
            return
        if len(self.keystores) > 0:
            # every cosigner must use the same key type as the first one
            t2 = xpub_type(self.keystores[0].xpub)
            if t1 != t2:
                reject(_('Cannot add this cosigner:') + '\n' + "Their key type is '%s', we are '%s'" % (t1, t2))
                return
        else:
            # first cosigner: show its xpub before asking for the others
            xpub = k.get_master_public_key()
            self.reset_stack()
            self.keystores.append(k)
            self.run('show_xpub_and_add_cosigners', xpub)
            return
        self.reset_stack()
        self.keystores.append(k)
        if len(self.keystores) < self.n:
            self.run('choose_keystore')
        else:
            self.run('create_wallet')
def create_wallet(self):
    """Ask how the wallet should be encrypted; the answer goes to on_password().

    A standard wallet on a hardware keystore gets a password obtained from
    the device and is only asked whether to encrypt the storage. Every other
    wallet is prompted for a user password.
    """
    encrypt_keystore = any(k.may_have_password() for k in self.keystores)
    # note: the following condition is duplicated logic from
    #       wallet.get_available_storage_encryption_version()
    # (keystores[0] is only looked at for standard wallets)
    is_hw_standard = (self.wallet_type == 'standard'
                      and isinstance(self.keystores[0], Hardware_KeyStore))
    if not is_hw_standard:
        # reset stack to disable 'back' button in password dialog
        self.reset_stack()

        def on_user_password(password, encrypt_storage):
            return self.on_password(
                password,
                encrypt_storage=encrypt_storage,
                storage_enc_version=StorageEncryptionVersion.USER_PASSWORD,
                encrypt_keystore=encrypt_keystore)

        # prompt the user to set an arbitrary password
        self.request_password(
            run_next=on_user_password,
            force_disable_encrypt_cb=not encrypt_keystore)
        return

    # offer encrypting with a pw derived from the hw device
    hw_keystore = self.keystores[0]  # type: Hardware_KeyStore
    assert isinstance(self.plugin, HW_PluginBase)
    try:
        hw_keystore.handler = self.plugin.create_handler(self)
        password = hw_keystore.get_password_for_storage_encryption()
    except UserCancelled:
        devmgr = self.plugins.device_manager
        devmgr.unpair_pairing_code(hw_keystore.pairing_code())
        raise ChooseHwDeviceAgain()
    except BaseException as e:
        self.logger.exception('')
        self.show_error(str(e))
        raise ChooseHwDeviceAgain()

    def on_storage_choice(encrypt_storage):
        return self.on_password(
            password,
            encrypt_storage=encrypt_storage,
            storage_enc_version=StorageEncryptionVersion.XPUB_PASSWORD,
            encrypt_keystore=False)

    self.request_storage_encryption(run_next=on_storage_choice)
def on_password(self, password, *, encrypt_storage: bool,
                storage_enc_version=StorageEncryptionVersion.USER_PASSWORD,
                encrypt_keystore: bool):
    """Apply the password to the keystores, dump them into `data`, and terminate.

    The password settings are kept in `self.pw_args` for create_storage().
    Raises Exception when `self.wallet_type` is not 'standard', 'multisig'
    or 'imported'.
    """
    for ks in self.keystores:
        if ks.may_have_password():
            ks.update_password(None, password)
    wallet_type = self.wallet_type
    if wallet_type == 'standard':
        self.data['seed_type'] = self.seed_type
        self.data['keystore'] = self.keystores[0].dump()
    elif wallet_type == 'multisig':
        # cosigner keystores are stored under 'x1/', 'x2/', ...
        for position, ks in enumerate(self.keystores, start=1):
            self.data['x%d/' % position] = ks.dump()
    elif wallet_type == 'imported':
        # an imported wallet may have no keystore at all
        if len(self.keystores) > 0:
            self.data['keystore'] = self.keystores[0].dump()
    else:
        raise Exception('Unknown wallet type')
    self.pw_args = WizardWalletPasswordSetting(
        password=password,
        encrypt_storage=encrypt_storage,
        storage_enc_version=storage_enc_version,
        encrypt_keystore=encrypt_keystore,
    )
    self.terminate()
def create_storage(self, path) -> Tuple[WalletStorage, WalletDB]:
    """Write the collected wizard data to a new wallet file at `path`.

    Returns the (storage, db) pair. Raises Exception when something already
    exists at `path`. Requires on_password() to have set `self.pw_args`,
    which is cleared here.
    """
    if os.path.exists(path):
        raise Exception('file already exists at path')
    assert self.pw_args, "pw_args not set?!"
    # take ownership of the settings and clean up so that they can get GC-ed
    pw_args, self.pw_args = self.pw_args, None
    storage = WalletStorage(path)
    if pw_args.encrypt_storage:
        storage.set_password(pw_args.password, enc_version=pw_args.storage_enc_version)
    db = WalletDB('', manual_upgrades=False)
    # keystore encryption only takes effect with a non-empty password
    db.set_keystore_encryption(bool(pw_args.password) and pw_args.encrypt_keystore)
    for key in self.data:
        db.put(key, self.data[key])
    db.load_plugins()
    db.write(storage)
    return storage, db
def terminate(self, *, storage: WalletStorage = None,
              db: WalletDB = None,
              aborted: bool = False) -> None:
    """End the wizard; the base class leaves this to subclasses."""
    raise NotImplementedError()  # implemented by subclasses
def show_xpub_and_add_cosigners(self, xpub):
    """Show `xpub` to the user, then continue with 'choose_keystore'."""
    def add_next_cosigner(x):
        return self.run('choose_keystore')

    self.show_xpub_dialog(xpub=xpub, run_next=add_next_cosigner)
def choose_seed_type(self):
    """Create a 'segwit' seed, or a 'standard' one when WIZARD_DONT_CREATE_SEGWIT is set."""
    if self.config.WIZARD_DONT_CREATE_SEGWIT:
        seed_type = 'standard'
    else:
        seed_type = 'segwit'
    self.create_seed(seed_type)
def create_seed(self, seed_type):
    """Generate a new English seed of `seed_type` and show it to the user.

    The show-seed dialog's answer is forwarded to request_passphrase().
    """
    from . import mnemonic
    self.seed_type = seed_type
    seed = mnemonic.Mnemonic('en').make_seed(seed_type=self.seed_type)
    # a fresh seed may take an extension, but bip39/slip39 are off
    self.opt_bip39 = False
    self.opt_ext = True
    self.opt_slip39 = False

    def after_seed_shown(x):
        return self.request_passphrase(seed, x)

    self.show_seed_dialog(run_next=after_seed_shown, seed_text=seed)
def request_passphrase(self, seed, opt_passphrase):
    """Ask for a seed extension if requested, then move on to confirming the seed."""
    if not opt_passphrase:
        self.run('confirm_seed', seed, '')
        return

    def on_passphrase(x):
        return self.confirm_seed(seed, x)

    self.passphrase_dialog(run_next=on_passphrase)
def confirm_seed(self, seed, passphrase):
    """Make the user re-enter the seed, then continue with confirm_passphrase().

    The dialog is given the seed itself only when the 'debug_seed' config
    key is set; otherwise it receives an empty string.
    """
    def on_confirmed(x):
        return self.confirm_passphrase(seed, passphrase)

    def matches_seed(x):
        return mnemonic.is_matching_seed(seed=seed, seed_again=x)

    if self.config.get('debug_seed'):
        dialog_seed = seed
    else:
        dialog_seed = ''
    self.confirm_seed_dialog(
        run_next=on_confirmed,
        seed=dialog_seed,
        test=matches_seed,
    )
def confirm_passphrase(self, seed, passphrase):
    """Make the user re-type a non-empty seed extension, then run 'create_keystore'.

    With an empty extension the keystore is created right away.
    """
    def build_keystore(x):
        return self.run('create_keystore', seed, x)

    if not passphrase:
        build_keystore('')
        return

    def matches_passphrase(x):
        return x == passphrase

    title = _('Confirm Seed Extension')
    message = '\n'.join((
        _('Your seed extension must be saved together with your seed.'),
        _('Please type it here.'),
    ))
    self.line_dialog(run_next=build_keystore, title=title, message=message,
                     default='', test=matches_passphrase)
def show_error(self, msg: Union[str, BaseException]) -> None:
    """Display an error to the user; the base class provides no implementation."""
    raise NotImplementedError()