193 lines
5.8 KiB
Python
193 lines
5.8 KiB
Python
import queue
|
|
import sys
|
|
from typing import Optional, NamedTuple, Callable
|
|
import os.path
|
|
|
|
from PyQt6 import QtGui
|
|
from PyQt6.QtCore import Qt, QThread, pyqtSignal
|
|
from PyQt6.QtGui import QColor, QPen, QPaintDevice, QFontDatabase, QImage
|
|
import qrcode
|
|
|
|
from electrum.i18n import _
|
|
from electrum.logging import Logger
|
|
|
|
# Maps a font file name to the id Qt assigned when the font was registered.
_cached_font_ids: dict[str, int] = {}


def get_font_id(filename: str) -> int:
    """Return the Qt application-font id for 'filename', registering it on first use.

    The font file is looked up in the 'fonts' directory that sits next to the
    parent directory of this module. The id returned by Qt is remembered in
    '_cached_font_ids', so each file name is passed to Qt at most once.

    note: the id will be negative on error; that value is cached as well.
    """
    if (known_id := _cached_font_ids.get(filename)) is not None:
        return known_id
    font_path = os.path.join(os.path.dirname(__file__), '..', 'fonts', filename)
    _cached_font_ids[filename] = font_id = QFontDatabase.addApplicationFont(font_path)
    return font_id
|
|
|
|
|
|
def draw_qr(
        *,
        qr: Optional[qrcode.main.QRCode],
        paint_device: QPaintDevice,  # target to paint on
        is_enabled: bool = True,
        min_boxsize: int = 2,  # min size in pixels of single black/white unit box of the qr code
) -> None:
    """Paint 'qr' onto 'paint_device', scaled to the space that is available.

    - If 'qr' is falsy, the whole viewport is filled with white and nothing else is drawn.
    - qr.box_size is ignored. The size of one unit box is derived from the shorter
      side of the painter's viewport divided by the number of rows of the matrix.
    - qr.border is respected (it is part of the matrix returned by qr.get_matrix()).
    - If a unit box would end up smaller than 'min_boxsize' pixels, an error text
      is painted instead of the code.
    - If 'is_enabled' is False, the dark boxes are drawn in grey instead of black.
    """
    black = QColor(0, 0, 0, 255)
    grey = QColor(196, 196, 196, 255)
    white = QColor(255, 255, 255, 255)
    box_color = black if is_enabled else grey
    box_pen = QPen(box_color)
    box_pen.setJoinStyle(Qt.PenJoinStyle.MiterJoin)

    def fill_white(painter, width, height) -> None:
        # Cover the rectangle (0, 0, width, height) with the background colour.
        painter.setBrush(white)
        painter.setPen(white)
        painter.drawRect(0, 0, width, height)

    if not qr:
        # Nothing to encode: just blank the whole device.
        painter = QtGui.QPainter()
        painter.begin(paint_device)
        viewport = painter.viewport()
        fill_white(painter, viewport.width(), viewport.height())
        painter.end()
        return

    # note: next line can raise qrcode.exceptions.DataOverflowError (or ValueError).
    # It is deliberately called before the painter is started.
    matrix = qr.get_matrix()  # includes qr.border
    boxes_per_side = len(matrix)

    painter = QtGui.QPainter()
    painter.begin(paint_device)
    viewport = painter.viewport()
    framesize = min(viewport.width(), viewport.height())
    boxsize = int(framesize / boxes_per_side)

    if boxsize < min_boxsize:
        # The amount of data is still within what can fit into a QR code,
        # however we don't have enough pixels to draw it.
        fill_white(painter, viewport.width(), viewport.height())
        painter.setBrush(black)
        painter.setPen(black)
        painter.drawText(0, 20, _("Cannot draw QR code") + ":")
        painter.drawText(0, 40, _("Not enough space available."))
        painter.end()
        return

    # The code is centred inside the square frame; the same offset applies
    # horizontally and vertically.
    margin = (framesize - boxes_per_side * boxsize) / 2

    # Draw white background with margin
    fill_white(painter, framesize, framesize)

    # Draw qr code
    painter.setBrush(box_color)
    painter.setPen(box_pen)
    for row_idx in range(boxes_per_side):
        row = matrix[row_idx]
        y = int(margin + row_idx * boxsize)
        for col_idx in range(boxes_per_side):
            if not row[col_idx]:
                continue
            x = int(margin + col_idx * boxsize)
            painter.drawRect(x, y, boxsize - 1, boxsize - 1)
    painter.end()
