plugins: lsps: add peer_id to transport

Frankly, transport without a target doesn't make sense, so this commit
adds a PublicKey (from secp256k1) as the target to the Transport trait.
It can easily be replaced by a common PeerId without changing the api
for external implementations if needed in the future

Signed-off-by: Peter Neuroth <pet.v.ne@gmail.com>
This commit is contained in:
Peter Neuroth
2025-12-02 11:43:35 +01:00
committed by madelinevibes
parent 6e868443e2
commit 0f624ed67f
3 changed files with 80 additions and 23 deletions

View File

@@ -113,6 +113,8 @@ async fn on_lsps_lsps2_getinfo(
req.lsp_id, req.token
);
let lsp_id = PublicKey::from_str(&req.lsp_id).context("lsp_id is not a valid public key")?;
let dir = p.configuration().lightning_dir;
let rpc_path = Path::new(&dir).join(&p.configuration().rpc_file);
let mut cln_client = cln_rpc::ClnRpc::new(rpc_path.clone()).await?;
@@ -144,7 +146,7 @@ async fn on_lsps_lsps2_getinfo(
// 1. Call lsps2.get_info.
let info_req = Lsps2GetInfoRequest { token: req.token };
let info_res: Lsps2GetInfoResponse = client
.call_typed(info_req)
.call_typed(&lsp_id, info_req)
.await
.context("lsps2.get_info call failed")?
.into_result()?;
@@ -165,6 +167,8 @@ async fn on_lsps_lsps2_buy(
req.lsp_id, req.opening_fee_params, req.payment_size_msat
);
let lsp_id = PublicKey::from_str(&req.lsp_id).context("lsp_id is not a valid public key")?;
let dir = p.configuration().lightning_dir;
let rpc_path = Path::new(&dir).join(&p.configuration().rpc_file);
let mut cln_client = cln_rpc::ClnRpc::new(rpc_path.clone()).await?;
@@ -242,7 +246,7 @@ async fn on_lsps_lsps2_buy(
payment_size_msat: req.payment_size_msat,
};
let buy_res: Lsps2BuyResponse = client
.call_typed(buy_req)
.call_typed(&lsp_id, buy_req)
.await
.context("lsps2.buy call failed")?
.into_result()?;
@@ -705,6 +709,7 @@ async fn on_lsps_listprotocols(
let mut cln_client = cln_rpc::ClnRpc::new(rpc_path.clone()).await?;
let req: Request = serde_json::from_value(v).context("Failed to parse request JSON")?;
let lsp_id = PublicKey::from_str(&req.lsp_id).context("lsp_id is not a valid public key")?;
let lsp_status = check_peer_lsp_status(&mut cln_client, &req.lsp_id).await?;
// Fail early: Check that we are connected to the peer.
@@ -733,7 +738,7 @@ async fn on_lsps_listprotocols(
let request = Lsps0listProtocolsRequest {};
let res: Lsps0listProtocolsResponse = client
.call_typed(request)
.call_typed(&lsp_id, request)
.await
.context("lsps0.list_protocols call failed")?
.into_result()?;

View File

@@ -1,5 +1,6 @@
use crate::proto::jsonrpc::{JsonRpcRequest, JsonRpcResponse, RequestObject};
use async_trait::async_trait;
use bitcoin::secp256k1::PublicKey;
use core::fmt::Debug;
use log::{debug, error};
use rand::rngs::OsRng;
@@ -37,9 +38,9 @@ pub type Result<T> = std::result::Result<T, Error>;
/// Implementors of this trait are responsible for actually sending the JSON-RPC
/// request over some transport mechanism (RPC, Bolt8, etc.)
#[async_trait]
pub trait Transport {
async fn send(&self, request: String) -> core::result::Result<String, Error>;
async fn notify(&self, request: String) -> core::result::Result<(), Error>;
pub trait Transport: Send + Sync {
async fn send(&self, peer: &PublicKey, request: String) -> core::result::Result<String, Error>;
async fn notify(&self, peer: &PublicKey, request: String) -> core::result::Result<(), Error>;
}
/// A typed JSON-RPC client that works with any transport implementation.
@@ -62,6 +63,7 @@ impl<T: Transport> JsonRpcClient<T> {
/// JSON result.
pub async fn call_raw(
&self,
peer_id: &PublicKey,
method: &str,
params: Option<Value>,
) -> Result<JsonRpcResponse<Value>> {
@@ -75,7 +77,8 @@ impl<T: Transport> JsonRpcClient<T> {
id: Some(id.clone().into()),
};
let response: JsonRpcResponse<Value> = self.send_request(method, &request, id).await?;
let response: JsonRpcResponse<Value> =
self.send_request(peer_id, method, &request, id).await?;
Ok(response)
}
@@ -84,7 +87,11 @@ impl<T: Transport> JsonRpcClient<T> {
///
/// This method provides type safety by using request and response types
/// that implement the necessary traits.
pub async fn call_typed<RQ, RS>(&self, request: RQ) -> Result<JsonRpcResponse<RS>>
pub async fn call_typed<RQ, RS>(
&self,
peer_id: &PublicKey,
request: RQ,
) -> Result<JsonRpcResponse<RS>>
where
RQ: JsonRpcRequest + Serialize + Send + Sync,
RS: DeserializeOwned + Serialize + Debug + Send + Sync,
@@ -94,12 +101,18 @@ impl<T: Transport> JsonRpcClient<T> {
debug!("Preparing request: method={}, id={}", method, id);
let request = request.into_request(Some(id.clone().into()));
let response: JsonRpcResponse<RS> = self.send_request(method, &request, id).await?;
let response: JsonRpcResponse<RS> =
self.send_request(peer_id, method, &request, id).await?;
Ok(response)
}
/// Sends a notification with raw JSON parameters (no response expected).
pub async fn notify_raw(&self, method: &str, params: Option<Value>) -> Result<()> {
pub async fn notify_raw(
&self,
peer_id: &PublicKey,
method: &str,
params: Option<Value>,
) -> Result<()> {
debug!("Preparing notification: method={}", method);
let request = RequestObject {
jsonrpc: "2.0".into(),
@@ -107,11 +120,11 @@ impl<T: Transport> JsonRpcClient<T> {
params,
id: None,
};
Ok(self.send_notification(method, &request).await?)
Ok(self.send_notification(peer_id, method, &request).await?)
}
/// Sends a typed notification (no response expected).
pub async fn notify_typed<RQ>(&self, request: RQ) -> Result<()>
pub async fn notify_typed<RQ>(&self, peer_id: &PublicKey, request: RQ) -> Result<()>
where
RQ: JsonRpcRequest + Serialize + Send + Sync,
{
@@ -119,11 +132,12 @@ impl<T: Transport> JsonRpcClient<T> {
debug!("Preparing notification: method={}", method);
let request = request.into_request(None);
Ok(self.send_notification(method, &request).await?)
Ok(self.send_notification(peer_id, method, &request).await?)
}
async fn send_request<RS, RP>(
&self,
peer: &PublicKey,
method: &str,
payload: &RP,
id: String,
@@ -138,7 +152,7 @@ impl<T: Transport> JsonRpcClient<T> {
method, id, &request_json
);
let start = tokio::time::Instant::now();
let res_str = self.transport.send(request_json).await?;
let res_str = self.transport.send(peer, request_json).await?;
let elapsed = start.elapsed();
debug!(
"Received response: method={}, id={}, response={}, elapsed={}ms",
@@ -150,14 +164,19 @@ impl<T: Transport> JsonRpcClient<T> {
Ok(serde_json::from_str(&res_str)?)
}
async fn send_notification<RP>(&self, method: &str, payload: &RP) -> Result<()>
async fn send_notification<RP>(
&self,
peer_id: &PublicKey,
method: &str,
payload: &RP,
) -> Result<()>
where
RP: Serialize + Send + Sync,
{
let request_json = serde_json::to_string(&payload)?;
debug!("Sending notification: method={}", method);
let start = tokio::time::Instant::now();
self.transport.notify(request_json).await?;
self.transport.notify(peer_id, request_json).await?;
let elapsed = start.elapsed();
debug!(
"Sent notification: method={}, elapsed={}ms",
@@ -194,6 +213,8 @@ fn generate_random_id() -> String {
#[cfg(test)]
mod test_json_rpc {
use std::str::FromStr as _;
use super::*;
use crate::proto::jsonrpc::RpcError;
use serde::Deserialize;
@@ -217,7 +238,11 @@ mod test_json_rpc {
#[async_trait]
impl Transport for TestTransport {
async fn send(&self, req: String) -> core::result::Result<String, Error> {
async fn send(
&self,
_peer_id: &PublicKey,
req: String,
) -> core::result::Result<String, Error> {
// Store the request
let _ = self.req.set(req);
@@ -234,7 +259,11 @@ mod test_json_rpc {
panic!("TestTransport: neither result nor error is set.");
}
async fn notify(&self, req: String) -> core::result::Result<(), Error> {
async fn notify(
&self,
_peer_id: &PublicKey,
req: String,
) -> core::result::Result<(), Error> {
// Store the request
let _ = self.req.set(req);
@@ -265,6 +294,11 @@ mod test_json_rpc {
#[tokio::test]
async fn test_typed_call_w_response() {
let peer_id = PublicKey::from_str(
"02a1633cafcc01ebfb6d78e39f687a1f0995c62fc95f51ead10a02ee0be551b5dc",
)
.unwrap();
let req = DummyCall {
foo: String::from("hello world!"),
bar: 13,
@@ -286,7 +320,7 @@ mod test_json_rpc {
let client_1 = JsonRpcClient::new(transport.clone());
let res = client_1
.call_typed::<_, DummyResponse>(req.clone())
.call_typed::<_, DummyResponse>(&peer_id, req.clone())
.await
.expect("Should have an OK result")
.expect("Should not be a JSON-RPC error");
@@ -312,6 +346,11 @@ mod test_json_rpc {
#[tokio::test]
async fn test_typed_call_w_rpc_error() {
let peer_id = PublicKey::from_str(
"02a1633cafcc01ebfb6d78e39f687a1f0995c62fc95f51ead10a02ee0be551b5dc",
)
.unwrap();
let req = DummyCall {
foo: "hello world!".into(),
bar: 13,
@@ -334,7 +373,7 @@ mod test_json_rpc {
let client_1 = JsonRpcClient::new(transport);
let res = client_1
.call_typed::<_, DummyResponse>(req)
.call_typed::<_, DummyResponse>(&peer_id, req)
.await
.expect("only inner rpc error")
.expect_err("expect rpc error");
@@ -343,6 +382,11 @@ mod test_json_rpc {
#[tokio::test]
async fn test_typed_call_w_internal_error() {
let peer_id = PublicKey::from_str(
"02a1633cafcc01ebfb6d78e39f687a1f0995c62fc95f51ead10a02ee0be551b5dc",
)
.unwrap();
let req = DummyCall {
foo: "hello world!".into(),
bar: 13,
@@ -356,7 +400,7 @@ mod test_json_rpc {
let client_1 = JsonRpcClient::new(transport);
let res = client_1
.call_typed::<_, DummyResponse>(req)
.call_typed::<_, DummyResponse>(&peer_id, req)
.await
.expect_err("Expected error response");
assert!(matches!(res, Error::Internal(..)));

View File

@@ -255,7 +255,11 @@ pub async fn send_custommsg(
#[async_trait]
impl Transport for Bolt8Transport {
/// Sends a JSON-RPC request and waits for a response.
async fn send(&self, request: String) -> core::result::Result<String, Error> {
async fn send(
&self,
_peer_id: &PublicKey,
request: String,
) -> core::result::Result<String, Error> {
let id = extract_message_id(&request)?;
let mut client = self.connect_to_node().await?;
@@ -294,7 +298,11 @@ impl Transport for Bolt8Transport {
}
/// Sends a notification without waiting for a response.
async fn notify(&self, request: String) -> core::result::Result<(), Error> {
async fn notify(
&self,
_peer_id: &PublicKey,
request: String,
) -> core::result::Result<(), Error> {
let mut client = self.connect_to_node().await?;
self.send_custom_msg(&mut client, request.into_bytes())
.await