Files
pallectrum/electrum/tests/test_lnpeer.py

393 lines
15 KiB
Python
Raw Normal View History

import asyncio
import tempfile
from decimal import Decimal
import os
from contextlib import contextmanager
from collections import defaultdict
2019-05-02 18:09:11 +02:00
import logging
import concurrent
from concurrent import futures
import unittest
from aiorpcx import TaskGroup
from electrum import constants
from electrum.network import Network
from electrum.ecc import ECPrivkey
from electrum import simple_config, lnutil
from electrum.lnaddr import lnencode, LnAddr, lndecode
from electrum.bitcoin import COIN, sha256
2019-05-02 18:09:11 +02:00
from electrum.util import bh2u, create_and_start_event_loop
from electrum.lnpeer import Peer
from electrum.lnutil import LNPeerAddr, Keypair, privkey_to_pubkey
from electrum.lnutil import LightningPeerConnectionClosed, RemoteMisbehaving
from electrum.lnutil import PaymentFailure, LnLocalFeatures, HTLCOwner
from electrum.lnchannel import channel_states, peer_states, Channel
from electrum.lnrouter import LNPathFinder
from electrum.channel_db import ChannelDB
from electrum.lnworker import LNWallet, NoPathFound
from electrum.lnmsg import encode_msg, decode_msg
from electrum.logging import console_stderr_handler, Logger
from electrum.lnworker import PaymentInfo, RECEIVED, PR_UNPAID
from .test_lnchannel import create_test_channels
from .test_bitcoin import needs_test_with_all_chacha20_implementations
from . import ElectrumTestCase
def keypair():
    """Create a fresh random keypair for use in tests.

    Returns:
        A ``Keypair`` built from a newly generated private key and the
        public key derived from it with ``privkey_to_pubkey``.
    """
    secret = ECPrivkey.generate_random_key().get_secret_bytes()
    return Keypair(pubkey=privkey_to_pubkey(secret), privkey=secret)
@contextmanager
def noop_lock():
    """Context manager that does nothing on enter or exit.

    The mocks in this file return it wherever a lock object is expected.
    """
    yield None
class MockNetwork:
    """Minimal stand-in for ``Network`` used by the lightning peer tests.

    It provides a config backed by a temporary user directory, a channel
    database with a path finder, the callback registration methods borrowed
    from ``Network``, and a "broadcast" that only forwards transactions to
    the queue supplied by the test.
    """

    def __init__(self, tx_queue):
        """Set up the mock.

        Args:
            tx_queue: queue that receives every transaction passed to
                ``broadcast_transaction``; ``None`` discards them.
        """
        self.callbacks = defaultdict(list)
        self.lnwatcher = None
        self.interface = None
        user_config = {}
        user_dir = tempfile.mkdtemp(prefix="electrum-lnpeer-test-")
        self.config = simple_config.SimpleConfig(user_config, read_user_dir_function=lambda: user_dir)
        self.asyncio_loop = asyncio.get_event_loop()
        self.channel_db = ChannelDB(self)
        self.path_finder = LNPathFinder(self.channel_db)
        self.tx_queue = tx_queue

    @property
    def callback_lock(self):
        """Return a no-op context manager in place of a real lock."""
        return noop_lock()

    # Reuse the real callback plumbing; it operates on self.callbacks and
    # self.callback_lock defined above.
    register_callback = Network.register_callback
    unregister_callback = Network.unregister_callback
    trigger_callback = Network.trigger_callback

    def get_local_height(self):
        """Return a fixed blockchain height of 0."""
        return 0

    async def broadcast_transaction(self, tx):
        """Record *tx* on the test queue instead of broadcasting it."""
        # Explicit None check: do not depend on the truthiness of the queue object.
        if self.tx_queue is not None:
            await self.tx_queue.put(tx)

    async def try_broadcasting(self, tx, name):
        """Forward *tx* to ``broadcast_transaction``; *name* is unused here."""
        # The coroutine must be awaited; otherwise it never runs and the
        # transaction never reaches the queue.
        await self.broadcast_transaction(tx)
