commit 57d60a6: [Project] Add experimental HTTP statistics backend

Vsevolod Stakhov vsevolod at rspamd.com
Sat Jun 11 12:42:05 UTC 2022


Author: Vsevolod Stakhov
Date: 2022-06-11 13:24:50 +0100
URL: https://github.com/rspamd/rspamd/commit/57d60a6c7974da4604a2158c407c25251f070d7d (HEAD -> master)

[Project] Add experimental HTTP statistics backend

---
 src/libmime/received.cxx              |   2 +-
 src/libserver/mempool_vars_internal.h |   1 +
 src/libstat/CMakeLists.txt            |   1 +
 src/libstat/backends/backends.h       |   5 +-
 src/libstat/backends/cdb_backend.cxx  |   4 +-
 src/libstat/backends/http_backend.cxx | 366 ++++++++++++++++++++++++++++++++++
 6 files changed, 374 insertions(+), 5 deletions(-)

diff --git a/src/libmime/received.cxx b/src/libmime/received.cxx
index 82ec2dac0..691d7ca04 100644
--- a/src/libmime/received.cxx
+++ b/src/libmime/received.cxx
@@ -14,11 +14,11 @@
  * limitations under the License.
  */
 
-#include <mempool_vars_internal.h>
 #include "config.h"
 #include "libserver/url.h"
 #include "lua/lua_common.h"
 #include "libserver/cfg_file.h"
+#include "libserver/mempool_vars_internal.h"
 #include "mime_string.hxx"
 #include "smtp_parsers.h"
 #include "message.h"
diff --git a/src/libserver/mempool_vars_internal.h b/src/libserver/mempool_vars_internal.h
index 6b68dd5a5..72cf1b095 100644
--- a/src/libserver/mempool_vars_internal.h
+++ b/src/libserver/mempool_vars_internal.h
@@ -41,5 +41,6 @@
 #define RSPAMD_MEMPOOL_SPAM_LEARNS "spam_learns"
 #define RSPAMD_MEMPOOL_HAM_LEARNS "ham_learns"
 #define RSPAMD_MEMPOOL_RE_MAPS_CACHE "re_maps_cache"
+#define RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME "stat_http_runtime"
 
 #endif
diff --git a/src/libstat/CMakeLists.txt b/src/libstat/CMakeLists.txt
index 19962239d..b1df5c1e6 100644
--- a/src/libstat/CMakeLists.txt
+++ b/src/libstat/CMakeLists.txt
@@ -11,6 +11,7 @@ SET(CLASSIFIERSSRC	${CMAKE_CURRENT_SOURCE_DIR}/classifiers/bayes.c
 SET(BACKENDSSRC 	${CMAKE_CURRENT_SOURCE_DIR}/backends/mmaped_file.c
 					${CMAKE_CURRENT_SOURCE_DIR}/backends/sqlite3_backend.c
 					${CMAKE_CURRENT_SOURCE_DIR}/backends/cdb_backend.cxx
+					${CMAKE_CURRENT_SOURCE_DIR}/backends/http_backend.cxx
 					${CMAKE_CURRENT_SOURCE_DIR}/backends/redis_backend.c)
 
 SET(CACHESSRC 	${CMAKE_CURRENT_SOURCE_DIR}/learn_cache/sqlite3_cache.c
diff --git a/src/libstat/backends/backends.h b/src/libstat/backends/backends.h
index 4c0b2276b..afb408a1a 100644
--- a/src/libstat/backends/backends.h
+++ b/src/libstat/backends/backends.h
@@ -84,13 +84,13 @@ struct rspamd_stat_backend {
                 gboolean learn, gpointer ctx); \
         gboolean rspamd_##name##_process_tokens (struct rspamd_task *task, \
                 GPtrArray *tokens, gint id, \
-                gpointer ctx); \
+                gpointer runtime); \
         gboolean rspamd_##name##_finalize_process (struct rspamd_task *task, \
                 gpointer runtime, \
                 gpointer ctx); \
         gboolean rspamd_##name##_learn_tokens (struct rspamd_task *task, \
                 GPtrArray *tokens, gint id, \
