aboutsummaryrefslogtreecommitdiff
path: root/job.c
diff options
context:
space:
mode:
authorKevin Wolf <kwolf@redhat.com>2018-04-13 17:31:02 +0200
committerKevin Wolf <kwolf@redhat.com>2018-05-23 14:30:50 +0200
commitda01ff7f38f52791f93fc3ca59afcfbb220f15af (patch)
tree74010498c6229085ce3f2cc2184168aec5fb99f8 /job.c
parent1908a5590c7d214b1b6886bc19b81076fb65cec9 (diff)
job: Move coroutine and related code to Job
This commit moves some core functions for dealing with the job coroutine from BlockJob to Job. This includes primarily entering the coroutine (both for the first and reentering) and yielding explicitly and at pause points. Signed-off-by: Kevin Wolf <kwolf@redhat.com> Reviewed-by: John Snow <jsnow@redhat.com>
Diffstat (limited to 'job.c')
-rw-r--r--job.c137
1 files changed, 137 insertions, 0 deletions
diff --git a/job.c b/job.c
index c5a37fb8ef..78497fd6f5 100644
--- a/job.c
+++ b/job.c
@@ -60,6 +60,26 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
[JOB_VERB_DISMISS] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
+/* Right now, this mutex is only needed to synchronize accesses to job->busy
+ * and job->sleep_timer, such as concurrent calls to job_do_yield and
+ * job_enter. */
+static QemuMutex job_mutex;
+
+static void job_lock(void)
+{
+ qemu_mutex_lock(&job_mutex);
+}
+
+static void job_unlock(void)
+{
+ qemu_mutex_unlock(&job_mutex);
+}
+
+static void __attribute__((__constructor__)) job_init(void)
+{
+ qemu_mutex_init(&job_mutex);
+}
+
/* TODO Make static once the whole state machine is in job.c */
void job_state_transition(Job *job, JobStatus s1)
{
@@ -101,6 +121,16 @@ bool job_is_cancelled(Job *job)
return job->cancelled;
}
+bool job_started(Job *job)
+{
+ return job->co;
+}
+
+bool job_should_pause(Job *job)
+{
+ return job->pause_count > 0;
+}
+
Job *job_next(Job *job)
{
if (!job) {
@@ -143,6 +173,9 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
job->id = g_strdup(job_id);
job->refcnt = 1;
job->aio_context = ctx;
+ job->busy = false;
+ job->paused = true;
+ job->pause_count = 1;
job_state_transition(job, JOB_STATUS_CREATED);
@@ -172,6 +205,110 @@ void job_unref(Job *job)
}
}
+void job_enter_cond(Job *job, bool(*fn)(Job *job))
+{
+ if (!job_started(job)) {
+ return;
+ }
+ if (job->deferred_to_main_loop) {
+ return;
+ }
+
+ job_lock();
+ if (job->busy) {
+ job_unlock();
+ return;
+ }
+
+ if (fn && !fn(job)) {
+ job_unlock();
+ return;
+ }
+
+ assert(!job->deferred_to_main_loop);
+ timer_del(&job->sleep_timer);
+ job->busy = true;
+ job_unlock();
+ aio_co_wake(job->co);
+}
+
+/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
+ * Reentering the job coroutine with block_job_enter() before the timer has
+ * expired is allowed and cancels the timer.
+ *
+ * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
+ * called explicitly. */
+void coroutine_fn job_do_yield(Job *job, uint64_t ns)
+{
+ job_lock();
+ if (ns != -1) {
+ timer_mod(&job->sleep_timer, ns);
+ }
+ job->busy = false;
+ job_unlock();
+ qemu_coroutine_yield();
+
+ /* Set by job_enter_cond() before re-entering the coroutine. */
+ assert(job->busy);
+}
+
+void coroutine_fn job_pause_point(Job *job)
+{
+ assert(job && job_started(job));
+
+ if (!job_should_pause(job)) {
+ return;
+ }
+ if (job_is_cancelled(job)) {
+ return;
+ }
+
+ if (job->driver->pause) {
+ job->driver->pause(job);
+ }
+
+ if (job_should_pause(job) && !job_is_cancelled(job)) {
+ JobStatus status = job->status;
+ job_state_transition(job, status == JOB_STATUS_READY
+ ? JOB_STATUS_STANDBY
+ : JOB_STATUS_PAUSED);
+ job->paused = true;
+ job_do_yield(job, -1);
+ job->paused = false;
+ job_state_transition(job, status);
+ }
+
+ if (job->driver->resume) {
+ job->driver->resume(job);
+ }
+}
+
+/**
+ * All jobs must allow a pause point before entering their job proper. This
+ * ensures that jobs can be paused prior to being started, then resumed later.
+ */
+static void coroutine_fn job_co_entry(void *opaque)
+{
+ Job *job = opaque;
+
+ assert(job && job->driver && job->driver->start);
+ job_pause_point(job);
+ job->driver->start(job);
+}
+
+
+void job_start(Job *job)
+{
+ assert(job && !job_started(job) && job->paused &&
+ job->driver && job->driver->start);
+ job->co = qemu_coroutine_create(job_co_entry, job);
+ job->pause_count--;
+ job->busy = true;
+ job->paused = false;
+ job_state_transition(job, JOB_STATUS_RUNNING);
+ aio_co_enter(job->aio_context, job->co);
+}
+
typedef struct {
Job *job;
JobDeferToMainLoopFn *fn;