Files
pallectrum/electrum/gui/text.py
SomberNight 9a38c4d2a1 logging: don't lose log messages during early startup
Previously, during early-startup (until configure_logging(config) is
called in run_electrum),
- the stderr log handler lost all log messages below warning level, and
- the file log handler lost all log messages regardless of log level

We now instead start buffering log messages in memory as soon as
electrum.logging is imported. The buffer is dumped into the
stderr and file log handlers when they are fully set up, and then
the buffer is discarded (and the temporary log handler is removed).

Note that messages are not logged until configure_logging() is called.
Previously WARNING/ERROR messages would get logged immediately to stderr,
but not anymore. This was changed so that the order of the log messages
can be kept intact. (if we log WARNINGs immediately, but need to delay
INFOs until the config is available, messages would either be out of order
or alternatively there would be duplicates)

Relatedly, we now follow the recommendation of the python docs re
logging for libraries [0]: we try to only configure logging if running via
run_electrum (the main script) and not when e.g. a 3rd party script
imports electrum.

[0]: https://docs.python.org/3/howto/logging.html#configuring-logging-for-a-library
2021-04-14 19:14:26 +02:00

546 lines
20 KiB
Python

import tty
import sys
import curses
import datetime
import locale
from decimal import Decimal
import getpass
import logging
from typing import TYPE_CHECKING
import electrum
from electrum import util
from electrum.util import format_satoshis
from electrum.bitcoin import is_address, COIN
from electrum.transaction import PartialTxOutput
from electrum.wallet import Wallet
from electrum.wallet_db import WalletDB
from electrum.storage import WalletStorage
from electrum.network import NetworkParameters, TxBroadcastError, BestEffortRequestFailed
from electrum.interface import ServerAddr
if TYPE_CHECKING:
from electrum.daemon import Daemon
from electrum.simple_config import SimpleConfig
from electrum.plugin import Plugins
_ = lambda x:x # i18n
class ElectrumGui:
    """Curses-based text user interface for an Electrum wallet.

    The screen is organised as a row of tabs (see ``self.tab_names``); each
    tab has a ``print_*`` method that draws it and a ``run_*_tab`` method that
    handles keys not consumed by ``main_command``.
    """

    # Key codes treated as "delete previous character" by edit_str:
    # 8 (^H), 127 (DEL) and 263 (curses.KEY_BACKSPACE).
    _BACKSPACE_KEYS = (8, 127, 263)

    def __init__(self, config: 'SimpleConfig', daemon: 'Daemon', plugins: 'Plugins'):
        """Open the wallet named by ``config`` and initialise the curses screen.

        Exits the process if the wallet file does not exist. If the storage is
        encrypted, the password is read from the terminal before curses starts.
        """
        self.config = config
        self.network = daemon.network
        storage = WalletStorage(config.get_wallet_path())
        if not storage.file_exists():
            print("Wallet not found. try 'electrum create'")
            # sys.exit() rather than the site-provided exit() helper, which is
            # meant for the interactive interpreter.
            sys.exit()
        if storage.is_encrypted():
            password = getpass.getpass('Password:', stream=None)
            storage.decrypt(password)

        db = WalletDB(storage.read(), manual_upgrades=False)
        self.wallet = Wallet(db, storage, config=config)
        self.wallet.start_network(self.network)
        self.contacts = self.wallet.contacts

        locale.setlocale(locale.LC_ALL, '')
        self.encoding = locale.getpreferredencoding()

        self.stdscr = curses.initscr()
        curses.noecho()
        curses.cbreak()
        curses.start_color()
        curses.use_default_colors()
        curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE)
        curses.init_pair(2, curses.COLOR_WHITE, curses.COLOR_CYAN)
        curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_WHITE)
        self.stdscr.keypad(1)
        self.stdscr.border(0)
        self.maxy, self.maxx = self.stdscr.getmaxyx()
        self.set_cursor(0)
        self.w = curses.newwin(10, 50, 5, 5)

        self.tab = 0
        self.pos = 0
        # Number of selectable rows in the current tab; the print_* methods
        # update it. Initialised here so main_command can always read it.
        self.maxpos = 0
        self.popup_pos = 0

        self.str_recipient = ""
        self.str_description = ""
        self.str_amount = ""
        self.str_fee = ""
        self.history = None
        self.txid = []

        util.register_callback(self.update, ['wallet_updated', 'network_updated'])

        self.tab_names = [_("History"), _("Send"), _("Receive"), _("Addresses"), _("Contacts"), _("Banner")]
        self.num_tabs = len(self.tab_names)

    def set_cursor(self, x):
        """Set cursor visibility via ``curses.curs_set``; failures are ignored (best effort)."""
        try:
            curses.curs_set(x)
        except Exception:
            pass

    def restore_or_create(self):
        """No-op in the text GUI."""
        pass

    def verify_seed(self):
        """No-op in the text GUI."""
        pass

    def get_string(self, y, x):
        """Read a line of input at screen position (y, x) with echo enabled.

        Returns whatever ``stdscr.getstr`` returns (a ``bytes`` object in
        Python 3 curses).
        """
        self.set_cursor(1)
        curses.echo()
        self.stdscr.addstr(y, x, " "*20, curses.A_REVERSE)
        s = self.stdscr.getstr(y, x)
        curses.noecho()
        self.set_cursor(0)
        return s

    def update(self, event, *args):
        """Callback for 'wallet_updated' / 'network_updated': rebuild history and redraw."""
        self.update_history()
        if self.tab == 0:
            self.print_history()
        self.refresh()

    def _history_format_str(self):
        """Return the %-format string for one history row, sized to the screen width."""
        width = [20, 40, 14, 14]
        # Extra columns are spread over the last three fields. "%d" truncates
        # the float, so the resulting field widths are integers.
        delta = (self.maxx - sum(width) - 4) / 3
        return ("%" + "%d" % width[0] + "s"
                + "".join("%" + "%d" % (w + delta) + "s" for w in width[1:]))

    def print_history(self):
        """Draw the history tab (newest entry first), building the history if needed."""
        format_str = self._history_format_str()
        if self.history is None:
            self.update_history()
        self.print_list(self.history[::-1], format_str % (_("Date"), _("Description"), _("Amount"), _("Balance")))

    def update_history(self):
        """Rebuild ``self.history`` (formatted rows) and ``self.txid`` from the wallet.

        ``self.history`` keeps the order of ``wallet.get_history()``;
        ``self.txid`` is stored reversed so its indices match the reversed
        list displayed by ``print_history``.
        """
        format_str = self._history_format_str()
        history = []
        txids = []
        for hist_item in self.wallet.get_history():
            if hist_item.tx_mined_status.conf:
                timestamp = hist_item.tx_mined_status.timestamp
                try:
                    time_str = datetime.datetime.fromtimestamp(timestamp).isoformat(' ')[:-3]
                except Exception:
                    time_str = "------"
            else:
                time_str = 'unconfirmed'

            label = self.wallet.get_label_for_txid(hist_item.txid)
            txids.append(hist_item.txid)
            if len(label) > 40:
                label = label[0:37] + '...'
            history.append(format_str % (time_str, label, format_satoshis(hist_item.delta, whitespaces=True),
                                         format_satoshis(hist_item.balance, whitespaces=True)))
        # Publish both lists only once they are complete.
        self.history = history
        self.txid = txids[::-1]

    def print_balance(self):
        """Draw the status line (connection state / balance), the tab bar and the shortcut hints."""
        if not self.network:
            msg = _("Offline")
        elif self.network.is_connected():
            if not self.wallet.up_to_date:
                msg = _("Synchronizing...")
            else:
                c, u, x = self.wallet.get_balance()
                msg = _("Balance")+": %f "%(Decimal(c) / COIN)
                if u:
                    msg += " [%f unconfirmed]"%(Decimal(u) / COIN)
                if x:
                    msg += " [%f unmatured]"%(Decimal(x) / COIN)
        else:
            msg = _("Not connected")

        self.stdscr.addstr(self.maxy - 1, 3, msg)

        for i, tab_name in enumerate(self.tab_names):
            # Column = left margin + 2 padding columns per preceding tab
            # + the combined length of the preceding tab names.
            column = 2 + 2*i + len(''.join(self.tab_names[0:i]))
            self.stdscr.addstr(0, column, ' '+tab_name+' ', curses.A_BOLD if self.tab == i else 0)

        self.stdscr.addstr(self.maxy - 1, self.maxx-30, ' '.join([_("Settings"), _("Network"), _("Quit")]))

    def print_receive(self):
        """Draw the receive tab: the wallet's receiving address and its QR code."""
        addr = self.wallet.get_receiving_address()
        self.stdscr.addstr(2, 1, "Address: "+addr)
        self.print_qr(addr)

    def print_contacts(self):
        """Draw the contacts tab as a list of key / value rows."""
        messages = map(lambda x: "%20s %45s "%(x[0], x[1][1]), self.contacts.items())
        self.print_list(messages, "%19s %15s "%("Key", "Value"))

    def print_addresses(self):
        """Draw the addresses tab: every wallet address with its label."""
        fmt = "%-35s %-30s"
        messages = map(lambda addr: fmt % (addr, self.wallet.get_label(addr)), self.wallet.get_addresses())
        self.print_list(messages, fmt % ("Address", "Label"))

    def print_edit_line(self, y, label, text, index, size):
        """Draw one labelled input field of the send tab, padded to ``size`` columns.

        The field is shown in reverse video when it is the selected one
        (``self.pos % 6 == index``).
        """
        text += " "*(size - len(text))
        self.stdscr.addstr(y, 2, label)
        self.stdscr.addstr(y, 15, text, curses.A_REVERSE if self.pos%6==index else curses.color_pair(1))

    def print_send_tab(self):
        """Draw the send tab: four input fields and the [Send] / [Clear] buttons."""
        self.stdscr.clear()
        self.print_edit_line(3, _("Pay to"), self.str_recipient, 0, 40)
        self.print_edit_line(5, _("Description"), self.str_description, 1, 40)
        self.print_edit_line(7, _("Amount"), self.str_amount, 2, 15)
        self.print_edit_line(9, _("Fee"), self.str_fee, 3, 15)
        self.stdscr.addstr(12, 15, _("[Send]"), curses.A_REVERSE if self.pos%6==4 else curses.color_pair(2))
        self.stdscr.addstr(12, 25, _("[Clear]"), curses.A_REVERSE if self.pos%6==5 else curses.color_pair(2))
        self.maxpos = 6

    def print_banner(self):
        """Draw the server banner, one line per row, if a network with a banner is available."""
        if self.network and self.network.banner:
            banner = self.network.banner
            banner = banner.replace('\r', '')
            self.print_list(banner.split('\n'))

    def print_qr(self, data):
        """Render ``data`` as an ASCII QR code starting at row 5, column 5.

        If curses refuses to draw it, an error message is shown instead.
        """
        import qrcode
        from io import StringIO

        s = StringIO()
        self.qr = qrcode.QRCode()
        self.qr.add_data(data)
        self.qr.print_ascii(out=s, invert=False)
        msg = s.getvalue()
        lines = msg.split('\n')
        try:
            for i, l in enumerate(lines):
                l = l.encode("utf-8")
                self.stdscr.addstr(i+5, 5, l, curses.color_pair(3))
        except curses.error:
            m = 'error. screen too small?'
            m = m.encode(self.encoding)
            self.stdscr.addstr(5, 1, m, 0)

    def print_list(self, lst, firstline = None):
        """Draw ``lst`` as rows starting at screen row 2, with optional header ``firstline``.

        Sets ``self.maxpos`` to the number of entries; draws nothing when the
        list is empty. The row at ``self.pos % self.maxpos`` is highlighted.
        """
        lst = list(lst)
        self.maxpos = len(lst)
        if not self.maxpos:
            return
        if firstline:
            firstline += " "*(self.maxx - 2 - len(firstline))
            self.stdscr.addstr(1, 1, firstline)
        for i in range(self.maxy-4):
            msg = lst[i] if i < len(lst) else ""
            msg += " "*(self.maxx - 2 - len(msg))
            m = msg[0:self.maxx - 2]
            m = m.encode(self.encoding)
            self.stdscr.addstr(i+2, 1, m, curses.A_REVERSE if i == (self.pos % self.maxpos) else 0)

    def refresh(self):
        """Redraw border and status line and flush the screen (skipped once quitting)."""
        if self.tab == -1:
            return
        self.stdscr.border(0)
        self.print_balance()
        self.stdscr.refresh()

    def main_command(self):
        """Read one key and handle global navigation.

        Returns the key code when it was not consumed here, so the current
        tab's handler can process it; returns ``None`` otherwise.
        """
        c = self.stdscr.getch()
        cc = curses.unctrl(c).decode()
        if c == curses.KEY_RIGHT:
            self.tab = (self.tab + 1) % self.num_tabs
        elif c == curses.KEY_LEFT:
            self.tab = (self.tab - 1) % self.num_tabs
        elif c == curses.KEY_DOWN:
            self.pos += 1
        elif c == curses.KEY_UP:
            self.pos -= 1
        elif c == 9:  # tab
            self.pos += 1
        elif cc in ['^W', '^C', '^X', '^Q']:
            self.tab = -1  # quit
        elif cc in ['^N']:
            self.network_dialog()
        elif cc == '^S':
            self.settings_dialog()
        else:
            return c
        # Clamp the cursor to the selectable range of the current tab.
        if self.pos < 0:
            self.pos = 0
        if self.pos >= self.maxpos:
            self.pos = self.maxpos - 1

    def run_tab(self, i, print_func, exec_func):
        """Event loop for tab ``i``: draw with ``print_func``, pass unhandled keys to ``exec_func``."""
        while self.tab == i:
            self.stdscr.clear()
            print_func()
            self.refresh()
            c = self.main_command()
            if c:
                exec_func(c)

    def run_history_tab(self, c):
        """On Enter, show the transaction id of the selected history row."""
        # Guard the index: with an empty history the cursor is clamped to -1.
        if c == 10 and 0 <= self.pos < len(self.txid):
            self.run_popup('', ['Transaction ID:', self.txid[self.pos]])

    def edit_str(self, target, c, is_num=False):
        """Return ``target`` edited by key code ``c``.

        Backspace removes the last character (no-op on an empty string).
        A printable character is appended; when ``is_num`` is true only
        digits and '.' are accepted. Control keys are ignored.
        """
        if c in self._BACKSPACE_KEYS:
            return target[:-1]
        cc = curses.unctrl(c).decode()
        # unctrl() renders control keys as multi-character caret notation
        # (e.g. '^J' for Enter); those are not text and must not be appended.
        if len(cc) != 1:
            return target
        if is_num and cc not in '0123456789.':
            return target
        return target + cc

    def run_send_tab(self, c):
        """Route key ``c`` to the selected send-tab field or button."""
        field = self.pos % 6
        if field == 0:
            self.str_recipient = self.edit_str(self.str_recipient, c)
        elif field == 1:
            self.str_description = self.edit_str(self.str_description, c)
        elif field == 2:
            self.str_amount = self.edit_str(self.str_amount, c, True)
        elif field == 3:
            self.str_fee = self.edit_str(self.str_fee, c, True)
        elif field == 4:
            if c == 10:
                self.do_send()
        elif field == 5:
            if c == 10:
                self.do_clear()

    def run_receive_tab(self, c):
        """On Enter, open the address popup (the selection is currently not acted upon)."""
        if c == 10:
            self.run_popup('Address', ["Edit label", "Freeze", "Prioritize"])

    def run_contacts_tab(self, c):
        """On Enter, open the contact popup and perform "Pay to" or "Edit label"."""
        if c == 10 and self.contacts:
            out = self.run_popup('Address', ["Copy", "Pay to", "Edit label", "Delete"]).get('button')
            keys = list(self.contacts.keys())
            key = keys[self.pos % len(keys)]
            if out == "Pay to":
                self.tab = 1
                self.str_recipient = key
                self.pos = 2
            elif out == "Edit label":
                s = self.get_string(6 + self.pos, 18)
                if s:
                    # curses getstr() yields bytes; labels are text.
                    if isinstance(s, bytes):
                        s = s.decode(self.encoding, errors='replace')
                    self.wallet.set_label(key, s)

    def run_banner_tab(self, c):
        """Show the repr of the pressed key code in a message box."""
        self.show_message(repr(c))

    def main(self):
        """Run the tab loops until the user quits, then restore the terminal.

        Raises:
            Exception: wrapping any ``curses.error`` raised while drawing.
        """
        tty.setraw(sys.stdin)
        try:
            while self.tab != -1:
                self.run_tab(0, self.print_history, self.run_history_tab)
                self.run_tab(1, self.print_send_tab, self.run_send_tab)
                self.run_tab(2, self.print_receive, self.run_receive_tab)
                self.run_tab(3, self.print_addresses, self.run_banner_tab)
                self.run_tab(4, self.print_contacts, self.run_contacts_tab)
                self.run_tab(5, self.print_banner, self.run_banner_tab)
        except curses.error as e:
            raise Exception("Error with curses. Is your screen too small?") from e
        finally:
            tty.setcbreak(sys.stdin)
            curses.nocbreak()
            self.stdscr.keypad(0)
            curses.echo()
            curses.endwin()

    def stop(self):
        """No-op in the text GUI."""
        pass

    def do_clear(self):
        """Reset all send-tab input fields to empty strings."""
        self.str_amount = ''
        self.str_recipient = ''
        self.str_fee = ''
        self.str_description = ''

    def do_send(self):
        """Validate the send-tab fields, build the transaction and broadcast it.

        Every failure is reported through ``show_message``; nothing is raised
        for invalid input or broadcast errors.
        """
        if not self.network:
            # Same offline guard as print_balance / network_dialog.
            self.show_message(_('Cannot send while offline'))
            return
        if not is_address(self.str_recipient):
            self.show_message(_('Invalid Bitcoin address'))
            return
        try:
            amount = int(Decimal(self.str_amount) * COIN)
        except Exception:
            self.show_message(_('Invalid Amount'))
            return
        try:
            fee = int(Decimal(self.str_fee) * COIN)
        except Exception:
            self.show_message(_('Invalid Fee'))
            return

        if self.wallet.has_password():
            password = self.password_dialog()
            if not password:
                return
        else:
            password = None
        try:
            tx = self.wallet.mktx(outputs=[PartialTxOutput.from_address_and_value(self.str_recipient, amount)],
                                  password=password,
                                  fee=fee)
        except Exception as e:
            self.show_message(repr(e))
            return

        if self.str_description:
            self.wallet.set_label(tx.txid(), self.str_description)

        self.show_message(_("Please wait..."), getchar=False)
        try:
            self.network.run_from_another_thread(self.network.broadcast_transaction(tx))
        except TxBroadcastError as e:
            msg = e.get_message_for_gui()
            self.show_message(msg)
        except BestEffortRequestFailed as e:
            msg = repr(e)
            self.show_message(msg)
        else:
            self.show_message(_('Payment sent.'))
            self.do_clear()

    def show_message(self, message, getchar = True):
        """Display ``message`` (split on newlines) in the popup window.

        When ``getchar`` is true, block until a key is pressed.
        """
        w = self.w
        w.clear()
        w.border(0)
        for i, line in enumerate(message.split('\n')):
            w.addstr(2+i, 2, line)
        w.refresh()
        if getchar:
            self.stdscr.getch()

    def run_popup(self, title, items):
        """Show ``items`` as a vertical button menu; returns the ``run_dialog`` result dict."""
        return self.run_dialog(title, list(map(lambda x: {'type':'button','label':x}, items)), interval=1, y_pos = self.pos+3)

    def network_dialog(self):
        """Let the user edit server and proxy, then apply the new network parameters.

        Does nothing when offline. Returns ``False`` if the entered server
        string cannot be parsed, ``None`` otherwise.
        """
        if not self.network:
            return
        net_params = self.network.get_parameters()
        server_addr = net_params.server
        proxy_config, auto_connect = net_params.proxy, net_params.auto_connect
        srv = 'auto-connect' if auto_connect else str(self.network.default_server)
        out = self.run_dialog('Network', [
            {'label':'server', 'type':'str', 'value':srv},
            {'label':'proxy', 'type':'str', 'value':self.config.get('proxy', '')},
            ], buttons = 1)
        if out:
            if out.get('server'):
                server_str = out.get('server')
                auto_connect = server_str == 'auto-connect'
                if not auto_connect:
                    try:
                        server_addr = ServerAddr.from_str(server_str)
                    except Exception:
                        self.show_message("Error:" + server_str + "\nIn doubt, type \"auto-connect\"")
                        return False
            if out.get('server') or out.get('proxy'):
                proxy = electrum.network.deserialize_proxy(out.get('proxy')) if out.get('proxy') else proxy_config
                net_params = NetworkParameters(server=server_addr,
                                               proxy=proxy,
                                               auto_connect=auto_connect)
                self.network.run_from_another_thread(self.network.set_parameters(net_params))

    def settings_dialog(self):
        """Let the user edit the default fee and store it under config key 'fee_per_kb'."""
        fee = str(Decimal(self.config.fee_per_kb()) / COIN)
        out = self.run_dialog('Settings', [
            {'label':'Default fee', 'type':'satoshis', 'value': fee}
            ], buttons = 1)
        if out:
            if out.get('Default fee'):
                try:
                    fee = int(Decimal(out['Default fee']) * COIN)
                except Exception:
                    # e.g. "1.2.3": report it like do_send does instead of
                    # letting the error escape the GUI loop.
                    self.show_message(_('Invalid Fee'))
                    return
                self.config.set_key('fee_per_kb', fee, True)

    def password_dialog(self):
        """Prompt for a password; returns the entered text, or ``None`` if none was entered."""
        out = self.run_dialog('Password', [
            {'label':'Password', 'type':'password', 'value':''}
            ], buttons = 1)
        return out.get('Password')

    def run_dialog(self, title, items, interval=2, buttons=None, y_pos=3):
        """Run a modal dialog made of ``items`` and return the collected values.

        Each item is a dict with 'label', 'type' ('str', 'password',
        'satoshis', 'list' or 'button') and, for editable types, 'value'
        (plus 'choices' for 'list'). Item dicts are updated in place as the
        user edits them.

        Returns a dict mapping the label of every field the user edited to
        its new value; for a 'button' item the chosen label is stored under
        the key 'button'. With ``buttons`` set, [ok] returns that dict and
        [cancel] returns ``{}``. 'q' or Escape closes the dialog and returns
        whatever was collected so far.
        """
        self.popup_pos = 0

        num = len(items)
        # Selectable positions: the items, plus ok/cancel when buttons are shown.
        numpos = num + 2 if buttons else num
        # Row of the ok/cancel buttons, below the last item.
        button_row = 5 + interval*(num - 1)

        self.w = curses.newwin(5 + num*interval + (2 if buttons else 0), 68, y_pos, 5)
        w = self.w
        out = {}
        while True:
            w.clear()
            w.border(0)
            w.addstr(0, 2, title)

            selected = self.popup_pos % numpos
            for i, item in enumerate(items):
                label = item.get('label')
                if item.get('type') in ('list', 'satoshis', 'str'):
                    value = item.get('value', '')
                elif item.get('type') == 'password':
                    value = '*'*len(item.get('value', ''))
                else:
                    value = ''
                if value is None:
                    value = ''
                if len(value) < 20:
                    value += ' '*(20-len(value))

                if 'value' in item:
                    w.addstr(2+interval*i, 2, label)
                    w.addstr(2+interval*i, 15, value, curses.A_REVERSE if selected == i else curses.color_pair(1))
                else:
                    w.addstr(2+interval*i, 2, label, curses.A_REVERSE if selected == i else 0)

            if buttons:
                w.addstr(button_row, 10, "[ ok ]", curses.A_REVERSE if selected == (numpos-2) else curses.color_pair(2))
                w.addstr(button_row, 25, "[cancel]", curses.A_REVERSE if selected == (numpos-1) else curses.color_pair(2))

            w.refresh()

            c = self.stdscr.getch()
            # NOTE: 'q' always closes the dialog, so it cannot be typed into a field.
            if c in [ord('q'), 27]:
                break
            elif c in [curses.KEY_LEFT, curses.KEY_UP]:
                self.popup_pos -= 1
            elif c in [curses.KEY_RIGHT, curses.KEY_DOWN]:
                self.popup_pos += 1
            else:
                i = self.popup_pos % numpos
                if buttons and c == 10:
                    if i == numpos-2:
                        return out
                    elif i == numpos-1:
                        return {}
                if i >= num:
                    # Cursor is on ok/cancel and the key was not Enter:
                    # there is no item to edit at this position.
                    continue

                item = items[i]
                _type = item.get('type')

                if _type == 'str':
                    item['value'] = self.edit_str(item['value'], c)
                    out[item.get('label')] = item.get('value')

                elif _type == 'password':
                    item['value'] = self.edit_str(item['value'], c)
                    out[item.get('label')] = item['value']

                elif _type == 'satoshis':
                    item['value'] = self.edit_str(item['value'], c, True)
                    out[item.get('label')] = item.get('value')

                elif _type == 'list':
                    # Any key cycles to the next choice.
                    choices = item.get('choices')
                    try:
                        j = choices.index(item.get('value'))
                    except Exception:
                        j = 0
                    new_choice = choices[(j + 1) % len(choices)]
                    item['value'] = new_choice
                    out[item.get('label')] = item.get('value')

                elif _type == 'button':
                    out['button'] = item.get('label')
                    break

        return out