commit eafdd22: [Minor] Try to fix more issues

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Jun 22 12:14:37 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-06-20 15:07:58 +0100
URL: https://github.com/rspamd/rspamd/commit/eafdd221037b41ddcba0add79fd8efccecaf0775

[Minor] Try to fix more issues

---
 src/libserver/rspamd_control.c |  4 +-
 src/libserver/worker_util.c    |  2 +
 src/libutil/map.c              | 12 +++++-
 src/rspamd.c                   | 91 ++++++++++++++++++++++++++++++++++--------
 4 files changed, 89 insertions(+), 20 deletions(-)

diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 62ca24643..1d161f6bc 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -727,7 +727,7 @@ rspamd_control_hs_io_handler (int fd, short what, void *ud)
 
 	/* At this point we just ignore replies from the workers */
 	(void)read (fd, &rep, sizeof (rep));
-	rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
+	rspamd_ev_watcher_stop (elt->wrk->srv->event_loop, &elt->ev);
 	g_free (elt);
 }
 
@@ -740,7 +740,7 @@ rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)
 
 	/* At this point we just ignore replies from the workers */
 	(void) read (fd, &rep, sizeof (rep));
-	rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
+	rspamd_ev_watcher_stop (elt->wrk->srv->event_loop, &elt->ev);
 	g_free (elt);
 }
 
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 70d349c2c..f7b4ee9ab 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -321,6 +321,8 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
 
 	event_loop = ev_default_loop (EVFLAG_SIGNALFD);
 
+	worker->srv->event_loop = event_loop;
+
 	rspamd_worker_init_signals (worker, event_loop);
 	rspamd_control_worker_add_default_handler (worker, event_loop);
 #ifdef WITH_HIREDIS
diff --git a/src/libutil/map.c b/src/libutil/map.c
index 3ca94806f..eadf0279c 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -274,7 +274,12 @@ free_http_cbdata_common (struct http_callback_data *cbd, gboolean plan_new)
 
 
 	MAP_RELEASE (cbd->bk, "rspamd_map_backend");
-	MAP_RELEASE (periodic, "periodic");
+
+	if (periodic) {
+		/* Detached in case of HTTP error */
+		MAP_RELEASE (periodic, "periodic");
+	}
+
 	g_free (cbd);
 }
 
@@ -325,7 +330,11 @@ http_map_error (struct rspamd_http_connection *conn,
 			cbd->bk->uri,
 			cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
 			err);
+	MAP_RETAIN (cbd->periodic, "periodic");
 	rspamd_map_process_periodic (cbd->periodic);
+	MAP_RELEASE (cbd->periodic, "periodic");
+	/* Detach periodic as rspamd_map_process_periodic will destroy it */
+	cbd->periodic = NULL;
 	MAP_RELEASE (cbd, "http_callback_data");
 }
 
@@ -2236,6 +2245,7 @@ rspamd_map_backend_dtor (struct rspamd_map_backend *bk)
 	switch (bk->protocol) {
 	case MAP_PROTO_FILE:
 		if (bk->data.fd) {
+			ev_stat_stop (ev_default_loop (0), &bk->data.fd->st_ev);
 			g_free (bk->data.fd->filename);
 			g_free (bk->data.fd);
 		}
diff --git a/src/rspamd.c b/src/rspamd.c
index 765b4bd2b..d07961757 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -719,6 +719,7 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused)
 	if (!w->wanna_die) {
 		w->wanna_die = TRUE;
 		kill (w->pid, SIGUSR2);
+		ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
 		msg_info_main ("send signal to worker %P", w->pid);
 	}
 	else {
@@ -727,9 +728,8 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused)
 }
 
 static gboolean
