commit b3c005c: [Rework] Kill old dragons in symcache processing stages

Vsevolod Stakhov vsevolod at highsecure.ru
Mon Aug 19 11:42:03 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-08-19 12:39:32 +0100
URL: https://github.com/rspamd/rspamd/commit/b3c005cfcb5bcdd66ec9640afa620a30fa2e0718 (HEAD -> master)

[Rework] Kill old dragons in symcache processing stages

---
 src/libserver/rspamd_symcache.c | 187 +++++++++++++---------------------------
 src/libserver/task.c            |  55 ++++++------
 2 files changed, 85 insertions(+), 157 deletions(-)

diff --git a/src/libserver/rspamd_symcache.c b/src/libserver/rspamd_symcache.c
index 71b8a4c6a..a1c3d8d28 100644
--- a/src/libserver/rspamd_symcache.c
+++ b/src/libserver/rspamd_symcache.c
@@ -209,19 +209,7 @@ struct delayed_cache_condition {
 	lua_State *L;
 };
 
-enum rspamd_cache_savepoint_stage {
-	RSPAMD_CACHE_PASS_INIT = 0,
-	RSPAMD_CACHE_PASS_PREFILTERS_EMPTY,
-	RSPAMD_CACHE_PASS_PREFILTERS,
-	RSPAMD_CACHE_PASS_FILTERS,
-	RSPAMD_CACHE_PASS_POSTFILTERS,
-	RSPAMD_CACHE_PASS_IDEMPOTENT,
-	RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT,
-	RSPAMD_CACHE_PASS_DONE,
-};
-
 struct cache_savepoint {
-	enum rspamd_cache_savepoint_stage pass;
 	guint version;
 	guint items_inflight;
 	gboolean profile;
@@ -1824,10 +1812,8 @@ rspamd_symcache_make_checkpoint (struct rspamd_task *task,
 		cache->last_profile = now;
 	}
 
-	checkpoint->pass = RSPAMD_CACHE_PASS_INIT;
 	task->checkpoint = checkpoint;
 
-
 	return checkpoint;
 }
 
@@ -1935,12 +1921,9 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
 	struct rspamd_symcache_item *item = NULL;
 	struct rspamd_symcache_dynamic_item *dyn_item;
 	struct cache_savepoint *checkpoint;
-	GPtrArray *sel;
 	gint i;
-	gboolean all_done;
+	gboolean all_done = TRUE;
 	gint saved_priority;
-	enum rspamd_cache_savepoint_stage next;
-	gint next_task_stage;
 	guint start_events_pending;
 
 	g_assert (cache != NULL);
@@ -1953,44 +1936,17 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
 		checkpoint = task->checkpoint;
 	}
 
-	if (stage == RSPAMD_TASK_STAGE_POST_FILTERS && checkpoint->pass <
-			RSPAMD_CACHE_PASS_POSTFILTERS) {
-		checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
-	}
-
-	if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT && checkpoint->pass <
-			RSPAMD_CACHE_PASS_IDEMPOTENT) {
-		checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
-	}
-
-	msg_debug_cache_task ("symbols processing stage at pass: %d, %d stage requested",
-			checkpoint->pass, stage);
+	msg_debug_cache_task ("symbols processing stage at pass: %d");
 	start_events_pending = rspamd_session_events_pending (task->s);
 
