commit 239f604: [Project] Lua_udp: Implement fully functional client
Vsevolod Stakhov
vsevolod at highsecure.ru
Mon Jan 21 17:14:03 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-01-21 16:26:11 +0000
URL: https://github.com/rspamd/rspamd/commit/239f6045f9cfad501d55f926771a2dd7c664c5b2
[Project] Lua_udp: Implement fully functional client
---
src/lua/lua_udp.c | 272 +++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 250 insertions(+), 22 deletions(-)
diff --git a/src/lua/lua_udp.c b/src/lua/lua_udp.c
index 3b54c5e9e..1bff702a9 100644
--- a/src/lua/lua_udp.c
+++ b/src/lua/lua_udp.c
@@ -54,16 +54,21 @@ static const struct luaL_reg udp_libf[] = {
struct lua_udp_cbdata {
struct event io;
+ struct timeval tv;
struct event_base *ev_base;
+ struct rspamd_async_event *async_ev;
+ struct rspamd_task *task;
rspamd_mempool_t *pool;
rspamd_inet_addr_t *addr;
struct rspamd_symcache_item *item;
struct rspamd_async_session *s;
struct iovec *iov;
lua_State *L;
+ guint retransmits;
guint iovlen;
gint sock;
gint cbref;
+ gboolean sent;
};
#define msg_debug_udp(...) rspamd_conditional_debug_fast (NULL, cbd->addr, \
@@ -110,6 +115,26 @@ lua_udp_cbd_fin (gpointer p)
if (cbd->addr) {
rspamd_inet_address_free (cbd->addr);
}
+
+ if (cbd->cbref) {
+ luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+ }
+}
+
+static void
+lua_udp_maybe_free (struct lua_udp_cbdata *cbd)
+{
+ if (cbd->item) {
+ rspamd_symcache_item_async_dec_check (cbd->task, cbd->item, M);
+ cbd->item = NULL;
+ }
+
+ if (cbd->async_ev) {
+ rspamd_session_remove_event (cbd->s, lua_udp_cbd_fin, cbd);
+ }
+ else {
+ lua_udp_cbd_fin (cbd);
+ }
}
@@ -143,44 +168,186 @@ lua_try_send_request (struct lua_udp_cbdata *cbd)
return RSPAMD_SENT_FAILURE;
}
+static void
+lua_udp_maybe_push_error (struct lua_udp_cbdata *cbd, const gchar *err)
+{
+ if (cbd->cbref != -1) {
+ gint top;
+ lua_State *L = cbd->L;
+
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
+
+ /* Error message */
+ lua_pushboolean (L, false);
+ lua_pushstring (L, err);
+
+ if (cbd->item) {
+ rspamd_symcache_set_cur_item (cbd->task, cbd->item);
+ }
+
+ if (lua_pcall (L, 2, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
+ }
+
+ lua_settop (L, top);
+ }
+
+ lua_udp_maybe_free (cbd);
+}
+
+static void
+lua_udp_push_data (struct lua_udp_cbdata *cbd, const gchar *data,
+ gssize len)
+{
+ if (cbd->cbref != -1) {
+ gint top;
+ lua_State *L = cbd->L;
+
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
+
+ /* Error message */
+ lua_pushboolean (L, true);
+ lua_pushlstring (L, data, len);
+
+ if (cbd->item) {
+ rspamd_symcache_set_cur_item (cbd->task, cbd->item);
+ }
+
+ if (lua_pcall (L, 2, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
+ }
+
+ lua_settop (L, top);
+ }
+
+ lua_udp_maybe_free (cbd);
+}
+
+static gboolean
+lua_udp_maybe_register_event (struct lua_udp_cbdata *cbd)
+{
+ if (cbd->s) {
+ cbd->async_ev = rspamd_session_add_event (cbd->s, lua_udp_cbd_fin,
+ cbd, M);
+
+ if (!cbd->async_ev) {
+ return FALSE;
+ }
+ }
+
+ if (cbd->task) {
+ cbd->item = rspamd_symcache_get_cur_item (cbd->task);
+ rspamd_symcache_item_async_inc (cbd->task, cbd->item, M);
+ }
+
+ return TRUE;
+}
+
+static void
+lua_udp_io_handler (gint fd, short what, gpointer p)
+{
+ struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *)p;
+ lua_State *L;
+ gssize r;
+
+ L = cbd->L;
+
+ if (what == EV_TIMEOUT) {
+ if (cbd->sent && cbd->retransmits > 0) {
+ r = lua_try_send_request (cbd);
+
+ if (r == RSPAMD_SENT_OK) {
+ event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
+ event_base_set (cbd->ev_base, &cbd->io);
+ event_add (&cbd->io, &cbd->tv);
+ lua_udp_maybe_register_event (cbd);
+ }
+ else if (r == RSPAMD_SENT_FAILURE) {
+ lua_udp_maybe_push_error (cbd, "write error");
+ }
+ else {
+ cbd->retransmits --;
+ event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
+ event_base_set (cbd->ev_base, &cbd->io);
+ event_add (&cbd->io, &cbd->tv);
+ }
+ }
+ else {
+ if (!cbd->sent) {
+ lua_udp_maybe_push_error (cbd, "sent timeout");
+ }
+ else {
+ lua_udp_maybe_push_error (cbd, "read timeout");
+ }
+ }
+ }
+ else if (what == EV_WRITE) {
+ r = lua_try_send_request (cbd);
+
+ if (r == RSPAMD_SENT_OK) {
+ if (cbd->cbref != -1) {
+ event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
+ event_base_set (cbd->ev_base, &cbd->io);
+ event_add (&cbd->io, &cbd->tv);
+ cbd->sent = TRUE;
+ }
+ else {
+ lua_udp_maybe_free (cbd);
+ }
+ }
+ else if (r == RSPAMD_SENT_FAILURE) {
+ lua_udp_maybe_push_error (cbd, "write error");
+ }
+ else {
+ cbd->retransmits --;
+ event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
+ event_base_set (cbd->ev_base, &cbd->io);
+ event_add (&cbd->io, &cbd->tv);
+ }
+ }
+ else if (what == EV_READ) {
+ guchar udpbuf[4096];
+ socklen_t slen;
+ struct sockaddr *sa;
+
+ sa = rspamd_inet_address_get_sa (cbd->addr, &slen);
+
+ r = recvfrom (cbd->sock, udpbuf, sizeof (udpbuf), 0, sa, &slen);
+
+ if (r == -1) {
+ lua_udp_maybe_push_error (cbd, strerror (errno));
+ }
+ else {
+ lua_udp_push_data (cbd, udpbuf, r);
+ }
+ }
+}
/***
- * @function rspamd_tcp.request({params})
- * This function creates and sends TCP request to the specified host and port,
- * resolves hostname (if needed) and invokes continuation callback upon data received
- * from the remote peer. This function accepts table of arguments with the following
- * attributes
+ * @function rspamd_udp.sendto({params})
+ * This function simply sends data to an external UDP service
*
* - `task`: rspamd task objects (implies `pool`, `session`, `ev_base` and `resolver` arguments)
* - `ev_base`: event base (if no task specified)
* - `session`: events session (no task)
* - `host`: IP or name of the peer (required)
- * - `port`: remote port to use
+ * - `port`: remote port to use (if `host` has no port part this is required)
* - `data`: a table of strings or `rspamd_text` objects that contains data pieces
- * - `callback`: continuation function (required)
- * - `on_connect`: callback called on connection success
- * - `timeout`: floating point value that specifies timeout for IO operations in **seconds**
- * - `partial`: boolean flag that specifies that callback should be called on any data portion received
- * - `stop_pattern`: stop reading on finding a certain pattern (e.g. \r\n.\r\n for smtp)
- * - `shutdown`: half-close socket after writing (boolean: default false)
- * - `read`: read response after sending request (boolean: default true)
- * @return {boolean} true if request has been sent
+ * @return {boolean} true if request has been sent (additional string if it has not)
*/
static gint
lua_udp_sendto (lua_State *L) {
LUA_TRACE_POINT;
const gchar *host;
guint port;
- gint cbref, tp, conn_cbref = -1;
struct event_base *ev_base = NULL;
struct lua_udp_cbdata *cbd;
struct rspamd_async_session *session = NULL;
struct rspamd_task *task = NULL;
rspamd_inet_addr_t *addr;
rspamd_mempool_t *pool = NULL;
- struct iovec *iov = NULL;
- guint niov = 0, total_out;
- guint64 h;
gdouble timeout = default_udp_timeout;
if (lua_type (L, 1) == LUA_TTABLE) {
@@ -204,7 +371,9 @@ lua_udp_sendto (lua_State *L) {
host = luaL_checkstring (L, -1);
if (rspamd_parse_inet_address (&addr, host, 0)) {
- rspamd_inet_address_set_port (addr, port);
+ if (port != 0) {
+ rspamd_inet_address_set_port (addr, port);
+ }
}
else {
lua_pop (L, 1);
@@ -222,6 +391,10 @@ lua_udp_sendto (lua_State *L) {
}
addr = rspamd_inet_address_copy (lip->addr);
+
+ if (port != 0) {
+ rspamd_inet_address_set_port (addr, port);
+ }
}
else {
lua_pop (L, 1);
@@ -276,6 +449,14 @@ lua_udp_sendto (lua_State *L) {
}
lua_pop (L, 1);
+ if (!ev_base || !pool) {
+ rspamd_inet_address_free (addr);
+
+ return luaL_error (L, "invalid arguments");
+ }
+
+
+
if (!ev_base || !pool) {
rspamd_inet_address_free (addr);
@@ -289,6 +470,8 @@ lua_udp_sendto (lua_State *L) {
cbd->addr = addr;
cbd->sock = rspamd_socket_create (rspamd_inet_address_get_af (addr),
SOCK_DGRAM, 0, TRUE);
+ cbd->cbref = -1;
+ double_to_tv (timeout, &cbd->tv);
if (cbd->sock == -1) {
rspamd_inet_address_free (addr);
@@ -301,6 +484,8 @@ lua_udp_sendto (lua_State *L) {
gsize data_len;
lua_pushstring (L, "data");
+ lua_gettable (L, -2);
+
if (lua_type (L, -1) == LUA_TTABLE) {
data_len = rspamd_lua_table_size (L, -1);
cbd->iov = rspamd_mempool_alloc (pool,
@@ -322,22 +507,65 @@ lua_udp_sendto (lua_State *L) {
lua_pop (L, 1);
+ lua_pushstring (L, "callback");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TFUNCTION) {
+ cbd->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ }
+ else {
+ lua_pop (L, 1);
+ }
+
+ lua_pushstring (L, "retransmits");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TNUMBER) {
+ cbd->retransmits = lua_tonumber (L, -1);
+ }
+ lua_pop (L, 1);
+
enum rspamd_udp_send_result r;
r = lua_try_send_request (cbd);
if (r == RSPAMD_SENT_OK) {
+ if (cbd->cbref == -1) {
+ lua_udp_maybe_free (cbd);
+ }
+ else {
+ if (!lua_udp_maybe_register_event (cbd)) {
+ lua_pushboolean (L, false);
+ lua_pushstring (L, "session error");
+ lua_udp_maybe_free (cbd);
+
+ return 2;
+ }
+
+ event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
+ event_base_set (cbd->ev_base, &cbd->io);
+ event_add (&cbd->io, &cbd->tv);
+ cbd->sent = TRUE;
+ }
+
lua_pushboolean (L, true);
- lua_udp_cbd_fin (cbd);
}
else if (r == RSPAMD_SENT_FAILURE) {
lua_pushboolean (L, false);
lua_pushstring (L, strerror (errno));
- lua_udp_cbd_fin (cbd);
+ lua_udp_maybe_free (cbd);
return 2;
}
else {
- /* TODO: add waiting */
+ event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
+ event_base_set (cbd->ev_base, &cbd->io);
+ event_add (&cbd->io, &cbd->tv);
+
+ if (!lua_udp_maybe_register_event (cbd)) {
+ lua_pushboolean (L, false);
+ lua_pushstring (L, "session error");
+ lua_udp_maybe_free (cbd);
+
+ return 2;
+ }
}
}
else {
More information about the Commits
mailing list