commit 4beec4e: [Project] Add some major stuff to implement client side of the fuzzy ping

Vsevolod Stakhov vsevolod at
Fri Nov 10 13:49:09 UTC 2023

Author: Vsevolod Stakhov
Date: 2023-11-09 17:57:21 +0000

[Project] Add some major stuff to implement client side of the fuzzy ping

 lualib/rspamadm/fuzzy_ping.lua |  67 +++++++-
 src/plugins/fuzzy_check.c      | 357 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 417 insertions(+), 7 deletions(-)

diff --git a/lualib/rspamadm/fuzzy_ping.lua b/lualib/rspamadm/fuzzy_ping.lua
index e13220865..eed509282 100644
--- a/lualib/rspamadm/fuzzy_ping.lua
+++ b/lualib/rspamadm/fuzzy_ping.lua
@@ -33,16 +33,14 @@ parser:option "-r --rule"
       :description "Storage to ping (must be configured in Rspamd configuration)"
-parser:option "-f --flood"
-      :description "Flood mode (send requests as fast as possible)"
-      :argname("<count>")
-      :convert(tonumber)
-      :default(10)
 parser:option "-t --timeout"
       :description "Timeout for requests"
+parser:option "-s --server"
+      :description "Override server to ping"
+      :argname("<name>")
 parser:option "-n --number"
       :description "Timeout for requests"
@@ -90,18 +88,73 @@ local function print_storages(rules)
+local function print_results(results)
+  for _, res in ipairs(results) do
+    if res.success then
+      print(highlight('Server %s: %s ms', res.server, res.latency))
+    else
+      print(highlight('Server %s: %s', res.server, res.error))
+    end
+  end
 local function handler(args)
   local opts = parser:parse(args)
   if opts.list then
