commit 4beec4e: [Project] Add some major stuff to implement client side of the fuzzy ping
Vsevolod Stakhov
vsevolod at rspamd.com
Fri Nov 10 13:49:09 UTC 2023
Author: Vsevolod Stakhov
Date: 2023-11-09 17:57:21 +0000
URL: https://github.com/rspamd/rspamd/commit/4beec4e027e10135ada07015a0f991d97ca83fee
[Project] Add some major stuff to implement client side of the fuzzy ping
---
lualib/rspamadm/fuzzy_ping.lua | 67 +++++++-
src/plugins/fuzzy_check.c | 357 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 417 insertions(+), 7 deletions(-)
diff --git a/lualib/rspamadm/fuzzy_ping.lua b/lualib/rspamadm/fuzzy_ping.lua
index e13220865..eed509282 100644
--- a/lualib/rspamadm/fuzzy_ping.lua
+++ b/lualib/rspamadm/fuzzy_ping.lua
@@ -33,16 +33,14 @@ parser:option "-r --rule"
:description "Storage to ping (must be configured in Rspamd configuration)"
:argname("<name>")
:default("rspamd.com")
-parser:option "-f --flood"
- :description "Flood mode (send requests as fast as possible)"
- :argname("<count>")
- :convert(tonumber)
- :default(10)
parser:option "-t --timeout"
:description "Timeout for requests"
:argname("<timeout>")
:convert(tonumber)
:default(5)
+parser:option "-s --server"
+ :description "Override server to ping"
+ :argname("<name>")
parser:option "-n --number"
:description "Timeout for requests"
:argname("<number>")
@@ -90,18 +88,73 @@ local function print_storages(rules)
end
end
+-- Print one line per collected ping result: the measured latency for a
+-- successful reply, the error text otherwise
+local function print_results(results)
+  for _, entry in ipairs(results) do
+    local fmt, detail = 'Server %s: %s', entry.error
+    if entry.success then fmt, detail = 'Server %s: %s ms', entry.latency end
+    print(highlight(fmt, entry.server, detail))
+  end
+end
+
local function handler(args)
local opts = parser:parse(args)
load_config(opts)
if opts.list then
- local storages = rspamd_plugins.fuzzy_check.list_storages(rspamd_config)
- print_storages(storages)
+ print_storages(rspamd_plugins.fuzzy_check.list_storages(rspamd_config))
os.exit(0)
end
+ -- Perform ping using a fake task from async stuff provided by rspamadm
+ local rspamd_task = require "rspamd_task"
+
+ local task = rspamd_task.create(rspamd_config, rspamadm_ev_base)
+ task:set_session(rspamadm_session)
+ task:set_resolver(rspamadm_dns_resolver)
+
+ local replied = 0
+ local results = {}
+
+  -- Builds the completion callback for ping number `num`: it stores the outcome
+  -- in results[num] and prints all results when the last expected reply arrives
+  local function gen_ping_fuzzy_cb(num)
+    return function(success, server, latency_or_err)
+      local outcome = { server = server }
+      if success then
+        outcome.success = true
+        outcome.latency = latency_or_err
+      else
+        outcome.success = false
+        outcome.error = latency_or_err
+      end
+      results[num] = outcome
+
+      -- `replied` counts the callbacks that completed before this one
+      if replied ~= opts.number - 1 then
+        replied = replied + 1
+        return
+      end
+
+      print_results(results)
+    end
+  end
+
+  -- Start ping number `num`; exit with status 1 if the plugin cannot start it
+  local function ping_fuzzy(num)
+    local on_reply = gen_ping_fuzzy_cb(num)
+    local started, reason = rspamd_plugins.fuzzy_check.ping_storage(task, on_reply,
+        opts.rule, opts.timeout, opts.server)
+    if started then return end
+    rspamd_logger.errx('cannot ping fuzzy storage: %s', reason)
+    os.exit(1)
+  end
+ for i = 1, opts.number do
+ ping_fuzzy(i)
+ end
end
return {
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 96c283c5d..2576586c4 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -196,6 +196,7 @@ static gint fuzzy_lua_unlearn_handler(lua_State *L);
static gint fuzzy_lua_gen_hashes_handler(lua_State *L);
static gint fuzzy_lua_hex_hashes_handler(lua_State *L);
static gint fuzzy_lua_list_storages(lua_State *L);
+static gint fuzzy_lua_ping_storage(lua_State *L);
module_t fuzzy_check_module = {
"fuzzy_check",
@@ -1227,6 +1228,9 @@ gint fuzzy_check_module_config(struct rspamd_config *cfg, bool validate)
lua_pushstring(L, "list_storages");
lua_pushcfunction(L, fuzzy_lua_list_storages);
lua_settable(L, -3);
+ lua_pushstring(L, "ping_storage");
+ lua_pushcfunction(L, fuzzy_lua_ping_storage);
+ lua_settable(L, -3);
/* Finish fuzzy_check key */
lua_settable(L, -3);
}
@@ -1369,6 +1373,59 @@ fuzzy_cmd_stat(struct fuzzy_rule *rule,
return io;
}
+/* Position of rspamd_get_calendar_ticks() within its current 86400 s period, in ms
+ * (presumably ms since UTC midnight if ticks are Unix-epoch seconds - TODO confirm) */
+static inline double
+fuzzy_milliseconds_since_midnight(void)
+{
+	const double ticks = rspamd_get_calendar_ticks();
+	const int64_t whole_secs = (int64_t) ticks;
+	return ((whole_secs % 86400) + (ticks - whole_secs)) * 1000;
+}
+
+/* Build a FUZZY_PING request for `rule` with all memory taken from `pool`; the
+ * request is encrypted when the rule has a peer key. Returns the io descriptor. */
+static struct fuzzy_cmd_io *
+fuzzy_cmd_ping(struct fuzzy_rule *rule,
+ rspamd_mempool_t *pool)
+{
+ struct rspamd_fuzzy_cmd *cmd;
+ struct rspamd_fuzzy_encrypted_cmd *enccmd = NULL;
+ struct fuzzy_cmd_io *io;
+
+ if (rule->peer_key) {
+ enccmd = rspamd_mempool_alloc0(pool, sizeof(*enccmd));
+ cmd = &enccmd->cmd;
+ }
+ else {
+ cmd = rspamd_mempool_alloc0(pool, sizeof(*cmd));
+ }
+
+ /* The send time goes into `value`; fuzzy_lua_try_read subtracts the reply's value from "now" */
+ cmd->cmd = FUZZY_PING;
+ cmd->version = RSPAMD_FUZZY_PLUGIN_VERSION;
+ cmd->shingles_count = 0;
+ cmd->value = fuzzy_milliseconds_since_midnight(); /* Record timestamp */
+ cmd->tag = ottery_rand_uint32();
+
+ io = rspamd_mempool_alloc(pool, sizeof(*io));
+ io->flags = 0;
+ io->tag = cmd->tag;
+ memcpy(&io->cmd, cmd, sizeof(io->cmd));
+ /* The copy above is taken before fuzzy_encrypt_cmd() is applied to `cmd` */
+ if (rule->peer_key && enccmd) {
+ fuzzy_encrypt_cmd(rule, &enccmd->hdr, (guchar *) cmd, sizeof(*cmd));
+ io->io.iov_base = enccmd;
+ io->io.iov_len = sizeof(*enccmd);
+ }
+ else {
+ io->io.iov_base = cmd;
+ io->io.iov_len = sizeof(*cmd);
+ }
+
+ return io;
+}
+
static struct fuzzy_cmd_io *
fuzzy_cmd_hash(struct fuzzy_rule *rule,
int c,
@@ -3040,6 +3097,16 @@ fuzzy_generate_commands(struct rspamd_task *task, struct fuzzy_rule *rule,
goto end;
}
+ else if (c == FUZZY_PING) {
+ res = g_ptr_array_sized_new(1);
+
+ io = fuzzy_cmd_ping(rule, task->task_pool);
+ if (io) {
+ g_ptr_array_add(res, io);
+ }
+
+ goto end;
+ }
if (task->message == NULL) {
goto end;
@@ -4246,6 +4313,9 @@ fuzzy_attach_controller(struct module_ctx *ctx, GHashTable *commands)
return 0;
}
+/* Lua handlers */
+/* TODO: move to a separate unit, as this file is now a bit too hard to read */
+
static void
lua_upstream_str_inserter(struct upstream *up, guint idx, void *ud)
{
@@ -4301,5 +4371,292 @@ fuzzy_lua_list_storages(lua_State *L)
lua_setfield(L, -2, rule->name);
}
+ return 1;
+}
+
+struct fuzzy_lua_session { /* state of one request started by fuzzy_lua_ping_storage */
+ struct rspamd_task *task; /* task providing the event loop, async session and pool */
+ lua_State *L; /* Lua state in which the callback is invoked */
+ rspamd_inet_addr_t *addr; /* server address; also passed to the Lua callback */
+ GPtrArray *commands; /* fuzzy_cmd_io requests; freed in fuzzy_lua_session_fin */
+ struct fuzzy_rule *rule; /* rule handed to fuzzy_process_reply when reading */
+ struct rspamd_io_ev ev; /* I/O watcher driving fuzzy_lua_io_callback */
+ gint cbref; /* registry reference to the Lua callback */
+ gint fd; /* SOCK_DGRAM socket connected to addr */
+};
+
+/* Async-session finaliser: releases everything owned by a fuzzy_lua_session (passed as `ud`) */
+static void
+fuzzy_lua_session_fin(void *ud)
+{
+	struct fuzzy_lua_session *session = ud;
+	if (session->commands) {
+		g_ptr_array_free(session->commands, TRUE);
+	}
+	rspamd_ev_watcher_stop(session->task->event_loop, &session->ev);
+	close(session->fd); /* the socket was opened for this session only: do not leak it */
+	luaL_unref(session->L, LUA_REGISTRYINDEX, session->cbref);
+}
+
+/* Returns TRUE, and removes the session's async event, once every queued
+ * command carries FUZZY_CMD_FLAG_REPLIED; returns FALSE otherwise. */
+static gboolean
+fuzzy_lua_session_is_completed(struct fuzzy_lua_session *session)
+{
+	struct fuzzy_cmd_io *io;
+	guint nreplied = 0, i;
+
+	for (i = 0; i < session->commands->len; i++) {
+		io = g_ptr_array_index(session->commands, i);
+
+		if (io->flags & FUZZY_CMD_FLAG_REPLIED) {
+			nreplied++;
+		}
+	}
+
+	if (nreplied == session->commands->len) {
+		/* Must name the finaliser the event was registered with */
+		rspamd_session_remove_event(session->task->s, fuzzy_lua_session_fin, session);
+		return TRUE;
+	}
+
+	return FALSE;
+}
+
+/*
+ * Performs a single read() on the session socket and invokes the Lua callback
+ * as callback(success, addr, latency_or_error) for every reply parsed from it.
+ * Returns 1 if at least one reply was processed, 0 if nothing usable was read
+ * (including EAGAIN/EWOULDBLOCK/EINTR) and -1 on any other read error.
+ */
+static gint
+fuzzy_lua_try_read(struct fuzzy_lua_session *session)
+{
+	struct rspamd_task *task = session->task; /* used by msg_err_task below */
+	const struct rspamd_fuzzy_reply *rep;
+	struct rspamd_fuzzy_cmd *cmd = NULL;
+	struct fuzzy_cmd_io *io = NULL;
+	gint r, ret = 0;
+	guchar buf[2048], *p = buf;
+
+	if ((r = read(session->fd, buf, sizeof(buf) - 1)) == -1) {
+		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+			return 0;
+		}
+
+		return -1;
+	}
+
+	while ((rep = fuzzy_process_reply(&p, &r,
+			session->commands, session->rule, &cmd, &io)) != NULL) {
+		lua_rawgeti(session->L, LUA_REGISTRYINDEX, session->cbref);
+
+		if (rep->v1.prob > 0.5) {
+			if (cmd->cmd == FUZZY_PING) {
+				/* ret, addr, latency */
+				lua_pushboolean(session->L, TRUE);
+				rspamd_lua_ip_push(session->L, session->addr);
+				lua_pushnumber(session->L, fuzzy_milliseconds_since_midnight() - rep->v1.value);
+			}
+			else {
+				/* TODO: unsupported */
+				lua_pushboolean(session->L, FALSE);
+				rspamd_lua_ip_push(session->L, session->addr);
+				lua_pushstring(session->L, "unsupported");
+			}
+		}
+		else {
+			lua_pushboolean(session->L, FALSE);
+			rspamd_lua_ip_push(session->L, session->addr);
+			lua_pushfstring(session->L, "invalid reply from server: %d", rep->v1.value);
+		}
+
+		if (lua_pcall(session->L, 3, 0, 0) != 0) {
+			/* A failed call leaves its error message on the stack: report and drop it */
+			msg_err_task("fuzzy ping callback failed: %s", lua_tostring(session->L, -1));
+			lua_pop(session->L, 1);
+		}
+
+		ret = 1;
+	}
+
+	return ret;
+}
+
+/*
+ * Event loop callback for a fuzzy_lua_session socket: sends the queued commands
+ * on EV_WRITE, reads replies on EV_READ and treats anything else as a timeout.
+ * On failure the Lua callback is invoked as callback(false, addr, error) before
+ * the session's async event is removed.
+ */
+static void
+fuzzy_lua_io_callback(gint fd, short what, void *arg)
+{
+	struct fuzzy_lua_session *session = arg;
+	const gchar *errstr = NULL;
+
+	enum {
+		return_error = 0,
+		return_want_more,
+		return_finished
+	} ret = return_error;
+
+	if (what & EV_READ) {
+		/* Try to read reply */
+		switch (fuzzy_lua_try_read(session)) {
+		case 0:
+			/* Nothing usable yet: keep waiting for the reply */
+			ret = return_want_more;
+			break;
+		case 1:
+			ret = return_finished;
+			break;
+		default:
+			errstr = "cannot read reply from server";
+			break;
+		}
+	}
+	else if (what & EV_WRITE) {
+		if (fuzzy_cmd_vector_to_wire(fd, session->commands)) {
+			ret = return_want_more;
+		}
+		else {
+			errstr = "cannot send request to server";
+		}
+	}
+	else {
+		/* Timeout */
+		errstr = "timeout waiting for reply";
+	}
+
+	if (ret == return_want_more) {
+		/* Request sent or reply not here yet: wait for incoming data */
+		rspamd_ev_watcher_reschedule(session->task->event_loop,
+				&session->ev, EV_READ);
+	}
+	else if (ret == return_error) {
+		/* Report the failure to Lua: ret, addr, error */
+		lua_rawgeti(session->L, LUA_REGISTRYINDEX, session->cbref);
+		lua_pushboolean(session->L, FALSE);
+		rspamd_lua_ip_push(session->L, session->addr);
+		lua_pushstring(session->L, errstr);
+		if (lua_pcall(session->L, 3, 0, 0) != 0) {
+			lua_pop(session->L, 1); /* drop the error message left by the failed call */
+		}
+		/* Must name the finaliser the event was registered with */
+		rspamd_session_remove_event(session->task->s, fuzzy_lua_session_fin, session);
+	}
+	else {
+		/* Read something from network */
+		if (!fuzzy_lua_session_is_completed(session)) {
+			/* Need to read more */
+			rspamd_ev_watcher_reschedule(session->task->event_loop,
+					&session->ev, EV_READ);
+		}
+	}
+}
+
+/***
+ * @function fuzzy_check.ping_storage(task, callback, rule, timeout[, server_override])
+ * Pings a fuzzy storage; callback(success, addr, latency_or_error) is invoked with the outcome
+ * @return true once the ping is scheduled, or false and an error string (bad arguments raise an error)
+ */
+static gint
+fuzzy_lua_ping_storage(lua_State *L)
+{
+	struct rspamd_task *task = lua_check_task(L, 1);
+
+	if (task == NULL) {
+		return luaL_error(L, "invalid arguments: task");
+	}
+
+	/* Other arguments sanity */
+	if (lua_type(L, 2) != LUA_TFUNCTION || lua_type(L, 3) != LUA_TSTRING || lua_type(L, 4) != LUA_TNUMBER) {
+		return luaL_error(L, "invalid arguments: callback/rule/timeout argument");
+	}
+
+	struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context(task->cfg);
+	struct fuzzy_rule *rule, *rule_found = NULL;
+	int i;
+	const char *rule_name = lua_tostring(L, 3);
+
+	PTR_ARRAY_FOREACH(fuzzy_module_ctx->fuzzy_rules, i, rule)
+	{
+		if (strcmp(rule->name, rule_name) == 0) {
+			rule_found = rule;
+			break;
+		}
+	}
+
+	if (rule_found == NULL) {
+		return luaL_error(L, "invalid arguments: no such rule defined");
+	}
+
+	rspamd_inet_addr_t *addr = NULL;
+
+	if (lua_type(L, 5) == LUA_TSTRING) {
+		const gchar *server_name = lua_tostring(L, 5);
+		enum rspamd_parse_host_port_result res;
+		GPtrArray *addrs;
+		/* We resolve address synchronously here! Why? Because it is an override... */
+		res = rspamd_parse_host_port_priority(server_name, &addrs, 0, NULL,
+				11335, FALSE, task->task_pool);
+		if (res == RSPAMD_PARSE_ADDR_FAIL) {
+			lua_pushboolean(L, FALSE);
+			lua_pushfstring(L, "invalid arguments: cannot resolve %s", server_name);
+			return 2;
+		}
+
+		/* Get random address */
+		addr = rspamd_inet_address_copy(g_ptr_array_index(addrs, rspamd_random_uint64_fast() % addrs->len),
+				task->task_pool);
+		rspamd_mempool_add_destructor(task->task_pool,
+				rspamd_ptr_array_free_hard, addrs);
+	}
+	else {
+		struct upstream *selected = rspamd_upstream_get(rule_found->servers,
+				RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
+		if (selected != NULL) {
+			addr = rspamd_upstream_addr_next(selected);
+		}
+	}
+
+	if (addr == NULL) {
+		/* Nothing to ping: say so instead of reporting a ping that never completes */
+		lua_pushboolean(L, FALSE);
+		lua_pushstring(L, "no server address available for this rule");
+		return 2;
+	}
+
+	int sock = rspamd_inet_address_connect(addr, SOCK_DGRAM, TRUE);
+	if (sock == -1) {
+		lua_pushboolean(L, FALSE);
+		lua_pushfstring(L, "cannot connect to %s, %s",
+				rspamd_inet_address_to_string_pretty(addr),
+				strerror(errno));
+		return 2;
+	}
+
+	/* Create a dedicated ping session for a socket */
+	struct fuzzy_lua_session *session =
+		rspamd_mempool_alloc0(task->task_pool, sizeof(struct fuzzy_lua_session));
+	session->task = task;
+	session->fd = sock;
+	session->addr = addr;
+	/* Built only after connect succeeded, so the array cannot leak on that failure */
+	session->commands = fuzzy_generate_commands(task, rule_found, FUZZY_PING, 0, 0, 0);
+	session->L = L;
+	session->rule = rule_found;
+	/* Store callback */
+	lua_pushvalue(L, 2);
+	session->cbref = luaL_ref(L, LUA_REGISTRYINDEX);
+
+	rspamd_session_add_event(task->s, fuzzy_lua_session_fin, session, M);
+	rspamd_ev_watcher_init(&session->ev, sock, EV_WRITE,
+			fuzzy_lua_io_callback, session);
+	rspamd_ev_watcher_start(session->task->event_loop, &session->ev,
+			lua_tonumber(L, 4));
+
+	lua_pushboolean(L, TRUE);
return 1;
}
\ No newline at end of file
More information about the Commits
mailing list