commit 7984ea1: [Feature] Fuzzy_storage: add preliminary support of rate limits
Vsevolod Stakhov
vsevolod at highsecure.ru
Thu Dec 27 18:28:04 UTC 2018
Author: Vsevolod Stakhov
Date: 2018-12-08 14:44:52 +0000
URL: https://github.com/rspamd/rspamd/commit/7984ea11efc5318ef4f587726cdc7c0886d9f5a5
[Feature] Fuzzy_storage: add preliminary support of rate limits
---
src/fuzzy_storage.c | 222 +++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 221 insertions(+), 1 deletion(-)
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index a3486635f..7fa821e9b 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -17,8 +17,8 @@
* Rspamd fuzzy storage server
*/
-#include <src/libserver/fuzzy_wire.h>
#include "config.h"
+#include "libserver/fuzzy_wire.h"
#include "util.h"
#include "rspamd.h"
#include "map.h"
@@ -37,14 +37,20 @@
#include "libutil/map_private.h"
#include "libutil/hash.h"
#include "libutil/http_private.h"
+#include "libutil/hash.h"
#include "unix-std.h"
+#include <math.h>
+
/* Resync value in seconds */
#define DEFAULT_SYNC_TIMEOUT 60.0
#define DEFAULT_KEYPAIR_CACHE_SIZE 512
#define DEFAULT_MASTER_TIMEOUT 10.0
#define DEFAULT_UPDATES_MAXFAIL 3
#define COOKIE_SIZE 128
+#define DEFAULT_MAX_BUCKETS 2000
+#define DEFAULT_BUCKET_TTL 3600
+#define DEFAULT_BUCKET_MASK 24
static const gchar *local_db_name = "local";
@@ -115,6 +121,12 @@ struct rspamd_fuzzy_mirror {
struct rspamd_cryptobox_pubkey *key;
};
+struct rspamd_leaky_bucket_elt {
+ rspamd_inet_addr_t *addr;
+ gdouble last;
+ gdouble cur;
+};
+
static const guint64 rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL;
struct rspamd_fuzzy_storage_ctx {
@@ -132,6 +144,7 @@ struct rspamd_fuzzy_storage_ctx {
struct rspamd_radix_map_helper *update_ips;
struct rspamd_radix_map_helper *master_ips;
struct rspamd_radix_map_helper *blocked_ips;
+ struct rspamd_radix_map_helper *ratelimit_whitelist;
struct rspamd_cryptobox_keypair *sync_keypair;
struct rspamd_cryptobox_pubkey *master_key;
@@ -141,6 +154,7 @@ struct rspamd_fuzzy_storage_ctx {
const ucl_object_t *update_map;
const ucl_object_t *masters_map;
const ucl_object_t *blocked_map;
+ const ucl_object_t *ratelimit_whitelist_map;
GHashTable *master_flags;
guint keypair_cache_size;
@@ -148,6 +162,7 @@ struct rspamd_fuzzy_storage_ctx {
struct event peer_ev;
struct event stat_ev;
struct timeval stat_tv;
+
/* Local keypair */
struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */
struct fuzzy_key *default_key;
@@ -160,11 +175,21 @@ struct rspamd_fuzzy_storage_ctx {
gchar *collection_id_file;
struct rspamd_keypair_cache *keypair_cache;
rspamd_lru_hash_t *errors_ips;
+ rspamd_lru_hash_t *ratelimit_buckets;
struct rspamd_fuzzy_backend *backend;
GArray *updates_pending;
guint updates_failed;
guint updates_maxfail;
guint32 collection_id;
+
+ /* Ratelimits */
+ guint leaky_bucket_ttl;
+ guint leaky_bucket_mask;
+ guint max_buckets;
+ gboolean ratelimit_log_only;
+ gdouble leaky_bucket_burst;
+ gdouble leaky_bucket_rate;
+
struct rspamd_worker *worker;
struct rspamd_http_connection_router *collection_rt;
const ucl_object_t *skip_map;
@@ -230,6 +255,105 @@ struct fuzzy_master_update_session {
static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
+static gboolean
+rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
+{
+ rspamd_inet_addr_t *masked;
+ struct rspamd_leaky_bucket_elt *elt;
+ struct timeval tv;
+ gdouble now;
+
+ if (session->ctx->ratelimit_whitelist != NULL) {
+ if (rspamd_match_radix_map_addr (session->ctx->ratelimit_whitelist,
+ session->addr) != NULL) {
+ return TRUE;
+ }
+ }
+
+ /*
+ if (rspamd_inet_address_is_local (session->addr, TRUE)) {
+ return TRUE;
+ }
+ */
+
+ masked = rspamd_inet_address_copy (session->addr);
+
+ if (rspamd_inet_address_get_af (masked) == AF_INET) {
+ rspamd_inet_address_apply_mask (masked,
+ MIN (session->ctx->leaky_bucket_mask, 32));
+ }
+ else {
+ /* Must be at least /64 */
+ rspamd_inet_address_apply_mask (masked,
+ MIN (MAX (session->ctx->leaky_bucket_mask * 4, 64), 128));
+ }
+
+#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
+ event_base_gettimeofday_cached (session->ctx->ev_base, &tv);
+#else
+ gettimeofday (&tv, NULL);
+#endif
+
+ now = tv_to_double (&tv);
+ elt = rspamd_lru_hash_lookup (session->ctx->ratelimit_buckets, masked,
+ tv.tv_sec);
+
+ if (elt) {
+ gboolean ratelimited = FALSE;
+
+ if (isnan (elt->cur)) {
+ /* Ratelimit exceeded, preserve it for the whole ttl */
+ ratelimited = TRUE;
+ }
+ else {
+ /* Update bucket */
+ if (elt->last < now) {
+ elt->cur -= session->ctx->leaky_bucket_rate * (now - elt->last);
+ elt->last = now;
+
+ if (elt->cur < 0) {
+ elt->cur = 0;
+ }
+ }
+ else {
+ elt->last = now;
+ }
+
+ /* Check bucket */
+ if (elt->cur >= session->ctx->leaky_bucket_burst) {
+
+ msg_info ("ratelimiting %s (%s), %.1f max elts",
+ rspamd_inet_address_to_string (session->addr),
+ rspamd_inet_address_to_string (masked),
+ session->ctx->leaky_bucket_burst);
+ elt->cur = NAN;
+ }
+ else {
+ elt->cur ++; /* Allow one more request */
+ }
+ }
+
+ rspamd_inet_address_free (masked);
+
+ return !ratelimited;
+ }
+ else {
+ /* New bucket */
+ elt = g_malloc (sizeof (*elt));
+ elt->addr = masked; /* transfer ownership */
+ elt->cur = 1;
+ elt->last = now;
+
+ rspamd_lru_hash_insert (session->ctx->ratelimit_buckets,
+ masked,
+ elt,
+ tv.tv_sec,
+ session->ctx->leaky_bucket_ttl);
+ }
+
+ return TRUE;
+}
+
static gboolean
rspamd_fuzzy_check_client (struct fuzzy_session *session, gboolean is_write)
{
@@ -259,6 +383,15 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session, gboolean is_write)
}
/* Non write */
+ if (session->ctx->ratelimit_buckets) {
+ if (session->ctx->ratelimit_log_only) {
+ (void)rspamd_fuzzy_check_ratelimit (session); /* Check but ignore */
+ }
+ else {
+ return rspamd_fuzzy_check_ratelimit (session);
+ }
+ }
+
return TRUE;
}
@@ -299,6 +432,15 @@ struct fuzzy_slave_connection {
gint sock;
};
+static void
+fuzzy_rl_bucket_free (gpointer p)
+{
+ struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *)p;
+
+ rspamd_inet_address_free (elt->addr);
+ g_free (elt);
+}
+
static void
fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
{
@@ -2492,6 +2634,11 @@ init_fuzzy (struct rspamd_config *cfg)
(rspamd_mempool_destruct_t)rspamd_ptr_array_free_hard, ctx->mirrors);
ctx->updates_maxfail = DEFAULT_UPDATES_MAXFAIL;
ctx->collection_id_file = RSPAMD_DBDIR "/fuzzy_collection.id";
+ ctx->leaky_bucket_mask = DEFAULT_BUCKET_MASK;
+ ctx->leaky_bucket_ttl = DEFAULT_BUCKET_TTL;
+ ctx->max_buckets = DEFAULT_MAX_BUCKETS;
+ ctx->leaky_bucket_burst = NAN;
+ ctx->leaky_bucket_rate = NAN;
rspamd_rcl_register_worker_option (cfg,
type,
@@ -2682,6 +2829,65 @@ init_fuzzy (struct rspamd_config *cfg)
0,
"Skip specific hashes from the map");
+ /* Ratelimits */
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "ratelimit_whitelist",
+ rspamd_rcl_parse_struct_ucl,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_whitelist_map),
+ 0,
+ "Skip specific addresses from rate limiting");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "ratelimit_max_buckets",
+ rspamd_rcl_parse_struct_integer,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, max_buckets),
+ RSPAMD_CL_FLAG_UINT,
+ "Maximum number of leaky buckets (default: " G_STRINGIFY(DEFAULT_MAX_BUCKETS) ")");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "ratelimit_network_mask",
+ rspamd_rcl_parse_struct_integer,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_mask),
+ RSPAMD_CL_FLAG_UINT,
+ "Network mask to apply for IPv4 rate addresses (default: " G_STRINGIFY(DEFAULT_BUCKET_MASK) ")");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "ratelimit_bucket_ttl",
+ rspamd_rcl_parse_struct_time,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_ttl),
+ RSPAMD_CL_FLAG_TIME_INTEGER,
+ "Time to live for ratelimit element (default: " G_STRINGIFY(DEFAULT_BUCKET_TTL) ")");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "ratelimit_rate",
+ rspamd_rcl_parse_struct_double,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_rate),
+ 0,
+ "Leak rate in requests per second");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "ratelimit_burst",
+ rspamd_rcl_parse_struct_double,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_burst),
+ 0,
+ "Peak value for ratelimit bucket");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "ratelimit_log_only",
+ rspamd_rcl_parse_struct_boolean,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_log_only),
+ 0,
+ "Don't really ban on ratelimit reaching, just log");
+
+
return ctx;
}
@@ -2959,6 +3165,20 @@ start_fuzzy (struct rspamd_worker *worker)
&ctx->blocked_ips, NULL);
}
+ /* Create radix trees */
+ if (ctx->ratelimit_whitelist_map != NULL) {
+ rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->ratelimit_whitelist_map,
+ "Skip ratelimits from specific ip addresses/networks",
+ &ctx->ratelimit_whitelist, NULL);
+ }
+
+ /* Ratelimits */
+ if (!isnan (ctx->leaky_bucket_rate) && !isnan (ctx->leaky_bucket_burst)) {
+ ctx->ratelimit_buckets = rspamd_lru_hash_new_full (ctx->max_buckets,
+ NULL, fuzzy_rl_bucket_free,
+ rspamd_inet_address_hash, rspamd_inet_address_equal);
+ }
+
/* Maps events */
ctx->resolver = dns_resolver_init (worker->srv->logger,
ctx->ev_base,
More information about the Commits
mailing list