path: root/net
diff options
authorTrond Myklebust <Trond.Myklebust@netapp.com>2011-02-21 11:05:41 -0800
committerTrond Myklebust <Trond.Myklebust@netapp.com>2011-03-10 15:04:52 -0500
commitbf294b41cefcb22fc3139e0f42c5b3f06728bd5e (patch)
tree250251c040a2d2e278b5a2ddd03c8d20a27be129 /net
parent214d93b02c4fe93638ad268613c9702a81ed9192 (diff)
SUNRPC: Close a race in __rpc_wait_for_completion_task()
Although they run as rpciod background tasks, under normal operation (i.e. no SIGKILL), functions like nfs_sillyrename(), nfs4_proc_unlck() and nfs4_do_close() want to be fully synchronous. This means that when we exit, we want all references to the rpc_task to be gone, and we want any dentry references etc. held by that task to be released. For this reason these functions call __rpc_wait_for_completion_task(), followed by rpc_put_task() in the expectation that the latter will be releasing the last reference to the rpc_task, and thus ensuring that the callback_ops->rpc_release() has been called synchronously. This patch fixes a race which exists due to the fact that rpciod calls rpc_complete_task() (in order to wake up the callers of __rpc_wait_for_completion_task()) and then subsequently calls rpc_put_task() without ensuring that these two steps are done atomically. In order to avoid adding new spin locks, the patch uses the existing waitqueue spin lock to order the rpc_task reference count releases between the waiting process and rpciod. The common case where nobody is waiting for completion is optimised for by checking if the RPC_TASK_ASYNC flag is cleared and/or if the rpc_task reference count is 1: in those cases we drop trying to grab the spin lock, and immediately free up the rpc_task. Those few processes that need to put the rpc_task from inside an asynchronous context and that do not care about ordering are given a new helper: rpc_put_task_async(). Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Diffstat (limited to 'net')
1 files changed, 61 insertions, 14 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 243fc09b164e..59e599498e37 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -252,23 +252,37 @@ static void rpc_set_active(struct rpc_task *task)
* Mark an RPC call as having completed by clearing the 'active' bit
+ * and then waking up all tasks that were sleeping.
-static void rpc_mark_complete_task(struct rpc_task *task)
+static int rpc_complete_task(struct rpc_task *task)
- smp_mb__before_clear_bit();
+ void *m = &task->tk_runstate;
+ wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
+ struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
+ unsigned long flags;
+ int ret;
+ spin_lock_irqsave(&wq->lock, flags);
clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
- smp_mb__after_clear_bit();
- wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+ ret = atomic_dec_and_test(&task->tk_count);
+ if (waitqueue_active(wq))
+ __wake_up_locked_key(wq, TASK_NORMAL, &k);
+ spin_unlock_irqrestore(&wq->lock, flags);
+ return ret;
* Allow callers to wait for completion of an RPC call
+ *
+ * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
+ * to enforce taking of the wq->lock and hence avoid races with
+ * rpc_complete_task().
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
if (action == NULL)
action = rpc_wait_bit_killable;
- return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+ return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
@@ -857,34 +871,67 @@ static void rpc_async_release(struct work_struct *work)
rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
-void rpc_put_task(struct rpc_task *task)
+static void rpc_release_resources_task(struct rpc_task *task)
- if (!atomic_dec_and_test(&task->tk_count))
- return;
- /* Release resources */
if (task->tk_rqstp)
if (task->tk_msg.rpc_cred)
- if (task->tk_workqueue != NULL) {
+static void rpc_final_put_task(struct rpc_task *task,
+ struct workqueue_struct *q)
+ if (q != NULL) {
INIT_WORK(&task->u.tk_work, rpc_async_release);
- queue_work(task->tk_workqueue, &task->u.tk_work);
+ queue_work(q, &task->u.tk_work);
} else
+static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
+ if (atomic_dec_and_test(&task->tk_count)) {
+ rpc_release_resources_task(task);
+ rpc_final_put_task(task, q);
+ }
+void rpc_put_task(struct rpc_task *task)
+ rpc_do_put_task(task, NULL);
+void rpc_put_task_async(struct rpc_task *task)
+ rpc_do_put_task(task, task->tk_workqueue);
static void rpc_release_task(struct rpc_task *task)
dprintk("RPC: %5u release task\n", task->tk_pid);
- /* Wake up anyone who is waiting for task completion */
- rpc_mark_complete_task(task);
+ rpc_release_resources_task(task);
- rpc_put_task(task);
+ /*
+ * Note: at this point we have been removed from rpc_clnt->cl_tasks,
+ * so it should be safe to use task->tk_count as a test for whether
+ * or not any other processes still hold references to our rpc_task.
+ */
+ if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
+ /* Wake up anyone who may be waiting for task completion */
+ if (!rpc_complete_task(task))
+ return;
+ } else {
+ if (!atomic_dec_and_test(&task->tk_count))
+ return;
+ }
+ rpc_final_put_task(task, task->tk_workqueue);
int rpciod_up(void)