commit 23b99d3: [Project] Allow to enable mempool debugging from the protocol

Vsevolod Stakhov vsevolod at highsecure.ru
Mon Dec 23 18:49:16 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-12-23 18:43:16 +0000
URL: https://github.com/rspamd/rspamd/commit/23b99d31a9a620b3c62848da4126dbbf0642f6db (HEAD -> master)

[Project] Allow to enable mempool debugging from the protocol

---
 src/controller.c            |  12 +--
 src/libserver/task.c        |   6 +-
 src/libserver/task.h        |   3 +-
 src/libserver/worker_util.c |   2 +-
 src/lua/lua_task.c          |   8 +-
 src/lua/lua_util.c          |   2 +-
 src/plugins/fuzzy_check.c   |   2 +-
 src/rspamadm/lua_repl.c     |   2 +-
 src/rspamd_proxy.c          |   2 +-
 src/worker.c                | 257 +++++++++++++++++++++++++++++++-------------
 10 files changed, 205 insertions(+), 91 deletions(-)

diff --git a/src/controller.c b/src/controller.c
index e4ad7085d..d36431702 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -1514,7 +1514,7 @@ rspamd_controller_handle_lua_history (lua_State *L,
 
 			if (lua_isfunction (L, -1)) {
 				task = rspamd_task_new (session->ctx->worker, session->cfg,
-						session->pool, ctx->lang_det, ctx->event_loop);
+						session->pool, ctx->lang_det, ctx->event_loop, FALSE);
 
 				task->resolver = ctx->resolver;
 				task->s = rspamd_session_create (session->pool,
@@ -1811,7 +1811,7 @@ rspamd_controller_handle_lua (struct rspamd_http_connection_entry *conn_ent,
 	}
 
 	task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-			ctx->lang_det, ctx->event_loop);
+			ctx->lang_det, ctx->event_loop, FALSE);
 
 	task->resolver = ctx->resolver;
 	task->s = rspamd_session_create (session->pool,
@@ -1996,7 +1996,7 @@ rspamd_controller_handle_learn_common (
 	}
 
 	task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-			session->ctx->lang_det, ctx->event_loop);
+			session->ctx->lang_det, ctx->event_loop, FALSE);
 
 	task->resolver = ctx->resolver;
 	task->s = rspamd_session_create (session->pool,
@@ -2095,7 +2095,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
 	}
 
 	task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-			ctx->lang_det, ctx->event_loop);
+			ctx->lang_det, ctx->event_loop, FALSE);
 
 	task->resolver = ctx->resolver;
 	task->s = rspamd_session_create (session->pool,
@@ -2584,7 +2584,7 @@ rspamd_controller_handle_stat_common (
 	ctx = session->ctx;
 
 	task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-			ctx->lang_det, ctx->event_loop);
+			ctx->lang_det, ctx->event_loop, FALSE);
 	task->resolver = ctx->resolver;
 	cbdata = rspamd_mempool_alloc0 (session->pool, sizeof (*cbdata));
 	cbdata->conn_ent = conn_ent;
@@ -2986,7 +2986,7 @@ rspamd_controller_handle_lua_plugin (struct rspamd_http_connection_entry *conn_e
 	}
 
 	task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-			ctx->lang_det, ctx->event_loop);
