commit 1e3f1f0: [Project] Clickhouse: Add extra columns concept

Vsevolod Stakhov vsevolod at highsecure.ru
Mon Feb 17 14:42:07 UTC 2020


Author: Vsevolod Stakhov
Date: 2020-02-17 10:04:19 +0000
URL: https://github.com/rspamd/rspamd/commit/1e3f1f0c2d454ee283c05c70912993cf199e0fe1

[Project] Clickhouse: Add extra columns concept

---
 src/plugins/lua/clickhouse.lua | 112 ++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 110 insertions(+), 2 deletions(-)

diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 442aee4c1..ab1693b48 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -91,7 +91,8 @@ local settings = {
     method = 'detach',
     period_months = 3,
     run_every = '7d',
-  }
+  },
+  extra_columns = {},
 }
 
 --- @language SQL
@@ -350,6 +351,10 @@ local function clickhouse_asn_row(res)
   for _,v in ipairs(fields) do table.insert(res, v) end
 end
 
+local function clickhouse_extra_columns(res)
+  for _,v in ipairs(settings.extra_columns) do table.insert(res, v.name) end
+end
+
 local function today(ts)
   return os.date('!%Y-%m-%d', ts)
 end
@@ -430,6 +435,10 @@ local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows)
     clickhouse_groups_row(fields)
   end
 
+  if #settings.extra_columns > 0 then
+    clickhouse_extra_columns(fields)
+  end
+
   send_data('generic data', gen_rows,
       string.format('INSERT INTO rspamd (%s)',
           table.concat(fields, ',')))
@@ -831,6 +840,19 @@ local function clickhouse_collect(task)
     table.insert(row, gr_scores_tab)
   end
 
+  -- Extra columns
+  if #settings.extra_columns > 0 then
+    for _,col in ipairs(settings.extra_columns) do
+      local elts = col.selector(task)
+
+      if elts then
+        table.insert(row, elts)
+      else
+        table.insert(row, col.default_value)
+      end
+    end
+  end
+
   -- Custom data
   for k,rule in pairs(settings.custom_rules) do
     if not custom_rows[k] then custom_rows[k] = {} end
@@ -1107,6 +1129,47 @@ local function maybe_apply_migrations(upstream, ev_base, cfg, version)
   migration_recursor(version)
 end
 
+local function add_extra_columns(upstream, ev_base, cfg)
+  local ch_params = {
+    ev_base = ev_base,
+    config = cfg,
+  }
+  -- Apply migrations sequentially
+  local function columns_recursor(i)
+    if i <= #settings.extra_columns  then
+      local col = settings.extra_columns[i]
+      local prev_column
+      if i == 1 then
+        prev_column = 'Digest'
+      else
+        prev_column = settings.extra_columns[i - 1].name
+      end
+      local sql = string.format('ALTER TABLE rspamd ADD COLUMN IF NOT EXISTS `%s` %s AFTER `%s`',
+          col.name, col.type, prev_column)
+      local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+          function(_, _)
+            rspamd_logger.infox(rspamd_config,
+                'added extra column %s (%s) after %s',
+                col.name, col.type, prev_column)
+            -- Apply the next statement
+            columns_recursor(i + 1)
+          end ,
+          function(_, err)
+            rspamd_logger.errx(rspamd_config,
+                "cannot apply add column alter %s: '%s' on clickhouse server %s: %s",
+                i, sql, upstream:get_addr():to_string(true), err)
+          end)
+      if not ret then
+        rspamd_logger.errx(rspamd_config,
+            "cannot apply add column alter %s: '%s' on clickhouse server %s: cannot make request",
+            i, sql, upstream:get_addr():to_string(true))
+      end
+    end
+  end
+
+  columns_recursor(1)
+end
+
 local function check_rspamd_table(upstream, ev_base, cfg)
   local ch_params = {
     ev_base = ev_base,
@@ -1173,6 +1236,10 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg)
     local version = tonumber(rows[1].v)
     maybe_apply_migrations(upstream, ev_base, cfg, version)
   end
+
+  if #settings.extra_columns > 0 then
+    add_extra_columns(upstream, ev_base, cfg)
+  end
 end
 
 local opts = rspamd_config:get_all_opt('clickhouse')
@@ -1223,7 +1290,7 @@ if opts then
         end
       end
     else
-      settings[k] = v
+      settings[k] = lua_util.deepcopy(v)
     end
   end
 
@@ -1251,6 +1318,47 @@ if opts then
           settings.exceptions, N)
     end
 
+    if settings.extra_columns then
+      -- Check sanity and create selector closures
+      local lua_selectors = require "lua_selectors"
+
+      for col_name,col_data in pairs(settings.extra_columns) do
+        if not col_data.selector or not col_data.type then
+          rspamd_logger.errx(rspamd_config, 'cannot add clickhouse extra row %s: no type or no selector',
+              col_name)
+        else
+          local selector = lua_selectors.create_selector_closure(rspamd_config,
+              col_data.selector, col_data.delimiter or '', false)
+
+          if not selector then
+            rspamd_logger.errx(rspamd_config, 'cannot add clickhouse extra row %s: bad selector: %s',
+                col_name, col_data.selector)
+            -- Remove column
+            settings.extra_columns[col_name] = nil
+          else
+            if not col_data.default_value then
+              if col_data.type:lower():match('Array') then
+                col_data.default_value = {}
+              else
+                col_data.default_value = ''
+              end
+            end
+          end
+        end
+      end
+
+      -- Convert extra columns from a map to an array sorted by column name to
+      -- preserve strict order when doing altering
+      local extra_cols = {}
+      for col_name,col_data in pairs(settings.extra_columns) do
+        local nelt = lua_util.shallowcopy(col_data)
+        nelt.name = col_name
+        extra_cols[#extra_cols + 1] = nelt
+      end
+      table.sort(extra_cols, function(c1, c2) return c1.name < c2.name end)
+      settings.extra_columns = extra_cols
+    end
+
     rspamd_config:register_symbol({
       name = 'CLICKHOUSE_COLLECT',
       type = 'idempotent',


More information about the Commits mailing list