commit e463ad5: [Project] Rework HTTP IO

Vsevolod Stakhov vsevolod at highsecure.ru
Sat Jun 22 12:14:14 UTC 2019


Author: Vsevolod Stakhov
Date: 2019-06-16 18:32:55 +0100
URL: https://github.com/rspamd/rspamd/commit/e463ad556cb35ee39b92dbf7d3934d4187ab70d2

[Project] Rework HTTP IO

---
 src/libutil/CMakeLists.txt    |   1 -
 src/libutil/aio_event.c       | 508 ------------------------------------------
 src/libutil/aio_event.h       |  59 -----
 src/libutil/http_connection.c |  94 +++-----
 src/libutil/http_connection.h |   8 +-
 src/libutil/http_context.c    |   6 +-
 src/libutil/http_private.h    |   4 +-
 src/libutil/http_router.c     |  46 ++--
 src/libutil/http_router.h     |   7 +-
 src/libutil/libev_helper.c    |  16 +-
 src/libutil/map_private.h     |   2 +-
 src/libutil/ssl_util.c        |  71 ++----
 src/libutil/ssl_util.h        |   3 +-
 src/plugins/fuzzy_check.c     |   4 +-
 src/rspamd.h                  |   2 +-
 15 files changed, 98 insertions(+), 733 deletions(-)

