commit 675b33d: [Project] Adopt normal worker and contorller

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Jun 22 12:14:30 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-06-19 17:07:56 +0100
URL: https://github.com/rspamd/rspamd/commit/675b33dd2025cc1f8e732efa9ffc72d55e5a35d9

[Project] Adopt normal worker and contorller

---
 src/controller.c                     | 89 ++++++++++++++++-----------------
 src/libserver/task.c                 |  9 +---
 src/libserver/task.h                 |  2 +-
 src/libserver/worker_util.c          |  8 +--
 src/libstat/backends/redis_backend.c |  5 +-
 src/rspamadm/control.c               |  6 +--
 src/worker.c                         | 96 ++++++++++++------------------------
 src/worker_private.h                 | 11 ++---
 8 files changed, 87 insertions(+), 139 deletions(-)

diff --git a/src/controller.c b/src/controller.c
index 851087945..374880952 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -134,8 +134,7 @@ struct rspamd_controller_worker_ctx {
 	/* Config */
 	struct rspamd_config *cfg;
 	/* END OF COMMON PART */
-	guint32 timeout;
-	struct timeval io_tv;
+	ev_tstamp timeout;
 	/* Whether we use ssl for this server */
 	gboolean use_ssl;
 	/* Webui password */
@@ -728,7 +727,7 @@ rspamd_controller_handle_auth (struct rspamd_http_connection_entry *conn_ent,
 	data[4] = st->actions_stat[METRIC_ACTION_SOFT_REJECT];
 
 	/* Get uptime */
-	uptime = time (NULL) - session->ctx->start_time;
+	uptime = ev_time () - session->ctx->start_time;
 
 	ucl_object_insert_key (obj, ucl_object_fromstring (
 			RVERSION),			   "version",  0, false);
@@ -996,7 +995,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
 	struct rspamd_controller_session *session = conn_ent->ud;
 	GList *cur;
 	struct rspamd_map *map;
-	struct rspamd_map_backend *bk;
+	struct rspamd_map_backend *bk = NULL;
 	const rspamd_ftok_t *idstr;
 	struct stat st;
 	gint fd;
@@ -1037,7 +1036,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
 		cur = g_list_next (cur);
 	}
 
-	if (!found) {
+	if (!found || bk == NULL) {
 		msg_info_session ("map not found");
 		rspamd_controller_send_error (conn_ent, 404, "Map not found");
 		return 0;
@@ -1075,7 +1074,7 @@ rspamd_controller_handle_get_map (struct rspamd_http_connection_entry *conn_ent,
 	rspamd_http_router_insert_headers (conn_ent->rt, reply);
 	rspamd_http_connection_write_message (conn_ent->conn, reply, NULL,
 			"text/plain", conn_ent,
-			conn_ent->rt->ptv);
+			conn_ent->rt->timeout);
 	conn_ent->is_reply = TRUE;
 
 	return 0;
@@ -1385,13 +1384,13 @@ rspamd_controller_handle_legacy_history (
 		row = &copied_rows[row_num];
 		/* Get only completed rows */
 		if (row->completed) {
-			rspamd_localtime (row->tv.tv_sec, &tm);
+			rspamd_localtime (row->timestamp, &tm);
 			strftime (timebuf, sizeof (timebuf) - 1, "%Y-%m-%d %H:%M:%S", &tm);
 			obj = ucl_object_typed_new (UCL_OBJECT);
 			ucl_object_insert_key (obj, ucl_object_fromstring (
 					timebuf),		  "time", 0, false);
 			ucl_object_insert_key (obj, ucl_object_fromint (
-					row->tv.tv_sec), "unix_time", 0, false);
+					row->timestamp), "unix_time", 0, false);
 			ucl_object_insert_key (obj, ucl_object_fromstring (
 					row->message_id), "id",	  0, false);
 			ucl_object_insert_key (obj, ucl_object_fromstring (row->from_addr),
@@ -1935,7 +1934,7 @@ rspamd_controller_scan_reply (struct rspamd_task *task)
 	rspamd_http_connection_reset (conn_ent->conn);
 	rspamd_http_router_insert_headers (conn_ent->rt, msg);
 	rspamd_http_connection_write_message (conn_ent->conn, msg, NULL,
-			"application/json", conn_ent, conn_ent->rt->ptv);
+			"application/json", conn_ent, conn_ent->rt->timeout);
 	conn_ent->is_reply = TRUE;
 }
 
@@ -2125,13 +2124,10 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
 	}
 
 	if (ctx->task_timeout > 0.0) {
-		struct timeval task_tv;
-
-		event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
-				task);
-		event_base_set (ctx->event_loop, &task->timeout_ev);
-		double_to_tv (ctx->task_timeout, &task_tv);
-		event_add (&task->timeout_ev, &task_tv);
+		task->timeout_ev.data = task;
+		ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+				ctx->task_timeout, 0.0);
+		ev_timer_start (task->event_loop, &task->timeout_ev);
 	}
 
 end:
