commit 3d97675: [Project] Add preliminary support of the heartbeats
Vsevolod Stakhov
vsevolod at highsecure.ru
Tue Sep 10 17:07:05 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-09-10 17:42:18 +0100
URL: https://github.com/rspamd/rspamd/commit/3d97675cf4361d30dd541eff5b6b13c57cf36b80
[Project] Add preliminary support of the heartbeats
---
src/libserver/cfg_file.h | 1 +
src/libserver/cfg_rcl.c | 6 +++
src/libserver/cfg_rcl.h | 1 +
src/libserver/cfg_utils.c | 1 +
src/libserver/worker_util.c | 91 +++++++++++++++++++++++++++++++++++++++++++++
src/rspamd.c | 1 +
src/rspamd.h | 9 ++++-
7 files changed, 109 insertions(+), 1 deletion(-)
diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h
index 263d00f38..7186a73ec 100644
--- a/src/libserver/cfg_file.h
+++ b/src/libserver/cfg_file.h
@@ -380,6 +380,7 @@ struct rspamd_config {
gsize images_cache_size; /**< size of LRU cache for DCT data from images */
gdouble task_timeout; /**< maximum message processing time */
gint default_max_shots; /**< default maximum count of symbols hits permitted (-1 for unlimited) */
+ gdouble heartbeat_interval; /**< interval for heartbeats for workers */
enum rspamd_log_type log_type; /**< log type */
gint log_facility; /**< log facility in case of syslog */
diff --git a/src/libserver/cfg_rcl.c b/src/libserver/cfg_rcl.c
index fb2cbf052..5a1d3a639 100644
--- a/src/libserver/cfg_rcl.c
+++ b/src/libserver/cfg_rcl.c
@@ -2182,6 +2182,12 @@ rspamd_rcl_config_init (struct rspamd_config *cfg, GHashTable *skip_sections)
G_STRUCT_OFFSET (struct rspamd_config, full_gc_iters),
RSPAMD_CL_FLAG_UINT,
"Task scanned before memory gc is performed (default: 0 - disabled)");
+ rspamd_rcl_add_default_handler (sub,
+ "heartbeat_interval",
+ rspamd_rcl_parse_struct_time,
+ G_STRUCT_OFFSET (struct rspamd_config, heartbeat_interval),
+ RSPAMD_CL_FLAG_TIME_FLOAT,
+ "Time between workers heartbeats");
/* Neighbours configuration */
rspamd_rcl_add_section_doc (&sub->subsections, "neighbours", "name",
diff --git a/src/libserver/cfg_rcl.h b/src/libserver/cfg_rcl.h
index 7f97b100d..1a2d69c58 100644
--- a/src/libserver/cfg_rcl.h
+++ b/src/libserver/cfg_rcl.h
@@ -413,6 +413,7 @@ ucl_object_t *rspamd_rcl_add_doc_by_path (struct rspamd_config *cfg,
const char *default_value,
gboolean required);
+
/**
* Parses example and adds documentation according to the example:
*
diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c
index 619e8784e..bd8595514 100644
--- a/src/libserver/cfg_utils.c
+++ b/src/libserver/cfg_utils.c
@@ -233,6 +233,7 @@ rspamd_config_new (enum rspamd_config_init_flags flags)
cfg->max_sessions_cache = DEFAULT_MAX_SESSIONS;
cfg->maps_cache_dir = rspamd_mempool_strdup (cfg->cfg_pool, RSPAMD_DBDIR);
cfg->c_modules = g_ptr_array_new ();
+ cfg->heartbeat_interval = 10.0;
REF_INIT_RETAIN (cfg, rspamd_config_free);
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 511289a2d..47812ded1 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -63,6 +63,10 @@
#include "contrib/libev/ev.h"
+/* Forward declaration */
+static void rspamd_worker_heartbeat_start (struct rspamd_worker *,
+ struct ev_loop *);
+
static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
/**
* Return worker's control structure by its type
@@ -362,6 +366,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
rspamd_worker_init_signals (worker, event_loop);
rspamd_control_worker_add_default_handler (worker, event_loop);
+ rspamd_worker_heartbeat_start (worker, event_loop);
#ifdef WITH_HIREDIS
rspamd_redis_pool_config (worker->srv->cfg->redis_pool,
worker->srv->cfg, event_loop);
@@ -683,6 +688,91 @@ rspamd_worker_on_term (EV_P_ ev_child *w, int revents)
}
}
+static void
+rspamd_worker_heartbeat_cb (EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; /* stored by rspamd_worker_heartbeat_start */
+ /* Worker-side timer callback; placeholder for now: `wrk` is fetched but nothing is sent yet */
+}
+
+static void
+rspamd_worker_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop)
+{ /* Arms the worker-side heartbeat timer of `wrk` on `event_loop` */
+ wrk->hb.heartbeat_ev.data = (void *)wrk; /* read back in the callback through w->data */
+ ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_worker_heartbeat_cb,
+ 0.0, wrk->srv->cfg->heartbeat_interval); /* libev argument order: after = 0.0, repeat = heartbeat_interval */
+ ev_timer_start (event_loop, &wrk->hb.heartbeat_ev);
+}
+
+static void
+rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents)
+{ /* Main-side timer callback: logs when a worker's heartbeats go missing and when they come back */
+ struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; /* stored by rspamd_main_heartbeat_start */
+ gdouble time_from_last = ev_time (); /* starts as "now"; hb.last_event is subtracted below */
+ struct rspamd_main *rspamd_main;
+ struct tm tm;
+ gchar timebuf[64];
+ gchar usec_buf[16];
+ gint r;
+
+ time_from_last -= wrk->hb.last_event; /* NOTE(review): no code visible in this diff updates hb.last_event - confirm where it is set */
+ rspamd_main = wrk->srv;
+
+ if (time_from_last > 0 && time_from_last > rspamd_main->cfg->heartbeat_interval) { /* last beat is older than one interval */
+ rspamd_localtime (wrk->hb.last_event, &tm);
+ r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm);
+ rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f",
+ wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event); /* fractional part of the timestamp */
+ rspamd_snprintf (timebuf + r, sizeof (timebuf) - r,
+ "%s", usec_buf + 1); /* skips the first char of usec_buf - presumably the '0' of "0.NNNNN" - and appends the rest after the seconds */
+
+ if (wrk->hb.nbeats > 0) { /* positive counter: beats had been received until now */
+ /* First miss after a run of received beats: report it and restart the counter at -1 */
+ msg_warn_main ("lost heartbeat from worker type %s with pid %P, "
+ "last beat on: %s (%L beats received previously)",
+ g_quark_to_string (wrk->type), wrk->pid,
+ timebuf,
+ wrk->hb.nbeats);
+ wrk->hb.nbeats = -1;
+ /* TODO: send notify about worker problem */
+ }
+ else { /* nbeats <= 0: one more miss; NOTE(review): nbeats == 0 (no beat seen yet) also lands here and is logged as "lost 1 heartbeat" */
+ wrk->hb.nbeats --;
+ msg_warn_main ("lost %L heartbeat from worker type %s with pid %P, "
+ "last beat on: %s",
+ -(wrk->hb.nbeats),
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ timebuf);
+ }
+ }
+ else if (wrk->hb.nbeats < 0) { /* last beat is recent again while the counter still records misses */
+ rspamd_localtime (wrk->hb.last_event, &tm);
+ r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm);
+ rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f",
+ wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event); /* same timestamp formatting as in the branch above */
+ rspamd_snprintf (timebuf + r, sizeof (timebuf) - r,
+ "%s", usec_buf + 1);
+
+ msg_info_main ("received heartbeat from worker type %s with pid %P, "
+ "last beat on: %s (%L beats lost previously)",
+ g_quark_to_string (wrk->type), wrk->pid,
+ timebuf,
+ -(wrk->hb.nbeats));
+ wrk->hb.nbeats = 1; /* back to counting received beats */
+ /* TODO: send notify about worker restoration */
+ }
+}
+
+static void
+rspamd_main_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop)
+{ /* Arms the main-side timer that checks heartbeats of `wrk` on `event_loop` */
+ wrk->hb.heartbeat_ev.data = (void *)wrk; /* read back in rspamd_main_heartbeat_cb through w->data */
+ ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_main_heartbeat_cb,
+ 0.0, wrk->srv->cfg->heartbeat_interval * 2); /* after = 0.0, repeat = twice the interval; the callback itself compares against one interval */
+ ev_timer_start (event_loop, &wrk->hb.heartbeat_ev);
+}
+
struct rspamd_worker *
rspamd_fork_worker (struct rspamd_main *rspamd_main,
struct rspamd_worker_conf *cf,
@@ -822,6 +912,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
wrk->cld_ev.data = wrk;
ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0);
ev_child_start (rspamd_main->event_loop, &wrk->cld_ev);
+ rspamd_main_heartbeat_start (wrk, rspamd_main->event_loop);
/* Insert worker into worker's table, pid is index */
g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
wrk->pid), wrk);
diff --git a/src/rspamd.c b/src/rspamd.c
index 458d3d083..808ca6aaa 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -1008,6 +1008,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
g_free (wrk->tmp_data);
}
ev_io_stop (rspamd_main->event_loop, &wrk->srv_ev);
+ ev_timer_stop (rspamd_main->event_loop, &wrk->hb.heartbeat_ev);
}
if (wrk->control_pipe[0] != -1) {
diff --git a/src/rspamd.h b/src/rspamd.h
index ea11965fb..3cd6c391b 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -76,6 +76,12 @@ struct rspamd_worker_accept_event {
typedef void (*rspamd_worker_term_cb) (EV_P_ ev_child *, struct rspamd_main *,
struct rspamd_worker *);
+struct rspamd_worker_heartbeat {
+ ev_timer heartbeat_ev; /**< used by main for checking heartbeats and by workers to send heartbeats */
+ ev_tstamp last_event; /**< timestamp the main process compares with ev_time () and logs as "last beat on" */
+ gint64 nbeats; /**< positive for beats received, negative for beats missed */
+};
+
/**
* Worker process structure
*/
@@ -90,7 +96,7 @@ struct rspamd_worker {
struct rspamd_main *srv; /**< pointer to server structure */
GQuark type; /**< process type */
GHashTable *signal_events; /**< signal events */
- struct rspamd_worker_accept_event *accept_events; /**< socket events */
+ struct rspamd_worker_accept_event *accept_events; /**< socket events */
struct rspamd_worker_conf *cf; /**< worker config data */
gpointer ctx; /**< worker's specific data */
enum rspamd_worker_flags flags; /**< worker's flags */
@@ -99,6 +105,7 @@ struct rspamd_worker {
gint srv_pipe[2]; /**< used by workers to request something from the
main process. [0] - main, [1] - worker */
ev_io srv_ev; /**< used by main for read workers' requests */
+ struct rspamd_worker_heartbeat hb; /**< heartbeat data */
gpointer control_data; /**< used by control protocol to handle commands */
gpointer tmp_data; /**< used to avoid race condition to deal with control messages */
GPtrArray *finish_actions; /**< called when worker is terminated */
More information about the Commits
mailing list