commit ac5e940: [Feature] Add `sentinel_password` option

Vsevolod Stakhov vsevolod at rspamd.com
Tue Aug 1 15:14:04 UTC 2023


Author: Vsevolod Stakhov
Date: 2023-08-01 16:07:13 +0100
URL: https://github.com/rspamd/rspamd/commit/ac5e940daa667b7ec2d1d29d2718806d83ee3a2c (HEAD -> master)

[Feature] Add `sentinel_password` option

---
 lualib/lua_redis.lua | 236 +++++++++++++++++++++++++++------------------------
 1 file changed, 126 insertions(+), 110 deletions(-)

diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua
index 62511451e..2a43a915c 100644
--- a/lualib/lua_redis.lua
+++ b/lualib/lua_redis.lua
@@ -36,34 +36,33 @@ local common_schema = ts.shape {
   sentinel_watch_time = (ts.number + ts.string / lutil.parse_time_interval):is_optional(),
   sentinel_masters_pattern = ts.string:is_optional(),
   sentinel_master_maxerrors = (ts.number + ts.string / tonumber):is_optional(),
+  sentinel_password = ts.string:is_optional(),
 }
 
-local config_schema =
-  -- Allow separate read/write servers to allow usage in the `extra_fields`
-  ts.shape({
-    read_servers = ts.string + ts.array_of(ts.string),
-  }, {extra_fields = common_schema}) +
-  ts.shape({
-    write_servers = ts.string + ts.array_of(ts.string),
-  }, {extra_fields = common_schema}) +
-  ts.shape({
-    read_servers = ts.string + ts.array_of(ts.string),
-    write_servers = ts.string + ts.array_of(ts.string),
-  }, {extra_fields = common_schema}) +
-  ts.shape({
-    servers = ts.string + ts.array_of(ts.string),
-  }, {extra_fields = common_schema}) +
-  ts.shape({
-    server = ts.string + ts.array_of(ts.string),
-  }, {extra_fields = common_schema})
+local config_schema = -- Allow separate read/write servers to allow usage in the `extra_fields`
+ts.shape({
+  read_servers = ts.string + ts.array_of(ts.string),
+}, { extra_fields = common_schema }) +
+    ts.shape({
+      write_servers = ts.string + ts.array_of(ts.string),
+    }, { extra_fields = common_schema }) +
+    ts.shape({
+      read_servers = ts.string + ts.array_of(ts.string),
+      write_servers = ts.string + ts.array_of(ts.string),
+    }, { extra_fields = common_schema }) +
+    ts.shape({
+      servers = ts.string + ts.array_of(ts.string),
+    }, { extra_fields = common_schema }) +
+    ts.shape({
+      server = ts.string + ts.array_of(ts.string),
+    }, { extra_fields = common_schema })
 
 exports.config_schema = config_schema
 
-
 local function redis_query_sentinel(ev_base, params, initialised)
   local function flatten_redis_table(tbl)
     local res = {}
-    for i=1,#tbl,2 do
+    for i = 1, #tbl, 2 do
       res[tbl[i]] = tbl[i + 1]
     end
 
@@ -83,12 +82,12 @@ local function redis_query_sentinel(ev_base, params, initialised)
 
       local pending_subrequests = 0
 
-      for _,m in ipairs(result) do
+      for _, m in ipairs(result) do
         local master = flatten_redis_table(m)
 
         -- Wrap IPv6-addresses in brackets
         if (master.ip:match(":")) then
-          master.ip = "["..master.ip.."]"
+          master.ip = "[" .. master.ip .. "]"
         end
 
         if params.sentinel_masters_pattern then
@@ -108,18 +107,18 @@ local function redis_query_sentinel(ev_base, params, initialised)
       end
 
       -- For each master we need to get a list of slaves
-      for k,v in pairs(masters) do
+      for k, v in pairs(masters) do
         v.slaves = {}
         local function slaves_cb(slave_err, slave_result)
           if not slave_err and type(slave_result) == 'table' then
