commit 0334b8e: [Project] Further rework

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Jun 22 12:14:24 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-06-19 11:42:58 +0100
URL: https://github.com/rspamd/rspamd/commit/0334b8e433a45513c0087dda20f22a26b2e16ad1

[Project] Further rework

---
 src/libserver/milter.c                |  14 +----
 src/libserver/worker_util.c           | 115 ++++++++++++++++++----------------
 src/libserver/worker_util.h           |   2 +-
 src/libstat/learn_cache/redis_cache.c |  27 ++++----
 src/rspamd.h                          |   2 +-
 5 files changed, 77 insertions(+), 83 deletions(-)

diff --git a/src/libserver/milter.c b/src/libserver/milter.c
index bb27d2ff1..897938df0 100644
--- a/src/libserver/milter.c
+++ b/src/libserver/milter.c
@@ -186,10 +186,7 @@ rspamd_milter_session_dtor (struct rspamd_milter_session *session)
 		priv = session->priv;
 		msg_debug_milter ("destroying milter session");
 
-		if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
-			event_del (&priv->ev);
-		}
-
+		rspamd_ev_watcher_stop (priv->event_loop, &priv->ev);
 		rspamd_milter_session_reset (session, RSPAMD_MILTER_RESET_ALL);
 
 		if (priv->parser.buf) {
@@ -267,14 +264,7 @@ static inline void
 rspamd_milter_plan_io (struct rspamd_milter_session *session,
 		struct rspamd_milter_private *priv, gshort what)
 {
-	if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
-		event_del (&priv->ev);
-	}
-
-	event_set (&priv->ev, priv->fd, what, rspamd_milter_io_handler,
-			session);
-	event_base_set (priv->event_loop, &priv->ev);
-	event_add (&priv->ev, priv->ptv);
+	rspamd_ev_watcher_reschedule (priv->event_loop, &priv->ev, what);
 }
 
 
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 0d4c4db17..0aa0c9cf3 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -57,7 +57,7 @@
 #include <sys/ucontext.h>
 #endif
 
-static void rspamd_worker_ignore_signal (int signo);
+static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
 /**
  * Return worker's control structure by its type
  * @param type
@@ -98,6 +98,16 @@ rspamd_worker_terminate_handlers (struct rspamd_worker *w)
 
 	return ret;
 }
+
+static void
+rspamd_worker_on_delayed_shutdown (EV_P_ ev_timer *w, int revents)
+{
+	ev_break (loop, EVBREAK_ALL);
+#ifdef WITH_GPERF_TOOLS
+	ProfilerStop ();
+#endif
+}
+
 /*
  * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
  */
@@ -108,7 +118,10 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 	struct timeval tv;
 
 	if (!sigh->worker->wanna_die) {
-		rspamd_worker_ignore_signal (SIGUSR2);
+		static ev_timer shutdown_ev;
+
+		rspamd_worker_ignore_signal (sigh);
+
 		tv.tv_sec = SOFT_SHUTDOWN_TIME;
 		tv.tv_usec = 0;
 		sigh->worker->wanna_die = TRUE;
@@ -119,7 +132,9 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 				G_STRFUNC,
 				"worker's shutdown is pending in %d sec",
 				SOFT_SHUTDOWN_TIME);
-		event_base_loopexit (sigh->base, &tv);
+		ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
+				SOFT_SHUTDOWN_TIME, 0.0);
+		ev_timer_start (sigh->event_loop, &shutdown_ev);
 		rspamd_worker_stop_accept (sigh->worker);
 	}
 
