commit 845ae26: [Feature] Allow fuzzy workers to exchange blocked information

Vsevolod Stakhov vsevolod at rspamd.com
Sat Jul 1 15:49:03 UTC 2023


Author: Vsevolod Stakhov
Date: 2023-07-01 13:32:22 +0100
URL: https://github.com/rspamd/rspamd/commit/845ae26f6ae5c1bac83fdf49ad7b9ef34482a9a5

[Feature] Allow fuzzy workers to exchange blocked information

---
 src/fuzzy_storage.c               | 107 ++++++++++++++++++++++++++++++++++++--
 src/libserver/hyperscan_tools.cxx |   2 +-
 src/libserver/rspamd_control.c    |  18 ++++++-
 src/libserver/rspamd_control.h    |  27 +++++++++-
 4 files changed, 147 insertions(+), 7 deletions(-)

diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 33f9d40e8..31a2c46e6 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -273,7 +273,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
 		(time_t)session->timestamp);
 
 	if (elt) {
-		gboolean ratelimited = FALSE;
+		gboolean ratelimited = FALSE, new_ratelimit = FALSE;
 
 		if (isnan (elt->cur)) {
 			/* Ratelimit exceeded, preserve it for the whole ttl */
@@ -301,18 +301,40 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
 						rspamd_inet_address_to_string (masked),
 						session->ctx->leaky_bucket_burst);
 				elt->cur = NAN;
+				new_ratelimit = TRUE;
 			}
 			else {
 				elt->cur ++; /* Allow one more request */
 			}
 		}
 
-		rspamd_inet_address_free (masked);
-
 		if (ratelimited) {
 			rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
 		}
 
+		if (new_ratelimit) {
+			struct rspamd_srv_command srv_cmd;
+
+			srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED;
+			srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked);
+
+			if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) {
+				socklen_t slen;
+				struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen);
+
+				if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
+					memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
+					msg_debug("propagating blocked address to other workers");
+					rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+				}
+				else {
+					msg_err("bad address length: %d, expected to be %d", (int)slen, (int)sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+				}
+			}
+		}
+
+		rspamd_inet_address_free (masked);
+
 		return !ratelimited;
 	}
 	else {
@@ -1906,6 +1928,83 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
 	return TRUE;
 }
 
+static gboolean
+rspamd_fuzzy_control_blocked (struct rspamd_main *rspamd_main,
+						   struct rspamd_worker *worker, gint fd,
+						   gint attached_fd,
+						   struct rspamd_control_command *cmd,
+						   gpointer ud)
+{
+	struct rspamd_fuzzy_storage_ctx *ctx = (struct rspamd_fuzzy_storage_ctx *)ud;
+	struct rspamd_control_reply rep;
+	struct rspamd_leaky_bucket_elt *elt;
+	ev_tstamp now = ev_now (ctx->event_loop);
+	rspamd_inet_addr_t *addr = NULL;
+
+	rep.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
+	rep.reply.fuzzy_blocked.status = 0;
+
+	if (cmd->cmd.fuzzy_blocked.af == AF_INET) {
+		addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa,
+				sizeof (struct sockaddr_in));
+	}
+	else if (cmd->cmd.fuzzy_blocked.af == AF_INET6) {
+		addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa,
+				sizeof (struct sockaddr_in6));
+	}
+	else {
+		msg_err ("invalid address family: %d", cmd->cmd.fuzzy_blocked.af);
+		rep.reply.fuzzy_blocked.status = -1;
+	}
+
+	if (addr) {
+		elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, addr,
+			(time_t) now);
+
+		if (elt) {
+			if (isnan (elt->cur)) {
+				/* Already ratelimited, ignore */
+			}
+			else {
+				elt->last = now;
+				elt->cur = NAN;
+
+				msg_info ("propagating ratelimiting %s, %.1f max elts",
+					rspamd_inet_address_to_string(addr),
+					ctx->leaky_bucket_burst);
+				rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
+			}
+
+			rspamd_inet_address_free(addr);
+
+		}
+		else {
+			/* New bucket */
+			elt = g_malloc(sizeof(*elt));
+			elt->addr = addr; /* transfer ownership */
+			elt->cur = NAN;
+			elt->last = now;
+
+			rspamd_lru_hash_insert(ctx->ratelimit_buckets,
+				addr,
+				elt,
+				(time_t)now,
+				ctx->leaky_bucket_ttl);
+			msg_info ("propagating ratelimiting %s, %.1f max elts",
+				rspamd_inet_address_to_string(addr),
+				ctx->leaky_bucket_burst);
+			rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
+		}
+	}
+
+	if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+		msg_err ("cannot write reply to the control socket: %s",
+			strerror (errno));
+	}
+
+	return TRUE;
+}
+
 static gboolean
 rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
 		struct rspamd_worker *worker, gint fd,
@@ -2856,6 +2955,8 @@ start_fuzzy (struct rspamd_worker *worker)
 			rspamd_fuzzy_storage_stat, ctx);
 	rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
 			rspamd_fuzzy_storage_sync, ctx);
