commit 102bbc2: [Project] Preliminary addition of the HTTP connections pool

Vsevolod Stakhov vsevolod at highsecure.ru
Mon Mar 4 18:14:08 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-03-04 18:13:17 +0000
URL: https://github.com/rspamd/rspamd/commit/102bbc23181ac715f24095594d9ab3cb96272297 (HEAD -> master)

[Project] Preliminary addition of the HTTP connections pool

---
 src/libutil/http_connection.c | 209 +++--------------------------------------
 src/libutil/http_connection.h |   3 +
 src/libutil/http_context.c    | 213 +++++++++++++++++++++++++++++++++++++++---
 src/libutil/http_context.h    |  39 ++++++++
 src/libutil/http_message.c    | 195 ++++++++++++++++++++++++++++++++++++++
 src/libutil/http_private.h    |  11 ++-
 6 files changed, 459 insertions(+), 211 deletions(-)

diff --git a/src/libutil/http_connection.c b/src/libutil/http_connection.c
index 7bc92cb1f..aea13522d 100644
--- a/src/libutil/http_connection.c
+++ b/src/libutil/http_connection.c
@@ -690,13 +690,18 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn)
 	struct rspamd_http_connection_private *priv;
 	gpointer ssl;
 	gint request_method;
+	rspamd_fstring_t *prev_host;
 
 	priv = conn->priv;
 	ssl = priv->ssl;
 	priv->ssl = NULL;
 	request_method = priv->msg->method;
+	/* Preserve host for keepalive */
+	prev_host = priv->msg->host;
+	priv->msg->host = NULL;
 	rspamd_http_connection_reset (conn);
 	priv->ssl = ssl;
+
 	/* Plan read message */
 
 	if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) {
@@ -708,7 +713,15 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn)
 				conn->priv->ptv);
 	}
 
-	priv->msg->method = request_method;
+	if (priv->msg) {
+		priv->msg->method = request_method;
+		priv->msg->host = prev_host;
+	}
+	else {
+		if (prev_host) {
+			rspamd_fstring_free (prev_host);
+		}
+	}
 }
 
 static void
@@ -2145,200 +2158,6 @@ rspamd_http_connection_set_max_size (struct rspamd_http_connection *conn,
 	conn->max_size = sz;
 }
 
