commit f2a26f2: [Minor] Try to fix sync Redis API by not resuming LUA_REDIS_SPECIFIC_FINISHED threads

Vsevolod Stakhov vsevolod at highsecure.ru
Tue Apr 27 11:21:04 UTC 2021


Author: Vsevolod Stakhov
Date: 2021-04-27 12:15:34 +0100
URL: https://github.com/rspamd/rspamd/commit/f2a26f2235e35632ab26727670388e778ab40341 (HEAD -> master)

[Minor] Try to fix sync Redis API by not resuming LUA_REDIS_SPECIFIC_FINISHED threads

---
 src/lua/lua_redis.c       | 109 ++++++++++++++++++++++++++--------------------
 src/lua/lua_thread_pool.c |   6 +--
 src/lua/lua_thread_pool.h |  13 ++++++
 3 files changed, 78 insertions(+), 50 deletions(-)

diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 8e9e11dda..b624fc43b 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -612,75 +612,86 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
 		ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
 	}
 
-	msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
+	if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+		msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
 
-	struct lua_redis_result *result = g_malloc0 (sizeof *result);
+		struct lua_redis_result *result = g_malloc0 (sizeof *result);
 
-	/* If session is finished, we cannot call lua callbacks */
-	if (ac->err == 0) {
-		if (r != NULL) {
-			if (reply->type != REDIS_REPLY_ERROR) {
-				result->is_error = FALSE;
-				lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
+		if (ac->err == 0) {
+			if (r != NULL) {
+				if (reply->type != REDIS_REPLY_ERROR) {
+					result->is_error = FALSE;
+					lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
+				}
+				else {
+					result->is_error = TRUE;
+					lua_pushstring (L, reply->str);
+				}
 			}
 			else {
 				result->is_error = TRUE;
-				lua_pushstring (L, reply->str);
+				lua_pushliteral (L, "received no data from server");
 			}
 		}
 		else {
 			result->is_error = TRUE;
-			lua_pushliteral (L, "received no data from server");
-		}
-	}
-	else {
-		result->is_error = TRUE;
-		if (ac->err == REDIS_ERR_IO) {
-			lua_pushstring (L, strerror (errno));
-		}
-		else {
-			lua_pushstring (L, ac->errstr);
+			if (ac->err == REDIS_ERR_IO) {
+				lua_pushstring (L, strerror (errno));
+			}
+			else {
+				lua_pushstring (L, ac->errstr);
+			}
 		}
-	}
 
-	/* if error happened, we should terminate the connection,
-	   and release it */
+		/* if error happened, we should terminate the connection,
+		   and release it */
 
-	if (result->is_error && sp_ud->c->ctx) {
-		ac = sp_ud->c->ctx;
-		/* Set to NULL to avoid double free in dtor */
-		sp_ud->c->ctx = NULL;
-		ctx->flags |= LUA_REDIS_TERMINATED;
+		if (result->is_error && sp_ud->c->ctx) {
+			ac = sp_ud->c->ctx;
+			/* Set to NULL to avoid double free in dtor */
+			sp_ud->c->ctx = NULL;
+			ctx->flags |= LUA_REDIS_TERMINATED;
 
-		/*
-		 * This will call all callbacks pending so the entire context
-		 * will be destructed
-		 */
-		rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
-				RSPAMD_REDIS_RELEASE_FATAL);
-	}
+			/*
+			 * This will call all callbacks pending so the entire context
+			 * will be destructed
+			 */
+			rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
+					RSPAMD_REDIS_RELEASE_FATAL);
+		}
+
+		result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+		result->s = ud->s;
+		result->item = ud->item;
+		result->task = ud->task;
+		result->sp_ud = sp_ud;
 
-	result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
-	result->s = ud->s;
-	result->item = ud->item;
-	result->task = ud->task;
-	result->sp_ud = sp_ud;
+		g_queue_push_tail (ctx->replies, result);
 
-	g_queue_push_tail (ctx->replies, result);
+	}
 
 	ctx->cmds_pending --;
 
 	if (ctx->cmds_pending == 0) {
 		if (ctx->thread) {
-			/* somebody yielded and waits for results */
-			thread = ctx->thread;
-			ctx->thread = NULL;
-
-			results = lua_redis_push_results (ctx, thread->lua_state);
-			lua_thread_resume (thread, results);
-			lua_redis_cleanup_events (ctx);
+			if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+				/* somebody yielded and waits for results */
+				thread = ctx->thread;
+				ctx->thread = NULL;
+
+				results = lua_redis_push_results(ctx, thread->lua_state);
+				lua_thread_resume (thread, results);
+				lua_redis_cleanup_events(ctx);
+			}
+			else {
+				/* We cannot resume the thread as the associated task has gone */
+				lua_thread_pool_terminate_entry (ud->cfg->lua_thread_pool,
+						ctx->thread);
+				ctx->thread = NULL;
+			}
 		}
 	}
+
 }
 
 static void
@@ -692,6 +703,10 @@ lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents)
 	struct lua_redis_userdata *ud;
 	redisAsyncContext *ac;
 
+	if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
+		return;
+	}
+
 	ud = sp_ud->c;
 	ctx = sp_ud->ctx;
 	msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud,
diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c
index 89a516a27..01c55b4a9 100644
--- a/src/lua/lua_thread_pool.c
+++ b/src/lua/lua_thread_pool.c
@@ -157,8 +157,8 @@ lua_thread_pool_return_full (struct lua_thread_pool *pool,
 	}
 }
 
-static void
-lua_thread_pool_terminate_entry (struct lua_thread_pool *pool,
+void
+lua_thread_pool_terminate_entry_full (struct lua_thread_pool *pool,
 		struct thread_entry *thread_entry, const gchar *loc)
 {
 	struct thread_entry *ent = NULL;
@@ -327,7 +327,7 @@ lua_resume_thread_internal_full (struct thread_entry *thread_entry,
 			 * Maybe there is a way to recover here.
 			 * For now, just remove faulty thread
 			 */
-			lua_thread_pool_terminate_entry (pool, thread_entry, loc);
+			lua_thread_pool_terminate_entry_full (pool, thread_entry, loc);
 		}
 	}
 }
diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h
index be954271d..66c8b991c 100644
--- a/src/lua/lua_thread_pool.h
+++ b/src/lua/lua_thread_pool.h
@@ -183,6 +183,19 @@ lua_thread_resume_full (struct thread_entry *thread_entry,
 #define lua_thread_resume(thread_entry, narg) \
     lua_thread_resume_full (thread_entry, narg, G_STRLOC)
 
+/**
+ * Terminates thread pool entry and fill the pool with another thread entry if needed
+ * @param pool
+ * @param thread_entry
+ * @param loc
+ */
+void
+lua_thread_pool_terminate_entry_full (struct lua_thread_pool *pool,
+								 struct thread_entry *thread_entry,
+								 const gchar *loc);
+#define lua_thread_pool_terminate_entry(pool, thread_entry) \
+    lua_thread_pool_terminate_entry_full (pool, thread_entry, G_STRLOC)
+
 #ifdef  __cplusplus
 }
 #endif


More information about the Commits mailing list