2018-11-07 18:00:28 +01:00
class MockWallet:
    """Wallet stub whose methods have no effect."""

    def set_label(self, x, y):
        """Accept the two arguments and discard them."""
        return None

    def save_db(self):
        """Do nothing; there is no database to save."""
        return None

    def is_lightning_backup(self):
        """Report that this wallet is not a lightning backup."""
        return False
2018-11-07 18:00:28 +01:00
class MockLNWallet(Logger):
    """Stand-in for ``LNWallet`` that owns exactly one channel.

    The payment and force-close logic is borrowed directly from ``LNWallet``
    (see the class-level assignments below); this class only supplies the
    attributes those methods and ``Peer`` read.
    """

    def __init__(self, remote_keypair, local_keypair, chan: 'Channel', tx_queue):
        """Build the mock wallet and attach it to *chan*.

        Args:
            remote_keypair: keypair whose pubkey is used as the key of
                ``peers`` and as ``chan.node_id``.
            local_keypair: keypair stored as ``node_keypair``.
            chan: the single channel owned by this wallet.
            tx_queue: passed to ``MockNetwork``; receives "broadcast" txs.
        """
        Logger.__init__(self)
        self.remote_keypair = remote_keypair
        self.node_keypair = local_keypair
        self.network = MockNetwork(tx_queue)
        self.channels = {chan.channel_id: chan}
        self.payments = {}
        # Per-instance store. A class-level dict would be shared by every
        # MockLNWallet, so both sides of a channel (and successive tests)
        # would see each other's preimages.
        self.preimages = {}
        self.logs = defaultdict(list)
        self.wallet = MockWallet()
        self.localfeatures = LnLocalFeatures(0)
        self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_OPT
        self.pending_payments = defaultdict(asyncio.Future)
        chan.lnworker = self
        chan.node_id = remote_keypair.pubkey
        # used in tests
        self.enable_htlc_settle = asyncio.Event()
        self.enable_htlc_settle.set()

    def get_invoice_status(self, key):
        """Stub; always returns None."""
        pass

    @property
    def lock(self):
        """Return a no-op context manager in place of a real lock."""
        return noop_lock()

    @property
    def peers(self):
        """Map the remote pubkey to ``self.peer``, which the test assigns after construction."""
        return {self.remote_keypair.pubkey: self.peer}

    def channels_for_peer(self, pubkey):
        """Return all channels regardless of *pubkey*."""
        return self.channels

    def get_channel_by_short_id(self, short_channel_id):
        """Return the channel with the given short id, or None if there is none."""
        with self.lock:
            for chan in self.channels.values():
                if chan.short_channel_id == short_channel_id:
                    return chan

    def save_channel(self, chan):
        """Skip persistence; only print a notice."""
        print("Ignoring channel save")

    # Methods reused unchanged from the real LNWallet.
    get_payment_info = LNWallet.get_payment_info
    save_payment_info = LNWallet.save_payment_info
    set_payment_status = LNWallet.set_payment_status
    get_payment_status = LNWallet.get_payment_status
    await_payment = LNWallet.await_payment
    payment_received = LNWallet.payment_received
    payment_sent = LNWallet.payment_sent
    payment_failed = LNWallet.payment_failed
    save_preimage = LNWallet.save_preimage
    get_preimage = LNWallet.get_preimage
    _create_route_from_invoice = LNWallet._create_route_from_invoice
    _check_invoice = staticmethod(LNWallet._check_invoice)
    _pay_to_route = LNWallet._pay_to_route
    _pay = LNWallet._pay
    force_close_channel = LNWallet.force_close_channel
    try_force_closing = LNWallet.try_force_closing

    def get_first_timestamp(self):
        """Return a fixed timestamp of 0."""
        return 0