@@ -142,9 +157,12 @@ rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 static gboolean
 rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
 {
-	struct timeval tv;
+	ev_tstamp delay;
 
 	if (!sigh->worker->wanna_die) {
+		static ev_timer shutdown_ev;
+
+		rspamd_worker_ignore_signal (sigh);
 		rspamd_default_log_function (G_LOG_LEVEL_INFO,
 				sigh->worker->srv->server_pool->tag.tagname,
 				sigh->worker->srv->server_pool->tag.uid,
@@ -152,19 +170,17 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 				"terminating after receiving signal %s",
 				g_strsignal (sigh->signo));
 
-		tv.tv_usec = 0;
 		if (rspamd_worker_terminate_handlers (sigh->worker)) {
-			tv.tv_sec =  SOFT_SHUTDOWN_TIME;
+			delay = SOFT_SHUTDOWN_TIME;
 		}
 		else {
-			tv.tv_sec = 0;
+			delay = 0;
 		}
 
 		sigh->worker->wanna_die = 1;
-		event_base_loopexit (sigh->base, &tv);
-#ifdef WITH_GPERF_TOOLS
-		ProfilerStop ();
-#endif
+		ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
+				SOFT_SHUTDOWN_TIME, 0.0);
+		ev_timer_start (sigh->event_loop, &shutdown_ev);
 		rspamd_worker_stop_accept (sigh->worker);
 	}
 
@@ -173,10 +189,10 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 }
 
 static void
-rspamd_worker_signal_handle (int fd, short what, void *arg)
+rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents)
 {
 	struct rspamd_worker_signal_handler *sigh =
-			(struct rspamd_worker_signal_handler *) arg;
+			(struct rspamd_worker_signal_handler *)w->data;
 	struct rspamd_worker_signal_cb *cb, *cbtmp;
 
 	/* Call all signal handlers registered */
@@ -188,15 +204,9 @@ rspamd_worker_signal_handle (int fd, short what, void *arg)
 }
 
 static void
-rspamd_worker_ignore_signal (int signo)
+rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh)
 {
-	struct sigaction sig;
-
-	sigemptyset (&sig.sa_mask);
-	sigaddset (&sig.sa_mask, signo);
-	sig.sa_handler = SIG_IGN;
-	sig.sa_flags = 0;
-	sigaction (signo, &sig, NULL);
+	ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
 }
 
 static void
@@ -222,14 +232,14 @@ rspamd_sigh_free (void *p)
 		g_free (cb);
 	}
 
-	event_del (&sigh->ev);
+	ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
 	rspamd_worker_default_signal (sigh->signo);
 	g_free (sigh);
 }
 
 void
 rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
-		struct ev_loop *base,
+		struct ev_loop *event_loop,
 		rspamd_worker_signal_handler handler,
 		void *handler_data)
 {
@@ -242,12 +252,12 @@ rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
 		sigh = g_malloc0 (sizeof (*sigh));
 		sigh->signo = signo;
 		sigh->worker = worker;
-		sigh->base = base;
+		sigh->event_loop = event_loop;
 		sigh->enabled = TRUE;
 
-		signal_set (&sigh->ev, signo, rspamd_worker_signal_handle, sigh);
-		event_base_set (base, &sigh->ev);
-		signal_add (&sigh->ev, NULL);
+		sigh->ev_sig.data = sigh;
+		ev_signal_init (&sigh->ev_sig, rspamd_worker_signal_handle, signo);
+		ev_signal_start (event_loop, &sigh->ev_sig);
 
 		g_hash_table_insert (worker->signal_events,
 				GINT_TO_POINTER (signo),
@@ -428,7 +438,7 @@ rspamd_controller_send_error (struct rspamd_http_connection_entry *entry,
 		NULL,
 		"application/json",
 		entry,
-		entry->rt->ptv);
+		entry->rt->timeout);
 	entry->is_reply = TRUE;
 }
 
@@ -460,7 +470,7 @@ rspamd_controller_send_string (struct rspamd_http_connection_entry *entry,
 		NULL,
 		"application/json",
 		entry,
-		entry->rt->ptv);
+		entry->rt->timeout);
 	entry->is_reply = TRUE;
 }
 
@@ -486,7 +496,7 @@ rspamd_controller_send_ucl (struct rspamd_http_connection_entry *entry,
 		NULL,
 		"application/json",
 		entry,
-		entry->rt->ptv);
+		entry->rt->timeout);
 	entry->is_reply = TRUE;
 }
 
@@ -676,9 +686,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
 #endif
 
 		/* Remove the inherited event base */
-		event_reinit (rspamd_main->event_loop);
-		event_base_free (rspamd_main->event_loop);
-
+		ev_loop_destroy (EV_DEFAULT);
+		rspamd_main->event_loop = NULL;
 		/* Drop privileges */
 		rspamd_worker_drop_priv (rspamd_main);
 		/* Set limits */
