commit a6a1a8d: [Feature] Implement event watchers for upstreams
Vsevolod Stakhov
vsevolod at highsecure.ru
Thu Dec 27 18:28:03 UTC 2018
Author: Vsevolod Stakhov
Date: 2018-12-04 17:05:18 +0000
URL: https://github.com/rspamd/rspamd/commit/a6a1a8d5aea8439b78645d2dc77b61a898020b7d
[Feature] Implement event watchers for upstreams
---
src/libutil/upstream.c | 70 ++++++++++++++++++++++++++++++++++++++++++++++++--
src/libutil/upstream.h | 25 ++++++++++++++++++
2 files changed, 93 insertions(+), 2 deletions(-)
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 4ed657da4..90f792bbe 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -34,6 +34,13 @@ struct upstream_addr_elt {
guint errors;
};
+struct upstream_list_watcher {
+ rspamd_upstream_watch_func func;
+ gpointer ud;
+ enum rspamd_upstreams_watch_event events_mask;
+ struct upstream_list_watcher *next, *prev;
+};
+
struct upstream {
guint weight;
guint cur_weight;
@@ -73,6 +80,7 @@ struct upstream_list {
struct upstream_ctx *ctx;
GPtrArray *ups;
GPtrArray *alive;
+ struct upstream_list_watcher *watchers;
rspamd_mutex_t *lock;
guint64 hash_seed;
struct upstream_limits limits;
@@ -109,8 +117,9 @@ static guint default_dns_retransmits = 2;
void
rspamd_upstreams_library_config (struct rspamd_config *cfg,
- struct upstream_ctx *ctx, struct event_base *ev_base,
- struct rdns_resolver *resolver)
+ struct upstream_ctx *ctx,
+ struct event_base *ev_base,
+ struct rdns_resolver *resolver)
{
g_assert (ctx != NULL);
g_assert (cfg != NULL);
@@ -405,6 +414,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
guint i;
struct upstream *cur;
struct timeval tv;
+ struct upstream_list_watcher *w;
RSPAMD_UPSTREAM_LOCK (ls->lock);
g_ptr_array_remove_index (ls->alive, up->active_idx);
@@ -431,6 +441,12 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
event_add (&up->ev, &tv);
}
+ DL_FOREACH (up->ls->watchers, w) {
+ if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
+ w->func (up, RSPAMD_UPSTREAM_WATCH_OFFLINE, up->errors, w->ud);
+ }
+ }
+
RSPAMD_UPSTREAM_UNLOCK (ls->lock);
}
@@ -440,6 +456,7 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
gdouble error_rate, max_error_rate;
gdouble sec_last, sec_cur;
struct upstream_addr_elt *addr_elt;
+ struct upstream_list_watcher *w;
if (up->ctx && up->active_idx != -1) {
sec_cur = rspamd_get_ticks (FALSE);
@@ -449,6 +466,12 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
/* We have the first error */
up->last_fail = sec_cur;
up->errors = 1;
+
+ DL_FOREACH (up->ls->watchers, w) {
+ if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
+ w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, 1, w->ud);
+ }
+ }
}
else {
sec_last = up->last_fail;
@@ -456,6 +479,12 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
if (sec_cur >= sec_last) {
up->errors ++;
+ DL_FOREACH (up->ls->watchers, w) {
+ if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
+ w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, up->errors, w->ud);
+ }
+ }
+
if (sec_cur > sec_last) {
error_rate = ((gdouble)up->errors) / (sec_cur - sec_last);
max_error_rate = ((gdouble)up->ls->limits.max_errors) /
@@ -499,6 +528,7 @@ void
rspamd_upstream_ok (struct upstream *up)
{
struct upstream_addr_elt *addr_elt;
+ struct upstream_list_watcher *w;
RSPAMD_UPSTREAM_LOCK (up->lock);
if (up->errors > 0 && up->active_idx != -1) {
@@ -509,6 +539,12 @@ rspamd_upstream_ok (struct upstream *up)
addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
addr_elt->errors = 0;
}
+
+ DL_FOREACH (up->ls->watchers, w) {
+ if (w->events_mask & RSPAMD_UPSTREAM_WATCH_SUCCESS) {
+ w->func (up, RSPAMD_UPSTREAM_WATCH_SUCCESS, 0, w->ud);
+ }
+ }
}
RSPAMD_UPSTREAM_UNLOCK (up->lock);
@@ -831,6 +867,7 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
{
guint i;
struct upstream *up;
+ struct upstream_list_watcher *w, *tmp;
if (ups != NULL) {
g_ptr_array_free (ups->alive, TRUE);
@@ -841,6 +878,10 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
REF_RELEASE (up);
}
+ DL_FOREACH_SAFE (ups->watchers, w, tmp) {
+ g_free (w);
+ }
+
g_ptr_array_free (ups->ups, TRUE);
rspamd_mutex_free (ups->lock);
g_free (ups);
@@ -852,6 +893,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
{
struct upstream *up = (struct upstream *)elt;
struct upstream_list *ups = (struct upstream_list *)ls;
+ struct upstream_list_watcher *w;
/* Here the upstreams list is already locked */
RSPAMD_UPSTREAM_LOCK (up->lock);
@@ -862,6 +904,13 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
g_ptr_array_add (ups->alive, up);
up->active_idx = ups->alive->len - 1;
RSPAMD_UPSTREAM_UNLOCK (up->lock);
+
+ DL_FOREACH (up->ls->watchers, w) {
+ if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
+ w->func (up, RSPAMD_UPSTREAM_WATCH_ONLINE, up->errors, w->ud);
+ }
+ }
+
/* For revive event */
REF_RELEASE (up);
}
@@ -1125,3 +1174,20 @@ rspamd_upstreams_set_limits (struct upstream_list *ups,
ups->limits.dns_retransmits = dns_retransmits;
}
}
+
+void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
+ enum rspamd_upstreams_watch_event events,
+ rspamd_upstream_watch_func func,
+ gpointer ud)
+{
+ struct upstream_list_watcher *nw;
+
+ g_assert ((events & RSPAMD_UPSTREAM_WATCH_ALL) != 0);
+
+ nw = g_malloc (sizeof (*nw));
+ nw->func = func;
+ nw->events_mask = events;
+ nw->ud = ud;
+
+ DL_APPEND (ups->watchers, nw);
+}
diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h
index 9b5c7794c..56d6fa6c5 100644
--- a/src/libutil/upstream.h
+++ b/src/libutil/upstream.h
@@ -181,6 +181,31 @@ typedef void (*rspamd_upstream_traverse_func) (struct upstream *up, guint idx,
void rspamd_upstreams_foreach (struct upstream_list *ups,
rspamd_upstream_traverse_func cb, void *ud);
+enum rspamd_upstreams_watch_event {
+ RSPAMD_UPSTREAM_WATCH_SUCCESS = 1u << 0,
+ RSPAMD_UPSTREAM_WATCH_FAILURE = 1u << 1,
+ RSPAMD_UPSTREAM_WATCH_OFFLINE = 1u << 2,
+ RSPAMD_UPSTREAM_WATCH_ONLINE = 1u << 3,
+ RSPAMD_UPSTREAM_WATCH_ALL = (1u << 0) | (1u << 1) | (1u << 2) | (1u << 3),
+};
+
+typedef void (*rspamd_upstream_watch_func) (struct upstream *up,
+ enum rspamd_upstreams_watch_event event,
+ guint cur_errors,
+ void *ud);
+
+/**
+ * Adds new watcher to the upstreams list
+ * @param ups
+ * @param events
+ * @param func
+ * @param ud
+ */
+void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
+ enum rspamd_upstreams_watch_event events,
+ rspamd_upstream_watch_func func,
+ gpointer ud);
+
/**
* Returns the current IP address of the upstream
* @param up
More information about the Commits
mailing list