commit 7984ea1: [Feature] Fuzzy_storage: add preliminary support of rate limits

Vsevolod Stakhov vsevolod at highsecure.ru
Thu Dec 27 18:28:04 UTC 2018


Author: Vsevolod Stakhov
Date: 2018-12-08 14:44:52 +0000
URL: https://github.com/rspamd/rspamd/commit/7984ea11efc5318ef4f587726cdc7c0886d9f5a5

[Feature] Fuzzy_storage: add preliminary support of rate limits

---
 src/fuzzy_storage.c | 222 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 221 insertions(+), 1 deletion(-)

diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index a3486635f..7fa821e9b 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -17,8 +17,8 @@
  * Rspamd fuzzy storage server
  */
 
-#include <src/libserver/fuzzy_wire.h>
 #include "config.h"
+#include "libserver/fuzzy_wire.h"
 #include "util.h"
 #include "rspamd.h"
 #include "map.h"
@@ -37,14 +37,20 @@
 #include "libutil/map_private.h"
 #include "libutil/hash.h"
 #include "libutil/http_private.h"
+#include "libutil/hash.h"
 #include "unix-std.h"
 
+#include <math.h>
+
 /* Resync value in seconds */
 #define DEFAULT_SYNC_TIMEOUT 60.0
 #define DEFAULT_KEYPAIR_CACHE_SIZE 512
 #define DEFAULT_MASTER_TIMEOUT 10.0
 #define DEFAULT_UPDATES_MAXFAIL 3
 #define COOKIE_SIZE 128
+#define DEFAULT_MAX_BUCKETS 2000 /* default for ctx->max_buckets (size passed to the ratelimit LRU hash) */
+#define DEFAULT_BUCKET_TTL 3600 /* default for ctx->leaky_bucket_ttl; presumably seconds - TODO confirm */
+#define DEFAULT_BUCKET_MASK 24 /* default for ctx->leaky_bucket_mask (network mask used for IPv4 clients) */
 
 static const gchar *local_db_name = "local";
 
@@ -115,6 +121,12 @@ struct rspamd_fuzzy_mirror {
 	struct rspamd_cryptobox_pubkey *key;
 };
 
