commit 239f604: [Project] Lua_udp: Implement fully functional client

Vsevolod Stakhov vsevolod at highsecure.ru
Mon Jan 21 17:14:03 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-01-21 16:26:11 +0000
URL: https://github.com/rspamd/rspamd/commit/239f6045f9cfad501d55f926771a2dd7c664c5b2

[Project] Lua_udp: Implement fully functional client

---
 src/lua/lua_udp.c | 272 +++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 250 insertions(+), 22 deletions(-)

diff --git a/src/lua/lua_udp.c b/src/lua/lua_udp.c
index 3b54c5e9e..1bff702a9 100644
--- a/src/lua/lua_udp.c
+++ b/src/lua/lua_udp.c
@@ -54,16 +54,21 @@ static const struct luaL_reg udp_libf[] = {
 
 struct lua_udp_cbdata {
 	struct event io;
+	struct timeval tv;
 	struct event_base *ev_base;
+	struct rspamd_async_event *async_ev;
+	struct rspamd_task *task;
 	rspamd_mempool_t *pool;
 	rspamd_inet_addr_t *addr;
 	struct rspamd_symcache_item *item;
 	struct rspamd_async_session *s;
 	struct iovec *iov;
 	lua_State *L;
+	guint retransmits;
 	guint iovlen;
 	gint sock;
 	gint cbref;
+	gboolean sent;
 };
 
 #define msg_debug_udp(...)  rspamd_conditional_debug_fast (NULL, cbd->addr, \
@@ -110,6 +115,26 @@ lua_udp_cbd_fin (gpointer p)
 	if (cbd->addr) {
 		rspamd_inet_address_free (cbd->addr);
 	}
+
+	if (cbd->cbref) {
+		luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+	}
+}
+
+static void
+lua_udp_maybe_free (struct lua_udp_cbdata *cbd)
+{
+	if (cbd->item) {
+		rspamd_symcache_item_async_dec_check (cbd->task, cbd->item, M);
+		cbd->item = NULL;
+	}
+
+	if (cbd->async_ev) {
+		rspamd_session_remove_event (cbd->s, lua_udp_cbd_fin, cbd);
+	}
+	else {
+		lua_udp_cbd_fin (cbd);
+	}
 }
 
 
@@ -143,44 +168,186 @@ lua_try_send_request (struct lua_udp_cbdata *cbd)
 	return RSPAMD_SENT_FAILURE;
 }
 