+			ctx->lang_det, ctx->event_loop, FALSE);
 
 	task->resolver = ctx->resolver;
 	task->s = rspamd_session_create (session->pool,
diff --git a/src/libserver/task.c b/src/libserver/task.c
index 9eebe02a2..ccd959920 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -64,14 +64,16 @@ rspamd_task_new (struct rspamd_worker *worker,
 				 struct rspamd_config *cfg,
 				 rspamd_mempool_t *pool,
 				 struct rspamd_lang_detector *lang_det,
-				 struct ev_loop *event_loop)
+				 struct ev_loop *event_loop,
+				 gboolean debug_mem)
 {
 	struct rspamd_task *new_task;
 	rspamd_mempool_t *task_pool;
 	guint flags = 0;
 
 	if (pool == NULL) {
-		task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "task", 0);
+		task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
+				"task", debug_mem ? RSPAMD_MEMPOOL_DEBUG : 0);
 		flags |= RSPAMD_TASK_FLAG_OWN_POOL;
 	}
 	else {
diff --git a/src/libserver/task.h b/src/libserver/task.h
index a96e2ac05..feac456dd 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -225,7 +225,8 @@ struct rspamd_task *rspamd_task_new (struct rspamd_worker *worker,
 									 struct rspamd_config *cfg,
 									 rspamd_mempool_t *pool,
 									 struct rspamd_lang_detector *lang_det,
-									 struct ev_loop *event_loop);
+									 struct ev_loop *event_loop,
+									 gboolean debug_mem);
 
 /**
  * Destroy task object and remove its IO dispatcher if it exists
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 362d64bc5..e519cb985 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -143,7 +143,7 @@ rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
 	if (cfg->on_term_scripts) {
 		ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
 		/* Create a fake task object for async events */
-		task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
+		task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop, FALSE);
 		task->resolver = ctx->resolver;
 		task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
 		task->s = rspamd_session_create (task->task_pool,
diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c
index cf57ebf13..774bb0120 100644
--- a/src/lua/lua_task.c
+++ b/src/lua/lua_task.c
@@ -1615,7 +1615,7 @@ lua_task_load_from_file (lua_State * L)
 				}
 			}
 
-			task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL);
+			task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE);
 			task->msg.begin = data->str;
 			task->msg.len = data->len;
 			rspamd_mempool_add_destructor (task->task_pool,
@@ -1629,7 +1629,7 @@ lua_task_load_from_file (lua_State * L)
 			if (!map) {
 				err = strerror (errno);
 			} else {
-				task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL);
+				task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE);
 				task->msg.begin = map;
 				task->msg.len = sz;
 				rspamd_mempool_add_destructor (task->task_pool,
@@ -1683,7 +1683,7 @@ lua_task_load_from_string (lua_State * L)
 			}
 		}
 
-		task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL);
+		task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE);
 		task->msg.begin = g_malloc (message_len);
 		memcpy ((gchar *)task->msg.begin, str_message, message_len);
 		task->msg.len  = message_len;
@@ -1729,7 +1729,7 @@ lua_task_create (lua_State * L)
 		}
 	}
 
-	task = rspamd_task_new (NULL, cfg, NULL, NULL, ev_base);
+	task = rspamd_task_new (NULL, cfg, NULL, NULL, ev_base, FALSE);
 	task->flags |= RSPAMD_TASK_FLAG_EMPTY;
 
 	ptask = lua_newuserdata (L, sizeof (*ptask));
diff --git a/src/lua/lua_util.c b/src/lua/lua_util.c
index b71339402..3a3561e2f 100644
--- a/src/lua/lua_util.c
+++ b/src/lua/lua_util.c
@@ -873,7 +873,7 @@ lua_util_process_message (lua_State *L)
 	if (cfg != NULL && message != NULL) {
 		base = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL);
 		rspamd_init_filters (cfg, FALSE);
-		task = rspamd_task_new (NULL, cfg, NULL, NULL, base);
+		task = rspamd_task_new (NULL, cfg, NULL, NULL, base, FALSE);
 		task->msg.begin = rspamd_mempool_alloc (task->task_pool, mlen);
 		rspamd_strlcpy ((gpointer)task->msg.begin, message, mlen);
 		task->msg.len = mlen;
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 713edc41b..e8f02652d 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -2979,7 +2979,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
 
 	/* Prepare task */
 	task = rspamd_task_new (session->wrk, session->cfg, NULL,
-			session->lang_det, conn_ent->rt->event_loop);
+			session->lang_det, conn_ent->rt->event_loop, FALSE);
 	task->cfg = ctx->cfg;
 	saved = rspamd_mempool_alloc0 (session->pool, sizeof (gint));
 	err = rspamd_mempool_alloc0 (session->pool, sizeof (GError *));