@@ -2210,6 +2206,7 @@ rspamd_controller_handle_saveactions (
 
 		switch (i) {
 		case 0:
+		default:
 			act = METRIC_ACTION_REJECT;
 			break;
 		case 1:
@@ -2404,7 +2401,7 @@ rspamd_controller_handle_savemap (struct rspamd_http_connection_entry *conn_ent,
 {
 	struct rspamd_controller_session *session = conn_ent->ud;
 	GList *cur;
-	struct rspamd_map *map;
+	struct rspamd_map *map = NULL;
 	struct rspamd_map_backend *bk;
 	struct rspamd_controller_worker_ctx *ctx;
 	const rspamd_ftok_t *idstr;
@@ -2903,7 +2900,7 @@ rspamd_controller_handle_ping (struct rspamd_http_connection_entry *conn_ent,
 			NULL,
 			"text/plain",
 			conn_ent,
-			conn_ent->rt->ptv);
+			conn_ent->rt->timeout);
 	conn_ent->is_reply = TRUE;
 
 	return 0;
@@ -2937,7 +2934,7 @@ rspamd_controller_handle_unknown (struct rspamd_http_connection_entry *conn_ent,
 				NULL,
 				"text/plain",
 				conn_ent,
-				conn_ent->rt->ptv);
+				conn_ent->rt->timeout);
 		conn_ent->is_reply = TRUE;
 	}
 	else {
@@ -2953,7 +2950,7 @@ rspamd_controller_handle_unknown (struct rspamd_http_connection_entry *conn_ent,
 				NULL,
 				"text/plain",
 				conn_ent,
-				conn_ent->rt->ptv);
+				conn_ent->rt->timeout);
 		conn_ent->is_reply = TRUE;
 	}
 
@@ -3077,9 +3074,9 @@ rspamd_controller_finish_handler (struct rspamd_http_connection_entry *conn_ent)
 }
 
 static void
-rspamd_controller_accept_socket (gint fd, short what, void *arg)
+rspamd_controller_accept_socket (EV_P_ ev_io *w, int revents)
 {
-	struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+	struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
 	struct rspamd_controller_worker_ctx *ctx;
 	struct rspamd_controller_session *session;
 	rspamd_inet_addr_t *addr;
@@ -3088,7 +3085,8 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg)
 	ctx = worker->ctx;
 
 	if ((nfd =
-		rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+		rspamd_accept_from_socket (w->fd, &addr,
+				rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
 		msg_warn_ctx ("accept failed: %s", strerror (errno));
 		return;
 	}
@@ -3113,9 +3111,10 @@ rspamd_controller_accept_socket (gint fd, short what, void *arg)
 }
 
 static void
-rspamd_controller_rrd_update (gint fd, short what, void *arg)
+rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
 {
-	struct rspamd_controller_worker_ctx *ctx = arg;
+	struct rspamd_controller_worker_ctx *ctx =
+			(struct rspamd_controller_worker_ctx *)w->data;
 	struct rspamd_stat *stat;
 	GArray ar;
 	gdouble points[METRIC_ACTION_MAX];
@@ -3139,8 +3138,7 @@ rspamd_controller_rrd_update (gint fd, short what, void *arg)
 	}
 
 	/* Plan new event */
-	event_del (ctx->rrd_event);
-	evtimer_add (ctx->rrd_event, &rrd_update_time);
+	ev_timer_again (ctx->event_loop, &ctx->rrd_event);
 }
 
 static void
