aboutsummaryrefslogtreecommitdiff
path: root/posix-aio-compat.c
diff options
context:
space:
mode:
authorChristoph Hellwig <hch@lst.de>2009-08-20 16:58:19 +0200
committerAnthony Liguori <aliguori@us.ibm.com>2009-08-27 20:30:22 -0500
commit9ef91a677110ec200d7b2904fc4bcae5a77329ad (patch)
treefb402ed4b586edc26b7f4e55b378736fd36111b1 /posix-aio-compat.c
parentbf0cb498c581b6be7bf4c31f500487600abf2db5 (diff)
raw-posix: refactor AIO support
Currently the raw-posix.c code contains a lot of knowledge about the asynchronous I/O scheme that is mostly implemented in posix-aio-compat.c. All this code does not really belong here and is getting a bit in the way of implementing native AIO on Linux. So instead move all the guts of the AIO implementation into posix-aio-compat.c (which might need a better name, btw). There's now a very small interface between the AIO providers and raw-posix.c: - an init routine is called from raw_open_common to return an AIO context for this drive. An AIO implementation may either re-use one context for all drives, or use a different one for each as the Linux native AIO support will do. - an submit routine is called from the aio_reav/writev methods to submit an AIO request There are no indirect calls involved in this interface as we need to decide which one to call manually. We will only call the Linux AIO native init function if we were requested to by vl.c, and we will only call the native submit function if we are asked to and the request is properly aligned. That's also the reason why the alignment check actually does the inverse move and now goes into raw-posix.c. The old posix-aio-compat.h headers is removed now that most of it's content is private to posix-aio-compat.c, and instead we add a new block/raw-posix-aio.h headers is created containing only the tiny interface between raw-posix.c and the AIO implementation. Signed-off-by: Christoph Hellwig <hch@lst.de> Signed-off-by: Anthony Liguori <aliguori@us.ibm.com>
Diffstat (limited to 'posix-aio-compat.c')
-rw-r--r--posix-aio-compat.c325
1 files changed, 252 insertions, 73 deletions
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 3d79f59854..5ea197f668 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -12,17 +12,49 @@
*/
#include <sys/ioctl.h>
+#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
+#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
+
+#include "sys-queue.h"
#include "osdep.h"
#include "qemu-common.h"
+#include "block_int.h"
+
+#include "block/raw-posix-aio.h"
+
+
+struct qemu_paiocb {
+ BlockDriverAIOCB common;
+ int aio_fildes;
+ union {
+ struct iovec *aio_iov;
+ void *aio_ioctl_buf;
+ };
+ int aio_niov;
+ size_t aio_nbytes;
+#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */
+ int ev_signo;
+ off_t aio_offset;
+
+ TAILQ_ENTRY(qemu_paiocb) node;
+ int aio_type;
+ ssize_t ret;
+ int active;
+ struct qemu_paiocb *next;
+};
+
+typedef struct PosixAioState {
+ int rfd, wfd;
+ struct qemu_paiocb *first_aio;
+} PosixAioState;
-#include "posix-aio-compat.h"
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
@@ -132,30 +164,13 @@ qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
#endif
-/*
- * Check if we need to copy the data in the aiocb into a new
- * properly aligned buffer.
- */
-static int aiocb_needs_copy(struct qemu_paiocb *aiocb)
-{
- if (aiocb->aio_flags & QEMU_AIO_SECTOR_ALIGNED) {
- int i;
-
- for (i = 0; i < aiocb->aio_niov; i++)
- if ((uintptr_t) aiocb->aio_iov[i].iov_base % 512)
- return 1;
- }
-
- return 0;
-}
-
static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
size_t offset = 0;
ssize_t len;
do {
- if (aiocb->aio_type == QEMU_PAIO_WRITE)
+ if (aiocb->aio_type & QEMU_AIO_WRITE)
len = qemu_pwritev(aiocb->aio_fildes,
aiocb->aio_iov,
aiocb->aio_niov,
@@ -178,7 +193,7 @@ static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
size_t len;
while (offset < aiocb->aio_nbytes) {
- if (aiocb->aio_type == QEMU_PAIO_WRITE)
+ if (aiocb->aio_type & QEMU_AIO_WRITE)
len = pwrite(aiocb->aio_fildes,
(const char *)buf + offset,
aiocb->aio_nbytes - offset,
@@ -208,7 +223,7 @@ static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
size_t nbytes;
char *buf;
- if (!aiocb_needs_copy(aiocb)) {
+ if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
/*
* If there is just a single buffer, and it is properly aligned
* we can just use plain pread/pwrite without any problems.
@@ -243,7 +258,7 @@ static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
* a single aligned buffer.
*/
buf = qemu_memalign(512, aiocb->aio_nbytes);
- if (aiocb->aio_type == QEMU_PAIO_WRITE) {
+ if (aiocb->aio_type & QEMU_AIO_WRITE) {
char *p = buf;
int i;
@@ -254,7 +269,7 @@ static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
}
nbytes = handle_aiocb_rw_linear(aiocb, buf);
- if (aiocb->aio_type != QEMU_PAIO_WRITE) {
+ if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
char *p = buf;
size_t count = aiocb->aio_nbytes, copy;
int i;
@@ -310,12 +325,12 @@ static void *aio_thread(void *unused)
idle_threads--;
mutex_unlock(&lock);
- switch (aiocb->aio_type) {
- case QEMU_PAIO_READ:
- case QEMU_PAIO_WRITE:
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
ret = handle_aiocb_rw(aiocb);
break;
- case QEMU_PAIO_IOCTL:
+ case QEMU_AIO_IOCTL:
ret = handle_aiocb_ioctl(aiocb);
break;
default:
@@ -346,24 +361,8 @@ static void spawn_thread(void)
thread_create(&thread_id, &attr, aio_thread, NULL);
}
-int qemu_paio_init(struct qemu_paioinit *aioinit)
+static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
- int ret;
-
- ret = pthread_attr_init(&attr);
- if (ret) die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret) die2(ret, "pthread_attr_setdetachstate");
-
- TAILQ_INIT(&request_list);
-
- return 0;
-}
-
-static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
-{
- aiocb->aio_type = type;
aiocb->ret = -EINPROGRESS;
aiocb->active = 0;
mutex_lock(&lock);
@@ -372,26 +371,9 @@ static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
TAILQ_INSERT_TAIL(&request_list, aiocb, node);
mutex_unlock(&lock);
cond_signal(&cond);
-
- return 0;
}
-int qemu_paio_read(struct qemu_paiocb *aiocb)
-{
- return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
-}
-
-int qemu_paio_write(struct qemu_paiocb *aiocb)
-{
- return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
-}
-
-int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
-{
- return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
-}
-
-ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
+static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
@@ -402,7 +384,7 @@ ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
return ret;
}
-int qemu_paio_error(struct qemu_paiocb *aiocb)
+static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
ssize_t ret = qemu_paio_return(aiocb);
@@ -414,20 +396,217 @@ int qemu_paio_error(struct qemu_paiocb *aiocb)
return ret;
}
-int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
+static void posix_aio_read(void *opaque)
{
+ PosixAioState *s = opaque;
+ struct qemu_paiocb *acb, **pacb;
int ret;
+ ssize_t len;
+
+ /* read all bytes from signal pipe */
+ for (;;) {
+ char bytes[16];
+
+ len = read(s->rfd, bytes, sizeof(bytes));
+ if (len == -1 && errno == EINTR)
+ continue; /* try again */
+ if (len == sizeof(bytes))
+ continue; /* more to read */
+ break;
+ }
+
+ for(;;) {
+ pacb = &s->first_aio;
+ for(;;) {
+ acb = *pacb;
+ if (!acb)
+ goto the_end;
+ ret = qemu_paio_error(acb);
+ if (ret == ECANCELED) {
+ /* remove the request */
+ *pacb = acb->next;
+ qemu_aio_release(acb);
+ } else if (ret != EINPROGRESS) {
+ /* end of aio */
+ if (ret == 0) {
+ ret = qemu_paio_return(acb);
+ if (ret == acb->aio_nbytes)
+ ret = 0;
+ else
+ ret = -EINVAL;
+ } else {
+ ret = -ret;
+ }
+ /* remove the request */
+ *pacb = acb->next;
+ /* call the callback */
+ acb->common.cb(acb->common.opaque, ret);
+ qemu_aio_release(acb);
+ break;
+ } else {
+ pacb = &acb->next;
+ }
+ }
+ }
+ the_end: ;
+}
+
+static int posix_aio_flush(void *opaque)
+{
+ PosixAioState *s = opaque;
+ return !!s->first_aio;
+}
+
+static PosixAioState *posix_aio_state;
+
+static void aio_signal_handler(int signum)
+{
+ if (posix_aio_state) {
+ char byte = 0;
+
+ write(posix_aio_state->wfd, &byte, sizeof(byte));
+ }
+
+ qemu_service_io();
+}
+
+static void paio_remove(struct qemu_paiocb *acb)
+{
+ struct qemu_paiocb **pacb;
+
+ /* remove the callback from the queue */
+ pacb = &posix_aio_state->first_aio;
+ for(;;) {
+ if (*pacb == NULL) {
+ fprintf(stderr, "paio_remove: aio request not found!\n");
+ break;
+ } else if (*pacb == acb) {
+ *pacb = acb->next;
+ qemu_aio_release(acb);
+ break;
+ }
+ pacb = &(*pacb)->next;
+ }
+}
+
+static void paio_cancel(BlockDriverAIOCB *blockacb)
+{
+ struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
+ int active = 0;
mutex_lock(&lock);
- if (!aiocb->active) {
- TAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->ret = -ECANCELED;
- ret = QEMU_PAIO_CANCELED;
- } else if (aiocb->ret == -EINPROGRESS)
- ret = QEMU_PAIO_NOTCANCELED;
- else
- ret = QEMU_PAIO_ALLDONE;
+ if (!acb->active) {
+ TAILQ_REMOVE(&request_list, acb, node);
+ acb->ret = -ECANCELED;
+ } else if (acb->ret == -EINPROGRESS) {
+ active = 1;
+ }
mutex_unlock(&lock);
- return ret;
+ if (active) {
+ /* fail safe: if the aio could not be canceled, we wait for
+ it */
+ while (qemu_paio_error(acb) == EINPROGRESS)
+ ;
+ }
+
+ paio_remove(acb);
+}
+
+static AIOPool raw_aio_pool = {
+ .aiocb_size = sizeof(struct qemu_paiocb),
+ .cancel = paio_cancel,
+};
+
+BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
+ int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+ BlockDriverCompletionFunc *cb, void *opaque, int type)
+{
+ struct qemu_paiocb *acb;
+
+ acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
+ if (!acb)
+ return NULL;
+ acb->aio_type = type;
+ acb->aio_fildes = fd;
+ acb->ev_signo = SIGUSR2;
+ acb->aio_iov = qiov->iov;
+ acb->aio_niov = qiov->niov;
+ acb->aio_nbytes = nb_sectors * 512;
+ acb->aio_offset = sector_num * 512;
+
+ acb->next = posix_aio_state->first_aio;
+ posix_aio_state->first_aio = acb;
+
+ qemu_paio_submit(acb);
+ return &acb->common;
+}
+
+BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
+ unsigned long int req, void *buf,
+ BlockDriverCompletionFunc *cb, void *opaque)
+{
+ struct qemu_paiocb *acb;
+
+ acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
+ if (!acb)
+ return NULL;
+ acb->aio_type = QEMU_AIO_IOCTL;
+ acb->aio_fildes = fd;
+ acb->ev_signo = SIGUSR2;
+ acb->aio_offset = 0;
+ acb->aio_ioctl_buf = buf;
+ acb->aio_ioctl_cmd = req;
+
+ acb->next = posix_aio_state->first_aio;
+ posix_aio_state->first_aio = acb;
+
+ qemu_paio_submit(acb);
+ return &acb->common;
+}
+
+void *paio_init(void)
+{
+ struct sigaction act;
+ PosixAioState *s;
+ int fds[2];
+ int ret;
+
+ if (posix_aio_state)
+ return posix_aio_state;
+
+ s = qemu_malloc(sizeof(PosixAioState));
+
+ sigfillset(&act.sa_mask);
+ act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+ act.sa_handler = aio_signal_handler;
+ sigaction(SIGUSR2, &act, NULL);
+
+ s->first_aio = NULL;
+ if (pipe(fds) == -1) {
+ fprintf(stderr, "failed to create pipe\n");
+ return NULL;
+ }
+
+ s->rfd = fds[0];
+ s->wfd = fds[1];
+
+ fcntl(s->rfd, F_SETFL, O_NONBLOCK);
+ fcntl(s->wfd, F_SETFL, O_NONBLOCK);
+
+ qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
+
+ ret = pthread_attr_init(&attr);
+ if (ret)
+ die2(ret, "pthread_attr_init");
+
+ ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ if (ret)
+ die2(ret, "pthread_attr_setdetachstate");
+
+ TAILQ_INIT(&request_list);
+
+ posix_aio_state = s;
+
+ return posix_aio_state;
}