commit d4e58c1: [Rework] Implement cache resorting

Vsevolod Stakhov vsevolod at rspamd.com
Sat Apr 30 19:21:33 UTC 2022


Author: Vsevolod Stakhov
Date: 2022-04-17 20:55:46 +0100
URL: https://github.com/rspamd/rspamd/commit/d4e58c1d726df732c871d2c365581b88ac463ae6

[Rework] Implement cache resorting

---
 src/libserver/rspamd_symcache.h              |  2 +-
 src/libserver/symcache/symcache_c.cxx        | 16 +++++
 src/libserver/symcache/symcache_impl.cxx     | 73 ++++++++++++++++++++++
 src/libserver/symcache/symcache_internal.hxx | 18 +++---
 src/libserver/symcache/symcache_periodic.hxx | 90 ++++++++++++++++++++++++++++
 5 files changed, 189 insertions(+), 10 deletions(-)

diff --git a/src/libserver/rspamd_symcache.h b/src/libserver/rspamd_symcache.h
index 915da9b15..71cb6e285 100644
--- a/src/libserver/rspamd_symcache.h
+++ b/src/libserver/rspamd_symcache.h
@@ -210,7 +210,7 @@ ucl_object_t *rspamd_symcache_counters (struct rspamd_symcache *cache);
  * @param cache
  * @param ev_base
  */
-void rspamd_symcache_start_refresh (struct rspamd_symcache *cache,
+void* rspamd_symcache_start_refresh (struct rspamd_symcache *cache,
 									struct ev_loop *ev_base,
 									struct rspamd_worker *w);
 
diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx
index d081d7841..c87f16d51 100644
--- a/src/libserver/symcache/symcache_c.cxx
+++ b/src/libserver/symcache/symcache_c.cxx
@@ -15,6 +15,7 @@
  */
 
 #include "symcache_internal.hxx"
+#include "symcache_periodic.hxx"
 
 /**
  * C API for symcache
@@ -158,4 +159,19 @@ rspamd_symcache_validate(struct rspamd_symcache *cache,
 	auto *real_cache = C_API_SYMCACHE(cache);
 
 	return real_cache->validate(strict);
+}
+
+ucl_object_t *
+rspamd_symcache_counters (struct rspamd_symcache *cache)
+{
+	/* C API shim: convert the C handle with C_API_SYMCACHE and forward to counters() */
+	return C_API_SYMCACHE(cache)->counters();
+}
+
+void *
+rspamd_symcache_start_refresh (struct rspamd_symcache *cache,
+							   struct ev_loop *ev_base, struct rspamd_worker *w)
+{
+	/* The cbdata ctor starts the timer and registers refresh_dtor (which deletes it) on the cache pool */
+	return new rspamd::symcache::cache_refresh_cbdata{C_API_SYMCACHE(cache), ev_base, w};
+}
\ No newline at end of file
diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx
index a324fe523..807ec7372 100644
--- a/src/libserver/symcache/symcache_impl.cxx
+++ b/src/libserver/symcache/symcache_impl.cxx
@@ -829,6 +829,79 @@ auto symcache::counters() const -> ucl_object_t *
 	return top;
 }
 
