commit dc96f9b: [Project] Make it compileable again...

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Jun 22 12:14:34 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-06-19 18:23:46 +0100
URL: https://github.com/rspamd/rspamd/commit/dc96f9b37ff98c12b7aeacd348162950c129098f

[Project] Make it compileable again...

---
 CMakeLists.txt               |  7 +---
 contrib/libev/CMakeLists.txt |  2 +-
 src/client/rspamc.c          | 25 ++++++-------
 src/hs_helper.c              | 32 ++++++++---------
 src/libutil/upstream.c       |  4 +--
 src/lua/lua_config.c         | 11 +++---
 src/rspamadm/control.c       |  5 +--
 src/rspamadm/lua_repl.c      | 23 ++++++------
 src/rspamadm/rspamadm.c      | 15 ++------
 src/rspamd_proxy.c           | 86 +++++++++++++++++++-------------------------
 10 files changed, 86 insertions(+), 124 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index d5f37900a..fec771663 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -864,11 +864,6 @@ CHECK_INCLUDE_FILES(readpassphrase.h HAVE_READPASSPHRASE_H)
 CHECK_INCLUDE_FILES(termios.h HAVE_TERMIOS_H)
 CHECK_INCLUDE_FILES(paths.h HAVE_PATHS_H)
 CHECK_INCLUDE_FILES(ctype.h HAVE_CTYPE_H)
-CHECK_INCLUDE_FILES(sys/sendfile.h HAVE_SYS_SENDFILE_H)
-CHECK_INCLUDE_FILES(linux/falloc.h HAVE_LINUX_FALLOC_H)
-CHECK_INCLUDE_FILES(sys/eventfd.h HAVE_SYS_EVENTFD_H)
-CHECK_INCLUDE_FILES(aio.h HAVE_AIO_H)
-CHECK_INCLUDE_FILES(libaio.h HAVE_LIBAIO_H)
 CHECK_INCLUDE_FILES(unistd.h HAVE_UNISTD_H)
 CHECK_INCLUDE_FILES(cpuid.h HAVE_CPUID_H)
 CHECK_INCLUDE_FILES(dirent.h HAVE_DIRENT_H)
@@ -1185,7 +1180,6 @@ LIST(APPEND RSPAMD_REQUIRED_LIBRARIES "${LUA_LIBRARY}")
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES ucl)
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rdns)
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES ottery)
-LIST(APPEND RSPAMD_REQUIRED_LIBRARIES event)
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES xxhash)
 
 IF(GLIB_COMPAT)
@@ -1219,6 +1213,7 @@ LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-hiredis)
 
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-actrie)
 LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-t1ha)
+LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-ev)
 
 IF(ENABLE_CLANG_PLUGIN MATCHES "ON")
 	ADD_SUBDIRECTORY(clang-plugin)
diff --git a/contrib/libev/CMakeLists.txt b/contrib/libev/CMakeLists.txt
index c99b4dd32..d363c3dbc 100644
--- a/contrib/libev/CMakeLists.txt
+++ b/contrib/libev/CMakeLists.txt
@@ -55,7 +55,7 @@ ENDIF()
 
 CONFIGURE_FILE(config.h.in libev-config.h)
 
-ADD_LIBRARY(rspamd-libev STATIC ${LIBEVSRC})
+ADD_LIBRARY(rspamd-ev STATIC ${LIBEVSRC})
 ADD_DEFINITIONS("-DEV_CONFIG_H=\"${CMAKE_CURRENT_BINARY_DIR}/libev-config.h\""
 		-DEV_MULTIPLICITY=1
 		-DEV_USE_FLOOR=1
diff --git a/src/client/rspamc.c b/src/client/rspamc.c
index 3bfc785d8..cc339ef7a 100644
--- a/src/client/rspamc.c
+++ b/src/client/rspamc.c
@@ -1829,7 +1829,7 @@ rspamc_process_dir (struct ev_loop *ev_base, struct rspamc_command *cmd,
 				if (cur_req >= max_requests) {
 					cur_req = 0;
 					/* Wait for completion */
-					event_base_loop (ev_base, 0);
+					ev_loop (ev_base, 0);
 				}
 			}
 		}
