commit f8be5f5: [Minor] Move functions

Vsevolod Stakhov vsevolod at highsecure.ru
Tue Dec 24 17:07:12 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-12-24 17:00:46 +0000
URL: https://github.com/rspamd/rspamd/commit/f8be5f536bda6428c8fdbe306b8ceab88dedcf58

[Minor] Move functions

---
 src/libserver/task.c | 121 ++++++++++++++++++++++++++++++++++++++++++++++++++
 src/libserver/task.h |  10 +++++
 src/worker.c         | 122 +--------------------------------------------------
 src/worker_private.h |  10 -----
 4 files changed, 132 insertions(+), 131 deletions(-)

diff --git a/src/libserver/task.c b/src/libserver/task.c
index 86886b1e2..c1fcc752f 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -1857,4 +1857,125 @@ rspamd_task_stage_name (enum rspamd_task_stage stg)
 	}
 
 	return ret;
+}
+
+void
+rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
+{
+	struct rspamd_task *task = (struct rspamd_task *)w->data;
+
+	if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
+		ev_now_update_if_cheap (task->event_loop);
+		msg_info_task ("processing of task time out: %.1fs spent; %.1fs limit; "
+					   "forced processing",
+				ev_now (task->event_loop) - task->task_timestamp,
+				w->repeat);
+
+		if (task->cfg->soft_reject_on_timeout) {
+			struct rspamd_action *action, *soft_reject;
+
+			action = rspamd_check_action_metric (task);
+
+			if (action->action_type != METRIC_ACTION_REJECT) {
+				soft_reject = rspamd_config_get_action_by_type (task->cfg,
+						METRIC_ACTION_SOFT_REJECT);
+				rspamd_add_passthrough_result (task,
+						soft_reject,
+						0,
+						NAN,
+						"timeout processing message",
+						"task timeout",
+						0);
+
+				ucl_object_replace_key (task->messages,
+						ucl_object_fromstring_common ("timeout processing message",
+								0, UCL_STRING_RAW),
+						"smtp_message", 0,
+						false);
+			}
+		}
+
+		ev_timer_again (EV_A_ w);
+		task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS;
+		rspamd_session_cleanup (task->s);
+		rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
+		rspamd_session_pending (task->s);
+	}
+	else {
+		/* Postprocessing timeout */
+		msg_info_task ("post-processing of task time out: %.1f second spent; forced processing",
+				ev_now (task->event_loop) - task->task_timestamp);
+
+		if (task->cfg->soft_reject_on_timeout) {
+			struct rspamd_action *action, *soft_reject;
+
+			action = rspamd_check_action_metric (task);
+
+			if (action->action_type != METRIC_ACTION_REJECT) {
+				soft_reject = rspamd_config_get_action_by_type (task->cfg,
+						METRIC_ACTION_SOFT_REJECT);
+				rspamd_add_passthrough_result (task,
+						soft_reject,
+						0,
+						NAN,
+						"timeout post-processing message",
+						"task timeout",
+						0);
+
+				ucl_object_replace_key (task->messages,
+						ucl_object_fromstring_common ("timeout post-processing message",
+								0, UCL_STRING_RAW),
+						"smtp_message", 0,
+						false);
+			}
+		}
+
+		ev_timer_stop (EV_A_ w);
+		task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
+		rspamd_session_cleanup (task->s);
+		rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
+		rspamd_session_pending (task->s);
+	}
+}
+
+void
+rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
+{
+	struct rspamd_task *task = (struct rspamd_task *)w->data;
+	gchar fake_buf[1024];
+	gssize r;
+
+	r = read (w->fd, fake_buf, sizeof (fake_buf));
+
+	if (r > 0) {
+		msg_warn_task ("received extra data after task is loaded, ignoring");
+	}
+	else {
+		if (r == 0) {
+			/*
+			 * Poor man approach, that might break things in case of
+			 * shutdown (SHUT_WR) but sockets are so bad that there's no
+			 * reliable way to distinguish between shutdown(SHUT_WR) and
+			 * close.
+			 */
+			if (task->cmd != CMD_CHECK_V2 && task->cfg->enable_shutdown_workaround) {
+				msg_info_task ("workaround for shutdown enabled, please update "
+							   "your client, this support might be removed in future");
+				shutdown (w->fd, SHUT_RD);
+				ev_io_stop (task->event_loop, &task->guard_ev);
+			}
+			else {
+				msg_err_task ("the peer has closed connection unexpectedly");
+				rspamd_session_destroy (task->s);
+			}
+		}
+		else if (errno != EAGAIN) {
+			msg_err_task ("the peer has closed connection unexpectedly: %s",
+					strerror (errno));
+			rspamd_session_destroy (task->s);
+		}
+		else {
+			return;
+		}
+	}
 }
\ No newline at end of file
diff --git a/src/libserver/task.h b/src/libserver/task.h
index feac456dd..50e07b23f 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -375,6 +375,16 @@ gboolean rspamd_task_set_finish_time (struct rspamd_task *task);
  */
 const gchar *rspamd_task_stage_name (enum rspamd_task_stage stg);
 
