commit 82ca7ec: [Rework] Replace controller functions by any scanner worker if needed

Vsevolod Stakhov vsevolod at highsecure.ru
Mon Nov 11 15:35:07 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-11-11 13:57:56 +0000
URL: https://github.com/rspamd/rspamd/commit/82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22

[Rework] Replace controller functions by any scanner worker if needed

---
 conf/options.inc            |   3 +
 src/controller.c            | 107 ++++++++++++++------------------
 src/libserver/cfg_file.h    |   1 +
 src/libserver/cfg_rcl.c     |   6 ++
 src/libserver/worker_util.c | 100 ++++++++++++++++++++++++++++++
 src/libserver/worker_util.h |   6 ++
 src/lua/lua_expression.c    |   4 +-
 src/rspamd_proxy.c          |  39 +++++++++++-
 src/worker.c                | 145 ++++++++++++--------------------------------
 9 files changed, 242 insertions(+), 169 deletions(-)

diff --git a/conf/options.inc b/conf/options.inc
index f600fdb9c..e635262e3 100644
--- a/conf/options.inc
+++ b/conf/options.inc
@@ -57,6 +57,9 @@ words_decay = 600;
 # Write statistics about rspamd usage to the round-robin database
 rrd = "${DBDIR}/rspamd.rrd";
 
+# Write statistics for `rspamc` here
+stats_file = "${DBDIR}/stats.ucl";
+
 # Local networks
 local_addrs = [192.168.0.0/16, 10.0.0.0/8, 172.16.0.0/12, fd00::/8, 169.254.0.0/16, fe80::/10];
 hs_cache_dir = "${DBDIR}/";
diff --git a/src/controller.c b/src/controller.c
index 7b6ecff4a..28e12a1c7 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -38,8 +38,6 @@
 /* 60 seconds for worker's IO */
 #define DEFAULT_WORKER_IO_TIMEOUT 60000
 
-#define DEFAULT_STATS_PATH RSPAMD_DBDIR "/stats.ucl"
-
 /* HTTP paths */
 #define PATH_AUTH "/auth"
 #define PATH_SYMBOLS "/symbols"
@@ -162,9 +160,6 @@ struct rspamd_controller_worker_ctx {
 	/* Static files dir */
 	gchar *static_files_dir;
 
-	/* Saved statistics path */
-	gchar *saved_stats_path;
-
 	/* Custom commands registered by plugins */
 	GHashTable *custom_commands;
 
@@ -3141,7 +3136,8 @@ rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
 }
 
 static void
-rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
+rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main,
+		struct rspamd_config *cfg)
 {
 	struct ucl_parser *parser;
 	ucl_object_t *obj;
@@ -3149,19 +3145,21 @@ rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
 	struct rspamd_stat *stat, stat_copy;
 	gint i;
 
-	g_assert (ctx->saved_stats_path != NULL);
+	if (cfg->stats_file == NULL) {
+		return;
+	}
 
-	if (access (ctx->saved_stats_path, R_OK) == -1) {
-		msg_err_ctx ("cannot load controller stats from %s: %s",
-				ctx->saved_stats_path, strerror (errno));
+	if (access (cfg->stats_file, R_OK) == -1) {
+		msg_err_config ("cannot load controller stats from %s: %s",
+				cfg->stats_file, strerror (errno));
 		return;
 	}
 
 	parser = ucl_parser_new (0);
 
-	if (!ucl_parser_add_file (parser, ctx->saved_stats_path)) {
-		msg_err_ctx ("cannot parse controller stats from %s: %s",
-				ctx->saved_stats_path, ucl_parser_get_error (parser));
+	if (!ucl_parser_add_file (parser, cfg->stats_file)) {
+		msg_err_config ("cannot parse controller stats from %s: %s",
+				cfg->stats_file, ucl_parser_get_error (parser));
 		ucl_parser_free (parser);
 
 		return;
@@ -3170,7 +3168,7 @@ rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
 	obj = ucl_parser_get_object (parser);
 	ucl_parser_free (parser);
 
-	stat = ctx->srv->stat;
+	stat = rspamd_main->stat;
 	memcpy (&stat_copy, stat, sizeof (stat_copy));
 
 	elt = ucl_object_lookup (obj, "scanned");
@@ -3214,32 +3212,29 @@ rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
 }
 
 static void
-rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
+rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main,
+		struct rspamd_config *cfg)
 {
 	struct rspamd_stat *stat;
 	ucl_object_t *top, *sub;
 	struct ucl_emitter_functions *efuncs;
 	gint i, fd;
+	gchar fpath[PATH_MAX], *tmpfile;
 
-	g_assert (ctx->saved_stats_path != NULL);
-
-	fd = open (ctx->saved_stats_path, O_WRONLY|O_CREAT|O_TRUNC, 00644);
-
-	if (fd == -1) {
-		msg_err_ctx ("cannot open for writing controller stats from %s: %s",
-				ctx->saved_stats_path, strerror (errno));
+	if (cfg->stats_file == NULL) {
 		return;
 	}
 
-	if (rspamd_file_lock (fd, FALSE) == -1) {
-		msg_err_ctx ("cannot lock controller stats in %s: %s",
-				ctx->saved_stats_path, strerror (errno));
-		close (fd);
+	rspamd_snprintf (fpath, sizeof (fpath), "%s.XXXXXXXX", cfg->stats_file);
+	fd = g_mkstemp_full (fpath, O_WRONLY|O_TRUNC, 00644);
 
+	if (fd == -1) {
+		msg_err_config ("cannot open for writing controller stats from %s: %s",
+				fpath, strerror (errno));
 		return;
 	}
 
-	stat = ctx->srv->stat;
+	stat = rspamd_main->stat;
 
 	top = ucl_object_typed_new (UCL_OBJECT);
 	ucl_object_insert_key (top, ucl_object_fromint (
@@ -3258,18 +3253,28 @@ rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
 	}
 
 	ucl_object_insert_key (top,
-			ucl_object_fromint (stat->connections_count), "connections", 0, false);
+			ucl_object_fromint (stat->connections_count),
+			"connections", 0, false);
 	ucl_object_insert_key (top,
 			ucl_object_fromint (stat->control_connections_count),
 			"control_connections", 0, false);
 
-
 	efuncs = ucl_object_emit_fd_funcs (fd);
-	ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT,
-			efuncs, NULL);
+	if (!ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT,
+			efuncs, NULL)) {
+		msg_err_config ("cannot write stats to %s: %s",
+				fpath, strerror (errno));
+
+		unlink (fpath);
+	}
+	else {
+		if (rename (fpath, cfg->stats_file) == -1) {
+			msg_err_config ("cannot rename stats from %s to %s: %s",
+					fpath, cfg->stats_file, strerror (errno));
+		}
+	}
 
 	ucl_object_unref (top);
-	rspamd_file_unlock (fd, FALSE);
 	close (fd);
 	ucl_object_emit_funcs_free (efuncs);
 }
@@ -3280,7 +3285,7 @@ rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
 	struct rspamd_controller_worker_ctx *ctx =
 			(struct rspamd_controller_worker_ctx *)w->data;
 
-	rspamd_controller_store_saved_stats (ctx);
+	rspamd_controller_store_saved_stats (ctx->srv, ctx->cfg);
 	ev_timer_again (EV_A_ w);
 }
 
@@ -3415,16 +3420,6 @@ init_controller_worker (struct rspamd_config *cfg)
 			0,
 			"Encryption keypair");
 
-	rspamd_rcl_register_worker_option (cfg,
-			type,
-			"stats_path",
-			rspamd_rcl_parse_struct_string,
-			ctx,
-			G_STRUCT_OFFSET (struct rspamd_controller_worker_ctx,
-					saved_stats_path),
-			0,
-			"Directory where controller saves server's statistics between restarts");
-
 	rspamd_rcl_register_worker_option (cfg,
 			type,
 			"task_timeout",
@@ -3563,18 +3558,10 @@ lua_csession_send_string (lua_State *L)
 	return 0;
 }
 
-static void
+void
 rspamd_controller_on_terminate (struct rspamd_worker *worker)
 {
-	struct rspamd_controller_worker_ctx *ctx = worker->ctx;
-
-	rspamd_controller_store_saved_stats (ctx);
-
-	if (ctx->rrd) {
-		msg_info ("closing rrd file: %s", ctx->rrd->filename);
-		ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
-		rspamd_rrd_close (ctx->rrd);
-	}
+	rspamd_controller_store_saved_stats (worker->srv, worker->srv->cfg);
 }
 
 static void
