commit 44f911a: [Feature] Upstreams: Add lazy resolving logic to all upstreams

Vsevolod Stakhov vsevolod at highsecure.ru
Fri Jul 26 16:28:03 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-07-26 17:26:23 +0100
URL: https://github.com/rspamd/rspamd/commit/44f911a00655f5c8caa103f15afa155c25ac25a0 (HEAD -> master)

[Feature] Upstreams: Add lazy resolving logic to all upstreams

---
 src/libutil/upstream.c | 81 ++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 79 insertions(+), 2 deletions(-)

diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index c445751b4..4cd39f5ea 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -73,6 +73,7 @@ struct upstream_limits {
 	gdouble revive_jitter;
 	gdouble error_time;
 	gdouble dns_timeout;
+	gdouble lazy_resolve_time;
 	guint max_errors;
 	guint dns_retransmits;
 };
@@ -115,6 +116,10 @@ static gdouble default_revive_jitter = 0.4;
 static gdouble default_error_time = 10;
 static gdouble default_dns_timeout = 1.0;
 static guint default_dns_retransmits = 2;
+/* TODO: make it configurable */
+static gdouble default_lazy_resolve_time = 3600.0;
+
+static void rspamd_upstream_lazy_resolve_cb (struct ev_loop *, ev_timer *, int );
 
 void
 rspamd_upstreams_library_config (struct rspamd_config *cfg,
@@ -144,6 +149,29 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg,
 	ctx->event_loop = event_loop;
 	ctx->res = resolver;
 	ctx->configured = TRUE;
+
+	/* Start lazy resolving */
+	if (event_loop && resolver) {
+		GList *cur;
+		struct upstream *u;
+
+		cur = ctx->upstreams->head;
+
+		while (cur) {
+			u = cur->data;
+			if (!ev_is_active (&u->ev) && u->ls &&
+				!(u->ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
+				gdouble when = rspamd_time_jitter (u->ls->limits.lazy_resolve_time,
+						u->ls->limits.lazy_resolve_time * .1);
+				ev_timer_init (&u->ev, rspamd_upstream_lazy_resolve_cb,
+						when, 0);
+				u->ev.data = u;
+				ev_timer_start (ctx->event_loop, &u->ev);
+			}
+
+			cur = g_list_next (cur);
+		}
+	}
 }
 
 static void
@@ -184,6 +212,7 @@ rspamd_upstreams_library_init (void)
 	ctx->limits.dns_timeout = default_dns_timeout;
 	ctx->limits.revive_jitter = default_revive_jitter;
 	ctx->limits.revive_time = default_revive_time;
+	ctx->limits.lazy_resolve_time = default_lazy_resolve_time;
 	ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
 			"upstreams");
 
@@ -235,6 +264,21 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
 	RSPAMD_UPSTREAM_LOCK (ls->lock);
 	g_ptr_array_add (ls->alive, up);
 	up->active_idx = ls->alive->len - 1;
+
+	if (up->ctx && up->ctx->configured &&
+		!(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
+
+		if (ev_is_active (&up->ev)) {
+			ev_timer_stop (up->ctx->event_loop, &up->ev);
+		}
+		/* Start lazy names resolution */
+		gdouble when = rspamd_time_jitter (ls->limits.lazy_resolve_time,
+				ls->limits.lazy_resolve_time * 0.1);
+		ev_timer_init (&up->ev, rspamd_upstream_lazy_resolve_cb, when, 0);
+		up->ev.data = up;
+		ev_timer_start (up->ctx->event_loop, &up->ev);
+	}
+
 	RSPAMD_UPSTREAM_UNLOCK (ls->lock);
 }
 
@@ -372,6 +416,7 @@ rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
 
 	RSPAMD_UPSTREAM_LOCK (up->lock);
 	ev_timer_stop (loop, w);
+
 	if (up->ls) {
 		rspamd_upstream_set_active (up->ls, up);
 	}
@@ -408,6 +453,25 @@ rspamd_upstream_resolve_addrs (const struct upstream_list *ls,
 	}
 }
 
+static void
+rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents)
+{
+	struct upstream *up = (struct upstream *)w->data;
+
+	RSPAMD_UPSTREAM_LOCK (up->lock);
+	ev_timer_stop (loop, w);
+
+	if (up->ls) {
+		rspamd_upstream_resolve_addrs (up->ls, up);
+
+		w->repeat = rspamd_time_jitter (up->ls->limits.lazy_resolve_time,
+				up->ls->limits.lazy_resolve_time * .1);
+		ev_timer_again (loop, w);
+	}
+
+	RSPAMD_UPSTREAM_UNLOCK (up->lock);
+}
+
 static void
 rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
 {
@@ -432,6 +496,11 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
 		REF_RETAIN (up);
 		ntim = rspamd_time_jitter (ls->limits.revive_time,
 				ls->limits.revive_jitter);
+
+		if (ev_is_active (&up->ev)) {
+			ev_timer_stop (up->ctx->event_loop, &up->ev);
+		}
+
 		ev_timer_init (&up->ev, rspamd_upstream_revive_cb, ntim, 0);
 		up->ev.data = up;
 
@@ -583,6 +652,7 @@ rspamd_upstreams_create (struct upstream_ctx *ctx)
 		ls->limits.dns_timeout = default_dns_timeout;
 		ls->limits.revive_jitter = default_revive_jitter;
 		ls->limits.revive_time = default_revive_time;
+		ls->limits.lazy_resolve_time = default_lazy_resolve_time;
 	}
 
 	return ls;
@@ -620,6 +690,11 @@ rspamd_upstream_dtor (struct upstream *up)
 	rspamd_mutex_free (up->lock);
 
 	if (up->ctx) {
+
+		if (ev_is_active (&up->ev)) {
+			ev_timer_stop (up->ctx->event_loop, &up->ev);
+		}
+
 		g_queue_delete_link (up->ctx->upstreams, up->ctx_pos);
 		REF_RELEASE (up->ctx);
 	}
@@ -747,7 +822,6 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
 	}
 
 	g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
-
 	rspamd_upstream_set_active (ups, up);
 
 	return TRUE;
@@ -913,7 +987,10 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
 	/* Here the upstreams list is already locked */
 	RSPAMD_UPSTREAM_LOCK (up->lock);
 
-	ev_timer_stop (up->ctx->event_loop, &up->ev);
+	if (ev_is_active (&up->ev)) {
+		ev_timer_stop (up->ctx->event_loop, &up->ev);
+	}
+
 	g_ptr_array_add (ups->alive, up);
 	up->active_idx = ups->alive->len - 1;
 	RSPAMD_UPSTREAM_UNLOCK (up->lock);


More information about the Commits mailing list