commit 423edef: [Project] Start maps rework

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Jun 22 12:14:15 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-06-16 21:09:59 +0100
URL: https://github.com/rspamd/rspamd/commit/423edefbda01bb9a44afd69e78d591080d571f58

[Project] Start maps rework

---
 src/libutil/map.c         | 125 +++++++++++++++++++++-------------------------
 src/libutil/map_private.h |  16 +++---
 2 files changed, 66 insertions(+), 75 deletions(-)

diff --git a/src/libutil/map.c b/src/libutil/map.c
index 82c668952..17da0062a 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -44,7 +44,7 @@ static void free_http_cbdata_common (struct http_callback_data *cbd,
 									 gboolean plan_new);
 static void free_http_cbdata_dtor (gpointer p);
 static void free_http_cbdata (struct http_callback_data *cbd);
-static void rspamd_map_periodic_callback (gint fd, short what, void *ud);
+static void rspamd_map_process_periodic (struct map_periodic_cbdata *cbd);
 static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked,
 										  gboolean initial, gboolean errored);
 static gboolean read_map_file_chunks (struct rspamd_map *map,
@@ -130,7 +130,7 @@ write_http_request (struct http_callback_data *cbd)
 			cbd->data->host,
 			NULL,
 			cbd,
-			&cbd->tv);
+			cbd->timeout);
 }
 
 static gboolean
@@ -325,21 +325,23 @@ http_map_error (struct rspamd_http_connection *conn,
 			cbd->bk->uri,
 			cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
 			err);
-	rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+	rspamd_map_process_periodic (cbd->periodic);
 	MAP_RELEASE (cbd, "http_callback_data");
 }
 
 static void
-rspamd_map_cache_cb (gint fd, short what, gpointer ud)
+rspamd_map_cache_cb (struct ev_loop *loop, ev_periodic *w, int revents)
 {
-	struct rspamd_http_map_cached_cbdata *cache_cbd = ud;
+	struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *)
+			w->data;
 	struct rspamd_map *map;
 	struct http_map_data *data;
-	struct timeval tv;
 
 	map = cache_cbd->map;
 	data = cache_cbd->data;
 
+	ev_periodic_stop (loop, &cache_cbd->timeout);
+
 	if (cache_cbd->gen != cache_cbd->data->gen) {
 		/* We have another update, so this cache element is obviously expired */
 		/*
@@ -349,7 +351,6 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
 		msg_info_map ("cached data is now expired (gen mismatch %L != %L) for %s",
 				cache_cbd->gen, cache_cbd->data->gen, map->name);
 		MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
-		event_del (&cache_cbd->timeout);
 		g_free (cache_cbd);
 	}
 	else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) {
@@ -359,15 +360,13 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
 		 */
 		cache_cbd->last_checked = cache_cbd->data->last_checked;
 		msg_debug_map ("cached data is up to date for %s", map->name);
-		double_to_tv (map->poll_timeout * 2, &tv);
-		event_add (&cache_cbd->timeout, &tv);
+		ev_periodic_again (loop, &cache_cbd->timeout);
 	}
 	else {
 		data->cur_cache_cbd = NULL;
 		g_atomic_int_set (&data->cache->available, 0);
 		MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
 		msg_info_map ("cached data is now expired for %s", map->name);
-		event_del (&cache_cbd->timeout);
 		g_free (cache_cbd);
 	}
 }
@@ -456,7 +455,7 @@ http_map_finish (struct rspamd_http_connection *conn,
 			g_atomic_int_set (&data->cache->available, 0);
 			data->cur_cache_cbd = NULL;
 
-			rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+			rspamd_map_process_periodic (cbd->periodic);
 			MAP_RELEASE (cbd, "http_callback_data");
 
 			return 0;
@@ -622,6 +621,8 @@ read_data:
 		}
 
 		/* Check for expires */
+		double cached_timeout = map->poll_timeout * 2;
+
 		expires_hdr = rspamd_http_message_find_header (msg, "Expires");
 
 		if (expires_hdr) {
@@ -635,19 +636,12 @@ read_data:
 					hdate = MIN (map->next_check, hdate);
 				}
 
-				double cached_timeout = map->next_check - msg->date +
-					map->poll_timeout * 2;
+				cached_timeout = map->next_check - msg->date +
+								 map->poll_timeout * 2;
 
 				map->next_check = hdate;