@@ -1840,7 +1840,7 @@ rspamc_process_dir (struct ev_loop *ev_base, struct rspamc_command *cmd,
 	}
 
 	closedir (d);
-	event_base_loop (ev_base, 0);
+	ev_loop (ev_base, 0);
 }
 
 
@@ -1863,7 +1863,7 @@ main (gint argc, gchar **argv, gchar **env)
 	GPid cld;
 	struct rspamc_command *cmd;
 	FILE *in = NULL;
-	struct ev_loop *ev_base;
+	struct ev_loop *event_loop;
 	struct stat st;
 	struct sigaction sigpipe_act;
 	gchar **exclude_pattern;
@@ -1884,6 +1884,7 @@ main (gint argc, gchar **argv, gchar **env)
 	npatterns = 0;
 
 	while (exclude_pattern && *exclude_pattern) {
+		exclude_pattern ++;
 		npatterns ++;
 	}
 
@@ -1902,7 +1903,7 @@ main (gint argc, gchar **argv, gchar **env)
 	}
 
 	rspamd_init_libs ();
-	ev_base = event_base_new ();
+	event_loop = ev_default_loop (EVFLAG_SIGNALFD);
 
 	struct rspamd_http_context_cfg http_config;
 
@@ -1911,7 +1912,7 @@ main (gint argc, gchar **argv, gchar **env)
 	http_config.kp_cache_size_server = 0;
 	http_config.user_agent = user_agent;
 	http_ctx = rspamd_http_context_create_config (&http_config,
-			ev_base, NULL);
+			event_loop, NULL);
 
 	/* Ignore sigpipe */
 	sigemptyset (&sigpipe_act.sa_mask);
@@ -1972,10 +1973,10 @@ main (gint argc, gchar **argv, gchar **env)
 	if (start_argc == argc) {
 		/* Do command without input or with stdin */
 		if (empty_input) {
-			rspamc_process_input (ev_base, cmd, NULL, "empty", kwattrs);
+			rspamc_process_input (event_loop, cmd, NULL, "empty", kwattrs);
 		}
 		else {
-			rspamc_process_input (ev_base, cmd, in, "stdin", kwattrs);
+			rspamc_process_input (event_loop, cmd, in, "stdin", kwattrs);
 		}
 	}
 	else {
@@ -1990,7 +1991,7 @@ main (gint argc, gchar **argv, gchar **env)
 				}
 				if (S_ISDIR (st.st_mode)) {
 					/* Directories are processed with a separate limit */
-					rspamc_process_dir (ev_base, cmd, argv[i], kwattrs);
+					rspamc_process_dir (event_loop, cmd, argv[i], kwattrs);
 					cur_req = 0;
 				}
 				else {
@@ -1999,24 +2000,24 @@ main (gint argc, gchar **argv, gchar **env)
 						fprintf (stderr, "cannot open file %s\n", argv[i]);
 						exit (EXIT_FAILURE);
 					}
-					rspamc_process_input (ev_base, cmd, in, argv[i], kwattrs);
+					rspamc_process_input (event_loop, cmd, in, argv[i], kwattrs);
 					cur_req++;
 					fclose (in);
 				}
 				if (cur_req >= max_requests) {
 					cur_req = 0;
 					/* Wait for completion */
-					event_base_loop (ev_base, 0);
+					ev_loop (event_loop, 0);
 				}
 			}
 		}
 
 		if (cmd->cmd == RSPAMC_COMMAND_FUZZY_DELHASH) {
-			rspamc_process_input (ev_base, cmd, NULL, "hashes", kwattrs);
+			rspamc_process_input (event_loop, cmd, NULL, "hashes", kwattrs);
 		}
 	}
 
-	event_base_loop (ev_base, 0);
+	ev_loop (event_loop, 0);
 
 	g_queue_free_full (kwattrs, rspamc_kwattr_free);
 
