Diffstat (limited to 'migration/rdma.c')
-rw-r--r--  migration/rdma.c  1774
1 file changed, 881 insertions, 893 deletions
diff --git a/migration/rdma.c b/migration/rdma.c
index 5c2d113aa9..855753c671 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -17,11 +17,12 @@
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
+#include "exec/target_page.h"
#include "rdma.h"
#include "migration.h"
+#include "migration-stats.h"
#include "qemu-file.h"
#include "ram.h"
-#include "qemu-file-channel.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/module.h"
@@ -36,19 +37,9 @@
#include <rdma/rdma_cma.h>
#include "trace.h"
#include "qom/object.h"
+#include "options.h"
#include <poll.h>
-/*
- * Print and error on both the Monitor and the Log file.
- */
-#define ERROR(errp, fmt, ...) \
- do { \
- fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
- if (errp && (*(errp) == NULL)) { \
- error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
- } \
- } while (0)
-
#define RDMA_RESOLVE_TIMEOUT_MS 10000
/* Do not merge data if larger than this. */
@@ -83,18 +74,6 @@
*/
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
-#define CHECK_ERROR_STATE() \
- do { \
- if (rdma->error_state) { \
- if (!rdma->error_reported) { \
- error_report("RDMA is in an error state waiting migration" \
- " to abort!"); \
- rdma->error_reported = 1; \
- } \
- return rdma->error_state; \
- } \
- } while (0)
-
/*
* A work request ID is 64-bits and we split up these bits
* into 3 parts:
@@ -131,13 +110,6 @@ enum {
RDMA_WRID_RECV_CONTROL = 4000,
};
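
For orientation, a minimal sketch (illustrative, not part of the patch) of how such a
64-bit work request ID decodes back into its parts, using the RDMA_WRID_TYPE_MASK /
RDMA_WRID_BLOCK_MASK / RDMA_WRID_BLOCK_SHIFT constants this file defines; the chunk
number is assumed to occupy the remaining high bits:

    static void example_decode_wrid(uint64_t wr_id,
                                    uint64_t *type, uint64_t *index)
    {
        /* low bits: the work request type, e.g. RDMA_WRID_RDMA_WRITE */
        *type = wr_id & RDMA_WRID_TYPE_MASK;
        /* middle bits: index of the RAM block the request belongs to */
        *index = (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
        /* remaining high bits: the chunk within that block */
    }
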
-static const char *wrid_desc[] = {
- [RDMA_WRID_NONE] = "NONE",
- [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
- [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
- [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
-};
-
/*
* Work request IDs for IB SEND messages only (not RDMA writes).
* This is used by the migration protocol to transmit
@@ -266,6 +238,7 @@ static const char *control_desc(unsigned int rdma_control)
return strs[rdma_control];
}
+#if !defined(htonll)
static uint64_t htonll(uint64_t v)
{
union { uint32_t lv[2]; uint64_t llv; } u;
@@ -273,13 +246,16 @@ static uint64_t htonll(uint64_t v)
u.lv[1] = htonl(v & 0xFFFFFFFFULL);
return u.llv;
}
+#endif
+#if !defined(ntohll)
static uint64_t ntohll(uint64_t v)
{
union { uint32_t lv[2]; uint64_t llv; } u;
u.llv = v;
return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
}
+#endif
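
A quick sanity sketch (illustrative only): the two fallback helpers are inverses of each
other regardless of host byte order, which is all the block/register (de)serialization
below relies on:

    uint64_t v = 0x0123456789abcdefULL;
    assert(ntohll(htonll(v)) == v);   /* holds on big- and little-endian hosts */
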
static void dest_block_to_network(RDMADestBlock *db)
{
@@ -317,7 +293,6 @@ typedef struct RDMALocalBlocks {
typedef struct RDMAContext {
char *host;
int port;
- char *host_port;
RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
@@ -358,18 +333,20 @@ typedef struct RDMAContext {
struct ibv_context *verbs;
struct rdma_event_channel *channel;
struct ibv_qp *qp; /* queue pair */
- struct ibv_comp_channel *comp_channel; /* completion channel */
+ struct ibv_comp_channel *recv_comp_channel; /* recv completion channel */
+ struct ibv_comp_channel *send_comp_channel; /* send completion channel */
struct ibv_pd *pd; /* protection domain */
- struct ibv_cq *cq; /* completion queue */
+ struct ibv_cq *recv_cq; /* receive completion queue */
+ struct ibv_cq *send_cq; /* send completion queue */
/*
* If a previous write failed (perhaps because of a failed
* memory registration, then do not attempt any future work
* and remember the error state.
*/
- int error_state;
- int error_reported;
- int received_error;
+ bool errored;
+ bool error_reported;
+ bool received_error;
/*
* Description of ram blocks used throughout the code.
@@ -454,6 +431,16 @@ typedef struct QEMU_PACKED {
uint64_t chunks; /* how many sequential chunks to register */
} RDMARegister;
+static bool rdma_errored(RDMAContext *rdma)
+{
+ if (rdma->errored && !rdma->error_reported) {
+ error_report("RDMA is in an error state waiting migration"
+ " to abort!");
+ rdma->error_reported = true;
+ }
+ return rdma->errored;
+}
+
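
This helper takes over from the removed CHECK_ERROR_STATE macro; a minimal sketch
(assuming an int-returning caller, as in the converted call sites elsewhere in this
patch) of the idiom that replaces it:

    if (rdma_errored(rdma)) {
        return -1;
    }
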
static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
{
RDMALocalBlock *local_block;
@@ -531,11 +518,12 @@ static void network_to_result(RDMARegisterResult *result)
result->host_addr = ntohll(result->host_addr);
};
-const char *print_wrid(int wrid);
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
uint8_t *data, RDMAControlHeader *resp,
int *resp_idx,
- int (*callback)(RDMAContext *rdma));
+ int (*callback)(RDMAContext *rdma,
+ Error **errp),
+ Error **errp);
static inline uint64_t ram_chunk_index(const uint8_t *start,
const uint8_t *host)
@@ -563,9 +551,9 @@ static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
return result;
}
-static int rdma_add_block(RDMAContext *rdma, const char *block_name,
- void *host_addr,
- ram_addr_t block_offset, uint64_t length)
+static void rdma_add_block(RDMAContext *rdma, const char *block_name,
+ void *host_addr,
+ ram_addr_t block_offset, uint64_t length)
{
RDMALocalBlocks *local = &rdma->local_ram_blocks;
RDMALocalBlock *block;
@@ -574,10 +562,8 @@ static int rdma_add_block(RDMAContext *rdma, const char *block_name,
local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
if (local->nb_blocks) {
- int x;
-
if (rdma->blockmap) {
- for (x = 0; x < local->nb_blocks; x++) {
+ for (int x = 0; x < local->nb_blocks; x++) {
g_hash_table_remove(rdma->blockmap,
(void *)(uintptr_t)old[x].offset);
g_hash_table_insert(rdma->blockmap,
@@ -619,8 +605,6 @@ static int rdma_add_block(RDMAContext *rdma, const char *block_name,
block->nb_chunks);
local->nb_blocks++;
-
- return 0;
}
/*
@@ -634,7 +618,8 @@ static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
void *host_addr = qemu_ram_get_host_addr(rb);
ram_addr_t block_offset = qemu_ram_get_offset(rb);
ram_addr_t length = qemu_ram_get_used_length(rb);
- return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
+ rdma_add_block(opaque, block_name, host_addr, block_offset, length);
+ return 0;
}
/*
@@ -642,7 +627,7 @@ static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
* identify chunk boundaries inside each RAMBlock and also be referenced
* during dynamic page registration.
*/
-static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
+static void qemu_rdma_init_ram_blocks(RDMAContext *rdma)
{
RDMALocalBlocks *local = &rdma->local_ram_blocks;
int ret;
@@ -650,33 +635,27 @@ static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
assert(rdma->blockmap == NULL);
memset(local, 0, sizeof *local);
ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
- if (ret) {
- return ret;
- }
+ assert(!ret);
trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
rdma->dest_blocks = g_new0(RDMADestBlock,
rdma->local_ram_blocks.nb_blocks);
local->init = true;
- return 0;
}
/*
* Note: If used outside of cleanup, the caller must ensure that the destination
* block structures are also updated
*/
-static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
+static void rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
{
RDMALocalBlocks *local = &rdma->local_ram_blocks;
RDMALocalBlock *old = local->block;
- int x;
if (rdma->blockmap) {
g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
}
if (block->pmr) {
- int j;
-
- for (j = 0; j < block->nb_chunks; j++) {
+ for (int j = 0; j < block->nb_chunks; j++) {
if (!block->pmr[j]) {
continue;
}
@@ -706,7 +685,7 @@ static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
block->block_name = NULL;
if (rdma->blockmap) {
- for (x = 0; x < local->nb_blocks; x++) {
+ for (int x = 0; x < local->nb_blocks; x++) {
g_hash_table_remove(rdma->blockmap,
(void *)(uintptr_t)old[x].offset);
}
@@ -724,7 +703,7 @@ static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
memcpy(local->block + block->index, old + (block->index + 1),
sizeof(RDMALocalBlock) *
(local->nb_blocks - (block->index + 1)));
- for (x = block->index; x < local->nb_blocks - 1; x++) {
+ for (int x = block->index; x < local->nb_blocks - 1; x++) {
local->block[x].index--;
}
}
@@ -744,49 +723,40 @@ static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
local->nb_blocks--;
if (local->nb_blocks && rdma->blockmap) {
- for (x = 0; x < local->nb_blocks; x++) {
+ for (int x = 0; x < local->nb_blocks; x++) {
g_hash_table_insert(rdma->blockmap,
(void *)(uintptr_t)local->block[x].offset,
&local->block[x]);
}
}
-
- return 0;
}
/*
- * Put in the log file which RDMA device was opened and the details
- * associated with that device.
+ * Trace RDMA device open, with device details.
*/
static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
{
struct ibv_port_attr port;
if (ibv_query_port(verbs, 1, &port)) {
- error_report("Failed to query port information");
+ trace_qemu_rdma_dump_id_failed(who);
return;
}
- printf("%s RDMA Device opened: kernel name %s "
- "uverbs device name %s, "
- "infiniband_verbs class device path %s, "
- "infiniband class device path %s, "
- "transport: (%d) %s\n",
- who,
+ trace_qemu_rdma_dump_id(who,
verbs->device->name,
verbs->device->dev_name,
verbs->device->dev_path,
verbs->device->ibdev_path,
port.link_layer,
- (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
- ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
- ? "Ethernet" : "Unknown"));
+ port.link_layer == IBV_LINK_LAYER_INFINIBAND ? "Infiniband"
+ : port.link_layer == IBV_LINK_LAYER_ETHERNET ? "Ethernet"
+ : "Unknown");
}
/*
- * Put in the log file the RDMA gid addressing information,
- * useful for folks who have trouble understanding the
- * RDMA device hierarchy in the kernel.
+ * Trace RDMA gid addressing information.
+ * Useful for understanding the RDMA device hierarchy in the kernel.
*/
static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
{
@@ -856,25 +826,34 @@ static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
* Otherwise, there are no guarantees until the bug is fixed in linux.
*/
if (!verbs) {
- int num_devices, x;
+ int num_devices;
struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
bool roce_found = false;
bool ib_found = false;
- for (x = 0; x < num_devices; x++) {
+ for (int x = 0; x < num_devices; x++) {
verbs = ibv_open_device(dev_list[x]);
+ /*
+ * ibv_open_device() is not documented to set errno. If
+ * it does, it's somebody else's doc bug. If it doesn't,
+ * the use of errno below is wrong.
+ * TODO Find out whether ibv_open_device() sets errno.
+ */
if (!verbs) {
if (errno == EPERM) {
continue;
} else {
- return -EINVAL;
+ error_setg_errno(errp, errno,
+ "could not open RDMA device context");
+ return -1;
}
}
if (ibv_query_port(verbs, 1, &port_attr)) {
ibv_close_device(verbs);
- ERROR(errp, "Could not query initial IB port");
- return -EINVAL;
+ error_setg(errp,
+ "RDMA ERROR: Could not query initial IB port");
+ return -1;
}
if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
@@ -889,17 +868,18 @@ static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
if (roce_found) {
if (ib_found) {
- fprintf(stderr, "WARN: migrations may fail:"
- " IPv6 over RoCE / iWARP in linux"
- " is broken. But since you appear to have a"
- " mixed RoCE / IB environment, be sure to only"
- " migrate over the IB fabric until the kernel "
- " fixes the bug.\n");
+ warn_report("migrations may fail:"
+ " IPv6 over RoCE / iWARP in linux"
+ " is broken. But since you appear to have a"
+ " mixed RoCE / IB environment, be sure to only"
+ " migrate over the IB fabric until the kernel "
+ " fixes the bug.");
} else {
- ERROR(errp, "You only have RoCE / iWARP devices in your systems"
- " and your management software has specified '[::]'"
- ", but IPv6 over RoCE / iWARP is not supported in Linux.");
- return -ENONET;
+ error_setg(errp, "RDMA ERROR: "
+ "You only have RoCE / iWARP devices in your systems"
+ " and your management software has specified '[::]'"
+ ", but IPv6 over RoCE / iWARP is not supported in Linux.");
+ return -1;
}
}
@@ -914,14 +894,15 @@ static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
/* IB ports start with 1, not 0 */
if (ibv_query_port(verbs, 1, &port_attr)) {
- ERROR(errp, "Could not query initial IB port");
- return -EINVAL;
+ error_setg(errp, "RDMA ERROR: Could not query initial IB port");
+ return -1;
}
if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
- ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
- "(but patches on linux-rdma in progress)");
- return -ENONET;
+ error_setg(errp, "RDMA ERROR: "
+ "Linux kernel's RoCE / iWARP does not support IPv6 "
+ "(but patches on linux-rdma in progress)");
+ return -1;
}
#endif
@@ -936,29 +917,29 @@ static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
*/
static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
{
+ Error *err = NULL;
int ret;
struct rdma_addrinfo *res;
char port_str[16];
struct rdma_cm_event *cm_event;
char ip[40] = "unknown";
- struct rdma_addrinfo *e;
if (rdma->host == NULL || !strcmp(rdma->host, "")) {
- ERROR(errp, "RDMA hostname has not been set");
- return -EINVAL;
+ error_setg(errp, "RDMA ERROR: RDMA hostname has not been set");
+ return -1;
}
/* create CM channel */
rdma->channel = rdma_create_event_channel();
if (!rdma->channel) {
- ERROR(errp, "could not create CM channel");
- return -EINVAL;
+ error_setg(errp, "RDMA ERROR: could not create CM channel");
+ return -1;
}
/* create CM id */
ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
- if (ret) {
- ERROR(errp, "could not create channel id");
+ if (ret < 0) {
+ error_setg(errp, "RDMA ERROR: could not create channel id");
goto err_resolve_create_id;
}
@@ -966,31 +947,42 @@ static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
port_str[15] = '\0';
ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
- if (ret < 0) {
- ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
+ if (ret) {
+ error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
+ rdma->host);
goto err_resolve_get_addr;
}
- for (e = res; e != NULL; e = e->ai_next) {
+ /* Try all addresses, saving the first error in @err */
+ for (struct rdma_addrinfo *e = res; e != NULL; e = e->ai_next) {
+ Error **local_errp = err ? NULL : &err;
+
inet_ntop(e->ai_family,
&((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
RDMA_RESOLVE_TIMEOUT_MS);
- if (!ret) {
+ if (ret >= 0) {
if (e->ai_family == AF_INET6) {
- ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
- if (ret) {
+ ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs,
+ local_errp);
+ if (ret < 0) {
continue;
}
}
+ error_free(err);
goto route;
}
}
rdma_freeaddrinfo(res);
- ERROR(errp, "could not resolve address %s", rdma->host);
+ if (err) {
+ error_propagate(errp, err);
+ } else {
+ error_setg(errp, "RDMA ERROR: could not resolve address %s",
+ rdma->host);
+ }
goto err_resolve_get_addr;
route:
@@ -998,38 +990,37 @@ route:
qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
ret = rdma_get_cm_event(rdma->channel, &cm_event);
- if (ret) {
- ERROR(errp, "could not perform event_addr_resolved");
+ if (ret < 0) {
+ error_setg(errp, "RDMA ERROR: could not perform event_addr_resolved");
goto err_resolve_get_addr;
}
if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
- ERROR(errp, "result not equal to event_addr_resolved %s",
- rdma_event_str(cm_event->event));
- error_report("rdma_resolve_addr");
+ error_setg(errp,
+ "RDMA ERROR: result not equal to event_addr_resolved %s",
+ rdma_event_str(cm_event->event));
rdma_ack_cm_event(cm_event);
- ret = -EINVAL;
goto err_resolve_get_addr;
}
rdma_ack_cm_event(cm_event);
/* resolve route */
ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
- if (ret) {
- ERROR(errp, "could not resolve rdma route");
+ if (ret < 0) {
+ error_setg(errp, "RDMA ERROR: could not resolve rdma route");
goto err_resolve_get_addr;
}
ret = rdma_get_cm_event(rdma->channel, &cm_event);
- if (ret) {
- ERROR(errp, "could not perform event_route_resolved");
+ if (ret < 0) {
+ error_setg(errp, "RDMA ERROR: could not perform event_route_resolved");
goto err_resolve_get_addr;
}
if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
- ERROR(errp, "result not equal to event_route_resolved: %s",
- rdma_event_str(cm_event->event));
+ error_setg(errp, "RDMA ERROR: "
+ "result not equal to event_route_resolved: %s",
+ rdma_event_str(cm_event->event));
rdma_ack_cm_event(cm_event);
- ret = -EINVAL;
goto err_resolve_get_addr;
}
rdma_ack_cm_event(cm_event);
@@ -1044,36 +1035,49 @@ err_resolve_get_addr:
err_resolve_create_id:
rdma_destroy_event_channel(rdma->channel);
rdma->channel = NULL;
- return ret;
+ return -1;
}
/*
* Create protection domain and completion queues
*/
-static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
+static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, Error **errp)
{
/* allocate pd */
rdma->pd = ibv_alloc_pd(rdma->verbs);
if (!rdma->pd) {
- error_report("failed to allocate protection domain");
+ error_setg(errp, "failed to allocate protection domain");
return -1;
}
- /* create completion channel */
- rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
- if (!rdma->comp_channel) {
- error_report("failed to allocate completion channel");
+ /* create receive completion channel */
+ rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
+ if (!rdma->recv_comp_channel) {
+ error_setg(errp, "failed to allocate receive completion channel");
goto err_alloc_pd_cq;
}
/*
- * Completion queue can be filled by both read and write work requests,
- * so must reflect the sum of both possible queue sizes.
+ * Completion queue can be filled by read work requests.
*/
- rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
- NULL, rdma->comp_channel, 0);
- if (!rdma->cq) {
- error_report("failed to allocate completion queue");
+ rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
+ NULL, rdma->recv_comp_channel, 0);
+ if (!rdma->recv_cq) {
+ error_setg(errp, "failed to allocate receive completion queue");
+ goto err_alloc_pd_cq;
+ }
+
+ /* create send completion channel */
+ rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
+ if (!rdma->send_comp_channel) {
+ error_setg(errp, "failed to allocate send completion channel");
+ goto err_alloc_pd_cq;
+ }
+
+ rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
+ NULL, rdma->send_comp_channel, 0);
+ if (!rdma->send_cq) {
+ error_setg(errp, "failed to allocate send completion queue");
goto err_alloc_pd_cq;
}
@@ -1083,11 +1087,19 @@ err_alloc_pd_cq:
if (rdma->pd) {
ibv_dealloc_pd(rdma->pd);
}
- if (rdma->comp_channel) {
- ibv_destroy_comp_channel(rdma->comp_channel);
+ if (rdma->recv_comp_channel) {
+ ibv_destroy_comp_channel(rdma->recv_comp_channel);
+ }
+ if (rdma->send_comp_channel) {
+ ibv_destroy_comp_channel(rdma->send_comp_channel);
+ }
+ if (rdma->recv_cq) {
+ ibv_destroy_cq(rdma->recv_cq);
+ rdma->recv_cq = NULL;
}
rdma->pd = NULL;
- rdma->comp_channel = NULL;
+ rdma->recv_comp_channel = NULL;
+ rdma->send_comp_channel = NULL;
return -1;
}
@@ -1098,18 +1110,16 @@ err_alloc_pd_cq:
static int qemu_rdma_alloc_qp(RDMAContext *rdma)
{
struct ibv_qp_init_attr attr = { 0 };
- int ret;
attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
attr.cap.max_recv_wr = 3;
attr.cap.max_send_sge = 1;
attr.cap.max_recv_sge = 1;
- attr.send_cq = rdma->cq;
- attr.recv_cq = rdma->cq;
+ attr.send_cq = rdma->send_cq;
+ attr.recv_cq = rdma->recv_cq;
attr.qp_type = IBV_QPT_RC;
- ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
- if (ret) {
+ if (rdma_create_qp(rdma->cm_id, rdma->pd, &attr) < 0) {
return -1;
}
@@ -1117,30 +1127,94 @@ static int qemu_rdma_alloc_qp(RDMAContext *rdma)
return 0;
}
-static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
+/* Check whether On-Demand Paging is supported by the RDMA device */
+static bool rdma_support_odp(struct ibv_context *dev)
+{
+ struct ibv_device_attr_ex attr = {0};
+
+ if (ibv_query_device_ex(dev, NULL, &attr)) {
+ return false;
+ }
+
+ if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * ibv_advise_mr to avoid RNR NAK error as far as possible.
+ * The responder mr registered with ODP will send RNR NAK back to
+ * the requester in the face of the page fault.
+ */
+static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
+ uint32_t len, uint32_t lkey,
+ const char *name, bool wr)
+{
+#ifdef HAVE_IBV_ADVISE_MR
+ int ret;
+ int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE :
+ IBV_ADVISE_MR_ADVICE_PREFETCH;
+ struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len};
+
+ ret = ibv_advise_mr(pd, advice,
+ IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1);
+ /* ignore the error */
+ trace_qemu_rdma_advise_mr(name, len, addr, strerror(ret));
+#endif
+}
+
+static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma, Error **errp)
{
int i;
RDMALocalBlocks *local = &rdma->local_ram_blocks;
for (i = 0; i < local->nb_blocks; i++) {
+ int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
+
local->block[i].mr =
ibv_reg_mr(rdma->pd,
local->block[i].local_host_addr,
- local->block[i].length,
- IBV_ACCESS_LOCAL_WRITE |
- IBV_ACCESS_REMOTE_WRITE
+ local->block[i].length, access
);
+ /*
+ * ibv_reg_mr() is not documented to set errno. If it does,
+ * it's somebody else's doc bug. If it doesn't, the use of
+ * errno below is wrong.
+ * TODO Find out whether ibv_reg_mr() sets errno.
+ */
+ if (!local->block[i].mr &&
+ errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
+ access |= IBV_ACCESS_ON_DEMAND;
+ /* register ODP mr */
+ local->block[i].mr =
+ ibv_reg_mr(rdma->pd,
+ local->block[i].local_host_addr,
+ local->block[i].length, access);
+ trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
+
+ if (local->block[i].mr) {
+ qemu_rdma_advise_prefetch_mr(rdma->pd,
+ (uintptr_t)local->block[i].local_host_addr,
+ local->block[i].length,
+ local->block[i].mr->lkey,
+ local->block[i].block_name,
+ true);
+ }
+ }
+
if (!local->block[i].mr) {
- perror("Failed to register local dest ram block!");
- break;
+ error_setg_errno(errp, errno,
+ "Failed to register local dest ram block!");
+ goto err;
}
rdma->total_registrations++;
}
- if (i >= local->nb_blocks) {
- return 0;
- }
+ return 0;
+err:
for (i--; i >= 0; i--) {
ibv_dereg_mr(local->block[i].mr);
local->block[i].mr = NULL;
@@ -1157,15 +1231,13 @@ static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
*
* Once the block is found, also identify which 'chunk' within that
* block that the page belongs to.
- *
- * This search cannot fail or the migration will fail.
*/
-static int qemu_rdma_search_ram_block(RDMAContext *rdma,
- uintptr_t block_offset,
- uint64_t offset,
- uint64_t length,
- uint64_t *block_index,
- uint64_t *chunk_index)
+static void qemu_rdma_search_ram_block(RDMAContext *rdma,
+ uintptr_t block_offset,
+ uint64_t offset,
+ uint64_t length,
+ uint64_t *block_index,
+ uint64_t *chunk_index)
{
uint64_t current_addr = block_offset + offset;
RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
@@ -1177,8 +1249,6 @@ static int qemu_rdma_search_ram_block(RDMAContext *rdma,
*block_index = block->index;
*chunk_index = ram_chunk_index(block->local_host_addr,
block->local_host_addr + (current_addr - block->offset));
-
- return 0;
}
/*
@@ -1215,28 +1285,37 @@ static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
*/
if (!block->pmr[chunk]) {
uint64_t len = chunk_end - chunk_start;
+ int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
+ 0;
trace_qemu_rdma_register_and_get_keys(len, chunk_start);
- block->pmr[chunk] = ibv_reg_mr(rdma->pd,
- chunk_start, len,
- (rkey ? (IBV_ACCESS_LOCAL_WRITE |
- IBV_ACCESS_REMOTE_WRITE) : 0));
-
- if (!block->pmr[chunk]) {
- perror("Failed to register chunk!");
- fprintf(stderr, "Chunk details: block: %d chunk index %d"
- " start %" PRIuPTR " end %" PRIuPTR
- " host %" PRIuPTR
- " local %" PRIuPTR " registrations: %d\n",
- block->index, chunk, (uintptr_t)chunk_start,
- (uintptr_t)chunk_end, host_addr,
- (uintptr_t)block->local_host_addr,
- rdma->total_registrations);
- return -1;
+ block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
+ /*
+ * ibv_reg_mr() is not documented to set errno. If it does,
+ * it's somebody else's doc bug. If it doesn't, the use of
+ * errno below is wrong.
+ * TODO Find out whether ibv_reg_mr() sets errno.
+ */
+ if (!block->pmr[chunk] &&
+ errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
+ access |= IBV_ACCESS_ON_DEMAND;
+ /* register ODP mr */
+ block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
+ trace_qemu_rdma_register_odp_mr(block->block_name);
+
+ if (block->pmr[chunk]) {
+ qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
+ len, block->pmr[chunk]->lkey,
+ block->block_name, rkey);
+
+ }
}
- rdma->total_registrations++;
}
+ if (!block->pmr[chunk]) {
+ return -1;
+ }
+ rdma->total_registrations++;
if (lkey) {
*lkey = block->pmr[chunk]->lkey;
@@ -1260,42 +1339,9 @@ static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
rdma->total_registrations++;
return 0;
}
- error_report("qemu_rdma_reg_control failed");
return -1;
}
-const char *print_wrid(int wrid)
-{
- if (wrid >= RDMA_WRID_RECV_CONTROL) {
- return wrid_desc[RDMA_WRID_RECV_CONTROL];
- }
- return wrid_desc[wrid];
-}
-
-/*
- * RDMA requires memory registration (mlock/pinning), but this is not good for
- * overcommitment.
- *
- * In preparation for the future where LRU information or workload-specific
- * writable writable working set memory access behavior is available to QEMU
- * it would be nice to have in place the ability to UN-register/UN-pin
- * particular memory regions from the RDMA hardware when it is determine that
- * those regions of memory will likely not be accessed again in the near future.
- *
- * While we do not yet have such information right now, the following
- * compile-time option allows us to perform a non-optimized version of this
- * behavior.
- *
- * By uncommenting this option, you will cause *all* RDMA transfers to be
- * unregistered immediately after the transfer completes on both sides of the
- * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
- *
- * This will have a terrible impact on migration performance, so until future
- * workload information or LRU information is available, do not attempt to use
- * this feature except for basic testing.
- */
-/* #define RDMA_UNREGISTRATION_EXAMPLE */
-
/*
* Perform a non-optimized memory unregistration after every transfer
* for demonstration purposes, only if pin-all is not requested.
@@ -1309,6 +1355,8 @@ const char *print_wrid(int wrid)
*/
static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
{
+ Error *err = NULL;
+
while (rdma->unregistrations[rdma->unregister_current]) {
int ret;
uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
@@ -1358,17 +1406,19 @@ static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
block->remote_keys[chunk] = 0;
if (ret != 0) {
- perror("unregistration chunk failed");
- return -ret;
+ error_report("unregistration chunk failed: %s",
+ strerror(ret));
+ return -1;
}
rdma->total_registrations--;
reg.key.chunk = chunk;
register_to_network(rdma, &reg);
ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
- &resp, NULL, NULL);
+ &resp, NULL, NULL, &err);
if (ret < 0) {
- return ret;
+ error_report_err(err);
+ return -1;
}
trace_qemu_rdma_unregister_waiting_complete(chunk);
@@ -1389,46 +1439,18 @@ static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
}
/*
- * Set bit for unregistration in the next iteration.
- * We cannot transmit right here, but will unpin later.
- */
-static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
- uint64_t chunk, uint64_t wr_id)
-{
- if (rdma->unregistrations[rdma->unregister_next] != 0) {
- error_report("rdma migration: queue is full");
- } else {
- RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
-
- if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
- trace_qemu_rdma_signal_unregister_append(chunk,
- rdma->unregister_next);
-
- rdma->unregistrations[rdma->unregister_next++] =
- qemu_rdma_make_wrid(wr_id, index, chunk);
-
- if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
- rdma->unregister_next = 0;
- }
- } else {
- trace_qemu_rdma_signal_unregister_already(chunk);
- }
- }
-}
-
-/*
* Consult the connection manager to see if a work request
* (of any kind) has completed.
* Return the work request ID that completed.
*/
-static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
- uint32_t *byte_len)
+static int qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
+ uint64_t *wr_id_out, uint32_t *byte_len)
{
int ret;
struct ibv_wc wc;
uint64_t wr_id;
- ret = ibv_poll_cq(rdma->cq, 1, &wc);
+ ret = ibv_poll_cq(cq, 1, &wc);
if (!ret) {
*wr_id_out = RDMA_WRID_NONE;
@@ -1436,24 +1458,19 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
}
if (ret < 0) {
- error_report("ibv_poll_cq return %d", ret);
- return ret;
+ return -1;
}
wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
if (wc.status != IBV_WC_SUCCESS) {
- fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
- wc.status, ibv_wc_status_str(wc.status));
- fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
-
return -1;
}
if (rdma->control_ready_expected &&
(wr_id >= RDMA_WRID_RECV_CONTROL)) {
- trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
- wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
+ trace_qemu_rdma_poll_recv(wr_id - RDMA_WRID_RECV_CONTROL, wr_id,
+ rdma->nb_sent);
rdma->control_ready_expected = 0;
}
@@ -1464,7 +1481,7 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
(wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
- trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
+ trace_qemu_rdma_poll_write(wr_id, rdma->nb_sent,
index, chunk, block->local_host_addr,
(void *)(uintptr_t)block->remote_host_addr);
@@ -1473,20 +1490,8 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
if (rdma->nb_sent > 0) {
rdma->nb_sent--;
}
-
- if (!rdma->pin_all) {
- /*
- * FYI: If one wanted to signal a specific chunk to be unregistered
- * using LRU or workload-specific information, this is the function
- * you would call to do so. That chunk would then get asynchronously
- * unregistered later.
- */
-#ifdef RDMA_UNREGISTRATION_EXAMPLE
- qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
-#endif
- }
} else {
- trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
+ trace_qemu_rdma_poll_other(wr_id, rdma->nb_sent);
}
*wr_id_out = wc.wr_id;
@@ -1500,10 +1505,10 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
/* Wait for activity on the completion channel.
* Returns 0 on success, non-0 on error.
*/
-static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
+static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
+ struct ibv_comp_channel *comp_channel)
{
struct rdma_cm_event *cm_event;
- int ret = -1;
/*
* Coroutine doesn't start until migration_fd_process_incoming()
@@ -1511,7 +1516,7 @@ static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
*/
if (rdma->migration_started_on_destination &&
migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
- yield_until_fd_readable(rdma->comp_channel->fd);
+ yield_until_fd_readable(comp_channel->fd);
} else {
/* This is the source side, we're in a separate thread
* or destination prior to migration_fd_process_incoming()
@@ -1520,9 +1525,9 @@ static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
* But we need to be able to handle 'cancel' or an error
* without hanging forever.
*/
- while (!rdma->error_state && !rdma->received_error) {
+ while (!rdma->errored && !rdma->received_error) {
GPollFD pfds[2];
- pfds[0].fd = rdma->comp_channel->fd;
+ pfds[0].fd = comp_channel->fd;
pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
pfds[0].revents = 0;
@@ -1539,19 +1544,14 @@ static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
}
if (pfds[1].revents) {
- ret = rdma_get_cm_event(rdma->channel, &cm_event);
- if (ret) {
- error_report("failed to get cm event while wait "
- "completion channel");
- return -EPIPE;
+ if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
+ return -1;
}
- error_report("receive cm event while wait comp channel,"
- "cm event is %d", cm_event->event);
if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
rdma_ack_cm_event(cm_event);
- return -EPIPE;
+ return -1;
}
rdma_ack_cm_event(cm_event);
}
@@ -1563,21 +1563,31 @@ static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
default: /* Error of some type -
* I don't trust errno from qemu_poll_ns
*/
- error_report("%s: poll failed", __func__);
- return -EPIPE;
+ return -1;
}
if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
/* Bail out and let the cancellation happen */
- return -EPIPE;
+ return -1;
}
}
}
if (rdma->received_error) {
- return -EPIPE;
+ return -1;
}
- return rdma->error_state;
+ return -rdma->errored;
+}
+
+static struct ibv_comp_channel *to_channel(RDMAContext *rdma, uint64_t wrid)
+{
+ return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
+ rdma->recv_comp_channel;
+}
+
+static struct ibv_cq *to_cq(RDMAContext *rdma, uint64_t wrid)
+{
+ return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
}
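
Illustrative only: with the completion queue split in two, a work request ID is routed
to the send or receive side purely by its type range, e.g.:

    assert(to_cq(rdma, RDMA_WRID_RDMA_WRITE)   == rdma->send_cq);
    assert(to_cq(rdma, RDMA_WRID_RECV_CONTROL) == rdma->recv_cq);
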
/*
@@ -1593,22 +1603,25 @@ static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
* completions only need to be recorded, but do not actually
* need further processing.
*/
-static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
+static int qemu_rdma_block_for_wrid(RDMAContext *rdma,
+ uint64_t wrid_requested,
uint32_t *byte_len)
{
- int num_cq_events = 0, ret = 0;
+ int num_cq_events = 0, ret;
struct ibv_cq *cq;
void *cq_ctx;
uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
+ struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
+ struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
- if (ibv_req_notify_cq(rdma->cq, 0)) {
+ if (ibv_req_notify_cq(poll_cq, 0)) {
return -1;
}
/* poll cq first */
while (wr_id != wrid_requested) {
- ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
+ ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
if (ret < 0) {
- return ret;
+ return -1;
}
wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
@@ -1617,8 +1630,7 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
break;
}
if (wr_id != wrid_requested) {
- trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
- wrid_requested, print_wrid(wr_id), wr_id);
+ trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
}
}
@@ -1627,26 +1639,24 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
}
while (1) {
- ret = qemu_rdma_wait_comp_channel(rdma);
- if (ret) {
+ ret = qemu_rdma_wait_comp_channel(rdma, ch);
+ if (ret < 0) {
goto err_block_for_wrid;
}
- ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
- if (ret) {
- perror("ibv_get_cq_event");
+ ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
+ if (ret < 0) {
goto err_block_for_wrid;
}
num_cq_events++;
- ret = -ibv_req_notify_cq(cq, 0);
- if (ret) {
+ if (ibv_req_notify_cq(cq, 0)) {
goto err_block_for_wrid;
}
while (wr_id != wrid_requested) {
- ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
+ ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
if (ret < 0) {
goto err_block_for_wrid;
}
@@ -1657,8 +1667,7 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
break;
}
if (wr_id != wrid_requested) {
- trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
- wrid_requested, print_wrid(wr_id), wr_id);
+ trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
}
}
@@ -1678,8 +1687,8 @@ err_block_for_wrid:
ibv_ack_cq_events(cq, num_cq_events);
}
- rdma->error_state = ret;
- return ret;
+ rdma->errored = true;
+ return -1;
}
/*
@@ -1687,9 +1696,10 @@ err_block_for_wrid:
* containing some data and block until the post completes.
*/
static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
- RDMAControlHeader *head)
+ RDMAControlHeader *head,
+ Error **errp)
{
- int ret = 0;
+ int ret;
RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
struct ibv_send_wr *bad_wr;
struct ibv_sge sge = {
@@ -1727,23 +1737,25 @@ static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
if (ret > 0) {
- error_report("Failed to use post IB SEND for control");
- return -ret;
+ error_setg(errp, "Failed to use post IB SEND for control");
+ return -1;
}
ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
if (ret < 0) {
- error_report("rdma migration: send polling control error");
+ error_setg(errp, "rdma migration: send polling control error");
+ return -1;
}
- return ret;
+ return 0;
}
/*
* Post a RECV work request in anticipation of some future receipt
* of data on the control channel.
*/
-static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
+static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx,
+ Error **errp)
{
struct ibv_recv_wr *bad_wr;
struct ibv_sge sge = {
@@ -1760,6 +1772,7 @@ static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
+ error_setg(errp, "error posting control recv");
return -1;
}
@@ -1770,15 +1783,16 @@ static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
* Block and wait for a RECV control channel message to arrive.
*/
static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
- RDMAControlHeader *head, int expecting, int idx)
+ RDMAControlHeader *head, uint32_t expecting, int idx,
+ Error **errp)
{
uint32_t byte_len;
int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
&byte_len);
if (ret < 0) {
- error_report("rdma migration: recv polling control error!");
- return ret;
+ error_setg(errp, "rdma migration: recv polling control error!");
+ return -1;
}
network_to_control((void *) rdma->wr_data[idx].control);
@@ -1790,22 +1804,23 @@ static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
head->type);
} else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
- error_report("Was expecting a %s (%d) control message"
+ error_setg(errp, "Was expecting a %s (%d) control message"
", but got: %s (%d), length: %d",
control_desc(expecting), expecting,
control_desc(head->type), head->type, head->len);
if (head->type == RDMA_CONTROL_ERROR) {
rdma->received_error = true;
}
- return -EIO;
+ return -1;
}
if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
- error_report("too long length: %d", head->len);
- return -EINVAL;
+ error_setg(errp, "too long length: %d", head->len);
+ return -1;
}
if (sizeof(*head) + head->len != byte_len) {
- error_report("Malformed length: %d byte_len %d", head->len, byte_len);
- return -EINVAL;
+ error_setg(errp, "Malformed length: %d byte_len %d",
+ head->len, byte_len);
+ return -1;
}
return 0;
@@ -1843,20 +1858,24 @@ static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
uint8_t *data, RDMAControlHeader *resp,
int *resp_idx,
- int (*callback)(RDMAContext *rdma))
+ int (*callback)(RDMAContext *rdma,
+ Error **errp),
+ Error **errp)
{
- int ret = 0;
+ int ret;
/*
* Wait until the dest is ready before attempting to deliver the message
* by waiting for a READY message.
*/
if (rdma->control_ready_expected) {
- RDMAControlHeader resp;
- ret = qemu_rdma_exchange_get_response(rdma,
- &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
+ RDMAControlHeader resp_ignored;
+
+ ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored,
+ RDMA_CONTROL_READY,
+ RDMA_WRID_READY, errp);
if (ret < 0) {
- return ret;
+ return -1;
}
}
@@ -1864,31 +1883,27 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
* If the user is expecting a response, post a WR in anticipation of it.
*/
if (resp) {
- ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
- if (ret) {
- error_report("rdma migration: error posting"
- " extra control recv for anticipated result!");
- return ret;
+ ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA, errp);
+ if (ret < 0) {
+ return -1;
}
}
/*
* Post a WR to replace the one we just consumed for the READY message.
*/
- ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
- if (ret) {
- error_report("rdma migration: error posting first control recv!");
- return ret;
+ ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
+ if (ret < 0) {
+ return -1;
}
/*
* Deliver the control message that was requested.
*/
- ret = qemu_rdma_post_send_control(rdma, data, head);
+ ret = qemu_rdma_post_send_control(rdma, data, head, errp);
if (ret < 0) {
- error_report("Failed to send control buffer!");
- return ret;
+ return -1;
}
/*
@@ -1897,18 +1912,19 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
if (resp) {
if (callback) {
trace_qemu_rdma_exchange_send_issue_callback();
- ret = callback(rdma);
+ ret = callback(rdma, errp);
if (ret < 0) {
- return ret;
+ return -1;
}
}
trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
ret = qemu_rdma_exchange_get_response(rdma, resp,
- resp->type, RDMA_WRID_DATA);
+ resp->type, RDMA_WRID_DATA,
+ errp);
if (ret < 0) {
- return ret;
+ return -1;
}
qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
@@ -1928,7 +1944,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
* control-channel message.
*/
static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
- int expecting)
+ uint32_t expecting, Error **errp)
{
RDMAControlHeader ready = {
.len = 0,
@@ -1940,21 +1956,20 @@ static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
/*
* Inform the source that we're ready to receive a message.
*/
- ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
+ ret = qemu_rdma_post_send_control(rdma, NULL, &ready, errp);
if (ret < 0) {
- error_report("Failed to send control buffer!");
- return ret;
+ return -1;
}
/*
* Block and wait for the message.
*/
ret = qemu_rdma_exchange_get_response(rdma, head,
- expecting, RDMA_WRID_READY);
+ expecting, RDMA_WRID_READY, errp);
if (ret < 0) {
- return ret;
+ return -1;
}
qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
@@ -1962,10 +1977,9 @@ static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
/*
* Post a new RECV work request to replace the one we just consumed.
*/
- ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
- if (ret) {
- error_report("rdma migration: error posting second control recv!");
- return ret;
+ ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
+ if (ret < 0) {
+ return -1;
}
return 0;
@@ -1977,9 +1991,9 @@ static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
* If we're using dynamic registration on the dest-side, we have to
* send a registration command first.
*/
-static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
+static int qemu_rdma_write_one(RDMAContext *rdma,
int current_index, uint64_t current_addr,
- uint64_t length)
+ uint64_t length, Error **errp)
{
struct ibv_sge sge;
struct ibv_send_wr send_wr = { 0 };
@@ -2025,11 +2039,6 @@ retry:
chunk_end = ram_chunk_end(block, chunk + chunks);
- if (!rdma->pin_all) {
-#ifdef RDMA_UNREGISTRATION_EXAMPLE
- qemu_rdma_unregister_waiting(rdma);
-#endif
- }
while (test_bit(chunk, block->transit_bitmap)) {
(void)count;
@@ -2039,11 +2048,11 @@ retry:
ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
if (ret < 0) {
- error_report("Failed to Wait for previous write to complete "
+ error_setg(errp, "Failed to Wait for previous write to complete "
"block %d chunk %" PRIu64
" current %" PRIu64 " len %" PRIu64 " %d",
current_index, chunk, sge.addr, length, rdma->nb_sent);
- return ret;
+ return -1;
}
}
@@ -2071,14 +2080,24 @@ retry:
compress_to_network(rdma, &comp);
ret = qemu_rdma_exchange_send(rdma, &head,
- (uint8_t *) &comp, NULL, NULL, NULL);
+ (uint8_t *) &comp, NULL, NULL, NULL, errp);
if (ret < 0) {
- return -EIO;
+ return -1;
}
- acct_update_position(f, sge.length, true);
-
+ /*
+ * TODO: Here we are sending something, but we are not
+ * accounting for anything transferred. The following is wrong:
+ *
+ * stat64_add(&mig_stats.rdma_bytes, sge.length);
+ *
+ * because we are using some kind of compression. I
+ * would think that head.len would be closer to the
+ * correct value.
+ */
+ stat64_add(&mig_stats.zero_pages,
+ sge.length / qemu_target_page_size());
return 1;
}
@@ -2098,17 +2117,17 @@ retry:
register_to_network(rdma, &reg);
ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
- &resp, &reg_result_idx, NULL);
+ &resp, &reg_result_idx, NULL, errp);
if (ret < 0) {
- return ret;
+ return -1;
}
/* try to overlap this single registration with the one we sent. */
if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
&sge.lkey, NULL, chunk,
chunk_start, chunk_end)) {
- error_report("cannot get lkey");
- return -EINVAL;
+ error_setg(errp, "cannot get lkey");
+ return -1;
}
reg_result = (RDMARegisterResult *)
@@ -2126,8 +2145,8 @@ retry:
if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
&sge.lkey, NULL, chunk,
chunk_start, chunk_end)) {
- error_report("cannot get lkey!");
- return -EINVAL;
+ error_setg(errp, "cannot get lkey!");
+ return -1;
}
}
@@ -2138,8 +2157,8 @@ retry:
if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
&sge.lkey, NULL, chunk,
chunk_start, chunk_end)) {
- error_report("cannot get lkey!");
- return -EINVAL;
+ error_setg(errp, "cannot get lkey!");
+ return -1;
}
}
@@ -2172,20 +2191,32 @@ retry:
trace_qemu_rdma_write_one_queue_full();
ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
if (ret < 0) {
- error_report("rdma migration: failed to make "
- "room in full send queue! %d", ret);
- return ret;
+ error_setg(errp, "rdma migration: failed to make "
+ "room in full send queue!");
+ return -1;
}
goto retry;
} else if (ret > 0) {
- perror("rdma migration: post rdma write failed");
- return -ret;
+ error_setg_errno(errp, ret,
+ "rdma migration: post rdma write failed");
+ return -1;
}
set_bit(chunk, block->transit_bitmap);
- acct_update_position(f, sge.length, false);
+ stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size());
+ /*
+ * We are adding to transferred the amount of data written, but no
+ * overhead at all. I will assume that RDMA is magical and doesn't
+ * need to transfer (at least) the addresses where it wants to
+ * write the pages. Here it looks like it should be something
+ * like:
+ * sizeof(send_wr) + sge.length
+ * but this being RDMA, who knows.
+ */
+ stat64_add(&mig_stats.rdma_bytes, sge.length);
+ ram_transferred_add(sge.length);
rdma->total_writes++;
return 0;
@@ -2197,7 +2228,7 @@ retry:
* We support sending out multiple chunks at the same time.
* Not all of them need to get signaled in the completion queue.
*/
-static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
+static int qemu_rdma_write_flush(RDMAContext *rdma, Error **errp)
{
int ret;
@@ -2205,11 +2236,11 @@ static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
return 0;
}
- ret = qemu_rdma_write_one(f, rdma,
- rdma->current_index, rdma->current_addr, rdma->current_length);
+ ret = qemu_rdma_write_one(rdma, rdma->current_index, rdma->current_addr,
+ rdma->current_length, errp);
if (ret < 0) {
- return ret;
+ return -1;
}
if (ret == 0) {
@@ -2223,7 +2254,7 @@ static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
return 0;
}
-static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
+static inline bool qemu_rdma_buffer_mergeable(RDMAContext *rdma,
uint64_t offset, uint64_t len)
{
RDMALocalBlock *block;
@@ -2231,11 +2262,11 @@ static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
uint8_t *chunk_end;
if (rdma->current_index < 0) {
- return 0;
+ return false;
}
if (rdma->current_chunk < 0) {
- return 0;
+ return false;
}
block = &(rdma->local_ram_blocks.block[rdma->current_index]);
@@ -2243,29 +2274,29 @@ static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
chunk_end = ram_chunk_end(block, rdma->current_chunk);
if (rdma->current_length == 0) {
- return 0;
+ return false;
}
/*
* Only merge into chunk sequentially.
*/
if (offset != (rdma->current_addr + rdma->current_length)) {
- return 0;
+ return false;
}
if (offset < block->offset) {
- return 0;
+ return false;
}
if ((offset + len) > (block->offset + block->length)) {
- return 0;
+ return false;
}
if ((host_addr + len) > chunk_end) {
- return 0;
+ return false;
}
- return 1;
+ return true;
}
/*
@@ -2278,30 +2309,24 @@ static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
* and only require that a batch gets acknowledged in the completion
* queue instead of each individual chunk.
*/
-static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
+static int qemu_rdma_write(RDMAContext *rdma,
uint64_t block_offset, uint64_t offset,
- uint64_t len)
+ uint64_t len, Error **errp)
{
uint64_t current_addr = block_offset + offset;
uint64_t index = rdma->current_index;
uint64_t chunk = rdma->current_chunk;
- int ret;
/* If we cannot merge it, we flush the current buffer first. */
- if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
- ret = qemu_rdma_write_flush(f, rdma);
- if (ret) {
- return ret;
+ if (!qemu_rdma_buffer_mergeable(rdma, current_addr, len)) {
+ if (qemu_rdma_write_flush(rdma, errp) < 0) {
+ return -1;
}
rdma->current_length = 0;
rdma->current_addr = current_addr;
- ret = qemu_rdma_search_ram_block(rdma, block_offset,
- offset, len, &index, &chunk);
- if (ret) {
- error_report("ram block search failed");
- return ret;
- }
+ qemu_rdma_search_ram_block(rdma, block_offset,
+ offset, len, &index, &chunk);
rdma->current_index = index;
rdma->current_chunk = chunk;
}
@@ -2311,7 +2336,7 @@ static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
/* flush it if buffer is too large */
if (rdma->current_length >= RDMA_MERGE_MAX) {
- return qemu_rdma_write_flush(f, rdma);
+ return qemu_rdma_write_flush(rdma, errp);
}
return 0;
@@ -2319,18 +2344,20 @@ static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
static void qemu_rdma_cleanup(RDMAContext *rdma)
{
- int idx;
+ Error *err = NULL;
if (rdma->cm_id && rdma->connected) {
- if ((rdma->error_state ||
+ if ((rdma->errored ||
migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
!rdma->received_error) {
RDMAControlHeader head = { .len = 0,
.type = RDMA_CONTROL_ERROR,
.repeat = 1,
};
- error_report("Early error. Sending error.");
- qemu_rdma_post_send_control(rdma, NULL, &head);
+ warn_report("Early error. Sending error.");
+ if (qemu_rdma_post_send_control(rdma, NULL, &head, &err) < 0) {
+ warn_report_err(err);
+ }
}
rdma_disconnect(rdma->cm_id);
@@ -2344,12 +2371,12 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
g_free(rdma->dest_blocks);
rdma->dest_blocks = NULL;
- for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
- if (rdma->wr_data[idx].control_mr) {
+ for (int i = 0; i < RDMA_WRID_MAX; i++) {
+ if (rdma->wr_data[i].control_mr) {
rdma->total_registrations--;
- ibv_dereg_mr(rdma->wr_data[idx].control_mr);
+ ibv_dereg_mr(rdma->wr_data[i].control_mr);
}
- rdma->wr_data[idx].control_mr = NULL;
+ rdma->wr_data[i].control_mr = NULL;
}
if (rdma->local_ram_blocks.block) {
@@ -2362,13 +2389,21 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
rdma_destroy_qp(rdma->cm_id);
rdma->qp = NULL;
}
- if (rdma->cq) {
- ibv_destroy_cq(rdma->cq);
- rdma->cq = NULL;
+ if (rdma->recv_cq) {
+ ibv_destroy_cq(rdma->recv_cq);
+ rdma->recv_cq = NULL;
}
- if (rdma->comp_channel) {
- ibv_destroy_comp_channel(rdma->comp_channel);
- rdma->comp_channel = NULL;
+ if (rdma->send_cq) {
+ ibv_destroy_cq(rdma->send_cq);
+ rdma->send_cq = NULL;
+ }
+ if (rdma->recv_comp_channel) {
+ ibv_destroy_comp_channel(rdma->recv_comp_channel);
+ rdma->recv_comp_channel = NULL;
+ }
+ if (rdma->send_comp_channel) {
+ ibv_destroy_comp_channel(rdma->send_comp_channel);
+ rdma->send_comp_channel = NULL;
}
if (rdma->pd) {
ibv_dealloc_pd(rdma->pd);
@@ -2399,16 +2434,13 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
rdma->channel = NULL;
}
g_free(rdma->host);
- g_free(rdma->host_port);
rdma->host = NULL;
- rdma->host_port = NULL;
}
static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
{
- int ret, idx;
- Error *local_err = NULL, **temp = &local_err;
+ int ret;
/*
* Will be validated against destination's actual capabilities
@@ -2416,44 +2448,37 @@ static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
*/
rdma->pin_all = pin_all;
- ret = qemu_rdma_resolve_host(rdma, temp);
- if (ret) {
+ ret = qemu_rdma_resolve_host(rdma, errp);
+ if (ret < 0) {
goto err_rdma_source_init;
}
- ret = qemu_rdma_alloc_pd_cq(rdma);
- if (ret) {
- ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
- " limits may be too low. Please check $ ulimit -a # and "
- "search for 'ulimit -l' in the output");
+ ret = qemu_rdma_alloc_pd_cq(rdma, errp);
+ if (ret < 0) {
goto err_rdma_source_init;
}
ret = qemu_rdma_alloc_qp(rdma);
- if (ret) {
- ERROR(temp, "rdma migration: error allocating qp!");
+ if (ret < 0) {
+ error_setg(errp, "RDMA ERROR: rdma migration: error allocating qp!");
goto err_rdma_source_init;
}
- ret = qemu_rdma_init_ram_blocks(rdma);
- if (ret) {
- ERROR(temp, "rdma migration: error initializing ram blocks!");
- goto err_rdma_source_init;
- }
+ qemu_rdma_init_ram_blocks(rdma);
/* Build the hash that maps from offset to RAMBlock */
rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
- for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
+ for (int i = 0; i < rdma->local_ram_blocks.nb_blocks; i++) {
g_hash_table_insert(rdma->blockmap,
- (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
- &rdma->local_ram_blocks.block[idx]);
+ (void *)(uintptr_t)rdma->local_ram_blocks.block[i].offset,
+ &rdma->local_ram_blocks.block[i]);
}
- for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
- ret = qemu_rdma_reg_control(rdma, idx);
- if (ret) {
- ERROR(temp, "rdma migration: error registering %d control!",
- idx);
+ for (int i = 0; i < RDMA_WRID_MAX; i++) {
+ ret = qemu_rdma_reg_control(rdma, i);
+ if (ret < 0) {
+ error_setg(errp, "RDMA ERROR: rdma migration: error "
+ "registering %d control!", i);
goto err_rdma_source_init;
}
}
@@ -2461,7 +2486,6 @@ static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
return 0;
err_rdma_source_init:
- error_propagate(errp, local_err);
qemu_rdma_cleanup(rdma);
return -1;
}
@@ -2482,20 +2506,27 @@ static int qemu_get_cm_event_timeout(RDMAContext *rdma,
} while (ret < 0 && errno == EINTR);
if (ret == 0) {
- ERROR(errp, "poll cm event timeout");
+ error_setg(errp, "RDMA ERROR: poll cm event timeout");
return -1;
} else if (ret < 0) {
- ERROR(errp, "failed to poll cm event, errno=%i", errno);
+ error_setg(errp, "RDMA ERROR: failed to poll cm event, errno=%i",
+ errno);
return -1;
} else if (poll_fd.revents & POLLIN) {
- return rdma_get_cm_event(rdma->channel, cm_event);
+ if (rdma_get_cm_event(rdma->channel, cm_event) < 0) {
+ error_setg(errp, "RDMA ERROR: failed to get cm event");
+ return -1;
+ }
+ return 0;
} else {
- ERROR(errp, "no POLLIN event, revent=%x", poll_fd.revents);
+ error_setg(errp, "RDMA ERROR: no POLLIN event, revent=%x",
+ poll_fd.revents);
return -1;
}
}
-static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path)
+static int qemu_rdma_connect(RDMAContext *rdma, bool return_path,
+ Error **errp)
{
RDMACapabilities cap = {
.version = RDMA_CONTROL_VERSION_CURRENT,
@@ -2520,16 +2551,15 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path)
caps_to_network(&cap);
- ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
- if (ret) {
- ERROR(errp, "posting second control recv");
+ ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
+ if (ret < 0) {
goto err_rdma_source_connect;
}
ret = rdma_connect(rdma->cm_id, &conn_param);
- if (ret) {
- perror("rdma_connect");
- ERROR(errp, "connecting to destination!");
+ if (ret < 0) {
+ error_setg_errno(errp, errno,
+ "RDMA ERROR: connecting to destination!");
goto err_rdma_source_connect;
}
@@ -2537,16 +2567,17 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path)
ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
} else {
ret = rdma_get_cm_event(rdma->channel, &cm_event);
+ if (ret < 0) {
+ error_setg_errno(errp, errno,
+ "RDMA ERROR: failed to get cm event");
+ }
}
- if (ret) {
- perror("rdma_get_cm_event after rdma_connect");
- ERROR(errp, "connecting to destination!");
+ if (ret < 0) {
goto err_rdma_source_connect;
}
if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
- error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
- ERROR(errp, "connecting to destination!");
+ error_setg(errp, "RDMA ERROR: connecting to destination!");
rdma_ack_cm_event(cm_event);
goto err_rdma_source_connect;
}
@@ -2560,8 +2591,8 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path)
* and disable them otherwise.
*/
if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
- ERROR(errp, "Server cannot support pinning all memory. "
- "Will register memory dynamically.");
+ warn_report("RDMA: Server cannot support pinning all memory. "
+ "Will register memory dynamically.");
rdma->pin_all = false;
}
@@ -2580,34 +2611,36 @@ err_rdma_source_connect:
static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
{
- int ret, idx;
+ Error *err = NULL;
+ int ret;
struct rdma_cm_id *listen_id;
char ip[40] = "unknown";
struct rdma_addrinfo *res, *e;
char port_str[16];
+ int reuse = 1;
- for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
- rdma->wr_data[idx].control_len = 0;
- rdma->wr_data[idx].control_curr = NULL;
+ for (int i = 0; i < RDMA_WRID_MAX; i++) {
+ rdma->wr_data[i].control_len = 0;
+ rdma->wr_data[i].control_curr = NULL;
}
if (!rdma->host || !rdma->host[0]) {
- ERROR(errp, "RDMA host is not set!");
- rdma->error_state = -EINVAL;
+ error_setg(errp, "RDMA ERROR: RDMA host is not set!");
+ rdma->errored = true;
return -1;
}
/* create CM channel */
rdma->channel = rdma_create_event_channel();
if (!rdma->channel) {
- ERROR(errp, "could not create rdma event channel");
- rdma->error_state = -EINVAL;
+ error_setg(errp, "RDMA ERROR: could not create rdma event channel");
+ rdma->errored = true;
return -1;
}
/* create CM id */
ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
- if (ret) {
- ERROR(errp, "could not create cm_id!");
+ if (ret < 0) {
+ error_setg(errp, "RDMA ERROR: could not create cm_id!");
goto err_dest_init_create_listen_id;
}
@@ -2615,31 +2648,48 @@ static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
port_str[15] = '\0';
ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
+ if (ret) {
+ error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
+ rdma->host);
+ goto err_dest_init_bind_addr;
+ }
+
+ ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
+ &reuse, sizeof reuse);
if (ret < 0) {
- ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
+ error_setg(errp, "RDMA ERROR: Error: could not set REUSEADDR option");
goto err_dest_init_bind_addr;
}
+ /* Try all addresses, saving the first error in @err */
for (e = res; e != NULL; e = e->ai_next) {
+ Error **local_errp = err ? NULL : &err;
+
inet_ntop(e->ai_family,
&((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
trace_qemu_rdma_dest_init_trying(rdma->host, ip);
ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
- if (ret) {
+ if (ret < 0) {
continue;
}
if (e->ai_family == AF_INET6) {
- ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
- if (ret) {
+ ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs,
+ local_errp);
+ if (ret < 0) {
continue;
}
}
+ error_free(err);
break;
}
rdma_freeaddrinfo(res);
if (!e) {
- ERROR(errp, "Error: could not rdma_bind_addr!");
+ if (err) {
+ error_propagate(errp, err);
+ } else {
+ error_setg(errp, "RDMA ERROR: Error: could not rdma_bind_addr!");
+ }
goto err_dest_init_bind_addr;
}
@@ -2652,19 +2702,17 @@ err_dest_init_bind_addr:
err_dest_init_create_listen_id:
rdma_destroy_event_channel(rdma->channel);
rdma->channel = NULL;
- rdma->error_state = ret;
- return ret;
+ rdma->errored = true;
+ return -1;
}
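/*
 * Illustrative sketch of the listener setup performed by
 * qemu_rdma_dest_init(), assuming only librdmacm.  Like the loop above, it
 * walks the rdma_getaddrinfo() result list and binds the first usable
 * address, but uses the conventional RAI_PASSIVE / ai_src_addr form;
 * example_listen() is not a QEMU function and cleanup on the error paths
 * is omitted for brevity.
 */
#include <rdma/rdma_cma.h>

static struct rdma_cm_id *example_listen(char *host, char *port)
{
    struct rdma_event_channel *ch = rdma_create_event_channel();
    struct rdma_cm_id *id = NULL;
    struct rdma_addrinfo hints = { 0 }, *res, *e;

    if (!ch || rdma_create_id(ch, &id, NULL, RDMA_PS_TCP) < 0) {
        return NULL;
    }

    hints.ai_flags = RAI_PASSIVE;        /* we want a bindable local address */
    hints.ai_port_space = RDMA_PS_TCP;
    if (rdma_getaddrinfo(host, port, &hints, &res) < 0) {
        return NULL;
    }

    for (e = res; e; e = e->ai_next) {
        if (rdma_bind_addr(id, e->ai_src_addr) == 0) {
            break;                       /* first address that binds wins */
        }
    }
    rdma_freeaddrinfo(res);

    if (!e || rdma_listen(id, 5) < 0) {
        return NULL;
    }
    return id;            /* connect requests will now show up on ch->fd */
}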
static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
RDMAContext *rdma)
{
- int idx;
-
- for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
- rdma_return_path->wr_data[idx].control_len = 0;
- rdma_return_path->wr_data[idx].control_curr = NULL;
+ for (int i = 0; i < RDMA_WRID_MAX; i++) {
+ rdma_return_path->wr_data[i].control_len = 0;
+ rdma_return_path->wr_data[i].control_curr = NULL;
}
/*the CM channel and CM id is shared*/
@@ -2676,30 +2724,16 @@ static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
rdma_return_path->is_return_path = true;
}
-static void *qemu_rdma_data_init(const char *host_port, Error **errp)
+static RDMAContext *qemu_rdma_data_init(InetSocketAddress *saddr, Error **errp)
{
RDMAContext *rdma = NULL;
- InetSocketAddress *addr;
-
- if (host_port) {
- rdma = g_new0(RDMAContext, 1);
- rdma->current_index = -1;
- rdma->current_chunk = -1;
-
- addr = g_new(InetSocketAddress, 1);
- if (!inet_parse(addr, host_port, NULL)) {
- rdma->port = atoi(addr->port);
- rdma->host = g_strdup(addr->host);
- rdma->host_port = g_strdup(host_port);
- } else {
- ERROR(errp, "bad RDMA migration address '%s'", host_port);
- g_free(rdma);
- rdma = NULL;
- }
- qapi_free_InetSocketAddress(addr);
- }
+ rdma = g_new0(RDMAContext, 1);
+ rdma->current_index = -1;
+ rdma->current_chunk = -1;
+ rdma->host = g_strdup(saddr->host);
+ rdma->port = atoi(saddr->port);
return rdma;
}
@@ -2713,40 +2747,44 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
size_t niov,
int *fds,
size_t nfds,
+ int flags,
Error **errp)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
- QEMUFile *f = rioc->file;
RDMAContext *rdma;
int ret;
ssize_t done = 0;
- size_t i;
- size_t len = 0;
+ size_t len;
RCU_READ_LOCK_GUARD();
rdma = qatomic_rcu_read(&rioc->rdmaout);
if (!rdma) {
- return -EIO;
+ error_setg(errp, "RDMA control channel output is not set");
+ return -1;
}
- CHECK_ERROR_STATE();
+ if (rdma->errored) {
+ error_setg(errp,
+ "RDMA is in an error state waiting migration to abort!");
+ return -1;
+ }
/*
* Push out any writes that
* we're queued up for VM's ram.
*/
- ret = qemu_rdma_write_flush(f, rdma);
+ ret = qemu_rdma_write_flush(rdma, errp);
if (ret < 0) {
- rdma->error_state = ret;
- return ret;
+ rdma->errored = true;
+ return -1;
}
- for (i = 0; i < niov; i++) {
+ for (int i = 0; i < niov; i++) {
size_t remaining = iov[i].iov_len;
uint8_t * data = (void *)iov[i].iov_base;
while (remaining) {
- RDMAControlHeader head;
+ RDMAControlHeader head = {};
len = MIN(remaining, RDMA_SEND_INCREMENT);
remaining -= len;
@@ -2754,11 +2792,12 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
head.len = len;
head.type = RDMA_CONTROL_QEMU_FILE;
- ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
+ ret = qemu_rdma_exchange_send(rdma, &head,
+ data, NULL, NULL, NULL, errp);
if (ret < 0) {
- rdma->error_state = ret;
- return ret;
+ rdma->errored = true;
+ return -1;
}
data += len;
@@ -2796,25 +2835,31 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
+ int flags,
Error **errp)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
RDMAContext *rdma;
RDMAControlHeader head;
- int ret = 0;
- ssize_t i;
- size_t done = 0;
+ int ret;
+ ssize_t done = 0;
+ size_t len;
RCU_READ_LOCK_GUARD();
rdma = qatomic_rcu_read(&rioc->rdmain);
if (!rdma) {
- return -EIO;
+ error_setg(errp, "RDMA control channel input is not set");
+ return -1;
}
- CHECK_ERROR_STATE();
+ if (rdma->errored) {
+ error_setg(errp,
+ "RDMA is in an error state waiting migration to abort!");
+ return -1;
+ }
- for (i = 0; i < niov; i++) {
+ for (int i = 0; i < niov; i++) {
size_t want = iov[i].iov_len;
uint8_t *data = (void *)iov[i].iov_base;
@@ -2823,9 +2868,9 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
* were given and dish out the bytes until we run
* out of bytes.
*/
- ret = qemu_rdma_fill(rdma, data, want, 0);
- done += ret;
- want -= ret;
+ len = qemu_rdma_fill(rdma, data, want, 0);
+ done += len;
+ want -= len;
/* Got what we needed, so go to next iovec */
if (want == 0) {
continue;
@@ -2841,19 +2886,20 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
/* We've got nothing at all, so lets wait for
* more to arrive
*/
- ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+ ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE,
+ errp);
if (ret < 0) {
- rdma->error_state = ret;
- return ret;
+ rdma->errored = true;
+ return -1;
}
/*
* SEND was received with new bytes, now try again.
*/
- ret = qemu_rdma_fill(rdma, data, want, 0);
- done += ret;
- want -= ret;
+ len = qemu_rdma_fill(rdma, data, want, 0);
+ done += len;
+ want -= len;
/* Still didn't get enough, so lets just return */
if (want) {
@@ -2870,19 +2916,19 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
/*
* Block until all the outstanding chunks have been delivered by the hardware.
*/
-static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
+static int qemu_rdma_drain_cq(RDMAContext *rdma)
{
- int ret;
+ Error *err = NULL;
- if (qemu_rdma_write_flush(f, rdma) < 0) {
- return -EIO;
+ if (qemu_rdma_write_flush(rdma, &err) < 0) {
+ error_report_err(err);
+ return -1;
}
while (rdma->nb_sent) {
- ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
- if (ret < 0) {
+ if (qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL) < 0) {
error_report("rdma migration: complete polling error!");
- return -EIO;
+ return -1;
}
}
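/*
 * Illustrative sketch of the drain above, assuming only libibverbs and a
 * caller-maintained count of posted-but-unfinished RDMA writes (playing the
 * role of rdma->nb_sent): keep reaping completions until every outstanding
 * write has been acknowledged by the hardware.  example_drain_writes() is
 * not a QEMU function; it busy-polls where the real code blocks on the
 * completion channel.
 */
#include <infiniband/verbs.h>

static int example_drain_writes(struct ibv_cq *send_cq, unsigned *outstanding)
{
    struct ibv_wc wc;

    while (*outstanding > 0) {
        int n = ibv_poll_cq(send_cq, 1, &wc);

        if (n < 0 || (n > 0 && wc.status != IBV_WC_SUCCESS)) {
            return -1;                  /* polling error or failed request */
        }
        if (n > 0 && wc.opcode == IBV_WC_RDMA_WRITE) {
            (*outstanding)--;           /* one fewer write in flight */
        }
    }
    return 0;
}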
@@ -3006,7 +3052,7 @@ qio_channel_rdma_source_finalize(GSource *source)
object_unref(OBJECT(ssource->rioc));
}
-GSourceFuncs qio_channel_rdma_source_funcs = {
+static GSourceFuncs qio_channel_rdma_source_funcs = {
qio_channel_rdma_source_prepare,
qio_channel_rdma_source_check,
qio_channel_rdma_source_dispatch,
@@ -3033,18 +3079,23 @@ static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
}
static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
- IOHandler *io_read,
- IOHandler *io_write,
- void *opaque)
+ AioContext *read_ctx,
+ IOHandler *io_read,
+ AioContext *write_ctx,
+ IOHandler *io_write,
+ void *opaque)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
if (io_read) {
- aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
- false, io_read, io_write, NULL, opaque);
+ aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd,
+ io_read, io_write, NULL, NULL, opaque);
+ aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd,
+ io_read, io_write, NULL, NULL, opaque);
} else {
- aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
- false, io_read, io_write, NULL, opaque);
+ aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd,
+ io_read, io_write, NULL, NULL, opaque);
+ aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd,
+ io_read, io_write, NULL, NULL, opaque);
}
}
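/*
 * Illustrative sketch of what a handler attached to the completion-channel
 * fds registered above typically does, assuming only libibverbs: take the
 * pending CQ event, re-arm notification, then drain the CQ with
 * ibv_poll_cq().  example_drain_comp_channel() is not a QEMU function.
 */
#include <infiniband/verbs.h>

static int example_drain_comp_channel(struct ibv_comp_channel *ch)
{
    struct ibv_cq *cq;
    void *cq_ctx;
    struct ibv_wc wc;

    /* Blocks until the channel fd is readable (one CQ event is pending). */
    if (ibv_get_cq_event(ch, &cq, &cq_ctx) < 0) {
        return -1;
    }
    ibv_ack_cq_events(cq, 1);

    /* Re-arm before polling so no later completion is missed. */
    if (ibv_req_notify_cq(cq, 0) < 0) {
        return -1;
    }

    /* Drain everything that is already in the CQ. */
    while (ibv_poll_cq(cq, 1, &wc) > 0) {
        if (wc.status != IBV_WC_SUCCESS) {
            return -1;      /* wc.wr_id identifies the failed work request */
        }
    }
    return 0;
}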
@@ -3112,21 +3163,21 @@ qio_channel_rdma_shutdown(QIOChannel *ioc,
switch (how) {
case QIO_CHANNEL_SHUTDOWN_READ:
if (rdmain) {
- rdmain->error_state = -1;
+ rdmain->errored = true;
}
break;
case QIO_CHANNEL_SHUTDOWN_WRITE:
if (rdmaout) {
- rdmaout->error_state = -1;
+ rdmaout->errored = true;
}
break;
case QIO_CHANNEL_SHUTDOWN_BOTH:
default:
if (rdmain) {
- rdmain->error_state = -1;
+ rdmain->errored = true;
}
if (rdmaout) {
- rdmaout->error_state = -1;
+ rdmaout->errored = true;
}
break;
}
@@ -3146,33 +3197,17 @@ qio_channel_rdma_shutdown(QIOChannel *ioc,
* Offset is an offset to be added to block_offset and used
* to also lookup the corresponding RAMBlock.
*
- * @size > 0 :
- * Initiate an transfer this size.
+ * @size : Number of bytes to transfer
*
- * @size == 0 :
- * A 'hint' or 'advice' that means that we wish to speculatively
- * and asynchronously unregister this memory. In this case, there is no
- * guarantee that the unregister will actually happen, for example,
- * if the memory is being actively transmitted. Additionally, the memory
- * may be re-registered at any future time if a write within the same
- * chunk was requested again, even if you attempted to unregister it
- * here.
- *
- * @size < 0 : TODO, not yet supported
- * Unregister the memory NOW. This means that the caller does not
- * expect there to be any future RDMA transfers and we just want to clean
- * things up. This is used in case the upper layer owns the memory and
- * cannot wait for qemu_fclose() to occur.
- *
- * @bytes_sent : User-specificed pointer to indicate how many bytes were
+ * @pages_sent : User-specified pointer to indicate how many pages were
* sent. Usually, this will not be more than a few bytes of
* the protocol because most transfers are sent asynchronously.
*/
-static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
- ram_addr_t block_offset, ram_addr_t offset,
- size_t size, uint64_t *bytes_sent)
+static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset,
+ ram_addr_t offset, size_t size)
{
- QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
+ Error *err = NULL;
RDMAContext *rdma;
int ret;
@@ -3180,72 +3215,24 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
rdma = qatomic_rcu_read(&rioc->rdmaout);
if (!rdma) {
- return -EIO;
+ return -1;
}
- CHECK_ERROR_STATE();
-
- if (migration_in_postcopy()) {
- return RAM_SAVE_CONTROL_NOT_SUPP;
+ if (rdma_errored(rdma)) {
+ return -1;
}
qemu_fflush(f);
- if (size > 0) {
- /*
- * Add this page to the current 'chunk'. If the chunk
- * is full, or the page doesn't belong to the current chunk,
- * an actual RDMA write will occur and a new chunk will be formed.
- */
- ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
- if (ret < 0) {
- error_report("rdma migration: write error! %d", ret);
- goto err;
- }
-
- /*
- * We always return 1 bytes because the RDMA
- * protocol is completely asynchronous. We do not yet know
- * whether an identified chunk is zero or not because we're
- * waiting for other pages to potentially be merged with
- * the current chunk. So, we have to call qemu_update_position()
- * later on when the actual write occurs.
- */
- if (bytes_sent) {
- *bytes_sent = 1;
- }
- } else {
- uint64_t index, chunk;
-
- /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
- if (size < 0) {
- ret = qemu_rdma_drain_cq(f, rdma);
- if (ret < 0) {
- fprintf(stderr, "rdma: failed to synchronously drain"
- " completion queue before unregistration.\n");
- goto err;
- }
- }
- */
-
- ret = qemu_rdma_search_ram_block(rdma, block_offset,
- offset, size, &index, &chunk);
-
- if (ret) {
- error_report("ram block search failed");
- goto err;
- }
-
- qemu_rdma_signal_unregister(rdma, index, chunk, 0);
-
- /*
- * TODO: Synchronous, guaranteed unregistration (should not occur during
- * fast-path). Otherwise, unregisters will process on the next call to
- * qemu_rdma_drain_cq()
- if (size < 0) {
- qemu_rdma_unregister_waiting(rdma);
- }
- */
+ /*
+ * Add this page to the current 'chunk'. If the chunk
+ * is full, or the page doesn't belong to the current chunk,
+ * an actual RDMA write will occur and a new chunk will be formed.
+ */
+ ret = qemu_rdma_write(rdma, block_offset, offset, size, &err);
+ if (ret < 0) {
+ error_report_err(err);
+ goto err;
}
/*
@@ -3257,9 +3244,26 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
*/
while (1) {
uint64_t wr_id, wr_id_in;
- int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
+ ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
+
if (ret < 0) {
- error_report("rdma migration: polling error! %d", ret);
+ error_report("rdma migration: polling error");
+ goto err;
+ }
+
+ wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+
+ if (wr_id == RDMA_WRID_NONE) {
+ break;
+ }
+ }
+
+ while (1) {
+ uint64_t wr_id, wr_id_in;
+ ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
+
+ if (ret < 0) {
+ error_report("rdma migration: polling error");
goto err;
}
@@ -3271,8 +3275,27 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
}
return RAM_SAVE_CONTROL_DELAYED;
+
err:
- rdma->error_state = ret;
+ rdma->errored = true;
+ return -1;
+}
+
+int rdma_control_save_page(QEMUFile *f, ram_addr_t block_offset,
+ ram_addr_t offset, size_t size)
+{
+ if (!migrate_rdma() || migration_in_postcopy()) {
+ return RAM_SAVE_CONTROL_NOT_SUPP;
+ }
+
+ int ret = qemu_rdma_save_page(f, block_offset, offset, size);
+
+ if (ret != RAM_SAVE_CONTROL_DELAYED &&
+ ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+ if (ret < 0) {
+ qemu_file_set_error(f, ret);
+ }
+ }
return ret;
}
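/*
 * Illustrative sketch of the chunk-merging decision described in the
 * comment above ("Add this page to the current 'chunk' ..."), assuming a
 * simplified pending-write state: a new page is merged into the pending
 * RDMA write only if it falls in the same block and chunk and extends it
 * contiguously; otherwise the pending write is flushed first.  The
 * Example* names and EXAMPLE_CHUNK_SIZE are illustrative, not QEMU's.
 */
#include <stdbool.h>
#include <stdint.h>

#define EXAMPLE_CHUNK_SIZE (1UL << 20)      /* 1 MiB chunks, for illustration */

typedef struct {
    int      block_index;                   /* -1 when nothing is pending */
    uint64_t chunk;
    uint64_t start;                         /* pending range within the block */
    uint64_t len;
} ExamplePendingWrite;

static bool example_can_merge(const ExamplePendingWrite *w,
                              int block_index, uint64_t offset)
{
    return w->block_index == block_index &&
           offset / EXAMPLE_CHUNK_SIZE == w->chunk &&
           offset == w->start + w->len;     /* contiguous with pending data */
}

static void example_queue_page(ExamplePendingWrite *w,
                               int block_index, uint64_t offset, uint64_t len,
                               void (*flush)(const ExamplePendingWrite *))
{
    if (w->block_index != -1 && !example_can_merge(w, block_index, offset)) {
        flush(w);                           /* issue the actual RDMA write */
        w->block_index = -1;
    }
    if (w->block_index == -1) {
        w->block_index = block_index;
        w->chunk = offset / EXAMPLE_CHUNK_SIZE;
        w->start = offset;
        w->len = 0;
    }
    w->len += len;
}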
@@ -3281,31 +3304,28 @@ static void rdma_accept_incoming_migration(void *opaque);
static void rdma_cm_poll_handler(void *opaque)
{
RDMAContext *rdma = opaque;
- int ret;
struct rdma_cm_event *cm_event;
MigrationIncomingState *mis = migration_incoming_get_current();
- ret = rdma_get_cm_event(rdma->channel, &cm_event);
- if (ret) {
+ if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
error_report("get_cm_event failed %d", errno);
return;
}
if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
- if (!rdma->error_state &&
+ if (!rdma->errored &&
migration_incoming_get_current()->state !=
MIGRATION_STATUS_COMPLETED) {
error_report("receive cm event, cm event is %d", cm_event->event);
- rdma->error_state = -EPIPE;
+ rdma->errored = true;
if (rdma->return_path) {
- rdma->return_path->error_state = -EPIPE;
+ rdma->return_path->errored = true;
}
}
rdma_ack_cm_event(cm_event);
-
- if (mis->migration_incoming_co) {
- qemu_coroutine_enter(mis->migration_incoming_co);
+ if (mis->loadvm_co) {
+ qemu_coroutine_enter(mis->loadvm_co);
}
return;
}
@@ -3314,6 +3334,7 @@ static void rdma_cm_poll_handler(void *opaque)
static int qemu_rdma_accept(RDMAContext *rdma)
{
+ Error *err = NULL;
RDMACapabilities cap;
struct rdma_conn_param conn_param = {
.responder_resources = 2,
@@ -3321,13 +3342,13 @@ static int qemu_rdma_accept(RDMAContext *rdma)
.private_data_len = sizeof(cap),
};
RDMAContext *rdma_return_path = NULL;
+ g_autoptr(InetSocketAddress) isock = g_new0(InetSocketAddress, 1);
struct rdma_cm_event *cm_event;
struct ibv_context *verbs;
- int ret = -EINVAL;
- int idx;
+ int ret;
ret = rdma_get_cm_event(rdma->channel, &cm_event);
- if (ret) {
+ if (ret < 0) {
goto err_rdma_dest_wait;
}
@@ -3336,12 +3357,16 @@ static int qemu_rdma_accept(RDMAContext *rdma)
goto err_rdma_dest_wait;
}
+ isock->host = g_strdup(rdma->host);
+ isock->port = g_strdup_printf("%d", rdma->port);
+
/*
* initialize the RDMAContext for return path for postcopy after first
* connection request reached.
*/
- if (migrate_postcopy() && !rdma->is_return_path) {
- rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL);
+ if ((migrate_postcopy() || migrate_return_path())
+ && !rdma->is_return_path) {
+ rdma_return_path = qemu_rdma_data_init(isock, NULL);
if (rdma_return_path == NULL) {
rdma_ack_cm_event(cm_event);
goto err_rdma_dest_wait;
@@ -3355,10 +3380,10 @@ static int qemu_rdma_accept(RDMAContext *rdma)
network_to_caps(&cap);
if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
- error_report("Unknown source RDMA version: %d, bailing...",
- cap.version);
- rdma_ack_cm_event(cm_event);
- goto err_rdma_dest_wait;
+ error_report("Unknown source RDMA version: %d, bailing...",
+ cap.version);
+ rdma_ack_cm_event(cm_event);
+ goto err_rdma_dest_wait;
}
/*
@@ -3388,41 +3413,38 @@ static int qemu_rdma_accept(RDMAContext *rdma)
if (!rdma->verbs) {
rdma->verbs = verbs;
} else if (rdma->verbs != verbs) {
- error_report("ibv context not matching %p, %p!", rdma->verbs,
- verbs);
- goto err_rdma_dest_wait;
+ error_report("ibv context not matching %p, %p!", rdma->verbs,
+ verbs);
+ goto err_rdma_dest_wait;
}
qemu_rdma_dump_id("dest_init", verbs);
- ret = qemu_rdma_alloc_pd_cq(rdma);
- if (ret) {
- error_report("rdma migration: error allocating pd and cq!");
+ ret = qemu_rdma_alloc_pd_cq(rdma, &err);
+ if (ret < 0) {
+ error_report_err(err);
goto err_rdma_dest_wait;
}
ret = qemu_rdma_alloc_qp(rdma);
- if (ret) {
+ if (ret < 0) {
error_report("rdma migration: error allocating qp!");
goto err_rdma_dest_wait;
}
- ret = qemu_rdma_init_ram_blocks(rdma);
- if (ret) {
- error_report("rdma migration: error initializing ram blocks!");
- goto err_rdma_dest_wait;
- }
+ qemu_rdma_init_ram_blocks(rdma);
- for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
- ret = qemu_rdma_reg_control(rdma, idx);
- if (ret) {
- error_report("rdma: error registering %d control", idx);
+ for (int i = 0; i < RDMA_WRID_MAX; i++) {
+ ret = qemu_rdma_reg_control(rdma, i);
+ if (ret < 0) {
+ error_report("rdma: error registering %d control", i);
goto err_rdma_dest_wait;
}
}
/* Accept the second connection request for return path */
- if (migrate_postcopy() && !rdma->is_return_path) {
+ if ((migrate_postcopy() || migrate_return_path())
+ && !rdma->is_return_path) {
qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
NULL,
(void *)(intptr_t)rdma->return_path);
@@ -3432,14 +3454,14 @@ static int qemu_rdma_accept(RDMAContext *rdma)
}
ret = rdma_accept(rdma->cm_id, &conn_param);
- if (ret) {
- error_report("rdma_accept returns %d", ret);
+ if (ret < 0) {
+ error_report("rdma_accept failed");
goto err_rdma_dest_wait;
}
ret = rdma_get_cm_event(rdma->channel, &cm_event);
- if (ret) {
- error_report("rdma_accept get_cm_event failed %d", ret);
+ if (ret < 0) {
+ error_report("rdma_accept get_cm_event failed");
goto err_rdma_dest_wait;
}
@@ -3452,9 +3474,9 @@ static int qemu_rdma_accept(RDMAContext *rdma)
rdma_ack_cm_event(cm_event);
rdma->connected = true;
- ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
- if (ret) {
- error_report("rdma migration: error posting second control recv");
+ ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, &err);
+ if (ret < 0) {
+ error_report_err(err);
goto err_rdma_dest_wait;
}
@@ -3463,10 +3485,10 @@ static int qemu_rdma_accept(RDMAContext *rdma)
return 0;
err_rdma_dest_wait:
- rdma->error_state = ret;
+ rdma->errored = true;
qemu_rdma_cleanup(rdma);
g_free(rdma_return_path);
- return ret;
+ return -1;
}
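/*
 * Illustrative sketch of the accept path above, assuming only librdmacm:
 * the connect request carries the peer's capability structure in the CM
 * private data, and the reply travels back the same way via rdma_accept().
 * ExampleCaps stands in for RDMACapabilities and example_accept() is not a
 * QEMU function; PD/CQ/QP setup is elided.
 */
#include <rdma/rdma_cma.h>
#include <stdint.h>
#include <string.h>

typedef struct {
    uint32_t version;
    uint32_t flags;
} ExampleCaps;

static int example_accept(struct rdma_event_channel *ch)
{
    struct rdma_cm_event *ev;
    struct rdma_cm_id *id;
    ExampleCaps caps = { 0 }, reply = { .version = 1 };
    struct rdma_conn_param param = {
        .responder_resources = 2,
        .private_data = &reply,
        .private_data_len = sizeof(reply),
    };

    if (rdma_get_cm_event(ch, &ev) < 0 ||
        ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
        return -1;
    }
    id = ev->id;                            /* the new connection's cm_id */
    if (ev->param.conn.private_data_len >= sizeof(caps)) {
        memcpy(&caps, ev->param.conn.private_data, sizeof(caps));
    }
    rdma_ack_cm_event(ev);
    /* caps.version / caps.flags would be validated here. */

    /* ... allocate PD, CQs and QP on id->verbs before accepting ... */

    if (rdma_accept(id, &param) < 0) {
        return -1;
    }

    /* The connection is usable once ESTABLISHED arrives on the channel. */
    if (rdma_get_cm_event(ch, &ev) < 0 ||
        ev->event != RDMA_CM_EVENT_ESTABLISHED) {
        return -1;
    }
    rdma_ack_cm_event(ev);
    return 0;
}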
static int dest_ram_sort_func(const void *a, const void *b)
@@ -3486,7 +3508,7 @@ static int dest_ram_sort_func(const void *a, const void *b)
*
* Keep doing this until the source tells us to stop.
*/
-static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
+int rdma_registration_handle(QEMUFile *f)
{
RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
.type = RDMA_CONTROL_REGISTER_RESULT,
@@ -3498,7 +3520,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
};
RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
.repeat = 1 };
- QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ QIOChannelRDMA *rioc;
+ Error *err = NULL;
RDMAContext *rdma;
RDMALocalBlocks *local;
RDMAControlHeader head;
@@ -3508,34 +3531,39 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
RDMALocalBlock *block;
void *host_addr;
- int ret = 0;
+ int ret;
int idx = 0;
- int count = 0;
- int i = 0;
+
+ if (!migrate_rdma()) {
+ return 0;
+ }
RCU_READ_LOCK_GUARD();
+ rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
rdma = qatomic_rcu_read(&rioc->rdmain);
if (!rdma) {
- return -EIO;
+ return -1;
}
- CHECK_ERROR_STATE();
+ if (rdma_errored(rdma)) {
+ return -1;
+ }
local = &rdma->local_ram_blocks;
do {
- trace_qemu_rdma_registration_handle_wait();
+ trace_rdma_registration_handle_wait();
- ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
+ ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE, &err);
if (ret < 0) {
+ error_report_err(err);
break;
}
if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
error_report("rdma: Too many requests in this message (%d)."
"Bailing.", head.repeat);
- ret = -EIO;
break;
}
@@ -3544,30 +3572,33 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
network_to_compress(comp);
- trace_qemu_rdma_registration_handle_compress(comp->length,
- comp->block_idx,
- comp->offset);
+ trace_rdma_registration_handle_compress(comp->length,
+ comp->block_idx,
+ comp->offset);
if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
error_report("rdma: 'compress' bad block index %u (vs %d)",
(unsigned int)comp->block_idx,
rdma->local_ram_blocks.nb_blocks);
- ret = -EIO;
- goto out;
+ goto err;
}
block = &(rdma->local_ram_blocks.block[comp->block_idx]);
host_addr = block->local_host_addr +
(comp->offset - block->offset);
-
- ram_handle_compressed(host_addr, comp->value, comp->length);
+ if (comp->value) {
+ error_report("rdma: Zero page with non-zero (%d) value",
+ comp->value);
+ goto err;
+ }
+ ram_handle_zero(host_addr, comp->length);
break;
case RDMA_CONTROL_REGISTER_FINISHED:
- trace_qemu_rdma_registration_handle_finished();
- goto out;
+ trace_rdma_registration_handle_finished();
+ return 0;
case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
- trace_qemu_rdma_registration_handle_ram_blocks();
+ trace_rdma_registration_handle_ram_blocks();
/* Sort our local RAM Block list so it's the same as the source,
* we can do this since we've filled in a src_index in the list
@@ -3576,16 +3607,15 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
qsort(rdma->local_ram_blocks.block,
rdma->local_ram_blocks.nb_blocks,
sizeof(RDMALocalBlock), dest_ram_sort_func);
- for (i = 0; i < local->nb_blocks; i++) {
+ for (int i = 0; i < local->nb_blocks; i++) {
local->block[i].index = i;
}
if (rdma->pin_all) {
- ret = qemu_rdma_reg_whole_ram_blocks(rdma);
- if (ret) {
- error_report("rdma migration: error dest "
- "registering ram blocks");
- goto out;
+ ret = qemu_rdma_reg_whole_ram_blocks(rdma, &err);
+ if (ret < 0) {
+ error_report_err(err);
+ goto err;
}
}
@@ -3595,7 +3625,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
* Both sides use the "remote" structure to communicate and update
* their "local" descriptions with what was sent.
*/
- for (i = 0; i < local->nb_blocks; i++) {
+ for (int i = 0; i < local->nb_blocks; i++) {
rdma->dest_blocks[i].remote_host_addr =
(uintptr_t)(local->block[i].local_host_addr);
@@ -3607,7 +3637,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
rdma->dest_blocks[i].length = local->block[i].length;
dest_block_to_network(&rdma->dest_blocks[i]);
- trace_qemu_rdma_registration_handle_ram_blocks_loop(
+ trace_rdma_registration_handle_ram_blocks_loop(
local->block[i].block_name,
local->block[i].offset,
local->block[i].length,
@@ -3620,21 +3650,22 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
ret = qemu_rdma_post_send_control(rdma,
- (uint8_t *) rdma->dest_blocks, &blocks);
+ (uint8_t *) rdma->dest_blocks, &blocks,
+ &err);
if (ret < 0) {
- error_report("rdma migration: error sending remote info");
- goto out;
+ error_report_err(err);
+ goto err;
}
break;
case RDMA_CONTROL_REGISTER_REQUEST:
- trace_qemu_rdma_registration_handle_register(head.repeat);
+ trace_rdma_registration_handle_register(head.repeat);
reg_resp.repeat = head.repeat;
registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
- for (count = 0; count < head.repeat; count++) {
+ for (int count = 0; count < head.repeat; count++) {
uint64_t chunk;
uint8_t *chunk_start, *chunk_end;
@@ -3643,15 +3674,14 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
reg_result = &results[count];
- trace_qemu_rdma_registration_handle_register_loop(count,
+ trace_rdma_registration_handle_register_loop(count,
reg->current_index, reg->key.current_addr, reg->chunks);
if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
error_report("rdma: 'register' bad block index %u (vs %d)",
(unsigned int)reg->current_index,
rdma->local_ram_blocks.nb_blocks);
- ret = -ENOENT;
- goto out;
+ goto err;
}
block = &(rdma->local_ram_blocks.block[reg->current_index]);
if (block->is_ram_block) {
@@ -3660,8 +3690,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
" offset: %" PRIx64 " current_addr: %" PRIx64,
block->block_name, block->offset,
reg->key.current_addr);
- ret = -ERANGE;
- goto out;
+ goto err;
}
host_addr = (block->local_host_addr +
(reg->key.current_addr - block->offset));
@@ -3676,8 +3705,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
error_report("rdma: bad chunk for block %s"
" chunk: %" PRIx64,
block->block_name, reg->key.chunk);
- ret = -ERANGE;
- goto out;
+ goto err;
}
}
chunk_start = ram_chunk_start(block, chunk);
@@ -3688,37 +3716,35 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
(uintptr_t)host_addr, NULL, &tmp_rkey,
chunk, chunk_start, chunk_end)) {
error_report("cannot get rkey");
- ret = -EINVAL;
- goto out;
+ goto err;
}
reg_result->rkey = tmp_rkey;
reg_result->host_addr = (uintptr_t)block->local_host_addr;
- trace_qemu_rdma_registration_handle_register_rkey(
- reg_result->rkey);
+ trace_rdma_registration_handle_register_rkey(reg_result->rkey);
result_to_network(reg_result);
}
ret = qemu_rdma_post_send_control(rdma,
- (uint8_t *) results, &reg_resp);
+ (uint8_t *) results, &reg_resp, &err);
if (ret < 0) {
- error_report("Failed to send control buffer");
- goto out;
+ error_report_err(err);
+ goto err;
}
break;
case RDMA_CONTROL_UNREGISTER_REQUEST:
- trace_qemu_rdma_registration_handle_unregister(head.repeat);
+ trace_rdma_registration_handle_unregister(head.repeat);
unreg_resp.repeat = head.repeat;
registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
- for (count = 0; count < head.repeat; count++) {
+ for (int count = 0; count < head.repeat; count++) {
reg = &registers[count];
network_to_register(reg);
- trace_qemu_rdma_registration_handle_unregister_loop(count,
+ trace_rdma_registration_handle_unregister_loop(count,
reg->current_index, reg->key.chunk);
block = &(rdma->local_ram_blocks.block[reg->current_index]);
@@ -3727,60 +3753,58 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
block->pmr[reg->key.chunk] = NULL;
if (ret != 0) {
- perror("rdma unregistration chunk failed");
- ret = -ret;
- goto out;
+ error_report("rdma unregistration chunk failed: %s",
+ strerror(errno));
+ goto err;
}
rdma->total_registrations--;
- trace_qemu_rdma_registration_handle_unregister_success(
- reg->key.chunk);
+ trace_rdma_registration_handle_unregister_success(reg->key.chunk);
}
- ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
+ ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp, &err);
if (ret < 0) {
- error_report("Failed to send control buffer");
- goto out;
+ error_report_err(err);
+ goto err;
}
break;
case RDMA_CONTROL_REGISTER_RESULT:
error_report("Invalid RESULT message at dest.");
- ret = -EIO;
- goto out;
+ goto err;
default:
error_report("Unknown control message %s", control_desc(head.type));
- ret = -EIO;
- goto out;
+ goto err;
}
} while (1);
-out:
- if (ret < 0) {
- rdma->error_state = ret;
- }
- return ret;
+
+err:
+ rdma->errored = true;
+ return -1;
}
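/*
 * Illustrative sketch of what answering a REGISTER_REQUEST amounts to,
 * assuming only libibverbs: register the chunk with remote-write access and
 * hand the resulting rkey back to the peer so it can RDMA-write into the
 * chunk directly.  example_register_chunk() is not a QEMU function.
 */
#include <infiniband/verbs.h>
#include <stddef.h>

static struct ibv_mr *example_register_chunk(struct ibv_pd *pd,
                                             void *chunk_start,
                                             size_t chunk_len)
{
    /* Remote-write access is what lets the source write into the chunk. */
    struct ibv_mr *mr = ibv_reg_mr(pd, chunk_start, chunk_len,
                                   IBV_ACCESS_LOCAL_WRITE |
                                   IBV_ACCESS_REMOTE_WRITE);
    if (!mr) {
        return NULL;
    }
    /*
     * mr->rkey is what goes back in the RegisterResult message; a later
     * UNREGISTER_REQUEST for the same chunk is answered with
     * ibv_dereg_mr(mr).
     */
    return mr;
}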
/* Destination:
- * Called via a ram_control_load_hook during the initial RAM load section which
- * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks
- * on the source.
- * We've already built our local RAMBlock list, but not yet sent the list to
- * the source.
+ * Called during the initial RAM load section which lists the
+ * RAMBlocks by name. This lets us know the order of the RAMBlocks on
+ * the source. We've already built our local RAMBlock list, but not
+ * yet sent the list to the source.
*/
-static int
-rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
+int rdma_block_notification_handle(QEMUFile *f, const char *name)
{
- RDMAContext *rdma;
int curr;
int found = -1;
+ if (!migrate_rdma()) {
+ return 0;
+ }
+
RCU_READ_LOCK_GUARD();
- rdma = qatomic_rcu_read(&rioc->rdmain);
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
+ RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmain);
if (!rdma) {
- return -EIO;
+ return -1;
}
/* Find the matching RAMBlock in our local list */
@@ -3793,7 +3817,7 @@ rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
if (found == -1) {
error_report("RAMBlock '%s' not found on destination", name);
- return -ENOENT;
+ return -1;
}
rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
@@ -3803,72 +3827,57 @@ rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
return 0;
}
-static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
+int rdma_registration_start(QEMUFile *f, uint64_t flags)
{
- switch (flags) {
- case RAM_CONTROL_BLOCK_REG:
- return rdma_block_notification_handle(opaque, data);
-
- case RAM_CONTROL_HOOK:
- return qemu_rdma_registration_handle(f, opaque);
-
- default:
- /* Shouldn't be called with any other values */
- abort();
+ if (!migrate_rdma() || migration_in_postcopy()) {
+ return 0;
}
-}
-
-static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
- uint64_t flags, void *data)
-{
- QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
RCU_READ_LOCK_GUARD();
- rdma = qatomic_rcu_read(&rioc->rdmaout);
+ RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmaout);
if (!rdma) {
- return -EIO;
+ return -1;
}
- CHECK_ERROR_STATE();
-
- if (migration_in_postcopy()) {
- return 0;
+ if (rdma_errored(rdma)) {
+ return -1;
}
- trace_qemu_rdma_registration_start(flags);
+ trace_rdma_registration_start(flags);
qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
- qemu_fflush(f);
-
- return 0;
+ return qemu_fflush(f);
}
/*
* Inform dest that dynamic registrations are done for now.
* First, flush writes, if any.
*/
-static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
- uint64_t flags, void *data)
+int rdma_registration_stop(QEMUFile *f, uint64_t flags)
{
- QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ QIOChannelRDMA *rioc;
+ Error *err = NULL;
RDMAContext *rdma;
RDMAControlHeader head = { .len = 0, .repeat = 1 };
- int ret = 0;
+ int ret;
+
+ if (!migrate_rdma() || migration_in_postcopy()) {
+ return 0;
+ }
RCU_READ_LOCK_GUARD();
+ rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
rdma = qatomic_rcu_read(&rioc->rdmaout);
if (!rdma) {
- return -EIO;
+ return -1;
}
- CHECK_ERROR_STATE();
-
- if (migration_in_postcopy()) {
- return 0;
+ if (rdma_errored(rdma)) {
+ return -1;
}
qemu_fflush(f);
- ret = qemu_rdma_drain_cq(f, rdma);
+ ret = qemu_rdma_drain_cq(rdma);
if (ret < 0) {
goto err;
@@ -3877,10 +3886,10 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
if (flags == RAM_CONTROL_SETUP) {
RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
RDMALocalBlocks *local = &rdma->local_ram_blocks;
- int reg_result_idx, i, nb_dest_blocks;
+ int reg_result_idx, nb_dest_blocks;
head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
- trace_qemu_rdma_registration_stop_ram();
+ trace_rdma_registration_stop_ram();
/*
* Make sure that we parallelize the pinning on both sides.
@@ -3892,10 +3901,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
*/
ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
&reg_result_idx, rdma->pin_all ?
- qemu_rdma_reg_whole_ram_blocks : NULL);
+ qemu_rdma_reg_whole_ram_blocks : NULL,
+ &err);
if (ret < 0) {
- fprintf(stderr, "receiving remote info!");
- return ret;
+ error_report_err(err);
+ return -1;
}
nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
@@ -3913,28 +3923,29 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
*/
if (local->nb_blocks != nb_dest_blocks) {
- fprintf(stderr, "ram blocks mismatch (Number of blocks %d vs %d) "
- "Your QEMU command line parameters are probably "
- "not identical on both the source and destination.",
- local->nb_blocks, nb_dest_blocks);
- rdma->error_state = -EINVAL;
- return -EINVAL;
+ error_report("ram blocks mismatch (Number of blocks %d vs %d)",
+ local->nb_blocks, nb_dest_blocks);
+ error_printf("Your QEMU command line parameters are probably "
+ "not identical on both the source and destination.");
+ rdma->errored = true;
+ return -1;
}
qemu_rdma_move_header(rdma, reg_result_idx, &resp);
memcpy(rdma->dest_blocks,
rdma->wr_data[reg_result_idx].control_curr, resp.len);
- for (i = 0; i < nb_dest_blocks; i++) {
+ for (int i = 0; i < nb_dest_blocks; i++) {
network_to_dest_block(&rdma->dest_blocks[i]);
/* We require that the blocks are in the same order */
if (rdma->dest_blocks[i].length != local->block[i].length) {
- fprintf(stderr, "Block %s/%d has a different length %" PRIu64
- "vs %" PRIu64, local->block[i].block_name, i,
- local->block[i].length,
- rdma->dest_blocks[i].length);
- rdma->error_state = -EINVAL;
- return -EINVAL;
+ error_report("Block %s/%d has a different length %" PRIu64
+ "vs %" PRIu64,
+ local->block[i].block_name, i,
+ local->block[i].length,
+ rdma->dest_blocks[i].length);
+ rdma->errored = true;
+ return -1;
}
local->block[i].remote_host_addr =
rdma->dest_blocks[i].remote_host_addr;
@@ -3942,32 +3953,22 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
}
}
- trace_qemu_rdma_registration_stop(flags);
+ trace_rdma_registration_stop(flags);
head.type = RDMA_CONTROL_REGISTER_FINISHED;
- ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
+ ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL, &err);
if (ret < 0) {
+ error_report_err(err);
goto err;
}
return 0;
err:
- rdma->error_state = ret;
- return ret;
+ rdma->errored = true;
+ return -1;
}
-static const QEMUFileHooks rdma_read_hooks = {
- .hook_ram_load = rdma_load_hook,
-};
-
-static const QEMUFileHooks rdma_write_hooks = {
- .before_ram_iterate = qemu_rdma_registration_start,
- .after_ram_iterate = qemu_rdma_registration_stop,
- .save_page = qemu_rdma_save_page,
-};
-
-
static void qio_channel_rdma_finalize(Object *obj)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
@@ -4012,27 +4013,24 @@ static void qio_channel_rdma_register_types(void)
type_init(qio_channel_rdma_register_types);
-static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+static QEMUFile *rdma_new_input(RDMAContext *rdma)
{
- QIOChannelRDMA *rioc;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
- if (qemu_file_mode_is_not_valid(mode)) {
- return NULL;
- }
+ rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
+ rioc->rdmain = rdma;
+ rioc->rdmaout = rdma->return_path;
- rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
+ return rioc->file;
+}
- if (mode[0] == 'w') {
- rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
- rioc->rdmaout = rdma;
- rioc->rdmain = rdma->return_path;
- qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
- } else {
- rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
- rioc->rdmain = rdma;
- rioc->rdmaout = rdma->return_path;
- qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
- }
+static QEMUFile *rdma_new_output(RDMAContext *rdma)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
+
+ rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
+ rioc->rdmaout = rdma;
+ rioc->rdmain = rdma->return_path;
return rioc->file;
}
@@ -4040,15 +4038,11 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
static void rdma_accept_incoming_migration(void *opaque)
{
RDMAContext *rdma = opaque;
- int ret;
QEMUFile *f;
- Error *local_err = NULL;
trace_qemu_rdma_accept_incoming_migration();
- ret = qemu_rdma_accept(rdma);
-
- if (ret) {
- fprintf(stderr, "RDMA ERROR: Migration initialization failed\n");
+ if (qemu_rdma_accept(rdma) < 0) {
+ error_report("RDMA ERROR: Migration initialization failed");
return;
}
@@ -4058,25 +4052,23 @@ static void rdma_accept_incoming_migration(void *opaque)
return;
}
- f = qemu_fopen_rdma(rdma, "rb");
+ f = rdma_new_input(rdma);
if (f == NULL) {
- fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n");
+ error_report("RDMA ERROR: could not open RDMA for input");
qemu_rdma_cleanup(rdma);
return;
}
rdma->migration_started_on_destination = 1;
- migration_fd_process_incoming(f, &local_err);
- if (local_err) {
- error_reportf_err(local_err, "RDMA ERROR:");
- }
+ migration_fd_process_incoming(f);
}
-void rdma_start_incoming_migration(const char *host_port, Error **errp)
+void rdma_start_incoming_migration(InetSocketAddress *host_port,
+ Error **errp)
{
+ MigrationState *s = migrate_get_current();
int ret;
- RDMAContext *rdma, *rdma_return_path = NULL;
- Error *local_err = NULL;
+ RDMAContext *rdma;
trace_rdma_start_incoming_migration();
@@ -4086,14 +4078,13 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
return;
}
- rdma = qemu_rdma_data_init(host_port, &local_err);
+ rdma = qemu_rdma_data_init(host_port, errp);
if (rdma == NULL) {
goto err;
}
- ret = qemu_rdma_dest_init(rdma, &local_err);
-
- if (ret) {
+ ret = qemu_rdma_dest_init(rdma, errp);
+ if (ret < 0) {
goto err;
}
@@ -4101,13 +4092,13 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
ret = rdma_listen(rdma->listen_id, 5);
- if (ret) {
- ERROR(errp, "listening on socket!");
+ if (ret < 0) {
+ error_setg(errp, "RDMA ERROR: listening on socket!");
goto cleanup_rdma;
}
trace_rdma_start_incoming_migration_after_rdma_listen();
-
+ s->rdma_migration = true;
qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
NULL, (void *)(intptr_t)rdma);
return;
@@ -4115,22 +4106,19 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
cleanup_rdma:
qemu_rdma_cleanup(rdma);
err:
- error_propagate(errp, local_err);
if (rdma) {
g_free(rdma->host);
- g_free(rdma->host_port);
}
g_free(rdma);
- g_free(rdma_return_path);
}
void rdma_start_outgoing_migration(void *opaque,
- const char *host_port, Error **errp)
+ InetSocketAddress *host_port, Error **errp)
{
MigrationState *s = opaque;
RDMAContext *rdma_return_path = NULL;
RDMAContext *rdma;
- int ret = 0;
+ int ret;
/* Avoid ram_block_discard_disable(), cannot change during migration. */
if (ram_block_discard_is_required()) {
@@ -4143,22 +4131,21 @@ void rdma_start_outgoing_migration(void *opaque,
goto err;
}
- ret = qemu_rdma_source_init(rdma,
- s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
+ ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp);
- if (ret) {
+ if (ret < 0) {
goto err;
}
trace_rdma_start_outgoing_migration_after_rdma_source_init();
- ret = qemu_rdma_connect(rdma, errp, false);
+ ret = qemu_rdma_connect(rdma, false, errp);
- if (ret) {
+ if (ret < 0) {
goto err;
}
/* RDMA postcopy need a separate queue pair for return path */
- if (migrate_postcopy()) {
+ if (migrate_postcopy() || migrate_return_path()) {
rdma_return_path = qemu_rdma_data_init(host_port, errp);
if (rdma_return_path == NULL) {
@@ -4166,15 +4153,15 @@ void rdma_start_outgoing_migration(void *opaque,
}
ret = qemu_rdma_source_init(rdma_return_path,
- s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
+ migrate_rdma_pin_all(), errp);
- if (ret) {
+ if (ret < 0) {
goto return_path_err;
}
- ret = qemu_rdma_connect(rdma_return_path, errp, true);
+ ret = qemu_rdma_connect(rdma_return_path, true, errp);
- if (ret) {
+ if (ret < 0) {
goto return_path_err;
}
@@ -4185,7 +4172,8 @@ void rdma_start_outgoing_migration(void *opaque,
trace_rdma_start_outgoing_migration_after_rdma_connect();
- s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
+ s->to_dst_file = rdma_new_output(rdma);
+ s->rdma_migration = true;
migrate_fd_connect(s, NULL);
return;
return_path_err: