commit 423edef: [Project] Start maps rework
Vsevolod Stakhov
vsevolod at highsecure.ru
Sat Jun 22 12:14:15 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-06-16 21:09:59 +0100
URL: https://github.com/rspamd/rspamd/commit/423edefbda01bb9a44afd69e78d591080d571f58
[Project] Start maps rework
---
src/libutil/map.c | 125 +++++++++++++++++++++-------------------------
src/libutil/map_private.h | 16 +++---
2 files changed, 66 insertions(+), 75 deletions(-)
diff --git a/src/libutil/map.c b/src/libutil/map.c
index 82c668952..17da0062a 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -44,7 +44,7 @@ static void free_http_cbdata_common (struct http_callback_data *cbd,
gboolean plan_new);
static void free_http_cbdata_dtor (gpointer p);
static void free_http_cbdata (struct http_callback_data *cbd);
-static void rspamd_map_periodic_callback (gint fd, short what, void *ud);
+static void rspamd_map_process_periodic (struct map_periodic_cbdata *cbd);
static void rspamd_map_schedule_periodic (struct rspamd_map *map, gboolean locked,
gboolean initial, gboolean errored);
static gboolean read_map_file_chunks (struct rspamd_map *map,
@@ -130,7 +130,7 @@ write_http_request (struct http_callback_data *cbd)
cbd->data->host,
NULL,
cbd,
- &cbd->tv);
+ cbd->timeout);
}
static gboolean
@@ -325,21 +325,23 @@ http_map_error (struct rspamd_http_connection *conn,
cbd->bk->uri,
cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
err);
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");
}
static void
-rspamd_map_cache_cb (gint fd, short what, gpointer ud)
+rspamd_map_cache_cb (struct ev_loop *loop, ev_periodic *w, int revents)
{
- struct rspamd_http_map_cached_cbdata *cache_cbd = ud;
+ struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *)
+ w->data;
struct rspamd_map *map;
struct http_map_data *data;
- struct timeval tv;
map = cache_cbd->map;
data = cache_cbd->data;
+ ev_periodic_stop (loop, &cache_cbd->timeout);
+
if (cache_cbd->gen != cache_cbd->data->gen) {
/* We have another update, so this cache element is obviously expired */
/*
@@ -349,7 +351,6 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
msg_info_map ("cached data is now expired (gen mismatch %L != %L) for %s",
cache_cbd->gen, cache_cbd->data->gen, map->name);
MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
- event_del (&cache_cbd->timeout);
g_free (cache_cbd);
}
else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) {
@@ -359,15 +360,13 @@ rspamd_map_cache_cb (gint fd, short what, gpointer ud)
*/
cache_cbd->last_checked = cache_cbd->data->last_checked;
msg_debug_map ("cached data is up to date for %s", map->name);
- double_to_tv (map->poll_timeout * 2, &tv);
- event_add (&cache_cbd->timeout, &tv);
+ ev_periodic_again (loop, &cache_cbd->timeout);
}
else {
data->cur_cache_cbd = NULL;
g_atomic_int_set (&data->cache->available, 0);
MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
msg_info_map ("cached data is now expired for %s", map->name);
- event_del (&cache_cbd->timeout);
g_free (cache_cbd);
}
}
@@ -456,7 +455,7 @@ http_map_finish (struct rspamd_http_connection *conn,
g_atomic_int_set (&data->cache->available, 0);
data->cur_cache_cbd = NULL;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");
return 0;
@@ -622,6 +621,8 @@ read_data:
}
/* Check for expires */
+ double cached_timeout = map->poll_timeout * 2;
+
expires_hdr = rspamd_http_message_find_header (msg, "Expires");
if (expires_hdr) {
@@ -635,19 +636,12 @@ read_data:
hdate = MIN (map->next_check, hdate);
}
- double cached_timeout = map->next_check - msg->date +
- map->poll_timeout * 2;
+ cached_timeout = map->next_check - msg->date +
+ map->poll_timeout * 2;
map->next_check = hdate;
- double_to_tv (cached_timeout, &tv);
- }
- else {
- double_to_tv (map->poll_timeout * 2, &tv);
}
}
- else {
- double_to_tv (map->poll_timeout * 2, &tv);
- }
/* Check for etag */
etag_hdr = rspamd_http_message_find_header (msg, "ETag");
@@ -688,10 +682,9 @@ read_data:
cache_cbd->gen = cbd->data->gen;
MAP_RETAIN (cache_cbd->shm, "shmem_data");
- event_set (&cache_cbd->timeout, -1, EV_TIMEOUT, rspamd_map_cache_cb,
- cache_cbd);
- event_base_set (cbd->ev_base, &cache_cbd->timeout);
- event_add (&cache_cbd->timeout, &tv);
+ ev_periodic_set (&cache_cbd->timeout, 0.0, cached_timeout, NULL);
+ ev_periodic_start (cbd->event_loop, &cache_cbd->timeout);
+ cache_cbd->timeout.data = cache_cbd;
data->cur_cache_cbd = cache_cbd;
if (map->next_check) {
@@ -700,7 +693,7 @@ read_data:
}
else {
rspamd_http_date_format (next_check_date, sizeof (next_check_date),
- time (NULL) + map->poll_timeout);
+ ev_now (cbd->event_loop) + map->poll_timeout);
}
@@ -773,7 +766,7 @@ read_data:
cbd->periodic->cur_backend ++;
munmap (in, dlen);
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
}
else if (msg->code == 304 && (cbd->check && cbd->stage == map_load_file)) {
cbd->data->last_checked = msg->date;
@@ -819,13 +812,13 @@ read_data:
}
else {
rspamd_http_date_format (next_check_date, sizeof (next_check_date),
- time (NULL) + map->poll_timeout);
+ ev_now (cbd->event_loop) + map->poll_timeout);
}
msg_info_map ("data is not modified for server %s, next check at %s",
cbd->data->host, next_check_date);
cbd->periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
}
else {
msg_info_map ("cannot load map %s from %s: HTTP error %d",
@@ -838,7 +831,7 @@ read_data:
err:
cbd->periodic->errored = 1;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
MAP_RELEASE (cbd, "http_callback_data");
return 0;
@@ -951,6 +944,7 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
}
}
+ ev_stat_stat (map->event_loop, &data->st_ev);
len = st.st_size;
if (bk->is_signed) {
@@ -1045,9 +1039,6 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
map->read_callback (NULL, 0, &periodic->cbdata, TRUE);
}
- /* Also update at the read time */
- memcpy (&data->st, &st, sizeof (struct stat));
-
return TRUE;
}
@@ -1143,7 +1134,6 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
map = periodic->map;
msg_debug_map ("periodic dtor %p", periodic);
- event_del (&periodic->ev);
if (periodic->need_modify) {
/* We are done */
@@ -1162,6 +1152,16 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
g_free (periodic);
}
+/* Called on timer execution */
+static void
+rspamd_map_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
+{
+ struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *)w->data;
+
+ ev_timer_stop (loop, w);
+ rspamd_map_process_periodic (cbd);
+}
+
static void
rspamd_map_schedule_periodic (struct rspamd_map *map,
gboolean locked, gboolean initial, gboolean errored)
@@ -1224,14 +1224,11 @@ rspamd_map_schedule_periodic (struct rspamd_map *map,
map->scheduled_check = TRUE;
REF_INIT_RETAIN (cbd, rspamd_map_periodic_dtor);
- evtimer_set (&cbd->ev, rspamd_map_periodic_callback, cbd);
- event_base_set (map->ev_base, &cbd->ev);
-
+ ev_timer_init (&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0);
+ ev_timer_start (map->event_loop, &cbd->ev);
msg_debug_map ("schedule new periodic event %p in %.2f seconds",
cbd, jittered_sec);
- double_to_tv (jittered_sec, &map->tv);
- evtimer_add (&cbd->ev, &map->tv);
}
static void
@@ -1286,7 +1283,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
msg_err_map ("cannot resolve %s: %s", cbd->data->host,
rdns_strerror (reply->code));
cbd->periodic->errored = 1;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, cbd->periodic);
+ rspamd_map_process_periodic (cbd->periodic);
}
}
@@ -1567,7 +1564,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
periodic->need_modify = TRUE;
/* Reset the whole chain */
periodic->cur_backend = 0;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
else {
if (map->active_http) {
@@ -1577,7 +1574,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
else {
/* Switch to the next backend */
periodic->cur_backend++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
}
@@ -1592,7 +1589,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
/* Switch to the next backend */
periodic->cur_backend++;
data->last_modified = data->cache->last_modified;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
@@ -1601,7 +1598,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
else if (!map->active_http) {
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
@@ -1609,7 +1606,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map,
check:
cbd = g_malloc0 (sizeof (struct http_callback_data));
- cbd->ev_base = map->ev_base;
+ cbd->event_loop = map->event_loop;
cbd->map = map;
cbd->data = data;
cbd->check = check;
@@ -1618,7 +1615,6 @@ check:
cbd->bk = bk;
MAP_RETAIN (bk, "rspamd_map_backend");
cbd->stage = map_resolve_host2;
- double_to_tv (map->cfg->map_timeout, &cbd->tv);
REF_INIT_RETAIN (cbd, free_http_cbdata);
msg_debug_map ("%s map data from %s", check ? "checking" : "reading",
@@ -1686,7 +1682,7 @@ rspamd_map_http_check_callback (gint fd, short what, void *ud)
}
static void
-rspamd_map_http_read_callback (gint fd, short what, void *ud)
+rspamd_map_http_read_callback (void *ud)
{
struct map_periodic_cbdata *cbd = ud;
struct rspamd_map *map;
@@ -1698,36 +1694,31 @@ rspamd_map_http_read_callback (gint fd, short what, void *ud)
}
static void
-rspamd_map_file_check_callback (gint fd, short what, void *ud)
+rspamd_map_file_check_callback (void *ud)
{
struct rspamd_map *map;
struct map_periodic_cbdata *periodic = ud;
struct file_map_data *data;
struct rspamd_map_backend *bk;
- struct stat st;
map = periodic->map;
bk = g_ptr_array_index (map->backends, periodic->cur_backend);
data = bk->data.fd;
- if (stat (data->filename, &st) != -1 &&
- (st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
- /* File was modified since last check */
- msg_info_map ("old mtime is %t, new mtime is %t for map file %s",
- data->st.st_mtime, st.st_mtime, data->filename);
- memcpy (&data->st, &st, sizeof (struct stat));
+ if (!data->processed) {
+ /* File has never been read */
periodic->need_modify = TRUE;
periodic->cur_backend = 0;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
@@ -1746,21 +1737,20 @@ rspamd_map_static_check_callback (gint fd, short what, void *ud)
periodic->need_modify = TRUE;
periodic->cur_backend = 0;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
return;
}
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
-rspamd_map_file_read_callback (gint fd, short what, void *ud)
+rspamd_map_file_read_callback (struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
- struct map_periodic_cbdata *periodic = ud;
struct file_map_data *data;
struct rspamd_map_backend *bk;
@@ -1774,17 +1764,19 @@ rspamd_map_file_read_callback (gint fd, short what, void *ud)
if (!read_map_file (map, data, bk, periodic)) {
periodic->errored = TRUE;
}
+ else {
+ data->processed = TRUE;
+ }
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
-rspamd_map_static_read_callback (gint fd, short what, void *ud)
+rspamd_map_static_read_callback (struct map_periodic_cbdata *periodic)
{
struct rspamd_map *map;
- struct map_periodic_cbdata *periodic = ud;
struct static_map_data *data;
struct rspamd_map_backend *bk;
@@ -1801,14 +1793,13 @@ rspamd_map_static_read_callback (gint fd, short what, void *ud)
/* Switch to the next backend */
periodic->cur_backend ++;
- rspamd_map_periodic_callback (-1, EV_TIMEOUT, periodic);
+ rspamd_map_process_periodic (periodic);
}
static void
-rspamd_map_periodic_callback (gint fd, short what, void *ud)
+rspamd_map_process_periodic (struct map_periodic_cbdata *cbd)
{
struct rspamd_map_backend *bk;
- struct map_periodic_cbdata *cbd = ud;
struct rspamd_map *map;
map = cbd->map;
@@ -1904,7 +1895,7 @@ rspamd_map_watch (struct rspamd_config *cfg,
/* First of all do synced read of data */
while (cur) {
map = cur->data;
- map->ev_base = ev_base;
+ map->event_loop = ev_base;
map->r = resolver;
map->wrk = worker;
diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h
index 8b45881b6..71f05aee7 100644
--- a/src/libutil/map_private.h
+++ b/src/libutil/map_private.h
@@ -54,7 +54,8 @@ enum fetch_proto {
*/
struct file_map_data {
gchar *filename;
- struct stat st;
+ gboolean processed;
+ ev_stat st_ev;
};
@@ -130,7 +131,7 @@ struct rspamd_map {
map_fin_cb_t fin_callback;
map_dtor_t dtor;
void **user_data;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
struct rspamd_worker *wrk;
gchar *description;
gchar *name;
@@ -143,7 +144,7 @@ struct rspamd_map {
gsize nelts;
guint64 digest;
/* Should we check HTTP or just load cached data */
- struct timeval tv;
+ ev_tstamp timeout;
gdouble poll_timeout;
time_t next_check;
gboolean active_http;
@@ -164,7 +165,7 @@ enum rspamd_map_http_stage {
struct map_periodic_cbdata {
struct rspamd_map *map;
struct map_cb_data cbdata;
- struct event ev;
+ ev_timer ev;
gboolean need_modify;
gboolean errored;
gboolean locked;
@@ -183,7 +184,7 @@ struct rspamd_http_file_data {
};
struct http_callback_data {
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
struct rspamd_http_connection *conn;
rspamd_inet_addr_t *addr;
struct rspamd_map *map;
@@ -191,16 +192,15 @@ struct http_callback_data {
struct http_map_data *data;
struct map_periodic_cbdata *periodic;
struct rspamd_cryptobox_pubkey *pk;
- gboolean check;
struct rspamd_storage_shmem *shmem_data;
struct rspamd_storage_shmem *shmem_sig;
struct rspamd_storage_shmem *shmem_pubkey;
gsize data_len;
gsize sig_len;
gsize pubkey_len;
-
+ gboolean check;
enum rspamd_map_http_stage stage;
- struct timeval tv;
+ ev_tstamp timeout;
ref_entry_t ref;
};
More information about the Commits
mailing list