@@ -3724,13 +3711,7 @@ start_controller_worker (struct rspamd_worker *worker)
 				&ctx->secure_map, NULL);
 	}
 
-	if (ctx->saved_stats_path == NULL) {
-		/* Assume default path */
-		ctx->saved_stats_path = rspamd_mempool_strdup (worker->srv->cfg->cfg_pool,
-				DEFAULT_STATS_PATH);
-	}
-
-	rspamd_controller_load_saved_stats (ctx);
+	rspamd_controller_load_saved_stats (ctx->srv, ctx->cfg);
 	ctx->lang_det = ctx->cfg->lang_det;
 
 	/* RRD collector */
@@ -3925,6 +3906,12 @@ start_controller_worker (struct rspamd_worker *worker)
 	rspamd_worker_block_signals ();
 	rspamd_controller_on_terminate (worker);
 
+	if (ctx->rrd) {
+		msg_info ("closing rrd file: %s", ctx->rrd->filename);
+		ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
+		rspamd_rrd_close (ctx->rrd);
+	}
+
 	rspamd_stat_close ();
 	rspamd_http_router_free (ctx->http);
 
diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h
index 750e48325..e604fe43a 100644
--- a/src/libserver/cfg_file.h
+++ b/src/libserver/cfg_file.h
@@ -439,6 +439,7 @@ struct rspamd_config {
 
 	gchar *rrd_file;                               /**< rrd file to store statistics						*/
 	gchar *history_file;                           /**< file to save rolling history						*/
+	gchar *stats_file;                           /**< file to save stats 						*/
 	gchar *tld_file;                               /**< file to load effective tld list from				*/
 	gchar *hs_cache_dir;                           /**< directory to save hyperscan databases				*/
 	gchar *events_backend;                         /**< string representation of the events backend used	*/
diff --git a/src/libserver/cfg_rcl.c b/src/libserver/cfg_rcl.c
index c7d414df4..01b0c43ea 100644
--- a/src/libserver/cfg_rcl.c
+++ b/src/libserver/cfg_rcl.c
@@ -1963,6 +1963,12 @@ rspamd_rcl_config_init (struct rspamd_config *cfg, GHashTable *skip_sections)
 				G_STRUCT_OFFSET (struct rspamd_config, rrd_file),
 				RSPAMD_CL_FLAG_STRING_PATH,
 				"Path to RRD file");
+		rspamd_rcl_add_default_handler (sub,
+				"stats_file",
+				rspamd_rcl_parse_struct_string,
+				G_STRUCT_OFFSET (struct rspamd_config, stats_file),
+				RSPAMD_CL_FLAG_STRING_PATH,
+				"Path to stats file");
 		rspamd_rcl_add_default_handler (sub,
 				"history_file",
 				rspamd_rcl_parse_struct_string,
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 1a09dfce7..05997d8b3 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -62,6 +62,7 @@
 #endif
 
 #include "contrib/libev/ev.h"
+#include "libstat/stat_api.h"
 
 /* Forward declaration */
 static void rspamd_worker_heartbeat_start (struct rspamd_worker *,
@@ -1616,4 +1617,103 @@ rspamd_worker_check_context (gpointer ctx, guint64 magic)
 	struct rspamd_abstract_worker_ctx *actx = (struct rspamd_abstract_worker_ctx*)ctx;
 
 	return actx->magic == magic;
+}
+
+static gboolean
+rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
+								struct rspamd_worker *worker, gint fd,
+								gint attached_fd,
+								struct rspamd_control_command *cmd,
+								gpointer ud)
+{
+	struct rspamd_config *cfg = ud;
+	struct rspamd_worker_log_pipe *lp;
+	struct rspamd_control_reply rep;
+
+	memset (&rep, 0, sizeof (rep));
+	rep.type = RSPAMD_CONTROL_LOG_PIPE;
+
+	if (attached_fd != -1) {
+		lp = g_malloc0 (sizeof (*lp));
+		lp->fd = attached_fd;
+		lp->type = cmd->cmd.log_pipe.type;
+
+		DL_APPEND (cfg->log_pipes, lp);
+		msg_info ("added new log pipe");
+	}
+	else {
+		rep.reply.log_pipe.status = ENOENT;
+		msg_err ("cannot attach log pipe: invalid fd");
+	}
+
+	if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+		msg_err ("cannot write reply to the control socket: %s",
+				strerror (errno));
+	}
+
+	return TRUE;
+}
+
+static gboolean
+rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
+								 struct rspamd_worker *worker, gint fd,
+								 gint attached_fd,
+								 struct rspamd_control_command *cmd,
+								 gpointer ud)
+{
+	struct rspamd_control_reply rep;
+	struct rspamd_monitored *m;
+	struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
+	struct rspamd_config *cfg = ud;
+
+	memset (&rep, 0, sizeof (rep));
+	rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+
+	if (cmd->cmd.monitored_change.sender != getpid ()) {
+		m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
+
+		if (m != NULL) {
+			rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
+			rep.reply.monitored_change.status = 1;
+			msg_info_config ("updated monitored status for %s: %s",
+					cmd->cmd.monitored_change.tag,
+					cmd->cmd.monitored_change.alive ? "alive" : "dead");
+		} else {
+			msg_err ("cannot find monitored by tag: %*s", 32,
+					cmd->cmd.monitored_change.tag);
+			rep.reply.monitored_change.status = 0;
+		}
+	}
+
+	if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+		msg_err ("cannot write reply to the control socket: %s",
+				strerror (errno));
+	}
+
+	return TRUE;
+}
+
+void
+rspamd_worker_init_scanner (struct rspamd_worker *worker,
+							struct ev_loop *ev_base,
+							struct rspamd_dns_resolver *resolver,
+							struct rspamd_lang_detector **plang_det)
+{
+	rspamd_stat_init (worker->srv->cfg, ev_base);
+#ifdef WITH_HYPERSCAN
+	rspamd_control_worker_add_cmd_handler (worker,
+			RSPAMD_CONTROL_HYPERSCAN_LOADED,
+			rspamd_worker_hyperscan_ready,
+			NULL);
+#endif
+	rspamd_control_worker_add_cmd_handler (worker,
+			RSPAMD_CONTROL_LOG_PIPE,
+			rspamd_worker_log_pipe_handler,
+			worker->srv->cfg);
+	rspamd_control_worker_add_cmd_handler (worker,
+			RSPAMD_CONTROL_MONITORED_CHANGE,
+			rspamd_worker_monitored_handler,
+			worker->srv->cfg);
+
+	*plang_det = worker->srv->cfg->lang_det;
 }
\ No newline at end of file
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 15d79df2f..b94d8bd9b 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -244,6 +244,12 @@ gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
  */
 gboolean rspamd_worker_call_finish_handlers (struct rspamd_worker *worker);
 
+/**
+ * Defined in controller.c
+ * @param worker
+ */
+extern void rspamd_controller_on_terminate (struct rspamd_worker *worker);
+
 #ifdef WITH_HYPERSCAN
 struct rspamd_control_command;
 
diff --git a/src/lua/lua_expression.c b/src/lua/lua_expression.c
index 60ee8fdf7..b2addd30c 100644
--- a/src/lua/lua_expression.c
+++ b/src/lua/lua_expression.c
@@ -36,12 +36,12 @@ local rspamd_mempool = require "rspamd_mempool"
 
 local function parse_func(str)
 	-- extract token till the first space character
-	local token = table.join('', take_while(function(s) return s ~= ' ' end, str))
+	local token = table.concat(totable(take_while(function(s) return s ~= ' ' end, iter(str))))
 	-- Return token name
 	return token
 end
 
-local function process_func(token, task)
+local function process_func(token)
 	-- Do something using token and task
 end
 
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 227e20c99..0c9687c7e 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -2225,7 +2225,6 @@ start_rspamd_proxy (struct rspamd_worker *worker)
 	ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
 			ctx->event_loop,
 			worker->srv->cfg);
-	rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
 
 	rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
 			ctx->event_loop, ctx->resolver->r);