+auto symcache::periodic_resort(struct ev_loop *ev_loop, double cur_time, double last_resort) -> void
+{ /* Per-filter stats refresh: fold hits into totals, update frequency/time EMAs, report peaks to peak_cb */
+	static const double decay_rate = 0.25; /* smoothing argument given to rspamd_set_counter_ema for both counters */
+
+	for (const auto &item: filters) {
+		item->st->total_hits += item->st->hits; /* add the hits counted since the last reset to the running total */
+		g_atomic_int_set (&item->st->hits, 0); /* NOTE(review): read above and reset here are separate steps — confirm a hit landing in between cannot be lost */
+
+		if (item->last_count > 0) { /* rate is computed only once a non-zero total was recorded by an earlier run */
+			gdouble cur_err, cur_value;
+
+			cur_value = (item->st->total_hits - item->last_count) /
+						(cur_time - last_resort); /* hits gained since the previous run divided by the elapsed time */
+			rspamd_set_counter_ema(&item->st->frequency_counter,
+					cur_value, decay_rate);
+			item->st->avg_frequency = item->st->frequency_counter.mean;
+			item->st->stddev_frequency = item->st->frequency_counter.stddev;
+
+			if (cur_value > 0) {
+				msg_debug_cache ("frequency for %s is %.2f, avg: %.2f",
+						item->symbol.c_str(), cur_value, item->st->avg_frequency);
+			}
+
+			cur_err = (item->st->avg_frequency - cur_value);
+			cur_err *= cur_err; /* squared deviation of the current rate from the smoothed average */
+
+			if (item->st->frequency_counter.number > 10 &&
+				cur_err > sqrt(item->st->stddev_frequency) * 3) { /* NOTE(review): squared error vs 3*sqrt(stddev_frequency) — confirm the intended scale */
+				item->frequency_peaks++;
+				msg_debug_cache ("peak found for %s is %.2f, avg: %.2f, "
+								 "stddev: %.2f, error: %.2f, peaks: %d",
+						item->symbol.c_str(), cur_value,
+						item->st->avg_frequency,
+						item->st->stddev_frequency,
+						cur_err,
+						item->frequency_peaks);
+
+				if (peak_cb != -1) { /* a peak_cb of -1 skips the Lua notification */
+					struct ev_loop **pbase;
+
+					lua_rawgeti(L, LUA_REGISTRYINDEX, peak_cb); /* push registry[peak_cb], the value lua_pcall will invoke */
+					pbase = (struct ev_loop **) lua_newuserdata(L, sizeof(*pbase));
+					*pbase = ev_loop;
+					rspamd_lua_setclass(L, "rspamd{ev_base}", -1); /* arg 1: userdata holding the ev_loop pointer */
+					lua_pushlstring(L, item->symbol.c_str(), item->symbol.size()); /* arg 2: symbol name */
+					lua_pushnumber(L, item->st->avg_frequency); /* arg 3 */
+					lua_pushnumber(L, ::sqrt(item->st->stddev_frequency)); /* arg 4 */
+					lua_pushnumber(L, cur_value); /* arg 5 */
+					lua_pushnumber(L, cur_err); /* arg 6 */
+
+					if (lua_pcall(L, 6, 0, 0) != 0) { /* 6 args pushed above, no results requested */
+						msg_info_cache ("call to peak function for %s failed: %s",
+								item->symbol.c_str(), lua_tostring(L, -1));
+						lua_pop (L, 1); /* remove the error value read by lua_tostring above */
+					}
+				}
+			}
+		}
+
+		item->last_count = item->st->total_hits; /* baseline for the next run's rate */
+
+		if (item->cd->number > 0) { /* cd->number is presumably the sample count — skip when empty */
+			if (!item->is_virtual()) {
+				item->st->avg_time = item->cd->mean;
+				rspamd_set_counter_ema(&item->st->time_counter,
+						item->st->avg_time, decay_rate);
+				item->st->avg_time = item->st->time_counter.mean; /* avg_time ends up as the EMA mean, not the raw cd mean */
+				memset(item->cd, 0, sizeof(*item->cd)); /* zero the whole cd record for the next interval */
+			}
+		}
+	}
+}
+
 auto cache_item::get_parent(const symcache &cache) const -> const cache_item *
 {
 	if (is_virtual()) {
diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx
index dea2ff647..b7dfdad77 100644
--- a/src/libserver/symcache/symcache_internal.hxx
+++ b/src/libserver/symcache/symcache_internal.hxx
@@ -31,6 +31,8 @@
 #include <string_view>
 #include <memory>
 #include <variant>
+
+#include "contrib/libev/ev.h"
 #include "contrib/robin-hood/robin_hood.h"
 #include "contrib/expected/expected.hpp"
 #include "cfg_file.h"
@@ -550,6 +552,13 @@ public:
 	 * @return
 	 */
 	auto counters() const -> ucl_object_t *;
+
+	/** Refreshes per-filter stats for the interval between last_resort and cur_time: folds hits into
+	 * totals, updates the frequency/time EMAs and reports frequency peaks to the Lua peak callback (given ev_loop)
+	 */
+	auto periodic_resort(struct ev_loop *ev_loop, double cur_time, double last_resort) -> void;
+
+	auto get_reload_time() const { return this->reload_time; } /* read by cache_refresh_cbdata's ctor to derive the timer period */
 };
 
 /*
@@ -580,15 +589,6 @@ struct cache_savepoint {
 	/* Dynamically expanded as needed */
 	struct cache_dynamic_item dynamic_items[];
 };