-            for _,s in ipairs(slave_result) do
+            for _, s in ipairs(slave_result) do
               local slave = flatten_redis_table(s)
               lutil.debugm(N, rspamd_config,
                   'found slave for master %s with ip %s and port %s',
                   v.name, slave.ip, slave.port)
               -- Wrap IPv6-addresses in brackets
               if (slave.ip:match(":")) then
-                slave.ip = "["..slave.ip.."]"
+                slave.ip = "[" .. slave.ip .. "]"
               end
               v.slaves[#v.slaves + 1] = slave
             end
@@ -137,16 +136,17 @@ local function redis_query_sentinel(ev_base, params, initialised)
           end
         end
 
-        local ret = rspamd_redis.make_request({
+        local ret = rspamd_redis.make_request {
           host = addr:get_addr(),
           timeout = params.timeout,
+          password = params.sentinel_password,
           config = rspamd_config,
           ev_base = ev_base,
           cmd = 'SENTINEL',
-          args = {'slaves', k},
+          args = { 'slaves', k },
           no_pool = true,
           callback = slaves_cb
-        })
+        }
 
         if not ret then
           logger.errx(rspamd_config, 'cannot connect sentinel when query slaves at address: %s',
@@ -165,16 +165,17 @@ local function redis_query_sentinel(ev_base, params, initialised)
     end
   end
 
-  local ret = rspamd_redis.make_request({
+  local ret = rspamd_redis.make_request {
     host = addr:get_addr(),
     timeout = params.timeout,
     config = rspamd_config,
     ev_base = ev_base,
+    password = params.sentinel_password,
     cmd = 'SENTINEL',
-    args = {'masters'},
+    args = { 'masters' },
     no_pool = true,
     callback = masters_cb,
-  })
+  }
 
   if not ret then
     logger.errx(rspamd_config, 'cannot connect sentinel at address: %s',
@@ -186,7 +187,7 @@ local function redis_query_sentinel(ev_base, params, initialised)
     -- We now form new strings for masters and slaves
     local read_servers_tbl, write_servers_tbl = {}, {}
 
-    for _,master in pairs(masters) do
+    for _, master in pairs(masters) do
       write_servers_tbl[#write_servers_tbl + 1] = string.format(
           '%s:%s', master.ip, master.port
       )
@@ -194,7 +195,7 @@ local function redis_query_sentinel(ev_base, params, initialised)
           '%s:%s', master.ip, master.port
       )
 
-      for _,slave in ipairs(master.slaves) do
+      for _, slave in ipairs(master.slaves) do
         if slave['master-link-status'] == 'ok' then
           read_servers_tbl[#read_servers_tbl + 1] = string.format(
               '%s:%s', slave.ip, slave.port
@@ -309,7 +310,7 @@ local function calculate_redis_hash(params)
       h:update(k)
       h:update(tostring(v))
     elseif type(v) == 'table' then
-      for kk,vv in pairs(v) do
+      for kk, vv in pairs(v) do
         rec_hash(kk, vv)
       end
     end
@@ -415,17 +416,17 @@ local function process_redis_options(options, rspamd_config, result)
   if options['read_servers'] then
     if rspamd_config then
       upstreams_read = upstream_list.create(rspamd_config,
-        options['read_servers'], default_port)
+          options['read_servers'], default_port)
     else
       upstreams_read = upstream_list.create(options['read_servers'],
-        default_port)
+          default_port)
     end
 
     result.read_servers_str = options['read_servers']
   elseif options['servers'] then
     if rspamd_config then
       upstreams_read = upstream_list.create(rspamd_config,
-        options['servers'], default_port)
+          options['servers'], default_port)
     else
       upstreams_read = upstream_list.create(options['servers'], default_port)
     end
@@ -435,7 +436,7 @@ local function process_redis_options(options, rspamd_config, result)
   elseif options['server'] then
     if rspamd_config then
       upstreams_read = upstream_list.create(rspamd_config,
-        options['server'], default_port)
+          options['server'], default_port)
     else
       upstreams_read = upstream_list.create(options['server'], default_port)
     end
