commit 57d60a6: [Project] Add experimental HTTP statistics backend
Vsevolod Stakhov
vsevolod at rspamd.com
Sat Jun 11 12:42:05 UTC 2022
Author: Vsevolod Stakhov
Date: 2022-06-11 13:24:50 +0100
URL: https://github.com/rspamd/rspamd/commit/57d60a6c7974da4604a2158c407c25251f070d7d (HEAD -> master)
[Project] Add experimental HTTP statistics backend
---
src/libmime/received.cxx | 2 +-
src/libserver/mempool_vars_internal.h | 1 +
src/libstat/CMakeLists.txt | 1 +
src/libstat/backends/backends.h | 5 +-
src/libstat/backends/cdb_backend.cxx | 4 +-
src/libstat/backends/http_backend.cxx | 366 ++++++++++++++++++++++++++++++++++
6 files changed, 374 insertions(+), 5 deletions(-)
diff --git a/src/libmime/received.cxx b/src/libmime/received.cxx
index 82ec2dac0..691d7ca04 100644
--- a/src/libmime/received.cxx
+++ b/src/libmime/received.cxx
@@ -14,11 +14,11 @@
* limitations under the License.
*/
-#include <mempool_vars_internal.h>
#include "config.h"
#include "libserver/url.h"
#include "lua/lua_common.h"
#include "libserver/cfg_file.h"
+#include "libserver/mempool_vars_internal.h"
#include "mime_string.hxx"
#include "smtp_parsers.h"
#include "message.h"
diff --git a/src/libserver/mempool_vars_internal.h b/src/libserver/mempool_vars_internal.h
index 6b68dd5a5..72cf1b095 100644
--- a/src/libserver/mempool_vars_internal.h
+++ b/src/libserver/mempool_vars_internal.h
@@ -41,5 +41,6 @@
#define RSPAMD_MEMPOOL_SPAM_LEARNS "spam_learns"
#define RSPAMD_MEMPOOL_HAM_LEARNS "ham_learns"
#define RSPAMD_MEMPOOL_RE_MAPS_CACHE "re_maps_cache"
+#define RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME "stat_http_runtime"
#endif
diff --git a/src/libstat/CMakeLists.txt b/src/libstat/CMakeLists.txt
index 19962239d..b1df5c1e6 100644
--- a/src/libstat/CMakeLists.txt
+++ b/src/libstat/CMakeLists.txt
@@ -11,6 +11,7 @@ SET(CLASSIFIERSSRC ${CMAKE_CURRENT_SOURCE_DIR}/classifiers/bayes.c
SET(BACKENDSSRC ${CMAKE_CURRENT_SOURCE_DIR}/backends/mmaped_file.c
${CMAKE_CURRENT_SOURCE_DIR}/backends/sqlite3_backend.c
${CMAKE_CURRENT_SOURCE_DIR}/backends/cdb_backend.cxx
+ ${CMAKE_CURRENT_SOURCE_DIR}/backends/http_backend.cxx
${CMAKE_CURRENT_SOURCE_DIR}/backends/redis_backend.c)
SET(CACHESSRC ${CMAKE_CURRENT_SOURCE_DIR}/learn_cache/sqlite3_cache.c
diff --git a/src/libstat/backends/backends.h b/src/libstat/backends/backends.h
index 4c0b2276b..afb408a1a 100644
--- a/src/libstat/backends/backends.h
+++ b/src/libstat/backends/backends.h
@@ -84,13 +84,13 @@ struct rspamd_stat_backend {
gboolean learn, gpointer ctx); \
gboolean rspamd_##name##_process_tokens (struct rspamd_task *task, \
GPtrArray *tokens, gint id, \
- gpointer ctx); \
+ gpointer runtime); \
gboolean rspamd_##name##_finalize_process (struct rspamd_task *task, \
gpointer runtime, \
gpointer ctx); \
gboolean rspamd_##name##_learn_tokens (struct rspamd_task *task, \
GPtrArray *tokens, gint id, \
- gpointer ctx); \
+ gpointer runtime); \
gboolean rspamd_##name##_finalize_learn (struct rspamd_task *task, \
gpointer runtime, \
gpointer ctx, GError **err); \
@@ -116,6 +116,7 @@ RSPAMD_STAT_BACKEND_DEF(mmaped_file);
RSPAMD_STAT_BACKEND_DEF(sqlite3);
RSPAMD_STAT_BACKEND_DEF(cdb);
RSPAMD_STAT_BACKEND_DEF(redis);
+RSPAMD_STAT_BACKEND_DEF(http);
#ifdef __cplusplus
}
diff --git a/src/libstat/backends/cdb_backend.cxx b/src/libstat/backends/cdb_backend.cxx
index 15c3d3035..590cddafa 100644
--- a/src/libstat/backends/cdb_backend.cxx
+++ b/src/libstat/backends/cdb_backend.cxx
@@ -377,9 +377,9 @@ gboolean
rspamd_cdb_process_tokens(struct rspamd_task* task,
GPtrArray* tokens,
gint id,
- gpointer ctx)
+ gpointer runtime)
{
- auto *cdbp = CDB_FROM_RAW(ctx);
+ auto *cdbp = CDB_FROM_RAW(runtime);
bool seen_values = false;
for (auto i = 0u; i < tokens->len; i++) {
diff --git a/src/libstat/backends/http_backend.cxx b/src/libstat/backends/http_backend.cxx
new file mode 100644
index 000000000..bd3fd1d48
--- /dev/null
+++ b/src/libstat/backends/http_backend.cxx
@@ -0,0 +1,366 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "stat_internal.h"
+#include "libserver/http/http_connection.h"
+#include "libserver/mempool_vars_internal.h"
+#include "upstream.h"
+#include "contrib/robin-hood/robin_hood.h"
+#include <vector>
+
+namespace rspamd::stat::http {
+
+#define msg_debug_stat_http(...) rspamd_conditional_debug_fast (NULL, NULL, \
+ rspamd_stat_http_log_id, "stat_http", task->task_pool->tag.uid, \
+ RSPAMD_LOG_FUNC, \
+ __VA_ARGS__)
+
+INIT_LOG_MODULE(stat_http)
+
+/* Represents all http backends defined in some configuration */
+class http_backends_collection {
+ std::vector<struct rspamd_statfile *> backends;
+ double timeout = 1.0; /* Default timeout */
+ struct upstream_list *read_servers = nullptr;
+ struct upstream_list *write_servers = nullptr;
+public:
+ static auto get() -> http_backends_collection& {
+ static http_backends_collection *singleton = nullptr;
+
+ if (singleton == nullptr) {
+ singleton = new http_backends_collection;
+ }
+
+ return *singleton;
+ }
+
+ /**
+ * Add a new backend and (optionally initialize the basic backend parameters
+ * @param ctx
+ * @param cfg
+ * @param st
+ * @return
+ */
+ auto add_backend(struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg,
+ struct rspamd_statfile *st) -> bool;
+ /**
+ * Remove a statfile cleaning things up if the last statfile is removed
+ * @param st
+ * @return
+ */
+ auto remove_backend(struct rspamd_statfile *st) -> bool;
+
+ upstream *get_upstream(bool is_learn);
+
+private:
+ http_backends_collection() = default;
+ auto first_init(struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg,
+ struct rspamd_statfile *st) -> bool;
+};
+
+/*
+ * Created one per each task
+ */
+class http_backend_runtime final {
+public:
+ static auto create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *;
+private:
+ http_backends_collection *all_backends;
+ robin_hood::unordered_flat_map<int, struct rspamd_statfile *> seen_statfiles;
+ struct upstream *selected;
+private:
+ http_backend_runtime(struct rspamd_task *task, bool is_learn) :
+ all_backends(&http_backends_collection::get()) {
+ selected = all_backends->get_upstream(is_learn);
+ }
+ ~http_backend_runtime() = default;
+ static auto dtor(void *p) -> void {
+ ((http_backend_runtime *)p)->~http_backend_runtime();
+ }
+};
+
+auto http_backend_runtime::create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *
+{
+ /* Alloc type provide proper size and alignment */
+ auto *allocated_runtime = rspamd_mempool_alloc_type(task->task_pool, http_backend_runtime);
+
+ rspamd_mempool_add_destructor(task->task_pool, http_backend_runtime::dtor, allocated_runtime);
+
+ return new (allocated_runtime) http_backend_runtime{task, is_learn};
+}
+
+auto
+http_backends_collection::add_backend(struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg,
+ struct rspamd_statfile *st) -> bool
+{
+ /* On empty list of backends we know that we need to load backend data actually */
+ if (backends.empty()) {
+ if (!first_init(ctx, cfg, st)) {
+ return false;
+ }
+ }
+
+ backends.push_back(st);
+
+ return true;
+}
+
+auto http_backends_collection::first_init(struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg,
+ struct rspamd_statfile *st) -> bool
+{
+ auto try_load_backend_config = [&](const ucl_object_t *obj) -> bool {
+ if (!obj || ucl_object_type(obj) != UCL_OBJECT) {
+ return false;
+ }
+
+ /* First try to load read servers */
+ auto *rs = ucl_object_lookup_any(obj, "read_servers", "servers", nullptr);
+ if (rs) {
+ read_servers = rspamd_upstreams_create(cfg->ups_ctx);
+
+ if (read_servers == nullptr) {
+ return false;
+ }
+
+ if (!rspamd_upstreams_from_ucl(read_servers, rs, 80, this)) {
+ rspamd_upstreams_destroy(read_servers);
+ return false;
+ }
+ }
+ auto *ws = ucl_object_lookup_any(obj, "write_servers", "servers", nullptr);
+ if (ws) {
+ write_servers = rspamd_upstreams_create(cfg->ups_ctx);
+
+ if (write_servers == nullptr) {
+ return false;
+ }
+
+ if (!rspamd_upstreams_from_ucl(write_servers, rs, 80, this)) {
+ rspamd_upstreams_destroy(write_servers);
+ return false;
+ }
+ }
+
+ auto *tim = ucl_object_lookup(obj, "timeout");
+
+ if (tim) {
+ timeout = ucl_object_todouble(tim);
+ }
+
+ return true;
+ };
+
+ auto ret = false;
+ auto obj = ucl_object_lookup (st->classifier->cfg->opts, "backend");
+ if (obj != nullptr) {
+ ret = try_load_backend_config(obj);
+ }
+
+ /* Now try statfiles config */
+ if (!ret && st->stcf->opts) {
+ ret = try_load_backend_config(st->stcf->opts);
+ }
+
+ /* Now try classifier config */
+ if (!ret && st->classifier->cfg->opts) {
+ ret = try_load_backend_config(st->classifier->cfg->opts);
+ }
+
+ return ret;
+}
+
+auto http_backends_collection::remove_backend(struct rspamd_statfile *st) -> bool
+{
+ auto backend_it = std::remove(std::begin(backends), std::end(backends), st);
+
+ if (backend_it != std::end(backends)) {
+ /* Fast erasure with no order preservation */
+ std::swap(*backend_it, backends.back());
+ backends.pop_back();
+
+ if (backends.empty()) {
+ /* De-init collection - likely config reload */
+ if (read_servers) {
+ rspamd_upstreams_destroy(read_servers);
+ read_servers = nullptr;
+ }
+
+ if (write_servers) {
+ rspamd_upstreams_destroy(write_servers);
+ write_servers = nullptr;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
+upstream *http_backends_collection::get_upstream(bool is_learn)
+{
+ auto *ups_list = read_servers;
+ if (is_learn) {
+ ups_list = write_servers;
+ }
+
+ return rspamd_upstream_get(ups_list, RSPAMD_UPSTREAM_ROUND_ROBIN, nullptr, 0);
+}
+
+}
+
+/* C API */
+
+gpointer
+rspamd_http_init(struct rspamd_stat_ctx* ctx,
+ struct rspamd_config* cfg,
+ struct rspamd_statfile* st)
+{
+ auto &collections = rspamd::stat::http::http_backends_collection::get();
+
+ if (!collections.add_backend(ctx, cfg, st)) {
+ msg_err_config("cannot load http backend");
+
+ return nullptr;
+ }
+
+ return (void *)&collections;
+}
+gpointer
+rspamd_http_runtime(struct rspamd_task* task,
+ struct rspamd_statfile_config* stcf,
+ gboolean learn,
+ gpointer ctx)
+{
+ auto maybe_existing = rspamd_mempool_get_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME);
+
+ if (maybe_existing != nullptr) {
+ return maybe_existing;
+ }
+
+ auto runtime = rspamd::stat::http::http_backend_runtime::create(task, learn);
+
+ if (runtime) {
+ rspamd_mempool_set_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME,
+ (void *)runtime, nullptr);
+ }
+
+ return (void *)runtime;
+}
+
+gboolean
+rspamd_http_process_tokens(struct rspamd_task* task,
+ GPtrArray* tokens,
+ gint id,
+ gpointer runtime)
+{
+ auto real_runtime = (rspamd::stat::http::http_backend_runtime *)runtime;
+
+ if (real_runtime) {
+ /* TODO */
+ return true;
+ }
+
+
+ return false;
+
+}
+gboolean
+rspamd_http_finalize_process(struct rspamd_task* task,
+ gpointer runtime,
+ gpointer ctx)
+{
+ /* Not needed */
+ return true;
+}
+
+gboolean
+rspamd_http_learn_tokens(struct rspamd_task* task,
+ GPtrArray* tokens,
+ gint id,
+ gpointer runtime)
+{
+ auto real_runtime = (rspamd::stat::http::http_backend_runtime *)runtime;
+
+ if (real_runtime) {
+ /* TODO */
+ return true;
+ }
+
+
+ return false;
+}
+gboolean
+rspamd_http_finalize_learn(struct rspamd_task* task,
+ gpointer runtime,
+ gpointer ctx,
+ GError** err)
+{
+ return false;
+}
+
+gulong rspamd_http_total_learns(struct rspamd_task* task,
+ gpointer runtime,
+ gpointer ctx)
+{
+ /* TODO */
+ return 0;
+}
+gulong
+rspamd_http_inc_learns(struct rspamd_task* task,
+ gpointer runtime,
+ gpointer ctx)
+{
+ /* TODO */
+ return 0;
+}
+gulong
+rspamd_http_dec_learns(struct rspamd_task* task,
+ gpointer runtime,
+ gpointer ctx)
+{
+ /* TODO */
+ return (gulong)-1;
+}
+gulong
+rspamd_http_learns(struct rspamd_task* task,
+ gpointer runtime,
+ gpointer ctx)
+{
+ /* TODO */
+ return 0;
+}
+ucl_object_t*
+rspamd_http_get_stat(gpointer runtime, gpointer ctx)
+{
+ /* TODO */
+ return nullptr;
+}
+gpointer
+rspamd_http_load_tokenizer_config(gpointer runtime, gsize* len)
+{
+ return nullptr;
+}
+void
+rspamd_http_close(gpointer ctx)
+{
+ /* TODO */
+}
\ No newline at end of file
More information about the Commits
mailing list