commit 5054b23: [Rework] Further rework of the redis pool

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Sep 11 19:56:05 UTC 2021


Author: Vsevolod Stakhov
Date: 2021-09-11 20:53:29 +0100
URL: https://github.com/rspamd/rspamd/commit/5054b23fcce97c2115ecdba0c13061635172e061 (HEAD -> master)

[Rework] Further rework of the redis pool

---
 src/libserver/redis_pool.cxx | 242 ++++++++++++++++++++++++-------------------
 1 file changed, 136 insertions(+), 106 deletions(-)

diff --git a/src/libserver/redis_pool.cxx b/src/libserver/redis_pool.cxx
index a059ea6fe..9053cc0e7 100644
--- a/src/libserver/redis_pool.cxx
+++ b/src/libserver/redis_pool.cxx
@@ -29,8 +29,8 @@
 #include "libutil/cxx/local_shared_ptr.hxx"
 
 namespace rspamd {
-struct redis_pool_elt;
-struct redis_pool;
+class redis_pool_elt;
+class redis_pool;
 
 #define msg_err_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
         "redis_pool", conn->tag, \
@@ -61,31 +61,34 @@ struct redis_pool_connection {
 	using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>;
 	using conn_iter_t = std::list<redis_pool_connection_ptr>::iterator;
 	struct redisAsyncContext *ctx;
-	struct redis_pool_elt *elt;
-	struct redis_pool *pool;
+	redis_pool_elt *elt;
+	redis_pool *pool;
 	conn_iter_t elt_pos;
 	ev_timer timeout;
 	enum rspamd_redis_pool_connection_state state;
 	gchar tag[MEMPOOL_UID_LEN];
 
-	auto schedule_timeout () -> void;
+	auto schedule_timeout() -> void;
+
 	~redis_pool_connection();
 
-	explicit redis_pool_connection(struct redis_pool *_pool,
-									struct redis_pool_elt *_elt,
-									const char *db,
-									const char *password,
+	explicit redis_pool_connection(redis_pool *_pool,
+								   redis_pool_elt *_elt,
+								   const char *db,
+								   const char *password,
 								   struct redisAsyncContext *_ctx);
 
 private:
 	static auto redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void;
+
 	static auto redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void;
+
 	static auto redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto;
 };
 
 
 using redis_pool_key_t = std::uint64_t;
-struct redis_pool;
+class redis_pool;
 
 class redis_pool_elt {
 	using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>;
@@ -106,15 +109,31 @@ public:
 	explicit redis_pool_elt(redis_pool *_pool,
 							const gchar *_db, const gchar *_password,
 							const char *_ip, int _port)
-			: pool(_pool), ip(_ip), db(_db), port(_port), password(_password),
-			key(redis_pool_elt::make_key(_db, _password, _ip, _port))
+			: pool(_pool), ip(_ip), db(_db), password(_password), port(_port),
+			  key(redis_pool_elt::make_key(_db, _password, _ip, _port))
 	{
 		is_unix = ip[0] == '.' || ip[0] == '/';
 	}
 
 	auto new_connection() -> redisAsyncContext *;
+
+	auto release_active(const redis_pool_connection *conn) -> void
+	{
+		active.erase(conn->elt_pos);
+	}
+
+	auto release_inactive(const redis_pool_connection *conn) -> void
+	{
+		inactive.erase(conn->elt_pos);
+	}
+
+	auto move_to_inactive(const redis_pool_connection *conn) -> void
+	{
+		inactive.splice(std::end(inactive), active, conn->elt_pos);
+	}
+
 	inline static auto make_key(const gchar *db, const gchar *password,
-					   const char *ip, int port) -> redis_pool_key_t
+								const char *ip, int port) -> redis_pool_key_t
 	{
 		rspamd_cryptobox_fast_hash_state_t st;
 
@@ -132,8 +151,14 @@ public:
 
 		return rspamd_cryptobox_fast_hash_final(&st);
 	}
+
+	auto num_active() const -> auto
+	{
+		return active.size();
+	}
+
 private:
-	auto redis_async_new() -> redisAsyncContext*
+	auto redis_async_new() -> redisAsyncContext *
 	{
 		struct redisAsyncContext *ctx;
 
@@ -164,30 +189,40 @@ class redis_pool {
 	robin_hood::unordered_node_map<redis_pool_key_t, redis_pool_elt> elts_by_key;
 	robin_hood::unordered_flat_map<redisAsyncContext *,
 			redis_pool_connection *> conns_by_ctx;
+public:
 	double timeout = default_timeout;
 	unsigned max_conns = default_max_conns;
-public:
 	struct ev_loop *event_loop;
 	struct rspamd_config *cfg;
 
 public:
-	explicit redis_pool() : event_loop(nullptr), cfg(nullptr) {
+	explicit redis_pool() : event_loop(nullptr), cfg(nullptr)
+	{
 		conns_by_ctx.reserve(max_conns);
 	}
 
 	/* Legacy stuff */
-	auto do_config(struct ev_loop *_loop, struct rspamd_config *_cfg) -> void {
+	auto do_config(struct ev_loop *_loop, struct rspamd_config *_cfg) -> void
+	{
 		event_loop = _loop;
 		cfg = _cfg;
 	}
 
 	auto new_connection(const gchar *db, const gchar *password,
 						const char *ip, int port) -> redisAsyncContext *;
-	auto release_connection(redisAsyncContext *ctx) -> void;
 
-	auto unregister_context(redisAsyncContext *ctx) -> void {
+	auto release_connection(redisAsyncContext *ctx,
+							enum rspamd_redis_pool_release_type how) -> void;
+
+	auto unregister_context(redisAsyncContext *ctx) -> void
+	{
 		conns_by_ctx.erase(ctx);
 	}
+
+	auto register_context(redisAsyncContext *ctx, redis_pool_connection *conn)
+	{
+		conns_by_ctx.emplace(ctx, conn);
+	}
 };
 
 
@@ -230,7 +265,7 @@ auto
 redis_pool_connection::redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void
 {
 	struct redis_pool_connection *conn =
-			(struct redis_pool_connection *)priv;
+			(struct redis_pool_connection *) priv;
 
 	msg_debug_rpool("quit command reply for the connection %p",
 			conn->ctx);
@@ -255,7 +290,7 @@ redis_pool_connection::redis_quit_cb(redisAsyncContext *c, void *r, void *priv)
 auto
 redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void
 {
-	auto *conn = (struct redis_pool_connection *)w->data;
+	auto *conn = (struct redis_pool_connection *) w->data;
 
 	g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
 
@@ -273,7 +308,7 @@ redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) ->
 				conn->ctx);
 
 		/* Erasure of shared pointer will cause it to be removed */
-		conn->elt->inactive.erase(conn->elt_pos);
+		conn->elt->release_inactive(conn);
 	}
 
 }
@@ -281,7 +316,7 @@ redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) ->
 auto
 redis_pool_connection::redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto
 {
-	auto *conn = (struct redis_pool_connection *)ac->data;
+	auto *conn = (struct redis_pool_connection *) ac->data;
 
 	/*
 	 * Here, we know that redis itself will free this connection
@@ -295,7 +330,7 @@ redis_pool_connection::redis_on_disconnect(const struct redisAsyncContext *ac, i
 		}
 
 		/* Erasure of shared pointer will cause it to be removed */
-		conn->elt->inactive.erase(conn->elt_pos);
+		conn->elt->release_inactive(conn);
 	}
 }
 
@@ -304,15 +339,15 @@ redis_pool_connection::schedule_timeout() -> void
 {
 	const auto *conn = this; /* For debug */
 	double real_timeout;
-	auto active_elts = elt->active.size();
+	auto active_elts = elt->num_active();
 
 	if (active_elts > pool->max_conns) {
 		real_timeout = pool->timeout / 2.0;
-		real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0);
+		real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 4.0);
 	}
 	else {
 		real_timeout = pool->timeout;
-		real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0);
+		real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 2.0);
 	}
 
 	msg_debug_rpool("scheduled connection %p cleanup in %.1f seconds",
@@ -326,8 +361,8 @@ redis_pool_connection::schedule_timeout() -> void
 }
 
 
