commit 1e3f1f0: [Project] Clickhouse: Add extra columns concept
Vsevolod Stakhov
vsevolod at highsecure.ru
Mon Feb 17 14:42:07 UTC 2020
Author: Vsevolod Stakhov
Date: 2020-02-17 10:04:19 +0000
URL: https://github.com/rspamd/rspamd/commit/1e3f1f0c2d454ee283c05c70912993cf199e0fe1
[Project] Clickhouse: Add extra columns concept
---
src/plugins/lua/clickhouse.lua | 112 ++++++++++++++++++++++++++++++++++++++++-
1 file changed, 110 insertions(+), 2 deletions(-)
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 442aee4c1..ab1693b48 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -91,7 +91,8 @@ local settings = {
method = 'detach',
period_months = 3,
run_every = '7d',
- }
+ },
+ extra_columns = {},
}
--- @language SQL
@@ -350,6 +351,10 @@ local function clickhouse_asn_row(res)
for _,v in ipairs(fields) do table.insert(res, v) end
end
+local function clickhouse_extra_columns(res)
+ for _,v in ipairs(settings.extra_columns) do table.insert(res, v.name) end
+end
+
local function today(ts)
return os.date('!%Y-%m-%d', ts)
end
@@ -430,6 +435,10 @@ local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows)
clickhouse_groups_row(fields)
end
+ if #settings.extra_columns > 0 then
+ clickhouse_extra_columns(fields)
+ end
+
send_data('generic data', gen_rows,
string.format('INSERT INTO rspamd (%s)',
table.concat(fields, ',')))
@@ -831,6 +840,19 @@ local function clickhouse_collect(task)
table.insert(row, gr_scores_tab)
end
+ -- Extra columns
+ if #settings.extra_columns > 0 then
+ for _,col in ipairs(settings.extra_columns) do
+ local elts = col.selector(task)
+
+ if elts then
+ table.insert(row, elts)
+ else
+ table.insert(row, col.default_value)
+ end
+ end
+ end
+
-- Custom data
for k,rule in pairs(settings.custom_rules) do
if not custom_rows[k] then custom_rows[k] = {} end
@@ -1107,6 +1129,47 @@ local function maybe_apply_migrations(upstream, ev_base, cfg, version)
migration_recursor(version)
end
+local function add_extra_columns(upstream, ev_base, cfg)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ -- Apply migrations sequentially
+ local function columns_recursor(i)
+ if i <= #settings.extra_columns then
+ local col = settings.extra_columns[i]
+ local prev_column
+ if i == 1 then
+ prev_column = 'Digest'
+ else
+ prev_column = settings.extra_columns[i - 1].name
+ end
+ local sql = string.format('ALTER TABLE rspamd ADD COLUMN IF NOT EXISTS `%s` %s AFTER `%s`',
+ col.name, col.type, prev_column)
+ local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+ function(_, _)
+ rspamd_logger.infox(rspamd_config,
+ 'added extra column %s (%s) after %s',
+ col.name, col.type, prev_column)
+ -- Apply the next statement
+ columns_recursor(i + 1)
+ end ,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot apply add column alter %s: '%s' on clickhouse server %s: %s",
+ i, sql, upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config,
+ "cannot apply add column alter %s: '%s' on clickhouse server %s: cannot make request",
+ i, sql, upstream:get_addr():to_string(true))
+ end
+ end
+ end
+
+ columns_recursor(1)
+end
+
local function check_rspamd_table(upstream, ev_base, cfg)
local ch_params = {
ev_base = ev_base,
@@ -1173,6 +1236,10 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg)
local version = tonumber(rows[1].v)
maybe_apply_migrations(upstream, ev_base, cfg, version)
end
+
+ if #settings.extra_columns > 0 then
+ add_extra_columns(upstream, ev_base, cfg)
+ end
end
local opts = rspamd_config:get_all_opt('clickhouse')
@@ -1223,7 +1290,7 @@ if opts then
end
end
else
- settings[k] = v
+ settings[k] = lua_util.deepcopy(v)
end
end
@@ -1251,6 +1318,47 @@ if opts then
settings.exceptions, N)
end
+ if settings.extra_columns then
+ -- Check sanity and create selector closures
+ local lua_selectors = require "lua_selectors"
+
+ for col_name,col_data in pairs(settings.extra_columns) do
+ if not col_data.selector or not col_data.type then
+ rspamd_logger.errx(rspamd_config, 'cannot add clickhouse extra row %s: no type or no selector',
+ col_name)
+ else
+ local selector = lua_selectors.create_selector_closure(rspamd_config,
+ col_data.selector, col_data.delimiter or '', false)
+
+ if not selector then
+ rspamd_logger.errx(rspamd_config, 'cannot add clickhouse extra row %s: bad selector: %s',
+ col_name, col_data.selector)
+ -- Remove column
+ settings.extra_columns[col_name] = nil
+ else
+ if not col_data.default_value then
+ if col_data.type:lower():match('Array') then
+ col_data.default_value = {}
+ else
+ col_data.default_value = ''
+ end
+ end
+ end
+ end
+ end
+
+ -- Convert extra columns from a map to an array sorted by column name to
+ -- preserve strict order when doing altering
+ local extra_cols = {}
+ for col_name,col_data in pairs(settings.extra_columns) do
+ local nelt = lua_util.shallowcopy(col_data)
+ nelt.name = col_name
+ extra_cols[#extra_cols + 1] = nelt
+ end
+ table.sort(extra_cols, function(c1, c2) return c1.name < c2.name end)
+ settings.extra_columns = extra_cols
+ end
+
rspamd_config:register_symbol({
name = 'CLICKHOUSE_COLLECT',
type = 'idempotent',
More information about the Commits
mailing list