commit af204fe: [Project] Add message block support to the client

Vsevolod Stakhov vsevolod at highsecure.ru
Thu Jul 18 15:14:10 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-07-18 16:10:04 +0100
URL: https://github.com/rspamd/rspamd/commit/af204fee67890fd8e1043042f94ce2e9b9205bc6 (HEAD -> master)

[Project] Add message block support to the client

---
 src/client/rspamc.c               | 13 +++---
 src/client/rspamdclient.c         | 85 +++++++++++++++++++++++----------------
 src/client/rspamdclient.h         |  2 +
 src/libserver/protocol.c          |  2 +-
 src/libserver/protocol_internal.h |  1 +
 src/rspamd_proxy.c                |  8 ++--
 6 files changed, 67 insertions(+), 44 deletions(-)

diff --git a/src/client/rspamc.c b/src/client/rspamc.c
index 5b3f874ef..ea2fe0d4c 100644
--- a/src/client/rspamc.c
+++ b/src/client/rspamc.c
@@ -1522,6 +1522,7 @@ rspamc_client_cb (struct rspamd_client_connection *conn,
 		struct rspamd_http_message *msg,
 		const gchar *name, ucl_object_t *result, GString *input,
 		gpointer ud, gdouble start_time, gdouble send_time,
+		const gchar *body, gsize bodylen,
 		GError *err)
 {
 	gchar *ucl_out;
@@ -1529,8 +1530,6 @@ rspamc_client_cb (struct rspamd_client_connection *conn,
 	struct rspamc_command *cmd;
 	FILE *out = stdout;
 	gdouble finish = rspamd_get_ticks (FALSE), diff;
-	const gchar *body;
-	gsize body_len;
 
 	cmd = cbdata->cmd;
 
@@ -1603,11 +1602,15 @@ rspamc_client_cb (struct rspamd_client_connection *conn,
 				rspamd_fprintf (out, "%s\n", err->message);
 
 				if (json && msg != NULL) {
-					body = rspamd_http_message_get_body (msg, &body_len);
+					const gchar *raw;
+					gsize rawlen;
 
-					if (body) {
+					raw = rspamd_http_message_get_body (msg, &rawlen);
+
+					if (raw) {
 						/* We can also output the resulting json */
-						rspamd_fprintf (out, "%*s\n", (gint)body_len, body);
+						rspamd_fprintf (out, "%*s\n", (gint)rawlen - bodylen,
+								raw);
 					}
 				}
 			}
diff --git a/src/client/rspamdclient.c b/src/client/rspamdclient.c
index c27ff9f0b..7b1bcb73e 100644
--- a/src/client/rspamdclient.c
+++ b/src/client/rspamdclient.c
@@ -17,6 +17,7 @@
 #include "libutil/util.h"
 #include "libutil/http_connection.h"
 #include "libutil/http_private.h"
+#include "libserver/protocol_internal.h"
 #include "unix-std.h"
 #include "contrib/zstd/zstd.h"
 #include "contrib/zstd/zdict.h"
@@ -96,7 +97,7 @@ rspamd_client_error_handler (struct rspamd_http_connection *conn, GError *err)
 	c = req->conn;
 	req->cb (c, NULL, c->server_name->str, NULL,
 			req->input, req->ud,
-			c->start_time, c->send_time, err);
+			c->start_time, c->send_time, NULL, 0, err);
 }
 
 static gint
@@ -109,6 +110,9 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
 	struct ucl_parser *parser;
 	GError *err;
 	const rspamd_ftok_t *tok;
+	const gchar *start, *body = NULL;
+	guchar *out = NULL;
+	gsize len, bodylen = 0;
 
 	c = req->conn;
 
@@ -119,6 +123,7 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
 		rspamd_http_connection_read_message (c->http_conn,
 			c->req,
 			c->timeout);
+
 		return 0;
 	}
 	else {
@@ -127,13 +132,13 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
 					msg->code,
 					(gint)msg->status->len, msg->status->str);
 			req->cb (c, msg, c->server_name->str, NULL, req->input, req->ud,
-					c->start_time, c->send_time, err);
+					c->start_time, c->send_time, body, bodylen, err);
 			g_error_free (err);
 
 			return 0;
 		}
 