diff --git a/src/hs_helper.c b/src/hs_helper.c
index 9f0a0ab33..f83a9d429 100644
--- a/src/hs_helper.c
+++ b/src/hs_helper.c
@@ -47,7 +47,7 @@ static const guint64 rspamd_hs_helper_magic = 0x22d310157a2288a0ULL;
 struct hs_helper_ctx {
 	guint64 magic;
 	/* Events base */
-	struct ev_loop *ev_base;
+	struct ev_loop *event_loop;
 	/* DNS resolver */
 	struct rspamd_dns_resolver *resolver;
 	/* Config */
@@ -57,7 +57,7 @@ struct hs_helper_ctx {
 	gboolean loaded;
 	gdouble max_time;
 	gdouble recompile_time;
-	struct event recompile_timer;
+	ev_timer recompile_timer;
 };
 
 static gpointer
@@ -216,7 +216,7 @@ rspamd_rs_compile (struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
 	 * XXX: now we just sleep for 5 seconds to ensure that
 	 */
 	if (!ctx->loaded) {
-		sleep (5);
+		ev_sleep (5.0);
 		ctx->loaded = TRUE;
 	}
 
@@ -226,7 +226,7 @@ rspamd_rs_compile (struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
 			sizeof (srv_cmd.cmd.hs_loaded.cache_dir));
 	srv_cmd.cmd.hs_loaded.forced = forced;
 
-	rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, -1, NULL, NULL);
+	rspamd_srv_send_command (worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
 
 	return TRUE;
 }
@@ -258,26 +258,23 @@ rspamd_hs_helper_reload (struct rspamd_main *rspamd_main,
 }
 
 static void
-rspamd_hs_helper_timer (gint fd, short what, gpointer ud)
+rspamd_hs_helper_timer (EV_P_ ev_timer *w, int revents)
 {
-	struct rspamd_worker *worker = ud;
+	struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
 	struct hs_helper_ctx *ctx;
-	struct timeval tv;
 	double tim;
 
 	ctx = worker->ctx;
 	tim = rspamd_time_jitter (ctx->recompile_time, 0);
-	double_to_tv (tim, &tv);
-	event_del (&ctx->recompile_timer);
+	w->repeat = tim;
 	rspamd_rs_compile (ctx, worker, FALSE);
-	event_add (&ctx->recompile_timer, &tv);
+	ev_timer_again (EV_A_ w);
 }
 
 static void
 start_hs_helper (struct rspamd_worker *worker)
 {
 	struct hs_helper_ctx *ctx = worker->ctx;
-	struct timeval tv;
 	double tim;
 
 	ctx->cfg = worker->srv->cfg;
@@ -289,7 +286,7 @@ start_hs_helper (struct rspamd_worker *worker)
 		ctx->hs_dir = RSPAMD_DBDIR "/";
 	}
 
-	ctx->ev_base = rspamd_prepare_worker (worker,
+	ctx->event_loop = rspamd_prepare_worker (worker,
 			"hs_helper",
 			NULL);
 
@@ -301,13 +298,12 @@ start_hs_helper (struct rspamd_worker *worker)
 	rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RECOMPILE,
 			rspamd_hs_helper_reload, ctx);
 
-	event_set (&ctx->recompile_timer, -1, EV_TIMEOUT, rspamd_hs_helper_timer,
-			worker);
-	event_base_set (ctx->ev_base, &ctx->recompile_timer);
+	ctx->recompile_timer.data = worker;
 	tim = rspamd_time_jitter (ctx->recompile_time, 0);
-	double_to_tv (tim, &tv);
-	event_add (&ctx->recompile_timer, &tv);
-	event_base_loop (ctx->ev_base, 0);
+	ev_timer_init (&ctx->recompile_timer, rspamd_hs_helper_timer, tim, 0.0);
+	ev_timer_start (ctx->event_loop, &ctx->recompile_timer);
+
+	ev_loop (ctx->event_loop, 0);
 	rspamd_worker_block_signals ();
 
 	rspamd_log_close (worker->srv->logger, TRUE);
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 263f52511..c445751b4 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -913,9 +913,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
 	/* Here the upstreams list is already locked */
 	RSPAMD_UPSTREAM_LOCK (up->lock);
 
-	if (rspamd_event_pending (&up->ev, EV_TIMEOUT)) {
-		event_del (&up->ev);
-	}
+	ev_timer_stop (up->ctx->event_loop, &up->ev);
 	g_ptr_array_add (ups->alive, up);
 	up->active_idx = ups->alive->len - 1;
 	RSPAMD_UPSTREAM_UNLOCK (up->lock);
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index 05c38ff32..9f7952cc3 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -3050,7 +3050,7 @@ static void lua_periodic_callback_finish (struct thread_entry *thread, int ret);
 static void lua_periodic_callback_error (struct thread_entry *thread, int ret, const char *msg);
 
 struct rspamd_lua_periodic {
-	struct ev_loop *ev_base;
+	struct ev_loop *event_loop;
 	struct rspamd_config *cfg;
 	lua_State *L;
 	gdouble timeout;
@@ -3082,7 +3082,7 @@ lua_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
 	*pcfg = cfg;
 	pev_base = lua_newuserdata (L, sizeof (*pev_base));
 	rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
-	*pev_base = periodic->ev_base;
+	*pev_base = periodic->event_loop;
 
 	lua_thread_call (thread, 2);
 }
@@ -3097,6 +3097,7 @@ lua_periodic_callback_finish (struct thread_entry *thread, int ret)
 
 	L = thread->lua_state;
 
+	ev_now_update (periodic->event_loop);
 #ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
 	event_base_update_cache_time (periodic->ev_base);
 #endif
@@ -3119,11 +3120,11 @@ lua_periodic_callback_finish (struct thread_entry *thread, int ret)
 		}
 
 		periodic->ev.repeat = timeout;
