commit f827a09: [Project] Add logic to break execution when processing symbols
Vsevolod Stakhov
vsevolod at highsecure.ru
Tue Dec 24 17:07:09 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-12-24 16:26:12 +0000
URL: https://github.com/rspamd/rspamd/commit/f827a09afb7785c97010e7116dc63b61efde6a25
[Project] Add logic to break execution when processing symbols
---
src/controller.c | 1 +
src/libserver/rspamd_symcache.c | 105 ++++++++++++++++++++++++++++++++++++++--
src/libserver/rspamd_symcache.h | 5 ++
3 files changed, 107 insertions(+), 4 deletions(-)
diff --git a/src/controller.c b/src/controller.c
index d36431702..adbc2b848 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -2126,6 +2126,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
ctx->task_timeout, ctx->task_timeout);
ev_timer_start (task->event_loop, &task->timeout_ev);
+ ev_set_priority (&task->timeout_ev, EV_MAXPRI);
}
end:
diff --git a/src/libserver/rspamd_symcache.c b/src/libserver/rspamd_symcache.c
index f232a6a24..31bba1386 100644
--- a/src/libserver/rspamd_symcache.c
+++ b/src/libserver/rspamd_symcache.c
@@ -204,6 +204,7 @@ struct cache_savepoint {
guint version;
guint items_inflight;
gboolean profile;
+ gboolean has_slow;
gdouble profile_start;
struct rspamd_scan_result *rs;
@@ -1761,7 +1762,8 @@ rspamd_symcache_check_symbol (struct rspamd_task *task,
msg_debug_cache_task ("execute %s, %d", item->symbol, item->id);
if (checkpoint->profile) {
- dyn_item->start_msec = (rspamd_get_virtual_ticks () -
+ ev_now_update_if_cheap (task->event_loop);
+ dyn_item->start_msec = (ev_now (task->event_loop) -
checkpoint->profile_start) * 1e3;
}
@@ -1914,14 +1916,15 @@ rspamd_symcache_make_checkpoint (struct rspamd_task *task,
rspamd_symcache_order_unref, checkpoint->order);
/* Calculate profile probability */
+ ev_now_update_if_cheap (task->event_loop);
ev_tstamp now = ev_now (task->event_loop);
+ checkpoint->profile_start = now;
if ((cache->last_profile == 0.0 || now > cache->last_profile + PROFILE_MAX_TIME) ||
(task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) ||
(rspamd_random_double_fast () >= (1 - PROFILE_PROBABILITY))) {
msg_debug_cache_task ("enable profiling of symbols for task");
checkpoint->profile = TRUE;
- checkpoint->profile_start = rspamd_get_virtual_ticks ();
cache->last_profile = now;
}
@@ -2029,7 +2032,8 @@ rspamd_symcache_process_settings (struct rspamd_task *task,
gboolean
rspamd_symcache_process_symbols (struct rspamd_task *task,
- struct rspamd_symcache *cache, gint stage)
+ struct rspamd_symcache *cache,
+ gint stage)
{
struct rspamd_symcache_item *item = NULL;
struct rspamd_symcache_dynamic_item *dyn_item;
@@ -2986,6 +2990,63 @@ rspamd_symcache_set_cur_item (struct rspamd_task *task,
return ex;
}
+struct rspamd_symcache_delayed_cbdata {
+ struct rspamd_symcache_item *item;
+ struct rspamd_task *task;
+ struct ev_timer tm;
+};
+
+static void
+rspamd_symcache_delayed_item_cb (EV_P_ ev_timer *w, int what)
+{
+ struct rspamd_symcache_delayed_cbdata *cbd =
+ (struct rspamd_symcache_delayed_cbdata *)w->data;
+ struct rspamd_symcache_item *item;
+ struct rspamd_task *task;
+ struct cache_dependency *rdep;
+ struct cache_savepoint *checkpoint;
+ struct rspamd_symcache_dynamic_item *dyn_item;
+ guint i;
+
+ item = cbd->item;
+ task = cbd->task;
+ checkpoint = task->checkpoint;
+ checkpoint->has_slow = FALSE;
+ ev_timer_stop (EV_A_ w);
+
+ /* Process all reverse dependencies */
+ PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
+ if (rdep->item) {
+ dyn_item = rspamd_symcache_get_dynamic (checkpoint, rdep->item);
+ if (!CHECK_START_BIT (checkpoint, dyn_item)) {
+ msg_debug_cache_task ("check item %d(%s) rdep of %s ",
+ rdep->item->id, rdep->item->symbol, item->symbol);
+
+ if (!rspamd_symcache_check_deps (task, task->cfg->cache,
+ rdep->item,
+ checkpoint, 0, FALSE)) {
+ msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s "
+ "unless deps are resolved",
+ rdep->item->id, rdep->item->symbol, item->symbol);
+ }
+ else {
+ rspamd_symcache_check_symbol (task, task->cfg->cache,
+ rdep->item,
+ checkpoint);
+ }
+ }
+ }
+ }
+}
+
+static void
+rspamd_delayed_timer_dtor (gpointer d)
+{
+ struct rspamd_symcache_delayed_cbdata *cbd =
+ (struct rspamd_symcache_delayed_cbdata *)d;
+
+ ev_timer_stop (cbd->task->event_loop, &cbd->tm);
+}
/**
* Finalize the current async element potentially calling its deps
@@ -3027,11 +3088,14 @@ rspamd_symcache_finalize_item (struct rspamd_task *task,
checkpoint->cur_item = NULL;
if (checkpoint->profile) {
- diff = ((rspamd_get_virtual_ticks () - checkpoint->profile_start) * 1e3 -
+ ev_now_update_if_cheap (task->event_loop);
+ diff = ((ev_now (task->event_loop) - checkpoint->profile_start) * 1e3 -
dyn_item->start_msec);
+
if (diff > slow_diff_limit) {
msg_info_task ("slow rule: %s(%d): %.2f ms", item->symbol, item->id,
diff);
+ checkpoint->has_slow = TRUE;
}
if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) {
@@ -3043,6 +3107,24 @@ rspamd_symcache_finalize_item (struct rspamd_task *task,
}
}
+ if (checkpoint->has_slow) {
+ struct rspamd_symcache_delayed_cbdata *cbd = rspamd_mempool_alloc (task->task_pool,
+ sizeof (*cbd));
+ /* Add timer to allow something else to be executed */
+ ev_timer *tm = &cbd->tm;
+
+ ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0);
+ ev_set_priority (tm, EV_MINPRI);
+ rspamd_mempool_add_destructor (task->task_pool,
+ rspamd_delayed_timer_dtor, cbd);
+ cbd->task = task;
+ cbd->item = item;
+ tm->data = cbd;
+ ev_timer_start (task->event_loop, tm);
+
+ return;
+ }
+
/* Process all reverse dependencies */
PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
if (rdep->item) {
@@ -3598,4 +3680,19 @@ rspamd_symcache_item_get_rdeps (struct rspamd_symcache_item *item)
}
return NULL;
+}
+
+void
+rspamd_symcache_enable_profile (struct rspamd_task *task)
+{
+ struct cache_savepoint *checkpoint = task->checkpoint;
+
+ if (checkpoint && !checkpoint->profile) {
+ ev_now_update_if_cheap (task->event_loop);
+ ev_tstamp now = ev_now (task->event_loop);
+ checkpoint->profile_start = now;
+
+ msg_debug_cache_task ("enable profiling of symbols for task");
+ checkpoint->profile = TRUE;
+ }
}
\ No newline at end of file
diff --git a/src/libserver/rspamd_symcache.h b/src/libserver/rspamd_symcache.h
index 0eab043fd..c220b1ccc 100644
--- a/src/libserver/rspamd_symcache.h
+++ b/src/libserver/rspamd_symcache.h
@@ -569,6 +569,11 @@ const GPtrArray* rspamd_symcache_item_get_rdeps (
struct rspamd_symcache_item *item);
+/**
+ * Enable profiling for task (e.g. when a slow rule has been found)
+ * @param task
+ */
+void rspamd_symcache_enable_profile (struct rspamd_task *task);
#ifdef __cplusplus
}
#endif
More information about the Commits
mailing list