commit b3eb4d1: [Project] Start SRV upstreams implementation

Vsevolod Stakhov vsevolod at highsecure.ru
Tue Oct 1 17:00:06 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-10-01 17:40:26 +0100
URL: https://github.com/rspamd/rspamd/commit/b3eb4d1800eee3527772f26201f6a8a6a3a56022

[Project] Start SRV upstreams implementation

---
 src/libutil/map.c      |   4 +-
 src/libutil/upstream.c | 251 ++++++++++++++++++++++++++++++++++++++++++++-----
 src/libutil/upstream.h |   1 +
 3 files changed, 229 insertions(+), 27 deletions(-)

diff --git a/src/libutil/map.c b/src/libutil/map.c
index 4f0e2354c..42134921a 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -970,7 +970,9 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
 	}
 
 	if (periodic->locked) {
-		rspamd_map_schedule_periodic (periodic->map, FALSE, FALSE, FALSE);
+		if (!periodic->map->wrk->wanna_die) {
+			rspamd_map_schedule_periodic (periodic->map, FALSE, FALSE, FALSE);
+		}
 		g_atomic_int_set (periodic->map->locked, 0);
 		msg_debug_map ("unlocked map");
 	}
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index e2bfd94d9..22e5200c7 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -24,6 +24,7 @@
 #include "logger.h"
 
 #include <math.h>
+#include <contrib/librdns/rdns.h>
 
 struct upstream_inet_addr_entry {
 	rspamd_inet_addr_t *addr;
@@ -32,6 +33,7 @@ struct upstream_inet_addr_entry {
 
 struct upstream_addr_elt {
 	rspamd_inet_addr_t *addr;
+	guint priority;
 	guint errors;
 };
 
@@ -174,8 +176,17 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg,
 			upstream = cur->data;
 			if (!ev_can_stop (&upstream->ev) && upstream->ls &&
 						!(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
-				gdouble when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time,
-						upstream->ls->limits.lazy_resolve_time * .1);
+				gdouble when;
+
+				if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+					/* Resolve them immediately ! */
+					when = 0.0;
+				}
+				else {
+					when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time,
+							upstream->ls->limits.lazy_resolve_time * .1);
+				}
+
 				ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb,
 						when, 0);
 				upstream->ev.data = upstream;
@@ -261,13 +272,20 @@ rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr)
 static gint
 rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
 {
-	const struct upstream_addr_elt **ip1 = (const struct upstream_addr_elt **)a,
-			**ip2 = (const struct upstream_addr_elt **)b;
+	const struct upstream_addr_elt *ip1 = *(const struct upstream_addr_elt **)a,
+			*ip2 = *(const struct upstream_addr_elt **)b;
 	gint w1, w2;
 
-	w1 = rspamd_upstream_af_to_weight ((*ip1)->addr);
-	w2 = rspamd_upstream_af_to_weight ((*ip2)->addr);
+	if (ip1->priority == 0 && ip2->priority == 0) {
+		w1 = rspamd_upstream_af_to_weight (ip1->addr);
+		w2 = rspamd_upstream_af_to_weight (ip2->addr);
+	}
+	else {
+		w1 = ip1->priority;
+		w2 = ip2->priority;
+	}
 
+	/* Inverse order */
 	return w2 - w1;
 }
 
@@ -284,9 +302,18 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream)
 		if (ev_can_stop (&upstream->ev)) {
 			ev_timer_stop (upstream->ctx->event_loop, &upstream->ev);
 		}
-		/* Start lazy names resolution */
-		gdouble when = rspamd_time_jitter (ls->limits.lazy_resolve_time,
-				ls->limits.lazy_resolve_time * 0.1);
+
+		/* Start lazy (or not so lazy) names resolution */
+		gdouble when;
+
+		if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+			/* Resolve them immediately ! */
+			when = 0.0;
+		}
+		else {
+			when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time,
+					upstream->ls->limits.lazy_resolve_time * .1);
+		}
 		ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb,
 				when, 0);
 		upstream->ev.data = upstream;
@@ -435,6 +462,122 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
 	REF_RELEASE (up);
 }
 