-		tok = rspamd_http_message_find_header (msg, "compression");
+		tok = rspamd_http_message_find_header (msg, COMPRESSION_HEADER);
 
 		if (tok) {
 			/* Need to uncompress */
@@ -146,7 +151,6 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
 				ZSTD_DStream *zstream;
 				ZSTD_inBuffer zin;
 				ZSTD_outBuffer zout;
-				guchar *out;
 				gsize outlen, r;
 
 				zstream = ZSTD_createDStream ();
@@ -174,12 +178,11 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
 								ZSTD_getErrorName (r));
 						req->cb (c, msg, c->server_name->str, NULL,
 								req->input, req->ud, c->start_time,
-								c->send_time, err);
+								c->send_time, body, bodylen, err);
 						g_error_free (err);
 						ZSTD_freeDStream (zstream);
-						g_free (out);
 
-						return 0;
+						goto end;
 					}
 
 					if (zout.pos == zout.size) {
@@ -191,50 +194,64 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
 
 				ZSTD_freeDStream (zstream);
 
-				parser = ucl_parser_new (0);
-				if (!ucl_parser_add_chunk (parser, zout.dst, zout.pos)) {
-					err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
-							ucl_parser_get_error (parser));
-					ucl_parser_free (parser);
-					req->cb (c, msg, c->server_name->str, NULL, req->input,
-							req->ud, c->start_time, c->send_time, err);
-					g_error_free (err);
-					g_free (zout.dst);
-
-					return 0;
-				}
-
-				g_free (zout.dst);
+				start = zout.dst;
+				len = zout.pos;
 			}
 			else {
 				err = g_error_new (RCLIENT_ERROR, 500,
 						"Invalid compression method");
 				req->cb (c, msg, c->server_name->str, NULL,
-						req->input, req->ud, c->start_time, c->send_time, err);
+						req->input, req->ud, c->start_time, c->send_time,
+						body, bodylen, err);
 				g_error_free (err);
 
 				return 0;
 			}
 		}
 		else {
-			parser = ucl_parser_new (0);
-			if (!ucl_parser_add_chunk (parser, msg->body_buf.begin, msg->body_buf.len)) {
-				err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
-						ucl_parser_get_error (parser));
-				ucl_parser_free (parser);
-				req->cb (c, msg, c->server_name->str, NULL,
-						req->input, req->ud, c->start_time, c->send_time, err);
-				g_error_free (err);
+			start = msg->body_buf.begin;
+			len = msg->body_buf.len;
+		}
 
-				return 0;
+		/* Deal with body */
+		tok = rspamd_http_message_find_header (msg, MESSAGE_OFFSET_HEADER);
+
+		if (tok) {
+			gulong value = 0;
+
+			if (rspamd_strtoul (tok->begin, tok->len, &value) &&
+					value < len) {
+				body = start + value;
+				bodylen = len - value;
+				len = value;
 			}
 		}
 
-		req->cb (c, msg, c->server_name->str, ucl_parser_get_object (
-				parser), req->input, req->ud, c->start_time, c->send_time, NULL);
+		parser = ucl_parser_new (0);
+		if (!ucl_parser_add_chunk (parser, start, len)) {
+			err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
+					ucl_parser_get_error (parser));
+			ucl_parser_free (parser);
+			req->cb (c, msg, c->server_name->str, NULL,
+					req->input, req->ud,
+					c->start_time, c->send_time, body, bodylen, err);
+			g_error_free (err);
+
+			goto end;
+		}
+
+		req->cb (c, msg, c->server_name->str,
+				ucl_parser_get_object (parser),
+				req->input, req->ud,
+				c->start_time, c->send_time, body, bodylen, NULL);
 		ucl_parser_free (parser);
 	}
 
+end:
+	if (out) {
+		g_free (out);
+	}
+
 	return 0;
 }
 