-                gpointer ctx); \
+                gpointer runtime); \
         gboolean rspamd_##name##_finalize_learn (struct rspamd_task *task, \
                 gpointer runtime, \
                 gpointer ctx, GError **err); \
@@ -116,6 +116,7 @@ RSPAMD_STAT_BACKEND_DEF(mmaped_file);
 RSPAMD_STAT_BACKEND_DEF(sqlite3);
 RSPAMD_STAT_BACKEND_DEF(cdb);
 RSPAMD_STAT_BACKEND_DEF(redis);
+RSPAMD_STAT_BACKEND_DEF(http);
 
 #ifdef  __cplusplus
 }
diff --git a/src/libstat/backends/cdb_backend.cxx b/src/libstat/backends/cdb_backend.cxx
index 15c3d3035..590cddafa 100644
--- a/src/libstat/backends/cdb_backend.cxx
+++ b/src/libstat/backends/cdb_backend.cxx
@@ -377,9 +377,9 @@ gboolean
 rspamd_cdb_process_tokens(struct rspamd_task* task,
 								   GPtrArray* tokens,
 								   gint id,
-								   gpointer ctx)
+								   gpointer runtime)
 {
-	auto *cdbp = CDB_FROM_RAW(ctx);
+	auto *cdbp = CDB_FROM_RAW(runtime);
 	bool seen_values = false;
 
 	for (auto i = 0u; i < tokens->len; i++) {
diff --git a/src/libstat/backends/http_backend.cxx b/src/libstat/backends/http_backend.cxx
new file mode 100644
index 000000000..bd3fd1d48
--- /dev/null
+++ b/src/libstat/backends/http_backend.cxx
@@ -0,0 +1,366 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "stat_internal.h"
+#include "libserver/http/http_connection.h"
+#include "libserver/mempool_vars_internal.h"
+#include "upstream.h"
+#include "contrib/robin-hood/robin_hood.h"
+#include <vector>
+
+namespace rspamd::stat::http {
+
+#define msg_debug_stat_http(...)  rspamd_conditional_debug_fast (NULL, NULL, \
+        rspamd_stat_http_log_id, "stat_http", task->task_pool->tag.uid, \
+        RSPAMD_LOG_FUNC, \
+        __VA_ARGS__)
+
+INIT_LOG_MODULE(stat_http)
+
+/* Represents all http backends defined in some configuration */
+class http_backends_collection {
+	std::vector<struct rspamd_statfile *> backends;
+	double timeout = 1.0; /* Default timeout */
+	struct upstream_list *read_servers = nullptr;
+	struct upstream_list *write_servers = nullptr;
+public:
+	static auto get() -> http_backends_collection& {
+		static http_backends_collection *singleton = nullptr;
+
+		if (singleton == nullptr) {
+			singleton = new http_backends_collection;
+		}
+
+		return *singleton;
+	}
+
+	/**
+	 * Add a new backend and (optionally initialize the basic backend parameters
+	 * @param ctx
+	 * @param cfg
+	 * @param st
+	 * @return
+	 */
+	auto add_backend(struct rspamd_stat_ctx *ctx,
+					 struct rspamd_config *cfg,
+					 struct rspamd_statfile *st) -> bool;
+	/**
+	 * Remove a statfile cleaning things up if the last statfile is removed
+	 * @param st
+	 * @return
+	 */
+	auto remove_backend(struct rspamd_statfile *st) -> bool;
+
+	upstream *get_upstream(bool is_learn);
+
+private:
+	http_backends_collection() = default;
+	auto first_init(struct rspamd_stat_ctx *ctx,
+					struct rspamd_config *cfg,
+					struct rspamd_statfile *st) -> bool;
+};
+
+/*
+ * Created one per each task
+ */
+class http_backend_runtime final {
+public:
+	static auto create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *;
+private:
+	http_backends_collection *all_backends;
+	robin_hood::unordered_flat_map<int, struct rspamd_statfile *> seen_statfiles;
+	struct upstream *selected;
+private:
+	http_backend_runtime(struct rspamd_task *task, bool is_learn) :
+			all_backends(&http_backends_collection::get()) {
+		selected = all_backends->get_upstream(is_learn);
+	}
+	~http_backend_runtime() = default;
+	static auto dtor(void *p) -> void {
+		((http_backend_runtime *)p)->~http_backend_runtime();
+	}
+};
+
+auto http_backend_runtime::create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *
+{
+	/* Alloc type provide proper size and alignment */
+	auto *allocated_runtime = rspamd_mempool_alloc_type(task->task_pool, http_backend_runtime);
+
+	rspamd_mempool_add_destructor(task->task_pool, http_backend_runtime::dtor, allocated_runtime);
+
+	return new (allocated_runtime) http_backend_runtime{task, is_learn};
+}
+
+auto
+http_backends_collection::add_backend(struct rspamd_stat_ctx *ctx,
+									  struct rspamd_config *cfg,
+									  struct rspamd_statfile *st) -> bool
+{
+	/* On empty list of backends we know that we need to load backend data actually */
+	if (backends.empty()) {
+		if (!first_init(ctx, cfg, st)) {
+			return false;
+		}
+	}
+
+	backends.push_back(st);
+
+	return true;
+}
+
+auto http_backends_collection::first_init(struct rspamd_stat_ctx *ctx,
+										  struct rspamd_config *cfg,
+										  struct rspamd_statfile *st) -> bool
+{
+	auto try_load_backend_config = [&](const ucl_object_t *obj) -> bool {
+		if (!obj || ucl_object_type(obj) != UCL_OBJECT) {
+			return false;
+		}
+
+		/* First try to load read servers */
+		auto *rs = ucl_object_lookup_any(obj, "read_servers", "servers", nullptr);
+		if (rs) {
+			read_servers = rspamd_upstreams_create(cfg->ups_ctx);
+
+			if (read_servers == nullptr) {
+				return false;
+			}
+
+			if (!rspamd_upstreams_from_ucl(read_servers, rs, 80, this)) {
+				rspamd_upstreams_destroy(read_servers);
+				return false;
+			}
+		}
+		auto *ws = ucl_object_lookup_any(obj, "write_servers", "servers", nullptr);
+		if (ws) {
+			write_servers = rspamd_upstreams_create(cfg->ups_ctx);
+
+			if (write_servers == nullptr) {
+				return false;
+			}
+
+			if (!rspamd_upstreams_from_ucl(write_servers, rs, 80, this)) {
+				rspamd_upstreams_destroy(write_servers);
+				return false;
+			}
+		}
+
+		auto *tim = ucl_object_lookup(obj, "timeout");
+
+		if (tim) {
+			timeout = ucl_object_todouble(tim);
+		}
+
+		return true;
+	};
+
+	auto ret = false;
+	auto obj = ucl_object_lookup (st->classifier->cfg->opts, "backend");
+	if (obj != nullptr) {
+		ret = try_load_backend_config(obj);
+	}
+
+	/* Now try statfiles config */
+	if (!ret && st->stcf->opts) {
+		ret = try_load_backend_config(st->stcf->opts);
+	}
+
+	/* Now try classifier config */
+	if (!ret && st->classifier->cfg->opts) {
+		ret = try_load_backend_config(st->classifier->cfg->opts);
+	}
+
+	return ret;
+}
+
+auto http_backends_collection::remove_backend(struct rspamd_statfile *st) -> bool
+{
+	auto backend_it = std::remove(std::begin(backends), std::end(backends), st);
+
+	if (backend_it != std::end(backends)) {
+		/* Fast erasure with no order preservation */
+		std::swap(*backend_it, backends.back());
+		backends.pop_back();
+
+		if (backends.empty()) {
+			/* De-init collection - likely config reload */
+			if (read_servers) {
+				rspamd_upstreams_destroy(read_servers);
+				read_servers = nullptr;
+			}
+
+			if (write_servers) {
+				rspamd_upstreams_destroy(write_servers);
+				write_servers = nullptr;
+			}
+		}
+
+		return true;
+	}
+
+	return false;
+}
+
+upstream *http_backends_collection::get_upstream(bool is_learn)
+{
+	auto *ups_list = read_servers;
+	if (is_learn) {
+		ups_list = write_servers;
+	}
+
+	return rspamd_upstream_get(ups_list, RSPAMD_UPSTREAM_ROUND_ROBIN, nullptr, 0);
+}
+
+}
+
+/* C API */
+
+gpointer
+rspamd_http_init(struct rspamd_stat_ctx* ctx,
+				struct rspamd_config* cfg,
+				struct rspamd_statfile* st)
+{
+	auto &collections = rspamd::stat::http::http_backends_collection::get();
+
+	if (!collections.add_backend(ctx, cfg, st)) {
+		msg_err_config("cannot load http backend");
+
+		return nullptr;
+	}
+
+	return (void *)&collections;
+}
+gpointer
+rspamd_http_runtime(struct rspamd_task* task,
+				   struct rspamd_statfile_config* stcf,
+				   gboolean learn,
+				   gpointer ctx)
+{
+	auto maybe_existing = rspamd_mempool_get_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME);
+
+	if (maybe_existing != nullptr) {
+		return maybe_existing;
+	}
+
+	auto runtime = rspamd::stat::http::http_backend_runtime::create(task, learn);
+
+	if (runtime) {
+		rspamd_mempool_set_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME,
+				(void *)runtime, nullptr);
+	}
+
+	return (void *)runtime;
+}
+
+gboolean
+rspamd_http_process_tokens(struct rspamd_task* task,
+						  GPtrArray* tokens,
+						  gint id,
+						  gpointer runtime)
+{
+	auto real_runtime = (rspamd::stat::http::http_backend_runtime *)runtime;
+
+	if (real_runtime) {
+		/* TODO */
+		return true;
+	}
+
+
+	return false;
+
+}
+gboolean
+rspamd_http_finalize_process(struct rspamd_task* task,
+							gpointer runtime,
+							gpointer ctx)
+{
+	/* Not needed */
+	return true;
+}
+
+gboolean
+rspamd_http_learn_tokens(struct rspamd_task* task,
+						GPtrArray* tokens,
+						gint id,
+						gpointer runtime)
+{
+	auto real_runtime = (rspamd::stat::http::http_backend_runtime *)runtime;
+
+	if (real_runtime) {
+		/* TODO */
+		return true;
+	}
+
+
+	return false;
+}
+gboolean
+rspamd_http_finalize_learn(struct rspamd_task* task,
+						  gpointer runtime,
+						  gpointer ctx,
+						  GError** err)
+{
+	return false;
+}
+
+gulong rspamd_http_total_learns(struct rspamd_task* task,
+							   gpointer runtime,
+							   gpointer ctx)
+{
+	/* TODO */
+	return 0;
+}
+gulong
+rspamd_http_inc_learns(struct rspamd_task* task,
+					  gpointer runtime,
+					  gpointer ctx)
+{
+	/* TODO */
+	return 0;
+}
+gulong
+rspamd_http_dec_learns(struct rspamd_task* task,
+					  gpointer runtime,
+					  gpointer ctx)
+{
+	/* TODO */
+	return (gulong)-1;
+}
+gulong
+rspamd_http_learns(struct rspamd_task* task,
+				  gpointer runtime,
+				  gpointer ctx)
+{
+	/* TODO */
+	return 0;
+}
+ucl_object_t*
+rspamd_http_get_stat(gpointer runtime, gpointer ctx)
+{
+	/* TODO */
+	return nullptr;
+}
+gpointer
+rspamd_http_load_tokenizer_config(gpointer runtime, gsize* len)
+{
+	return nullptr;
+}
+void
+rspamd_http_close(gpointer ctx)
+{
+	/* TODO */
+}
\ No newline at end of file


More information about the Commits mailing list