-		ev_timer_again (periodic->ev_base, &periodic->ev);
+		ev_timer_again (periodic->event_loop, &periodic->ev);
 	}
 	else {
 		luaL_unref (L, LUA_REGISTRYINDEX, periodic->cbref);
-		ev_timer_stop (periodic->ev_base, &periodic->ev);
+		ev_timer_stop (periodic->event_loop, &periodic->ev);
 		g_free (periodic);
 	}
 }
@@ -3163,7 +3164,7 @@ lua_config_add_periodic (lua_State *L)
 	periodic->timeout = timeout;
 	periodic->L = L;
 	periodic->cfg = cfg;
-	periodic->ev_base = ev_base;
+	periodic->event_loop = ev_base;
 	periodic->need_jitter = need_jitter;
 	lua_pushvalue (L, 4);
 	periodic->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
diff --git a/src/rspamadm/control.c b/src/rspamadm/control.c
index 754d874a2..0aa995abf 100644
--- a/src/rspamadm/control.c
+++ b/src/rspamadm/control.c
@@ -111,7 +111,6 @@ rspamd_control_finish_handler (struct rspamd_http_connection *conn,
 	const gchar *body;
 	gsize body_len;
 	struct rspamadm_control_cbdata *cbdata = conn->ud;
-	struct timeval exit_tv;
 
 	body = rspamd_http_message_get_body (msg, &body_len);
 	parser = ucl_parser_new (0);
@@ -157,9 +156,7 @@ rspamd_control_finish_handler (struct rspamd_http_connection *conn,
 	}
 
 end:
-	exit_tv.tv_sec = 0;
-	exit_tv.tv_usec = 0;
-	event_base_loopexit (rspamd_main->event_loop, &exit_tv);
+	ev_break (rspamd_main->event_loop, EVBREAK_ALL);
 
 	return 0;
 }
