commit 45b60f8: [Project] More libserver adoptions

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Jun 22 12:14:25 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-06-19 13:16:25 +0100
URL: https://github.com/rspamd/rspamd/commit/45b60f8df79e6d5ecdb85725aa4841151bdcd853

[Project] More libserver adoptions

---
 contrib/libev/CMakeLists.txt   |   4 +
 src/libserver/rspamd_control.c | 154 ++++++++++-----------
 src/plugins/fuzzy_check.c      | 299 ++++++++++++++++++-----------------------
 src/plugins/surbl.c            |   5 +-
 4 files changed, 208 insertions(+), 254 deletions(-)

diff --git a/contrib/libev/CMakeLists.txt b/contrib/libev/CMakeLists.txt
index e681aeb91..c99b4dd32 100644
--- a/contrib/libev/CMakeLists.txt
+++ b/contrib/libev/CMakeLists.txt
@@ -64,4 +64,8 @@ ADD_DEFINITIONS("-DEV_CONFIG_H=\"${CMAKE_CURRENT_BINARY_DIR}/libev-config.h\""
 		)
 IF(HAVE_EVENTFD)
 	ADD_DEFINITIONS(-DEV_USE_EVENTFD=1)
+ENDIF()
+
+IF(ENABLE_FULL_DEBUG MATCHES "ON")
+	ADD_DEFINITIONS(-DEV_VERIFY=3)
 ENDIF()
\ No newline at end of file
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 9788d47ed..62ca24643 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -19,6 +19,7 @@
 #include "worker_util.h"
 #include "libutil/http_connection.h"
 #include "libutil/http_private.h"
+#include "libutil/libev_helper.h"
 #include "unix-std.h"
 #include "utlist.h"
 
@@ -26,20 +27,14 @@
 #include <sys/resource.h>
 #endif
 
-static struct timeval io_timeout = {
-		.tv_sec = 30,
-		.tv_usec = 0
-};
-static struct timeval worker_io_timeout = {
-		.tv_sec = 0,
-		.tv_usec = 500000
-};
+static ev_tstamp io_timeout = 30.0;
+static ev_tstamp worker_io_timeout = 0.5;
 
 struct rspamd_control_session;
 
 struct rspamd_control_reply_elt {
 	struct rspamd_control_reply reply;
-	struct event io_ev;
+	struct rspamd_io_ev ev;
 	struct rspamd_worker *wrk;
 	gpointer ud;
 	gint attached_fd;
@@ -48,6 +43,7 @@ struct rspamd_control_reply_elt {
 
 struct rspamd_control_session {
 	gint fd;
+	struct ev_loop *event_loop;
 	struct rspamd_main *rspamd_main;
 	struct rspamd_http_connection *conn;
 	struct rspamd_control_command cmd;
@@ -131,7 +127,7 @@ rspamd_control_send_error (struct rspamd_control_session *session,
 			NULL,
 			"application/json",
 			session,
-			&io_timeout);
+			io_timeout);
 }
 
 static void
@@ -154,7 +150,7 @@ rspamd_control_send_ucl (struct rspamd_control_session *session,
 			NULL,
 			"application/json",
 			session,
-			&io_timeout);
+			io_timeout);
 }
 
 static void
@@ -168,7 +164,8 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
 			rspamd_inet_address_to_string (session->addr));
 
 	DL_FOREACH_SAFE (session->replies, elt, telt) {
-		event_del (&elt->io_ev);
+		rspamd_ev_watcher_stop (session->event_loop,
+				&elt->ev);
 		g_free (elt);
 	}
 
@@ -358,7 +355,8 @@ rspamd_control_wrk_io (gint fd, short what, gpointer ud)
 	}
 
 	session->replies_remain --;
