commit a30ec7c: [Feature] S3: Allow to store structured data in messagepack
Vsevolod Stakhov
vsevolod at highsecure.ru
Fri Oct 15 10:49:10 UTC 2021
Author: Vsevolod Stakhov
Date: 2021-10-15 11:43:00 +0100
URL: https://github.com/rspamd/rspamd/commit/a30ec7cc13007a579fff471cbd9e77c57fedd7a1 (HEAD -> master)
[Feature] S3: Allow to store structured data in messagepack
---
src/plugins/lua/aws_s3.lua | 136 ++++++++++++++++++++++++++++++++-------------
1 file changed, 97 insertions(+), 39 deletions(-)
diff --git a/src/plugins/lua/aws_s3.lua b/src/plugins/lua/aws_s3.lua
index bdfa4f402..7ec65eb5b 100644
--- a/src/plugins/lua/aws_s3.lua
+++ b/src/plugins/lua/aws_s3.lua
@@ -29,6 +29,8 @@ local settings = {
s3_secret_key = nil,
s3_key_id = nil,
s3_timeout = 10,
+ save_raw = true,
+ save_structure = false,
}
local settings_schema = ts.shape{
@@ -40,13 +42,11 @@ local settings_schema = ts.shape{
enabled = ts.boolean:is_optional(),
fail_action = ts.string:is_optional(),
zstd_compress = ts.boolean:is_optional(),
+ save_raw = ts.boolean:is_optional(),
+ save_structure = ts.boolean:is_optional(),
}
-local function s3_aws_callback(task)
- local uri = string.format('https://%s.s3.amazonaws.com', settings.s3_bucket)
- -- Create a nonce
- local nonce = rspamd_text.randombytes(16):base32()
- local queue_id = task:get_queue_id()
+local function raw_data(task, nonce, queue_id)
local ext, content, content_type
if settings.zstd_compress then
@@ -59,50 +59,108 @@ local function s3_aws_callback(task)
content_type = 'message/rfc-822'
end
+ local path = string.format('/%s-%s.%s', queue_id, nonce, ext)
+
+ return path, content, content_type
+end
+
+local function structured_data(task, nonce, queue_id)
+ local ext, content, content_type
+ local lua_mime = require "lua_mime"
+ local ucl = require "ucl"
+
+ if settings.zstd_compress then
+ ext = 'msgpack.zst'
+ content = rspamd_util.zstd_compress(ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack'))
+ content_type = 'application/zstd'
+ else
+ ext = 'msgpack'
+ content = ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack')
+ content_type = 'application/msgpack'
+ end
+
+ local path = string.format('/%s-%s.%s', queue_id, nonce, ext)
+
+ return path, content, content_type
+end
+
+local function s3_aws_callback(task)
+ local uri = string.format('https://%s.s3.amazonaws.com', settings.s3_bucket)
+ -- Create a nonce
+ local nonce = rspamd_text.randombytes(16):base32()
+ local queue_id = task:get_queue_id()
if not queue_id then
queue_id = rspamd_text.randombytes(8):base32()
end
- local path = string.format('/%s-%s.%s', queue_id, nonce, ext)
-- Hack to pass host
local aws_host = string.format('%s.s3.amazonaws.com', settings.s3_bucket)
- local hdrs = lua_aws.aws_request_enrich({
- region = settings.s3_region,
- headers = {
- ['Content-Type'] = content_type,
- ['Host'] = aws_host
- },
- uri = path,
- key_id = settings.s3_key_id,
- secret_key = settings.s3_secret_key,
- method = 'PUT',
- }, content)
-
- local function s3_http_callback(http_err, code, body, headers)
-
- if http_err then
- if settings.fail_action then
- task:set_pre_result(settings.fail_action,
- string.format('S3 save failed: %s', http_err), N,
- nil, nil, 'least')
+ local function gen_s3_http_callback(path)
+ return function (http_err, code, body, headers)
+
+ if http_err then
+ if settings.fail_action then
+ task:set_pre_result(settings.fail_action,
+ string.format('S3 save failed: %s', http_err), N,
+ nil, nil, 'least')
+ end
+ rspamd_logger.errx(task, 'cannot save %s to AWS S3: %s', path, http_err)
+ else
+ rspamd_logger.messagex(task, 'saved message successfully in S3 object %s', path)
end
- rspamd_logger.errx(task, 'cannot save %s to AWS S3: %s', path, http_err)
- else
- rspamd_logger.messagex(task, 'saved message successfully in S3 object %s', path)
+ lua_util.debugm(N, task, 'obj=%s, err=%s, code=%s, body=%s, headers=%s',
+ path, http_err, code, body, headers)
end
- lua_util.debugm(N, task, 'obj=%s, err=%s, code=%s, body=%s, headers=%s',
- path, http_err, code, body, headers)
end
- rspamd_http.request({
- url = uri .. path,
- task = task,
- method = 'PUT',
- body = content,
- callback = s3_http_callback,
- headers = hdrs,
- timeout = settings.s3_timeout,
- })
+ if settings.save_raw then
+ local path, content, content_type = raw_data(task, nonce, queue_id)
+ local hdrs = lua_aws.aws_request_enrich({
+ region = settings.s3_region,
+ headers = {
+ ['Content-Type'] = content_type,
+ ['Host'] = aws_host
+ },
+ uri = path,
+ key_id = settings.s3_key_id,
+ secret_key = settings.s3_secret_key,
+ method = 'PUT',
+ }, content)
+ rspamd_http.request({
+ url = uri .. path,
+ task = task,
+ method = 'PUT',
+ body = content,
+ callback = gen_s3_http_callback(path),
+ headers = hdrs,
+ timeout = settings.s3_timeout,
+ })
+ end
+ if settings.save_structure then
+ local path, content, content_type = structured_data(task, nonce, queue_id)
+ local hdrs = lua_aws.aws_request_enrich({
+ region = settings.s3_region,
+ headers = {
+ ['Content-Type'] = content_type,
+ ['Host'] = aws_host
+ },
+ uri = path,
+ key_id = settings.s3_key_id,
+ secret_key = settings.s3_secret_key,
+ method = 'PUT',
+ }, content)
+ rspamd_http.request({
+ url = uri .. path,
+ task = task,
+ method = 'PUT',
+ body = content,
+ callback = gen_s3_http_callback(path),
+ headers = hdrs,
+ timeout = settings.s3_timeout,
+ })
+ end
+
+
end
local opts = rspamd_config:get_all_opt('aws_s3')
More information about the Commits
mailing list