-
-struct cache_refresh_cbdata {
-	double last_resort;
-	ev_timer resort_ev;
-	symcache *cache;
-	struct rspamd_worker *w;
-	struct ev_loop *event_loop;
-};
-
 } // namespace rspamd
 
 #endif //RSPAMD_SYMCACHE_INTERNAL_HXX
diff --git a/src/libserver/symcache/symcache_periodic.hxx b/src/libserver/symcache/symcache_periodic.hxx
new file mode 100644
index 000000000..2719fca25
--- /dev/null
+++ b/src/libserver/symcache/symcache_periodic.hxx
@@ -0,0 +1,90 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef RSPAMD_SYMCACHE_PERIODIC_HXX
+#define RSPAMD_SYMCACHE_PERIODIC_HXX
+
+#pragma once
+
+#include "config.h"
+#include "contrib/libev/ev.h"
+#include "symcache_internal.hxx"
+#include "worker_util.h"
+
+namespace rspamd::symcache {
+/* Periodic symcache refresh state: owns the libev timer that drives symcache::periodic_resort */
+struct cache_refresh_cbdata {
+private:
+	symcache *cache;
+	struct ev_loop *event_loop;
+	struct rspamd_worker *w;
+	double reload_time; /* input to rspamd_time_jitter for the timer period; read from the cache once in the ctor */
+	double last_resort; /* timestamp handed to periodic_resort as the previous run */
+	ev_timer resort_ev;
+public:
+	/* resort_ev.data and the pool destructor both hold `this`, so copies must not exist */
+	cache_refresh_cbdata(const cache_refresh_cbdata &) = delete;
+	cache_refresh_cbdata &operator=(const cache_refresh_cbdata &) = delete;
+
+	/* Starts the repeating timer on _ev_base and registers refresh_dtor on the cache's pool */
+	explicit cache_refresh_cbdata(symcache *_cache, struct ev_loop *_ev_base,
+								  struct rspamd_worker *_w)
+			: cache(_cache), event_loop(_ev_base), w(_w)
+	{
+		auto log_tag = [&]() { return cache->log_tag(); };
+		/* NOTE(review): TRUE here vs FALSE in resort_cb — confirm both calls return the same units */
+		last_resort = rspamd_get_ticks(TRUE);
+		reload_time = cache->get_reload_time();
+		auto tm = rspamd_time_jitter(reload_time, 0);
+		msg_debug_cache("next reload in %.2f seconds", tm);
+		ev_timer_init (&resort_ev, cache_refresh_cbdata::resort_cb, tm, tm);
+		resort_ev.data = static_cast<void *>(this);
+		ev_timer_start(event_loop, &resort_ev);
+		rspamd_mempool_add_destructor(cache->get_pool(), cache_refresh_cbdata::refresh_dtor, static_cast<void *>(this));
+	}
+
+	/* Pool destructor hook: the only place the object is deleted (the dtor is private) */
+	static void refresh_dtor(void *d)
+	{
+		delete static_cast<cache_refresh_cbdata *>(d);
+	}
+
+	/* Timer callback: when cbdata->w is the primary controller, picks a new repeat interval and runs periodic_resort */
+	static void resort_cb(EV_P_ ev_timer *w, int _revents) {
+		auto *cbdata = static_cast<cache_refresh_cbdata *>(w->data); /* set to `this` in the ctor */
+		auto log_tag = [&]() { return cbdata->cache->log_tag(); };
+		if (rspamd_worker_is_primary_controller(cbdata->w)) {
+			/* Plan new event */
+			auto tm = rspamd_time_jitter(cbdata->reload_time, 0);
+			msg_debug_cache("resort symbols cache, next reload in %.2f seconds", tm);
+			cbdata->resort_ev.repeat = tm;
+			ev_timer_again(EV_A_ w);
+			auto cur_time = rspamd_get_ticks(FALSE);
+			cbdata->cache->periodic_resort(cbdata->event_loop, cur_time, cbdata->last_resort);
+			cbdata->last_resort = cur_time; /* becomes the interval start for the next run */
+		}
+	}
+
+private:
+	~cache_refresh_cbdata()
+	{
+		ev_timer_stop(event_loop, &resort_ev); /* stop the watcher before its storage goes away */
+	}
+};
+}
+
+#endif //RSPAMD_SYMCACHE_PERIODIC_HXX


More information about the Commits mailing list