-				double_to_tv (cached_timeout, &tv);
-			}
-			else {
-				double_to_tv (map->poll_timeout * 2, &tv);
 			}
 		}
-		else {
-			double_to_tv (map->poll_timeout * 2, &tv);
-		}
 
 		/* Check for etag */
 		etag_hdr = rspamd_http_message_find_header (msg, "ETag");
@@ -688,10 +682,9 @@ read_data:
 		cache_cbd->gen = cbd->data->gen;
 		MAP_RETAIN (cache_cbd->shm, "shmem_data");
 
-		event_set (&cache_cbd->timeout, -1, EV_TIMEOUT, rspamd_map_cache_cb,
-				cache_cbd);
-		event_base_set (cbd->ev_base, &cache_cbd->timeout);
-		event_add (&cache_cbd->timeout, &tv);
+		ev_periodic_set (&cache_cbd->timeout, 0.0, cached_timeout, NULL);
+		ev_periodic_start (cbd->event_loop, &cache_cbd->timeout);
+		cache_cbd->timeout.data = cache_cbd;
 		data->cur_cache_cbd = cache_cbd;
 
 		if (map->next_check) {
@@ -700,7 +693,7 @@ read_data:
 		}
 		else {
 			rspamd_http_date_format (next_check_date, sizeof (next_check_date),
-					time (NULL) + map->poll_timeout);
+					ev_now (cbd->event_loop) + map->poll_timeout);
 		}
 
 
@@ -773,7 +766,7 @@ read_data:
 
 		cbd->periodic->cur_backend ++;
 		munmap (in, dlen);
-		rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+		rspamd_map_process_periodic (cbd->periodic);
 	}
 	else if (msg->code == 304 && (cbd->check && cbd->stage == map_load_file)) {
 		cbd->data->last_checked = msg->date;
@@ -819,13 +812,13 @@ read_data:
 		}
 		else {
 			rspamd_http_date_format (next_check_date, sizeof (next_check_date),
-					time (NULL) + map->poll_timeout);
+					ev_now (cbd->event_loop) + map->poll_timeout);
 		}
 		msg_info_map ("data is not modified for server %s, next check at %s",
 				cbd->data->host, next_check_date);
 
 		cbd->periodic->cur_backend ++;
-		rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+		rspamd_map_process_periodic (cbd->periodic);
 	}
 	else {
 		msg_info_map ("cannot load map %s from %s: HTTP error %d",
@@ -838,7 +831,7 @@ read_data:
 
 err:
 	cbd->periodic->errored = 1;
-	rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+	rspamd_map_process_periodic (cbd->periodic);
 	MAP_RELEASE (cbd, "http_callback_data");
 
 	return 0;
@@ -951,6 +944,7 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
 		}
 	}
 
+	ev_stat_stat (map->event_loop, &data->st_ev);
 	len = st.st_size;
 
 	if (bk->is_signed) {
@@ -1045,9 +1039,6 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
 		map->read_callback (NULL, 0, &periodic->cbdata, TRUE);
 	}
 
-	/* Also update at the read time */
-	memcpy (&data->st, &st, sizeof (struct stat));
-
 	return TRUE;
 }
 
@@ -1143,7 +1134,6 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
 
 	map = periodic->map;
 	msg_debug_map ("periodic dtor %p", periodic);
-	event_del (&periodic->ev);
 
 	if (periodic->need_modify) {
 		/* We are done */
@@ -1162,6 +1152,16 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
 	g_free (periodic);
 }
 
+/* Called on timer execution */
+static void
+rspamd_map_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
+{
+	struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *)w->data;
+
+	ev_timer_stop (loop, w);
+	rspamd_map_process_periodic (cbd);
+}
+
 static void
 rspamd_map_schedule_periodic (struct rspamd_map *map,
 		gboolean locked, gboolean initial, gboolean errored)
@@ -1224,14 +1224,11 @@ rspamd_map_schedule_periodic (struct rspamd_map *map,
 	map->scheduled_check = TRUE;
 	REF_INIT_RETAIN (cbd, rspamd_map_periodic_dtor);
 
-	evtimer_set (&cbd->ev, rspamd_map_periodic_callback, cbd);
-	event_base_set (map->ev_base, &cbd->ev);
-
+	ev_timer_init (&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0);
+	ev_timer_start (map->event_loop, &cbd->ev);
 
 	msg_debug_map ("schedule new periodic event %p in %.2f seconds",
 			cbd, jittered_sec);
-	double_to_tv (jittered_sec, &map->tv);
-	evtimer_add (&cbd->ev, &map->tv);
 }
 
 static void