+/*
+ * Called on forced timeout
+ */
+void rspamd_task_timeout (EV_P_ ev_timer *w, int revents);
+
+/*
+ * Called on unexpected IO error (e.g. ECONNRESET)
+ */
+void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents);
+
 #ifdef  __cplusplus
 }
 #endif
diff --git a/src/worker.c b/src/worker.c
index b825603e1..4f13db469 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -102,127 +102,6 @@ reduce_tasks_count (gpointer arg)
 	}
 }
 
-void
-rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
-{
-	struct rspamd_task *task = (struct rspamd_task *)w->data;
-
-	if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
-		ev_now_update_if_cheap (task->event_loop);
-		msg_info_task ("processing of task time out: %.1f (%.1f limit) second spent; "
-				 "forced processing",
-				ev_now (task->event_loop) - task->task_timestamp,
-				task->cfg->task_timeout);
-
-		if (task->cfg->soft_reject_on_timeout) {
-			struct rspamd_action *action, *soft_reject;
-
-			action = rspamd_check_action_metric (task);
-
-			if (action->action_type != METRIC_ACTION_REJECT) {
-				soft_reject = rspamd_config_get_action_by_type (task->cfg,
-						METRIC_ACTION_SOFT_REJECT);
-				rspamd_add_passthrough_result (task,
-						soft_reject,
-						0,
-						NAN,
-						"timeout processing message",
-						"task timeout",
-						0);
-
-				ucl_object_replace_key (task->messages,
-						ucl_object_fromstring_common ("timeout processing message",
-								0, UCL_STRING_RAW),
-						"smtp_message", 0,
-						false);
-			}
-		}
-
-		ev_timer_again (EV_A_ w);
-		task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS;
-		rspamd_session_cleanup (task->s);
-		rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
-		rspamd_session_pending (task->s);
-	}
-	else {
-		/* Postprocessing timeout */
-		msg_info_task ("post-processing of task time out: %.1f second spent; forced processing",
-				ev_now (task->event_loop) - task->task_timestamp);
-
-		if (task->cfg->soft_reject_on_timeout) {
-			struct rspamd_action *action, *soft_reject;
-
-			action = rspamd_check_action_metric (task);
-
-			if (action->action_type != METRIC_ACTION_REJECT) {
-				soft_reject = rspamd_config_get_action_by_type (task->cfg,
-						METRIC_ACTION_SOFT_REJECT);
-				rspamd_add_passthrough_result (task,
-						soft_reject,
-						0,
-						NAN,
-						"timeout post-processing message",
-						"task timeout",
-						0);
-
-				ucl_object_replace_key (task->messages,
-						ucl_object_fromstring_common ("timeout post-processing message",
-								0, UCL_STRING_RAW),
-						"smtp_message", 0,
-						false);
-			}
-		}
-
-		ev_timer_stop (EV_A_ w);
-		task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
-		rspamd_session_cleanup (task->s);
-		rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
-		rspamd_session_pending (task->s);
-	}
-}
-
-void
-rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
-{
-	struct rspamd_task *task = (struct rspamd_task *)w->data;
-	gchar fake_buf[1024];
-	gssize r;
-
-	r = read (w->fd, fake_buf, sizeof (fake_buf));
-
-	if (r > 0) {
-		msg_warn_task ("received extra data after task is loaded, ignoring");
-	}
-	else {
-		if (r == 0) {
-			/*
-			 * Poor man approach, that might break things in case of
-			 * shutdown (SHUT_WR) but sockets are so bad that there's no
-			 * reliable way to distinguish between shutdown(SHUT_WR) and
-			 * close.
-			 */
-			if (task->cmd != CMD_CHECK_V2 && task->cfg->enable_shutdown_workaround) {
-				msg_info_task ("workaround for shutdown enabled, please update "
-						"your client, this support might be removed in future");
-				shutdown (w->fd, SHUT_RD);
-				ev_io_stop (task->event_loop, &task->guard_ev);
-			}
-			else {
-				msg_err_task ("the peer has closed connection unexpectedly");
-				rspamd_session_destroy (task->s);
-			}
-		}
-		else if (errno != EAGAIN) {
-			msg_err_task ("the peer has closed connection unexpectedly: %s",
-					strerror (errno));
-			rspamd_session_destroy (task->s);
-		}
-		else {
-			return;
-		}
-	}
-}
-
 static gint
 rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 	struct rspamd_http_message *msg,
@@ -312,6 +191,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 		ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
 				ctx->task_timeout,
 				ctx->task_timeout);
+		ev_set_priority (&task->timeout_ev, EV_MAXPRI);
 		ev_timer_start (task->event_loop, &task->timeout_ev);
 	}
 
diff --git a/src/worker_private.h b/src/worker_private.h
index cef2c9a19..62fec96f1 100644
--- a/src/worker_private.h
+++ b/src/worker_private.h
@@ -65,16 +65,6 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker,
 								 struct rspamd_dns_resolver *resolver,
 								 struct rspamd_lang_detector **plang_det);
 
-/*
- * Called on forced timeout
- */
-void rspamd_task_timeout (EV_P_ ev_timer *w, int revents);
-
-/*
- * Called on unexpected IO error (e.g. ECONNRESET)
- */
-void rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents);
-
 #ifdef  __cplusplus
 }
 #endif


More information about the Commits mailing list