commit 82ca7ec: [Rework] Replace controller functions by any scanner worker if needed
Vsevolod Stakhov
vsevolod at highsecure.ru
Mon Nov 11 15:35:07 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-11-11 13:57:56 +0000
URL: https://github.com/rspamd/rspamd/commit/82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22
[Rework] Replace controller functions by any scanner worker if needed
---
conf/options.inc | 3 +
src/controller.c | 107 ++++++++++++++------------------
src/libserver/cfg_file.h | 1 +
src/libserver/cfg_rcl.c | 6 ++
src/libserver/worker_util.c | 100 ++++++++++++++++++++++++++++++
src/libserver/worker_util.h | 6 ++
src/lua/lua_expression.c | 4 +-
src/rspamd_proxy.c | 39 +++++++++++-
src/worker.c | 145 ++++++++++++--------------------------------
9 files changed, 242 insertions(+), 169 deletions(-)
diff --git a/conf/options.inc b/conf/options.inc
index f600fdb9c..e635262e3 100644
--- a/conf/options.inc
+++ b/conf/options.inc
@@ -57,6 +57,9 @@ words_decay = 600;
# Write statistics about rspamd usage to the round-robin database
rrd = "${DBDIR}/rspamd.rrd";
+# Write statistics for `rspamc` here
+stats_file = "${DBDIR}/stats.ucl";
+
# Local networks
local_addrs = [192.168.0.0/16, 10.0.0.0/8, 172.16.0.0/12, fd00::/8, 169.254.0.0/16, fe80::/10];
hs_cache_dir = "${DBDIR}/";
diff --git a/src/controller.c b/src/controller.c
index 7b6ecff4a..28e12a1c7 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -38,8 +38,6 @@
/* 60 seconds for worker's IO */
#define DEFAULT_WORKER_IO_TIMEOUT 60000
-#define DEFAULT_STATS_PATH RSPAMD_DBDIR "/stats.ucl"
-
/* HTTP paths */
#define PATH_AUTH "/auth"
#define PATH_SYMBOLS "/symbols"
@@ -162,9 +160,6 @@ struct rspamd_controller_worker_ctx {
/* Static files dir */
gchar *static_files_dir;
- /* Saved statistics path */
- gchar *saved_stats_path;
-
/* Custom commands registered by plugins */
GHashTable *custom_commands;
@@ -3141,7 +3136,8 @@ rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
}
static void
-rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
+rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main,
+ struct rspamd_config *cfg)
{
struct ucl_parser *parser;
ucl_object_t *obj;
@@ -3149,19 +3145,21 @@ rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
struct rspamd_stat *stat, stat_copy;
gint i;
- g_assert (ctx->saved_stats_path != NULL);
+ if (cfg->stats_file == NULL) {
+ return;
+ }
- if (access (ctx->saved_stats_path, R_OK) == -1) {
- msg_err_ctx ("cannot load controller stats from %s: %s",
- ctx->saved_stats_path, strerror (errno));
+ if (access (cfg->stats_file, R_OK) == -1) {
+ msg_err_config ("cannot load controller stats from %s: %s",
+ cfg->stats_file, strerror (errno));
return;
}
parser = ucl_parser_new (0);
- if (!ucl_parser_add_file (parser, ctx->saved_stats_path)) {
- msg_err_ctx ("cannot parse controller stats from %s: %s",
- ctx->saved_stats_path, ucl_parser_get_error (parser));
+ if (!ucl_parser_add_file (parser, cfg->stats_file)) {
+ msg_err_config ("cannot parse controller stats from %s: %s",
+ cfg->stats_file, ucl_parser_get_error (parser));
ucl_parser_free (parser);
return;
@@ -3170,7 +3168,7 @@ rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
obj = ucl_parser_get_object (parser);
ucl_parser_free (parser);
- stat = ctx->srv->stat;
+ stat = rspamd_main->stat;
memcpy (&stat_copy, stat, sizeof (stat_copy));
elt = ucl_object_lookup (obj, "scanned");
@@ -3214,32 +3212,29 @@ rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
}
static void
-rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
+rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main,
+ struct rspamd_config *cfg)
{
struct rspamd_stat *stat;
ucl_object_t *top, *sub;
struct ucl_emitter_functions *efuncs;
gint i, fd;
+ gchar fpath[PATH_MAX], *tmpfile;
- g_assert (ctx->saved_stats_path != NULL);
-
- fd = open (ctx->saved_stats_path, O_WRONLY|O_CREAT|O_TRUNC, 00644);
-
- if (fd == -1) {
- msg_err_ctx ("cannot open for writing controller stats from %s: %s",
- ctx->saved_stats_path, strerror (errno));
+ if (cfg->stats_file == NULL) {
return;
}
- if (rspamd_file_lock (fd, FALSE) == -1) {
- msg_err_ctx ("cannot lock controller stats in %s: %s",
- ctx->saved_stats_path, strerror (errno));
- close (fd);
+ rspamd_snprintf (fpath, sizeof (fpath), "%s.XXXXXXXX", cfg->stats_file);
+ fd = g_mkstemp_full (fpath, O_WRONLY|O_TRUNC, 00644);
+ if (fd == -1) {
+ msg_err_config ("cannot open for writing controller stats from %s: %s",
+ fpath, strerror (errno));
return;
}
- stat = ctx->srv->stat;
+ stat = rspamd_main->stat;
top = ucl_object_typed_new (UCL_OBJECT);
ucl_object_insert_key (top, ucl_object_fromint (
@@ -3258,18 +3253,28 @@ rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
}
ucl_object_insert_key (top,
- ucl_object_fromint (stat->connections_count), "connections", 0, false);
+ ucl_object_fromint (stat->connections_count),
+ "connections", 0, false);
ucl_object_insert_key (top,
ucl_object_fromint (stat->control_connections_count),
"control_connections", 0, false);
-
efuncs = ucl_object_emit_fd_funcs (fd);
- ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT,
- efuncs, NULL);
+ if (!ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT,
+ efuncs, NULL)) {
+ msg_err_config ("cannot write stats to %s: %s",
+ fpath, strerror (errno));
+
+ unlink (fpath);
+ }
+ else {
+ if (rename (fpath, cfg->stats_file) == -1) {
+ msg_err_config ("cannot rename stats from %s to %s: %s",
+ fpath, cfg->stats_file, strerror (errno));
+ }
+ }
ucl_object_unref (top);
- rspamd_file_unlock (fd, FALSE);
close (fd);
ucl_object_emit_funcs_free (efuncs);
}
@@ -3280,7 +3285,7 @@ rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
struct rspamd_controller_worker_ctx *ctx =
(struct rspamd_controller_worker_ctx *)w->data;
- rspamd_controller_store_saved_stats (ctx);
+ rspamd_controller_store_saved_stats (ctx->srv, ctx->cfg);
ev_timer_again (EV_A_ w);
}
@@ -3415,16 +3420,6 @@ init_controller_worker (struct rspamd_config *cfg)
0,
"Encryption keypair");
- rspamd_rcl_register_worker_option (cfg,
- type,
- "stats_path",
- rspamd_rcl_parse_struct_string,
- ctx,
- G_STRUCT_OFFSET (struct rspamd_controller_worker_ctx,
- saved_stats_path),
- 0,
- "Directory where controller saves server's statistics between restarts");
-
rspamd_rcl_register_worker_option (cfg,
type,
"task_timeout",
@@ -3563,18 +3558,10 @@ lua_csession_send_string (lua_State *L)
return 0;
}
-static void
+void
rspamd_controller_on_terminate (struct rspamd_worker *worker)
{
- struct rspamd_controller_worker_ctx *ctx = worker->ctx;
-
- rspamd_controller_store_saved_stats (ctx);
-
- if (ctx->rrd) {
- msg_info ("closing rrd file: %s", ctx->rrd->filename);
- ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
- rspamd_rrd_close (ctx->rrd);
- }
+ rspamd_controller_store_saved_stats (worker->srv, worker->srv->cfg);
}
static void
@@ -3724,13 +3711,7 @@ start_controller_worker (struct rspamd_worker *worker)
&ctx->secure_map, NULL);
}
- if (ctx->saved_stats_path == NULL) {
- /* Assume default path */
- ctx->saved_stats_path = rspamd_mempool_strdup (worker->srv->cfg->cfg_pool,
- DEFAULT_STATS_PATH);
- }
-
- rspamd_controller_load_saved_stats (ctx);
+ rspamd_controller_load_saved_stats (ctx->srv, ctx->cfg);
ctx->lang_det = ctx->cfg->lang_det;
/* RRD collector */
@@ -3925,6 +3906,12 @@ start_controller_worker (struct rspamd_worker *worker)
rspamd_worker_block_signals ();
rspamd_controller_on_terminate (worker);
+ if (ctx->rrd) {
+ msg_info ("closing rrd file: %s", ctx->rrd->filename);
+ ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
+ rspamd_rrd_close (ctx->rrd);
+ }
+
rspamd_stat_close ();
rspamd_http_router_free (ctx->http);
diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h
index 750e48325..e604fe43a 100644
--- a/src/libserver/cfg_file.h
+++ b/src/libserver/cfg_file.h
@@ -439,6 +439,7 @@ struct rspamd_config {
gchar *rrd_file; /**< rrd file to store statistics */
gchar *history_file; /**< file to save rolling history */
+ gchar *stats_file; /**< file to save stats */
gchar *tld_file; /**< file to load effective tld list from */
gchar *hs_cache_dir; /**< directory to save hyperscan databases */
gchar *events_backend; /**< string representation of the events backend used */
diff --git a/src/libserver/cfg_rcl.c b/src/libserver/cfg_rcl.c
index c7d414df4..01b0c43ea 100644
--- a/src/libserver/cfg_rcl.c
+++ b/src/libserver/cfg_rcl.c
@@ -1963,6 +1963,12 @@ rspamd_rcl_config_init (struct rspamd_config *cfg, GHashTable *skip_sections)
G_STRUCT_OFFSET (struct rspamd_config, rrd_file),
RSPAMD_CL_FLAG_STRING_PATH,
"Path to RRD file");
+ rspamd_rcl_add_default_handler (sub,
+ "stats_file",
+ rspamd_rcl_parse_struct_string,
+ G_STRUCT_OFFSET (struct rspamd_config, stats_file),
+ RSPAMD_CL_FLAG_STRING_PATH,
+ "Path to stats file");
rspamd_rcl_add_default_handler (sub,
"history_file",
rspamd_rcl_parse_struct_string,
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 1a09dfce7..05997d8b3 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -62,6 +62,7 @@
#endif
#include "contrib/libev/ev.h"
+#include "libstat/stat_api.h"
/* Forward declaration */
static void rspamd_worker_heartbeat_start (struct rspamd_worker *,
@@ -1616,4 +1617,103 @@ rspamd_worker_check_context (gpointer ctx, guint64 magic)
struct rspamd_abstract_worker_ctx *actx = (struct rspamd_abstract_worker_ctx*)ctx;
return actx->magic == magic;
+}
+
+static gboolean
+rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, gint fd,
+ gint attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_config *cfg = ud;
+ struct rspamd_worker_log_pipe *lp;
+ struct rspamd_control_reply rep;
+
+ memset (&rep, 0, sizeof (rep));
+ rep.type = RSPAMD_CONTROL_LOG_PIPE;
+
+ if (attached_fd != -1) {
+ lp = g_malloc0 (sizeof (*lp));
+ lp->fd = attached_fd;
+ lp->type = cmd->cmd.log_pipe.type;
+
+ DL_APPEND (cfg->log_pipes, lp);
+ msg_info ("added new log pipe");
+ }
+ else {
+ rep.reply.log_pipe.status = ENOENT;
+ msg_err ("cannot attach log pipe: invalid fd");
+ }
+
+ if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+ msg_err ("cannot write reply to the control socket: %s",
+ strerror (errno));
+ }
+
+ return TRUE;
+}
+
+static gboolean
+rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, gint fd,
+ gint attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_control_reply rep;
+ struct rspamd_monitored *m;
+ struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
+ struct rspamd_config *cfg = ud;
+
+ memset (&rep, 0, sizeof (rep));
+ rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+
+ if (cmd->cmd.monitored_change.sender != getpid ()) {
+ m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
+
+ if (m != NULL) {
+ rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
+ rep.reply.monitored_change.status = 1;
+ msg_info_config ("updated monitored status for %s: %s",
+ cmd->cmd.monitored_change.tag,
+ cmd->cmd.monitored_change.alive ? "alive" : "dead");
+ } else {
+ msg_err ("cannot find monitored by tag: %*s", 32,
+ cmd->cmd.monitored_change.tag);
+ rep.reply.monitored_change.status = 0;
+ }
+ }
+
+ if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+ msg_err ("cannot write reply to the control socket: %s",
+ strerror (errno));
+ }
+
+ return TRUE;
+}
+
+void
+rspamd_worker_init_scanner (struct rspamd_worker *worker,
+ struct ev_loop *ev_base,
+ struct rspamd_dns_resolver *resolver,
+ struct rspamd_lang_detector **plang_det)
+{
+ rspamd_stat_init (worker->srv->cfg, ev_base);
+#ifdef WITH_HYPERSCAN
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_HYPERSCAN_LOADED,
+ rspamd_worker_hyperscan_ready,
+ NULL);
+#endif
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_LOG_PIPE,
+ rspamd_worker_log_pipe_handler,
+ worker->srv->cfg);
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_MONITORED_CHANGE,
+ rspamd_worker_monitored_handler,
+ worker->srv->cfg);
+
+ *plang_det = worker->srv->cfg->lang_det;
}
\ No newline at end of file
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 15d79df2f..b94d8bd9b 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -244,6 +244,12 @@ gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
*/
gboolean rspamd_worker_call_finish_handlers (struct rspamd_worker *worker);
+/**
+ * Defined in controller.c
+ * @param worker
+ */
+extern void rspamd_controller_on_terminate (struct rspamd_worker *worker);
+
#ifdef WITH_HYPERSCAN
struct rspamd_control_command;
diff --git a/src/lua/lua_expression.c b/src/lua/lua_expression.c
index 60ee8fdf7..b2addd30c 100644
--- a/src/lua/lua_expression.c
+++ b/src/lua/lua_expression.c
@@ -36,12 +36,12 @@ local rspamd_mempool = require "rspamd_mempool"
local function parse_func(str)
-- extract token till the first space character
- local token = table.join('', take_while(function(s) return s ~= ' ' end, str))
+ local token = table.concat(totable(take_while(function(s) return s ~= ' ' end, iter(str))))
-- Return token name
return token
end
-local function process_func(token, task)
+local function process_func(token)
-- Do something using token and task
end
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 227e20c99..0c9687c7e 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -2225,7 +2225,6 @@ start_rspamd_proxy (struct rspamd_worker *worker)
ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
ctx->event_loop,
worker->srv->cfg);
- rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
ctx->event_loop, ctx->resolver->r);
@@ -2236,11 +2235,49 @@ start_rspamd_proxy (struct rspamd_worker *worker)
(rspamd_mempool_destruct_t)rspamd_http_context_free,
ctx->http_ctx);
+ rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver,
+ worker, 0);
+
if (ctx->has_self_scan) {
/* Additional initialisation needed */
rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
&ctx->lang_det);
+ if (worker->index == 0) {
+ /*
+ * If there are no controllers and no normal workers,
+ * then pretend that we are a controller
+ */
+ gboolean controller_seen = FALSE;
+ GList *cur;
+
+ cur = worker->srv->cfg->workers;
+
+ while (cur) {
+ struct rspamd_worker_conf *cf;
+
+ cf = (struct rspamd_worker_conf *)cur->data;
+ if ((cf->type == g_quark_from_static_string ("controller")) ||
+ (cf->type == g_quark_from_static_string ("normal"))) {
+
+ if (cf->enabled && cf->count >= 0) {
+ controller_seen = TRUE;
+ break;
+ }
+ }
+
+ cur = g_list_next (cur);
+ }
+
+ if (!controller_seen) {
+ msg_info ("no controller or normal workers defined, execute "
+ "controller periodics in this worker");
+ worker->flags |= RSPAMD_WORKER_CONTROLLER;
+ }
+ }
+ }
+ else {
+ worker->flags &= ~RSPAMD_WORKER_SCANNER;
}
if (worker->srv->cfg->enable_sessions_cache) {
diff --git a/src/worker.c b/src/worker.c
index 81ec0904a..193c74319 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -27,16 +27,13 @@
#include "libserver/dns.h"
#include "libmime/message.h"
#include "rspamd.h"
-#include "keypairs_cache.h"
#include "libstat/stat_api.h"
#include "libserver/worker_util.h"
#include "libserver/rspamd_control.h"
#include "worker_private.h"
-#include "utlist.h"
#include "libutil/http_private.h"
-#include "libmime/lang_detection.h"
+#include "libserver/cfg_file_private.h"
#include <math.h>
-#include <src/libserver/cfg_file_private.h>
#include "unix-std.h"
#include "lua/lua_common.h"
@@ -57,15 +54,15 @@ worker_t normal_worker = {
};
#define msg_err_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
- "controller", ctx->cfg->cfg_pool->tag.uid, \
+ "worker", ctx->cfg->cfg_pool->tag.uid, \
G_STRFUNC, \
__VA_ARGS__)
#define msg_warn_ctx(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
- "controller", ctx->cfg->cfg_pool->tag.uid, \
+ "worker", ctx->cfg->cfg_pool->tag.uid, \
G_STRFUNC, \
__VA_ARGS__)
#define msg_info_ctx(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
- "controller", ctx->cfg->cfg_pool->tag.uid, \
+ "worker", ctx->cfg->cfg_pool->tag.uid, \
G_STRFUNC, \
__VA_ARGS__)
@@ -405,80 +402,6 @@ accept_socket (EV_P_ ev_io *w, int revents)
ctx->timeout);
}
-static gboolean
-rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
- struct rspamd_worker *worker, gint fd,
- gint attached_fd,
- struct rspamd_control_command *cmd,
- gpointer ud)
-{
- struct rspamd_config *cfg = ud;
- struct rspamd_worker_log_pipe *lp;
- struct rspamd_control_reply rep;
-
- memset (&rep, 0, sizeof (rep));
- rep.type = RSPAMD_CONTROL_LOG_PIPE;
-
- if (attached_fd != -1) {
- lp = g_malloc0 (sizeof (*lp));
- lp->fd = attached_fd;
- lp->type = cmd->cmd.log_pipe.type;
-
- DL_APPEND (cfg->log_pipes, lp);
- msg_info ("added new log pipe");
- }
- else {
- rep.reply.log_pipe.status = ENOENT;
- msg_err ("cannot attach log pipe: invalid fd");
- }
-
- if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
- msg_err ("cannot write reply to the control socket: %s",
- strerror (errno));
- }
-
- return TRUE;
-}
-
-static gboolean
-rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
- struct rspamd_worker *worker, gint fd,
- gint attached_fd,
- struct rspamd_control_command *cmd,
- gpointer ud)
-{
- struct rspamd_control_reply rep;
- struct rspamd_monitored *m;
- struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
- struct rspamd_config *cfg = ud;
-
- memset (&rep, 0, sizeof (rep));
- rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
-
- if (cmd->cmd.monitored_change.sender != getpid ()) {
- m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
-
- if (m != NULL) {
- rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
- rep.reply.monitored_change.status = 1;
- msg_info_config ("updated monitored status for %s: %s",
- cmd->cmd.monitored_change.tag,
- cmd->cmd.monitored_change.alive ? "alive" : "dead");
- } else {
- msg_err ("cannot find monitored by tag: %*s", 32,
- cmd->cmd.monitored_change.tag);
- rep.reply.monitored_change.status = 0;
- }
- }
-
- if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
- msg_err ("cannot write reply to the control socket: %s",
- strerror (errno));
- }
-
- return TRUE;
-}
-
gpointer
init_worker (struct rspamd_config *cfg)
{
@@ -557,31 +480,6 @@ init_worker (struct rspamd_config *cfg)
return ctx;
}
-void
-rspamd_worker_init_scanner (struct rspamd_worker *worker,
- struct ev_loop *ev_base,
- struct rspamd_dns_resolver *resolver,
- struct rspamd_lang_detector **plang_det)
-{
- rspamd_stat_init (worker->srv->cfg, ev_base);
-#ifdef WITH_HYPERSCAN
- rspamd_control_worker_add_cmd_handler (worker,
- RSPAMD_CONTROL_HYPERSCAN_LOADED,
- rspamd_worker_hyperscan_ready,
- NULL);
-#endif
- rspamd_control_worker_add_cmd_handler (worker,
- RSPAMD_CONTROL_LOG_PIPE,
- rspamd_worker_log_pipe_handler,
- worker->srv->cfg);
- rspamd_control_worker_add_cmd_handler (worker,
- RSPAMD_CONTROL_MONITORED_CHANGE,
- rspamd_worker_monitored_handler,
- worker->srv->cfg);
-
- *plang_det = worker->srv->cfg->lang_det;
-}
-
/*
* Start worker process
*/
@@ -589,6 +487,7 @@ void
start_worker (struct rspamd_worker *worker)
{
struct rspamd_worker_ctx *ctx = worker->ctx;
+ gboolean is_controller = FALSE;
g_assert (rspamd_worker_check_context (worker->ctx, rspamd_worker_magic));
ctx->cfg = worker->srv->cfg;
@@ -619,12 +518,46 @@ start_worker (struct rspamd_worker *worker)
ctx->http_ctx);
rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
&ctx->lang_det);
+
+ if (worker->index == 0) {
+ /* If there are no controllers, then pretend that we are a controller */
+ gboolean controller_seen = FALSE;
+ GList *cur;
+
+ cur = worker->srv->cfg->workers;
+
+ while (cur) {
+ struct rspamd_worker_conf *cf;
+
+ cf = (struct rspamd_worker_conf *)cur->data;
+ if (cf->type == g_quark_from_static_string ("controller")) {
+ if (cf->enabled && cf->count >= 0) {
+ controller_seen = TRUE;
+ break;
+ }
+ }
+
+ cur = g_list_next (cur);
+ }
+
+ if (!controller_seen) {
+ msg_info_ctx ("no controller workers defined, execute "
+ "controller periodics in this worker");
+ worker->flags |= RSPAMD_WORKER_CONTROLLER;
+ is_controller = TRUE;
+ }
+ }
+
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
worker);
ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
*** OUTPUT TRUNCATED, 8 LINES SKIPPED ***
More information about the Commits
mailing list