-void
-rspamd_http_message_free (struct rspamd_http_message *msg)
-{
-	struct rspamd_http_header *hdr, *htmp, *hcur, *hcurtmp;
-
-
-	HASH_ITER (hh, msg->headers, hdr, htmp) {
-		HASH_DEL (msg->headers, hdr);
-
-		DL_FOREACH_SAFE (hdr, hcur, hcurtmp) {
-			rspamd_fstring_free (hcur->combined);
-			g_free (hcur);
-		}
-	}
-
-	rspamd_http_message_storage_cleanup (msg);
-
-	if (msg->url != NULL) {
-		rspamd_fstring_free (msg->url);
-	}
-	if (msg->status != NULL) {
-		rspamd_fstring_free (msg->status);
-	}
-	if (msg->host != NULL) {
-		rspamd_fstring_free (msg->host);
-	}
-	if (msg->peer_key != NULL) {
-		rspamd_pubkey_unref (msg->peer_key);
-	}
-
-	g_free (msg);
-}
-
-void
-rspamd_http_message_set_peer_key (struct rspamd_http_message *msg,
-		struct rspamd_cryptobox_pubkey *pk)
-{
-	if (msg->peer_key != NULL) {
-		rspamd_pubkey_unref (msg->peer_key);
-	}
-
-	if (pk) {
-		msg->peer_key = rspamd_pubkey_ref (pk);
-	}
-	else {
-		msg->peer_key = NULL;
-	}
-}
-
-void
-rspamd_http_message_add_header_len (struct rspamd_http_message *msg,
-	const gchar *name,
-	const gchar *value,
-	gsize len)
-{
-	struct rspamd_http_header *hdr, *found = NULL;
-	guint nlen, vlen;
-
-	if (msg != NULL && name != NULL && value != NULL) {
-		hdr = g_malloc0 (sizeof (struct rspamd_http_header));
-		nlen = strlen (name);
-		vlen = len;
-		hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4);
-		rspamd_printf_fstring (&hdr->combined, "%s: %*s\r\n", name, (gint)vlen,
-				value);
-		hdr->name.begin = hdr->combined->str;
-		hdr->name.len = nlen;
-		hdr->value.begin = hdr->combined->str + nlen + 2;
-		hdr->value.len = vlen;
-
-		HASH_FIND (hh, msg->headers, hdr->name.begin,
-				hdr->name.len, found);
-
-		if (found == NULL) {
-			HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin,
-					hdr->name.len, hdr);
-		}
-
-		DL_APPEND (found, hdr);
-	}
-}
-
-void
-rspamd_http_message_add_header (struct rspamd_http_message *msg,
-		const gchar *name,
-		const gchar *value)
-{
-	if (value) {
-		rspamd_http_message_add_header_len (msg, name, value, strlen (value));
-	}
-}
-
-void
-rspamd_http_message_add_header_fstr (struct rspamd_http_message *msg,
-		const gchar *name,
-		rspamd_fstring_t *value)
-{
-	struct rspamd_http_header *hdr, *found = NULL;
-	guint nlen, vlen;
-
-	if (msg != NULL && name != NULL && value != NULL) {
-		hdr = g_malloc0 (sizeof (struct rspamd_http_header));
-		nlen = strlen (name);
-		vlen = value->len;
-		hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4);
-		rspamd_printf_fstring (&hdr->combined, "%s: %V\r\n", name, value);
-		hdr->name.begin = hdr->combined->str;
-		hdr->name.len = nlen;
-		hdr->value.begin = hdr->combined->str + nlen + 2;
-		hdr->value.len = vlen;
-
-		HASH_FIND (hh, msg->headers, hdr->name.begin,
-				hdr->name.len, found);
-
-		if (found == NULL) {
-			HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin,
-					hdr->name.len, hdr);
-		}
-
-		DL_APPEND (found, hdr);
-	}
-}
-
-const rspamd_ftok_t *
-rspamd_http_message_find_header (struct rspamd_http_message *msg,
-	const gchar *name)
-{
-	struct rspamd_http_header *hdr;
-	const rspamd_ftok_t *res = NULL;
-	guint slen = strlen (name);
-
-	if (msg != NULL) {
-		HASH_FIND (hh, msg->headers, name, slen, hdr);
-
-		if (hdr) {
-			res = &hdr->value;
-		}
-	}
-
-	return res;
-}
-
-GPtrArray*
-rspamd_http_message_find_header_multiple (
-		struct rspamd_http_message *msg,
-		const gchar *name)
-{
-	GPtrArray *res = NULL;
-	struct rspamd_http_header *hdr, *cur;
-
-	guint slen = strlen (name);
-
-	if (msg != NULL) {
-		HASH_FIND (hh, msg->headers, name, slen, hdr);
-
-		if (hdr) {
-			res = g_ptr_array_sized_new (4);
-
-			LL_FOREACH (hdr, cur) {
-				g_ptr_array_add (res, &cur->value);
-			}
-		}
-	}
-
-
-	return res;
-}
-
-
-gboolean
-rspamd_http_message_remove_header (struct rspamd_http_message *msg,
-	const gchar *name)
-{
-	struct rspamd_http_header *hdr, *hcur, *hcurtmp;
-	gboolean res = FALSE;
-	guint slen = strlen (name);
-
-	if (msg != NULL) {
-		HASH_FIND (hh, msg->headers, name, slen, hdr);
-
-		if (hdr) {
-			HASH_DEL (msg->headers, hdr);
-			res = TRUE;
-
-			DL_FOREACH_SAFE (hdr, hcur, hcurtmp) {
-				rspamd_fstring_free (hcur->combined);
-				g_free (hcur);
-			}
-		}
-	}
-
-	return res;
-}
-
 void
 rspamd_http_connection_set_key (struct rspamd_http_connection *conn,
 		struct rspamd_cryptobox_keypair *key)
diff --git a/src/libutil/http_connection.h b/src/libutil/http_connection.h
index 87159bdd0..a327eec0d 100644
--- a/src/libutil/http_connection.h
+++ b/src/libutil/http_connection.h
@@ -44,6 +44,7 @@ struct rspamd_http_connection_private;
 struct rspamd_http_connection;
 struct rspamd_http_connection_router;
 struct rspamd_http_connection_entry;
