author     Petri Savolainen <petri.savolainen@linaro.org>    2018-06-29 16:00:09 +0300
committer  Maxim Uvarov <maxim.uvarov@linaro.org>            2018-07-05 14:13:49 +0300
commit     9ccda90b1788836848ecc5977d9cd8a807ee20c9 (patch)
tree       b077b27c7d1e1e4c5edde04e7e8a4b8c493ceb67
parent     97b316f2342a317843b476094ed7323ca899504c (diff)
linux-gen: ring: remove unnecessary r_tail synchronization
Reader data cannot be overwritten until the reader itself enqueues the data back to the ring. The ring size is equal to or larger than the maximum number of items that will be stored in the ring, so a writer can never wrap around onto a slot that a reader still owns.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
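The safety argument depends on how linux-generic uses these rings: each stored value (for example a buffer or queue index) is owned by exactly one reader from dequeue until that reader enqueues it back, and the ring has room for every value at once. A minimal sketch of that discipline follows; worker() and process() are hypothetical names, and the RING_EMPTY empty-ring sentinel comes from context not shown in the hunks below.

/* Hypothetical usage sketch, not ODP code: worker() and process() are
 * illustrative names. Assumes odp_ring_internal.h for ring_t, ring_deq(),
 * ring_enq() and the RING_EMPTY sentinel used by the empty check that sits
 * in context not shown in the hunks below. */
#include <stdint.h>
#include "odp_ring_internal.h"

extern void process(uint32_t idx);      /* stands in for real work */

void worker(ring_t *ring, uint32_t mask)
{
        /* Claim exclusive ownership of one stored value. */
        uint32_t idx = ring_deq(ring, mask);

        if (idx == RING_EMPTY)
                return;

        /* Use the value; no other thread can touch it now. */
        process(idx);

        /* Only this enqueue makes the value reusable, so its old slot
         * cannot be overwritten while a reader still holds it. */
        ring_enq(ring, mask, idx);
}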
-rw-r--r--  platform/linux-generic/include/odp_ring_internal.h | 43
1 file changed, 9 insertions(+), 34 deletions(-)
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h
index 130d74cc6..97673bef4 100644
--- a/platform/linux-generic/include/odp_ring_internal.h
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -36,7 +36,6 @@ typedef struct ODP_ALIGNED_CACHE {
/* Reader head and tail */
odp_atomic_u32_t r_head;
- odp_atomic_u32_t r_tail;
uint32_t data[0];
} ring_t;
@@ -57,14 +56,12 @@ static inline void ring_init(ring_t *ring)
odp_atomic_init_u32(&ring->w_head, 0);
odp_atomic_init_u32(&ring->w_tail, 0);
odp_atomic_init_u32(&ring->r_head, 0);
- odp_atomic_init_u32(&ring->r_tail, 0);
}
/* Dequeue data from the ring head */
static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
{
uint32_t head, tail, new_head;
- uint32_t data;
/* Load/CAS acquire of r_head ensures that w_tail load happens after
* r_head load, and thus head value is always behind or equal to tail
@@ -81,20 +78,12 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
new_head = head + 1;
} while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head,
- __ATOMIC_ACQUIRE,
+ __ATOMIC_ACQ_REL,
__ATOMIC_ACQUIRE) == 0));
- /* Read queue index */
- data = ring->data[new_head & mask];
-
- /* Wait until other readers have updated the tail */
- while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
- odp_cpu_pause();
-
- /* Now update the reader tail */
- odp_atomic_store_rel_u32(&ring->r_tail, new_head);
-
- return data;
+ /* Read data. CAS acquire-release ensures that the data read
+ * does not move above the CAS. */
+ return ring->data[new_head & mask];
}
/* Dequeue multiple data from the ring head. Num is smaller than ring size. */
@@ -123,20 +112,14 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
new_head = head + num;
} while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head,
- __ATOMIC_ACQUIRE,
+ __ATOMIC_ACQ_REL,
__ATOMIC_ACQUIRE) == 0));
- /* Read queue index */
+ /* Read data. CAS acquire-release ensures that the data read
+ * does not move above the CAS. */
for (i = 0; i < num; i++)
data[i] = ring->data[(head + 1 + i) & mask];
- /* Wait until other readers have updated the tail */
- while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
- odp_cpu_pause();
-
- /* Now update the reader tail */
- odp_atomic_store_rel_u32(&ring->r_tail, new_head);
-
return num;
}
@@ -149,10 +132,6 @@ static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
new_head = old_head + 1;
- /* Ring is full. Wait for the last reader to finish. */
- while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
- odp_cpu_pause();
-
/* Write data */
ring->data[new_head & mask] = data;
@@ -160,7 +139,7 @@ static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
odp_cpu_pause();
- /* Now update the writer tail */
+ /* Release the new writer tail; readers acquire it. */
odp_atomic_store_rel_u32(&ring->w_tail, new_head);
}
@@ -174,10 +153,6 @@ static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[],
old_head = odp_atomic_fetch_add_u32(&ring->w_head, num);
new_head = old_head + 1;
- /* Ring is full. Wait for the last reader to finish. */
- while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
- odp_cpu_pause();
-
/* Write data */
for (i = 0; i < num; i++)
ring->data[(new_head + i) & mask] = data[i];
@@ -186,7 +161,7 @@ static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[],
while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
odp_cpu_pause();
- /* Now update the writer tail */
+ /* Release the new writer tail; readers acquire it. */
odp_atomic_store_rel_u32(&ring->w_tail, old_head + num);
}
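
For experimenting with the resulting algorithm outside ODP, below is a minimal standalone sketch of the post-patch dequeue path, written with C11 atomics instead of the odp_atomic_* wrappers. All names (sketch_ring_t, sketch_deq, RING_SIZE) are hypothetical and the empty check mirrors context not shown above; treat it as an illustration of the technique, not the ODP implementation.

#include <stdatomic.h>
#include <stdint.h>

#define RING_SIZE  1024u                /* power of two */
#define RING_MASK  (RING_SIZE - 1u)
#define RING_EMPTY ((uint32_t)-1)       /* sentinel, not a valid data value */

typedef struct {
        _Atomic uint32_t w_head;
        _Atomic uint32_t w_tail;
        _Atomic uint32_t r_head;        /* no r_tail any more */
        uint32_t data[RING_SIZE];
} sketch_ring_t;

/* Dequeue one value or return RING_EMPTY. Mirrors the post-patch logic:
 * the acq_rel CAS on r_head both claims the slot and orders the data read
 * after the CAS, so no reader-tail handshake is needed. */
static uint32_t sketch_deq(sketch_ring_t *ring)
{
        uint32_t head, tail, new_head;

        head = atomic_load_explicit(&ring->r_head, memory_order_acquire);

        do {
                tail = atomic_load_explicit(&ring->w_tail,
                                            memory_order_acquire);
                if (head == tail)       /* nothing between head and tail */
                        return RING_EMPTY;

                new_head = head + 1;
        } while (!atomic_compare_exchange_strong_explicit(
                         &ring->r_head, &head, new_head,
                         memory_order_acq_rel,    /* success ordering */
                         memory_order_acquire));  /* failure ordering */

        /* The slot is now owned by this reader. Per the commit message,
         * it cannot be overwritten until this value is enqueued back, so
         * a plain read is sufficient. */
        return ring->data[new_head & RING_MASK];
}

Note the pairing that remains after the patch: a writer publishes a slot with the w_tail release store, a reader observes it via the w_tail acquire load, and the acq_rel CAS on r_head both claims the slot and keeps the subsequent data read from moving above the CAS. With the ring sized to hold every in-flight value, no further synchronization is required.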