@@ -853,8 +862,7 @@ struct rspamd_worker_session_cache {
 	struct ev_loop *ev_base;
 	GHashTable *cache;
 	struct rspamd_config *cfg;
-	struct timeval tv;
-	struct event periodic;
+	struct ev_timer periodic;
 };
 
 static gint
@@ -868,9 +876,10 @@ rspamd_session_cache_sort_cmp (gconstpointer pa, gconstpointer pb)
 }
 
 static void
-rspamd_sessions_cache_periodic (gint fd, short what, gpointer p)
+rspamd_sessions_cache_periodic (EV_P_ ev_timer *w, int revents)
 {
-	struct rspamd_worker_session_cache *c = p;
+	struct rspamd_worker_session_cache *c =
+			(struct rspamd_worker_session_cache *)w->data;
 	GHashTableIter it;
 	gchar timebuf[32];
 	gpointer k, v;
@@ -902,6 +911,8 @@ rspamd_sessions_cache_periodic (gint fd, short what, gpointer p)
 					timebuf);
 		}
 	}
+
+	ev_timer_again (EV_A_ w);
 }
 
 void *
@@ -916,11 +927,10 @@ rspamd_worker_session_cache_new (struct rspamd_worker *w,
 	c->cache = g_hash_table_new_full (g_direct_hash, g_direct_equal,
 			NULL, g_free);
 	c->cfg = w->srv->cfg;
-	double_to_tv (periodic_interval, &c->tv);
-	event_set (&c->periodic, -1, EV_TIMEOUT|EV_PERSIST,
-			rspamd_sessions_cache_periodic, c);
-	event_base_set (ev_base, &c->periodic);
-	event_add (&c->periodic, &c->tv);
+	c->periodic.data = c;
+	ev_timer_init (&c->periodic, rspamd_sessions_cache_periodic, periodic_interval,
+			periodic_interval);
+	ev_timer_start (ev_base, &c->periodic);
 
 	return c;
 }
@@ -1115,12 +1125,13 @@ rspamd_set_crash_handler (struct rspamd_main *rspamd_main)
 }
 
 static void
-rspamd_enable_accept_event (gint fd, short what, gpointer d)
+rspamd_enable_accept_event (EV_P_ ev_timer *w, int revents)
 {
-	struct event *events = d;
+	struct rspamd_worker_accept_event *ac_ev =
+			(struct rspamd_worker_accept_event *)w->data;
 
-	event_del (&events[1]);
-	event_add (&events[0], NULL);
+	ev_timer_stop (EV_A_ w);
+	ev_io_start (EV_A_ &ac_ev->accept_ev);
 }
 
 void
@@ -1128,17 +1139,15 @@ rspamd_worker_throttle_accept_events (gint sock, void *data)
 {
 	struct rspamd_worker_accept_event *head, *cur;
 	const gdouble throttling = 0.5;
-	struct ev_loop *ev_base;
 
 	head = (struct rspamd_worker_accept_event *)data;
 
 	DL_FOREACH (head, cur) {
 
-		ev_base = event_get_base (&events[0]);
-		event_del (&events[0]);
-		event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event,
-				events);
-		event_base_set (ev_base, &events[1]);
-		event_add (&events[1], &tv);
+		ev_io_stop (cur->event_loop, &cur->accept_ev);
+		cur->throttling_ev.data = cur;
+		ev_timer_init (&cur->throttling_ev, rspamd_enable_accept_event,
+				throttling, 0.0);
+		ev_timer_start (cur->event_loop, &cur->throttling_ev);
 	}
 }
\ No newline at end of file
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 67b54e5c9..9693aa6ad 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -56,7 +56,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
  */
 void rspamd_worker_set_signal_handler (int signo,
 		struct rspamd_worker *worker,
-		struct ev_loop *base,
+		struct ev_loop *event_loop,
 		rspamd_worker_signal_handler handler,
 		void *handler_data);
 