+struct rspamd_keepalive_hash_key;
 
 struct rspamd_storage_shmem {
 	gchar *shm_name;
@@ -106,6 +107,8 @@ struct rspamd_http_connection {
 	rspamd_http_error_handler_t error_handler;
 	rspamd_http_finish_handler_t finish_handler;
 	gpointer ud;
+	/* Used for keepalive */
+	struct rspamd_keepalive_hash_key *keepalive_hash_key;
 	gsize max_size;
 	unsigned opts;
 	enum rspamd_http_connection_type type;
diff --git a/src/libutil/http_context.c b/src/libutil/http_context.c
index 6695e8032..e326a74a1 100644
--- a/src/libutil/http_context.c
+++ b/src/libutil/http_context.c
@@ -24,6 +24,34 @@
 
 static struct rspamd_http_context *default_ctx = NULL;
 
+struct rspamd_http_keepalive_cbdata {
+	struct rspamd_http_connection *conn;
+	GQueue *queue;
+	GList *link;
+	struct event ev;
+};
+
+static void
+rspamd_http_keepalive_queue_cleanup (GQueue *conns)
+{
+	GList *cur;
+
+	cur = conns->head;
+
+	while (cur) {
+		struct rspamd_http_keepalive_cbdata *cbd;
+
+		cbd = (struct rspamd_http_keepalive_cbdata *)cur->data;
+		rspamd_http_connection_unref (cbd->conn);
+		event_del (&cbd->ev);
+		g_free (cbd);
+
+		cur = cur->next;
+	}
+
+	g_queue_clear (conns);
+}
+
 static void
 rspamd_http_context_client_rotate_ev (gint fd, short what, void *arg)
 {
@@ -69,6 +97,8 @@ rspamd_http_context_new_default (struct rspamd_config *cfg,
 
 	ctx->ev_base = ev_base;
 
+	ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash);
+
 	return ctx;
 }
 
@@ -161,6 +191,7 @@ rspamd_http_context_create (struct rspamd_config *cfg,
 	return ctx;
 }
 
+
 void
 rspamd_http_context_free (struct rspamd_http_context *ctx)
 {
@@ -185,6 +216,20 @@ rspamd_http_context_free (struct rspamd_http_context *ctx)
 		}
 	}
 
+	struct rspamd_keepalive_hash_key *hk;
+
+	kh_foreach_key (ctx->keep_alive_hash, hk, {
+		if (hk->host) {
+			g_free (hk->host);
+		}
+
+		rspamd_inet_address_free (hk->addr);
+		rspamd_http_keepalive_queue_cleanup (&hk->conns);
+		g_free (hk);
+	});
+
+	kh_destroy (rspamd_keep_alive_hash, ctx->keep_alive_hash);
+
 	g_free (ctx);
 }
 
@@ -210,32 +255,178 @@ rspamd_http_context_default (void)
 }
 
 gint32
-rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key k)
+rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key *k)
 {
 	gint32 h;
 
-	h = rspamd_inet_address_port_hash (k.addr);
+	h = rspamd_inet_address_port_hash (k->addr);
 
-	if (k.host) {
-		h = rspamd_cryptobox_fast_hash (k.host, strlen (k.host), h);
+	if (k->host) {
+		h = rspamd_cryptobox_fast_hash (k->host, strlen (k->host), h);
 	}
 
 	return h;
 }
 
 bool
-rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key k1,
-								  struct rspamd_keepalive_hash_key k2)
+rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key *k1,
+								  struct rspamd_keepalive_hash_key *k2)
 {
-	if (k1.host && k2.host) {
-		if (rspamd_inet_address_port_equal (k1.addr, k2.addr)) {
-			return strcmp (k1.host, k2.host);
+	if (k1->host && k2->host) {
+		if (rspamd_inet_address_port_equal (k1->addr, k2->addr)) {
+			return strcmp (k1->host, k2->host);
 		}
 	}
-	else if (!k1.host && !k2.host) {
-		return rspamd_inet_address_port_equal (k1.addr, k2.addr);
+	else if (!k1->host && !k2->host) {
+		return rspamd_inet_address_port_equal (k1->addr, k2->addr);
 	}
 
 	/* One has host and another has no host */
 	return false;
+}
+
+struct rspamd_http_connection*
+rspamd_http_context_check_keepalive (struct rspamd_http_context *ctx,
+		const rspamd_inet_addr_t *addr,
+		const gchar *host)
+{
+	struct rspamd_keepalive_hash_key hk, *phk;
+	khiter_t k;
+
+	hk.addr = (rspamd_inet_addr_t *)addr;
+	hk.host = (gchar *)host;
+
+	k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk);
+
+	if (k != kh_end (ctx->keep_alive_hash)) {
+		phk = kh_key (ctx->keep_alive_hash, k);
+		GQueue *conns = &phk->conns;
+
+		/* Use stack based approach */
+
+		if (g_queue_get_length (conns) > 0) {
+			struct rspamd_http_keepalive_cbdata *cbd;
+			struct rspamd_http_connection *conn;
+
+			cbd = g_queue_pop_head (conns);
+			event_del (&cbd->ev);
+			conn = cbd->conn;
+			g_free (cbd);
+
+			/* We transfer refcount here! */
+			return conn;
+		}
+	}
+
+	return NULL;
+}
+
+void
+rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx,
+											struct rspamd_http_connection *conn,
+											const rspamd_inet_addr_t *addr,
+											const gchar *host)
+{
+	struct rspamd_keepalive_hash_key hk, *phk;
+	khiter_t k;
+
+	hk.addr = (rspamd_inet_addr_t *)addr;
+	hk.host = (gchar *)host;
+
+	k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk);
+
+	if (k != kh_end (ctx->keep_alive_hash)) {
+		/* Reuse existing */
+		conn->keepalive_hash_key = kh_key (ctx->keep_alive_hash, k);
+	}
+	else {
+		/* Create new one */
+		GQueue empty_init = G_QUEUE_INIT;
+		gint r;
+
+		phk = g_malloc (sizeof (*phk));
+		phk->conns = empty_init;
+		phk->host = g_strdup (host);
+		phk->addr = rspamd_inet_address_copy (addr);
+
+		kh_put (rspamd_keep_alive_hash, ctx->keep_alive_hash, phk, &r);
+		conn->keepalive_hash_key = phk;
+	}
+}
+
+static void
+rspamd_http_keepalive_handler (gint fd, short what, gpointer ud)
+{
+	struct rspamd_http_keepalive_cbdata *cbdata =
+			(struct rspamd_http_keepalive_cbdata *)ud;
+	/*
+	 * We can get here if a remote side reported something or it has
+	 * timed out. In both cases we just terminate keepalive connection.
+	 */
+
+	g_queue_delete_link (cbdata->queue, cbdata->link);
+	rspamd_http_connection_unref (cbdata->conn);
+	g_free (cbdata);
+}
+
+void
+rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
+									struct rspamd_http_connection *conn,
+									struct rspamd_http_message *msg,
+									struct event_base *ev_base)
+{
+	struct rspamd_http_keepalive_cbdata *cbdata;
+	struct timeval tv;
+	gdouble timeout = ctx->config.keepalive_interval;
+
+	g_assert (conn->keepalive_hash_key != NULL);
+
+	/* Move connection to the keepalive pool */
+	cbdata = g_malloc0 (sizeof (*cbdata));
+
+	cbdata->conn = rspamd_http_connection_ref (conn);
+	g_queue_push_tail (&conn->keepalive_hash_key->conns, cbdata);
+	cbdata->link = conn->keepalive_hash_key->conns.tail;
+	cbdata->queue = &conn->keepalive_hash_key->conns;
+
+	event_set (&cbdata->ev, conn->fd, EV_READ|EV_TIMEOUT,
+			rspamd_http_keepalive_handler,
+			&cbdata);
+
+	if (msg) {
+		const rspamd_ftok_t *tok;
+
+		tok = rspamd_http_message_find_header (msg, "Keep-Alive");
+
+		if (tok) {
+			goffset pos = rspamd_substring_search_caseless (tok->begin,
+					tok->len, "timeout=", sizeof ("timeout=") - 1);
+
+			if (pos != -1) {
+				pos += sizeof ("timeout=");
+
+				gchar *end_pos = memchr (tok->begin + pos, ',', tok->len - pos);
+				glong real_timeout;
+
+				if (end_pos) {
+					if (rspamd_strtol (tok->begin + pos + 1,
+							(end_pos - tok->begin) - pos - 1, &real_timeout) &&
+							real_timeout > 0) {
+						timeout = real_timeout;
+					}
+				}
+				else {
+					if (rspamd_strtol (tok->begin + pos + 1,
+							tok->len - pos - 1, &real_timeout) &&
+						real_timeout > 0) {
+						timeout = real_timeout;
+					}
+				}
+			}
+		}
+	}
+
+	double_to_tv (timeout, &tv);
+	event_base_set (ev_base, &cbdata->ev);
+	event_add (&cbdata->ev, &tv);
 }
\ No newline at end of file
diff --git a/src/libutil/http_context.h b/src/libutil/http_context.h
index 7d0820687..74e5c69a6 100644
--- a/src/libutil/http_context.h
+++ b/src/libutil/http_context.h
@@ -19,16 +19,19 @@
 
 #include "config.h"
 #include "ucl.h"
+#include "addr.h"
 
 #include <event.h>
 
 struct rspamd_http_context;
 struct rspamd_config;
+struct rspamd_http_message;
 
 struct rspamd_http_context_cfg {
 	guint kp_cache_size_client;
 	guint kp_cache_size_server;
 	guint ssl_cache_size;
+	gdouble keepalive_interval;
 	gdouble client_key_rotate_time;
 	const gchar *user_agent;
 };
