Diffstat (limited to 'util/rcu.c')
-rw-r--r--  util/rcu.c | 114
1 file changed, 75 insertions(+), 39 deletions(-)
diff --git a/util/rcu.c b/util/rcu.c
index 13ac0f75cb..fa32c942e4 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -46,6 +46,7 @@
unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
QemuEvent rcu_gp_event;
+static int in_drain_call_rcu;
static QemuMutex rcu_registry_lock;
static QemuMutex rcu_sync_lock;
@@ -64,7 +65,7 @@ static inline int rcu_gp_ongoing(unsigned long *ctr)
/* Written to only by each individual reader. Read by both the reader and the
* writers.
*/
-__thread struct rcu_reader_data rcu_reader;
+QEMU_DEFINE_CO_TLS(struct rcu_reader_data, rcu_reader)
/* Protected by rcu_registry_lock. */
typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
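The __thread variable becomes a QEMU_DEFINE_CO_TLS definition so that the reader state is only reached through accessors such as get_ptr_rcu_reader(), which this patch uses further down. A rough sketch of what the macro is assumed to generate (the authoritative definition is in include/qemu/coroutine-tls.h):

/* Sketch only, not the literal macro expansion: the data stays
 * thread-local, but callers go through a non-inlined accessor, so a
 * coroutine that is moved to another thread cannot keep a stale,
 * cached address of the previous thread's rcu_reader.
 */
static __thread struct rcu_reader_data co_tls_rcu_reader;

struct rcu_reader_data *get_ptr_rcu_reader(void)
{
    return &co_tls_rcu_reader;
}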
@@ -82,12 +83,6 @@ static void wait_for_readers(void)
*/
qemu_event_reset(&rcu_gp_event);
- /* Instead of using qatomic_mb_set for index->waiting, and
- * qatomic_mb_read for index->ctr, memory barriers are placed
- * manually since writes to different threads are independent.
- * qemu_event_reset has acquire semantics, so no memory barrier
- * is needed here.
- */
QLIST_FOREACH(index, &registry, node) {
qatomic_set(&index->waiting, true);
}
@@ -95,6 +90,10 @@ static void wait_for_readers(void)
/* Here, order the stores to index->waiting before the loads of
* index->ctr. Pairs with smp_mb_placeholder() in rcu_read_unlock(),
* ensuring that the loads of index->ctr are sequentially consistent.
+ *
+ * If this is the last iteration, this barrier also prevents
+ * frees from seeping upwards, and orders the two wait phases
+ * on architectures with 32-bit longs; see synchronize_rcu().
*/
smp_mb_global();
@@ -103,10 +102,12 @@ static void wait_for_readers(void)
QLIST_REMOVE(index, node);
QLIST_INSERT_HEAD(&qsreaders, index, node);
- /* No need for mb_set here, worst of all we
+ /* No need for memory barriers here, worst of all we
* get some extra futex wakeups.
*/
qatomic_set(&index->waiting, false);
+ } else if (qatomic_read(&in_drain_call_rcu)) {
+ notifier_list_notify(&index->force_rcu, NULL);
}
}
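The smp_mb_global() above pairs with smp_mb_placeholder() on the reader side, and clearing index->waiting without barriers is safe because the reader rechecks it. A simplified sketch of the assumed rcu_read_unlock() counterpart (paraphrased for illustration, not part of this patch):

static inline void rcu_read_unlock_sketch(void)    /* hypothetical name */
{
    struct rcu_reader_data *p_rcu_reader = get_ptr_rcu_reader();

    if (--p_rcu_reader->depth > 0) {
        return;
    }

    /* Leave the critical section: publish ctr = 0 ... */
    qatomic_store_release(&p_rcu_reader->ctr, 0);

    /* ... and order that store before the load of ->waiting.
     * Pairs with smp_mb_global() in wait_for_readers().
     */
    smp_mb_placeholder();
    if (qatomic_read(&p_rcu_reader->waiting)) {
        qatomic_set(&p_rcu_reader->waiting, false);
        qemu_event_set(&rcu_gp_event);
    }
}

At worst a reader still observes waiting == true just after wait_for_readers() cleared it, which only causes the extra futex wakeup mentioned in the comment.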
@@ -146,26 +147,26 @@ void synchronize_rcu(void)
/* Write RCU-protected pointers before reading p_rcu_reader->ctr.
* Pairs with smp_mb_placeholder() in rcu_read_lock().
+ *
+ * Also orders write to RCU-protected pointers before
+ * write to rcu_gp_ctr.
*/
smp_mb_global();
QEMU_LOCK_GUARD(&rcu_registry_lock);
if (!QLIST_EMPTY(&registry)) {
- /* In either case, the qatomic_mb_set below blocks stores that free
- * old RCU-protected pointers.
- */
if (sizeof(rcu_gp_ctr) < 8) {
/* For architectures with 32-bit longs, a two-subphases algorithm
* ensures we do not encounter overflow bugs.
*
* Switch parity: 0 -> 1, 1 -> 0.
*/
- qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+ qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
wait_for_readers();
- qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+ qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
} else {
/* Increment current grace period. */
- qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+ qatomic_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
}
wait_for_readers();
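With the qatomic_mb_set() calls gone, the barrier at the top of synchronize_rcu() and the one in wait_for_readers() carry all the ordering. A hypothetical updater (Foo and global_foo are invented names) illustrates what that ordering protects:

/* Hypothetical example, not from this file.  Readers access global_foo
 * with qatomic_rcu_read() inside rcu_read_lock()/rcu_read_unlock().
 */
static Foo *global_foo;

static void replace_foo(Foo *new_foo)
{
    Foo *old = qatomic_rcu_read(&global_foo);

    qatomic_rcu_set(&global_foo, new_foo);  /* publish the replacement */
    synchronize_rcu();                      /* wait for pre-existing readers */
    g_free(old);                            /* no reader can still hold old */
}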
@@ -188,8 +189,22 @@ static void enqueue(struct rcu_head *node)
struct rcu_head **old_tail;
node->next = NULL;
+
+ /*
+ * Make this node the tail of the list. The node will be
+ * used by further enqueue operations, but it will not
+ * be dequeued yet...
+ */
old_tail = qatomic_xchg(&tail, &node->next);
- qatomic_mb_set(old_tail, node);
+
+ /*
+ * ... until it is pointed to from another item in the list.
+ * In the meantime, try_dequeue() will find a NULL next pointer
+ * and loop.
+ *
+ * Synchronizes with qatomic_load_acquire() in try_dequeue().
+ */
+ qatomic_store_release(old_tail, node);
}
static struct rcu_head *try_dequeue(void)
@@ -197,26 +212,31 @@ static struct rcu_head *try_dequeue(void)
struct rcu_head *node, *next;
retry:
- /* Test for an empty list, which we do not expect. Note that for
+ /* Head is only written by this thread, so no need for barriers. */
+ node = head;
+
+ /*
+ * If the head node has NULL in its next pointer, the value is
+ * wrong and we need to wait until its enqueuer finishes the update.
+ */
+ next = qatomic_load_acquire(&node->next);
+ if (!next) {
+ return NULL;
+ }
+
+ /*
+ * Test for an empty list, which we do not expect. Note that for
* the consumer head and tail are always consistent. The head
* is consistent because only the consumer reads/writes it.
* The tail, because it is the first step in the enqueuing.
* It is only the next pointers that might be inconsistent.
*/
- if (head == &dummy && qatomic_mb_read(&tail) == &dummy.next) {
+ if (head == &dummy && qatomic_read(&tail) == &dummy.next) {
abort();
}
- /* If the head node has NULL in its next pointer, the value is
- * wrong and we need to wait until its enqueuer finishes the update.
- */
- node = head;
- next = qatomic_mb_read(&head->next);
- if (!next) {
- return NULL;
- }
-
- /* Since we are the sole consumer, and we excluded the empty case
+ /*
+ * Since we are the sole consumer, and we excluded the empty case
* above, the queue will always have at least two nodes: the
* dummy node, and the one being removed. So we do not need to update
* the tail pointer.
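Taken together, enqueue() and try_dequeue() form a release/acquire handshake on the node's next pointer. A condensed timeline of the pairing in the simplest case, where the queue held only the dummy node so old_tail is &head->next:

/* Condensed, informal view of the handshake; not additional code.
 *
 *   enqueuer                                     dequeuer
 *   --------                                     --------
 *   node->next = NULL;
 *   old_tail = qatomic_xchg(&tail, &node->next);
 *                                                node = head;
 *                                                next = qatomic_load_acquire(&node->next);
 *                                                if (!next) return NULL;  // enqueue in flight
 *   qatomic_store_release(old_tail, node);
 *                                                // the next try_dequeue() sees a fully
 *                                                // initialized node behind old_tail
 */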
@@ -263,24 +283,24 @@ static void *call_rcu_thread(void *opaque)
qatomic_sub(&rcu_call_count, n);
synchronize_rcu();
- qemu_mutex_lock_iothread();
+ bql_lock();
while (n > 0) {
node = try_dequeue();
while (!node) {
- qemu_mutex_unlock_iothread();
+ bql_unlock();
qemu_event_reset(&rcu_call_ready_event);
node = try_dequeue();
if (!node) {
qemu_event_wait(&rcu_call_ready_event);
node = try_dequeue();
}
- qemu_mutex_lock_iothread();
+ bql_lock();
}
n--;
node->func(node);
}
- qemu_mutex_unlock_iothread();
+ bql_unlock();
}
abort();
}
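For reference, callbacks reach this loop through call_rcu1() or the type-safe call_rcu() wrapper. A hypothetical producer (MyState and my_state_free are invented names) might look like:

/* Hypothetical user of the callback queue processed above. */
typedef struct MyState {
    struct rcu_head rcu;    /* call_rcu() expects the rcu_head at offset 0 */
    int value;
} MyState;

static void my_state_free(MyState *s)
{
    g_free(s);
}

static void my_state_retire(MyState *old)
{
    /* call_rcu_thread() invokes my_state_free(old) only after a full
     * grace period, i.e. after every reader that could still see 'old'
     * has left its critical section.
     */
    call_rcu(old, my_state_free, rcu);
}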
@@ -317,13 +337,13 @@ static void drain_rcu_callback(struct rcu_head *node)
void drain_call_rcu(void)
{
struct rcu_drain rcu_drain;
- bool locked = qemu_mutex_iothread_locked();
+ bool locked = bql_locked();
memset(&rcu_drain, 0, sizeof(struct rcu_drain));
qemu_event_init(&rcu_drain.drain_complete_event, false);
if (locked) {
- qemu_mutex_unlock_iothread();
+ bql_unlock();
}
@@ -335,31 +355,47 @@ void drain_call_rcu(void)
*
* Note that since we have only one global queue of the RCU callbacks,
* we also end up waiting for most of RCU callbacks that were registered
- * on the other threads, but this is a side effect that shoudn't be
+ * on the other threads, but this is a side effect that shouldn't be
* assumed.
*/
+ qatomic_inc(&in_drain_call_rcu);
call_rcu1(&rcu_drain.rcu, drain_rcu_callback);
qemu_event_wait(&rcu_drain.drain_complete_event);
+ qatomic_dec(&in_drain_call_rcu);
if (locked) {
- qemu_mutex_lock_iothread();
+ bql_lock();
}
}
void rcu_register_thread(void)
{
- assert(rcu_reader.ctr == 0);
+ assert(get_ptr_rcu_reader()->ctr == 0);
qemu_mutex_lock(&rcu_registry_lock);
- QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
+ QLIST_INSERT_HEAD(&registry, get_ptr_rcu_reader(), node);
qemu_mutex_unlock(&rcu_registry_lock);
}
void rcu_unregister_thread(void)
{
qemu_mutex_lock(&rcu_registry_lock);
- QLIST_REMOVE(&rcu_reader, node);
+ QLIST_REMOVE(get_ptr_rcu_reader(), node);
+ qemu_mutex_unlock(&rcu_registry_lock);
+}
+
+void rcu_add_force_rcu_notifier(Notifier *n)
+{
+ qemu_mutex_lock(&rcu_registry_lock);
+ notifier_list_add(&get_ptr_rcu_reader()->force_rcu, n);
+ qemu_mutex_unlock(&rcu_registry_lock);
+}
+
+void rcu_remove_force_rcu_notifier(Notifier *n)
+{
+ qemu_mutex_lock(&rcu_registry_lock);
+ notifier_remove(n);
qemu_mutex_unlock(&rcu_registry_lock);
}
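The force_rcu notifiers added here give drain_call_rcu() a way to nudge readers that may sit in long read-side critical sections: while in_drain_call_rcu is set, wait_for_readers() fires the notifier list of every reader it is still waiting for. A hypothetical long-running reader thread (my_force_rcu is an invented name) could hook in like this:

/* Hypothetical example, not part of this patch. */
static Notifier force_rcu_notifier;

static void my_force_rcu(Notifier *notifier, void *data)
{
    /* Kick the thread out of its RCU critical section as soon as
     * possible, e.g. by setting a flag it polls.  The mechanism is
     * entirely up to the registering thread.
     */
}

static void my_thread_setup(void)
{
    rcu_register_thread();
    force_rcu_notifier.notify = my_force_rcu;
    rcu_add_force_rcu_notifier(&force_rcu_notifier);
}

static void my_thread_teardown(void)
{
    rcu_remove_force_rcu_notifier(&force_rcu_notifier);
    rcu_unregister_thread();
}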
@@ -373,7 +409,7 @@ static void rcu_init_complete(void)
qemu_event_init(&rcu_call_ready_event, false);
- /* The caller is assumed to have iothread lock, so the call_rcu thread
+ /* The caller is assumed to have BQL, so the call_rcu thread
* must have been quiescent even after forking, just recreate it.
*/
qemu_thread_create(&thread, "call_rcu", call_rcu_thread,