commit d0d0f33: [Fix] Fix sentinel connections leak by using async connections

Vsevolod Stakhov vsevolod at highsecure.ru
Fri Feb 14 13:21:09 UTC 2020


Author: Vsevolod Stakhov
Date: 2020-02-14 13:19:54 +0000
URL: https://github.com/rspamd/rspamd/commit/d0d0f333d3fd5d10bc6b88dd364cb792e326c8c2 (HEAD -> master)

[Fix] Fix sentinel connections leak by using async connections

---
 lualib/lua_redis.lua | 159 +++++++++++++++++++++++++++++++--------------------
 1 file changed, 96 insertions(+), 63 deletions(-)

diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua
index fb98a5b9c..017381e02 100644
--- a/lualib/lua_redis.lua
+++ b/lualib/lua_redis.lua
@@ -66,76 +66,115 @@ local function redis_query_sentinel(ev_base, params, initialised)
   local rspamd_redis = require "rspamd_redis"
   local addr = params.sentinels:get_upstream_round_robin()
 
-  local is_ok, connection = rspamd_redis.connect_sync({
-    host = addr:get_addr(),
-    timeout = params.timeout,
-    config = rspamd_config,
-    ev_base = ev_base,
-    no_pool = true,
-  })
-
-  if not is_ok then
-    logger.errx(rspamd_config, 'cannot connect sentinel at address: %s',
-        tostring(addr:get_addr()))
-    addr:fail()
-
-    return
-  end
+  local host = addr:get_addr()
+  local masters = {}
+  local process_masters -- Function that is called to process masters data
 
-  -- Get masters list
-  connection:add_cmd('SENTINEL', {'masters'})
+  local function masters_cb(err, result)
+    if not err and result and type(result) == 'table' then
 
-  local ok,result = connection:exec()
+      local pending_subrequests = 0
 
-  if ok and result and type(result) == 'table' then
-    local masters = {}
-    for _,m in ipairs(result) do
-      local master = flatten_redis_table(m)
+      for _,m in ipairs(result) do
+        local master = flatten_redis_table(m)
 
-      -- Wrap IPv6-adresses in brackets
-      if (master.ip:match(":")) then
-        master.ip = "["..master.ip.."]"
-      end
+        -- Wrap IPv6-adresses in brackets
+        if (master.ip:match(":")) then
+          master.ip = "["..master.ip.."]"
+        end
 
-      if params.sentinel_masters_pattern then
-        if master.name:match(params.sentinel_masters_pattern) then
+        if params.sentinel_masters_pattern then
+          if master.name:match(params.sentinel_masters_pattern) then
+            lutil.debugm(N, 'found master %s with ip %s and port %s',
+                master.name, master.ip, master.port)
+            masters[master.name] = master
+          else
+            lutil.debugm(N, 'skip master %s with ip %s and port %s, pattern %s',
+                master.name, master.ip, master.port, params.sentinel_masters_pattern)
+          end
+        else
           lutil.debugm(N, 'found master %s with ip %s and port %s',
               master.name, master.ip, master.port)
           masters[master.name] = master
-        else
-          lutil.debugm(N, 'skip master %s with ip %s and port %s, pattern %s',
-              master.name, master.ip, master.port, params.sentinel_masters_pattern)
         end
-      else
-        lutil.debugm(N, 'found master %s with ip %s and port %s',
-            master.name, master.ip, master.port)
-        masters[master.name] = master
       end
-    end
 