+struct rspamd_leaky_bucket_elt { /* ratelimit state of one masked client network */
+	rspamd_inet_addr_t *addr; /* masked address, owned by the element (released in fuzzy_rl_bucket_free) */
+	gdouble last; /* time of the last bucket update, as returned by tv_to_double */
+	gdouble cur; /* current fill level in requests; NAN once the burst has been exceeded */
+};
+
 static const guint64 rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL;
 
 struct rspamd_fuzzy_storage_ctx {
@@ -132,6 +144,7 @@ struct rspamd_fuzzy_storage_ctx {
 	struct rspamd_radix_map_helper *update_ips;
 	struct rspamd_radix_map_helper *master_ips;
 	struct rspamd_radix_map_helper *blocked_ips;
+	struct rspamd_radix_map_helper *ratelimit_whitelist;
 
 	struct rspamd_cryptobox_keypair *sync_keypair;
 	struct rspamd_cryptobox_pubkey *master_key;
@@ -141,6 +154,7 @@ struct rspamd_fuzzy_storage_ctx {
 	const ucl_object_t *update_map;
 	const ucl_object_t *masters_map;
 	const ucl_object_t *blocked_map;
+	const ucl_object_t *ratelimit_whitelist_map;
 
 	GHashTable *master_flags;
 	guint keypair_cache_size;
@@ -148,6 +162,7 @@ struct rspamd_fuzzy_storage_ctx {
 	struct event peer_ev;
 	struct event stat_ev;
 	struct timeval stat_tv;
+
 	/* Local keypair */
 	struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */
 	struct fuzzy_key *default_key;
@@ -160,11 +175,21 @@ struct rspamd_fuzzy_storage_ctx {
 	gchar *collection_id_file;
 	struct rspamd_keypair_cache *keypair_cache;
 	rspamd_lru_hash_t *errors_ips;
+	rspamd_lru_hash_t *ratelimit_buckets;
 	struct rspamd_fuzzy_backend *backend;
 	GArray *updates_pending;
 	guint updates_failed;
 	guint updates_maxfail;
 	guint32 collection_id;
+
+	/* Ratelimits */
+	guint leaky_bucket_ttl;
+	guint leaky_bucket_mask;
+	guint max_buckets;
+	gboolean ratelimit_log_only;
+	gdouble leaky_bucket_burst;
+	gdouble leaky_bucket_rate;
+
 	struct rspamd_worker *worker;
 	struct rspamd_http_connection_router *collection_rt;
 	const ucl_object_t *skip_map;
@@ -230,6 +255,105 @@ struct fuzzy_master_update_session {
 
 static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
 
+static gboolean
+rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
+{
+	rspamd_inet_addr_t *masked; /* copy of session->addr with the network mask applied */
+	struct rspamd_leaky_bucket_elt *elt;
+	struct timeval tv;
+	gdouble now;
+	/* Leaky bucket check: returns TRUE to allow the request, FALSE to reject it */
+	if (session->ctx->ratelimit_whitelist != NULL) {
+		if (rspamd_match_radix_map_addr (session->ctx->ratelimit_whitelist,
+				session->addr) != NULL) {
+			return TRUE; /* whitelisted addresses are never limited */
+		}
+	}
+
+	/* Disabled: local addresses are NOT exempted from rate limiting here
+	if (rspamd_inet_address_is_local (session->addr, TRUE)) {
+		return TRUE;
+	}
+	*/
+
+	masked = rspamd_inet_address_copy (session->addr); /* the mask is applied to this copy */
+
+	if (rspamd_inet_address_get_af (masked) == AF_INET) {
+		rspamd_inet_address_apply_mask (masked,
+				MIN (session->ctx->leaky_bucket_mask, 32)); /* capped at 32 bits */
+	}
+	else {
+		/* Not IPv4: the configured mask is multiplied by 4 and clamped to 64..128 */
+		rspamd_inet_address_apply_mask (masked,
+				MIN (MAX (session->ctx->leaky_bucket_mask * 4, 64), 128));
+	}
+
+#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
+	event_base_gettimeofday_cached (session->ctx->ev_base, &tv);
+#else
+	gettimeofday (&tv, NULL);
+#endif
+
+	now = tv_to_double (&tv);
+	elt = rspamd_lru_hash_lookup (session->ctx->ratelimit_buckets, masked,
+			tv.tv_sec);
+
+	if (elt) { /* a bucket already exists for this masked address */
+		gboolean ratelimited = FALSE;
+
+		if (isnan (elt->cur)) {
+			/* Bucket was already marked as exceeded (NAN); it is never reset here, so it stays banned while the element lives */
+			ratelimited = TRUE;
+		}
+		else {
+			/* Drain the bucket by leaky_bucket_rate * elapsed time, clamped at zero */
+			if (elt->last < now) {
+				elt->cur -= session->ctx->leaky_bucket_rate * (now - elt->last);
+				elt->last = now;
+
+				if (elt->cur < 0) {
+					elt->cur = 0;
+				}
+			}
+			else {
+				elt->last = now; /* no time has elapsed: nothing to drain */
+			}
+
+			/* Check bucket: at or above the burst level it gets marked as exceeded */
+			if (elt->cur >= session->ctx->leaky_bucket_burst) {
+				/* NOTE: ratelimited stays FALSE, so the request that trips the limit still passes */
+				msg_info ("ratelimiting %s (%s), %.1f max elts",
+						rspamd_inet_address_to_string (session->addr),
+						rspamd_inet_address_to_string (masked),
+						session->ctx->leaky_bucket_burst);
+				elt->cur = NAN; /* ban marker tested by the isnan () check above */
+			}
+			else {
+				elt->cur ++; /* Allow one more request */
+			}
+		}
+
+		rspamd_inet_address_free (masked); /* the stored element has its own address, this copy was temporary */
+
+		return !ratelimited;
+	}
+	else {
+		/* New bucket: starts with the current request already counted */
+		elt = g_malloc (sizeof (*elt));
+		elt->addr = masked; /* transfer ownership; the same pointer is the hash key below */
+		elt->cur = 1;
+		elt->last = now;
+
+		rspamd_lru_hash_insert (session->ctx->ratelimit_buckets,
+				masked,
+				elt,
+				tv.tv_sec,
+				session->ctx->leaky_bucket_ttl);
+	}
+
+	return TRUE; /* the first request seen for a masked address is always allowed */
+}
+
 static gboolean
 rspamd_fuzzy_check_client (struct fuzzy_session *session, gboolean is_write)
 {
@@ -259,6 +383,15 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session, gboolean is_write)
 	}
 
 	/* Non write */
+	if (session->ctx->ratelimit_buckets) {
+		if (session->ctx->ratelimit_log_only) {
+			(void)rspamd_fuzzy_check_ratelimit (session); /* Check but ignore */
+		}
+		else {
+			return rspamd_fuzzy_check_ratelimit (session);
+		}
+	}
+
 	return TRUE;
 }
 
@@ -299,6 +432,15 @@ struct fuzzy_slave_connection {
 	gint sock;
 };
 
+static void
+fuzzy_rl_bucket_free (gpointer p)
+{
+	struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *)p;
+	/* Destructor given to rspamd_lru_hash_new_full for ratelimit buckets: frees the element and the address it owns */
+	rspamd_inet_address_free (elt->addr);
+	g_free (elt);
+}
+
 static void
 fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
 {
@@ -2492,6 +2634,11 @@ init_fuzzy (struct rspamd_config *cfg)
 			(rspamd_mempool_destruct_t)rspamd_ptr_array_free_hard, ctx->mirrors);
 	ctx->updates_maxfail = DEFAULT_UPDATES_MAXFAIL;
 	ctx->collection_id_file = RSPAMD_DBDIR "/fuzzy_collection.id";
+	ctx->leaky_bucket_mask = DEFAULT_BUCKET_MASK;
+	ctx->leaky_bucket_ttl = DEFAULT_BUCKET_TTL;
+	ctx->max_buckets = DEFAULT_MAX_BUCKETS;
+	ctx->leaky_bucket_burst = NAN;
+	ctx->leaky_bucket_rate = NAN;
 
 	rspamd_rcl_register_worker_option (cfg,
 			type,
@@ -2682,6 +2829,65 @@ init_fuzzy (struct rspamd_config *cfg)
 			0,
 			"Skip specific hashes from the map");
 
+	/* Ratelimits */
+	rspamd_rcl_register_worker_option (cfg,
+			type,
+			"ratelimit_whitelist",
+			rspamd_rcl_parse_struct_ucl,
+			ctx,
+			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_whitelist_map),
+			0,
+			"Skip specific addresses from rate limiting");
+	rspamd_rcl_register_worker_option (cfg,
+			type,
+			"ratelimit_max_buckets",
+			rspamd_rcl_parse_struct_integer,
+			ctx,
+			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, max_buckets),
+			RSPAMD_CL_FLAG_UINT,
+			"Maximum number of leaky buckets (default: " G_STRINGIFY(DEFAULT_MAX_BUCKETS) ")");
+	rspamd_rcl_register_worker_option (cfg,
+			type,
+			"ratelimit_network_mask",
+			rspamd_rcl_parse_struct_integer,
+			ctx,
+			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_mask),
+			RSPAMD_CL_FLAG_UINT,
+			"Network mask to apply for IPv4 rate addresses (default: " G_STRINGIFY(DEFAULT_BUCKET_MASK) ")");
+	rspamd_rcl_register_worker_option (cfg,
+			type,
+			"ratelimit_bucket_ttl",
+			rspamd_rcl_parse_struct_time,
+			ctx,
+			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_ttl),
+			RSPAMD_CL_FLAG_TIME_INTEGER,
+			"Time to live for ratelimit element (default: " G_STRINGIFY(DEFAULT_BUCKET_TTL) ")");
+	rspamd_rcl_register_worker_option (cfg,
+			type,
+			"ratelimit_rate",
+			rspamd_rcl_parse_struct_double,
+			ctx,
+			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_rate),
+			0,
+			"Leak rate in requests per second");
+	rspamd_rcl_register_worker_option (cfg,
+			type,
+			"ratelimit_burst",
+			rspamd_rcl_parse_struct_double,
+			ctx,
+			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_burst),
+			0,
+			"Peak value for ratelimit bucket");
+	rspamd_rcl_register_worker_option (cfg,
+			type,
+			"ratelimit_log_only",
+			rspamd_rcl_parse_struct_boolean,
+			ctx,
+			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_log_only),
+			0,
+			"Don't really ban on ratelimit reaching, just log");
+
+
 	return ctx;
 }
 
@@ -2959,6 +3165,20 @@ start_fuzzy (struct rspamd_worker *worker)
 				&ctx->blocked_ips, NULL);
 	}
 
+	/* Create radix trees */
+	if (ctx->ratelimit_whitelist_map != NULL) {
+		rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->ratelimit_whitelist_map,
+				"Skip ratelimits from specific ip addresses/networks",
+				&ctx->ratelimit_whitelist, NULL);
+	}
+
+	/* Ratelimits */
+	if (!isnan (ctx->leaky_bucket_rate) && !isnan (ctx->leaky_bucket_burst)) {
+		ctx->ratelimit_buckets = rspamd_lru_hash_new_full (ctx->max_buckets,
+				NULL, fuzzy_rl_bucket_free,
+				rspamd_inet_address_hash, rspamd_inet_address_equal);
+	}
+
 	/* Maps events */
 	ctx->resolver = dns_resolver_init (worker->srv->logger,
 				ctx->ev_base,


More information about the Commits mailing list