-	event_del (&elt->io_ev);
+	rspamd_ev_watcher_stop (session->event_loop,
+			&elt->ev);
 
 	if (session->replies_remain == 0) {
 		rspamd_control_write_reply (session);
@@ -434,12 +432,12 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
 			rep_elt = g_malloc0 (sizeof (*rep_elt));
 			rep_elt->wrk = wrk;
 			rep_elt->ud = ud;
-			event_set (&rep_elt->io_ev, wrk->control_pipe[0],
-					EV_READ | EV_PERSIST, handler,
+			rspamd_ev_watcher_init (&rep_elt->ev,
+					wrk->control_pipe[0],
+					EV_READ, handler,
 					rep_elt);
-			event_base_set (rspamd_main->event_loop,
-					&rep_elt->io_ev);
-			event_add (&rep_elt->io_ev, &worker_io_timeout);
+			rspamd_ev_watcher_start (rspamd_main->event_loop,
+					&rep_elt->ev, worker_io_timeout);
 
 			DL_APPEND (res, rep_elt);
 		}
@@ -527,11 +525,11 @@ rspamd_control_process_client_socket (struct rspamd_main *rspamd_main,
 	session->rspamd_main = rspamd_main;
 	session->addr = addr;
 	rspamd_http_connection_read_message (session->conn, session,
-			&io_timeout);
+			io_timeout);
 }
 
 struct rspamd_worker_control_data {
-	struct event io_ev;
+	ev_io io_ev;
 	struct rspamd_worker *worker;
 	struct ev_loop *ev_base;
 	struct {
@@ -613,9 +611,10 @@ rspamd_control_default_cmd_handler (gint fd,
 }
 
 static void
-rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
+rspamd_control_default_worker_handler (EV_P_ ev_io *w, int revents)
 {
-	struct rspamd_worker_control_data *cd = ud;
+	struct rspamd_worker_control_data *cd =
+			(struct rspamd_worker_control_data *)w->data;
 	static struct rspamd_control_command cmd;
 	static struct msghdr msg;
 	static struct iovec iov;
@@ -631,15 +630,15 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
 	msg.msg_iov = &iov;
 	msg.msg_iovlen = 1;
 
-	r = recvmsg (fd, &msg, 0);
+	r = recvmsg (w->fd, &msg, 0);
 
 	if (r == -1) {
 		msg_err ("cannot read request from the control socket: %s",
 				strerror (errno));
 
 		if (errno != EAGAIN && errno != EINTR) {
-			event_del (&cd->io_ev);
-			close (fd);
+			ev_io_stop (cd->ev_base, &cd->io_ev);
+			close (w->fd);
 		}
 	}
 	else if (r < (gint)sizeof (cmd)) {
@@ -647,8 +646,8 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
 				(gint)sizeof (cmd));
 
 		if (r == 0) {
-			event_del (&cd->io_ev);
-			close (fd);
+			ev_io_stop (cd->ev_base, &cd->io_ev);
+			close (w->fd);
 		}
  	}
 	else if ((gint)cmd.type >= 0 && cmd.type < RSPAMD_CONTROL_MAX) {
@@ -660,13 +659,13 @@ rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
 		if (cd->handlers[cmd.type].handler) {
 			cd->handlers[cmd.type].handler (cd->worker->srv,
 					cd->worker,
-					fd,
+					w->fd,
 					rfd,
 					&cmd,
 					cd->handlers[cmd.type].ud);
 		}
 		else {
-			rspamd_control_default_cmd_handler (fd, rfd, cd, &cmd);
+			rspamd_control_default_cmd_handler (w->fd, rfd, cd, &cmd);
 		}
 	}
 	else {
@@ -684,10 +683,10 @@ rspamd_control_worker_add_default_handler (struct rspamd_worker *worker,
 	cd->worker = worker;
 	cd->ev_base = ev_base;
 
-	event_set (&cd->io_ev, worker->control_pipe[1], EV_READ | EV_PERSIST,
-			rspamd_control_default_worker_handler, cd);
-	event_base_set (ev_base, &cd->io_ev);
-	event_add (&cd->io_ev, NULL);
+	cd->io_ev.data = cd;
+	ev_io_init (&cd->io_ev, rspamd_control_default_worker_handler,
+			worker->control_pipe[1], EV_READ);
+	ev_io_start (ev_base, &cd->io_ev);
 
 	worker->control_data = cd;
 }
@@ -720,26 +719,28 @@ struct rspamd_srv_reply_data {
 };
 
 static void
-rspamd_control_hs_io_handler (gint fd, short what, gpointer ud)
+rspamd_control_hs_io_handler (int fd, short what, void *ud)
 {
-	struct rspamd_control_reply_elt *elt = ud;
+	struct rspamd_control_reply_elt *elt =
+			(struct rspamd_control_reply_elt *)ud;
 	struct rspamd_control_reply rep;
 
 	/* At this point we just ignore replies from the workers */
 	(void)read (fd, &rep, sizeof (rep));
-	event_del (&elt->io_ev);
+	rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
 	g_free (elt);
 }
 
 static void
-rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud)
+rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)
 {
-	struct rspamd_control_reply_elt *elt = ud;
+	struct rspamd_control_reply_elt *elt =
+			(struct rspamd_control_reply_elt *)ud;
 	struct rspamd_control_reply rep;
 
 	/* At this point we just ignore replies from the workers */
 	(void) read (fd, &rep, sizeof (rep));
-	event_del (&elt->io_ev);
+	rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
 	g_free (elt);
 }
 
@@ -794,7 +795,7 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
 }
 
 static void