@@ -53,4 +56,40 @@ void rspamd_http_context_free (struct rspamd_http_context *ctx);
 
 struct rspamd_http_context* rspamd_http_context_default (void);
 
+/**
+ * Returns preserved keepalive connection if it's available.
+ * Refcount is transferred to caller!
+ * @param ctx
+ * @param addr
+ * @param host
+ * @return
+ */
+struct rspamd_http_connection* rspamd_http_context_check_keepalive (
+		struct rspamd_http_context *ctx, const rspamd_inet_addr_t *addr,
+		const gchar *host);
+
+/**
+ * Prepares keepalive key for a connection by creating a new entry or by reusing existent
+ * Bear in mind, that keepalive pool has currently no cleanup methods!
+ * @param ctx
+ * @param conn
+ * @param addr
+ * @param host
+ */
+void rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx,
+											struct rspamd_http_connection *conn,
+											const rspamd_inet_addr_t *addr,
+											const gchar *host);
+/**
+ * Pushes a connection to keepalive pool after client request is finished,
+ * keepalive key *must* be prepared before using of this function
+ * @param ctx
+ * @param conn
+ * @param msg
+ */
+void rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
+										 struct rspamd_http_connection *conn,
+										 struct rspamd_http_message *msg,
+										 struct event_base *ev_base);
+
 #endif
diff --git a/src/libutil/http_message.c b/src/libutil/http_message.c
index b090d2f71..0720dc416 100644
--- a/src/libutil/http_message.c
+++ b/src/libutil/http_message.c
@@ -18,6 +18,7 @@
 #include "libutil/http_private.h"
 #include "libutil/printf.h"
 #include "libutil/logger.h"
+#include "utlist.h"
 #include "unix-std.h"
 
 struct rspamd_http_message *
@@ -463,3 +464,197 @@ rspamd_http_message_storage_cleanup (struct rspamd_http_message *msg)
 
 	msg->body_buf.len = 0;
 }
+
+void
+rspamd_http_message_free (struct rspamd_http_message *msg)
+{
+	struct rspamd_http_header *hdr, *htmp, *hcur, *hcurtmp;
+
+
+	HASH_ITER (hh, msg->headers, hdr, htmp) {
+		HASH_DEL (msg->headers, hdr);
+
+		DL_FOREACH_SAFE (hdr, hcur, hcurtmp) {
+			rspamd_fstring_free (hcur->combined);
+			g_free (hcur);
+		}
+	}
+
+	rspamd_http_message_storage_cleanup (msg);
+
+	if (msg->url != NULL) {
+		rspamd_fstring_free (msg->url);
+	}
+	if (msg->status != NULL) {
+		rspamd_fstring_free (msg->status);
+	}
+	if (msg->host != NULL) {
+		rspamd_fstring_free (msg->host);
+	}
+	if (msg->peer_key != NULL) {
+		rspamd_pubkey_unref (msg->peer_key);
+	}
+
+	g_free (msg);
+}
+
+void
+rspamd_http_message_set_peer_key (struct rspamd_http_message *msg,
+								  struct rspamd_cryptobox_pubkey *pk)
+{
+	if (msg->peer_key != NULL) {
+		rspamd_pubkey_unref (msg->peer_key);
+	}
+
+	if (pk) {
+		msg->peer_key = rspamd_pubkey_ref (pk);
+	}
+	else {
+		msg->peer_key = NULL;
+	}
+}
+
+void
+rspamd_http_message_add_header_len (struct rspamd_http_message *msg,
+									const gchar *name,
+									const gchar *value,
+									gsize len)
+{
+	struct rspamd_http_header *hdr, *found = NULL;
+	guint nlen, vlen;
+
+	if (msg != NULL && name != NULL && value != NULL) {
+		hdr = g_malloc0 (sizeof (struct rspamd_http_header));
+		nlen = strlen (name);
+		vlen = len;
+		hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4);
+		rspamd_printf_fstring (&hdr->combined, "%s: %*s\r\n", name, (gint)vlen,
+				value);
+		hdr->name.begin = hdr->combined->str;
+		hdr->name.len = nlen;
+		hdr->value.begin = hdr->combined->str + nlen + 2;
+		hdr->value.len = vlen;
+
+		HASH_FIND (hh, msg->headers, hdr->name.begin,
+				hdr->name.len, found);
+
*** OUTPUT TRUNCATED, 146 LINES SKIPPED ***


More information about the Commits mailing list