commit 7a77919: [Feature] Clickhouse: Rework Clickhouse collection logic

Vsevolod Stakhov vsevolod at highsecure.ru
Tue Oct 29 16:00:11 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-10-29 15:55:58 +0000
URL: https://github.com/rspamd/rspamd/commit/7a7791926e0ad43ce69d3ba9cc5c73dc8fa9d3a2 (HEAD -> master)

[Feature] Clickhouse: Rework Clickhouse collection logic
Issue: #3127

---
 src/plugins/lua/clickhouse.lua | 304 ++++++++++++++++++++++++-----------------
 1 file changed, 182 insertions(+), 122 deletions(-)

diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 45c555460..93165d842 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -15,7 +15,6 @@ limitations under the License.
 ]]--
 
 local rspamd_logger = require 'rspamd_logger'
-local rspamd_lua_utils = require "lua_util"
 local upstream_list = require "rspamd_upstream_list"
 local lua_util = require "lua_util"
 local lua_clickhouse = require "lua_clickhouse"
@@ -31,10 +30,18 @@ end
 local data_rows = {}
 local custom_rows = {}
 local nrows = 0
+local used_memory = 0
+local last_collection = 0
 local schema_version = 8 -- Current schema version
 
 local settings = {
-  limit = 1000,
+  limits = { -- Collection limits
+    max_rows = 1000, -- How many rows are allowed (0 to disable this limit)
+    max_memory = 50 * 1024 * 1024, -- How much memory should be occupied before sending collection
+    max_interval = 60, -- Maximum collection interval
+  },
+  collect_garbage = false, -- Perform GC collection after sending the data
+  check_timeout = 10.0, -- Periodic timeout
   timeout = 5.0,
   bayes_spam_symbols = {'BAYES_SPAM'},
   bayes_ham_symbols = {'BAYES_HAM'},
@@ -368,23 +375,25 @@ local function clickhouse_check_symbol(task, settings_field_name, fields_table,
   return false
 end
 
-local function clickhouse_send_data(task, ev_base)
+local function clickhouse_send_data(task, ev_base, why)
   local log_object = task or rspamd_config
   local upstream = settings.upstream:get_upstream_round_robin()
   local ip_addr = upstream:get_addr():to_string(true)
+  rspamd_logger.infox(log_object, "trying to send %s rows to clickhouse server %s; started as %s",
+      nrows, ip_addr, why)
 
   local function gen_success_cb(what, how_many)
     return function (_, _)
-      rspamd_logger.infox(log_object, "sent %s rows of %s to clickhouse server %s",
-          how_many, what, ip_addr)
+      rspamd_logger.infox(log_object, "sent %s rows of %s to clickhouse server %s; started as %s",
+          how_many, what, ip_addr, why)
       upstream:ok()
     end
   end
 
   local function gen_fail_cb(what, how_many)
     return function (_, err)
-      rspamd_logger.errx(log_object, "cannot send %s rows of %s data to clickhouse server %s: %s",
-          how_many, what, ip_addr, err)
+      rspamd_logger.errx(log_object, "cannot send %s rows of %s data to clickhouse server %s: %s; started as %s",
+          how_many, what, ip_addr, err, why)
       upstream:fail()
     end
   end
@@ -437,7 +446,7 @@ local function clickhouse_collect(task)
     return
   end
 
-  if not settings.allow_local and rspamd_lua_utils.is_rspamc_or_controller(task) then
+  if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then
     return
   end
 
@@ -828,15 +837,13 @@ local function clickhouse_collect(task)
   end
 
   nrows = nrows + 1
-  data_rows[#data_rows + 1] = lua_clickhouse.row_to_tsv(row)
-  lua_util.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit)
-
-  if nrows >= settings['limit'] then
-    clickhouse_send_data(task)
-    nrows = 0
-    data_rows = {}
-    custom_rows = {}
-  end
+  local tsv_row = lua_clickhouse.row_to_tsv(row)
+  used_memory = used_memory + #tsv_row
+  data_rows[#data_rows + 1] = tsv_row
+  lua_util.debugm(N, task,
+      "add clickhouse row %s / %s; used memory: %s / %s",
+      nrows, settings.limits.max_rows,
+      used_memory, settings.limits.max_memory)
 end
 
 local function do_remove_partition(ev_base, cfg, table_name, partition_id)
@@ -850,7 +857,7 @@ local function do_remove_partition(ev_base, cfg, table_name, partition_id)
     ['partition_id']   = partition_id
   }
 
-  local sql = rspamd_lua_utils.template(remove_partition_sql, sql_params)
+  local sql = lua_util.template(remove_partition_sql, sql_params)
 
   local ch_params = {
     body = sql,
@@ -913,6 +920,50 @@ local function get_last_removal_ago()
   return (last_ts + settings.retention.period) - current_ts
 end
 
+--- Periodic callback: flush collected rows once any configured limit is hit.
+-- @param cfg config object, used here only as the debug logging context
+-- @param ev_base event base forwarded to clickhouse_send_data
+-- @param now time value compared against last_collection (presumably current time)
+-- @return settings.check_timeout, returned on every path
+local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
+  local limits = settings.limits
+  local reason
+
+  if nrows == 0 then
+    lua_util.debugm(N, cfg, "no need to send data, as there are no rows to collect")
+    return settings.check_timeout
+  end
+
+  -- A limit of 0 disables that check; the last limit that matches names the reason
+  -- Fix: max_rows lives in settings.limits, settings.max_rows is nil
+  if limits.max_rows > 0 and nrows > limits.max_rows then
+    reason = 'limit of rows has been reached'
+  end
+  if limits.max_interval > 0 and now - last_collection > limits.max_interval then
+    reason = 'limit of time since last collection has been reached'
+  end
+  if limits.max_memory > 0 and used_memory >= limits.max_memory then
+    reason = 'limit of memory has been reached'
+  end
+
+  -- Send the batch and reset the collection state
+  if reason then
+    clickhouse_send_data(nil, ev_base, reason)
+    nrows = 0
+    last_collection = now
+    used_memory = 0
+    data_rows = {}
+    custom_rows = {}
+
+    -- The option is declared as `collect_garbage` in the default settings
+    if settings.collect_garbage then
+      collectgarbage()
+    end
+  end
+
+  return settings.check_timeout
+end
+
 local function clickhouse_remove_old_partitions(cfg, ev_base)
   local last_time_ago = get_last_removal_ago()
   if last_time_ago == nil then
@@ -932,7 +983,7 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
     tables = tables,
     month  = settings.retention.period_months,
   }
-  local sql = rspamd_lua_utils.template(partition_to_remove_sql, sql_params)
+  local sql = lua_util.template(partition_to_remove_sql, sql_params)
 
 
   local ch_params = {
@@ -1076,7 +1127,7 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg)
   -- If we have some custom rules, we just send its schema to the upstream
   for k,rule in pairs(settings.custom_rules) do
     if rule.schema then
-      local sql = rspamd_lua_utils.template(rule.schema, settings)
+      local sql = lua_util.template(rule.schema, settings)
       local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
       if err then
         rspamd_logger.errx(rspamd_config, 'cannot send custom schema %s to clickhouse server %s: ' ..
@@ -1091,10 +1142,12 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg)
   local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
   if err then
     if rows and rows.code == 404 then
-      rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
+      rspamd_logger.infox(rspamd_config,
+          'table rspamd_version does not exist, check rspamd table')
       check_rspamd_table(upstream, ev_base, cfg)
     else
-      rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: %s",
+      rspamd_logger.errx(rspamd_config,
+          "cannot get version on clickhouse server %s: %s",
         upstream:get_addr():to_string(true), err)
     end
   else
@@ -1105,125 +1158,132 @@ end
 
 local opts = rspamd_config:get_all_opt('clickhouse')
 if opts then
-    for k,v in pairs(opts) do
-      if k == 'custom_rules' then
-        if not v[1] then
-          v = {v}
-        end
+  -- Legacy `limit` options
+  if opts.limit and not opts.limits then
+    settings.limits.max_rows = opts.limit
+  end
+  for k,v in pairs(opts) do
+    if k == 'custom_rules' then
+      if not v[1] then
+        v = {v}
+      end
 
-        for i,rule in ipairs(v) do
-          if rule.schema and rule.first_row and rule.get_row then
-            local first_row, get_row
-            local loadstring = loadstring or load
-            local ret, res_or_err = pcall(loadstring(rule.first_row))
-
-            if not ret or type(res_or_err) ~= 'function' then
-              rspamd_logger.errx(rspamd_config, 'invalid first_row (%s) - must be a function',
-                  res_or_err)
-            else
-              first_row = res_or_err
-            end
+      for i,rule in ipairs(v) do
+        if rule.schema and rule.first_row and rule.get_row then
+          local first_row, get_row
+          local loadstring = loadstring or load
+          local ret, res_or_err = pcall(loadstring(rule.first_row))
 
-            ret, res_or_err = pcall(loadstring(rule.get_row))
+          if not ret or type(res_or_err) ~= 'function' then
+            rspamd_logger.errx(rspamd_config, 'invalid first_row (%s) - must be a function',
+                res_or_err)
+          else
+            first_row = res_or_err
+          end
 
-            if not ret or type(res_or_err) ~= 'function' then
-              rspamd_logger.errx(rspamd_config, 'invalid get_row (%s) - must be a function',
-                  res_or_err)
-            else
-              get_row = res_or_err
-            end
+          ret, res_or_err = pcall(loadstring(rule.get_row))
 
-            if first_row and get_row then
-              local name = rule.name or tostring(i)
-              settings.custom_rules[name] = {
-                schema = rule.schema,
-                first_row = first_row,
-                get_row = get_row,
-              }
-            end
+          if not ret or type(res_or_err) ~= 'function' then
+            rspamd_logger.errx(rspamd_config,
+                'invalid get_row (%s) - must be a function',
+                res_or_err)
           else
-            rspamd_logger.errx(rspamd_config, 'custom rule has no required attributes: schema, first_row and get_row')
+            get_row = res_or_err
           end
+
+          if first_row and get_row then
+            local name = rule.name or tostring(i)
+            settings.custom_rules[name] = {
+              schema = rule.schema,
+              first_row = first_row,
+              get_row = get_row,
+            }
+          end
+        else
+          rspamd_logger.errx(rspamd_config, 'custom rule has no required attributes: schema, first_row and get_row')
         end
-      else
-        settings[k] = v
       end
+    else
+      settings[k] = v
     end
+  end
 
-    if not settings['server'] and not settings['servers'] then
-      rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
-      rspamd_lua_utils.disable_module(N, "config")
-    else
-      settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables',
+  if not settings['server'] and not settings['servers'] then
+    rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
+    lua_util.disable_module(N, "config")
+  else
+    settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables',
         'regexp', 'clickhouse specific domains')
 
-      settings.upstream = upstream_list.create(rspamd_config,
+    settings.upstream = upstream_list.create(rspamd_config,
         settings['server'] or settings['servers'], 8123)
 
-      if not settings.upstream then
-        rspamd_logger.errx(rspamd_config, 'cannot parse clickhouse address: %s',
-            settings['server'] or settings['servers'])
-        rspamd_lua_utils.disable_module(N, "config")
-        return
-      end
+    if not settings.upstream then
+      rspamd_logger.errx(rspamd_config, 'cannot parse clickhouse address: %s',
+          settings['server'] or settings['servers'])
+      lua_util.disable_module(N, "config")
+      return
+    end
 
-      if settings.exceptions then
-        local maps_expressions = require "lua_maps_expressions"
+    if settings.exceptions then
+      local maps_expressions = require "lua_maps_expressions"
 
-        settings.exceptions = maps_expressions.create(rspamd_config,
-            settings.exceptions, N)
-      end
+      settings.exceptions = maps_expressions.create(rspamd_config,
+          settings.exceptions, N)
+    end
 
-      rspamd_config:register_symbol({
-        name = 'CLICKHOUSE_COLLECT',
-        type = 'idempotent',
-        callback = clickhouse_collect,
-        priority = 10,
-        flags = 'empty,explicit_disable,ignore_passthrough',
-      })
-      rspamd_config:register_finish_script(function(task)
-        if nrows > 0 then
-          clickhouse_send_data(task, nil)
+    rspamd_config:register_symbol({
+      name = 'CLICKHOUSE_COLLECT',
+      type = 'idempotent',
+      callback = clickhouse_collect,
+      priority = 10,
+      flags = 'empty,explicit_disable,ignore_passthrough',
+    })
+    rspamd_config:register_finish_script(function(task)
+      if nrows > 0 then
+        clickhouse_send_data(task, nil, 'final collection')
+      end
+    end)
+    -- Create tables on load
+    rspamd_config:add_on_load(function(cfg, ev_base, worker)
+      rspamd_config:add_periodic(ev_base, 0,
+          clickhouse_maybe_send_data_periodic, true)
+      if worker:is_primary_controller() then
+        local upstreams = settings.upstream:all_upstreams()
+
+        for _,up in ipairs(upstreams) do
+          check_clickhouse_upstream(up, ev_base, cfg)
         end
-      end)
-      -- Create tables on load
-      rspamd_config:add_on_load(function(cfg, ev_base, worker)
-        if worker:is_primary_controller() then
-          local upstreams = settings.upstream:all_upstreams()
-
-          for _,up in ipairs(upstreams) do
-            check_clickhouse_upstream(up, ev_base, cfg)
-          end
 
-          if settings.retention.enable and settings.retention.method ~= 'drop' and
-              settings.retention.method ~= 'detach' then
-            rspamd_logger.errx(rspamd_config,
-                "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention",
-                settings.retention.method)
-            settings.retention.enable = false
-          end
-          if settings.retention.enable and settings.retention.period_months < 1 or
-              settings.retention.period_months > 1000 then
-            rspamd_logger.errx(rspamd_config,
-                "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention",
-                settings.retention.period_months)
-            settings.retention.enable = false
-          end
-          local period = lua_util.parse_time_interval(settings.retention.run_every)
-          if settings.retention.enable and period == nil then
-            rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention",
-                    settings.retention.run_every)
-            settings.retention.enable = false
-          end
+        if settings.retention.enable and settings.retention.method ~= 'drop' and
+            settings.retention.method ~= 'detach' then
+          rspamd_logger.errx(rspamd_config,
+              "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention",
+              settings.retention.method)
+          settings.retention.enable = false
+        end
+        if settings.retention.enable and settings.retention.period_months < 1 or
+            settings.retention.period_months > 1000 then
+          rspamd_logger.errx(rspamd_config,
+              "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention",
+              settings.retention.period_months)
+          settings.retention.enable = false
+        end
+        local period = lua_util.parse_time_interval(settings.retention.run_every)
+        if settings.retention.enable and period == nil then
+          rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention",
+              settings.retention.run_every)
+          settings.retention.enable = false
+        end
 
-          if settings.retention.enable then
-            settings.retention.period = period
-            rspamd_logger.infox(rspamd_config,
-                "retention will be performed each %s seconds for %s month with method %s",
-                period, settings.retention.period_months, settings.retention.method)
-            rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false)
-          end
+        if settings.retention.enable then
+          settings.retention.period = period
+          rspamd_logger.infox(rspamd_config,
+              "retention will be performed each %s seconds for %s month with method %s",
+              period, settings.retention.period_months, settings.retention.method)
+          rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false)
         end
-      end)
-    end
+      end
+    end)
+  end
 end


More information about the Commits mailing list