Diffstat (limited to 'util/rcu.c')
-rw-r--r--  util/rcu.c  114
1 file changed, 75 insertions, 39 deletions
diff --git a/util/rcu.c b/util/rcu.c
index 13ac0f75cb..fa32c942e4 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -46,6 +46,7 @@
 unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
 
 QemuEvent rcu_gp_event;
+static int in_drain_call_rcu;
 static QemuMutex rcu_registry_lock;
 static QemuMutex rcu_sync_lock;
 
@@ -64,7 +65,7 @@ static inline int rcu_gp_ongoing(unsigned long *ctr)
 /* Written to only by each individual reader. Read by both the reader and the
  * writers.
  */
-__thread struct rcu_reader_data rcu_reader;
+QEMU_DEFINE_CO_TLS(struct rcu_reader_data, rcu_reader)
 
 /* Protected by rcu_registry_lock. */
 typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
@@ -82,12 +83,6 @@ static void wait_for_readers(void)
          */
         qemu_event_reset(&rcu_gp_event);
 
-        /* Instead of using qatomic_mb_set for index->waiting, and
-         * qatomic_mb_read for index->ctr, memory barriers are placed
-         * manually since writes to different threads are independent.
-         * qemu_event_reset has acquire semantics, so no memory barrier
-         * is needed here.
-         */
         QLIST_FOREACH(index, &registry, node) {
             qatomic_set(&index->waiting, true);
         }
@@ -95,6 +90,10 @@ static void wait_for_readers(void)
         /* Here, order the stores to index->waiting before the loads of
          * index->ctr.  Pairs with smp_mb_placeholder() in rcu_read_unlock(),
          * ensuring that the loads of index->ctr are sequentially consistent.
+         *
+         * If this is the last iteration, this barrier also prevents
+         * frees from seeping upwards, and orders the two wait phases
+         * on architectures with 32-bit longs; see synchronize_rcu().
          */
         smp_mb_global();
 
@@ -103,10 +102,12 @@ static void wait_for_readers(void)
                 QLIST_REMOVE(index, node);
                 QLIST_INSERT_HEAD(&qsreaders, index, node);
 
-                /* No need for mb_set here, worst of all we
+                /* No need for memory barriers here, worst of all we
                  * get some extra futex wakeups.
                  */
                 qatomic_set(&index->waiting, false);
+            } else if (qatomic_read(&in_drain_call_rcu)) {
+                notifier_list_notify(&index->force_rcu, NULL);
             }
         }
 
@@ -146,26 +147,26 @@ void synchronize_rcu(void)
 
     /* Write RCU-protected pointers before reading p_rcu_reader->ctr.
      * Pairs with smp_mb_placeholder() in rcu_read_lock().
+     *
+     * Also orders write to RCU-protected pointers before
+     * write to rcu_gp_ctr.
      */
     smp_mb_global();
 
     QEMU_LOCK_GUARD(&rcu_registry_lock);
     if (!QLIST_EMPTY(&registry)) {
-        /* In either case, the qatomic_mb_set below blocks stores that free
-         * old RCU-protected pointers.
-         */
         if (sizeof(rcu_gp_ctr) < 8) {
             /* For architectures with 32-bit longs, a two-subphases algorithm
              * ensures we do not encounter overflow bugs.
              *
              * Switch parity: 0 -> 1, 1 -> 0.
              */
-            qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+            qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
             wait_for_readers();
-            qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+            qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
         } else {
             /* Increment current grace period.  */
-            qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+            qatomic_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
         }
 
         wait_for_readers();
@@ -188,8 +189,22 @@ static void enqueue(struct rcu_head *node)
     struct rcu_head **old_tail;
 
     node->next = NULL;
+
+    /*
+     * Make this node the tail of the list.  The node will be
+     * used by further enqueue operations, but it will not
+     * be dequeued yet...
+     */
     old_tail = qatomic_xchg(&tail, &node->next);
-    qatomic_mb_set(old_tail, node);
+
+    /*
+     * ... until it is pointed to from another item in the list.
+     * In the meantime, try_dequeue() will find a NULL next pointer
+     * and loop.
+     *
+     * Synchronizes with qatomic_load_acquire() in try_dequeue().
+     */
+    qatomic_store_release(old_tail, node);
 }
 
 static struct rcu_head *try_dequeue(void)
@@ -197,26 +212,31 @@ static struct rcu_head *try_dequeue(void)
     struct rcu_head *node, *next;
 
 retry:
-    /* Test for an empty list, which we do not expect.  Note that for
+    /* Head is only written by this thread, so no need for barriers.  */
+    node = head;
+
+    /*
+     * If the head node has NULL in its next pointer, the value is
+     * wrong and we need to wait until its enqueuer finishes the update.
+     */
+    next = qatomic_load_acquire(&node->next);
+    if (!next) {
+        return NULL;
+    }
+
+    /*
+     * Test for an empty list, which we do not expect.  Note that for
      * the consumer head and tail are always consistent.  The head
      * is consistent because only the consumer reads/writes it.
      * The tail, because it is the first step in the enqueuing.
      * It is only the next pointers that might be inconsistent.
      */
-    if (head == &dummy && qatomic_mb_read(&tail) == &dummy.next) {
+    if (head == &dummy && qatomic_read(&tail) == &dummy.next) {
         abort();
     }
 
-    /* If the head node has NULL in its next pointer, the value is
-     * wrong and we need to wait until its enqueuer finishes the update.
-     */
-    node = head;
-    next = qatomic_mb_read(&head->next);
-    if (!next) {
-        return NULL;
-    }
-
-    /* Since we are the sole consumer, and we excluded the empty case
+    /*
+     * Since we are the sole consumer, and we excluded the empty case
      * above, the queue will always have at least two nodes: the
      * dummy node, and the one being removed.  So we do not need to update
      * the tail pointer.
@@ -263,24 +283,24 @@ static void *call_rcu_thread(void *opaque)
 
         qatomic_sub(&rcu_call_count, n);
         synchronize_rcu();
-        qemu_mutex_lock_iothread();
+        bql_lock();
         while (n > 0) {
             node = try_dequeue();
             while (!node) {
-                qemu_mutex_unlock_iothread();
+                bql_unlock();
                 qemu_event_reset(&rcu_call_ready_event);
                 node = try_dequeue();
                 if (!node) {
                     qemu_event_wait(&rcu_call_ready_event);
                     node = try_dequeue();
                 }
-                qemu_mutex_lock_iothread();
+                bql_lock();
             }
 
             n--;
             node->func(node);
         }
-        qemu_mutex_unlock_iothread();
+        bql_unlock();
     }
     abort();
 }
@@ -317,13 +337,13 @@ static void drain_rcu_callback(struct rcu_head *node)
 void drain_call_rcu(void)
 {
     struct rcu_drain rcu_drain;
-    bool locked = qemu_mutex_iothread_locked();
+    bool locked = bql_locked();
 
     memset(&rcu_drain, 0, sizeof(struct rcu_drain));
     qemu_event_init(&rcu_drain.drain_complete_event, false);
 
     if (locked) {
-        qemu_mutex_unlock_iothread();
+        bql_unlock();
     }
 
@@ -335,31 +355,47 @@ void drain_call_rcu(void)
      *
      * Note that since we have only one global queue of the RCU callbacks,
      * we also end up waiting for most of RCU callbacks that were registered
-     * on the other threads, but this is a side effect that shoudn't be
+     * on the other threads, but this is a side effect that shouldn't be
     * assumed.
      */
 
+    qatomic_inc(&in_drain_call_rcu);
     call_rcu1(&rcu_drain.rcu, drain_rcu_callback);
     qemu_event_wait(&rcu_drain.drain_complete_event);
+    qatomic_dec(&in_drain_call_rcu);
 
     if (locked) {
-        qemu_mutex_lock_iothread();
+        bql_lock();
     }
 
 }
 
 void rcu_register_thread(void)
 {
-    assert(rcu_reader.ctr == 0);
+    assert(get_ptr_rcu_reader()->ctr == 0);
     qemu_mutex_lock(&rcu_registry_lock);
-    QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
+    QLIST_INSERT_HEAD(&registry, get_ptr_rcu_reader(), node);
     qemu_mutex_unlock(&rcu_registry_lock);
 }
 
 void rcu_unregister_thread(void)
 {
     qemu_mutex_lock(&rcu_registry_lock);
-    QLIST_REMOVE(&rcu_reader, node);
+    QLIST_REMOVE(get_ptr_rcu_reader(), node);
+    qemu_mutex_unlock(&rcu_registry_lock);
+}
+
+void rcu_add_force_rcu_notifier(Notifier *n)
+{
+    qemu_mutex_lock(&rcu_registry_lock);
+    notifier_list_add(&get_ptr_rcu_reader()->force_rcu, n);
+    qemu_mutex_unlock(&rcu_registry_lock);
+}
+
+void rcu_remove_force_rcu_notifier(Notifier *n)
+{
+    qemu_mutex_lock(&rcu_registry_lock);
+    notifier_remove(n);
     qemu_mutex_unlock(&rcu_registry_lock);
 }
 
@@ -373,7 +409,7 @@ static void rcu_init_complete(void)
 
     qemu_event_init(&rcu_call_ready_event, false);
 
-    /* The caller is assumed to have iothread lock, so the call_rcu thread
+    /* The caller is assumed to have BQL, so the call_rcu thread
      * must have been quiescent even after forking, just recreate it.
      */
     qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
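The heart of this change is the enqueue()/try_dequeue() pair above: the old qatomic_mb_set()/qatomic_mb_read() full barriers become a store-release that publishes a node and a load-acquire that consumes it. The following standalone sketch shows the same wait-free multiple-producer/single-consumer pattern using raw GCC/Clang __atomic builtins. This is an illustration, not QEMU code: it assumes the qatomic_* macros map onto these builtins, and it elides the dummy-node recycling that the real try_dequeue() performs.

/* mpsc_sketch.c - compile with: cc -c mpsc_sketch.c */
struct node {
    struct node *next;
};

static struct node dummy;
static struct node *head = &dummy;        /* touched only by the consumer */
static struct node **tail = &dummy.next;  /* shared among all producers */

/* Producers: wait-free. */
static void enqueue(struct node *n)
{
    struct node **old_tail;

    n->next = NULL;
    /* Claim the tail slot; concurrent producers serialize on this xchg. */
    old_tail = __atomic_exchange_n(&tail, &n->next, __ATOMIC_SEQ_CST);
    /* Publish the node.  The release store pairs with the acquire load
     * in try_dequeue(): a consumer that observes n also observes n's
     * fully initialized contents.
     */
    __atomic_store_n(old_tail, n, __ATOMIC_RELEASE);
}

/* The single consumer. */
static struct node *try_dequeue(void)
{
    struct node *node = head;   /* consumer-private, no barrier needed */
    struct node *next = __atomic_load_n(&node->next, __ATOMIC_ACQUIRE);

    if (!next) {
        /* A producer has done the xchg but not yet the release store;
         * retry later, exactly as call_rcu_thread() does above.
         */
        return NULL;
    }
    head = next;                /* dummy-node recycling elided */
    return node;
}

The window between the xchg and the release store is what the "try" in try_dequeue() is for: the list is momentarily broken at the new tail, and the consumer simply reports NULL until the producer finishes.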
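The new in_drain_call_rcu counter makes wait_for_readers() fire each remaining reader's force_rcu notifier list while a drain_call_rcu() is pending, so a thread stuck in a long read-side critical section can be asked to leave it. Below is a hedged sketch of how a reader thread might use the new rcu_add_force_rcu_notifier()/rcu_remove_force_rcu_notifier() entry points; the Worker type, the kick/exit_request flags and the work stubs are invented for illustration, and only the rcu_*, qatomic_* and Notifier APIs come from this diff and the QEMU headers.

/* force_rcu_sketch.c - assumes a QEMU build tree for the headers below. */
#include "qemu/osdep.h"
#include "qemu/atomic.h"
#include "qemu/notify.h"
#include "qemu/rcu.h"

typedef struct Worker {
    Notifier force_rcu;
    bool kick;              /* set by the notifier, polled by the loop */
    bool exit_request;      /* hypothetical shutdown flag */
} Worker;

/* Hypothetical workload hooks, stubbed out for the sketch. */
static bool have_work(void) { return false; }
static void process_one_item(void) { }

static void worker_force_rcu(Notifier *n, void *data)
{
    Worker *w = container_of(n, Worker, force_rcu);

    /* Called from the call_rcu thread while a drain is waiting on us;
     * it must make this reader leave its critical section promptly.
     */
    qatomic_set(&w->kick, true);
}

static void *worker_thread(void *opaque)
{
    Worker *w = opaque;

    rcu_register_thread();
    w->force_rcu.notify = worker_force_rcu;
    rcu_add_force_rcu_notifier(&w->force_rcu);   /* per-thread list */

    while (!qatomic_read(&w->exit_request)) {
        rcu_read_lock();
        /* Check the kick flag often enough that wait_for_readers()
         * is never stalled for long.
         */
        while (have_work() && !qatomic_read(&w->kick)) {
            process_one_item();
        }
        rcu_read_unlock();          /* the grace period can complete here */
        qatomic_set(&w->kick, false);
    }

    rcu_remove_force_rcu_notifier(&w->force_rcu);
    rcu_unregister_thread();
    return NULL;
}

Note that both notifier calls operate on the calling thread's own rcu_reader state, which is why registration and removal happen inside worker_thread() itself rather than in whoever spawns it.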