commit c9e6e26: [Fix] Clickhouse: Avoid potential races in collection

Vsevolod Stakhov vsevolod at highsecure.ru
Tue Nov 5 14:56:06 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-11-05 14:54:05 +0000
URL: https://github.com/rspamd/rspamd/commit/c9e6e26319c08a0e440a9e27b9bf3743e32ad70b (HEAD -> master)

[Fix] Clickhouse: Avoid potential races in collection

---
 src/plugins/lua/clickhouse.lua | 42 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 36 insertions(+), 6 deletions(-)

diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 878a15064..32fe5c25e 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -32,6 +32,7 @@ local custom_rows = {}
 local nrows = 0
 local used_memory = 0
 local last_collection = 0
+local final_call = false -- If the final collection has been started
 local schema_version = 8 -- Current schema version
 
 local settings = {
@@ -375,7 +376,7 @@ local function clickhouse_check_symbol(task, settings_field_name, fields_table,
   return false
 end
 
-local function clickhouse_send_data(task, ev_base, why)
+local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows)
   local log_object = task or rspamd_config
   local upstream = settings.upstream:get_upstream_round_robin()
   local ip_addr = upstream:get_addr():to_string(true)
@@ -429,11 +430,11 @@ local function clickhouse_send_data(task, ev_base, why)
     clickhouse_groups_row(fields)
   end
 
-  send_data('generic data', data_rows,
+  send_data('generic data', gen_rows,
       string.format('INSERT INTO rspamd (%s)',
           table.concat(fields, ',')))
 
-  for k,crows in pairs(custom_rows) do
+  for k,crows in pairs(cust_rows) do
     if #crows > 1 then
       send_data('custom data ('..k..')', crows,
           settings.custom_rules[k].first_row())
@@ -929,19 +930,26 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
     return settings.check_timeout
   end
 
+  if final_call then
+    lua_util.debugm(N, cfg, "no need to send data, final call has been issued")
+    return 0
+  end
+
   if settings.limits.max_rows > 0 then
     if nrows > settings.limits.max_rows then
       need_collect = true
       reason = string.format('limit of rows has been reached: %d', nrows)
     end
   end
-  if settings.limits.max_interval > 0 then
+
+  if last_collection > 0 and settings.limits.max_interval > 0 then
     if now - last_collection > settings.limits.max_interval then
       need_collect = true
       reason = string.format('limit of time since last collection has been reached: %d seconds passed',
           (now - last_collection) - settings.limits.max_interval)
     end
   end
+
   if settings.limits.max_memory > 0 then
     if used_memory >= settings.limits.max_memory then
       need_collect = true
@@ -950,14 +958,22 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
     end
   end
 
+  if last_collection == 0 then
+    last_collection = now
+  end
+
   if need_collect then
-    clickhouse_send_data(nil, ev_base, reason)
+    -- Do it atomic
+    local saved_rows = data_rows
+    local saved_custom = custom_rows
     nrows = 0
     last_collection = now
     used_memory = 0
     data_rows = {}
     custom_rows = {}
 
+    clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom)
+
     if settings.collect_garbadge then
       collectgarbage()
     end
@@ -1243,7 +1259,21 @@ if opts then
     })
     rspamd_config:register_finish_script(function(task)
       if nrows > 0 then
-        clickhouse_send_data(task, nil, 'final collection')
+        final_call = true
+        local saved_rows = data_rows
+        local saved_custom = custom_rows
+
+        nrows = 0
+        data_rows = {}
+        used_memory = 0
+        custom_rows = {}
+
+        clickhouse_send_data(task, nil, 'final collection',
+            saved_rows, saved_custom)
+
+        if settings.collect_garbadge then
+          collectgarbage()
+        end
       end
     end)
     -- Create tables on load


More information about the Commits mailing list