-redis_pool_connection::redis_pool_connection(struct redis_pool *_pool,
-											 struct redis_pool_elt *_elt,
+redis_pool_connection::redis_pool_connection(redis_pool *_pool,
+											 redis_pool_elt *_elt,
 											 const char *db,
 											 const char *password,
 											 struct redisAsyncContext *_ctx)
@@ -336,9 +371,9 @@ redis_pool_connection::redis_pool_connection(struct redis_pool *_pool,
 
 	state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
 
-	pool->conns_by_ctx.emplace(ctx, this);
+	pool->register_context(ctx, this);
 	ctx->data = this;
-	rspamd_random_hex((guchar *)tag, sizeof(tag));
+	rspamd_random_hex((guchar *) tag, sizeof(tag));
 
 	redisLibevAttach(pool->event_loop, ctx);
 	redisAsyncSetDisconnectCallback(ctx, redis_pool_connection::redis_on_disconnect);
@@ -384,14 +419,16 @@ redis_pool_elt::new_connection() -> redisAsyncContext *
 				conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
 				msg_debug_rpool("reused existing connection to %s:%d: %p",
 						ip.c_str(), port, conn->ctx);
-				active.emplace_back(std::move(conn));
+				active.emplace_front(std::move(conn));
+				active.front()->elt_pos = active.begin();
 			}
 		}
 		else {
 			auto *nctx = redis_async_new();
 			if (nctx) {
-				active.emplace_back(std::make_unique<redis_pool_connection>(pool, this,
+				active.emplace_front(std::make_unique<redis_pool_connection>(pool, this,
 						db.c_str(), password.c_str(), nctx));
+				active.front()->elt_pos = active.begin();
 			}
 
 			return nctx;
@@ -400,17 +437,20 @@ redis_pool_elt::new_connection() -> redisAsyncContext *
 	else {
 		auto *nctx = redis_async_new();
 		if (nctx) {
-			active.emplace_back(std::make_unique<redis_pool_connection>(pool, this,
+			active.emplace_front(std::make_unique<redis_pool_connection>(pool, this,
 					db.c_str(), password.c_str(), nctx));
+			active.front()->elt_pos = active.begin();
 		}
 
 		return nctx;
 	}
+
+	RSPAMD_UNREACHABLE;
 }
 
 auto
 redis_pool::new_connection(const gchar *db, const gchar *password,
-					const char *ip, int port) -> redisAsyncContext *
+						   const char *ip, int port) -> redisAsyncContext *
 {
 
 	auto key = redis_pool_elt::make_key(db, password, ip, port);
@@ -430,120 +470,110 @@ redis_pool::new_connection(const gchar *db, const gchar *password,
 	}
 }
 
-}
-
-void *
-rspamd_redis_pool_init (void)
-{
-	return new rspamd::redis_pool{};
-}
-
-void
-rspamd_redis_pool_config (void *p,
-		struct rspamd_config *cfg,
-		struct ev_loop *ev_base)
-{
-	g_assert (p != NULL);
-	auto *pool = reinterpret_cast<struct rspamd::redis_pool *>(p);
-
-	pool->do_config(ev_base, cfg);
-}
-
-
-struct redisAsyncContext*
-rspamd_redis_pool_connect (void *p,
-		const gchar *db, const gchar *password,
-		const char *ip, int port)
-{
-	g_assert (p != NULL);
-	auto *pool = reinterpret_cast<struct rspamd::redis_pool *>(p);
-
-	return pool->new_connection(db, password, ip, port);
-}
-
-
-void
-rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
-		struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how)
+auto redis_pool::release_connection(redisAsyncContext *ctx,
+									enum rspamd_redis_pool_release_type how) -> void
 {
-	struct rspamd_redis_pool_connection *conn;
-
-	g_assert (pool != NULL);
-	g_assert (ctx != NULL);
-
-	conn = (struct rspamd_redis_pool_connection *)g_hash_table_lookup (pool->elts_by_ctx, ctx);
-	if (conn != NULL) {
+	auto conn_it = conns_by_ctx.find(ctx);
+	if (conn_it != conns_by_ctx.end()) {
+		auto *conn = conn_it->second;
 		g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE);
 
 		if (ctx->err != REDIS_OK) {
 			/* We need to terminate connection forcefully */
 			msg_debug_rpool ("closed connection %p due to an error", conn->ctx);
-			REF_RELEASE (conn);
+			conn->elt->release_active(conn);
 		}
 		else {
 			if (how == RSPAMD_REDIS_RELEASE_DEFAULT) {
 				/* Ensure that there are no callbacks attached to this conn */
-				if (ctx->replies.head == NULL) {
+				if (ctx->replies.head == nullptr) {
 					/* Just move it to the inactive queue */
-					g_queue_unlink (conn->elt->active, conn->entry);
-					g_queue_push_head_link (conn->elt->inactive, conn->entry);
 					conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE;
-					rspamd_redis_pool_schedule_timeout (conn);
-					msg_debug_rpool ("mark connection %p inactive", conn->ctx);
+					conn->elt->move_to_inactive(conn);
+					conn->schedule_timeout();
+					msg_debug_rpool("mark connection %p inactive", conn->ctx);
 				}
 				else {
-					msg_debug_rpool ("closed connection %p due to callbacks left",
+					msg_debug_rpool("closed connection %p due to callbacks left",
 							conn->ctx);
-					REF_RELEASE (conn);
+					conn->elt->release_active(conn);
 				}
 			}
 			else {
 				if (how == RSPAMD_REDIS_RELEASE_FATAL) {
-					msg_debug_rpool ("closed connection %p due to an fatal termination",
+					msg_debug_rpool("closed connection %p due to an fatal termination",
 							conn->ctx);
 				}
 				else {
-					msg_debug_rpool ("closed connection %p due to explicit termination",
+					msg_debug_rpool("closed connection %p due to explicit termination",
 							conn->ctx);
 				}
 
-				REF_RELEASE (conn);
+				conn->elt->release_active(conn);
 			}
 		}
 
-		REF_RELEASE (conn);
+		conn->elt->release_active(conn);
 	}
 	else {
-		g_assert_not_reached ();
+		RSPAMD_UNREACHABLE;
 	}
 }
 
+}
+
+void *
+rspamd_redis_pool_init(void)
+{
+	return new rspamd::redis_pool{};
+}
 
 void
-rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool)
+rspamd_redis_pool_config(void *p,
+						 struct rspamd_config *cfg,
+						 struct ev_loop *ev_base)
 {
-	struct rspamd_redis_pool_elt *elt;
-	GHashTableIter it;
-	gpointer k, v;
+	g_assert (p != NULL);
+	auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
 
-	g_assert (pool != NULL);
+	pool->do_config(ev_base, cfg);
+}
 
-	g_hash_table_iter_init (&it, pool->elts_by_key);
 
-	while (g_hash_table_iter_next (&it, &k, &v)) {
-		elt = (struct rspamd_redis_pool_elt *)v;
-		rspamd_redis_pool_elt_dtor (elt);
-		g_hash_table_iter_steal (&it);
-	}
+struct redisAsyncContext *
+rspamd_redis_pool_connect(void *p,
+						  const gchar *db, const gchar *password,
+						  const char *ip, int port)
+{
+	g_assert (p != NULL);
+	auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
+
+	return pool->new_connection(db, password, ip, port);
+}
+
 
-	g_hash_table_unref (pool->elts_by_ctx);
-	g_hash_table_unref (pool->elts_by_key);
+void
+rspamd_redis_pool_release_connection(void *p,
+									 struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how)
+{
+	g_assert (p != NULL);
+	g_assert (ctx != NULL);
+	auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
+
+	pool->release_connection(ctx, how);
+}
+
+
+void
+rspamd_redis_pool_destroy(void *p)
+{
+	auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
 
-	g_free (pool);
+	delete pool;
 }
 
-const gchar*
-rspamd_redis_type_to_string (int type)
+const gchar *
+rspamd_redis_type_to_string(int type)
 {
 	const gchar *ret = "unknown";
 


More information about the Commits mailing list