commit f514841: [Fix] Stat_redis_backend: Fix memory leak and simplify learn path

Vsevolod Stakhov vsevolod at highsecure.ru
Mon Mar 9 17:07:06 UTC 2020


Author: Vsevolod Stakhov
Date: 2020-03-09 16:59:54 +0000
URL: https://github.com/rspamd/rspamd/commit/f514841c75aae5c8dfc3a8e4fba409a70424517f (HEAD -> master)

[Fix] Stat_redis_backend: Fix memory leak and simplify learn path

---
 contrib/hiredis/async.c              |   5 ++
 src/libstat/backends/redis_backend.c | 142 +++++++----------------------------
 src/libstat/stat_process.c           |   2 +
 3 files changed, 35 insertions(+), 114 deletions(-)

diff --git a/contrib/hiredis/async.c b/contrib/hiredis/async.c
index a508036e6..851676263 100644
--- a/contrib/hiredis/async.c
+++ b/contrib/hiredis/async.c
@@ -483,6 +483,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
              * abort with an error, but simply ignore it because the client
              * doesn't know what the server will spit out over the wire. */
             c->reader->fn->freeObject(reply);
+			/* Proceed with free'ing when redisAsyncFree() was called. */
+			if (c->flags & REDIS_FREEING) {
+				__redisAsyncFree(ac);
+				return;
+			}
         }
     }
 
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index 5774b46a6..84bc0ba77 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -637,7 +637,6 @@ rspamd_redis_store_stat_signature (struct rspamd_task *task,
 	guint i, blen, klen;
 	rspamd_fstring_t *out;
 
-	out = rspamd_fstring_sized_new (1024);
 	sig = rspamd_mempool_get_variable (task->task_pool,
 			RSPAMD_MEMPOOL_STAT_SIGNATURE);
 
@@ -646,11 +645,10 @@ rspamd_redis_store_stat_signature (struct rspamd_task *task,
 		return;
 	}
 
+	out = rspamd_fstring_sized_new (1024);
 	klen = rspamd_snprintf (keybuf, sizeof (keybuf), "%s_%s_%s",
 			prefix, sig, rt->stcf->is_spam ? "S" : "H");
 
-	out->len = 0;
-
 	/* Cleanup key */
 	rspamd_printf_fstring (&out, ""
 					"*2\r\n"
@@ -1069,7 +1067,12 @@ rspamd_redis_fin (gpointer data)
 	struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
 	redisAsyncContext *redis;
 
-	rt->has_event = FALSE;
+	if (rt->has_event) {
+		/* Should not happen ! */
+		msg_err ("FIXME: this code path should not be reached!");
+		rspamd_session_remove_event (rt->task->s, NULL, rt);
+		rt->has_event = FALSE;
+	}
 	/* Stop timeout */
 	if (ev_can_stop (&rt->timeout_event)) {
 		ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
@@ -1086,29 +1089,9 @@ rspamd_redis_fin (gpointer data)
 		/* This calls for all callbacks pending */
 		redisAsyncFree (redis);
 	}
-}
-
-static void
-rspamd_redis_fin_learn (gpointer data)
-{
-	struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
-	redisAsyncContext *redis;
-
-	rt->has_event = FALSE;
 
-	if (rt->tokens) {
-		g_ptr_array_unref (rt->tokens);
-		rt->tokens = NULL;
-	}
-
-	/* Stop timeout */
-	ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
-
-	if (rt->redis) {
-		redis = rt->redis;
-		rt->redis = NULL;
-		/* This calls for all callbacks pending */
-		redisAsyncFree (redis);
+	if (rt->err) {
+		g_error_free (rt->err);
 	}
 }
 
@@ -1145,7 +1128,7 @@ rspamd_redis_timeout (EV_P_ ev_timer *w, int revents)
 	}
 	if (rt->has_event) {
 		rt->has_event = FALSE;
-		rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+		rspamd_session_remove_event (task->s, NULL, rt);
 	}
 }
 
@@ -1236,7 +1219,8 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
 	}
 
 	if (rt->has_event) {
-		rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+		rt->has_event = FALSE;
+		rspamd_session_remove_event (task->s, NULL, rt);
 	}
 }
 
@@ -1363,7 +1347,8 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
 	}
 
 	if (final && rt->has_event) {
-		rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+		rt->has_event = FALSE;
+		rspamd_session_remove_event (task->s, NULL, rt);
 	}
 }
 
@@ -1395,7 +1380,8 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
 	}
 
 	if (rt->has_event) {
-		rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
+		rt->has_event = FALSE;
+		rspamd_session_remove_event (task->s, NULL, rt);
 	}
 }
 static void
