diff options
author | Petri Savolainen <petri.savolainen@linaro.org> | 2018-06-29 16:00:09 +0300 |
---|---|---|
committer | Maxim Uvarov <maxim.uvarov@linaro.org> | 2018-07-05 14:13:49 +0300 |
commit | 9ccda90b1788836848ecc5977d9cd8a807ee20c9 (patch) | |
tree | b077b27c7d1e1e4c5edde04e7e8a4b8c493ceb67 | |
parent | 97b316f2342a317843b476094ed7323ca899504c (diff) |
linux-gen: ring: remove unnecessary r_tail synchronization
Reader data cannot be overwritten until reader itself enqueues
data back to the ring. Ring size is equal or larger than maximum
number of items that will be stored into the ring.
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
-rw-r--r-- | platform/linux-generic/include/odp_ring_internal.h | 43 |
1 files changed, 9 insertions, 34 deletions
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h index 130d74cc6..97673bef4 100644 --- a/platform/linux-generic/include/odp_ring_internal.h +++ b/platform/linux-generic/include/odp_ring_internal.h @@ -36,7 +36,6 @@ typedef struct ODP_ALIGNED_CACHE { /* Reader head and tail */ odp_atomic_u32_t r_head; - odp_atomic_u32_t r_tail; uint32_t data[0]; } ring_t; @@ -57,14 +56,12 @@ static inline void ring_init(ring_t *ring) odp_atomic_init_u32(&ring->w_head, 0); odp_atomic_init_u32(&ring->w_tail, 0); odp_atomic_init_u32(&ring->r_head, 0); - odp_atomic_init_u32(&ring->r_tail, 0); } /* Dequeue data from the ring head */ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask) { uint32_t head, tail, new_head; - uint32_t data; /* Load/CAS acquire of r_head ensures that w_tail load happens after * r_head load, and thus head value is always behind or equal to tail @@ -81,20 +78,12 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask) new_head = head + 1; } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head, - __ATOMIC_ACQUIRE, + __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE) == 0)); - /* Read queue index */ - data = ring->data[new_head & mask]; - - /* Wait until other readers have updated the tail */ - while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head)) - odp_cpu_pause(); - - /* Now update the reader tail */ - odp_atomic_store_rel_u32(&ring->r_tail, new_head); - - return data; + /* Read data. CAS acquire-release ensures that data read + * does not move above from here. */ + return ring->data[new_head & mask]; } /* Dequeue multiple data from the ring head. Num is smaller than ring size. */ @@ -123,20 +112,14 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask, new_head = head + num; } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head, - __ATOMIC_ACQUIRE, + __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE) == 0)); - /* Read queue index */ + /* Read data. CAS acquire-release ensures that data read + * does not move above from here. */ for (i = 0; i < num; i++) data[i] = ring->data[(head + 1 + i) & mask]; - /* Wait until other readers have updated the tail */ - while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head)) - odp_cpu_pause(); - - /* Now update the reader tail */ - odp_atomic_store_rel_u32(&ring->r_tail, new_head); - return num; } @@ -149,10 +132,6 @@ static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data) old_head = odp_atomic_fetch_inc_u32(&ring->w_head); new_head = old_head + 1; - /* Ring is full. Wait for the last reader to finish. */ - while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head)) - odp_cpu_pause(); - /* Write data */ ring->data[new_head & mask] = data; @@ -160,7 +139,7 @@ static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data) while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head)) odp_cpu_pause(); - /* Now update the writer tail */ + /* Release the new writer tail, readers acquire it. */ odp_atomic_store_rel_u32(&ring->w_tail, new_head); } @@ -174,10 +153,6 @@ static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[], old_head = odp_atomic_fetch_add_u32(&ring->w_head, num); new_head = old_head + 1; - /* Ring is full. Wait for the last reader to finish. */ - while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head)) - odp_cpu_pause(); - /* Write data */ for (i = 0; i < num; i++) ring->data[(new_head + i) & mask] = data[i]; @@ -186,7 +161,7 @@ static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[], while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head)) odp_cpu_pause(); - /* Now update the writer tail */ + /* Release the new writer tail, readers acquire it. */ odp_atomic_store_rel_u32(&ring->w_tail, old_head + num); } |