+struct rspamd_upstream_srv_dns_cb {
+	struct upstream *up;
+	guint ttl;
+	guint priority;
+	guint port;
+	guint requests_inflight;
+};
+
+/* Used when we have resolved SRV record and resolved addrs */
+static void
+rspamd_upstream_dns_srv_phase2_cb (struct rdns_reply *reply, void *arg)
+{
+	struct rspamd_upstream_srv_dns_cb *cbdata =
+			(struct rspamd_upstream_srv_dns_cb *)arg;
+	struct upstream *up;
+	struct rdns_reply_entry *entry;
+	struct upstream_inet_addr_entry *up_ent;
+
+	up = cbdata->up;
+
+	if (reply->code == RDNS_RC_NOERROR) {
+		entry = reply->entries;
+
+		RSPAMD_UPSTREAM_LOCK (up->lock);
+		while (entry) {
+
+			if (entry->type == RDNS_REQUEST_A) {
+				up_ent = g_malloc0 (sizeof (*up_ent));
+				up_ent->addr = rspamd_inet_address_new (AF_INET,
+						&entry->content.a.addr);
+				LL_PREPEND (up->new_addrs, up_ent);
+			}
+			else if (entry->type == RDNS_REQUEST_AAAA) {
+				up_ent = g_malloc0 (sizeof (*up_ent));
+				up_ent->addr = rspamd_inet_address_new (AF_INET6,
+						&entry->content.aaa.addr);
+				LL_PREPEND (up->new_addrs, up_ent);
+			}
+			entry = entry->next;
+		}
+
+		RSPAMD_UPSTREAM_UNLOCK (up->lock);
+	}
+
+	up->dns_requests--;
+	cbdata->requests_inflight --;
+
+	if (cbdata->requests_inflight == 0) {
+		g_free (cbdata);
+	}
+
+	if (up->dns_requests == 0) {
+		rspamd_upstream_update_addrs (up);
+	}
+
+	REF_RELEASE (up);
+}
+
+static void
+rspamd_upstream_dns_srv_cb (struct rdns_reply *reply, void *arg)
+{
+	struct upstream *upstream = (struct upstream *) arg;
+	struct rdns_reply_entry *entry;
+	struct rspamd_upstream_srv_dns_cb *ncbdata;
+
+	if (reply->code == RDNS_RC_NOERROR) {
+		entry = reply->entries;
+
+		RSPAMD_UPSTREAM_LOCK (upstream->lock);
+		while (entry) {
+			/* XXX: we ignore weight as it contradicts with upstreams logic */
+			if (entry->type == RDNS_REQUEST_SRV) {
+				msg_debug_upstream ("got srv reply for %s: %s "
+						"(weight=%d, priority=%d, port=%d)",
+						upstream->name, entry->content.srv.target,
+						entry->content.srv.weight, entry->content.srv.priority,
+						entry->content.srv.port);
+				ncbdata = g_malloc0 (sizeof (*ncbdata));
+				ncbdata->priority = entry->content.srv.weight;
+				ncbdata->port = entry->content.srv.port;
+				ncbdata->ttl = entry->ttl;
+
+				if (rdns_make_request_full (upstream->ctx->res,
+						rspamd_upstream_dns_srv_phase2_cb, ncbdata,
+						upstream->ls->limits.dns_timeout,
+						upstream->ls->limits.dns_retransmits,
+						1, entry->content.srv.target, RDNS_REQUEST_A) != NULL) {
+					upstream->dns_requests++;
+					REF_RETAIN (upstream);
+					ncbdata->requests_inflight ++;
+				}
+
+				if (rdns_make_request_full (upstream->ctx->res,
+						rspamd_upstream_dns_srv_phase2_cb, ncbdata,
+						upstream->ls->limits.dns_timeout,
+						upstream->ls->limits.dns_retransmits,
+						1, entry->content.srv.target, RDNS_REQUEST_AAAA) != NULL) {
+					upstream->dns_requests++;
+					REF_RETAIN (upstream);
+					ncbdata->requests_inflight ++;
+				}
+
+				if (ncbdata->requests_inflight == 0) {
+					g_free (ncbdata);
+				}
+			}
+			entry = entry->next;
+		}
+
+		RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
+	}
+
+	upstream->dns_requests--;
+	REF_RELEASE (upstream);
+}
+
 static void
 rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
 {
@@ -464,19 +607,31 @@ rspamd_upstream_resolve_addrs (const struct upstream_list *ls,
 			!(up->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
 		/* Resolve name of the upstream one more time */
 		if (up->name[0] != '/') {
-
-			if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
-					ls->limits.dns_timeout, ls->limits.dns_retransmits,
-					1, up->name, RDNS_REQUEST_A) != NULL) {
-				up->dns_requests ++;
-				REF_RETAIN (up);
+			if (up->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+				if (rdns_make_request_full (up->ctx->res,
+						rspamd_upstream_dns_srv_cb, up,
+						ls->limits.dns_timeout, ls->limits.dns_retransmits,
+						1, up->name, RDNS_REQUEST_SRV) != NULL) {
+					up->dns_requests++;
+					REF_RETAIN (up);
+				}
 			}
+			else {
+				if (rdns_make_request_full (up->ctx->res,
+						rspamd_upstream_dns_cb, up,
+						ls->limits.dns_timeout, ls->limits.dns_retransmits,
+						1, up->name, RDNS_REQUEST_A) != NULL) {
+					up->dns_requests++;
+					REF_RETAIN (up);
+				}
 
-			if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
-					ls->limits.dns_timeout, ls->limits.dns_retransmits,
-					1, up->name, RDNS_REQUEST_AAAA) != NULL) {
-				up->dns_requests ++;
-				REF_RETAIN (up);
+				if (rdns_make_request_full (up->ctx->res,
+						rspamd_upstream_dns_cb, up,
+						ls->limits.dns_timeout, ls->limits.dns_retransmits,
+						1, up->name, RDNS_REQUEST_AAAA) != NULL) {
+					up->dns_requests++;
+					REF_RETAIN (up);
+				}
 			}
 		}
 	}
