import asyncio
import time
import urllib.parse  # `import urllib` alone does not guarantee the `parse` submodule is loaded
import re
from decimal import Decimal, InvalidOperation
from enum import IntEnum
from typing import NamedTuple, Optional, Callable, List, TYPE_CHECKING, Tuple, Union

from . import bitcoin
from .contacts import AliasNotFoundException
from .i18n import _
from .invoices import Invoice
from .logging import Logger
from .util import parse_max_spend, InvoiceError
from .util import get_asyncio_loop, log_exceptions
from .transaction import PartialTxOutput
from .lnurl import (decode_lnurl, request_lnurl, callback_lnurl, LNURLError, lightning_address_to_url,
                    try_resolve_lnurlpay, LNURL6Data, LNURL3Data, LNURLData)
from .bitcoin import opcodes, construct_script
from .lnaddr import LnInvoiceException
from .lnutil import IncompatibleOrInsaneFeatures
from .bip21 import parse_bip21_URI, InvalidBitcoinURI, LIGHTNING_URI_SCHEME, BITCOIN_BIP21_URI_SCHEME
from .segwit_addr import bech32_decode

if TYPE_CHECKING:
    from .wallet import Abstract_Wallet
    from .transaction import Transaction


def maybe_extract_bech32_lightning_payment_identifier(data: str) -> Optional[str]:
    """Return `data` normalized as a bech32 lightning identifier, or None.

    The input is lowercased, stripped and has the lightning URI prefix removed
    (see remove_uri_prefix). The normalized string is returned only if it starts
    with 'ln' and bech32-decodes to a non-empty hrp and non-empty data part.
    """
    data = remove_uri_prefix(data, prefix=LIGHTNING_URI_SCHEME)
    if not data.startswith('ln'):
        return None
    decoded_bech32 = bech32_decode(data, ignore_long_length=True)
    if not decoded_bech32.hrp or not decoded_bech32.data:
        return None
    return data


def remove_uri_prefix(data: str, *, prefix: str) -> str:
    """Lowercase and strip `data`, then drop a leading '<prefix>:' if present."""
    assert isinstance(data, str) and isinstance(prefix, str)
    data = data.lower().strip()
    data = data.removeprefix(prefix + ':')
    return data


RE_ALIAS = r'(.*?)\s*\<([0-9A-Za-z]{1,})\>'
# NOTE(review): `[A-Z|a-z]` also matches a literal '|' in the last label; presumably `[A-Za-z]` was meant.
RE_EMAIL = r'\b[A-Za-z0-9._%+-]+@([A-Za-z0-9-]+\.)+[A-Z|a-z]{2,7}\b'
RE_DOMAIN = r'\b([A-Za-z0-9-]+\.)+[A-Z|a-z]{2,7}\b'
RE_SCRIPT_FN = r'script\((.*)\)'


class PaymentIdentifierState(IntEnum):
    """Lifecycle state of a PaymentIdentifier. Values >= ERROR are error states (see is_error)."""
    EMPTY = 0            # Initial state.
    INVALID = 1          # Unrecognized PI
    AVAILABLE = 2        # PI contains a payable destination
                         # payable means there's enough addressing information to submit to one
                         # of the channels Electrum supports (on-chain, lightning)
    NEED_RESOLVE = 3     # PI contains a recognized destination format, but needs an online resolve step
    LNURLP_FINALIZE = 4  # PI contains a resolved LNURLp, but needs amount and comment to resolve to a bolt11
    LNURLW_FINALIZE = 5  # PI contains resolved LNURLw, user needs to enter amount and initiate withdraw
    ERROR = 50           # generic error
    NOT_FOUND = 51       # PI contains a recognized destination format, but resolve step was unsuccessful
    MERCHANT_ERROR = 52  # PI failed notifying the merchant after broadcasting onchain TX
    INVALID_AMOUNT = 53  # Specified amount not accepted


class PaymentIdentifierType(IntEnum):
    """Kind of destination recognized by PaymentIdentifier.parse() / resolve()."""
    UNKNOWN = 0
    SPK = 1
    BIP21 = 2
    MULTILINE = 4
    BOLT11 = 5
    LNURL = 6  # before the resolve it's unknown if pi is LNURLP or LNURLW
    LNURLP = 7
    LNURLW = 8
    EMAILLIKE = 9
    OPENALIAS = 10
    LNADDR = 11
    DOMAINLIKE = 12


