commit 102bbc2: [Project] Preliminary addition of the HTTP connections pool
Vsevolod Stakhov
vsevolod at highsecure.ru
Mon Mar 4 18:14:08 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-03-04 18:13:17 +0000
URL: https://github.com/rspamd/rspamd/commit/102bbc23181ac715f24095594d9ab3cb96272297 (HEAD -> master)
[Project] Preliminary addition of the HTTP connections pool
---
src/libutil/http_connection.c | 209 +++--------------------------------------
src/libutil/http_connection.h | 3 +
src/libutil/http_context.c | 213 +++++++++++++++++++++++++++++++++++++++---
src/libutil/http_context.h | 39 ++++++++
src/libutil/http_message.c | 195 ++++++++++++++++++++++++++++++++++++++
src/libutil/http_private.h | 11 ++-
6 files changed, 459 insertions(+), 211 deletions(-)
diff --git a/src/libutil/http_connection.c b/src/libutil/http_connection.c
index 7bc92cb1f..aea13522d 100644
--- a/src/libutil/http_connection.c
+++ b/src/libutil/http_connection.c
@@ -690,13 +690,18 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn)
struct rspamd_http_connection_private *priv;
gpointer ssl;
gint request_method;
+ rspamd_fstring_t *prev_host;
priv = conn->priv;
ssl = priv->ssl;
priv->ssl = NULL;
request_method = priv->msg->method;
+ /* Preserve host for keepalive */
+ prev_host = priv->msg->host;
+ priv->msg->host = NULL;
rspamd_http_connection_reset (conn);
priv->ssl = ssl;
+
/* Plan read message */
if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) {
@@ -708,7 +713,15 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn)
conn->priv->ptv);
}
- priv->msg->method = request_method;
+ if (priv->msg) {
+ priv->msg->method = request_method;
+ priv->msg->host = prev_host;
+ }
+ else {
+ if (prev_host) {
+ rspamd_fstring_free (prev_host);
+ }
+ }
}
static void
@@ -2145,200 +2158,6 @@ rspamd_http_connection_set_max_size (struct rspamd_http_connection *conn,
conn->max_size = sz;
}
-void
-rspamd_http_message_free (struct rspamd_http_message *msg)
-{
- struct rspamd_http_header *hdr, *htmp, *hcur, *hcurtmp;
-
-
- HASH_ITER (hh, msg->headers, hdr, htmp) {
- HASH_DEL (msg->headers, hdr);
-
- DL_FOREACH_SAFE (hdr, hcur, hcurtmp) {
- rspamd_fstring_free (hcur->combined);
- g_free (hcur);
- }
- }
-
- rspamd_http_message_storage_cleanup (msg);
-
- if (msg->url != NULL) {
- rspamd_fstring_free (msg->url);
- }
- if (msg->status != NULL) {
- rspamd_fstring_free (msg->status);
- }
- if (msg->host != NULL) {
- rspamd_fstring_free (msg->host);
- }
- if (msg->peer_key != NULL) {
- rspamd_pubkey_unref (msg->peer_key);
- }
-
- g_free (msg);
-}
-
-void
-rspamd_http_message_set_peer_key (struct rspamd_http_message *msg,
- struct rspamd_cryptobox_pubkey *pk)
-{
- if (msg->peer_key != NULL) {
- rspamd_pubkey_unref (msg->peer_key);
- }
-
- if (pk) {
- msg->peer_key = rspamd_pubkey_ref (pk);
- }
- else {
- msg->peer_key = NULL;
- }
-}
-
-void
-rspamd_http_message_add_header_len (struct rspamd_http_message *msg,
- const gchar *name,
- const gchar *value,
- gsize len)
-{
- struct rspamd_http_header *hdr, *found = NULL;
- guint nlen, vlen;
-
- if (msg != NULL && name != NULL && value != NULL) {
- hdr = g_malloc0 (sizeof (struct rspamd_http_header));
- nlen = strlen (name);
- vlen = len;
- hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4);
- rspamd_printf_fstring (&hdr->combined, "%s: %*s\r\n", name, (gint)vlen,
- value);
- hdr->name.begin = hdr->combined->str;
- hdr->name.len = nlen;
- hdr->value.begin = hdr->combined->str + nlen + 2;
- hdr->value.len = vlen;
-
- HASH_FIND (hh, msg->headers, hdr->name.begin,
- hdr->name.len, found);
-
- if (found == NULL) {
- HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin,
- hdr->name.len, hdr);
- }
-
- DL_APPEND (found, hdr);
- }
-}
-
-void
-rspamd_http_message_add_header (struct rspamd_http_message *msg,
- const gchar *name,
- const gchar *value)
-{
- if (value) {
- rspamd_http_message_add_header_len (msg, name, value, strlen (value));
- }
-}
-
-void
-rspamd_http_message_add_header_fstr (struct rspamd_http_message *msg,
- const gchar *name,
- rspamd_fstring_t *value)
-{
- struct rspamd_http_header *hdr, *found = NULL;
- guint nlen, vlen;
-
- if (msg != NULL && name != NULL && value != NULL) {
- hdr = g_malloc0 (sizeof (struct rspamd_http_header));
- nlen = strlen (name);
- vlen = value->len;
- hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4);
- rspamd_printf_fstring (&hdr->combined, "%s: %V\r\n", name, value);
- hdr->name.begin = hdr->combined->str;
- hdr->name.len = nlen;
- hdr->value.begin = hdr->combined->str + nlen + 2;
- hdr->value.len = vlen;
-
- HASH_FIND (hh, msg->headers, hdr->name.begin,
- hdr->name.len, found);
-
- if (found == NULL) {
- HASH_ADD_KEYPTR (hh, msg->headers, hdr->name.begin,
- hdr->name.len, hdr);
- }
-
- DL_APPEND (found, hdr);
- }
-}
-
-const rspamd_ftok_t *
-rspamd_http_message_find_header (struct rspamd_http_message *msg,
- const gchar *name)
-{
- struct rspamd_http_header *hdr;
- const rspamd_ftok_t *res = NULL;
- guint slen = strlen (name);
-
- if (msg != NULL) {
- HASH_FIND (hh, msg->headers, name, slen, hdr);
-
- if (hdr) {
- res = &hdr->value;
- }
- }
-
- return res;
-}
-
-GPtrArray*
-rspamd_http_message_find_header_multiple (
- struct rspamd_http_message *msg,
- const gchar *name)
-{
- GPtrArray *res = NULL;
- struct rspamd_http_header *hdr, *cur;
-
- guint slen = strlen (name);
-
- if (msg != NULL) {
- HASH_FIND (hh, msg->headers, name, slen, hdr);
-
- if (hdr) {
- res = g_ptr_array_sized_new (4);
-
- LL_FOREACH (hdr, cur) {
- g_ptr_array_add (res, &cur->value);
- }
- }
- }
-
-
- return res;
-}
-
-
-gboolean
-rspamd_http_message_remove_header (struct rspamd_http_message *msg,
- const gchar *name)
-{
- struct rspamd_http_header *hdr, *hcur, *hcurtmp;
- gboolean res = FALSE;
- guint slen = strlen (name);
-
- if (msg != NULL) {
- HASH_FIND (hh, msg->headers, name, slen, hdr);
-
- if (hdr) {
- HASH_DEL (msg->headers, hdr);
- res = TRUE;
-
- DL_FOREACH_SAFE (hdr, hcur, hcurtmp) {
- rspamd_fstring_free (hcur->combined);
- g_free (hcur);
- }
- }
- }
-
- return res;
-}
-
void
rspamd_http_connection_set_key (struct rspamd_http_connection *conn,
struct rspamd_cryptobox_keypair *key)
diff --git a/src/libutil/http_connection.h b/src/libutil/http_connection.h
index 87159bdd0..a327eec0d 100644
--- a/src/libutil/http_connection.h
+++ b/src/libutil/http_connection.h
@@ -44,6 +44,7 @@ struct rspamd_http_connection_private;
struct rspamd_http_connection;
struct rspamd_http_connection_router;
struct rspamd_http_connection_entry;
+struct rspamd_keepalive_hash_key;
struct rspamd_storage_shmem {
gchar *shm_name;
@@ -106,6 +107,8 @@ struct rspamd_http_connection {
rspamd_http_error_handler_t error_handler;
rspamd_http_finish_handler_t finish_handler;
gpointer ud;
+ /* Used for keepalive */
+ struct rspamd_keepalive_hash_key *keepalive_hash_key;
gsize max_size;
unsigned opts;
enum rspamd_http_connection_type type;
diff --git a/src/libutil/http_context.c b/src/libutil/http_context.c
index 6695e8032..e326a74a1 100644
--- a/src/libutil/http_context.c
+++ b/src/libutil/http_context.c
@@ -24,6 +24,34 @@
static struct rspamd_http_context *default_ctx = NULL;
+struct rspamd_http_keepalive_cbdata {
+ struct rspamd_http_connection *conn;
+ GQueue *queue;
+ GList *link;
+ struct event ev;
+};
+
+static void
+rspamd_http_keepalive_queue_cleanup (GQueue *conns)
+{
+ GList *cur;
+
+ cur = conns->head;
+
+ while (cur) {
+ struct rspamd_http_keepalive_cbdata *cbd;
+
+ cbd = (struct rspamd_http_keepalive_cbdata *)cur->data;
+ rspamd_http_connection_unref (cbd->conn);
+ event_del (&cbd->ev);
+ g_free (cbd);
+
+ cur = cur->next;
+ }
+
+ g_queue_clear (conns);
+}
+
static void
rspamd_http_context_client_rotate_ev (gint fd, short what, void *arg)
{
@@ -69,6 +97,8 @@ rspamd_http_context_new_default (struct rspamd_config *cfg,
ctx->ev_base = ev_base;
+ ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash);
+
return ctx;
}
@@ -161,6 +191,7 @@ rspamd_http_context_create (struct rspamd_config *cfg,
return ctx;
}
+
void
rspamd_http_context_free (struct rspamd_http_context *ctx)
{
@@ -185,6 +216,20 @@ rspamd_http_context_free (struct rspamd_http_context *ctx)
}
}
+ struct rspamd_keepalive_hash_key *hk;
+
+ kh_foreach_key (ctx->keep_alive_hash, hk, {
+ if (hk->host) {
+ g_free (hk->host);
+ }
+
+ rspamd_inet_address_free (hk->addr);
+ rspamd_http_keepalive_queue_cleanup (&hk->conns);
+ g_free (hk);
+ });
+
+ kh_destroy (rspamd_keep_alive_hash, ctx->keep_alive_hash);
+
g_free (ctx);
}
@@ -210,32 +255,178 @@ rspamd_http_context_default (void)
}
gint32
-rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key k)
+rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key *k)
{
gint32 h;
- h = rspamd_inet_address_port_hash (k.addr);
+ h = rspamd_inet_address_port_hash (k->addr);
- if (k.host) {
- h = rspamd_cryptobox_fast_hash (k.host, strlen (k.host), h);
+ if (k->host) {
+ h = rspamd_cryptobox_fast_hash (k->host, strlen (k->host), h);
}
return h;
}
bool
-rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key k1,
- struct rspamd_keepalive_hash_key k2)
+rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key *k1,
+ struct rspamd_keepalive_hash_key *k2)
{
- if (k1.host && k2.host) {
- if (rspamd_inet_address_port_equal (k1.addr, k2.addr)) {
- return strcmp (k1.host, k2.host);
+ if (k1->host && k2->host) {
+ if (rspamd_inet_address_port_equal (k1->addr, k2->addr)) {
+ return strcmp (k1->host, k2->host);
}
}
- else if (!k1.host && !k2.host) {
- return rspamd_inet_address_port_equal (k1.addr, k2.addr);
+ else if (!k1->host && !k2->host) {
+ return rspamd_inet_address_port_equal (k1->addr, k2->addr);
}
/* One has host and another has no host */
return false;
+}
+
+struct rspamd_http_connection*
+rspamd_http_context_check_keepalive (struct rspamd_http_context *ctx,
+ const rspamd_inet_addr_t *addr,
+ const gchar *host)
+{
+ struct rspamd_keepalive_hash_key hk, *phk;
+ khiter_t k;
+
+ hk.addr = (rspamd_inet_addr_t *)addr;
+ hk.host = (gchar *)host;
+
+ k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk);
+
+ if (k != kh_end (ctx->keep_alive_hash)) {
+ phk = kh_key (ctx->keep_alive_hash, k);
+ GQueue *conns = &phk->conns;
+
+ /* Use stack based approach */
+
+ if (g_queue_get_length (conns) > 0) {
+ struct rspamd_http_keepalive_cbdata *cbd;
+ struct rspamd_http_connection *conn;
+
+ cbd = g_queue_pop_head (conns);
+ event_del (&cbd->ev);
+ conn = cbd->conn;
+ g_free (cbd);
+
+ /* We transfer refcount here! */
+ return conn;
+ }
+ }
+
+ return NULL;
+}
+
+void
+rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx,
+ struct rspamd_http_connection *conn,
+ const rspamd_inet_addr_t *addr,
+ const gchar *host)
+{
+ struct rspamd_keepalive_hash_key hk, *phk;
+ khiter_t k;
+
+ hk.addr = (rspamd_inet_addr_t *)addr;
+ hk.host = (gchar *)host;
+
+ k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk);
+
+ if (k != kh_end (ctx->keep_alive_hash)) {
+ /* Reuse existing */
+ conn->keepalive_hash_key = kh_key (ctx->keep_alive_hash, k);
+ }
+ else {
+ /* Create new one */
+ GQueue empty_init = G_QUEUE_INIT;
+ gint r;
+
+ phk = g_malloc (sizeof (*phk));
+ phk->conns = empty_init;
+ phk->host = g_strdup (host);
+ phk->addr = rspamd_inet_address_copy (addr);
+
+ kh_put (rspamd_keep_alive_hash, ctx->keep_alive_hash, phk, &r);
+ conn->keepalive_hash_key = phk;
+ }
+}
+
+static void
+rspamd_http_keepalive_handler (gint fd, short what, gpointer ud)
+{
+ struct rspamd_http_keepalive_cbdata *cbdata =
+ (struct rspamd_http_keepalive_cbdata *)ud;
+ /*
+ * We can get here if a remote side reported something or it has
+ * timed out. In both cases we just terminate keepalive connection.
+ */
+
+ g_queue_delete_link (cbdata->queue, cbdata->link);
+ rspamd_http_connection_unref (cbdata->conn);
+ g_free (cbdata);
+}
+
+void
+rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
+ struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg,
+ struct event_base *ev_base)
+{
+ struct rspamd_http_keepalive_cbdata *cbdata;
+ struct timeval tv;
+ gdouble timeout = ctx->config.keepalive_interval;
+
+ g_assert (conn->keepalive_hash_key != NULL);
+
+ /* Move connection to the keepalive pool */
+ cbdata = g_malloc0 (sizeof (*cbdata));
+
+ cbdata->conn = rspamd_http_connection_ref (conn);
+ g_queue_push_tail (&conn->keepalive_hash_key->conns, cbdata);
+ cbdata->link = conn->keepalive_hash_key->conns.tail;
+ cbdata->queue = &conn->keepalive_hash_key->conns;
+
+ event_set (&cbdata->ev, conn->fd, EV_READ|EV_TIMEOUT,
+ rspamd_http_keepalive_handler,
+ &cbdata);
+
+ if (msg) {
+ const rspamd_ftok_t *tok;
+
+ tok = rspamd_http_message_find_header (msg, "Keep-Alive");
+
+ if (tok) {
+ goffset pos = rspamd_substring_search_caseless (tok->begin,
+ tok->len, "timeout=", sizeof ("timeout=") - 1);
+
+ if (pos != -1) {
+ pos += sizeof ("timeout=");
+
+ gchar *end_pos = memchr (tok->begin + pos, ',', tok->len - pos);
+ glong real_timeout;
+
+ if (end_pos) {
+ if (rspamd_strtol (tok->begin + pos + 1,
+ (end_pos - tok->begin) - pos - 1, &real_timeout) &&
+ real_timeout > 0) {
+ timeout = real_timeout;
+ }
+ }
+ else {
+ if (rspamd_strtol (tok->begin + pos + 1,
+ tok->len - pos - 1, &real_timeout) &&
+ real_timeout > 0) {
+ timeout = real_timeout;
+ }
+ }
+ }
+ }
+ }
+
+ double_to_tv (timeout, &tv);
+ event_base_set (ev_base, &cbdata->ev);
+ event_add (&cbdata->ev, &tv);
}
\ No newline at end of file
diff --git a/src/libutil/http_context.h b/src/libutil/http_context.h
index 7d0820687..74e5c69a6 100644
--- a/src/libutil/http_context.h
+++ b/src/libutil/http_context.h
@@ -19,16 +19,19 @@
#include "config.h"
#include "ucl.h"
+#include "addr.h"
#include <event.h>
struct rspamd_http_context;
struct rspamd_config;
+struct rspamd_http_message;
struct rspamd_http_context_cfg {
guint kp_cache_size_client;
guint kp_cache_size_server;
guint ssl_cache_size;
+ gdouble keepalive_interval;
gdouble client_key_rotate_time;
const gchar *user_agent;
};
@@ -53,4 +56,40 @@ void rspamd_http_context_free (struct rspamd_http_context *ctx);
struct rspamd_http_context* rspamd_http_context_default (void);
+/**
+ * Returns preserved keepalive connection if it's available.
+ * Refcount is transferred to caller!
+ * @param ctx
+ * @param addr
+ * @param host
+ * @return
+ */
+struct rspamd_http_connection* rspamd_http_context_check_keepalive (
+ struct rspamd_http_context *ctx, const rspamd_inet_addr_t *addr,
+ const gchar *host);
+
+/**
+ * Prepares keepalive key for a connection by creating a new entry or by reusing existent
+ * Bear in mind, that keepalive pool has currently no cleanup methods!
+ * @param ctx
+ * @param conn
+ * @param addr
+ * @param host
+ */
+void rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx,
+ struct rspamd_http_connection *conn,
+ const rspamd_inet_addr_t *addr,
+ const gchar *host);
+/**
+ * Pushes a connection to keepalive pool after client request is finished,
+ * keepalive key *must* be prepared before using of this function
+ * @param ctx
+ * @param conn
+ * @param msg
+ */
+void rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
+ struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg,
+ struct event_base *ev_base);
+
#endif
diff --git a/src/libutil/http_message.c b/src/libutil/http_message.c
index b090d2f71..0720dc416 100644
--- a/src/libutil/http_message.c
+++ b/src/libutil/http_message.c
@@ -18,6 +18,7 @@
#include "libutil/http_private.h"
#include "libutil/printf.h"
#include "libutil/logger.h"
+#include "utlist.h"
#include "unix-std.h"
struct rspamd_http_message *
@@ -463,3 +464,197 @@ rspamd_http_message_storage_cleanup (struct rspamd_http_message *msg)
msg->body_buf.len = 0;
}
+
+void
+rspamd_http_message_free (struct rspamd_http_message *msg)
+{
+ struct rspamd_http_header *hdr, *htmp, *hcur, *hcurtmp;
+
+
+ HASH_ITER (hh, msg->headers, hdr, htmp) {
+ HASH_DEL (msg->headers, hdr);
+
+ DL_FOREACH_SAFE (hdr, hcur, hcurtmp) {
+ rspamd_fstring_free (hcur->combined);
+ g_free (hcur);
+ }
+ }
+
+ rspamd_http_message_storage_cleanup (msg);
+
+ if (msg->url != NULL) {
+ rspamd_fstring_free (msg->url);
+ }
+ if (msg->status != NULL) {
+ rspamd_fstring_free (msg->status);
+ }
+ if (msg->host != NULL) {
+ rspamd_fstring_free (msg->host);
+ }
+ if (msg->peer_key != NULL) {
+ rspamd_pubkey_unref (msg->peer_key);
+ }
+
+ g_free (msg);
+}
+
+void
+rspamd_http_message_set_peer_key (struct rspamd_http_message *msg,
+ struct rspamd_cryptobox_pubkey *pk)
+{
+ if (msg->peer_key != NULL) {
+ rspamd_pubkey_unref (msg->peer_key);
+ }
+
+ if (pk) {
+ msg->peer_key = rspamd_pubkey_ref (pk);
+ }
+ else {
+ msg->peer_key = NULL;
+ }
+}
+
+void
+rspamd_http_message_add_header_len (struct rspamd_http_message *msg,
+ const gchar *name,
+ const gchar *value,
+ gsize len)
+{
+ struct rspamd_http_header *hdr, *found = NULL;
+ guint nlen, vlen;
+
+ if (msg != NULL && name != NULL && value != NULL) {
+ hdr = g_malloc0 (sizeof (struct rspamd_http_header));
+ nlen = strlen (name);
+ vlen = len;
+ hdr->combined = rspamd_fstring_sized_new (nlen + vlen + 4);
+ rspamd_printf_fstring (&hdr->combined, "%s: %*s\r\n", name, (gint)vlen,
+ value);
+ hdr->name.begin = hdr->combined->str;
+ hdr->name.len = nlen;
+ hdr->value.begin = hdr->combined->str + nlen + 2;
+ hdr->value.len = vlen;
+
+ HASH_FIND (hh, msg->headers, hdr->name.begin,
+ hdr->name.len, found);
+
*** OUTPUT TRUNCATED, 146 LINES SKIPPED ***
More information about the Commits
mailing list