commit 8655154: [Fix] Statistics: Do not query Redis tokens when there are no learns

Vsevolod Stakhov vsevolod at highsecure.ru
Tue Jul 9 14:35:05 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-07-09 15:32:48 +0100
URL: https://github.com/rspamd/rspamd/commit/865515431e2bd1622a7dfae406e6f190b7ab5e64 (HEAD -> master)

[Fix] Statistics: Do not query Redis tokens when there are no learns

---
 src/libstat/backends/redis_backend.c   | 217 +++++++++++++++++++++------------
 src/libstat/backends/sqlite3_backend.c |   6 +-
 src/libstat/stat_api.h                 |   2 +-
 3 files changed, 143 insertions(+), 82 deletions(-)

diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index 43688cb7c..7263b3c16 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -72,6 +72,7 @@ struct redis_stat_runtime {
 	struct upstream *selected;
 	ev_timer timeout_event;
 	GArray *results;
+	GPtrArray *tokens;
 	struct rspamd_statfile_config *stcf;
 	gchar *redis_object_expanded;
 	redisAsyncContext *redis;
@@ -1023,6 +1024,11 @@ rspamd_redis_fin (gpointer data)
 		ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
 	}
 
+	if (rt->tokens) {
+		g_ptr_array_unref (rt->tokens);
+		rt->tokens = NULL;
+	}
+
 	if (rt->redis) {
 		redis = rt->redis;
 		rt->redis = NULL;
@@ -1038,6 +1044,12 @@ rspamd_redis_fin_learn (gpointer data)
 	redisAsyncContext *redis;
 
 	rt->has_event = FALSE;
+
+	if (rt->tokens) {
+		g_ptr_array_unref (rt->tokens);
+		rt->tokens = NULL;
+	}
+
 	/* Stop timeout */
 	ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
 
@@ -1070,66 +1082,20 @@ rspamd_redis_timeout (EV_P_ ev_timer *w, int revents)
 		redisAsyncFree (redis);
 	}
 
+	if (rt->tokens) {
+		g_ptr_array_unref (rt->tokens);
+		rt->tokens = NULL;
+	}
+
 	if (!rt->err) {
 		g_set_error (&rt->err, rspamd_redis_stat_quark (), ETIMEDOUT,
 				"error getting reply from redis server %s: timeout",
 				rspamd_upstream_name (rt->selected));
 	}
-}
-
-/* Called when we have connected to the redis server and got stats */
-static void
-rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
-{
-	struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
-	redisReply *reply = r;
-	struct rspamd_task *task;
-	glong val = 0;
-
-	task = rt->task;
-
-	if (c->err == 0) {
-		if (r != NULL) {
-			if (G_UNLIKELY (reply->type == REDIS_REPLY_INTEGER)) {
-				val = reply->integer;
-			}
-			else if (reply->type == REDIS_REPLY_STRING) {
-				rspamd_strtol (reply->str, reply->len, &val);
-			}
-			else {
-				if (reply->type != REDIS_REPLY_NIL) {
-					msg_err_task ("bad learned type for %s: %s, nil expected",
-						rt->stcf->symbol,
-						rspamd_redis_type_to_string (reply->type));
-				}
-
-				val = 0;
-			}
-
-			if (val < 0) {
-				msg_warn_task ("invalid number of learns for %s: %L",
-						rt->stcf->symbol, val);
-				val = 0;
-			}
-
-			rt->learned = val;
-			msg_debug_stat_redis ("connected to redis server, tokens learned for %s: %uL",
-					rt->redis_object_expanded, rt->learned);
-			rspamd_upstream_ok (rt->selected);
-		}
-	}
-	else {
-		msg_err_task ("error getting reply from redis server %s: %s",
-				rspamd_upstream_name (rt->selected), c->errstr);
-		rspamd_upstream_fail (rt->selected, FALSE);
-
-		if (!rt->err) {
-			g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
-					"error getting reply from redis server %s: %s",
-					rspamd_upstream_name (rt->selected), c->errstr);
-		}
+	if (rt->has_event) {
+		rt->has_event = FALSE;
+		rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
 	}
-
 }
 
 /* Called when we have received tokens values from redis */
@@ -1146,7 +1112,7 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
 
 	task = rt->task;
 
