commit 8e33c25: [Project] Rework and simplify fuzzy storage, remove mirroring

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Jun 22 12:14:28 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-06-19 14:13:39 +0100
URL: https://github.com/rspamd/rspamd/commit/8e33c251c98c5835b0c77cd82157480061db5a03

[Project] Rework and simplify fuzzy storage, remove mirroring

---
 src/controller.c    |   58 +-
 src/fuzzy_storage.c | 1648 +++++++--------------------------------------------
 2 files changed, 234 insertions(+), 1472 deletions(-)

diff --git a/src/controller.c b/src/controller.c
index bf74f03a3..851087945 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -105,12 +105,8 @@ INIT_LOG_MODULE(controller)
 #define COLOR_REJECT "#CB4B4B"
 #define COLOR_TOTAL "#9440ED"
 
-const struct timeval rrd_update_time = {
-		.tv_sec = 1,
-		.tv_usec = 0
-};
-
-const guint64 rspamd_controller_ctx_magic = 0xf72697805e6941faULL;
+const static ev_tstamp rrd_update_time = 1.0;
+const static guint64 rspamd_controller_ctx_magic = 0xf72697805e6941faULL;
 
 extern void fuzzy_stat_command (struct rspamd_task *task);
 
@@ -132,7 +128,7 @@ worker_t controller_worker = {
 struct rspamd_controller_worker_ctx {
 	guint64 magic;
 	/* Events base */
-	struct ev_loop *ev_base;
+	struct ev_loop *event_loop;
 	/* DNS resolver */
 	struct rspamd_dns_resolver *resolver;
 	/* Config */
@@ -153,7 +149,7 @@ struct rspamd_controller_worker_ctx {
 	struct rspamd_http_context *http_ctx;
 	struct rspamd_http_connection_router *http;
 	/* Server's start time */
-	time_t start_time;
+	ev_tstamp start_time;
 	/* Main server */
 	struct rspamd_main *srv;
 	/* SSL cert */
@@ -182,9 +178,9 @@ struct rspamd_controller_worker_ctx {
 	/* Local keypair */
 	gpointer key;
 
-	struct event *rrd_event;
+	ev_timer rrd_event;
 	struct rspamd_rrd_file *rrd;
-	struct event save_stats_event;
+	ev_timer save_stats_event;
 	struct rspamd_lang_detector *lang_det;
 	gdouble task_timeout;
 };
@@ -1525,7 +1521,7 @@ rspamd_controller_handle_lua_history (lua_State *L,
 
 			if (lua_isfunction (L, -1)) {
 				task = rspamd_task_new (session->ctx->worker, session->cfg,
-						session->pool, ctx->lang_det, ctx->ev_base);
+						session->pool, ctx->lang_det, ctx->event_loop);
 
 				task->resolver = ctx->resolver;
 				task->s = rspamd_session_create (session->pool,
@@ -1822,7 +1818,7 @@ rspamd_controller_handle_lua (struct rspamd_http_connection_entry *conn_ent,
 	}
 
 	task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-			ctx->lang_det, ctx->ev_base);
+			ctx->lang_det, ctx->event_loop);
 
 	task->resolver = ctx->resolver;
 	task->s = rspamd_session_create (session->pool,
@@ -2004,7 +2000,7 @@ rspamd_controller_handle_learn_common (
 	}
 
 	task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-			session->ctx->lang_det, ctx->ev_base);
+			session->ctx->lang_det, ctx->event_loop);
 
 	task->resolver = ctx->resolver;
 	task->s = rspamd_session_create (session->pool,
@@ -2102,7 +2098,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
 	}
 
 	task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-			ctx->lang_det, ctx->ev_base);
+			ctx->lang_det, ctx->event_loop);
 
 	task->resolver = ctx->resolver;
 	task->s = rspamd_session_create (session->pool,
@@ -2133,7 +2129,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
 
 		event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
 				task);
-		event_base_set (ctx->ev_base, &task->timeout_ev);
+		event_base_set (ctx->event_loop, &task->timeout_ev);
 		double_to_tv (ctx->task_timeout, &task_tv);
 		event_add (&task->timeout_ev, &task_tv);
 	}
@@ -2600,7 +2596,7 @@ rspamd_controller_handle_stat_common (
 	ctx = session->ctx;
 
 	task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-			ctx->lang_det, ctx->ev_base);
