From 24d86a85c3af99ee7408c3cedfb1b346ffb49dbb Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 30 Jan 2023 16:54:17 +1030 Subject: [PATCH] plugins/sql: initial commit of new plugin. This is designed to allow you to perform complex server-side queries. Signed-off-by: Rusty Russell --- plugins/Makefile | 7 + plugins/sql.c | 556 +++++++++++++++++++++++++++++++++++++++++++ tests/test_plugin.py | 19 ++ 3 files changed, 582 insertions(+) create mode 100644 plugins/sql.c diff --git a/plugins/Makefile b/plugins/Makefile index ee35f1925..ffbdd0ded 100644 --- a/plugins/Makefile +++ b/plugins/Makefile @@ -39,6 +39,10 @@ PLUGIN_FETCHINVOICE_SRC := plugins/fetchinvoice.c PLUGIN_FETCHINVOICE_OBJS := $(PLUGIN_FETCHINVOICE_SRC:.c=.o) PLUGIN_FETCHINVOICE_HEADER := +PLUGIN_SQL_SRC := plugins/sql.c +PLUGIN_SQL_HEADER := +PLUGIN_SQL_OBJS := $(PLUGIN_SQL_SRC:.c=.o) + PLUGIN_SPENDER_SRC := \ plugins/spender/fundchannel.c \ plugins/spender/main.c \ @@ -97,6 +101,7 @@ C_PLUGINS := \ plugins/offers \ plugins/pay \ plugins/txprepare \ + plugins/sql \ plugins/spenderp PLUGINS := $(C_PLUGINS) @@ -199,6 +204,8 @@ plugins/fetchinvoice: $(PLUGIN_FETCHINVOICE_OBJS) $(PLUGIN_LIB_OBJS) $(PLUGIN_CO plugins/funder: bitcoin/psbt.o common/psbt_open.o $(PLUGIN_FUNDER_OBJS) $(PLUGIN_LIB_OBJS) $(PLUGIN_COMMON_OBJS) $(JSMN_OBJS) +plugins/sql: $(PLUGIN_SQL_OBJS) $(PLUGIN_LIB_OBJS) $(PLUGIN_COMMON_OBJS) $(JSMN_OBJS) + # Generated from PLUGINS definition in plugins/Makefile ALL_C_HEADERS += plugins/list_of_builtin_plugins_gen.h plugins/list_of_builtin_plugins_gen.h: plugins/Makefile Makefile config.vars diff --git a/plugins/sql.c b/plugins/sql.c new file mode 100644 index 000000000..155bac36e --- /dev/null +++ b/plugins/sql.c @@ -0,0 +1,556 @@ +/* Brilliant or insane? You decide! */ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include +#include + +/* TODO: + * 1. Generate from schemas. + * 2. Refresh time in API. + * 3. Colnames API to return dict. + * 4. sql-schemas command. + * 5. documentation. + * 6. test on mainnet. + * 7. Some cool query for documentation. + * 8. time_msec fields. + * 9. Primary key in schema? + * 10. Pagination API + */ +enum fieldtype { + /* Hex variants */ + FIELD_HEX, + FIELD_HASH, + FIELD_SECRET, + FIELD_PUBKEY, + FIELD_TXID, + /* Integer variants */ + FIELD_MSAT, + FIELD_INTEGER, + FIELD_U64, + FIELD_U32, + FIELD_U16, + FIELD_U8, + FIELD_BOOL, + /* Randoms */ + FIELD_NUMBER, + FIELD_STRING, + FIELD_SCID, +}; + +struct fieldtypemap { + const char *name; + const char *sqltype; +}; + +static const struct fieldtypemap fieldtypemap[] = { + { "hex", "BLOB" }, /* FIELD_HEX */ + { "hash", "BLOB" }, /* FIELD_HASH */ + { "secret", "BLOB" }, /* FIELD_SECRET */ + { "pubkey", "BLOB" }, /* FIELD_PUBKEY */ + { "txid", "BLOB" }, /* FIELD_TXID */ + { "msat", "INTEGER" }, /* FIELD_MSAT */ + { "integer", "INTEGER" }, /* FIELD_INTEGER */ + { "u64", "INTEGER" }, /* FIELD_U64 */ + { "u32", "INTEGER" }, /* FIELD_U32 */ + { "u16", "INTEGER" }, /* FIELD_U16 */ + { "u8", "INTEGER" }, /* FIELD_U8 */ + { "boolean", "INTEGER" }, /* FIELD_BOOL */ + { "number", "REAL" }, /* FIELD_NUMBER */ + { "string", "TEXT" }, /* FIELD_STRING */ + { "short_channel_id", "TEXT" }, /* FIELD_SCID */ +}; + +struct db_query { + sqlite3_stmt *stmt; + struct table_desc **tables; + const char *authfail; +}; + +struct table_desc { + /* e.g. peers for listpeers */ + const char *name; + const char **columns; + char *update_stmt; + enum fieldtype *fieldtypes; +}; +static STRMAP(struct table_desc *) tablemap; +static size_t max_dbmem = 500000000; +static struct sqlite3 *db; +static const char *dbfilename; + +static struct sqlite3 *sqlite_setup(struct plugin *plugin) +{ + int err; + struct sqlite3 *db; + char *errmsg; + + if (dbfilename) { + err = sqlite3_open_v2(dbfilename, &db, + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, + NULL); + } else { + err = sqlite3_open_v2("", &db, + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE + | SQLITE_OPEN_MEMORY, + NULL); + } + if (err != SQLITE_OK) + plugin_err(plugin, "Could not create db: errcode %u", err); + + sqlite3_extended_result_codes(db, 1); + + /* From https://www.sqlite.org/c3ref/set_authorizer.html: + * + * Applications that need to process SQL from untrusted + * sources might also consider lowering resource limits using + * sqlite3_limit() and limiting database size using the + * max_page_count PRAGMA in addition to using an authorizer. + */ + sqlite3_limit(db, SQLITE_LIMIT_LENGTH, 1000000); + sqlite3_limit(db, SQLITE_LIMIT_SQL_LENGTH, 10000); + sqlite3_limit(db, SQLITE_LIMIT_COLUMN, 100); + sqlite3_limit(db, SQLITE_LIMIT_EXPR_DEPTH, 100); + sqlite3_limit(db, SQLITE_LIMIT_COMPOUND_SELECT, 10); + sqlite3_limit(db, SQLITE_LIMIT_VDBE_OP, 1000); + sqlite3_limit(db, SQLITE_LIMIT_ATTACHED, 1); + sqlite3_limit(db, SQLITE_LIMIT_LIKE_PATTERN_LENGTH, 500); + sqlite3_limit(db, SQLITE_LIMIT_VARIABLE_NUMBER, 100); + sqlite3_limit(db, SQLITE_LIMIT_TRIGGER_DEPTH, 1); + sqlite3_limit(db, SQLITE_LIMIT_WORKER_THREADS, 1); + + /* Default is now 4k pages, so allow 500MB */ + err = sqlite3_exec(db, tal_fmt(tmpctx, "PRAGMA max_page_count = %zu;", + max_dbmem / 4096), + NULL, NULL, &errmsg); + if (err != SQLITE_OK) + plugin_err(plugin, "Could not set max_page_count: %s", errmsg); + + return db; +} + +static bool has_table_desc(struct table_desc **tables, struct table_desc *t) +{ + for (size_t i = 0; i < tal_count(tables); i++) { + if (tables[i] == t) + return true; + } + return false; +} + +static int sqlite_authorize(void *dbq_, int code, + const char *a, + const char *b, + const char *c, + const char *d) +{ + struct db_query *dbq = dbq_; + + /* You can do select statements */ + if (code == SQLITE_SELECT) + return SQLITE_OK; + + /* You can do a column read: takes a table name */ + if (code == SQLITE_READ) { + struct table_desc *td = strmap_get(&tablemap, a); + if (!td) { + dbq->authfail = tal_fmt(dbq, "Unknown table %s", a); + return SQLITE_DENY; + } + if (!has_table_desc(dbq->tables, td)) + tal_arr_expand(&dbq->tables, td); + return SQLITE_OK; + } + + /* See https://www.sqlite.org/c3ref/c_alter_table.html to decode these! */ + dbq->authfail = tal_fmt(dbq, "Unauthorized: %u arg1=%s arg2=%s dbname=%s caller=%s", + code, + a ? a : "(none)", + b ? b : "(none)", + c ? c : "(none)", + d ? d : "(none)"); + return SQLITE_DENY; +} + +static struct command_result *refresh_complete(struct command *cmd, + struct db_query *dbq) +{ + char *errmsg; + int err, num_cols; + size_t num_rows; + struct json_stream *ret; + + num_cols = sqlite3_column_count(dbq->stmt); + + /* We normally hit an error immediately, so return a simple error then */ + ret = NULL; + num_rows = 0; + errmsg = NULL; + + while ((err = sqlite3_step(dbq->stmt)) == SQLITE_ROW) { + if (!ret) { + ret = jsonrpc_stream_success(cmd); + json_array_start(ret, "rows"); + } + json_array_start(ret, NULL); + for (size_t i = 0; i < num_cols; i++) { + /* The returned value is one of + * SQLITE_INTEGER, SQLITE_FLOAT, SQLITE_TEXT, + * SQLITE_BLOB, or SQLITE_NULL */ + switch (sqlite3_column_type(dbq->stmt, i)) { + case SQLITE_INTEGER: { + s64 v = sqlite3_column_int64(dbq->stmt, i); + json_add_s64(ret, NULL, v); + break; + } + case SQLITE_FLOAT: { + double v = sqlite3_column_double(dbq->stmt, i); + json_add_primitive_fmt(ret, NULL, "%f", v); + break; + } + case SQLITE_TEXT: { + const char *c = (char *)sqlite3_column_text(dbq->stmt, i); + if (!utf8_check(c, strlen(c))) { + json_add_str_fmt(ret, NULL, + "INVALID UTF-8 STRING %s", + tal_hexstr(tmpctx, c, strlen(c))); + errmsg = tal_fmt(cmd, "Invalid UTF-8 string row %zu column %zu", + num_rows, i); + } else + json_add_string(ret, NULL, c); + break; + } + case SQLITE_BLOB: + json_add_hex(ret, NULL, + sqlite3_column_blob(dbq->stmt, i), + sqlite3_column_bytes(dbq->stmt, i)); + break; + case SQLITE_NULL: + json_add_primitive(ret, NULL, "null"); + break; + default: + errmsg = tal_fmt(cmd, "Unknown column type %i in row %zu column %zu", + sqlite3_column_type(dbq->stmt, i), + num_rows, i); + } + } + json_array_end(ret); + num_rows++; + } + if (err != SQLITE_DONE) + errmsg = tal_fmt(cmd, "Executing statement: %s", + sqlite3_errmsg(db)); + + sqlite3_finalize(dbq->stmt); + + + /* OK, did we hit some error during? Simple if we didn't + * already start answering! */ + if (errmsg) { + if (!ret) + return command_fail(cmd, LIGHTNINGD, "%s", errmsg); + + /* Otherwise, add it as a warning */ + json_array_end(ret); + json_add_string(ret, "warning_db_failure", errmsg); + } else { + /* Empty result is possible, OK. */ + if (!ret) { + ret = jsonrpc_stream_success(cmd); + json_array_start(ret, "rows"); + } + json_array_end(ret); + } + return command_finished(cmd, ret); +} + +/* Recursion */ +static struct command_result *refresh_tables(struct command *cmd, + struct db_query *dbq); + +static struct command_result *list_done(struct command *cmd, + const char *buf, + const jsmntok_t *result, + struct db_query *dbq) +{ + const struct table_desc *td = dbq->tables[0]; + size_t i; + const jsmntok_t *t, *arr = json_get_member(buf, result, td->name); + int err; + sqlite3_stmt *stmt; + char *errmsg; + + /* FIXME: this is where a wait / pagination API is useful! */ + err = sqlite3_exec(db, tal_fmt(tmpctx, "DELETE FROM %s;", td->name), + NULL, NULL, &errmsg); + if (err != SQLITE_OK) { + return command_fail(cmd, LIGHTNINGD, "cleaning '%s' failed: %s", + td->name, errmsg); + } + + err = sqlite3_prepare_v2(db, td->update_stmt, -1, &stmt, NULL); + if (err != SQLITE_OK) { + return command_fail(cmd, LIGHTNINGD, "preparing '%s' failed: %s", + td->update_stmt, + sqlite3_errmsg(db)); + } + + json_for_each_arr(i, t, arr) { + size_t c; + /* FIXME: This is O(n^2): hash td->columns and look up + * the other way. */ + for (c = 0; c < tal_count(td->columns); c++) { + const jsmntok_t *col = json_get_member(buf, t, td->columns[c]); + if (!col) + sqlite3_bind_null(stmt, c + 1); + else { + u64 val64; + struct amount_msat valmsat; + u8 *valhex; + double valdouble; + bool valbool; + + switch (td->fieldtypes[c]) { + case FIELD_U8: + case FIELD_U16: + case FIELD_U32: + case FIELD_U64: + case FIELD_INTEGER: + if (!json_to_u64(buf, col, &val64)) { + return command_fail(cmd, LIGHTNINGD, + "column %zu row %zu not a u64: %.*s", + c, i, + json_tok_full_len(col), + json_tok_full(buf, col)); + } + sqlite3_bind_int64(stmt, c + 1, val64); + break; + case FIELD_BOOL: + if (!json_to_bool(buf, col, &valbool)) { + return command_fail(cmd, LIGHTNINGD, + "column %zu row %zu not a boolean: %.*s", + c, i, + json_tok_full_len(col), + json_tok_full(buf, col)); + } + sqlite3_bind_int(stmt, c + 1, valbool); + break; + case FIELD_NUMBER: + if (!json_to_double(buf, col, &valdouble)) { + return command_fail(cmd, LIGHTNINGD, + "column %zu row %zu not a double: %.*s", + c, i, + json_tok_full_len(col), + json_tok_full(buf, col)); + } + sqlite3_bind_double(stmt, c + 1, valdouble); + break; + case FIELD_MSAT: + if (!json_to_msat(buf, col, &valmsat)) { + return command_fail(cmd, LIGHTNINGD, + "column %zu row %zu not an msat: %.*s", + c, i, + json_tok_full_len(col), + json_tok_full(buf, col)); + } + sqlite3_bind_int64(stmt, c + 1, valmsat.millisatoshis /* Raw: db */); + break; + case FIELD_SCID: + case FIELD_STRING: + sqlite3_bind_text(stmt, c + 1, buf + col->start, + col->end - col->start, SQLITE_STATIC); + break; + case FIELD_HEX: + case FIELD_HASH: + case FIELD_SECRET: + case FIELD_PUBKEY: + case FIELD_TXID: + valhex = json_tok_bin_from_hex(tmpctx, buf, col); + if (!valhex) { + return command_fail(cmd, LIGHTNINGD, + "column %zu row %zu not valid hex: %.*s", + c, i, + json_tok_full_len(col), + json_tok_full(buf, col)); + } + sqlite3_bind_blob(stmt, c + 1, valhex, tal_count(valhex), + SQLITE_STATIC); + break; + } + } + } + err = sqlite3_step(stmt); + + if (err != SQLITE_DONE) { + const char *emsg = sqlite3_errmsg(db); + sqlite3_finalize(stmt); + return command_fail(cmd, LIGHTNINGD, + "Error executing %s on column %zu row %zu: %s", + td->update_stmt, + c, i, emsg); + } + sqlite3_reset(stmt); + } + sqlite3_finalize(stmt); + + /* Remove that, iterate */ + tal_arr_remove(&dbq->tables, 0); + return refresh_tables(cmd, dbq); +} + +static struct command_result *refresh_tables(struct command *cmd, + struct db_query *dbq) +{ + struct out_req *req; + const struct table_desc *td; + + if (tal_count(dbq->tables) == 0) + return refresh_complete(cmd, dbq); + + td = dbq->tables[0]; + req = jsonrpc_request_start(cmd->plugin, cmd, + tal_fmt(tmpctx, "list%s", td->name), + list_done, forward_error, + dbq); + return send_outreq(cmd->plugin, req); +} + +static struct command_result *json_sql(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + struct db_query *dbq = tal(cmd, struct db_query); + const char *query; + int err; + + if (!param(cmd, buffer, params, + p_req("query", param_string, &query), + NULL)) + return command_param_failed(); + + dbq->tables = tal_arr(dbq, struct table_desc *, 0); + dbq->authfail = NULL; + + /* This both checks we're not altering, *and* tells us what + * tables to refresh. */ + err = sqlite3_set_authorizer(db, sqlite_authorize, dbq); + if (err != SQLITE_OK) { + plugin_err(cmd->plugin, "Could not set authorizer: %s", + sqlite3_errmsg(db)); + } + + err = sqlite3_prepare_v2(db, query, -1, &dbq->stmt, NULL); + sqlite3_set_authorizer(db, NULL, NULL); + + if (err != SQLITE_OK) { + char *errmsg = tal_fmt(tmpctx, "query failed with %s", sqlite3_errmsg(db)); + if (dbq->authfail) + tal_append_fmt(&errmsg, " (%s)", dbq->authfail); + return command_fail(cmd, LIGHTNINGD, "%s", errmsg); + } + + return refresh_tables(cmd, dbq); +} + +static void init_tablemap(struct plugin *plugin) +{ + struct table_desc *td; + char *create_stmt; + int err; + char *errmsg; + + strmap_init(&tablemap); + + /* FIXME: Load from schemas! */ + td = tal(NULL, struct table_desc); + td->name = "forwards"; + td->columns = tal_arr(td, const char *, 11); + td->fieldtypes = tal_arr(td, enum fieldtype, 11); + td->columns[0] = "in_htlc_id"; + td->fieldtypes[0] = FIELD_U64; + td->columns[1] = "in_channel"; + td->fieldtypes[1] = FIELD_SCID; + td->columns[2] = "in_msat"; + td->fieldtypes[2] = FIELD_MSAT; + td->columns[3] = "status"; + td->fieldtypes[3] = FIELD_STRING; + td->columns[4] = "received_time"; + td->fieldtypes[4] = FIELD_NUMBER; + td->columns[5] = "out_channel"; + td->fieldtypes[5] = FIELD_SCID; + td->columns[6] = "out_htlc_id"; + td->fieldtypes[6] = FIELD_U64; + td->columns[7] = "style"; + td->fieldtypes[7] = FIELD_STRING; + td->columns[8] = "fee_msat"; + td->fieldtypes[8] = FIELD_MSAT; + td->columns[9] = "out_msat"; + td->fieldtypes[9] = FIELD_MSAT; + td->columns[10] = "resolved_time"; + td->fieldtypes[10] = FIELD_NUMBER; + + /* FIXME: Primary key from schema? */ + create_stmt = tal_fmt(tmpctx, "CREATE TABLE %s (", td->name); + td->update_stmt = tal_fmt(td, "INSERT INTO %s VALUES (", td->name); + for (size_t i = 0; i < tal_count(td->columns); i++) { + tal_append_fmt(&td->update_stmt, "%s?", + i == 0 ? "" : ","); + tal_append_fmt(&create_stmt, "%s%s %s", + i == 0 ? "" : ",", + td->columns[i], + fieldtypemap[td->fieldtypes[i]].sqltype); + } + tal_append_fmt(&create_stmt, ");"); + tal_append_fmt(&td->update_stmt, ");"); + + err = sqlite3_exec(db, create_stmt, NULL, NULL, &errmsg); + if (err != SQLITE_OK) + plugin_err(plugin, "Could not create %s: %s", td->name, errmsg); + + strmap_add(&tablemap, td->name, td); +} + +#if DEVELOPER +static void memleak_mark_tablemap(struct plugin *p, struct htable *memtable) +{ + memleak_ptr(memtable, dbfilename); + memleak_scan_strmap(memtable, &tablemap); +} +#endif + +static const char *init(struct plugin *plugin, + const char *buf UNUSED, const jsmntok_t *config UNUSED) +{ + db = sqlite_setup(plugin); + init_tablemap(plugin); + +#if DEVELOPER + plugin_set_memleak_handler(plugin, memleak_mark_tablemap); +#endif + return NULL; +} + +static const struct plugin_command commands[] = { { + "sql", + "misc", + "Run {query} and return result", + "This is the greatest plugin command ever!", + json_sql, + }, +}; + +int main(int argc, char *argv[]) +{ + setup_locale(); + plugin_main(argv, init, PLUGIN_RESTARTABLE, true, NULL, commands, ARRAY_SIZE(commands), + NULL, 0, NULL, 0, NULL, 0, + plugin_option("sqlfilename", + "string", + "Use on-disk sqlite3 file instead of in memory (e.g. debugging)", + charp_option, &dbfilename), + NULL); +} diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 04d1cfbe0..2e20dcd8d 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -3275,3 +3275,22 @@ def test_block_added_notifications(node_factory, bitcoind): sync_blockheight(bitcoind, [l2]) ret = l2.rpc.call("blockscatched") assert len(ret) == 3 and ret[1] == next_l2_base + 1 and ret[2] == next_l2_base + 2 + + +def test_sql(node_factory, bitcoind): + l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True) + + ret = l2.rpc.sql("SELECT * FROM forwards;") + assert ret == {'rows': []} + + # This should create a forward through l2 + l1.rpc.pay(l3.rpc.invoice(amount_msat=12300, label='inv1', description='description')['bolt11']) + + ret = l2.rpc.sql("SELECT in_htlc_id,out_msat,status,out_htlc_id FROM forwards;") + assert only_one(ret['rows'])[0] == 0 + assert only_one(ret['rows'])[1] == 12300 + assert only_one(ret['rows'])[2] == 'settled' + assert only_one(ret['rows'])[3] == 0 + + with pytest.raises(RpcError, match='Unauthorized'): + l2.rpc.sql("DELETE FROM forwards;")