plugins: lsps: refactor service hook

We have some shared behavior and can use the MessageSender for the
service hook as well

Signed-off-by: Peter Neuroth <pet.v.ne@gmail.com>
This commit is contained in:
Peter Neuroth
2025-12-05 15:55:03 +01:00
committed by madelinevibes
parent 462ca844dd
commit e50f5ee863
7 changed files with 127 additions and 143 deletions

View File

@@ -1,32 +1,101 @@
use crate::cln_adapters::{
state::ClientState,
utils::{decode_lsps0_frame_hex, extract_message_id},
use crate::{
cln_adapters::state::{ClientState, ServiceState},
core::{router::RequestContext, transport::MessageSender as _},
proto::lsps0,
};
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use cln_plugin::Plugin;
use serde::Deserialize;
use serde_json::Value;
/// Client hook - thin wrapper
pub async fn client_custommsg_hook<S>(plugin: Plugin<S>, v: Value) -> Result<Value, anyhow::Error>
pub async fn client_custommsg_hook<S>(plugin: Plugin<S>, v: Value) -> Result<Value>
where
S: Clone + Sync + Send + 'static + ClientState,
{
let payload_hex = v["payload"].as_str().unwrap();
// LSPS0 Bolt8 transport frame needs to be decoded.
let payload = match decode_lsps0_frame_hex(payload_hex) {
Some(p) => p,
None => {
return Ok(serde_json::json!({
"result": "continue"
}))
}
let Some(hook) = CustomMsgHook::parse(v) else {
return Ok(serde_json::json!({
"result": "continue"
}));
};
if let Some(id) = extract_message_id(&payload) {
plugin.state().pending().complete(&id, payload).await;
if let Some(id) = extract_message_id(&hook.payload) {
plugin.state().pending().complete(&id, hook.payload).await;
}
return Ok(serde_json::json!({
"result": "continue"
}));
}
pub async fn service_custommsg_hook<S>(plugin: Plugin<S>, v: Value) -> Result<Value>
where
S: Clone + Sync + Send + 'static + ServiceState,
{
let Some(hook) = CustomMsgHook::parse(v) else {
return Ok(serde_json::json!({
"result": "continue"
}));
};
let service = plugin.state().service();
let ctx = RequestContext {
peer_id: hook.peer_id,
};
let res = service.handle(&ctx, &hook.payload).await;
if let Some(payload) = res {
let sender = plugin.state().sender().clone();
if let Err(e) = sender.send(&hook.peer_id, &payload).await {
log::error!("Failed to send LSPS response to {}: {}", &hook.peer_id, e);
};
}
Ok(serde_json::json!({
"result": "continue"
}))
}
#[derive(Debug, Deserialize)]
struct CustomMsgHookRaw {
peer_id: PublicKey,
payload: String,
}
/// Parsed and validated hook data
pub struct CustomMsgHook {
pub peer_id: PublicKey,
pub payload: Vec<u8>,
}
impl CustomMsgHook {
/// Parse and validate everything upfront
pub fn parse(v: Value) -> Option<Self> {
let raw: CustomMsgHookRaw = serde_json::from_value(v).ok()?;
let peer_id = raw.peer_id;
let payload = decode_lsps0_frame_hex(&raw.payload)?;
Some(Self { peer_id, payload })
}
}
fn decode_lsps0_frame_hex(hex_str: &str) -> Option<Vec<u8>> {
let frame = match hex::decode(hex_str) {
Ok(f) => f,
Err(e) => {
log::error!(
"Failed to decode hex string payload from custom message: {}",
e
);
return None;
}
};
lsps0::decode_frame(&frame).ok().map(|d| d.to_owned())
}
fn extract_message_id(payload: &[u8]) -> Option<String> {
#[derive(Deserialize)]
struct IdOnly {
id: Option<String>,
}
let parsed: IdOnly = serde_json::from_slice(payload).ok()?;
parsed.id
}

View File

@@ -1,5 +1,3 @@
pub mod hooks;
pub mod sender;
pub mod service;
pub mod state;
pub mod utils;

View File

@@ -1,6 +1,6 @@
use crate::{
cln_adapters::utils::encode_lsps0_frame_hex,
core::transport::{Error as TransportError, MessageSender},
proto::lsps0,
};
use async_trait::async_trait;
use bitcoin::secp256k1::PublicKey;
@@ -38,3 +38,8 @@ impl MessageSender for ClnSender {
Ok(())
}
}
fn encode_lsps0_frame_hex(payload: &[u8]) -> String {
let frame = lsps0::encode_frame(payload);
hex::encode(&frame)
}

View File

@@ -1,96 +0,0 @@
use crate::{
core::{router::RequestContext, server::LspsService},
proto::lsps0::{decode_frame, encode_frame},
};
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use cln_plugin::Plugin;
use cln_rpc::model::requests::SendcustommsgRequest;
use serde::Deserialize;
use serde_json::Value;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
pub trait ServiceStore {
fn service(&self) -> Arc<LspsService>;
}
#[derive(Debug, Clone, Deserialize)]
struct CustomMsg {
peer_id: PublicKey,
payload: String,
}
fn rpc_path<S>(p: &Plugin<S>) -> PathBuf
where
S: Clone + Sync + Send + 'static,
{
let dir = p.configuration().lightning_dir;
Path::new(&dir).join(&p.configuration().rpc_file)
}
async fn send_custommsg<P>(rpc_path: P, peer: &PublicKey, msg: &str) -> Result<()>
where
P: AsRef<Path>,
{
let mut client = cln_rpc::ClnRpc::new(rpc_path).await?;
let _ = client
.call_typed(&SendcustommsgRequest {
msg: msg.to_owned(),
node_id: peer.to_owned(),
})
.await?;
Ok(())
}
pub async fn on_custommsg_service<S>(plugin: Plugin<S>, v: Value) -> Result<Value>
where
S: Clone + Sync + Send + 'static + ServiceStore,
{
let msg: CustomMsg = serde_json::from_value(v)?;
let req = match decode_lsps0_frame_hex(&msg.payload) {
Some(d) => d,
None => {
return Ok(serde_json::json!({
"result": "continue"
}))
}
};
let service = plugin.state().service();
let rpc_path = rpc_path(&plugin);
let ctx = RequestContext {
peer_id: msg.peer_id,
};
let res = service.handle(&ctx, &req).await;
if let Some(payload) = res {
let payload = encode_lsps0_frame_hex(&payload);
if let Err(e) = send_custommsg(&rpc_path, &msg.peer_id, &payload).await {
log::error!("Failed to send LSPS response to {}: {}", &msg.peer_id, e);
};
}
Ok(serde_json::json!({
"result": "continue"
}))
}
fn decode_lsps0_frame_hex(hex_str: &str) -> Option<Vec<u8>> {
let frame = match hex::decode(hex_str) {
Ok(f) => f,
Err(e) => {
log::error!(
"Failed to decode hex string payload from custom message: {}",
e
);
return None;
}
};
decode_frame(&frame).ok().map(|d| d.to_owned())
}
fn encode_lsps0_frame_hex(payload: &[u8]) -> String {
let frame = encode_frame(payload);
hex::encode(&frame)
}

View File

@@ -1,9 +1,19 @@
use std::sync::Arc;
use crate::{
cln_adapters::sender::ClnSender,
core::transport::{MultiplexedTransport, PendingRequests},
core::{
server::LspsService,
transport::{MultiplexedTransport, PendingRequests},
},
};
pub trait ClientState {
fn transport(&self) -> MultiplexedTransport<ClnSender>;
fn pending(&self) -> &PendingRequests;
}
pub trait ServiceState {
fn service(&self) -> Arc<LspsService>;
fn sender(&self) -> ClnSender;
}

View File

@@ -1,11 +0,0 @@
use serde::Deserialize;
pub fn extract_message_id(payload: &[u8]) -> Option<String> {
#[derive(Deserialize)]
struct IdOnly {
id: Option<String>,
}
let parsed: IdOnly = serde_json::from_slice(payload).ok()?;
parsed.id
}

View File

@@ -1,6 +1,6 @@
use anyhow::bail;
use cln_lsps::{
cln_adapters::service::{on_custommsg_service, ServiceStore},
cln_adapters::{hooks::service_custommsg_hook, sender::ClnSender, state::ServiceState},
core::{
lsps2::handler::{ClnApiRpc, HtlcAcceptedHookHandler, Lsps2ServiceHandler},
server::LspsService,
@@ -11,19 +11,38 @@ use cln_lsps::{
},
};
use cln_plugin::Plugin;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::Arc;
#[derive(Clone)]
struct State {
lsps_service: Arc<LspsService>,
sender: ClnSender,
lsps2_enabled: bool,
}
impl ServiceStore for State {
impl State {
pub fn new(rpc_path: PathBuf, promise_secret: &[u8; 32]) -> Self {
let api = Arc::new(ClnApiRpc::new(rpc_path.clone()));
let sender = ClnSender::new(rpc_path);
let lsps2_handler = Arc::new(Lsps2ServiceHandler::new(api, promise_secret));
let lsps_service = Arc::new(LspsService::builder().with_protocol(lsps2_handler).build());
Self {
lsps_service,
sender,
lsps2_enabled: true,
}
}
}
impl ServiceState for State {
fn service(&self) -> Arc<LspsService> {
self.lsps_service.clone()
}
fn sender(&self) -> cln_lsps::cln_adapters::sender::ClnSender {
self.sender.clone()
}
}
#[tokio::main]
@@ -43,7 +62,7 @@ async fn main() -> Result<(), anyhow::Error> {
// cln_plugin::FeatureBitsKind::Init,
// util::feature_bit_to_hex(LSP_FEATURE_BIT),
// )
.hook("custommsg", on_custommsg_service)
.hook("custommsg", service_custommsg_hook)
.hook("htlc_accepted", on_htlc_accepted)
.configure()
.await?
@@ -80,17 +99,7 @@ async fn main() -> Result<(), anyhow::Error> {
}
};
let cln_api_rpc = ClnApiRpc::new(rpc_path);
let lsps2_handler =
Arc::new(Lsps2ServiceHandler::new(Arc::new(cln_api_rpc), &secret));
let lsps_service_builder = LspsService::builder();
let lsps_service = lsps_service_builder.with_protocol(lsps2_handler).build();
let state = State {
lsps_service: Arc::new(lsps_service),
lsps2_enabled: true,
};
let state = State::new(rpc_path, &secret);
let plugin = plugin.start(state).await?;
plugin.join().await
} else {