diff --git a/src/rspamadm/lua_repl.c b/src/rspamadm/lua_repl.c
index ee22c4868..bceed5855 100644
--- a/src/rspamadm/lua_repl.c
+++ b/src/rspamadm/lua_repl.c
@@ -431,7 +431,7 @@ rspamadm_lua_message_handler (lua_State *L, gint argc, gchar **argv)
 			rspamd_printf ("cannot open %s: %s\n", argv[i], strerror (errno));
 		}
 		else {
-			task = rspamd_task_new (NULL, rspamd_main->cfg, NULL, NULL, NULL);
+			task = rspamd_task_new (NULL, rspamd_main->cfg, NULL, NULL, NULL, FALSE);
 
 			if (!rspamd_task_load_message (task, NULL, map, len)) {
 				rspamd_printf ("cannot load %s\n", argv[i]);
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 6153ddd21..7e460c040 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -1754,7 +1754,7 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
 	msg = session->client_message;
 	task = rspamd_task_new (session->worker, session->ctx->cfg,
 			session->pool, session->ctx->lang_det,
-			session->ctx->event_loop);
+			session->ctx->event_loop, FALSE);
 	task->flags |= RSPAMD_TASK_FLAG_MIME;
 
 	if (session->ctx->milter) {
diff --git a/src/worker.c b/src/worker.c
index b20800a60..8ba221c7a 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -66,6 +66,15 @@ worker_t normal_worker = {
         G_STRFUNC, \
         __VA_ARGS__)
 
+struct rspamd_worker_session {
+	gint64 magic;
+	struct rspamd_task *task;
+	gint fd;
+	rspamd_inet_addr_t *addr;
+	struct rspamd_worker_ctx *ctx;
+	struct rspamd_http_connection *http_conn;
+	struct rspamd_worker *worker;
+};
 /*
  * Reduce number of tasks proceeded
  */
@@ -216,10 +225,67 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 	struct rspamd_http_message *msg,
 	const gchar *chunk, gsize len)
 {
-	struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+	struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+	struct rspamd_task *task;
 	struct rspamd_worker_ctx *ctx;
+	const rspamd_ftok_t *hv_tok;
+	gboolean debug_mempool = FALSE;
+
+	ctx = session->ctx;
+
+	/* Check debug */
+	if ((hv_tok = rspamd_http_message_find_header (msg, "Memory")) != NULL) {
+		rspamd_ftok_t cmp;
+
+		RSPAMD_FTOK_ASSIGN (&cmp, "debug");
+
+		if (rspamd_ftok_cmp (hv_tok, &cmp) == 0) {
+			debug_mempool = TRUE;
+		}
+	}
+
+	task = rspamd_task_new (session->worker,
+			session->ctx->cfg, NULL, session->ctx->lang_det,
+			session->ctx->event_loop,
+			debug_mempool);
+	session->task = task;
+
+	msg_info_task ("accepted connection from %s port %d, task ptr: %p",
+			rspamd_inet_address_to_string (session->addr),
+			rspamd_inet_address_get_port (session->addr),
+			task);
+
+	/* Copy some variables */
+	if (ctx->is_mime) {
+		task->flags |= RSPAMD_TASK_FLAG_MIME;
+	}
+	else {
+		task->flags &= ~RSPAMD_TASK_FLAG_MIME;
+	}
+
+	/* We actually transfer ownership from session to task here  */
+	task->sock = session->fd;
+	task->client_addr = session->addr;
+	task->worker = session->worker;
+	task->http_conn = session->http_conn;
+
+	task->resolver = ctx->resolver;
+	/* TODO: allow to disable autolearn in protocol */
+	task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+
+	session->worker->nconns++;
+	rspamd_mempool_add_destructor (task->task_pool,
+			(rspamd_mempool_destruct_t)reduce_tasks_count,
+			session->worker);
+
+	/* Session memory is also now handled by task */
+	rspamd_mempool_add_destructor (task->task_pool,
+			(rspamd_mempool_destruct_t)g_free,
+			session);
 
-	ctx = task->worker->ctx;
+	/* Set up async session */
+	task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
+			rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
 
 	if (!rspamd_protocol_handle_request (task, msg)) {
 		msg_err_task ("cannot handle request: %e", task->err);
@@ -248,7 +314,9 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 
 	/* Set socket guard */
 	task->guard_ev.data = task;
-	ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
+	ev_io_init (&task->guard_ev,
+			rspamd_worker_guard_handler,
+			task->sock, EV_READ);
 	ev_io_start (task->event_loop, &task->guard_ev);
 
 	rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
@@ -259,43 +327,84 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 static void
 rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
 {
-	struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+	struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+	struct rspamd_task *task;
 	struct rspamd_http_message *msg;
 	rspamd_fstring_t *reply;
 
-	msg_info_task ("abnormally closing connection from: %s, error: %e",
-		rspamd_inet_address_to_string (task->client_addr), err);
-	if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
-		/* Terminate session immediately */
-		rspamd_session_destroy (task->s);
+	/*
+	 * This function can be called with both struct rspamd_worker_session *
+	 * and struct rspamd_task *
+	 *
+	 * The first case is when we read message and it is controlled by this code;
+	 * the second case is when a reply is written and we do not control it normally,
+	 * as it is managed by `rspamd_protocol_reply` in protocol.c
+	 *
+	 * Hence, we need to distinguish our arguments...
+	 *
+	 * The approach here is simple:
+	 * - struct rspamd_worker_session starts with gint64 `magic` and we set it to
+	 * MAX_INT64
+	 * - struct rspamd_task starts with a pointer (or pointer + command on 32 bit system)
+	 *
+	 * The idea is simple: no sane pointer would reach MAX_INT64, so if this field
+	 * is MAX_INT64 then it is our session, and if it is not then it is a task.
+	 */
+
+	if (session->magic == G_MAXINT64) {
+		task = session->task;
 	}
 	else {
-		task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
-		msg = rspamd_http_new_message (HTTP_RESPONSE);
+		task = (struct rspamd_task *)conn->ud;
+	}
+
+
+	if (task) {
+		msg_info_task ("abnormally closing connection from: %s, error: %e",
+				rspamd_inet_address_to_string_pretty (task->client_addr), err);
 
-		if (err) {
-			msg->status = rspamd_fstring_new_init (err->message,
-					strlen (err->message));
-			msg->code = err->code;
+		if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
+			/* Terminate session immediately */
+			rspamd_session_destroy (task->s);
 		}
 		else {
-			msg->status = rspamd_fstring_new_init ("Internal error",
-					strlen ("Internal error"));
-			msg->code = 500;
-		}
+			task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
+			msg = rspamd_http_new_message (HTTP_RESPONSE);
 
-		msg->date = time (NULL);
-
-		reply = rspamd_fstring_sized_new (msg->status->len + 16);
-		rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
-		rspamd_http_message_set_body_from_fstring_steal (msg, reply);
-		rspamd_http_connection_reset (task->http_conn);
-		rspamd_http_connection_write_message (task->http_conn,
-				msg,
-				NULL,
-				"application/json",
-				task,
-				1.0);
+			if (err) {
+				msg->status = rspamd_fstring_new_init (err->message,
+						strlen (err->message));
+				msg->code = err->code;
+			}
+			else {
+				msg->status = rspamd_fstring_new_init ("Internal error",
+						strlen ("Internal error"));
+				msg->code = 500;
+			}
+
+			msg->date = time (NULL);
+
+			reply = rspamd_fstring_sized_new (msg->status->len + 16);
+			rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
+			rspamd_http_message_set_body_from_fstring_steal (msg, reply);
+			rspamd_http_connection_reset (task->http_conn);
+			rspamd_http_connection_write_message (task->http_conn,
+					msg,
+					NULL,
+					"application/json",
+					task,
+					1.0);
+		}
+	}
+	else {
+		/* If there was no task, then session is unmanaged */
+		msg_info ("no data received from: %s, error: %e",
+				rspamd_inet_address_to_string_pretty (session->addr), err);
+		rspamd_http_connection_reset (session->http_conn);
+		rspamd_http_connection_unref (session->http_conn);
+		rspamd_inet_address_free (session->addr);
+		close (session->fd);
+		g_free (session);
 	}
 }
 
@@ -303,16 +412,38 @@ static gint
 rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
 	struct rspamd_http_message *msg)
 {
-	struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+	struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+	struct rspamd_task *task;
 
-	if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
-		/* We are done here */
-		msg_debug_task ("normally closing connection from: %s",
-			rspamd_inet_address_to_string (task->client_addr));
-		rspamd_session_destroy (task->s);
+	/* Read the comment to rspamd_worker_error_handler */
+
+	if (session->magic == G_MAXINT64) {
+		task = session->task;
 	}
-	else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
-		rspamd_session_pending (task->s);
+	else {
+		task = (struct rspamd_task *)conn->ud;
+	}
+
+	if (task) {
+		if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
+			/* We are done here */
+			msg_debug_task ("normally closing connection from: %s",
+					rspamd_inet_address_to_string (task->client_addr));
+			rspamd_session_destroy (task->s);
+		}
+		else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
+			rspamd_session_pending (task->s);
+		}
+	}
+	else {
+		/* If there was no task, then session is unmanaged */
+		msg_info ("no data received from: %s, closing connection",
+				rspamd_inet_address_to_string_pretty (session->addr));
+		rspamd_inet_address_free (session->addr);
+		rspamd_http_connection_reset (session->http_conn);
+		rspamd_http_connection_unref (session->http_conn);
+		close (session->fd);
+		g_free (session);
 	}
 
 	return 0;
