commit f3d197f: [Feature] Upstreams: Set noresolve flag on numeric upstreams

Vsevolod Stakhov vsevolod at highsecure.ru
Fri Jul 26 17:21:05 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-07-26 18:16:07 +0100
URL: https://github.com/rspamd/rspamd/commit/f3d197ff3d67e5435882dd4c79f4e56ef03c0f5d

[Feature] Upstreams: Set noresolve flag on numeric upstreams

---
 src/libutil/upstream.c | 216 +++++++++++++++++++++++++++++--------------------
 1 file changed, 129 insertions(+), 87 deletions(-)

diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 4cd39f5ea..d1da9f7d3 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -21,6 +21,7 @@
 #include "rdns.h"
 #include "cryptobox.h"
 #include "utlist.h"
+#include "logger.h"
 
 #include <math.h>
 
@@ -53,6 +54,7 @@ struct upstream {
 	ev_timer ev;
 	gdouble last_fail;
 	gpointer ud;
+	enum rspamd_upstream_flag flags;
 	struct upstream_list *ls;
 	GList *ctx_pos;
 	struct upstream_ctx *ctx;
@@ -65,6 +67,7 @@ struct upstream {
 	struct upstream_inet_addr_entry *new_addrs;
 	rspamd_mutex_t *lock;
 	gpointer data;
+	gchar uid[8];
 	ref_entry_t ref;
 };
 
@@ -86,8 +89,8 @@ struct upstream_list {
 	rspamd_mutex_t *lock;
 	guint64 hash_seed;
 	struct upstream_limits limits;
-	guint cur_elt;
 	enum rspamd_upstream_flag flags;
+	guint cur_elt;
 	enum rspamd_upstream_rotation rot_alg;
 };
 
@@ -109,6 +112,13 @@ struct upstream_ctx {
 #define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x)
 #endif
 
+#define msg_debug_upstream(...)  rspamd_conditional_debug_fast (NULL, NULL, \
+        rspamd_upstream_log_id, "upstream", upstream->uid, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+
+INIT_LOG_MODULE(upstream)
+
 /* 4 errors in 10 seconds */
 static guint default_max_errors = 4;
 static gdouble default_revive_time = 60;
@@ -139,6 +149,9 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg,
 	if (cfg->upstream_revive_time) {
 		ctx->limits.revive_time = cfg->upstream_max_errors;
 	}
+	if (cfg->upstream_lazy_resolve_time) {
+		ctx->limits.lazy_resolve_time = cfg->upstream_lazy_resolve_time;
+	}
 	if (cfg->dns_retransmits) {
 		ctx->limits.dns_retransmits = cfg->dns_retransmits;
 	}
@@ -153,20 +166,20 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg,
 	/* Start lazy resolving */
 	if (event_loop && resolver) {
 		GList *cur;
-		struct upstream *u;
+		struct upstream *upstream;
 
 		cur = ctx->upstreams->head;
 
 		while (cur) {
-			u = cur->data;
-			if (!ev_is_active (&u->ev) && u->ls &&
-				!(u->ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
-				gdouble when = rspamd_time_jitter (u->ls->limits.lazy_resolve_time,
-						u->ls->limits.lazy_resolve_time * .1);
-				ev_timer_init (&u->ev, rspamd_upstream_lazy_resolve_cb,
+			upstream = cur->data;
+			if (!ev_is_active (&upstream->ev) && upstream->ls &&
+						!(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
+				gdouble when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time,
+						upstream->ls->limits.lazy_resolve_time * .1);
+				ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb,
 						when, 0);
-				u->ev.data = u;
-				ev_timer_start (ctx->event_loop, &u->ev);
+				upstream->ev.data = upstream;
+				ev_timer_start (ctx->event_loop, &upstream->ev);
 			}
 
 			cur = g_list_next (cur);
@@ -259,24 +272,27 @@ rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
 }
 
 static void
-rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
+rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream)
 {
 	RSPAMD_UPSTREAM_LOCK (ls->lock);
-	g_ptr_array_add (ls->alive, up);
-	up->active_idx = ls->alive->len - 1;
+	g_ptr_array_add (ls->alive, upstream);
+	upstream->active_idx = ls->alive->len - 1;
 
-	if (up->ctx && up->ctx->configured &&
-		!(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
+	if (upstream->ctx && upstream->ctx->configured &&
+		!(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
 
-		if (ev_is_active (&up->ev)) {
-			ev_timer_stop (up->ctx->event_loop, &up->ev);
+		if (ev_is_active (&upstream->ev)) {
+			ev_timer_stop (upstream->ctx->event_loop, &upstream->ev);
 		}
 		/* Start lazy names resolution */
 		gdouble when = rspamd_time_jitter (ls->limits.lazy_resolve_time,
 				ls->limits.lazy_resolve_time * 0.1);
-		ev_timer_init (&up->ev, rspamd_upstream_lazy_resolve_cb, when, 0);
-		up->ev.data = up;
-		ev_timer_start (up->ctx->event_loop, &up->ev);
+		ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb,
+				when, 0);
+		upstream->ev.data = upstream;
+		msg_debug_upstream ("start lazy resolving for %s in %.0f seconds",
+				upstream->name, when);
+		ev_timer_start (upstream->ctx->event_loop, &upstream->ev);
 	}
 
 	RSPAMD_UPSTREAM_UNLOCK (ls->lock);
@@ -294,7 +310,7 @@ rspamd_upstream_addr_elt_dtor (gpointer a)
 }
 
 static void
-rspamd_upstream_update_addrs (struct upstream *up)
+rspamd_upstream_update_addrs (struct upstream *upstream)
 {
 	guint addr_cnt, i, port;
 	gboolean seen_addr, reset_errors = FALSE;
@@ -306,33 +322,35 @@ rspamd_upstream_update_addrs (struct upstream *up)
 	 * We need first of all get the saved port, since DNS gives us no
 	 * idea about what port has been used previously
 	 */
-	RSPAMD_UPSTREAM_LOCK (up->lock);
+	RSPAMD_UPSTREAM_LOCK (upstream->lock);
 
-	if (up->addrs.addr->len > 0 && up->new_addrs) {
-		addr_elt = g_ptr_array_index (up->addrs.addr, 0);
+	if (upstream->addrs.addr->len > 0 && upstream->new_addrs) {
+		addr_elt = g_ptr_array_index (upstream->addrs.addr, 0);
 		port = rspamd_inet_address_get_port (addr_elt->addr);
 
 		/* Now calculate new addrs count */
 		addr_cnt = 0;
-		LL_FOREACH (up->new_addrs, cur) {
+		LL_FOREACH (upstream->new_addrs, cur) {
 			addr_cnt++;
 		}
 
 		/* At 10% probability reset errors on addr elements */
 		if (rspamd_random_double_fast () > 0.9) {
 			reset_errors = TRUE;
+			msg_debug_upstream ("reset errors on upstream %s",
+					upstream->name);
 		}
 
 		new_addrs = g_ptr_array_new_full (addr_cnt, rspamd_upstream_addr_elt_dtor);
 
 		/* Copy addrs back */
-		LL_FOREACH (up->new_addrs, cur) {
+		LL_FOREACH (upstream->new_addrs, cur) {
 			seen_addr = FALSE;
 			naddr = NULL;
 			/* Ports are problematic, set to compare in the next block */
 			rspamd_inet_address_set_port (cur->addr, port);
 
-			PTR_ARRAY_FOREACH (up->addrs.addr, i, addr_elt) {
+			PTR_ARRAY_FOREACH (upstream->addrs.addr, i, addr_elt) {
 				if (rspamd_inet_address_compare (addr_elt->addr, cur->addr, FALSE) == 0) {
 					naddr = g_malloc0 (sizeof (*naddr));
 					naddr->addr = cur->addr;
@@ -347,26 +365,34 @@ rspamd_upstream_update_addrs (struct upstream *up)
 				naddr = g_malloc0 (sizeof (*naddr));
 				naddr->addr = cur->addr;
 				naddr->errors = 0;
+				msg_debug_upstream ("new address for %s: %s",
+						upstream->name,
+						rspamd_inet_address_to_string_pretty (naddr->addr));
+			}
+			else {
+				msg_debug_upstream ("existing address for %s: %s",
+						upstream->name,
+						rspamd_inet_address_to_string_pretty (cur->addr));
 			}
 
 			g_ptr_array_add (new_addrs, naddr);
 		}
 
 		/* Free old addresses */
-		g_ptr_array_free (up->addrs.addr, TRUE);
+		g_ptr_array_free (upstream->addrs.addr, TRUE);
 
-		up->addrs.cur = 0;
-		up->addrs.addr = new_addrs;
-		g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
+		upstream->addrs.cur = 0;
+		upstream->addrs.addr = new_addrs;
+		g_ptr_array_sort (upstream->addrs.addr, rspamd_upstream_addr_sort_func);
 	}
 
-	LL_FOREACH_SAFE (up->new_addrs, cur, tmp) {
+	LL_FOREACH_SAFE (upstream->new_addrs, cur, tmp) {
 		/* Do not free inet address pointer since it has been transferred to up */
 		g_free (cur);
 	}
 
-	up->new_addrs = NULL;
-	RSPAMD_UPSTREAM_UNLOCK (up->lock);
+	upstream->new_addrs = NULL;
+	RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
 }
 
 static void
@@ -412,17 +438,19 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
 static void
 rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
 {
-	struct upstream *up = (struct upstream *)w->data;
+	struct upstream *upstream = (struct upstream *)w->data;
 
-	RSPAMD_UPSTREAM_LOCK (up->lock);
+	RSPAMD_UPSTREAM_LOCK (upstream->lock);
 	ev_timer_stop (loop, w);
 
-	if (up->ls) {
-		rspamd_upstream_set_active (up->ls, up);
+	msg_debug_upstream ("revive upstream %s", upstream->name);
+
+	if (upstream->ls) {
+		rspamd_upstream_set_active (upstream->ls, upstream);
 	}
 
-	RSPAMD_UPSTREAM_UNLOCK (up->lock);
-	REF_RELEASE (up);
+	RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
+	REF_RELEASE (upstream);
 }
 
 static void
@@ -432,7 +460,7 @@ rspamd_upstream_resolve_addrs (const struct upstream_list *ls,
 	if (up->ctx->res != NULL &&
 			up->ctx->configured &&
 			up->dns_requests == 0 &&
-			!(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
+			!(up->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
 		/* Resolve name of the upstream one more time */
 		if (up->name[0] != '/') {
 
@@ -473,7 +501,7 @@ rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents)
 }
 
 static void
-rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
+rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *upstream)
 {
 	gdouble ntim;
 	guint i;
@@ -481,8 +509,8 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
 	struct upstream_list_watcher *w;
 
 	RSPAMD_UPSTREAM_LOCK (ls->lock);
-	g_ptr_array_remove_index (ls->alive, up->active_idx);
-	up->active_idx = -1;
+	g_ptr_array_remove_index (ls->alive, upstream->active_idx);
+	upstream->active_idx = -1;
 
 	/* We need to update all indicies */
 	for (i = 0; i < ls->alive->len; i ++) {
@@ -490,28 +518,30 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
 		cur->active_idx = i;
 	}
 
-	if (up->ctx) {
-		rspamd_upstream_resolve_addrs (ls, up);
+	if (upstream->ctx) {
+		rspamd_upstream_resolve_addrs (ls, upstream);
 
-		REF_RETAIN (up);
+		REF_RETAIN (upstream);
 		ntim = rspamd_time_jitter (ls->limits.revive_time,
 				ls->limits.revive_jitter);
 
-		if (ev_is_active (&up->ev)) {
-			ev_timer_stop (up->ctx->event_loop, &up->ev);
+		if (ev_is_active (&upstream->ev)) {
+			ev_timer_stop (upstream->ctx->event_loop, &upstream->ev);
 		}
 
-		ev_timer_init (&up->ev, rspamd_upstream_revive_cb, ntim, 0);
-		up->ev.data = up;
+		msg_debug_upstream ("mark upstream %s inactive; revive in %.0f seconds",
+				upstream->name, ntim);
+		ev_timer_init (&upstream->ev, rspamd_upstream_revive_cb, ntim, 0);
+		upstream->ev.data = upstream;
 
-		if (up->ctx->event_loop != NULL && up->ctx->configured) {
-			ev_timer_start (up->ctx->event_loop, &up->ev);
+		if (upstream->ctx->event_loop != NULL && upstream->ctx->configured) {
+			ev_timer_start (upstream->ctx->event_loop, &upstream->ev);
 		}
 	}
 
-	DL_FOREACH (up->ls->watchers, w) {
+	DL_FOREACH (upstream->ls->watchers, w) {
 		if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
-			w->func (up, RSPAMD_UPSTREAM_WATCH_OFFLINE, up->errors, w->ud);
+			w->func (upstream, RSPAMD_UPSTREAM_WATCH_OFFLINE, upstream->errors, w->ud);
 		}
 	}
 
@@ -740,32 +770,30 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
 		guint16 def_port, enum rspamd_upstream_parse_type parse_type,
 		void *data)
 {
-	struct upstream *up;
+	struct upstream *upstream;
 	GPtrArray *addrs = NULL;
 	guint i;
 	rspamd_inet_addr_t *addr;
-	gboolean ret = FALSE;
+	enum rspamd_parse_host_port_result ret = RSPAMD_PARSE_ADDR_FAIL;
 
-	up = g_malloc0 (sizeof (*up));
+	upstream = g_malloc0 (sizeof (*upstream));
 
 	switch (parse_type) {
 	case RSPAMD_UPSTREAM_PARSE_DEFAULT:
 		ret = rspamd_parse_host_port_priority (str, &addrs,
-				&up->weight,
-				&up->name, def_port, ups->ctx ? ups->ctx->pool : NULL);
+				&upstream->weight,
+				&upstream->name, def_port,
+				ups->ctx ? ups->ctx->pool : NULL);
 		break;
 	case RSPAMD_UPSTREAM_PARSE_NAMESERVER:
 		addrs = g_ptr_array_sized_new (1);
-		ret = rspamd_parse_inet_address (&addr, str, strlen (str));
-
-		if (ups->ctx) {
-			up->name = rspamd_mempool_strdup (ups->ctx->pool, str);
-		}
-		else {
-			up->name = g_strdup (str);
-		}
-
-		if (ret) {
+		if (rspamd_parse_inet_address (&addr, str, strlen (str))) {
+			if (ups->ctx) {
+				upstream->name = rspamd_mempool_strdup (ups->ctx->pool, str);
+			}
+			else {
+				upstream->name = g_strdup (str);
+			}
 			if (rspamd_inet_address_get_port (addr) == 0) {
 				rspamd_inet_address_set_port (addr, def_port);
 			}
@@ -788,41 +816,55 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
 		break;
 	}
 
-	if (!ret) {
-		g_free (up);
+	if (ret == RSPAMD_PARSE_ADDR_FAIL) {
+		g_free (upstream);
 		return FALSE;
 	}
 	else {
+		upstream->flags = ups->flags;
+
+		if (ret == RSPAMD_PARSE_ADDR_NUMERIC) {
+			/* Add noresolve flag */
+			upstream->flags |= RSPAMD_UPSTREAM_FLAG_NORESOLVE;
+		}
 		for (i = 0; i < addrs->len; i ++) {
 			addr = g_ptr_array_index (addrs, i);
-			rspamd_upstream_add_addr (up, rspamd_inet_address_copy (addr));
+			rspamd_upstream_add_addr (upstream, rspamd_inet_address_copy (addr));
 		}
 	}
 
-	if (up->weight == 0 && ups->rot_alg == RSPAMD_UPSTREAM_MASTER_SLAVE) {
+	if (upstream->weight == 0 && ups->rot_alg == RSPAMD_UPSTREAM_MASTER_SLAVE) {
 		/* Special heuristic for master-slave rotation */
 		if (ups->ups->len == 0) {
 			/* Prioritize the first */
-			up->weight = 1;
+			upstream->weight = 1;
 		}
 	}
 
-	g_ptr_array_add (ups->ups, up);
-	up->ud = data;
-	up->cur_weight = up->weight;
-	up->ls = ups;
-	REF_INIT_RETAIN (up, rspamd_upstream_dtor);
-	up->lock = rspamd_mutex_new ();
-	up->ctx = ups->ctx;
+	g_ptr_array_add (ups->ups, upstream);
+	upstream->ud = data;
+	upstream->cur_weight = upstream->weight;
+	upstream->ls = ups;
+	REF_INIT_RETAIN (upstream, rspamd_upstream_dtor);
+	upstream->lock = rspamd_mutex_new ();
+	upstream->ctx = ups->ctx;
 
-	if (up->ctx) {
+	if (upstream->ctx) {
 		REF_RETAIN (ups->ctx);
-		g_queue_push_tail (ups->ctx->upstreams, up);
-		up->ctx_pos = g_queue_peek_tail_link (ups->ctx->upstreams);
+		g_queue_push_tail (ups->ctx->upstreams, upstream);
+		upstream->ctx_pos = g_queue_peek_tail_link (ups->ctx->upstreams);
 	}
 
-	g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
-	rspamd_upstream_set_active (ups, up);
+	guint h = rspamd_cryptobox_fast_hash (upstream->name,
+			strlen (upstream->name), 0);
+	memset (upstream->uid, 0, sizeof (upstream->uid));
+	rspamd_encode_base32_buf ((const guchar *)&h, sizeof (h),
+			upstream->uid, sizeof (upstream->uid) - 1);
+
+	msg_debug_upstream ("added upstream %s (%s)", upstream->name,
+			upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : "DNS name");
+	g_ptr_array_sort (upstream->addrs.addr, rspamd_upstream_addr_sort_func);
+	rspamd_upstream_set_active (ups, upstream);
 
 	return TRUE;
 }


More information about the Commits mailing list