commit 6c18baf: [Fix] Lua_clickhouse: Fix CH errors processing

Vsevolod Stakhov vsevolod at highsecure.ru
Mon Apr 15 19:28:06 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-04-15 17:32:44 +0100
URL: https://github.com/rspamd/rspamd/commit/6c18baf8309664f5cafb3d61fa2f2376f4347cd6

[Fix] Lua_clickhouse: Fix CH errors processing

---
 lualib/lua_clickhouse.lua | 65 ++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 59 insertions(+), 6 deletions(-)

diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua
index 96ea59f02..7064ac925 100644
--- a/lualib/lua_clickhouse.lua
+++ b/lualib/lua_clickhouse.lua
@@ -78,7 +78,7 @@ local function row_to_tsv(row)
 end
 
 -- Parses JSONEachRow reply from CH
-local function parse_clickhouse_response(params, data)
+local function parse_clickhouse_response_json_eachrow(params, data)
   local ucl = require "ucl"
 
   if data == nil then
@@ -111,6 +111,38 @@ local function parse_clickhouse_response(params, data)
   return parsed_rows
 end
 
+-- Parses JSON reply from CH
+local function parse_clickhouse_response_json(params, data)
+  local ucl = require "ucl"
+
+  if data == nil then
+    -- clickhouse returned no data (i.e. empty result set): exiting
+    return 'no data', {}
+  end
+
+  if data:match('DB::Exception') then
+    return data, {}
+  end
+
+  local function parse_string(s)
+    local parser = ucl.parser()
+    local res, err = parser:parse_string(s)
+    if not res then
+      rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
+      return nil
+    end
+    return parser:get_object()
+  end
+
+  local json = parse_string(data)
+
+  if not json then
+    return 'bad json', {}
+  end
+
+  return parsed_rows
+end
+
 -- Helper to generate HTTP closure
 local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
   local function http_cb(err_message, code, data, _)
@@ -128,7 +160,7 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
       upstream:fail()
     else
       upstream:ok()
-      local rows = parse_clickhouse_response(params, data)
+      local rows = parse_clickhouse_response_json_eachrow(params, data)
 
       if rows then
         if ok_cb then
@@ -173,7 +205,14 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
       upstream:ok()
 
       if ok_cb then
-        ok_cb(params, data)
+        local err,parsed = parse_clickhouse_response_json(data)
+
+        if err then
+          fail_cb(params, err, data)
+        else
+          ok_cb(params, parsed)
+        end
+
       else
         lua_util.debugm(N, params.log_obj,
             "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
@@ -291,7 +330,7 @@ exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_c
     return response.content, response
   else
     lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
-    local rows = parse_clickhouse_response(params, response.content)
+    local rows = parse_clickhouse_response_json_eachrow(params, response.content)
     return nil, rows
   end
 end
@@ -440,11 +479,25 @@ exports.generic_sync = function (upstream, settings, params, query)
     end
     local ip_addr = upstream:get_addr():to_string(true)
     local database = settings.database or 'default'
-    http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
+    http_params.url = string.format('%s%s/?database=%s&default_format=JSON',
         connect_prefix, ip_addr, escape_spaces(database))
   end
 
-  return rspamd_http.request(http_params)
+  local err, response = rspamd_http.request(http_params)
+
+  if err then
+    return err, nil
+  elseif response.code ~= 200 then
+    return response.content, response
+  else
+    lua_util.debugm(N, http_params.log_obj, "clickhouse generic response: %1", response)
+    local e,obj = parse_clickhouse_response_json(params, response.content)
+
+    if e then
+      return e,nil
+    end
+    return nil, obj
+  end
 end
 
 return exports
\ No newline at end of file


More information about the Commits mailing list