@@ -326,7 +457,7 @@ accept_socket (EV_P_ ev_io *w, int revents)
 {
 	struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
 	struct rspamd_worker_ctx *ctx;
-	struct rspamd_task *task;
+	struct rspamd_worker_session *session;
 	rspamd_inet_addr_t *addr;
 	gint nfd, http_opts = 0;
 
@@ -350,55 +481,35 @@ accept_socket (EV_P_ ev_io *w, int revents)
 		return;
 	}
 
-	task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
-
-	msg_info_task ("accepted connection from %s port %d, task ptr: %p",
-		rspamd_inet_address_to_string (addr),
-		rspamd_inet_address_get_port (addr),
-		task);
-
-	/* Copy some variables */
-	if (ctx->is_mime) {
-		task->flags |= RSPAMD_TASK_FLAG_MIME;
-	}
-	else {
-		task->flags &= ~RSPAMD_TASK_FLAG_MIME;
-	}
-
-	task->sock = nfd;
-	task->client_addr = addr;
-
-	worker->srv->stat->connections_count++;
-	task->resolver = ctx->resolver;
-	/* TODO: allow to disable autolearn in protocol */
-	task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+	session = g_malloc (sizeof (*session));
+	session->magic = G_MAXINT64;
+	session->addr = addr;
+	session->fd = nfd;
+	session->ctx = ctx;
+	session->worker = worker;
 
 	if (ctx->encrypted_only && !rspamd_inet_address_is_local (addr, FALSE)) {
 		http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION;
 	}
 
-	task->http_conn = rspamd_http_connection_new_server (
+	session->http_conn = rspamd_http_connection_new_server (
 			ctx->http_ctx,
 			nfd,
 			rspamd_worker_body_handler,
 			rspamd_worker_error_handler,
 			rspamd_worker_finish_handler,
 			http_opts);
-	rspamd_http_connection_set_max_size (task->http_conn, task->cfg->max_message);
-	worker->nconns++;
-	rspamd_mempool_add_destructor (task->task_pool,
-		(rspamd_mempool_destruct_t)reduce_tasks_count, worker);
 
-	/* Set up async session */
-	task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
-			rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
+	worker->srv->stat->connections_count++;
+	rspamd_http_connection_set_max_size (session->http_conn,
+			ctx->cfg->max_message);
 
 	if (ctx->key) {
-		rspamd_http_connection_set_key (task->http_conn, ctx->key);
+		rspamd_http_connection_set_key (session->http_conn, ctx->key);
 	}
 
-	rspamd_http_connection_read_message (task->http_conn,
-			task,
+	rspamd_http_connection_read_message (session->http_conn,
+			session,
 			ctx->timeout);
 }
 


More information about the Commits mailing list