class FieldsForGUI(NamedTuple):
    """Values returned by PaymentIdentifier.get_fields_for_GUI()."""
    recipient: Optional[str]
    amount: Optional[int]
    description: Optional[str]
    validated: Optional[bool]
    comment: Optional[int]  # filled from lnurl_data.comment_allowed when that is non-zero
    amount_range: Optional[Tuple[int, int]]  # (min_sendable_sat, max_sendable_sat) when they differ


class PaymentIdentifier(Logger):
    """
    Takes:
        * bitcoin addresses or script
        * paytomany csv
        * openalias
        * bip21 URI
        * lightning-URI (containing bolt11 or lnurl)
        * bolt11 invoice
        * lnurl
        * lightning address
    """

    def __init__(self, wallet: Optional['Abstract_Wallet'], text: str):
        """Parse `text` immediately; inspect `state`, `type` and `error` afterwards.

        `wallet` may be None, in which case `contacts` and `config` are None too.
        """
        Logger.__init__(self)
        self._state = PaymentIdentifierState.EMPTY
        self.wallet = wallet
        self.contacts = wallet.contacts if wallet is not None else None
        self.config = wallet.config if wallet is not None else None
        self.text = text.strip()
        self._type = PaymentIdentifierType.UNKNOWN
        self.error = None    # if set, GUI should show error and stop
        self.warning = None  # if set, GUI should ask user if they want to proceed
        # more than one of those may be set
        self.multiline_outputs = None
        self._is_max = False
        self.bolt11 = None  # type: Optional[Invoice]
        self.bip21 = None
        self.spk = None
        self.spk_is_address = False
        #
        self.emaillike = None
        self.domainlike = None
        self.openalias_data = None
        #
        self.lnurl = None  # type: Optional[str]
        self.lnurl_data = None  # type: Optional[LNURLData]

        self.parse(text)

    @property
    def type(self):
        return self._type

    def set_state(self, state: 'PaymentIdentifierState'):
        """Record a state transition (logged at debug level)."""
        self.logger.debug(f'PI state {self._state.name} -> {state.name}')
        self._state = state

    @property
    def state(self):
        return self._state

    def need_resolve(self):
        return self._state == PaymentIdentifierState.NEED_RESOLVE

    def need_finalize(self):
        return self._state == PaymentIdentifierState.LNURLP_FINALIZE

    def is_valid(self):
        return self._state not in [PaymentIdentifierState.INVALID, PaymentIdentifierState.EMPTY]

    def is_available(self):
        return self._state in [PaymentIdentifierState.AVAILABLE]

    def is_lightning(self):
        return bool(self.lnurl) or bool(self.bolt11)

    def is_onchain(self) -> bool:
        """Return True if this identifier carries an on-chain destination."""
        if self._type in [PaymentIdentifierType.SPK, PaymentIdentifierType.MULTILINE,
                          PaymentIdentifierType.OPENALIAS]:
            return True
        if self._type in [PaymentIdentifierType.LNURLP, PaymentIdentifierType.BOLT11,
                          PaymentIdentifierType.LNADDR]:
            return bool(self.bolt11) and bool(self.bolt11.get_address())
        if self._type == PaymentIdentifierType.BIP21:
            return bool(self.bip21.get('address', None)) \
                or (bool(self.bolt11) and bool(self.bolt11.get_address()))
        # any other type: previously fell through and returned None
        return False

    def is_multiline(self):
        return bool(self.multiline_outputs)

    def is_multiline_max(self):
        return self.is_multiline() and self._is_max

    def is_amount_locked(self):
        """Return True if the amount is fixed by the identifier and must not be edited."""
        if self._type == PaymentIdentifierType.BIP21:
            return bool(self.bip21.get('amount'))
        elif self._type == PaymentIdentifierType.BOLT11:
            return bool(self.bolt11.get_amount_sat())
        elif self._type in [PaymentIdentifierType.LNURLP, PaymentIdentifierType.LNADDR]:
            # amount limits known after resolve, might be specific amount or locked to range
            if self.need_resolve():
                return False
            if self.need_finalize():
                self.logger.debug(f'lnurl f {self.lnurl_data.min_sendable_sat}-{self.lnurl_data.max_sendable_sat}')
                return not (self.lnurl_data.min_sendable_sat < self.lnurl_data.max_sendable_sat)
            return True
        elif self._type == PaymentIdentifierType.MULTILINE:
            return True
        else:
            return False

    def is_error(self) -> bool:
        return self._state >= PaymentIdentifierState.ERROR

    def get_error(self) -> Optional[str]:
        return self.error

    def parse(self, text: str):
        """Classify `text`, setting self._type, the state and (on failure) self.error.

        Formats are tried in this order: pay-to-many lines, bech32 lightning
        (lnurl / bolt11), BIP21 URI, address or script, contact name,
        email-like, domain-like. Empty text leaves the state untouched.
        """
        # parse text, set self._type and self.error
        text = text.strip()
        if not text:
            return
        if outputs := self._parse_as_multiline(text):
            self._type = PaymentIdentifierType.MULTILINE
            self.multiline_outputs = outputs
            if self.error:
                self.set_state(PaymentIdentifierState.INVALID)
            else:
                self.set_state(PaymentIdentifierState.AVAILABLE)
        elif invoice_or_lnurl := maybe_extract_bech32_lightning_payment_identifier(text):
            if invoice_or_lnurl.startswith('lnurl'):
                self._type = PaymentIdentifierType.LNURL
                try:
                    self.lnurl = decode_lnurl(invoice_or_lnurl)
                    self.set_state(PaymentIdentifierState.NEED_RESOLVE)
                except Exception as e:
                    self.error = _("Error parsing LNURL") + f":\n{e}"
                    self.set_state(PaymentIdentifierState.INVALID)
                    return
            else:
                self._type = PaymentIdentifierType.BOLT11
                try:
                    self.bolt11 = Invoice.from_bech32(invoice_or_lnurl)
                except InvoiceError as e:
                    self.error = self._get_error_from_invoiceerror(e)
                    self.set_state(PaymentIdentifierState.INVALID)
                    self.logger.debug(f'Exception cause {e.args!r}')
                    return
                self.set_state(PaymentIdentifierState.AVAILABLE)
        elif text.lower().startswith(BITCOIN_BIP21_URI_SCHEME + ':'):
            try:
                out = parse_bip21_URI(text)
            except InvalidBitcoinURI as e:
                self.error = _("Error parsing URI") + f":\n{e}"
                self.set_state(PaymentIdentifierState.INVALID)
                return
            self.bip21 = out
            self._type = PaymentIdentifierType.BIP21
            # check optional lightning in bip21, set self.bolt11 if valid
            bolt11 = out.get('lightning')
            if bolt11:
                try:
                    self.bolt11 = Invoice.from_bech32(bolt11)
                    # carry BIP21 onchain address in Invoice.outputs in case bolt11 doesn't contain a fallback
                    # address but the BIP21 URI has one.
                    if bip21_address := self.bip21.get('address'):
                        amount = self.bip21.get('amount', 0)
                        self.bolt11.outputs = [PartialTxOutput.from_address_and_value(bip21_address, amount)]
                except InvoiceError as e:
                    invoice_error = self._get_error_from_invoiceerror(e)
                    self.logger.debug(invoice_error)
                    if not self.bip21.get('address'):
                        # unusable bolt11 and no onchain fallback: nothing payable is left
                        self.error = invoice_error
                        self.set_state(PaymentIdentifierState.INVALID)
                        return
            elif not self.bip21.get('address'):
                # no address and no bolt11, invalid
                self.set_state(PaymentIdentifierState.INVALID)
                return
            self.set_state(PaymentIdentifierState.AVAILABLE)
        elif (parsed_output := self.parse_output(text))[0]:
            scriptpubkey, is_address = parsed_output
            self._type = PaymentIdentifierType.SPK
            self.spk = scriptpubkey
            self.spk_is_address = is_address
            self.set_state(PaymentIdentifierState.AVAILABLE)
        elif self.contacts and (contact := self.contacts.by_name(text)):
            if contact['type'] == 'address':
                self._type = PaymentIdentifierType.BIP21
                self.bip21 = {
                    'address': contact['address'],
                    'label': contact['name']
                }
                self.set_state(PaymentIdentifierState.AVAILABLE)
            elif contact['type'] in ('openalias', 'lnaddress'):
                self._type = PaymentIdentifierType.EMAILLIKE
                self.emaillike = contact['address']
                self.set_state(PaymentIdentifierState.NEED_RESOLVE)
        elif re.match(RE_EMAIL, (maybe_emaillike := remove_uri_prefix(text, prefix=LIGHTNING_URI_SCHEME))):
            self._type = PaymentIdentifierType.EMAILLIKE
            self.emaillike = maybe_emaillike
            self.set_state(PaymentIdentifierState.NEED_RESOLVE)
        elif re.match(RE_DOMAIN, text):
            self._type = PaymentIdentifierType.DOMAINLIKE
            self.domainlike = text
            self.set_state(PaymentIdentifierState.NEED_RESOLVE)
        elif self.error is None:
            truncated_text = f"{text[:100]}..." if len(text) > 100 else text
            self.error = f"Unknown payment identifier:\n{truncated_text}"
            self.set_state(PaymentIdentifierState.INVALID)

    def resolve(self, *, on_finished: Callable[['PaymentIdentifier'], None]) -> None:
        """Schedule the online resolve step on the asyncio loop; requires state NEED_RESOLVE.

        `on_finished` is invoked with this object once _do_resolve() completes.
        """
        assert self._state == PaymentIdentifierState.NEED_RESOLVE
        coro = self._do_resolve(on_finished=on_finished)
        asyncio.run_coroutine_threadsafe(coro, get_asyncio_loop())

    @log_exceptions
    async def _do_resolve(self, *, on_finished: Callable[['PaymentIdentifier'], None] = None):
        """Resolve an email-like/domain-like identifier or an LNURL and update type/state.

        Any exception is stored in self.error and moves the state to ERROR.
        `on_finished` (if given) is always called at the end.
        """
        try:
            if self.emaillike or self.domainlike:
                openalias_key = self.emaillike if self.emaillike else self.domainlike
                openalias_task = asyncio.create_task(self.resolve_openalias(openalias_key))

                # prefers lnurl over openalias if both are available
                lnurl = lightning_address_to_url(self.emaillike) if self.emaillike else None
                if lnurl is not None and (lnurl_result := await try_resolve_lnurlpay(lnurl)):
                    openalias_task.cancel()
                    self._type = PaymentIdentifierType.LNADDR
                    self.lnurl = lnurl
                    self.lnurl_data = lnurl_result
                    self.set_state(PaymentIdentifierState.LNURLP_FINALIZE)
                elif openalias_result := await openalias_task:
                    self.openalias_data = openalias_result
                    address = openalias_result.get('address')
                    try:
                        # this error message is shown in the GUI.
                        # Explicit raise instead of `assert`: not stripped under -O, and
                        # str() keeps the message intact when the address is missing (None).
                        if not bitcoin.is_address(address):
                            raise ValueError(f"{_('Openalias address invalid')}: {str(address)[:100]}")
                        scriptpubkey = bitcoin.address_to_script(address)
                        self._type = PaymentIdentifierType.OPENALIAS
                        self.spk = scriptpubkey
                        self.set_state(PaymentIdentifierState.AVAILABLE)
                    except Exception as e:
                        self.error = str(e)
                        self.set_state(PaymentIdentifierState.NOT_FOUND)
                else:
                    self.set_state(PaymentIdentifierState.NOT_FOUND)
            elif self.lnurl:
                data = await request_lnurl(self.lnurl)
                self.lnurl_data = data
                if isinstance(data, LNURL6Data):
                    self._type = PaymentIdentifierType.LNURLP
                    self.set_state(PaymentIdentifierState.LNURLP_FINALIZE)
                elif isinstance(data, LNURL3Data):
                    self._type = PaymentIdentifierType.LNURLW
                    self.set_state(PaymentIdentifierState.LNURLW_FINALIZE)
                else:
                    raise NotImplementedError(f"Invalid LNURL type? {data=}")
                self.logger.debug(f'LNURL data: {data!r}')
            else:
                self.set_state(PaymentIdentifierState.ERROR)
                return
        except Exception as e:
            self.error = str(e)
            self.logger.error(f"_do_resolve() got error: {e!r}")
            self.set_state(PaymentIdentifierState.ERROR)
        finally:
            if on_finished:
                on_finished(self)

    def finalize(
        self,
        *,
        amount_sat: int = 0,
        comment: str = None,
        on_finished: Callable[['PaymentIdentifier'], None] = None,
    ):
        """Schedule the LNURL-pay finalize step; requires state LNURLP_FINALIZE."""
        assert self._state == PaymentIdentifierState.LNURLP_FINALIZE
        coro = self._do_finalize(amount_sat=amount_sat, comment=comment, on_finished=on_finished)
        asyncio.run_coroutine_threadsafe(coro, get_asyncio_loop())

    @log_exceptions
    async def _do_finalize(
        self,
        *,
        amount_sat: int = None,
        comment: str = None,
        on_finished: Callable[['PaymentIdentifier'], None] = None,
    ):
        """Request a bolt11 invoice from the LNURL-pay callback for `amount_sat`.

        On success self.bolt11 is set and the state becomes AVAILABLE. An amount
        outside the advertised range yields INVALID_AMOUNT; any other failure
        yields ERROR with self.error set. `on_finished` (if given) is always called.
        """
        from .invoices import Invoice
        try:
            if not self.lnurl_data:
                raise Exception("Unexpected missing LNURL data")

            if not (self.lnurl_data.min_sendable_sat <= amount_sat <= self.lnurl_data.max_sendable_sat):
                self.error = _('Amount must be between {} and {} sat.').format(
                    self.lnurl_data.min_sendable_sat, self.lnurl_data.max_sendable_sat)
                self.set_state(PaymentIdentifierState.INVALID_AMOUNT)
                return

            if self.lnurl_data.comment_allowed == 0:
                comment = None
            # the callback is given the amount multiplied by 1000
            params = {'amount': amount_sat * 1000}
            if comment:
                params['comment'] = comment

            try:
                invoice_data = await callback_lnurl(self.lnurl_data.callback_url, params=params)
            except LNURLError as e:
                self.error = f"LNURL request encountered error: {e}"
                self.set_state(PaymentIdentifierState.ERROR)
                return

            bolt11_invoice = invoice_data.get('pr')
            invoice = Invoice.from_bech32(bolt11_invoice)
            if invoice.get_amount_sat() != amount_sat:
                raise Exception("lnurl returned invoice with wrong amount")
            # this will change what is returned by get_fields_for_GUI
            self.bolt11 = invoice
            self.set_state(PaymentIdentifierState.AVAILABLE)
        except Exception as e:
            self.error = str(e)
            self.logger.error(f"_do_finalize() got error: {e!r}")
            self.set_state(PaymentIdentifierState.ERROR)
        finally:
            if on_finished:
                on_finished(self)

    def get_onchain_outputs(self, amount):
        """Return the on-chain outputs for this identifier, using `amount` for single outputs.

        Raises Exception('not onchain') if there is no multiline, spk or bip21 data.
        """
        if self.multiline_outputs:
            return self.multiline_outputs
        elif self.spk:
            return [PartialTxOutput(scriptpubkey=self.spk, value=amount)]
        elif self.bip21:
            address = self.bip21.get('address')
            scriptpubkey, is_address = self.parse_output(address)
            assert is_address  # unlikely, but make sure it is an address, not a script
            return [PartialTxOutput(scriptpubkey=scriptpubkey, value=amount)]
        else:
            raise Exception('not onchain')

    def _parse_as_multiline(self, text: str):
        """Parse `text` as 'address, amount' lines and return the outputs that parsed.

        Sets self._is_max if any amount is a max-spend value. Per-line errors are
        stored in self.error only when there is more than one non-empty line.
        """
        # filter out empty lines
        lines = [line for line in text.split('\n') if line]
        is_multiline = len(lines) > 1

        outputs = []  # type: List[PartialTxOutput]
        line_errors = []  # type: List[str]
        self._is_max = False
        for i, line in enumerate(lines):
            try:
                output = self.parse_address_and_amount(line)
                outputs.append(output)
                if parse_max_spend(output.value):
                    self._is_max = True
            except Exception as e:
                line_errors.append(f'line #{i}: {str(e)}')
        if is_multiline and line_errors:
            self.error = '\n'.join(line_errors).strip()
        self.logger.debug(f'multiline: {outputs!r}, {self.error}')
        return outputs

    def parse_address_and_amount(self, line: str) -> PartialTxOutput:
        """Parse one 'destination, amount' line; raises Exception with a readable message."""
        try:
            x, y = line.split(',')
        except ValueError:
            raise Exception("expected two comma-separated values: (address, amount)") from None
        scriptpubkey, is_address = self.parse_output(x)
        if not scriptpubkey:
            raise Exception('Invalid address')
        amount = self.parse_amount(y)
        return PartialTxOutput(scriptpubkey=scriptpubkey, value=amount)

    def parse_output(self, x: str) -> Tuple[Optional[bytes], bool]:
        """Return (scriptpubkey, is_address) for an address or a 'script(...)' expression.

        Returns (None, False) if `x` is neither. Failures are swallowed on purpose:
        this method is used as a probe by parse().
        """
        try:
            address = self.parse_address(x)
            return bitcoin.address_to_script(address), True
        except Exception:
            pass
        try:
            m = re.match('^' + RE_SCRIPT_FN + '$', x)
            script = self.parse_script(str(m.group(1)))
            return script, False
        except Exception:
            pass
        return None, False

    def parse_script(self, x: str) -> bytes:
        """Assemble a script from whitespace-separated 'OP_*' names and hex data pushes."""
        script = bytearray()
        for word in x.split():
            if word[0:3] == 'OP_':
                opcode_int = opcodes[word]
                script += construct_script([opcode_int])
            else:
                bytes.fromhex(word)  # to test it is hex data
                script += construct_script([word])
        return bytes(script)

    def parse_amount(self, x: str) -> Union[str, int]:
        """Return `x` unchanged if it is a max-spend value, else the amount scaled to an int.

        The scale is 10 ** config.BTC_AMOUNTS_DECIMAL_POINT.
        Raises Exception if the amount is empty or not a valid decimal.
        """
        x = x.strip()
        if not x:
            raise Exception("Amount is empty")
        if parse_max_spend(x):
            return x
        p = pow(10, self.config.BTC_AMOUNTS_DECIMAL_POINT)
        try:
            return int(p * Decimal(x))
        except InvalidOperation:
            raise Exception("Invalid amount") from None

    def parse_address(self, line: str):
        """Return the address in `line`, accepting either a bare address or 'label <address>'."""
        r = line.strip()
        m = re.match('^' + RE_ALIAS + '$', r)
        address = str(m.group(2) if m else r)
        assert bitcoin.is_address(address)
        return address

    def _get_error_from_invoiceerror(self, e: 'InvoiceError') -> str:
        """Build a user-facing message for `e`, based on the type of its first argument."""
        error = _("Error parsing Lightning invoice") + f":\n{e!r}"
        if e.args:
            arg = e.args[0]
            if isinstance(arg, LnInvoiceException):
                error = _("Error parsing Lightning invoice") + f":\n{e}"
            elif isinstance(arg, IncompatibleOrInsaneFeatures):
                error = _("Invoice requires unknown or incompatible Lightning feature") + f":\n{e!r}"
        return error

    def get_fields_for_GUI(self) -> FieldsForGUI:
        """Collect recipient/amount/description etc. from whichever data source is set."""
        recipient = None
        amount = None
        description = None
        validated = None
        comment = None
        amount_range = None

        if (self.emaillike or self.domainlike) and self.openalias_data:
            key = self.emaillike if self.emaillike else self.domainlike
            address = self.openalias_data.get('address')
            name = self.openalias_data.get('name')
            description = name
            # f-string: does not raise if the resolved data lacks an address
            recipient = f'{key} <{address}>'

        elif self.bolt11:
            recipient, amount, description = self._get_bolt11_fields()

        elif self.lnurl and self.lnurl_data:
            assert isinstance(self.lnurl_data, LNURL6Data), f"{self.lnurl_data=}"
            domain = urllib.parse.urlparse(self.lnurl).netloc
            recipient = f"{self.lnurl_data.metadata_plaintext} <{domain}>"
            description = self.lnurl_data.metadata_plaintext
            if self.lnurl_data.comment_allowed:
                comment = self.lnurl_data.comment_allowed
            if self.lnurl_data.min_sendable_sat:
                amount = self.lnurl_data.min_sendable_sat
                if self.lnurl_data.min_sendable_sat != self.lnurl_data.max_sendable_sat:
                    amount_range = (self.lnurl_data.min_sendable_sat, self.lnurl_data.max_sendable_sat)

        elif self.spk:
            pass

        elif self.multiline_outputs:
            pass

        elif self.bip21:
            label = self.bip21.get('label')
            address = self.bip21.get('address')
            recipient = f'{label} <{address}>' if label else address
            amount = self.bip21.get('amount')
            description = self.bip21.get('message')
            # TODO: use label as description? (not BIP21 compliant)
            # if label and not description:
            #     description = label

        return FieldsForGUI(recipient=recipient, amount=amount, description=description,
                            comment=comment, validated=validated, amount_range=amount_range)

    def _get_bolt11_fields(self):
        """Return (pubkey hex, amount in sat, description) taken from self.bolt11."""
        lnaddr = self.bolt11._lnaddr  # TODO: improve access to lnaddr
        pubkey = lnaddr.pubkey.serialize().hex()
        for k, v in lnaddr.tags:
            if k == 'd':
                description = v
                break
        else:
            description = ''
        amount = lnaddr.get_amount_sat()
        return pubkey, amount, description

    async def resolve_openalias(self, key: str) -> Optional[dict]:
        """Resolve `key` through the contacts object; returns None on any failure.

        Returns None without a lookup if `key` starts with a bitcoin address.
        """
        parts = key.split(sep=',')  # assuming single line
        # str.split() always yields at least one element
        if bitcoin.is_address(parts[0]):
            return None
        try:
            data = await self.contacts.resolve(key)  # TODO: don't use contacts as delegate to resolve openalias, separate.
            self.logger.debug(f'OA: {data!r}')
            return data
        except AliasNotFoundException as e:
            self.logger.info(f'OpenAlias not found: {repr(e)}')
            return None
        except Exception as e:
            self.logger.info(f'error resolving address/alias: {repr(e)}')
            return None

    def has_expired(self) -> bool:
        """Return True if the bolt11 invoice, or the BIP21 'time' + 'exp' window, has passed."""
        if self.bolt11:
            return self.bolt11.has_expired()
        elif self.bip21:
            exp = self.bip21.get('exp')
            created = self.bip21.get('time')
            # Without both an expiry delay and a creation time no deadline can be computed;
            # the old code raised TypeError when 'exp' was present but 'time' was not.
            if not exp or created is None:
                return False
            expires = exp + created
            return bool(expires) and expires < time.time()
        return False


def invoice_from_payment_identifier(
    pi: 'PaymentIdentifier',
    wallet: 'Abstract_Wallet',
    amount_sat: Union[int, str],
    message: Optional[str] = None
) -> Optional[Invoice]:
    """Build an Invoice from an AVAILABLE payment identifier.

    Lightning identifiers return their bolt11 invoice (amount filled in from
    `amount_sat` if the invoice has none), or None if there is no bolt11.
    Otherwise an on-chain invoice is created through `wallet`. `amount_sat`
    may be '!' (max), which requires an on-chain destination.
    """
    assert pi.state in [PaymentIdentifierState.AVAILABLE, ]
    assert pi.is_onchain() if amount_sat == '!' else True  # MAX should only be allowed if pi has onchain destination

    if pi.is_lightning() and amount_sat != '!':
        invoice = pi.bolt11
        if not invoice:
            return
        if invoice._lnaddr.get_amount_msat() is None:
            invoice.set_amount_msat(int(amount_sat * 1000))
        return invoice
    else:
        outputs = pi.get_onchain_outputs(amount_sat)
        message = pi.bip21.get('message') if pi.bip21 else message
        return wallet.create_invoice(
            outputs=outputs,
            message=message,
            URI=pi.bip21,
        )