commit af204fe: [Project] Add message block support to the client
Vsevolod Stakhov
vsevolod at highsecure.ru
Thu Jul 18 15:14:10 UTC 2019
Author: Vsevolod Stakhov
Date: 2019-07-18 16:10:04 +0100
URL: https://github.com/rspamd/rspamd/commit/af204fee67890fd8e1043042f94ce2e9b9205bc6 (HEAD -> master)
[Project] Add message block support to the client
---
src/client/rspamc.c | 13 +++---
src/client/rspamdclient.c | 85 +++++++++++++++++++++++----------------
src/client/rspamdclient.h | 2 +
src/libserver/protocol.c | 2 +-
src/libserver/protocol_internal.h | 1 +
src/rspamd_proxy.c | 8 ++--
6 files changed, 67 insertions(+), 44 deletions(-)
diff --git a/src/client/rspamc.c b/src/client/rspamc.c
index 5b3f874ef..ea2fe0d4c 100644
--- a/src/client/rspamc.c
+++ b/src/client/rspamc.c
@@ -1522,6 +1522,7 @@ rspamc_client_cb (struct rspamd_client_connection *conn,
struct rspamd_http_message *msg,
const gchar *name, ucl_object_t *result, GString *input,
gpointer ud, gdouble start_time, gdouble send_time,
+ const gchar *body, gsize bodylen,
GError *err)
{
gchar *ucl_out;
@@ -1529,8 +1530,6 @@ rspamc_client_cb (struct rspamd_client_connection *conn,
struct rspamc_command *cmd;
FILE *out = stdout;
gdouble finish = rspamd_get_ticks (FALSE), diff;
- const gchar *body;
- gsize body_len;
cmd = cbdata->cmd;
@@ -1603,11 +1602,15 @@ rspamc_client_cb (struct rspamd_client_connection *conn,
rspamd_fprintf (out, "%s\n", err->message);
if (json && msg != NULL) {
- body = rspamd_http_message_get_body (msg, &body_len);
+ const gchar *raw;
+ gsize rawlen;
- if (body) {
+ raw = rspamd_http_message_get_body (msg, &rawlen);
+
+ if (raw) {
/* We can also output the resulting json */
- rspamd_fprintf (out, "%*s\n", (gint)body_len, body);
+ rspamd_fprintf (out, "%*s\n", (gint)rawlen - bodylen,
+ raw);
}
}
}
diff --git a/src/client/rspamdclient.c b/src/client/rspamdclient.c
index c27ff9f0b..7b1bcb73e 100644
--- a/src/client/rspamdclient.c
+++ b/src/client/rspamdclient.c
@@ -17,6 +17,7 @@
#include "libutil/util.h"
#include "libutil/http_connection.h"
#include "libutil/http_private.h"
+#include "libserver/protocol_internal.h"
#include "unix-std.h"
#include "contrib/zstd/zstd.h"
#include "contrib/zstd/zdict.h"
@@ -96,7 +97,7 @@ rspamd_client_error_handler (struct rspamd_http_connection *conn, GError *err)
c = req->conn;
req->cb (c, NULL, c->server_name->str, NULL,
req->input, req->ud,
- c->start_time, c->send_time, err);
+ c->start_time, c->send_time, NULL, 0, err);
}
static gint
@@ -109,6 +110,9 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
struct ucl_parser *parser;
GError *err;
const rspamd_ftok_t *tok;
+ const gchar *start, *body = NULL;
+ guchar *out = NULL;
+ gsize len, bodylen = 0;
c = req->conn;
@@ -119,6 +123,7 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
rspamd_http_connection_read_message (c->http_conn,
c->req,
c->timeout);
+
return 0;
}
else {
@@ -127,13 +132,13 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
msg->code,
(gint)msg->status->len, msg->status->str);
req->cb (c, msg, c->server_name->str, NULL, req->input, req->ud,
- c->start_time, c->send_time, err);
+ c->start_time, c->send_time, body, bodylen, err);
g_error_free (err);
return 0;
}
- tok = rspamd_http_message_find_header (msg, "compression");
+ tok = rspamd_http_message_find_header (msg, COMPRESSION_HEADER);
if (tok) {
/* Need to uncompress */
@@ -146,7 +151,6 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
ZSTD_DStream *zstream;
ZSTD_inBuffer zin;
ZSTD_outBuffer zout;
- guchar *out;
gsize outlen, r;
zstream = ZSTD_createDStream ();
@@ -174,12 +178,11 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
ZSTD_getErrorName (r));
req->cb (c, msg, c->server_name->str, NULL,
req->input, req->ud, c->start_time,
- c->send_time, err);
+ c->send_time, body, bodylen, err);
g_error_free (err);
ZSTD_freeDStream (zstream);
- g_free (out);
- return 0;
+ goto end;
}
if (zout.pos == zout.size) {
@@ -191,50 +194,64 @@ rspamd_client_finish_handler (struct rspamd_http_connection *conn,
ZSTD_freeDStream (zstream);
- parser = ucl_parser_new (0);
- if (!ucl_parser_add_chunk (parser, zout.dst, zout.pos)) {
- err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
- ucl_parser_get_error (parser));
- ucl_parser_free (parser);
- req->cb (c, msg, c->server_name->str, NULL, req->input,
- req->ud, c->start_time, c->send_time, err);
- g_error_free (err);
- g_free (zout.dst);
-
- return 0;
- }
-
- g_free (zout.dst);
+ start = zout.dst;
+ len = zout.pos;
}
else {
err = g_error_new (RCLIENT_ERROR, 500,
"Invalid compression method");
req->cb (c, msg, c->server_name->str, NULL,
- req->input, req->ud, c->start_time, c->send_time, err);
+ req->input, req->ud, c->start_time, c->send_time,
+ body, bodylen, err);
g_error_free (err);
return 0;
}
}
else {
- parser = ucl_parser_new (0);
- if (!ucl_parser_add_chunk (parser, msg->body_buf.begin, msg->body_buf.len)) {
- err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
- ucl_parser_get_error (parser));
- ucl_parser_free (parser);
- req->cb (c, msg, c->server_name->str, NULL,
- req->input, req->ud, c->start_time, c->send_time, err);
- g_error_free (err);
+ start = msg->body_buf.begin;
+ len = msg->body_buf.len;
+ }
- return 0;
+ /* Deal with body */
+ tok = rspamd_http_message_find_header (msg, MESSAGE_OFFSET_HEADER);
+
+ if (tok) {
+ gulong value = 0;
+
+ if (rspamd_strtoul (tok->begin, tok->len, &value) &&
+ value < len) {
+ body = start + value;
+ bodylen = len - value;
+ len = value;
}
}
- req->cb (c, msg, c->server_name->str, ucl_parser_get_object (
- parser), req->input, req->ud, c->start_time, c->send_time, NULL);
+ parser = ucl_parser_new (0);
+ if (!ucl_parser_add_chunk (parser, start, len)) {
+ err = g_error_new (RCLIENT_ERROR, msg->code, "Cannot parse UCL: %s",
+ ucl_parser_get_error (parser));
+ ucl_parser_free (parser);
+ req->cb (c, msg, c->server_name->str, NULL,
+ req->input, req->ud,
+ c->start_time, c->send_time, body, bodylen, err);
+ g_error_free (err);
+
+ goto end;
+ }
+
+ req->cb (c, msg, c->server_name->str,
+ ucl_parser_get_object (parser),
+ req->input, req->ud,
+ c->start_time, c->send_time, body, bodylen, NULL);
ucl_parser_free (parser);
}
+end:
+ if (out) {
+ g_free (out);
+ }
+
return 0;
}
@@ -419,7 +436,7 @@ rspamd_client_command (struct rspamd_client_connection *conn,
}
if (compressed) {
- rspamd_http_message_add_header (req->msg, "Compression", "zstd");
+ rspamd_http_message_add_header (req->msg, COMPRESSION_HEADER, "zstd");
if (dict_id != 0) {
gchar dict_str[32];
diff --git a/src/client/rspamdclient.h b/src/client/rspamdclient.h
index dc274b7a8..b42abebda 100644
--- a/src/client/rspamdclient.h
+++ b/src/client/rspamdclient.h
@@ -49,6 +49,8 @@ typedef void (*rspamd_client_callback) (
gpointer ud,
gdouble start_time,
gdouble send_time,
+ const gchar *body,
+ gsize body_len,
GError *err);
struct rspamd_http_context;
diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c
index e8bbd979b..dd840284a 100644
--- a/src/libserver/protocol.c
+++ b/src/libserver/protocol.c
@@ -1661,7 +1661,7 @@ rspamd_protocol_http_reply (struct rspamd_http_message *msg,
compressed_reply->len = zout.pos;
rspamd_fstring_free (reply);
rspamd_http_message_set_body_from_fstring_steal (msg, compressed_reply);
- rspamd_http_message_add_header (msg, "Compression", "zstd");
+ rspamd_http_message_add_header (msg, COMPRESSION_HEADER, "zstd");
if (task->cfg->libs_ctx->out_dict &&
task->cfg->libs_ctx->out_dict->id != 0) {
diff --git a/src/libserver/protocol_internal.h b/src/libserver/protocol_internal.h
index d9616e03d..8cec5d80e 100644
--- a/src/libserver/protocol_internal.h
+++ b/src/libserver/protocol_internal.h
@@ -88,6 +88,7 @@ extern "C" {
#define CERT_ISSUER_HEADER "TLS-Cert-Issuer"
#define MAILER_HEADER "Mailer"
#define RAW_DATA_HEADER "Raw"
+#define COMPRESSION_HEADER "Compression"
#define MESSAGE_OFFSET_HEADER "Message-Offset"
#ifdef __cplusplus
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index cca9f792f..70e393ddd 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -1066,7 +1066,7 @@ proxy_request_compress (struct rspamd_http_message *msg)
flags = rspamd_http_message_get_flags (msg);
- if (!rspamd_http_message_find_header (msg, "Compression")) {
+ if (!rspamd_http_message_find_header (msg, COMPRESSION_HEADER)) {
if ((flags & RSPAMD_HTTP_FLAG_SHMEM) ||
!(flags & RSPAMD_HTTP_FLAG_HAS_BODY)) {
/* Cannot compress shared or empty message */
@@ -1094,7 +1094,7 @@ proxy_request_compress (struct rspamd_http_message *msg)
ZSTD_freeCCtx (zctx);
rspamd_http_message_set_body_from_fstring_steal (msg, body);
- rspamd_http_message_add_header (msg, "Compression", "zstd");
+ rspamd_http_message_add_header (msg, COMPRESSION_HEADER, "zstd");
}
}
@@ -1108,7 +1108,7 @@ proxy_request_decompress (struct rspamd_http_message *msg)
ZSTD_inBuffer zin;
ZSTD_outBuffer zout;
- if (rspamd_http_message_find_header (msg, "Compression")) {
+ if (rspamd_http_message_find_header (msg, COMPRESSION_HEADER)) {
in = rspamd_http_message_get_body (msg, &inlen);
if (in == NULL || inlen == 0) {
@@ -1154,7 +1154,7 @@ proxy_request_decompress (struct rspamd_http_message *msg)
body->len = zout.pos;
ZSTD_freeDStream (zstream);
rspamd_http_message_set_body_from_fstring_steal (msg, body);
- rspamd_http_message_remove_header (msg, "Compression");
+ rspamd_http_message_remove_header (msg, COMPRESSION_HEADER);
}
}
More information about the Commits
mailing list