commit 884a962: [Project] More work towards heartbeating logic implementation

Vsevolod Stakhov vsevolod at highsecure.ru
Sun Sep 22 08:56:07 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-09-22 09:38:47 +0100
URL: https://github.com/rspamd/rspamd/commit/884a962a1ef82431b60ce66bc83416898db4ad24

[Project] More work towards heartbeating logic implementation

---
 src/libserver/cfg_rcl.c     |  2 +-
 src/libserver/worker_util.c | 33 ++++++++++++++++++++++-----------
 src/rspamd.c                |  8 ++++++++
 src/rspamd.h                |  4 ++--
 4 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/src/libserver/cfg_rcl.c b/src/libserver/cfg_rcl.c
index 11c378d5d..86d674c08 100644
--- a/src/libserver/cfg_rcl.c
+++ b/src/libserver/cfg_rcl.c
@@ -2191,7 +2191,7 @@ rspamd_rcl_config_init (struct rspamd_config *cfg, GHashTable *skip_sections)
 		rspamd_rcl_add_default_handler (sub,
 				"heartbeats_loss_max",
 				rspamd_rcl_parse_struct_integer,
-				G_STRUCT_OFFSET (struct rspamd_config, heartbeat_interval),
+				G_STRUCT_OFFSET (struct rspamd_config, heartbeats_loss_max),
 				RSPAMD_CL_FLAG_INT_32,
 				"Maximum count of heartbeats to be lost before trying to "
 				"terminate a worker (default: 0 - disabled)");
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 883e7e8a9..163b0a509 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -761,23 +761,23 @@ rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents)
 				-(wrk->hb.nbeats) >= rspamd_main->cfg->heartbeats_loss_max) {
 
 
-				if (-(wrk->hb.nbeats) >= rspamd_main->cfg->heartbeats_loss_max + 1) {
-					msg_err_main ("terminate worker type %s with pid %P, "
-								  "last beat on: %s; %L heartbeat loast",
+				if (-(wrk->hb.nbeats) > rspamd_main->cfg->heartbeats_loss_max + 1) {
+					msg_err_main ("force kill worker type %s with pid %P, "
+								  "last beat on: %s; %L heartbeat lost",
 							g_quark_to_string (wrk->type),
 							wrk->pid,
 							timebuf,
 							-(wrk->hb.nbeats));
-					kill (wrk->pid, SIGTERM);
+					kill (wrk->pid, SIGKILL);
 				}
 				else {
-					msg_err_main ("force kill worker type %s with pid %P, "
-								  "last beat on: %s; %L heartbeat loast",
+					msg_err_main ("terminate worker type %s with pid %P, "
+								  "last beat on: %s; %L heartbeat lost",
 							g_quark_to_string (wrk->type),
 							wrk->pid,
 							timebuf,
 							-(wrk->hb.nbeats));
-					kill (wrk->pid, SIGKILL);
+					kill (wrk->pid, SIGTERM);
 				}
 
 			}
@@ -1363,10 +1363,21 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
 
 	if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
 		/* Normal worker termination, do not fork one more */
-		msg_info_main ("%s process %P terminated normally",
-				g_quark_to_string (wrk->type),
-				wrk->pid);
-		need_refork = FALSE;
+
+		if (wrk->hb.nbeats < 0) {
+			msg_info_main ("%s process %P terminated normally, but lost %L "
+				  "heartbeats, refork it",
+					g_quark_to_string (wrk->type),
+					wrk->pid,
+					-(wrk->hb.nbeats));
+			need_refork = TRUE;
+		}
+		else {
+			msg_info_main ("%s process %P terminated normally",
+					g_quark_to_string (wrk->type),
+					wrk->pid);
+			need_refork = FALSE;
+		}
 	}
 	else {
 		if (WIFSIGNALED (res)) {
diff --git a/src/rspamd.c b/src/rspamd.c
index d0dd7ff10..08f91674b 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -59,6 +59,7 @@
 #ifdef HAVE_OPENSSL
 #include <openssl/err.h>
 #include <openssl/evp.h>
+#include <src/libserver/rspamd_control.h>
 
 #endif
 
@@ -1030,6 +1031,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
 					struct rspamd_worker *wrk)
 {
 	gboolean need_refork;
+	static struct rspamd_control_command cmd;
 
 	/* Turn off locking for logger */
 	ev_child_stop (EV_A_ w);
@@ -1052,6 +1054,12 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
 		close (wrk->srv_pipe[0]);
 	}
 
+	cmd.type = RSPAMD_CONTROL_CHILD_CHANGE;
+	cmd.cmd.child_change.what = rspamd_child_terminated;
+	cmd.cmd.child_change.pid = wrk->pid;
+	cmd.cmd.child_change.additional = w->rstatus;
+	rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid);
+
 	REF_RELEASE (wrk->cf);
 
 	if (wrk->finish_actions) {
diff --git a/src/rspamd.h b/src/rspamd.h
index 3cd6c391b..d32681359 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -78,8 +78,8 @@ typedef void (*rspamd_worker_term_cb) (EV_P_ ev_child *, struct rspamd_main *,
 
 struct rspamd_worker_heartbeat {
 	ev_timer heartbeat_ev;          /**< used by main for checking heartbeats and by workers to send heartbeats */
-	ev_tstamp last_event;
-	gint64 nbeats; /* positive for beats received, negative for beats missed */
+	ev_tstamp last_event;           /**< last heartbeat received timestamp */
+	gint64 nbeats;                  /**< positive for beats received, negative for beats missed */
 };
 
 /**


More information about the Commits mailing list