commit 63f823e: [Project] Further workers refactoring
Vsevolod Stakhov
vsevolod at highsecure.ru
Sat Jun 22 12:14:32 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-06-19 17:46:28 +0100
URL: https://github.com/rspamd/rspamd/commit/63f823eb9d6b4cfed6c3014ab350dfc61f33cb28
[Project] Further workers refactoring
---
src/CMakeLists.txt | 5 ++-
src/rspamadm/CMakeLists.txt | 3 +-
src/rspamadm/lua_repl.c | 2 +-
src/rspamd.c | 84 +++++++++++++++++++++------------------------
4 files changed, 43 insertions(+), 51 deletions(-)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index caa0b5d0d..a23c4e505 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -91,8 +91,7 @@ SET(RSPAMDSRC controller.c
fuzzy_storage.c
rspamd.c
worker.c
- rspamd_proxy.c
- log_helper.c)
+ rspamd_proxy.c)
SET(PLUGINSSRC plugins/surbl.c
plugins/regexp.c
@@ -103,7 +102,7 @@ SET(PLUGINSSRC plugins/surbl.c
libserver/rspamd_control.c)
SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim)
-SET(WORKERS_LIST normal controller fuzzy lua rspamd_proxy log_helper)
+SET(WORKERS_LIST normal controller fuzzy rspamd_proxy)
IF (ENABLE_HYPERSCAN MATCHES "ON")
LIST(APPEND WORKERS_LIST "hs_helper")
LIST(APPEND RSPAMDSRC "hs_helper.c")
diff --git a/src/rspamadm/CMakeLists.txt b/src/rspamadm/CMakeLists.txt
index b98b99473..3d4f2f490 100644
--- a/src/rspamadm/CMakeLists.txt
+++ b/src/rspamadm/CMakeLists.txt
@@ -16,8 +16,7 @@ SET(RSPAMADMSRC rspamadm.c
${CMAKE_SOURCE_DIR}/src/controller.c
${CMAKE_SOURCE_DIR}/src/fuzzy_storage.c
${CMAKE_SOURCE_DIR}/src/worker.c
- ${CMAKE_SOURCE_DIR}/src/rspamd_proxy.c
- ${CMAKE_SOURCE_DIR}/src/log_helper.c)
+ ${CMAKE_SOURCE_DIR}/src/rspamd_proxy.c)
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR})
IF (ENABLE_HYPERSCAN MATCHES "ON")
LIST(APPEND RSPAMADMSRC "${CMAKE_SOURCE_DIR}/src/hs_helper.c")
diff --git a/src/rspamadm/lua_repl.c b/src/rspamadm/lua_repl.c
index 33805d66b..a95521bdb 100644
--- a/src/rspamadm/lua_repl.c
+++ b/src/rspamadm/lua_repl.c
@@ -296,7 +296,7 @@ wait_session_events (void)
{
/* XXX: it's probably worth to add timeout here - not to wait forever */
while (rspamd_session_events_pending (rspamadm_session) > 0) {
- event_base_loop (rspamd_main->event_loop, EVLOOP_ONCE);
+ ev_loop (rspamd_main->event_loop, EVLOOP_ONESHOT);
}
}
diff --git a/src/rspamd.c b/src/rspamd.c
index a4a659a53..813b7b7bb 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -358,21 +358,22 @@ reread_config (struct rspamd_main *rspamd_main)
struct waiting_worker {
struct rspamd_main *rspamd_main;
- struct event wait_ev;
+ struct ev_timer wait_ev;
struct rspamd_worker_conf *cf;
guint oldindex;
};
static void
-rspamd_fork_delayed_cb (gint signo, short what, gpointer arg)
+rspamd_fork_delayed_cb (EV_P_ ev_timer *w, int revents)
{
- struct waiting_worker *w = arg;
-
- event_del (&w->wait_ev);
- rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex,
- w->rspamd_main->event_loop);
- REF_RELEASE (w->cf);
- g_free (w);
+ struct waiting_worker *waiting_worker = (struct waiting_worker *)w->data;
+
+ ev_timer_stop (EV_A_ &waiting_worker->wait_ev);
+ rspamd_fork_worker (waiting_worker->rspamd_main, waiting_worker->cf,
+ waiting_worker->oldindex,
+ waiting_worker->rspamd_main->event_loop);
+ REF_RELEASE (waiting_worker->cf);
+ g_free (waiting_worker);
}
static void
@@ -390,9 +391,8 @@ rspamd_fork_delayed (struct rspamd_worker_conf *cf,
tv.tv_sec = SOFT_FORK_TIME;
tv.tv_usec = 0;
REF_RETAIN (cf);
- event_set (&nw->wait_ev, -1, EV_TIMEOUT, rspamd_fork_delayed_cb, nw);
- event_base_set (rspamd_main->event_loop, &nw->wait_ev);
- event_add (&nw->wait_ev, &tv);
+ ev_timer_init (&nw->wait_ev, rspamd_fork_delayed_cb, SOFT_FORK_TIME, 0.0);
+ ev_timer_start (rspamd_main->event_loop, &nw->wait_ev);
}
static GList *
@@ -803,7 +803,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
if (w->tmp_data) {
g_free (w->tmp_data);
}
- event_del (&w->srv_ev);
+ ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
}
if (w->finish_actions) {
@@ -1026,9 +1026,9 @@ rspamd_hup_handler (struct ev_loop *loop, ev_signal *w, int revents)
}
static void
-rspamd_cld_handler (gint signo, short what, gpointer arg)
+rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
{
- struct rspamd_main *rspamd_main = arg;
+ struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
guint i;
gint res = 0;
struct rspamd_worker *cur;
@@ -1135,7 +1135,7 @@ rspamd_cld_handler (gint signo, short what, gpointer arg)
if (cur->tmp_data) {
g_free (cur->tmp_data);
}
- event_del (&cur->srv_ev);
+ ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
}
if (cur->control_pipe[0] != -1) {
@@ -1166,29 +1166,29 @@ rspamd_cld_handler (gint signo, short what, gpointer arg)
}
static void
-rspamd_final_term_handler (gint signo, short what, gpointer arg)
+rspamd_final_term_handler (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_main *rspamd_main = arg;
+ struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
term_attempts--;
g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
if (g_hash_table_size (rspamd_main->workers) == 0) {
- event_base_loopexit (rspamd_main->event_loop, NULL);
+ ev_break (rspamd_main->event_loop, EVBREAK_ALL);
}
}
/* Control socket handler */
static void
-rspamd_control_handler (gint fd, short what, gpointer arg)
+rspamd_control_handler (EV_P_ ev_io *w, int revents)
{
- struct rspamd_main *rspamd_main = arg;
+ struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
rspamd_inet_addr_t *addr;
gint nfd;
if ((nfd =
- rspamd_accept_from_socket (fd, &addr, NULL)) == -1) {
+ rspamd_accept_from_socket (w->fd, &addr, NULL, NULL)) == -1) {
msg_warn_main ("accept failed: %s", strerror (errno));
return;
}
@@ -1246,7 +1246,6 @@ main (gint argc, gchar **argv, gchar **env)
struct ev_loop *event_loop;
ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
ev_io control_ev;
- struct timeval term_tv;
struct rspamd_main *rspamd_main;
gboolean skip_pid = FALSE, valgrind_mode = FALSE;
@@ -1524,10 +1523,9 @@ main (gint argc, gchar **argv, gchar **env)
/* XXX: deal with children */
- evsignal_set (&cld_ev, SIGCHLD, rspamd_cld_handler, rspamd_main);
- event_base_set (event_loop, &cld_ev);
- event_add (&cld_ev, NULL);
-
+ cld_ev.data = rspamd_main;
+ ev_signal_init (&cld_ev, rspamd_cld_handler, SIGCHLD);
+ ev_signal_start (event_loop, &cld_ev);
rspamd_check_core_limits (rspamd_main);
rspamd_mempool_lock_mutex (rspamd_main->start_mtx);
@@ -1545,18 +1543,17 @@ main (gint argc, gchar **argv, gchar **env)
ev_io_start (event_loop, &control_ev);
}
- event_base_loop (event_loop, 0);
+ ev_loop (event_loop, 0);
/* We need to block signals unless children are waited for */
rspamd_worker_block_signals ();
-
- event_del (&term_ev);
- event_del (&int_ev);
- event_del (&hup_ev);
- event_del (&cld_ev);
- event_del (&usr1_ev);
+ ev_signal_stop (event_loop, &term_ev);
+ ev_signal_stop (event_loop, &int_ev);
+ ev_signal_stop (event_loop, &hup_ev);
+ ev_signal_stop (event_loop, &cld_ev);
+ ev_signal_stop (event_loop, &usr1_ev);
if (control_fd != -1) {
- event_del (&control_ev);
+ ev_io_stop (event_loop, &control_ev);
close (control_fd);
}
@@ -1568,20 +1565,17 @@ main (gint argc, gchar **argv, gchar **env)
term_attempts = TERMINATION_ATTEMPTS;
}
- /* Check each 200 ms */
- term_tv.tv_sec = 0;
- term_tv.tv_usec = 200000;
/* Wait for workers termination */
g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
- event_set (&term_ev, -1, EV_TIMEOUT|EV_PERSIST,
- rspamd_final_term_handler, rspamd_main);
- event_base_set (event_loop, &term_ev);
- event_add (&term_ev, &term_tv);
+ static ev_timer ev_finale;
+
+ ev_finale.data = rspamd_main;
+ ev_timer_init (&ev_finale, rspamd_final_term_handler, 0.2, 0.2);
+ ev_timer_start (event_loop, &ev_finale);
- event_base_loop (event_loop, 0);
- event_del (&term_ev);
+ ev_loop (event_loop, 0);
/* Maybe save roll history */
if (rspamd_main->cfg->history_file) {
@@ -1602,7 +1596,7 @@ main (gint argc, gchar **argv, gchar **env)
}
g_free (rspamd_main);
- event_base_free (event_loop);
+ ev_unref (event_loop);
sqlite3_shutdown ();
if (control_addr) {
More information about the Commits
mailing list