-rspamd_srv_handler (gint fd, short what, gpointer ud)
+rspamd_srv_handler (EV_P_ ev_io *w, int revents)
 {
 	struct rspamd_worker *worker;
 	static struct rspamd_srv_command cmd;
@@ -809,8 +810,8 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
 	struct rspamd_control_command wcmd;
 	gssize r;
 
-	if (what == EV_READ) {
-		worker = ud;
+	if (revents == EV_READ) {
+		worker = (struct rspamd_worker *)w->data;
 		srv = worker->srv;
 		iov.iov_base = &cmd;
 		iov.iov_len = sizeof (cmd);
@@ -820,7 +821,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
 		msg.msg_iov = &iov;
 		msg.msg_iovlen = 1;
 
-		r = recvmsg (fd, &msg, 0);
+		r = recvmsg (w->fd, &msg, 0);
 
 		if (r == -1) {
 			msg_err ("cannot read from worker's srv pipe: %s",
@@ -831,7 +832,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
 			 * Usually this means that a worker is dead, so do not try to read
 			 * anything
 			 */
-			event_del (&worker->srv_ev);
+			ev_io_stop (EV_A_ w);
 		}
 		else if (r != sizeof (cmd)) {
 			msg_err ("cannot read from worker's srv pipe incomplete command: %d",
@@ -919,17 +920,14 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
 			}
 
 			/* Now plan write event and send data back */
-			event_del (&worker->srv_ev);
-			event_set (&worker->srv_ev,
-					worker->srv_pipe[0],
-					EV_WRITE,
-					rspamd_srv_handler,
-					rdata);
-			event_add (&worker->srv_ev, NULL);
+			w->data = rdata;
+			ev_io_stop (EV_A_ w);
+			ev_io_set (w, worker->srv_pipe[0], EV_WRITE);
+			ev_io_start (EV_A_ w);
 		}
 	}
-	else if (what == EV_WRITE) {
-		rdata = ud;
+	else if (revents == EV_WRITE) {
+		rdata = (struct rspamd_srv_reply_data *)w->data;
 		worker = rdata->worker;
 		worker->tmp_data = NULL; /* Avoid race */
 		srv = rdata->srv;
@@ -953,7 +951,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
 		msg.msg_iov = &iov;
 		msg.msg_iovlen = 1;
 
-		r = sendmsg (fd, &msg, 0);
+		r = sendmsg (w->fd, &msg, 0);
 
 		if (r == -1) {
 			msg_err ("cannot write to worker's srv pipe: %s",
@@ -961,13 +959,10 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
 		}
 
 		g_free (rdata);
-		event_del (&worker->srv_ev);
-		event_set (&worker->srv_ev,
-				worker->srv_pipe[0],
-				EV_READ | EV_PERSIST,
-				rspamd_srv_handler,
-				worker);
-		event_add (&worker->srv_ev, NULL);
+		w->data = worker;
+		ev_io_stop (EV_A_ w);
+		ev_io_set (w, worker->srv_pipe[0], EV_READ);
+		ev_io_start (EV_A_ w);
 	}
 }
 
@@ -979,10 +974,9 @@ rspamd_srv_start_watching (struct rspamd_main *srv,
 	g_assert (worker != NULL);
 
 	worker->tmp_data = NULL;
-	event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
-			rspamd_srv_handler, worker);
-	event_base_set (ev_base, &worker->srv_ev);
-	event_add (&worker->srv_ev, NULL);
+	worker->srv_ev.data = worker;
+	ev_io_init (&worker->srv_ev, rspamd_srv_handler, worker->srv_pipe[0], EV_READ);
+	ev_io_start (ev_base, &worker->srv_ev);
 }
 
 struct rspamd_srv_request_data {
@@ -991,14 +985,14 @@ struct rspamd_srv_request_data {
 	gint attached_fd;
 	struct rspamd_srv_reply rep;
 	rspamd_srv_reply_handler handler;
-	struct event io_ev;
+	ev_io io_ev;
 	gpointer ud;
 };
 
 static void
-rspamd_srv_request_handler (gint fd, short what, gpointer ud)
+rspamd_srv_request_handler (EV_P_ ev_io *w, int revents)
 {
-	struct rspamd_srv_request_data *rd = ud;
+	struct rspamd_srv_request_data *rd = (struct rspamd_srv_request_data *)w->data;
 	struct msghdr msg;
 	struct iovec iov;
 	guchar fdspace[CMSG_SPACE(sizeof (int))];
@@ -1006,7 +1000,7 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
 	gssize r;
 	gint rfd = -1;
 
-	if (what == EV_WRITE) {
+	if (revents == EV_WRITE) {
 		/* Send request to server */
 		memset (&msg, 0, sizeof (msg));
 
@@ -1027,17 +1021,16 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
 		msg.msg_iov = &iov;
 		msg.msg_iovlen = 1;
 
-		r = sendmsg (fd, &msg, 0);
+		r = sendmsg (w->fd, &msg, 0);
 
 		if (r == -1) {
 			msg_err ("cannot write to server pipe: %s", strerror (errno));
 			goto cleanup;
 		}
 
-		event_del (&rd->io_ev);
-		event_set (&rd->io_ev, rd->worker->srv_pipe[1], EV_READ,
-				rspamd_srv_request_handler, rd);
-		event_add (&rd->io_ev, NULL);
+		ev_io_stop (EV_A_ w);
+		ev_io_set (w, rd->worker->srv_pipe[1], EV_READ);
+		ev_io_start (EV_A_ w);
 	}
 	else {
 		iov.iov_base = &rd->rep;
@@ -1048,7 +1041,7 @@ rspamd_srv_request_handler (gint fd, short what, gpointer ud)
 		msg.msg_iov = &iov;
 		msg.msg_iovlen = 1;
 
-		r = recvmsg (fd, &msg, 0);
+		r = recvmsg (w->fd, &msg, 0);
 
 		if (r == -1) {
 			msg_err ("cannot read from server pipe: %s", strerror (errno));
@@ -1075,7 +1068,8 @@ cleanup:
 	if (rd->handler) {
 		rd->handler (rd->worker, &rd->rep, rfd, rd->ud);
 	}
-	event_del (&rd->io_ev);
+
+	ev_io_stop (EV_A_ w);
 	g_free (rd);
 }
 
@@ -1102,8 +1096,8 @@ rspamd_srv_send_command (struct rspamd_worker *worker,
 	rd->rep.type = cmd->type;
 	rd->attached_fd = attached_fd;
 
-	event_set (&rd->io_ev, worker->srv_pipe[1], EV_WRITE,
-			rspamd_srv_request_handler, rd);
-	event_base_set (ev_base, &rd->io_ev);
-	event_add (&rd->io_ev, NULL);
+	rd->io_ev.data = rd;
+	ev_io_init (&rd->io_ev, rspamd_srv_request_handler,
+			rd->worker->srv_pipe[1], EV_WRITE);
+	ev_io_start (ev_base, &rd->io_ev);
 }
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 2c91869d6..75df2a645 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -47,6 +47,7 @@
 #include "libstat/stat_api.h"
 #include <math.h>
 #include <src/libmime/message.h>
+#include "libutil/libev_helper.h"
 
 #define DEFAULT_SYMBOL "R_FUZZY_HASH"
 
@@ -129,9 +130,8 @@ struct fuzzy_client_session {
 	struct rspamd_symcache_item *item;
 	struct upstream *server;
 	struct fuzzy_rule *rule;
-	struct event ev;
-	struct event timev;
-	struct timeval tv;
+	struct ev_loop *event_loop;
+	struct rspamd_io_ev ev;
 	gint state;
 	gint fd;
 	guint retransmits;
@@ -146,9 +146,8 @@ struct fuzzy_learn_session {
 	struct upstream *server;
 	struct fuzzy_rule *rule;
 	struct rspamd_task *task;
-	struct event ev;
-	struct event timev;
-	struct timeval tv;
+	struct ev_loop *event_loop;
+	struct rspamd_io_ev ev;
 	gint fd;
 	guint retransmits;
 };
@@ -1185,8 +1184,7 @@ fuzzy_io_fin (void *ud)
 		g_ptr_array_free (session->results, TRUE);
 	}
 
-	event_del (&session->ev);
-	event_del (&session->timev);
+	rspamd_ev_watcher_stop (session->event_loop, &session->ev);
 	close (session->fd);
 }
 
@@ -2181,13 +2179,49 @@ fuzzy_check_session_is_completed (struct fuzzy_client_session *session)
 	return FALSE;
 }
 
+/* Fuzzy check timeout callback */
+static void
+fuzzy_check_timer_callback (gint fd, short what, void *arg)
+{
+	struct fuzzy_client_session *session = arg;
+	struct rspamd_task *task;
+
+	task = session->task;
+
+	/* We might be here because of other checks being slow */
+	if (fuzzy_check_try_read (session) > 0) {
+		if (fuzzy_check_session_is_completed (session)) {
+			return;
+		}
+	}
+
+	if (session->retransmits >= session->rule->ctx->retransmits) {
+		msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
+				rspamd_upstream_name (session->server),
+				rspamd_inet_address_to_string_pretty (
+						rspamd_upstream_addr_cur (session->server)),
+				session->retransmits);
+		rspamd_upstream_fail (session->server, TRUE);
+
+		if (session->item) {
+			rspamd_symcache_item_async_dec_check (session->task, session->item, M);
+		}
+		rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
+	}
+	else {
+		/* Plan write event */
+		rspamd_ev_watcher_reschedule (session->event_loop,
+				&session->ev, EV_READ|EV_WRITE);
+		session->retransmits ++;
+	}
+}
+
 /* Fuzzy check callback */
 static void
 fuzzy_check_io_callback (gint fd, short what, void *arg)
 {
 	struct fuzzy_client_session *session = arg;
 	struct rspamd_task *task;
-	struct ev_loop *ev_base;
 	gint r;
 
 	enum {
@@ -2224,18 +2258,14 @@ fuzzy_check_io_callback (gint fd, short what, void *arg)
 		}
 	}
 	else {
-		/* Should not happen */
-		g_assert (0);
+		fuzzy_check_timer_callback (fd, what, arg);
+		return;
 	}
 
 	if (ret == return_want_more) {
 		/* Processed write, switch to reading */
-		ev_base = event_get_base (&session->ev);
-		event_del (&session->ev);
-		event_set (&session->ev, fd, EV_READ,
-				fuzzy_check_io_callback, session);
-		event_base_set (ev_base, &session->ev);
-		event_add (&session->ev, NULL);
+		rspamd_ev_watcher_reschedule (session->event_loop,
+				&session->ev, EV_READ);
 	}
 	else if (ret == return_error) {
 		/* Error state */
@@ -2258,77 +2288,81 @@ fuzzy_check_io_callback (gint fd, short what, void *arg)
 		/* Read something from network */
 		if (!fuzzy_check_session_is_completed (session)) {
 			/* Need to read more */
-			ev_base = event_get_base (&session->ev);
-			event_del (&session->ev);
-			event_set (&session->ev, session->fd, EV_READ,
-					fuzzy_check_io_callback, session);
-			event_base_set (ev_base, &session->ev);
-			event_add (&session->ev, NULL);
+			rspamd_ev_watcher_reschedule (session->event_loop,
+					&session->ev, EV_READ);
 		}
 	}
 }
 
-/* Fuzzy check timeout callback */
+
 static void
-fuzzy_check_timer_callback (gint fd, short what, void *arg)
+fuzzy_lua_fin (void *ud)
 {
-	struct fuzzy_client_session *session = arg;
+	struct fuzzy_learn_session *session = ud;
+
+	(*session->saved)--;
+
+	rspamd_ev_watcher_stop (session->event_loop, &session->ev);
+	close (session->fd);
+}
+
+/* Controller IO */
+
+static void
+fuzzy_controller_timer_callback (gint fd, short what, void *arg)
+{
+	struct fuzzy_learn_session *session = arg;
 	struct rspamd_task *task;
-	struct ev_loop *ev_base;
 
 	task = session->task;
 
-	/* We might be here because of other checks being slow */
-	if (fuzzy_check_try_read (session) > 0) {
-		if (fuzzy_check_session_is_completed (session)) {
-			return;
-		}
-	}
-
 	if (session->retransmits >= session->rule->ctx->retransmits) {
-		msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
+		rspamd_upstream_fail (session->server, TRUE);
+		msg_err_task_check ("got IO timeout with server %s(%s), "
+							"after %d retransmits",
 				rspamd_upstream_name (session->server),
 				rspamd_inet_address_to_string_pretty (
 						rspamd_upstream_addr_cur (session->server)),
 				session->retransmits);
-		rspamd_upstream_fail (session->server, TRUE);
 
-		if (session->item) {
-			rspamd_symcache_item_async_dec_check (session->task, session->item, M);
+		if (session->session) {
+			rspamd_session_remove_event (session->session, fuzzy_lua_fin,
+					session);
+		}
+		else {
+			if (session->http_entry) {
+				rspamd_controller_send_error (session->http_entry,
+						500, "IO timeout with fuzzy storage");
+			}
+
+			if (*session->saved > 0 ) {
+				(*session->saved)--;
+				if (*session->saved == 0) {
+					if (session->http_entry) {
+						rspamd_task_free (session->task);
+					}
+
+					session->task = NULL;
+				}
+			}
+
+			if (session->http_entry) {
+				rspamd_http_connection_unref (session->http_entry->conn);
+			}
+
+			rspamd_ev_watcher_stop (session->event_loop,
+					&session->ev);
+			close (session->fd);
 		}
-		rspamd_session_remove_event (session->task->s, fuzzy_io_fin, session);
 	}
 	else {
 		/* Plan write event */
-		ev_base = event_get_base (&session->ev);
-		event_del (&session->ev);
-		event_set (&session->ev, fd, EV_WRITE|EV_READ,
-				fuzzy_check_io_callback, session);
-		event_base_set (ev_base, &session->ev);
-		event_add (&session->ev, NULL);
-
-		/* Plan new retransmit timer */
-		ev_base = event_get_base (&session->timev);
-		event_del (&session->timev);
-		event_base_set (ev_base, &session->timev);
-		event_add (&session->timev, &session->tv);
+		rspamd_ev_watcher_reschedule (session->event_loop,
+				&session->ev, EV_READ|EV_WRITE);
 		session->retransmits ++;
 	}
 }
 
-static void
-fuzzy_lua_fin (void *ud)
-{
-	struct fuzzy_learn_session *session = ud;
-
-	(*session->saved)--;
-
-	event_del (&session->ev);
-	event_del (&session->timev);
-	close (session->fd);
-}
-
-/* Controller IO */
 static void
 fuzzy_controller_io_callback (gint fd, short what, void *arg)
 {
@@ -2340,7 +2374,6 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
 	struct fuzzy_cmd_io *io;
 	struct rspamd_fuzzy_cmd *cmd = NULL;
 	const gchar *symbol, *ftype;
-	struct ev_loop *ev_base;
 	gint r;
 	enum {
 		return_error = 0,
@@ -2355,7 +2388,8 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
 	if (what & EV_READ) {
 		if ((r = read (fd, buf, sizeof (buf) - 1)) == -1) {
 			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
-				event_add (&session->ev, NULL);
+				rspamd_ev_watcher_reschedule (session->event_loop,
+						&session->ev, EV_READ);
 				return;
 			}
*** OUTPUT TRUNCATED, 258 LINES SKIPPED ***


More information about the Commits mailing list