-    local storages = rspamd_plugins.fuzzy_check.list_storages(rspamd_config)
-    print_storages(storages)
+    print_storages(rspamd_plugins.fuzzy_check.list_storages(rspamd_config))
+  -- Perform ping using a fake task from async stuff provided by rspamadm
+  local rspamd_task = require "rspamd_task"
+  local task = rspamd_task.create(rspamd_config, rspamadm_ev_base)
+  task:set_session(rspamadm_session)
+  task:set_resolver(rspamadm_dns_resolver)
+  local replied = 0
+  local results = {}
+  local function gen_ping_fuzzy_cb(num)
+    return function(success, server, latency_or_err)
+      if not success then
+        results[num] = {
+          success = false,
+          error = latency_or_err,
+          server = server,
+        }
+      else
+        results[num] = {
+          success = true,
+          latency = latency_or_err,
+          server = server,
+        }
+      end
+      if replied == opts.number - 1 then
+        print_results(results)
+      else
+        replied = replied + 1
+      end
+    end
+  end
+  local function ping_fuzzy(num)
+    local ret, err = rspamd_plugins.fuzzy_check.ping_storage(task, gen_ping_fuzzy_cb(num),
+        opts.rule, opts.timeout, opts.server)
+    if not ret then
+      rspamd_logger.errx('cannot ping fuzzy storage: %s', err)
+      os.exit(1)
+    end
+  end
+  for i = 1, opts.number do
+    ping_fuzzy(i)
+  end
 return {
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 96c283c5d..2576586c4 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -196,6 +196,7 @@ static gint fuzzy_lua_unlearn_handler(lua_State *L);
 static gint fuzzy_lua_gen_hashes_handler(lua_State *L);
 static gint fuzzy_lua_hex_hashes_handler(lua_State *L);
 static gint fuzzy_lua_list_storages(lua_State *L);
+static gint fuzzy_lua_ping_storage(lua_State *L);
 module_t fuzzy_check_module = {
@@ -1227,6 +1228,9 @@ gint fuzzy_check_module_config(struct rspamd_config *cfg, bool validate)
 		lua_pushstring(L, "list_storages");
 		lua_pushcfunction(L, fuzzy_lua_list_storages);
 		lua_settable(L, -3);
+		lua_pushstring(L, "ping_storage");
+		lua_pushcfunction(L, fuzzy_lua_ping_storage);
+		lua_settable(L, -3);
 		/* Finish fuzzy_check key */
 		lua_settable(L, -3);
@@ -1369,6 +1373,59 @@ fuzzy_cmd_stat(struct fuzzy_rule *rule,
 	return io;
+static inline double
+	double now = rspamd_get_calendar_ticks();
+	double ms = now - (int64_t) now;
+	now = (((int64_t) now % 86400) + ms) * 1000;
+	return now;
+static struct fuzzy_cmd_io *
+fuzzy_cmd_ping(struct fuzzy_rule *rule,
+			   rspamd_mempool_t *pool)
+	struct rspamd_fuzzy_cmd *cmd;
+	struct rspamd_fuzzy_encrypted_cmd *enccmd = NULL;
+	struct fuzzy_cmd_io *io;
+	if (rule->peer_key) {
+		enccmd = rspamd_mempool_alloc0(pool, sizeof(*enccmd));
+		cmd = &enccmd->cmd;
+	}
+	else {
+		cmd = rspamd_mempool_alloc0(pool, sizeof(*cmd));
+	}
+	/* Get milliseconds since midnight */
+	cmd->cmd = FUZZY_PING;
+	cmd->shingles_count = 0;
+	cmd->value = fuzzy_milliseconds_since_midnight(); /* Record timestamp */
+	cmd->tag = ottery_rand_uint32();
+	io = rspamd_mempool_alloc(pool, sizeof(*io));
+	io->flags = 0;
+	io->tag = cmd->tag;
+	memcpy(&io->cmd, cmd, sizeof(io->cmd));
+	if (rule->peer_key && enccmd) {
+		fuzzy_encrypt_cmd(rule, &enccmd->hdr, (guchar *) cmd, sizeof(*cmd));
+		io->io.iov_base = enccmd;
+		io->io.iov_len = sizeof(*enccmd);
+	}
+	else {
+		io->io.iov_base = cmd;
+		io->io.iov_len = sizeof(*cmd);
+	}
+	return io;
 static struct fuzzy_cmd_io *
 fuzzy_cmd_hash(struct fuzzy_rule *rule,
 			   int c,
@@ -3040,6 +3097,16 @@ fuzzy_generate_commands(struct rspamd_task *task, struct fuzzy_rule *rule,
 		goto end;
+	else if (c == FUZZY_PING) {
+		res = g_ptr_array_sized_new(1);
+		io = fuzzy_cmd_ping(rule, task->task_pool);
+		if (io) {
+			g_ptr_array_add(res, io);
+		}
+		goto end;
+	}
 	if (task->message == NULL) {
 		goto end;
@@ -4246,6 +4313,9 @@ fuzzy_attach_controller(struct module_ctx *ctx, GHashTable *commands)
 	return 0;
+/* Lua handlers */
+/* TODO: move to a separate unit, as this file is now a bit too hard to read */
 static void
 lua_upstream_str_inserter(struct upstream *up, guint idx, void *ud)
@@ -4301,5 +4371,292 @@ fuzzy_lua_list_storages(lua_State *L)
 		lua_setfield(L, -2, rule->name);
+	return 1;
+struct fuzzy_lua_session {
+	struct rspamd_task *task;
+	lua_State *L;
+	rspamd_inet_addr_t *addr;
+	GPtrArray *commands;
+	struct fuzzy_rule *rule;
+	struct rspamd_io_ev ev;
+	gint cbref;
+	gint fd;
+static void
+fuzzy_lua_session_fin(void *ud)
+	struct fuzzy_lua_session *session = ud;
+	if (session->commands) {
+		g_ptr_array_free(session->commands, TRUE);
+	}
+	rspamd_ev_watcher_stop(session->task->event_loop, &session->ev);
+	luaL_unref(session->L, LUA_REGISTRYINDEX, session->cbref);
+static gboolean
+fuzzy_lua_session_is_completed(struct fuzzy_lua_session *session)
+	struct fuzzy_cmd_io *io;
+	guint nreplied = 0, i;
+	for (i = 0; i < session->commands->len; i++) {
+		io = g_ptr_array_index(session->commands, i);
+		if (io->flags & FUZZY_CMD_FLAG_REPLIED) {
+			nreplied++;
+		}
+	}
+	if (nreplied == session->commands->len) {
+		rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
+		return TRUE;
+	}
+	return FALSE;
+static gint
+fuzzy_lua_try_read(struct fuzzy_lua_session *session)
+	struct rspamd_task *task;
+	const struct rspamd_fuzzy_reply *rep;
+	struct rspamd_fuzzy_cmd *cmd = NULL;
+	struct fuzzy_cmd_io *io = NULL;
+	gint r, ret;
+	guchar buf[2048], *p;
+	task = session->task;
+	if ((r = read(session->fd, buf, sizeof(buf) - 1)) == -1) {
+		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+			return 0;
+		}
+		else {
+			return -1;
+		}
+	}
+	else {
+		p = buf;
+		ret = 0;
+		while ((rep = fuzzy_process_reply(&p, &r,
+										  session->commands, session->rule, &cmd, &io)) != NULL) {
+			lua_rawgeti(session->L, LUA_REGISTRYINDEX, session->cbref);
+			if (rep->v1.prob > 0.5) {
+				if (cmd->cmd == FUZZY_PING) {
+					/* ret, addr, latency */
+					lua_pushboolean(session->L, TRUE);
+					rspamd_lua_ip_push(session->L, session->addr);
+					lua_pushnumber(session->L, fuzzy_milliseconds_since_midnight() - rep->v1.value);
+				}
+				else {
+					/* TODO: unsupported */
+					lua_pushboolean(session->L, FALSE);
+					rspamd_lua_ip_push(session->L, session->addr);
+					lua_pushstring(session->L, "unsupported");
+				}
+			}
+			else {
+				lua_pushboolean(session->L, FALSE);
+				rspamd_lua_ip_push(session->L, session->addr);
+				lua_pushfstring(session->L, "invalid reply from server: %d", rep->v1.value);
+			}
+			/* TODO: check results maybe? */
+			lua_pcall(session->L, 3, 0, 0);
+			ret = 1;
+		}
+	}
+	return ret;
+/* Fuzzy check callback */
+static void
+fuzzy_lua_io_callback(gint fd, short what, void *arg)
+	struct fuzzy_lua_session *session = arg;
+	gint r;
+	enum {
+		return_error = 0,
+		return_want_more,
+		return_finished
+	} ret = return_error;
+	if (what & EV_READ) {
+		/* Try to read reply */
+		r = fuzzy_lua_try_read(session);
+		switch (r) {
+		case 0:
+			if (what & EV_READ) {
+				ret = return_want_more;
+			}
+			else {
+				if (what & EV_WRITE) {
+					/* Retransmit attempt */
+					if (!fuzzy_cmd_vector_to_wire(fd, session->commands)) {
+						ret = return_error;
+					}
+					else {
+						ret = return_want_more;
+					}
+				}
+			}
+			break;
+		case 1:
+			ret = return_finished;
+			break;
+		default:
+			ret = return_error;
+			break;
+		}
+	}
+	else if (what & EV_WRITE) {
+		if (!fuzzy_cmd_vector_to_wire(fd, session->commands)) {
+			ret = return_error;
+		}
+		else {
+			ret = return_want_more;
+		}
+	}
+	else {
+		/* Timeout */
+		ret = return_error;
+	}
+	if (ret == return_want_more) {
+		/* Processed write, switch to reading */
+		rspamd_ev_watcher_reschedule(session->task->event_loop,
+									 &session->ev, EV_READ);
+	}
+	else if (ret == return_error) {
+		rspamd_session_remove_event(session->task->s, fuzzy_io_fin, session);
+	}
+	else {
+		/* Read something from network */
+		if (!fuzzy_lua_session_is_completed(session)) {
+			/* Need to
+			rspamd_ev_watcher_reschedule(session->task->event_loop,
+										 &session->ev, EV_READ);
+		}
+	}
+ * @function fuzzy_check.ping_storage(task, callback, rule, timeout[, server_override])
+ * @return
+ */
+static gint
+fuzzy_lua_ping_storage(lua_State *L)
+	struct rspamd_task *task = lua_check_task(L, 1);
+	if (task == NULL) {
+		return luaL_error(L, "invalid arguments: task");
+	}
+	/* Other arguments sanity */
+	if (lua_type(L, 2) != LUA_TFUNCTION || lua_type(L, 3) != LUA_TSTRING || lua_type(L, 4) != LUA_TNUMBER) {
+		return luaL_error(L, "invalid arguments: callback/rule/timeout argument");
+	}
+	struct fuzzy_ctx *fuzzy_module_ctx = fuzzy_get_context(task->cfg);
+	struct fuzzy_rule *rule, *rule_found = NULL;
+	int i;
+	const char *rule_name = lua_tostring(L, 3);
+	PTR_ARRAY_FOREACH(fuzzy_module_ctx->fuzzy_rules, i, rule)
+	{
+		if (strcmp(rule->name, rule_name) == 0) {
+			rule_found = rule;
+			break;
+		}
+	}
+	if (rule_found == NULL) {
+		return luaL_error(L, "invalid arguments: no such rule defined");
+	}
+	rspamd_inet_addr_t *addr = NULL;
+	if (lua_type(L, 5) == LUA_TSTRING) {
+		const gchar *server_name = lua_tostring(L, 5);
+		enum rspamd_parse_host_port_result res;
+		GPtrArray *addrs;
+		/* We resolve address synchronously here! Why? Because it is an override... */
+		res = rspamd_parse_host_port_priority(server_name, &addrs, 0, NULL,
+											  11335, FALSE, task->task_pool);
+		if (res == RSPAMD_PARSE_ADDR_FAIL) {
+			lua_pushboolean(L, FALSE);
+			lua_pushfstring(L, "invalid arguments: cannot resolve %s", server_name);
+			return 2;
+		}
+		/* Get random address */
+		addr = rspamd_inet_address_copy(g_ptr_array_index(addrs, rspamd_random_uint64_fast() % addrs->len),
+										task->task_pool);
+		rspamd_mempool_add_destructor(task->task_pool,
+									  rspamd_ptr_array_free_hard, addrs);
+	}
+	else {
+		struct upstream *selected = rspamd_upstream_get(rule_found->servers,
+		addr = rspamd_upstream_addr_next(selected);
+	}
+	if (addr != NULL) {
+		int sock;
+		GPtrArray *commands = fuzzy_generate_commands(task, rule, FUZZY_PING, 0, 0, 0);
+		if ((sock = rspamd_inet_address_connect(addr, SOCK_DGRAM, TRUE)) == -1) {
+			lua_pushboolean(L, FALSE);
+			lua_pushfstring(L, "cannot connect to %s, %s",
+							rspamd_inet_address_to_string_pretty(addr),
+							strerror(errno));
+			return 2;
+		}
+		else {
+			/* Create a dedicated ping session for a socket */
+			struct fuzzy_lua_session *session =
+				rspamd_mempool_alloc0(task->task_pool,
+									  sizeof(struct fuzzy_lua_session));
+			session->task = task;
+			session->fd = sock;
+			session->addr = addr;
+			session->commands = commands;
+			session->L = L;
+			session->rule = rule_found;
+			/* Store callback */
+			lua_pushvalue(L, 2);
+			session->cbref = luaL_ref(L, LUA_REGISTRYINDEX);
+			rspamd_session_add_event(task->s, fuzzy_lua_session_fin, session, M);
+			rspamd_ev_watcher_init(&session->ev,
+								   sock,
+								   EV_WRITE,
+								   fuzzy_lua_io_callback,
+								   session);
+			rspamd_ev_watcher_start(session->task->event_loop, &session->ev,
+									lua_tonumber(L, 4));
+		}
+	}
+	lua_pushboolean(L, TRUE);
 	return 1;
\ No newline at end of file

More information about the Commits mailing list