diff --git a/plugins/lsps-plugin/src/cln_adapters/hooks.rs b/plugins/lsps-plugin/src/cln_adapters/hooks.rs index 8475281f8..4cb647852 100644 --- a/plugins/lsps-plugin/src/cln_adapters/hooks.rs +++ b/plugins/lsps-plugin/src/cln_adapters/hooks.rs @@ -1,32 +1,101 @@ -use crate::cln_adapters::{ - state::ClientState, - utils::{decode_lsps0_frame_hex, extract_message_id}, +use crate::{ + cln_adapters::state::{ClientState, ServiceState}, + core::{router::RequestContext, transport::MessageSender as _}, + proto::lsps0, }; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; use cln_plugin::Plugin; +use serde::Deserialize; use serde_json::Value; -/// Client hook - thin wrapper -pub async fn client_custommsg_hook(plugin: Plugin, v: Value) -> Result +pub async fn client_custommsg_hook(plugin: Plugin, v: Value) -> Result where S: Clone + Sync + Send + 'static + ClientState, { - let payload_hex = v["payload"].as_str().unwrap(); - - // LSPS0 Bolt8 transport frame needs to be decoded. - let payload = match decode_lsps0_frame_hex(payload_hex) { - Some(p) => p, - None => { - return Ok(serde_json::json!({ - "result": "continue" - })) - } + let Some(hook) = CustomMsgHook::parse(v) else { + return Ok(serde_json::json!({ + "result": "continue" + })); }; - if let Some(id) = extract_message_id(&payload) { - plugin.state().pending().complete(&id, payload).await; + if let Some(id) = extract_message_id(&hook.payload) { + plugin.state().pending().complete(&id, hook.payload).await; } return Ok(serde_json::json!({ "result": "continue" })); } + +pub async fn service_custommsg_hook(plugin: Plugin, v: Value) -> Result +where + S: Clone + Sync + Send + 'static + ServiceState, +{ + let Some(hook) = CustomMsgHook::parse(v) else { + return Ok(serde_json::json!({ + "result": "continue" + })); + }; + let service = plugin.state().service(); + let ctx = RequestContext { + peer_id: hook.peer_id, + }; + let res = service.handle(&ctx, &hook.payload).await; + if let Some(payload) = res { + let sender = plugin.state().sender().clone(); + if let Err(e) = sender.send(&hook.peer_id, &payload).await { + log::error!("Failed to send LSPS response to {}: {}", &hook.peer_id, e); + }; + } + + Ok(serde_json::json!({ + "result": "continue" + })) +} + +#[derive(Debug, Deserialize)] +struct CustomMsgHookRaw { + peer_id: PublicKey, + payload: String, +} + +/// Parsed and validated hook data +pub struct CustomMsgHook { + pub peer_id: PublicKey, + pub payload: Vec, +} + +impl CustomMsgHook { + /// Parse and validate everything upfront + pub fn parse(v: Value) -> Option { + let raw: CustomMsgHookRaw = serde_json::from_value(v).ok()?; + let peer_id = raw.peer_id; + let payload = decode_lsps0_frame_hex(&raw.payload)?; + Some(Self { peer_id, payload }) + } +} + +fn decode_lsps0_frame_hex(hex_str: &str) -> Option> { + let frame = match hex::decode(hex_str) { + Ok(f) => f, + Err(e) => { + log::error!( + "Failed to decode hex string payload from custom message: {}", + e + ); + return None; + } + }; + lsps0::decode_frame(&frame).ok().map(|d| d.to_owned()) +} + +fn extract_message_id(payload: &[u8]) -> Option { + #[derive(Deserialize)] + struct IdOnly { + id: Option, + } + + let parsed: IdOnly = serde_json::from_slice(payload).ok()?; + parsed.id +} diff --git a/plugins/lsps-plugin/src/cln_adapters/mod.rs b/plugins/lsps-plugin/src/cln_adapters/mod.rs index ea60b21d0..1ff4d2f3f 100644 --- a/plugins/lsps-plugin/src/cln_adapters/mod.rs +++ b/plugins/lsps-plugin/src/cln_adapters/mod.rs @@ -1,5 +1,3 @@ pub mod hooks; pub mod sender; -pub mod service; pub mod state; -pub mod utils; diff --git a/plugins/lsps-plugin/src/cln_adapters/sender.rs b/plugins/lsps-plugin/src/cln_adapters/sender.rs index 236412b0b..39a73acd6 100644 --- a/plugins/lsps-plugin/src/cln_adapters/sender.rs +++ b/plugins/lsps-plugin/src/cln_adapters/sender.rs @@ -1,6 +1,6 @@ use crate::{ - cln_adapters::utils::encode_lsps0_frame_hex, core::transport::{Error as TransportError, MessageSender}, + proto::lsps0, }; use async_trait::async_trait; use bitcoin::secp256k1::PublicKey; @@ -38,3 +38,8 @@ impl MessageSender for ClnSender { Ok(()) } } + +fn encode_lsps0_frame_hex(payload: &[u8]) -> String { + let frame = lsps0::encode_frame(payload); + hex::encode(&frame) +} diff --git a/plugins/lsps-plugin/src/cln_adapters/service.rs b/plugins/lsps-plugin/src/cln_adapters/service.rs deleted file mode 100644 index 36f9f5042..000000000 --- a/plugins/lsps-plugin/src/cln_adapters/service.rs +++ /dev/null @@ -1,96 +0,0 @@ -use crate::{ - core::{router::RequestContext, server::LspsService}, - proto::lsps0::{decode_frame, encode_frame}, -}; -use anyhow::Result; -use bitcoin::secp256k1::PublicKey; -use cln_plugin::Plugin; -use cln_rpc::model::requests::SendcustommsgRequest; -use serde::Deserialize; -use serde_json::Value; -use std::{ - path::{Path, PathBuf}, - sync::Arc, -}; - -pub trait ServiceStore { - fn service(&self) -> Arc; -} - -#[derive(Debug, Clone, Deserialize)] -struct CustomMsg { - peer_id: PublicKey, - payload: String, -} - -fn rpc_path(p: &Plugin) -> PathBuf -where - S: Clone + Sync + Send + 'static, -{ - let dir = p.configuration().lightning_dir; - Path::new(&dir).join(&p.configuration().rpc_file) -} - -async fn send_custommsg

(rpc_path: P, peer: &PublicKey, msg: &str) -> Result<()> -where - P: AsRef, -{ - let mut client = cln_rpc::ClnRpc::new(rpc_path).await?; - let _ = client - .call_typed(&SendcustommsgRequest { - msg: msg.to_owned(), - node_id: peer.to_owned(), - }) - .await?; - Ok(()) -} - -pub async fn on_custommsg_service(plugin: Plugin, v: Value) -> Result -where - S: Clone + Sync + Send + 'static + ServiceStore, -{ - let msg: CustomMsg = serde_json::from_value(v)?; - let req = match decode_lsps0_frame_hex(&msg.payload) { - Some(d) => d, - None => { - return Ok(serde_json::json!({ - "result": "continue" - })) - } - }; - let service = plugin.state().service(); - let rpc_path = rpc_path(&plugin); - let ctx = RequestContext { - peer_id: msg.peer_id, - }; - let res = service.handle(&ctx, &req).await; - if let Some(payload) = res { - let payload = encode_lsps0_frame_hex(&payload); - if let Err(e) = send_custommsg(&rpc_path, &msg.peer_id, &payload).await { - log::error!("Failed to send LSPS response to {}: {}", &msg.peer_id, e); - }; - } - - Ok(serde_json::json!({ - "result": "continue" - })) -} - -fn decode_lsps0_frame_hex(hex_str: &str) -> Option> { - let frame = match hex::decode(hex_str) { - Ok(f) => f, - Err(e) => { - log::error!( - "Failed to decode hex string payload from custom message: {}", - e - ); - return None; - } - }; - decode_frame(&frame).ok().map(|d| d.to_owned()) -} - -fn encode_lsps0_frame_hex(payload: &[u8]) -> String { - let frame = encode_frame(payload); - hex::encode(&frame) -} diff --git a/plugins/lsps-plugin/src/cln_adapters/state.rs b/plugins/lsps-plugin/src/cln_adapters/state.rs index 8ea80fc6c..4ba564a46 100644 --- a/plugins/lsps-plugin/src/cln_adapters/state.rs +++ b/plugins/lsps-plugin/src/cln_adapters/state.rs @@ -1,9 +1,19 @@ +use std::sync::Arc; + use crate::{ cln_adapters::sender::ClnSender, - core::transport::{MultiplexedTransport, PendingRequests}, + core::{ + server::LspsService, + transport::{MultiplexedTransport, PendingRequests}, + }, }; pub trait ClientState { fn transport(&self) -> MultiplexedTransport; fn pending(&self) -> &PendingRequests; } + +pub trait ServiceState { + fn service(&self) -> Arc; + fn sender(&self) -> ClnSender; +} diff --git a/plugins/lsps-plugin/src/cln_adapters/utils.rs b/plugins/lsps-plugin/src/cln_adapters/utils.rs deleted file mode 100644 index fb506ea6d..000000000 --- a/plugins/lsps-plugin/src/cln_adapters/utils.rs +++ /dev/null @@ -1,11 +0,0 @@ -use serde::Deserialize; - -pub fn extract_message_id(payload: &[u8]) -> Option { - #[derive(Deserialize)] - struct IdOnly { - id: Option, - } - - let parsed: IdOnly = serde_json::from_slice(payload).ok()?; - parsed.id -} diff --git a/plugins/lsps-plugin/src/service.rs b/plugins/lsps-plugin/src/service.rs index c87cddc56..384e44512 100644 --- a/plugins/lsps-plugin/src/service.rs +++ b/plugins/lsps-plugin/src/service.rs @@ -1,6 +1,6 @@ use anyhow::bail; use cln_lsps::{ - cln_adapters::service::{on_custommsg_service, ServiceStore}, + cln_adapters::{hooks::service_custommsg_hook, sender::ClnSender, state::ServiceState}, core::{ lsps2::handler::{ClnApiRpc, HtlcAcceptedHookHandler, Lsps2ServiceHandler}, server::LspsService, @@ -11,19 +11,38 @@ use cln_lsps::{ }, }; use cln_plugin::Plugin; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; #[derive(Clone)] struct State { lsps_service: Arc, + sender: ClnSender, lsps2_enabled: bool, } -impl ServiceStore for State { +impl State { + pub fn new(rpc_path: PathBuf, promise_secret: &[u8; 32]) -> Self { + let api = Arc::new(ClnApiRpc::new(rpc_path.clone())); + let sender = ClnSender::new(rpc_path); + let lsps2_handler = Arc::new(Lsps2ServiceHandler::new(api, promise_secret)); + let lsps_service = Arc::new(LspsService::builder().with_protocol(lsps2_handler).build()); + Self { + lsps_service, + sender, + lsps2_enabled: true, + } + } +} + +impl ServiceState for State { fn service(&self) -> Arc { self.lsps_service.clone() } + + fn sender(&self) -> cln_lsps::cln_adapters::sender::ClnSender { + self.sender.clone() + } } #[tokio::main] @@ -43,7 +62,7 @@ async fn main() -> Result<(), anyhow::Error> { // cln_plugin::FeatureBitsKind::Init, // util::feature_bit_to_hex(LSP_FEATURE_BIT), // ) - .hook("custommsg", on_custommsg_service) + .hook("custommsg", service_custommsg_hook) .hook("htlc_accepted", on_htlc_accepted) .configure() .await? @@ -80,17 +99,7 @@ async fn main() -> Result<(), anyhow::Error> { } }; - let cln_api_rpc = ClnApiRpc::new(rpc_path); - let lsps2_handler = - Arc::new(Lsps2ServiceHandler::new(Arc::new(cln_api_rpc), &secret)); - - let lsps_service_builder = LspsService::builder(); - let lsps_service = lsps_service_builder.with_protocol(lsps2_handler).build(); - - let state = State { - lsps_service: Arc::new(lsps_service), - lsps2_enabled: true, - }; + let state = State::new(rpc_path, &secret); let plugin = plugin.start(state).await?; plugin.join().await } else {