@@ -2236,11 +2235,49 @@ start_rspamd_proxy (struct rspamd_worker *worker)
 			(rspamd_mempool_destruct_t)rspamd_http_context_free,
 			ctx->http_ctx);
 
+	rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver,
+			worker, 0);
+
 	if (ctx->has_self_scan) {
 		/* Additional initialisation needed */
 		rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
 				&ctx->lang_det);
 
+		if (worker->index == 0) {
+			/*
+			 * If there are no controllers and no normal workers,
+			 * then pretend that we are a controller
+			 */
+			gboolean controller_seen = FALSE;
+			GList *cur;
+
+			cur = worker->srv->cfg->workers;
+
+			while (cur) {
+				struct rspamd_worker_conf *cf;
+
+				cf = (struct rspamd_worker_conf *)cur->data;
+				if ((cf->type == g_quark_from_static_string ("controller")) ||
+						(cf->type == g_quark_from_static_string ("normal"))) {
+
+					if (cf->enabled && cf->count >= 0) {
+						controller_seen = TRUE;
+						break;
+					}
+				}
+
+				cur = g_list_next (cur);
+			}
+
+			if (!controller_seen) {
+				msg_info ("no controller or normal workers defined, execute "
+							  "controller periodics in this worker");
+				worker->flags |= RSPAMD_WORKER_CONTROLLER;
+			}
+		}
+	}
+	else {
+		worker->flags &= ~RSPAMD_WORKER_SCANNER;
 	}
 
 	if (worker->srv->cfg->enable_sessions_cache) {
diff --git a/src/worker.c b/src/worker.c
index 81ec0904a..193c74319 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -27,16 +27,13 @@
 #include "libserver/dns.h"
 #include "libmime/message.h"
 #include "rspamd.h"
-#include "keypairs_cache.h"
 #include "libstat/stat_api.h"
 #include "libserver/worker_util.h"
 #include "libserver/rspamd_control.h"
 #include "worker_private.h"
-#include "utlist.h"
 #include "libutil/http_private.h"
-#include "libmime/lang_detection.h"
+#include "libserver/cfg_file_private.h"
 #include <math.h>
-#include <src/libserver/cfg_file_private.h>
 #include "unix-std.h"
 
 #include "lua/lua_common.h"
@@ -57,15 +54,15 @@ worker_t normal_worker = {
 };
 
 #define msg_err_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
-        "controller", ctx->cfg->cfg_pool->tag.uid, \
+        "worker", ctx->cfg->cfg_pool->tag.uid, \
         G_STRFUNC, \
         __VA_ARGS__)
 #define msg_warn_ctx(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
-        "controller", ctx->cfg->cfg_pool->tag.uid, \
+        "worker", ctx->cfg->cfg_pool->tag.uid, \
         G_STRFUNC, \
         __VA_ARGS__)
 #define msg_info_ctx(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
-        "controller", ctx->cfg->cfg_pool->tag.uid, \
+        "worker", ctx->cfg->cfg_pool->tag.uid, \
         G_STRFUNC, \
         __VA_ARGS__)
 
@@ -405,80 +402,6 @@ accept_socket (EV_P_ ev_io *w, int revents)
 			ctx->timeout);
 }
 
