commit 44aa58d: [Feature] Support input vectorisation by recvmmsg call
Vsevolod Stakhov
vsevolod at highsecure.ru
Tue Jun 2 11:49:08 UTC 2020
Author: Vsevolod Stakhov
Date: 2020-06-02 12:44:21 +0100
URL: https://github.com/rspamd/rspamd/commit/44aa58ddceed5d33c0395d42335ce71e1c865e96 (HEAD -> master)
[Feature] Support input vectorisation by recvmmsg call
---
src/fuzzy_storage.c | 122 ++++++++++++++++++++++++++++++++++++----------------
1 file changed, 85 insertions(+), 37 deletions(-)
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 5eb403ce7..c25a38f39 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -1328,6 +1328,13 @@ fuzzy_session_destroy (gpointer d)
g_free (session);
}
+#define FUZZY_INPUT_BUFLEN 1024
+#ifdef HAVE_RECVMMSG
+#define MSGVEC_LEN 16
+#else
+#define MSGVEC_LEN 1
+#endif
+
/*
* Accept new connection and construct task
*/
@@ -1337,19 +1344,42 @@ accept_fuzzy_socket (EV_P_ ev_io *w, int revents)
struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
struct fuzzy_session *session;
rspamd_inet_addr_t *addr;
- gssize r;
- guint8 buf[512];
+ gssize r, msg_len;
guint64 *nerrors;
+ struct iovec iovs[MSGVEC_LEN];
+ guint8 bufs[MSGVEC_LEN][FUZZY_INPUT_BUFLEN];
+ struct sockaddr_storage peer_sa[MSGVEC_LEN];
+ socklen_t salen = sizeof (peer_sa[0]);
+#ifdef HAVE_RECVMMSG
+#define MSG_FIELD(msg, field) msg.msg_hdr.field
+ struct mmsghdr msg[MSGVEC_LEN];
+#else
+#define MSG_FIELD(msg, field) msg.field
+ struct msghdr msg[MSGVEC_LEN];
+#endif
+
+ memset (msg, 0, sizeof (*msg) * MSGVEC_LEN);
+
+ /* Prepare messages to receive */
+ for (int i = 0; i < MSGVEC_LEN; i ++) {
+ /* Prepare msghdr structs */
+ iovs[i].iov_base = bufs[i];
+ iovs[i].iov_len = sizeof (bufs[i]);
+ MSG_FIELD(msg[i], msg_name) = (void *)&peer_sa[i];
+ MSG_FIELD(msg[i], msg_namelen) = salen;
+ MSG_FIELD(msg[i], msg_iov) = &iovs[i];
+ MSG_FIELD(msg[i], msg_iovlen) = 1;
+ }
/* Got some data */
if (revents == EV_READ) {
for (;;) {
- r = rspamd_inet_address_recvfrom (w->fd,
- buf,
- sizeof (buf),
- 0,
- &addr);
+#ifdef HAVE_RECVMMSG
+ r = recvmmsg (w->fd, msg, MSGVEC_LEN, 0, NULL);
+#else
+ r = recvmsg (w->fd, msg, 0);
+#endif
if (r == -1) {
if (errno == EINTR) {
@@ -1366,40 +1396,58 @@ accept_fuzzy_socket (EV_P_ ev_io *w, int revents)
return;
}
- session = g_malloc0 (sizeof (*session));
- REF_INIT_RETAIN (session, fuzzy_session_destroy);
- session->worker = worker;
- session->fd = w->fd;
- session->ctx = worker->ctx;
- session->time = (guint64) time (NULL);
- session->addr = addr;
- worker->nconns++;
-
- if (rspamd_fuzzy_cmd_from_wire (buf, r, session)) {
- /* Check shingles count sanity */
- rspamd_fuzzy_process_command (session);
- }
- else {
- /* Discard input */
- session->ctx->stat.invalid_requests ++;
- msg_debug ("invalid fuzzy command of size %z received", r);
-
- nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips,
- addr, -1);
-
- if (nerrors == NULL) {
- nerrors = g_malloc (sizeof (*nerrors));
- *nerrors = 1;
- rspamd_lru_hash_insert (session->ctx->errors_ips,
- rspamd_inet_address_copy (addr),
- nerrors, -1, -1);
+#ifndef HAVE_RECVMMSG
+ msg_len = r; /* Save real length in bytes here */
+ r = 1; /* Assume that we have received a single message */
+#endif
+
+ for (int i = 0; i < r; i ++) {
+ session = g_malloc0 (sizeof (*session));
+ REF_INIT_RETAIN (session, fuzzy_session_destroy);
+ session->worker = worker;
+ session->fd = w->fd;
+ session->ctx = worker->ctx;
+ session->time = (guint64) time (NULL);
+ session->addr = rspamd_inet_address_from_sa (MSG_FIELD(msg[i], msg_name),
+ MSG_FIELD(msg[i], msg_namelen));
+
+ /* Each message can have its length in case of recvmmsg */
+#ifdef HAVE_RECVMMSG
+ msg_len = msg[i].msg_len;
+#endif
+
+ if (rspamd_fuzzy_cmd_from_wire (iovs[i].iov_base,
+ msg_len, session)) {
+ /* Check shingles count sanity */
+ worker->nconns++;
+ rspamd_fuzzy_process_command (session);
}
else {
- *nerrors = *nerrors + 1;
+ /* Discard input */
+ session->ctx->stat.invalid_requests ++;
+ msg_debug ("invalid fuzzy command of size %z received", r);
+
+ nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips,
+ addr, -1);
+
+ if (nerrors == NULL) {
+ nerrors = g_malloc (sizeof (*nerrors));
+ *nerrors = 1;
+ rspamd_lru_hash_insert (session->ctx->errors_ips,
+ rspamd_inet_address_copy (addr),
+ nerrors, -1, -1);
+ }
+ else {
+ *nerrors = *nerrors + 1;
+ }
}
- }
- REF_RELEASE (session);
+ REF_RELEASE (session);
+ }
+#ifdef HAVE_RECVMMSG
+ /* Stop reading as we are using recvmmsg instead of recvmsg */
+ break;
+#endif
}
}
}
More information about the Commits
mailing list