-    -- For each master we need to get a list of slaves
-    for k,v in pairs(masters) do
-      v.slaves = {}
-      local slave_result
-
-      connection:add_cmd('SENTINEL', {'slaves', k})
-      ok,slave_result = connection:exec()
-
-      if ok then
-        for _,s in ipairs(slave_result) do
-          local slave = flatten_redis_table(s)
-          lutil.debugm(N, rspamd_config,
-              'found slave for master %s with ip %s and port %s',
-              v.name, slave.ip, slave.port)
-          -- Wrap IPv6-adresses in brackets
-          if (slave.ip:match(":")) then
-            slave.ip = "["..slave.ip.."]"
+      -- For each master we need to get a list of slaves
+      for k,v in pairs(masters) do
+        v.slaves = {}
+        local function slaves_cb(slave_err, slave_result)
+          if not slave_err and type(slave_result) == 'table' then
+            for _,s in ipairs(slave_result) do
+              local slave = flatten_redis_table(s)
+              lutil.debugm(N, rspamd_config,
+                  'found slave for master %s with ip %s and port %s',
+                  v.name, slave.ip, slave.port)
+              -- Wrap IPv6-adresses in brackets
+              if (slave.ip:match(":")) then
+                slave.ip = "["..slave.ip.."]"
+              end
+              v.slaves[#v.slaves + 1] = slave
+            end
+          else
+            logger.errx('cannot get slaves data from Redis Sentinel %s: %s',
+                host:to_string(true), slave_err)
+            addr:fail()
+          end
+
+          pending_subrequests = pending_subrequests - 1
+
+          if pending_subrequests == 0 then
+            -- Finalize masters and slaves
+            process_masters()
           end
-          v.slaves[#v.slaves + 1] = slave
+        end
+
+        local ret = rspamd_redis.make_request({
+          host = addr:get_addr(),
+          timeout = params.timeout,
+          config = rspamd_config,
+          ev_base = ev_base,
+          cmd = 'SENTINEL',
+          args = {'slaves', k},
+          no_pool = true,
+          callback = slaves_cb
+        })
+
+        if not ret then
+          logger.errx(rspamd_config, 'cannot connect sentinel when query slaves at address: %s',
+              host:to_string(true))
+          addr:fail()
+        else
+          pending_subrequests = pending_subrequests + 1
         end
       end
+
+      addr:ok()
+    else
+      logger.errx('cannot get masters data from Redis Sentinel %s: %s',
+          host:to_string(true), err)
+      addr:fail()
     end
+  end
 
+  local ret = rspamd_redis.make_request({
+    host = addr:get_addr(),
+    timeout = params.timeout,
+    config = rspamd_config,
+    ev_base = ev_base,
+    cmd = 'SENTINEL',
+    args = {'masters'},
+    no_pool = true,
+    callback = masters_cb,
+  })
+
+  if not ret then
+    logger.errx(rspamd_config, 'cannot connect sentinel at address: %s',
+        host:to_string(true))
+    addr:fail()
+  end
+
+  process_masters = function()
     -- We now form new strings for masters and slaves
     local read_servers_tbl, write_servers_tbl = {}, {}
 
@@ -175,7 +214,7 @@ local function redis_query_sentinel(ev_base, params, initialised)
 
       if read_upstreams then
         logger.infox(rspamd_config, 'sentinel %s: replace read servers with new list: %s',
-            addr:get_addr():to_string(true), read_servers_str)
+            host:to_string(true), read_servers_str)
         params.read_servers = read_upstreams
         params.read_servers_str = read_servers_str
       end
@@ -189,7 +228,7 @@ local function redis_query_sentinel(ev_base, params, initialised)
 
       if write_upstreams then
         logger.infox(rspamd_config, 'sentinel %s: replace write servers with new list: %s',
-            addr:get_addr():to_string(true), write_servers_str)
+            host:to_string(true), write_servers_str)
         params.write_servers = write_upstreams
         params.write_servers_str = write_servers_str
 
@@ -198,7 +237,7 @@ local function redis_query_sentinel(ev_base, params, initialised)
         local function monitor_failures(up, _, count)
           if count > params.sentinel_master_maxerrors and not queried then
             logger.infox(rspamd_config, 'sentinel: master with address %s, caused %s failures, try to query sentinel',
-                up:get_addr():to_string(true), count)
+                host:to_string(true), count)
             queried = true -- Avoid multiple checks caused by this monitor
             redis_query_sentinel(ev_base, params, true)
           end
@@ -207,12 +246,6 @@ local function redis_query_sentinel(ev_base, params, initialised)
         write_upstreams:add_watcher('failure', monitor_failures)
       end
     end
-
-    addr:ok()
-  else
-    logger.errx('cannot get data from Redis Sentinel %s: %s',
-        addr:get_addr():to_string(true), result)
-    addr:fail()
   end
 
 end


More information about the Commits mailing list