commit a30ec7c: [Feature] S3: Allow to store structured data in messagepack

Vsevolod Stakhov vsevolod at highsecure.ru
Fri Oct 15 10:49:10 UTC 2021


Author: Vsevolod Stakhov
Date: 2021-10-15 11:43:00 +0100
URL: https://github.com/rspamd/rspamd/commit/a30ec7cc13007a579fff471cbd9e77c57fedd7a1 (HEAD -> master)

[Feature] S3: Allow to store structured data in messagepack

---
 src/plugins/lua/aws_s3.lua | 136 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 97 insertions(+), 39 deletions(-)

diff --git a/src/plugins/lua/aws_s3.lua b/src/plugins/lua/aws_s3.lua
index bdfa4f402..7ec65eb5b 100644
--- a/src/plugins/lua/aws_s3.lua
+++ b/src/plugins/lua/aws_s3.lua
@@ -29,6 +29,8 @@ local settings = {
   s3_secret_key = nil,
   s3_key_id = nil,
   s3_timeout = 10,
+  save_raw = true,
+  save_structure = false,
 }
 
 local settings_schema = ts.shape{
@@ -40,13 +42,11 @@ local settings_schema = ts.shape{
   enabled = ts.boolean:is_optional(),
   fail_action = ts.string:is_optional(),
   zstd_compress = ts.boolean:is_optional(),
+  save_raw = ts.boolean:is_optional(),
+  save_structure = ts.boolean:is_optional(),
 }
 
-local function s3_aws_callback(task)
-  local uri = string.format('https://%s.s3.amazonaws.com', settings.s3_bucket)
-  -- Create a nonce
-  local nonce = rspamd_text.randombytes(16):base32()
-  local queue_id = task:get_queue_id()
+local function raw_data(task, nonce, queue_id)
   local ext, content, content_type
 
   if settings.zstd_compress then
@@ -59,50 +59,108 @@ local function s3_aws_callback(task)
     content_type = 'message/rfc-822'
   end
 
+  local path = string.format('/%s-%s.%s', queue_id, nonce, ext)
+
+  return path, content, content_type
+end
+
+local function structured_data(task, nonce, queue_id)
+  local ext, content, content_type
+  local lua_mime = require "lua_mime"
+  local ucl = require "ucl"
+
+  if settings.zstd_compress then
+    ext = 'msgpack.zst'
+    content = rspamd_util.zstd_compress(ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack'))
+    content_type = 'application/zstd'
+  else
+    ext = 'msgpack'
+    content = ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack')
+    content_type = 'application/msgpack'
+  end
+
+  local path = string.format('/%s-%s.%s', queue_id, nonce, ext)
+
+  return path, content, content_type
+end
+
+local function s3_aws_callback(task)
+  local uri = string.format('https://%s.s3.amazonaws.com', settings.s3_bucket)
+  -- Create a nonce
+  local nonce = rspamd_text.randombytes(16):base32()
+  local queue_id = task:get_queue_id()
   if not queue_id then
     queue_id = rspamd_text.randombytes(8):base32()
   end
-  local path = string.format('/%s-%s.%s', queue_id, nonce, ext)
   -- Hack to pass host
   local aws_host = string.format('%s.s3.amazonaws.com', settings.s3_bucket)
 
-  local hdrs = lua_aws.aws_request_enrich({
-    region = settings.s3_region,
-    headers = {
-      ['Content-Type'] = content_type,
-      ['Host'] = aws_host
-    },
-    uri = path,
-    key_id = settings.s3_key_id,
-    secret_key = settings.s3_secret_key,
-    method = 'PUT',
-  }, content)
-
-  local function s3_http_callback(http_err, code, body, headers)
-
-    if http_err then
-      if settings.fail_action then
-        task:set_pre_result(settings.fail_action,
-            string.format('S3 save failed: %s', http_err), N,
-            nil, nil, 'least')
+  local function gen_s3_http_callback(path)
+    return function (http_err, code, body, headers)
+
+      if http_err then
+        if settings.fail_action then
+          task:set_pre_result(settings.fail_action,
+              string.format('S3 save failed: %s', http_err), N,
+              nil, nil, 'least')
+        end
+        rspamd_logger.errx(task, 'cannot save %s to AWS S3: %s', path, http_err)
+      else
+        rspamd_logger.messagex(task, 'saved message successfully in S3 object %s', path)
       end
-      rspamd_logger.errx(task, 'cannot save %s to AWS S3: %s', path, http_err)
-    else
-      rspamd_logger.messagex(task, 'saved message successfully in S3 object %s', path)
+      lua_util.debugm(N, task, 'obj=%s, err=%s, code=%s, body=%s, headers=%s',
+          path, http_err, code, body, headers)
     end
-    lua_util.debugm(N, task, 'obj=%s, err=%s, code=%s, body=%s, headers=%s',
-      path, http_err, code, body, headers)
   end
 
-  rspamd_http.request({
-    url = uri .. path,
-    task = task,
-    method = 'PUT',
-    body = content,
-    callback = s3_http_callback,
-    headers = hdrs,
-    timeout = settings.s3_timeout,
-  })
+  if settings.save_raw then
+    local path, content, content_type = raw_data(task, nonce, queue_id)
+    local hdrs = lua_aws.aws_request_enrich({
+      region = settings.s3_region,
+      headers = {
+        ['Content-Type'] = content_type,
+        ['Host'] = aws_host
+      },
+      uri = path,
+      key_id = settings.s3_key_id,
+      secret_key = settings.s3_secret_key,
+      method = 'PUT',
+    }, content)
+    rspamd_http.request({
+      url = uri .. path,
+      task = task,
+      method = 'PUT',
+      body = content,
+      callback = gen_s3_http_callback(path),
+      headers = hdrs,
+      timeout = settings.s3_timeout,
+    })
+  end
+  if settings.save_structure then
+    local path, content, content_type = structured_data(task, nonce, queue_id)
+    local hdrs = lua_aws.aws_request_enrich({
+      region = settings.s3_region,
+      headers = {
+        ['Content-Type'] = content_type,
+        ['Host'] = aws_host
+      },
+      uri = path,
+      key_id = settings.s3_key_id,
+      secret_key = settings.s3_secret_key,
+      method = 'PUT',
+    }, content)
+    rspamd_http.request({
+      url = uri .. path,
+      task = task,
+      method = 'PUT',
+      body = content,
+      callback = gen_s3_http_callback(path),
+      headers = hdrs,
+      timeout = settings.s3_timeout,
+    })
+  end
+
+
 end
 
 local opts = rspamd_config:get_all_opt('aws_s3')


More information about the Commits mailing list