@@ -448,10 +449,10 @@ local function process_redis_options(options, rspamd_config, result)
     if options['write_servers'] then
       if rspamd_config then
         upstreams_write = upstream_list.create(rspamd_config,
-                options['write_servers'], default_port)
+            options['write_servers'], default_port)
       else
         upstreams_write = upstream_list.create(options['write_servers'],
-                default_port)
+            default_port)
       end
       result.write_servers_str = options['write_servers']
       read_only = false
@@ -567,12 +568,12 @@ local function rspamd_parse_redis_server(module_name, module_opts, no_fallback)
 
       -- Exclude disabled
       if opts['disabled_modules'] then
-        for _,v in ipairs(opts['disabled_modules']) do
+        for _, v in ipairs(opts['disabled_modules']) do
           if v == module_name then
             logger.infox(rspamd_config, "NOT using default redis server for module %s: it is disabled",
-              module_name)
+                module_name)
 
-              return nil
+            return nil
           end
         end
       end
@@ -586,7 +587,7 @@ local function rspamd_parse_redis_server(module_name, module_opts, no_fallback)
   end
 
   if result.read_servers then
-      return maybe_return_cached(result)
+    return maybe_return_cached(result)
   end
 
   return nil
@@ -619,7 +620,7 @@ local process_cmd = {
   end,
   blpop = function(args)
     local idx_l = {}
-    for i = 1, #args -1 do
+    for i = 1, #args - 1 do
       table.insert(idx_l, i)
     end
     return idx_l
@@ -635,7 +636,7 @@ local process_cmd = {
     return idx_l
   end,
   set = function(args)
-    return {1}
+    return { 1 }
   end,
   mget = function(args)
     local idx_l = {}
@@ -659,9 +660,10 @@ local process_cmd = {
     return idx_l
   end,
   smove = function(args)
-    return {1, 2}
+    return { 1, 2 }
   end,
-  script = function() end
+  script = function()
+  end
 }
 process_cmd.append = process_cmd.set
 process_cmd.auth = process_cmd.script
@@ -835,12 +837,16 @@ local gen_meta = {
   end,
   principal_recipient_domain = function(task)
     local p = task:get_principal_recipient()
-    if not p then return end
+    if not p then
+      return
+    end
     return string.match(p, '.*@(.*)')
   end,
   ip = function(task)
     local i = task:get_ip()
-    if i and i:is_valid() then return i:to_string() end
+    if i and i:is_valid() then
+      return i:to_string()
+    end
   end,
   from = function(task)
     return ((task:get_from('smtp') or E)[1] or E)['addr']
@@ -850,7 +856,9 @@ local gen_meta = {
   end,
   from_domain_or_helo_domain = function(task)
     local d = ((task:get_from('smtp') or E)[1] or E)['domain']
-    if d and #d > 0 then return d end
+    if d and #d > 0 then
+      return d
+    end
     return task:get_helo()
   end,
   mime_from = function(task)
@@ -864,7 +872,9 @@ local gen_meta = {
 local function gen_get_esld(f)
   return function(task)
     local d = f(task)
-    if not d then return end
+    if not d then
+      return
+    end
     return rspamd_util.get_tld(d)
   end
 end
@@ -912,7 +922,7 @@ end
 -- args - table of arguments
 -- extra_opts - table of optional request arguments
 local function rspamd_redis_make_request(task, redis_params, key, is_write,
-    callback, command, args, extra_opts)
+                                         callback, command, args, extra_opts)
   local addr
   local function rspamd_redis_make_request_cb(err, data)
     if err then
@@ -925,7 +935,7 @@ local function rspamd_redis_make_request(task, redis_params, key, is_write,
     end
   end
   if not task or not redis_params or not callback or not command then
-    return false,nil,nil
+    return false, nil, nil
   end
 
   local rspamd_redis = require "rspamd_redis"
@@ -967,7 +977,7 @@ local function rspamd_redis_make_request(task, redis_params, key, is_write,
   }
 
   if extra_opts then
-    for k,v in pairs(extra_opts) do
+    for k, v in pairs(extra_opts) do
       options[k] = v
     end
   end
@@ -984,14 +994,14 @@ local function rspamd_redis_make_request(task, redis_params, key, is_write,
       ' (host=%s, timeout=%s): cmd: %s', ip_addr,
       options.timeout, options.cmd)
 
-  local ret,conn = rspamd_redis.make_request(options)
+  local ret, conn = rspamd_redis.make_request(options)
 
   if not ret then
     addr:fail()
     logger.warnx(task, "cannot make redis request to: %s", tostring(ip_addr))
   end
 
-  return ret,conn,addr
+  return ret, conn, addr
 end
 
 --[[[
@@ -1010,9 +1020,9 @@ exports.rspamd_redis_make_request = rspamd_redis_make_request
 exports.redis_make_request = rspamd_redis_make_request
 
 local function redis_make_request_taskless(ev_base, cfg, redis_params, key,
-    is_write, callback, command, args, extra_opts)
+                                           is_write, callback, command, args, extra_opts)
   if not ev_base or not redis_params or not callback or not command then
-    return false,nil,nil
+    return false, nil, nil
   end
 
   local addr
@@ -1057,12 +1067,11 @@ local function redis_make_request_taskless(ev_base, cfg, redis_params, key,
     args = args
   }
   if extra_opts then
-    for k,v in pairs(extra_opts) do
+    for k, v in pairs(extra_opts) do
       options[k] = v
     end
   end
 
-
   if redis_params['password'] then
     options['password'] = redis_params['password']
   end
@@ -1074,13 +1083,13 @@ local function redis_make_request_taskless(ev_base, cfg, redis_params, key,
   lutil.debugm(N, cfg, 'perform taskless request to redis server' ..
       ' (host=%s, timeout=%s): cmd: %s', options.host:tostring(true),
       options.timeout, options.cmd)
-  local ret,conn = rspamd_redis.make_request(options)
+  local ret, conn = rspamd_redis.make_request(options)
   if not ret then
     logger.errx('cannot execute redis request')
     addr:fail()
   end
 
-  return ret,conn,addr
+  return ret, conn, addr
 end
 
 --[[[
@@ -1101,20 +1110,22 @@ local function script_set_loaded(script)
   end
 
   local wait_table = {}
-  for _,s in ipairs(script.waitq) do
+  for _, s in ipairs(script.waitq) do
     table.insert(wait_table, s)
   end
 
   script.waitq = {}
 
-  for _,s in ipairs(wait_table) do
+  for _, s in ipairs(wait_table) do
     s(script.loaded)
   end
 end
 
 local function prepare_redis_call(script)
   local function merge_tables(t1, t2)
-    for k,v in pairs(t2) do t1[k] = v end
+    for k, v in pairs(t2) do
+      t1[k] = v
+    end
   end
 
   local servers = {}
@@ -1129,12 +1140,12 @@ local function prepare_redis_call(script)
 
   -- Call load script on each server, set loaded flag
   script.in_flight = #servers
-  for _,s in ipairs(servers) do
+  for _, s in ipairs(servers) do
     local cur_opts = {
       host = s:get_addr(),
       timeout = script.redis_params['timeout'],
       cmd = 'SCRIPT',
-      args = {'LOAD', script.script },
+      args = { 'LOAD', script.script },
       upstream = s
     }
 
@@ -1156,7 +1167,7 @@ local function load_script_task(script, task, is_write)
   local rspamd_redis = require "rspamd_redis"
   local opts = prepare_redis_call(script)
 
-  for _,opt in ipairs(opts) do
+  for _, opt in ipairs(opts) do
     opt.task = task
     opt.is_write = is_write
     opt.callback = function(err, data)
@@ -1169,7 +1180,7 @@ local function load_script_task(script, task, is_write)
       else
         opt.upstream:ok()
         logger.infox(task,
-          "uploaded redis script to %s with id %s, sha: %s",
+            "uploaded redis script to %s with id %s, sha: %s",
             opt.upstream:get_addr():to_string(true),
             script.id, data)
         script.sha = data -- We assume that sha is the same on all servers
@@ -1185,7 +1196,7 @@ local function load_script_task(script, task, is_write)
 
     if not ret then
       logger.errx('cannot execute redis request to load script on %s',
-        opt.upstream:get_addr())
+          opt.upstream:get_addr())
       script.in_flight = script.in_flight - 1
       opt.upstream:fail()
     end
@@ -1200,7 +1211,7 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
   local rspamd_redis = require "rspamd_redis"
   local opts = prepare_redis_call(script)
 
-  for _,opt in ipairs(opts) do
+  for _, opt in ipairs(opts) do
     opt.config = cfg
     opt.ev_base = ev_base
     opt.is_write = is_write
@@ -1214,7 +1225,7 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
       else
         opt.upstream:ok()
         logger.infox(cfg,
-          "uploaded redis script to %s with id %s, sha: %s",
+            "uploaded redis script to %s with id %s, sha: %s",
             opt.upstream:get_addr():to_string(true), script.id, data)
         script.sha = data -- We assume that sha is the same on all servers
         script.fatal_error = nil
@@ -1229,7 +1240,7 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
 
     if not ret then
       logger.errx('cannot execute redis request to load script on %s',
-        opt.upstream:get_addr())
+          opt.upstream:get_addr())
       script.in_flight = script.in_flight - 1
       opt.upstream:fail()
     end
@@ -1247,7 +1258,9 @@ local function load_redis_script(script, cfg, ev_base, _)
 end
 
 local function add_redis_script(script, redis_params, caller_level)
-  if not caller_level then caller_level = 2 end
+  if not caller_level then
+    caller_level = 2
+  end
   local caller = debug.getinfo(caller_level)
 
   local new_script = {
@@ -1290,8 +1303,10 @@ local function load_redis_script_from_file(filename, redis_params, dir)
   local lua_util = require "lua_util"
   local rspamd_logger = require "rspamd_logger"
 
-  if not dir then dir = rspamd_paths.LUALIBDIR end
-  if filename:sub(1, 1) ~= package.config:sub(1,1) then
+  if not dir then
+    dir = rspamd_paths.LUALIBDIR
+  end
+  if filename:sub(1, 1) ~= package.config:sub(1, 1) then
     -- Relative path
     filename = lua_util.join_path(dir, "redis_scripts", filename)
   end
@@ -1318,11 +1333,10 @@ local function exec_redis_script(id, params, callback, keys, args)
   local redis_args = {}
 
   if not redis_scripts[id] then
-      logger.errx("cannot find registered script with id %s", id)
+    logger.errx("cannot find registered script with id %s", id)
     return false
   end
 
-
   local script = redis_scripts[id]
 
   if script.fatal_error then
@@ -1363,7 +1377,7 @@ local function exec_redis_script(id, params, callback, keys, args)
     if #redis_args == 0 then
       table.insert(redis_args, script.sha)
       table.insert(redis_args, tostring(#keys))
-      for _,k in ipairs(keys) do
+      for _, k in ipairs(keys) do
         table.insert(redis_args, k)
       end
 
@@ -1376,13 +1390,13 @@ local function exec_redis_script(id, params, callback, keys, args)
 
     if params.task then
       if not rspamd_redis_make_request(params.task, script.redis_params,
-        params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then
+          params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then
         callback('Cannot make redis request', nil)
       end
     else
       if not redis_make_request_taskless(params.ev_base, rspamd_config,
-        script.redis_params,
-        params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then
+          script.redis_params,
+          params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then
         callback('Cannot make redis request', nil)
       end
     end
@@ -1414,7 +1428,7 @@ exports.exec_redis_script = exec_redis_script
 
 local function redis_connect_sync(redis_params, is_write, key, cfg, ev_base)
   if not redis_params then
-    return false,nil
+    return false, nil
   end
 
   local rspamd_redis = require "rspamd_redis"
@@ -1446,45 +1460,45 @@ local function redis_connect_sync(redis_params, is_write, key, cfg, ev_base)
     session = redis_params.session or rspamadm_session
   }
 
-  for k,v in pairs(redis_params) do
+  for k, v in pairs(redis_params) do
     options[k] = v
   end
 
   if not options.config then
     logger.errx('config is not set')
-    return false,nil,addr
+    return false, nil, addr
   end
 
   if not options.ev_base then
     logger.errx('ev_base is not set')
-    return false,nil,addr
+    return false, nil, addr
   end
 
   if not options.session then
     logger.errx('session is not set')
-    return false,nil,addr
+    return false, nil, addr
   end
 
-  local ret,conn = rspamd_redis.connect_sync(options)
+  local ret, conn = rspamd_redis.connect_sync(options)
   if not ret then
     logger.errx('cannot create redis connection: %s', conn)
     addr:fail()
 
-    return false,nil,addr
+    return false, nil, addr
   end
 
   if conn then
     local need_exec = false
     if redis_params['password'] then
-      conn:add_cmd('AUTH', {redis_params['password']})
+      conn:add_cmd('AUTH', { redis_params['password'] })
       need_exec = true
     end
 
     if redis_params['db'] then
-      conn:add_cmd('SELECT', {tostring(redis_params['db'])})
+      conn:add_cmd('SELECT', { tostring(redis_params['db']) })
       need_exec = true
     elseif redis_params['dbname'] then
-      conn:add_cmd('SELECT', {tostring(redis_params['dbname'])})
+      conn:add_cmd('SELECT', { tostring(redis_params['dbname']) })
       need_exec = true
     end
 
@@ -1495,12 +1509,12 @@ local function redis_connect_sync(redis_params, is_write, key, cfg, ev_base)
         logger.errx('cannot prepare redis connection (authentication or db selection failure): %s',
             res)
         addr:fail()
-        return false,nil,addr
+        return false, nil, addr
       end
     end
   end
 
-  return ret,conn,addr
+  return ret, conn, addr
 end
 
 exports.redis_connect_sync = redis_connect_sync
@@ -1520,12 +1534,12 @@ exports.request = function(redis_params, attrs, req)
 
   if not attrs or not redis_params or not req then
     logger.errx('invalid arguments for redis request')
-    return false,nil,nil
+    return false, nil, nil
   end
 
   if not (attrs.task or (attrs.config and attrs.ev_base)) then
     logger.errx('invalid attributes for redis request')
-    return false,nil,nil
+    return false, nil, nil
   end
 
   local opts = lua_util.shallowcopy(attrs)
@@ -1593,16 +1607,16 @@ exports.request = function(redis_params, attrs, req)
       opts.timeout, opts.cmd, opts.args)
 
   if opts.callback then
-    local ret,conn = rspamd_redis.make_request(opts)
+    local ret, conn = rspamd_redis.make_request(opts)
     if not ret then
       logger.errx(log_obj, 'cannot execute redis request')
       addr:fail()
     end
 
-    return ret,conn,addr
+    return ret, conn, addr
   else
     -- Coroutines version
-    local ret,conn = rspamd_redis.connect_sync(opts)
+    local ret, conn = rspamd_redis.connect_sync(opts)
     if not ret then
       logger.errx(log_obj, 'cannot execute redis request')
       addr:fail()
@@ -1610,7 +1624,7 @@ exports.request = function(redis_params, attrs, req)
       conn:add_cmd(opts.cmd, opts.args)
       return conn:exec()
     end
-    return false,nil,addr
+    return false, nil, addr
   end
 end
 
@@ -1627,12 +1641,12 @@ exports.connect = function(redis_params, attrs)
 
   if not attrs or not redis_params then
     logger.errx('invalid arguments for redis connect')
-    return false,nil,nil
+    return false, nil, nil
   end
 
   if not (attrs.task or (attrs.config and attrs.ev_base)) then
     logger.errx('invalid attributes for redis connect')
-    return false,nil,nil
+    return false, nil, nil
   end
 
   local opts = lua_util.shallowcopy(attrs)
@@ -1688,24 +1702,24 @@ exports.connect = function(redis_params, attrs)
   end
 
   if opts.callback then
*** OUTPUT TRUNCATED, 46 LINES SKIPPED ***


More information about the Commits mailing list