-	if (c->err == 0) {
+	if (c->err == 0 && rt->has_event) {
 		if (r != NULL) {
 			if (reply->type == REDIS_REPLY_ARRAY) {
 
@@ -1161,12 +1127,12 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
 						}
 						else if (elt->type == REDIS_REPLY_STRING) {
 							if (rt->stcf->clcf->flags &
-									RSPAMD_FLAG_CLASSIFIER_INTEGER) {
+								RSPAMD_FLAG_CLASSIFIER_INTEGER) {
 								rspamd_strtoul (elt->str, elt->len, &val);
 								tok->values[rt->id] = val;
 							}
 							else {
-								float_val = strtod (elt->str, NULL);
+								float_val = strtof (elt->str, NULL);
 								tok->values[rt->id] = float_val;
 							}
 
@@ -1188,7 +1154,7 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
 				}
 				else {
 					msg_err_task_check ("got invalid length of reply vector from redis: "
-							"%d, expected: %d",
+										"%d, expected: %d",
 							(gint)reply->elements,
 							(gint)task->tokens->len);
 				}
@@ -1223,6 +1189,110 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
 	}
 }
 
+/* Called when we have connected to the redis server and got stats */
+static void
+rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+	struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
+	redisReply *reply = r;
+	struct rspamd_task *task;
+	glong val = 0;
+	gboolean final = TRUE;
+
+	task = rt->task;
+
+	if (c->err == 0 && rt->has_event) {
+		if (r != NULL) {
+			if (G_UNLIKELY (reply->type == REDIS_REPLY_INTEGER)) {
+				val = reply->integer;
+			}
+			else if (reply->type == REDIS_REPLY_STRING) {
+				rspamd_strtol (reply->str, reply->len, &val);
+			}
+			else {
+				if (reply->type != REDIS_REPLY_NIL) {
+					msg_err_task ("bad learned type for %s: %s, nil expected",
+						rt->stcf->symbol,
+						rspamd_redis_type_to_string (reply->type));
+				}
+
+				val = 0;
+			}
+
+			if (val < 0) {
+				msg_warn_task ("invalid number of learns for %s: %L",
+						rt->stcf->symbol, val);
+				val = 0;
+			}
+
+			rt->learned = val;
+			msg_debug_stat_redis ("connected to redis server, tokens learned for %s: %uL",
+					rt->redis_object_expanded, rt->learned);
+			rspamd_upstream_ok (rt->selected);
+
+			if (rt->learned >= rt->stcf->clcf->min_learns && rt->learned > 0) {
+				rspamd_fstring_t *query = rspamd_redis_tokens_to_query (
+						task,
+						rt,
+						rt->tokens,
+						rt->ctx->new_schema ? "HGET" : "HMGET",
+						rt->redis_object_expanded, FALSE, -1,
+						rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
+				g_assert (query != NULL);
+				rspamd_mempool_add_destructor (task->task_pool,
+						(rspamd_mempool_destruct_t)rspamd_fstring_free, query);
+
+				int ret = redisAsyncFormattedCommand (rt->redis,
+						rspamd_redis_processed, rt,
+						query->str, query->len);
+
+				if (ret != REDIS_OK) {
+					msg_err_task ("call to redis failed: %s", rt->redis->errstr);
+				}
+				else {
+					/* Further is handled by rspamd_redis_processed */
+					final = FALSE;
+					/* Restart timeout */
+					if (ev_is_active (&rt->timeout_event)) {
+						rt->timeout_event.repeat = rt->ctx->timeout;
+						ev_timer_again (task->event_loop, &rt->timeout_event);
+					}
+					else {
+						rt->timeout_event.data = rt;
+						ev_timer_init (&rt->timeout_event, rspamd_redis_timeout,
+								rt->ctx->timeout, 0.);
+						ev_timer_start (task->event_loop, &rt->timeout_event);
+					}
+				}
+			}
+			else {
+				if (!rt->err) {
+					g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
+							"skip obtaining bayes tokens for %s: "
+							"not enough learns %d; %d required",
+							rt->stcf->symbol, (int)rt->learned,
+							rt->stcf->clcf->min_learns);
+				}
+			}
+		}
+	}
+	else if (rt->has_event) {
+		msg_err_task ("error getting reply from redis server %s: %s",
+				rspamd_upstream_name (rt->selected), c->errstr);
+		rspamd_upstream_fail (rt->selected, FALSE);
+
+		if (!rt->err) {
+			g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
+					"error getting reply from redis server %s: %s",
+					rspamd_upstream_name (rt->selected), c->errstr);
+		}
+	}
+
+	if (final && rt->has_event) {
+		rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+	}
+}
+
 /* Called when we have set tokens during learning */
 static void
 rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
