commit ead250d: [Feature] Send quit command to Redis

Vsevolod Stakhov vsevolod at highsecure.ru
Wed Dec 25 13:07:06 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-12-25 13:04:49 +0000
URL: https://github.com/rspamd/rspamd/commit/ead250d5804be03dbd6d7ade87450ddf3ef3a259

[Feature] Send quit command to Redis

---
 src/libserver/redis_pool.c | 81 +++++++++++++++++++++++++++++++++++-----------
 1 file changed, 62 insertions(+), 19 deletions(-)

diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c
index 0d310d968..732d3b5bf 100644
--- a/src/libserver/redis_pool.c
+++ b/src/libserver/redis_pool.c
@@ -26,12 +26,18 @@
 
 struct rspamd_redis_pool_elt;
 
+enum rspamd_redis_pool_connection_state {
+	RSPAMD_REDIS_POOL_CONN_INACTIVE = 0,
+	RSPAMD_REDIS_POOL_CONN_ACTIVE,
+	RSPAMD_REDIS_POOL_CONN_FINALISING
+};
+
 struct rspamd_redis_pool_connection {
 	struct redisAsyncContext *ctx;
 	struct rspamd_redis_pool_elt *elt;
 	GList *entry;
 	ev_timer timeout;
-	gboolean active;
+	enum rspamd_redis_pool_connection_state state;
 	gchar tag[MEMPOOL_UID_LEN];
 	ref_entry_t ref;
 };
@@ -99,7 +105,7 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
 static void
 rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
 {
-	if (conn->active) {
+	if (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE) {
 		msg_debug_rpool ("active connection removed");
 
 		if (conn->ctx) {
@@ -126,7 +132,7 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
 			redisAsyncContext *ac = conn->ctx;
 
 			/* To prevent on_disconnect here */
-			conn->active = TRUE;
+			conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING;
 			g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac);
 			conn->ctx = NULL;
 			ac->onDisconnect = NULL;
@@ -170,16 +176,52 @@ rspamd_redis_pool_elt_dtor (gpointer p)
 	g_free (elt);
 }
 
+static void
+rspamd_redis_on_quit (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+	struct rspamd_redis_pool_connection *conn =
+			(struct rspamd_redis_pool_connection *)priv;
+
+	msg_debug_rpool ("quit command reply for the connection %p, refcount: %d",
+			conn->ctx, conn->ref.refcount);
+	/*
+	 * We now schedule timer to enforce removal after callback is executed
+	 * to prevent races. But actually, the connection will likely be freed by
+	 * hiredis itself. It is quite brain damaged logic but it is better to
+	 * deal with it... Dtor will definitely stop this timer.
+	 */
+	conn->timeout.repeat = 0.1;
+	ev_timer_again (conn->elt->pool->event_loop, &conn->timeout);
+}
+
 static void
 rspamd_redis_conn_timeout (EV_P_ ev_timer *w, int revents)
 {
 	struct rspamd_redis_pool_connection *conn =
 			(struct rspamd_redis_pool_connection *)w->data;
 
-	g_assert (!conn->active);
-	msg_debug_rpool ("scheduled removal of connection %p, refcount: %d",
-			conn->ctx, conn->ref.refcount);
-	REF_RELEASE (conn);
+	g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
+
+	if (conn->state == RSPAMD_REDIS_POOL_CONN_INACTIVE) {
+		msg_debug_rpool ("scheduled soft removal of connection %p, refcount: %d",
+				conn->ctx, conn->ref.refcount);
+		redisAsyncCommand (conn->ctx, rspamd_redis_on_quit, conn, "QUIT");
+		conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING;
+		ev_timer_again (EV_A_ w);
+
+		/* Prevent reusing */
+		if (conn->entry) {
+			g_queue_unlink (conn->elt->inactive, conn->entry);
+			conn->entry = NULL;
+		}
+	}
+	else {
+		/* Finalising by timeout */
+		msg_debug_rpool ("final removal of connection %p, refcount: %d",
+				conn->ctx, conn->ref.refcount);
+		REF_RELEASE (conn);
+	}
+
 }
 
 static void
@@ -205,7 +247,7 @@ rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
 	conn->timeout.data = conn;
 	ev_timer_init (&conn->timeout,
 			rspamd_redis_conn_timeout,
-			real_timeout, 0.0);
+			real_timeout, real_timeout / 2.0);
 	ev_timer_start (conn->elt->pool->event_loop, &conn->timeout);
 }
 
@@ -219,8 +261,7 @@ rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status,
 	 * Here, we know that redis itself will free this connection
 	 * so, we need to do something very clever about it
 	 */
-
-	if (!conn->active) {
+	if (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE) {
 		/* Do nothing for active connections as it is already handled somewhere */
 		if (conn->ctx) {
 			msg_debug_rpool ("inactive connection terminated: %s, refs: %d",
@@ -261,7 +302,7 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
 			conn = g_malloc0 (sizeof (*conn));
 			conn->entry = g_list_prepend (NULL, conn);
 			conn->elt = elt;
-			conn->active = TRUE;
+			conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
 
 			g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
 			g_queue_push_head_link (elt->active, conn->entry);
@@ -275,10 +316,12 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
 					conn);
 
 			if (password) {
-				redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+				redisAsyncCommand (ctx, NULL, NULL,
+						"AUTH %s", password);
 			}
 			if (db) {
-				redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+				redisAsyncCommand (ctx, NULL, NULL,
+						"SELECT %s", db);
 			}
 		}
 
@@ -307,8 +350,8 @@ rspamd_redis_pool_init (void)
 	struct rspamd_redis_pool *pool;
 
 	pool = g_malloc0 (sizeof (*pool));
-	pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal, NULL,
-			rspamd_redis_pool_elt_dtor);
+	pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal,
+			NULL, rspamd_redis_pool_elt_dtor);
 	pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal);
 
 	return pool;
@@ -349,11 +392,11 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
 		if (g_queue_get_length (elt->inactive) > 0) {
 			conn_entry = g_queue_pop_head_link (elt->inactive);
 			conn = conn_entry->data;
-			g_assert (!conn->active);
+			g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
 
 			if (conn->ctx->err == REDIS_OK) {
 				ev_timer_stop (elt->pool->event_loop, &conn->timeout);
-				conn->active = TRUE;
+				conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
 				g_queue_push_tail_link (elt->active, conn_entry);
 				msg_debug_rpool ("reused existing connection to %s:%d: %p",
 						ip, port, conn->ctx);
@@ -404,7 +447,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
 
 	conn = g_hash_table_lookup (pool->elts_by_ctx, ctx);
 	if (conn != NULL) {
-		g_assert (conn->active);
+		g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE);
 
 		if (ctx->err != REDIS_OK) {
 			/* We need to terminate connection forcefully */
@@ -418,7 +461,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
 					/* Just move it to the inactive queue */
 					g_queue_unlink (conn->elt->active, conn->entry);
 					g_queue_push_head_link (conn->elt->inactive, conn->entry);
-					conn->active = FALSE;
+					conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE;
 					rspamd_redis_pool_schedule_timeout (conn);
 					msg_debug_rpool ("mark connection %p inactive", conn->ctx);
 				}


More information about the Commits mailing list