@@ -1286,7 +1283,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
 			msg_err_map ("cannot resolve %s: %s", cbd->data->host,
 					rdns_strerror (reply->code));
 			cbd->periodic->errored = 1;
-			rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+			rspamd_map_process_periodic (cbd->periodic);
 		}
 	}
 
@@ -1567,7 +1564,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
 				periodic->need_modify = TRUE;
 				/* Reset the whole chain */
 				periodic->cur_backend = 0;
-				rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+				rspamd_map_process_periodic (periodic);
 			}
 			else {
 				if (map->active_http) {
@@ -1577,7 +1574,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
 				else {
 					/* Switch to the next backend */
 					periodic->cur_backend++;
-					rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+					rspamd_map_process_periodic (periodic);
 				}
 			}
 
@@ -1592,7 +1589,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
 				/* Switch to the next backend */
 				periodic->cur_backend++;
 				data->last_modified = data->cache->last_modified;
-				rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+				rspamd_map_process_periodic (periodic);
 
 				return;
 			}
@@ -1601,7 +1598,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
 	else if (!map->active_http) {
 		/* Switch to the next backend */
 		periodic->cur_backend ++;
-		rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+		rspamd_map_process_periodic (periodic);
 
 		return;
 	}
@@ -1609,7 +1606,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
 check:
 	cbd = g_malloc0 (sizeof (struct http_callback_data));
 
-	cbd->ev_base = map->ev_base;
+	cbd->event_loop = map->event_loop;
 	cbd->map = map;
 	cbd->data = data;
 	cbd->check = check;
@@ -1618,7 +1615,6 @@ check:
 	cbd->bk = bk;
 	MAP_RETAIN (bk, "rspamd_map_backend");
 	cbd->stage = map_resolve_host2;
-	double_to_tv (map->cfg->map_timeout, &cbd->tv);
 	REF_INIT_RETAIN (cbd, free_http_cbdata);
 
 	msg_debug_map ("%s map data from %s", check ? "checking" : "reading",
@@ -1686,7 +1682,7 @@ rspamd_map_http_check_callback (gint fd, short what, void *ud)
 }
 
 static void
-rspamd_map_http_read_callback (gint fd, short what, void *ud)
+rspamd_map_http_read_callback (void *ud)
 {
 	struct map_periodic_cbdata *cbd = ud;
 	struct rspamd_map *map;
@@ -1698,36 +1694,31 @@ rspamd_map_http_read_callback (gint fd, short what, void *ud)
 }
 
 static void
-rspamd_map_file_check_callback (gint fd, short what, void *ud)
+rspamd_map_file_check_callback (void *ud)
 {
 	struct rspamd_map *map;
 	struct map_periodic_cbdata *periodic = ud;
 	struct file_map_data *data;
 	struct rspamd_map_backend *bk;
-	struct stat st;
 
 	map = periodic->map;
 
 	bk = g_ptr_array_index (map->backends, periodic->cur_backend);
 	data = bk->data.fd;
 
-	if (stat (data->filename, &st) != -1 &&
-			(st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
-		/* File was modified since last check */
-		msg_info_map ("old mtime is %t, new mtime is %t for map file %s",
-				data->st.st_mtime, st.st_mtime, data->filename);
-		memcpy (&data->st, &st, sizeof (struct stat));
+	if (!data->processed) {
+		/* File has never been read */
 		periodic->need_modify = TRUE;
 		periodic->cur_backend = 0;
 
-		rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+		rspamd_map_process_periodic (periodic);
 
 		return;
 	}
 
 	/* Switch to the next backend */
 	periodic->cur_backend ++;
-	rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+	rspamd_map_process_periodic (periodic);
 }
 
 static void
@@ -1746,21 +1737,20 @@ rspamd_map_static_check_callback (gint fd, short what, void *ud)
 		periodic->need_modify = TRUE;
 		periodic->cur_backend = 0;
 
-		rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+		rspamd_map_process_periodic (periodic);
 
 		return;
 	}
 
 	/* Switch to the next backend */
 	periodic->cur_backend ++;
-	rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+	rspamd_map_process_periodic (periodic);
 }
 
 static void
