commit b3eb4d1: [Project] Start SRV upstreams implementation
Vsevolod Stakhov
vsevolod at highsecure.ru
Tue Oct 1 17:00:06 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-10-01 17:40:26 +0100
URL: https://github.com/rspamd/rspamd/commit/b3eb4d1800eee3527772f26201f6a8a6a3a56022
[Project] Start SRV upstreams implementation
---
src/libutil/map.c | 4 +-
src/libutil/upstream.c | 251 ++++++++++++++++++++++++++++++++++++++++++++-----
src/libutil/upstream.h | 1 +
3 files changed, 229 insertions(+), 27 deletions(-)
diff --git a/src/libutil/map.c b/src/libutil/map.c
index 4f0e2354c..42134921a 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -970,7 +970,9 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
}
if (periodic->locked) {
- rspamd_map_schedule_periodic (periodic->map, FALSE, FALSE, FALSE);
+ if (!periodic->map->wrk->wanna_die) {
+ rspamd_map_schedule_periodic (periodic->map, FALSE, FALSE, FALSE);
+ }
g_atomic_int_set (periodic->map->locked, 0);
msg_debug_map ("unlocked map");
}
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index e2bfd94d9..22e5200c7 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -24,6 +24,7 @@
#include "logger.h"
#include <math.h>
+#include <contrib/librdns/rdns.h>
struct upstream_inet_addr_entry {
rspamd_inet_addr_t *addr;
@@ -32,6 +33,7 @@ struct upstream_inet_addr_entry {
struct upstream_addr_elt {
rspamd_inet_addr_t *addr;
+ guint priority;
guint errors;
};
@@ -174,8 +176,17 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg,
upstream = cur->data;
if (!ev_can_stop (&upstream->ev) && upstream->ls &&
!(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
- gdouble when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time,
- upstream->ls->limits.lazy_resolve_time * .1);
+ gdouble when;
+
+ if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+ /* Resolve them immediately ! */
+ when = 0.0;
+ }
+ else {
+ when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time,
+ upstream->ls->limits.lazy_resolve_time * .1);
+ }
+
ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb,
when, 0);
upstream->ev.data = upstream;
@@ -261,13 +272,20 @@ rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr)
static gint
rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
{
- const struct upstream_addr_elt **ip1 = (const struct upstream_addr_elt **)a,
- **ip2 = (const struct upstream_addr_elt **)b;
+ const struct upstream_addr_elt *ip1 = *(const struct upstream_addr_elt **)a,
+ *ip2 = *(const struct upstream_addr_elt **)b;
gint w1, w2;
- w1 = rspamd_upstream_af_to_weight ((*ip1)->addr);
- w2 = rspamd_upstream_af_to_weight ((*ip2)->addr);
+ if (ip1->priority == 0 && ip2->priority == 0) {
+ w1 = rspamd_upstream_af_to_weight (ip1->addr);
+ w2 = rspamd_upstream_af_to_weight (ip2->addr);
+ }
+ else {
+ w1 = ip1->priority;
+ w2 = ip2->priority;
+ }
+ /* Inverse order */
return w2 - w1;
}
@@ -284,9 +302,18 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream)
if (ev_can_stop (&upstream->ev)) {
ev_timer_stop (upstream->ctx->event_loop, &upstream->ev);
}
- /* Start lazy names resolution */
- gdouble when = rspamd_time_jitter (ls->limits.lazy_resolve_time,
- ls->limits.lazy_resolve_time * 0.1);
+
+ /* Start lazy (or not so lazy) names resolution */
+ gdouble when;
+
+ if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+ /* Resolve them immediately ! */
+ when = 0.0;
+ }
+ else {
+ when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time,
+ upstream->ls->limits.lazy_resolve_time * .1);
+ }
ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb,
when, 0);
upstream->ev.data = upstream;
@@ -435,6 +462,122 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
REF_RELEASE (up);
}
+struct rspamd_upstream_srv_dns_cb {
+ struct upstream *up;
+ guint ttl;
+ guint priority;
+ guint port;
+ guint requests_inflight;
+};
+
+/* Used when we have resolved SRV record and resolved addrs */
+static void
+rspamd_upstream_dns_srv_phase2_cb (struct rdns_reply *reply, void *arg)
+{
+ struct rspamd_upstream_srv_dns_cb *cbdata =
+ (struct rspamd_upstream_srv_dns_cb *)arg;
+ struct upstream *up;
+ struct rdns_reply_entry *entry;
+ struct upstream_inet_addr_entry *up_ent;
+
+ up = cbdata->up;
+
+ if (reply->code == RDNS_RC_NOERROR) {
+ entry = reply->entries;
+
+ RSPAMD_UPSTREAM_LOCK (up->lock);
+ while (entry) {
+
+ if (entry->type == RDNS_REQUEST_A) {
+ up_ent = g_malloc0 (sizeof (*up_ent));
+ up_ent->addr = rspamd_inet_address_new (AF_INET,
+ &entry->content.a.addr);
+ LL_PREPEND (up->new_addrs, up_ent);
+ }
+ else if (entry->type == RDNS_REQUEST_AAAA) {
+ up_ent = g_malloc0 (sizeof (*up_ent));
+ up_ent->addr = rspamd_inet_address_new (AF_INET6,
+ &entry->content.aaa.addr);
+ LL_PREPEND (up->new_addrs, up_ent);
+ }
+ entry = entry->next;
+ }
+
+ RSPAMD_UPSTREAM_UNLOCK (up->lock);
+ }
+
+ up->dns_requests--;
+ cbdata->requests_inflight --;
+
+ if (cbdata->requests_inflight == 0) {
+ g_free (cbdata);
+ }
+
+ if (up->dns_requests == 0) {
+ rspamd_upstream_update_addrs (up);
+ }
+
+ REF_RELEASE (up);
+}
+
+static void
+rspamd_upstream_dns_srv_cb (struct rdns_reply *reply, void *arg)
+{
+ struct upstream *upstream = (struct upstream *) arg;
+ struct rdns_reply_entry *entry;
+ struct rspamd_upstream_srv_dns_cb *ncbdata;
+
+ if (reply->code == RDNS_RC_NOERROR) {
+ entry = reply->entries;
+
+ RSPAMD_UPSTREAM_LOCK (upstream->lock);
+ while (entry) {
+ /* XXX: we ignore weight as it contradicts with upstreams logic */
+ if (entry->type == RDNS_REQUEST_SRV) {
+ msg_debug_upstream ("got srv reply for %s: %s "
+ "(weight=%d, priority=%d, port=%d)",
+ upstream->name, entry->content.srv.target,
+ entry->content.srv.weight, entry->content.srv.priority,
+ entry->content.srv.port);
+ ncbdata = g_malloc0 (sizeof (*ncbdata));
+ ncbdata->priority = entry->content.srv.weight;
+ ncbdata->port = entry->content.srv.port;
+ ncbdata->ttl = entry->ttl;
+
+ if (rdns_make_request_full (upstream->ctx->res,
+ rspamd_upstream_dns_srv_phase2_cb, ncbdata,
+ upstream->ls->limits.dns_timeout,
+ upstream->ls->limits.dns_retransmits,
+ 1, entry->content.srv.target, RDNS_REQUEST_A) != NULL) {
+ upstream->dns_requests++;
+ REF_RETAIN (upstream);
+ ncbdata->requests_inflight ++;
+ }
+
+ if (rdns_make_request_full (upstream->ctx->res,
+ rspamd_upstream_dns_srv_phase2_cb, ncbdata,
+ upstream->ls->limits.dns_timeout,
+ upstream->ls->limits.dns_retransmits,
+ 1, entry->content.srv.target, RDNS_REQUEST_AAAA) != NULL) {
+ upstream->dns_requests++;
+ REF_RETAIN (upstream);
+ ncbdata->requests_inflight ++;
+ }
+
+ if (ncbdata->requests_inflight == 0) {
+ g_free (ncbdata);
+ }
+ }
+ entry = entry->next;
+ }
+
+ RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
+ }
+
+ upstream->dns_requests--;
+ REF_RELEASE (upstream);
+}
+
static void
rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
{
@@ -464,19 +607,31 @@ rspamd_upstream_resolve_addrs (const struct upstream_list *ls,
!(up->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
/* Resolve name of the upstream one more time */
if (up->name[0] != '/') {
-
- if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
- ls->limits.dns_timeout, ls->limits.dns_retransmits,
- 1, up->name, RDNS_REQUEST_A) != NULL) {
- up->dns_requests ++;
- REF_RETAIN (up);
+ if (up->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+ if (rdns_make_request_full (up->ctx->res,
+ rspamd_upstream_dns_srv_cb, up,
+ ls->limits.dns_timeout, ls->limits.dns_retransmits,
+ 1, up->name, RDNS_REQUEST_SRV) != NULL) {
+ up->dns_requests++;
+ REF_RETAIN (up);
+ }
}
+ else {
+ if (rdns_make_request_full (up->ctx->res,
+ rspamd_upstream_dns_cb, up,
+ ls->limits.dns_timeout, ls->limits.dns_retransmits,
+ 1, up->name, RDNS_REQUEST_A) != NULL) {
+ up->dns_requests++;
+ REF_RETAIN (up);
+ }
- if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
- ls->limits.dns_timeout, ls->limits.dns_retransmits,
- 1, up->name, RDNS_REQUEST_AAAA) != NULL) {
- up->dns_requests ++;
- REF_RETAIN (up);
+ if (rdns_make_request_full (up->ctx->res,
+ rspamd_upstream_dns_cb, up,
+ ls->limits.dns_timeout, ls->limits.dns_retransmits,
+ 1, up->name, RDNS_REQUEST_AAAA) != NULL) {
+ up->dns_requests++;
+ REF_RETAIN (up);
+ }
}
}
}
@@ -491,6 +646,7 @@ rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents)
ev_timer_stop (loop, w);
if (up->ls) {
+
rspamd_upstream_resolve_addrs (up->ls, up);
w->repeat = rspamd_time_jitter (up->ls->limits.lazy_resolve_time,
@@ -773,18 +929,61 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
{
struct upstream *upstream;
GPtrArray *addrs = NULL;
- guint i;
+ guint i, slen;
rspamd_inet_addr_t *addr;
enum rspamd_parse_host_port_result ret = RSPAMD_PARSE_ADDR_FAIL;
upstream = g_malloc0 (sizeof (*upstream));
+ slen = strlen (str);
switch (parse_type) {
case RSPAMD_UPSTREAM_PARSE_DEFAULT:
- ret = rspamd_parse_host_port_priority (str, &addrs,
- &upstream->weight,
- &upstream->name, def_port,
- ups->ctx ? ups->ctx->pool : NULL);
+ if (slen > sizeof ("service=") &&
+ RSPAMD_LEN_CHECK_STARTS_WITH (str, slen, "service=")) {
+ const gchar *plus_pos, *service_pos, *semicolon_pos;
+
+ /* Accept service=srv_name+hostname[:priority] */
+ service_pos = str + sizeof ("service=") - 1;
+ plus_pos = strchr (service_pos, '+');
+
+ if (plus_pos != NULL) {
+ semicolon_pos = strchr (plus_pos + 1, ':');
+
+ if (semicolon_pos) {
+ upstream->weight = strtoul (semicolon_pos + 1, NULL, 10);
+ }
+ else {
+ semicolon_pos = plus_pos + strlen (plus_pos);
+ }
+
+ /*
+ * Now our name is _service._tcp.<domain>
+ * where <domain> is string between semicolon_pos and plus_pos +1
+ * while service is a string between service_pos and plus_pos
+ */
+ guint namelen = (semicolon_pos - (plus_pos + 1)) +
+ (plus_pos - service_pos) +
+ (sizeof ("tcp") - 1) +
+ 4;
+ addrs = g_ptr_array_sized_new (1);
+ upstream->name = ups->ctx ?
+ rspamd_mempool_alloc (ups->ctx->pool, namelen + 1) :
+ g_malloc (namelen + 1);
+
+ rspamd_snprintf (upstream->name, namelen + 1,
+ "_%*s._tcp.%*s",
+ (gint)(plus_pos - service_pos), service_pos,
+ (gint)(semicolon_pos - (plus_pos + 1)), plus_pos + 1);
+ upstream->flags |= RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE;
+ ret = RSPAMD_PARSE_ADDR_RESOLVED;
+ }
+ }
+ else {
+ ret = rspamd_parse_host_port_priority (str, &addrs,
+ &upstream->weight,
+ &upstream->name, def_port,
+ ups->ctx ? ups->ctx->pool : NULL);
+ }
break;
case RSPAMD_UPSTREAM_PARSE_NAMESERVER:
addrs = g_ptr_array_sized_new (1);
@@ -824,7 +1023,7 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
return FALSE;
}
else {
- upstream->flags = ups->flags;
+ upstream->flags |= ups->flags;
if (ret == RSPAMD_PARSE_ADDR_NUMERIC) {
/* Add noresolve flag */
diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h
index 04ec6d984..5d3e00514 100644
--- a/src/libutil/upstream.h
+++ b/src/libutil/upstream.h
@@ -21,6 +21,7 @@ enum rspamd_upstream_rotation {
enum rspamd_upstream_flag {
RSPAMD_UPSTREAM_FLAG_NORESOLVE = (1 << 0),
+ RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE = (1 << 1),
};
struct rspamd_config;
More information about the Commits
mailing list