commit 3d97675: [Project] Add preliminary support of the heartbeats

Vsevolod Stakhov vsevolod at highsecure.ru
Tue Sep 10 17:07:05 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-09-10 17:42:18 +0100
URL: https://github.com/rspamd/rspamd/commit/3d97675cf4361d30dd541eff5b6b13c57cf36b80

[Project] Add preliminary support of the heartbeats

---
 src/libserver/cfg_file.h    |  1 +
 src/libserver/cfg_rcl.c     |  6 +++
 src/libserver/cfg_rcl.h     |  1 +
 src/libserver/cfg_utils.c   |  1 +
 src/libserver/worker_util.c | 91 +++++++++++++++++++++++++++++++++++++++++++++
 src/rspamd.c                |  1 +
 src/rspamd.h                |  9 ++++-
 7 files changed, 109 insertions(+), 1 deletion(-)

diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h
index 263d00f38..7186a73ec 100644
--- a/src/libserver/cfg_file.h
+++ b/src/libserver/cfg_file.h
@@ -380,6 +380,7 @@ struct rspamd_config {
 	gsize images_cache_size;                        /**< size of LRU cache for DCT data from images			*/
 	gdouble task_timeout;                           /**< maximum message processing time					*/
 	gint default_max_shots;                         /**< default maximum count of symbols hits permitted (-1 for unlimited) */
+	gdouble heartbeat_interval;                     /**< interval for heartbeats for workers				*/
 
 	enum rspamd_log_type log_type;                  /**< log type											*/
 	gint log_facility;                              /**< log facility in case of syslog						*/
diff --git a/src/libserver/cfg_rcl.c b/src/libserver/cfg_rcl.c
index fb2cbf052..5a1d3a639 100644
--- a/src/libserver/cfg_rcl.c
+++ b/src/libserver/cfg_rcl.c
@@ -2182,6 +2182,12 @@ rspamd_rcl_config_init (struct rspamd_config *cfg, GHashTable *skip_sections)
 				G_STRUCT_OFFSET (struct rspamd_config, full_gc_iters),
 				RSPAMD_CL_FLAG_UINT,
 				"Task scanned before memory gc is performed (default: 0 - disabled)");
+		rspamd_rcl_add_default_handler (sub,
+				"heartbeat_interval",
+				rspamd_rcl_parse_struct_time,
+				G_STRUCT_OFFSET (struct rspamd_config, heartbeat_interval),
+				RSPAMD_CL_FLAG_TIME_FLOAT,
+				"Time between workers heartbeats");
 
 		/* Neighbours configuration */
 		rspamd_rcl_add_section_doc (&sub->subsections, "neighbours", "name",
diff --git a/src/libserver/cfg_rcl.h b/src/libserver/cfg_rcl.h
index 7f97b100d..1a2d69c58 100644
--- a/src/libserver/cfg_rcl.h
+++ b/src/libserver/cfg_rcl.h
@@ -413,6 +413,7 @@ ucl_object_t *rspamd_rcl_add_doc_by_path (struct rspamd_config *cfg,
 		const char *default_value,
 		gboolean required);
 
+
 /**
  * Parses example and adds documentation according to the example:
  *
diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c
index 619e8784e..bd8595514 100644
--- a/src/libserver/cfg_utils.c
+++ b/src/libserver/cfg_utils.c
@@ -233,6 +233,7 @@ rspamd_config_new (enum rspamd_config_init_flags flags)
 	cfg->max_sessions_cache = DEFAULT_MAX_SESSIONS;
 	cfg->maps_cache_dir = rspamd_mempool_strdup (cfg->cfg_pool, RSPAMD_DBDIR);
 	cfg->c_modules = g_ptr_array_new ();
+	cfg->heartbeat_interval = 10.0;
 
 	REF_INIT_RETAIN (cfg, rspamd_config_free);
 
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 511289a2d..47812ded1 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -63,6 +63,10 @@
 
 #include "contrib/libev/ev.h"
 
+/* Forward declaration */
+static void rspamd_worker_heartbeat_start (struct rspamd_worker *,
+		struct ev_loop *);
+
 static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
 /**
  * Return worker's control structure by its type
@@ -362,6 +366,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
 
 	rspamd_worker_init_signals (worker, event_loop);
 	rspamd_control_worker_add_default_handler (worker, event_loop);
+	rspamd_worker_heartbeat_start (worker, event_loop);
 #ifdef WITH_HIREDIS
 	rspamd_redis_pool_config (worker->srv->cfg->redis_pool,
 			worker->srv->cfg, event_loop);
@@ -683,6 +688,91 @@ rspamd_worker_on_term (EV_P_ ev_child *w, int revents)
 	}
 }
 
+static void
+rspamd_worker_heartbeat_cb (EV_P_ ev_timer *w, int revents)
+{
+	struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
+	/* Stub: no beat is emitted here yet, so wrk is fetched but currently unused */
+}
+
+static void
+rspamd_worker_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop)
+{
+	ev_timer *beat_ev = &wrk->hb.heartbeat_ev; /* initial delay 0.0, repeat = heartbeat_interval */
+	beat_ev->data = wrk;
+	ev_timer_init (beat_ev, rspamd_worker_heartbeat_cb, 0.0, wrk->srv->cfg->heartbeat_interval);
+	ev_timer_start (event_loop, beat_ev);
+}
+
+/* Main-process timer callback: reports lost and restored worker heartbeats */
+static void
+rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents)
+{
+	struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
+	struct rspamd_main *rspamd_main = wrk->srv;
+	/* The check timer repeats every 2 * heartbeat_interval; use the same bound */
+	gdouble max_delay = rspamd_main->cfg->heartbeat_interval * 2;
+	gdouble time_from_last = ev_time () - wrk->hb.last_event;
+	struct tm tm;
+	gchar timebuf[64];
+	gchar usec_buf[16];
+	gint r;
+
+	/*
+	 * last_event <= 0 means no beat has been recorded yet (assumes hb starts
+	 * zeroed - TODO confirm); skip it so the first tick does not report a loss.
+	 */
+	if (wrk->hb.last_event > 0 && time_from_last > 0 && time_from_last > max_delay) {
+		/* "YYYY-MM-DD HH:MM:SS" plus the fractional part of the second */
+		rspamd_localtime (wrk->hb.last_event, &tm);
+		r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm);
+		rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f",
+				wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event);
+		/* usec_buf + 1 skips the leading digit of "0.xxxxx" */
+		rspamd_snprintf (timebuf + r, sizeof (timebuf) - r, "%s", usec_buf + 1);
+
+		if (wrk->hb.nbeats > 0) {
+			/* First time lost event */
+			msg_warn_main ("lost heartbeat from worker type %s with pid %P, "
+				  "last beat on: %s (%L beats received previously)",
+					g_quark_to_string (wrk->type), wrk->pid,
+					timebuf, wrk->hb.nbeats);
+			wrk->hb.nbeats = -1;
+			/* TODO: send notify about worker problem */
+		}
+		else {
+			wrk->hb.nbeats --;
+			msg_warn_main ("lost %L heartbeat from worker type %s with pid %P, "
+						   "last beat on: %s",
+					-(wrk->hb.nbeats),
+					g_quark_to_string (wrk->type), wrk->pid, timebuf);
+		}
+	}
+	else if (wrk->hb.nbeats < 0) {
+		rspamd_localtime (wrk->hb.last_event, &tm);
+		r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm);
+		rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f",
+				wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event);
+		rspamd_snprintf (timebuf + r, sizeof (timebuf) - r, "%s", usec_buf + 1);
+
+		msg_info_main ("received heartbeat from worker type %s with pid %P, "
+					   "last beat on: %s (%L beats lost previously)",
+				g_quark_to_string (wrk->type), wrk->pid,
+				timebuf, -(wrk->hb.nbeats));
+		wrk->hb.nbeats = 1;
+		/* TODO: send notify about worker restoration */
+	}
+}
+
+static void
+rspamd_main_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop)
+{
+	ev_timer *check_ev = &wrk->hb.heartbeat_ev; /* initial delay 0.0, repeat = 2 * heartbeat_interval */
+	ev_timer_init (check_ev, rspamd_main_heartbeat_cb, 0.0, wrk->srv->cfg->heartbeat_interval * 2);
+	check_ev->data = wrk;
+	ev_timer_start (event_loop, check_ev);
+}
+
 struct rspamd_worker *
 rspamd_fork_worker (struct rspamd_main *rspamd_main,
 					struct rspamd_worker_conf *cf,
@@ -822,6 +912,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
 		wrk->cld_ev.data = wrk;
 		ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0);
 		ev_child_start (rspamd_main->event_loop, &wrk->cld_ev);