+static void
+lua_udp_maybe_push_error (struct lua_udp_cbdata *cbd, const gchar *err)
+{
+	if (cbd->cbref != -1) {
+		gint top;
+		lua_State *L = cbd->L;
+
+		top = lua_gettop (L);
+		lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
+
+		/* Error message */
+		lua_pushboolean (L, false);
+		lua_pushstring (L, err);
+
+		if (cbd->item) {
+			rspamd_symcache_set_cur_item (cbd->task, cbd->item);
+		}
+
+		if (lua_pcall (L, 2, 0, 0) != 0) {
+			msg_info ("callback call failed: %s", lua_tostring (L, -1));
+		}
+
+		lua_settop (L, top);
+	}
+
+	lua_udp_maybe_free (cbd);
+}
+
+static void
+lua_udp_push_data (struct lua_udp_cbdata *cbd, const gchar *data,
+		gssize len)
+{
+	if (cbd->cbref != -1) {
+		gint top;
+		lua_State *L = cbd->L;
+
+		top = lua_gettop (L);
+		lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
+
+		/* Error message */
+		lua_pushboolean (L, true);
+		lua_pushlstring (L, data, len);
+
+		if (cbd->item) {
+			rspamd_symcache_set_cur_item (cbd->task, cbd->item);
+		}
+
+		if (lua_pcall (L, 2, 0, 0) != 0) {
+			msg_info ("callback call failed: %s", lua_tostring (L, -1));
+		}
+
+		lua_settop (L, top);
+	}
+
+	lua_udp_maybe_free (cbd);
+}
+
+static gboolean
+lua_udp_maybe_register_event (struct lua_udp_cbdata *cbd)
+{
+	if (cbd->s) {
+		cbd->async_ev = rspamd_session_add_event (cbd->s, lua_udp_cbd_fin,
+				cbd, M);
+
+		if (!cbd->async_ev) {
+			return FALSE;
+		}
+	}
+
+	if (cbd->task) {
+		cbd->item = rspamd_symcache_get_cur_item (cbd->task);
+		rspamd_symcache_item_async_inc (cbd->task, cbd->item, M);
+	}
+
+	return TRUE;
+}
+
+static void
+lua_udp_io_handler (gint fd, short what, gpointer p)
+{
+	struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *)p;
+	lua_State *L;
+	gssize r;
+
+	L = cbd->L;
+
+	if (what == EV_TIMEOUT) {
+		if (cbd->sent  && cbd->retransmits > 0) {
+			r = lua_try_send_request (cbd);
+
+			if (r == RSPAMD_SENT_OK) {
+				event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
+				event_base_set (cbd->ev_base, &cbd->io);
+				event_add (&cbd->io, &cbd->tv);
+				lua_udp_maybe_register_event (cbd);
+			}
+			else if (r == RSPAMD_SENT_FAILURE) {
+				lua_udp_maybe_push_error (cbd, "write error");
+			}
+			else {
+				cbd->retransmits --;
+				event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
+				event_base_set (cbd->ev_base, &cbd->io);
+				event_add (&cbd->io, &cbd->tv);
+			}
+		}
+		else {
+			if (!cbd->sent) {
+				lua_udp_maybe_push_error (cbd, "sent timeout");
+			}
+			else {
+				lua_udp_maybe_push_error (cbd, "read timeout");
+			}
+		}
+	}
+	else if (what == EV_WRITE) {
+		r = lua_try_send_request (cbd);
+
+		if (r == RSPAMD_SENT_OK) {
+			if (cbd->cbref != -1) {
+				event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
+				event_base_set (cbd->ev_base, &cbd->io);
+				event_add (&cbd->io, &cbd->tv);
+				cbd->sent = TRUE;
+			}
+			else {
+				lua_udp_maybe_free (cbd);
+			}
+		}
+		else if (r == RSPAMD_SENT_FAILURE) {
+			lua_udp_maybe_push_error (cbd, "write error");
+		}
+		else {
+			cbd->retransmits --;
+			event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
+			event_base_set (cbd->ev_base, &cbd->io);
+			event_add (&cbd->io, &cbd->tv);
+		}
+	}
+	else if (what == EV_READ) {
+		guchar udpbuf[4096];
+		socklen_t slen;
+		struct sockaddr *sa;
+
+		sa = rspamd_inet_address_get_sa (cbd->addr, &slen);
+
+		r = recvfrom (cbd->sock, udpbuf, sizeof (udpbuf), 0, sa, &slen);
+
+		if (r == -1) {
+			lua_udp_maybe_push_error (cbd, strerror (errno));
+		}
+		else {
+			lua_udp_push_data (cbd, udpbuf, r);
+		}
+	}
+}
 
 /***
- * @function rspamd_tcp.request({params})
- * This function creates and sends TCP request to the specified host and port,
- * resolves hostname (if needed) and invokes continuation callback upon data received
- * from the remote peer. This function accepts table of arguments with the following
- * attributes
+ * @function rspamd_udp.sendto({params})
+ * This function simply sends data to an external UDP service
  *
  * - `task`: rspamd task objects (implies `pool`, `session`, `ev_base` and `resolver` arguments)
  * - `ev_base`: event base (if no task specified)
  * - `session`: events session (no task)
  * - `host`: IP or name of the peer (required)
- * - `port`: remote port to use
+ * - `port`: remote port to use (if `host` has no port part this is required)
  * - `data`: a table of strings or `rspamd_text` objects that contains data pieces
- * - `callback`: continuation function (required)
- * - `on_connect`: callback called on connection success
- * - `timeout`: floating point value that specifies timeout for IO operations in **seconds**
- * - `partial`: boolean flag that specifies that callback should be called on any data portion received
- * - `stop_pattern`: stop reading on finding a certain pattern (e.g. \r\n.\r\n for smtp)
- * - `shutdown`: half-close socket after writing (boolean: default false)
- * - `read`: read response after sending request (boolean: default true)
- * @return {boolean} true if request has been sent
+ * @return {boolean} true if request has been sent (additional string if it has not)
  */
 static gint
 lua_udp_sendto (lua_State *L) {
 	LUA_TRACE_POINT;
 	const gchar *host;
 	guint port;
-	gint cbref, tp, conn_cbref = -1;
 	struct event_base *ev_base = NULL;
 	struct lua_udp_cbdata *cbd;
 	struct rspamd_async_session *session = NULL;
 	struct rspamd_task *task = NULL;
 	rspamd_inet_addr_t *addr;
 	rspamd_mempool_t *pool = NULL;
-	struct iovec *iov = NULL;
-	guint niov = 0, total_out;
-	guint64 h;
 	gdouble timeout = default_udp_timeout;
 
 	if (lua_type (L, 1) == LUA_TTABLE) {
@@ -204,7 +371,9 @@ lua_udp_sendto (lua_State *L) {
 			host = luaL_checkstring (L, -1);
 
 			if (rspamd_parse_inet_address (&addr, host, 0)) {
-				rspamd_inet_address_set_port (addr, port);
+				if (port != 0) {
+					rspamd_inet_address_set_port (addr, port);
+				}
 			}
 			else {
 				lua_pop (L, 1);
@@ -222,6 +391,10 @@ lua_udp_sendto (lua_State *L) {
 			}
 
 			addr = rspamd_inet_address_copy (lip->addr);
+
+			if (port != 0) {
+				rspamd_inet_address_set_port (addr, port);
+			}
 		}
 		else {
 			lua_pop (L, 1);
@@ -276,6 +449,14 @@ lua_udp_sendto (lua_State *L) {
 		}
 		lua_pop (L, 1);
 
+		if (!ev_base || !pool) {
+			rspamd_inet_address_free (addr);
+
+			return luaL_error (L, "invalid arguments");
+		}
+
+
+
 		if (!ev_base || !pool) {
 			rspamd_inet_address_free (addr);
 
@@ -289,6 +470,8 @@ lua_udp_sendto (lua_State *L) {
 		cbd->addr = addr;
 		cbd->sock = rspamd_socket_create (rspamd_inet_address_get_af (addr),
 				SOCK_DGRAM, 0, TRUE);
+		cbd->cbref = -1;
+		double_to_tv (timeout, &cbd->tv);
 
 		if (cbd->sock == -1) {
 			rspamd_inet_address_free (addr);
@@ -301,6 +484,8 @@ lua_udp_sendto (lua_State *L) {
 		gsize data_len;
 
 		lua_pushstring (L, "data");
+		lua_gettable (L, -2);
+
 		if (lua_type (L, -1) == LUA_TTABLE) {
 			data_len = rspamd_lua_table_size (L, -1);
 			cbd->iov = rspamd_mempool_alloc (pool,
@@ -322,22 +507,65 @@ lua_udp_sendto (lua_State *L) {
 
 		lua_pop (L, 1);
 
+		lua_pushstring (L, "callback");
+		lua_gettable (L, -2);
+		if (lua_type (L, -1) == LUA_TFUNCTION) {
+			cbd->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+		}
+		else {
+			lua_pop (L, 1);
+		}
+
+		lua_pushstring (L, "retransmits");
+		lua_gettable (L, -2);
+		if (lua_type (L, -1) == LUA_TNUMBER) {
+			cbd->retransmits = lua_tonumber (L, -1);
+		}
+		lua_pop (L, 1);
+
 		enum rspamd_udp_send_result r;
 
 		r = lua_try_send_request (cbd);
 		if (r == RSPAMD_SENT_OK) {
+			if (cbd->cbref == -1) {
+				lua_udp_maybe_free (cbd);
+			}
+			else {
+				if (!lua_udp_maybe_register_event (cbd)) {
+					lua_pushboolean (L, false);
+					lua_pushstring (L, "session error");
+					lua_udp_maybe_free (cbd);
+
+					return 2;
+				}
+
+				event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
+				event_base_set (cbd->ev_base, &cbd->io);
+				event_add (&cbd->io, &cbd->tv);
+				cbd->sent = TRUE;
+			}
+
 			lua_pushboolean (L, true);
-			lua_udp_cbd_fin (cbd);
 		}
 		else if (r == RSPAMD_SENT_FAILURE) {
 			lua_pushboolean (L, false);
 			lua_pushstring (L, strerror (errno));
-			lua_udp_cbd_fin (cbd);
+			lua_udp_maybe_free (cbd);
 
 			return 2;
 		}
 		else {
-			/* TODO: add waiting */
+			event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
+			event_base_set (cbd->ev_base, &cbd->io);
+			event_add (&cbd->io, &cbd->tv);
+
+			if (!lua_udp_maybe_register_event (cbd)) {
+				lua_pushboolean (L, false);
+				lua_pushstring (L, "session error");
+				lua_udp_maybe_free (cbd);
+
+				return 2;
+			}
 		}
 	}
 	else {


More information about the Commits mailing list