aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Hajnoczi <stefanha@redhat.com>2022-11-14 16:59:10 -0500
committerStefan Hajnoczi <stefanha@redhat.com>2022-11-14 16:59:10 -0500
commitbb00d0aa620e821cc2fbf2e0c5de45a8c957626c (patch)
tree1ae5ed8bdb47a0788d8e6860ea690a398bc096fe
parent98f10f0e2613ba1ac2ad3f57a5174014f6dcb03d (diff)
parent46530d3560b6fd5de38b6f4e45ce7d7135b9add7 (diff)
Merge tag 'for-upstream' of https://repo.or.cz/qemu/kevin into staging
Block layer patches - Fix deadlock in graph modification with iothreads - mirror: Fix non-converging cases for active mirror - qapi: Fix BlockdevOptionsNvmeIoUring @path description - blkio: Set BlockDriver::has_variable_length to false # -----BEGIN PGP SIGNATURE----- # # iQJFBAABCAAvFiEE3D3rFZqa+V09dFb+fwmycsiPL9YFAmNyIF8RHGt3b2xmQHJl # ZGhhdC5jb20ACgkQfwmycsiPL9ZcHRAAwcQ9cLu6Oh96iWvCCOIxqOsEzVYeCwxI # yJrrOYSKvMabWms+gg3m5zYt/sU4CRvjzFMd/WDl4LXN4B1SNBdOjPXkswoLA6cU # QvzbVNRPgZxodVXewjWw5fNFYkBvA+Jgx9ffEK0dYAWKFN3bT6I3NzjcKr2eJ2d2 # Y8RzltBspwwadyTH0lQxY8HfXE7UHukBCAVkcbqQQYuzKa2dR9ERKfRM10uDZwNI # eNGWu1W0xvE3+nXqnGfXUXVO7R7Q5L0HfShr4Dhw0zyWbg6DBJRi7iY8cVV1VmCp # M0C8ybODRdsMcRJh+k+Q+T33oRBnXytXDiNzNRHx2gOabuc6k/sc6aSfcIvgCMQf # PLQsHI0a1o/N238N1Znhfn+M5S0+elTy/xwmzXN2rL3whNMJ9IRoqoxh7nH90CB2 # F7lMjp7FMmJVYtmy0FcBDUVfShgzqM1TsORAXUfdU5QXf4wA+FyZ16SN/WYYfg4B # ZCsdu2vDimA4rNOiWpPEBNLnHv3S/cswTqobQUQ2QN0zzGPZxoKEWAuG4pqlmSGN # nMgEiLGFL7Ztgpjw6ZQCisL5rh0P9g53JgY8+b68KfeDXG+R2bEHPtZotIVz7mT7 # JP5ydTyxozNGvMCKg/0Fp1HaHU1ADm9swnWm5cYm/ax9hq5rMNsaq6YTLap1o1PP # e1Oe0rnq/Ys= # =zRlt # -----END PGP SIGNATURE----- # gpg: Signature made Mon 14 Nov 2022 06:02:55 EST # gpg: using RSA key DC3DEB159A9AF95D3D7456FE7F09B272C88F2FD6 # gpg: issuer "kwolf@redhat.com" # gpg: Good signature from "Kevin Wolf <kwolf@redhat.com>" [full] # Primary key fingerprint: DC3D EB15 9A9A F95D 3D74 56FE 7F09 B272 C88F 2FD6 * tag 'for-upstream' of https://repo.or.cz/qemu/kevin: tests/stream-under-throttle: New test block: Start/end drain on correct AioContext block-backend: Update ctx immediately after root block: Make bdrv_child_get_parent_aio_context I/O block/blkio: Set BlockDriver::has_variable_length to false qapi/block-core: Fix BlockdevOptionsNvmeIoUring @path description iotests/151: Test active requests on mirror start iotests/151: Test that active mirror progresses block/mirror: Fix NULL s->job in active writes block/mirror: Drop mirror_wait_for_any_operation() block/mirror: Do not wait for active writes Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
-rw-r--r--block.c2
-rw-r--r--block/blkio.c1
-rw-r--r--block/block-backend.c9
-rw-r--r--block/io.c6
-rw-r--r--block/mirror.c78
-rw-r--r--blockjob.c3
-rw-r--r--include/block/block-global-state.h1
-rw-r--r--include/block/block-io.h2
-rw-r--r--include/block/block_int-common.h4
-rw-r--r--qapi/block-core.json2
-rwxr-xr-xtests/qemu-iotests/151227
-rw-r--r--tests/qemu-iotests/151.out4
-rwxr-xr-xtests/qemu-iotests/tests/stream-under-throttle121
-rw-r--r--tests/qemu-iotests/tests/stream-under-throttle.out5
14 files changed, 424 insertions, 41 deletions
diff --git a/block.c b/block.c
index c5e20c0bea..a18f052374 100644
--- a/block.c
+++ b/block.c
@@ -1543,7 +1543,7 @@ const BdrvChildClass child_of_bds = {
AioContext *bdrv_child_get_parent_aio_context(BdrvChild *c)
{
- GLOBAL_STATE_CODE();
+ IO_CODE();
return c->klass->get_parent_aio_context(c);
}
diff --git a/block/blkio.c b/block/blkio.c
index 620fab28a7..5eae3adfaf 100644
--- a/block/blkio.c
+++ b/block/blkio.c
@@ -993,7 +993,6 @@ static void blkio_refresh_limits(BlockDriverState *bs, Error **errp)
{ \
.format_name = name, \
.protocol_name = name, \
- .has_variable_length = true, \
.instance_size = sizeof(BDRVBlkioState), \
.bdrv_file_open = blkio_file_open, \
.bdrv_close = blkio_close, \
diff --git a/block/block-backend.c b/block/block-backend.c
index c0c7d56c8d..b48c91f4e1 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -311,6 +311,7 @@ static void blk_root_detach(BdrvChild *child)
static AioContext *blk_root_get_parent_aio_context(BdrvChild *c)
{
BlockBackend *blk = c->opaque;
+ IO_CODE();
return blk_get_aio_context(blk);
}
@@ -2157,6 +2158,11 @@ static int blk_do_set_aio_context(BlockBackend *blk, AioContext *new_context,
return ret;
}
}
+ /*
+ * Make blk->ctx consistent with the root node before we invoke any
+ * other operations like drain that might inquire blk->ctx
+ */
+ blk->ctx = new_context;
if (tgm->throttle_state) {
bdrv_drained_begin(bs);
throttle_group_detach_aio_context(tgm);
@@ -2165,9 +2171,10 @@ static int blk_do_set_aio_context(BlockBackend *blk, AioContext *new_context,
}
bdrv_unref(bs);
+ } else {
+ blk->ctx = new_context;
}
- blk->ctx = new_context;
return 0;
}
diff --git a/block/io.c b/block/io.c
index 34b30e304e..b9424024f9 100644
--- a/block/io.c
+++ b/block/io.c
@@ -71,9 +71,10 @@ static void bdrv_parent_drained_end_single_no_poll(BdrvChild *c,
void bdrv_parent_drained_end_single(BdrvChild *c)
{
int drained_end_counter = 0;
+ AioContext *ctx = bdrv_child_get_parent_aio_context(c);
IO_OR_GS_CODE();
bdrv_parent_drained_end_single_no_poll(c, &drained_end_counter);
- BDRV_POLL_WHILE(c->bs, qatomic_read(&drained_end_counter) > 0);
+ AIO_WAIT_WHILE(ctx, qatomic_read(&drained_end_counter) > 0);
}
static void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore,
@@ -116,13 +117,14 @@ static bool bdrv_parent_drained_poll(BlockDriverState *bs, BdrvChild *ignore,
void bdrv_parent_drained_begin_single(BdrvChild *c, bool poll)
{
+ AioContext *ctx = bdrv_child_get_parent_aio_context(c);
IO_OR_GS_CODE();
c->parent_quiesce_counter++;
if (c->klass->drained_begin) {
c->klass->drained_begin(c);
}
if (poll) {
- BDRV_POLL_WHILE(c->bs, bdrv_parent_drained_poll_single(c));
+ AIO_WAIT_WHILE(ctx, bdrv_parent_drained_poll_single(c));
}
}
diff --git a/block/mirror.c b/block/mirror.c
index 1a75a47cc3..251adc5ae0 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -82,6 +82,7 @@ typedef struct MirrorBlockJob {
int max_iov;
bool initial_zeroing_ongoing;
int in_active_write_counter;
+ int64_t active_write_bytes_in_flight;
bool prepared;
bool in_drain;
} MirrorBlockJob;
@@ -304,19 +305,21 @@ static int mirror_cow_align(MirrorBlockJob *s, int64_t *offset,
}
static inline void coroutine_fn
-mirror_wait_for_any_operation(MirrorBlockJob *s, bool active)
+mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
{
MirrorOp *op;
QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
- /* Do not wait on pseudo ops, because it may in turn wait on
+ /*
+ * Do not wait on pseudo ops, because it may in turn wait on
* some other operation to start, which may in fact be the
* caller of this function. Since there is only one pseudo op
* at any given time, we will always find some real operation
- * to wait on. */
- if (!op->is_pseudo_op && op->is_in_flight &&
- op->is_active_write == active)
- {
+ * to wait on.
+ * Also, do not wait on active operations, because they do not
+ * use up in-flight slots.
+ */
+ if (!op->is_pseudo_op && op->is_in_flight && !op->is_active_write) {
qemu_co_queue_wait(&op->waiting_requests, NULL);
return;
}
@@ -324,13 +327,6 @@ mirror_wait_for_any_operation(MirrorBlockJob *s, bool active)
abort();
}
-static inline void coroutine_fn
-mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
-{
- /* Only non-active operations use up in-flight slots */
- mirror_wait_for_any_operation(s, false);
-}
-
/* Perform a mirror copy operation.
*
* *op->bytes_handled is set to the number of bytes copied after and
@@ -494,6 +490,13 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
}
bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
+ /*
+ * Wait for concurrent requests to @offset. The next loop will limit the
+ * copied area based on in_flight_bitmap so we only copy an area that does
+ * not overlap with concurrent in-flight requests. Still, we would like to
+ * copy something, so wait until there are at least no more requests to the
+ * very beginning of the area.
+ */
mirror_wait_on_conflicts(NULL, s, offset, 1);
job_pause_point(&s->common.job);
@@ -988,12 +991,6 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
int64_t cnt, delta;
bool should_complete;
- /* Do not start passive operations while there are active
- * writes in progress */
- while (s->in_active_write_counter) {
- mirror_wait_for_any_operation(s, true);
- }
-
if (s->ret < 0) {
ret = s->ret;
goto immediate_exit;
@@ -1010,7 +1007,9 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
/* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
* the number of bytes currently being processed; together those are
* the current remaining operation length */
- job_progress_set_remaining(&s->common.job, s->bytes_in_flight + cnt);
+ job_progress_set_remaining(&s->common.job,
+ s->bytes_in_flight + cnt +
+ s->active_write_bytes_in_flight);
/* Note that even when no rate limit is applied we need to yield
* periodically with no pending I/O so that bdrv_drain_all() returns.
@@ -1071,6 +1070,10 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
s->in_drain = true;
bdrv_drained_begin(bs);
+
+ /* Must be zero because we are drained */
+ assert(s->in_active_write_counter == 0);
+
cnt = bdrv_get_dirty_count(s->dirty_bitmap);
if (cnt > 0 || mirror_flush(s) < 0) {
bdrv_drained_end(bs);
@@ -1306,6 +1309,7 @@ do_sync_target_write(MirrorBlockJob *job, MirrorMethod method,
}
job_progress_increase_remaining(&job->common.job, bytes);
+ job->active_write_bytes_in_flight += bytes;
switch (method) {
case MIRROR_METHOD_COPY:
@@ -1327,6 +1331,7 @@ do_sync_target_write(MirrorBlockJob *job, MirrorMethod method,
abort();
}
+ job->active_write_bytes_in_flight -= bytes;
if (ret >= 0) {
job_progress_update(&job->common.job, bytes);
} else {
@@ -1375,6 +1380,19 @@ static MirrorOp *coroutine_fn active_write_prepare(MirrorBlockJob *s,
s->in_active_write_counter++;
+ /*
+ * Wait for concurrent requests affecting the area. If there are already
+ * running requests that are copying off now-to-be stale data in the area,
+ * we must wait for them to finish before we begin writing fresh data to the
+ * target so that the write operations appear in the correct order.
+ * Note that background requests (see mirror_iteration()) in contrast only
+ * wait for conflicting requests at the start of the dirty area, and then
+ * (based on the in_flight_bitmap) truncate the area to copy so it will not
+ * conflict with any requests beyond that. For active writes, however, we
+ * cannot truncate that area. The request from our parent must be blocked
+ * until the area is copied in full. Therefore, we must wait for the whole
+ * area to become free of concurrent requests.
+ */
mirror_wait_on_conflicts(op, s, offset, bytes);
bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
@@ -1420,11 +1438,13 @@ static int coroutine_fn bdrv_mirror_top_do_write(BlockDriverState *bs,
MirrorOp *op = NULL;
MirrorBDSOpaque *s = bs->opaque;
int ret = 0;
- bool copy_to_target;
+ bool copy_to_target = false;
- copy_to_target = s->job->ret >= 0 &&
- !job_is_cancelled(&s->job->common.job) &&
- s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
+ if (s->job) {
+ copy_to_target = s->job->ret >= 0 &&
+ !job_is_cancelled(&s->job->common.job) &&
+ s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
+ }
if (copy_to_target) {
op = active_write_prepare(s->job, offset, bytes);
@@ -1469,11 +1489,13 @@ static int coroutine_fn bdrv_mirror_top_pwritev(BlockDriverState *bs,
QEMUIOVector bounce_qiov;
void *bounce_buf;
int ret = 0;
- bool copy_to_target;
+ bool copy_to_target = false;
- copy_to_target = s->job->ret >= 0 &&
- !job_is_cancelled(&s->job->common.job) &&
- s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
+ if (s->job) {
+ copy_to_target = s->job->ret >= 0 &&
+ !job_is_cancelled(&s->job->common.job) &&
+ s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
+ }
if (copy_to_target) {
/* The guest might concurrently modify the data to write; but
diff --git a/blockjob.c b/blockjob.c
index 2d86014fa5..f51d4e18f3 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -173,7 +173,8 @@ static bool child_job_change_aio_ctx(BdrvChild *c, AioContext *ctx,
static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
{
BlockJob *job = c->opaque;
- GLOBAL_STATE_CODE();
+ IO_CODE();
+ JOB_LOCK_GUARD();
return job->job.aio_context;
}
diff --git a/include/block/block-global-state.h b/include/block/block-global-state.h
index bb42ed9559..c7bd4a2088 100644
--- a/include/block/block-global-state.h
+++ b/include/block/block-global-state.h
@@ -220,7 +220,6 @@ void coroutine_fn bdrv_co_lock(BlockDriverState *bs);
*/
void coroutine_fn bdrv_co_unlock(BlockDriverState *bs);
-AioContext *bdrv_child_get_parent_aio_context(BdrvChild *c);
bool bdrv_child_change_aio_context(BdrvChild *c, AioContext *ctx,
GHashTable *visited, Transaction *tran,
Error **errp);
diff --git a/include/block/block-io.h b/include/block/block-io.h
index 770ddeb7c8..b099d7db45 100644
--- a/include/block/block-io.h
+++ b/include/block/block-io.h
@@ -171,6 +171,8 @@ void bdrv_debug_event(BlockDriverState *bs, BlkdebugEvent event);
*/
AioContext *bdrv_get_aio_context(BlockDriverState *bs);
+AioContext *bdrv_child_get_parent_aio_context(BdrvChild *c);
+
/**
* Move the current coroutine to the AioContext of @bs and return the old
* AioContext of the coroutine. Increase bs->in_flight so that draining @bs
diff --git a/include/block/block_int-common.h b/include/block/block_int-common.h
index 5a2cc077a0..31ae91e56e 100644
--- a/include/block/block_int-common.h
+++ b/include/block/block_int-common.h
@@ -911,8 +911,6 @@ struct BdrvChildClass {
GHashTable *visited, Transaction *tran,
Error **errp);
- AioContext *(*get_parent_aio_context)(BdrvChild *child);
-
/*
* I/O API functions. These functions are thread-safe.
*
@@ -929,6 +927,8 @@ struct BdrvChildClass {
*/
const char *(*get_name)(BdrvChild *child);
+ AioContext *(*get_parent_aio_context)(BdrvChild *child);
+
/*
* If this pair of functions is implemented, the parent doesn't issue new
* requests after returning from .drained_begin() until .drained_end() is
diff --git a/qapi/block-core.json b/qapi/block-core.json
index 6d904004f8..95ac4fa634 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -3704,7 +3704,7 @@
#
# Driver specific block device options for the nvme-io_uring backend.
#
-# @path: path to the image file
+# @path: path to the NVMe namespace's character device (e.g. /dev/ng0n1).
#
# Since: 7.2
##
diff --git a/tests/qemu-iotests/151 b/tests/qemu-iotests/151
index 93d14193d0..b4d1bc2553 100755
--- a/tests/qemu-iotests/151
+++ b/tests/qemu-iotests/151
@@ -19,7 +19,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+import math
import os
+import subprocess
+import time
+from typing import List, Optional
import iotests
from iotests import qemu_img
@@ -50,7 +54,7 @@ class TestActiveMirror(iotests.QMPTestCase):
self.vm = iotests.VM()
self.vm.add_drive_raw(self.vm.qmp_to_opts(blk_source))
self.vm.add_blockdev(self.vm.qmp_to_opts(blk_target))
- self.vm.add_device('virtio-blk,drive=source')
+ self.vm.add_device('virtio-blk,id=vblk,drive=source')
self.vm.launch()
def tearDown(self):
@@ -192,6 +196,227 @@ class TestActiveMirror(iotests.QMPTestCase):
self.potential_writes_in_flight = False
+class TestThrottledWithNbdExportBase(iotests.QMPTestCase):
+ image_len = 128 * 1024 * 1024 # MB
+ iops: Optional[int] = None
+ background_processes: List['subprocess.Popen[str]'] = []
+
+ def setUp(self):
+ # Must be set by subclasses
+ self.assertIsNotNone(self.iops)
+
+ qemu_img('create', '-f', iotests.imgfmt, source_img, '128M')
+ qemu_img('create', '-f', iotests.imgfmt, target_img, '128M')
+
+ self.vm = iotests.VM()
+ self.vm.launch()
+
+ result = self.vm.qmp('object-add', **{
+ 'qom-type': 'throttle-group',
+ 'id': 'thrgr',
+ 'limits': {
+ 'iops-total': self.iops,
+ 'iops-total-max': self.iops
+ }
+ })
+ self.assert_qmp(result, 'return', {})
+
+ result = self.vm.qmp('blockdev-add', **{
+ 'node-name': 'source-node',
+ 'driver': 'throttle',
+ 'throttle-group': 'thrgr',
+ 'file': {
+ 'driver': iotests.imgfmt,
+ 'file': {
+ 'driver': 'file',
+ 'filename': source_img
+ }
+ }
+ })
+ self.assert_qmp(result, 'return', {})
+
+ result = self.vm.qmp('blockdev-add', **{
+ 'node-name': 'target-node',
+ 'driver': iotests.imgfmt,
+ 'file': {
+ 'driver': 'file',
+ 'filename': target_img
+ }
+ })
+ self.assert_qmp(result, 'return', {})
+
+ self.nbd_sock = iotests.file_path('nbd.sock',
+ base_dir=iotests.sock_dir)
+ self.nbd_url = f'nbd+unix:///source-node?socket={self.nbd_sock}'
+
+ result = self.vm.qmp('nbd-server-start', addr={
+ 'type': 'unix',
+ 'data': {
+ 'path': self.nbd_sock
+ }
+ })
+ self.assert_qmp(result, 'return', {})
+
+ result = self.vm.qmp('block-export-add', id='exp0', type='nbd',
+ node_name='source-node', writable=True)
+ self.assert_qmp(result, 'return', {})
+
+ def tearDown(self):
+ # Wait for background requests to settle
+ try:
+ while True:
+ p = self.background_processes.pop()
+ while True:
+ try:
+ p.wait(timeout=0.0)
+ break
+ except subprocess.TimeoutExpired:
+ self.vm.qtest(f'clock_step {1 * 1000 * 1000 * 1000}')
+ except IndexError:
+ pass
+
+ # Cancel ongoing block jobs
+ for job in self.vm.qmp('query-jobs')['return']:
+ self.vm.qmp('block-job-cancel', device=job['id'], force=True)
+
+ while True:
+ self.vm.qtest(f'clock_step {1 * 1000 * 1000 * 1000}')
+ if len(self.vm.qmp('query-jobs')['return']) == 0:
+ break
+
+ self.vm.shutdown()
+ os.remove(source_img)
+ os.remove(target_img)
+
+
+class TestLowThrottledWithNbdExport(TestThrottledWithNbdExportBase):
+ iops = 16
+
+ def testUnderLoad(self):
+ '''
+ Throttle the source node, then issue a whole bunch of external requests
+ while the mirror job (in write-blocking mode) is running. We want to
+ see background requests being issued even while the source is under
+ full load by active writes, so that progress can be made towards READY.
+ '''
+
+ # Fill the first half of the source image; do not fill the second half,
+ # that is where we will have active requests occur. This ensures that
+ # active mirroring itself will not directly contribute to the job's
+ # progress (because when the job was started, those areas were not
+ # intended to be copied, so active mirroring will only lead to not
+ # losing progress, but also not making any).
+ self.vm.hmp_qemu_io('source-node',
+ f'aio_write -P 1 0 {self.image_len // 2}')
+ self.vm.qtest(f'clock_step {1 * 1000 * 1000 * 1000}')
+
+ # Launch the mirror job
+ mirror_buf_size = 65536
+ result = self.vm.qmp('blockdev-mirror',
+ job_id='mirror',
+ filter_node_name='mirror-node',
+ device='source-node',
+ target='target-node',
+ sync='full',
+ copy_mode='write-blocking',
+ buf_size=mirror_buf_size)
+ self.assert_qmp(result, 'return', {})
+
+ # We create the external requests via qemu-io processes on the NBD
+ # server. Have their offset start in the middle of the image so they
+ # do not overlap with the background requests (which start from the
+ # beginning).
+ active_request_offset = self.image_len // 2
+ active_request_len = 4096
+
+ # Create enough requests to saturate the node for 5 seconds
+ for _ in range(0, 5 * self.iops):
+ req = f'write -P 42 {active_request_offset} {active_request_len}'
+ active_request_offset += active_request_len
+ p = iotests.qemu_io_popen('-f', 'nbd', self.nbd_url, '-c', req)
+ self.background_processes += [p]
+
+ # Now advance the clock one I/O operation at a time by the 4 seconds
+ # (i.e. one less than 5). We expect the mirror job to issue background
+ # operations here, even though active requests are still in flight.
+ # The active requests will take precedence, however, because they have
+ # been issued earlier than mirror's background requests.
+ # Once the active requests we have started above are done (i.e. after 5
+ # virtual seconds), we expect those background requests to be worked
+ # on. We only advance 4 seconds here to avoid race conditions.
+ for _ in range(0, 4 * self.iops):
+ step = math.ceil(1 * 1000 * 1000 * 1000 / self.iops)
+ self.vm.qtest(f'clock_step {step}')
+
+ # Note how much remains to be done until the mirror job is finished
+ job_status = self.vm.qmp('query-jobs')['return'][0]
+ start_remaining = job_status['total-progress'] - \
+ job_status['current-progress']
+
+ # Create a whole bunch of more active requests
+ for _ in range(0, 10 * self.iops):
+ req = f'write -P 42 {active_request_offset} {active_request_len}'
+ active_request_offset += active_request_len
+ p = iotests.qemu_io_popen('-f', 'nbd', self.nbd_url, '-c', req)
+ self.background_processes += [p]
+
+ # Let the clock advance more. After 1 second, as noted above, we
+ # expect the background requests to be worked on. Give them a couple
+ # of seconds (specifically 4) to see their impact.
+ for _ in range(0, 5 * self.iops):
+ step = math.ceil(1 * 1000 * 1000 * 1000 / self.iops)
+ self.vm.qtest(f'clock_step {step}')
+
+ # Note how much remains to be done now. We expect this number to be
+ # reduced thanks to those background requests.
+ job_status = self.vm.qmp('query-jobs')['return'][0]
+ end_remaining = job_status['total-progress'] - \
+ job_status['current-progress']
+
+ # See that indeed progress was being made on the job, even while the
+ # node was saturated with active requests
+ self.assertGreater(start_remaining - end_remaining, 0)
+
+
+class TestHighThrottledWithNbdExport(TestThrottledWithNbdExportBase):
+ iops = 1024
+
+ def testActiveOnCreation(self):
+ '''
+ Issue requests on the mirror source node right as the mirror is
+ instated. It's possible that requests occur before the actual job is
+ created, but after the node has been put into the graph. Write
+ requests across the node must in that case be forwarded to the source
+ node without attempting to mirror them (there is no job object yet, so
+ attempting to access it would cause a segfault).
+ We do this with a lightly throttled node (i.e. quite high IOPS limit).
+ Using throttling seems to increase reproductivity, but if the limit is
+ too low, all requests allowed per second will be submitted before
+ mirror_start_job() gets to the problematic point.
+ '''
+
+ # Let qemu-img bench create write requests (enough for two seconds on
+ # the virtual clock)
+ bench_args = ['bench', '-w', '-d', '1024', '-f', 'nbd',
+ '-c', str(self.iops * 2), self.nbd_url]
+ p = iotests.qemu_tool_popen(iotests.qemu_img_args + bench_args)
+ self.background_processes += [p]
+
+ # Give qemu-img bench time to start up and issue requests
+ time.sleep(1.0)
+ # Flush the request queue, so new requests can come in right as we
+ # start blockdev-mirror
+ self.vm.qtest(f'clock_step {1 * 1000 * 1000 * 1000}')
+
+ result = self.vm.qmp('blockdev-mirror',
+ job_id='mirror',
+ device='source-node',
+ target='target-node',
+ sync='full',
+ copy_mode='write-blocking')
+ self.assert_qmp(result, 'return', {})
+
+
if __name__ == '__main__':
iotests.main(supported_fmts=['qcow2', 'raw'],
supported_protocols=['file'])
diff --git a/tests/qemu-iotests/151.out b/tests/qemu-iotests/151.out
index 89968f35d7..3f8a935a08 100644
--- a/tests/qemu-iotests/151.out
+++ b/tests/qemu-iotests/151.out
@@ -1,5 +1,5 @@
-....
+......
----------------------------------------------------------------------
-Ran 4 tests
+Ran 6 tests
OK
diff --git a/tests/qemu-iotests/tests/stream-under-throttle b/tests/qemu-iotests/tests/stream-under-throttle
new file mode 100755
index 0000000000..8d2d9e1684
--- /dev/null
+++ b/tests/qemu-iotests/tests/stream-under-throttle
@@ -0,0 +1,121 @@
+#!/usr/bin/env python3
+# group: rw
+#
+# Test streaming with throttle nodes on top
+#
+# Copyright (C) 2022 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import asyncio
+import os
+from typing import List
+import iotests
+from iotests import qemu_img_create, qemu_io
+
+
+image_size = 256 * 1024 * 1024
+base_img = os.path.join(iotests.test_dir, 'base.img')
+top_img = os.path.join(iotests.test_dir, 'top.img')
+
+
+class TcgVM(iotests.VM):
+ '''
+ Variant of iotests.VM that uses -accel tcg. Simply using
+ iotests.VM.add_args('-accel', 'tcg') is not sufficient, because that will
+ put -accel qtest before -accel tcg, and -accel arguments are prioritized in
+ the order they appear.
+ '''
+ @property
+ def _base_args(self) -> List[str]:
+ # Put -accel tcg first so it takes precedence
+ return ['-accel', 'tcg'] + super()._base_args
+
+
+class TestStreamWithThrottle(iotests.QMPTestCase):
+ def setUp(self) -> None:
+ '''
+ Create a simple backing chain between two images, write something to
+ the base image. Attach them to the VM underneath two throttle nodes,
+ one of which has actually no limits set, but the other does. Then put
+ a virtio-blk device on top.
+ This test configuration has been taken from
+ https://gitlab.com/qemu-project/qemu/-/issues/1215
+ '''
+ qemu_img_create('-f', iotests.imgfmt, base_img, str(image_size))
+ qemu_img_create('-f', iotests.imgfmt, '-b', base_img, '-F',
+ iotests.imgfmt, top_img, str(image_size))
+
+ # Write something to stream
+ qemu_io(base_img, '-c', f'write 0 {image_size}')
+
+ blockdev = {
+ 'driver': 'throttle',
+ 'node-name': 'throttled-node',
+ 'throttle-group': 'thrgr-limited',
+ 'file': {
+ 'driver': 'throttle',
+ 'throttle-group': 'thrgr-unlimited',
+ 'file': {
+ 'driver': iotests.imgfmt,
+ 'node-name': 'unthrottled-node',
+ 'file': {
+ 'driver': 'file',
+ 'filename': top_img
+ }
+ }
+ }
+ }
+
+ # Issue 1215 is not reproducible in qtest mode, which is why we need to
+ # create an -accel tcg VM
+ self.vm = TcgVM()
+ self.vm.add_object('iothread,id=iothr0')
+ self.vm.add_object('throttle-group,id=thrgr-unlimited')
+ self.vm.add_object('throttle-group,id=thrgr-limited,'
+ 'x-iops-total=10000,x-bps-total=104857600')
+ self.vm.add_blockdev(self.vm.qmp_to_opts(blockdev))
+ self.vm.add_device('virtio-blk,iothread=iothr0,drive=throttled-node')
+ self.vm.launch()
+
+ def tearDown(self) -> None:
+ self.vm.shutdown()
+ os.remove(top_img)
+ os.remove(base_img)
+
+ def test_stream(self) -> None:
+ '''
+ Do a simple stream beneath the two throttle nodes. Should complete
+ with no problems.
+ '''
+ result = self.vm.qmp('block-stream',
+ job_id='stream',
+ device='unthrottled-node')
+ self.assert_qmp(result, 'return', {})
+
+ # Should succeed and not time out
+ try:
+ self.vm.run_job('stream')
+ except asyncio.TimeoutError:
+ # VM may be stuck, kill it before tearDown()
+ self.vm.kill()
+ raise
+
+
+if __name__ == '__main__':
+ # Must support backing images
+ iotests.main(supported_fmts=['qcow', 'qcow2', 'qed'],
+ supported_protocols=['file'],
+ required_fmts=['throttle'])
diff --git a/tests/qemu-iotests/tests/stream-under-throttle.out b/tests/qemu-iotests/tests/stream-under-throttle.out
new file mode 100644
index 0000000000..ae1213e6f8
--- /dev/null
+++ b/tests/qemu-iotests/tests/stream-under-throttle.out
@@ -0,0 +1,5 @@
+.
+----------------------------------------------------------------------
+Ran 1 tests
+
+OK