commit 856a786: [Fix] Try hard to deal with ghost workers

Vsevolod Stakhov vsevolod at highsecure.ru
Thu Mar 26 17:42:13 UTC 2020


Author: Vsevolod Stakhov
Date: 2020-03-26 17:36:53 +0000
URL: https://github.com/rspamd/rspamd/commit/856a7864628d8bc4e433ed65f5e0ec4e1d35b725 (HEAD -> master)

[Fix] Try hard to deal with ghost workers

---
 src/libserver/rspamd_control.c | 37 ++++++++++++++++++++++++++++++-------
 src/libserver/rspamd_control.h |  6 ++++++
 src/libserver/worker_util.c    |  3 +++
 src/rspamd.c                   |  1 +
 src/rspamd.h                   |  1 +
 5 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 4b2cfc733..30d959e47 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -40,6 +40,7 @@ struct rspamd_control_reply_elt {
 	pid_t wrk_pid;
 	gpointer ud;
 	gint attached_fd;
+	GHashTable *pending_elts;
 	struct rspamd_control_reply_elt *prev, *next;
 };
 
@@ -105,6 +106,17 @@ static const struct rspamd_control_cmd_match {
 
 static void rspamd_control_ignore_io_handler (int fd, short what, void *ud);
 
+static void
+rspamd_control_stop_pending (struct rspamd_control_reply_elt *elt)
+{
+	GHashTable *htb;
+	/* It stops event and frees hash */
+	htb = elt->pending_elts;
+	g_hash_table_remove (elt->pending_elts, elt);
+	/* Release hash reference */
+	g_hash_table_unref (htb);
+}
+
 void
 rspamd_control_send_error (struct rspamd_control_session *session,
 		gint code, const gchar *error_msg, ...)
@@ -168,9 +180,7 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
 			rspamd_inet_address_to_string (session->addr));
 
 	DL_FOREACH_SAFE (session->replies, elt, telt) {
-		rspamd_ev_watcher_stop (session->event_loop,
-				&elt->ev);
-		g_free (elt);
+		rspamd_control_stop_pending (elt);
 	}
 
 	rspamd_inet_address_free (session->addr);
@@ -385,6 +395,15 @@ rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err)
 	}
 }
 
+void
+rspamd_pending_control_free (gpointer p)
+{
+	struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *)p;
+
+	rspamd_ev_watcher_stop (rep_elt->event_loop, &rep_elt->ev);
+	g_free (rep_elt);
+}
+
 static struct rspamd_control_reply_elt *
 rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
 							  struct rspamd_control_command *cmd,
@@ -443,12 +462,14 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main,
 			rep_elt->wrk_type = wrk->type;
 			rep_elt->event_loop = rspamd_main->event_loop;
 			rep_elt->ud = ud;
+			rep_elt->pending_elts = g_hash_table_ref (wrk->control_events_pending);
 			rspamd_ev_watcher_init (&rep_elt->ev,
 					wrk->control_pipe[0],
 					EV_READ, handler,
 					rep_elt);
 			rspamd_ev_watcher_start (rspamd_main->event_loop,
 					&rep_elt->ev, worker_io_timeout);
+			g_hash_table_insert (wrk->control_events_pending, rep_elt, rep_elt);
 
 			DL_APPEND (res, rep_elt);
 		}
@@ -750,12 +771,12 @@ rspamd_control_ignore_io_handler (int fd, short what, void *ud)
 {
 	struct rspamd_control_reply_elt *elt =
 			(struct rspamd_control_reply_elt *)ud;
+
 	struct rspamd_control_reply rep;
 
 	/* At this point we just ignore replies from the workers */
 	(void)read (fd, &rep, sizeof (rep));
-	rspamd_ev_watcher_stop (elt->event_loop, &elt->ev);
-	g_free (elt);
+	rspamd_control_stop_pending (elt);
 }
 
 static void
@@ -767,8 +788,7 @@ rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)
 
 	/* At this point we just ignore replies from the workers */
 	(void) read (fd, &rep, sizeof (rep));
-	rspamd_ev_watcher_stop (elt->event_loop, &elt->ev);
-	g_free (elt);
+	rspamd_control_stop_pending (elt);
 }
 
 static void
@@ -802,6 +822,7 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
 		REF_RELEASE (child->cf);
 		g_hash_table_remove (srv->workers,
 				GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid));
+		g_hash_table_unref (child->control_events_pending);
 		g_free (child);
 	}
 	else {
@@ -816,6 +837,8 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
 		child->cf = parent->cf;
 		child->ppid = parent->pid;
 		REF_RETAIN (child->cf);
+		child->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+				NULL, rspamd_pending_control_free);
 		g_hash_table_insert (srv->workers,
 				GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child);
 	}
diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h
index d1ce88f31..21ab1a663 100644
--- a/src/libserver/rspamd_control.h
+++ b/src/libserver/rspamd_control.h
@@ -274,6 +274,12 @@ enum rspamd_control_type rspamd_control_command_from_string (const gchar *str);
  */
 const gchar *rspamd_control_command_to_string (enum rspamd_control_type cmd);
 
+/**
+ * Used to cleanup pending events
+ * @param p
+ */
+void rspamd_pending_control_free (gpointer p);
+
 #ifdef  __cplusplus
 }
 #endif
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index d97190f2b..5a2234f29 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -466,6 +466,7 @@ rspamd_worker_init_signals (struct rspamd_worker *worker,
 			rspamd_worker_usr2_handler, NULL);
 }
 
+
 struct ev_loop *
 rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
 					   rspamd_accept_handler hdl)
@@ -979,6 +980,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
 	wrk->pid = fork ();
 	wrk->cores_throttled = rspamd_main->cores_throttling;
 	wrk->term_handler = term_handler;
+	wrk->control_events_pending = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+			NULL, rspamd_pending_control_free);
 
 	switch (wrk->pid) {
 	case 0:
diff --git a/src/rspamd.c b/src/rspamd.c
index dee990c41..fb3b93e36 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -1181,6 +1181,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
 	}
 
 	REF_RELEASE (wrk->cf);
+	g_hash_table_unref (wrk->control_events_pending);
 	g_free (wrk);
 }
 
diff --git a/src/rspamd.h b/src/rspamd.h
index 8885480c2..9e50c054a 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -121,6 +121,7 @@ struct rspamd_worker {
 	gpointer tmp_data;              /**< used to avoid race condition to deal with control messages */
 	ev_child cld_ev;                /**< to allow reaping								*/
 	rspamd_worker_term_cb term_handler; /**< custom term handler						*/
+	GHashTable *control_events_pending; /**< control events pending indexed by ptr		*/
 };
 
 struct rspamd_abstract_worker_ctx {


More information about the Commits mailing list