commit 23b99d3: [Project] Allow to enable mempool debugging from the protocol
Vsevolod Stakhov
vsevolod at highsecure.ru
Mon Dec 23 18:49:16 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-12-23 18:43:16 +0000
URL: https://github.com/rspamd/rspamd/commit/23b99d31a9a620b3c62848da4126dbbf0642f6db (HEAD -> master)
[Project] Allow to enable mempool debugging from the protocol
---
src/controller.c | 12 +--
src/libserver/task.c | 6 +-
src/libserver/task.h | 3 +-
src/libserver/worker_util.c | 2 +-
src/lua/lua_task.c | 8 +-
src/lua/lua_util.c | 2 +-
src/plugins/fuzzy_check.c | 2 +-
src/rspamadm/lua_repl.c | 2 +-
src/rspamd_proxy.c | 2 +-
src/worker.c | 257 +++++++++++++++++++++++++++++++-------------
10 files changed, 205 insertions(+), 91 deletions(-)
diff --git a/src/controller.c b/src/controller.c
index e4ad7085d..d36431702 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -1514,7 +1514,7 @@ rspamd_controller_handle_lua_history (lua_State *L,
if (lua_isfunction (L, -1)) {
task = rspamd_task_new (session->ctx->worker, session->cfg,
- session->pool, ctx->lang_det, ctx->event_loop);
+ session->pool, ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
task->s = rspamd_session_create (session->pool,
@@ -1811,7 +1811,7 @@ rspamd_controller_handle_lua (struct rspamd_http_connection_entry *conn_ent,
}
task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
- ctx->lang_det, ctx->event_loop);
+ ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
task->s = rspamd_session_create (session->pool,
@@ -1996,7 +1996,7 @@ rspamd_controller_handle_learn_common (
}
task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
- session->ctx->lang_det, ctx->event_loop);
+ session->ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
task->s = rspamd_session_create (session->pool,
@@ -2095,7 +2095,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
}
task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
- ctx->lang_det, ctx->event_loop);
+ ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
task->s = rspamd_session_create (session->pool,
@@ -2584,7 +2584,7 @@ rspamd_controller_handle_stat_common (
ctx = session->ctx;
task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
- ctx->lang_det, ctx->event_loop);
+ ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
cbdata = rspamd_mempool_alloc0 (session->pool, sizeof (*cbdata));
cbdata->conn_ent = conn_ent;
@@ -2986,7 +2986,7 @@ rspamd_controller_handle_lua_plugin (struct rspamd_http_connection_entry *conn_e
}
task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
- ctx->lang_det, ctx->event_loop);
+ ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
task->s = rspamd_session_create (session->pool,
diff --git a/src/libserver/task.c b/src/libserver/task.c
index 9eebe02a2..ccd959920 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -64,14 +64,16 @@ rspamd_task_new (struct rspamd_worker *worker,
struct rspamd_config *cfg,
rspamd_mempool_t *pool,
struct rspamd_lang_detector *lang_det,
- struct ev_loop *event_loop)
+ struct ev_loop *event_loop,
+ gboolean debug_mem)
{
struct rspamd_task *new_task;
rspamd_mempool_t *task_pool;
guint flags = 0;
if (pool == NULL) {
- task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "task", 0);
+ task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
+ "task", debug_mem ? RSPAMD_MEMPOOL_DEBUG : 0);
flags |= RSPAMD_TASK_FLAG_OWN_POOL;
}
else {
diff --git a/src/libserver/task.h b/src/libserver/task.h
index a96e2ac05..feac456dd 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -225,7 +225,8 @@ struct rspamd_task *rspamd_task_new (struct rspamd_worker *worker,
struct rspamd_config *cfg,
rspamd_mempool_t *pool,
struct rspamd_lang_detector *lang_det,
- struct ev_loop *event_loop);
+ struct ev_loop *event_loop,
+ gboolean debug_mem);
/**
* Destroy task object and remove its IO dispatcher if it exists
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 362d64bc5..e519cb985 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -143,7 +143,7 @@ rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
if (cfg->on_term_scripts) {
ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
/* Create a fake task object for async events */
- task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
+ task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
task->s = rspamd_session_create (task->task_pool,
diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c
index cf57ebf13..774bb0120 100644
--- a/src/lua/lua_task.c
+++ b/src/lua/lua_task.c
@@ -1615,7 +1615,7 @@ lua_task_load_from_file (lua_State * L)
}
}
- task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL);
+ task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE);
task->msg.begin = data->str;
task->msg.len = data->len;
rspamd_mempool_add_destructor (task->task_pool,
@@ -1629,7 +1629,7 @@ lua_task_load_from_file (lua_State * L)
if (!map) {
err = strerror (errno);
} else {
- task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL);
+ task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE);
task->msg.begin = map;
task->msg.len = sz;
rspamd_mempool_add_destructor (task->task_pool,
@@ -1683,7 +1683,7 @@ lua_task_load_from_string (lua_State * L)
}
}
- task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL);
+ task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE);
task->msg.begin = g_malloc (message_len);
memcpy ((gchar *)task->msg.begin, str_message, message_len);
task->msg.len = message_len;
@@ -1729,7 +1729,7 @@ lua_task_create (lua_State * L)
}
}
- task = rspamd_task_new (NULL, cfg, NULL, NULL, ev_base);
+ task = rspamd_task_new (NULL, cfg, NULL, NULL, ev_base, FALSE);
task->flags |= RSPAMD_TASK_FLAG_EMPTY;
ptask = lua_newuserdata (L, sizeof (*ptask));
diff --git a/src/lua/lua_util.c b/src/lua/lua_util.c
index b71339402..3a3561e2f 100644
--- a/src/lua/lua_util.c
+++ b/src/lua/lua_util.c
@@ -873,7 +873,7 @@ lua_util_process_message (lua_State *L)
if (cfg != NULL && message != NULL) {
base = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL);
rspamd_init_filters (cfg, FALSE);
- task = rspamd_task_new (NULL, cfg, NULL, NULL, base);
+ task = rspamd_task_new (NULL, cfg, NULL, NULL, base, FALSE);
task->msg.begin = rspamd_mempool_alloc (task->task_pool, mlen);
rspamd_strlcpy ((gpointer)task->msg.begin, message, mlen);
task->msg.len = mlen;
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 713edc41b..e8f02652d 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -2979,7 +2979,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
/* Prepare task */
task = rspamd_task_new (session->wrk, session->cfg, NULL,
- session->lang_det, conn_ent->rt->event_loop);
+ session->lang_det, conn_ent->rt->event_loop, FALSE);
task->cfg = ctx->cfg;
saved = rspamd_mempool_alloc0 (session->pool, sizeof (gint));
err = rspamd_mempool_alloc0 (session->pool, sizeof (GError *));
diff --git a/src/rspamadm/lua_repl.c b/src/rspamadm/lua_repl.c
index ee22c4868..bceed5855 100644
--- a/src/rspamadm/lua_repl.c
+++ b/src/rspamadm/lua_repl.c
@@ -431,7 +431,7 @@ rspamadm_lua_message_handler (lua_State *L, gint argc, gchar **argv)
rspamd_printf ("cannot open %s: %s\n", argv[i], strerror (errno));
}
else {
- task = rspamd_task_new (NULL, rspamd_main->cfg, NULL, NULL, NULL);
+ task = rspamd_task_new (NULL, rspamd_main->cfg, NULL, NULL, NULL, FALSE);
if (!rspamd_task_load_message (task, NULL, map, len)) {
rspamd_printf ("cannot load %s\n", argv[i]);
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 6153ddd21..7e460c040 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -1754,7 +1754,7 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
msg = session->client_message;
task = rspamd_task_new (session->worker, session->ctx->cfg,
session->pool, session->ctx->lang_det,
- session->ctx->event_loop);
+ session->ctx->event_loop, FALSE);
task->flags |= RSPAMD_TASK_FLAG_MIME;
if (session->ctx->milter) {
diff --git a/src/worker.c b/src/worker.c
index b20800a60..8ba221c7a 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -66,6 +66,15 @@ worker_t normal_worker = {
G_STRFUNC, \
__VA_ARGS__)
+struct rspamd_worker_session {
+ gint64 magic;
+ struct rspamd_task *task;
+ gint fd;
+ rspamd_inet_addr_t *addr;
+ struct rspamd_worker_ctx *ctx;
+ struct rspamd_http_connection *http_conn;
+ struct rspamd_worker *worker;
+};
/*
* Reduce number of tasks proceeded
*/
@@ -216,10 +225,67 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
const gchar *chunk, gsize len)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+ struct rspamd_task *task;
struct rspamd_worker_ctx *ctx;
+ const rspamd_ftok_t *hv_tok;
+ gboolean debug_mempool = FALSE;
+
+ ctx = session->ctx;
+
+ /* Check debug */
+ if ((hv_tok = rspamd_http_message_find_header (msg, "Memory")) != NULL) {
+ rspamd_ftok_t cmp;
+
+ RSPAMD_FTOK_ASSIGN (&cmp, "debug");
+
+ if (rspamd_ftok_cmp (hv_tok, &cmp) == 0) {
+ debug_mempool = TRUE;
+ }
+ }
+
+ task = rspamd_task_new (session->worker,
+ session->ctx->cfg, NULL, session->ctx->lang_det,
+ session->ctx->event_loop,
+ debug_mempool);
+ session->task = task;
+
+ msg_info_task ("accepted connection from %s port %d, task ptr: %p",
+ rspamd_inet_address_to_string (session->addr),
+ rspamd_inet_address_get_port (session->addr),
+ task);
+
+ /* Copy some variables */
+ if (ctx->is_mime) {
+ task->flags |= RSPAMD_TASK_FLAG_MIME;
+ }
+ else {
+ task->flags &= ~RSPAMD_TASK_FLAG_MIME;
+ }
+
+ /* We actually transfer ownership from session to task here */
+ task->sock = session->fd;
+ task->client_addr = session->addr;
+ task->worker = session->worker;
+ task->http_conn = session->http_conn;
+
+ task->resolver = ctx->resolver;
+ /* TODO: allow to disable autolearn in protocol */
+ task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+
+ session->worker->nconns++;
+ rspamd_mempool_add_destructor (task->task_pool,
+ (rspamd_mempool_destruct_t)reduce_tasks_count,
+ session->worker);
+
+ /* Session memory is also now handled by task */
+ rspamd_mempool_add_destructor (task->task_pool,
+ (rspamd_mempool_destruct_t)g_free,
+ session);
- ctx = task->worker->ctx;
+ /* Set up async session */
+ task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
+ rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
if (!rspamd_protocol_handle_request (task, msg)) {
msg_err_task ("cannot handle request: %e", task->err);
@@ -248,7 +314,9 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
/* Set socket guard */
task->guard_ev.data = task;
- ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
+ ev_io_init (&task->guard_ev,
+ rspamd_worker_guard_handler,
+ task->sock, EV_READ);
ev_io_start (task->event_loop, &task->guard_ev);
rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
@@ -259,43 +327,84 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
static void
rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+ struct rspamd_task *task;
struct rspamd_http_message *msg;
rspamd_fstring_t *reply;
- msg_info_task ("abnormally closing connection from: %s, error: %e",
- rspamd_inet_address_to_string (task->client_addr), err);
- if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
- /* Terminate session immediately */
- rspamd_session_destroy (task->s);
+ /*
+ * This function can be called with both struct rspamd_worker_session *
+ * and struct rspamd_task *
+ *
+ * The first case is when we read message and it is controlled by this code;
+ * the second case is when a reply is written and we do not control it normally,
+ * as it is managed by `rspamd_protocol_reply` in protocol.c
+ *
+ * Hence, we need to distinguish our arguments...
+ *
+ * The approach here is simple:
+ * - struct rspamd_worker_session starts with gint64 `magic` and we set it to
+ * MAX_INT64
+ * - struct rspamd_task starts with a pointer (or pointer + command on 32 bit system)
+ *
+ * The idea is simple: no sane pointer would reach MAX_INT64, so if this field
+ * is MAX_INT64 then it is our session, and if it is not then it is a task.
+ */
+
+ if (session->magic == G_MAXINT64) {
+ task = session->task;
}
else {
- task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
- msg = rspamd_http_new_message (HTTP_RESPONSE);
+ task = (struct rspamd_task *)conn->ud;
+ }
+
+
+ if (task) {
+ msg_info_task ("abnormally closing connection from: %s, error: %e",
+ rspamd_inet_address_to_string_pretty (task->client_addr), err);
- if (err) {
- msg->status = rspamd_fstring_new_init (err->message,
- strlen (err->message));
- msg->code = err->code;
+ if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
+ /* Terminate session immediately */
+ rspamd_session_destroy (task->s);
}
else {
- msg->status = rspamd_fstring_new_init ("Internal error",
- strlen ("Internal error"));
- msg->code = 500;
- }
+ task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
+ msg = rspamd_http_new_message (HTTP_RESPONSE);
- msg->date = time (NULL);
-
- reply = rspamd_fstring_sized_new (msg->status->len + 16);
- rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
- rspamd_http_message_set_body_from_fstring_steal (msg, reply);
- rspamd_http_connection_reset (task->http_conn);
- rspamd_http_connection_write_message (task->http_conn,
- msg,
- NULL,
- "application/json",
- task,
- 1.0);
+ if (err) {
+ msg->status = rspamd_fstring_new_init (err->message,
+ strlen (err->message));
+ msg->code = err->code;
+ }
+ else {
+ msg->status = rspamd_fstring_new_init ("Internal error",
+ strlen ("Internal error"));
+ msg->code = 500;
+ }
+
+ msg->date = time (NULL);
+
+ reply = rspamd_fstring_sized_new (msg->status->len + 16);
+ rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
+ rspamd_http_message_set_body_from_fstring_steal (msg, reply);
+ rspamd_http_connection_reset (task->http_conn);
+ rspamd_http_connection_write_message (task->http_conn,
+ msg,
+ NULL,
+ "application/json",
+ task,
+ 1.0);
+ }
+ }
+ else {
+ /* If there was no task, then session is unmanaged */
+ msg_info ("no data received from: %s, error: %e",
+ rspamd_inet_address_to_string_pretty (session->addr), err);
+ rspamd_http_connection_reset (session->http_conn);
+ rspamd_http_connection_unref (session->http_conn);
+ rspamd_inet_address_free (session->addr);
+ close (session->fd);
+ g_free (session);
}
}
@@ -303,16 +412,38 @@ static gint
rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+ struct rspamd_task *task;
- if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
- /* We are done here */
- msg_debug_task ("normally closing connection from: %s",
- rspamd_inet_address_to_string (task->client_addr));
- rspamd_session_destroy (task->s);
+ /* Read the comment to rspamd_worker_error_handler */
+
+ if (session->magic == G_MAXINT64) {
+ task = session->task;
}
- else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
- rspamd_session_pending (task->s);
+ else {
+ task = (struct rspamd_task *)conn->ud;
+ }
+
+ if (task) {
+ if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
+ /* We are done here */
+ msg_debug_task ("normally closing connection from: %s",
+ rspamd_inet_address_to_string (task->client_addr));
+ rspamd_session_destroy (task->s);
+ }
+ else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
+ rspamd_session_pending (task->s);
+ }
+ }
+ else {
+ /* If there was no task, then session is unmanaged */
+ msg_info ("no data received from: %s, closing connection",
+ rspamd_inet_address_to_string_pretty (session->addr));
+ rspamd_inet_address_free (session->addr);
+ rspamd_http_connection_reset (session->http_conn);
+ rspamd_http_connection_unref (session->http_conn);
+ close (session->fd);
+ g_free (session);
}
return 0;
@@ -326,7 +457,7 @@ accept_socket (EV_P_ ev_io *w, int revents)
{
struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
struct rspamd_worker_ctx *ctx;
- struct rspamd_task *task;
+ struct rspamd_worker_session *session;
rspamd_inet_addr_t *addr;
gint nfd, http_opts = 0;
@@ -350,55 +481,35 @@ accept_socket (EV_P_ ev_io *w, int revents)
return;
}
- task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
-
- msg_info_task ("accepted connection from %s port %d, task ptr: %p",
- rspamd_inet_address_to_string (addr),
- rspamd_inet_address_get_port (addr),
- task);
-
- /* Copy some variables */
- if (ctx->is_mime) {
- task->flags |= RSPAMD_TASK_FLAG_MIME;
- }
- else {
- task->flags &= ~RSPAMD_TASK_FLAG_MIME;
- }
-
- task->sock = nfd;
- task->client_addr = addr;
-
- worker->srv->stat->connections_count++;
- task->resolver = ctx->resolver;
- /* TODO: allow to disable autolearn in protocol */
- task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+ session = g_malloc (sizeof (*session));
+ session->magic = G_MAXINT64;
+ session->addr = addr;
+ session->fd = nfd;
+ session->ctx = ctx;
+ session->worker = worker;
if (ctx->encrypted_only && !rspamd_inet_address_is_local (addr, FALSE)) {
http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION;
}
- task->http_conn = rspamd_http_connection_new_server (
+ session->http_conn = rspamd_http_connection_new_server (
ctx->http_ctx,
nfd,
rspamd_worker_body_handler,
rspamd_worker_error_handler,
rspamd_worker_finish_handler,
http_opts);
- rspamd_http_connection_set_max_size (task->http_conn, task->cfg->max_message);
- worker->nconns++;
- rspamd_mempool_add_destructor (task->task_pool,
- (rspamd_mempool_destruct_t)reduce_tasks_count, worker);
- /* Set up async session */
- task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
- rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
+ worker->srv->stat->connections_count++;
+ rspamd_http_connection_set_max_size (session->http_conn,
+ ctx->cfg->max_message);
if (ctx->key) {
- rspamd_http_connection_set_key (task->http_conn, ctx->key);
+ rspamd_http_connection_set_key (session->http_conn, ctx->key);
}
- rspamd_http_connection_read_message (task->http_conn,
- task,
+ rspamd_http_connection_read_message (session->http_conn,
+ session,
ctx->timeout);
}
More information about the Commits
mailing list