aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPetri Savolainen <petri.savolainen@linaro.org>2018-06-29 10:59:30 +0300
committerMaxim Uvarov <maxim.uvarov@linaro.org>2018-07-02 16:41:53 +0300
commit838c13654c639baa1c54d19056cd771380ab8a58 (patch)
tree85c19e415da4221bcbc29e614718078d60aefb26
parentabd50139e35c67ddbc1ec09550ea6b6d27861703 (diff)
linux-gen: ring: ensure head and tail load order in dequeue
Acquire memory order is needed when loading r_head. It ensures that load of w_tail cannot get ahead of r_head load, and thus head value cannot get ahead of tail value. Queue empty check assumes that head is always behind or equal to tail (== ring empty). Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org> Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
-rw-r--r--platform/linux-generic/include/odp_ring_internal.h30
1 files changed, 24 insertions, 6 deletions
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h
index 03c0817e7..130d74cc6 100644
--- a/platform/linux-generic/include/odp_ring_internal.h
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -41,6 +41,16 @@ typedef struct ODP_ALIGNED_CACHE {
uint32_t data[0];
} ring_t;
+/* 32-bit CAS with memory order selection */
+static inline int cas_mo_u32(odp_atomic_u32_t *atom, uint32_t *old_val,
+ uint32_t new_val, int mo_success, int mo_failure)
+{
+ return __atomic_compare_exchange_n(&atom->v, old_val, new_val,
+ 0 /* strong */,
+ mo_success,
+ mo_failure);
+}
+
/* Initialize ring */
static inline void ring_init(ring_t *ring)
{
@@ -56,7 +66,10 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
uint32_t head, tail, new_head;
uint32_t data;
- head = odp_atomic_load_u32(&ring->r_head);
+ /* Load/CAS acquire of r_head ensures that w_tail load happens after
+ * r_head load, and thus head value is always behind or equal to tail
+ * value. */
+ head = odp_atomic_load_acq_u32(&ring->r_head);
/* Move reader head. This thread owns data at the new head. */
do {
@@ -67,8 +80,9 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
new_head = head + 1;
- } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
- new_head) == 0));
+ } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head,
+ __ATOMIC_ACQUIRE,
+ __ATOMIC_ACQUIRE) == 0));
/* Read queue index */
data = ring->data[new_head & mask];
@@ -89,7 +103,10 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
{
uint32_t head, tail, new_head, i;
- head = odp_atomic_load_u32(&ring->r_head);
+ /* Load/CAS acquire of r_head ensures that w_tail load happens after
+ * r_head load, and thus head value is always behind or equal to tail
+ * value. */
+ head = odp_atomic_load_acq_u32(&ring->r_head);
/* Move reader head. This thread owns data at the new head. */
do {
@@ -105,8 +122,9 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
new_head = head + num;
- } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
- new_head) == 0));
+ } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head,
+ __ATOMIC_ACQUIRE,
+ __ATOMIC_ACQUIRE) == 0));
/* Read queue index */
for (i = 0; i < num; i++)