|
|
|
|
|
|
def paintQR(data) -> Optional[QImage]:
    """Render 'data' as a QR code into a freshly created QImage.

    Returns None if 'data' is falsy. Otherwise the image is a square whose side
    is the number of rows of the QR matrix (border included) times 5 pixels,
    and the code is painted onto it via draw_qr().
    """
    if not data:
        return None

    boxsize = 5  # pixels per unit box of the qr code

    # Create QR code
    qr = qrcode.QRCode()
    qr.add_data(data)

    # Create the image with appropriate size: one 'boxsize' square per matrix cell
    side = len(qr.get_matrix()) * boxsize
    base_img = QImage(side, side, QImage.Format.Format_ARGB32)

    # Use draw_qr to paint on the image
    draw_qr(qr=qr, paint_device=base_img, is_enabled=True, min_boxsize=boxsize)
    return base_img
|
|
|
|
|
|
class TaskThread(QThread, Logger):
    """Thread that works through a queue of background tasks, one at a time.

    The outcome of each task is reported through the 'doneSig' signal, which is
    connected to on_done(). Callbacks are guaranteed to happen in the context
    of the thread's parent.
    """

    class Task(NamedTuple):
        # the callable that is executed by the thread
        task: Callable
        # called with the return value of 'task'
        cb_success: Optional[Callable]
        # called without arguments once the task has finished, before cb_success/cb_error
        cb_done: Optional[Callable]
        # called with sys.exc_info() if running the task raised
        cb_error: Optional[Callable]
        # called by stop() to abort the task, if provided
        cancel: Optional[Callable] = None

    # emitted as (result or exc_info, cb_done, cb_success or cb_error)
    doneSig = pyqtSignal(object, object, object)

    def __init__(self, parent, on_error=None):
        """Create the thread as a child of 'parent' and start it right away.

        'on_error' is the fallback error callback for tasks added without one.
        """
        QThread.__init__(self, parent)
        Logger.__init__(self)
        self.on_error = on_error
        self.tasks = queue.Queue()
        self._cur_task: Optional[TaskThread.Task] = None
        self._stopping = False
        self.doneSig.connect(self.on_done)
        self.start()

    def add(self, task, on_success=None, on_done=None, on_error=None, *, cancel=None):
        """Queue 'task' for execution; ignored (with a warning) once stop() was called."""
        if self._stopping:
            self.logger.warning("stopping or already stopped but tried to add new task.")
            return
        queued = TaskThread.Task(
            task,
            on_success,
            on_done,
            on_error or self.on_error,  # fall back to the thread-wide handler
            cancel=cancel,
        )
        self.tasks.put(queued)

    def run(self):
        """Thread body: take tasks off the queue and run them until told to stop."""
        while not self._stopping:
            current = self.tasks.get()  # type: TaskThread.Task
            self._cur_task = current
            # A None entry is the wake-up marker put on the queue by stop().
            if self._stopping or not current:
                return
            try:
                outcome = current.task()
                self.doneSig.emit(outcome, current.cb_done, current.cb_success)
            except BaseException:
                self.doneSig.emit(sys.exc_info(), current.cb_done, current.cb_error)

    def on_done(self, result, cb_done, cb_result):
        """Slot for 'doneSig': invoke cb_done() and then cb_result(result), if set."""
        # This runs in the parent's thread.
        if cb_done:
            cb_done()
        if cb_result:
            cb_result(result)

    def stop(self):
        """Cancel outstanding tasks, then shut the thread down and wait for it."""
        self._stopping = True
        # try to cancel currently running task now.
        # if the task does not implement "cancel", we will have to wait until it finishes.
        running = self._cur_task
        if running and running.cancel:
            running.cancel()
        # cancel the remaining tasks in the queue
        while True:
            try:
                pending = self.tasks.get_nowait()
            except queue.Empty:
                break
            if pending and pending.cancel:
                pending.cancel()
        self.tasks.put(None)  # in case the thread is still waiting on the queue
        self.exit()
        self.wait()
|