class MockTransport:
    """In-memory transport whose incoming messages come from an ``asyncio.Queue``."""

    def __init__(self, name):
        """Remember *name* and create the empty incoming-message queue."""
        self._name = name
        self.queue = asyncio.Queue()

    def name(self):
        """Return the name given at construction."""
        return self._name

    async def read_messages(self):
        """Yield messages from the queue forever, waiting while it is empty."""
        while True:
            message = await self.queue.get()
            yield message
class NoFeaturesTransport(MockTransport):
    """
    Transport that answers an outgoing init message with an init that
    doesn't signal any features.
    Used for testing that we require DATA_LOSS_PROTECT.
    """

    def send_bytes(self, data):
        """Decode and print *data*; if it is an 'init', queue a featureless 'init' reply."""
        message = decode_msg(data)
        print(message)
        if message[0] != 'init':
            return
        reply = encode_msg('init', lflen=1, gflen=1, localfeatures=b"\x00", globalfeatures=b"\x00")
        self.queue.put_nowait(reply)
class PutIntoOthersQueueTransport(MockTransport):
    """Transport whose sent bytes are placed on another mock transport's queue."""

    def __init__(self, name):
        """Create the transport; ``other_mock_transport`` must be assigned before sending."""
        super().__init__(name)
        self.other_mock_transport = None

    def send_bytes(self, data):
        """Deliver *data* to the queue of ``other_mock_transport``."""
        destination = self.other_mock_transport.queue
        destination.put_nowait(data)
2019-02-10 19:17:04 +01:00
def transport_pair(name1, name2):
    """Create two mock transports wired to each other.

    Whatever one transport sends ends up on the other one's queue.

    Returns:
        A tuple ``(t1, t2)`` named *name1* and *name2* respectively.
    """
    first, second = (PutIntoOthersQueueTransport(label) for label in (name1, name2))
    first.other_mock_transport, second.other_mock_transport = second, first
    return first, second
