diff options
Diffstat (limited to 'migration/multifd-zstd.c')
-rw-r--r-- | migration/multifd-zstd.c | 134 |
1 files changed, 63 insertions, 71 deletions
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c index 693bddf8c9..02112255ad 100644 --- a/migration/multifd-zstd.c +++ b/migration/multifd-zstd.c @@ -13,10 +13,12 @@ #include "qemu/osdep.h" #include <zstd.h> #include "qemu/rcu.h" +#include "exec/ramblock.h" #include "exec/target_page.h" #include "qapi/error.h" #include "migration.h" #include "trace.h" +#include "options.h" #include "multifd.h" struct zstd_data { @@ -47,15 +49,14 @@ struct zstd_data { */ static int zstd_send_setup(MultiFDSendParams *p, Error **errp) { - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); struct zstd_data *z = g_new0(struct zstd_data, 1); int res; - p->data = z; + p->compress_data = z; z->zcs = ZSTD_createCStream(); if (!z->zcs) { g_free(z); - error_setg(errp, "multifd %d: zstd createCStream failed", p->id); + error_setg(errp, "multifd %u: zstd createCStream failed", p->id); return -1; } @@ -63,18 +64,17 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp) if (ZSTD_isError(res)) { ZSTD_freeCStream(z->zcs); g_free(z); - error_setg(errp, "multifd %d: initCStream failed with error %s", + error_setg(errp, "multifd %u: initCStream failed with error %s", p->id, ZSTD_getErrorName(res)); return -1; } - /* We will never have more than page_count pages */ - z->zbuff_len = page_count * qemu_target_page_size(); - z->zbuff_len *= 2; + /* This is the maximum size of the compressed buffer */ + z->zbuff_len = ZSTD_compressBound(MULTIFD_PACKET_SIZE); z->zbuff = g_try_malloc(z->zbuff_len); if (!z->zbuff) { ZSTD_freeCStream(z->zcs); g_free(z); - error_setg(errp, "multifd %d: out of memory for zbuff", p->id); + error_setg(errp, "multifd %u: out of memory for zbuff", p->id); return -1; } return 0; @@ -86,17 +86,18 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp) * Close the channel and return memory. * * @p: Params for the channel that we are using + * @errp: pointer to an error */ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp) { - struct zstd_data *z = p->data; + struct zstd_data *z = p->compress_data; ZSTD_freeCStream(z->zcs); z->zcs = NULL; g_free(z->zbuff); z->zbuff = NULL; - g_free(p->data); - p->data = NULL; + g_free(p->compress_data); + p->compress_data = NULL; } /** @@ -108,27 +109,31 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp) * Returns 0 for success or -1 for error * * @p: Params for the channel that we are using - * @used: number of pages used + * @errp: pointer to an error */ -static int zstd_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp) +static int zstd_send_prepare(MultiFDSendParams *p, Error **errp) { - struct iovec *iov = p->pages->iov; - struct zstd_data *z = p->data; + MultiFDPages_t *pages = p->pages; + struct zstd_data *z = p->compress_data; int ret; uint32_t i; + if (!multifd_send_prepare_common(p)) { + goto out; + } + z->out.dst = z->zbuff; z->out.size = z->zbuff_len; z->out.pos = 0; - for (i = 0; i < used; i++) { + for (i = 0; i < pages->normal_num; i++) { ZSTD_EndDirective flush = ZSTD_e_continue; - if (i == used - 1) { + if (i == pages->normal_num - 1) { flush = ZSTD_e_flush; } - z->in.src = iov[i].iov_base; - z->in.size = iov[i].iov_len; + z->in.src = p->pages->block->host + pages->offset[i]; + z->in.size = p->page_size; z->in.pos = 0; /* @@ -144,42 +149,28 @@ static int zstd_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp) } while (ret > 0 && (z->in.size - z->in.pos > 0) && (z->out.size - z->out.pos > 0)); if (ret > 0 && (z->in.size - z->in.pos > 0)) { - error_setg(errp, "multifd %d: compressStream buffer too small", + error_setg(errp, "multifd %u: compressStream buffer too small", p->id); return -1; } if (ZSTD_isError(ret)) { - error_setg(errp, "multifd %d: compressStream error %s", + error_setg(errp, "multifd %u: compressStream error %s", p->id, ZSTD_getErrorName(ret)); return -1; } } + p->iov[p->iovs_num].iov_base = z->zbuff; + p->iov[p->iovs_num].iov_len = z->out.pos; + p->iovs_num++; p->next_packet_size = z->out.pos; - p->flags |= MULTIFD_FLAG_ZSTD; +out: + p->flags |= MULTIFD_FLAG_ZSTD; + multifd_send_fill_packet(p); return 0; } /** - * zstd_send_write: do the actual write of the data - * - * Do the actual write of the comprresed buffer. - * - * Returns 0 for success or -1 for error - * - * @p: Params for the channel that we are using - * @used: number of pages used - * @errp: pointer to an error - */ -static int zstd_send_write(MultiFDSendParams *p, uint32_t used, Error **errp) -{ - struct zstd_data *z = p->data; - - return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size, - errp); -} - -/** * zstd_recv_setup: setup receive side * * Create the compressed channel and buffer. @@ -191,15 +182,14 @@ static int zstd_send_write(MultiFDSendParams *p, uint32_t used, Error **errp) */ static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp) { - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); struct zstd_data *z = g_new0(struct zstd_data, 1); int ret; - p->data = z; + p->compress_data = z; z->zds = ZSTD_createDStream(); if (!z->zds) { g_free(z); - error_setg(errp, "multifd %d: zstd createDStream failed", p->id); + error_setg(errp, "multifd %u: zstd createDStream failed", p->id); return -1; } @@ -207,20 +197,18 @@ static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp) if (ZSTD_isError(ret)) { ZSTD_freeDStream(z->zds); g_free(z); - error_setg(errp, "multifd %d: initDStream failed with error %s", + error_setg(errp, "multifd %u: initDStream failed with error %s", p->id, ZSTD_getErrorName(ret)); return -1; } - /* We will never have more than page_count pages */ - z->zbuff_len = page_count * qemu_target_page_size(); - /* We know compression "could" use more space */ - z->zbuff_len *= 2; + /* To be safe, we reserve twice the size of the packet */ + z->zbuff_len = MULTIFD_PACKET_SIZE * 2; z->zbuff = g_try_malloc(z->zbuff_len); if (!z->zbuff) { ZSTD_freeDStream(z->zds); g_free(z); - error_setg(errp, "multifd %d: out of memory for zbuff", p->id); + error_setg(errp, "multifd %u: out of memory for zbuff", p->id); return -1; } return 0; @@ -235,18 +223,18 @@ static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp) */ static void zstd_recv_cleanup(MultiFDRecvParams *p) { - struct zstd_data *z = p->data; + struct zstd_data *z = p->compress_data; ZSTD_freeDStream(z->zds); z->zds = NULL; g_free(z->zbuff); z->zbuff = NULL; - g_free(p->data); - p->data = NULL; + g_free(p->compress_data); + p->compress_data = NULL; } /** - * zstd_recv_pages: read the data from the channel into actual pages + * zstd_recv: read the data from the channel into actual pages * * Read the compressed buffer, and uncompress it into the actual * pages. @@ -254,24 +242,31 @@ static void zstd_recv_cleanup(MultiFDRecvParams *p) * Returns 0 for success or -1 for error * * @p: Params for the channel that we are using - * @used: number of pages used * @errp: pointer to an error */ -static int zstd_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp) +static int zstd_recv(MultiFDRecvParams *p, Error **errp) { uint32_t in_size = p->next_packet_size; uint32_t out_size = 0; - uint32_t expected_size = used * qemu_target_page_size(); + uint32_t expected_size = p->normal_num * p->page_size; uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; - struct zstd_data *z = p->data; + struct zstd_data *z = p->compress_data; int ret; int i; if (flags != MULTIFD_FLAG_ZSTD) { - error_setg(errp, "multifd %d: flags received %x flags expected %x", + error_setg(errp, "multifd %u: flags received %x flags expected %x", p->id, flags, MULTIFD_FLAG_ZSTD); return -1; } + + multifd_recv_zero_page_process(p); + + if (!p->normal_num) { + assert(in_size == 0); + return 0; + } + ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp); if (ret != 0) { @@ -282,11 +277,9 @@ static int zstd_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp) z->in.size = in_size; z->in.pos = 0; - for (i = 0; i < used; i++) { - struct iovec *iov = &p->pages->iov[i]; - - z->out.dst = iov->iov_base; - z->out.size = iov->iov_len; + for (i = 0; i < p->normal_num; i++) { + z->out.dst = p->host + p->normal[i]; + z->out.size = p->page_size; z->out.pos = 0; /* @@ -300,21 +293,21 @@ static int zstd_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp) do { ret = ZSTD_decompressStream(z->zds, &z->out, &z->in); } while (ret > 0 && (z->in.size - z->in.pos > 0) - && (z->out.pos < iov->iov_len)); - if (ret > 0 && (z->out.pos < iov->iov_len)) { - error_setg(errp, "multifd %d: decompressStream buffer too small", + && (z->out.pos < p->page_size)); + if (ret > 0 && (z->out.pos < p->page_size)) { + error_setg(errp, "multifd %u: decompressStream buffer too small", p->id); return -1; } if (ZSTD_isError(ret)) { - error_setg(errp, "multifd %d: decompressStream returned %s", + error_setg(errp, "multifd %u: decompressStream returned %s", p->id, ZSTD_getErrorName(ret)); return ret; } out_size += z->out.pos; } if (out_size != expected_size) { - error_setg(errp, "multifd %d: packet size received %d size expected %d", + error_setg(errp, "multifd %u: packet size received %u size expected %u", p->id, out_size, expected_size); return -1; } @@ -325,10 +318,9 @@ static MultiFDMethods multifd_zstd_ops = { .send_setup = zstd_send_setup, .send_cleanup = zstd_send_cleanup, .send_prepare = zstd_send_prepare, - .send_write = zstd_send_write, .recv_setup = zstd_recv_setup, .recv_cleanup = zstd_recv_cleanup, - .recv_pages = zstd_recv_pages + .recv = zstd_recv }; static void multifd_zstd_register(void) |