@@ -1559,8 +1629,6 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
 		gint id, gpointer p)
 {
 	struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
-	rspamd_fstring_t *query;
-	gint ret;
 	const gchar *learned_key = "learns";
 
 	if (rspamd_session_blocked (task->s)) {
@@ -1587,17 +1655,7 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
 
 		rspamd_session_add_event (task->s, rspamd_redis_fin, rt, M);
 		rt->has_event = TRUE;
-
-		query = rspamd_redis_tokens_to_query (task, rt, tokens,
-				rt->ctx->new_schema ? "HGET" : "HMGET",
-				rt->redis_object_expanded, FALSE, -1,
-				rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
-		g_assert (query != NULL);
-		rspamd_mempool_add_destructor (task->task_pool,
-				(rspamd_mempool_destruct_t)rspamd_fstring_free, query);
-
-		ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_processed, rt,
-				query->str, query->len);
+		rt->tokens = g_ptr_array_ref (tokens);
 
 		if (ev_is_active (&rt->timeout_event)) {
 			rt->timeout_event.repeat = rt->ctx->timeout;
@@ -1609,13 +1667,6 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
 					rt->ctx->timeout, 0.);
 			ev_timer_start (task->event_loop, &rt->timeout_event);
 		}
-
-		if (ret == REDIS_OK) {
-			return TRUE;
-		}
-		else {
-			msg_err_task ("call to redis failed: %s", rt->redis->errstr);
-		}
 	}
 
 	return FALSE;
@@ -1638,7 +1689,17 @@ rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime,
 		redisAsyncFree (redis);
 	}
 
+	if (rt->tokens) {
+		g_ptr_array_unref (rt->tokens);
+		rt->tokens = NULL;
+	}
+
 	if (rt->err) {
+		msg_info_task ("cannot retreive stat tokens from Redis: %e", rt->err);
+		g_error_free (rt->err);
+		rt->err = NULL;
+
+
 		return FALSE;
 	}
 
diff --git a/src/libstat/backends/sqlite3_backend.c b/src/libstat/backends/sqlite3_backend.c
index 2a512db8b..a3d6ac9db 100644
--- a/src/libstat/backends/sqlite3_backend.c
+++ b/src/libstat/backends/sqlite3_backend.c
@@ -700,7 +700,7 @@ rspamd_sqlite3_process_tokens (struct rspamd_task *task,
 
 		if (bk == NULL) {
 			/* Statfile is does not exist, so all values are zero */
-			tok->values[id] = 0.0;
+			tok->values[id] = 0.0f;
 			continue;
 		}
 
@@ -735,7 +735,7 @@ rspamd_sqlite3_process_tokens (struct rspamd_task *task,
 				tok->values[id] = iv;
 			}
 			else {
-				tok->values[id] = 0.0;
+				tok->values[id] = 0.0f;
 			}
 		}
 		else {
@@ -745,7 +745,7 @@ rspamd_sqlite3_process_tokens (struct rspamd_task *task,
 				tok->values[id] = iv;
 			}
 			else {
-				tok->values[id] = 0.0;
+				tok->values[id] = 0.0f;
 			}
 		}
 
diff --git a/src/libstat/stat_api.h b/src/libstat/stat_api.h
index f91c8b79a..cc34c7a0b 100644
--- a/src/libstat/stat_api.h
+++ b/src/libstat/stat_api.h
@@ -59,7 +59,7 @@ typedef struct token_node_s {
 	guint flags;
 	rspamd_stat_token_t *t1;
 	rspamd_stat_token_t *t2;
-	gdouble values[];
+	float values[];
 } rspamd_token_t;
 
 struct rspamd_stat_ctx;


More information about the Commits mailing list