@@ -419,7 +436,7 @@ rspamd_client_command (struct rspamd_client_connection *conn,
 	}
 
 	if (compressed) {
-		rspamd_http_message_add_header (req->msg, "Compression", "zstd");
+		rspamd_http_message_add_header (req->msg, COMPRESSION_HEADER, "zstd");
 
 		if (dict_id != 0) {
 			gchar dict_str[32];
diff --git a/src/client/rspamdclient.h b/src/client/rspamdclient.h
index dc274b7a8..b42abebda 100644
--- a/src/client/rspamdclient.h
+++ b/src/client/rspamdclient.h
@@ -49,6 +49,8 @@ typedef void (*rspamd_client_callback) (
 		gpointer ud,
 		gdouble start_time,
 		gdouble send_time,
+		const gchar *body,
+		gsize body_len,
 		GError *err);
 
 struct rspamd_http_context;
diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c
index e8bbd979b..dd840284a 100644
--- a/src/libserver/protocol.c
+++ b/src/libserver/protocol.c
@@ -1661,7 +1661,7 @@ rspamd_protocol_http_reply (struct rspamd_http_message *msg,
 		compressed_reply->len = zout.pos;
 		rspamd_fstring_free (reply);
 		rspamd_http_message_set_body_from_fstring_steal (msg, compressed_reply);
-		rspamd_http_message_add_header (msg, "Compression", "zstd");
+		rspamd_http_message_add_header (msg, COMPRESSION_HEADER, "zstd");
 
 		if (task->cfg->libs_ctx->out_dict &&
 				task->cfg->libs_ctx->out_dict->id != 0) {
diff --git a/src/libserver/protocol_internal.h b/src/libserver/protocol_internal.h
index d9616e03d..8cec5d80e 100644
--- a/src/libserver/protocol_internal.h
+++ b/src/libserver/protocol_internal.h
@@ -88,6 +88,7 @@ extern "C" {
 #define CERT_ISSUER_HEADER "TLS-Cert-Issuer"
 #define MAILER_HEADER "Mailer"
 #define RAW_DATA_HEADER "Raw"
+#define COMPRESSION_HEADER "Compression"
 #define MESSAGE_OFFSET_HEADER "Message-Offset"
 
 #ifdef  __cplusplus
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index cca9f792f..70e393ddd 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -1066,7 +1066,7 @@ proxy_request_compress (struct rspamd_http_message *msg)
 
 	flags = rspamd_http_message_get_flags (msg);
 
-	if (!rspamd_http_message_find_header (msg, "Compression")) {
+	if (!rspamd_http_message_find_header (msg, COMPRESSION_HEADER)) {
 		if ((flags & RSPAMD_HTTP_FLAG_SHMEM) ||
 				!(flags & RSPAMD_HTTP_FLAG_HAS_BODY)) {
 			/* Cannot compress shared or empty message */
@@ -1094,7 +1094,7 @@ proxy_request_compress (struct rspamd_http_message *msg)
 
 		ZSTD_freeCCtx (zctx);
 		rspamd_http_message_set_body_from_fstring_steal (msg, body);
-		rspamd_http_message_add_header (msg, "Compression", "zstd");
+		rspamd_http_message_add_header (msg, COMPRESSION_HEADER, "zstd");
 	}
 }
 
@@ -1108,7 +1108,7 @@ proxy_request_decompress (struct rspamd_http_message *msg)
 	ZSTD_inBuffer zin;
 	ZSTD_outBuffer zout;
 
-	if (rspamd_http_message_find_header (msg, "Compression")) {
+	if (rspamd_http_message_find_header (msg, COMPRESSION_HEADER)) {
 		in = rspamd_http_message_get_body (msg, &inlen);
 
 		if (in == NULL || inlen == 0) {
@@ -1154,7 +1154,7 @@ proxy_request_decompress (struct rspamd_http_message *msg)
 		body->len = zout.pos;
 		ZSTD_freeDStream (zstream);
 		rspamd_http_message_set_body_from_fstring_steal (msg, body);
-		rspamd_http_message_remove_header (msg, "Compression");
+		rspamd_http_message_remove_header (msg, COMPRESSION_HEADER);
 	}
 }
 


More information about the Commits mailing list