@@ -3278,11 +3276,13 @@ rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
 }
 
 static void
-rspamd_controller_stats_save_periodic (int fd, short what, gpointer ud)
+rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
 {
-	struct rspamd_controller_worker_ctx *ctx = ud;
+	struct rspamd_controller_worker_ctx *ctx =
+			(struct rspamd_controller_worker_ctx *)w->data;
 
 	rspamd_controller_store_saved_stats (ctx);
+	ev_timer_again (EV_A_ w);
 }
 
 static void
@@ -3375,7 +3375,7 @@ init_controller_worker (struct rspamd_config *cfg)
 			ctx,
 			G_STRUCT_OFFSET (struct rspamd_controller_worker_ctx,
 					timeout),
-			RSPAMD_CL_FLAG_TIME_INTEGER,
+			RSPAMD_CL_FLAG_TIME_FLOAT,
 			"Protocol timeout");
 
 	rspamd_rcl_register_worker_option (cfg,
@@ -3573,7 +3573,7 @@ rspamd_controller_on_terminate (struct rspamd_worker *worker)
 
 	if (ctx->rrd) {
 		msg_info ("closing rrd file: %s", ctx->rrd->filename);
-		event_del (ctx->rrd_event);
+		ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
 		rspamd_rrd_close (ctx->rrd);
 	}
 
@@ -3694,16 +3694,14 @@ start_controller_worker (struct rspamd_worker *worker)
 	GHashTableIter iter;
 	gpointer key, value;
 	guint i;
-	struct timeval stv;
-	const guint save_stats_interval = 60 * 1000; /* 1 minute */
+	const ev_tstamp save_stats_interval = 60; /* 1 minute */
 	gpointer m;
 
 	ctx->event_loop = rspamd_prepare_worker (worker,
 			"controller",
 			rspamd_controller_accept_socket);
-	msec_to_tv (ctx->timeout, &ctx->io_tv);
 
-	ctx->start_time = time (NULL);
+	ctx->start_time = ev_time ();
 	ctx->worker = worker;
 	ctx->cfg = worker->srv->cfg;
 	ctx->srv = worker->srv;
@@ -3746,10 +3744,10 @@ start_controller_worker (struct rspamd_worker *worker)
 		ctx->rrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err);
 
 		if (ctx->rrd) {
-			ctx->rrd_event = g_malloc0 (sizeof (*ctx->rrd_event));
-			evtimer_set (ctx->rrd_event, rspamd_controller_rrd_update, ctx);
-			event_base_set (ctx->event_loop, ctx->rrd_event);
-			event_add (ctx->rrd_event, &rrd_update_time);
+			ctx->rrd_event.data = ctx;
+			ev_timer_init (&ctx->rrd_event, rspamd_controller_rrd_update,
+					rrd_update_time, rrd_update_time);
+			ev_timer_start (ctx->event_loop, &ctx->rrd_event);
 		}
 		else if (rrd_err) {
 			msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file,
@@ -3772,7 +3770,7 @@ start_controller_worker (struct rspamd_worker *worker)
 	ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
 			ctx->cfg->ups_ctx);
 	ctx->http = rspamd_http_router_new (rspamd_controller_error_handler,
-			rspamd_controller_finish_handler, &ctx->io_tv,
+			rspamd_controller_finish_handler, ctx->timeout,
 			ctx->static_files_dir, ctx->http_ctx);
 
 	/* Add callbacks for different methods */
