commit 70d859c: [Feature] Allow to add upstream watchers to Lua API

Vsevolod Stakhov vsevolod at highsecure.ru
Thu Dec 27 18:28:03 UTC 2018


Author: Vsevolod Stakhov
Date: 2018-12-05 14:31:54 +0000
URL: https://github.com/rspamd/rspamd/commit/70d859cede253e512d9f968178b3155bf927a68f

[Feature] Allow to add upstream watchers to Lua API

---
 src/libutil/upstream.c |   6 ++
 src/libutil/upstream.h |   1 +
 src/lua/lua_upstream.c | 171 +++++++++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 174 insertions(+), 4 deletions(-)

diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 90f792bbe..eb88e501a 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -36,6 +36,7 @@ struct upstream_addr_elt {
 
 struct upstream_list_watcher {
 	rspamd_upstream_watch_func func;
+	GFreeFunc dtor;
 	gpointer ud;
 	enum rspamd_upstreams_watch_event events_mask;
 	struct upstream_list_watcher *next, *prev;
@@ -879,6 +880,9 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
 		}
 
 		DL_FOREACH_SAFE (ups->watchers, w, tmp) {
+			if (w->dtor) {
+				w->dtor (w->ud);
+			}
 			g_free (w);
 		}
 
@@ -1178,6 +1182,7 @@ rspamd_upstreams_set_limits (struct upstream_list *ups,
 void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
 										  enum rspamd_upstreams_watch_event events,
 										  rspamd_upstream_watch_func func,
+										  GFreeFunc dtor,
 										  gpointer ud)
 {
 	struct upstream_list_watcher *nw;
@@ -1188,6 +1193,7 @@ void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
 	nw->func = func;
 	nw->events_mask = events;
 	nw->ud = ud;
+	nw->dtor = dtor;
 
 	DL_APPEND (ups->watchers, nw);
 }
diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h
index 56d6fa6c5..5c0c92afc 100644
--- a/src/libutil/upstream.h
+++ b/src/libutil/upstream.h
@@ -204,6 +204,7 @@ typedef void (*rspamd_upstream_watch_func) (struct upstream *up,
 void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
 										  enum rspamd_upstreams_watch_event events,
 										  rspamd_upstream_watch_func func,
+										  GFreeFunc free_func,
 										  gpointer ud);
 
 /**
diff --git a/src/lua/lua_upstream.c b/src/lua/lua_upstream.c
index 854bfafd9..1a4d6b128 100644
--- a/src/lua/lua_upstream.c
+++ b/src/lua/lua_upstream.c
@@ -56,6 +56,7 @@ LUA_FUNCTION_DEF (upstream_list, all_upstreams);
 LUA_FUNCTION_DEF (upstream_list, get_upstream_by_hash);
 LUA_FUNCTION_DEF (upstream_list, get_upstream_round_robin);
 LUA_FUNCTION_DEF (upstream_list, get_upstream_master_slave);
+LUA_FUNCTION_DEF (upstream_list, add_watcher);
 
 static const struct luaL_reg upstream_list_m[] = {
 
@@ -63,6 +64,7 @@ static const struct luaL_reg upstream_list_m[] = {
 	LUA_INTERFACE_DEF (upstream_list, get_upstream_round_robin),
 	LUA_INTERFACE_DEF (upstream_list, get_upstream_master_slave),
 	LUA_INTERFACE_DEF (upstream_list, all_upstreams),
+	LUA_INTERFACE_DEF (upstream_list, add_watcher),
 	{"__tostring", rspamd_lua_class_tostring},
 	{"__gc", lua_upstream_list_destroy},
 	{NULL, NULL}
@@ -290,7 +292,7 @@ lua_upstream_list_get_upstream_by_hash (lua_State *L)
 		}
 	}
 	else {
-		lua_pushnil (L);
+		return luaL_error (L, "invalid arguments");
 	}
 
 	return 1;
@@ -322,7 +324,7 @@ lua_upstream_list_get_upstream_round_robin (lua_State *L)
 		}
 	}
 	else {
-		lua_pushnil (L);
+		return luaL_error (L, "invalid arguments");
 	}
 
 	return 1;
@@ -356,7 +358,7 @@ lua_upstream_list_get_upstream_master_slave (lua_State *L)
 		}
 	}
 	else {
-		lua_pushnil (L);
+		return luaL_error (L, "invalid arguments");
 	}
 
 	return 1;
@@ -390,12 +392,173 @@ lua_upstream_list_all_upstreams (lua_State *L)
 		rspamd_upstreams_foreach (upl, lua_upstream_inserter, L);
 	}
 	else {
-		lua_pushnil (L);
+		return luaL_error (L, "invalid arguments");
 	}
 
 	return 1;
 }
 
+static inline enum rspamd_upstreams_watch_event
+lua_str_to_upstream_flag (const gchar *str)
+{
+	enum rspamd_upstreams_watch_event fl = 0;
+
+	if (strcmp (str, "success") == 0) {
+		fl = RSPAMD_UPSTREAM_WATCH_SUCCESS;
+	}
+	else if (strcmp (str, "failure") == 0) {
+		fl = RSPAMD_UPSTREAM_WATCH_FAILURE;
+	}
+	else if (strcmp (str, "online") == 0) {
+		fl = RSPAMD_UPSTREAM_WATCH_ONLINE;
+	}
+	else if (strcmp (str, "offline") == 0) {
+		fl = RSPAMD_UPSTREAM_WATCH_OFFLINE;
+	}
+	else {
+		msg_err ("invalid flag: %s", str);
+	}
+
+	return fl;
+}
+
+static inline const gchar *
+lua_upstream_flag_to_str (enum rspamd_upstreams_watch_event fl)
+{
+	const gchar *res = "unknown";
+
+	/* Works with single flags, not combinations */
+	if (fl & RSPAMD_UPSTREAM_WATCH_SUCCESS) {
+		res = "success";
+	}
+	else if (fl & RSPAMD_UPSTREAM_WATCH_FAILURE) {
+		res = "failure";
+	}
+	else if (fl & RSPAMD_UPSTREAM_WATCH_ONLINE) {
+		res = "online";
+	}
+	else if (fl & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
+		res = "offline";
+	}
+	else {
+		msg_err ("invalid flag: %d", fl);
+	}
+
+	return res;
+}
+
+struct rspamd_lua_upstream_watcher_cbdata {
+	lua_State *L;
+	gint cbref;
+	struct upstream_list *upl;
+};
+
+static void
+lua_upstream_watch_func (struct upstream *up,
+						 enum rspamd_upstreams_watch_event event,
+						 guint cur_errors,
+						 void *ud)
+{
+	struct rspamd_lua_upstream_watcher_cbdata *cdata =
+			(struct rspamd_lua_upstream_watcher_cbdata *)ud;
+	lua_State *L;
+	struct upstream **pup;
+	const gchar *what;
+	gint err_idx;
+
+	L = cdata->L;
+	what = lua_upstream_flag_to_str (event);
+	lua_pushcfunction (L, &rspamd_lua_traceback);
+	err_idx = lua_gettop (L);
+
+	lua_rawgeti (L, LUA_REGISTRYINDEX, cdata->cbref);
+	lua_pushstring (L, what);
+	pup = lua_newuserdata (L, sizeof (*pup));
+	*pup = up;
+	rspamd_lua_setclass (L, "rspamd{upstream}", -1);
+	lua_pushinteger (L, cur_errors);
+
+	if (lua_pcall (L, 3, 0, err_idx) != 0) {
+		GString *tb = lua_touserdata (L, -1);
+		msg_err ("cannot call watch function for upstream: %s", tb->str);
+		g_string_free (tb, TRUE);
+		lua_settop (L, 0);
+
+		return;
+	}
+
+	lua_settop (L, 0);
+}
+
+static void
+lua_upstream_watch_dtor (gpointer ud)
+{
+	struct rspamd_lua_upstream_watcher_cbdata *cdata =
+			(struct rspamd_lua_upstream_watcher_cbdata *)ud;
+
+	luaL_unref (cdata->L, LUA_REGISTRYINDEX, cdata->cbref);
+	g_free (cdata);
+}
+
+/***
+ * @method upstream_list:add_watcher(what, cb)
+ * Add new watcher to the upstream lists events (table or a string):
+ *   - `success` - called whenever upstream successfully used
+ *   - `failure` - called on upstream error
+ *   - `online` - called when upstream is being taken online from offline
+ *   - `offline` - called when upstream is being taken offline from online
+ * Callback is a function: function(what, upstream, cur_errors) ... end
+ * @example
+ups:add_watcher('success', function(what, up, cur_errors) ... end)
+ups:add_watcher({'online', 'offline'}, function(what, up, cur_errors) ... end)
+ * @return nothing
+ */
+static gint
+lua_upstream_list_add_watcher (lua_State *L)
+{
+	LUA_TRACE_POINT;
+	struct upstream_list *upl;
+
+	upl = lua_check_upstream_list (L);
+	if (upl &&
+		(lua_type (L, 2) == LUA_TTABLE ||  lua_type (L, 2) == LUA_TSTRING) &&
+		lua_type (L, 3) == LUA_TFUNCTION) {
+
+		enum rspamd_upstreams_watch_event flags = 0;
+		struct rspamd_lua_upstream_watcher_cbdata *cdata;
+
+		if (lua_type (L, 2) == LUA_TSTRING) {
+			flags = lua_str_to_upstream_flag (lua_tostring (L, 2));
+		}
+		else {
+			for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
+				if (lua_isstring (L, -1)) {
+					flags |= lua_str_to_upstream_flag (lua_tostring (L, -1));
+				}
+				else {
+					lua_pop (L, 1);
+
+					return luaL_error (L, "invalid arguments");
+				}
+			}
+		}
+
+		cdata = g_malloc0 (sizeof (*cdata));
+		lua_pushvalue (L, 3); /* callback */
+		cdata->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+		cdata->L = L;
+		cdata->upl = upl;
+
+		rspamd_upstreams_add_watch_callback (upl, flags,
+				lua_upstream_watch_func, lua_upstream_watch_dtor, cdata);
+	}
+	else {
+		return luaL_error (L, "invalid arguments");
+	}
+
+	return 0;
+}
+
 static gint
 lua_load_upstream_list (lua_State * L)
 {


More information about the Commits mailing list