diff --git a/src/rspamadm/lua_repl.c b/src/rspamadm/lua_repl.c
index a95521bdb..46fc342ea 100644
--- a/src/rspamadm/lua_repl.c
+++ b/src/rspamadm/lua_repl.c
@@ -515,7 +515,7 @@ rspamadm_lua_run_repl (lua_State *L)
 {
 	gchar *input;
 	gboolean is_multiline = FALSE;
-	GString *tb;
+	GString *tb = NULL;
 	guint i;
 
 	for (;;) {
@@ -591,15 +591,16 @@ struct rspamadm_lua_repl_session {
 };
 
 static void
-rspamadm_lua_accept_cb (gint fd, short what, void *arg)
+rspamadm_lua_accept_cb (EV_P_ ev_io *w, int revents)
 {
-	struct rspamadm_lua_repl_context *ctx = arg;
+	struct rspamadm_lua_repl_context *ctx =
+			(struct rspamadm_lua_repl_context *)w->data;
 	rspamd_inet_addr_t *addr;
 	struct rspamadm_lua_repl_session *session;
 	gint nfd;
 
 	if ((nfd =
-			rspamd_accept_from_socket (fd, &addr, NULL)) == -1) {
+			rspamd_accept_from_socket (w->fd, &addr, NULL, NULL)) == -1) {
 		rspamd_fprintf (stderr, "accept failed: %s", strerror (errno));
 		return;
 	}
@@ -808,7 +809,7 @@ rspamadm_lua (gint argc, gchar **argv, const struct rspamadm_command *cmd)
 		ctx = g_malloc0  (sizeof (*ctx));
 		http = rspamd_http_router_new (rspamadm_lua_error_handler,
 						rspamadm_lua_finish_handler,
-						NULL,
+						0.0,
 						NULL,
 						rspamd_main->http_ctx);
 		ctx->L = L;
@@ -822,19 +823,17 @@ rspamadm_lua (gint argc, gchar **argv, const struct rspamadm_command *cmd)
 
 			fd = rspamd_inet_address_listen (addr, SOCK_STREAM, TRUE);
 			if (fd != -1) {
-				struct event *ev;
+				static ev_io ev;
 
-				ev = g_malloc0 (sizeof (*ev));
-				event_set (ev, fd, EV_READ|EV_PERSIST, rspamadm_lua_accept_cb,
-						ctx);
-				event_base_set (ev_base, ev);
-				event_add (ev, NULL);
+				ev.data = ctx;
+				ev_io_init (&ev, rspamadm_lua_accept_cb, fd, EV_READ);
+				ev_io_start (ev_base, &ev);
 				rspamd_printf ("listen on %s\n",
 						rspamd_inet_address_to_string_pretty (addr));
 			}
 		}
 
-		event_base_loop (ev_base, 0);
+		ev_loop (ev_base, 0);
 
 		exit (EXIT_SUCCESS);
 	}
diff --git a/src/rspamadm/rspamadm.c b/src/rspamadm/rspamadm.c
index 4320c2460..5908f77f9 100644
--- a/src/rspamadm/rspamadm.c
+++ b/src/rspamadm/rspamadm.c
@@ -379,15 +379,6 @@ main (gint argc, gchar **argv, gchar **env)
 	rspamd_main->server_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
 			"rspamadm");
 
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FLAG
-	struct event_config *ev_cfg;
-	ev_cfg = event_config_new ();
-	event_config_set_flag (ev_cfg, EVENT_BASE_FLAG_NO_CACHE_TIME);
-	rspamd_main->ev_base = event_base_new_with_config (ev_cfg);
-#else
-	rspamd_main->event_loop = event_init ();
-#endif
-
 	rspamadm_fill_internal_commands (all_commands);
 	help_command.command_data = all_commands;
 
@@ -565,10 +556,8 @@ main (gint argc, gchar **argv, gchar **env)
 		cmd->run (0, NULL, cmd);
 	}
 
