plugins: lsps: add own trait for lightning provider

Clean up the CLN interface

Signed-off-by: Peter Neuroth <pet.v.ne@gmail.com>
This commit is contained in:
Peter Neuroth
2025-12-06 15:12:38 +01:00
committed by madelinevibes
parent a05c2c9c2c
commit e001652ab1

View File

@@ -19,7 +19,10 @@ use crate::{
};
use anyhow::{Context, Result as AnyResult};
use async_trait::async_trait;
use bitcoin::{hashes::Hash as _, secp256k1::PublicKey};
use bitcoin::{
hashes::{sha256::Hash as Sha256, Hash as _},
secp256k1::PublicKey,
};
use chrono::Utc;
use cln_rpc::{
model::{
@@ -27,7 +30,7 @@ use cln_rpc::{
DatastoreMode, DatastoreRequest, DeldatastoreRequest, FundchannelRequest,
GetinfoRequest, ListdatastoreRequest, ListpeerchannelsRequest,
},
responses::{FundchannelResponse, ListdatastoreResponse, ListpeerchannelsResponse},
responses::ListdatastoreResponse,
},
primitives::{Amount, AmountOrAll, ChannelState},
ClnRpc,
@@ -37,21 +40,6 @@ use rand::{rng, Rng as _};
use serde::Serialize;
use std::{fmt, path::PathBuf, sync::Arc, time::Duration};
#[async_trait]
pub trait ClnApi: Send + Sync {
async fn lsps2_getchannelcapacity(
&self,
params: &Lsps2PolicyGetChannelCapacityRequest,
) -> AnyResult<Lsps2PolicyGetChannelCapacityResponse>;
async fn cln_fundchannel(&self, params: &FundchannelRequest) -> AnyResult<FundchannelResponse>;
async fn cln_listpeerchannels(
&self,
params: &ListpeerchannelsRequest,
) -> AnyResult<ListpeerchannelsResponse>;
}
const DEFAULT_CLTV_EXPIRY_DELTA: u32 = 144;
#[derive(Clone)]
@@ -70,35 +58,55 @@ impl ClnApiRpc {
}
#[async_trait]
impl ClnApi for ClnApiRpc {
async fn lsps2_getchannelcapacity(
impl LightningProvider for ClnApiRpc {
async fn fund_jit_channel(
&self,
params: &Lsps2PolicyGetChannelCapacityRequest,
) -> AnyResult<Lsps2PolicyGetChannelCapacityResponse> {
peer_id: &PublicKey,
amount: &Msat,
) -> AnyResult<(Sha256, String)> {
let mut rpc = self.create_rpc().await?;
rpc.call_raw("lsps2-policy-getchannelcapacity", params)
let res = rpc
.call_typed(&FundchannelRequest {
announce: Some(false),
close_to: None,
compact_lease: None,
feerate: None,
minconf: None,
mindepth: Some(0),
push_msat: None,
request_amt: None,
reserve: None,
channel_type: Some(vec![12, 46, 50]),
utxos: None,
amount: AmountOrAll::Amount(Amount::from_msat(amount.msat())),
id: peer_id.to_owned(),
})
.await
.map_err(anyhow::Error::new)
.with_context(|| "calling lsps2-policy-getchannelcapacity")
.with_context(|| "calling fundchannel")?;
Ok((res.channel_id, res.txid))
}
async fn cln_fundchannel(&self, params: &FundchannelRequest) -> AnyResult<FundchannelResponse> {
async fn is_channel_ready(&self, peer_id: &PublicKey, channel_id: &Sha256) -> AnyResult<bool> {
let mut rpc = self.create_rpc().await?;
rpc.call_typed(params)
let r = rpc
.call_typed(&ListpeerchannelsRequest {
id: Some(peer_id.to_owned()),
short_channel_id: None,
})
.await
.map_err(anyhow::Error::new)
.with_context(|| "calling fundchannel")
}
.with_context(|| "calling listpeerchannels")?;
async fn cln_listpeerchannels(
&self,
params: &ListpeerchannelsRequest,
) -> AnyResult<ListpeerchannelsResponse> {
let mut rpc = self.create_rpc().await?;
rpc.call_typed(params)
.await
.map_err(anyhow::Error::new)
.with_context(|| "calling listpeerchannels")
let chs = r
.channels
.iter()
.find(|&ch| ch.channel_id.is_some_and(|id| id == *channel_id));
if let Some(ch) = chs {
if ch.state == ChannelState::CHANNELD_NORMAL {
return Ok(true);
}
}
return Ok(false);
}
}
@@ -256,6 +264,17 @@ pub trait DatastoreProvider: Send + Sync {
async fn del_buy_request(&self, scid: &ShortChannelId) -> AnyResult<()>;
}
#[async_trait]
pub trait LightningProvider: Send + Sync {
async fn fund_jit_channel(
&self,
peer_id: &PublicKey,
amount: &Msat,
) -> AnyResult<(Sha256, String)>;
async fn is_channel_ready(&self, peer_id: &PublicKey, channel_id: &Sha256) -> AnyResult<bool>;
}
pub struct Lsps2ServiceHandler<A> {
pub api: Arc<A>,
pub promise_secret: [u8; 32],
@@ -397,7 +416,7 @@ impl<A> HtlcAcceptedHookHandler<A> {
}
}
impl<A: DatastoreProvider + Lsps2OfferProvider + ClnApi> HtlcAcceptedHookHandler<A> {
impl<A: DatastoreProvider + Lsps2OfferProvider + LightningProvider> HtlcAcceptedHookHandler<A> {
pub async fn handle(&self, req: HtlcAcceptedRequest) -> AnyResult<HtlcAcceptedResponse> {
let scid = match req.onion.short_channel_id {
Some(scid) => scid,
@@ -495,7 +514,7 @@ impl<A: DatastoreProvider + Lsps2OfferProvider + ClnApi> HtlcAcceptedHookHandler
};
let cap = match ch_cap_res.channel_capacity_msat {
Some(c) => c,
Some(c) => Msat::from_msat(c),
None => {
debug!("policy giver does not allow channel for scid {}", scid);
return Ok(HtlcAcceptedResponse::fail(
@@ -511,27 +530,9 @@ impl<A: DatastoreProvider + Lsps2OfferProvider + ClnApi> HtlcAcceptedHookHandler
// (amount_msat - opening fee) in the future.
// Fixme: Make this configurable, maybe return the whole request from
// the policy giver?
let fund_ch_req = FundchannelRequest {
announce: Some(false),
close_to: None,
compact_lease: None,
feerate: None,
minconf: None,
mindepth: Some(0),
push_msat: None,
request_amt: None,
reserve: None,
channel_type: Some(vec![12, 46, 50]),
utxos: None,
amount: AmountOrAll::Amount(Amount::from_msat(cap)),
id: ds_rec.peer_id,
};
let fund_ch_res = match self.api.cln_fundchannel(&fund_ch_req).await {
Ok(r) => r,
Err(e) => {
// Fixme: Retry to fund the channel.
warn!("could not fund jit channel for scid {}: {}", scid, e);
let channel_id = match self.api.fund_jit_channel(&ds_rec.peer_id, &cap).await {
Ok((channel_id, _)) => channel_id,
Err(_) => {
return Ok(HtlcAcceptedResponse::fail(
Some(UNKNOWN_NEXT_PEER.to_string()),
None,
@@ -543,31 +544,15 @@ impl<A: DatastoreProvider + Lsps2OfferProvider + ClnApi> HtlcAcceptedHookHandler
// Fixme: Use event to check for channel ready,
// Fixme: Check for htlc timeout if peer refuses to send "ready".
// Fixme: handle unexpected channel states.
let mut is_active = false;
while !is_active {
let ls_ch_req = ListpeerchannelsRequest {
id: Some(ds_rec.peer_id),
short_channel_id: None,
loop {
match self
.api
.is_channel_ready(&ds_rec.peer_id, &channel_id)
.await
{
Ok(true) => break,
Ok(false) | Err(_) => tokio::time::sleep(self.backoff_listpeerchannels).await,
};
let ls_ch_res = match self.api.cln_listpeerchannels(&ls_ch_req).await {
Ok(r) => r,
Err(e) => {
warn!("failed to fetch peer channels for scid {}: {}", scid, e);
tokio::time::sleep(self.backoff_listpeerchannels).await;
continue;
}
};
let chs = ls_ch_res
.channels
.iter()
.find(|&ch| ch.channel_id.is_some_and(|id| id == fund_ch_res.channel_id));
if let Some(ch) = chs {
debug!("jit channel for scid {} has state {:?}", scid, ch.state);
if ch.state == ChannelState::CHANNELD_NORMAL {
is_active = true;
}
}
tokio::time::sleep(self.backoff_listpeerchannels).await;
}
// G) We got a working channel, deduct fee and forward htlc.
@@ -586,7 +571,7 @@ impl<A: DatastoreProvider + Lsps2OfferProvider + ClnApi> HtlcAcceptedHookHandler
Ok(HtlcAcceptedResponse::continue_(
Some(payload_bytes),
Some(fund_ch_res.channel_id.as_byte_array().to_vec()),
Some(channel_id.as_byte_array().to_vec()),
Some(extra_tlvs_bytes),
))
}
@@ -688,11 +673,8 @@ mod tests {
};
use anyhow::bail;
use chrono::{TimeZone, Utc};
use cln_rpc::primitives::{Amount, PublicKey};
use cln_rpc::RpcError as ClnRpcError;
use cln_rpc::{
model::responses::ListpeerchannelsChannels,
primitives::{Amount, PublicKey, Sha256},
};
use std::sync::{Arc, Mutex};
const PUBKEY: [u8; 33] = [
@@ -740,130 +722,13 @@ mod tests {
store_buy_request_response: bool,
get_buy_request_response: Arc<Mutex<Option<DatastoreEntry>>>,
get_buy_request_error: Arc<Mutex<Option<anyhow::Error>>>,
cln_fundchannel_response: Arc<Mutex<Option<FundchannelResponse>>>,
cln_fundchannel_error: Arc<Mutex<Option<ClnRpcError>>>,
cln_listpeerchannels_response: Arc<Mutex<Option<ListpeerchannelsResponse>>>,
cln_listpeerchannels_error: Arc<Mutex<Option<ClnRpcError>>>,
fund_channel_error: Arc<Mutex<Option<anyhow::Error>>>,
fund_channel_response: Arc<Mutex<Option<(Sha256, String)>>>,
lsps2_getchannelcapacity_response:
Arc<Mutex<Option<Lsps2PolicyGetChannelCapacityResponse>>>,
lsps2_getchannelcapacity_error: Arc<Mutex<Option<ClnRpcError>>>,
}
#[async_trait]
impl ClnApi for FakeCln {
async fn lsps2_getchannelcapacity(
&self,
_params: &Lsps2PolicyGetChannelCapacityRequest,
) -> AnyResult<Lsps2PolicyGetChannelCapacityResponse> {
if let Some(err) = self.lsps2_getchannelcapacity_error.lock().unwrap().take() {
return Err(anyhow::Error::new(err).context("from fake api"));
}
if let Some(res) = self
.lsps2_getchannelcapacity_response
.lock()
.unwrap()
.take()
{
return Ok(res);
}
panic!("No lsps2 getchannelcapacity response defined");
}
async fn cln_fundchannel(
&self,
_params: &FundchannelRequest,
) -> AnyResult<FundchannelResponse> {
if let Some(err) = self.cln_fundchannel_error.lock().unwrap().take() {
return Err(anyhow::Error::new(err).context("from fake api"));
}
if let Some(res) = self.cln_fundchannel_response.lock().unwrap().take() {
return Ok(res);
}
panic!("No cln fundchannel response defined");
}
async fn cln_listpeerchannels(
&self,
_params: &ListpeerchannelsRequest,
) -> AnyResult<ListpeerchannelsResponse> {
if let Some(err) = self.cln_listpeerchannels_error.lock().unwrap().take() {
return Err(anyhow::Error::new(err).context("from fake api"));
}
if let Some(res) = self.cln_listpeerchannels_response.lock().unwrap().take() {
return Ok(res);
}
// Default: return a ready channel
let channel = ListpeerchannelsChannels {
channel_id: Some(*Sha256::from_bytes_ref(&[1u8; 32])),
state: ChannelState::CHANNELD_NORMAL,
peer_id: create_peer_id(),
peer_connected: true,
alias: None,
closer: None,
funding: None,
funding_outnum: None,
funding_txid: None,
htlcs: None,
in_offered_msat: None,
initial_feerate: None,
last_feerate: None,
last_stable_connection: None,
last_tx_fee_msat: None,
lost_state: None,
max_accepted_htlcs: None,
minimum_htlc_in_msat: None,
next_feerate: None,
next_fee_step: None,
out_fulfilled_msat: None,
out_offered_msat: None,
owner: None,
private: None,
receivable_msat: None,
reestablished: None,
scratch_txid: None,
short_channel_id: None,
spendable_msat: None,
status: None,
their_reserve_msat: None,
to_us_msat: None,
total_msat: None,
close_to: None,
close_to_addr: None,
direction: None,
dust_limit_msat: None,
fee_base_msat: None,
fee_proportional_millionths: None,
feerate: None,
ignore_fee_limits: None,
in_fulfilled_msat: None,
in_payments_fulfilled: None,
in_payments_offered: None,
max_to_us_msat: None,
maximum_htlc_out_msat: None,
min_to_us_msat: None,
minimum_htlc_out_msat: None,
our_max_htlc_value_in_flight_msat: None,
our_reserve_msat: None,
our_to_self_delay: None,
out_payments_fulfilled: None,
out_payments_offered: None,
their_max_htlc_value_in_flight_msat: None,
their_to_self_delay: None,
updates: None,
inflight: None,
#[allow(deprecated)]
max_total_htlc_in_msat: None,
opener: cln_rpc::primitives::ChannelSide::LOCAL,
};
Ok(ListpeerchannelsResponse {
channels: vec![channel],
})
}
}
#[async_trait]
impl Lsps2OfferProvider for FakeCln {
async fn get_offer(
@@ -942,6 +807,32 @@ mod tests {
}
}
#[async_trait]
impl LightningProvider for FakeCln {
async fn fund_jit_channel(
&self,
_peer_id: &PublicKey,
_amount: &Msat,
) -> AnyResult<(Sha256, String)> {
if let Some(err) = self.fund_channel_error.lock().unwrap().take() {
return Err(err);
}
if let Some(res) = self.fund_channel_response.lock().unwrap().take() {
return Ok(res);
} else {
bail!("request not found")
}
}
async fn is_channel_ready(
&self,
_peer_id: &PublicKey,
_channel_id: &Sha256,
) -> AnyResult<bool> {
Ok(true)
}
}
fn create_test_htlc_request(
scid: Option<ShortChannelId>,
amount_msat: u64,
@@ -1399,11 +1290,7 @@ mod tests {
channel_capacity_msat: Some(50_000_000),
});
*fake.cln_fundchannel_error.lock().unwrap() = Some(ClnRpcError {
code: Some(-1),
message: "insufficient funds".to_string(),
data: None,
});
*fake.fund_channel_error.lock().unwrap() = Some(anyhow::anyhow!("insufficient funds"));
let req = create_test_htlc_request(Some(scid), 10_000_000);
@@ -1434,15 +1321,8 @@ mod tests {
channel_capacity_msat: Some(50_000_000),
});
*fake.cln_fundchannel_response.lock().unwrap() = Some(FundchannelResponse {
channel_id: *Sha256::from_bytes_ref(&[1u8; 32]),
outnum: 0,
txid: String::default(),
channel_type: None,
close_to: None,
mindepth: None,
tx: String::default(),
});
*fake.fund_channel_response.lock().unwrap() =
Some((*Sha256::from_bytes_ref(&[1u8; 32]), String::default()));
let req = create_test_htlc_request(Some(scid), 10_000_000);