@@ -491,6 +646,7 @@ rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents)
 	ev_timer_stop (loop, w);
 
 	if (up->ls) {
+
 		rspamd_upstream_resolve_addrs (up->ls, up);
 
 		w->repeat = rspamd_time_jitter (up->ls->limits.lazy_resolve_time,
@@ -773,18 +929,61 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
 {
 	struct upstream *upstream;
 	GPtrArray *addrs = NULL;
-	guint i;
+	guint i, slen;
 	rspamd_inet_addr_t *addr;
 	enum rspamd_parse_host_port_result ret = RSPAMD_PARSE_ADDR_FAIL;
 
 	upstream = g_malloc0 (sizeof (*upstream));
+	slen = strlen (str);
 
 	switch (parse_type) {
 	case RSPAMD_UPSTREAM_PARSE_DEFAULT:
-		ret = rspamd_parse_host_port_priority (str, &addrs,
-				&upstream->weight,
-				&upstream->name, def_port,
-				ups->ctx ? ups->ctx->pool : NULL);
+		if (slen > sizeof ("service=") &&
+			RSPAMD_LEN_CHECK_STARTS_WITH (str, slen, "service=")) {
+			const gchar *plus_pos, *service_pos, *semicolon_pos;
+
+			/* Accept service=srv_name+hostname[:priority] */
+			service_pos = str + sizeof ("service=") - 1;
+			plus_pos = strchr (service_pos, '+');
+
+			if (plus_pos != NULL) {
+				semicolon_pos = strchr (plus_pos + 1, ':');
+
+				if (semicolon_pos) {
+					upstream->weight = strtoul (semicolon_pos + 1, NULL, 10);
+				}
+				else {
+					semicolon_pos = plus_pos + strlen (plus_pos);
+				}
+
+				/*
+				 * Now our name is _service._tcp.<domain>
+				 * where <domain> is string between semicolon_pos and plus_pos +1
+				 * while service is a string between service_pos and plus_pos
+				 */
+				guint namelen = (semicolon_pos - (plus_pos + 1)) +
+						(plus_pos - service_pos) +
+						(sizeof ("tcp") - 1) +
+						4;
+				addrs = g_ptr_array_sized_new (1);
+				upstream->name = ups->ctx ?
+						rspamd_mempool_alloc (ups->ctx->pool, namelen + 1) :
+						g_malloc (namelen + 1);
+
+				rspamd_snprintf (upstream->name, namelen + 1,
+						"_%*s._tcp.%*s",
+						(gint)(plus_pos - service_pos), service_pos,
+						(gint)(semicolon_pos - (plus_pos + 1)), plus_pos + 1);
+				upstream->flags |= RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE;
+				ret = RSPAMD_PARSE_ADDR_RESOLVED;
+			}
+		}
+		else {
+			ret = rspamd_parse_host_port_priority (str, &addrs,
+					&upstream->weight,
+					&upstream->name, def_port,
+					ups->ctx ? ups->ctx->pool : NULL);
+		}
 		break;
 	case RSPAMD_UPSTREAM_PARSE_NAMESERVER:
 		addrs = g_ptr_array_sized_new (1);
@@ -824,7 +1023,7 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
 		return FALSE;
 	}
 	else {
-		upstream->flags = ups->flags;
+		upstream->flags |= ups->flags;
 
 		if (ret == RSPAMD_PARSE_ADDR_NUMERIC) {
 			/* Add noresolve flag */
diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h
index 04ec6d984..5d3e00514 100644
--- a/src/libutil/upstream.h
+++ b/src/libutil/upstream.h
@@ -21,6 +21,7 @@ enum rspamd_upstream_rotation {
 
 enum rspamd_upstream_flag {
 	RSPAMD_UPSTREAM_FLAG_NORESOLVE = (1 << 0),
+	RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE = (1 << 1),
 };
 
 struct rspamd_config;


More information about the Commits mailing list