diff --git a/src/libutil/CMakeLists.txt b/src/libutil/CMakeLists.txt
index fd29c5512..5a94a732c 100644
--- a/src/libutil/CMakeLists.txt
+++ b/src/libutil/CMakeLists.txt
@@ -2,7 +2,6 @@
 SET(LIBRSPAMDUTILSRC
 				${CMAKE_CURRENT_SOURCE_DIR}/addr.c
 				${CMAKE_CURRENT_SOURCE_DIR}/libev_helper.c
-				${CMAKE_CURRENT_SOURCE_DIR}/aio_event.c
 				${CMAKE_CURRENT_SOURCE_DIR}/bloom.c
 				${CMAKE_CURRENT_SOURCE_DIR}/expression.c
 				${CMAKE_CURRENT_SOURCE_DIR}/fstring.c
diff --git a/src/libutil/aio_event.c b/src/libutil/aio_event.c
deleted file mode 100644
index d0c8d3f63..000000000
--- a/src/libutil/aio_event.c
+++ /dev/null
@@ -1,508 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "config.h"
-#include "contrib/libev/ev.h"
-#include "aio_event.h"
-#include "rspamd.h"
-#include "unix-std.h"
-
-#ifdef HAVE_SYS_EVENTFD_H
-#include <sys/eventfd.h>
-#endif
-
-#ifdef HAVE_AIO_H
-#include <aio.h>
-#endif
-
-/* Linux syscall numbers */
-#if defined(__i386__)
-# define SYS_io_setup      245
-# define SYS_io_destroy    246
-# define SYS_io_getevents  247
-# define SYS_io_submit     248
-# define SYS_io_cancel     249
-#elif defined(__x86_64__)
-# define SYS_io_setup       206
-# define SYS_io_destroy     207
-# define SYS_io_getevents   208
-# define SYS_io_submit      209
-# define SYS_io_cancel      210
-#else
-# warning \
-	"aio is not supported on this platform, please contact author for details"
-# define SYS_io_setup       0
-# define SYS_io_destroy     0
-# define SYS_io_getevents   0
-# define SYS_io_submit      0
-# define SYS_io_cancel      0
-#endif
-
-#define SYS_eventfd       323
-#define MAX_AIO_EV        64
-
-struct io_cbdata {
-	gint fd;
-	rspamd_aio_cb cb;
-	guint64 len;
-	gpointer buf;
-	gpointer io_buf;
-	gpointer ud;
-};
-
-#ifdef LINUX
-
-/* Linux specific mappings and utilities to avoid using of libaio */
-
-typedef unsigned long aio_context_t;
-
-typedef enum io_iocb_cmd {
-	IO_CMD_PREAD = 0,
-	IO_CMD_PWRITE = 1,
-
-	IO_CMD_FSYNC = 2,
-	IO_CMD_FDSYNC = 3,
-
-	IO_CMD_POLL = 5,
-	IO_CMD_NOOP = 6,
-} io_iocb_cmd_t;
-
-#if defined(__LITTLE_ENDIAN)
-#define PADDED(x,y)     x, y
-#elif defined(__BIG_ENDIAN)
-#define PADDED(x,y)     y, x
-#else
-#error edit for your odd byteorder.
-#endif
-
-/*
- * we always use a 64bit off_t when communicating
- * with userland.  its up to libraries to do the
- * proper padding and aio_error abstraction
- */
-
-struct iocb {
-	/* these are internal to the kernel/libc. */
-	guint64 aio_data; /* data to be returned in event's data */
-	guint32 PADDED (aio_key, aio_reserved1);
-	/* the kernel sets aio_key to the req # */
-
-	/* common fields */
-	guint16 aio_lio_opcode; /* see IOCB_CMD_ above */
-	gint16 aio_reqprio;
-	guint32 aio_fildes;
-
-	guint64 aio_buf;
-	guint64 aio_nbytes;
-	gint64 aio_offset;
-
-	/* extra parameters */
-	guint64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */
-
-	/* flags for the "struct iocb" */
-	guint32 aio_flags;
-
-	/*
-	 * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
-	 * eventfd to signal AIO readiness to
-	 */
-	guint32 aio_resfd;
-};
-
-struct io_event {
-	guint64 data;   /* the data field from the iocb */
-	guint64 obj;    /* what iocb this event came from */
-	gint64 res;     /* result code for this event */
-	gint64 res2;    /* secondary result */
-};
-
-/* Linux specific io calls */
-static int
-io_setup (guint nr_reqs, aio_context_t *ctx)
-{
-	return syscall (SYS_io_setup, nr_reqs, ctx);
-}
-
-static int
-io_destroy (aio_context_t ctx)
-{
-	return syscall (SYS_io_destroy, ctx);
-}
-
-static int
-io_getevents (aio_context_t ctx,
-	long min_nr,
-	long nr,
-	struct io_event *events,
-	struct timespec *tmo)
-{
-	return syscall (SYS_io_getevents, ctx, min_nr, nr, events, tmo);
-}
-
-static int
-io_submit (aio_context_t ctx, long n, struct iocb **paiocb)
-{
-	return syscall (SYS_io_submit, ctx, n, paiocb);
-}
-
-static int
-io_cancel (aio_context_t ctx, struct iocb *iocb, struct io_event *result)
-{
-	return syscall (SYS_io_cancel, ctx, iocb, result);
-}
-
-# ifndef HAVE_SYS_EVENTFD_H
-static int
-eventfd (guint initval, guint flags)
-{
-	return syscall (SYS_eventfd, initval);
-}
-# endif
-
-#endif
-
-/**
- * AIO context
- */
-struct aio_context {
-	struct ev_loop *base;
-	gboolean has_aio;       /**< Whether we have aio support on a system */
-#ifdef LINUX
-	/* Eventfd variant */
-	gint event_fd;
-	struct event eventfd_ev;
-	aio_context_t io_ctx;
-#elif defined(HAVE_AIO_H)
-	/* POSIX aio */
-	struct event rtsigs[128];
-#endif
-};
-
-#ifdef LINUX
-/* Eventfd read callback */
-static void
-rspamd_eventfdcb (gint fd, gshort what, gpointer ud)
-{
-	struct aio_context *ctx = ud;
-	guint64 ready;
-	gint done, i;
-	struct io_event event[32];
-	struct timespec ts;
-	struct io_cbdata *ev_data;
-
-	/* Eventfd returns number of events ready got from kernel */
-	if (read (fd, &ready, 8) != 8) {
-		if (errno == EAGAIN) {
-			return;
-		}
-		msg_err ("eventfd read returned error: %s", strerror (errno));
-	}
-
-	ts.tv_sec = 0;
-	ts.tv_nsec = 0;
-
-	while (ready) {
-		/* Get events ready */
-		done = io_getevents (ctx->io_ctx, 1, 32, event, &ts);
-
-		if (done > 0) {
-			ready -= done;
-
-			for (i = 0; i < done; i++) {
-				ev_data = (struct io_cbdata *) (uintptr_t) event[i].data;
-				/* Call this callback */
-				ev_data->cb (ev_data->fd,
-					event[i].res,
-					ev_data->len,
-					ev_data->buf,
-					ev_data->ud);
-				if (ev_data->io_buf) {
-					free (ev_data->io_buf);
-				}
-				g_free (ev_data);
-			}
-		}
-		else if (done == 0) {
-			/* No more events are ready */
-			return;
-		}
-		else {
-			msg_err ("io_getevents failed: %s", strerror (errno));
-			return;
-		}
-	}
-}
-
-#endif
-
-/**
- * Initialize aio with specified event base
- */
-struct aio_context *
-rspamd_aio_init (struct ev_loop *base)
-{
-	struct aio_context *new;
-
-	/* First of all we need to detect which type of aio we can try to use */
-	new = g_malloc0 (sizeof (struct aio_context));
-	new->base = base;
-
-#ifdef LINUX
-	/* On linux we are trying to use io (3) and eventfd for notifying */
-	new->event_fd = eventfd (0, 0);
-	if (new->event_fd == -1) {
-		msg_err ("eventfd failed: %s", strerror (errno));
-	}
-	else {
-		/* Set this socket non-blocking */
-		if (rspamd_socket_nonblocking (new->event_fd) == -1) {
-			msg_err ("non blocking for eventfd failed: %s", strerror (errno));
-			close (new->event_fd);
-		}
-		else {
-			event_set (&new->eventfd_ev,
-				new->event_fd,
-				EV_READ | EV_PERSIST,
-				rspamd_eventfdcb,
-				new);
-			event_base_set (new->base, &new->eventfd_ev);
-			event_add (&new->eventfd_ev, NULL);
-			if (io_setup (MAX_AIO_EV, &new->io_ctx) == -1) {
-				msg_err ("io_setup failed: %s", strerror (errno));
-				close (new->event_fd);
-			}
-			else {
-				new->has_aio = TRUE;
-			}
-		}
-	}
-#elif defined(HAVE_AIO_H)
-	/* TODO: implement this */
-#endif
-
-	return new;
-}
-
-/**
- * Open file for aio
- */
-gint
-rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags)
-{
-	gint fd = -1;
-	/* Fallback */
-	if (!ctx->has_aio) {
-		return open (path, flags);
-	}
-#ifdef LINUX
-
-	fd = open (path, flags | O_DIRECT);
-
-	return fd;
-#elif defined(HAVE_AIO_H)
-	fd = open (path, flags);
-#endif
-
-	return fd;
-}
-
-/**
- * Asynchronous read of file
- */
-gint
-rspamd_aio_read (gint fd,
-	gpointer buf,
-	guint64 len,
-	guint64 offset,
-	struct aio_context *ctx,
-	rspamd_aio_cb cb,
-	gpointer ud)
-{
-	gint r = -1;
-
-	if (ctx->has_aio) {
-#ifdef LINUX
-		struct iocb *iocb[1];
-		struct io_cbdata *cbdata;
-
-		cbdata = g_malloc0 (sizeof (struct io_cbdata));
-		cbdata->cb = cb;
-		cbdata->buf = buf;
-		cbdata->len = len;
-		cbdata->ud = ud;
-		cbdata->fd = fd;
-		cbdata->io_buf = NULL;
-
-		iocb[0] = alloca (sizeof (struct iocb));
-		memset (iocb[0], 0, sizeof (struct iocb));
-		iocb[0]->aio_fildes = fd;
-		iocb[0]->aio_lio_opcode = IO_CMD_PREAD;
-		iocb[0]->aio_reqprio = 0;
-		iocb[0]->aio_buf = (guint64)((uintptr_t)buf);
-		iocb[0]->aio_nbytes = len;
-		iocb[0]->aio_offset = offset;
-		iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
-		iocb[0]->aio_resfd = ctx->event_fd;
-		iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
-		/* Iocb is copied to kernel internally, so it is safe to put it on stack */
-		if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
-			return len;
-		}
-		else {
-			if (errno == EAGAIN || errno == ENOSYS) {
-				/* Fall back to sync read */
-				goto blocking;
-			}
-			return -1;
-		}
-
-#elif defined(HAVE_AIO_H)
-#endif
-	}
-	else {
-		/* Blocking variant */
-		goto blocking;
-blocking:
-#ifdef _LARGEFILE64_SOURCE
-		r = lseek64 (fd, offset, SEEK_SET);
-#else
-		r = lseek (fd, offset, SEEK_SET);
-#endif
-		if (r > 0) {
-			r = read (fd, buf, len);
-			if (r >= 0) {
-				cb (fd, 0, r, buf, ud);
-			}
-			else {
-				cb (fd, r, -1, buf, ud);
-			}
-		}
-	}
-
-	return r;
-}
-
-/**
- * Asynchronous write of file
- */
-gint
-rspamd_aio_write (gint fd,
-	gpointer buf,
-	guint64 len,
-	guint64 offset,
-	struct aio_context *ctx,
-	rspamd_aio_cb cb,
-	gpointer ud)
-{
-	gint r = -1;
-
-	if (ctx->has_aio) {
-#ifdef LINUX
-		struct iocb *iocb[1];
-		struct io_cbdata *cbdata;
-
-		cbdata = g_malloc0 (sizeof (struct io_cbdata));
-		cbdata->cb = cb;
-		cbdata->buf = buf;
-		cbdata->len = len;
-		cbdata->ud = ud;
-		cbdata->fd = fd;
-		/* We need to align pointer on boundary of 512 bytes here */
-		if (posix_memalign (&cbdata->io_buf, 512, len) != 0) {
-			return -1;
-		}
-		memcpy (cbdata->io_buf, buf, len);
-
-		iocb[0] = alloca (sizeof (struct iocb));
-		memset (iocb[0], 0, sizeof (struct iocb));
-		iocb[0]->aio_fildes = fd;
-		iocb[0]->aio_lio_opcode = IO_CMD_PWRITE;
-		iocb[0]->aio_reqprio = 0;
-		iocb[0]->aio_buf = (guint64)((uintptr_t)cbdata->io_buf);
-		iocb[0]->aio_nbytes = len;
-		iocb[0]->aio_offset = offset;
-		iocb[0]->aio_flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
-		iocb[0]->aio_resfd = ctx->event_fd;
-		iocb[0]->aio_data = (guint64)((uintptr_t)cbdata);
-
-		/* Iocb is copied to kernel internally, so it is safe to put it on stack */
-		if (io_submit (ctx->io_ctx, 1, iocb) == 1) {
-			return len;
-		}
-		else {
-			if (errno == EAGAIN || errno == ENOSYS) {
-				/* Fall back to sync read */
-				goto blocking;
-			}
-			return -1;
-		}
-
-#elif defined(HAVE_AIO_H)
-#endif
-	}
-	else {
-		/* Blocking variant */
-		goto blocking;
-blocking:
-#ifdef _LARGEFILE64_SOURCE
-		r = lseek64 (fd, offset, SEEK_SET);
-#else
-		r = lseek (fd, offset, SEEK_SET);
-#endif
-		if (r > 0) {
-			r = write (fd, buf, len);
-			if (r >= 0) {
-				cb (fd, 0, r, buf, ud);
-			}
-			else {
-				cb (fd, r, -1, buf, ud);
-			}
-		}
-	}
-
-	return r;
-}
-
-/**
- * Close of aio operations
- */
-gint
-rspamd_aio_close (gint fd, struct aio_context *ctx)
-{
-	gint r = -1;
-
-	if (ctx->has_aio) {
-#ifdef LINUX
-		struct iocb iocb;
-		struct io_event ev;
-
-		memset (&iocb, 0, sizeof (struct iocb));
-		iocb.aio_fildes = fd;
-		iocb.aio_lio_opcode = IO_CMD_NOOP;
-
-		/* Iocb is copied to kernel internally, so it is safe to put it on stack */
-		r = io_cancel (ctx->io_ctx, &iocb, &ev);
-		close (fd);
-		return r;
-
-#elif defined(HAVE_AIO_H)
-#endif
-	}
-
-	r = close (fd);
-
-	return r;
-}
diff --git a/src/libutil/aio_event.h b/src/libutil/aio_event.h
deleted file mode 100644
index ededd96d4..000000000
--- a/src/libutil/aio_event.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef AIO_EVENT_H_
-#define AIO_EVENT_H_
-
-#include "config.h"
-
-/**
- * AIO context
- */
-struct aio_context;
-
-/**
- * Callback for notifying
- */
-typedef void (*rspamd_aio_cb) (gint fd, gint res, guint64 len, gpointer data,
-	gpointer ud);
-
-/**
- * Initialize aio with specified event base
- */
-struct aio_context * rspamd_aio_init (struct ev_loop *base);
-
-/**
- * Open file for aio
- */
-gint rspamd_aio_open (struct aio_context *ctx, const gchar *path, int flags);
-
-/**
- * Asynchronous read of file
- */
-gint rspamd_aio_read (gint fd, gpointer buf, guint64 len, guint64 offset,
-	struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
-
-/**
- * Asynchronous write of file
- */
-gint rspamd_aio_write (gint fd, gpointer buf, guint64 len, guint64 offset,
-	struct aio_context *ctx, rspamd_aio_cb cb, gpointer ud);
-
-/**
- * Close of aio operations
- */
-gint rspamd_aio_close (gint fd, struct aio_context *ctx);
-
-#endif /* AIO_EVENT_H_ */
diff --git a/src/libutil/http_connection.c b/src/libutil/http_connection.c
index aa964f417..9bf7456e7 100644
--- a/src/libutil/http_connection.c
+++ b/src/libutil/http_connection.c
@@ -25,6 +25,7 @@
 #include "ottery.h"
 #include "keypair_private.h"
 #include "cryptobox.h"
+#include "libutil/libev_helper.h"
 #include "libutil/ssl_util.h"
 #include "libserver/url.h"
 
@@ -67,9 +68,8 @@ struct rspamd_http_connection_private {
 	struct rspamd_http_header *header;
 	struct http_parser parser;
 	struct http_parser_settings parser_cb;
-	struct event ev;
-	struct timeval tv;
-	struct timeval *ptv;
+	struct rspamd_io_ev ev;
+	ev_tstamp timeout;
 	struct rspamd_http_message *msg;
 	struct iovec *out;
 	guint outlen;
@@ -348,9 +348,7 @@ rspamd_http_on_headers_complete (http_parser * parser)
 
 	if (msg->method == HTTP_HEAD) {
 		/* We don't care about the rest */
-		if (rspamd_event_pending (&priv->ev, EV_READ)) {
-			event_del (&priv->ev);
-		}
+		rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
 
 		msg->code = parser->status_code;
 		rspamd_http_connection_ref (conn);
@@ -358,7 +356,7 @@ rspamd_http_on_headers_complete (http_parser * parser)
 
 		if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
 			rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
-					msg, conn->priv->ctx->ev_base);
+					msg, conn->priv->ctx->event_loop);
 			rspamd_http_connection_reset (conn);
 		}
 		else {
@@ -532,17 +530,14 @@ rspamd_http_on_headers_complete_decrypted (http_parser *parser)
 
 	if (msg->method == HTTP_HEAD) {
 		/* We don't care about the rest */
-		if (rspamd_event_pending (&priv->ev, EV_READ)) {
-			event_del (&priv->ev);
-		}
-
+		rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
 		msg->code = parser->status_code;
 		rspamd_http_connection_ref (conn);
 		ret = conn->finish_handler (conn, msg);
 
 		if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
 			rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
-					msg, conn->priv->ctx->ev_base);
+					msg, conn->priv->ctx->event_loop);
 			rspamd_http_connection_reset (conn);
 		}
 		else {
@@ -692,16 +687,13 @@ rspamd_http_on_message_complete (http_parser * parser)
 	}
 
 	if (ret == 0) {
-		if (rspamd_event_pending (&priv->ev, EV_READ)) {
-			event_del (&priv->ev);
-		}
-
+		rspamd_ev_watcher_stop (priv->ctx->event_loop, &priv->ev);
 		rspamd_http_connection_ref (conn);
 		ret = conn->finish_handler (conn, priv->msg);
 
 		if (conn->opts & RSPAMD_HTTP_CLIENT_KEEP_ALIVE) {
 			rspamd_http_context_push_keepalive (conn->priv->ctx, conn,
-					priv->msg, conn->priv->ctx->ev_base);
+					priv->msg, conn->priv->ctx->event_loop);
 			rspamd_http_connection_reset (conn);
 		}
 		else {
@@ -741,11 +733,11 @@ rspamd_http_simple_client_helper (struct rspamd_http_connection *conn)
 
*** OUTPUT TRUNCATED, 649 LINES SKIPPED ***


More information about the Commits mailing list