-wait_for_workers (gpointer key, gpointer value, gpointer unused)
+rspamd_worker_wait (struct rspamd_worker *w)
 {
-	struct rspamd_worker *w = value;
 	struct rspamd_main *rspamd_main;
 	gint res = 0;
 	gboolean nowait = FALSE;
@@ -756,7 +756,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
 				if (term_attempts > -(TERMINATION_ATTEMPTS * 2)) {
 					if (term_attempts % 10 == 0) {
 						msg_info_main ("waiting for worker %s(%P) to sync, "
-								"%d seconds remain",
+									   "%d seconds remain",
 								g_quark_to_string (w->type), w->pid,
 								(TERMINATION_ATTEMPTS * 2 + term_attempts) / 5);
 						kill (w->pid, SIGTERM);
@@ -768,7 +768,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
 				}
 				else {
 					msg_err_main ("data corruption warning: terminating "
-							"special worker %s(%P) with SIGKILL",
+								  "special worker %s(%P) with SIGKILL",
 							g_quark_to_string (w->type), w->pid);
 					kill (w->pid, SIGKILL);
 					if (nowait && errno == ESRCH) {
@@ -798,7 +798,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
 	msg_info_main ("%s process %P terminated %s",
 			g_quark_to_string (w->type), w->pid,
 			nowait ? "with no result available" :
-					(WTERMSIG (res) == SIGKILL ? "hardly" : "softly"));
+			(WTERMSIG (res) == SIGKILL ? "hardly" : "softly"));
 	if (w->srv_pipe[0] != -1) {
 		/* Ugly workaround */
 		if (w->tmp_data) {
@@ -817,6 +817,25 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
 	return TRUE;
 }
 
+static gboolean
+hash_worker_wait_callback (gpointer key, gpointer value, gpointer unused)
+{
+	return rspamd_worker_wait ((struct rspamd_worker *)value);
+}
+
+static void
+rspamd_final_cld_handler (EV_P_ ev_signal *w, int revents)
+{
+	struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+	g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback,
+			NULL);
+
+	if (g_hash_table_size (rspamd_main->workers) == 0) {
+		ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+	}
+}
+
+
 struct core_check_cbdata {
 	struct rspamd_config *cfg;
 	gsize total_count;
@@ -983,6 +1002,15 @@ do_encrypt_password (void)
 	rspamd_fprintf (stderr, "use rspamadm pw for this operation\n");
 }
 
+static void
+stop_srv_ev (gpointer key, gpointer value, gpointer ud)
+{
+	struct rspamd_worker *cur = (struct rspamd_worker *)value;
+	struct rspamd_main *rspamd_main = (struct rspamd_main *)ud;
+
+	ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
+}
+
 /* Signal handlers */
 static void
 rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
@@ -991,6 +1019,8 @@ rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
 
 	msg_info_main ("catch termination signal, waiting for children");
 	rspamd_log_nolock (rspamd_main->logger);
+	/* Stop srv events to avoid false notifications */
+	g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main);
 	rspamd_pass_signal (rspamd_main->workers, w->signum);
 
 	ev_break (rspamd_main->event_loop, EVBREAK_ALL);
@@ -1033,22 +1063,41 @@ rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
 	guint i;
 	gint res = 0;
 	struct rspamd_worker *cur;
-	pid_t wrk;
-	gboolean need_refork = TRUE;
+	pid_t pid;
+	gboolean need_refork = TRUE, found_proc = FALSE;
 
 	/* Turn off locking for logger */
 	rspamd_log_nolock (rspamd_main->logger);
 
-	msg_info_main ("catch SIGCHLD signal, finding terminated workers");
+	msg_info_main ("got SIGCHLD signal, finding terminated workers");
 	/* Remove dead child form children list */
-	while ((wrk = waitpid (0, &res, WNOHANG)) > 0) {
+	for (;;) {
+		pid = waitpid (0, &res, WNOHANG);
+
+		if (pid == -1) {
+			if (errno != EINTR) {
+				msg_warn_main ("got unexpected system error when waiting: %s",
+						strerror (errno));
+				break;
+			}
+			else {
+				continue;
+			}
+		}
+		else if (pid == 0) {
+			/* No more processes to wait */
+			break;
+		}
+
+		found_proc = TRUE;
+
 		if ((cur =
 				g_hash_table_lookup (rspamd_main->workers,
-						GSIZE_TO_POINTER (wrk))) != NULL) {
+						GSIZE_TO_POINTER (pid))) != NULL) {
 			/* Unlink dead process from queue and hash table */
 
 			g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (
-					wrk));
+					pid));
 
 			if (cur->wanna_die) {
 				/* Do not refork workers that are intended to be terminated */
@@ -1155,14 +1204,19 @@ rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
 		}
 		else {
 			for (i = 0; i < other_workers->len; i++) {
-				if (g_array_index (other_workers, pid_t, i) == wrk) {
+				if (g_array_index (other_workers, pid_t, i) == pid) {
 					g_array_remove_index_fast (other_workers, i);
-					msg_info_main ("related process %P terminated", wrk);
+					msg_info_main ("related process %P terminated", pid);
 				}
 			}
 		}
 	}
 
+	if (!found_proc) {
+		msg_err_main ("got SIGCHLD but no workers were able to be waited: %s",
+				strerror (errno));
+	}
+
 	rspamd_log_lock (rspamd_main->logger);
 }
 
@@ -1173,7 +1227,7 @@ rspamd_final_term_handler (EV_P_ ev_timer *w, int revents)
 
 	term_attempts--;
 
-	g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
+	g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
 
 	if (g_hash_table_size (rspamd_main->workers) == 0) {
 		ev_break (rspamd_main->event_loop, EVBREAK_ALL);
@@ -1245,8 +1299,8 @@ main (gint argc, gchar **argv, gchar **env)
 	GQuark type;
 	rspamd_inet_addr_t *control_addr = NULL;
 	struct ev_loop *event_loop;
-	ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
-	ev_io control_ev;
+	static ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
+	static ev_io control_ev;
 	struct rspamd_main *rspamd_main;
 	gboolean skip_pid = FALSE, valgrind_mode = FALSE;
 
@@ -1568,7 +1622,10 @@ main (gint argc, gchar **argv, gchar **env)
 
 
 	/* Wait for workers termination */
-	g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
+	ev_signal_init (&cld_ev, rspamd_final_cld_handler, SIGCHLD);
+	ev_signal_start (event_loop, &cld_ev);
+
+	g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
 
 	static ev_timer ev_finale;
 


More information about the Commits mailing list