commit 3038d95: [Project] Neural: Rework train data pushing part

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Jul 6 19:56:08 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-07-06 13:39:46 +0100
URL: https://github.com/rspamd/rspamd/commit/3038d9585f3c34fbd4fb65179d064fca2e445783

[Project] Neural: Rework train data pushing part

---
 src/plugins/lua/neural.lua | 91 ++++++++++++++++++++--------------------------
 1 file changed, 39 insertions(+), 52 deletions(-)

diff --git a/src/plugins/lua/neural.lua b/src/plugins/lua/neural.lua
index fdb138321..f40778a7b 100644
--- a/src/plugins/lua/neural.lua
+++ b/src/plugins/lua/neural.lua
@@ -65,7 +65,7 @@ local default_options = {
 -- Settings ANN table is loaded from Redis and represents dynamic profile for ANN
 -- Some elements are directly stored in Redis, ANN is, in turn loaded dynamically
 -- * version - version of ANN loaded from redis
--- * ann_key - name of ANN key in Redis
+-- * redis_key - name of ANN key in Redis
 -- * symbols - symbols in THIS PARTICULAR ANN (might be different from set.symbols)
 -- * distance - distance between set.symbols and set.ann.symbols
 -- * ann - kann object
@@ -85,31 +85,25 @@ end
 
 -- Lua script to train a row
 -- Uses the following keys:
--- key1 - prefix for fann
--- key2 - fann suffix (settings id)
--- key3 - spam or ham
--- key4 - maximum trains
+-- key1 - ann key
+-- key2 - spam or ham
+-- key3 - maximum trains
 -- returns 1 or 0: 1 - allow learn, 0 - not allow learn
 local redis_lua_script_can_train = [[
-  local prefix = KEYS[1] .. KEYS[2]
-  local locked = redis.call('GET', prefix .. '_locked')
+  local prefix = KEYS[1]
+  local locked = redis.call('HGET', prefix, 'lock')
   if locked then return 0 end
   local nspam = 0
   local nham = 0
-  local lim = tonumber(KEYS[4])
+  local lim = tonumber(KEYS[3])
   lim = lim + lim * 0.1
 
-  local exists = redis.call('SISMEMBER', KEYS[1], KEYS[2])
-  if not exists or tonumber(exists) == 0 then
-    redis.call('SADD', KEYS[1], KEYS[2])
-  end
-
   local ret = redis.call('LLEN', prefix .. '_spam')
   if ret then nspam = tonumber(ret) end
   ret = redis.call('LLEN', prefix .. '_ham')
   if ret then nham = tonumber(ret) end
 
-  if KEYS[3] == 'spam' then
+  if KEYS[2] == 'spam' then
     if nham <= lim and nham + 1 >= nspam then
       return tostring(nspam + 1)
     else
@@ -155,6 +149,9 @@ local redis_lua_script_maybe_invalidate = [[
     for _,k in ipairs(to_delete) do
       local tb = cjson.decode(k)
       redis.call('DEL', tb.ann_key)
+      -- Also train vectors
+      redis.call('DEL', tb.ann_key .. '_spam')
+      redis.call('DEL', tb.ann_key .. '_ham')
     end
     redis.call('ZREMRANGEBYRANK', KEYS[1], 0, (-(tonumber(KEYS[2] - 1)))
     return to_delete
@@ -328,9 +325,8 @@ local function create_ann(n, nlayers)
 end
 
 
-local function ann_train_callback(rule, task, score, required_score, id)
-  local train_opts = rule['train']
-  local fname,suffix = gen_ann_prefix(rule, id)
+local function ann_train_callback(rule, task, score, required_score, set)
+  local train_opts = rule.train
 
   local learn_spam, learn_ham
 
@@ -360,16 +356,15 @@ local function ann_train_callback(rule, task, score, required_score, id)
 
 
   if learn_spam or learn_ham then
-    local k
-    local vec_len = 0
-    if learn_spam then k = 'spam' else k = 'ham' end
+    local learn_type
+    if learn_spam then learn_type = 'spam' else learn_type = 'ham' end
 
     local function learn_vec_cb(err)
       if err then
         rspamd_logger.errx(task, 'cannot store train vector for %s: %s', fname, err)
       else
         rspamd_logger.infox(task, "trained ANN rule %s, save %s vector, %s bytes",
-          rule['name'], k, vec_len)
+          rule['name'], learn_type, vec_len)
       end
     end
 
@@ -380,42 +375,36 @@ local function ann_train_callback(rule, task, score, required_score, id)
           rspamd_logger.infox(task, 'probabilistically skip sample: %s', coin)
           return
         end
-        local ann_data = task:get_symbols_tokens()
-        local mt = meta_functions.rspamd_gen_metatokens(task)
-        -- Add filtered meta tokens
-        fun.each(function(e) table.insert(ann_data, e) end, mt)
-        -- Check NaNs in train data
-        if fun.all(function(e) return e == e end, ann_data) then
-          local str = rspamd_util.zstd_compress(table.concat(ann_data, ';'))
-          vec_len = #str
-
-          lua_redis.redis_make_request(task,
+        local vec = result_to_vector(task, set)
+
+        local str = rspamd_util.zstd_compress(table.concat(vec, ';'))
+
+        lua_redis.redis_make_request(task,
             rule.redis,
             nil,
             true, -- is write
             learn_vec_cb, --callback
             'LPUSH', -- command
-            {fname .. '_' .. k, str} -- arguments
-          )
-        else
-          rspamd_logger.errx(task, "do not store learn vector as it contains %s NaN values",
-            fun.length(fun.filter(function(e) return e ~= e end, ann_data)))
-        end
-
+            { set.ann.redis_prefix .. '_' .. learn_type, str} -- arguments
+        )
       else
         if err then
           rspamd_logger.errx(task, 'cannot check if we can train %s: %s', fname, err)
         elseif tonumber(data) < 0 then
-          rspamd_logger.infox(task, "cannot learn ANN %s: too many %s samples: %s",
-            fname, k, -tonumber(data))
+          rspamd_logger.infox(task, "cannot learn ANN %s:%s: too many %s samples: %s",
+            rule.prefix, set.name, learn_type, -tonumber(data))
         end
       end
     end
 
+    if not set.ann then
+      -- Need to create or load a profile corresponding to the current configuration
+    end
+    -- Check if we can learn
     lua_redis.exec_redis_script(redis_can_train_id,
-      {task = task, is_write = true},
-      can_train_cb,
-      {gen_ann_prefix(rule, nil), suffix, k, tostring(train_opts.max_trains)})
+        {task = task, is_write = true},
+        can_train_cb,
+        { set.ann.redis_key, learn_type, tostring(train_opts.max_trains)})
   end
 end
 
@@ -742,7 +731,8 @@ local function load_new_ann(rule, ev_base, set, profile, min_diff)
             ann = ann,
             version = profile.version,
             symbols = profile.symbols,
-            distance = min_diff
+            distance = min_diff,
+            redis_key = profile.ann_key
           }
 
           rspamd_logger.infox(rspamd_config, 'loaded ANN for %s from %s; %s bytes compressed; version=%s',
@@ -909,15 +899,12 @@ local function ann_push_vector(task)
   if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then return end
   local scores = task:get_metric_score()
   for _,rule in pairs(settings.rules) do
-    local sid = "0"
-    if rule.use_settings then
-      sid = tostring(task:get_settings_id())
-    end
-    if rule.per_user then
-      local r = task:get_principal_recipient()
-      sid = sid .. r
+    local sid = task:get_settings_id() or -1
+
+    if rule.settings[sid] then
+      ann_train_callback(rule, task, scores[1], scores[2], rule.settings[sid])
     end
-    ann_train_callback(rule, task, scores[1], scores[2], sid)
+
   end
 end
 


More information about the Commits mailing list