+			ctx->lang_det, ctx->event_loop);
 	task->resolver = ctx->resolver;
 	cbdata = rspamd_mempool_alloc0 (session->pool, sizeof (*cbdata));
 	cbdata->conn_ent = conn_ent;
@@ -3002,7 +2998,7 @@ rspamd_controller_handle_lua_plugin (struct rspamd_http_connection_entry *conn_e
 	}
 
 	task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-			ctx->lang_det, ctx->ev_base);
+			ctx->lang_det, ctx->event_loop);
 
 	task->resolver = ctx->resolver;
 	task->s = rspamd_session_create (session->pool,
@@ -3487,7 +3483,7 @@ lua_csession_get_ev_base (lua_State *L)
 		s = c->ud;
 		pbase = lua_newuserdata (L, sizeof (struct ev_loop *));
 		rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
-		*pbase = s->ctx->ev_base;
+		*pbase = s->ctx->event_loop;
 	}
 	else {
 		return luaL_error (L, "invalid arguments");
@@ -3702,7 +3698,7 @@ start_controller_worker (struct rspamd_worker *worker)
 	const guint save_stats_interval = 60 * 1000; /* 1 minute */
 	gpointer m;
 
-	ctx->ev_base = rspamd_prepare_worker (worker,
+	ctx->event_loop = rspamd_prepare_worker (worker,
 			"controller",
 			rspamd_controller_accept_socket);
 	msec_to_tv (ctx->timeout, &ctx->io_tv);
@@ -3752,7 +3748,7 @@ start_controller_worker (struct rspamd_worker *worker)
 		if (ctx->rrd) {
 			ctx->rrd_event = g_malloc0 (sizeof (*ctx->rrd_event));
 			evtimer_set (ctx->rrd_event, rspamd_controller_rrd_update, ctx);
-			event_base_set (ctx->ev_base, ctx->rrd_event);
+			event_base_set (ctx->event_loop, ctx->rrd_event);
 			event_add (ctx->rrd_event, &rrd_update_time);
 		}
 		else if (rrd_err) {
@@ -3773,7 +3769,7 @@ start_controller_worker (struct rspamd_worker *worker)
 			"password");
 
 	/* Accept event */
-	ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
+	ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
 			ctx->cfg->ups_ctx);
 	ctx->http = rspamd_http_router_new (rspamd_controller_error_handler,
 			rspamd_controller_finish_handler, &ctx->io_tv,
@@ -3889,40 +3885,40 @@ start_controller_worker (struct rspamd_worker *worker)
 			rspamd_controller_handle_unknown);
 
 	ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
-			ctx->ev_base,
+			ctx->event_loop,
 			worker->srv->cfg);
 
 	rspamd_upstreams_library_config (worker->srv->cfg, worker->srv->cfg->ups_ctx,
-			ctx->ev_base, ctx->resolver->r);
-	rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+			ctx->event_loop, ctx->resolver->r);
+	rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
 			worker);
-	rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
+	rspamd_stat_init (worker->srv->cfg, ctx->event_loop);
 
 	if (worker->index == 0) {
 		if (!ctx->cfg->disable_monitored) {
-			rspamd_worker_init_monitored (worker, ctx->ev_base, ctx->resolver);
+			rspamd_worker_init_monitored (worker, ctx->event_loop, ctx->resolver);
 		}
 
-		rspamd_map_watch (worker->srv->cfg, ctx->ev_base,
+		rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
 				ctx->resolver, worker, TRUE);
 
 		/* Schedule periodic stats saving, see #1823 */
 		event_set (&ctx->save_stats_event, -1, EV_PERSIST,
 				rspamd_controller_stats_save_periodic,
 				ctx);
-		event_base_set (ctx->ev_base, &ctx->save_stats_event);
+		event_base_set (ctx->event_loop, &ctx->save_stats_event);
 		msec_to_tv (save_stats_interval, &stv);
 		evtimer_add (&ctx->save_stats_event, &stv);
 	}
 	else {
-		rspamd_map_watch (worker->srv->cfg, ctx->ev_base,
+		rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
 				ctx->resolver, worker, FALSE);
 	}
 
-	rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base, worker);
+	rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker);
 
 	/* Start event loop */
-	event_base_loop (ctx->ev_base, 0);
+	event_base_loop (ctx->event_loop, 0);
 	rspamd_worker_block_signals ();
 
 	rspamd_stat_close ();
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 86a4230de..4565be874 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -36,8 +36,7 @@
 #include "libserver/rspamd_control.h"
 #include "libutil/hash.h"
 #include "libutil/map_private.h"