class TestPeer(ElectrumTestCase):
    """Tests for ``Peer`` using two mock wallets connected by in-memory transports.

    Most tests follow the same pattern: a driver coroutine is gathered
    together with both peers' message loops and HTLC switches; when the
    driver is done it cancels the gather, so the test expects a
    ``CancelledError`` from ``run``.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        console_stderr_handler.setLevel(logging.DEBUG)

    def setUp(self):
        super().setUp()
        self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop()

    def tearDown(self):
        super().tearDown()
        self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
        self._loop_thread.join(timeout=1)

    def prepare_peers(self, alice_channel, bob_channel):
        """Create two connected peers, one per channel side.

        Returns:
            ``(p1, p2, w1, w2, q1, q2)``: the peers, their mock wallets, and
            the queues that receive each wallet's "broadcast" transactions.
        """
        k1, k2 = keypair(), keypair()
        t1, t2 = transport_pair(alice_channel.name, bob_channel.name)
        q1, q2 = asyncio.Queue(), asyncio.Queue()
        w1 = MockLNWallet(k1, k2, alice_channel, tx_queue=q1)
        w2 = MockLNWallet(k2, k1, bob_channel, tx_queue=q2)
        p1 = Peer(w1, k1.pubkey, t1)
        p2 = Peer(w2, k2.pubkey, t2)
        w1.peer = p1
        w2.peer = p2
        # mark_open won't work if state is already OPEN.
        # so set it to FUNDED
        alice_channel._state = channel_states.FUNDED
        bob_channel._state = channel_states.FUNDED
        # this populates the channel graph:
        p1.mark_open(alice_channel)
        p2.mark_open(bob_channel)
        return p1, p2, w1, w2, q1, q2

    @staticmethod
    def prepare_invoice(
            w2,  # receiver
            *,
            amount_sat=100_000,
    ):
        """Create an invoice payable to *w2* and return it encoded.

        A random 32-byte preimage and the matching ``PaymentInfo`` are saved
        on *w2*; the invoice is encoded with *w2*'s node private key.
        """
        amount_btc = amount_sat/Decimal(COIN)
        payment_preimage = os.urandom(32)
        RHASH = sha256(payment_preimage)
        info = PaymentInfo(RHASH, amount_sat, RECEIVED, PR_UNPAID)
        w2.save_preimage(RHASH, payment_preimage)
        w2.save_payment_info(info)
        lnaddr = LnAddr(
            RHASH,
            amount_btc,
            tags=[('c', lnutil.MIN_FINAL_CLTV_EXPIRY_FOR_INVOICE),
                  ('d', 'coffee')
                  ])
        return lnencode(lnaddr, w2.node_keypair.privkey)

    def test_reestablish(self):
        """Reestablishing from DISCONNECTED leaves both channels in peer state GOOD."""
        alice_channel, bob_channel = create_test_channels()
        p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
        for chan in (alice_channel, bob_channel):
            chan.peer_state = peer_states.DISCONNECTED

        async def reestablish():
            await asyncio.gather(
                p1.reestablish_channel(alice_channel),
                p2.reestablish_channel(bob_channel))
            self.assertEqual(alice_channel.peer_state, peer_states.GOOD)
            self.assertEqual(bob_channel.peer_state, peer_states.GOOD)
            gath.cancel()
        # Run the HTLC switch of *both* peers, as every other test here does.
        gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())

        async def f():
            await gath
        with self.assertRaises(concurrent.futures.CancelledError):
            run(f())

    @needs_test_with_all_chacha20_implementations
    def test_reestablish_with_old_state(self):
        """Reestablishing with a stale state marks Alice BAD and makes Bob force-close."""
        alice_channel, bob_channel = create_test_channels()
        alice_channel_0, bob_channel_0 = create_test_channels()  # these are identical
        p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
        pay_req = self.prepare_invoice(w2)

        async def pay():
            result = await w1._pay(pay_req)
            self.assertEqual(result, True)
            gath.cancel()
        gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())

        async def f():
            await gath
        with self.assertRaises(concurrent.futures.CancelledError):
            run(f())

        # After the payment, tear down the transport and set up a new one
        # before reestablishing: on a shared transport the payment messages
        # got interleaved with channel_reestablish. Alice now uses the
        # pre-payment channel state (alice_channel_0).
        p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel_0, bob_channel)
        for chan in (alice_channel_0, bob_channel):
            chan.peer_state = peer_states.DISCONNECTED

        async def reestablish():
            await asyncio.gather(
                p1.reestablish_channel(alice_channel_0),
                p2.reestablish_channel(bob_channel))
            self.assertEqual(alice_channel_0.peer_state, peer_states.BAD)
            self.assertEqual(bob_channel._state, channel_states.FORCE_CLOSING)
            # wait so that pending messages are processed
            #await asyncio.sleep(1)
            gath.cancel()
        gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())

        async def f():
            await gath
        with self.assertRaises(concurrent.futures.CancelledError):
            run(f())

    @needs_test_with_all_chacha20_implementations
    def test_payment(self):
        """A single payment from w1 to w2 returns a truthy result."""
        alice_channel, bob_channel = create_test_channels()
        p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
        pay_req = self.prepare_invoice(w2)

        async def pay():
            result = await w1._pay(pay_req)
            self.assertTrue(result)
            gath.cancel()
        gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())

        async def f():
            await gath
        with self.assertRaises(concurrent.futures.CancelledError):
            run(f())

    #@unittest.skip("too expensive")
    #@needs_test_with_all_chacha20_implementations
    def test_payments_stresstest(self):
        """Fifty 1-sat payments, at most five at a time, shift the balances by the total paid."""
        alice_channel, bob_channel = create_test_channels()
        p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
        alice_init_balance_msat = alice_channel.balance(HTLCOwner.LOCAL)
        bob_init_balance_msat = bob_channel.balance(HTLCOwner.LOCAL)
        num_payments = 50
        #pay_reqs1 = [self.prepare_invoice(w1, amount_sat=1) for i in range(num_payments)]
        pay_reqs2 = [self.prepare_invoice(w2, amount_sat=1) for i in range(num_payments)]
        max_htlcs_in_flight = asyncio.Semaphore(5)

        async def single_payment(pay_req):
            async with max_htlcs_in_flight:
                await w1._pay(pay_req)

        async def many_payments():
            async with TaskGroup() as group:
                for pay_req in pay_reqs2:
                    await group.spawn(single_payment(pay_req))
            gath.cancel()
        gath = asyncio.gather(many_payments(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())

        async def f():
            await gath
        with self.assertRaises(concurrent.futures.CancelledError):
            run(f())
        # Each payment is 1 sat == 1000 msat.
        self.assertEqual(alice_init_balance_msat - num_payments * 1000, alice_channel.balance(HTLCOwner.LOCAL))
        self.assertEqual(alice_init_balance_msat - num_payments * 1000, bob_channel.balance(HTLCOwner.REMOTE))
        self.assertEqual(bob_init_balance_msat + num_payments * 1000, bob_channel.balance(HTLCOwner.LOCAL))
        self.assertEqual(bob_init_balance_msat + num_payments * 1000, alice_channel.balance(HTLCOwner.REMOTE))

    @needs_test_with_all_chacha20_implementations
    def test_close(self):
        """Alice sends an HTLC and then closes the channel while Bob delays settling."""
        alice_channel, bob_channel = create_test_channels()
        p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
        w1.network.config.set_key('dynamic_fees', False)
        w2.network.config.set_key('dynamic_fees', False)
        w1.network.config.set_key('fee_per_kb', 5000)
        w2.network.config.set_key('fee_per_kb', 1000)
        # Bob's settle flag starts cleared; set_settle() below sets it again.
        w2.enable_htlc_settle.clear()
        pay_req = self.prepare_invoice(w2)
        lnaddr = lndecode(pay_req, expected_hrp=constants.net.SEGWIT_HRP)

        async def pay():
            await asyncio.wait_for(p1.initialized, 1)
            await asyncio.wait_for(p2.initialized, 1)
            # alice sends htlc
            route = await w1._create_route_from_invoice(decoded_invoice=lnaddr)
            p1.pay(route, alice_channel, int(lnaddr.amount * COIN * 1000), lnaddr.paymenthash, lnaddr.get_min_final_cltv_expiry())
            # alice closes
            await p1.close_channel(alice_channel.channel_id)
            gath.cancel()

        async def set_settle():
            await asyncio.sleep(0.1)
            w2.enable_htlc_settle.set()
        gath = asyncio.gather(pay(), set_settle(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())

        async def f():
            await gath
        with self.assertRaises(concurrent.futures.CancelledError):
            run(f())

    def test_channel_usage_after_closing(self):
        """After a force-close, route finding fails and paying over an old route fails."""
        alice_channel, bob_channel = create_test_channels()
        p1, p2, w1, w2, q1, q2 = self.prepare_peers(alice_channel, bob_channel)
        pay_req = self.prepare_invoice(w2)

        addr = w1._check_invoice(pay_req)
        route = run(w1._create_route_from_invoice(decoded_invoice=addr))

        run(w1.force_close_channel(alice_channel.channel_id))
        # check if a tx (commitment transaction) was broadcasted:
        # (assertEqual rather than a bare assert, which is stripped under -O)
        self.assertEqual(q1.qsize(), 1)

        with self.assertRaises(NoPathFound):
            run(w1._create_route_from_invoice(decoded_invoice=addr))

        # The first hop of the old route must still be a known peer.
        self.assertIn(route[0].node_id, w1.peers)

        # PaymentFailure is expected since we shouldn't use old routes, and the
        # route finding should fail when channel is closed
        async def f():
            await asyncio.gather(w1._pay_to_route(route, addr), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())
        with self.assertRaises(PaymentFailure):
            run(f())
def run(coro):
    """Run *coro* on the current event loop from another thread and return its result.

    The loop returned by ``asyncio.get_event_loop()`` must already be running
    in a different thread; this call blocks until the coroutine finishes and
    re-raises any exception it raised.
    """
    loop = asyncio.get_event_loop()
    pending = asyncio.run_coroutine_threadsafe(coro, loop=loop)
    return pending.result()