-static gboolean
-rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
-		struct rspamd_worker *worker, gint fd,
-		gint attached_fd,
-		struct rspamd_control_command *cmd,
-		gpointer ud)
-{
-	struct rspamd_config *cfg = ud;
-	struct rspamd_worker_log_pipe *lp;
-	struct rspamd_control_reply rep;
-
-	memset (&rep, 0, sizeof (rep));
-	rep.type = RSPAMD_CONTROL_LOG_PIPE;
-
-	if (attached_fd != -1) {
-		lp = g_malloc0 (sizeof (*lp));
-		lp->fd = attached_fd;
-		lp->type = cmd->cmd.log_pipe.type;
-
-		DL_APPEND (cfg->log_pipes, lp);
-		msg_info ("added new log pipe");
-	}
-	else {
-		rep.reply.log_pipe.status = ENOENT;
-		msg_err ("cannot attach log pipe: invalid fd");
-	}
-
-	if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
-		msg_err ("cannot write reply to the control socket: %s",
-				strerror (errno));
-	}
-
-	return TRUE;
-}
-
-static gboolean
-rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
-		struct rspamd_worker *worker, gint fd,
-		gint attached_fd,
-		struct rspamd_control_command *cmd,
-		gpointer ud)
-{
-	struct rspamd_control_reply rep;
-	struct rspamd_monitored *m;
-	struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
-	struct rspamd_config *cfg = ud;
-
-	memset (&rep, 0, sizeof (rep));
-	rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
-
-	if (cmd->cmd.monitored_change.sender != getpid ()) {
-		m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
-
-		if (m != NULL) {
-			rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
-			rep.reply.monitored_change.status = 1;
-			msg_info_config ("updated monitored status for %s: %s",
-					cmd->cmd.monitored_change.tag,
-					cmd->cmd.monitored_change.alive ? "alive" : "dead");
-		} else {
-			msg_err ("cannot find monitored by tag: %*s", 32,
-					cmd->cmd.monitored_change.tag);
-			rep.reply.monitored_change.status = 0;
-		}
-	}
-
-	if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
-		msg_err ("cannot write reply to the control socket: %s",
-				strerror (errno));
-	}
-
-	return TRUE;
-}
-
 gpointer
 init_worker (struct rspamd_config *cfg)
 {
@@ -557,31 +480,6 @@ init_worker (struct rspamd_config *cfg)
 	return ctx;
 }
 
-void
-rspamd_worker_init_scanner (struct rspamd_worker *worker,
-		struct ev_loop *ev_base,
-		struct rspamd_dns_resolver *resolver,
-		struct rspamd_lang_detector **plang_det)
-{
-	rspamd_stat_init (worker->srv->cfg, ev_base);
-#ifdef WITH_HYPERSCAN
-	rspamd_control_worker_add_cmd_handler (worker,
-			RSPAMD_CONTROL_HYPERSCAN_LOADED,
-			rspamd_worker_hyperscan_ready,
-			NULL);
-#endif
-	rspamd_control_worker_add_cmd_handler (worker,
-			RSPAMD_CONTROL_LOG_PIPE,
-			rspamd_worker_log_pipe_handler,
-			worker->srv->cfg);
-	rspamd_control_worker_add_cmd_handler (worker,
-			RSPAMD_CONTROL_MONITORED_CHANGE,
-			rspamd_worker_monitored_handler,
-			worker->srv->cfg);
-
-	*plang_det = worker->srv->cfg->lang_det;
-}
-
 /*
  * Start worker process
  */
@@ -589,6 +487,7 @@ void
 start_worker (struct rspamd_worker *worker)
 {
 	struct rspamd_worker_ctx *ctx = worker->ctx;
+	gboolean is_controller = FALSE;
 
 	g_assert (rspamd_worker_check_context (worker->ctx, rspamd_worker_magic));
 	ctx->cfg = worker->srv->cfg;
@@ -619,12 +518,46 @@ start_worker (struct rspamd_worker *worker)
 			ctx->http_ctx);
 	rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
 			&ctx->lang_det);
+
+	if (worker->index == 0) {
+		/* If there are no controllers, then pretend that we are a controller */
+		gboolean controller_seen = FALSE;
+		GList *cur;
+
+		cur = worker->srv->cfg->workers;
+
+		while (cur) {
+			struct rspamd_worker_conf *cf;
+
+			cf = (struct rspamd_worker_conf *)cur->data;
+			if (cf->type == g_quark_from_static_string ("controller")) {
+				if (cf->enabled && cf->count >= 0) {
+					controller_seen = TRUE;
+					break;
+				}
+			}
+
+			cur = g_list_next (cur);
+		}
+
+		if (!controller_seen) {
+			msg_info_ctx ("no controller workers defined, execute "
+				 "controller periodics in this worker");
+			worker->flags |= RSPAMD_WORKER_CONTROLLER;
+			is_controller = TRUE;
+		}
+	}
+
 	rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
 			worker);
 
 	ev_loop (ctx->event_loop, 0);
 	rspamd_worker_block_signals ();
*** OUTPUT TRUNCATED, 8 LINES SKIPPED ***


More information about the Commits mailing list