diff options
Diffstat (limited to 'nbd/server.c')
-rw-r--r-- | nbd/server.c | 1239 |
1 files changed, 844 insertions, 395 deletions
diff --git a/nbd/server.c b/nbd/server.c index 3927f7789d..c3484cc1eb 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016-2020 Red Hat, Inc. + * Copyright Red Hat * Copyright (C) 2005 Anthony Liguori <anthony@codemonkey.ws> * * Network Block Device Server Side @@ -19,12 +19,15 @@ #include "qemu/osdep.h" +#include "block/block_int.h" #include "block/export.h" +#include "block/dirty-bitmap.h" #include "qapi/error.h" #include "qemu/queue.h" #include "trace.h" #include "nbd-internal.h" #include "qemu/units.h" +#include "qemu/memalign.h" #define NBD_META_ID_BASE_ALLOCATION 0 #define NBD_META_ID_ALLOCATION_DEPTH 1 @@ -77,7 +80,6 @@ static int system_errno_to_nbd_errno(int err) typedef struct NBDRequestData NBDRequestData; struct NBDRequestData { - QSIMPLEQ_ENTRY(NBDRequestData) entry; NBDClient *client; uint8_t *data; bool complete; @@ -103,11 +105,13 @@ struct NBDExport { static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports); -/* NBDExportMetaContexts represents a list of contexts to be exported, +/* + * NBDMetaContexts represents a list of meta contexts in use, * as selected by NBD_OPT_SET_META_CONTEXT. Also used for - * NBD_OPT_LIST_META_CONTEXT. */ -typedef struct NBDExportMetaContexts { - NBDExport *exp; + * NBD_OPT_LIST_META_CONTEXT. + */ +struct NBDMetaContexts { + const NBDExport *exp; /* associated export */ size_t count; /* number of negotiated contexts */ bool base_allocation; /* export base:allocation context (block status) */ bool allocation_depth; /* export qemu:allocation-depth */ @@ -115,34 +119,36 @@ typedef struct NBDExportMetaContexts { * export qemu:dirty-bitmap:<export bitmap name>, * sized by exp->nr_export_bitmaps */ -} NBDExportMetaContexts; +}; struct NBDClient { - int refcount; + int refcount; /* atomic */ void (*close_fn)(NBDClient *client, bool negotiated); + QemuMutex lock; + NBDExport *exp; QCryptoTLSCreds *tlscreds; char *tlsauthz; QIOChannelSocket *sioc; /* The underlying data channel */ QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */ - Coroutine *recv_coroutine; + Coroutine *recv_coroutine; /* protected by lock */ CoMutex send_lock; Coroutine *send_coroutine; - bool read_yielding; - bool quiescing; + bool read_yielding; /* protected by lock */ + bool quiescing; /* protected by lock */ QTAILQ_ENTRY(NBDClient) next; - int nb_requests; - bool closing; + int nb_requests; /* protected by lock */ + bool closing; /* protected by lock */ uint32_t check_align; /* If non-zero, check for aligned client requests */ - bool structured_reply; - NBDExportMetaContexts export_meta; + NBDMode mode; + NBDMetaContexts contexts; /* Negotiated meta contexts */ uint32_t opt; /* Current option being negotiated */ uint32_t optlen; /* remaining length of data in ioc for the option being @@ -213,7 +219,7 @@ static int nbd_negotiate_send_rep(NBDClient *client, uint32_t type, /* Send an error reply. * Return -errno on error, 0 on success. */ -static int GCC_FMT_ATTR(4, 0) +static int G_GNUC_PRINTF(4, 0) nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type, Error **errp, const char *fmt, va_list va) { @@ -253,7 +259,7 @@ nbd_sanitize_name(const char *name) /* Send an error reply. * Return -errno on error, 0 on success. */ -static int GCC_FMT_ATTR(4, 5) +static int G_GNUC_PRINTF(4, 5) nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type, Error **errp, const char *fmt, ...) { @@ -269,7 +275,7 @@ nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type, /* Drop remainder of the current option, and send a reply with the * given error type and message. Return -errno on read or write * failure; or 0 if connection is still live. */ -static int GCC_FMT_ATTR(4, 0) +static int G_GNUC_PRINTF(4, 0) nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp, const char *fmt, va_list va) { @@ -282,7 +288,7 @@ nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp, return ret; } -static int GCC_FMT_ATTR(4, 5) +static int G_GNUC_PRINTF(4, 5) nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp, const char *fmt, ...) { @@ -296,7 +302,7 @@ nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp, return ret; } -static int GCC_FMT_ATTR(3, 4) +static int G_GNUC_PRINTF(3, 4) nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...) { int ret; @@ -453,10 +459,10 @@ static int nbd_negotiate_handle_list(NBDClient *client, Error **errp) return nbd_negotiate_send_rep(client, NBD_REP_ACK, errp); } -static void nbd_check_meta_export(NBDClient *client) +static void nbd_check_meta_export(NBDClient *client, NBDExport *exp) { - if (client->exp != client->export_meta.exp) { - client->export_meta.count = 0; + if (exp != client->contexts.exp) { + client->contexts.count = 0; } } @@ -480,6 +486,10 @@ static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes, [10 .. 133] reserved (0) [unless no_zeroes] */ trace_nbd_negotiate_handle_export_name(); + if (client->mode >= NBD_MODE_EXTENDED) { + error_setg(errp, "Extended headers already negotiated"); + return -EINVAL; + } if (client->optlen > NBD_MAX_STRING_SIZE) { error_setg(errp, "Bad length received"); return -EINVAL; @@ -498,11 +508,15 @@ static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes, error_setg(errp, "export not found"); return -EINVAL; } + nbd_check_meta_export(client, client->exp); myflags = client->exp->nbdflags; - if (client->structured_reply) { + if (client->mode >= NBD_MODE_STRUCTURED) { myflags |= NBD_FLAG_SEND_DF; } + if (client->mode >= NBD_MODE_EXTENDED && client->contexts.count) { + myflags |= NBD_FLAG_BLOCK_STAT_PAYLOAD; + } trace_nbd_negotiate_new_style_size_flags(client->exp->size, myflags); stq_be_p(buf, client->exp->size); stw_be_p(buf + 8, myflags); @@ -515,7 +529,6 @@ static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes, QTAILQ_INSERT_TAIL(&client->exp->clients, client, next); blk_exp_ref(&client->exp->common); - nbd_check_meta_export(client); return 0; } @@ -635,6 +648,9 @@ static int nbd_negotiate_handle_info(NBDClient *client, Error **errp) errp, "export '%s' not present", sane_name); } + if (client->opt == NBD_OPT_GO) { + nbd_check_meta_export(client, exp); + } /* Don't bother sending NBD_INFO_NAME unless client requested it */ if (sendname) { @@ -685,9 +701,13 @@ static int nbd_negotiate_handle_info(NBDClient *client, Error **errp) /* Send NBD_INFO_EXPORT always */ myflags = exp->nbdflags; - if (client->structured_reply) { + if (client->mode >= NBD_MODE_STRUCTURED) { myflags |= NBD_FLAG_SEND_DF; } + if (client->mode >= NBD_MODE_EXTENDED && + (client->contexts.count || client->opt == NBD_OPT_INFO)) { + myflags |= NBD_FLAG_BLOCK_STAT_PAYLOAD; + } trace_nbd_negotiate_new_style_size_flags(exp->size, myflags); stq_be_p(buf, exp->size); stw_be_p(buf + 8, myflags); @@ -723,7 +743,6 @@ static int nbd_negotiate_handle_info(NBDClient *client, Error **errp) client->check_align = check_align; QTAILQ_INSERT_TAIL(&client->exp->clients, client, next); blk_exp_ref(&client->exp->common); - nbd_check_meta_export(client); rc = 1; } return rc; @@ -846,7 +865,7 @@ static bool nbd_strshift(const char **str, const char *prefix) * Handle queries to 'base' namespace. For now, only the base:allocation * context is available. Return true if @query has been handled. */ -static bool nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta, +static bool nbd_meta_base_query(NBDClient *client, NBDMetaContexts *meta, const char *query) { if (!nbd_strshift(&query, "base:")) { @@ -866,7 +885,7 @@ static bool nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta, * and qemu:allocation-depth contexts are available. Return true if @query * has been handled. */ -static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta, +static bool nbd_meta_qemu_query(NBDClient *client, NBDMetaContexts *meta, const char *query) { size_t i; @@ -879,7 +898,9 @@ static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta, if (!*query) { if (client->opt == NBD_OPT_LIST_META_CONTEXT) { meta->allocation_depth = meta->exp->allocation_depth; - memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps); + if (meta->exp->nr_export_bitmaps) { + memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps); + } } trace_nbd_negotiate_meta_query_parse("empty"); return true; @@ -894,7 +915,8 @@ static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta, if (nbd_strshift(&query, "dirty-bitmap:")) { trace_nbd_negotiate_meta_query_parse("dirty-bitmap:"); if (!*query) { - if (client->opt == NBD_OPT_LIST_META_CONTEXT) { + if (client->opt == NBD_OPT_LIST_META_CONTEXT && + meta->exp->nr_export_bitmaps) { memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps); } trace_nbd_negotiate_meta_query_parse("empty"); @@ -929,7 +951,7 @@ static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta, * Return -errno on I/O error, 0 if option was completely handled by * sending a reply about inconsistent lengths, or 1 on success. */ static int nbd_negotiate_meta_query(NBDClient *client, - NBDExportMetaContexts *meta, Error **errp) + NBDMetaContexts *meta, Error **errp) { int ret; g_autofree char *query = NULL; @@ -968,19 +990,20 @@ static int nbd_negotiate_meta_query(NBDClient *client, * Handle NBD_OPT_LIST_META_CONTEXT and NBD_OPT_SET_META_CONTEXT * * Return -errno on I/O error, or 0 if option was completely handled. */ -static int nbd_negotiate_meta_queries(NBDClient *client, - NBDExportMetaContexts *meta, Error **errp) +static int nbd_negotiate_meta_queries(NBDClient *client, Error **errp) { int ret; g_autofree char *export_name = NULL; /* Mark unused to work around https://bugs.llvm.org/show_bug.cgi?id=3888 */ g_autofree G_GNUC_UNUSED bool *bitmaps = NULL; - NBDExportMetaContexts local_meta = {0}; + NBDMetaContexts local_meta = {0}; + NBDMetaContexts *meta; uint32_t nb_queries; size_t i; size_t count = 0; - if (!client->structured_reply) { + if (client->opt == NBD_OPT_SET_META_CONTEXT && + client->mode < NBD_MODE_STRUCTURED) { return nbd_opt_invalid(client, errp, "request option '%s' when structured reply " "is not negotiated", @@ -990,6 +1013,8 @@ static int nbd_negotiate_meta_queries(NBDClient *client, if (client->opt == NBD_OPT_LIST_META_CONTEXT) { /* Only change the caller's meta on SET. */ meta = &local_meta; + } else { + meta = &client->contexts; } g_free(meta->bitmaps); @@ -1024,7 +1049,9 @@ static int nbd_negotiate_meta_queries(NBDClient *client, /* enable all known contexts */ meta->base_allocation = true; meta->allocation_depth = meta->exp->allocation_depth; - memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps); + if (meta->exp->nr_export_bitmaps) { + memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps); + } } else { for (i = 0; i < nb_queries; ++i) { ret = nbd_negotiate_meta_query(client, meta, errp); @@ -1115,10 +1142,12 @@ static int nbd_negotiate_options(NBDClient *client, Error **errp) if (nbd_read32(client->ioc, &flags, "flags", errp) < 0) { return -EIO; } + client->mode = NBD_MODE_EXPORT_NAME; trace_nbd_negotiate_options_flags(flags); if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) { fixedNewstyle = true; flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE; + client->mode = NBD_MODE_SIMPLE; } if (flags & NBD_FLAG_C_NO_ZEROES) { no_zeroes = true; @@ -1155,7 +1184,7 @@ static int nbd_negotiate_options(NBDClient *client, Error **errp) client->optlen = length; if (length > NBD_MAX_BUFFER_SIZE) { - error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)", + error_setg(errp, "len (%" PRIu32 ") is larger than max len (%u)", length, NBD_MAX_BUFFER_SIZE); return -EINVAL; } @@ -1182,7 +1211,7 @@ static int nbd_negotiate_options(NBDClient *client, Error **errp) } ret = 0; object_unref(OBJECT(client->ioc)); - client->ioc = QIO_CHANNEL(tioc); + client->ioc = tioc; break; case NBD_OPT_EXPORT_NAME: @@ -1254,20 +1283,36 @@ static int nbd_negotiate_options(NBDClient *client, Error **errp) case NBD_OPT_STRUCTURED_REPLY: if (length) { ret = nbd_reject_length(client, false, errp); - } else if (client->structured_reply) { + } else if (client->mode >= NBD_MODE_EXTENDED) { + ret = nbd_negotiate_send_rep_err( + client, NBD_REP_ERR_EXT_HEADER_REQD, errp, + "extended headers already negotiated"); + } else if (client->mode >= NBD_MODE_STRUCTURED) { ret = nbd_negotiate_send_rep_err( client, NBD_REP_ERR_INVALID, errp, "structured reply already negotiated"); } else { ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp); - client->structured_reply = true; + client->mode = NBD_MODE_STRUCTURED; } break; case NBD_OPT_LIST_META_CONTEXT: case NBD_OPT_SET_META_CONTEXT: - ret = nbd_negotiate_meta_queries(client, &client->export_meta, - errp); + ret = nbd_negotiate_meta_queries(client, errp); + break; + + case NBD_OPT_EXTENDED_HEADERS: + if (length) { + ret = nbd_reject_length(client, false, errp); + } else if (client->mode >= NBD_MODE_EXTENDED) { + ret = nbd_negotiate_send_rep_err( + client, NBD_REP_ERR_INVALID, errp, + "extended headers already negotiated"); + } else { + ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp); + client->mode = NBD_MODE_EXTENDED; + } break; default: @@ -1326,6 +1371,7 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp) */ qio_channel_set_blocking(client->ioc, false, NULL); + qio_channel_set_follow_coroutine_ctx(client->ioc, true); trace_nbd_negotiate_begin(); memcpy(buf, "NBDMAGIC", 8); @@ -1345,11 +1391,6 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp) return ret; } - /* Attach the channel to the same AioContext as the export */ - if (client->exp && client->exp->common.ctx) { - qio_channel_attach_aio_context(client->ioc, client->exp->common.ctx); - } - assert(!client->optlen); trace_nbd_negotiate_success(); @@ -1376,11 +1417,18 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp) len = qio_channel_readv(client->ioc, &iov, 1, errp); if (len == QIO_CHANNEL_ERR_BLOCK) { - client->read_yielding = true; + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->read_yielding = true; + + /* Prompt main loop thread to re-run nbd_drained_poll() */ + aio_wait_kick(); + } qio_channel_yield(client->ioc, G_IO_IN); - client->read_yielding = false; - if (client->quiescing) { - return -EAGAIN; + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->read_yielding = false; + if (client->quiescing) { + return -EAGAIN; + } } continue; } else if (len < 0) { @@ -1402,39 +1450,59 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp) return 1; } -static int nbd_receive_request(NBDClient *client, NBDRequest *request, - Error **errp) +static int coroutine_fn nbd_receive_request(NBDClient *client, NBDRequest *request, + Error **errp) { - uint8_t buf[NBD_REQUEST_SIZE]; - uint32_t magic; + uint8_t buf[NBD_EXTENDED_REQUEST_SIZE]; + uint32_t magic, expect; int ret; + size_t size = client->mode >= NBD_MODE_EXTENDED ? + NBD_EXTENDED_REQUEST_SIZE : NBD_REQUEST_SIZE; - ret = nbd_read_eof(client, buf, sizeof(buf), errp); + ret = nbd_read_eof(client, buf, size, errp); if (ret < 0) { return ret; } + if (ret == 0) { + return -EIO; + } - /* Request - [ 0 .. 3] magic (NBD_REQUEST_MAGIC) - [ 4 .. 5] flags (NBD_CMD_FLAG_FUA, ...) - [ 6 .. 7] type (NBD_CMD_READ, ...) - [ 8 .. 15] handle - [16 .. 23] from - [24 .. 27] len + /* + * Compact request + * [ 0 .. 3] magic (NBD_REQUEST_MAGIC) + * [ 4 .. 5] flags (NBD_CMD_FLAG_FUA, ...) + * [ 6 .. 7] type (NBD_CMD_READ, ...) + * [ 8 .. 15] cookie + * [16 .. 23] from + * [24 .. 27] len + * Extended request + * [ 0 .. 3] magic (NBD_EXTENDED_REQUEST_MAGIC) + * [ 4 .. 5] flags (NBD_CMD_FLAG_FUA, NBD_CMD_FLAG_PAYLOAD_LEN, ...) + * [ 6 .. 7] type (NBD_CMD_READ, ...) + * [ 8 .. 15] cookie + * [16 .. 23] from + * [24 .. 31] len */ magic = ldl_be_p(buf); request->flags = lduw_be_p(buf + 4); request->type = lduw_be_p(buf + 6); - request->handle = ldq_be_p(buf + 8); + request->cookie = ldq_be_p(buf + 8); request->from = ldq_be_p(buf + 16); - request->len = ldl_be_p(buf + 24); + if (client->mode >= NBD_MODE_EXTENDED) { + request->len = ldq_be_p(buf + 24); + expect = NBD_EXTENDED_REQUEST_MAGIC; + } else { + request->len = (uint32_t)ldl_be_p(buf + 24); /* widen 32 to 64 bits */ + expect = NBD_REQUEST_MAGIC; + } trace_nbd_receive_request(magic, request->flags, request->type, request->from, request->len); - if (magic != NBD_REQUEST_MAGIC) { - error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic); + if (magic != expect) { + error_setg(errp, "invalid magic (got 0x%" PRIx32 ", expected 0x%" + PRIx32 ")", magic, expect); return -EINVAL; } return 0; @@ -1442,20 +1510,22 @@ static int nbd_receive_request(NBDClient *client, NBDRequest *request, #define MAX_NBD_REQUESTS 16 +/* Runs in export AioContext and main loop thread */ void nbd_client_get(NBDClient *client) { - client->refcount++; + qatomic_inc(&client->refcount); } void nbd_client_put(NBDClient *client) { - if (--client->refcount == 0) { + assert(qemu_in_main_thread()); + + if (qatomic_fetch_dec(&client->refcount) == 1) { /* The last reference should be dropped by client->close, * which is called by client_close. */ assert(client->closing); - qio_channel_detach_aio_context(client->ioc); object_unref(OBJECT(client->sioc)); object_unref(OBJECT(client->ioc)); if (client->tlscreds) { @@ -1466,18 +1536,48 @@ void nbd_client_put(NBDClient *client) QTAILQ_REMOVE(&client->exp->clients, client, next); blk_exp_unref(&client->exp->common); } - g_free(client->export_meta.bitmaps); + g_free(client->contexts.bitmaps); + qemu_mutex_destroy(&client->lock); g_free(client); } } +/* + * Tries to release the reference to @client, but only if other references + * remain. This is an optimization for the common case where we want to avoid + * the expense of scheduling nbd_client_put() in the main loop thread. + * + * Returns true upon success or false if the reference was not released because + * it is the last reference. + */ +static bool nbd_client_put_nonzero(NBDClient *client) +{ + int old = qatomic_read(&client->refcount); + int expected; + + do { + if (old == 1) { + return false; + } + + expected = old; + old = qatomic_cmpxchg(&client->refcount, expected, expected - 1); + } while (old != expected); + + return true; +} + static void client_close(NBDClient *client, bool negotiated) { - if (client->closing) { - return; - } + assert(qemu_in_main_thread()); + + WITH_QEMU_LOCK_GUARD(&client->lock) { + if (client->closing) { + return; + } - client->closing = true; + client->closing = true; + } /* Force requests to finish. They will drop their own references, * then we'll close the socket and free the NBDClient. @@ -1491,6 +1591,7 @@ static void client_close(NBDClient *client, bool negotiated) } } +/* Runs in export AioContext with client->lock held */ static NBDRequestData *nbd_request_get(NBDClient *client) { NBDRequestData *req; @@ -1499,11 +1600,11 @@ static NBDRequestData *nbd_request_get(NBDClient *client) client->nb_requests++; req = g_new0(NBDRequestData, 1); - nbd_client_get(client); req->client = client; return req; } +/* Runs in export AioContext with client->lock held */ static void nbd_request_put(NBDRequestData *req) { NBDClient *client = req->client; @@ -1520,8 +1621,6 @@ static void nbd_request_put(NBDRequestData *req) } nbd_client_receive_next_request(client); - - nbd_client_put(client); } static void blk_aio_attached(AioContext *ctx, void *opaque) @@ -1529,29 +1628,28 @@ static void blk_aio_attached(AioContext *ctx, void *opaque) NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + trace_nbd_blk_aio_attached(exp->name, ctx); exp->common.ctx = ctx; QTAILQ_FOREACH(client, &exp->clients, next) { - qio_channel_attach_aio_context(client->ioc, ctx); - - assert(client->nb_requests == 0); - assert(client->recv_coroutine == NULL); - assert(client->send_coroutine == NULL); + WITH_QEMU_LOCK_GUARD(&client->lock) { + assert(client->nb_requests == 0); + assert(client->recv_coroutine == NULL); + assert(client->send_coroutine == NULL); + } } } static void blk_aio_detach(void *opaque) { NBDExport *exp = opaque; - NBDClient *client; - trace_nbd_blk_aio_detach(exp->name, exp->common.ctx); + assert(qemu_in_main_thread()); - QTAILQ_FOREACH(client, &exp->clients, next) { - qio_channel_detach_aio_context(client->ioc); - } + trace_nbd_blk_aio_detach(exp->name, exp->common.ctx); exp->common.ctx = NULL; } @@ -1561,8 +1659,12 @@ static void nbd_drained_begin(void *opaque) NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + QTAILQ_FOREACH(client, &exp->clients, next) { - client->quiescing = true; + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->quiescing = true; + } } } @@ -1571,29 +1673,48 @@ static void nbd_drained_end(void *opaque) NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + QTAILQ_FOREACH(client, &exp->clients, next) { - client->quiescing = false; - nbd_client_receive_next_request(client); + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->quiescing = false; + nbd_client_receive_next_request(client); + } } } +/* Runs in export AioContext */ +static void nbd_wake_read_bh(void *opaque) +{ + NBDClient *client = opaque; + qio_channel_wake_read(client->ioc); +} + static bool nbd_drained_poll(void *opaque) { NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + QTAILQ_FOREACH(client, &exp->clients, next) { - if (client->nb_requests != 0) { - /* - * If there's a coroutine waiting for a request on nbd_read_eof() - * enter it here so we don't depend on the client to wake it up. - */ - if (client->recv_coroutine != NULL && client->read_yielding) { - qemu_aio_coroutine_enter(exp->common.ctx, - client->recv_coroutine); - } + WITH_QEMU_LOCK_GUARD(&client->lock) { + if (client->nb_requests != 0) { + /* + * If there's a coroutine waiting for a request on nbd_read_eof() + * enter it here so we don't depend on the client to wake it up. + * + * Schedule a BH in the export AioContext to avoid missing the + * wake up due to the race between qio_channel_wake_read() and + * qio_channel_yield(). + */ + if (client->recv_coroutine != NULL && client->read_yielding) { + aio_bh_schedule_oneshot(nbd_export_aio_context(client->exp), + nbd_wake_read_bh, client); + } - return true; + return true; + } } } @@ -1604,6 +1725,8 @@ static void nbd_eject_notifier(Notifier *n, void *data) { NBDExport *exp = container_of(n, NBDExport, eject_notifier); + assert(qemu_in_main_thread()); + blk_exp_request_shutdown(&exp->common); } @@ -1630,15 +1753,16 @@ static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args, { NBDExport *exp = container_of(blk_exp, NBDExport, common); BlockExportOptionsNbd *arg = &exp_args->u.nbd; + const char *name = arg->name ?: exp_args->node_name; BlockBackend *blk = blk_exp->blk; int64_t size; uint64_t perm, shared_perm; bool readonly = !exp_args->writable; - bool shared = !exp_args->writable; - strList *bitmaps; + BlockDirtyBitmapOrStrList *bitmaps; size_t i; int ret; + GLOBAL_STATE_CODE(); assert(exp_args->type == BLOCK_EXPORT_TYPE_NBD); if (!nbd_server_is_running()) { @@ -1646,12 +1770,8 @@ static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args, return -EINVAL; } - if (!arg->has_name) { - arg->name = exp_args->node_name; - } - - if (strlen(arg->name) > NBD_MAX_STRING_SIZE) { - error_setg(errp, "export name '%s' too long", arg->name); + if (strlen(name) > NBD_MAX_STRING_SIZE) { + error_setg(errp, "export name '%s' too long", name); return -EINVAL; } @@ -1660,8 +1780,8 @@ static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args, return -EINVAL; } - if (nbd_export_find(arg->name)) { - error_setg(errp, "NBD server already has export named '%s'", arg->name); + if (nbd_export_find(name)) { + error_setg(errp, "NBD server already has export named '%s'", name); return -EEXIST; } @@ -1681,57 +1801,79 @@ static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args, } QTAILQ_INIT(&exp->clients); - exp->name = g_strdup(arg->name); + exp->name = g_strdup(name); exp->description = g_strdup(arg->description); exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE); + + if (nbd_server_max_connections() != 1) { + exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN; + } if (readonly) { exp->nbdflags |= NBD_FLAG_READ_ONLY; - if (shared) { - exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN; - } } else { exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES | NBD_FLAG_SEND_FAST_ZERO); } exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE); + bdrv_graph_rdlock_main_loop(); + for (bitmaps = arg->bitmaps; bitmaps; bitmaps = bitmaps->next) { exp->nr_export_bitmaps++; } exp->export_bitmaps = g_new0(BdrvDirtyBitmap *, exp->nr_export_bitmaps); for (i = 0, bitmaps = arg->bitmaps; bitmaps; - i++, bitmaps = bitmaps->next) { - const char *bitmap = bitmaps->value; + i++, bitmaps = bitmaps->next) + { + const char *bitmap; BlockDriverState *bs = blk_bs(blk); BdrvDirtyBitmap *bm = NULL; - while (bs) { - bm = bdrv_find_dirty_bitmap(bs, bitmap); - if (bm != NULL) { - break; + switch (bitmaps->value->type) { + case QTYPE_QSTRING: + bitmap = bitmaps->value->u.local; + while (bs) { + bm = bdrv_find_dirty_bitmap(bs, bitmap); + if (bm != NULL) { + break; + } + + bs = bdrv_filter_or_cow_bs(bs); } - bs = bdrv_filter_or_cow_bs(bs); - } + if (bm == NULL) { + ret = -ENOENT; + error_setg(errp, "Bitmap '%s' is not found", + bitmaps->value->u.local); + goto fail; + } - if (bm == NULL) { - ret = -ENOENT; - error_setg(errp, "Bitmap '%s' is not found", bitmap); - goto fail; + if (readonly && bdrv_is_writable(bs) && + bdrv_dirty_bitmap_enabled(bm)) { + ret = -EINVAL; + error_setg(errp, "Enabled bitmap '%s' incompatible with " + "readonly export", bitmap); + goto fail; + } + break; + case QTYPE_QDICT: + bitmap = bitmaps->value->u.external.name; + bm = block_dirty_bitmap_lookup(bitmaps->value->u.external.node, + bitmap, NULL, errp); + if (!bm) { + ret = -ENOENT; + goto fail; + } + break; + default: + abort(); } - if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) { - ret = -EINVAL; - goto fail; - } + assert(bm); - if (readonly && bdrv_is_writable(bs) && - bdrv_dirty_bitmap_enabled(bm)) { + if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) { ret = -EINVAL; - error_setg(errp, - "Enabled bitmap '%s' incompatible with readonly export", - bitmap); goto fail; } @@ -1759,9 +1901,12 @@ static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args, QTAILQ_INSERT_TAIL(&exports, exp, next); + bdrv_graph_rdunlock_main_loop(); + return 0; fail: + bdrv_graph_rdunlock_main_loop(); g_free(exp->export_bitmaps); g_free(exp->name); g_free(exp->description); @@ -1821,15 +1966,13 @@ static void nbd_export_delete(BlockExport *blk_exp) g_free(exp->description); exp->description = NULL; - if (exp->common.blk) { - if (exp->eject_notifier_blk) { - notifier_remove(&exp->eject_notifier); - blk_unref(exp->eject_notifier_blk); - } - blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached, - blk_aio_detach, exp); - blk_set_disable_request_queuing(exp->common.blk, false); + if (exp->eject_notifier_blk) { + notifier_remove(&exp->eject_notifier); + blk_unref(exp->eject_notifier_blk); } + blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached, + blk_aio_detach, exp); + blk_set_disable_request_queuing(exp->common.blk, false); for (i = 0; i < exp->nr_export_bitmaps; i++) { bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], false); @@ -1862,19 +2005,19 @@ static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov, } static inline void set_be_simple_reply(NBDSimpleReply *reply, uint64_t error, - uint64_t handle) + uint64_t cookie) { stl_be_p(&reply->magic, NBD_SIMPLE_REPLY_MAGIC); stl_be_p(&reply->error, error); - stq_be_p(&reply->handle, handle); + stq_be_p(&reply->cookie, cookie); } -static int nbd_co_send_simple_reply(NBDClient *client, - uint64_t handle, - uint32_t error, - void *data, - size_t len, - Error **errp) +static int coroutine_fn nbd_co_send_simple_reply(NBDClient *client, + NBDRequest *request, + uint32_t error, + void *data, + uint64_t len, + Error **errp) { NBDSimpleReply reply; int nbd_err = system_errno_to_nbd_errno(error); @@ -1883,144 +2026,184 @@ static int nbd_co_send_simple_reply(NBDClient *client, {.iov_base = data, .iov_len = len} }; - trace_nbd_co_send_simple_reply(handle, nbd_err, nbd_err_lookup(nbd_err), - len); - set_be_simple_reply(&reply, nbd_err, handle); + assert(!len || !nbd_err); + assert(len <= NBD_MAX_BUFFER_SIZE); + assert(client->mode < NBD_MODE_STRUCTURED || + (client->mode == NBD_MODE_STRUCTURED && + request->type != NBD_CMD_READ)); + trace_nbd_co_send_simple_reply(request->cookie, nbd_err, + nbd_err_lookup(nbd_err), len); + set_be_simple_reply(&reply, nbd_err, request->cookie); - return nbd_co_send_iov(client, iov, len ? 2 : 1, errp); + return nbd_co_send_iov(client, iov, 2, errp); } -static inline void set_be_chunk(NBDStructuredReplyChunk *chunk, uint16_t flags, - uint16_t type, uint64_t handle, uint32_t length) +/* + * Prepare the header of a reply chunk for network transmission. + * + * On input, @iov is partially initialized: iov[0].iov_base must point + * to an uninitialized NBDReply, while the remaining @niov elements + * (if any) must be ready for transmission. This function then + * populates iov[0] for transmission. + */ +static inline void set_be_chunk(NBDClient *client, struct iovec *iov, + size_t niov, uint16_t flags, uint16_t type, + NBDRequest *request) { - stl_be_p(&chunk->magic, NBD_STRUCTURED_REPLY_MAGIC); - stw_be_p(&chunk->flags, flags); - stw_be_p(&chunk->type, type); - stq_be_p(&chunk->handle, handle); - stl_be_p(&chunk->length, length); + size_t i, length = 0; + + for (i = 1; i < niov; i++) { + length += iov[i].iov_len; + } + assert(length <= NBD_MAX_BUFFER_SIZE + sizeof(NBDStructuredReadData)); + + if (client->mode >= NBD_MODE_EXTENDED) { + NBDExtendedReplyChunk *chunk = iov->iov_base; + + iov[0].iov_len = sizeof(*chunk); + stl_be_p(&chunk->magic, NBD_EXTENDED_REPLY_MAGIC); + stw_be_p(&chunk->flags, flags); + stw_be_p(&chunk->type, type); + stq_be_p(&chunk->cookie, request->cookie); + stq_be_p(&chunk->offset, request->from); + stq_be_p(&chunk->length, length); + } else { + NBDStructuredReplyChunk *chunk = iov->iov_base; + + iov[0].iov_len = sizeof(*chunk); + stl_be_p(&chunk->magic, NBD_STRUCTURED_REPLY_MAGIC); + stw_be_p(&chunk->flags, flags); + stw_be_p(&chunk->type, type); + stq_be_p(&chunk->cookie, request->cookie); + stl_be_p(&chunk->length, length); + } } -static int coroutine_fn nbd_co_send_structured_done(NBDClient *client, - uint64_t handle, - Error **errp) +static int coroutine_fn nbd_co_send_chunk_done(NBDClient *client, + NBDRequest *request, + Error **errp) { - NBDStructuredReplyChunk chunk; + NBDReply hdr; struct iovec iov[] = { - {.iov_base = &chunk, .iov_len = sizeof(chunk)}, + {.iov_base = &hdr}, }; - trace_nbd_co_send_structured_done(handle); - set_be_chunk(&chunk, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_NONE, handle, 0); - + trace_nbd_co_send_chunk_done(request->cookie); + set_be_chunk(client, iov, 1, NBD_REPLY_FLAG_DONE, + NBD_REPLY_TYPE_NONE, request); return nbd_co_send_iov(client, iov, 1, errp); } -static int coroutine_fn nbd_co_send_structured_read(NBDClient *client, - uint64_t handle, - uint64_t offset, - void *data, - size_t size, - bool final, - Error **errp) +static int coroutine_fn nbd_co_send_chunk_read(NBDClient *client, + NBDRequest *request, + uint64_t offset, + void *data, + uint64_t size, + bool final, + Error **errp) { + NBDReply hdr; NBDStructuredReadData chunk; struct iovec iov[] = { + {.iov_base = &hdr}, {.iov_base = &chunk, .iov_len = sizeof(chunk)}, {.iov_base = data, .iov_len = size} }; - assert(size); - trace_nbd_co_send_structured_read(handle, offset, data, size); - set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0, - NBD_REPLY_TYPE_OFFSET_DATA, handle, - sizeof(chunk) - sizeof(chunk.h) + size); + assert(size && size <= NBD_MAX_BUFFER_SIZE); + trace_nbd_co_send_chunk_read(request->cookie, offset, data, size); + set_be_chunk(client, iov, 3, final ? NBD_REPLY_FLAG_DONE : 0, + NBD_REPLY_TYPE_OFFSET_DATA, request); stq_be_p(&chunk.offset, offset); - return nbd_co_send_iov(client, iov, 2, errp); + return nbd_co_send_iov(client, iov, 3, errp); } -static int coroutine_fn nbd_co_send_structured_error(NBDClient *client, - uint64_t handle, - uint32_t error, - const char *msg, - Error **errp) +static int coroutine_fn nbd_co_send_chunk_error(NBDClient *client, + NBDRequest *request, + uint32_t error, + const char *msg, + Error **errp) { + NBDReply hdr; NBDStructuredError chunk; int nbd_err = system_errno_to_nbd_errno(error); struct iovec iov[] = { + {.iov_base = &hdr}, {.iov_base = &chunk, .iov_len = sizeof(chunk)}, {.iov_base = (char *)msg, .iov_len = msg ? strlen(msg) : 0}, }; assert(nbd_err); - trace_nbd_co_send_structured_error(handle, nbd_err, - nbd_err_lookup(nbd_err), msg ? msg : ""); - set_be_chunk(&chunk.h, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_ERROR, handle, - sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len); + trace_nbd_co_send_chunk_error(request->cookie, nbd_err, + nbd_err_lookup(nbd_err), msg ? msg : ""); + set_be_chunk(client, iov, 3, NBD_REPLY_FLAG_DONE, + NBD_REPLY_TYPE_ERROR, request); stl_be_p(&chunk.error, nbd_err); - stw_be_p(&chunk.message_length, iov[1].iov_len); + stw_be_p(&chunk.message_length, iov[2].iov_len); - return nbd_co_send_iov(client, iov, 1 + !!iov[1].iov_len, errp); + return nbd_co_send_iov(client, iov, 3, errp); } /* Do a sparse read and send the structured reply to the client. - * Returns -errno if sending fails. bdrv_block_status_above() failure is + * Returns -errno if sending fails. blk_co_block_status_above() failure is * reported to the client, at which point this function succeeds. */ static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client, - uint64_t handle, + NBDRequest *request, uint64_t offset, uint8_t *data, - size_t size, + uint64_t size, Error **errp) { int ret = 0; NBDExport *exp = client->exp; size_t progress = 0; + assert(size <= NBD_MAX_BUFFER_SIZE); while (progress < size) { int64_t pnum; - int status = bdrv_block_status_above(blk_bs(exp->common.blk), NULL, - offset + progress, - size - progress, &pnum, NULL, - NULL); + int status = blk_co_block_status_above(exp->common.blk, NULL, + offset + progress, + size - progress, &pnum, NULL, + NULL); bool final; if (status < 0) { char *msg = g_strdup_printf("unable to check for holes: %s", strerror(-status)); - ret = nbd_co_send_structured_error(client, handle, -status, msg, - errp); + ret = nbd_co_send_chunk_error(client, request, -status, msg, errp); g_free(msg); return ret; } assert(pnum && pnum <= size - progress); final = progress + pnum == size; if (status & BDRV_BLOCK_ZERO) { + NBDReply hdr; NBDStructuredReadHole chunk; struct iovec iov[] = { + {.iov_base = &hdr}, {.iov_base = &chunk, .iov_len = sizeof(chunk)}, }; - trace_nbd_co_send_structured_read_hole(handle, offset + progress, - pnum); - set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0, - NBD_REPLY_TYPE_OFFSET_HOLE, - handle, sizeof(chunk) - sizeof(chunk.h)); + trace_nbd_co_send_chunk_read_hole(request->cookie, + offset + progress, pnum); + set_be_chunk(client, iov, 2, + final ? NBD_REPLY_FLAG_DONE : 0, + NBD_REPLY_TYPE_OFFSET_HOLE, request); stq_be_p(&chunk.offset, offset + progress); stl_be_p(&chunk.length, pnum); - ret = nbd_co_send_iov(client, iov, 1, errp); + ret = nbd_co_send_iov(client, iov, 2, errp); } else { - ret = blk_pread(exp->common.blk, offset + progress, - data + progress, pnum); + ret = blk_co_pread(exp->common.blk, offset + progress, pnum, + data + progress, 0); if (ret < 0) { error_setg_errno(errp, -ret, "reading from file failed"); break; } - ret = nbd_co_send_structured_read(client, handle, offset + progress, - data + progress, pnum, final, - errp); + ret = nbd_co_send_chunk_read(client, request, offset + progress, + data + progress, pnum, final, errp); } if (ret < 0) { @@ -2032,20 +2215,24 @@ static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client, } typedef struct NBDExtentArray { - NBDExtent *extents; + NBDExtent64 *extents; unsigned int nb_alloc; unsigned int count; uint64_t total_length; + bool extended; bool can_add; bool converted_to_be; } NBDExtentArray; -static NBDExtentArray *nbd_extent_array_new(unsigned int nb_alloc) +static NBDExtentArray *nbd_extent_array_new(unsigned int nb_alloc, + NBDMode mode) { NBDExtentArray *ea = g_new0(NBDExtentArray, 1); + assert(mode >= NBD_MODE_STRUCTURED); ea->nb_alloc = nb_alloc; - ea->extents = g_new(NBDExtent, nb_alloc); + ea->extents = g_new(NBDExtent64, nb_alloc); + ea->extended = mode >= NBD_MODE_EXTENDED; ea->can_add = true; return ea; @@ -2056,7 +2243,7 @@ static void nbd_extent_array_free(NBDExtentArray *ea) g_free(ea->extents); g_free(ea); } -G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free); +G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free) /* Further modifications of the array after conversion are abandoned */ static void nbd_extent_array_convert_to_be(NBDExtentArray *ea) @@ -2064,39 +2251,67 @@ static void nbd_extent_array_convert_to_be(NBDExtentArray *ea) int i; assert(!ea->converted_to_be); + assert(ea->extended); + ea->can_add = false; + ea->converted_to_be = true; + + for (i = 0; i < ea->count; i++) { + ea->extents[i].length = cpu_to_be64(ea->extents[i].length); + ea->extents[i].flags = cpu_to_be64(ea->extents[i].flags); + } +} + +/* Further modifications of the array after conversion are abandoned */ +static NBDExtent32 *nbd_extent_array_convert_to_narrow(NBDExtentArray *ea) +{ + int i; + NBDExtent32 *extents = g_new(NBDExtent32, ea->count); + + assert(!ea->converted_to_be); + assert(!ea->extended); ea->can_add = false; ea->converted_to_be = true; for (i = 0; i < ea->count; i++) { - ea->extents[i].flags = cpu_to_be32(ea->extents[i].flags); - ea->extents[i].length = cpu_to_be32(ea->extents[i].length); + assert((ea->extents[i].length | ea->extents[i].flags) <= UINT32_MAX); + extents[i].length = cpu_to_be32(ea->extents[i].length); + extents[i].flags = cpu_to_be32(ea->extents[i].flags); } + + return extents; } /* * Add extent to NBDExtentArray. If extent can't be added (no available space), * return -1. * For safety, when returning -1 for the first time, .can_add is set to false, - * further call to nbd_extent_array_add() will crash. - * (to avoid the situation, when after failing to add an extent (returned -1), - * user miss this failure and add another extent, which is successfully added - * (array is full, but new extent may be squashed into the last one), then we - * have invalid array with skipped extent) + * and further calls to nbd_extent_array_add() will crash. + * (this avoids the situation where a caller ignores failure to add one extent, + * where adding another extent that would squash into the last array entry + * would result in an incorrect range reported to the client) */ static int nbd_extent_array_add(NBDExtentArray *ea, - uint32_t length, uint32_t flags) + uint64_t length, uint32_t flags) { assert(ea->can_add); if (!length) { return 0; } + if (!ea->extended) { + assert(length <= UINT32_MAX); + } /* Extend previous extent if flags are the same */ if (ea->count > 0 && flags == ea->extents[ea->count - 1].flags) { - uint64_t sum = (uint64_t)length + ea->extents[ea->count - 1].length; + uint64_t sum = length + ea->extents[ea->count - 1].length; - if (sum <= UINT32_MAX) { + /* + * sum cannot overflow: the block layer bounds image size at + * 2^63, and ea->extents[].length comes from the block layer. + */ + assert(sum >= length); + if (sum <= UINT32_MAX || ea->extended) { ea->extents[ea->count - 1].length = sum; ea->total_length += length; return 0; @@ -2109,20 +2324,21 @@ static int nbd_extent_array_add(NBDExtentArray *ea, } ea->total_length += length; - ea->extents[ea->count] = (NBDExtent) {.length = length, .flags = flags}; + ea->extents[ea->count] = (NBDExtent64) {.length = length, .flags = flags}; ea->count++; return 0; } -static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset, - uint64_t bytes, NBDExtentArray *ea) +static int coroutine_fn blockstatus_to_extents(BlockBackend *blk, + uint64_t offset, uint64_t bytes, + NBDExtentArray *ea) { while (bytes) { uint32_t flags; int64_t num; - int ret = bdrv_block_status_above(bs, NULL, offset, bytes, &num, - NULL, NULL); + int ret = blk_co_block_status_above(blk, NULL, offset, bytes, &num, + NULL, NULL); if (ret < 0) { return ret; @@ -2142,13 +2358,14 @@ static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset, return 0; } -static int blockalloc_to_extents(BlockDriverState *bs, uint64_t offset, - uint64_t bytes, NBDExtentArray *ea) +static int coroutine_fn blockalloc_to_extents(BlockBackend *blk, + uint64_t offset, uint64_t bytes, + NBDExtentArray *ea) { while (bytes) { int64_t num; - int ret = bdrv_is_allocated_above(bs, NULL, false, offset, bytes, - &num); + int ret = blk_co_is_allocated_above(blk, NULL, false, offset, bytes, + &num); if (ret < 0) { return ret; @@ -2171,50 +2388,72 @@ static int blockalloc_to_extents(BlockDriverState *bs, uint64_t offset, * @ea is converted to BE by the function * @last controls whether NBD_REPLY_FLAG_DONE is sent. */ -static int nbd_co_send_extents(NBDClient *client, uint64_t handle, - NBDExtentArray *ea, - bool last, uint32_t context_id, Error **errp) -{ - NBDStructuredMeta chunk; - struct iovec iov[] = { - {.iov_base = &chunk, .iov_len = sizeof(chunk)}, - {.iov_base = ea->extents, .iov_len = ea->count * sizeof(ea->extents[0])} - }; +static int coroutine_fn +nbd_co_send_extents(NBDClient *client, NBDRequest *request, NBDExtentArray *ea, + bool last, uint32_t context_id, Error **errp) +{ + NBDReply hdr; + NBDStructuredMeta meta; + NBDExtendedMeta meta_ext; + g_autofree NBDExtent32 *extents = NULL; + uint16_t type; + struct iovec iov[] = { {.iov_base = &hdr}, {0}, {0} }; + + if (client->mode >= NBD_MODE_EXTENDED) { + type = NBD_REPLY_TYPE_BLOCK_STATUS_EXT; + + iov[1].iov_base = &meta_ext; + iov[1].iov_len = sizeof(meta_ext); + stl_be_p(&meta_ext.context_id, context_id); + stl_be_p(&meta_ext.count, ea->count); + + nbd_extent_array_convert_to_be(ea); + iov[2].iov_base = ea->extents; + iov[2].iov_len = ea->count * sizeof(ea->extents[0]); + } else { + type = NBD_REPLY_TYPE_BLOCK_STATUS; - nbd_extent_array_convert_to_be(ea); + iov[1].iov_base = &meta; + iov[1].iov_len = sizeof(meta); + stl_be_p(&meta.context_id, context_id); - trace_nbd_co_send_extents(handle, ea->count, context_id, ea->total_length, - last); - set_be_chunk(&chunk.h, last ? NBD_REPLY_FLAG_DONE : 0, - NBD_REPLY_TYPE_BLOCK_STATUS, - handle, sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len); - stl_be_p(&chunk.context_id, context_id); + extents = nbd_extent_array_convert_to_narrow(ea); + iov[2].iov_base = extents; + iov[2].iov_len = ea->count * sizeof(extents[0]); + } - return nbd_co_send_iov(client, iov, 2, errp); + trace_nbd_co_send_extents(request->cookie, ea->count, context_id, + ea->total_length, last); + set_be_chunk(client, iov, 3, last ? NBD_REPLY_FLAG_DONE : 0, type, + request); + + return nbd_co_send_iov(client, iov, 3, errp); } /* Get block status from the exported device and send it to the client */ -static int nbd_co_send_block_status(NBDClient *client, uint64_t handle, - BlockDriverState *bs, uint64_t offset, - uint32_t length, bool dont_fragment, - bool last, uint32_t context_id, - Error **errp) +static int +coroutine_fn nbd_co_send_block_status(NBDClient *client, NBDRequest *request, + BlockBackend *blk, uint64_t offset, + uint64_t length, bool dont_fragment, + bool last, uint32_t context_id, + Error **errp) { int ret; unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS; - g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents); + g_autoptr(NBDExtentArray) ea = + nbd_extent_array_new(nb_extents, client->mode); if (context_id == NBD_META_ID_BASE_ALLOCATION) { - ret = blockstatus_to_extents(bs, offset, length, ea); + ret = blockstatus_to_extents(blk, offset, length, ea); } else { - ret = blockalloc_to_extents(bs, offset, length, ea); + ret = blockalloc_to_extents(blk, offset, length, ea); } if (ret < 0) { - return nbd_co_send_structured_error( - client, handle, -ret, "can't get block status", errp); + return nbd_co_send_chunk_error(client, request, -ret, + "can't get block status", errp); } - return nbd_co_send_extents(client, handle, ea, last, context_id, errp); + return nbd_co_send_extents(client, request, ea, last, context_id, errp); } /* Populate @ea from a dirty bitmap. */ @@ -2225,11 +2464,12 @@ static void bitmap_to_extents(BdrvDirtyBitmap *bitmap, int64_t start, dirty_start, dirty_count; int64_t end = offset + length; bool full = false; + int64_t bound = es->extended ? INT64_MAX : INT32_MAX; bdrv_dirty_bitmap_lock(bitmap); for (start = offset; - bdrv_dirty_bitmap_next_dirty_area(bitmap, start, end, INT32_MAX, + bdrv_dirty_bitmap_next_dirty_area(bitmap, start, end, bound, &dirty_start, &dirty_count); start = dirty_start + dirty_count) { @@ -2249,17 +2489,105 @@ static void bitmap_to_extents(BdrvDirtyBitmap *bitmap, bdrv_dirty_bitmap_unlock(bitmap); } -static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle, - BdrvDirtyBitmap *bitmap, uint64_t offset, - uint32_t length, bool dont_fragment, bool last, - uint32_t context_id, Error **errp) +static int coroutine_fn nbd_co_send_bitmap(NBDClient *client, + NBDRequest *request, + BdrvDirtyBitmap *bitmap, + uint64_t offset, + uint64_t length, bool dont_fragment, + bool last, uint32_t context_id, + Error **errp) { unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS; - g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents); + g_autoptr(NBDExtentArray) ea = + nbd_extent_array_new(nb_extents, client->mode); bitmap_to_extents(bitmap, offset, length, ea); - return nbd_co_send_extents(client, handle, ea, last, context_id, errp); + return nbd_co_send_extents(client, request, ea, last, context_id, errp); +} + +/* + * nbd_co_block_status_payload_read + * Called when a client wants a subset of negotiated contexts via a + * BLOCK_STATUS payload. Check the payload for valid length and + * contents. On success, return 0 with request updated to effective + * length. If request was invalid but all payload consumed, return 0 + * with request->len and request->contexts->count set to 0 (which will + * trigger an appropriate NBD_EINVAL response later on). Return + * negative errno if the payload was not fully consumed. + */ +static int +nbd_co_block_status_payload_read(NBDClient *client, NBDRequest *request, + Error **errp) +{ + uint64_t payload_len = request->len; + g_autofree char *buf = NULL; + size_t count, i, nr_bitmaps; + uint32_t id; + + if (payload_len > NBD_MAX_BUFFER_SIZE) { + error_setg(errp, "len (%" PRIu64 ") is larger than max len (%u)", + request->len, NBD_MAX_BUFFER_SIZE); + return -EINVAL; + } + + assert(client->contexts.exp == client->exp); + nr_bitmaps = client->exp->nr_export_bitmaps; + request->contexts = g_new0(NBDMetaContexts, 1); + request->contexts->exp = client->exp; + + if (payload_len % sizeof(uint32_t) || + payload_len < sizeof(NBDBlockStatusPayload) || + payload_len > (sizeof(NBDBlockStatusPayload) + + sizeof(id) * client->contexts.count)) { + goto skip; + } + + buf = g_malloc(payload_len); + if (nbd_read(client->ioc, buf, payload_len, + "CMD_BLOCK_STATUS data", errp) < 0) { + return -EIO; + } + trace_nbd_co_receive_request_payload_received(request->cookie, + payload_len); + request->contexts->bitmaps = g_new0(bool, nr_bitmaps); + count = (payload_len - sizeof(NBDBlockStatusPayload)) / sizeof(id); + payload_len = 0; + + for (i = 0; i < count; i++) { + id = ldl_be_p(buf + sizeof(NBDBlockStatusPayload) + sizeof(id) * i); + if (id == NBD_META_ID_BASE_ALLOCATION) { + if (!client->contexts.base_allocation || + request->contexts->base_allocation) { + goto skip; + } + request->contexts->base_allocation = true; + } else if (id == NBD_META_ID_ALLOCATION_DEPTH) { + if (!client->contexts.allocation_depth || + request->contexts->allocation_depth) { + goto skip; + } + request->contexts->allocation_depth = true; + } else { + unsigned idx = id - NBD_META_ID_DIRTY_BITMAP; + + if (idx >= nr_bitmaps || !client->contexts.bitmaps[idx] || + request->contexts->bitmaps[idx]) { + goto skip; + } + request->contexts->bitmaps[idx] = true; + } + } + + request->len = ldq_be_p(buf); + request->contexts->count = count; + return 0; + + skip: + trace_nbd_co_receive_block_status_payload_compliance(request->from, + request->len); + request->len = request->contexts->count = 0; + return nbd_drop(client->ioc, payload_len, errp); } /* nbd_co_receive_request @@ -2269,76 +2597,158 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle, * to the client (although the caller may still need to disconnect after * reporting the error). */ -static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request, - Error **errp) +static int coroutine_fn nbd_co_receive_request(NBDRequestData *req, + NBDRequest *request, + Error **errp) { NBDClient *client = req->client; - int valid_flags; + bool extended_with_payload; + bool check_length = false; + bool check_rofs = false; + bool allocate_buffer = false; + bool payload_okay = false; + uint64_t payload_len = 0; + int valid_flags = NBD_CMD_FLAG_FUA; int ret; g_assert(qemu_in_coroutine()); - assert(client->recv_coroutine == qemu_coroutine_self()); ret = nbd_receive_request(client, request, errp); if (ret < 0) { - return ret; + return ret; } - trace_nbd_co_receive_request_decode_type(request->handle, request->type, + trace_nbd_co_receive_request_decode_type(request->cookie, request->type, nbd_cmd_lookup(request->type)); - - if (request->type != NBD_CMD_WRITE) { - /* No payload, we are ready to read the next request. */ - req->complete = true; + extended_with_payload = client->mode >= NBD_MODE_EXTENDED && + request->flags & NBD_CMD_FLAG_PAYLOAD_LEN; + if (extended_with_payload) { + payload_len = request->len; + check_length = true; } - if (request->type == NBD_CMD_DISC) { + switch (request->type) { + case NBD_CMD_DISC: /* Special case: we're going to disconnect without a reply, * whether or not flags, from, or len are bogus */ + req->complete = true; return -EIO; - } - if (request->type == NBD_CMD_READ || request->type == NBD_CMD_WRITE || - request->type == NBD_CMD_CACHE) - { - if (request->len > NBD_MAX_BUFFER_SIZE) { - error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)", - request->len, NBD_MAX_BUFFER_SIZE); - return -EINVAL; + case NBD_CMD_READ: + if (client->mode >= NBD_MODE_STRUCTURED) { + valid_flags |= NBD_CMD_FLAG_DF; + } + check_length = true; + allocate_buffer = true; + break; + + case NBD_CMD_WRITE: + if (client->mode >= NBD_MODE_EXTENDED) { + if (!extended_with_payload) { + /* The client is noncompliant. Trace it, but proceed. */ + trace_nbd_co_receive_ext_payload_compliance(request->from, + request->len); + } + valid_flags |= NBD_CMD_FLAG_PAYLOAD_LEN; } + payload_okay = true; + payload_len = request->len; + check_length = true; + allocate_buffer = true; + check_rofs = true; + break; + + case NBD_CMD_FLUSH: + break; + + case NBD_CMD_TRIM: + check_rofs = true; + break; + + case NBD_CMD_CACHE: + check_length = true; + break; + + case NBD_CMD_WRITE_ZEROES: + valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO; + check_rofs = true; + break; - if (request->type != NBD_CMD_CACHE) { - req->data = blk_try_blockalign(client->exp->common.blk, - request->len); - if (req->data == NULL) { - error_setg(errp, "No memory"); - return -ENOMEM; + case NBD_CMD_BLOCK_STATUS: + if (extended_with_payload) { + ret = nbd_co_block_status_payload_read(client, request, errp); + if (ret < 0) { + return ret; } + /* payload now consumed */ + check_length = false; + payload_len = 0; + valid_flags |= NBD_CMD_FLAG_PAYLOAD_LEN; + } else { + request->contexts = &client->contexts; } + valid_flags |= NBD_CMD_FLAG_REQ_ONE; + break; + + default: + /* Unrecognized, will fail later */ + ; } - if (request->type == NBD_CMD_WRITE) { - if (nbd_read(client->ioc, req->data, request->len, "CMD_WRITE data", - errp) < 0) - { + /* Payload and buffer handling. */ + if (!payload_len) { + req->complete = true; + } + if (check_length && request->len > NBD_MAX_BUFFER_SIZE) { + /* READ, WRITE, CACHE */ + error_setg(errp, "len (%" PRIu64 ") is larger than max len (%u)", + request->len, NBD_MAX_BUFFER_SIZE); + return -EINVAL; + } + if (payload_len && !payload_okay) { + /* + * For now, we don't support payloads on other commands; but + * we can keep the connection alive by ignoring the payload. + * We will fail the command later with NBD_EINVAL for the use + * of an unsupported flag (and not for access beyond bounds). + */ + assert(request->type != NBD_CMD_WRITE); + request->len = 0; + } + if (allocate_buffer) { + /* READ, WRITE */ + req->data = blk_try_blockalign(client->exp->common.blk, + request->len); + if (req->data == NULL) { + error_setg(errp, "No memory"); + return -ENOMEM; + } + } + if (payload_len) { + if (payload_okay) { + /* WRITE */ + assert(req->data); + ret = nbd_read(client->ioc, req->data, payload_len, + "CMD_WRITE data", errp); + } else { + ret = nbd_drop(client->ioc, payload_len, errp); + } + if (ret < 0) { return -EIO; } req->complete = true; - - trace_nbd_co_receive_request_payload_received(request->handle, - request->len); + trace_nbd_co_receive_request_payload_received(request->cookie, + payload_len); } /* Sanity checks. */ - if (client->exp->nbdflags & NBD_FLAG_READ_ONLY && - (request->type == NBD_CMD_WRITE || - request->type == NBD_CMD_WRITE_ZEROES || - request->type == NBD_CMD_TRIM)) { + if (client->exp->nbdflags & NBD_FLAG_READ_ONLY && check_rofs) { + /* WRITE, TRIM, WRITE_ZEROES */ error_setg(errp, "Export is read-only"); return -EROFS; } if (request->from > client->exp->size || request->len > client->exp->size - request->from) { - error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu32 + error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu64 ", Size: %" PRIu64, request->from, request->len, client->exp->size); return (request->type == NBD_CMD_WRITE || @@ -2355,14 +2765,6 @@ static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request, request->len, client->check_align); } - valid_flags = NBD_CMD_FLAG_FUA; - if (request->type == NBD_CMD_READ && client->structured_reply) { - valid_flags |= NBD_CMD_FLAG_DF; - } else if (request->type == NBD_CMD_WRITE_ZEROES) { - valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO; - } else if (request->type == NBD_CMD_BLOCK_STATUS) { - valid_flags |= NBD_CMD_FLAG_REQ_ONE; - } if (request->flags & ~valid_flags) { error_setg(errp, "unsupported flags for command %s (got 0x%x)", nbd_cmd_lookup(request->type), request->flags); @@ -2377,16 +2779,17 @@ static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request, * Returns 0 if connection is still live, -errno on failure to talk to client */ static coroutine_fn int nbd_send_generic_reply(NBDClient *client, - uint64_t handle, + NBDRequest *request, int ret, const char *error_msg, Error **errp) { - if (client->structured_reply && ret < 0) { - return nbd_co_send_structured_error(client, handle, -ret, error_msg, - errp); + if (client->mode >= NBD_MODE_STRUCTURED && ret < 0) { + return nbd_co_send_chunk_error(client, request, -ret, error_msg, errp); + } else if (client->mode >= NBD_MODE_EXTENDED) { + return nbd_co_send_chunk_done(client, request, errp); } else { - return nbd_co_send_simple_reply(client, handle, ret < 0 ? -ret : 0, + return nbd_co_send_simple_reply(client, request, ret < 0 ? -ret : 0, NULL, 0, errp); } } @@ -2401,39 +2804,39 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request, NBDExport *exp = client->exp; assert(request->type == NBD_CMD_READ); + assert(request->len <= NBD_MAX_BUFFER_SIZE); /* XXX: NBD Protocol only documents use of FUA with WRITE */ if (request->flags & NBD_CMD_FLAG_FUA) { ret = blk_co_flush(exp->common.blk); if (ret < 0) { - return nbd_send_generic_reply(client, request->handle, ret, + return nbd_send_generic_reply(client, request, ret, "flush failed", errp); } } - if (client->structured_reply && !(request->flags & NBD_CMD_FLAG_DF) && - request->len) + if (client->mode >= NBD_MODE_STRUCTURED && + !(request->flags & NBD_CMD_FLAG_DF) && request->len) { - return nbd_co_send_sparse_read(client, request->handle, request->from, + return nbd_co_send_sparse_read(client, request, request->from, data, request->len, errp); } - ret = blk_pread(exp->common.blk, request->from, data, request->len); + ret = blk_co_pread(exp->common.blk, request->from, request->len, data, 0); if (ret < 0) { - return nbd_send_generic_reply(client, request->handle, ret, + return nbd_send_generic_reply(client, request, ret, "reading from file failed", errp); } - if (client->structured_reply) { + if (client->mode >= NBD_MODE_STRUCTURED) { if (request->len) { - return nbd_co_send_structured_read(client, request->handle, - request->from, data, - request->len, true, errp); + return nbd_co_send_chunk_read(client, request, request->from, data, + request->len, true, errp); } else { - return nbd_co_send_structured_done(client, request->handle, errp); + return nbd_co_send_chunk_done(client, request, errp); } } else { - return nbd_co_send_simple_reply(client, request->handle, 0, + return nbd_co_send_simple_reply(client, request, 0, data, request->len, errp); } } @@ -2452,11 +2855,12 @@ static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request, NBDExport *exp = client->exp; assert(request->type == NBD_CMD_CACHE); + assert(request->len <= NBD_MAX_BUFFER_SIZE); ret = blk_co_preadv(exp->common.blk, request->from, request->len, NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH); - return nbd_send_generic_reply(client, request->handle, ret, + return nbd_send_generic_reply(client, request, ret, "caching data failed", errp); } @@ -2485,9 +2889,10 @@ static coroutine_fn int nbd_handle_request(NBDClient *client, if (request->flags & NBD_CMD_FLAG_FUA) { flags |= BDRV_REQ_FUA; } - ret = blk_pwrite(exp->common.blk, request->from, data, request->len, - flags); - return nbd_send_generic_reply(client, request->handle, ret, + assert(request->len <= NBD_MAX_BUFFER_SIZE); + ret = blk_co_pwrite(exp->common.blk, request->from, request->len, data, + flags); + return nbd_send_generic_reply(client, request, ret, "writing to file failed", errp); case NBD_CMD_WRITE_ZEROES: @@ -2501,17 +2906,9 @@ static coroutine_fn int nbd_handle_request(NBDClient *client, if (request->flags & NBD_CMD_FLAG_FAST_ZERO) { flags |= BDRV_REQ_NO_FALLBACK; } - ret = 0; - /* FIXME simplify this when blk_pwrite_zeroes switches to 64-bit */ - while (ret >= 0 && request->len) { - int align = client->check_align ?: 1; - int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES, - align)); - ret = blk_pwrite_zeroes(exp->common.blk, request->from, len, flags); - request->len -= len; - request->from += len; - } - return nbd_send_generic_reply(client, request->handle, ret, + ret = blk_co_pwrite_zeroes(exp->common.blk, request->from, request->len, + flags); + return nbd_send_generic_reply(client, request, ret, "writing to file failed", errp); case NBD_CMD_DISC: @@ -2520,38 +2917,32 @@ static coroutine_fn int nbd_handle_request(NBDClient *client, case NBD_CMD_FLUSH: ret = blk_co_flush(exp->common.blk); - return nbd_send_generic_reply(client, request->handle, ret, + return nbd_send_generic_reply(client, request, ret, "flush failed", errp); case NBD_CMD_TRIM: - ret = 0; - /* FIXME simplify this when blk_co_pdiscard switches to 64-bit */ - while (ret >= 0 && request->len) { - int align = client->check_align ?: 1; - int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES, - align)); - ret = blk_co_pdiscard(exp->common.blk, request->from, len); - request->len -= len; - request->from += len; - } + ret = blk_co_pdiscard(exp->common.blk, request->from, request->len); if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) { ret = blk_co_flush(exp->common.blk); } - return nbd_send_generic_reply(client, request->handle, ret, + return nbd_send_generic_reply(client, request, ret, "discard failed", errp); case NBD_CMD_BLOCK_STATUS: - if (!request->len) { - return nbd_send_generic_reply(client, request->handle, -EINVAL, - "need non-zero length", errp); - } - if (client->export_meta.count) { + assert(request->contexts); + assert(client->mode >= NBD_MODE_EXTENDED || + request->len <= UINT32_MAX); + if (request->contexts->count) { bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE; - int contexts_remaining = client->export_meta.count; + int contexts_remaining = request->contexts->count; - if (client->export_meta.base_allocation) { - ret = nbd_co_send_block_status(client, request->handle, - blk_bs(exp->common.blk), + if (!request->len) { + return nbd_send_generic_reply(client, request, -EINVAL, + "need non-zero length", errp); + } + if (request->contexts->base_allocation) { + ret = nbd_co_send_block_status(client, request, + exp->common.blk, request->from, request->len, dont_fragment, !--contexts_remaining, @@ -2562,9 +2953,9 @@ static coroutine_fn int nbd_handle_request(NBDClient *client, } } - if (client->export_meta.allocation_depth) { - ret = nbd_co_send_block_status(client, request->handle, - blk_bs(exp->common.blk), + if (request->contexts->allocation_depth) { + ret = nbd_co_send_block_status(client, request, + exp->common.blk, request->from, request->len, dont_fragment, !--contexts_remaining, @@ -2575,11 +2966,12 @@ static coroutine_fn int nbd_handle_request(NBDClient *client, } } + assert(request->contexts->exp == client->exp); for (i = 0; i < client->exp->nr_export_bitmaps; i++) { - if (!client->export_meta.bitmaps[i]) { + if (!request->contexts->bitmaps[i]) { continue; } - ret = nbd_co_send_bitmap(client, request->handle, + ret = nbd_co_send_bitmap(client, request, client->exp->export_bitmaps[i], request->from, request->len, dont_fragment, !--contexts_remaining, @@ -2592,8 +2984,12 @@ static coroutine_fn int nbd_handle_request(NBDClient *client, assert(!contexts_remaining); return 0; + } else if (client->contexts.count) { + return nbd_send_generic_reply(client, request, -EINVAL, + "CMD_BLOCK_STATUS payload not valid", + errp); } else { - return nbd_send_generic_reply(client, request->handle, -EINVAL, + return nbd_send_generic_reply(client, request, -EINVAL, "CMD_BLOCK_STATUS not negotiated", errp); } @@ -2601,7 +2997,7 @@ static coroutine_fn int nbd_handle_request(NBDClient *client, default: msg = g_strdup_printf("invalid request type (%" PRIu32 ") received", request->type); - ret = nbd_send_generic_reply(client, request->handle, -EINVAL, msg, + ret = nbd_send_generic_reply(client, request, -EINVAL, msg, errp); g_free(msg); return ret; @@ -2611,16 +3007,24 @@ static coroutine_fn int nbd_handle_request(NBDClient *client, /* Owns a reference to the NBDClient passed as opaque. */ static coroutine_fn void nbd_trip(void *opaque) { - NBDClient *client = opaque; - NBDRequestData *req; + NBDRequestData *req = opaque; + NBDClient *client = req->client; NBDRequest request = { 0 }; /* GCC thinks it can be used uninitialized */ int ret; Error *local_err = NULL; + /* + * Note that nbd_client_put() and client_close() must be called from the + * main loop thread. Use aio_co_reschedule_self() to switch AioContext + * before calling these functions. + */ + trace_nbd_trip(); + + qemu_mutex_lock(&client->lock); + if (client->closing) { - nbd_client_put(client); - return; + goto done; } if (client->quiescing) { @@ -2628,14 +3032,25 @@ static coroutine_fn void nbd_trip(void *opaque) * We're switching between AIO contexts. Don't attempt to receive a new * request and kick the main context which may be waiting for us. */ - nbd_client_put(client); client->recv_coroutine = NULL; aio_wait_kick(); - return; + goto done; } - req = nbd_request_get(client); - ret = nbd_co_receive_request(req, &request, &local_err); + /* + * nbd_co_receive_request() returns -EAGAIN when nbd_drained_begin() has + * set client->quiescing but by the time we get back nbd_drained_end() may + * have already cleared client->quiescing. In that case we try again + * because nothing else will spawn an nbd_trip() coroutine until we set + * client->recv_coroutine = NULL further down. + */ + do { + assert(client->recv_coroutine == qemu_coroutine_self()); + qemu_mutex_unlock(&client->lock); + ret = nbd_co_receive_request(req, &request, &local_err); + qemu_mutex_lock(&client->lock); + } while (ret == -EAGAIN && !client->quiescing); + client->recv_coroutine = NULL; if (client->closing) { @@ -2647,34 +3062,47 @@ static coroutine_fn void nbd_trip(void *opaque) } if (ret == -EAGAIN) { - assert(client->quiescing); goto done; } nbd_client_receive_next_request(client); + if (ret == -EIO) { goto disconnect; } + qemu_mutex_unlock(&client->lock); + qio_channel_set_cork(client->ioc, true); + if (ret < 0) { - /* It wans't -EIO, so, according to nbd_co_receive_request() + /* It wasn't -EIO, so, according to nbd_co_receive_request() * semantics, we should return the error to the client. */ Error *export_err = local_err; local_err = NULL; - ret = nbd_send_generic_reply(client, request.handle, -EINVAL, + ret = nbd_send_generic_reply(client, &request, -EINVAL, error_get_pretty(export_err), &local_err); error_free(export_err); } else { ret = nbd_handle_request(client, &request, req->data, &local_err); } + if (request.contexts && request.contexts != &client->contexts) { + assert(request.type == NBD_CMD_BLOCK_STATUS); + g_free(request.contexts->bitmaps); + g_free(request.contexts); + } + + qio_channel_set_cork(client->ioc, false); + qemu_mutex_lock(&client->lock); + if (ret < 0) { error_prepend(&local_err, "Failed to send reply: "); goto disconnect; } - /* We must disconnect after NBD_CMD_WRITE if we did not - * read the payload. + /* + * We must disconnect after NBD_CMD_WRITE or BLOCK_STATUS with + * payload if we did not read the payload. */ if (!req->complete) { error_setg(&local_err, "Request handling failed in intermediate state"); @@ -2683,24 +3111,41 @@ static coroutine_fn void nbd_trip(void *opaque) done: nbd_request_put(req); - nbd_client_put(client); + + qemu_mutex_unlock(&client->lock); + + if (!nbd_client_put_nonzero(client)) { + aio_co_reschedule_self(qemu_get_aio_context()); + nbd_client_put(client); + } return; disconnect: if (local_err) { error_reportf_err(local_err, "Disconnect client, due to: "); } + nbd_request_put(req); + qemu_mutex_unlock(&client->lock); + + aio_co_reschedule_self(qemu_get_aio_context()); client_close(client, true); nbd_client_put(client); } +/* + * Runs in export AioContext and main loop thread. Caller must hold + * client->lock. + */ static void nbd_client_receive_next_request(NBDClient *client) { + NBDRequestData *req; + if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS && !client->quiescing) { nbd_client_get(client); - client->recv_coroutine = qemu_coroutine_create(nbd_trip, client); + req = nbd_request_get(client); + client->recv_coroutine = qemu_coroutine_create(nbd_trip, req); aio_co_schedule(client->exp->common.ctx, client->recv_coroutine); } } @@ -2720,7 +3165,9 @@ static coroutine_fn void nbd_co_client_start(void *opaque) return; } - nbd_client_receive_next_request(client); + WITH_QEMU_LOCK_GUARD(&client->lock) { + nbd_client_receive_next_request(client); + } } /* @@ -2737,6 +3184,7 @@ void nbd_client_new(QIOChannelSocket *sioc, Coroutine *co; client = g_new0(NBDClient, 1); + qemu_mutex_init(&client->lock); client->refcount = 1; client->tlscreds = tlscreds; if (tlscreds) { @@ -2744,6 +3192,7 @@ void nbd_client_new(QIOChannelSocket *sioc, } client->tlsauthz = g_strdup(tlsauthz); client->sioc = sioc; + qio_channel_set_delay(QIO_CHANNEL(sioc), false); object_ref(OBJECT(client->sioc)); client->ioc = QIO_CHANNEL(sioc); object_ref(OBJECT(client->ioc)); |