/* * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) * (a.k.a. Fault Tolerance or Continuous Replication) * * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. * Copyright (c) 2016 FUJITSU LIMITED * Copyright (c) 2016 Intel Corporation * * This work is licensed under the terms of the GNU GPL, version 2 or * later. See the COPYING file in the top-level directory. */ #include "qemu/osdep.h" #include "sysemu/sysemu.h" #include "migration/colo.h" #include "io/channel-buffer.h" #include "trace.h" #include "qemu/error-report.h" #include "qapi/error.h" #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024) bool colo_supported(void) { return false; } bool migration_in_colo_state(void) { MigrationState *s = migrate_get_current(); return (s->state == MIGRATION_STATUS_COLO); } bool migration_incoming_in_colo_state(void) { MigrationIncomingState *mis = migration_incoming_get_current(); return mis && (mis->state == MIGRATION_STATUS_COLO); } static void colo_send_message(QEMUFile *f, COLOMessage msg, Error **errp) { int ret; if (msg >= COLO_MESSAGE__MAX) { error_setg(errp, "%s: Invalid message", __func__); return; } qemu_put_be32(f, msg); qemu_fflush(f); ret = qemu_file_get_error(f); if (ret < 0) { error_setg_errno(errp, -ret, "Can't send COLO message"); } trace_colo_send_message(COLOMessage_lookup[msg]); } static void colo_send_message_value(QEMUFile *f, COLOMessage msg, uint64_t value, Error **errp) { Error *local_err = NULL; int ret; colo_send_message(f, msg, &local_err); if (local_err) { error_propagate(errp, local_err); return; } qemu_put_be64(f, value); qemu_fflush(f); ret = qemu_file_get_error(f); if (ret < 0) { error_setg_errno(errp, -ret, "Failed to send value for message:%s", COLOMessage_lookup[msg]); } } static COLOMessage colo_receive_message(QEMUFile *f, Error **errp) { COLOMessage msg; int ret; msg = qemu_get_be32(f); ret = qemu_file_get_error(f); if (ret < 0) { error_setg_errno(errp, -ret, "Can't receive COLO message"); return msg; } if (msg >= COLO_MESSAGE__MAX) { error_setg(errp, "%s: Invalid message", __func__); return msg; } trace_colo_receive_message(COLOMessage_lookup[msg]); return msg; } static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg, Error **errp) { COLOMessage msg; Error *local_err = NULL; msg = colo_receive_message(f, &local_err); if (local_err) { error_propagate(errp, local_err); return; } if (msg != expect_msg) { error_setg(errp, "Unexpected COLO message %d, expected %d", msg, expect_msg); } } static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg, Error **errp) { Error *local_err = NULL; uint64_t value; int ret; colo_receive_check_message(f, expect_msg, &local_err); if (local_err) { error_propagate(errp, local_err); return 0; } value = qemu_get_be64(f); ret = qemu_file_get_error(f); if (ret < 0) { error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s", COLOMessage_lookup[expect_msg]); } return value; } static int colo_do_checkpoint_transaction(MigrationState *s, QIOChannelBuffer *bioc, QEMUFile *fb) { Error *local_err = NULL; int ret = -1; colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST, &local_err); if (local_err) { goto out; } colo_receive_check_message(s->rp_state.from_dst_file, COLO_MESSAGE_CHECKPOINT_REPLY, &local_err); if (local_err) { goto out; } /* Reset channel-buffer directly */ qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); bioc->usage = 0; qemu_mutex_lock_iothread(); vm_stop_force_state(RUN_STATE_COLO); qemu_mutex_unlock_iothread(); trace_colo_vm_state_change("run", "stop"); /* Disable block migration */ s->params.blk = 0; s->params.shared = 0; qemu_savevm_state_header(fb); qemu_savevm_state_begin(fb, &s->params); qemu_mutex_lock_iothread(); qemu_savevm_state_complete_precopy(fb, false); qemu_mutex_unlock_iothread(); qemu_fflush(fb); colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err); if (local_err) { goto out; } /* * We need the size of the VMstate data in Secondary side, * With which we can decide how much data should be read. */ colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE, bioc->usage, &local_err); if (local_err) { goto out; } qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage); qemu_fflush(s->to_dst_file); ret = qemu_file_get_error(s->to_dst_file); if (ret < 0) { goto out; } colo_receive_check_message(s->rp_state.from_dst_file, COLO_MESSAGE_VMSTATE_RECEIVED, &local_err); if (local_err) { goto out; } colo_receive_check_message(s->rp_state.from_dst_file, COLO_MESSAGE_VMSTATE_LOADED, &local_err); if (local_err) { goto out; } ret = 0; qemu_mutex_lock_iothread(); vm_start(); qemu_mutex_unlock_iothread(); trace_colo_vm_state_change("stop", "run"); out: if (local_err) { error_report_err(local_err); } return ret; } static void colo_process_checkpoint(MigrationState *s) { QIOChannelBuffer *bioc; QEMUFile *fb = NULL; Error *local_err = NULL; int ret; s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file); if (!s->rp_state.from_dst_file) { error_report("Open QEMUFile from_dst_file failed"); goto out; } /* * Wait for Secondary finish loading VM states and enter COLO * restore. */ colo_receive_check_message(s->rp_state.from_dst_file, COLO_MESSAGE_CHECKPOINT_READY, &local_err); if (local_err) { goto out; } bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc)); object_unref(OBJECT(bioc)); qemu_mutex_lock_iothread(); vm_start(); qemu_mutex_unlock_iothread(); trace_colo_vm_state_change("stop", "run"); while (s->state == MIGRATION_STATUS_COLO) { ret = colo_do_checkpoint_transaction(s, bioc, fb); if (ret < 0) { goto out; } } out: /* Throw the unreported error message after exited from loop */ if (local_err) { error_report_err(local_err); } if (fb) { qemu_fclose(fb); } if (s->rp_state.from_dst_file) { qemu_fclose(s->rp_state.from_dst_file); } } void migrate_start_colo_process(MigrationState *s) { qemu_mutex_unlock_iothread(); migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE, MIGRATION_STATUS_COLO); colo_process_checkpoint(s); qemu_mutex_lock_iothread(); } static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request, Error **errp) { COLOMessage msg; Error *local_err = NULL; msg = colo_receive_message(f, &local_err); if (local_err) { error_propagate(errp, local_err); return; } switch (msg) { case COLO_MESSAGE_CHECKPOINT_REQUEST: *checkpoint_request = 1; break; default: *checkpoint_request = 0; error_setg(errp, "Got unknown COLO message: %d", msg); break; } } void *colo_process_incoming_thread(void *opaque) { MigrationIncomingState *mis = opaque; QEMUFile *fb = NULL; QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */ uint64_t total_size; uint64_t value; Error *local_err = NULL; migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, MIGRATION_STATUS_COLO); mis->to_src_file = qemu_file_get_return_path(mis->from_src_file); if (!mis->to_src_file) { error_report("COLO incoming thread: Open QEMUFile to_src_file failed"); goto out; } /* * Note: the communication between Primary side and Secondary side * should be sequential, we set the fd to unblocked in migration incoming * coroutine, and here we are in the COLO incoming thread, so it is ok to * set the fd back to blocked. */ qemu_file_set_blocking(mis->from_src_file, true); bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc)); object_unref(OBJECT(bioc)); colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY, &local_err); if (local_err) { goto out; } while (mis->state == MIGRATION_STATUS_COLO) { int request; colo_wait_handle_message(mis->from_src_file, &request, &local_err); if (local_err) { goto out; } assert(request); /* FIXME: This is unnecessary for periodic checkpoint mode */ colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY, &local_err); if (local_err) { goto out; } colo_receive_check_message(mis->from_src_file, COLO_MESSAGE_VMSTATE_SEND, &local_err); if (local_err) { goto out; } value = colo_receive_message_value(mis->from_src_file, COLO_MESSAGE_VMSTATE_SIZE, &local_err); if (local_err) { goto out; } /* * Read VM device state data into channel buffer, * It's better to re-use the memory allocated. * Here we need to handle the channel buffer directly. */ if (value > bioc->capacity) { bioc->capacity = value; bioc->data = g_realloc(bioc->data, bioc->capacity); } total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value); if (total_size != value) { error_report("Got %" PRIu64 " VMState data, less than expected" " %" PRIu64, total_size, value); goto out; } bioc->usage = total_size; qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED, &local_err); if (local_err) { goto out; } qemu_mutex_lock_iothread(); qemu_system_reset(VMRESET_SILENT); if (qemu_loadvm_state(fb) < 0) { error_report("COLO: loadvm failed"); qemu_mutex_unlock_iothread(); goto out; } qemu_mutex_unlock_iothread(); colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED, &local_err); if (local_err) { goto out; } } out: /* Throw the unreported error message after exited from loop */ if (local_err) { error_report_err(local_err); } if (fb) { qemu_fclose(fb); } if (mis->to_src_file) { qemu_fclose(mis->to_src_file); } migration_incoming_exit_colo(); return NULL; }