commit 75f1f90: [Feature] Aws_s3: Allow to store large parts separately
Vsevolod Stakhov
vsevolod at highsecure.ru
Wed Dec 8 15:00:04 UTC 2021
Author: Vsevolod Stakhov
Date: 2021-12-08 14:57:57 +0000
URL: https://github.com/rspamd/rspamd/commit/75f1f90326f395b3726257e5b14c9f4c1bb6a41b (HEAD -> master)
[Feature] Aws_s3: Allow to store large parts separately
---
src/plugins/lua/aws_s3.lua | 71 +++++++++++++++++++++++++++++++++++++++-------
1 file changed, 61 insertions(+), 10 deletions(-)
diff --git a/src/plugins/lua/aws_s3.lua b/src/plugins/lua/aws_s3.lua
index 1a7873086..ec45dca7c 100644
--- a/src/plugins/lua/aws_s3.lua
+++ b/src/plugins/lua/aws_s3.lua
@@ -32,6 +32,7 @@ local settings = {
s3_timeout = 10,
save_raw = true,
save_structure = false,
+ inline_content_limit = nil,
}
local settings_schema = ts.shape{
@@ -46,6 +47,7 @@ local settings_schema = ts.shape{
zstd_compress = ts.boolean:is_optional(),
save_raw = ts.boolean:is_optional(),
save_structure = ts.boolean:is_optional(),
+ inline_content_limit = (ts.integer + ts.string / tonumber):is_optional(),
}
local function raw_data(task, nonce, queue_id)
@@ -66,24 +68,61 @@ local function raw_data(task, nonce, queue_id)
return path, content, content_type
end
+local function gen_ext()
+ local ext = 'msgpack'
+ if settings.zstd_compress then
+ ext = 'msgpack.zst'
+ end
+
+ return ext
+end
+
+local function convert_to_ref(task, nonce, queue_id, part, external_refs)
+ local path = string.format('/%s-%s-%s.%s', queue_id, nonce,
+ rspamd_text.randombytes(8):base32(), gen_ext())
+ local content = part.content
+
+ if settings.zstd_compress then
+ external_refs[path] = rspamd_util.zstd_compress(content)
+ else
+ external_refs[path] = content
+ end
+
+ part.content = nil
+ part.content_path = path
+
+ return path
+end
+
local function structured_data(task, nonce, queue_id)
- local ext, content, content_type
+ local content, content_type
+ local external_refs = {}
local lua_mime = require "lua_mime"
local ucl = require "ucl"
+ local message_split = lua_mime.message_to_ucl(task)
+
+ if settings.inline_content_limit and settings.inline_content_limit > 0 then
+ for i,part in ipairs(message_split.parts() or {}) do
+ if part.content and #part.content >= settings.inline_content_limit then
+ local ref = convert_to_ref(task, nonce, queue_id, part, external_refs)
+ lua_util.debugm(N, task, "convert part number %s to a reference %s",
+ i, ref)
+ end
+ end
+ end
+
if settings.zstd_compress then
- ext = 'msgpack.zst'
content = rspamd_util.zstd_compress(ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack'))
content_type = 'application/zstd'
else
- ext = 'msgpack'
content = ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack')
content_type = 'application/msgpack'
end
- local path = string.format('/%s-%s.%s', queue_id, nonce, ext)
+ local path = string.format('/%s-%s.%s', queue_id, nonce, gen_ext())
- return path, content, content_type
+ return path, content, content_type, external_refs
end
local function s3_aws_callback(task)
@@ -97,7 +136,7 @@ local function s3_aws_callback(task)
-- Hack to pass host
local aws_host = string.format('%s.%s', settings.s3_bucket, settings.s3_host)
- local function gen_s3_http_callback(path)
+ local function gen_s3_http_callback(path, what)
return function (http_err, code, body, headers)
if http_err then
@@ -108,7 +147,7 @@ local function s3_aws_callback(task)
end
rspamd_logger.errx(task, 'cannot save %s to AWS S3: %s', path, http_err)
else
- rspamd_logger.messagex(task, 'saved message successfully in S3 object %s', path)
+ rspamd_logger.messagex(task, 'saved %s successfully in S3 object %s', what, path)
end
lua_util.debugm(N, task, 'obj=%s, err=%s, code=%s, body=%s, headers=%s',
path, http_err, code, body, headers)
@@ -133,13 +172,13 @@ local function s3_aws_callback(task)
task = task,
method = 'PUT',
body = content,
- callback = gen_s3_http_callback(path),
+ callback = gen_s3_http_callback(path, 'raw message'),
headers = hdrs,
timeout = settings.s3_timeout,
})
end
if settings.save_structure then
- local path, content, content_type = structured_data(task, nonce, queue_id)
+ local path, content, content_type, external_refs = structured_data(task, nonce, queue_id)
local hdrs = lua_aws.aws_request_enrich({
region = settings.s3_region,
headers = {
@@ -156,10 +195,22 @@ local function s3_aws_callback(task)
task = task,
method = 'PUT',
body = content,
- callback = gen_s3_http_callback(path),
+ callback = gen_s3_http_callback(path, 'structured message'),
headers = hdrs,
timeout = settings.s3_timeout,
})
+
+ for _,ref in ipairs(external_refs) do
+ rspamd_http.request({
+ url = uri .. ref,
+ task = task,
+ method = 'PUT',
+ body = content,
+ callback = gen_s3_http_callback(ref, 'part content'),
+ headers = hdrs,
+ timeout = settings.s3_timeout,
+ })
+ end
end
More information about the Commits mailing list