diff options
Diffstat (limited to 'util/aio-posix.c')
-rw-r--r-- | util/aio-posix.c | 147 |
1 files changed, 103 insertions, 44 deletions
diff --git a/util/aio-posix.c b/util/aio-posix.c index 2b86777e91..266c9dd35f 100644 --- a/util/aio-posix.c +++ b/util/aio-posix.c @@ -15,6 +15,7 @@ #include "qemu/osdep.h" #include "block/block.h" +#include "block/thread-pool.h" #include "qemu/main-loop.h" #include "qemu/rcu.h" #include "qemu/rcu_queue.h" @@ -40,6 +41,14 @@ void aio_add_ready_handler(AioHandlerList *ready_list, QLIST_INSERT_HEAD(ready_list, node, node_ready); } +static void aio_add_poll_ready_handler(AioHandlerList *ready_list, + AioHandler *node) +{ + QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */ + node->poll_ready = true; + QLIST_INSERT_HEAD(ready_list, node, node_ready); +} + static AioHandler *find_aio_handler(AioContext *ctx, int fd) { AioHandler *node; @@ -67,6 +76,7 @@ static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node) } node->pfd.revents = 0; + node->poll_ready = false; /* If the fd monitor has already marked it deleted, leave it alone */ if (QLIST_IS_INSERTED(node, node_deleted)) { @@ -89,10 +99,10 @@ static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node) void aio_set_fd_handler(AioContext *ctx, int fd, - bool is_external, IOHandler *io_read, IOHandler *io_write, AioPollFn *io_poll, + IOHandler *io_poll_ready, void *opaque) { AioHandler *node; @@ -101,6 +111,10 @@ void aio_set_fd_handler(AioContext *ctx, bool deleted = false; int poll_disable_change; + if (io_poll && !io_poll_ready) { + io_poll = NULL; /* polling only makes sense if there is a handler */ + } + qemu_lockcnt_lock(&ctx->list_lock); node = find_aio_handler(ctx, fd); @@ -127,8 +141,8 @@ void aio_set_fd_handler(AioContext *ctx, new_node->io_read = io_read; new_node->io_write = io_write; new_node->io_poll = io_poll; + new_node->io_poll_ready = io_poll_ready; new_node->opaque = opaque; - new_node->is_external = is_external; if (is_new) { new_node->pfd.fd = fd; @@ -164,9 +178,9 @@ void aio_set_fd_handler(AioContext *ctx, } } -void aio_set_fd_poll(AioContext *ctx, int fd, - IOHandler *io_poll_begin, - IOHandler *io_poll_end) +static void aio_set_fd_poll(AioContext *ctx, int fd, + IOHandler *io_poll_begin, + IOHandler *io_poll_end) { AioHandler *node = find_aio_handler(ctx, fd); @@ -180,12 +194,13 @@ void aio_set_fd_poll(AioContext *ctx, int fd, void aio_set_event_notifier(AioContext *ctx, EventNotifier *notifier, - bool is_external, EventNotifierHandler *io_read, - AioPollFn *io_poll) + AioPollFn *io_poll, + EventNotifierHandler *io_poll_ready) { - aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external, - (IOHandler *)io_read, NULL, io_poll, notifier); + aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), + (IOHandler *)io_read, NULL, io_poll, + (IOHandler *)io_poll_ready, notifier); } void aio_set_event_notifier_poll(AioContext *ctx, @@ -198,7 +213,8 @@ void aio_set_event_notifier_poll(AioContext *ctx, (IOHandler *)io_poll_end); } -static bool poll_set_started(AioContext *ctx, bool started) +static bool poll_set_started(AioContext *ctx, AioHandlerList *ready_list, + bool started) { AioHandler *node; bool progress = false; @@ -228,8 +244,9 @@ static bool poll_set_started(AioContext *ctx, bool started) } /* Poll one last time in case ->io_poll_end() raced with the event */ - if (!started) { - progress = node->io_poll(node->opaque) || progress; + if (!started && node->io_poll(node->opaque)) { + aio_add_poll_ready_handler(ready_list, node); + progress = true; } } qemu_lockcnt_dec(&ctx->list_lock); @@ -240,8 +257,11 @@ static bool poll_set_started(AioContext *ctx, bool started) bool aio_prepare(AioContext *ctx) { + AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list); + /* Poll mode cannot be used with glib's event loop, disable it. */ - poll_set_started(ctx, false); + poll_set_started(ctx, &ready_list, false); + /* TODO what to do with this list? */ return false; } @@ -260,14 +280,13 @@ bool aio_pending(AioContext *ctx) QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { int revents; + /* TODO should this check poll ready? */ revents = node->pfd.revents & node->pfd.events; - if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read && - aio_node_check(ctx, node->is_external)) { + if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) { result = true; break; } - if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write && - aio_node_check(ctx, node->is_external)) { + if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) { result = true; break; } @@ -301,11 +320,15 @@ static void aio_free_deleted_handlers(AioContext *ctx) static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node) { bool progress = false; + bool poll_ready; int revents; revents = node->pfd.revents & node->pfd.events; node->pfd.revents = 0; + poll_ready = node->poll_ready; + node->poll_ready = false; + /* * Start polling AioHandlers when they become ready because activity is * likely to continue. Note that starvation is theoretically possible when @@ -321,10 +344,30 @@ static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node) } QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll); } + if (!QLIST_IS_INSERTED(node, node_deleted) && + poll_ready && revents == 0 && node->io_poll_ready) { + /* + * Remove temporarily to avoid infinite loops when ->io_poll_ready() + * calls aio_poll() before clearing the condition that made the poll + * handler become ready. + */ + QLIST_SAFE_REMOVE(node, node_poll); + + node->io_poll_ready(node->opaque); + + if (!QLIST_IS_INSERTED(node, node_poll)) { + QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll); + } + + /* + * Return early since revents was zero. aio_notify() does not count as + * progress. + */ + return node->opaque != &ctx->notifier; + } if (!QLIST_IS_INSERTED(node, node_deleted) && (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) && - aio_node_check(ctx, node->is_external) && node->io_read) { node->io_read(node->opaque); @@ -335,7 +378,6 @@ static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node) } if (!QLIST_IS_INSERTED(node, node_deleted) && (revents & (G_IO_OUT | G_IO_ERR)) && - aio_node_check(ctx, node->is_external) && node->io_write) { node->io_write(node->opaque); progress = true; @@ -387,6 +429,7 @@ void aio_dispatch(AioContext *ctx) } static bool run_poll_handlers_once(AioContext *ctx, + AioHandlerList *ready_list, int64_t now, int64_t *timeout) { @@ -395,8 +438,9 @@ static bool run_poll_handlers_once(AioContext *ctx, AioHandler *tmp; QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) { - if (aio_node_check(ctx, node->is_external) && - node->io_poll(node->opaque)) { + if (node->io_poll(node->opaque)) { + aio_add_poll_ready_handler(ready_list, node); + node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS; /* @@ -420,7 +464,9 @@ static bool fdmon_supports_polling(AioContext *ctx) return ctx->fdmon_ops->need_wait != aio_poll_disabled; } -static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now) +static bool remove_idle_poll_handlers(AioContext *ctx, + AioHandlerList *ready_list, + int64_t now) { AioHandler *node; AioHandler *tmp; @@ -451,7 +497,10 @@ static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now) * Nevermind about re-adding the handler in the rare case where * this causes progress. */ - progress = node->io_poll(node->opaque) || progress; + if (node->io_poll(node->opaque)) { + aio_add_poll_ready_handler(ready_list, node); + progress = true; + } } } } @@ -461,6 +510,7 @@ static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now) /* run_poll_handlers: * @ctx: the AioContext + * @ready_list: the list to place ready handlers on * @max_ns: maximum time to poll for, in nanoseconds * * Polls for a given time. @@ -469,7 +519,8 @@ static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now) * * Returns: true if progress was made, false otherwise */ -static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout) +static bool run_poll_handlers(AioContext *ctx, AioHandlerList *ready_list, + int64_t max_ns, int64_t *timeout) { bool progress; int64_t start_time, elapsed_time; @@ -490,13 +541,15 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout) start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); do { - progress = run_poll_handlers_once(ctx, start_time, timeout); + progress = run_poll_handlers_once(ctx, ready_list, + start_time, timeout); elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time; max_ns = qemu_soonest_timeout(*timeout, max_ns); assert(!(max_ns && progress)); } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx)); - if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) { + if (remove_idle_poll_handlers(ctx, ready_list, + start_time + elapsed_time)) { *timeout = 0; progress = true; } @@ -514,6 +567,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout) /* try_poll_mode: * @ctx: the AioContext + * @ready_list: list to add handlers that need to be run * @timeout: timeout for blocking wait, computed by the caller and updated if * polling succeeds. * @@ -521,7 +575,8 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout) * * Returns: true if progress was made, false otherwise */ -static bool try_poll_mode(AioContext *ctx, int64_t *timeout) +static bool try_poll_mode(AioContext *ctx, AioHandlerList *ready_list, + int64_t *timeout) { int64_t max_ns; @@ -531,25 +586,22 @@ static bool try_poll_mode(AioContext *ctx, int64_t *timeout) max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns); if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) { - poll_set_started(ctx, true); + /* + * Enable poll mode. It pairs with the poll_set_started() in + * aio_poll() which disables poll mode. + */ + poll_set_started(ctx, ready_list, true); - if (run_poll_handlers(ctx, max_ns, timeout)) { + if (run_poll_handlers(ctx, ready_list, max_ns, timeout)) { return true; } } - - if (poll_set_started(ctx, false)) { - *timeout = 0; - return true; - } - return false; } bool aio_poll(AioContext *ctx, bool blocking) { AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list); - int ret = 0; bool progress; bool use_notify_me; int64_t timeout; @@ -574,7 +626,7 @@ bool aio_poll(AioContext *ctx, bool blocking) } timeout = blocking ? aio_compute_timeout(ctx) : 0; - progress = try_poll_mode(ctx, &timeout); + progress = try_poll_mode(ctx, &ready_list, &timeout); assert(!(timeout && progress)); /* @@ -604,7 +656,18 @@ bool aio_poll(AioContext *ctx, bool blocking) * system call---a single round of run_poll_handlers_once suffices. */ if (timeout || ctx->fdmon_ops->need_wait(ctx)) { - ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout); + /* + * Disable poll mode. poll mode should be disabled before the call + * of ctx->fdmon_ops->wait() so that guest's notification can wake + * up IO threads when some work becomes pending. It is essential to + * avoid hangs or unnecessary latency. + */ + if (poll_set_started(ctx, &ready_list, false)) { + timeout = 0; + progress = true; + } + + ctx->fdmon_ops->wait(ctx, &ready_list, timeout); } if (use_notify_me) { @@ -657,10 +720,7 @@ bool aio_poll(AioContext *ctx, bool blocking) } progress |= aio_bh_poll(ctx); - - if (ret > 0) { - progress |= aio_dispatch_ready_handlers(ctx, &ready_list); - } + progress |= aio_dispatch_ready_handlers(ctx, &ready_list); aio_free_deleted_handlers(ctx); @@ -717,8 +777,7 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, aio_notify(ctx); } -void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch, - Error **errp) +void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch) { /* * No thread synchronization here, it doesn't matter if an incorrect value |