commit fd96930: [Rework] Rework files structure
Vsevolod Stakhov
vsevolod at rspamd.com
Sat Apr 30 19:21:11 UTC 2022
Author: Vsevolod Stakhov
Date: 2022-04-02 16:45:41 +0100
URL: https://github.com/rspamd/rspamd/commit/fd9693073dc162c43710bd977830a3b400f2b43b
[Rework] Rework files structure
---
src/libserver/CMakeLists.txt | 5 +-
src/libserver/rspamd_symcache.cxx | 616 ---------------------------
src/libserver/symcache/symcache_c.cxx | 48 +++
src/libserver/symcache/symcache_impl.cxx | 292 +++++++++++++
src/libserver/symcache/symcache_internal.hxx | 385 +++++++++++++++++
src/libutil/CMakeLists.txt | 3 +-
6 files changed, 730 insertions(+), 619 deletions(-)
diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt
index 7371e8ade..17f5ca751 100644
--- a/src/libserver/CMakeLists.txt
+++ b/src/libserver/CMakeLists.txt
@@ -16,11 +16,12 @@ SET(LIBRSPAMDSERVERSRC
${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
${CMAKE_CURRENT_SOURCE_DIR}/re_cache.c
- ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx
+ ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx
${CMAKE_CURRENT_SOURCE_DIR}/roll_history.c
${CMAKE_CURRENT_SOURCE_DIR}/spf.c
${CMAKE_CURRENT_SOURCE_DIR}/ssl_util.c
- ${CMAKE_CURRENT_SOURCE_DIR}/rspamd_symcache.cxx
+ ${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_impl.cxx
+ ${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_c.cxx
${CMAKE_CURRENT_SOURCE_DIR}/task.c
${CMAKE_CURRENT_SOURCE_DIR}/url.c
${CMAKE_CURRENT_SOURCE_DIR}/worker_util.c
diff --git a/src/libserver/rspamd_symcache.cxx b/src/libserver/rspamd_symcache.cxx
index cd44c2b84..e1e9ade4e 100644
--- a/src/libserver/rspamd_symcache.cxx
+++ b/src/libserver/rspamd_symcache.cxx
@@ -36,26 +36,6 @@
#include "contrib/robin-hood/robin_hood.h"
-#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
- cache->static_pool->tag.tagname, cache->cfg->checksum, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
- cache->static_pool->tag.tagname, cache->cfg->checksum, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
- cache->static_pool->tag.tagname, cache->cfg->checksum, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \
- rspamd_symcache_log_id, "symcache", cache->cfg->checksum, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \
- rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \
- G_STRFUNC, \
- __VA_ARGS__)
INIT_LOG_MODULE(symcache)
@@ -73,311 +53,7 @@ INIT_LOG_MODULE(symcache)
#define CLR_FINISH_BIT(checkpoint, dyn_item) \
(dyn_item)->finished = 0
-namespace rspamd::symcache {
-static const std::uint8_t rspamd_symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0};
-
-struct rspamd_symcache_header {
- std::uint8_t magic[8];
- unsigned int nitems;
- std::uint8_t checksum[64];
- std::uint8_t unused[128];
-};
-
-struct cache_item;
-using cache_item_ptr = std::shared_ptr<cache_item>;
-using cache_item_weak_ptr = std::weak_ptr<cache_item>;
-
-struct order_generation {
- std::vector<cache_item_weak_ptr> d;
- unsigned int generation_id;
-};
-
-using order_generation_ptr = std::shared_ptr<order_generation>;
-
-/*
- * This structure is optimised to store ids list:
- * - If the first element is -1 then use dynamic part, else use static part
- * There is no std::variant to save space
- */
-struct id_list {
- union {
- std::uint32_t st[4];
- struct {
- std::uint32_t e; /* First element */
- std::uint16_t len;
- std::uint16_t allocated;
- std::uint32_t *n;
- } dyn;
- } data;
-
- id_list() {
- std::memset((void *)&data, 0, sizeof(data));
- }
- /**
- * Returns ids from a compressed list, accepting a mutable reference for number of elements
- * @param nids output of the number of elements
- * @return
- */
- auto get_ids(std::size_t &nids) const -> const std::uint32_t * {
- if (data.dyn.e == -1) {
- /* Dynamic list */
- nids = data.dyn.len;
-
- return data.dyn.n;
- }
- else {
- auto cnt = 0;
-
- while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS(data.st)) {
- cnt ++;
- }
-
- nids = cnt;
-
- return data.st;
- }
- }
-
- auto add_id(std::uint32_t id, rspamd_mempool_t *pool) -> void {
- if (data.st[0] == -1) {
- /* Dynamic array */
- if (data.dyn.len < data.dyn.allocated) {
- /* Trivial, append + sort */
- data.dyn.n[data.dyn.len++] = id;
- }
- else {
- /* Reallocate */
- g_assert (data.dyn.allocated <= G_MAXINT16);
- data.dyn.allocated *= 2;
-
- auto *new_array = rspamd_mempool_alloc_array_type(pool,
- data.dyn.allocated, std::uint32_t);
- memcpy(new_array, data.dyn.n, data.dyn.len * sizeof(std::uint32_t));
- data.dyn.n = new_array;
- data.dyn.n[data.dyn.len++] = id;
- }
-
- std::sort(data.dyn.n, data.dyn.n + data.dyn.len);
- }
- else {
- /* Static part */
- auto cnt = 0u;
- while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS (data.st)) {
- cnt ++;
- }
-
- if (cnt < G_N_ELEMENTS (data.st)) {
- data.st[cnt] = id;
- }
- else {
- /* Switch to dynamic */
- data.dyn.allocated = G_N_ELEMENTS (data.st) * 2;
- auto *new_array = rspamd_mempool_alloc_array_type(pool,
- data.dyn.allocated, std::uint32_t);
- memcpy (new_array, data.st, sizeof(data.st));
- data.dyn.n = new_array;
- data.dyn.e = -1; /* Marker */
- data.dyn.len = G_N_ELEMENTS (data.st);
-
- /* Recursively jump to dynamic branch that will handle insertion + sorting */
- add_id(id, pool); // tail call
- }
- }
- }
-};
-
-struct item_condition {
-private:
- gint cb;
- lua_State *L;
-public:
- item_condition() {
- // TODO
- }
- virtual ~item_condition() {
- // TODO
- }
-};
-
-class normal_item {
-private:
- symbol_func_t func;
- void *user_data;
- std::vector<item_condition> conditions;
-public:
- explicit normal_item() {
- // TODO
- }
- auto add_condition() -> void {
- // TODO
- }
- auto call() -> void {
- // TODO
- }
-};
-
-class virtual_item {
-private:
- int parent_id;
- cache_item_ptr parent;
-public:
- explicit virtual_item() {
- // TODO
- }
-};
-
-struct cache_item {
- /* This block is likely shared */
- struct rspamd_symcache_item_stat *st;
- struct rspamd_counter_data *cd;
-
- std::uint64_t last_count;
- std::string symbol;
- std::string_view type_descr;
- int type;
-
- /* Callback data */
- std::variant<normal_item, virtual_item> specific;
-
- /* Condition of execution */
- bool enabled;
-
- /* Priority */
- int priority;
- /* Topological order */
- unsigned int order;
- /* Unique id - counter */
- int id;
-
- int frequency_peaks;
- /* Settings ids */
- id_list allowed_ids;
- /* Allows execution but not symbols insertion */
- id_list exec_only_ids;
- id_list forbidden_ids;
-
- /* Dependencies */
- std::vector<cache_item_ptr> deps;
- /* Reverse dependencies */
- std::vector<cache_item_ptr> rdeps;
-};
-
-struct delayed_cache_dependency {
- std::string from;
- std::string to;
-};
-
-struct delayed_cache_condition {
- std::string sym;
- int cbref;
- lua_State *L;
-};
-
-struct symcache {
- /* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */
- robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol;
- std::vector<cache_item_weak_ptr> items_by_id;
-
- /* Items sorted into some order */
- order_generation_ptr items_by_order;
- unsigned int cur_order_gen;
-
- std::vector<cache_item_weak_ptr> connfilters;
- std::vector<cache_item_weak_ptr> prefilters;
- std::vector<cache_item_weak_ptr> filters;
- std::vector<cache_item_weak_ptr> postfilters;
- std::vector<cache_item_weak_ptr> composites;
- std::vector<cache_item_weak_ptr> idempotent;
- std::vector<cache_item_weak_ptr> virtual_symbols;
-
- /* These are stored within pointer to clean up after init */
- std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps;
- std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions;
-
- rspamd_mempool_t *static_pool;
- std::uint64_t cksum;
- double total_weight;
- std::size_t used_items;
- std::size_t stats_symbols_count;
- std::uint64_t total_hits;
-
- struct rspamd_config *cfg;
- lua_State *L;
- double reload_time;
- double last_profile;
- int peak_cb;
- int id;
-
-public:
- explicit symcache(struct rspamd_config *cfg) : cfg(cfg) {
- /* XXX: do we need a special pool for symcache? I don't think so */
- static_pool = cfg->cfg_pool;
- reload_time = cfg->cache_reload_time;
- total_hits = 1;
- total_weight = 1.0;
- cksum = 0xdeadbabe;
- peak_cb = -1;
- id = rspamd_random_uint64_fast();
- L = (lua_State *)cfg->lua_state;
- }
-
- virtual ~symcache() {
- if (peak_cb != -1) {
- luaL_unref(L, LUA_REGISTRYINDEX, peak_cb);
- }
- }
-};
-
-
-/*
- * These items are saved within task structure and are used to track
- * symbols execution
- */
-struct cache_dynamic_item {
- std::uint16_t start_msec; /* Relative to task time */
- unsigned started: 1;
- unsigned finished: 1;
- /* unsigned pad:14; */
- std::uint32_t async_events;
-};
-
-
-struct cache_dependency {
- cache_item_ptr item; /* Owning pointer to the real dep */
- std::string_view sym; /* Symbolic dep name */
- int id; /* Real from */
- int vid; /* Virtual from */
-};
-
-struct cache_savepoint {
- unsigned order_gen;
- unsigned items_inflight;
- bool profile;
- bool has_slow;
-
- double profile_start;
- double lim;
-
- struct rspamd_scan_result *rs;
-
- struct cache_item *cur_item;
- order_generation_ptr order;
- /* Dynamically expanded as needed */
- struct cache_dynamic_item dynamic_items[];
-};
-
-struct cache_refresh_cbdata {
- double last_resort;
- ev_timer resort_ev;
- struct symcache *cache;
- struct rspamd_worker *w;
- struct ev_loop *event_loop;
-};
-
-} // namespace rspamd
-
-#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr))
/* At least once per minute */
#define PROFILE_MAX_TIME (60.0)
@@ -831,260 +507,6 @@ rspamd_symcache_process_dep (struct rspamd_symcache *cache,
}
}
-/* Sort items in logical order */
-static void
-rspamd_symcache_post_init (struct rspamd_symcache *cache)
-{
- struct rspamd_symcache_item *it, *vit;
- struct cache_dependency *dep;
- struct delayed_cache_dependency *ddep;
- struct delayed_cache_condition *dcond;
- GList *cur;
- gint i, j;
-
- cur = cache->delayed_deps;
- while (cur) {
- ddep = cur->data;
-
- vit = rspamd_symcache_find_filter (cache, ddep->from, false);
- it = rspamd_symcache_find_filter (cache, ddep->from, true);
-
- if (it == NULL || vit == NULL) {
- msg_err_cache ("cannot register delayed dependency between %s and %s: "
- "%s is missing", ddep->from, ddep->to, ddep->from);
- }
- else {
- msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from,
- it->id, vit->id, ddep->to);
- rspamd_symcache_add_dependency (cache, it->id, ddep->to, vit != it ?
- vit->id : -1);
- }
-
- cur = g_list_next (cur);
- }
-
- cur = cache->delayed_conditions;
- while (cur) {
- dcond = cur->data;
-
- it = rspamd_symcache_find_filter (cache, dcond->sym, true);
-
- if (it == NULL) {
- msg_err_cache (
- "cannot register delayed condition for %s",
- dcond->sym);
- luaL_unref (dcond->L, LUA_REGISTRYINDEX, dcond->cbref);
- }
- else {
- struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool,
- sizeof (*ncond));
- ncond->cb = dcond->cbref;
- DL_APPEND (it->specific.normal.conditions, ncond);
- }
-
- cur = g_list_next (cur);
- }
-
- PTR_ARRAY_FOREACH (cache->items_by_id, i, it) {
-
- PTR_ARRAY_FOREACH (it->deps, j, dep) {
- rspamd_symcache_process_dep (cache, it, dep);
- }
-
- if (it->deps) {
- /* Reversed loop to make removal safe */
- for (j = it->deps->len - 1; j >= 0; j--) {
- dep = g_ptr_array_index (it->deps, j);
-
- if (dep->item == NULL) {
- /* Remove useless dep */
- g_ptr_array_remove_index (it->deps, j);
- }
- }
- }
- }
-
- /* Special case for virtual symbols */
- PTR_ARRAY_FOREACH (cache->virtual, i, it) {
-
- PTR_ARRAY_FOREACH (it->deps, j, dep) {
- rspamd_symcache_process_dep (cache, it, dep);
- }
- }
-
- g_ptr_array_sort_with_data (cache->connfilters, prefilters_cmp, cache);
- g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache);
- g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache);
- g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache);
-
- rspamd_symcache_resort (cache);
-}
-
-static gboolean
-rspamd_symcache_load_items (struct rspamd_symcache *cache, const gchar *name)
-{
- struct rspamd_symcache_header *hdr;
- struct stat st;
- struct ucl_parser *parser;
- ucl_object_t *top;
- const ucl_object_t *cur, *elt;
- ucl_object_iter_t it;
- struct rspamd_symcache_item *item, *parent;
- const guchar *p;
- gint fd;
- gpointer map;
-
- fd = open (name, O_RDONLY);
-
- if (fd == -1) {
- msg_info_cache ("cannot open file %s, error %d, %s", name,
- errno, strerror (errno));
- return FALSE;
- }
-
- rspamd_file_lock (fd, FALSE);
-
- if (fstat (fd, &st) == -1) {
- rspamd_file_unlock (fd, FALSE);
- close (fd);
- msg_info_cache ("cannot stat file %s, error %d, %s", name,
- errno, strerror (errno));
- return FALSE;
- }
-
- if (st.st_size < (gint)sizeof (*hdr)) {
- rspamd_file_unlock (fd, FALSE);
- close (fd);
- errno = EINVAL;
- msg_info_cache ("cannot use file %s, error %d, %s", name,
- errno, strerror (errno));
- return FALSE;
- }
-
- map = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0);
-
- if (map == MAP_FAILED) {
- rspamd_file_unlock (fd, FALSE);
- close (fd);
- msg_info_cache ("cannot mmap file %s, error %d, %s", name,
- errno, strerror (errno));
- return FALSE;
- }
-
- hdr = map;
-
- if (memcmp (hdr->magic, rspamd_symcache_magic,
- sizeof (rspamd_symcache_magic)) != 0) {
- msg_info_cache ("cannot use file %s, bad magic", name);
- munmap (map, st.st_size);
- rspamd_file_unlock (fd, FALSE);
- close (fd);
-
- return FALSE;
- }
-
- parser = ucl_parser_new (0);
- p = (const guchar *)(hdr + 1);
-
- if (!ucl_parser_add_chunk (parser, p, st.st_size - sizeof (*hdr))) {
- msg_info_cache ("cannot use file %s, cannot parse: %s", name,
- ucl_parser_get_error (parser));
- munmap (map, st.st_size);
- ucl_parser_free (parser);
- rspamd_file_unlock (fd, FALSE);
- close (fd);
-
- return FALSE;
- }
-
- top = ucl_parser_get_object (parser);
- munmap (map, st.st_size);
- rspamd_file_unlock (fd, FALSE);
- close (fd);
- ucl_parser_free (parser);
-
- if (top == NULL || ucl_object_type (top) != UCL_OBJECT) {
- msg_info_cache ("cannot use file %s, bad object", name);
- ucl_object_unref (top);
- return FALSE;
- }
-
- it = ucl_object_iterate_new (top);
-
- while ((cur = ucl_object_iterate_safe (it, true))) {
- item = g_hash_table_lookup (cache->items_by_symbol, ucl_object_key (cur));
-
- if (item) {
- /* Copy saved info */
- /*
- * XXX: don't save or load weight, it should be obtained from the
- * metric
- */
-#if 0
- elt = ucl_object_lookup (cur, "weight");
-
- if (elt) {
- w = ucl_object_todouble (elt);
- if (w != 0) {
- item->weight = w;
- }
- }
-#endif
- elt = ucl_object_lookup (cur, "time");
- if (elt) {
- item->st->avg_time = ucl_object_todouble (elt);
- }
-
- elt = ucl_object_lookup (cur, "count");
- if (elt) {
- item->st->total_hits = ucl_object_toint (elt);
- item->last_count = item->st->total_hits;
- }
-
- elt = ucl_object_lookup (cur, "frequency");
- if (elt && ucl_object_type (elt) == UCL_OBJECT) {
- const ucl_object_t *freq_elt;
-
- freq_elt = ucl_object_lookup (elt, "avg");
-
- if (freq_elt) {
- item->st->avg_frequency = ucl_object_todouble (freq_elt);
- }
- freq_elt = ucl_object_lookup (elt, "stddev");
-
- if (freq_elt) {
- item->st->stddev_frequency = ucl_object_todouble (freq_elt);
- }
- }
-
- if (item->is_virtual && !(item->type & SYMBOL_TYPE_GHOST)) {
- g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len);
- parent = g_ptr_array_index (cache->items_by_id,
- item->specific.virtual.parent);
- item->specific.virtual.parent_item = parent;
-
- if (parent->st->weight < item->st->weight) {
- parent->st->weight = item->st->weight;
- }
-
- /*
- * We maintain avg_time for virtual symbols equal to the
- * parent item avg_time
- */
- item->st->avg_time = parent->st->avg_time;
- }
-
- cache->total_weight += fabs (item->st->weight);
- cache->total_hits += item->st->total_hits;
- }
- }
-
- ucl_object_iterate_free (it);
- ucl_object_unref (top);
-
- return TRUE;
-}
-
#define ROUND_DOUBLE(x) (floor((x) * 100.0) / 100.0)
static gboolean
@@ -1438,21 +860,6 @@ rspamd_symcache_save (struct rspamd_symcache *cache)
}
}
-void
-rspamd_symcache_destroy (struct rspamd_symcache *cache)
-{
- auto *real_cache = C_API_SYMCACHE(cache);
-
- delete real_cache;
-}
-
-struct rspamd_symcache*
-rspamd_symcache_new (struct rspamd_config *cfg)
-{
- auto *ncache = new rspamd::symcache::symcache(cfg);
-
- return (struct rspamd_symcache*)ncache;
-}
static void
rspamd_symcache_metric_connect_cb (gpointer k, gpointer v, gpointer ud)
@@ -1472,30 +879,7 @@ rspamd_symcache_metric_connect_cb (gpointer k, gpointer v, gpointer ud)
}
}
-gboolean
-rspamd_symcache_init (struct rspamd_symcache *cache)
-{
- gboolean res = TRUE;
-
- g_assert (cache != NULL);
-
- cache->reload_time = cache->cfg->cache_reload_time;
- if (cache->cfg->cache_filename != NULL) {
- res = rspamd_symcache_load_items (cache, cache->cfg->cache_filename);
- }
-
- rspamd_symcache_post_init (cache);
-
- /* Connect metric symbols with symcache symbols */
- if (cache->cfg->symbols) {
- g_hash_table_foreach (cache->cfg->symbols,
- rspamd_symcache_metric_connect_cb,
- cache);
- }
-
- return res;
-}
static void
diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx
new file mode 100644
index 000000000..7255f9d10
--- /dev/null
+++ b/src/libserver/symcache/symcache_c.cxx
@@ -0,0 +1,48 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
*** OUTPUT TRUNCATED, 749 LINES SKIPPED ***
More information about the Commits
mailing list