commit f827a09: [Project] Add logic to break execution when processing symbols

Vsevolod Stakhov vsevolod at highsecure.ru
Tue Dec 24 17:07:09 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-12-24 16:26:12 +0000
URL: https://github.com/rspamd/rspamd/commit/f827a09afb7785c97010e7116dc63b61efde6a25

[Project] Add logic to break execution when processing symbols

---
 src/controller.c                |   1 +
 src/libserver/rspamd_symcache.c | 105 ++++++++++++++++++++++++++++++++++++++--
 src/libserver/rspamd_symcache.h |   5 ++
 3 files changed, 107 insertions(+), 4 deletions(-)

diff --git a/src/controller.c b/src/controller.c
index d36431702..adbc2b848 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -2126,6 +2126,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
 		ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
 				ctx->task_timeout, ctx->task_timeout);
 		ev_timer_start (task->event_loop, &task->timeout_ev);
+		ev_set_priority (&task->timeout_ev, EV_MAXPRI);
 	}
 
 end:
diff --git a/src/libserver/rspamd_symcache.c b/src/libserver/rspamd_symcache.c
index f232a6a24..31bba1386 100644
--- a/src/libserver/rspamd_symcache.c
+++ b/src/libserver/rspamd_symcache.c
@@ -204,6 +204,7 @@ struct cache_savepoint {
 	guint version;
 	guint items_inflight;
 	gboolean profile;
+	gboolean has_slow;
 	gdouble profile_start;
 
 	struct rspamd_scan_result *rs;
@@ -1761,7 +1762,8 @@ rspamd_symcache_check_symbol (struct rspamd_task *task,
 		msg_debug_cache_task ("execute %s, %d", item->symbol, item->id);
 
 		if (checkpoint->profile) {
-			dyn_item->start_msec = (rspamd_get_virtual_ticks () -
+			ev_now_update_if_cheap (task->event_loop);
+			dyn_item->start_msec = (ev_now (task->event_loop) -
 					checkpoint->profile_start) * 1e3;
 		}
 
@@ -1914,14 +1916,15 @@ rspamd_symcache_make_checkpoint (struct rspamd_task *task,
 			rspamd_symcache_order_unref, checkpoint->order);
 
 	/* Calculate profile probability */
+	ev_now_update_if_cheap (task->event_loop);
 	ev_tstamp now = ev_now (task->event_loop);
+	checkpoint->profile_start = now;
 
 	if ((cache->last_profile == 0.0 || now > cache->last_profile + PROFILE_MAX_TIME) ||
 			(task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) ||
 			(rspamd_random_double_fast () >= (1 - PROFILE_PROBABILITY))) {
 		msg_debug_cache_task ("enable profiling of symbols for task");
 		checkpoint->profile = TRUE;
-		checkpoint->profile_start = rspamd_get_virtual_ticks ();
 		cache->last_profile = now;
 	}
 
@@ -2029,7 +2032,8 @@ rspamd_symcache_process_settings (struct rspamd_task *task,
 
 gboolean
 rspamd_symcache_process_symbols (struct rspamd_task *task,
-								 struct rspamd_symcache *cache, gint stage)
+								 struct rspamd_symcache *cache,
+								 gint stage)
 {
 	struct rspamd_symcache_item *item = NULL;
 	struct rspamd_symcache_dynamic_item *dyn_item;
@@ -2986,6 +2990,63 @@ rspamd_symcache_set_cur_item (struct rspamd_task *task,
 	return ex;
 }
 
+struct rspamd_symcache_delayed_cbdata {
+	struct rspamd_symcache_item *item;
+	struct rspamd_task *task;
+	struct ev_timer tm;
+};
+
+static void
+rspamd_symcache_delayed_item_cb (EV_P_ ev_timer *w, int what)
+{
+	struct rspamd_symcache_delayed_cbdata *cbd =
+			(struct rspamd_symcache_delayed_cbdata *)w->data;
+	struct rspamd_symcache_item *item;
+	struct rspamd_task *task;
+	struct cache_dependency *rdep;
+	struct cache_savepoint *checkpoint;
+	struct rspamd_symcache_dynamic_item *dyn_item;
+	guint i;
+
+	item = cbd->item;
+	task = cbd->task;
+	checkpoint = task->checkpoint;
+	checkpoint->has_slow = FALSE;
+	ev_timer_stop (EV_A_ w);
+
+	/* Process all reverse dependencies */
+	PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
+		if (rdep->item) {
+			dyn_item = rspamd_symcache_get_dynamic (checkpoint, rdep->item);
+			if (!CHECK_START_BIT (checkpoint, dyn_item)) {
+				msg_debug_cache_task ("check item %d(%s) rdep of %s ",
+						rdep->item->id, rdep->item->symbol, item->symbol);
+
+				if (!rspamd_symcache_check_deps (task, task->cfg->cache,
+						rdep->item,
+						checkpoint, 0, FALSE)) {
+					msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s "
+										  "unless deps are resolved",
+							rdep->item->id, rdep->item->symbol, item->symbol);
+				}
+				else {
+					rspamd_symcache_check_symbol (task, task->cfg->cache,
+							rdep->item,
+							checkpoint);
+				}
+			}
+		}
+	}
+}
+
+static void
+rspamd_delayed_timer_dtor (gpointer d)
+{
+	struct rspamd_symcache_delayed_cbdata *cbd =
+			(struct rspamd_symcache_delayed_cbdata *)d;
+
+	ev_timer_stop (cbd->task->event_loop, &cbd->tm);
+}
 
 /**
  * Finalize the current async element potentially calling its deps
@@ -3027,11 +3088,14 @@ rspamd_symcache_finalize_item (struct rspamd_task *task,
 	checkpoint->cur_item = NULL;
 
 	if (checkpoint->profile) {
-		diff = ((rspamd_get_virtual_ticks () - checkpoint->profile_start) * 1e3 -
+		ev_now_update_if_cheap (task->event_loop);
+		diff = ((ev_now (task->event_loop) - checkpoint->profile_start) * 1e3 -
 				dyn_item->start_msec);
+
 		if (diff > slow_diff_limit) {
 			msg_info_task ("slow rule: %s(%d): %.2f ms", item->symbol, item->id,
 					diff);
+			checkpoint->has_slow = TRUE;
 		}
 
 		if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) {
@@ -3043,6 +3107,24 @@ rspamd_symcache_finalize_item (struct rspamd_task *task,
 		}
 	}
 
+	if (checkpoint->has_slow) {
+		struct rspamd_symcache_delayed_cbdata *cbd = rspamd_mempool_alloc (task->task_pool,
+				sizeof (*cbd));
+		/* Add timer to allow something else to be executed */
+		ev_timer *tm = &cbd->tm;
+
+		ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0);
+		ev_set_priority (tm, EV_MINPRI);
+		rspamd_mempool_add_destructor (task->task_pool,
+				rspamd_delayed_timer_dtor, cbd);
+		cbd->task = task;
+		cbd->item = item;
+		tm->data = cbd;
+		ev_timer_start (task->event_loop, tm);
+
+		return;
+	}
+
 	/* Process all reverse dependencies */
 	PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
 		if (rdep->item) {
@@ -3598,4 +3680,19 @@ rspamd_symcache_item_get_rdeps (struct rspamd_symcache_item *item)
 	}
 
 	return NULL;
+}
+
+void
+rspamd_symcache_enable_profile (struct rspamd_task *task)
+{
+	struct cache_savepoint *checkpoint = task->checkpoint;
+
+	if (checkpoint && !checkpoint->profile) {
+		ev_now_update_if_cheap (task->event_loop);
+		ev_tstamp now = ev_now (task->event_loop);
+		checkpoint->profile_start = now;
+
+		msg_debug_cache_task ("enable profiling of symbols for task");
+		checkpoint->profile = TRUE;
+	}
 }
\ No newline at end of file
diff --git a/src/libserver/rspamd_symcache.h b/src/libserver/rspamd_symcache.h
index 0eab043fd..c220b1ccc 100644
--- a/src/libserver/rspamd_symcache.h
+++ b/src/libserver/rspamd_symcache.h
@@ -569,6 +569,11 @@ const GPtrArray* rspamd_symcache_item_get_rdeps (
 		struct rspamd_symcache_item *item);
 
 
+/**
+ * Enable profiling for task (e.g. when a slow rule has been found)
+ * @param task
+ */
+void rspamd_symcache_enable_profile (struct rspamd_task *task);
 #ifdef  __cplusplus
 }
 #endif


More information about the Commits mailing list