commit a59ceca: [Minor] Another try to fix races in redis stats

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Oct 19 09:00:06 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-10-19 09:56:12 +0100
URL: https://github.com/rspamd/rspamd/commit/a59ceca26167101ac0f5e048f8bb610a6b358c2c (HEAD -> master)

[Minor] Another try to fix races in redis stats
Issue: #3088

---
 src/libstat/backends/redis_backend.c | 60 ++++++++++++++++++++++++------------
 src/libstat/stat_config.c            |  3 +-
 2 files changed, 43 insertions(+), 20 deletions(-)

diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index fd31d287a..9118e3fc4 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -751,12 +751,15 @@ rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
 static void
 rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
 {
-	struct rspamd_redis_stat_cbdata *cbdata = priv;
+	struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+	struct rspamd_redis_stat_cbdata *cbdata;
 	redisReply *reply = r;
 	ucl_object_t *obj;
 	gulong num = 0;
 
-	if (cbdata->wanna_die) {
+	cbdata = redis_elt->cbdata;
+
+	if (cbdata == NULL || cbdata->wanna_die) {
 		return;
 	}
 
@@ -778,6 +781,7 @@ rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
 
 	if (cbdata->inflight == 0) {
 		rspamd_redis_async_cbdata_cleanup (cbdata);
+		redis_elt->cbdata = NULL;
 	}
 }
 
@@ -785,12 +789,15 @@ rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
 static void
 rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
 {
-	struct rspamd_redis_stat_cbdata *cbdata = priv;
+	struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+	struct rspamd_redis_stat_cbdata *cbdata;
 	redisReply *reply = r;
 	ucl_object_t *obj;
 	glong num = 0;
 
-	if (cbdata->wanna_die) {
+	cbdata = redis_elt->cbdata;
+
+	if (cbdata == NULL || cbdata->wanna_die) {
 		return;
 	}
 
@@ -829,6 +836,7 @@ rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
 
 	if (cbdata->inflight == 0) {
 		rspamd_redis_async_cbdata_cleanup (cbdata);
+		redis_elt->cbdata = NULL;
 	}
 }
 
@@ -836,13 +844,15 @@ rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
 static void
 rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
 {
-	struct rspamd_redis_stat_cbdata *cbdata = priv;
+	struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+	struct rspamd_redis_stat_cbdata *cbdata;
 	redisReply *reply = r, *elt;
 	gchar **pk, *k;
 	guint i, processed = 0;
 
+	cbdata = redis_elt->cbdata;
 
-	if (cbdata->wanna_die) {
+	if (cbdata == NULL || cbdata->wanna_die) {
 		return;
 	}
 
@@ -879,7 +889,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
 							}
 							redisAsyncCommand (cbdata->redis,
 									rspamd_redis_stat_learns,
-									cbdata,
+									redis_elt,
 									"HGET %s %s",
 									k, learned_key);
 							cbdata->inflight += 1;
@@ -887,12 +897,12 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
 						else {
 							redisAsyncCommand (cbdata->redis,
 									rspamd_redis_stat_key,
-									cbdata,
+									redis_elt,
 									"HLEN %s",
 									k);
 							redisAsyncCommand (cbdata->redis,
 									rspamd_redis_stat_learns,
-									cbdata,
+									redis_elt,
 									"HGET %s %s",
 									k, learned_key);
 							cbdata->inflight += 2;
@@ -925,6 +935,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
 
 		if (cbdata->inflight == 0) {
 			rspamd_redis_async_cbdata_cleanup (cbdata);
+			redis_elt->cbdata = NULL;
 		}
 	}
 	else {
@@ -937,6 +948,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
 
 		rspamd_upstream_fail (cbdata->selected, FALSE);
 		rspamd_redis_async_cbdata_cleanup (cbdata);
+		redis_elt->cbdata = NULL;
 	}
 }
 
@@ -948,6 +960,8 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
 	struct rspamd_redis_stat_cbdata *cbdata;
 	rspamd_inet_addr_t *addr;
 	struct upstream_list *ups;
+	redisAsyncContext *redis_ctx;
+	struct upstream *selected;
 
 	g_assert (redis_elt != NULL);
 
@@ -956,6 +970,7 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
 	if (redis_elt->cbdata) {
 		/* We have some other process pending */
 		rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+		redis_elt->cbdata = NULL;
 	}
 
 	/* Disable further events unless needed */
@@ -967,29 +982,35 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
 		return;
 	}
 
-	cbdata = g_malloc0 (sizeof (*cbdata));
-
-	cbdata->selected = rspamd_upstream_get (ups,
+	selected = rspamd_upstream_get (ups,
 					RSPAMD_UPSTREAM_ROUND_ROBIN,
 					NULL,
 					0);
 
-	g_assert (cbdata->selected != NULL);
-	addr = rspamd_upstream_addr_next (cbdata->selected);
+	g_assert (selected != NULL);
+	addr = rspamd_upstream_addr_next (selected);
 	g_assert (addr != NULL);
 
 	if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
-		cbdata->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
+		redis_ctx = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
 	}
 	else {
-		cbdata->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
+		redis_ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr),
 				rspamd_inet_address_get_port (addr));
 	}
 
-	g_assert (cbdata->redis != NULL);
+	if (redis_ctx == NULL) {
+		msg_warn ("cannot connect to redis server %s: %s",
+				rspamd_inet_address_to_string_pretty (addr),
+				strerror (errno));
 
-	redisLibevAttach (redis_elt->event_loop, cbdata->redis);
+		return;
+	}
 
+	redisLibevAttach (redis_elt->event_loop, redis_ctx);
+	cbdata = g_malloc0 (sizeof (*cbdata));
+	cbdata->redis = redis_ctx;
+	cbdata->selected = selected;
 	cbdata->inflight = 1;
 	cbdata->cur = ucl_object_typed_new (UCL_OBJECT);
 	cbdata->elt = redis_elt;
@@ -999,7 +1020,7 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
 	/* XXX: deal with timeouts maybe */
 	/* Get keys in redis that match our symbol */
 	rspamd_redis_maybe_auth (ctx, cbdata->redis);
-	redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, cbdata,
+	redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, redis_elt,
 			"SMEMBERS %s_keys",
 			ctx->stcf->symbol);
 }
@@ -1010,6 +1031,7 @@ rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d)
 	struct rspamd_redis_stat_elt *redis_elt = elt->ud;
 
 	rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+	redis_elt->cbdata = NULL;
 }
 
 /* Called on connection termination */
diff --git a/src/libstat/stat_config.c b/src/libstat/stat_config.c
index bc4c28b5d..17d0fdcc7 100644
--- a/src/libstat/stat_config.c
+++ b/src/libstat/stat_config.c
@@ -559,7 +559,8 @@ rspamd_stat_ctx_register_async (rspamd_stat_async_handler handler,
 		 * fast as possible
 		 */
 		elt->timer_ev.data = elt;
-		ev_timer_init (&elt->timer_ev, rspamd_async_elt_on_timer, 0.0, 0.0);
+		ev_timer_init (&elt->timer_ev, rspamd_async_elt_on_timer,
+				0.1, 0.0);
 		ev_timer_start (st_ctx->event_loop, &elt->timer_ev);
 	}
 	else {


More information about the Commits mailing list