@@ -3903,12 +3901,11 @@ start_controller_worker (struct rspamd_worker *worker)
 				ctx->resolver, worker, TRUE);
 
 		/* Schedule periodic stats saving, see #1823 */
-		event_set (&ctx->save_stats_event, -1, EV_PERSIST,
+		ctx->save_stats_event.data = ctx;
+		ev_timer_init (&ctx->save_stats_event,
 				rspamd_controller_stats_save_periodic,
-				ctx);
-		event_base_set (ctx->event_loop, &ctx->save_stats_event);
-		msec_to_tv (save_stats_interval, &stv);
-		evtimer_add (&ctx->save_stats_event, &stv);
+				save_stats_interval, save_stats_interval);
+		ev_timer_start (ctx->event_loop, &ctx->save_stats_event);
 	}
 	else {
 		rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
@@ -3918,7 +3915,7 @@ start_controller_worker (struct rspamd_worker *worker)
 	rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker);
 
 	/* Start event loop */
-	event_base_loop (ctx->event_loop, 0);
+	ev_loop (ctx->event_loop, 0);
 	rspamd_worker_block_signals ();
 
 	rspamd_stat_close ();
diff --git a/src/libserver/task.c b/src/libserver/task.c
index 84ea1417a..3c92e05b9 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -316,13 +316,8 @@ rspamd_task_free (struct rspamd_task *task)
 			g_error_free (task->err);
 		}
 
