From 4d9ed8e2fb95cc1580c4e9e64deeaa26e02880c6 Mon Sep 17 00:00:00 2001 From: Peter Neuroth Date: Wed, 2 Apr 2025 15:32:55 +0200 Subject: [PATCH] lsps: Implement the transport layer for lsps0 For lsps0 we send JSON-RPC messages via custom messages over the lightning network. This commit adds a basic implementation based on custom messages. --- Cargo.lock | 24 +- plugins/lsps-plugin/Cargo.toml | 4 +- plugins/lsps-plugin/src/jsonrpc/mod.rs | 2 + plugins/lsps-plugin/src/lib.rs | 1 + plugins/lsps-plugin/src/lsps0/mod.rs | 2 + plugins/lsps-plugin/src/lsps0/model.rs | 15 + plugins/lsps-plugin/src/lsps0/transport.rs | 533 +++++++++++++++++++++ 7 files changed, 559 insertions(+), 22 deletions(-) create mode 100644 plugins/lsps-plugin/src/lsps0/mod.rs create mode 100644 plugins/lsps-plugin/src/lsps0/model.rs create mode 100644 plugins/lsps-plugin/src/lsps0/transport.rs diff --git a/Cargo.lock b/Cargo.lock index 39f95d76d..a56ec5197 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -419,8 +419,10 @@ dependencies = [ name = "cln-lsps" version = "0.1.0" dependencies = [ + "anyhow", "async-trait", - "dashmap", + "cln-plugin", + "cln-rpc", "hex", "log", "rand 0.9.1", @@ -526,20 +528,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "dashmap" -version = "6.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" -dependencies = [ - "cfg-if", - "crossbeam-utils", - "hashbrown 0.14.5", - "lock_api", - "once_cell", - "parking_lot_core", -] - [[package]] name = "data-encoding" version = "2.7.0" @@ -872,12 +860,6 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -[[package]] -name = "hashbrown" -version = "0.14.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" - [[package]] name = "hashbrown" version = "0.15.2" diff --git a/plugins/lsps-plugin/Cargo.toml b/plugins/lsps-plugin/Cargo.toml index 8f7fd9248..012a3cc90 100644 --- a/plugins/lsps-plugin/Cargo.toml +++ b/plugins/lsps-plugin/Cargo.toml @@ -4,8 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow = "1.0" async-trait = "0.1" -dashmap = "6.1" +cln-plugin = { version = "0.4", path = "../" } +cln-rpc = { version = "0.4", path = "../../cln-rpc" } hex = "0.4" log = "0.4" rand = "0.9" diff --git a/plugins/lsps-plugin/src/jsonrpc/mod.rs b/plugins/lsps-plugin/src/jsonrpc/mod.rs index ea2ac0745..f81926934 100644 --- a/plugins/lsps-plugin/src/jsonrpc/mod.rs +++ b/plugins/lsps-plugin/src/jsonrpc/mod.rs @@ -33,6 +33,8 @@ impl Error { #[derive(Error, Debug)] pub enum TransportError { + #[error("Timeout")] + Timeout, #[error("Other error: {0}")] Other(String), } diff --git a/plugins/lsps-plugin/src/lib.rs b/plugins/lsps-plugin/src/lib.rs index 0910ab014..e61d9faae 100644 --- a/plugins/lsps-plugin/src/lib.rs +++ b/plugins/lsps-plugin/src/lib.rs @@ -1 +1,2 @@ mod jsonrpc; +mod lsps0; diff --git a/plugins/lsps-plugin/src/lsps0/mod.rs b/plugins/lsps-plugin/src/lsps0/mod.rs new file mode 100644 index 000000000..e7716c921 --- /dev/null +++ b/plugins/lsps-plugin/src/lsps0/mod.rs @@ -0,0 +1,2 @@ +pub mod model; +pub mod transport; diff --git a/plugins/lsps-plugin/src/lsps0/model.rs b/plugins/lsps-plugin/src/lsps0/model.rs new file mode 100644 index 000000000..0327120e0 --- /dev/null +++ b/plugins/lsps-plugin/src/lsps0/model.rs @@ -0,0 +1,15 @@ +use serde::{Deserialize, Serialize}; + +use crate::jsonrpc::JsonRpcRequest; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Lsps0listProtocolsRequest {} + +impl JsonRpcRequest for Lsps0listProtocolsRequest { + const METHOD: &'static str = "lsps0.list_protocols"; +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Lsps0listProtocolsResponse { + pub protocols: Vec, +} diff --git a/plugins/lsps-plugin/src/lsps0/transport.rs b/plugins/lsps-plugin/src/lsps0/transport.rs new file mode 100644 index 000000000..b19974e77 --- /dev/null +++ b/plugins/lsps-plugin/src/lsps0/transport.rs @@ -0,0 +1,533 @@ +use crate::jsonrpc::{client::Transport, Error, TransportError}; +use async_trait::async_trait; +use cln_plugin::Plugin; +use cln_rpc::{primitives::PublicKey, ClnRpc}; +use log::{debug, error, trace}; +use serde::{de::Visitor, Deserialize, Serialize}; +use std::{ + collections::HashMap, + path::PathBuf, + str::FromStr, + sync::{Arc, Weak}, +}; +use tokio::{ + sync::{mpsc, RwLock}, + time::Duration, +}; + +pub const LSPS0_MESSAGE_TYPE: u16 = 37913; +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); + +/// Trait that must be implemented by plugin state to access the custom message hook manager. +/// +/// This trait allows the hook handler to access the custom message hook manager +/// from the plugin state, enabling proper message routing. +pub trait WithCustomMessageHookManager { + fn get_custommsg_hook_manager(&self) -> &CustomMessageHookManager; +} + +// Manages subscriptions for the custom message hook. +/// +/// The `CustomMessageHookManager` is responsible for: +/// 1. Maintaining a registry of message ID to receiver mappings +/// 2. Processing incoming LSPS0 messages and routing them to subscribers +/// 3. Cleaning up expired subscriptions +/// +/// It uses weak references to avoid memory leaks when timeouts occ +#[derive(Clone)] +pub struct CustomMessageHookManager { + /// Maps message IDs to weak references of response channels + subs: Arc>>>>, +} + +impl CustomMessageHookManager { + /// Creates a new CustomMessageHookManager. + pub fn new() -> Self { + Self { + subs: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Subscribes to receive a message with a specific ID. + /// + /// Registers a weak reference to a channel that will receive the message + /// when it arrives. Using weak references allows for automatic cleanup if + /// the receiver is dropped due to timeout. + async fn subscribe_hook_once>( + &self, + id: I, + channel: Weak>, + ) { + let id = id.into(); + trace!("Subscribe to custom message hook for message id={}", id); + let mut sub_lock = self.subs.write().await; + sub_lock.insert(id, channel); + } + + /// Processes an incoming LSP message. + /// + /// Extracts the message ID from the payload, finds the corresponding + /// subscriber, and forwards the message to them if found. + async fn process_lsp_message(&self, payload: CustomMsg, peer_id: &str) -> bool { + // Convert the binary payload to a string + let lsps_msg_string = match String::from_utf8(payload.payload.clone()) { + Ok(v) => v, + Err(e) => { + error!("Failed to deserialize custommsg payload from {peer_id}: {e}"); + return false; + } + }; + + let id = match extract_message_id(&lsps_msg_string) { + Ok(v) => v, + Err(e) => { + error!("Failed to get id from lsps message from {peer_id}: {e}"); + return false; + } + }; + + let mut subs_lock = self.subs.write().await; + // Clean up any expired subscriptions + subs_lock.retain(|_, v| Weak::strong_count(v) > 0); + subs_lock.shrink_to_fit(); + + // Find send to, and remove the subscriber for this message ID + if let Some(tx) = subs_lock.remove(&id).map(|v| v.upgrade()).flatten() { + if let Err(e) = tx.send(payload).await { + error!("Failed to send custommsg to subscriber for id={}: {e}", id); + return false; + } + return true; + } + + debug!( + "No subscriber found for message with id={} from {peer_id}", + id + ); + false + } + + /// Handles the custommsg hook from Core Lightning. + /// + /// This method should be registered as a hook handler in a Core Lightning + /// plugin. It processes incoming custom messages and routes LSPS0 messages + /// to the appropriate subscribers. + pub async fn on_custommsg( + p: Plugin, + v: serde_json::Value, + ) -> Result + where + S: Clone + Sync + Send + 'static + WithCustomMessageHookManager, + { + // Default response is to continue processing. + let continue_response = Ok(serde_json::json!({ + "result": "continue" + })); + + // Parse the custom message hook return value. + let custommsg: CustomMsgHookReturn = match serde_json::from_value(v) { + Ok(v) => v, + Err(e) => { + error!("Failed to deserialize custommsg: {e}"); + return continue_response; + } + }; + + // Only process LSPS0 message types. + if custommsg.payload.message_type != LSPS0_MESSAGE_TYPE { + debug!( + "Custommsg is not of type LSPS0 (got {}), skipping", + custommsg.payload.message_type + ); + return continue_response; + } + + // Route the message to the appropriate handler. + // Can be moved into a separate task via tokio::spawn if needed; + let hook_watcher = p.state().get_custommsg_hook_manager(); + hook_watcher + .process_lsp_message(custommsg.payload, &custommsg.peer_id) + .await; + return continue_response; + } +} + +/// Transport implementation for JSON-RPC over Lightning Network using BOLT8 +/// and BOLT1 custom messages. +/// +/// The `Bolt8Transport` allows JSON-RPC requests to be transmitted as custom +/// messages between Lightning Network nodes. It uses Core Lightning's +/// `sendcustommsg` RPC call to send messages and the `custommsg` hook to +/// receive responses. +#[derive(Clone)] +pub struct Bolt8Transport { + /// The node ID of the destination node. + endpoint: cln_rpc::primitives::PublicKey, + /// Path to the Core Lightning RPC socket. + rpc_path: PathBuf, + /// Timeout for requests. + request_timeout: Duration, + /// Hook manager for routing messages. + hook_watcher: CustomMessageHookManager, +} + +impl Bolt8Transport { + /// Creates a new Bolt8Transport. + /// + /// # Arguments + /// * `endpoint` - Node ID of the destination node as a hex string + /// * `rpc_path` - Path to the Core Lightning socket + /// * `hook_watcher` - Hook manager to use for message routing + /// * `timeout` - Optional timeout for requests (defaults to DEFAULT_TIMEOUT) + /// + /// # Returns + /// A new `Bolt8Transport` instance or an error if the node ID is invalid + pub fn new( + endpoint: &str, + rpc_path: PathBuf, + hook_watcher: CustomMessageHookManager, + timeout: Option, + ) -> Result { + let endpoint = cln_rpc::primitives::PublicKey::from_str(endpoint) + .map_err(|e| TransportError::Other(e.to_string()))?; + let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT); + Ok(Self { + endpoint, + rpc_path, + request_timeout: timeout, + hook_watcher, + }) + } + + /// Connects to the Core Lightning node. + async fn connect_to_node(&self) -> Result { + ClnRpc::new(&self.rpc_path) + .await + .map_err(|e| Error::Transport(TransportError::Other(e.to_string()))) + } + + /// Sends a custom message to the destination node. + async fn send_custom_msg(&self, client: &mut ClnRpc, payload: Vec) -> Result<(), Error> { + send_custommsg(client, payload, self.endpoint).await + } + + /// Waits for a response with timeout. + async fn wait_for_response( + &self, + mut rx: mpsc::Receiver, + ) -> Result { + tokio::time::timeout(self.request_timeout, rx.recv()) + .await + .map_err(|_| Error::Transport(TransportError::Timeout))? + .ok_or(Error::Transport(TransportError::Other(String::from( + "Channel unexpectedly closed", + )))) + } +} + +/// Sends a custom message to the destination node. +pub async fn send_custommsg( + client: &mut ClnRpc, + payload: Vec, + peer: PublicKey, +) -> Result<(), Error> { + let msg = CustomMsg { + message_type: LSPS0_MESSAGE_TYPE, + payload, + }; + + let request = cln_rpc::model::requests::SendcustommsgRequest { + msg: msg.to_string(), + node_id: peer, + }; + + client + .call_typed(&request) + .await + .map_err(|e| { + Error::Transport(TransportError::Other(format!( + "Failed to send custom msg: {e}" + ))) + }) + .map(|r| { + trace!("Successfully queued custom msg: {}", r.status); + () + }) +} + +#[async_trait] +impl Transport for Bolt8Transport { + /// Sends a JSON-RPC request and waits for a response. + async fn send(&self, request: String) -> core::result::Result { + let id = extract_message_id(&request)?; + let mut client = self.connect_to_node().await?; + + let (tx, rx) = mpsc::channel(1); + trace!( + "Subscribing to custom msg hook manager for request id={}", + id + ); + + // Create a strong reference that will be dropped after timeout. + let tx_arc = Arc::new(tx); + + self.hook_watcher + .subscribe_hook_once(id, Arc::downgrade(&tx_arc)) + .await; + self.send_custom_msg(&mut client, request.into_bytes()) + .await?; + + let res = self.wait_for_response(rx).await?; + + if res.message_type != LSPS0_MESSAGE_TYPE { + return Err(Error::Transport(TransportError::Other(format!( + "unexpected response message type: expected {}, got {}", + LSPS0_MESSAGE_TYPE, res.message_type + )))); + } + + core::str::from_utf8(&res.payload) + .map_err(|e| { + Error::Transport(TransportError::Other(format!( + "failed to decode msg payload {:?}: {}", + res.payload, e + ))) + }) + .map(|s| s.into()) + } + + /// Sends a notification without waiting for a response. + async fn notify(&self, request: String) -> core::result::Result<(), Error> { + let mut client = self.connect_to_node().await?; + self.send_custom_msg(&mut client, request.into_bytes()) + .await + } +} + +// Extracts the message ID from a JSON-RPC message. +fn extract_message_id(msg: &str) -> core::result::Result { + let id_only: IdOnly = serde_json::from_str(msg)?; + Ok(id_only.id) +} + +/// Represents a custom message with type and payload. +#[derive(Clone, Debug, PartialEq)] +pub struct CustomMsg { + pub message_type: u16, + pub payload: Vec, +} + +impl core::fmt::Display for CustomMsg { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut bytes = Vec::with_capacity(2 + self.payload.len()); + bytes.extend_from_slice(&self.message_type.to_be_bytes()); + bytes.extend_from_slice(&self.payload); + write!(f, "{}", hex::encode(bytes)) + } +} + +impl FromStr for CustomMsg { + type Err = Error; + + fn from_str(s: &str) -> Result { + let bytes = hex::decode(s).map_err(Error::other)?; + + if bytes.len() < 2 { + return Err(Error::other( + "hex string too short to contain a valid message_type", + )); + } + + let message_type_bytes: [u8; 2] = bytes[..2].try_into().map_err(Error::other)?; + let message_type = u16::from_be_bytes(message_type_bytes); + let payload = bytes[2..].to_owned(); + Ok(CustomMsg { + message_type, + payload, + }) + } +} + +impl Serialize for CustomMsg { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +/// Visitor for deserializing CustomMsg from strings. +struct CustomMsgVisitor; + +impl<'de> Visitor<'de> for CustomMsgVisitor { + type Value = CustomMsg; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a hex string representing a CustomMsg") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + CustomMsg::from_str(v).map_err(E::custom) + } +} + +impl<'de> Deserialize<'de> for CustomMsg { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + deserializer.deserialize_str(CustomMsgVisitor) + } +} + +/// Struct to extract just the ID from a JSON-RPC message. +#[derive(Clone, Debug, Serialize, Deserialize)] +struct IdOnly { + id: String, +} + +/// Return type from custommsg hook. +#[derive(Clone, Debug, Serialize, Deserialize)] +struct CustomMsgHookReturn { + peer_id: String, + payload: CustomMsg, +} + +#[cfg(test)] +mod test_transport { + use super::*; + use serde_json::json; + + // Helper to create a test JSON-RPC request + fn create_test_request(id: &str) -> String { + serde_json::to_string(&json!({ + "jsonrpc": "2.0", + "method": "test_method", + "params": {"test": "value"}, + "id": id + })) + .unwrap() + } + + #[tokio::test] + async fn test_deserialize_custommsg() { + let hex_str = r#"94197b226a736f6e727063223a22322e30222c226d6574686f64223a226c737073302e6c6973745f70726f746f636f6c73222c22706172616d73223a7b7d2c226964223a226135633665613536366333383038313936346263227d"#; + let msg = CustomMsg::from_str(hex_str).unwrap(); + assert_eq!(msg.message_type, LSPS0_MESSAGE_TYPE); + } + + #[tokio::test] + async fn test_extract_message_id() { + // Test with string ID + let request = create_test_request("test-id-123"); + let id = extract_message_id(&request).unwrap(); + assert_eq!(id, "test-id-123"); + } + + #[tokio::test] + async fn custom_msg_serialization() { + let original = CustomMsg { + message_type: 0x1234, + payload: b"test payload".to_vec(), + }; + + // Test to_string and parsing from that string + let serialized = original.to_string(); + + // Convert hex to bytes + let bytes = hex::decode(&serialized).unwrap(); + + // Verify structure + assert_eq!(bytes[0], 0x12); + assert_eq!(bytes[1], 0x34); + assert_eq!(&bytes[2..], b"test payload"); + + // Test deserialization + let deserialized: CustomMsg = + serde_json::from_str(&serde_json::to_string(&serialized).unwrap()).unwrap(); + + assert_eq!(deserialized.message_type, original.message_type); + assert_eq!(deserialized.payload, original.payload); + } + + #[tokio::test] + async fn hook_manager_subscribe_and_process() { + let hook_manager = CustomMessageHookManager::new(); + + // Create test message + let test_id = "test-id-456"; + let test_request = create_test_request(test_id); + let test_msg = CustomMsg { + message_type: LSPS0_MESSAGE_TYPE, + payload: test_request.as_bytes().to_vec(), + }; + + // Set up a subscription + let (tx, mut rx) = mpsc::channel(1); + let tx_arc = Arc::new(tx); + hook_manager + .subscribe_hook_once(test_id, Arc::downgrade(&tx_arc)) + .await; + + // Process the message + let processed = hook_manager + .process_lsp_message(test_msg.clone(), "peer123") + .await; + assert!(processed); + + // Verify the received message + let received_msg = rx.recv().await.unwrap(); + assert_eq!(received_msg.message_type, LSPS0_MESSAGE_TYPE); + assert_eq!(received_msg.payload, test_request.as_bytes()); + } + + #[tokio::test] + async fn hook_manager_no_subscriber() { + let hook_manager = CustomMessageHookManager::new(); + + // Create test message with ID that has no subscriber + let test_request = create_test_request("unknown-id"); + let test_msg = CustomMsg { + message_type: LSPS0_MESSAGE_TYPE, + payload: test_request.as_bytes().to_vec(), + }; + + // Process the message + let processed = hook_manager.process_lsp_message(test_msg, "peer123").await; + assert!(!processed); + } + + #[tokio::test] + async fn hook_manager_clean_up_after_timeout() { + let hook_manager = CustomMessageHookManager::new(); + + // Create test message + let test_id = "test-id-456"; + let test_request = create_test_request(test_id); + let test_msg = CustomMsg { + message_type: LSPS0_MESSAGE_TYPE, + payload: test_request.as_bytes().to_vec(), + }; + + // Set up a subscription + let (tx, _rx) = mpsc::channel(1); + let tx_arc = Arc::new(tx); + hook_manager + .subscribe_hook_once(test_id, Arc::downgrade(&tx_arc)) + .await; + + // drop the reference pointer here to simulate a timeout. + drop(tx_arc); + + // Should not process as the reference has been dropped. + let processed = hook_manager + .process_lsp_message(test_msg.clone(), "peer123") + .await; + assert!(!processed); + assert!(hook_manager.subs.read().await.is_empty()); + } +}