commit 44aa58d: [Feature] Support input vectorisation by recvmmsg call

Vsevolod Stakhov vsevolod at highsecure.ru
Tue Jun 2 11:49:08 UTC 2020


Author: Vsevolod Stakhov
Date: 2020-06-02 12:44:21 +0100
URL: https://github.com/rspamd/rspamd/commit/44aa58ddceed5d33c0395d42335ce71e1c865e96 (HEAD -> master)

[Feature] Support input vectorisation by recvmmsg call

---
 src/fuzzy_storage.c | 122 ++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 85 insertions(+), 37 deletions(-)

diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 5eb403ce7..c25a38f39 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -1328,6 +1328,13 @@ fuzzy_session_destroy (gpointer d)
 	g_free (session);
 }
 
+#define FUZZY_INPUT_BUFLEN 1024
+#ifdef HAVE_RECVMMSG
+#define MSGVEC_LEN 16
+#else
+#define MSGVEC_LEN 1
+#endif
+
 /*
  * Accept new connection and construct task
  */
@@ -1337,19 +1344,42 @@ accept_fuzzy_socket (EV_P_ ev_io *w, int revents)
 	struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
 	struct fuzzy_session *session;
 	rspamd_inet_addr_t *addr;
-	gssize r;
-	guint8 buf[512];
+	gssize r, msg_len;
 	guint64 *nerrors;
+	struct iovec iovs[MSGVEC_LEN];
+	guint8 bufs[MSGVEC_LEN][FUZZY_INPUT_BUFLEN];
+	struct sockaddr_storage peer_sa[MSGVEC_LEN];
+	socklen_t salen = sizeof (peer_sa[0]);
+#ifdef HAVE_RECVMMSG
+#define MSG_FIELD(msg, field) msg.msg_hdr.field
+	struct mmsghdr msg[MSGVEC_LEN];
+#else
+#define MSG_FIELD(msg, field) msg.field
+	struct msghdr msg[MSGVEC_LEN];
+#endif
+
+	memset (msg, 0, sizeof (*msg) * MSGVEC_LEN);
+
+	/* Prepare messages to receive */
+	for (int i = 0; i < MSGVEC_LEN; i ++) {
+		/* Prepare msghdr structs */
+		iovs[i].iov_base = bufs[i];
+		iovs[i].iov_len = sizeof (bufs[i]);
+		MSG_FIELD(msg[i], msg_name) = (void *)&peer_sa[i];
+		MSG_FIELD(msg[i], msg_namelen) = salen;
+		MSG_FIELD(msg[i], msg_iov) = &iovs[i];
+		MSG_FIELD(msg[i], msg_iovlen) = 1;
+	}
 
 	/* Got some data */
 	if (revents == EV_READ) {
 
 		for (;;) {
-			r = rspamd_inet_address_recvfrom (w->fd,
-					buf,
-					sizeof (buf),
-					0,
-					&addr);
+#ifdef HAVE_RECVMMSG
+			r = recvmmsg (w->fd, msg, MSGVEC_LEN, 0, NULL);
+#else
+			r = recvmsg (w->fd, msg, 0);
+#endif
 
 			if (r == -1) {
 				if (errno == EINTR) {
@@ -1366,40 +1396,58 @@ accept_fuzzy_socket (EV_P_ ev_io *w, int revents)
 				return;
 			}
 
-			session = g_malloc0 (sizeof (*session));
-			REF_INIT_RETAIN (session, fuzzy_session_destroy);
-			session->worker = worker;
-			session->fd = w->fd;
-			session->ctx = worker->ctx;
-			session->time = (guint64) time (NULL);
-			session->addr = addr;
-			worker->nconns++;
-
-			if (rspamd_fuzzy_cmd_from_wire (buf, r, session)) {
-				/* Check shingles count sanity */
-				rspamd_fuzzy_process_command (session);
-			}
-			else {
-				/* Discard input */
-				session->ctx->stat.invalid_requests ++;
-				msg_debug ("invalid fuzzy command of size %z received", r);
-
-				nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips,
-						addr, -1);
-
-				if (nerrors == NULL) {
-					nerrors = g_malloc (sizeof (*nerrors));
-					*nerrors = 1;
-					rspamd_lru_hash_insert (session->ctx->errors_ips,
-							rspamd_inet_address_copy (addr),
-							nerrors, -1, -1);
+#ifndef HAVE_RECVMMSG
+			msg_len = r; /* Save real length in bytes here */
+			r = 1; /* Assume that we have received a single message */
+#endif
+
+			for (int i = 0; i < r; i ++) {
+				session = g_malloc0 (sizeof (*session));
+				REF_INIT_RETAIN (session, fuzzy_session_destroy);
+				session->worker = worker;
+				session->fd = w->fd;
+				session->ctx = worker->ctx;
+				session->time = (guint64) time (NULL);
+				session->addr = rspamd_inet_address_from_sa (MSG_FIELD(msg[i], msg_name),
+						MSG_FIELD(msg[i], msg_namelen));
+
+				/* Each message can have its length in case of recvmmsg */
+#ifdef HAVE_RECVMMSG
+				msg_len = msg[i].msg_len;
+#endif
+
+				if (rspamd_fuzzy_cmd_from_wire (iovs[i].iov_base,
+						msg_len, session)) {
+					/* Check shingles count sanity */
+					worker->nconns++;
+					rspamd_fuzzy_process_command (session);
 				}
 				else {
-					*nerrors = *nerrors + 1;
+					/* Discard input */
+					session->ctx->stat.invalid_requests ++;
+					msg_debug ("invalid fuzzy command of size %z received", r);
+
+					nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips,
+							addr, -1);
+
+					if (nerrors == NULL) {
+						nerrors = g_malloc (sizeof (*nerrors));
+						*nerrors = 1;
+						rspamd_lru_hash_insert (session->ctx->errors_ips,
+								rspamd_inet_address_copy (addr),
+								nerrors, -1, -1);
+					}
+					else {
+						*nerrors = *nerrors + 1;
+					}
 				}
-			}
 
-			REF_RELEASE (session);
+				REF_RELEASE (session);
+			}
+#ifdef HAVE_RECVMMSG
+			/* Stop reading as we are using recvmmsg instead of recvmsg */
+			break;
+#endif
 		}
 	}
 }


More information about the Commits mailing list