diff options
Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r-- | kernel/workqueue.c | 953 |
1 files changed, 534 insertions, 419 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c index b80065a2450..31d8a4586d4 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c @@ -184,7 +184,8 @@ struct global_cwq { struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE]; /* L: hash of busy workers */ - struct worker_pool pools[2]; /* normal and highpri pools */ + struct worker_pool pools[NR_WORKER_POOLS]; + /* normal and highpri pools */ wait_queue_head_t rebind_hold; /* rebind hold wait */ } ____cacheline_aligned_in_smp; @@ -269,17 +270,15 @@ struct workqueue_struct { }; struct workqueue_struct *system_wq __read_mostly; -struct workqueue_struct *system_long_wq __read_mostly; -struct workqueue_struct *system_nrt_wq __read_mostly; -struct workqueue_struct *system_unbound_wq __read_mostly; -struct workqueue_struct *system_freezable_wq __read_mostly; -struct workqueue_struct *system_nrt_freezable_wq __read_mostly; EXPORT_SYMBOL_GPL(system_wq); +struct workqueue_struct *system_highpri_wq __read_mostly; +EXPORT_SYMBOL_GPL(system_highpri_wq); +struct workqueue_struct *system_long_wq __read_mostly; EXPORT_SYMBOL_GPL(system_long_wq); -EXPORT_SYMBOL_GPL(system_nrt_wq); +struct workqueue_struct *system_unbound_wq __read_mostly; EXPORT_SYMBOL_GPL(system_unbound_wq); +struct workqueue_struct *system_freezable_wq __read_mostly; EXPORT_SYMBOL_GPL(system_freezable_wq); -EXPORT_SYMBOL_GPL(system_nrt_freezable_wq); #define CREATE_TRACE_POINTS #include <trace/events/workqueue.h> @@ -534,18 +533,24 @@ static int work_next_color(int color) } /* - * A work's data points to the cwq with WORK_STRUCT_CWQ set while the - * work is on queue. Once execution starts, WORK_STRUCT_CWQ is - * cleared and the work data contains the cpu number it was last on. + * While queued, %WORK_STRUCT_CWQ is set and non flag bits of a work's data + * contain the pointer to the queued cwq. Once execution starts, the flag + * is cleared and the high bits contain OFFQ flags and CPU number. + * + * set_work_cwq(), set_work_cpu_and_clear_pending(), mark_work_canceling() + * and clear_work_data() can be used to set the cwq, cpu or clear + * work->data. These functions should only be called while the work is + * owned - ie. while the PENDING bit is set. * - * set_work_{cwq|cpu}() and clear_work_data() can be used to set the - * cwq, cpu or clear work->data. These functions should only be - * called while the work is owned - ie. while the PENDING bit is set. + * get_work_[g]cwq() can be used to obtain the gcwq or cwq corresponding to + * a work. gcwq is available once the work has been queued anywhere after + * initialization until it is sync canceled. cwq is available only while + * the work item is queued. * - * get_work_[g]cwq() can be used to obtain the gcwq or cwq - * corresponding to a work. gcwq is available once the work has been - * queued anywhere after initialization. cwq is available only from - * queueing until execution starts. + * %WORK_OFFQ_CANCELING is used to mark a work item which is being + * canceled. While being canceled, a work item may have its PENDING set + * but stay off timer and worklist for arbitrarily long and nobody should + * try to steal the PENDING bit. */ static inline void set_work_data(struct work_struct *work, unsigned long data, unsigned long flags) @@ -562,13 +567,22 @@ static void set_work_cwq(struct work_struct *work, WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags); } -static void set_work_cpu(struct work_struct *work, unsigned int cpu) +static void set_work_cpu_and_clear_pending(struct work_struct *work, + unsigned int cpu) { - set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING); + /* + * The following wmb is paired with the implied mb in + * test_and_set_bit(PENDING) and ensures all updates to @work made + * here are visible to and precede any updates by the next PENDING + * owner. + */ + smp_wmb(); + set_work_data(work, (unsigned long)cpu << WORK_OFFQ_CPU_SHIFT, 0); } static void clear_work_data(struct work_struct *work) { + smp_wmb(); /* see set_work_cpu_and_clear_pending() */ set_work_data(work, WORK_STRUCT_NO_CPU, 0); } @@ -591,7 +605,7 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work) return ((struct cpu_workqueue_struct *) (data & WORK_STRUCT_WQ_DATA_MASK))->pool->gcwq; - cpu = data >> WORK_STRUCT_FLAG_BITS; + cpu = data >> WORK_OFFQ_CPU_SHIFT; if (cpu == WORK_CPU_NONE) return NULL; @@ -599,6 +613,22 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work) return get_gcwq(cpu); } +static void mark_work_canceling(struct work_struct *work) +{ + struct global_cwq *gcwq = get_work_gcwq(work); + unsigned long cpu = gcwq ? gcwq->cpu : WORK_CPU_NONE; + + set_work_data(work, (cpu << WORK_OFFQ_CPU_SHIFT) | WORK_OFFQ_CANCELING, + WORK_STRUCT_PENDING); +} + +static bool work_is_canceling(struct work_struct *work) +{ + unsigned long data = atomic_long_read(&work->data); + + return !(data & WORK_STRUCT_CWQ) && (data & WORK_OFFQ_CANCELING); +} + /* * Policy functions. These define the policies on how the global worker * pools are managed. Unless noted otherwise, these functions assume that @@ -903,6 +933,194 @@ static struct worker *find_worker_executing_work(struct global_cwq *gcwq, } /** + * move_linked_works - move linked works to a list + * @work: start of series of works to be scheduled + * @head: target list to append @work to + * @nextp: out paramter for nested worklist walking + * + * Schedule linked works starting from @work to @head. Work series to + * be scheduled starts at @work and includes any consecutive work with + * WORK_STRUCT_LINKED set in its predecessor. + * + * If @nextp is not NULL, it's updated to point to the next work of + * the last scheduled work. This allows move_linked_works() to be + * nested inside outer list_for_each_entry_safe(). + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + */ +static void move_linked_works(struct work_struct *work, struct list_head *head, + struct work_struct **nextp) +{ + struct work_struct *n; + + /* + * Linked worklist will always end before the end of the list, + * use NULL for list head. + */ + list_for_each_entry_safe_from(work, n, NULL, entry) { + list_move_tail(&work->entry, head); + if (!(*work_data_bits(work) & WORK_STRUCT_LINKED)) + break; + } + + /* + * If we're already inside safe list traversal and have moved + * multiple works to the scheduled queue, the next position + * needs to be updated. + */ + if (nextp) + *nextp = n; +} + +static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq) +{ + struct work_struct *work = list_first_entry(&cwq->delayed_works, + struct work_struct, entry); + + trace_workqueue_activate_work(work); + move_linked_works(work, &cwq->pool->worklist, NULL); + __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work)); + cwq->nr_active++; +} + +/** + * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight + * @cwq: cwq of interest + * @color: color of work which left the queue + * @delayed: for a delayed work + * + * A work either has completed or is removed from pending queue, + * decrement nr_in_flight of its cwq and handle workqueue flushing. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + */ +static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color, + bool delayed) +{ + /* ignore uncolored works */ + if (color == WORK_NO_COLOR) + return; + + cwq->nr_in_flight[color]--; + + if (!delayed) { + cwq->nr_active--; + if (!list_empty(&cwq->delayed_works)) { + /* one down, submit a delayed one */ + if (cwq->nr_active < cwq->max_active) + cwq_activate_first_delayed(cwq); + } + } + + /* is flush in progress and are we at the flushing tip? */ + if (likely(cwq->flush_color != color)) + return; + + /* are there still in-flight works? */ + if (cwq->nr_in_flight[color]) + return; + + /* this cwq is done, clear flush_color */ + cwq->flush_color = -1; + + /* + * If this was the last cwq, wake up the first flusher. It + * will handle the rest. + */ + if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush)) + complete(&cwq->wq->first_flusher->done); +} + +/** + * try_to_grab_pending - steal work item from worklist and disable irq + * @work: work item to steal + * @is_dwork: @work is a delayed_work + * @flags: place to store irq state + * + * Try to grab PENDING bit of @work. This function can handle @work in any + * stable state - idle, on timer or on worklist. Return values are + * + * 1 if @work was pending and we successfully stole PENDING + * 0 if @work was idle and we claimed PENDING + * -EAGAIN if PENDING couldn't be grabbed at the moment, safe to busy-retry + * -ENOENT if someone else is canceling @work, this state may persist + * for arbitrarily long + * + * On >= 0 return, the caller owns @work's PENDING bit. To avoid getting + * interrupted while holding PENDING and @work off queue, irq must be + * disabled on entry. This, combined with delayed_work->timer being + * irqsafe, ensures that we return -EAGAIN for finite short period of time. + * + * On successful return, >= 0, irq is disabled and the caller is + * responsible for releasing it using local_irq_restore(*@flags). + * + * This function is safe to call from any context including IRQ handler. + */ +static int try_to_grab_pending(struct work_struct *work, bool is_dwork, + unsigned long *flags) +{ + struct global_cwq *gcwq; + + WARN_ON_ONCE(in_irq()); + + local_irq_save(*flags); + + /* try to steal the timer if it exists */ + if (is_dwork) { + struct delayed_work *dwork = to_delayed_work(work); + + /* + * dwork->timer is irqsafe. If del_timer() fails, it's + * guaranteed that the timer is not queued anywhere and not + * running on the local CPU. + */ + if (likely(del_timer(&dwork->timer))) + return 1; + } + + /* try to claim PENDING the normal way */ + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) + return 0; + + /* + * The queueing is in progress, or it is already queued. Try to + * steal it from ->worklist without clearing WORK_STRUCT_PENDING. + */ + gcwq = get_work_gcwq(work); + if (!gcwq) + goto fail; + + spin_lock(&gcwq->lock); + if (!list_empty(&work->entry)) { + /* + * This work is queued, but perhaps we locked the wrong gcwq. + * In that case we must see the new value after rmb(), see + * insert_work()->wmb(). + */ + smp_rmb(); + if (gcwq == get_work_gcwq(work)) { + debug_work_deactivate(work); + list_del_init(&work->entry); + cwq_dec_nr_in_flight(get_work_cwq(work), + get_work_color(work), + *work_data_bits(work) & WORK_STRUCT_DELAYED); + + spin_unlock(&gcwq->lock); + return 1; + } + } + spin_unlock(&gcwq->lock); +fail: + local_irq_restore(*flags); + if (work_is_canceling(work)) + return -ENOENT; + cpu_relax(); + return -EAGAIN; +} + +/** * insert_work - insert a work into gcwq * @cwq: cwq @work belongs to * @work: work to insert @@ -982,7 +1200,15 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, struct cpu_workqueue_struct *cwq; struct list_head *worklist; unsigned int work_flags; - unsigned long flags; + unsigned int req_cpu = cpu; + + /* + * While a work item is PENDING && off queue, a task trying to + * steal the PENDING will busy-loop waiting for it to either get + * queued or lose PENDING. Grabbing PENDING and queueing should + * happen with IRQ disabled. + */ + WARN_ON_ONCE(!irqs_disabled()); debug_work_activate(work); @@ -995,21 +1221,22 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, if (!(wq->flags & WQ_UNBOUND)) { struct global_cwq *last_gcwq; - if (unlikely(cpu == WORK_CPU_UNBOUND)) + if (cpu == WORK_CPU_UNBOUND) cpu = raw_smp_processor_id(); /* - * It's multi cpu. If @wq is non-reentrant and @work - * was previously on a different cpu, it might still - * be running there, in which case the work needs to - * be queued on that cpu to guarantee non-reentrance. + * It's multi cpu. If @work was previously on a different + * cpu, it might still be running there, in which case the + * work needs to be queued on that cpu to guarantee + * non-reentrancy. */ gcwq = get_gcwq(cpu); - if (wq->flags & WQ_NON_REENTRANT && - (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) { + last_gcwq = get_work_gcwq(work); + + if (last_gcwq && last_gcwq != gcwq) { struct worker *worker; - spin_lock_irqsave(&last_gcwq->lock, flags); + spin_lock(&last_gcwq->lock); worker = find_worker_executing_work(last_gcwq, work); @@ -1017,22 +1244,23 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, gcwq = last_gcwq; else { /* meh... not running there, queue here */ - spin_unlock_irqrestore(&last_gcwq->lock, flags); - spin_lock_irqsave(&gcwq->lock, flags); + spin_unlock(&last_gcwq->lock); + spin_lock(&gcwq->lock); } - } else - spin_lock_irqsave(&gcwq->lock, flags); + } else { + spin_lock(&gcwq->lock); + } } else { gcwq = get_gcwq(WORK_CPU_UNBOUND); - spin_lock_irqsave(&gcwq->lock, flags); + spin_lock(&gcwq->lock); } /* gcwq determined, get cwq and queue */ cwq = get_cwq(gcwq->cpu, wq); - trace_workqueue_queue_work(cpu, cwq, work); + trace_workqueue_queue_work(req_cpu, cwq, work); if (WARN_ON(!list_empty(&work->entry))) { - spin_unlock_irqrestore(&gcwq->lock, flags); + spin_unlock(&gcwq->lock); return; } @@ -1050,79 +1278,110 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, insert_work(cwq, work, worklist, work_flags); - spin_unlock_irqrestore(&gcwq->lock, flags); + spin_unlock(&gcwq->lock); } /** - * queue_work - queue work on a workqueue + * queue_work_on - queue work on specific cpu + * @cpu: CPU number to execute work on * @wq: workqueue to use * @work: work to queue * - * Returns 0 if @work was already on a queue, non-zero otherwise. + * Returns %false if @work was already on a queue, %true otherwise. * - * We queue the work to the CPU on which it was submitted, but if the CPU dies - * it can be processed by another CPU. + * We queue the work to a specific CPU, the caller must ensure it + * can't go away. */ -int queue_work(struct workqueue_struct *wq, struct work_struct *work) +bool queue_work_on(int cpu, struct workqueue_struct *wq, + struct work_struct *work) { - int ret; + bool ret = false; + unsigned long flags; - ret = queue_work_on(get_cpu(), wq, work); - put_cpu(); + local_irq_save(flags); + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { + __queue_work(cpu, wq, work); + ret = true; + } + + local_irq_restore(flags); return ret; } -EXPORT_SYMBOL_GPL(queue_work); +EXPORT_SYMBOL_GPL(queue_work_on); /** - * queue_work_on - queue work on specific cpu - * @cpu: CPU number to execute work on + * queue_work - queue work on a workqueue * @wq: workqueue to use * @work: work to queue * - * Returns 0 if @work was already on a queue, non-zero otherwise. + * Returns %false if @work was already on a queue, %true otherwise. * - * We queue the work to a specific CPU, the caller must ensure it - * can't go away. + * We queue the work to the CPU on which it was submitted, but if the CPU dies + * it can be processed by another CPU. */ -int -queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work) +bool queue_work(struct workqueue_struct *wq, struct work_struct *work) { - int ret = 0; - - if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { - __queue_work(cpu, wq, work); - ret = 1; - } - return ret; + return queue_work_on(WORK_CPU_UNBOUND, wq, work); } -EXPORT_SYMBOL_GPL(queue_work_on); +EXPORT_SYMBOL_GPL(queue_work); -static void delayed_work_timer_fn(unsigned long __data) +void delayed_work_timer_fn(unsigned long __data) { struct delayed_work *dwork = (struct delayed_work *)__data; struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work); - __queue_work(smp_processor_id(), cwq->wq, &dwork->work); + /* should have been called from irqsafe timer with irq already off */ + __queue_work(dwork->cpu, cwq->wq, &dwork->work); } +EXPORT_SYMBOL_GPL(delayed_work_timer_fn); -/** - * queue_delayed_work - queue work on a workqueue after delay - * @wq: workqueue to use - * @dwork: delayable work to queue - * @delay: number of jiffies to wait before queueing - * - * Returns 0 if @work was already on a queue, non-zero otherwise. - */ -int queue_delayed_work(struct workqueue_struct *wq, - struct delayed_work *dwork, unsigned long delay) +static void __queue_delayed_work(int cpu, struct workqueue_struct *wq, + struct delayed_work *dwork, unsigned long delay) { - if (delay == 0) - return queue_work(wq, &dwork->work); + struct timer_list *timer = &dwork->timer; + struct work_struct *work = &dwork->work; + unsigned int lcpu; + + WARN_ON_ONCE(timer->function != delayed_work_timer_fn || + timer->data != (unsigned long)dwork); + BUG_ON(timer_pending(timer)); + BUG_ON(!list_empty(&work->entry)); + + timer_stats_timer_set_start_info(&dwork->timer); - return queue_delayed_work_on(-1, wq, dwork, delay); + /* + * This stores cwq for the moment, for the timer_fn. Note that the + * work's gcwq is preserved to allow reentrance detection for + * delayed works. + */ + if (!(wq->flags & WQ_UNBOUND)) { + struct global_cwq *gcwq = get_work_gcwq(work); + + /* + * If we cannot get the last gcwq from @work directly, + * select the last CPU such that it avoids unnecessarily + * triggering non-reentrancy check in __queue_work(). + */ + lcpu = cpu; + if (gcwq) + lcpu = gcwq->cpu; + if (lcpu == WORK_CPU_UNBOUND) + lcpu = raw_smp_processor_id(); + } else { + lcpu = WORK_CPU_UNBOUND; + } + + set_work_cwq(work, get_cwq(lcpu, wq), 0); + + dwork->cpu = cpu; + timer->expires = jiffies + delay; + + if (unlikely(cpu != WORK_CPU_UNBOUND)) + add_timer_on(timer, cpu); + else + add_timer(timer); } -EXPORT_SYMBOL_GPL(queue_delayed_work); /** * queue_delayed_work_on - queue work on specific CPU after delay @@ -1131,53 +1390,100 @@ EXPORT_SYMBOL_GPL(queue_delayed_work); * @dwork: work to queue * @delay: number of jiffies to wait before queueing * - * Returns 0 if @work was already on a queue, non-zero otherwise. + * Returns %false if @work was already on a queue, %true otherwise. If + * @delay is zero and @dwork is idle, it will be scheduled for immediate + * execution. */ -int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, - struct delayed_work *dwork, unsigned long delay) +bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq, + struct delayed_work *dwork, unsigned long delay) { - int ret = 0; - struct timer_list *timer = &dwork->timer; struct work_struct *work = &dwork->work; + bool ret = false; + unsigned long flags; - if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { - unsigned int lcpu; + if (!delay) + return queue_work_on(cpu, wq, &dwork->work); - BUG_ON(timer_pending(timer)); - BUG_ON(!list_empty(&work->entry)); + /* read the comment in __queue_work() */ + local_irq_save(flags); - timer_stats_timer_set_start_info(&dwork->timer); + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { + __queue_delayed_work(cpu, wq, dwork, delay); + ret = true; + } - /* - * This stores cwq for the moment, for the timer_fn. - * Note that the work's gcwq is preserved to allow - * reentrance detection for delayed works. - */ - if (!(wq->flags & WQ_UNBOUND)) { - struct global_cwq *gcwq = get_work_gcwq(work); + local_irq_restore(flags); + return ret; +} +EXPORT_SYMBOL_GPL(queue_delayed_work_on); - if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND) - lcpu = gcwq->cpu; - else - lcpu = raw_smp_processor_id(); - } else - lcpu = WORK_CPU_UNBOUND; +/** + * queue_delayed_work - queue work on a workqueue after delay + * @wq: workqueue to use + * @dwork: delayable work to queue + * @delay: number of jiffies to wait before queueing + * + * Equivalent to queue_delayed_work_on() but tries to use the local CPU. + */ +bool queue_delayed_work(struct workqueue_struct *wq, + struct delayed_work *dwork, unsigned long delay) +{ + return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay); +} +EXPORT_SYMBOL_GPL(queue_delayed_work); - set_work_cwq(work, get_cwq(lcpu, wq), 0); +/** + * mod_delayed_work_on - modify delay of or queue a delayed work on specific CPU + * @cpu: CPU number to execute work on + * @wq: workqueue to use + * @dwork: work to queue + * @delay: number of jiffies to wait before queueing + * + * If @dwork is idle, equivalent to queue_delayed_work_on(); otherwise, + * modify @dwork's timer so that it expires after @delay. If @delay is + * zero, @work is guaranteed to be scheduled immediately regardless of its + * current state. + * + * Returns %false if @dwork was idle and queued, %true if @dwork was + * pending and its timer was modified. + * + * This function is safe to call from any context including IRQ handler. + * See try_to_grab_pending() for details. + */ +bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq, + struct delayed_work *dwork, unsigned long delay) +{ + unsigned long flags; + int ret; - timer->expires = jiffies + delay; - timer->data = (unsigned long)dwork; - timer->function = delayed_work_timer_fn; + do { + ret = try_to_grab_pending(&dwork->work, true, &flags); + } while (unlikely(ret == -EAGAIN)); - if (unlikely(cpu >= 0)) - add_timer_on(timer, cpu); - else - add_timer(timer); - ret = 1; + if (likely(ret >= 0)) { + __queue_delayed_work(cpu, wq, dwork, delay); + local_irq_restore(flags); } + + /* -ENOENT from try_to_grab_pending() becomes %true */ return ret; } -EXPORT_SYMBOL_GPL(queue_delayed_work_on); +EXPORT_SYMBOL_GPL(mod_delayed_work_on); + +/** + * mod_delayed_work - modify delay of or queue a delayed work + * @wq: workqueue to use + * @dwork: work to queue + * @delay: number of jiffies to wait before queueing + * + * mod_delayed_work_on() on local CPU. + */ +bool mod_delayed_work(struct workqueue_struct *wq, struct delayed_work *dwork, + unsigned long delay) +{ + return mod_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay); +} +EXPORT_SYMBOL_GPL(mod_delayed_work); /** * worker_enter_idle - enter idle state @@ -1442,8 +1748,9 @@ retry: /* all idle workers are rebound, rebind busy workers */ for_each_busy_worker(worker, i, pos, gcwq) { - struct work_struct *rebind_work = &worker->rebind_work; unsigned long worker_flags = worker->flags; + struct work_struct *rebind_work = &worker->rebind_work; + struct workqueue_struct *wq; /* morph UNBOUND to REBIND atomically */ worker_flags &= ~WORKER_UNBOUND; @@ -1454,11 +1761,20 @@ retry: work_data_bits(rebind_work))) continue; - /* wq doesn't matter, use the default one */ debug_work_activate(rebind_work); - insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work, - worker->scheduled.next, - work_color_to_flags(WORK_NO_COLOR)); + + /* + * wq doesn't really matter but let's keep @worker->pool + * and @cwq->pool consistent for sanity. + */ + if (worker_pool_pri(worker->pool)) + wq = system_highpri_wq; + else + wq = system_wq; + + insert_work(get_cwq(gcwq->cpu, wq), rebind_work, + worker->scheduled.next, + work_color_to_flags(WORK_NO_COLOR)); } /* @@ -1887,107 +2203,6 @@ static bool manage_workers(struct worker *worker) } /** - * move_linked_works - move linked works to a list - * @work: start of series of works to be scheduled - * @head: target list to append @work to - * @nextp: out paramter for nested worklist walking - * - * Schedule linked works starting from @work to @head. Work series to - * be scheduled starts at @work and includes any consecutive work with - * WORK_STRUCT_LINKED set in its predecessor. - * - * If @nextp is not NULL, it's updated to point to the next work of - * the last scheduled work. This allows move_linked_works() to be - * nested inside outer list_for_each_entry_safe(). - * - * CONTEXT: - * spin_lock_irq(gcwq->lock). - */ -static void move_linked_works(struct work_struct *work, struct list_head *head, - struct work_struct **nextp) -{ - struct work_struct *n; - - /* - * Linked worklist will always end before the end of the list, - * use NULL for list head. - */ - list_for_each_entry_safe_from(work, n, NULL, entry) { - list_move_tail(&work->entry, head); - if (!(*work_data_bits(work) & WORK_STRUCT_LINKED)) - break; - } - - /* - * If we're already inside safe list traversal and have moved - * multiple works to the scheduled queue, the next position - * needs to be updated. - */ - if (nextp) - *nextp = n; -} - -static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq) -{ - struct work_struct *work = list_first_entry(&cwq->delayed_works, - struct work_struct, entry); - - trace_workqueue_activate_work(work); - move_linked_works(work, &cwq->pool->worklist, NULL); - __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work)); - cwq->nr_active++; -} - -/** - * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight - * @cwq: cwq of interest - * @color: color of work which left the queue - * @delayed: for a delayed work - * - * A work either has completed or is removed from pending queue, - * decrement nr_in_flight of its cwq and handle workqueue flushing. - * - * CONTEXT: - * spin_lock_irq(gcwq->lock). - */ -static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color, - bool delayed) -{ - /* ignore uncolored works */ - if (color == WORK_NO_COLOR) - return; - - cwq->nr_in_flight[color]--; - - if (!delayed) { - cwq->nr_active--; - if (!list_empty(&cwq->delayed_works)) { - /* one down, submit a delayed one */ - if (cwq->nr_active < cwq->max_active) - cwq_activate_first_delayed(cwq); - } - } - - /* is flush in progress and are we at the flushing tip? */ - if (likely(cwq->flush_color != color)) - return; - - /* are there still in-flight works? */ - if (cwq->nr_in_flight[color]) - return; - - /* this cwq is done, clear flush_color */ - cwq->flush_color = -1; - - /* - * If this was the last cwq, wake up the first flusher. It - * will handle the rest. - */ - if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush)) - complete(&cwq->wq->first_flusher->done); -} - -/** * process_one_work - process single work * @worker: self * @work: work to process @@ -2046,15 +2261,13 @@ __acquires(&gcwq->lock) return; } - /* claim and process */ + /* claim and dequeue */ debug_work_deactivate(work); hlist_add_head(&worker->hentry, bwh); worker->current_work = work; worker->current_cwq = cwq; work_color = get_work_color(work); - /* record the current cpu number in the work data and dequeue */ - set_work_cpu(work, gcwq->cpu); list_del_init(&work->entry); /* @@ -2071,9 +2284,16 @@ __acquires(&gcwq->lock) if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool)) wake_up_worker(pool); + /* + * Record the last CPU and clear PENDING which should be the last + * update to @work. Also, do this inside @gcwq->lock so that + * PENDING and queued state changes happen together while IRQ is + * disabled. + */ + set_work_cpu_and_clear_pending(work, gcwq->cpu); + spin_unlock_irq(&gcwq->lock); - work_clear_pending(work); lock_map_acquire_read(&cwq->wq->lockdep_map); lock_map_acquire(&lockdep_map); trace_workqueue_execute_start(work); @@ -2087,11 +2307,9 @@ __acquires(&gcwq->lock) lock_map_release(&cwq->wq->lockdep_map); if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { - printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " - "%s/0x%08x/%d\n", - current->comm, preempt_count(), task_pid_nr(current)); - printk(KERN_ERR " last function: "); - print_symbol("%s\n", (unsigned long)f); + pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n" + " last function: %pf\n", + current->comm, preempt_count(), task_pid_nr(current), f); debug_show_held_locks(current); dump_stack(); } @@ -2645,8 +2863,8 @@ reflush: if (++flush_cnt == 10 || (flush_cnt % 100 == 0 && flush_cnt <= 1000)) - pr_warning("workqueue %s: flush on destruction isn't complete after %u tries\n", - wq->name, flush_cnt); + pr_warn("workqueue %s: flush on destruction isn't complete after %u tries\n", + wq->name, flush_cnt); goto reflush; } @@ -2657,8 +2875,7 @@ reflush: } EXPORT_SYMBOL_GPL(drain_workqueue); -static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr, - bool wait_executing) +static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr) { struct worker *worker = NULL; struct global_cwq *gcwq; @@ -2680,13 +2897,12 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr, cwq = get_work_cwq(work); if (unlikely(!cwq || gcwq != cwq->pool->gcwq)) goto already_gone; - } else if (wait_executing) { + } else { worker = find_worker_executing_work(gcwq, work); if (!worker) goto already_gone; cwq = worker->current_cwq; - } else - goto already_gone; + } insert_wq_barrier(cwq, barr, work, worker); spin_unlock_irq(&gcwq->lock); @@ -2713,15 +2929,8 @@ already_gone: * flush_work - wait for a work to finish executing the last queueing instance * @work: the work to flush * - * Wait until @work has finished execution. This function considers - * only the last queueing instance of @work. If @work has been - * enqueued across different CPUs on a non-reentrant workqueue or on - * multiple workqueues, @work might still be executing on return on - * some of the CPUs from earlier queueing. - * - * If @work was queued only on a non-reentrant, ordered or unbound - * workqueue, @work is guaranteed to be idle on return if it hasn't - * been requeued since flush started. + * Wait until @work has finished execution. @work is guaranteed to be idle + * on return if it hasn't been requeued since flush started. * * RETURNS: * %true if flush_work() waited for the work to finish execution, @@ -2734,140 +2943,36 @@ bool flush_work(struct work_struct *work) lock_map_acquire(&work->lockdep_map); lock_map_release(&work->lockdep_map); - if (start_flush_work(work, &barr, true)) { + if (start_flush_work(work, &barr)) { wait_for_completion(&barr.done); destroy_work_on_stack(&barr.work); return true; - } else - return false; -} -EXPORT_SYMBOL_GPL(flush_work); - -static bool wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work) -{ - struct wq_barrier barr; - struct worker *worker; - - spin_lock_irq(&gcwq->lock); - - worker = find_worker_executing_work(gcwq, work); - if (unlikely(worker)) - insert_wq_barrier(worker->current_cwq, &barr, work, worker); - - spin_unlock_irq(&gcwq->lock); - - if (unlikely(worker)) { - wait_for_completion(&barr.done); - destroy_work_on_stack(&barr.work); - return true; - } else + } else { return false; -} - -static bool wait_on_work(struct work_struct *work) -{ - bool ret = false; - int cpu; - - might_sleep(); - - lock_map_acquire(&work->lockdep_map); - lock_map_release(&work->lockdep_map); - - for_each_gcwq_cpu(cpu) - ret |= wait_on_cpu_work(get_gcwq(cpu), work); - return ret; -} - -/** - * flush_work_sync - wait until a work has finished execution - * @work: the work to flush - * - * Wait until @work has finished execution. On return, it's - * guaranteed that all queueing instances of @work which happened - * before this function is called are finished. In other words, if - * @work hasn't been requeued since this function was called, @work is - * guaranteed to be idle on return. - * - * RETURNS: - * %true if flush_work_sync() waited for the work to finish execution, - * %false if it was already idle. - */ -bool flush_work_sync(struct work_struct *work) -{ - struct wq_barrier barr; - bool pending, waited; - - /* we'll wait for executions separately, queue barr only if pending */ - pending = start_flush_work(work, &barr, false); - - /* wait for executions to finish */ - waited = wait_on_work(work); - - /* wait for the pending one */ - if (pending) { - wait_for_completion(&barr.done); - destroy_work_on_stack(&barr.work); - } - - return pending || waited; -} -EXPORT_SYMBOL_GPL(flush_work_sync); - -/* - * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit, - * so this work can't be re-armed in any way. - */ -static int try_to_grab_pending(struct work_struct *work) -{ - struct global_cwq *gcwq; - int ret = -1; - - if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) - return 0; - - /* - * The queueing is in progress, or it is already queued. Try to - * steal it from ->worklist without clearing WORK_STRUCT_PENDING. - */ - gcwq = get_work_gcwq(work); - if (!gcwq) - return ret; - - spin_lock_irq(&gcwq->lock); - if (!list_empty(&work->entry)) { - /* - * This work is queued, but perhaps we locked the wrong gcwq. - * In that case we must see the new value after rmb(), see - * insert_work()->wmb(). - */ - smp_rmb(); - if (gcwq == get_work_gcwq(work)) { - debug_work_deactivate(work); - list_del_init(&work->entry); - cwq_dec_nr_in_flight(get_work_cwq(work), - get_work_color(work), - *work_data_bits(work) & WORK_STRUCT_DELAYED); - ret = 1; - } } - spin_unlock_irq(&gcwq->lock); - - return ret; } +EXPORT_SYMBOL_GPL(flush_work); -static bool __cancel_work_timer(struct work_struct *work, - struct timer_list* timer) +static bool __cancel_work_timer(struct work_struct *work, bool is_dwork) { + unsigned long flags; int ret; do { - ret = (timer && likely(del_timer(timer))); - if (!ret) - ret = try_to_grab_pending(work); - wait_on_work(work); + ret = try_to_grab_pending(work, is_dwork, &flags); + /* + * If someone else is canceling, wait for the same event it + * would be waiting for before retrying. + */ + if (unlikely(ret == -ENOENT)) + flush_work(work); } while (unlikely(ret < 0)); + /* tell other tasks trying to grab @work to back off */ + mark_work_canceling(work); + local_irq_restore(flags); + + flush_work(work); clear_work_data(work); return ret; } @@ -2892,7 +2997,7 @@ static bool __cancel_work_timer(struct work_struct *work, */ bool cancel_work_sync(struct work_struct *work) { - return __cancel_work_timer(work, NULL); + return __cancel_work_timer(work, false); } EXPORT_SYMBOL_GPL(cancel_work_sync); @@ -2910,33 +3015,44 @@ EXPORT_SYMBOL_GPL(cancel_work_sync); */ bool flush_delayed_work(struct delayed_work *dwork) { + local_irq_disable(); if (del_timer_sync(&dwork->timer)) - __queue_work(raw_smp_processor_id(), + __queue_work(dwork->cpu, get_work_cwq(&dwork->work)->wq, &dwork->work); + local_irq_enable(); return flush_work(&dwork->work); } EXPORT_SYMBOL(flush_delayed_work); /** - * flush_delayed_work_sync - wait for a dwork to finish - * @dwork: the delayed work to flush + * cancel_delayed_work - cancel a delayed work + * @dwork: delayed_work to cancel * - * Delayed timer is cancelled and the pending work is queued for - * execution immediately. Other than timer handling, its behavior - * is identical to flush_work_sync(). + * Kill off a pending delayed_work. Returns %true if @dwork was pending + * and canceled; %false if wasn't pending. Note that the work callback + * function may still be running on return, unless it returns %true and the + * work doesn't re-arm itself. Explicitly flush or use + * cancel_delayed_work_sync() to wait on it. * - * RETURNS: - * %true if flush_work_sync() waited for the work to finish execution, - * %false if it was already idle. + * This function is safe to call from any context including IRQ handler. */ -bool flush_delayed_work_sync(struct delayed_work *dwork) +bool cancel_delayed_work(struct delayed_work *dwork) { - if (del_timer_sync(&dwork->timer)) - __queue_work(raw_smp_processor_id(), - get_work_cwq(&dwork->work)->wq, &dwork->work); - return flush_work_sync(&dwork->work); + unsigned long flags; + int ret; + + do { + ret = try_to_grab_pending(&dwork->work, true, &flags); + } while (unlikely(ret == -EAGAIN)); + + if (unlikely(ret < 0)) + return false; + + set_work_cpu_and_clear_pending(&dwork->work, work_cpu(&dwork->work)); + local_irq_restore(flags); + return true; } -EXPORT_SYMBOL(flush_delayed_work_sync); +EXPORT_SYMBOL(cancel_delayed_work); /** * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish @@ -2949,54 +3065,39 @@ EXPORT_SYMBOL(flush_delayed_work_sync); */ bool cancel_delayed_work_sync(struct delayed_work *dwork) { - return __cancel_work_timer(&dwork->work, &dwork->timer); + return __cancel_work_timer(&dwork->work, true); } EXPORT_SYMBOL(cancel_delayed_work_sync); /** - * schedule_work - put work task in global workqueue - * @work: job to be done - * - * Returns zero if @work was already on the kernel-global workqueue and - * non-zero otherwise. - * - * This puts a job in the kernel-global workqueue if it was not already - * queued and leaves it in the same position on the kernel-global - * workqueue otherwise. - */ -int schedule_work(struct work_struct *work) -{ - return queue_work(system_wq, work); -} -EXPORT_SYMBOL(schedule_work); - -/* * schedule_work_on - put work task on a specific cpu * @cpu: cpu to put the work task on * @work: job to be done * * This puts a job on a specific cpu */ -int schedule_work_on(int cpu, struct work_struct *work) +bool schedule_work_on(int cpu, struct work_struct *work) { return queue_work_on(cpu, system_wq, work); } EXPORT_SYMBOL(schedule_work_on); /** - * schedule_delayed_work - put work task in global workqueue after delay - * @dwork: job to be done - * @delay: number of jiffies to wait or 0 for immediate execution + * schedule_work - put work task in global workqueue + * @work: job to be done * - * After waiting for a given time this puts a job in the kernel-global - * workqueue. + * Returns %false if @work was already on the kernel-global workqueue and + * %true otherwise. + * + * This puts a job in the kernel-global workqueue if it was not already + * queued and leaves it in the same position on the kernel-global + * workqueue otherwise. */ -int schedule_delayed_work(struct delayed_work *dwork, - unsigned long delay) +bool schedule_work(struct work_struct *work) { - return queue_delayed_work(system_wq, dwork, delay); + return queue_work(system_wq, work); } -EXPORT_SYMBOL(schedule_delayed_work); +EXPORT_SYMBOL(schedule_work); /** * schedule_delayed_work_on - queue work in global workqueue on CPU after delay @@ -3007,14 +3108,28 @@ EXPORT_SYMBOL(schedule_delayed_work); * After waiting for a given time this puts a job in the kernel-global * workqueue on the specified CPU. */ -int schedule_delayed_work_on(int cpu, - struct delayed_work *dwork, unsigned long delay) +bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork, + unsigned long delay) { return queue_delayed_work_on(cpu, system_wq, dwork, delay); } EXPORT_SYMBOL(schedule_delayed_work_on); /** + * schedule_delayed_work - put work task in global workqueue after delay + * @dwork: job to be done + * @delay: number of jiffies to wait or 0 for immediate execution + * + * After waiting for a given time this puts a job in the kernel-global + * workqueue. + */ +bool schedule_delayed_work(struct delayed_work *dwork, unsigned long delay) +{ + return queue_delayed_work(system_wq, dwork, delay); +} +EXPORT_SYMBOL(schedule_delayed_work); + +/** * schedule_on_each_cpu - execute a function synchronously on each online CPU * @func: the function to call * @@ -3161,9 +3276,8 @@ static int wq_clamp_max_active(int max_active, unsigned int flags, int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE; if (max_active < 1 || max_active > lim) - printk(KERN_WARNING "workqueue: max_active %d requested for %s " - "is out of range, clamping between %d and %d\n", - max_active, name, 1, lim); + pr_warn("workqueue: max_active %d requested for %s is out of range, clamping between %d and %d\n", + max_active, name, 1, lim); return clamp_val(max_active, 1, lim); } @@ -3566,7 +3680,7 @@ static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb, case CPU_DOWN_PREPARE: /* unbinding should happen on the local CPU */ INIT_WORK_ONSTACK(&unbind_work, gcwq_unbind_fn); - schedule_work_on(cpu, &unbind_work); + queue_work_on(cpu, system_highpri_wq, &unbind_work); flush_work(&unbind_work); break; } @@ -3768,6 +3882,10 @@ static int __init init_workqueues(void) unsigned int cpu; int i; + /* make sure we have enough bits for OFFQ CPU number */ + BUILD_BUG_ON((1LU << (BITS_PER_LONG - WORK_OFFQ_CPU_SHIFT)) < + WORK_CPU_LAST); + cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); cpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN); @@ -3822,17 +3940,14 @@ static int __init init_workqueues(void) } system_wq = alloc_workqueue("events", 0, 0); + system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0); system_long_wq = alloc_workqueue("events_long", 0, 0); - system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0); system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, WQ_UNBOUND_MAX_ACTIVE); system_freezable_wq = alloc_workqueue("events_freezable", WQ_FREEZABLE, 0); - system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable", - WQ_NON_REENTRANT | WQ_FREEZABLE, 0); - BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq || - !system_unbound_wq || !system_freezable_wq || - !system_nrt_freezable_wq); + BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq || + !system_unbound_wq || !system_freezable_wq); return 0; } early_initcall(init_workqueues); |