commit a6a1a8d: [Feature] Implement event watchers for upstreams

Vsevolod Stakhov vsevolod at highsecure.ru
Thu Dec 27 18:28:03 UTC 2018


Author: Vsevolod Stakhov
Date: 2018-12-04 17:05:18 +0000
URL: https://github.com/rspamd/rspamd/commit/a6a1a8d5aea8439b78645d2dc77b61a898020b7d

[Feature] Implement event watchers for upstreams

---
 src/libutil/upstream.c | 70 ++++++++++++++++++++++++++++++++++++++++++++++++--
 src/libutil/upstream.h | 25 ++++++++++++++++++
 2 files changed, 93 insertions(+), 2 deletions(-)

diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 4ed657da4..90f792bbe 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -34,6 +34,13 @@ struct upstream_addr_elt {
 	guint errors;
 };
 
+struct upstream_list_watcher {
+	rspamd_upstream_watch_func func;
+	gpointer ud;
+	enum rspamd_upstreams_watch_event events_mask;
+	struct upstream_list_watcher *next, *prev;
+};
+
 struct upstream {
 	guint weight;
 	guint cur_weight;
@@ -73,6 +80,7 @@ struct upstream_list {
 	struct upstream_ctx *ctx;
 	GPtrArray *ups;
 	GPtrArray *alive;
+	struct upstream_list_watcher *watchers;
 	rspamd_mutex_t *lock;
 	guint64 hash_seed;
 	struct upstream_limits limits;
@@ -109,8 +117,9 @@ static guint default_dns_retransmits = 2;
 
 void
 rspamd_upstreams_library_config (struct rspamd_config *cfg,
-		struct upstream_ctx *ctx, struct event_base *ev_base,
-		struct rdns_resolver *resolver)
+								 struct upstream_ctx *ctx,
+								 struct event_base *ev_base,
+								 struct rdns_resolver *resolver)
 {
 	g_assert (ctx != NULL);
 	g_assert (cfg != NULL);
@@ -405,6 +414,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
 	guint i;
 	struct upstream *cur;
 	struct timeval tv;
+	struct upstream_list_watcher *w;
 
 	RSPAMD_UPSTREAM_LOCK (ls->lock);
 	g_ptr_array_remove_index (ls->alive, up->active_idx);
@@ -431,6 +441,12 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
 		event_add (&up->ev, &tv);
 	}
 
+	DL_FOREACH (up->ls->watchers, w) {
+		if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
+			w->func (up, RSPAMD_UPSTREAM_WATCH_OFFLINE, up->errors, w->ud);
+		}
+	}
+
 	RSPAMD_UPSTREAM_UNLOCK (ls->lock);
 }
 
@@ -440,6 +456,7 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
 	gdouble error_rate, max_error_rate;
 	gdouble sec_last, sec_cur;
 	struct upstream_addr_elt *addr_elt;
