commit b3c005c: [Rework] Kill old dragons in symcache processing stages
Vsevolod Stakhov
vsevolod at highsecure.ru
Mon Aug 19 11:42:03 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-08-19 12:39:32 +0100
URL: https://github.com/rspamd/rspamd/commit/b3c005cfcb5bcdd66ec9640afa620a30fa2e0718 (HEAD -> master)
[Rework] Kill old dragons in symcache processing stages
---
src/libserver/rspamd_symcache.c | 187 +++++++++++++---------------------------
src/libserver/task.c | 55 ++++++------
2 files changed, 85 insertions(+), 157 deletions(-)
diff --git a/src/libserver/rspamd_symcache.c b/src/libserver/rspamd_symcache.c
index 71b8a4c6a..a1c3d8d28 100644
--- a/src/libserver/rspamd_symcache.c
+++ b/src/libserver/rspamd_symcache.c
@@ -209,19 +209,7 @@ struct delayed_cache_condition {
lua_State *L;
};
-enum rspamd_cache_savepoint_stage {
- RSPAMD_CACHE_PASS_INIT = 0,
- RSPAMD_CACHE_PASS_PREFILTERS_EMPTY,
- RSPAMD_CACHE_PASS_PREFILTERS,
- RSPAMD_CACHE_PASS_FILTERS,
- RSPAMD_CACHE_PASS_POSTFILTERS,
- RSPAMD_CACHE_PASS_IDEMPOTENT,
- RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT,
- RSPAMD_CACHE_PASS_DONE,
-};
-
struct cache_savepoint {
- enum rspamd_cache_savepoint_stage pass;
guint version;
guint items_inflight;
gboolean profile;
@@ -1824,10 +1812,8 @@ rspamd_symcache_make_checkpoint (struct rspamd_task *task,
cache->last_profile = now;
}
- checkpoint->pass = RSPAMD_CACHE_PASS_INIT;
task->checkpoint = checkpoint;
-
return checkpoint;
}
@@ -1935,12 +1921,9 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
struct rspamd_symcache_item *item = NULL;
struct rspamd_symcache_dynamic_item *dyn_item;
struct cache_savepoint *checkpoint;
- GPtrArray *sel;
gint i;
- gboolean all_done;
+ gboolean all_done = TRUE;
gint saved_priority;
- enum rspamd_cache_savepoint_stage next;
- gint next_task_stage;
guint start_events_pending;
g_assert (cache != NULL);
@@ -1953,44 +1936,17 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
checkpoint = task->checkpoint;
}
- if (stage == RSPAMD_TASK_STAGE_POST_FILTERS && checkpoint->pass <
- RSPAMD_CACHE_PASS_POSTFILTERS) {
- checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
- }
-
- if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT && checkpoint->pass <
- RSPAMD_CACHE_PASS_IDEMPOTENT) {
- checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
- }
-
- msg_debug_cache_task ("symbols processing stage at pass: %d, %d stage requested",
- checkpoint->pass, stage);
+ msg_debug_cache_task ("symbols processing stage at pass: %d");
start_events_pending = rspamd_session_events_pending (task->s);
- switch (checkpoint->pass) {
- case RSPAMD_CACHE_PASS_INIT:
- case RSPAMD_CACHE_PASS_PREFILTERS:
- case RSPAMD_CACHE_PASS_PREFILTERS_EMPTY:
+ switch (stage) {
+ case RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY:
/* Check for prefilters */
saved_priority = G_MININT;
all_done = TRUE;
- if (checkpoint->pass != RSPAMD_CACHE_PASS_PREFILTERS) {
- sel = cache->prefilters_empty;
- next = RSPAMD_CACHE_PASS_PREFILTERS;
- next_task_stage = RSPAMD_TASK_STAGE_PRE_FILTERS;
- checkpoint->pass = RSPAMD_CACHE_PASS_PREFILTERS_EMPTY;
- }
- else {
- sel = cache->prefilters;
- next = RSPAMD_CACHE_PASS_FILTERS;
- checkpoint->pass = RSPAMD_CACHE_PASS_PREFILTERS;
- next_task_stage = RSPAMD_TASK_STAGE_FILTERS;
- }
-
-
- for (i = 0; i < (gint)sel->len; i ++) {
- item = g_ptr_array_index (sel, i);
+ for (i = 0; i < (gint) cache->prefilters_empty->len; i++) {
+ item = g_ptr_array_index (cache->prefilters_empty, i);
dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
if (RSPAMD_TASK_IS_SKIPPED (task)) {
@@ -1998,19 +1954,19 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
}
if (!CHECK_START_BIT (checkpoint, dyn_item) &&
- !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+ !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
/* Check priorities */
if (saved_priority == G_MININT) {
saved_priority = item->priority;
}
else {
if (item->priority < saved_priority &&
- rspamd_session_events_pending (task->s) > start_events_pending) {
+ rspamd_session_events_pending (task->s) > start_events_pending) {
/*
* Delay further checks as we have higher
* priority filters to be processed
*/
- return TRUE;
+ return FALSE;
}
}
@@ -2020,25 +1976,50 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
}
}
- if (all_done || stage == next_task_stage) {
- checkpoint->pass = next;
- }
+ break;
+
+ case RSPAMD_TASK_STAGE_PRE_FILTERS:
+ /* Check for prefilters */
+ saved_priority = G_MININT;
+ all_done = TRUE;
+
+ for (i = 0; i < (gint) cache->prefilters->len; i++) {
+ item = g_ptr_array_index (cache->prefilters, i);
+ dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
+
+ if (RSPAMD_TASK_IS_SKIPPED (task)) {
+ return TRUE;
+ }
+
+ if (!CHECK_START_BIT (checkpoint, dyn_item) &&
+ !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+ /* Check priorities */
+ if (saved_priority == G_MININT) {
+ saved_priority = item->priority;
+ }
+ else {
+ if (item->priority < saved_priority &&
+ rspamd_session_events_pending (task->s) > start_events_pending) {
+ /*
+ * Delay further checks as we have higher
+ * priority filters to be processed
+ */
+ return FALSE;
+ }
+ }
- if (stage == next_task_stage) {
- return rspamd_symcache_process_symbols (task, cache, stage);
+ rspamd_symcache_check_symbol (task, cache, item,
+ checkpoint);
+ all_done = FALSE;
+ }
}
break;
- case RSPAMD_CACHE_PASS_FILTERS:
- /*
- * On the first pass we check symbols that do not have dependencies
- * If we figure out symbol that has no dependencies satisfied, then
- * we just save it for another pass
- */
+ case RSPAMD_TASK_STAGE_FILTERS:
all_done = TRUE;
- for (i = 0; i < (gint)checkpoint->version; i ++) {
+ for (i = 0; i < (gint) checkpoint->version; i++) {
if (RSPAMD_TASK_IS_SKIPPED (task)) {
return TRUE;
}
@@ -2057,7 +2038,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
checkpoint, 0, FALSE)) {
msg_debug_cache_task ("blocked execution of %d(%s) unless deps are "
- "resolved",
+ "resolved",
item->id, item->symbol);
continue;
@@ -2079,23 +2060,14 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
}
}
- if (all_done || stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
- checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
- }
-
- if (stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
-
- return rspamd_symcache_process_symbols (task, cache, stage);
- }
-
break;
- case RSPAMD_CACHE_PASS_POSTFILTERS:
+ case RSPAMD_TASK_STAGE_POST_FILTERS:
/* Check for postfilters */
saved_priority = G_MININT;
all_done = TRUE;
- for (i = 0; i < (gint)cache->postfilters->len; i ++) {
+ for (i = 0; i < (gint) cache->postfilters->len; i++) {
if (RSPAMD_TASK_IS_SKIPPED (task)) {
return TRUE;
}
@@ -2104,7 +2076,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
if (!CHECK_START_BIT (checkpoint, dyn_item) &&
- !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+ !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
/* Check priorities */
all_done = FALSE;
@@ -2113,14 +2085,13 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
}
else {
if (item->priority > saved_priority &&
- rspamd_session_events_pending (task->s) > start_events_pending) {
+ rspamd_session_events_pending (task->s) > start_events_pending) {
/*
* Delay further checks as we have higher
* priority filters to be processed
*/
- checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
- return TRUE;
+ return FALSE;
}
}
@@ -2129,80 +2100,42 @@ rspamd_symcache_process_symbols (struct rspamd_task *task,
}
}
- if (all_done) {
- checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
- }
-
- if (checkpoint->items_inflight == 0 ||
- stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
- checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
- }
-
- if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
- return rspamd_symcache_process_symbols (task, cache, stage);
- }
-
break;
- case RSPAMD_CACHE_PASS_IDEMPOTENT:
+ case RSPAMD_TASK_STAGE_IDEMPOTENT:
/* Check for postfilters */
saved_priority = G_MININT;
- for (i = 0; i < (gint)cache->idempotent->len; i ++) {
+ for (i = 0; i < (gint) cache->idempotent->len; i++) {
item = g_ptr_array_index (cache->idempotent, i);
dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
if (!CHECK_START_BIT (checkpoint, dyn_item) &&
- !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+ !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
/* Check priorities */
if (saved_priority == G_MININT) {
saved_priority = item->priority;
}
else {
if (item->priority > saved_priority &&
- rspamd_session_events_pending (task->s) > start_events_pending) {
+ rspamd_session_events_pending (task->s) > start_events_pending) {
/*
* Delay further checks as we have higher
* priority filters to be processed
*/
- checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
-
- return TRUE;
+ return FALSE;
}
}
rspamd_symcache_check_symbol (task, cache, item,
checkpoint);
}
}
- checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT;
- break;
-
- case RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT:
- all_done = TRUE;
-
- for (i = 0; i < (gint)cache->idempotent->len; i ++) {
- item = g_ptr_array_index (cache->idempotent, i);
- dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
-
- if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) {
- all_done = FALSE;
- break;
- }
- }
-
- if (all_done) {
- checkpoint->pass = RSPAMD_CACHE_PASS_DONE;
-
- return TRUE;
- }
- break;
-
- case RSPAMD_CACHE_PASS_DONE:
- return TRUE;
break;
+ default:
+ g_assert_not_reached ();
}
- return FALSE;
+ return all_done;
}
struct counters_cbdata {
diff --git a/src/libserver/task.c b/src/libserver/task.c
index e9a63cbad..b8e178a7b 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -693,7 +693,7 @@ gboolean
rspamd_task_process (struct rspamd_task *task, guint stages)
{
gint st;
- gboolean ret = TRUE;
+ gboolean ret = TRUE, all_done = TRUE;
GError *stat_error = NULL;
/* Avoid nested calls */
@@ -717,8 +717,10 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
break;
case RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY:
- rspamd_symcache_process_symbols (task, task->cfg->cache,
- RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY);
+ case RSPAMD_TASK_STAGE_PRE_FILTERS:
+ case RSPAMD_TASK_STAGE_FILTERS:
+ case RSPAMD_TASK_STAGE_IDEMPOTENT:
+ all_done = rspamd_symcache_process_symbols (task, task->cfg->cache, st);
break;
case RSPAMD_TASK_STAGE_PROCESS_MESSAGE:
@@ -727,16 +729,6 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
}
break;
- case RSPAMD_TASK_STAGE_PRE_FILTERS:
- rspamd_symcache_process_symbols (task, task->cfg->cache,
- RSPAMD_TASK_STAGE_PRE_FILTERS);
- break;
-
- case RSPAMD_TASK_STAGE_FILTERS:
- rspamd_symcache_process_symbols (task, task->cfg->cache,
- RSPAMD_TASK_STAGE_FILTERS);
- break;
-
case RSPAMD_TASK_STAGE_CLASSIFIERS:
case RSPAMD_TASK_STAGE_CLASSIFIERS_PRE:
case RSPAMD_TASK_STAGE_CLASSIFIERS_POST:
@@ -754,10 +746,10 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
break;
case RSPAMD_TASK_STAGE_POST_FILTERS:
- rspamd_symcache_process_symbols (task, task->cfg->cache,
- RSPAMD_TASK_STAGE_POST_FILTERS);
+ all_done = rspamd_symcache_process_symbols (task, task->cfg->cache,
+ st);
- if ((task->flags & RSPAMD_TASK_FLAG_LEARN_AUTO) &&
+ if (all_done && (task->flags & RSPAMD_TASK_FLAG_LEARN_AUTO) &&
!RSPAMD_TASK_IS_EMPTY (task) &&
!(task->flags & (RSPAMD_TASK_FLAG_LEARN_SPAM|RSPAMD_TASK_FLAG_LEARN_HAM))) {
rspamd_stat_check_autolearn (task);
@@ -811,10 +803,6 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
/* Second run of composites processing before idempotent filters */
rspamd_make_composites (task);
break;
- case RSPAMD_TASK_STAGE_IDEMPOTENT:
- rspamd_symcache_process_symbols (task, task->cfg->cache,
- RSPAMD_TASK_STAGE_IDEMPOTENT);
- break;
case RSPAMD_TASK_STAGE_DONE:
task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
@@ -843,17 +831,24 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
return ret;
}
- if (rspamd_session_events_pending (task->s) != 0) {
- /* We have events pending, so we consider this stage as incomplete */
- msg_debug_task ("need more work on stage %d", st);
- }
- else {
- /* Mark the current stage as done and go to the next stage */
- msg_debug_task ("completed stage %d", st);
- task->processed_stages |= st;
+ if (ret) {
+ if (rspamd_session_events_pending (task->s) != 0) {
+ /* We have events pending, so we consider this stage as incomplete */
+ msg_debug_task ("need more work on stage %d", st);
+ }
+ else {
+ if (all_done) {
+ /* Mark the current stage as done and go to the next stage */
+ msg_debug_task ("completed stage %d", st);
+ task->processed_stages |= st;
+ }
+ else {
+ msg_debug_task ("need more processing on stage %d", st);
+ }
- /* Tail recursion */
- return rspamd_task_process (task, stages);
+ /* Tail recursion */
+ return rspamd_task_process (task, stages);
+ }
}
return ret;
More information about the Commits
mailing list