-		if (rspamd_event_pending (&task->timeout_ev, EV_TIMEOUT)) {
-			event_del (&task->timeout_ev);
-		}
-
-		if (task->guard_ev) {
-			event_del (task->guard_ev);
-		}
+		ev_timer_stop (task->event_loop, &task->timeout_ev);
+		ev_io_stop (task->event_loop, &task->guard_ev);
 
 		if (task->sock != -1) {
 			close (task->sock);
diff --git a/src/libserver/task.h b/src/libserver/task.h
index a73102424..7b30f97cd 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -200,7 +200,7 @@ struct rspamd_task {
 	struct rspamd_dns_resolver *resolver;			/**< DNS resolver									*/
 	struct ev_loop *event_loop;						/**< Event base										*/
 	struct ev_timer timeout_ev;						/**< Global task timeout							*/
-	struct ev_io *guard_ev;							/**< Event for input sanity guard 					*/
+	struct ev_io guard_ev;							/**< Event for input sanity guard 					*/
 
 	gpointer checkpoint;							/**< Opaque checkpoint data							*/
 	ucl_object_t *settings;							/**< Settings applied to task						*/
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 0aa0c9cf3..70d349c2c 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -274,10 +274,6 @@ void
 rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
 {
 	struct sigaction signals;
-	/* We ignore these signals in the worker */
-	rspamd_worker_ignore_signal (SIGPIPE);
-	rspamd_worker_ignore_signal (SIGALRM);
-	rspamd_worker_ignore_signal (SIGCHLD);
 
 	/* A set of terminating signals */
 	rspamd_worker_set_signal_handler (SIGTERM, worker, base,
@@ -298,11 +294,8 @@ rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
 	sigaddset (&signals.sa_mask, SIGTERM);
 	sigaddset (&signals.sa_mask, SIGINT);
 	sigaddset (&signals.sa_mask, SIGHUP);
-	sigaddset (&signals.sa_mask, SIGCHLD);
 	sigaddset (&signals.sa_mask, SIGUSR1);
 	sigaddset (&signals.sa_mask, SIGUSR2);
-	sigaddset (&signals.sa_mask, SIGALRM);
-	sigaddset (&signals.sa_mask, SIGPIPE);
 
 	sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
 }
@@ -345,6 +338,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
 			if (ls->fd != -1) {
 				accept_ev = g_malloc0 (sizeof (*accept_ev));
 				accept_ev->event_loop = event_loop;
+				accept_ev->accept_ev.data = worker;
 				ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ);
 				ev_io_start (event_loop, &accept_ev->accept_ev);
 
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index 4e0d806f9..5d8ccc065 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -1039,9 +1039,7 @@ rspamd_redis_fin_learn (gpointer data)
 
 	rt->has_event = FALSE;
 	/* Stop timeout */
-	if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
-		event_del (&rt->timeout_event);
-	}
+	ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
 
 	if (rt->redis) {
 		redis = rt->redis;
@@ -1654,7 +1652,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
 	struct upstream *up;
 	struct upstream_list *ups;
 	rspamd_inet_addr_t *addr;
-	struct timeval tv;
 	rspamd_fstring_t *query;
 	const gchar *redis_cmd;
 	rspamd_token_t *tok;
diff --git a/src/rspamadm/control.c b/src/rspamadm/control.c
index 8a42bdac1..754d874a2 100644
--- a/src/rspamadm/control.c
+++ b/src/rspamadm/control.c
@@ -173,7 +173,6 @@ rspamadm_control (gint argc, gchar **argv, const struct rspamadm_command *_cmd)
 	struct rspamd_http_connection *conn;
 	struct rspamd_http_message *msg;
 	rspamd_inet_addr_t *addr;
-	struct timeval tv;
 	static struct rspamadm_control_cbdata cbdata;
 
 	context = g_option_context_new (
@@ -239,16 +238,15 @@ rspamadm_control (gint argc, gchar **argv, const struct rspamadm_command *_cmd)
 			addr);
 	msg = rspamd_http_new_message (HTTP_REQUEST);
 	msg->url = rspamd_fstring_new_init (path, strlen (path));
-	double_to_tv (timeout, &tv);
 
 	cbdata.argc = argc;
 	cbdata.argv = argv;
 	cbdata.path = path;
 
 	rspamd_http_connection_write_message (conn, msg, NULL, NULL, &cbdata,
-			&tv);
+			timeout);
 
-	event_base_loop (rspamd_main->event_loop, 0);
+	ev_loop (rspamd_main->event_loop, 0);
 
 	rspamd_http_connection_unref (conn);
 	rspamd_inet_address_free (addr);
diff --git a/src/worker.c b/src/worker.c
index 5a4adb325..ad0782b17 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -42,7 +42,7 @@
 #include "lua/lua_common.h"
 
 /* 60 seconds for worker's IO */
-#define DEFAULT_WORKER_IO_TIMEOUT 60000
+#define DEFAULT_WORKER_IO_TIMEOUT 60.0
 
 gpointer init_worker (struct rspamd_config *cfg);
 void start_worker (struct rspamd_worker *worker);
@@ -73,11 +73,10 @@ static gboolean
 rspamd_worker_finalize (gpointer user_data)
 {
 	struct rspamd_task *task = user_data;
-	struct timeval tv = {.tv_sec = 0, .tv_usec = 0};
 
 	if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
 		msg_info_task ("finishing actions has been processed, terminating");
-		event_base_loopexit (task->event_loop, &tv);
+		ev_break (task->event_loop, EVBREAK_ALL);
 		rspamd_session_destroy (task->s);
 
 		return TRUE;
@@ -137,9 +136,9 @@ reduce_tasks_count (gpointer arg)
 }
 
 void
-rspamd_task_timeout (gint fd, short what, gpointer ud)
+rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
 {
-	struct rspamd_task *task = (struct rspamd_task *) ud;
+	struct rspamd_task *task = (struct rspamd_task *)w->data;
 
 	if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
 		msg_info_task ("processing of task timed out, forced processing");
@@ -176,32 +175,13 @@ rspamd_task_timeout (gint fd, short what, gpointer ud)
 }
 
 void
-rspamd_worker_guard_handler (gint fd, short what, void *data)
+rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
 {
-	struct rspamd_task *task = data;
+	struct rspamd_task *task = (struct rspamd_task *)w->data;
 	gchar fake_buf[1024];
 	gssize r;
 
-#ifdef EV_CLOSED
-	if (what == EV_CLOSED) {
-		if (!(task->flags & RSPAMD_TASK_FLAG_JSON) &&
-				task->cfg->enable_shutdown_workaround) {
-			msg_info_task ("workaround for shutdown enabled, please update "
-					"your client, this support might be removed in future");
-			shutdown (fd, SHUT_RD);
-			event_del (task->guard_ev);
-			task->guard_ev = NULL;
-		}
-		else {
-			msg_err_task ("the peer has closed connection unexpectedly");
-			rspamd_session_destroy (task->s);
-		}
-
-		return;
-	}
-#endif
-
-	r = read (fd, fake_buf, sizeof (fake_buf));
+	r = read (w->fd, fake_buf, sizeof (fake_buf));
 
 	if (r > 0) {
 		msg_warn_task ("received extra data after task is loaded, ignoring");
@@ -218,9 +198,8 @@ rspamd_worker_guard_handler (gint fd, short what, void *data)
 					task->cfg->enable_shutdown_workaround) {
 				msg_info_task ("workaround for shutdown enabled, please update "
 						"your client, this support might be removed in future");
-				shutdown (fd, SHUT_RD);
-				event_del (task->guard_ev);
-				task->guard_ev = NULL;
+				shutdown (w->fd, SHUT_RD);
+				ev_io_stop (task->event_loop, &task->guard_ev);
 			}
 			else {
 				msg_err_task ("the peer has closed connection unexpectedly");
@@ -245,8 +224,6 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 {
 	struct rspamd_task *task = (struct rspamd_task *) conn->ud;
 	struct rspamd_worker_ctx *ctx;
-	struct timeval task_tv;
-	struct event *guard_ev;
 
 	ctx = task->worker->ctx;
 
@@ -268,25 +245,16 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 
 	/* Set global timeout for the task */
 	if (ctx->task_timeout > 0.0) {
-		event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
-				task);
-		event_base_set (ctx->ev_base, &task->timeout_ev);
-		double_to_tv (ctx->task_timeout, &task_tv);
-		event_add (&task->timeout_ev, &task_tv);
+		task->timeout_ev.data = task;
+		ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+				ctx->task_timeout, 0.0);
+		ev_timer_start (task->event_loop, &task->timeout_ev);
 	}
 
 	/* Set socket guard */
-	guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev));
-#ifdef EV_CLOSED
-	event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED,
-				rspamd_worker_guard_handler, task);
-#else
-	event_set (guard_ev, task->sock, EV_READ|EV_PERSIST,
-			rspamd_worker_guard_handler, task);
-#endif
-	event_base_set (task->event_loop, guard_ev);
-	event_add (guard_ev, NULL);
-	task->guard_ev = guard_ev;
+	task->guard_ev.data = task;
+	ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
+	ev_io_start (task->event_loop, &task->guard_ev);
 
 	rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
 