-#include "libutil/http_private.h"
-#include "libutil/http_router.h"
+#include "contrib/uthash/utlist.h"
 #include "unix-std.h"
 
 #include <math.h>
@@ -132,7 +131,7 @@ static const guint64 rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL;
 struct rspamd_fuzzy_storage_ctx {
 	guint64 magic;
 	/* Events base */
-	struct ev_loop *ev_base;
+	struct ev_loop *event_loop;
 	/* DNS resolver */
 	struct rspamd_dns_resolver *resolver;
 	/* Config */
@@ -146,34 +145,21 @@ struct rspamd_fuzzy_storage_ctx {
 	struct rspamd_radix_map_helper *blocked_ips;
 	struct rspamd_radix_map_helper *ratelimit_whitelist;
 
-	struct rspamd_cryptobox_keypair *sync_keypair;
-	struct rspamd_cryptobox_pubkey *master_key;
-	struct timeval master_io_tv;
-	gdouble master_timeout;
-	GPtrArray *mirrors;
 	const ucl_object_t *update_map;
-	const ucl_object_t *masters_map;
 	const ucl_object_t *blocked_map;
 	const ucl_object_t *ratelimit_whitelist_map;
 
-	GHashTable *master_flags;
 	guint keypair_cache_size;
-	gint peer_fd;
-	struct event peer_ev;
-	struct event stat_ev;
-	struct timeval stat_tv;
+	ev_timer stat_ev;
+	ev_io peer_ev;
+	ev_tstamp stat_timeout;
 
 	/* Local keypair */
 	struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */
 	struct fuzzy_key *default_key;
 	GHashTable *keys;
 	gboolean encrypted_only;
-	gboolean collection_mode;
 	gboolean read_only;
-	struct rspamd_cryptobox_keypair *collection_keypair;
-	struct rspamd_cryptobox_pubkey *collection_sign_key;
-	gchar *collection_id_file;
-	struct rspamd_http_context *http_ctx;
 	struct rspamd_keypair_cache *keypair_cache;
 	rspamd_lru_hash_t *errors_ips;
 	rspamd_lru_hash_t *ratelimit_buckets;
@@ -181,7 +167,8 @@ struct rspamd_fuzzy_storage_ctx {
 	GArray *updates_pending;
 	guint updates_failed;
 	guint updates_maxfail;
-	guint32 collection_id;
+	/* Used to send data between workers */
+	gint peer_fd;
 
 	/* Ratelimits */
 	guint leaky_bucket_ttl;
@@ -192,7 +179,6 @@ struct rspamd_fuzzy_storage_ctx {
 	gdouble leaky_bucket_rate;
 
 	struct rspamd_worker *worker;
-	struct rspamd_http_connection_router *collection_rt;
 	const ucl_object_t *skip_map;
 	struct rspamd_hash_map_helper *skip_hashes;
 	guchar cookie[COOKIE_SIZE];
@@ -224,14 +210,14 @@ struct fuzzy_session {
 	enum fuzzy_cmd_type cmd_type;
 	gint fd;
 	guint64 time;
-	struct event io;
+	struct ev_io io;
 	ref_entry_t ref;
 	struct fuzzy_key_stat *key_stat;
 	guchar nm[rspamd_cryptobox_MAX_NMBYTES];
 };
 
 struct fuzzy_peer_request {
-	struct event io_ev;
+	ev_io io_ev;
 	struct fuzzy_peer_cmd cmd;
 };
 
@@ -241,19 +227,13 @@ struct fuzzy_key {
 	struct fuzzy_key_stat *stat;
 };
 
-struct fuzzy_master_update_session {
-	const gchar *name;
-	gchar uid[16];
-	struct rspamd_http_connection *conn;
-	struct rspamd_http_message *msg;
+struct rspamd_updates_cbdata {
+	GArray *updates_pending;
 	struct rspamd_fuzzy_storage_ctx *ctx;
-	const gchar *src;
-	gchar *psrc;
-	rspamd_inet_addr_t *addr;
-	gboolean replied;
-	gint sock;
+	gchar *source;
 };
 
+
 static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
 
 static gboolean
@@ -261,8 +241,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
 {
 	rspamd_inet_addr_t *masked;
 	struct rspamd_leaky_bucket_elt *elt;
-	struct timeval tv;
-	gdouble now;
+	ev_tstamp now;
 
 	if (session->ctx->ratelimit_whitelist != NULL) {
 		if (rspamd_match_radix_map_addr (session->ctx->ratelimit_whitelist,
@@ -289,15 +268,9 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
 				MIN (MAX (session->ctx->leaky_bucket_mask * 4, 64), 128));
 	}
 
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
-	event_base_gettimeofday_cached (session->ctx->ev_base, &tv);
-#else
-	gettimeofday (&tv, NULL);
-#endif
-
-	now = tv_to_double (&tv);
+	now = ev_now (session->ctx->event_loop);
 	elt = rspamd_lru_hash_lookup (session->ctx->ratelimit_buckets, masked,
-			tv.tv_sec);
+			now);
 
 	if (elt) {
 		gboolean ratelimited = FALSE;
@@ -348,7 +321,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
 		rspamd_lru_hash_insert (session->ctx->ratelimit_buckets,
 				masked,
 				elt,
-				tv.tv_sec,
+				now,
 				session->ctx->leaky_bucket_ttl);
 	}
 
@@ -424,15 +397,6 @@ fuzzy_count_callback (guint64 count, void *ud)
 	ctx->stat.fuzzy_hashes = count;
 }
 
-struct fuzzy_slave_connection {
-	struct rspamd_cryptobox_keypair *local_key;
-	struct rspamd_cryptobox_pubkey *remote_key;
-	struct upstream *up;
-	struct rspamd_http_connection *http_conn;
-	struct rspamd_fuzzy_mirror *mirror;
-	gint sock;
-};
-
 static void
 fuzzy_rl_bucket_free (gpointer p)
 {
@@ -443,227 +407,31 @@ fuzzy_rl_bucket_free (gpointer p)
 }
 
 static void
-fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
-{
-	if (conn) {
-		if (conn->http_conn) {
-			rspamd_http_connection_reset (conn->http_conn);
-			rspamd_http_connection_unref (conn->http_conn);
-		}
-
-		close (conn->sock);
-
-		g_free (conn);
-	}
-}
-
-struct rspamd_fuzzy_updates_cbdata {
-	struct rspamd_fuzzy_storage_ctx *ctx;
-	struct rspamd_http_message *msg;
-	struct fuzzy_slave_connection *conn;
-	struct rspamd_fuzzy_mirror *m;
-	GArray *updates_pending;
-};
-
-static void
-fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
-{
-	struct rspamd_fuzzy_updates_cbdata *cbdata = ud;
-	struct fuzzy_peer_cmd *io_cmd;
-	guint32 rev32 = rev64, len;
-	const gchar *p;
-	rspamd_fstring_t *reply;
-	struct fuzzy_slave_connection *conn;
-	struct rspamd_fuzzy_storage_ctx *ctx;
-	struct rspamd_http_message *msg;
-	struct rspamd_fuzzy_mirror *m;
-	struct timeval tv;
-	guint i;
-
-	conn = cbdata->conn;
-	ctx = cbdata->ctx;
-	msg = cbdata->msg;
-	m = cbdata->m;
-
-	rev32 = GUINT32_TO_LE (rev32);
-	len = sizeof (guint32) * 2; /* revision + last chunk */
-
-	for (i = 0; i < cbdata->updates_pending->len; i ++) {
-		io_cmd = &g_array_index (cbdata->updates_pending,
-				struct fuzzy_peer_cmd, i);
-
-		if (io_cmd->is_shingle) {
-			len += sizeof (guint32) + sizeof (guint32) +
-					sizeof (struct rspamd_fuzzy_shingle_cmd);
-		}
-		else {
-			len += sizeof (guint32) + sizeof (guint32) +
-					sizeof (struct rspamd_fuzzy_cmd);
-		}
-	}
-
-	reply = rspamd_fstring_sized_new (len);
-	reply = rspamd_fstring_append (reply, (const char *)&rev32,
-			sizeof (rev32));
-
-	for (i = 0; i < cbdata->updates_pending->len; i ++) {
-		io_cmd = &g_array_index (cbdata->updates_pending, struct fuzzy_peer_cmd, i);
-
-		if (io_cmd->is_shingle) {
-			len = sizeof (guint32) +
-					sizeof (struct rspamd_fuzzy_shingle_cmd);
-		}
-		else {
-			len = sizeof (guint32) +
-					sizeof (struct rspamd_fuzzy_cmd);
-		}
-
-		p = (const char *)io_cmd;
-		len = GUINT32_TO_LE (len);
-		reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len));
-		reply = rspamd_fstring_append (reply, p, len);
-	}
-
-	/* Last chunk */
-	len = 0;
-	reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len));
-	rspamd_http_message_set_body_from_fstring_steal (msg, reply);
-	double_to_tv (ctx->sync_timeout, &tv);
-	rspamd_http_connection_write_message (conn->http_conn,
-			msg, NULL, NULL, conn,
-			&tv);
-	msg_info ("send update request to %s", m->name);
-
-	g_array_free (cbdata->updates_pending, TRUE);
-	g_free (cbdata);
-}
-
-static void
-fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
-							  struct fuzzy_slave_connection *conn,
-							  struct rspamd_fuzzy_storage_ctx *ctx,
-							  struct rspamd_http_message *msg,
-							  GArray *updates)
-{
-
-	struct rspamd_fuzzy_updates_cbdata *cbdata;
-
-	cbdata = g_malloc (sizeof (*cbdata));
-	cbdata->ctx = ctx;
-	cbdata->msg = msg;
-	cbdata->conn = conn;
-	cbdata->m = m;
-	/* Copy queue */
-	cbdata->updates_pending = g_array_sized_new (FALSE, FALSE,
-			sizeof (struct fuzzy_peer_cmd), updates->len);
-	g_array_append_vals (cbdata->updates_pending, updates->data, updates->len);
-	rspamd_fuzzy_backend_version (ctx->backend, local_db_name,
-			fuzzy_mirror_updates_version_cb, cbdata);
-}
-
-static void
-fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
-{
-	struct fuzzy_slave_connection *bk_conn = conn->ud;
-	msg_info ("abnormally closing connection from backend: %s:%s, "
-			"error: %e",
-			bk_conn->mirror->name,
-			rspamd_inet_address_to_string (rspamd_upstream_addr_cur (bk_conn->up)),
-			err);
-
-	fuzzy_mirror_close_connection (bk_conn);
-}
-
-static gint
-fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
-	struct rspamd_http_message *msg)
+fuzzy_stat_count_callback (guint64 count, void *ud)
 {
-	struct fuzzy_slave_connection *bk_conn = conn->ud;
-
-	msg_info ("finished mirror connection to %s", bk_conn->mirror->name);
-	fuzzy_mirror_close_connection (bk_conn);
+	struct rspamd_fuzzy_storage_ctx *ctx = ud;
 
-	return 0;
+	ev_timer_again (ctx->event_loop, &ctx->stat_ev);
+	ctx->stat.fuzzy_hashes = count;
 }
 
 static void
-rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
-		struct rspamd_fuzzy_mirror *m, GArray *updates)
+rspamd_fuzzy_stat_callback (EV_P_ ev_timer *w, int revents)
 {
-	struct fuzzy_slave_connection *conn;
-	struct rspamd_http_message *msg;
-
-	conn = g_malloc0 (sizeof (*conn));
-	conn->up = rspamd_upstream_get (m->u,
-			RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, 0);
-	conn->mirror = m;
-
-	if (conn->up == NULL) {
-		g_free (conn);
-		msg_err ("cannot select upstream for %s", m->name);
-		return;
-	}
-
-	conn->sock = rspamd_inet_address_connect (
-			rspamd_upstream_addr_next (conn->up),
-			SOCK_STREAM, TRUE);
-
-	if (conn->sock == -1) {
-		g_free (conn);
-		msg_err ("cannot connect upstream for %s", m->name);
-		rspamd_upstream_fail (conn->up, TRUE);
-		return;
-	}
-
-	msg = rspamd_http_new_message (HTTP_REQUEST);
-	rspamd_printf_fstring (&msg->url, "/update_v1/%s", m->name);
-
-	conn->http_conn = rspamd_http_connection_new_client_socket (
-			ctx->http_ctx,
-			NULL,
-			fuzzy_mirror_error_handler,
-			fuzzy_mirror_finish_handler,
-			RSPAMD_HTTP_CLIENT_SIMPLE,
-			conn->sock);
-
-	rspamd_http_connection_set_key (conn->http_conn,
-			ctx->sync_keypair);
-	msg->peer_key = rspamd_pubkey_ref (m->key);
-	fuzzy_mirror_updates_to_http (m, conn, ctx, msg, updates);
+	struct rspamd_fuzzy_storage_ctx *ctx =
+			(struct rspamd_fuzzy_storage_ctx *)w->data;
+	rspamd_fuzzy_backend_count (ctx->backend, fuzzy_stat_count_callback, ctx);
 }
 
