From ffc05ddcf05c6be092c7e4f2397d47c17b756494 Mon Sep 17 00:00:00 2001 From: Peter Neuroth Date: Thu, 4 Dec 2025 17:19:38 +0100 Subject: [PATCH] plugins: lsps: add slim generic json-rpc router This router is very lightweight and can handle routing for the lsps service. It avoids allocations by just borrowing from the request. This is much cleaner than what we have now and reduces the amount of serializations by a lot. Also it supports typed handlers which will remove serialization overhead from the handlers in future commits Signed-off-by: Peter Neuroth --- plugins/lsps-plugin/Cargo.toml | 2 +- plugins/lsps-plugin/src/core/mod.rs | 1 + plugins/lsps-plugin/src/core/router.rs | 385 +++++++++++++++++++++++++ 3 files changed, 387 insertions(+), 1 deletion(-) create mode 100644 plugins/lsps-plugin/src/core/router.rs diff --git a/plugins/lsps-plugin/Cargo.toml b/plugins/lsps-plugin/Cargo.toml index 028aee20a..7fd074f1c 100644 --- a/plugins/lsps-plugin/Cargo.toml +++ b/plugins/lsps-plugin/Cargo.toml @@ -23,6 +23,6 @@ log = "0.4" paste = "1.0.15" rand = "0.9" serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde_json = { version = "1.0", features = ["raw_value"] } thiserror = "2.0" tokio = { version = "1.44", features = ["full"] } diff --git a/plugins/lsps-plugin/src/core/mod.rs b/plugins/lsps-plugin/src/core/mod.rs index 8fc4cfc2b..0b7aa6a55 100644 --- a/plugins/lsps-plugin/src/core/mod.rs +++ b/plugins/lsps-plugin/src/core/mod.rs @@ -1,3 +1,4 @@ pub mod client; +pub mod router; pub mod server; pub mod transport; diff --git a/plugins/lsps-plugin/src/core/router.rs b/plugins/lsps-plugin/src/core/router.rs new file mode 100644 index 000000000..63b84b680 --- /dev/null +++ b/plugins/lsps-plugin/src/core/router.rs @@ -0,0 +1,385 @@ +use crate::proto::jsonrpc::{RpcError, RpcErrorExt}; +use bitcoin::secp256k1::PublicKey; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde_json::value::RawValue; +use std::{collections::HashMap, future::Future, pin::Pin}; + +pub type BoxedHandler = Box< + dyn Fn( + &RequestContext, + &[u8], + ) -> Pin> + Send>> + + Send + + Sync, +>; + +/// Convenience macro to register a handler at the JsonRpcRouterBuilder. +#[macro_export] +macro_rules! register_handler { + ($builder:expr, $handler:expr, $method:literal, $fn:ident) => {{ + let h = $handler.clone(); + $crate::core::router::JsonRpcRouterBuilder::register($builder, $method, move |p| { + let h = h.clone(); + async move { h.$fn(p).await } + }); + }}; + + // With context (peer_id) + ($builder:expr, $handler:expr, $method:literal, $fn:ident, with_peer) => {{ + let h = $handler.clone(); + $crate::core::router::JsonRpcRouterBuilder::register_with_context( + $builder, + $method, + move |ctx, p| { + let h = h.clone(); + async move { h.$fn(ctx.peer_id, p).await } + }, + ); + }}; +} + +#[derive(Clone)] +pub struct RequestContext { + pub peer_id: PublicKey, +} + +/// Builder for a generic JSON-RPC 2.0 router +pub struct JsonRpcRouterBuilder { + handlers: HashMap<&'static str, BoxedHandler>, +} + +impl JsonRpcRouterBuilder { + pub fn new() -> Self { + Self { + handlers: HashMap::new(), + } + } + + pub fn register(&mut self, method: &'static str, handler: F) + where + P: DeserializeOwned + Send + 'static, + R: Serialize + Send + 'static, + F: Fn(P) -> Fut + Send + Sync + Clone + 'static, + Fut: Future> + Send + 'static, + { + let boxed: BoxedHandler = Box::new(move |_ctx, params: &[u8]| { + let handler = handler.clone(); + let params: Result = serde_json::from_slice(params); + Box::pin(async move { + let params = params.map_err(|e| RpcError::invalid_params(e))?; + let result = handler(params).await?; + serde_json::to_value(&result).map_err(|_| RpcError::internal_error("parsing error")) + }) + }); + self.handlers.insert(method, boxed); + } + + pub fn register_with_context(&mut self, method: &'static str, handler: F) + where + P: DeserializeOwned + Send + 'static, + R: Serialize + Send + 'static, + F: Fn(RequestContext, P) -> Fut + Send + Sync + Clone + 'static, + Fut: Future> + Send + 'static, + { + let boxed: BoxedHandler = Box::new(move |ctx: &RequestContext, params: &[u8]| { + let handler = handler.clone(); + let ctx = ctx.clone(); + let params: Result = serde_json::from_slice(params); + Box::pin(async move { + let params = params.map_err(|e| RpcError::invalid_params(e))?; + let result = handler(ctx, params).await?; + serde_json::to_value(&result).map_err(|_| RpcError::internal_error("parsing error")) + }) + }); + self.handlers.insert(method, boxed); + } + + pub fn build(self) -> JsonRpcRouter { + JsonRpcRouter { + handlers: self.handlers, + } + } +} + +/// Generic JSON-RPC 2.0 router +pub struct JsonRpcRouter { + handlers: HashMap<&'static str, BoxedHandler>, +} + +impl JsonRpcRouter { + pub async fn handle(&self, ctx: &RequestContext, request: &[u8]) -> Option> { + #[derive(Deserialize)] + struct BorrowedRequest<'a> { + jsonrpc: &'a str, + method: &'a str, + #[serde(borrow)] + id: Option<&'a str>, + #[serde(borrow)] + params: Option<&'a RawValue>, + } + + let req: BorrowedRequest<'_> = match serde_json::from_slice(request) { + Ok(req) => req, + Err(_) => { + return Some(error_response( + None, + RpcError::parse_error("failed to parse request"), + )) + } + }; + + if req.jsonrpc != "2.0" { + return Some(error_response(req.id, RpcError::invalid_request(""))); + } + + let handler = match self.handlers.get(req.method) { + Some(h) => h, + None => return Some(error_response(req.id, RpcError::method_not_found(""))), + }; + + // Notification -> no response + let id = match req.id { + Some(id) => id, + None => return None, + }; + + let param_bytes = match req.params { + Some(raw) => raw.get().as_bytes(), + None => b"{}", + }; + + Some(match handler(ctx, param_bytes).await { + Ok(r) => success_response(id, r), + Err(e) => error_response(Some(id), e), + }) + } + + pub fn methods(&self) -> Vec<&'static str> { + self.handlers.keys().copied().collect() + } +} + +fn success_response(id: &str, result: serde_json::Value) -> Vec { + serde_json::to_vec(&serde_json::json!({ + "jsonrpc": "2.0", + "id": id, + "result": result + })) + .unwrap() +} + +fn error_response(id: Option<&str>, error: RpcError) -> Vec { + serde_json::to_vec(&serde_json::json!({ + "jsonrpc": "2.0", + "id": id, + "error": { + "code": error.code, + "message": error.message, + "data": error.data + } + })) + .unwrap() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::proto::jsonrpc::{INVALID_PARAMS, INVALID_REQUEST, METHOD_NOT_FOUND, PARSE_ERROR}; + use serde::{Deserialize, Serialize}; + use serde_json::{self, json}; + + // Simple types for testing + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] + struct AddParams { + a: i32, + b: i32, + } + + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] + struct AddResult { + sum: i32, + } + + fn test_peer_id() -> PublicKey { + "0279BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798" + .parse() + .unwrap() + } + + fn test_context() -> RequestContext { + RequestContext { + peer_id: test_peer_id(), + } + } + + #[tokio::test] + async fn dispatches_to_registered_handler_and_returns_success() { + let mut builder = JsonRpcRouterBuilder::new(); + builder.register("add", |p: AddParams| async move { + Ok(AddResult { sum: p.a + p.b }) + }); + + let router = builder.build(); + + let req = json!({ + "jsonrpc": "2.0", + "method": "add", + "id": "1", + "params": { "a": 1, "b": 2 }, + }); + + let req_bytes = serde_json::to_vec(&req).unwrap(); + + let resp_bytes = router + .handle(&test_context(), &req_bytes) + .await + .expect("should not be a notification"); + + let resp: serde_json::Value = serde_json::from_slice(&resp_bytes).unwrap(); + + assert_eq!(resp["jsonrpc"], "2.0"); + assert_eq!(resp["id"], "1"); + assert_eq!(resp["result"]["sum"], 3); + assert!(resp.get("error").is_none()); + } + + #[tokio::test] + async fn returns_none_for_notification() { + let mut builder = JsonRpcRouterBuilder::new(); + builder.register("add", |p: AddParams| async move { + Ok(AddResult { sum: p.a + p.b }) + }); + + let router = builder.build(); + + // No `id` → notification + let req = json!({ + "jsonrpc": "2.0", + "method": "add", + "params": { "a": 10, "b": 20 }, + }); + + let req_bytes = serde_json::to_vec(&req).unwrap(); + let resp = router.handle(&test_context(), &req_bytes).await; + + assert!(resp.is_none(), "notifications must not produce a response"); + } + + #[tokio::test] + async fn unknown_method_returns_method_not_found() { + let builder = JsonRpcRouterBuilder::new(); + let router = builder.build(); + + let req = json!({ + "jsonrpc": "2.0", + "method": "does.not.exist", + "id": "42", + "params": {}, + }); + + let req_bytes = serde_json::to_vec(&req).unwrap(); + let resp_bytes = router + .handle(&test_context(), &req_bytes) + .await + .expect("not a notification"); + + let resp: serde_json::Value = serde_json::from_slice(&resp_bytes).unwrap(); + + assert_eq!(resp["jsonrpc"], "2.0"); + assert_eq!(resp["id"], "42"); + assert_eq!(resp["error"]["code"], METHOD_NOT_FOUND); + assert!(resp.get("result").is_none()); + } + + #[tokio::test] + async fn invalid_json_returns_parse_error_with_null_id() { + let builder = JsonRpcRouterBuilder::new(); + let router = builder.build(); + + // Not valid JSON at all + let resp_bytes = router + .handle(&test_context(), b"this is not json") + .await + .expect("parse error still produces a response"); + + let resp: serde_json::Value = serde_json::from_slice(&resp_bytes).unwrap(); + + assert_eq!(resp["jsonrpc"], "2.0"); + assert_eq!(resp["id"], serde_json::Value::Null); + assert_eq!(resp["error"]["code"], PARSE_ERROR); + } + + #[tokio::test] + async fn wrong_jsonrpc_version_returns_invalid_request() { + let builder = JsonRpcRouterBuilder::new(); + let router = builder.build(); + + let req = json!({ + "jsonrpc": "1.0", // wrong + "method": "add", + "id": "1", + "params": {}, + }); + + let req_bytes = serde_json::to_vec(&req).unwrap(); + let resp_bytes = router + .handle(&test_context(), &req_bytes) + .await + .expect("not a notification"); + + let resp: serde_json::Value = serde_json::from_slice(&resp_bytes).unwrap(); + + assert_eq!(resp["error"]["code"], INVALID_REQUEST); + assert_eq!(resp["id"], "1"); + } + + #[tokio::test] + async fn bad_params_return_invalid_params_error() { + let mut builder = JsonRpcRouterBuilder::new(); + builder.register("add", |p: AddParams| async move { + Ok(AddResult { sum: p.a + p.b }) + }); + + let router = builder.build(); + + // `params` is a string, but handler expects AddParams → serde should fail → invalid_params + let req = json!({ + "jsonrpc": "2.0", + "method": "add", + "id": "1", + "params": "not an object", + }); + + let req_bytes = serde_json::to_vec(&req).unwrap(); + let resp_bytes = router + .handle(&test_context(), &req_bytes) + .await + .expect("not a notification"); + + let resp: serde_json::Value = serde_json::from_slice(&resp_bytes).unwrap(); + + assert_eq!(resp["error"]["code"], INVALID_PARAMS); + assert_eq!(resp["id"], "1"); + assert!(resp.get("result").is_none()); + } + + #[test] + fn methods_returns_registered_method_names() { + let mut builder = JsonRpcRouterBuilder::new(); + + builder.register("add", |p: AddParams| async move { + Ok(AddResult { sum: p.a + p.b }) + }); + + builder.register("sub", |p: AddParams| async move { + Ok(AddResult { sum: p.a - p.b }) + }); + + let router = builder.build(); + + let mut methods = router.methods(); + methods.sort(); + + assert_eq!(methods, vec!["add", "sub"]); + } +}