+		rspamd_main_heartbeat_start (wrk, rspamd_main->event_loop);
 		/* Insert worker into worker's table, pid is index */
 		g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
 				wrk->pid), wrk);
diff --git a/src/rspamd.c b/src/rspamd.c
index 458d3d083..808ca6aaa 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -1008,6 +1008,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
 			g_free (wrk->tmp_data);
 		}
 		ev_io_stop (rspamd_main->event_loop, &wrk->srv_ev);
+		ev_timer_stop (rspamd_main->event_loop, &wrk->hb.heartbeat_ev);
 	}
 
 	if (wrk->control_pipe[0] != -1) {
diff --git a/src/rspamd.h b/src/rspamd.h
index ea11965fb..3cd6c391b 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -76,6 +76,12 @@ struct rspamd_worker_accept_event {
 typedef void (*rspamd_worker_term_cb) (EV_P_ ev_child *, struct rspamd_main *,
 									   struct rspamd_worker *);
 
+struct rspamd_worker_heartbeat {
+	ev_timer heartbeat_ev;          /**< used by main for checking heartbeats and by workers to send heartbeats */
+	ev_tstamp last_event;           /**< time of the last beat; main subtracts it from ev_time () and logs it */
+	gint64 nbeats; /* positive for beats received, negative for beats missed */
+};
+
 /**
  * Worker process structure
  */
@@ -90,7 +96,7 @@ struct rspamd_worker {
 	struct rspamd_main *srv;        /**< pointer to server structure					*/
 	GQuark type;                    /**< process type									*/
 	GHashTable *signal_events;      /**< signal events									*/
-	struct rspamd_worker_accept_event *accept_events; /**< socket events									*/
+	struct rspamd_worker_accept_event *accept_events; /**< socket events				*/
 	struct rspamd_worker_conf *cf;  /**< worker config data								*/
 	gpointer ctx;                   /**< worker's specific data							*/
 	enum rspamd_worker_flags flags; /**< worker's flags									*/
@@ -99,6 +105,7 @@ struct rspamd_worker {
 	gint srv_pipe[2];               /**< used by workers to request something from the
 	                                     main process. [0] - main, [1] - worker			*/
 	ev_io srv_ev;                   /**< used by main for read workers' requests		*/
+	struct rspamd_worker_heartbeat hb; /**< heartbeat data */
 	gpointer control_data;          /**< used by control protocol to handle commands	*/
 	gpointer tmp_data;              /**< used to avoid race condition to deal with control messages */
 	GPtrArray *finish_actions;      /**< called when worker is terminated				*/


More information about the Commits mailing list