-struct rspamd_updates_cbdata {
-	GArray *updates_pending;
-	struct rspamd_fuzzy_storage_ctx *ctx;
-	gchar *source;
-};
 
 static void
 fuzzy_update_version_callback (guint64 ver, void *ud)
 {
 	msg_info ("updated fuzzy storage from %s: version: %d",
-		(const char *)ud, (gint)ver);
+			(const char *)ud, (gint)ver);
 	g_free (ud);
 }
 
-static void
-fuzzy_stat_count_callback (guint64 count, void *ud)
-{
-	struct rspamd_fuzzy_storage_ctx *ctx = ud;
-
-	event_add (&ctx->stat_ev, &ctx->stat_tv);
-	ctx->stat.fuzzy_hashes = count;
-}
-
-static void
-rspamd_fuzzy_stat_callback (gint fd, gshort what, gpointer ud)
-{
-	struct rspamd_fuzzy_storage_ctx *ctx = ud;
-
-	event_del (&ctx->stat_ev);
-	rspamd_fuzzy_backend_count (ctx->backend, fuzzy_stat_count_callback, ctx);
-}
-
 static void
 rspamd_fuzzy_updates_cb (gboolean success,
 						 guint nadded,
@@ -673,8 +441,6 @@ rspamd_fuzzy_updates_cb (gboolean success,
 						 void *ud)
 {
 	struct rspamd_updates_cbdata *cbdata = ud;
-	struct rspamd_fuzzy_mirror *m;
-	guint i;
 	struct rspamd_fuzzy_storage_ctx *ctx;
 	const gchar *source;
 
@@ -684,15 +450,6 @@ rspamd_fuzzy_updates_cb (gboolean success,
 	if (success) {
 		rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
 
-		if (ctx->updates_pending->len > 0) {
-			for (i = 0; i < ctx->mirrors->len; i ++) {
-				m = g_ptr_array_index (ctx->mirrors, i);
-
-				rspamd_fuzzy_send_update_mirror (ctx, m,
-						cbdata->updates_pending);
-			}
-		}
-
 		msg_info ("successfully updated fuzzy storage: %d updates in queue; "
 				  "%d pending currently; "
 				  "%d added, %d deleted, %d extended, %d duplicates",
@@ -727,12 +484,7 @@ rspamd_fuzzy_updates_cb (gboolean success,
 
 	if (ctx->worker->wanna_die) {
 		/* Plan exit */
-		struct timeval tv;
-
-		tv.tv_sec = 0;
-		tv.tv_usec = 0;
-
-		event_base_loopexit (ctx->ev_base, &tv);
+		ev_break (ctx->event_loop, EVBREAK_ALL);
 	}
 
 	g_array_free (cbdata->updates_pending, TRUE);
@@ -762,9 +514,9 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
 }
 
 static void
-rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d)
+rspamd_fuzzy_reply_io (EV_P_ ev_io *w, int revents)
 {
-	struct fuzzy_session *session = d;
+	struct fuzzy_session *session = (struct fuzzy_session *)w->data;
 
 	rspamd_fuzzy_write_reply (session);
 	REF_RELEASE (session);
@@ -807,10 +559,9 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session)
 		if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
 			/* Grab reference to avoid early destruction */
 			REF_RETAIN (session);
-			event_set (&session->io, session->fd, EV_WRITE,
-					rspamd_fuzzy_reply_io, session);
-			event_base_set (session->ctx->ev_base, &session->io);
-			event_add (&session->io, NULL);
+			session->io.data = session;
+			ev_io_init (&session->io, rspamd_fuzzy_reply_io, session->fd, EV_WRITE);
+			ev_io_start (session->ctx->event_loop, &session->io);
 		}
 		else {
 			msg_err ("error while writing reply: %s", strerror (errno));
@@ -818,22 +569,6 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session)
 	}
 }
 
-static void
-fuzzy_peer_send_io (gint fd, gshort what, gpointer d)
-{
-	struct fuzzy_peer_request *up_req = d;
-	gssize r;
-
-	r = write (fd, &up_req->cmd, sizeof (up_req->cmd));
-
-	if (r != sizeof (up_req->cmd)) {
-		msg_err ("cannot send update request to the peer: %s", strerror (errno));
-	}
*** OUTPUT TRUNCATED, 1571 LINES SKIPPED ***


More information about the Commits mailing list