@@ -359,9 +327,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
  * Accept new connection and construct task
  */
 static void
-accept_socket (gint fd, short what, void *arg)
+accept_socket (EV_P_ ev_io *w, int revents)
 {
-	struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+	struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
 	struct rspamd_worker_ctx *ctx;
 	struct rspamd_task *task;
 	rspamd_inet_addr_t *addr;
@@ -377,7 +345,8 @@ accept_socket (gint fd, short what, void *arg)
 	}
 
 	if ((nfd =
-		rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+		rspamd_accept_from_socket (w->fd, &addr,
+				rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
 		msg_warn_ctx ("accept failed: %s", strerror (errno));
 		return;
 	}
@@ -386,7 +355,7 @@ accept_socket (gint fd, short what, void *arg)
 		return;
 	}
 
-	task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->ev_base);
+	task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
 
 	msg_info_task ("accepted connection from %s port %d, task ptr: %p",
 		rspamd_inet_address_to_string (addr),
@@ -435,7 +404,7 @@ accept_socket (gint fd, short what, void *arg)
 
 	rspamd_http_connection_read_message (task->http_conn,
 			task,
-			&ctx->io_tv);
+			ctx->timeout);
 }
 
 #ifdef WITH_HYPERSCAN
@@ -587,7 +556,7 @@ init_worker (struct rspamd_config *cfg)
 			ctx,
 			G_STRUCT_OFFSET (struct rspamd_worker_ctx,
 						timeout),
