commit 63f823e: [Project] Further workers refactoring

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Jun 22 12:14:32 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-06-19 17:46:28 +0100
URL: https://github.com/rspamd/rspamd/commit/63f823eb9d6b4cfed6c3014ab350dfc61f33cb28

[Project] Further workers refactoring

---
 src/CMakeLists.txt          |  5 ++-
 src/rspamadm/CMakeLists.txt |  3 +-
 src/rspamadm/lua_repl.c     |  2 +-
 src/rspamd.c                | 84 +++++++++++++++++++++------------------------
 4 files changed, 43 insertions(+), 51 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index caa0b5d0d..a23c4e505 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -91,8 +91,7 @@ SET(RSPAMDSRC	controller.c
 				fuzzy_storage.c
 				rspamd.c
 				worker.c
-				rspamd_proxy.c
-				log_helper.c)
+				rspamd_proxy.c)
 
 SET(PLUGINSSRC	plugins/surbl.c
 				plugins/regexp.c
@@ -103,7 +102,7 @@ SET(PLUGINSSRC	plugins/surbl.c
 				libserver/rspamd_control.c)
 
 SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim)
-SET(WORKERS_LIST normal controller fuzzy lua rspamd_proxy log_helper)
+SET(WORKERS_LIST normal controller fuzzy rspamd_proxy)
 IF (ENABLE_HYPERSCAN MATCHES "ON")
 	LIST(APPEND WORKERS_LIST "hs_helper")
 	LIST(APPEND RSPAMDSRC "hs_helper.c")
diff --git a/src/rspamadm/CMakeLists.txt b/src/rspamadm/CMakeLists.txt
index b98b99473..3d4f2f490 100644
--- a/src/rspamadm/CMakeLists.txt
+++ b/src/rspamadm/CMakeLists.txt
@@ -16,8 +16,7 @@ SET(RSPAMADMSRC rspamadm.c
         ${CMAKE_SOURCE_DIR}/src/controller.c
         ${CMAKE_SOURCE_DIR}/src/fuzzy_storage.c
         ${CMAKE_SOURCE_DIR}/src/worker.c
-        ${CMAKE_SOURCE_DIR}/src/rspamd_proxy.c
-        ${CMAKE_SOURCE_DIR}/src/log_helper.c)
+        ${CMAKE_SOURCE_DIR}/src/rspamd_proxy.c)
 INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR})
 IF (ENABLE_HYPERSCAN MATCHES "ON")
     LIST(APPEND RSPAMADMSRC "${CMAKE_SOURCE_DIR}/src/hs_helper.c")
diff --git a/src/rspamadm/lua_repl.c b/src/rspamadm/lua_repl.c
index 33805d66b..a95521bdb 100644
--- a/src/rspamadm/lua_repl.c
+++ b/src/rspamadm/lua_repl.c
@@ -296,7 +296,7 @@ wait_session_events (void)
 {
 	/* XXX: it's probably worth to add timeout here - not to wait forever */
 	while (rspamd_session_events_pending (rspamadm_session) > 0) {
-		event_base_loop (rspamd_main->event_loop, EVLOOP_ONCE);
+		ev_loop (rspamd_main->event_loop, EVLOOP_ONESHOT);
 	}
 }
 
diff --git a/src/rspamd.c b/src/rspamd.c
index a4a659a53..813b7b7bb 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -358,21 +358,22 @@ reread_config (struct rspamd_main *rspamd_main)
 
 struct waiting_worker {
 	struct rspamd_main *rspamd_main;
-	struct event wait_ev;
+ 	struct ev_timer wait_ev;
 	struct rspamd_worker_conf *cf;
 	guint oldindex;
 };
 
 static void
-rspamd_fork_delayed_cb (gint signo, short what, gpointer arg)
+rspamd_fork_delayed_cb (EV_P_ ev_timer *w, int revents)
 {
-	struct waiting_worker *w = arg;
-
-	event_del (&w->wait_ev);
-	rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex,
-			w->rspamd_main->event_loop);
-	REF_RELEASE (w->cf);
-	g_free (w);
+	struct waiting_worker *waiting_worker = (struct waiting_worker *)w->data;
+
+	ev_timer_stop (EV_A_ &waiting_worker->wait_ev);
+	rspamd_fork_worker (waiting_worker->rspamd_main, waiting_worker->cf,
+			waiting_worker->oldindex,
+			waiting_worker->rspamd_main->event_loop);
+	REF_RELEASE (waiting_worker->cf);
+	g_free (waiting_worker);
 }
 
 static void
@@ -390,9 +391,8 @@ rspamd_fork_delayed (struct rspamd_worker_conf *cf,
 	tv.tv_sec = SOFT_FORK_TIME;
 	tv.tv_usec = 0;
 	REF_RETAIN (cf);
-	event_set (&nw->wait_ev, -1, EV_TIMEOUT, rspamd_fork_delayed_cb, nw);
-	event_base_set (rspamd_main->event_loop, &nw->wait_ev);
-	event_add (&nw->wait_ev, &tv);
+	ev_timer_init (&nw->wait_ev, rspamd_fork_delayed_cb, SOFT_FORK_TIME, 0.0);
+	ev_timer_start (rspamd_main->event_loop, &nw->wait_ev);
 }
 
 static GList *
@@ -803,7 +803,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
 		if (w->tmp_data) {
 			g_free (w->tmp_data);
 		}