+	struct upstream_list_watcher *w;
 
 	if (up->ctx && up->active_idx != -1) {
 		sec_cur = rspamd_get_ticks (FALSE);
@@ -449,6 +466,12 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
 			/* We have the first error */
 			up->last_fail = sec_cur;
 			up->errors = 1;
+
+			DL_FOREACH (up->ls->watchers, w) {
+				if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
+					w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, 1, w->ud);
+				}
+			}
 		}
 		else {
 			sec_last = up->last_fail;
@@ -456,6 +479,12 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
 			if (sec_cur >= sec_last) {
 				up->errors ++;
 
+				DL_FOREACH (up->ls->watchers, w) {
+					if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
+						w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, up->errors, w->ud);
+					}
+				}
+
 				if (sec_cur > sec_last) {
 					error_rate = ((gdouble)up->errors) / (sec_cur - sec_last);
 					max_error_rate = ((gdouble)up->ls->limits.max_errors) /
@@ -499,6 +528,7 @@ void
 rspamd_upstream_ok (struct upstream *up)
 {
 	struct upstream_addr_elt *addr_elt;
+	struct upstream_list_watcher *w;
 
 	RSPAMD_UPSTREAM_LOCK (up->lock);
 	if (up->errors > 0 && up->active_idx != -1) {
@@ -509,6 +539,12 @@ rspamd_upstream_ok (struct upstream *up)
 			addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
 			addr_elt->errors = 0;
 		}
+
+		DL_FOREACH (up->ls->watchers, w) {
+			if (w->events_mask & RSPAMD_UPSTREAM_WATCH_SUCCESS) {
+				w->func (up, RSPAMD_UPSTREAM_WATCH_SUCCESS, 0, w->ud);
+			}
+		}
 	}
 
 	RSPAMD_UPSTREAM_UNLOCK (up->lock);
@@ -831,6 +867,7 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
 {
 	guint i;
 	struct upstream *up;
+	struct upstream_list_watcher *w, *tmp;
 
 	if (ups != NULL) {
 		g_ptr_array_free (ups->alive, TRUE);
@@ -841,6 +878,10 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
 			REF_RELEASE (up);
 		}
 
+		DL_FOREACH_SAFE (ups->watchers, w, tmp) {
+			g_free (w);
+		}
+
 		g_ptr_array_free (ups->ups, TRUE);
 		rspamd_mutex_free (ups->lock);
 		g_free (ups);
@@ -852,6 +893,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
 {
 	struct upstream *up = (struct upstream *)elt;
 	struct upstream_list *ups = (struct upstream_list *)ls;
+	struct upstream_list_watcher *w;
 
 	/* Here the upstreams list is already locked */
 	RSPAMD_UPSTREAM_LOCK (up->lock);
@@ -862,6 +904,13 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
 	g_ptr_array_add (ups->alive, up);
 	up->active_idx = ups->alive->len - 1;
 	RSPAMD_UPSTREAM_UNLOCK (up->lock);
+
+	DL_FOREACH (up->ls->watchers, w) {
+		if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
+			w->func (up, RSPAMD_UPSTREAM_WATCH_ONLINE, up->errors, w->ud);
+		}
+	}
+
 	/* For revive event */
 	REF_RELEASE (up);
 }
@@ -1125,3 +1174,20 @@ rspamd_upstreams_set_limits (struct upstream_list *ups,
 		ups->limits.dns_retransmits = dns_retransmits;
 	}
 }
+
+void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
+										  enum rspamd_upstreams_watch_event events,
+										  rspamd_upstream_watch_func func,
+										  gpointer ud)
+{
+	struct upstream_list_watcher *nw;
+
+	g_assert ((events & RSPAMD_UPSTREAM_WATCH_ALL) != 0);
+
+	nw = g_malloc (sizeof (*nw));
+	nw->func = func;
+	nw->events_mask = events;
+	nw->ud = ud;
+
+	DL_APPEND (ups->watchers, nw);
+}
diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h
index 9b5c7794c..56d6fa6c5 100644
--- a/src/libutil/upstream.h
+++ b/src/libutil/upstream.h
@@ -181,6 +181,31 @@ typedef void (*rspamd_upstream_traverse_func) (struct upstream *up, guint idx,
 void rspamd_upstreams_foreach (struct upstream_list *ups,
 		rspamd_upstream_traverse_func cb, void *ud);
 
+enum rspamd_upstreams_watch_event {
+	RSPAMD_UPSTREAM_WATCH_SUCCESS = 1u << 0,
+	RSPAMD_UPSTREAM_WATCH_FAILURE = 1u << 1,
+	RSPAMD_UPSTREAM_WATCH_OFFLINE = 1u << 2,
+	RSPAMD_UPSTREAM_WATCH_ONLINE = 1u << 3,
+	RSPAMD_UPSTREAM_WATCH_ALL = (1u << 0) | (1u << 1) | (1u << 2) | (1u << 3),
+};
+
+typedef void (*rspamd_upstream_watch_func) (struct upstream *up,
+											enum rspamd_upstreams_watch_event event,
+											guint cur_errors,
+											void *ud);
+
+/**
+ * Adds new watcher to the upstreams list
+ * @param ups
+ * @param events
+ * @param func
+ * @param ud
+ */
+void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
+										  enum rspamd_upstreams_watch_event events,
+										  rspamd_upstream_watch_func func,
+										  gpointer ud);
+
 /**
  * Returns the current IP address of the upstream
  * @param up


More information about the Commits mailing list