commit 48036e4: [Feature] Initial support of subscribe command in lua_redis

Vsevolod Stakhov vsevolod at highsecure.ru
Tue May 19 20:07:10 UTC 2020


Author: Vsevolod Stakhov
Date: 2020-05-19 21:00:16 +0100
URL: https://github.com/rspamd/rspamd/commit/48036e40210693fbe552016bfe0deb35796438af (HEAD -> master)

[Feature] Initial support of subscribe command in lua_redis

---
 src/lua/lua_redis.c | 78 ++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 54 insertions(+), 24 deletions(-)

diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index f9dbbdd13..492c63906 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -122,6 +122,7 @@ INIT_LOG_MODULE(lua_redis)
 #define LUA_REDIS_TEXTDATA (1 << 1)
 #define LUA_REDIS_TERMINATED (1 << 2)
 #define LUA_REDIS_NO_POOL (1 << 3)
+#define LUA_REDIS_SUBSCRIBED (1 << 4)
 #define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)
 
 struct lua_redis_request_specific_userdata {
@@ -263,7 +264,9 @@ lua_redis_fin (void *arg)
 	ctx = sp_ud->ctx;
 	ud = sp_ud->c;
 
-	ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
+	if (ev_can_stop (&sp_ud->timeout_ev)) {
+		ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
+	}
 
 	msg_debug_lua_redis ("finished redis query %p from session %p; refcount=%d",
 			sp_ud, ctx, ctx->ref.refcount);
@@ -383,7 +386,8 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
 	struct lua_callback_state cbs;
 	lua_State *L;
 
-	if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
+	if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED)) ||
+			(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
 		if (sp_ud->cbref != -1) {
 			lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
 			L = cbs.L;
@@ -409,17 +413,29 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
 			lua_thread_pool_restore_callback (&cbs);
 		}
 
+		if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) {
+			if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
+				if (ev_can_stop (&sp_ud->timeout_ev)) {
+					ev_timer_stop (sp_ud->ctx->async.event_loop,
+							&sp_ud->timeout_ev);
+				}
+			}
+		}
+
 		sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
 
-		if (ud->s) {
-			if (ud->item) {
-				rspamd_symcache_item_async_dec_check (ud->task, ud->item, M);
-			}
+		if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
+			if (ud->s) {
+				if (ud->item) {
+					rspamd_symcache_item_async_dec_check (ud->task,
+							ud->item, M);
+				}
 
-			rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
-		}
-		else {
-			lua_redis_fin (sp_ud);
+				rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
+			}
+			else {
+				lua_redis_fin (sp_ud);
+			}
 		}
 	}
 }
@@ -453,7 +469,8 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
 	REDIS_RETAIN (ctx);
 
 	/* If session is finished, we cannot call lua callbacks */
-	if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+	if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) ||
+			(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
 		if (c->err == 0) {
 			if (r != NULL) {
 				if (reply->type != REDIS_REPLY_ERROR) {
@@ -477,20 +494,22 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
 		}
 	}
 
-	ctx->cmds_pending --;
+	if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
+		ctx->cmds_pending--;
 
-	if (ctx->cmds_pending == 0 && !ud->terminated) {
-		/* Disconnect redis early as we don't need it anymore */
-		ud->terminated = 1;
-		ac = ud->ctx;
-		ud->ctx = NULL;
+		if (ctx->cmds_pending == 0 && !ud->terminated) {
+			/* Disconnect redis early as we don't need it anymore */
+			ud->terminated = 1;
+			ac = ud->ctx;
+			ud->ctx = NULL;
 
-		if (ac) {
-			msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d",
-					ud, ctx, ctx->ref.refcount);
-			rspamd_redis_pool_release_connection (ud->pool, ac,
-					(ctx->flags & LUA_REDIS_NO_POOL) ?
-					RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
+			if (ac) {
+				msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d",
+						ud, ctx, ctx->ref.refcount);
+				rspamd_redis_pool_release_connection (ud->pool, ac,
+						(ctx->flags & LUA_REDIS_NO_POOL) ?
+						RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
+			}
 		}
 	}
 
@@ -586,7 +605,10 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
 		return;
 	}
 
-	ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
+	if (ev_can_stop ( &sp_ud->timeout_ev)) {
+		ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
+	}
+
 	msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
 
 	struct lua_redis_result *result = g_malloc0 (sizeof *result);
@@ -617,6 +639,7 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
 			lua_pushstring (L, ac->errstr);
 		}
 	}
+
 	/* if error happened, we should terminate the connection,
 	   and release it */
 
@@ -1125,10 +1148,17 @@ lua_redis_make_request (lua_State *L)
 
 			REDIS_RETAIN (ctx); /* Cleared by fin event */
 			ctx->cmds_pending ++;
+
+			if (ud->ctx->c.flags & REDIS_SUBSCRIBED) {
+				msg_debug_lua_redis ("subscribe command, never unref/timeout");
+				sp_ud->flags |= LUA_REDIS_SUBSCRIBED;
+			}
+
 			sp_ud->timeout_ev.data = sp_ud;
 			ev_now_update_if_cheap ((struct ev_loop *)ud->event_loop);
 			ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0);
 			ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
+
 			ret = TRUE;
 		}
 		else {


More information about the Commits mailing list