-		event_del (&w->srv_ev);
+		ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
 	}
 
 	if (w->finish_actions) {
@@ -1026,9 +1026,9 @@ rspamd_hup_handler (struct ev_loop *loop, ev_signal *w, int revents)
 }
 
 static void
-rspamd_cld_handler (gint signo, short what, gpointer arg)
+rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
 {
-	struct rspamd_main *rspamd_main = arg;
+	struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
 	guint i;
 	gint res = 0;
 	struct rspamd_worker *cur;
@@ -1135,7 +1135,7 @@ rspamd_cld_handler (gint signo, short what, gpointer arg)
 				if (cur->tmp_data) {
 					g_free (cur->tmp_data);
 				}
-				event_del (&cur->srv_ev);
+				ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
 			}
 
 			if (cur->control_pipe[0] != -1) {
@@ -1166,29 +1166,29 @@ rspamd_cld_handler (gint signo, short what, gpointer arg)
 }
 
 static void
-rspamd_final_term_handler (gint signo, short what, gpointer arg)
+rspamd_final_term_handler (EV_P_ ev_timer *w, int revents)
 {
-	struct rspamd_main *rspamd_main = arg;
+	struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
 
 	term_attempts--;
 
 	g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
 
 	if (g_hash_table_size (rspamd_main->workers) == 0) {
-		event_base_loopexit (rspamd_main->event_loop, NULL);
+		ev_break (rspamd_main->event_loop, EVBREAK_ALL);
 	}
 }
 
 /* Control socket handler */
 static void
-rspamd_control_handler (gint fd, short what, gpointer arg)
+rspamd_control_handler (EV_P_ ev_io *w, int revents)
 {
-	struct rspamd_main *rspamd_main = arg;
+	struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
 	rspamd_inet_addr_t *addr;
 	gint nfd;
 
 	if ((nfd =
-				 rspamd_accept_from_socket (fd, &addr, NULL)) == -1) {
+				 rspamd_accept_from_socket (w->fd, &addr, NULL, NULL)) == -1) {
 		msg_warn_main ("accept failed: %s", strerror (errno));
 		return;
 	}
@@ -1246,7 +1246,6 @@ main (gint argc, gchar **argv, gchar **env)
 	struct ev_loop *event_loop;
 	ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
 	ev_io control_ev;
-	struct timeval term_tv;
 	struct rspamd_main *rspamd_main;
 	gboolean skip_pid = FALSE, valgrind_mode = FALSE;
 
@@ -1524,10 +1523,9 @@ main (gint argc, gchar **argv, gchar **env)
 
 
 	/* XXX: deal with children */
-	evsignal_set (&cld_ev, SIGCHLD, rspamd_cld_handler, rspamd_main);
-	event_base_set (event_loop, &cld_ev);
-	event_add (&cld_ev, NULL);
-
+	cld_ev.data = rspamd_main;
+	ev_signal_init (&cld_ev, rspamd_cld_handler, SIGCHLD);
+	ev_signal_start (event_loop, &cld_ev);
 
 	rspamd_check_core_limits (rspamd_main);
 	rspamd_mempool_lock_mutex (rspamd_main->start_mtx);
@@ -1545,18 +1543,17 @@ main (gint argc, gchar **argv, gchar **env)
 		ev_io_start (event_loop, &control_ev);
 	}
 
-	event_base_loop (event_loop, 0);
+	ev_loop (event_loop, 0);
 	/* We need to block signals unless children are waited for */
 	rspamd_worker_block_signals ();
-
-	event_del (&term_ev);
-	event_del (&int_ev);
-	event_del (&hup_ev);
-	event_del (&cld_ev);
-	event_del (&usr1_ev);
+	ev_signal_stop (event_loop, &term_ev);
+	ev_signal_stop (event_loop, &int_ev);
+	ev_signal_stop (event_loop, &hup_ev);
+	ev_signal_stop (event_loop, &cld_ev);
+	ev_signal_stop (event_loop, &usr1_ev);
 
 	if (control_fd != -1) {
-		event_del (&control_ev);
+		ev_io_stop (event_loop, &control_ev);
 		close (control_fd);
 	}
 
@@ -1568,20 +1565,17 @@ main (gint argc, gchar **argv, gchar **env)
 		term_attempts = TERMINATION_ATTEMPTS;
 	}
 
-	/* Check each 200 ms */
-	term_tv.tv_sec = 0;
-	term_tv.tv_usec = 200000;
 
 	/* Wait for workers termination */
 	g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
 
-	event_set (&term_ev, -1, EV_TIMEOUT|EV_PERSIST,
-			rspamd_final_term_handler, rspamd_main);
-	event_base_set (event_loop, &term_ev);
-	event_add (&term_ev, &term_tv);
+	static ev_timer ev_finale;
+
+	ev_finale.data = rspamd_main;
+	ev_timer_init (&ev_finale, rspamd_final_term_handler, 0.2, 0.2);
+	ev_timer_start (event_loop, &ev_finale);
 
-	event_base_loop (event_loop, 0);
-	event_del (&term_ev);
+	ev_loop (event_loop, 0);
 
 	/* Maybe save roll history */
 	if (rspamd_main->cfg->history_file) {
@@ -1602,7 +1596,7 @@ main (gint argc, gchar **argv, gchar **env)
 	}
 
 	g_free (rspamd_main);
-	event_base_free (event_loop);
+	ev_unref (event_loop);
 	sqlite3_shutdown ();
 
 	if (control_addr) {


More information about the Commits mailing list