Improve accuracy of swapserver liquidity announcement.
Adds event handler and more calls to the liquidity update trigger to ensure that changes in liquidity will get published more reliably.
This commit is contained in:
@@ -210,7 +210,6 @@ class SwapManager(Logger):
|
||||
self.is_server = False # overriden by swapserver plugin if enabled
|
||||
self.is_initialized = asyncio.Event()
|
||||
self.pairs_updated = asyncio.Event()
|
||||
self._liquidity_changed = asyncio.Event()
|
||||
|
||||
def start_network(self, network: 'Network'):
|
||||
assert network
|
||||
@@ -229,23 +228,33 @@ class SwapManager(Logger):
|
||||
async def run_nostr_server(self):
|
||||
await self.set_nostr_proof_of_work()
|
||||
with NostrTransport(self.config, self, self.lnworker.nostr_keypair) as transport:
|
||||
# wait a bit so we don't publish 0 liquidity on startup if channels are not yet reestablished
|
||||
await asyncio.sleep(10)
|
||||
await transport.is_connected.wait()
|
||||
self.logger.info(f'nostr is connected')
|
||||
# will publish a new announcement if liquidity changed or every OFFER_UPDATE_INTERVAL_SEC
|
||||
last_update = time.time()
|
||||
while True:
|
||||
# todo: publish everytime fees have changed
|
||||
self.server_update_pairs()
|
||||
await transport.publish_offer(self)
|
||||
await asyncio.sleep(transport.LIQUIDITY_UPDATE_INTERVAL_SEC)
|
||||
|
||||
previous_max_forward = self._max_forward
|
||||
previous_max_reverse = self._max_reverse
|
||||
previous_mining_fee = self.mining_fee
|
||||
try:
|
||||
await wait_for2(
|
||||
self._liquidity_changed.wait(),
|
||||
timeout=transport.OFFER_UPDATE_INTERVAL_SEC
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
self.server_update_pairs()
|
||||
except Exception:
|
||||
self.logger.exception("server_update_pairs failed")
|
||||
continue
|
||||
|
||||
liquidity_changed = self._max_forward != previous_max_forward \
|
||||
or self._max_reverse != previous_max_reverse
|
||||
mining_fees_changed = self.mining_fee != previous_mining_fee
|
||||
if liquidity_changed or mining_fees_changed:
|
||||
self.logger.debug(f"updating announcement: {liquidity_changed=}, {mining_fees_changed=}")
|
||||
elif time.time() - last_update < transport.OFFER_UPDATE_INTERVAL_SEC:
|
||||
continue
|
||||
|
||||
await transport.publish_offer(self)
|
||||
last_update = time.time()
|
||||
|
||||
@log_exceptions
|
||||
async def main_loop(self):
|
||||
tasks = [self.pay_pending_invoices()]
|
||||
@@ -444,7 +453,6 @@ class SwapManager(Logger):
|
||||
except BelowDustLimit:
|
||||
self.logger.info('utxo value below dust threshold')
|
||||
return
|
||||
self.server_maybe_trigger_liquidity_update()
|
||||
|
||||
def get_swap_tx_fee(self):
|
||||
return self._get_tx_fee(self.config.FEE_POLICY)
|
||||
@@ -940,7 +948,10 @@ class SwapManager(Logger):
|
||||
max_reverse: int = min(int(self.lnworker.num_sats_can_send()), 10000000)
|
||||
self._max_forward: int = self._keep_leading_digits(max_forward, 2)
|
||||
self._max_reverse: int = self._keep_leading_digits(max_reverse, 2)
|
||||
self.mining_fee = self.get_fee_for_txbatcher()
|
||||
new_mining_fee = self.get_fee_for_txbatcher()
|
||||
if self.mining_fee is None \
|
||||
or abs(self.mining_fee - new_mining_fee) / self.mining_fee > 0.1:
|
||||
self.mining_fee = new_mining_fee
|
||||
|
||||
@staticmethod
|
||||
def _keep_leading_digits(num: int, digits: int) -> int:
|
||||
@@ -968,23 +979,6 @@ class SwapManager(Logger):
|
||||
|
||||
run_sync_function_on_asyncio_thread(trigger, block=True)
|
||||
|
||||
def server_maybe_trigger_liquidity_update(self) -> None:
|
||||
"""
|
||||
To be called when the available liquidity changes so the new liquidity is announced.
|
||||
(ln in/out, onchain in/out)
|
||||
"""
|
||||
if not self.is_server:
|
||||
return
|
||||
assert get_running_loop() == get_asyncio_loop(), "Events must be set in the asyncio thread"
|
||||
previous_max_forward = self._max_forward
|
||||
previous_max_reverse = self._max_reverse
|
||||
self.server_update_pairs()
|
||||
# if liquidity really changed the event is triggered so a new provider announcement is published
|
||||
if self._max_forward != previous_max_forward or self._max_reverse != previous_max_reverse:
|
||||
self.logger.debug(f"liquidity changed, updating announcement")
|
||||
self._liquidity_changed.set()
|
||||
self._liquidity_changed.clear()
|
||||
|
||||
def get_provider_max_forward_amount(self) -> int:
|
||||
"""in sat"""
|
||||
return self._max_forward
|
||||
@@ -1367,6 +1361,7 @@ class NostrTransport(SwapServerTransport):
|
||||
USER_STATUS_NIP38 = 30315
|
||||
NOSTR_EVENT_VERSION = 5
|
||||
OFFER_UPDATE_INTERVAL_SEC = 60 * 10
|
||||
LIQUIDITY_UPDATE_INTERVAL_SEC = 30
|
||||
|
||||
def __init__(self, config, sm, keypair):
|
||||
SwapServerTransport.__init__(self, config=config, sm=sm)
|
||||
@@ -1655,7 +1650,6 @@ class NostrTransport(SwapServerTransport):
|
||||
r['reply_to'] = event_id
|
||||
self.logger.debug(f'sending response id={event_id}')
|
||||
await self.send_direct_message(event_pubkey, json.dumps(r))
|
||||
self.sm.server_maybe_trigger_liquidity_update()
|
||||
|
||||
def _store_last_swapserver_relays(self, relays: Sequence[str]):
|
||||
self._last_swapserver_relays = relays
|
||||
|
||||
Reference in New Issue
Block a user