-	event_base_loopexit (rspamd_main->event_loop, NULL);
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FLAG
-	event_config_free (ev_cfg);
-#endif
+	ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+
 
 	REF_RELEASE (rspamd_main->cfg);
 	rspamd_log_close (rspamd_main->logger, TRUE);
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 5907a7ba4..b14bd086a 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -86,7 +86,6 @@ struct rspamd_http_upstream {
 	struct upstream_list *u;
 	struct rspamd_cryptobox_pubkey *key;
 	gdouble timeout;
-	struct timeval io_tv;
 	gint parser_from_ref;
 	gint parser_to_ref;
 	gboolean local;
@@ -101,7 +100,6 @@ struct rspamd_http_mirror {
 	struct rspamd_cryptobox_pubkey *key;
 	gdouble prob;
 	gdouble timeout;
-	struct timeval io_tv;
 	gint parser_from_ref;
 	gint parser_to_ref;
 	gboolean local;
@@ -113,14 +111,13 @@ static const guint64 rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL;
 struct rspamd_proxy_ctx {
 	guint64 magic;
 	/* Events base */
-	struct ev_loop *ev_base;
+	struct ev_loop *event_loop;
 	/* DNS resolver */
 	struct rspamd_dns_resolver *resolver;
 	/* Config */
 	struct rspamd_config *cfg;
 	/* END OF COMMON PART */
 	gdouble timeout;
-	struct timeval io_tv;
 	/* Encryption key for clients */
 	struct rspamd_cryptobox_keypair *key;
 	/* HTTP context */
@@ -174,8 +171,8 @@ struct rspamd_proxy_backend_connection {
 	ucl_object_t *results;
 	const gchar *err;
 	struct rspamd_proxy_session *s;
-	struct timeval *io_tv;
 	gint backend_sock;
+	ev_tstamp timeout;
 	enum rspamd_backend_flags flags;
 	gint parser_from_ref;
 	gint parser_to_ref;
@@ -464,8 +461,6 @@ rspamd_proxy_parse_upstream (rspamd_mempool_t *pool,
 		rspamd_lua_add_ref_dtor (L, pool, up->parser_to_ref);
 	}
 
-	double_to_tv (up->timeout, &up->io_tv);
-
 	g_hash_table_insert (ctx->upstreams, up->name, up);
 
 	return TRUE;
@@ -617,8 +612,6 @@ rspamd_proxy_parse_mirror (rspamd_mempool_t *pool,
 		up->settings_id = rspamd_mempool_strdup (pool, ucl_object_tostring (elt));
 	}
 
-	double_to_tv (up->timeout, &up->io_tv);
-
 	g_ptr_array_add (ctx->mirrors, up);
 
 	return TRUE;
@@ -1144,8 +1137,6 @@ proxy_request_decompress (struct rspamd_http_message *msg)
 		rspamd_http_message_set_body_from_fstring_steal (msg, body);
 		rspamd_http_message_remove_header (msg, "Compression");
 	}
-
-	return;
 }
 
 static struct rspamd_proxy_session *
@@ -1350,7 +1341,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
 				sizeof (*bk_conn));
 		bk_conn->s = session;
 		bk_conn->name = m->name;
-		bk_conn->io_tv = &m->io_tv;
+		bk_conn->timeout = m->timeout;
 
 		bk_conn->up = rspamd_upstream_get (m->u,
 				RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
@@ -1415,7 +1406,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
 			msg->method = HTTP_GET;
 			rspamd_http_connection_write_message_shared (bk_conn->backend_conn,
 					msg, NULL, NULL, bk_conn,
-					bk_conn->io_tv);
+					bk_conn->timeout);
 		}
 		else {
 			if (session->fname) {
@@ -1442,7 +1433,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
 
 			rspamd_http_connection_write_message (bk_conn->backend_conn,
 					msg, NULL, NULL, bk_conn,
-					bk_conn->io_tv);
+					bk_conn->timeout);
 		}
 
 		g_ptr_array_add (session->mirror_conns, bk_conn);
@@ -1468,7 +1459,7 @@ proxy_client_write_error (struct rspamd_proxy_session *session, gint code,
 		reply->status = rspamd_fstring_new_init (status, strlen (status));
 		rspamd_http_connection_write_message (session->client_conn,
 				reply, NULL, NULL, session,
-				&session->ctx->io_tv);
+				session->ctx->timeout);
 	}
 }
 
@@ -1566,7 +1557,7 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
 	else {
 		rspamd_http_connection_write_message (session->client_conn,
 				msg, NULL, NULL, session,
-				bk_conn->io_tv);
+				bk_conn->timeout);
 	}
 
 	return 0;
@@ -1625,7 +1616,7 @@ rspamd_proxy_scan_self_reply (struct rspamd_task *task)
 				NULL,
 				ctype,
 				session,
-				NULL);
+				0);
 	}
 }
 
