commit bfbb746: [Feature] Elastic: Modernize plugin

Vsevolod Stakhov vsevolod at highsecure.ru
Thu Dec 27 18:28:07 UTC 2018


Author: Vsevolod Stakhov
Date: 2018-12-12 14:34:00 +0000
URL: https://github.com/rspamd/rspamd/commit/bfbb746648ba27c82d62b06425f12386a9606bb3

[Feature] Elastic: Modernize plugin

---
 src/plugins/lua/elastic.lua | 76 ++++++++++++++++++++++++---------------------
 1 file changed, 41 insertions(+), 35 deletions(-)

diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
index c7f9f392e..aa3702112 100644
--- a/src/plugins/lua/elastic.lua
+++ b/src/plugins/lua/elastic.lua
@@ -20,7 +20,6 @@ local rspamd_http = require "rspamd_http"
 local lua_util = require "lua_util"
 local util = require "rspamd_util"
 local ucl = require "ucl"
-local hash = require "rspamd_cryptobox_hash"
 local rspamd_redis = require "lua_redis"
 local upstream_list = require "rspamd_upstream_list"
 
@@ -30,6 +29,7 @@ end
 
 local rows = {}
 local nrows = 0
+local failed_sends = 0
 local elastic_template
 local redis_params
 local N = "elastic"
@@ -37,7 +37,7 @@ local E = {}
 local connect_prefix = 'http://'
 local enabled = true
 local settings = {
-  limit = 10,
+  limit = 500,
   index_pattern = 'rspamd-%Y.%m.%d',
   template_file = rspamd_paths['PLUGINSDIR'] .. '/elastic/rspamd_template.json',
   kibana_file = rspamd_paths['PLUGINSDIR'] ..'/elastic/kibana.json',
@@ -52,6 +52,7 @@ local settings = {
   user = nil,
   password = nil,
   no_ssl_verify = false,
+  max_fail = 3,
 }
 
 local function read_file(path)
@@ -78,34 +79,7 @@ local function elastic_send_data(task)
 
   local push_url = connect_prefix .. ip_addr .. '/'..es_index..'/_bulk'
   local bulk_json = table.concat(tbl, "\n")
-  local function http_index_data_callback(err, code, body, _)
-    -- todo error handling we may store the rows it into redis and send it again late
-    lua_util.debugm(N, task, "After create data %1", body)
-    if code ~= 200 then
-      rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s (%s)",
-            push_url, err, code)
-      if settings['failover'] then
-        local h = hash.create()
-        h:update(bulk_json)
-        local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20)
-        local data = util.zstd_compress(bulk_json)
-        local function redis_set_cb(rerr)
-          if rerr ~=nil then
-            rspamd_logger.errx(task, 'redis_set_cb received error: %1', rerr)
-          end
-        end
-        rspamd_redis.make_request(task,
-          redis_params, -- connect params
-          key, -- hash key
-          true, -- is write
-          redis_set_cb, --callback
-          'SETEX', -- command
-          {key, tostring(settings['expire']), data} -- arguments
-        )
-      end
-    end
-  end
-  rspamd_http.request({
+  local err, response = rspamd_http.request({
     url = push_url,
     headers = {
       ['Content-Type'] = 'application/x-ndjson',
@@ -117,11 +91,27 @@ local function elastic_send_data(task)
     no_ssl_verify = settings.no_ssl_verify,
     user = settings.user,
     password = settings.password,
-    callback = http_index_data_callback,
     timeout = settings.timeout,
   })
 
+  if err then
+    rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s",
+        push_url, err, failed_sends, settings.max_fail)
+  else
+    if response.code ~= 200 then
+      rspamd_logger.infox(task, "cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s",
+          push_url, err, response.code, failed_sends, settings.max_fail)
+    else
+      lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES",
+          nrows, #bulk_json)
+
+      return true
+    end
+  end
+
+  return false
 end
+
 local function get_general_metadata(task)
   local r = {}
   local ip_addr = task:get_ip()
@@ -175,6 +165,7 @@ local function get_general_metadata(task)
   else
     r.from = 'unknown'
   end
+
   local syminf = task:get_symbols_all()
   r.symbols = syminf
   r.asn = {}
@@ -182,6 +173,7 @@ local function get_general_metadata(task)
   r.asn.country = pool:get_variable("country") or 'unknown'
   r.asn.asn   = pool:get_variable("asn") or 0
   r.asn.ipnet = pool:get_variable("ipnet") or 'unknown'
+
   local function process_header(name)
     local hdr = task:get_header_full(name)
     if hdr then
@@ -216,9 +208,22 @@ local function elastic_collect(task)
   table.insert(rows, row)
   nrows = nrows + 1
   if nrows > settings['limit'] then
-    elastic_send_data(task)
-    nrows = 0
-    rows = {}
+    lua_util.debugm(N, task, 'send elastic search rows: %s', nrows)
+    if elastic_send_data(task) then
+      nrows = 0
+      rows = {}
+      failed_sends = 0;
+    else
+      failed_sends = failed_sends + 1
+
+      if failed_sends > settings.max_fail then
+        rspamd_logger.errx(task, 'cannot send %s rows to ES %s times, stop trying',
+            nrows, failed_sends)
+        nrows = 0
+        rows = {}
+        failed_sends = 0;
+      end
+    end
   end
 end
 
@@ -246,6 +251,7 @@ local function check_elastic_server(cfg, ev_base, _)
         for _,plugin in pairs(value['plugins']) do
           if plugin['name'] == 'ingest-geoip' then
             plugin_found = true
+            lua_util.debugm(N, "ingest-geoip plugin has been found")
           end
         end
         if not plugin_found then
@@ -276,7 +282,7 @@ end
 
 -- import ingest pipeline and kibana dashboard/visualization
 local function initial_setup(cfg, ev_base, worker)
-  if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+  if not worker:is_primary_controller() then return end
 
   local upstream = settings.upstream:get_upstream_round_robin()
   local ip_addr = upstream:get_addr():to_string(true)


More information about the Commits mailing list