aboutsummaryrefslogtreecommitdiff
path: root/job.c
diff options
context:
space:
mode:
authorKevin Wolf <kwolf@redhat.com>2018-04-23 16:06:26 +0200
committerKevin Wolf <kwolf@redhat.com>2018-05-23 14:30:51 +0200
commit7eaa8fb57da96301f4a8ce176ad503f80efc7cc0 (patch)
tree2450798efd3aa3104933bceeb7d44dfe1d8edf51 /job.c
parent62c9e4162a7bc26a1389e50d17d3b2637028bbc3 (diff)
job: Move transactions to Job
This moves the logic that implements job transactions from BlockJob to Job. Signed-off-by: Kevin Wolf <kwolf@redhat.com> Reviewed-by: Max Reitz <mreitz@redhat.com>
Diffstat (limited to 'job.c')
-rw-r--r--job.c234
1 files changed, 226 insertions, 8 deletions
diff --git a/job.c b/job.c
index aa74b4c03b..4f6fd73cd7 100644
--- a/job.c
+++ b/job.c
@@ -60,6 +60,19 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
[JOB_VERB_DISMISS] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
+/* Transactional group of jobs */
+struct JobTxn {
+
+ /* Is this txn being cancelled? */
+ bool aborting;
+
+ /* List of jobs */
+ QLIST_HEAD(, Job) jobs;
+
+ /* Reference count */
+ int refcnt;
+};
+
/* Right now, this mutex is only needed to synchronize accesses to job->busy
* and job->sleep_timer, such as concurrent calls to job_do_yield and
* job_enter. */
@@ -80,6 +93,71 @@ static void __attribute__((__constructor__)) job_init(void)
qemu_mutex_init(&job_mutex);
}
+JobTxn *job_txn_new(void)
+{
+ JobTxn *txn = g_new0(JobTxn, 1);
+ QLIST_INIT(&txn->jobs);
+ txn->refcnt = 1;
+ return txn;
+}
+
+static void job_txn_ref(JobTxn *txn)
+{
+ txn->refcnt++;
+}
+
+void job_txn_unref(JobTxn *txn)
+{
+ if (txn && --txn->refcnt == 0) {
+ g_free(txn);
+ }
+}
+
+void job_txn_add_job(JobTxn *txn, Job *job)
+{
+ if (!txn) {
+ return;
+ }
+
+ assert(!job->txn);
+ job->txn = txn;
+
+ QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+ job_txn_ref(txn);
+}
+
+static void job_txn_del_job(Job *job)
+{
+ if (job->txn) {
+ QLIST_REMOVE(job, txn_list);
+ job_txn_unref(job->txn);
+ job->txn = NULL;
+ }
+}
+
+static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock)
+{
+ AioContext *ctx;
+ Job *job, *next;
+ int rc = 0;
+
+ QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
+ if (lock) {
+ ctx = job->aio_context;
+ aio_context_acquire(ctx);
+ }
+ rc = fn(job);
+ if (lock) {
+ aio_context_release(ctx);
+ }
+ if (rc) {
+ break;
+ }
+ }
+ return rc;
+}
+
+
/* TODO Make static once the whole state machine is in job.c */
void job_state_transition(Job *job, JobStatus s1)
{
@@ -181,8 +259,9 @@ static void job_sleep_timer_cb(void *opaque)
job_enter(job);
}
-void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
- int flags, BlockCompletionFunc *cb, void *opaque, Error **errp)
+void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
+ AioContext *ctx, int flags, BlockCompletionFunc *cb,
+ void *opaque, Error **errp)
{
Job *job;
@@ -228,6 +307,16 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
QLIST_INSERT_HEAD(&jobs, job, job_list);
+ /* Single jobs are modeled as single-job transactions for sake of
+ * consolidating the job management logic */
+ if (!txn) {
+ txn = job_txn_new();
+ job_txn_add_job(txn, job);
+ job_txn_unref(txn);
+ } else {
+ job_txn_add_job(txn, job);
+ }
+
return job;
}
@@ -241,6 +330,7 @@ void job_unref(Job *job)
if (--job->refcnt == 0) {
assert(job->status == JOB_STATUS_NULL);
assert(!timer_pending(&job->sleep_timer));
+ assert(!job->txn);
if (job->driver->free) {
job->driver->free(job);
@@ -263,7 +353,7 @@ void job_event_completed(Job *job)
notifier_list_notify(&job->on_finalize_completed, job);
}
-void job_event_pending(Job *job)
+static void job_event_pending(Job *job)
{
notifier_list_notify(&job->on_pending, job);
}
@@ -469,8 +559,7 @@ void job_do_dismiss(Job *job)
job->paused = false;
job->deferred_to_main_loop = true;
- /* TODO Don't assume it's a BlockJob */
- block_job_txn_del_job((BlockJob*) job);
+ job_txn_del_job(job);
job_state_transition(job, JOB_STATUS_NULL);
job_unref(job);
@@ -523,7 +612,7 @@ static void job_clean(Job *job)
}
}
-int job_finalize_single(Job *job)
+static int job_finalize_single(Job *job)
{
assert(job_is_completed(job));
@@ -550,12 +639,141 @@ int job_finalize_single(Job *job)
}
}
- /* TODO Don't assume it's a BlockJob */
- block_job_txn_del_job((BlockJob*) job);
+ job_txn_del_job(job);
job_conclude(job);
return 0;
}
+void job_cancel_async(Job *job, bool force)
+{
+ if (job->user_paused) {
+ /* Do not call job_enter here, the caller will handle it. */
+ job->user_paused = false;
+ if (job->driver->user_resume) {
+ job->driver->user_resume(job);
+ }
+ assert(job->pause_count > 0);
+ job->pause_count--;
+ }
+ job->cancelled = true;
+ /* To prevent 'force == false' overriding a previous 'force == true' */
+ job->force_cancel |= force;
+}
+
+void job_completed_txn_abort(Job *job)
+{
+ AioContext *ctx;
+ JobTxn *txn = job->txn;
+ Job *other_job;
+
+ if (txn->aborting) {
+ /*
+ * We are cancelled by another job, which will handle everything.
+ */
+ return;
+ }
+ txn->aborting = true;
+ job_txn_ref(txn);
+
+ /* We are the first failed job. Cancel other jobs. */
+ QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+ ctx = other_job->aio_context;
+ aio_context_acquire(ctx);
+ }
+
+ /* Other jobs are effectively cancelled by us, set the status for
+ * them; this job, however, may or may not be cancelled, depending
+ * on the caller, so leave it. */
+ QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+ if (other_job != job) {
+ job_cancel_async(other_job, false);
+ }
+ }
+ while (!QLIST_EMPTY(&txn->jobs)) {
+ other_job = QLIST_FIRST(&txn->jobs);
+ ctx = other_job->aio_context;
+ if (!job_is_completed(other_job)) {
+ assert(job_is_cancelled(other_job));
+ job_finish_sync(other_job, NULL, NULL);
+ }
+ job_finalize_single(other_job);
+ aio_context_release(ctx);
+ }
+
+ job_txn_unref(txn);
+}
+
+static int job_prepare(Job *job)
+{
+ if (job->ret == 0 && job->driver->prepare) {
+ job->ret = job->driver->prepare(job);
+ }
+ return job->ret;
+}
+
+static int job_needs_finalize(Job *job)
+{
+ return !job->auto_finalize;
+}
+
+static void job_do_finalize(Job *job)
+{
+ int rc;
+ assert(job && job->txn);
+
+ /* prepare the transaction to complete */
+ rc = job_txn_apply(job->txn, job_prepare, true);
+ if (rc) {
+ job_completed_txn_abort(job);
+ } else {
+ job_txn_apply(job->txn, job_finalize_single, true);
+ }
+}
+
+void job_finalize(Job *job, Error **errp)
+{
+ assert(job && job->id);
+ if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) {
+ return;
+ }
+ job_do_finalize(job);
+}
+
+static int job_transition_to_pending(Job *job)
+{
+ job_state_transition(job, JOB_STATUS_PENDING);
+ if (!job->auto_finalize) {
+ job_event_pending(job);
+ }
+ return 0;
+}
+
+void job_completed_txn_success(Job *job)
+{
+ JobTxn *txn = job->txn;
+ Job *other_job;
+
+ job_state_transition(job, JOB_STATUS_WAITING);
+
+ /*
+ * Successful completion, see if there are other running jobs in this
+ * txn.
+ */
+ QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+ if (!job_is_completed(other_job)) {
+ return;
+ }
+ assert(other_job->ret == 0);
+ }
+
+ job_txn_apply(txn, job_transition_to_pending, false);
+
+ /* If no jobs need manual finalization, automatically do so */
+ if (job_txn_apply(txn, job_needs_finalize, false) == 0) {
+ job_do_finalize(job);
+ }
+}
+
void job_complete(Job *job, Error **errp)
{
/* Should not be reachable via external interface for internal jobs */