@@ -1666,7 +1657,7 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
 	msg = session->client_message;
 	task = rspamd_task_new (session->worker, session->ctx->cfg,
 			session->pool, session->ctx->lang_det,
-			session->ctx->ev_base);
+			session->ctx->event_loop);
 	task->flags |= RSPAMD_TASK_FLAG_MIME;
 	task->sock = -1;
 
@@ -1711,23 +1702,18 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
 
 	/* Set global timeout for the task */
 	if (session->ctx->default_upstream->timeout > 0.0) {
-		struct timeval task_tv;
+		task->timeout_ev.data = task;
+		ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+				session->ctx->default_upstream->timeout, 0.0);
+		ev_timer_start (task->event_loop, &task->timeout_ev);
 
-		event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
-				task);
-		event_base_set (session->ctx->ev_base, &task->timeout_ev);
-		double_to_tv (session->ctx->default_upstream->timeout, &task_tv);
-		event_add (&task->timeout_ev, &task_tv);
 	}
 	else if (session->ctx->has_self_scan) {
 		if (session->ctx->cfg->task_timeout > 0) {
-			struct timeval task_tv;
-
-			event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
-					task);
-			event_base_set (session->ctx->ev_base, &task->timeout_ev);
-			double_to_tv (session->ctx->cfg->task_timeout, &task_tv);
-			event_add (&task->timeout_ev, &task_tv);
+			task->timeout_ev.data = task;
+			ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+					session->ctx->cfg->task_timeout, 0.0);
+			ev_timer_start (task->event_loop, &task->timeout_ev);
 		}
 	}
 
@@ -1783,7 +1769,7 @@ retry:
 
 		session->master_conn->up = rspamd_upstream_get (backend->u,
 				RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
-		session->master_conn->io_tv = &backend->io_tv;
+		session->master_conn->timeout = backend->timeout;
 
 		if (session->master_conn->up == NULL) {
 			msg_err_session ("cannot select upstream for %s",
@@ -1853,7 +1839,7 @@ retry:
 			rspamd_http_connection_write_message_shared (
 					session->master_conn->backend_conn,
 					msg, NULL, NULL, session->master_conn,
-					session->master_conn->io_tv);
+					session->master_conn->timeout);
 		}
 		else {
 			if (session->fname) {
@@ -1881,7 +1867,7 @@ retry:
 			rspamd_http_connection_write_message (
 					session->master_conn->backend_conn,
 					msg, NULL, NULL, session->master_conn,
-					session->master_conn->io_tv);
+					session->master_conn->timeout);
 		}
 	}
 
@@ -2031,9 +2017,9 @@ proxy_milter_error_handler (gint fd,
 }
 
 static void
-proxy_accept_socket (gint fd, short what, void *arg)
+proxy_accept_socket (EV_P_ ev_io *w, int revents)
 {
-	struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+	struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
 	struct rspamd_proxy_ctx *ctx;
 	rspamd_inet_addr_t *addr;
 	struct rspamd_proxy_session *session;
@@ -2042,7 +2028,8 @@ proxy_accept_socket (gint fd, short what, void *arg)
 	ctx = worker->ctx;
 
 	if ((nfd =
-		rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+		rspamd_accept_from_socket (w->fd, &addr,
+				rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
 		msg_warn ("accept failed: %s", strerror (errno));
 		return;
 	}
@@ -2086,7 +2073,7 @@ proxy_accept_socket (gint fd, short what, void *arg)
 
 		rspamd_http_connection_read_message_shared (session->client_conn,
 				session,
-				&ctx->io_tv);
+				session->ctx->timeout);
 	}
 	else {
 		msg_info_session ("accepted milter connection from %s port %d",
@@ -2110,9 +2097,9 @@ proxy_accept_socket (gint fd, short what, void *arg)
 		}
 #endif
 
-		rspamd_milter_handle_socket (nfd, NULL,
+		rspamd_milter_handle_socket (nfd, 0.0,
 				session->pool,
-				ctx->ev_base,
+				ctx->event_loop,
 				proxy_milter_finish_handler,
 				proxy_milter_error_handler,
 				session);
*** OUTPUT TRUNCATED, 52 LINES SKIPPED ***


More information about the Commits mailing list