-			RSPAMD_CL_FLAG_TIME_INTEGER,
+			RSPAMD_CL_FLAG_TIME_FLOAT,
 			"Protocol IO timeout");
 
 	rspamd_rcl_register_worker_option (cfg,
@@ -672,9 +641,8 @@ start_worker (struct rspamd_worker *worker)
 	struct rspamd_worker_ctx *ctx = worker->ctx;
 
 	ctx->cfg = worker->srv->cfg;
-	ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
-	msec_to_tv (ctx->timeout, &ctx->io_tv);
-	rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+	ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket);
+	rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
 			worker);
 
 	if (isnan (ctx->task_timeout)) {
@@ -687,20 +655,20 @@ start_worker (struct rspamd_worker *worker)
 	}
 
 	ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
-			ctx->ev_base,
+			ctx->event_loop,
 			worker->srv->cfg);
-	rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0);
+	rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
 	rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
-			ctx->ev_base, ctx->resolver->r);
+			ctx->event_loop, ctx->resolver->r);
 
-	ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
+	ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
 			ctx->cfg->ups_ctx);
-	rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver,
+	rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
 			&ctx->lang_det);
-	rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base,
+	rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
 			worker);
 
-	event_base_loop (ctx->ev_base, 0);
+	ev_loop (ctx->event_loop, 0);
 	rspamd_worker_block_signals ();
 
 	rspamd_stat_close ();
diff --git a/src/worker_private.h b/src/worker_private.h
index 35a2b465b..6d0e763aa 100644
--- a/src/worker_private.h
+++ b/src/worker_private.h
@@ -30,14 +30,13 @@ struct rspamd_lang_detector;
 struct rspamd_worker_ctx {
 	guint64 magic;
 	/* Events base */
-	struct ev_loop *ev_base;
+	struct ev_loop *event_loop;
 	/* DNS resolver */
 	struct rspamd_dns_resolver *resolver;
 	/* Config */
 	struct rspamd_config *cfg;
 
-	guint32 timeout;
-	struct timeval io_tv;
+	ev_tstamp timeout;
 	/* Detect whether this worker is mime worker    */
 	gboolean is_mime;
 	/* Allow encrypted requests only using network */
@@ -45,7 +44,7 @@ struct rspamd_worker_ctx {
 	/* Limit of tasks */
 	guint32 max_tasks;
 	/* Maximum time for task processing */
-	gdouble task_timeout;
+	ev_tstamp task_timeout;
 	/* Encryption key */
 	struct rspamd_cryptobox_keypair *key;
 	/* Keys cache */
@@ -64,11 +63,11 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker,
 /*
  * Called on forced timeout
  */
-void rspamd_task_timeout (gint fd, short what, gpointer ud);
+void rspamd_task_timeout (EV_P_ ev_timer *w, int revents);
 
 /*
  * Called on unexpected IO error (e.g. ECONNRESET)
  */
-void rspamd_worker_guard_handler (gint fd, short what, void *data);
+void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents);
 
 #endif


More information about the Commits mailing list