rm bip70 support

- could not find a single project that still actually cares about bip70 [0]
    - well except maybe BitPay.
        - but I cannot test with BitPay:
            - they have a testnet3 staging environment on test.bitpay.com
                - but the SSL cert they use for bip70 has expired in 2021
                - the webUI probably also has not been updated since then...
                    - they claim to have added LN support in 2022 in a blog post,
                        but it's not there on test.bitpay.com
            - on mainnet, they require KYC before payment
                - < ... angry noises >
            - their loss then, I don't care.
- this is code that no one wants to maintain

- this does not yet delete the signed bip70 payment data for historical txs
    - but it is no longer possible to export it from the GUI

[0]: https://bitcoinops.org/en/topics/bip70-payment-protocol/
This commit is contained in:
SomberNight
2026-03-20 15:51:07 +00:00
parent baf9a1d976
commit 42ad18b216
20 changed files with 43 additions and 1381 deletions
-2
View File
@@ -53,5 +53,3 @@ The differences are as follows:
- the normal tarball includes compiled (.mo) locale files, the source-only tarball does not.
Both tarballs contain (.po) source locale files. If you are packaging for a Linux distro,
you probably want to compile the .mo locale files yourself (see `contrib/locale/build_locale.sh`).
- the normal tarball includes generated `*_pb2.py` files. These are created
using `protobuf-compiler` from `.proto` files (see `contrib/generate_payreqpb2.sh`)
-5
View File
@@ -35,11 +35,6 @@ info "preparing electrum-locale."
fi
)
if ([ "$OMIT_UNCLEAN_FILES" = 1 ]); then
# FIXME side-effecting repo... though in practice, this script probably runs in fresh_clone
rm -f "$PROJECT_ROOT/electrum/paymentrequest_pb2.py"
fi
(
cd "$PROJECT_ROOT"
-15
View File
@@ -1,15 +0,0 @@
#!/bin/bash
# Generates the file paymentrequest_pb2.py
set -e
CONTRIB="$(dirname "$(readlink -e "$0")")"
EL="$CONTRIB"/../electrum
if ! which protoc > /dev/null 2>&1; then
echo "Please install 'protoc'"
echo "If you're on Debian, try 'sudo apt install protobuf-compiler'?"
exit 1
fi
protoc --proto_path="$EL" --python_out="$EL" "$EL"/paymentrequest.proto
+1 -1
View File
@@ -2452,7 +2452,7 @@ def get_parser():
subparsers = parser.add_subparsers(dest='cmd', metavar='<command>')
# gui
parser_gui = subparsers.add_parser('gui', description="Run Electrum's Graphical User Interface.", help="Run GUI (default)")
parser_gui.add_argument("url", nargs='?', default=None, help="bitcoin URI (or bip70 file)")
parser_gui.add_argument("url", nargs='?', default=None, help="bitcoin URI")
parser_gui.add_argument("-g", "--gui", dest=SimpleConfig.GUI_NAME.key(), help="select graphical user interface", choices=['qt', 'text', 'stdio', 'qml'])
parser_gui.add_argument("-m", action="store_true", dest=SimpleConfig.GUI_QT_HIDE_ON_STARTUP.key(), default=False, help="hide GUI on startup")
parser_gui.add_argument("-L", "--lang", dest=SimpleConfig.LOCALIZATION_LANGUAGE.key(), default=None, help="default language used in GUI")
+5 -25
View File
@@ -16,7 +16,6 @@ from electrum.transaction import PartialTxOutput, TxOutput
from electrum.lnutil import format_short_channel_id
from electrum.lnurl import LNURL6Data
from electrum.bitcoin import COIN, address_to_script
from electrum.paymentrequest import PaymentRequest
from electrum.payment_identifier import PaymentIdentifier, PaymentIdentifierState, PaymentIdentifierType
from electrum.network import Network
from electrum.util import event_listener
@@ -500,28 +499,13 @@ class QEInvoiceParser(QEInvoice):
self._effectiveInvoice = None
self.invoiceChanged.emit()
def create_onchain_invoice(self, outputs, message, payment_request, uri):
def create_onchain_invoice(self, *, outputs, message, uri):
return self._wallet.wallet.create_invoice(
outputs=outputs,
message=message,
pr=payment_request,
URI=uri
URI=uri,
)
def _bip70_payment_request_resolved(self, pr: 'PaymentRequest'):
self._logger.debug('resolved payment request')
if Network.run_from_another_thread(pr.verify()):
invoice = Invoice.from_bip70_payreq(pr, height=0)
if self._wallet.wallet.get_invoice_status(invoice) == PR_PAID:
self.validationError.emit('unknown', _('Invoice already paid'))
elif pr.has_expired():
self.validationError.emit('unknown', _('Payment request has expired'))
else:
self.setValidOnchainInvoice(invoice)
self.validationSuccess.emit()
else:
self.validationError.emit('unknown', f'invoice error:\n{pr.error}')
def validateRecipient(self, pi: PaymentIdentifier):
if not pi:
self.setInvoiceType(QEInvoice.Type.Invalid)
@@ -530,7 +514,7 @@ class QEInvoiceParser(QEInvoice):
self._pi = pi
if not self._pi.is_valid() or self._pi.type not in [
PaymentIdentifierType.SPK, PaymentIdentifierType.BIP21,
PaymentIdentifierType.BIP70, PaymentIdentifierType.BOLT11,
PaymentIdentifierType.BOLT11,
PaymentIdentifierType.LNADDR, PaymentIdentifierType.LNURLP,
PaymentIdentifierType.EMAILLIKE, PaymentIdentifierType.DOMAINLIKE,
PaymentIdentifierType.OPENALIAS,
@@ -556,14 +540,10 @@ class QEInvoiceParser(QEInvoice):
self.on_lnurl_pay(self._pi.lnurl_data)
return
if self._pi.type == PaymentIdentifierType.BIP70:
self._bip70_payment_request_resolved(self._pi.bip70_data)
return
if self._pi.is_available():
if self._pi.type in [PaymentIdentifierType.SPK, PaymentIdentifierType.OPENALIAS]:
outputs = [PartialTxOutput(scriptpubkey=self._pi.spk, value=0)]
invoice = self.create_onchain_invoice(outputs, None, None, None)
invoice = self.create_onchain_invoice(outputs=outputs, message=None, uri=None)
self._logger.debug(repr(invoice))
self.setValidOnchainInvoice(invoice)
self.validationSuccess.emit()
@@ -597,7 +577,7 @@ class QEInvoiceParser(QEInvoice):
outputs = [PartialTxOutput.from_address_and_value(bip21['address'], amount)]
self._logger.debug(outputs)
message = bip21.get('message', '')
invoice = self.create_onchain_invoice(outputs, message, None, bip21)
invoice = self.create_onchain_invoice(outputs=outputs, message=message, uri=bip21)
self._logger.debug(repr(invoice))
self.setValidOnchainInvoice(invoice)
self.validationSuccess.emit()
-2
View File
@@ -76,8 +76,6 @@ class QEPIResolver(QObject):
msg = _('Could not resolve address')
elif pi.type == PaymentIdentifierType.LNURL:
msg = _('Could not resolve LNURL') + "\n\n" + pi.get_error()
elif pi.type == PaymentIdentifierType.BIP70:
msg = _('Could not resolve BIP70 payment request: {}').format(pi.error)
else:
msg = _('Could not resolve')
self.resolveError.emit('resolve', msg)
-2
View File
@@ -114,8 +114,6 @@ class InvoiceList(MyTreeView):
icon_name = 'lightning.png'
else:
icon_name = 'bitcoin.png'
if item.bip70:
icon_name = 'seal.png'
status = self.wallet.get_invoice_status(item)
amount = item.get_amount_sat()
amount_str = self.main_window.format_amount(amount, whitespaces=True) if amount else ""
+1 -26
View File
@@ -50,7 +50,7 @@ import electrum_ecc as ecc
import electrum
from electrum.gui import messages
from electrum import (keystore, constants, util, bitcoin, commands,
paymentrequest, lnutil)
lnutil)
from electrum.bitcoin import COIN, is_address, DummyAddress
from electrum.plugin import run_hook
from electrum.i18n import _
@@ -1667,31 +1667,6 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger, QtEventListener):
if invoice.exp:
grid.addWidget(QLabel(_("Expires") + ':'), 4, 0)
grid.addWidget(QLabel(format_time(invoice.exp + invoice.time)), 4, 1)
if invoice.bip70:
pr = paymentrequest.PaymentRequest(bytes.fromhex(invoice.bip70))
Network.run_from_another_thread(pr.verify())
grid.addWidget(QLabel(_("Requestor") + ':'), 5, 0)
grid.addWidget(QLabel(pr.get_requestor()), 5, 1)
grid.addWidget(QLabel(_("Signature") + ':'), 6, 0)
grid.addWidget(QLabel(pr.get_verify_status()), 6, 1)
def do_export():
name = pr.get_name_for_export() or "payment_request"
name = f"{name}.bip70"
fn = getSaveFileName(
parent=self,
title=_("Save invoice to file"),
filename=name,
filter="*.bip70",
config=self.config,
)
if not fn:
return
with open(fn, 'wb') as f:
data = f.write(pr.raw)
self.show_message(_('BIP70 invoice saved as {}').format(fn))
exportButton = EnterButton(_('Export'), do_export)
buttons = Buttons(exportButton, CloseButton(d))
else:
buttons = Buttons(CloseButton(d))
vbox.addLayout(grid)
vbox.addLayout(buttons)
+3 -27
View File
@@ -24,7 +24,7 @@ from electrum.transaction import Transaction, PartialTxInput, PartialTxOutput
from electrum.network import TxBroadcastError, BestEffortRequestFailed
from electrum.payment_identifier import (PaymentIdentifierType, PaymentIdentifier,
invoice_from_payment_identifier,
payment_identifier_from_invoice, PaymentIdentifierState)
PaymentIdentifierState)
from electrum.submarine_swaps import SwapServerError
from electrum.fee_policy import FeePolicy, FixedFeePolicy
from electrum.lnurl import LNURL3Data, request_lnurl_withdraw_callback, LNURLError
@@ -44,7 +44,6 @@ class SendTab(QWidget, MessageBoxMixin, Logger):
resolve_done_signal = pyqtSignal(object)
finalize_done_signal = pyqtSignal(object)
notify_merchant_done_signal = pyqtSignal(object)
def __init__(self, window: 'ElectrumWindow'):
QWidget.__init__(self, window)
@@ -210,7 +209,6 @@ class SendTab(QWidget, MessageBoxMixin, Logger):
self.resolve_done_signal.connect(self.on_resolve_done)
self.finalize_done_signal.connect(self.on_finalize_done)
self.notify_merchant_done_signal.connect(self.on_notify_merchant_done)
self.payto_e.paymentIdentifierChanged.connect(self._handle_payment_identifier)
self.setTabOrder(self.send_button, self.invoice_list)
@@ -392,10 +390,6 @@ class SendTab(QWidget, MessageBoxMixin, Logger):
btn.setEnabled(False)
self.spinner.setVisible(True)
def payment_request_error(self, error):
self.show_message(error)
self.do_clear()
def set_field_validated(self, w, *, validated: Optional[bool] = None):
if validated is not None:
w.setStyleSheet(ColorScheme.GREEN.as_stylesheet(True) if validated else ColorScheme.RED.as_stylesheet(True))
@@ -443,7 +437,7 @@ class SendTab(QWidget, MessageBoxMixin, Logger):
lock_recipient = pi.type in [PaymentIdentifierType.LNURL, PaymentIdentifierType.LNURLW,
PaymentIdentifierType.LNURLP, PaymentIdentifierType.LNADDR,
PaymentIdentifierType.OPENALIAS, PaymentIdentifierType.BIP70,
PaymentIdentifierType.OPENALIAS,
PaymentIdentifierType.BIP21, PaymentIdentifierType.BOLT11] and not pi.need_resolve()
lock_amount = pi.is_amount_locked()
lock_max = lock_amount or pi.type not in [PaymentIdentifierType.SPK, PaymentIdentifierType.BIP21]
@@ -542,7 +536,7 @@ class SendTab(QWidget, MessageBoxMixin, Logger):
if not self.wallet.has_lightning() and not invoice.can_be_paid_onchain():
self.show_error(_('Lightning is disabled'))
if self.wallet.get_invoice_status(invoice) == PR_PAID:
# fixme: this is only for bip70 and lightning
# fixme: this is only for lightning
self.show_error(_('Invoice already paid'))
return
#if not invoice.is_lightning():
@@ -766,16 +760,6 @@ class SendTab(QWidget, MessageBoxMixin, Logger):
except BestEffortRequestFailed as e:
return False, repr(e)
# success
if invoice and invoice.bip70:
payment_identifier = payment_identifier_from_invoice(invoice)
# FIXME: this should move to backend
if payment_identifier and payment_identifier.need_merchant_notify():
refund_address = self.wallet.get_receiving_address()
payment_identifier.notify_merchant(
tx=tx,
refund_address=refund_address,
on_finished=self.notify_merchant_done_signal.emit
)
return True, tx.txid()
# Capture current TL window; override might be removed on return
@@ -800,14 +784,6 @@ class SendTab(QWidget, MessageBoxMixin, Logger):
WaitingDialog(self, _('Broadcasting transaction...'),
broadcast_thread, broadcast_done, self.window.on_error)
def on_notify_merchant_done(self, pi: PaymentIdentifier):
if pi.is_error():
self.logger.debug(f'merchant notify error: {pi.get_error()}')
else:
self.logger.debug(f'merchant notify result: {pi.merchant_ack_status}: {pi.merchant_ack_message}')
# TODO: show user? if we broadcasted the tx successfully, do we care?
# BitPay complains with a NAK if tx is RbF
def toggle_paytomany(self):
self.payto_e.toggle_paytomany()
if self.payto_e.is_paytomany():
+2 -2
View File
@@ -638,8 +638,8 @@ class ElectrumGui(BaseElectrumGui, EventListener):
invoice = self.wallet.create_invoice(
outputs=outputs,
message=self.str_description,
pr=None,
URI=None)
URI=None,
)
else:
self.show_message(_('Invalid Bitcoin address'))
return None
+4 -19
View File
@@ -16,8 +16,6 @@ from .bitcoin import address_to_script
from .transaction import PartialTxOutput
from .crypto import sha256d
if TYPE_CHECKING:
from .paymentrequest import PaymentRequest
# convention: 'invoices' = outgoing , 'request' = incoming
@@ -114,14 +112,15 @@ class BaseInvoice(StoredObject):
# optional fields.
# an request (incoming) can be satisfied onchain, using lightning or using a swap
# an invoice (outgoing) is constructed from a source: bip21, bip70, lnaddr
# an invoice (outgoing) is constructed from a source: bip21, lnaddr
# onchain only
outputs = attr.ib(kw_only=True, converter=_decode_outputs) # type: Optional[List[PartialTxOutput]]
height = attr.ib( # only for receiving
type=int, kw_only=True, validator=attr.validators.instance_of(int), on_setattr=attr.setters.validate)
bip70 = attr.ib(type=str, kw_only=True) # type: Optional[str]
#bip70_requestor = attr.ib(type=str, kw_only=True) # type: Optional[str]
# (unused) historical bip70 invoice data, for BIP70 invoices paid in the past
bip70 = attr.ib(type=str, kw_only=True, default=None) # type: Optional[str]
def is_lightning(self) -> bool:
raise NotImplementedError()
@@ -227,24 +226,10 @@ class BaseInvoice(StoredObject):
time=timestamp,
exp=exp_delay,
outputs=None,
bip70=None,
height=0,
lightning_invoice=invoice,
)
@classmethod
def from_bip70_payreq(cls, pr: 'PaymentRequest', *, height: int = 0) -> 'Invoice':
return Invoice(
amount_msat=pr.get_amount()*1000,
message=pr.get_memo(),
time=pr.get_time(),
exp=pr.get_expiration_date() - pr.get_time(),
outputs=pr.get_outputs(),
bip70=pr.raw.hex(),
height=height,
lightning_invoice=None,
)
def get_id(self) -> str:
if self.is_lightning():
return self.rhash
+6 -128
View File
@@ -22,7 +22,6 @@ from .lnaddr import LnInvoiceException
from .lnutil import IncompatibleOrInsaneFeatures
from .bip21 import parse_bip21_URI, InvalidBitcoinURI, LIGHTNING_URI_SCHEME, BITCOIN_BIP21_URI_SCHEME
from .segwit_addr import bech32_decode
from . import paymentrequest
if TYPE_CHECKING:
from .wallet import Abstract_Wallet
@@ -61,10 +60,6 @@ class PaymentIdentifierState(IntEnum):
NEED_RESOLVE = 3 # PI contains a recognized destination format, but needs an online resolve step
LNURLP_FINALIZE = 4 # PI contains a resolved LNURLp, but needs amount and comment to resolve to a bolt11
LNURLW_FINALIZE = 5 # PI contains resolved LNURLw, user needs to enter amount and initiate withdraw
MERCHANT_NOTIFY = 6 # PI contains a valid payment request and on-chain destination. It should notify
# the merchant payment processor of the tx after on-chain broadcast,
# and supply a refund address (bip70)
MERCHANT_ACK = 7 # PI notified merchant. nothing to be done.
ERROR = 50 # generic error
NOT_FOUND = 51 # PI contains a recognized destination format, but resolve step was unsuccessful
MERCHANT_ERROR = 52 # PI failed notifying the merchant after broadcasting onchain TX
@@ -75,7 +70,6 @@ class PaymentIdentifierType(IntEnum):
UNKNOWN = 0
SPK = 1
BIP21 = 2
BIP70 = 3
MULTILINE = 4
BOLT11 = 5
LNURL = 6 # before the resolve it's unknown if pi is LNURLP or LNURLW
@@ -131,11 +125,6 @@ class PaymentIdentifier(Logger):
self.domainlike = None
self.openalias_data = None
#
self.bip70 = None
self.bip70_data = None
self.merchant_ack_status = None
self.merchant_ack_message = None
#
self.lnurl = None # type: Optional[str]
self.lnurl_data = None # type: Optional[LNURLData]
@@ -159,9 +148,6 @@ class PaymentIdentifier(Logger):
def need_finalize(self):
return self._state == PaymentIdentifierState.LNURLP_FINALIZE
def need_merchant_notify(self):
return self._state == PaymentIdentifierState.MERCHANT_NOTIFY
def is_valid(self):
return self._state not in [PaymentIdentifierState.INVALID, PaymentIdentifierState.EMPTY]
@@ -172,7 +158,7 @@ class PaymentIdentifier(Logger):
return bool(self.lnurl) or bool(self.bolt11)
def is_onchain(self):
if self._type in [PaymentIdentifierType.SPK, PaymentIdentifierType.MULTILINE, PaymentIdentifierType.BIP70,
if self._type in [PaymentIdentifierType.SPK, PaymentIdentifierType.MULTILINE,
PaymentIdentifierType.OPENALIAS]:
return True
if self._type in [PaymentIdentifierType.LNURLP, PaymentIdentifierType.BOLT11, PaymentIdentifierType.LNADDR]:
@@ -189,8 +175,6 @@ class PaymentIdentifier(Logger):
def is_amount_locked(self):
if self._type == PaymentIdentifierType.BIP21:
return bool(self.bip21.get('amount'))
elif self._type == PaymentIdentifierType.BIP70:
return not self.need_resolve() # always fixed after resolve?
elif self._type == PaymentIdentifierType.BOLT11:
return bool(self.bolt11.get_amount_sat())
elif self._type in [PaymentIdentifierType.LNURLP, PaymentIdentifierType.LNADDR]:
@@ -252,11 +236,6 @@ class PaymentIdentifier(Logger):
self.set_state(PaymentIdentifierState.INVALID)
return
self.bip21 = out
self.bip70 = out.get('r')
if self.bip70:
self._type = PaymentIdentifierType.BIP70
self.set_state(PaymentIdentifierState.NEED_RESOLVE)
else:
self._type = PaymentIdentifierType.BIP21
# check optional lightning in bip21, set self.bolt11 if valid
bolt11 = out.get('lightning')
@@ -341,14 +320,6 @@ class PaymentIdentifier(Logger):
self.set_state(PaymentIdentifierState.NOT_FOUND)
else:
self.set_state(PaymentIdentifierState.NOT_FOUND)
elif self.bip70:
pr = await paymentrequest.get_payment_request(self.bip70)
if await pr.verify():
self.bip70_data = pr
self.set_state(PaymentIdentifierState.MERCHANT_NOTIFY)
else:
self.error = pr.error
self.set_state(PaymentIdentifierState.ERROR)
elif self.lnurl:
data = await request_lnurl(self.lnurl)
self.lnurl_data = data
@@ -430,49 +401,8 @@ class PaymentIdentifier(Logger):
if on_finished:
on_finished(self)
def notify_merchant(
self,
*,
tx: 'Transaction',
refund_address: str,
on_finished: Callable[['PaymentIdentifier'], None] = None,
):
assert self._state == PaymentIdentifierState.MERCHANT_NOTIFY
assert tx
assert refund_address
coro = self._do_notify_merchant(tx, refund_address, on_finished=on_finished)
asyncio.run_coroutine_threadsafe(coro, get_asyncio_loop())
@log_exceptions
async def _do_notify_merchant(
self,
tx: 'Transaction',
refund_address: str,
*,
on_finished: Callable[['PaymentIdentifier'], None] = None,
):
try:
if not self.bip70_data:
self.set_state(PaymentIdentifierState.ERROR)
return
ack_status, ack_msg = await self.bip70_data.send_payment_and_receive_paymentack(tx.serialize(), refund_address)
self.logger.info(f"Payment ACK: {ack_status}. Ack message: {ack_msg}")
self.merchant_ack_status = ack_status
self.merchant_ack_message = ack_msg
self.set_state(PaymentIdentifierState.MERCHANT_ACK)
except Exception as e:
self.error = str(e)
self.logger.error(f"_do_notify_merchant() got error: {e!r}")
self.set_state(PaymentIdentifierState.MERCHANT_ERROR)
finally:
if on_finished:
on_finished(self)
def get_onchain_outputs(self, amount):
if self.bip70:
return self.bip70_data.get_outputs()
elif self.multiline_outputs:
if self.multiline_outputs:
return self.multiline_outputs
elif self.spk:
return [PartialTxOutput(scriptpubkey=self.spk, value=amount)]
@@ -605,16 +535,6 @@ class PaymentIdentifier(Logger):
if self.lnurl_data.min_sendable_sat != self.lnurl_data.max_sendable_sat:
amount_range = (self.lnurl_data.min_sendable_sat, self.lnurl_data.max_sendable_sat)
elif self.bip70 and self.bip70_data:
pr = self.bip70_data
if pr.error:
self.error = pr.error
else:
recipient = pr.get_requestor()
amount = pr.get_amount()
description = pr.get_memo()
validated = not pr.has_expired()
elif self.spk:
pass
@@ -662,9 +582,7 @@ class PaymentIdentifier(Logger):
return None
def has_expired(self):
if self.bip70 and self.bip70_data:
return self.bip70_data.has_expired()
elif self.bolt11:
if self.bolt11:
return self.bolt11.has_expired()
elif self.bip21:
expires = self.bip21.get('exp') + self.bip21.get('time') if self.bip21.get('exp') else 0
@@ -678,7 +596,7 @@ def invoice_from_payment_identifier(
amount_sat: Union[int, str],
message: str = None
) -> Optional[Invoice]:
assert pi.state in [PaymentIdentifierState.AVAILABLE, PaymentIdentifierState.MERCHANT_NOTIFY]
assert pi.state in [PaymentIdentifierState.AVAILABLE,]
assert pi.is_onchain() if amount_sat == '!' else True # MAX should only be allowed if pi has onchain destination
if pi.is_lightning() and not amount_sat == '!':
@@ -691,49 +609,9 @@ def invoice_from_payment_identifier(
else:
outputs = pi.get_onchain_outputs(amount_sat)
message = pi.bip21.get('message') if pi.bip21 else message
bip70_data = pi.bip70_data if pi.bip70 else None
return wallet.create_invoice(
outputs=outputs,
message=message,
pr=bip70_data,
URI=pi.bip21)
URI=pi.bip21,
)
# Note: this is only really used for bip70 to handle MECHANT_NOTIFY state from
# a saved bip70 invoice.
# TODO: reflect bip70-only in function name, or implement other types as well.
def payment_identifier_from_invoice(
wallet: 'Abstract_Wallet',
invoice: Invoice
) -> Optional[PaymentIdentifier]:
if not invoice:
return
pi = PaymentIdentifier(wallet, '')
if invoice.bip70:
pi._type = PaymentIdentifierType.BIP70
pi.bip70_data = paymentrequest.PaymentRequest(bytes.fromhex(invoice.bip70))
pi.set_state(PaymentIdentifierState.MERCHANT_NOTIFY)
return pi
# else:
# if invoice.outputs:
# if len(invoice.outputs) > 1:
# pi._type = PaymentIdentifierType.MULTILINE
# pi.multiline_outputs = invoice.outputs
# pi.set_state(PaymentIdentifierState.AVAILABLE)
# else:
# pi._type = PaymentIdentifierType.BIP21
# params = {}
# if invoice.exp:
# params['exp'] = str(invoice.exp)
# if invoice.time:
# params['time'] = str(invoice.time)
# pi.bip21 = create_bip21_uri(invoice.outputs[0].address, invoice.get_amount_sat(), invoice.message,
# extra_query_params=params)
# pi.set_state(PaymentIdentifierState.AVAILABLE)
# elif invoice.is_lightning():
# pi._type = PaymentIdentifierType.BOLT11
# pi.bolt11 = invoice
# pi.set_state(PaymentIdentifierState.AVAILABLE)
# else:
# return None
# return pi
-47
View File
@@ -1,47 +0,0 @@
//
// Simple Bitcoin Payment Protocol messages
//
// Use fields 1000+ for extensions;
// to avoid conflicts, register extensions via pull-req at
// https://github.com/bitcoin/bips/blob/master/bip-0070/extensions.mediawiki
//
syntax = "proto2";
package payments;
option java_package = "org.bitcoin.protocols.payments";
option java_outer_classname = "Protos";
// Generalized form of "send payment to this/these bitcoin addresses"
message Output {
optional uint64 amount = 1 [default = 0]; // amount is integer-number-of-satoshis
required bytes script = 2; // usually one of the standard Script forms
}
message PaymentDetails {
optional string network = 1 [default = "main"]; // "main" or "test"
repeated Output outputs = 2; // Where payment should be sent
required uint64 time = 3; // Timestamp; when payment request created
optional uint64 expires = 4; // Timestamp; when this request should be considered invalid
optional string memo = 5; // Human-readable description of request for the customer
optional string payment_url = 6; // URL to send Payment and get PaymentACK
optional bytes merchant_data = 7; // Arbitrary data to include in the Payment message
}
message PaymentRequest {
optional uint32 payment_details_version = 1 [default = 1];
optional string pki_type = 2 [default = "none"]; // none / x509+sha256 / x509+sha1
optional bytes pki_data = 3; // depends on pki_type
required bytes serialized_payment_details = 4; // PaymentDetails
optional bytes signature = 5; // pki-dependent signature
}
message X509Certificates {
repeated bytes certificate = 1; // DER-encoded X.509 certificate chain
}
message Payment {
optional bytes merchant_data = 1; // From PaymentDetails.merchant_data
repeated bytes transactions = 2; // Signed transactions that satisfy PaymentDetails.outputs
repeated Output refund_to = 3; // Where to send refunds, if a refund is necessary
optional string memo = 4; // Human-readable message for the merchant
}
message PaymentACK {
required Payment payment = 1; // Payment message that triggered this ACK
optional string memo = 2; // human-readable message for customer
}
-468
View File
@@ -1,468 +0,0 @@
#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2014 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import hashlib
import sys
import time
from typing import Optional, List, TYPE_CHECKING
import asyncio
import urllib.parse
import certifi
import aiohttp
import electrum_ecc as ecc
try:
from . import paymentrequest_pb2 as pb2
except ImportError:
sys.exit("Error: could not find paymentrequest_pb2.py. Create it with 'contrib/generate_payreqpb2.sh'")
from . import bitcoin, constants, util, transaction, x509, rsakey
from .util import (bfh, make_aiohttp_session, error_text_bytes_to_safe_str, get_running_loop,
get_asyncio_loop)
from .invoices import Invoice, get_id_from_onchain_outputs
from .bitcoin import address_to_script
from .transaction import PartialTxOutput
from .network import Network
from .logging import get_logger
from .contacts import Contacts
if TYPE_CHECKING:
from .simple_config import SimpleConfig
_logger = get_logger(__name__)
REQUEST_HEADERS = {'Accept': 'application/bitcoin-paymentrequest', 'User-Agent': 'Electrum'}
ACK_HEADERS = {'Content-Type': 'application/bitcoin-payment', 'Accept': 'application/bitcoin-paymentack', 'User-Agent': 'Electrum'}
ca_path = certifi.where()
ca_list = None
ca_keyID = None
def load_ca_list():
global ca_list, ca_keyID
if ca_list is None:
ca_list, ca_keyID = x509.load_certificates(ca_path)
async def get_payment_request(url: str) -> 'PaymentRequest':
u = urllib.parse.urlparse(url)
error = None
if u.scheme in ('http', 'https'):
resp_content = None
try:
proxy = Network.get_instance().proxy
async with make_aiohttp_session(proxy, headers=REQUEST_HEADERS) as session:
async with session.get(url) as response:
resp_content = await response.read()
response.raise_for_status()
# Guard against `bitcoin:`-URIs with invalid payment request URLs
if "Content-Type" not in response.headers \
or response.headers["Content-Type"] != "application/bitcoin-paymentrequest":
data = None
error = "payment URL not pointing to a payment request handling server"
else:
data = resp_content
data_len = len(data) if data is not None else None
_logger.info(f'fetched payment request {url} {data_len}')
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
error = f"Error while contacting payment URL: {url}.\nerror type: {type(e)}"
if isinstance(e, aiohttp.ClientResponseError):
error += f"\nGot HTTP status code {e.status}."
if resp_content:
error_text_received = error_text_bytes_to_safe_str(resp_content)
error_text_received = error_text_received[:400]
error_oneline = ' -- '.join(error.split('\n'))
_logger.info(f"{error_oneline} -- [DO NOT TRUST THIS MESSAGE] "
f"{repr(e)} text: {error_text_received}")
data = None
else:
data = None
error = f"Unknown scheme for payment request. URL: {url}"
pr = PaymentRequest(data, error=error)
# do x509/dnssec verification now. we still expect the caller to at least check pr.error!
await pr.verify()
return pr
class PaymentRequest:
def __init__(self, data: bytes, *, error=None):
self.raw = data
self.error = error # type: Optional[str]
self._verified_success = None # caches result of _verify
self._verified_success_msg = None # type: Optional[str]
self._parse(data)
self.requestor = None # known after verify
self.tx = None
def __str__(self):
return str(self.raw)
def _parse(self, r: bytes):
self.outputs = [] # type: List[PartialTxOutput]
if self.error:
return
try:
self.data = pb2.PaymentRequest()
self.data.ParseFromString(r)
except Exception:
self.error = "cannot parse payment request"
return
self.details = pb2.PaymentDetails()
self.details.ParseFromString(self.data.serialized_payment_details)
pr_network = self.details.network
client_network = 'test' if constants.net.TESTNET else 'main'
if pr_network != client_network:
self.error = (f'Payment request network "{pr_network}" does not'
f' match client network "{client_network}".')
return
for o in self.details.outputs:
addr = transaction.get_address_from_output_script(o.script)
if not addr:
# TODO maybe rm restriction but then get_requestor and get_id need changes
self.error = "only addresses are allowed as outputs"
return
self.outputs.append(PartialTxOutput.from_address_and_value(addr, o.amount))
self.memo = self.details.memo
self.payment_url = self.details.payment_url
async def verify(self) -> bool:
# FIXME: we should enforce that this method was called before we attempt payment
# note: this method might do network requests (at least for verify_dnssec)
if self._verified_success is True:
return True
if self.error:
return False
if not self.raw:
self.error = "Empty request"
return False
pr = pb2.PaymentRequest()
try:
pr.ParseFromString(self.raw)
except Exception:
self.error = "Error: Cannot parse payment request"
return False
if not pr.signature:
# the address will be displayed as requestor
self.requestor = None
return True
if pr.pki_type in ["x509+sha256", "x509+sha1"]:
return self.verify_x509(pr)
elif pr.pki_type in ["dnssec+btc", "dnssec+ecdsa"]:
return await self.verify_dnssec(pr)
else:
self.error = "ERROR: Unsupported PKI Type for Message Signature"
return False
def verify_x509(self, paymntreq):
load_ca_list()
if not ca_list:
self.error = "Trusted certificate authorities list not found"
return False
cert = pb2.X509Certificates()
cert.ParseFromString(paymntreq.pki_data)
# verify the chain of certificates
try:
x, ca = verify_cert_chain(cert.certificate)
except BaseException as e:
_logger.exception('')
self.error = str(e)
return False
# get requestor name
self.requestor = x.get_common_name()
if self.requestor.startswith('*.'):
self.requestor = self.requestor[2:]
# verify the BIP70 signature
pubkey0 = rsakey.RSAKey(x.modulus, x.exponent)
sig = paymntreq.signature
paymntreq.signature = b''
s = paymntreq.SerializeToString()
sigBytes = bytearray(sig)
msgBytes = bytearray(s)
if paymntreq.pki_type == "x509+sha256":
hashBytes = bytearray(hashlib.sha256(msgBytes).digest())
verify = pubkey0.verify(sigBytes, x509.PREFIX_RSA_SHA256 + hashBytes)
elif paymntreq.pki_type == "x509+sha1":
verify = pubkey0.hashAndVerify(sigBytes, msgBytes)
else:
self.error = f"ERROR: unknown pki_type {paymntreq.pki_type} in Payment Request"
return False
if not verify:
self.error = "ERROR: Invalid Signature for Payment Request Data"
return False
### SIG Verified
self._verified_success_msg = 'Signed by Trusted CA: ' + ca.get_common_name()
self._verified_success = True
return True
async def verify_dnssec(self, pr):
sig = pr.signature
alias = pr.pki_data
info: dict = await Contacts.resolve_openalias(alias)
if pr.pki_type == "dnssec+btc":
self.requestor = alias
address = info.get('address')
pr.signature = b''
message = pr.SerializeToString()
if bitcoin.verify_usermessage_with_address(address, sig, message):
self._verified_success_msg = 'Verified with DNSSEC'
self._verified_success = True
return True
else:
self.error = "verify failed"
return False
else:
self.error = "unknown algo"
return False
def has_expired(self) -> Optional[bool]:
if not hasattr(self, 'details'):
return None
return self.details.expires and self.details.expires < int(time.time())
def get_time(self):
return self.details.time
def get_expiration_date(self):
return self.details.expires
def get_amount(self):
return sum(map(lambda x:x.value, self.outputs))
def get_address(self):
o = self.outputs[0]
addr = o.address
assert addr
return addr
def get_requestor(self):
return self.requestor if self.requestor else self.get_address()
def get_verify_status(self) -> str:
return (self.error or self._verified_success_msg) if self.requestor else "No Signature"
def get_memo(self):
return self.memo
def get_name_for_export(self) -> Optional[str]:
if not hasattr(self, 'details'):
return None
return get_id_from_onchain_outputs(self.outputs, timestamp=self.get_time())
def get_outputs(self):
return self.outputs[:]
async def send_payment_and_receive_paymentack(self, raw_tx, refund_addr):
pay_det = self.details
if not self.details.payment_url:
return False, "no url"
paymnt = pb2.Payment()
paymnt.merchant_data = pay_det.merchant_data
paymnt.transactions.append(bfh(raw_tx))
ref_out = paymnt.refund_to.add()
ref_out.script = address_to_script(refund_addr)
paymnt.memo = "Paid using Electrum"
pm = paymnt.SerializeToString()
payurl = urllib.parse.urlparse(pay_det.payment_url)
resp_content = None
try:
proxy = Network.get_instance().proxy
async with make_aiohttp_session(proxy, headers=ACK_HEADERS) as session:
async with session.post(payurl.geturl(), data=pm) as response:
resp_content = await response.read()
response.raise_for_status()
try:
paymntack = pb2.PaymentACK()
paymntack.ParseFromString(resp_content)
except Exception:
return False, "PaymentACK could not be processed. Payment was sent; please manually verify that payment was received."
print(f"PaymentACK message received: {paymntack.memo}")
return True, paymntack.memo
except aiohttp.ClientError as e:
error = f"Payment Message/PaymentACK Failed:\nerror type: {type(e)}"
if isinstance(e, aiohttp.ClientResponseError):
error += f"\nGot HTTP status code {e.status}."
if resp_content:
error_text_received = error_text_bytes_to_safe_str(resp_content)
error_text_received = error_text_received[:400]
error_oneline = ' -- '.join(error.split('\n'))
_logger.info(f"{error_oneline} -- [DO NOT TRUST THIS MESSAGE] "
f"{repr(e)} text: {error_text_received}")
return False, error
def make_unsigned_request(req: 'Invoice'):
addr = req.get_address()
time = req.time
exp = req.exp
if time and type(time) != int:
time = 0
if exp and type(exp) != int:
exp = 0
amount = req.get_amount_sat()
if amount is None:
amount = 0
memo = req.message
script = address_to_script(addr)
outputs = [(script, amount)]
pd = pb2.PaymentDetails()
if constants.net.TESTNET:
pd.network = 'test'
for script, amount in outputs:
pd.outputs.add(amount=amount, script=script)
pd.time = time
pd.expires = time + exp if exp else 0
pd.memo = memo
pr = pb2.PaymentRequest()
pr.serialized_payment_details = pd.SerializeToString()
pr.signature = util.to_bytes('')
return pr
def sign_request_with_alias(pr, alias, alias_privkey):
pr.pki_type = 'dnssec+btc'
pr.pki_data = str(alias)
message = pr.SerializeToString()
ec_key = ecc.ECPrivkey(alias_privkey)
compressed = bitcoin.is_compressed_privkey(alias_privkey)
pr.signature = bitcoin.ecdsa_sign_usermessage(ec_key, message, is_compressed=compressed)
def verify_cert_chain(chain):
""" Verify a chain of certificates. The last certificate is the CA"""
load_ca_list()
# parse the chain
cert_num = len(chain)
x509_chain = []
for i in range(cert_num):
x = x509.X509(bytearray(chain[i]))
x509_chain.append(x)
if i == 0:
x.check_date()
else:
if not x.check_ca():
raise Exception("ERROR: Supplied CA Certificate Error")
if not cert_num > 1:
raise Exception("ERROR: CA Certificate Chain Not Provided by Payment Processor")
# if the root CA is not supplied, add it to the chain
ca = x509_chain[cert_num-1]
if ca.getFingerprint() not in ca_list:
keyID = ca.get_issuer_keyID()
f = ca_keyID.get(keyID)
if f:
root = ca_list[f]
x509_chain.append(root)
else:
raise Exception("Supplied CA Not Found in Trusted CA Store.")
# verify the chain of signatures
cert_num = len(x509_chain)
for i in range(1, cert_num):
x = x509_chain[i]
prev_x = x509_chain[i-1]
algo, sig, data = prev_x.get_signature()
sig = bytearray(sig)
pubkey = rsakey.RSAKey(x.modulus, x.exponent)
if algo == x509.ALGO_RSA_SHA1:
verify = pubkey.hashAndVerify(sig, data)
elif algo == x509.ALGO_RSA_SHA256:
hashBytes = bytearray(hashlib.sha256(data).digest())
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA256 + hashBytes)
elif algo == x509.ALGO_RSA_SHA384:
hashBytes = bytearray(hashlib.sha384(data).digest())
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA384 + hashBytes)
elif algo == x509.ALGO_RSA_SHA512:
hashBytes = bytearray(hashlib.sha512(data).digest())
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA512 + hashBytes)
else:
raise Exception(f"Algorithm not supported: {algo}")
if not verify:
raise Exception("Certificate not Signed by Provided CA Certificate Chain")
return x509_chain[0], ca
def check_ssl_config(config: 'SimpleConfig'):
from . import pem
key_path = config.SSL_KEYFILE_PATH
cert_path = config.SSL_CERTFILE_PATH
with open(key_path, 'r', encoding='utf-8') as f:
params = pem.parse_private_key(f.read())
with open(cert_path, 'r', encoding='utf-8') as f:
s = f.read()
bList = pem.dePemList(s, "CERTIFICATE")
# verify chain
x, ca = verify_cert_chain(bList)
# verify that privkey and pubkey match
privkey = rsakey.RSAKey(*params)
pubkey = rsakey.RSAKey(x.modulus, x.exponent)
assert x.modulus == params[0]
assert x.exponent == params[1]
# return requestor
requestor = x.get_common_name()
if requestor.startswith('*.'):
requestor = requestor[2:]
return requestor
def sign_request_with_x509(pr, key_path, cert_path):
from . import pem
with open(key_path, 'r', encoding='utf-8') as f:
params = pem.parse_private_key(f.read())
privkey = rsakey.RSAKey(*params)
with open(cert_path, 'r', encoding='utf-8') as f:
s = f.read()
bList = pem.dePemList(s, "CERTIFICATE")
certificates = pb2.X509Certificates()
certificates.certificate.extend(map(bytes, bList))
pr.pki_type = 'x509+sha256'
pr.pki_data = certificates.SerializeToString()
msgBytes = bytearray(pr.SerializeToString())
hashBytes = bytearray(hashlib.sha256(msgBytes).digest())
sig = privkey.sign(x509.PREFIX_RSA_SHA256 + hashBytes)
pr.signature = bytes(sig)
def serialize_request(req): # FIXME this is broken
pr = make_unsigned_request(req)
signature = req.get('sig')
requestor = req.get('name')
if requestor and signature:
pr.signature = bfh(signature)
pr.pki_type = 'dnssec+btc'
pr.pki_data = str(requestor)
return pr
def make_request(config: 'SimpleConfig', req: 'Invoice'):
pr = make_unsigned_request(req)
key_path = config.SSL_KEYFILE_PATH
cert_path = config.SSL_CERTFILE_PATH
if key_path and cert_path:
sign_request_with_x509(pr, key_path, cert_path)
return pr
-36
View File
@@ -1,36 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: paymentrequest.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14paymentrequest.proto\x12\x08payments\"+\n\x06Output\x12\x11\n\x06\x61mount\x18\x01 \x01(\x04:\x01\x30\x12\x0e\n\x06script\x18\x02 \x02(\x0c\"\xa3\x01\n\x0ePaymentDetails\x12\x15\n\x07network\x18\x01 \x01(\t:\x04main\x12!\n\x07outputs\x18\x02 \x03(\x0b\x32\x10.payments.Output\x12\x0c\n\x04time\x18\x03 \x02(\x04\x12\x0f\n\x07\x65xpires\x18\x04 \x01(\x04\x12\x0c\n\x04memo\x18\x05 \x01(\t\x12\x13\n\x0bpayment_url\x18\x06 \x01(\t\x12\x15\n\rmerchant_data\x18\x07 \x01(\x0c\"\x95\x01\n\x0ePaymentRequest\x12\"\n\x17payment_details_version\x18\x01 \x01(\r:\x01\x31\x12\x16\n\x08pki_type\x18\x02 \x01(\t:\x04none\x12\x10\n\x08pki_data\x18\x03 \x01(\x0c\x12\"\n\x1aserialized_payment_details\x18\x04 \x02(\x0c\x12\x11\n\tsignature\x18\x05 \x01(\x0c\"\'\n\x10X509Certificates\x12\x13\n\x0b\x63\x65rtificate\x18\x01 \x03(\x0c\"i\n\x07Payment\x12\x15\n\rmerchant_data\x18\x01 \x01(\x0c\x12\x14\n\x0ctransactions\x18\x02 \x03(\x0c\x12#\n\trefund_to\x18\x03 \x03(\x0b\x32\x10.payments.Output\x12\x0c\n\x04memo\x18\x04 \x01(\t\">\n\nPaymentACK\x12\"\n\x07payment\x18\x01 \x02(\x0b\x32\x11.payments.Payment\x12\x0c\n\x04memo\x18\x02 \x01(\tB(\n\x1eorg.bitcoin.protocols.paymentsB\x06Protos')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'paymentrequest_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\036org.bitcoin.protocols.paymentsB\006Protos'
_OUTPUT._serialized_start=34
_OUTPUT._serialized_end=77
_PAYMENTDETAILS._serialized_start=80
_PAYMENTDETAILS._serialized_end=243
_PAYMENTREQUEST._serialized_start=246
_PAYMENTREQUEST._serialized_end=395
_X509CERTIFICATES._serialized_start=397
_X509CERTIFICATES._serialized_end=436
_PAYMENT._serialized_start=438
_PAYMENT._serialized_end=543
_PAYMENTACK._serialized_start=545
_PAYMENTACK._serialized_end=607
# @@protoc_insertion_point(module_scope)
-535
View File
@@ -1,535 +0,0 @@
#!/usr/bin/env python
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2015 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# This module uses functions from TLSLite (public domain)
#
# TLSLite Authors:
# Trevor Perrin
# Martin von Loewis - python 3 port
# Yngve Pettersen (ported by Paul Sokolovsky) - TLS 1.2
#
"""Pure-Python RSA implementation."""
import os
import math
import hashlib
def SHA1(x):
return hashlib.sha1(x).digest()
# **************************************************************************
# PRNG Functions
# **************************************************************************
def getRandomBytes(howMany):
b = bytearray(os.urandom(howMany))
assert len(b) == howMany
return b
prngName = "os.urandom"
# **************************************************************************
# Converter Functions
# **************************************************************************
def bytesToNumber(b):
total = 0
multiplier = 1
for count in range(len(b)-1, -1, -1):
byte = b[count]
total += multiplier * byte
multiplier *= 256
return total
def numberToByteArray(n, howManyBytes=None):
"""Convert an integer into a bytearray, zero-pad to howManyBytes.
The returned bytearray may be smaller than howManyBytes, but will
not be larger. The returned bytearray will contain a big-endian
encoding of the input integer (n).
"""
if howManyBytes is None:
howManyBytes = numBytes(n)
b = bytearray(howManyBytes)
for count in range(howManyBytes-1, -1, -1):
b[count] = int(n % 256)
n >>= 8
return b
def mpiToNumber(mpi): #mpi is an openssl-format bignum string
if (ord(mpi[4]) & 0x80) !=0: #Make sure this is a positive number
raise AssertionError()
b = bytearray(mpi[4:])
return bytesToNumber(b)
def numberToMPI(n):
b = numberToByteArray(n)
ext = 0
#If the high-order bit is going to be set,
#add an extra byte of zeros
if (numBits(n) & 0x7)==0:
ext = 1
length = numBytes(n) + ext
b = bytearray(4+ext) + b
b[0] = (length >> 24) & 0xFF
b[1] = (length >> 16) & 0xFF
b[2] = (length >> 8) & 0xFF
b[3] = length & 0xFF
return bytes(b)
# **************************************************************************
# Misc. Utility Functions
# **************************************************************************
def numBits(n):
if n==0:
return 0
s = "%x" % n
return ((len(s)-1)*4) + \
{'0':0, '1':1, '2':2, '3':2,
'4':3, '5':3, '6':3, '7':3,
'8':4, '9':4, 'a':4, 'b':4,
'c':4, 'd':4, 'e':4, 'f':4,
}[s[0]]
def numBytes(n):
if n==0:
return 0
bits = numBits(n)
return int(math.ceil(bits / 8.0))
# **************************************************************************
# Big Number Math
# **************************************************************************
def getRandomNumber(low, high):
if low >= high:
raise AssertionError()
howManyBits = numBits(high)
howManyBytes = numBytes(high)
lastBits = howManyBits % 8
while 1:
bytes = getRandomBytes(howManyBytes)
if lastBits:
bytes[0] = bytes[0] % (1 << lastBits)
n = bytesToNumber(bytes)
if n >= low and n < high:
return n
def gcd(a,b):
a, b = max(a,b), min(a,b)
while b:
a, b = b, a % b
return a
def lcm(a, b):
return (a * b) // gcd(a, b)
#Returns inverse of a mod b, zero if none
#Uses Extended Euclidean Algorithm
def invMod(a, b):
c, d = a, b
uc, ud = 1, 0
while c != 0:
q = d // c
c, d = d-(q*c), c
uc, ud = ud - (q * uc), uc
if d == 1:
return ud % b
return 0
def powMod(base, power, modulus):
if power < 0:
result = pow(base, power*-1, modulus)
result = invMod(result, modulus)
return result
else:
return pow(base, power, modulus)
#Pre-calculate a sieve of the ~100 primes < 1000:
def makeSieve(n):
sieve = list(range(n))
for count in range(2, int(math.sqrt(n))+1):
if sieve[count] == 0:
continue
x = sieve[count] * 2
while x < len(sieve):
sieve[x] = 0
x += sieve[count]
sieve = [x for x in sieve[2:] if x]
return sieve
sieve = makeSieve(1000)
def isPrime(n, iterations=5, display=False):
#Trial division with sieve
for x in sieve:
if x >= n: return True
if n % x == 0: return False
#Passed trial division, proceed to Rabin-Miller
#Rabin-Miller implemented per Ferguson & Schneier
#Compute s, t for Rabin-Miller
if display: print("*", end=' ')
s, t = n-1, 0
while s % 2 == 0:
s, t = s//2, t+1
#Repeat Rabin-Miller x times
a = 2 #Use 2 as a base for first iteration speedup, per HAC
for count in range(iterations):
v = powMod(a, s, n)
if v==1:
continue
i = 0
while v != n-1:
if i == t-1:
return False
else:
v, i = powMod(v, 2, n), i+1
a = getRandomNumber(2, n)
return True
def getRandomPrime(bits, display=False):
if bits < 10:
raise AssertionError()
#The 1.5 ensures the 2 MSBs are set
#Thus, when used for p,q in RSA, n will have its MSB set
#
#Since 30 is lcm(2,3,5), we'll set our test numbers to
#29 % 30 and keep them there
low = ((2 ** (bits-1)) * 3) // 2
high = 2 ** bits - 30
p = getRandomNumber(low, high)
p += 29 - (p % 30)
while 1:
if display: print(".", end=' ')
p += 30
if p >= high:
p = getRandomNumber(low, high)
p += 29 - (p % 30)
if isPrime(p, display=display):
return p
#Unused at the moment...
def getRandomSafePrime(bits, display=False):
if bits < 10:
raise AssertionError()
#The 1.5 ensures the 2 MSBs are set
#Thus, when used for p,q in RSA, n will have its MSB set
#
#Since 30 is lcm(2,3,5), we'll set our test numbers to
#29 % 30 and keep them there
low = (2 ** (bits-2)) * 3//2
high = (2 ** (bits-1)) - 30
q = getRandomNumber(low, high)
q += 29 - (q % 30)
while 1:
if display: print(".", end=' ')
q += 30
if (q >= high):
q = getRandomNumber(low, high)
q += 29 - (q % 30)
#Ideas from Tom Wu's SRP code
#Do trial division on p and q before Rabin-Miller
if isPrime(q, 0, display=display):
p = (2 * q) + 1
if isPrime(p, display=display):
if isPrime(q, display=display):
return p
class RSAKey(object):
def __init__(self, n=0, e=0, d=0, p=0, q=0, dP=0, dQ=0, qInv=0):
if (n and not e) or (e and not n):
raise AssertionError()
self.n = n
self.e = e
self.d = d
self.p = p
self.q = q
self.dP = dP
self.dQ = dQ
self.qInv = qInv
self.blinder = 0
self.unblinder = 0
def __len__(self):
"""Return the length of this key in bits.
@rtype: int
"""
return numBits(self.n)
def hasPrivateKey(self):
return self.d != 0
def hashAndSign(self, bytes):
"""Hash and sign the passed-in bytes.
This requires the key to have a private component. It performs
a PKCS1-SHA1 signature on the passed-in data.
@type bytes: str or L{bytearray} of unsigned bytes
@param bytes: The value which will be hashed and signed.
@rtype: L{bytearray} of unsigned bytes.
@return: A PKCS1-SHA1 signature on the passed-in data.
"""
hashBytes = SHA1(bytearray(bytes))
prefixedHashBytes = self._addPKCS1SHA1Prefix(hashBytes)
sigBytes = self.sign(prefixedHashBytes)
return sigBytes
def hashAndVerify(self, sigBytes, bytes):
"""Hash and verify the passed-in bytes with the signature.
This verifies a PKCS1-SHA1 signature on the passed-in data.
@type sigBytes: L{bytearray} of unsigned bytes
@param sigBytes: A PKCS1-SHA1 signature.
@type bytes: str or L{bytearray} of unsigned bytes
@param bytes: The value which will be hashed and verified.
@rtype: bool
@return: Whether the signature matches the passed-in data.
"""
hashBytes = SHA1(bytearray(bytes))
# Try it with/without the embedded NULL
prefixedHashBytes1 = self._addPKCS1SHA1Prefix(hashBytes, False)
prefixedHashBytes2 = self._addPKCS1SHA1Prefix(hashBytes, True)
result1 = self.verify(sigBytes, prefixedHashBytes1)
result2 = self.verify(sigBytes, prefixedHashBytes2)
return (result1 or result2)
def sign(self, bytes):
"""Sign the passed-in bytes.
This requires the key to have a private component. It performs
a PKCS1 signature on the passed-in data.
@type bytes: L{bytearray} of unsigned bytes
@param bytes: The value which will be signed.
@rtype: L{bytearray} of unsigned bytes.
@return: A PKCS1 signature on the passed-in data.
"""
if not self.hasPrivateKey():
raise AssertionError()
paddedBytes = self._addPKCS1Padding(bytes, 1)
m = bytesToNumber(paddedBytes)
if m >= self.n:
raise ValueError()
c = self._rawPrivateKeyOp(m)
sigBytes = numberToByteArray(c, numBytes(self.n))
return sigBytes
def verify(self, sigBytes, bytes):
"""Verify the passed-in bytes with the signature.
This verifies a PKCS1 signature on the passed-in data.
@type sigBytes: L{bytearray} of unsigned bytes
@param sigBytes: A PKCS1 signature.
@type bytes: L{bytearray} of unsigned bytes
@param bytes: The value which will be verified.
@rtype: bool
@return: Whether the signature matches the passed-in data.
"""
if len(sigBytes) != numBytes(self.n):
return False
paddedBytes = self._addPKCS1Padding(bytes, 1)
c = bytesToNumber(sigBytes)
if c >= self.n:
return False
m = self._rawPublicKeyOp(c)
checkBytes = numberToByteArray(m, numBytes(self.n))
return checkBytes == paddedBytes
def encrypt(self, bytes):
"""Encrypt the passed-in bytes.
This performs PKCS1 encryption of the passed-in data.
@type bytes: L{bytearray} of unsigned bytes
@param bytes: The value which will be encrypted.
@rtype: L{bytearray} of unsigned bytes.
@return: A PKCS1 encryption of the passed-in data.
"""
paddedBytes = self._addPKCS1Padding(bytes, 2)
m = bytesToNumber(paddedBytes)
if m >= self.n:
raise ValueError()
c = self._rawPublicKeyOp(m)
encBytes = numberToByteArray(c, numBytes(self.n))
return encBytes
def decrypt(self, encBytes):
"""Decrypt the passed-in bytes.
This requires the key to have a private component. It performs
PKCS1 decryption of the passed-in data.
@type encBytes: L{bytearray} of unsigned bytes
@param encBytes: The value which will be decrypted.
@rtype: L{bytearray} of unsigned bytes or None.
@return: A PKCS1 decryption of the passed-in data or None if
the data is not properly formatted.
"""
if not self.hasPrivateKey():
raise AssertionError()
if len(encBytes) != numBytes(self.n):
return None
c = bytesToNumber(encBytes)
if c >= self.n:
return None
m = self._rawPrivateKeyOp(c)
decBytes = numberToByteArray(m, numBytes(self.n))
#Check first two bytes
if decBytes[0] != 0 or decBytes[1] != 2:
return None
#Scan through for zero separator
for x in range(1, len(decBytes)-1):
if decBytes[x]== 0:
break
else:
return None
return decBytes[x+1:] #Return everything after the separator
# **************************************************************************
# Helper Functions for RSA Keys
# **************************************************************************
def _addPKCS1SHA1Prefix(self, bytes, withNULL=True):
# There is a long history of confusion over whether the SHA1
# algorithmIdentifier should be encoded with a NULL parameter or
# with the parameter omitted. While the original intention was
# apparently to omit it, many toolkits went the other way. TLS 1.2
# specifies the NULL should be included, and this behavior is also
# mandated in recent versions of PKCS #1, and is what tlslite has
# always implemented. Anyways, verification code should probably
# accept both. However, nothing uses this code yet, so this is
# all fairly moot.
if not withNULL:
prefixBytes = bytearray(\
[0x30,0x1f,0x30,0x07,0x06,0x05,0x2b,0x0e,0x03,0x02,0x1a,0x04,0x14])
else:
prefixBytes = bytearray(\
[0x30,0x21,0x30,0x09,0x06,0x05,0x2b,0x0e,0x03,0x02,0x1a,0x05,0x00,0x04,0x14])
prefixedBytes = prefixBytes + bytes
return prefixedBytes
def _addPKCS1Padding(self, bytes, blockType):
padLength = (numBytes(self.n) - (len(bytes)+3))
if blockType == 1: #Signature padding
pad = [0xFF] * padLength
elif blockType == 2: #Encryption padding
pad = bytearray(0)
while len(pad) < padLength:
padBytes = getRandomBytes(padLength * 2)
pad = [b for b in padBytes if b != 0]
pad = pad[:padLength]
else:
raise AssertionError()
padding = bytearray([0,blockType] + pad + [0])
paddedBytes = padding + bytes
return paddedBytes
def _rawPrivateKeyOp(self, m):
#Create blinding values, on the first pass:
if not self.blinder:
self.unblinder = getRandomNumber(2, self.n)
self.blinder = powMod(invMod(self.unblinder, self.n), self.e,
self.n)
#Blind the input
m = (m * self.blinder) % self.n
#Perform the RSA operation
c = self._rawPrivateKeyOpHelper(m)
#Unblind the output
c = (c * self.unblinder) % self.n
#Update blinding values
self.blinder = (self.blinder * self.blinder) % self.n
self.unblinder = (self.unblinder * self.unblinder) % self.n
#Return the output
return c
def _rawPrivateKeyOpHelper(self, m):
#Non-CRT version
#c = powMod(m, self.d, self.n)
#CRT version (~3x faster)
s1 = powMod(m, self.dP, self.p)
s2 = powMod(m, self.dQ, self.q)
h = ((s1 - s2) * self.qInv) % self.p
c = s2 + self.q * h
return c
def _rawPublicKeyOp(self, c):
m = powMod(c, self.e, self.n)
return m
def acceptsPassword(self):
return False
def generate(bits):
key = RSAKey()
p = getRandomPrime(bits//2, False)
q = getRandomPrime(bits//2, False)
t = lcm(p-1, q-1)
key.n = p * q
key.e = 65537
key.d = invMod(key.e, t)
key.p = p
key.q = q
key.dP = key.d % (p-1)
key.dQ = key.d % (q-1)
key.qInv = invMod(q, p)
return key
generate = staticmethod(generate)
+1 -7
View File
@@ -1252,10 +1252,8 @@ class Abstract_Wallet(ABC, Logger, EventListener):
return transactions
def create_invoice(self, *, outputs: List[PartialTxOutput], message, pr, URI) -> Invoice:
def create_invoice(self, *, outputs: List[PartialTxOutput], message, URI) -> Invoice:
height = self.adb.get_local_height()
if pr:
return Invoice.from_bip70_payreq(pr, height=height)
amount_msat = 0
for x in outputs:
if parse_max_spend(x.value):
@@ -1277,7 +1275,6 @@ class Abstract_Wallet(ABC, Logger, EventListener):
time=timestamp,
exp=exp,
outputs=outputs,
bip70=None,
height=height,
lightning_invoice=None,
)
@@ -2970,8 +2967,6 @@ class Abstract_Wallet(ABC, Logger, EventListener):
amount_sat = x.get_amount_sat()
assert isinstance(amount_sat, (int, str, type(None)))
d['outputs'] = [y.to_legacy_tuple() for y in x.get_outputs()]
if x.bip70:
d['bip70'] = x.bip70
return d
def get_invoices_and_requests_touched_by_tx(self, tx):
@@ -3055,7 +3050,6 @@ class Abstract_Wallet(ABC, Logger, EventListener):
amount_msat=amount_msat,
exp=exp_delay,
height=height,
bip70=None,
payment_hash=payment_hash,
)
key = self.add_payment_request(req)
-2
View File
@@ -108,7 +108,6 @@ class TestTypes(QETestCase):
time=1692716965,
exp=LN_EXPIRY_NEVER,
outputs=outputs,
bip70=None,
height=0,
lightning_invoice=None,
)
@@ -124,7 +123,6 @@ class TestTypes(QETestCase):
time=1692716965,
exp=LN_EXPIRY_NEVER,
outputs=outputs,
bip70=None,
height=0,
lightning_invoice=None,
)
-2
View File
@@ -233,7 +233,6 @@ class TestBaseInvoice(ElectrumTestCase):
time=1692716965,
exp=LN_EXPIRY_NEVER,
outputs=outputs,
bip70=None,
height=0,
lightning_invoice=None,
)
@@ -248,7 +247,6 @@ class TestBaseInvoice(ElectrumTestCase):
time=1692716965,
exp=LN_EXPIRY_NEVER,
outputs=outputs,
bip70=None,
height=0,
lightning_invoice=None,
)
-10
View File
@@ -391,16 +391,6 @@ class TestPaymentIdentifier(ElectrumTestCase):
self.assertFalse(pi.is_available())
self.assertTrue(pi.need_resolve())
def test_bip70(self):
pi_str = 'bitcoin:?r=https://test.bitpay.com/i/87iLJoaYVyJwFXtdassQJv'
pi = PaymentIdentifier(None, pi_str)
self.assertTrue(pi.is_valid())
self.assertEqual(PaymentIdentifierType.BIP70, pi.type)
self.assertFalse(pi.is_available())
self.assertTrue(pi.need_resolve())
# TODO resolve mock
async def test_invoice_from_payment_identifier(self):
# amount, expired, message, lightning w matching amount
bip21 = 'bitcoin:1RustyRX2oai4EYYDpQGWvEL62BBGqN9T?amount=0.02&message=unit_test&time=1707382023&exp=3600&lightning=lnbc20m1pvjluezpp5qqqsyqcyq5rqwzqfqqqsyqcyq5rqwzqfqqqsyqcyq5rqwzqfqypqhp58yjmdan79s6qqdhdzgynm4zwqd5d7xmw5fk98klysy043l2ahrqsfpp3qjmp7lwpagxun9pygexvgpjdc4jdj85fr9yq20q82gphp2nflc7jtzrcazrra7wwgzxqc8u7754cdlpfrmccae92qgzqvzq2ps8pqqqqqqpqqqqq9qqqvpeuqafqxu92d8lr6fvg0r5gv0heeeqgcrqlnm6jhphu9y00rrhy4grqszsvpcgpy9qqqqqqgqqqqq7qqzqj9n4evl6mr5aj9f58zp6fyjzup6ywn3x6sk8akg5v4tgn2q8g4fhx05wf6juaxu9760yp46454gpg5mtzgerlzezqcqvjnhjh8z3g2qqdhhwkj'