diff --git a/src/libstat/learn_cache/redis_cache.c b/src/libstat/learn_cache/redis_cache.c
index 3ae30c440..2313db0b2 100644
--- a/src/libstat/learn_cache/redis_cache.c
+++ b/src/libstat/learn_cache/redis_cache.c
@@ -45,7 +45,7 @@ struct rspamd_redis_cache_runtime {
 	struct rspamd_redis_cache_ctx *ctx;
 	struct rspamd_task *task;
 	struct upstream *selected;
-	struct event timeout_event;
+	ev_timer timer_ev;
 	redisAsyncContext *redis;
 	gboolean has_event;
 };
@@ -92,9 +92,7 @@ rspamd_redis_cache_fin (gpointer data)
 	redisAsyncContext *redis;
 
 	rt->has_event = FALSE;
-	if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
-		event_del (&rt->timeout_event);
-	}
+	ev_timer_stop (rt->task->event_loop, &rt->timer_ev);
 
 	if (rt->redis) {
 		redis = rt->redis;
@@ -105,9 +103,10 @@ rspamd_redis_cache_fin (gpointer data)
 }
 
 static void
-rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
+rspamd_redis_cache_timeout (EV_P_ ev_timer *w, int revents)
 {
-	struct rspamd_redis_cache_runtime *rt = d;
+	struct rspamd_redis_cache_runtime *rt =
+			(struct rspamd_redis_cache_runtime *)w->data;
 	struct rspamd_task *task;
 
 	task = rt->task;
@@ -117,7 +116,7 @@ rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
 	rspamd_upstream_fail (rt->selected, FALSE);
 
 	if (rt->has_event) {
-		rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, d);
+		rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
 	}
 }
 
@@ -401,8 +400,9 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
 	redisLibevAttach (task->event_loop, rt->redis);
 
 	/* Now check stats */
-	event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_cache_timeout, rt);
-	event_base_set (task->event_loop, &rt->timeout_event);
+	rt->timer_ev.data = rt;
+	ev_timer_init (&rt->timer_ev, rspamd_redis_cache_timeout,
+			rt->ctx->timeout, 0.0);
 	rspamd_redis_cache_maybe_auth (ctx, rt->redis);
 
 	if (!learn) {
@@ -418,7 +418,6 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
 		gpointer runtime)
 {
 	struct rspamd_redis_cache_runtime *rt = runtime;
-	struct timeval tv;
 	gchar *h;
 
 	if (rspamd_session_blocked (task->s)) {
@@ -431,8 +430,6 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
 		return RSPAMD_LEARN_INGORE;
 	}
 
-	double_to_tv (rt->ctx->timeout, &tv);
-
 	if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt,
 			"HGET %s %s",
 			rt->ctx->redis_object, h) == REDIS_OK) {
@@ -440,7 +437,7 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
 				rspamd_redis_cache_fin,
 				rt,
 				M);
-		event_add (&rt->timeout_event, &tv);
+		ev_timer_start (rt->task->event_loop, &rt->timer_ev);
 		rt->has_event = TRUE;
 	}
 
@@ -454,7 +451,6 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
 		gpointer runtime)
 {
 	struct rspamd_redis_cache_runtime *rt = runtime;
-	struct timeval tv;
 	gchar *h;
 	gint flag;
 
@@ -465,7 +461,6 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
 	h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
 	g_assert (h != NULL);
 
-	double_to_tv (rt->ctx->timeout, &tv);
 	flag = (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? 1 : -1;
 
 	if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt,
@@ -473,7 +468,7 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
 			rt->ctx->redis_object, h, flag) == REDIS_OK) {
 		rspamd_session_add_event (task->s,
 				rspamd_redis_cache_fin, rt, M);
-		event_add (&rt->timeout_event, &tv);
+		ev_timer_start (rt->task->event_loop, &rt->timer_ev);
 		rt->has_event = TRUE;
 	}
 
diff --git a/src/rspamd.h b/src/rspamd.h
index 9048a26bd..e47271ca3 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -122,7 +122,7 @@ struct rspamd_worker_signal_handler {
 	gint signo;
 	gboolean enabled;
 	ev_signal ev_sig;
-	struct ev_loop *base;
+	struct ev_loop *event_loop;
 	struct rspamd_worker *worker;
 	struct rspamd_worker_signal_cb *cb;
 };


More information about the Commits mailing list