+	rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_BLOCKED,
+		rspamd_fuzzy_control_blocked, ctx);
 
 
 	if (ctx->update_map != NULL) {
diff --git a/src/libserver/hyperscan_tools.cxx b/src/libserver/hyperscan_tools.cxx
index 8159cbd26..6fdc72e87 100644
--- a/src/libserver/hyperscan_tools.cxx
+++ b/src/libserver/hyperscan_tools.cxx
@@ -609,7 +609,7 @@ rspamd_hyperscan_notice_known(const char *fname)
 				(int)strlen(fname), fname, (int)sizeof(notice_cmd.cmd.hyperscan_cache_file.path));
 		}
 		else {
-			notice_cmd.type = RSPAMD_NOTICE_HYPERSCAN_CACHE;
+			notice_cmd.type = RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE;
 			rspamd_strlcpy(notice_cmd.cmd.hyperscan_cache_file.path, fname, sizeof(notice_cmd.cmd.hyperscan_cache_file.path));
 			rspamd_srv_send_command(rspamd_current_worker,
 				rspamd_current_worker->srv->event_loop, &notice_cmd, -1,
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 82913a19f..cbafec270 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -629,6 +629,7 @@ rspamd_control_default_cmd_handler (gint fd,
 	case RSPAMD_CONTROL_FUZZY_SYNC:
 	case RSPAMD_CONTROL_LOG_PIPE:
 	case RSPAMD_CONTROL_CHILD_CHANGE:
+	case RSPAMD_CONTROL_FUZZY_BLOCKED:
 		break;
 	case RSPAMD_CONTROL_RERESOLVE:
 		if (cd->worker->srv->cfg) {
@@ -1020,12 +1021,22 @@ rspamd_srv_handler (EV_P_ ev_io *w, int revents)
 			case RSPAMD_SRV_HEALTH:
 				rspamd_fill_health_reply (srv, &rdata->rep);
 				break;
-			case RSPAMD_NOTICE_HYPERSCAN_CACHE:
+			case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
 #ifdef WITH_HYPERSCAN
 				rspamd_hyperscan_notice_known(cmd.cmd.hyperscan_cache_file.path);
 #endif
 				rdata->rep.reply.hyperscan_cache_file.unused = 0;
 				break;
+			case RSPAMD_SRV_FUZZY_BLOCKED:
+				/* Broadcast command to all workers */
+				memset (&wcmd, 0, sizeof (wcmd));
+				wcmd.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
+				/* Ensure that memcpy is safe */
+				G_STATIC_ASSERT(sizeof(wcmd.cmd.fuzzy_blocked) == sizeof(cmd.cmd.fuzzy_blocked));
+				memcpy(&wcmd.cmd.fuzzy_blocked, &cmd.cmd.fuzzy_blocked, sizeof(wcmd.cmd.fuzzy_blocked));
+				rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
+					rspamd_control_ignore_io_handler, NULL, worker->pid);
+				break;
 			default:
 				msg_err ("unknown command type: %d", cmd.type);
 				break;
@@ -1354,9 +1365,12 @@ const gchar *rspamd_srv_command_to_string (enum rspamd_srv_type cmd)
 	case RSPAMD_SRV_HEALTH:
 		reply = "health";
 		break;
-	case RSPAMD_NOTICE_HYPERSCAN_CACHE:
+	case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
 		reply = "notice_hyperscan_cache";
 		break;
+	case RSPAMD_SRV_FUZZY_BLOCKED:
+		reply = "fuzzy_blocked";
+		break;
 	}
 
 	return reply;
diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h
index 049c9b80c..dd661c145 100644
--- a/src/libserver/rspamd_control.h
+++ b/src/libserver/rspamd_control.h
@@ -36,6 +36,7 @@ enum rspamd_control_type {
 	RSPAMD_CONTROL_FUZZY_SYNC,
 	RSPAMD_CONTROL_MONITORED_CHANGE,
 	RSPAMD_CONTROL_CHILD_CHANGE,
+	RSPAMD_CONTROL_FUZZY_BLOCKED,
 	RSPAMD_CONTROL_MAX
 };
 
@@ -47,7 +48,8 @@ enum rspamd_srv_type {
 	RSPAMD_SRV_ON_FORK,
 	RSPAMD_SRV_HEARTBEAT,
 	RSPAMD_SRV_HEALTH,
-	RSPAMD_NOTICE_HYPERSCAN_CACHE,
+	RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE,
+	RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */
 };
 
 enum rspamd_log_pipe_type {
@@ -96,6 +98,14 @@ struct rspamd_control_command {
 			pid_t pid;
 			guint additional;
 		} child_change;
+		struct {
+			union {
+				struct sockaddr sa;
+				struct sockaddr_in s4;
+				struct sockaddr_in6 s6;
+			} addr;
+			sa_family_t af;
+		} fuzzy_blocked;
 	} cmd;
 };
 
@@ -134,6 +144,9 @@ struct rspamd_control_reply {
 		struct {
 			guint status;
 		} fuzzy_sync;
+		struct {
+			guint status;
+		} fuzzy_blocked;
 	} reply;
 };
 
@@ -179,6 +192,15 @@ struct rspamd_srv_command {
 		struct {
 			char path[CONTROL_PATHLEN];
 		} hyperscan_cache_file;
+		/* Send when one worker has blocked some IP address */
+		struct {
+			union {
+				struct sockaddr sa;
+				struct sockaddr_in s4;
+				struct sockaddr_in6 s6;
+			} addr;
+			sa_family_t af;
+		} fuzzy_blocked;
 	} cmd;
 };
 
@@ -213,6 +235,9 @@ struct rspamd_srv_reply {
 		struct {
 			int unused;
 		} hyperscan_cache_file;
+		struct {
+			int unused;
+		} fuzzy_blocked;
 	} reply;
 };
 


More information about the Commits mailing list