-	switch (checkpoint->pass) {
-	case RSPAMD_CACHE_PASS_INIT:
-	case RSPAMD_CACHE_PASS_PREFILTERS:
-	case RSPAMD_CACHE_PASS_PREFILTERS_EMPTY:
+	switch (stage) {
+	case RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY:
 		/* Check for prefilters */
 		saved_priority = G_MININT;
 		all_done = TRUE;
 
-		if (checkpoint->pass != RSPAMD_CACHE_PASS_PREFILTERS) {
-			sel = cache->prefilters_empty;
-			next = RSPAMD_CACHE_PASS_PREFILTERS;
-			next_task_stage = RSPAMD_TASK_STAGE_PRE_FILTERS;
-			checkpoint->pass = RSPAMD_CACHE_PASS_PREFILTERS_EMPTY;
-		}
-		else {
-			sel = cache->prefilters;
-			next = RSPAMD_CACHE_PASS_FILTERS;
-			checkpoint->pass = RSPAMD_CACHE_PASS_PREFILTERS;
-			next_task_stage = RSPAMD_TASK_STAGE_FILTERS;
-		}
-
-
-		for (i = 0; i < (gint)sel->len; i ++) {
-			item = g_ptr_array_index (sel, i);
+		for (i = 0; i < (gint) cache->prefilters_empty->len; i++) {
+			item = g_ptr_array_index (cache->prefilters_empty, i);
 			dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
 
 			if (RSPAMD_TASK_IS_SKIPPED (task)) {
@@ -1998,19 +1954,19 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
 			}
 
 			if (!CHECK_START_BIT (checkpoint, dyn_item) &&
-					!CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+				!CHECK_FINISH_BIT (checkpoint, dyn_item)) {
 				/* Check priorities */
 				if (saved_priority == G_MININT) {
 					saved_priority = item->priority;
 				}
 				else {
 					if (item->priority < saved_priority &&
-							rspamd_session_events_pending (task->s) > start_events_pending) {
+						rspamd_session_events_pending (task->s) > start_events_pending) {
 						/*
 						 * Delay further checks as we have higher
 						 * priority filters to be processed
 						 */
-						return TRUE;
+						return FALSE;
 					}
 				}
 
@@ -2020,25 +1976,50 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
 			}
 		}
 
-		if (all_done || stage == next_task_stage) {
-			checkpoint->pass = next;
-		}
+		break;
+
+	case RSPAMD_TASK_STAGE_PRE_FILTERS:
+		/* Check for prefilters */
+		saved_priority = G_MININT;
+		all_done = TRUE;
+
+		for (i = 0; i < (gint) cache->prefilters->len; i++) {
+			item = g_ptr_array_index (cache->prefilters, i);
+			dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
+
+			if (RSPAMD_TASK_IS_SKIPPED (task)) {
+				return TRUE;
+			}
+
+			if (!CHECK_START_BIT (checkpoint, dyn_item) &&
+				!CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+				/* Check priorities */
+				if (saved_priority == G_MININT) {
+					saved_priority = item->priority;
+				}
+				else {
+					if (item->priority < saved_priority &&
+						rspamd_session_events_pending (task->s) > start_events_pending) {
+						/*
+						 * Delay further checks as we have higher
+						 * priority filters to be processed
+						 */
+						return FALSE;
+					}
+				}
 
-		if (stage == next_task_stage) {
-			return rspamd_symcache_process_symbols (task, cache, stage);
+				rspamd_symcache_check_symbol (task, cache, item,
+						checkpoint);
+				all_done = FALSE;
+			}
 		}
 
 		break;
 
-	case RSPAMD_CACHE_PASS_FILTERS:
-		/*
-		 * On the first pass we check symbols that do not have dependencies
-		 * If we figure out symbol that has no dependencies satisfied, then
-		 * we just save it for another pass
-		 */
+	case RSPAMD_TASK_STAGE_FILTERS:
 		all_done = TRUE;
 
-		for (i = 0; i < (gint)checkpoint->version; i ++) {
+		for (i = 0; i < (gint) checkpoint->version; i++) {
 			if (RSPAMD_TASK_IS_SKIPPED (task)) {
 				return TRUE;
 			}
@@ -2057,7 +2038,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
 						checkpoint, 0, FALSE)) {
 
 					msg_debug_cache_task ("blocked execution of %d(%s) unless deps are "
-							"resolved",
+										  "resolved",
 							item->id, item->symbol);
 
 					continue;
@@ -2079,23 +2060,14 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
 			}
 		}
 
-		if (all_done || stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
-			checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
-		}
-
-		if (stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
-
-			return rspamd_symcache_process_symbols (task, cache, stage);
-		}
-
 		break;
 
-	case RSPAMD_CACHE_PASS_POSTFILTERS:
+	case RSPAMD_TASK_STAGE_POST_FILTERS:
 		/* Check for postfilters */
 		saved_priority = G_MININT;
 		all_done = TRUE;
 
-		for (i = 0; i < (gint)cache->postfilters->len; i ++) {
+		for (i = 0; i < (gint) cache->postfilters->len; i++) {
 			if (RSPAMD_TASK_IS_SKIPPED (task)) {
 				return TRUE;
 			}
@@ -2104,7 +2076,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
 			dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
 
 			if (!CHECK_START_BIT (checkpoint, dyn_item) &&
-					!CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+				!CHECK_FINISH_BIT (checkpoint, dyn_item)) {
 				/* Check priorities */
 				all_done = FALSE;
 
@@ -2113,14 +2085,13 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
 				}
 				else {
 					if (item->priority > saved_priority &&
-							rspamd_session_events_pending (task->s) > start_events_pending) {
+						rspamd_session_events_pending (task->s) > start_events_pending) {
 						/*
 						 * Delay further checks as we have higher
 						 * priority filters to be processed
 						 */
-						checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
 
-						return TRUE;
+						return FALSE;
 					}
 				}
 
@@ -2129,80 +2100,42 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
 			}
 		}
 
-		if (all_done) {
-			checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
-		}
-
-		if (checkpoint->items_inflight == 0 ||
-				stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
-			checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
-		}
-
-		if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
-			return rspamd_symcache_process_symbols (task, cache, stage);
-		}
-
 		break;
 
-	case RSPAMD_CACHE_PASS_IDEMPOTENT:
+	case RSPAMD_TASK_STAGE_IDEMPOTENT:
 		/* Check for postfilters */
 		saved_priority = G_MININT;
 
-		for (i = 0; i < (gint)cache->idempotent->len; i ++) {
+		for (i = 0; i < (gint) cache->idempotent->len; i++) {
 			item = g_ptr_array_index (cache->idempotent, i);
 			dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
 
 			if (!CHECK_START_BIT (checkpoint, dyn_item) &&
-					!CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+				!CHECK_FINISH_BIT (checkpoint, dyn_item)) {
 				/* Check priorities */
 				if (saved_priority == G_MININT) {
 					saved_priority = item->priority;
 				}
 				else {
 					if (item->priority > saved_priority &&
-							rspamd_session_events_pending (task->s) > start_events_pending) {
+						rspamd_session_events_pending (task->s) > start_events_pending) {
 						/*
 						 * Delay further checks as we have higher
 						 * priority filters to be processed
 						 */
-						checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
-
-						return TRUE;
+						return FALSE;
 					}
 				}
 				rspamd_symcache_check_symbol (task, cache, item,
 						checkpoint);
 			}
 		}
-		checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT;
-		break;
-
-	case RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT:
-		all_done = TRUE;
-
-		for (i = 0; i < (gint)cache->idempotent->len; i ++) {
-			item = g_ptr_array_index (cache->idempotent, i);
-			dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
-
-			if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) {
-				all_done = FALSE;
-				break;
-			}
-		}
-
-		if (all_done) {
-			checkpoint->pass = RSPAMD_CACHE_PASS_DONE;
-
-			return TRUE;
-		}
-		break;
-
-	case RSPAMD_CACHE_PASS_DONE:
-		return TRUE;
 		break;
+	default:
+		g_assert_not_reached ();
 	}
 
-	return FALSE;
+	return all_done;
 }
 
 struct counters_cbdata {
diff --git a/src/libserver/task.c b/src/libserver/task.c
index e9a63cbad..b8e178a7b 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -693,7 +693,7 @@ gboolean
 rspamd_task_process (struct rspamd_task *task, guint stages)
 {
 	gint st;
-	gboolean ret = TRUE;
+	gboolean ret = TRUE, all_done = TRUE;
 	GError *stat_error = NULL;
 
 	/* Avoid nested calls */
@@ -717,8 +717,10 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
 		break;
 
 	case RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY:
-		rspamd_symcache_process_symbols (task, task->cfg->cache,
-				RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY);
+	case RSPAMD_TASK_STAGE_PRE_FILTERS:
+	case RSPAMD_TASK_STAGE_FILTERS:
+	case RSPAMD_TASK_STAGE_IDEMPOTENT:
+		all_done = rspamd_symcache_process_symbols (task, task->cfg->cache, st);
 		break;
 
 	case RSPAMD_TASK_STAGE_PROCESS_MESSAGE:
@@ -727,16 +729,6 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
 		}
 		break;
 
-	case RSPAMD_TASK_STAGE_PRE_FILTERS:
-		rspamd_symcache_process_symbols (task, task->cfg->cache,
-				RSPAMD_TASK_STAGE_PRE_FILTERS);
-		break;
-
-	case RSPAMD_TASK_STAGE_FILTERS:
-		rspamd_symcache_process_symbols (task, task->cfg->cache,
-				RSPAMD_TASK_STAGE_FILTERS);
-		break;
-
 	case RSPAMD_TASK_STAGE_CLASSIFIERS:
 	case RSPAMD_TASK_STAGE_CLASSIFIERS_PRE:
 	case RSPAMD_TASK_STAGE_CLASSIFIERS_POST:
@@ -754,10 +746,10 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
 		break;
 
 	case RSPAMD_TASK_STAGE_POST_FILTERS:
-		rspamd_symcache_process_symbols (task, task->cfg->cache,
-				RSPAMD_TASK_STAGE_POST_FILTERS);
+		all_done = rspamd_symcache_process_symbols (task, task->cfg->cache,
+				st);
 
-		if ((task->flags & RSPAMD_TASK_FLAG_LEARN_AUTO) &&
+		if (all_done && (task->flags & RSPAMD_TASK_FLAG_LEARN_AUTO) &&
 				!RSPAMD_TASK_IS_EMPTY (task) &&
 				!(task->flags & (RSPAMD_TASK_FLAG_LEARN_SPAM|RSPAMD_TASK_FLAG_LEARN_HAM))) {
 			rspamd_stat_check_autolearn (task);
@@ -811,10 +803,6 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
 		/* Second run of composites processing before idempotent filters */
 		rspamd_make_composites (task);
 		break;
-	case RSPAMD_TASK_STAGE_IDEMPOTENT:
-		rspamd_symcache_process_symbols (task, task->cfg->cache,
-				RSPAMD_TASK_STAGE_IDEMPOTENT);
-		break;
 
 	case RSPAMD_TASK_STAGE_DONE:
 		task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
@@ -843,17 +831,24 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
 		return ret;
 	}
 
-	if (rspamd_session_events_pending (task->s) != 0) {
-		/* We have events pending, so we consider this stage as incomplete */
-		msg_debug_task ("need more work on stage %d", st);
-	}
-	else {
-		/* Mark the current stage as done and go to the next stage */
-		msg_debug_task ("completed stage %d", st);
-		task->processed_stages |= st;
+	if (ret) {
+		if (rspamd_session_events_pending (task->s) != 0) {
+			/* We have events pending, so we consider this stage as incomplete */
+			msg_debug_task ("need more work on stage %d", st);
+		}
+		else {
+			if (all_done) {
+				/* Mark the current stage as done and go to the next stage */
+				msg_debug_task ("completed stage %d", st);
+				task->processed_stages |= st;
+			}
+			else {
+				msg_debug_task ("need more processing on stage %d", st);
+			}
 
-		/* Tail recursion */
-		return rspamd_task_process (task, stages);
+			/* Tail recursion */
+			return rspamd_task_process (task, stages);
+		}
 	}
 
 	return ret;


More information about the Commits mailing list