commit 296c0cd: [Rework] Rework final scripts logic

Vsevolod Stakhov vsevolod at highsecure.ru
Thu Nov 7 14:49:06 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-11-07 14:31:08 +0000
URL: https://github.com/rspamd/rspamd/commit/296c0cd69431ffc59b12adff6b3d58c9069f81ff

[Rework] Rework final scripts logic

---
 src/fuzzy_storage.c         |   2 +-
 src/libserver/worker_util.c | 132 +++++++++++++++++++++++++++++++-------------
 src/libserver/worker_util.h |   7 +++
 src/libutil/map.c           |   6 +-
 src/rspamd.c                |   8 +--
 src/rspamd.h                |  11 +++-
 src/worker.c                |  80 +++++----------------------
 7 files changed, 128 insertions(+), 118 deletions(-)

diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index f7aec3e27..f8b18b78a 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -481,7 +481,7 @@ rspamd_fuzzy_updates_cb (gboolean success,
 		}
 	}
 
-	if (ctx->worker->wanna_die) {
+	if (ctx->worker->state != rspamd_worker_state_running) {
 		/* Plan exit */
 		ev_break (ctx->event_loop, EVBREAK_ALL);
 	}
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index ddf74136d..63342a72a 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -113,47 +113,102 @@ rspamd_worker_check_finished (EV_P_ ev_timer *w, int revents)
 	}
 }
 
