commit 776d104: [Feature] Dmarc_report: allow sending reports in batches

Vsevolod Stakhov vsevolod at highsecure.ru
Thu Aug 5 14:56:04 UTC 2021


Author: Vsevolod Stakhov
Date: 2021-08-05 15:53:00 +0100
URL: https://github.com/rspamd/rspamd/commit/776d1048a93c9bb609ee4ae918eaf513ea8d7a09 (HEAD -> master)

[Feature] Dmarc_report: allow sending reports in batches

---
 lualib/rspamadm/dmarc_report.lua | 105 ++++++++++++++++++++++++++-------------
 1 file changed, 70 insertions(+), 35 deletions(-)

diff --git a/lualib/rspamadm/dmarc_report.lua b/lualib/rspamadm/dmarc_report.lua
index 8894f23e3..fcc4f5e7f 100644
--- a/lualib/rspamadm/dmarc_report.lua
+++ b/lualib/rspamadm/dmarc_report.lua
@@ -49,6 +49,11 @@ parser:argument "date"
        :description "Date to process (today by default)"
        :argname "<DDMMYYYY>"
        :args "*"
+parser:option "-b --batch-size"
+      :description "Send reports in batches up to <batch-size> messages"
+      :argname "<number>"
+      :convert(tonumber)
+      :default "10"
 
 local report_template = [[From: "{= from_name =}" <{= from_addr =}>
 To: {= rcpt =}
@@ -396,44 +401,66 @@ local function rcpt_list(tbl, func)
 end
 
 -- Synchronous smtp send function
-local function send_reports_by_smtp(opts, reports)
+local function send_reports_by_smtp(opts, reports, finish_cb)
   local lua_smtp = require "lua_smtp"
-  local reports_remaining = #reports
   local reports_failed = 0
+  local reports_sent = 0
   local report_settings = dmarc_settings.reporting
 
-  local function gen_sendmail_cb(report)
+  local function gen_sendmail_cb(report, args)
     return function(ret, err)
-      reports_remaining = reports_remaining - 1
+      -- We modify this from all callbacks
+      args.nreports = args.nreports - 1
       if not ret then
         logger.errx("Couldn't send mail for %s: %s", report.reporting_domain, err)
         reports_failed = reports_failed + 1
       else
+        reports_sent = reports_sent + 1
         lua_util.debugm(N, 'successfully sent a report for %s: %s bytes sent',
             report.reporting_domain, #report.message)
       end
+
+      -- Tail call to the next batch or to the final function
+      if args.nreports == 0 then
+        if args.next_start > #reports then
+          finish_cb(reports_sent, reports_failed)
+        else
+          args.cont_func(args.next_start)
+        end
+      end
     end
   end
 
-  for _,report in ipairs(reports) do
-    local ret = lua_smtp.sendmail({
-      ev_base = rspamadm_ev_base,
-      session = rspamadm_session,
-      config = rspamd_config,
-      host = report_settings.smtp,
-      port = report_settings.smtp_port or 25,
-      resolver = rspamadm_dns_resolver,
-      from = report_settings.email,
-      recipients = report.rcpts,
-      helo = report_settings.helo or 'rspamd.localhost',
-    }, report.message, gen_sendmail_cb(report))
-
-    if ret then
-      reports_remaining = reports_remaining + 1
+  local function send_data_in_batches(cur_batch)
+    local nreports = math.min(#reports - cur_batch + 1, opts.batch_size)
+    local next_start = cur_batch + nreports
+    lua_util.debugm(N, 'send data for %s domains (from %s to %s)',
+        nreports, cur_batch, next_start-1)
+    -- Shared across all closures
+    local gen_args = {
+      cont_func = send_data_in_batches,
+      nreports = nreports,
+      next_start = next_start
+    }
+    for i=cur_batch,next_start-1 do
+      local report = reports[i]
+      lua_smtp.sendmail({
+        ev_base = rspamadm_ev_base,
+        session = rspamadm_session,
+        config = rspamd_config,
+        host = report_settings.smtp,
+        port = report_settings.smtp_port or 25,
+        resolver = rspamadm_dns_resolver,
+        from = report_settings.email,
+        recipients = report.rcpts,
+        helo = report_settings.helo or 'rspamd.localhost',
+      },
+          report.message,
+          gen_sendmail_cb(report, gen_args))
     end
   end
 
-  return reports_remaining
+  send_data_in_batches(1)
 end
 
 local function prepare_report(opts, start_time, rep_key)
@@ -546,7 +573,7 @@ local function process_report_date(opts, start_time, date)
 
   if not ret or not results or results == 0 then
     logger.messagex('No reports for %s', date)
-    return 0
+    return {}
   end
 
   -- Rename index key to avoid races
@@ -565,7 +592,7 @@ local function process_report_date(opts, start_time, date)
           {'DEL', idx_key})
     end
     logger.messagex('Cannot get reports for %s', date)
-    return 0
+    return {}
   end
 
   local reports = {}
@@ -585,7 +612,7 @@ local function process_report_date(opts, start_time, date)
         {'DEL', idx_key})
   end
 
-  return send_reports_by_smtp(opts, reports)
+  return reports
 end
 
 local function handler(args)
@@ -638,26 +665,34 @@ local function handler(args)
 
   local ndates = 0
   local nreports = 0
+  local all_reports = {}
   for _,date in ipairs(opts.date) do
     lua_util.debugm(N, 'Process date %s', date)
-    local nproc = process_report_date(opts, start_time, date)
-    if nproc > 0 then
+    local reports_for_date = process_report_date(opts, start_time, date)
+    if #reports_for_date > 0 then
       ndates = ndates + 1
-      nreports = nreports + nproc
+      nreports = nreports + #reports_for_date
+
+      for _,r in ipairs(reports_for_date) do
+        table.insert(all_reports, r)
+      end
     end
   end
 
-  if not opts.no_opt then
-    lua_util.debugm(N, 'set last report date to %s', os.time())
-    lua_redis.request(redis_params, redis_attrs,
-        {'SETEX', 'rspamd_dmarc_last_collection', dmarc_settings.reporting.keys_expire,
-         tostring(os.time())})
-  end
+  local function finish_cb(nsuccess, nfail)
+    if not opts.no_opt then
+      lua_util.debugm(N, 'set last report date to %s', os.time())
+      lua_redis.request(redis_params, redis_attrs,
+          {'SETEX', 'rspamd_dmarc_last_collection', dmarc_settings.reporting.keys_expire,
+           tostring(os.time())})
+    end
 
-  logger.messagex('Reporting collection has finished %s dates processed, %s reports',
-      ndates, nreports)
+    logger.messagex('Reporting collection has finished %s dates processed, %s reports: %s completed, %s failed',
+        ndates, nreports, nsuccess, nfail)
 
-  pool:destroy()
+    pool:destroy()
+  end
+  send_reports_by_smtp(opts, all_reports, finish_cb)
 end
 
 return {


More information about the Commits mailing list