-rspamd_map_file_read_callback (gint fd, short what, void *ud)
+rspamd_map_file_read_callback (struct map_periodic_cbdata *periodic)
 {
 	struct rspamd_map *map;
-	struct map_periodic_cbdata *periodic = ud;
 	struct file_map_data *data;
 	struct rspamd_map_backend *bk;
 
@@ -1774,17 +1764,19 @@ rspamd_map_file_read_callback (gint fd, short what, void *ud)
 	if (!read_map_file (map, data, bk, periodic)) {
 		periodic->errored = TRUE;
 	}
+	else {
+		data->processed = TRUE;
+	}
 
 	/* Switch to the next backend */
 	periodic->cur_backend ++;
-	rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+	rspamd_map_process_periodic (periodic);
 }
 
 static void
-rspamd_map_static_read_callback (gint fd, short what, void *ud)
+rspamd_map_static_read_callback (struct map_periodic_cbdata *periodic)
 {
 	struct rspamd_map *map;
-	struct map_periodic_cbdata *periodic = ud;
 	struct static_map_data *data;
 	struct rspamd_map_backend *bk;
 
@@ -1801,14 +1793,13 @@ rspamd_map_static_read_callback (gint fd, short what, void *ud)
 
 	/* Switch to the next backend */
 	periodic->cur_backend ++;
-	rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+	rspamd_map_process_periodic (periodic);
 }
 
 static void
-rspamd_map_periodic_callback (gint fd, short what, void *ud)
+rspamd_map_process_periodic (struct map_periodic_cbdata *cbd)
 {
 	struct rspamd_map_backend *bk;
-	struct map_periodic_cbdata *cbd = ud;
 	struct rspamd_map *map;
 
 	map = cbd->map;
@@ -1904,7 +1895,7 @@ rspamd_map_watch (struct rspamd_config *cfg,
 	/* First of all do synced read of data */
 	while (cur) {
 		map = cur->data;
-		map->ev_base = ev_base;
+		map->event_loop = ev_base;
 		map->r = resolver;
 		map->wrk = worker;
 
diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h
index 8b45881b6..71f05aee7 100644
--- a/src/libutil/map_private.h
+++ b/src/libutil/map_private.h
@@ -54,7 +54,8 @@ enum fetch_proto {
  */
 struct file_map_data {
 	gchar *filename;
-	struct stat st;
+	gboolean processed;
+	ev_stat st_ev;
 };
 
 
@@ -130,7 +131,7 @@ struct rspamd_map {
 	map_fin_cb_t fin_callback;
 	map_dtor_t dtor;
 	void **user_data;
-	struct ev_loop *ev_base;
+	struct ev_loop *event_loop;
 	struct rspamd_worker *wrk;
 	gchar *description;
 	gchar *name;
@@ -143,7 +144,7 @@ struct rspamd_map {
 	gsize nelts;
 	guint64 digest;
 	/* Should we check HTTP or just load cached data */
-	struct timeval tv;
+	ev_tstamp timeout;
 	gdouble poll_timeout;
 	time_t next_check;
 	gboolean active_http;
@@ -164,7 +165,7 @@ enum rspamd_map_http_stage {
 struct map_periodic_cbdata {
 	struct rspamd_map *map;
 	struct map_cb_data cbdata;
-	struct event ev;
+	ev_timer ev;
 	gboolean need_modify;
 	gboolean errored;
 	gboolean locked;
@@ -183,7 +184,7 @@ struct rspamd_http_file_data {
 };
 
 struct http_callback_data {
-	struct ev_loop *ev_base;
+	struct ev_loop *event_loop;
 	struct rspamd_http_connection *conn;
 	rspamd_inet_addr_t *addr;
 	struct rspamd_map *map;
@@ -191,16 +192,15 @@ struct http_callback_data {
 	struct http_map_data *data;
 	struct map_periodic_cbdata *periodic;
 	struct rspamd_cryptobox_pubkey *pk;
-	gboolean check;
 	struct rspamd_storage_shmem *shmem_data;
 	struct rspamd_storage_shmem *shmem_sig;
 	struct rspamd_storage_shmem *shmem_pubkey;
 	gsize data_len;
 	gsize sig_len;
 	gsize pubkey_len;
-
+	gboolean check;
 	enum rspamd_map_http_stage stage;
-	struct timeval tv;
+	ev_tstamp timeout;
 
 	ref_entry_t ref;
 };


More information about the Commits mailing list