@@ -1647,15 +1633,14 @@ rspamd_redis_runtime (struct rspamd_task *task,
 
 	if (rspamd_redis_expand_object (ctx->redis_object, ctx, task,
 			&object_expanded) == 0) {
-		msg_err_task ("expansion for learning failed for symbol %s "
+		msg_err_task ("expansion for %s failed for symbol %s "
 				 "(maybe learning per user classifier with no user or recipient)",
+				 learn ? "learning" : "classifying",
 				 stcf->symbol);
 		return NULL;
 	}
 
 	rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
-	rspamd_mempool_add_destructor (task->task_pool,
-			rspamd_gerror_free_maybe, &rt->err);
 	rt->selected = up;
 	rt->task = task;
 	rt->ctx = ctx;
@@ -1692,6 +1677,8 @@ rspamd_redis_runtime (struct rspamd_task *task,
 	redisLibevAttach (task->event_loop, rt->redis);
 	rspamd_redis_maybe_auth (ctx, rt->redis);
 
+	rspamd_mempool_add_destructor (task->task_pool, rspamd_redis_fin, rt);
+
 	return rt;
 }
 
@@ -1738,7 +1725,7 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
 	if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
 			rt->redis_object_expanded, learned_key) == REDIS_OK) {
 
-		rspamd_session_add_event (task->s, rspamd_redis_fin, rt, M);
+		rspamd_session_add_event (task->s, NULL, rt, M);
 		rt->has_event = TRUE;
 		rt->tokens = g_ptr_array_ref (tokens);
 
@@ -1762,32 +1749,18 @@ rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime,
 		gpointer ctx)
 {
 	struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
-	redisAsyncContext *redis;
-
-	if (ev_can_stop (&rt->timeout_event)) {
-		ev_timer_stop (task->event_loop, &rt->timeout_event);
-	}
-
-	if (rt->redis) {
-		redis = rt->redis;
-		rt->redis = NULL;
-		redisAsyncFree (redis);
-	}
-
-	if (rt->tokens) {
-		g_ptr_array_unref (rt->tokens);
-		rt->tokens = NULL;
-	}
 
 	if (rt->err) {
 		msg_info_task ("cannot retrieve stat tokens from Redis: %e", rt->err);
 		g_error_free (rt->err);
 		rt->err = NULL;
-
+		rspamd_redis_fin (rt);
 
 		return FALSE;
 	}
 
+	rspamd_redis_fin (rt);
+
 	return TRUE;
 }
 
@@ -1796,9 +1769,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
 		gint id, gpointer p)
 {
 	struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
-	struct upstream *up;
-	struct upstream_list *ups;
-	rspamd_inet_addr_t *addr;
 	rspamd_fstring_t *query;
 	const gchar *redis_cmd;
 	rspamd_token_t *tok;
@@ -1810,23 +1780,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
 		return FALSE;
 	}
 
-	ups = rspamd_redis_get_servers (rt->ctx, "write_servers");
-
-	if (!ups) {
-		return FALSE;
-	}
-	up = rspamd_upstream_get (ups,
-			RSPAMD_UPSTREAM_MASTER_SLAVE,
-			NULL,
-			0);
-
-	if (up == NULL) {
-		msg_err_task ("no upstreams reachable");
-		return FALSE;
-	}
-
-	rt->selected = up;
-
 	if (rt->ctx->new_schema) {
 		if (rt->ctx->stcf->is_spam) {
 			learned_key = "learns_spam";
@@ -1836,37 +1789,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
 		}
 	}
 
-	addr = rspamd_upstream_addr_next (up);
-	g_assert (addr != NULL);
-
-	if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
-		rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
-	}
-	else {
-		rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
-				rspamd_inet_address_get_port (addr));
-	}
-
-	if (rt->redis == NULL) {
-		msg_warn_task ("cannot connect to redis server %s: %s",
-				rspamd_inet_address_to_string_pretty (addr),
-				strerror (errno));
-
-		return FALSE;
-	}
-	else if (rt->redis->err != REDIS_OK) {
-		msg_warn_task ("cannot connect to redis server %s: %s",
-				rspamd_inet_address_to_string_pretty (addr),
-				rt->redis->errstr);
-		redisAsyncFree (rt->redis);
-		rt->redis = NULL;
-
-		return FALSE;
-	}
-
-	redisLibevAttach (task->event_loop, rt->redis);
-	rspamd_redis_maybe_auth (rt->ctx, rt->redis);
-
 	/*
 	 * Add the current key to the set of learned keys
 	 */
@@ -1958,7 +1880,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
 					"RSIG");
 		}
 
-		rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt, M);
+		rspamd_session_add_event (task->s, NULL, rt, M);
 		rt->has_event = TRUE;
 
 		/* Set timeout */
@@ -1988,25 +1910,17 @@ rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime,
 		gpointer ctx, GError **err)
 {
 	struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
-	redisAsyncContext *redis;
-
-	if (ev_can_stop (&rt->timeout_event)) {
-		ev_timer_stop (task->event_loop, &rt->timeout_event);
-	}
-
-	if (rt->redis) {
-		redis = rt->redis;
-		rt->redis = NULL;
-		redisAsyncFree (redis);
-	}
 
 	if (rt->err) {
 		g_propagate_error (err, rt->err);
 		rt->err = NULL;
+		rspamd_redis_fin (rt);
 
 		return FALSE;
 	}
 
+	rspamd_redis_fin (rt);
+
 	return TRUE;
 }
 
diff --git a/src/libstat/stat_process.c b/src/libstat/stat_process.c
index 6bd17f38f..fc42cd875 100644
--- a/src/libstat/stat_process.c
+++ b/src/libstat/stat_process.c
@@ -818,6 +818,8 @@ rspamd_stat_learn (struct rspamd_task *task,
 
 	if (stage == RSPAMD_TASK_STAGE_LEARN_PRE) {
 		/* Process classifiers */
+		rspamd_stat_preprocess (st_ctx, task, TRUE);
+
 		if (!rspamd_stat_cache_check (st_ctx, task, classifier, spam, err)) {
 			return RSPAMD_STAT_PROCESS_ERROR;
 		}


More information about the Commits mailing list