+static gboolean
+rspamd_worker_finalize (gpointer user_data)
+{
+	struct rspamd_task *task = user_data;
+
+	if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
+		msg_info_task ("finishing actions has been processed, terminating");
+		/* ev_break (task->event_loop, EVBREAK_ALL); */
+		task->worker->state = rspamd_worker_wanna_die;
+		rspamd_session_destroy (task->s);
+
+		return TRUE;
+	}
+
+	return FALSE;
+}
+
+gboolean
+rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
+{
+	struct rspamd_task *task;
+	struct rspamd_config *cfg = worker->srv->cfg;
+	struct rspamd_abstract_worker_ctx *ctx;
+	struct rspamd_config_cfg_lua_script *sc;
+
+	if (cfg->on_term_scripts) {
+		ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
+		/* Create a fake task object for async events */
+		task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
+		task->resolver = ctx->resolver;
+		task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
+		task->s = rspamd_session_create (task->task_pool,
+				rspamd_worker_finalize,
+				NULL,
+				(event_finalizer_t) rspamd_task_free,
+				task);
+
+		DL_FOREACH (cfg->on_term_scripts, sc) {
+			lua_call_finish_script (sc, task);
+		}
+
+		task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
+
+		if (rspamd_session_pending (task->s)) {
+			return TRUE;
+		}
+	}
+
+	return FALSE;
+}
+
 static void
 rspamd_worker_terminate_handlers (struct rspamd_worker *w)
 {
-	guint i;
-	gboolean (*cb)(struct rspamd_worker *);
 	struct rspamd_abstract_worker_ctx *actx;
-	struct ev_loop *final_gift, *orig_loop;
-	static ev_timer margin_call;
-	static int nchecks = 0;
-
-	if (w->finish_actions->len == 0) {
-		/* Nothing to do */
-		return;
-	}
 
 	actx = (struct rspamd_abstract_worker_ctx *)w->ctx;
 
-	/*
-	 * Here are dragons:
-	 * - we create a new loop
-	 * - we set a new ev_loop for worker via injection over rspamd_abstract_worker_ctx
-	 * - then we run finish actions
-	 * - then we create a special timer to kill worker if it fails to finish
-	 */
-	final_gift = ev_loop_new (EVBACKEND_ALL);
-	orig_loop = actx->event_loop;
-	actx->event_loop = final_gift;
-	margin_call.data = &nchecks;
-	ev_timer_init (&margin_call, rspamd_worker_check_finished, 0.1,
-			0.1);
-	ev_timer_start (final_gift, &margin_call);
-
-	for (i = 0; i < w->finish_actions->len; i ++) {
-		cb = g_ptr_array_index (w->finish_actions, i);
-		cb (w);
+	if (w->nconns == 0 &&
+		(!(w->flags & RSPAMD_WORKER_SCANNER) || w->srv->cfg->on_term_scripts == NULL)) {
+		/*
+		 * We are here either:
+		 * - No active connections are represented
+		 * - No term scripts are registered
+		 * - Worker is not a scanner, so it can die safely
+		 */
+		ev_break (actx->event_loop, EVBREAK_ALL);
+		w->state = rspamd_worker_wanna_die;
+
+		return;
+	}
+	else if (w->nconns > 0) {
+		/*
+		 * Wait until all connections are terminated
+		 */
+		w->state = rspamd_worker_wait_connections;
 	}
+	else {
+		/*
+		 * Start finish scripts
+		 */
+		w->state = rspamd_worker_wait_final_scripts;
+		msg_info ("performing finishing actions");
 
-	ev_run (final_gift, 0);
-	ev_loop_destroy (final_gift);
-	/* Restore original loop */
-	actx->event_loop = orig_loop;
+		if ((w->flags & RSPAMD_WORKER_SCANNER) &&
+			rspamd_worker_call_finish_handlers (w)) {
+			w->state = rspamd_worker_wait_final_scripts;
+		}
+		else {
+			/*
+			 * We are done now
+			 */
+			ev_break (actx->event_loop, EVBREAK_ALL);
+			w->state = rspamd_worker_wanna_die;
+		}
+	}
 }
 
 static void
@@ -172,7 +227,7 @@ static gboolean
 rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
 {
 	/* Do not accept new connections, preparing to end worker's process */
-	if (!sigh->worker->wanna_die) {
+	if (sigh->worker->state == rspamd_worker_state_running) {
 		static ev_timer shutdown_ev;
 		ev_tstamp shutdown_ts;
 
@@ -180,8 +235,8 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 				sigh->worker->srv->cfg->task_timeout * 2.0);
 
 		rspamd_worker_ignore_signal (sigh);
-		sigh->worker->wanna_die = TRUE;
-		rspamd_worker_terminate_handlers (sigh->worker);
+		sigh->worker->state = rspamd_worker_state_terminating;
+
 		rspamd_default_log_function (G_LOG_LEVEL_INFO,
 				sigh->worker->srv->server_pool->tag.tagname,
 				sigh->worker->srv->server_pool->tag.uid,
@@ -216,10 +271,11 @@ rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 static gboolean
 rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
 {
-	if (!sigh->worker->wanna_die) {
+	if (sigh->worker->state == rspamd_worker_state_running) {
 		static ev_timer shutdown_ev;
 
 		rspamd_worker_ignore_signal (sigh);
+		sigh->worker->state = rspamd_worker_state_terminating;
 		rspamd_default_log_function (G_LOG_LEVEL_INFO,
 				sigh->worker->srv->server_pool->tag.tagname,
 				sigh->worker->srv->server_pool->tag.uid,
@@ -228,7 +284,6 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 				g_strsignal (sigh->signo));
 
 		rspamd_worker_terminate_handlers (sigh->worker);
-		sigh->worker->wanna_die = 1;
 		ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
 				0.0, 0.0);
 		ev_timer_start (sigh->event_loop, &shutdown_ev);
@@ -862,7 +917,6 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
 	REF_RETAIN (cf);
 	wrk->index = index;
 	wrk->ctx = cf->ctx;
-	wrk->finish_actions = g_ptr_array_new ();
 	wrk->ppid = getpid ();
 	wrk->pid = fork ();
 	wrk->cores_throttled = rspamd_main->cores_throttling;
@@ -1377,7 +1431,7 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
 {
 	gboolean need_refork = TRUE;
 
-	if (wrk->wanna_die || rspamd_main->wanna_die) {
+	if (wrk->state != rspamd_worker_state_running || rspamd_main->wanna_die) {
 		/* Do not refork workers that are intended to be terminated */
 		need_refork = FALSE;
 	}
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 6fbda0b4a..15d79df2f 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -237,6 +237,13 @@ void rspamd_worker_throttle_accept_events (gint sock, void *data);
 gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
 										  struct rspamd_worker *wrk, int status);
 
+/**
+ * Call for final scripts for a worker
+ * @param worker
+ * @return
+ */
+gboolean rspamd_worker_call_finish_handlers (struct rspamd_worker *worker);
+
 #ifdef WITH_HYPERSCAN
 struct rspamd_control_command;
 
diff --git a/src/libutil/map.c b/src/libutil/map.c
index 441b408ba..8da3c7bd4 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -970,7 +970,7 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
 		g_atomic_int_set (periodic->map->locked, 0);
 		msg_debug_map ("unlocked map %s", periodic->map->name);
 
-		if (!periodic->map->wrk->wanna_die) {
+		if (periodic->map->wrk->state == rspamd_worker_state_running) {
 			rspamd_map_schedule_periodic (periodic->map,
 					RSPAMD_SYMBOL_RESULT_NORMAL);
 		}
@@ -1001,7 +1001,7 @@ rspamd_map_schedule_periodic (struct rspamd_map *map, int how)
 	gdouble timeout;
 	struct map_periodic_cbdata *cbd;
 
-	if (map->scheduled_check || (map->wrk && map->wrk->wanna_die)) {
+	if (map->scheduled_check || (map->wrk && map->wrk->state == rspamd_worker_state_running)) {
 		/* Do not schedule check if some check is already scheduled */
 		return;
 	}
@@ -1897,7 +1897,7 @@ rspamd_map_process_periodic (struct map_periodic_cbdata *cbd)
 		return;
 	}
 
-	if (!(cbd->map->wrk && cbd->map->wrk->wanna_die)) {
+	if (!(cbd->map->wrk && cbd->map->wrk->state == rspamd_worker_state_running)) {
 		bk = g_ptr_array_index (cbd->map->backends, cbd->cur_backend);
 		g_assert (bk != NULL);
 
diff --git a/src/rspamd.c b/src/rspamd.c
index 495da45d9..4099e5003 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -691,8 +691,8 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused)
 
 	rspamd_main = w->srv;
 
-	if (!w->wanna_die) {
-		w->wanna_die = TRUE;
+	if (w->state == rspamd_worker_state_running) {
+		w->state = rspamd_worker_state_terminating;
 		kill (w->pid, SIGUSR2);
 		ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
 		msg_info_main ("send signal to worker %P", w->pid);
@@ -1095,10 +1095,6 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
 		rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid);
 	}
 
-	if (wrk->finish_actions) {
-		g_ptr_array_free (wrk->finish_actions, TRUE);
-	}
-
 	need_refork = rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
 
 	if (need_refork) {
diff --git a/src/rspamd.h b/src/rspamd.h
index d32681359..773be7c56 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -82,6 +82,14 @@ struct rspamd_worker_heartbeat {
 	gint64 nbeats;                  /**< positive for beats received, negative for beats missed */
 };
 
+enum rspamd_worker_state {
+	rspamd_worker_state_running = 0,
+	rspamd_worker_state_terminating,
+	rspamd_worker_wait_connections,
+	rspamd_worker_wait_final_scripts,
+	rspamd_worker_wanna_die
+};
+
 /**
  * Worker process structure
  */
@@ -90,7 +98,7 @@ struct rspamd_worker {
 	pid_t ppid;                     /**< pid of parent									*/
 	guint index;                    /**< index number									*/
 	guint nconns;                   /**< current connections count						*/
-	gboolean wanna_die;             /**< worker is terminating							*/
+	enum rspamd_worker_state state; /**< current worker state							*/
 	gboolean cores_throttled;       /**< set to true if cores throttling took place		*/
 	gdouble start_time;             /**< start time										*/
 	struct rspamd_main *srv;        /**< pointer to server structure					*/
@@ -108,7 +116,6 @@ struct rspamd_worker {
 	struct rspamd_worker_heartbeat hb; /**< heartbeat data */
 	gpointer control_data;          /**< used by control protocol to handle commands	*/
 	gpointer tmp_data;              /**< used to avoid race condition to deal with control messages */
-	GPtrArray *finish_actions;      /**< called when worker is terminated				*/
 	ev_child cld_ev;                /**< to allow reaping								*/
 	rspamd_worker_term_cb term_handler; /**< custom term handler						*/
 };
diff --git a/src/worker.c b/src/worker.c
index 04447feea..81ec0904a 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -69,56 +69,6 @@ worker_t normal_worker = {
         G_STRFUNC, \
         __VA_ARGS__)
 
-static gboolean
-rspamd_worker_finalize (gpointer user_data)
-{
-	struct rspamd_task *task = user_data;
-
-	if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
-		msg_info_task ("finishing actions has been processed, terminating");
-		/* ev_break (task->event_loop, EVBREAK_ALL); */
-		rspamd_session_destroy (task->s);
-
-		return TRUE;
-	}
-
-	return FALSE;
-}
-
-static gboolean
-rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
-{
-	struct rspamd_task *task;
-	struct rspamd_config *cfg = worker->srv->cfg;
-	struct rspamd_abstract_worker_ctx *ctx;
-	struct rspamd_config_cfg_lua_script *sc;
-
-	if (cfg->on_term_scripts) {
-		ctx = worker->ctx;
-		/* Create a fake task object for async events */
-		task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
-		task->resolver = ctx->resolver;
-		task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
-		task->s = rspamd_session_create (task->task_pool,
-				rspamd_worker_finalize,
-				NULL,
-				(event_finalizer_t) rspamd_task_free,
-				task);
-
-		DL_FOREACH (cfg->on_term_scripts, sc) {
-			lua_call_finish_script (sc, task);
-		}
-
-		task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
-
-		if (rspamd_session_pending (task->s)) {
-			return TRUE;
-		}
-	}
-
-	return FALSE;
-}
-
 /*
  * Reduce number of tasks proceeded
  */
@@ -129,9 +79,20 @@ reduce_tasks_count (gpointer arg)
 
 	worker->nconns --;
 
-	if (worker->wanna_die && worker->nconns == 0) {
+	if (worker->state == rspamd_worker_wait_connections && worker->nconns == 0) {
+
+		worker->state = rspamd_worker_wait_final_scripts;
 		msg_info ("performing finishing actions");
-		rspamd_worker_call_finish_handlers (worker);
+
+		if (rspamd_worker_call_finish_handlers (worker)) {
+			worker->state = rspamd_worker_wait_final_scripts;
+		}
+		else {
+			worker->state = rspamd_worker_wanna_die;
+		}
+	}
+	else if (worker->state != rspamd_worker_state_running) {
+		worker->state = rspamd_worker_wait_connections;
 	}
 }
 
@@ -596,19 +557,6 @@ init_worker (struct rspamd_config *cfg)
 	return ctx;
 }
 
-static gboolean
-rspamd_worker_on_terminate (struct rspamd_worker *worker)
-{
-	if (worker->nconns == 0) {
-		msg_info ("performing finishing actions");
-		if (rspamd_worker_call_finish_handlers (worker)) {
-			return TRUE;
-		}
-	}
-
-	return FALSE;
-}
-
 void
 rspamd_worker_init_scanner (struct rspamd_worker *worker,
 		struct ev_loop *ev_base,
@@ -616,8 +564,6 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker,
 		struct rspamd_lang_detector **plang_det)
 {
 	rspamd_stat_init (worker->srv->cfg, ev_base);
-	g_ptr_array_add (worker->finish_actions,
-			(gpointer) rspamd_worker_on_terminate);
 #ifdef WITH_HYPERSCAN
 	rspamd_control_worker_add_cmd_handler (worker,
 			RSPAMD_CONTROL_HYPERSCAN_LOADED,


More information about the Commits mailing list