commit a092f57: [Feature] Lua_clickhouse: Add optional row callback for large selections

Vsevolod Stakhov vsevolod at highsecure.ru
Mon Nov 30 14:35:07 UTC 2020


Author: Vsevolod Stakhov
Date: 2020-11-30 14:12:58 +0000
URL: https://github.com/rspamd/rspamd/commit/a092f57efaf01e16fcb090d5b3b610f0dc4f7180

[Feature] Lua_clickhouse: Add optional row callback for large selections

---
 lualib/lua_clickhouse.lua | 24 +++++++++++++++---------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua
index 09989131a..e0d4c5618 100644
--- a/lualib/lua_clickhouse.lua
+++ b/lualib/lua_clickhouse.lua
@@ -95,7 +95,7 @@ end
 exports.row_to_tsv = row_to_tsv
 
 -- Parses JSONEachRow reply from CH
-local function parse_clickhouse_response_json_eachrow(params, data)
+local function parse_clickhouse_response_json_eachrow(params, data, row_cb)
   local ucl = require "ucl"
 
   if data == nil then
@@ -125,7 +125,11 @@ local function parse_clickhouse_response_json_eachrow(params, data)
     if plain_row and #plain_row > 1 then
       local parsed_row = parse_string(plain_row)
       if parsed_row then
-        table.insert(parsed_rows, parsed_row)
+        if row_cb then
+          row_cb(parsed_row)
+        else
+          table.insert(parsed_rows, parsed_row)
+        end
       end
     end
   end
@@ -169,7 +173,7 @@ local function parse_clickhouse_response_json(params, data)
 end
 
 -- Helper to generate HTTP closure
-local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
+local function mk_http_select_cb(upstream, params, ok_cb, fail_cb, row_cb)
   local function http_cb(err_message, code, data, _)
     if code ~= 200 or err_message then
       if not err_message then err_message = data end
@@ -185,7 +189,7 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
       upstream:fail()
     else
       upstream:ok()
-      local rows = parse_clickhouse_response_json_eachrow(params, data)
+      local rows = parse_clickhouse_response_json_eachrow(params, data, row_cb)
 
       if rows then
         if ok_cb then
@@ -264,16 +268,17 @@ end
 -- @param {string} query select query (passed in HTTP body)
 -- @param {function} ok_cb callback to be called in case of success
 -- @param {function} fail_cb callback to be called in case of some error
+-- @param {function} row_cb optional callback to be called on each parsed data row (instead of table insertion)
 -- @return {boolean} whether a connection was successful
 -- @example
 --
 --]]
-exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
+exports.select = function (upstream, settings, params, query, ok_cb, fail_cb, row_cb)
   local http_params = {}
 
   for k,v in pairs(params) do http_params[k] = v end
 
-  http_params.callback = mk_http_select_cb(upstream, http_params, ok_cb, fail_cb)
+  http_params.callback = mk_http_select_cb(upstream, http_params, ok_cb, fail_cb, row_cb)
   http_params.gzip = settings.use_gzip
   http_params.mime_type = 'text/plain'
   http_params.timeout = settings.timeout or default_timeout
@@ -302,7 +307,7 @@ end
 
 --[[[
 -- @function lua_clickhouse.select_sync(upstream, settings, params, query,
-      ok_cb, fail_cb)
+      ok_cb, fail_cb, row_cb)
 -- Make select request to clickhouse
 -- @param {upstream} upstream clickhouse server upstream
 -- @param {table} settings global settings table:
@@ -315,13 +320,14 @@ end
 -- @param {string} query select query (passed in HTTP body)
 -- @param {function} ok_cb callback to be called in case of success
 -- @param {function} fail_cb callback to be called in case of some error
+-- @param {function} row_cb optional callback to be called on each parsed data row (instead of table insertion)
 -- @return
 --          {string} error message if exists
 --          nil | {rows} | {http_response}
 -- @example
 --
 --]]
-exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb)
+exports.select_sync = function (upstream, settings, params, query, row_cb)
   local http_params = {}
 
   for k,v in pairs(params) do http_params[k] = v end
@@ -357,7 +363,7 @@ exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_c
     return response.content, response
   else
     lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
-    local rows = parse_clickhouse_response_json_eachrow(params, response.content)
+    local rows = parse_clickhouse_response_json_eachrow(params, response.content, row_cb)
     return nil, rows
   end
 end


More information about the Commits mailing list