diff options
author | Petri Savolainen <petri.savolainen@linaro.org> | 2018-06-29 10:59:30 +0300 |
---|---|---|
committer | Maxim Uvarov <maxim.uvarov@linaro.org> | 2018-07-02 16:41:53 +0300 |
commit | 838c13654c639baa1c54d19056cd771380ab8a58 (patch) | |
tree | 85c19e415da4221bcbc29e614718078d60aefb26 | |
parent | abd50139e35c67ddbc1ec09550ea6b6d27861703 (diff) |
linux-gen: ring: ensure head and tail load order in dequeue
Acquire memory order is needed when loading r_head. It ensures
that load of w_tail cannot get ahead of r_head load, and thus
head value cannot get ahead of tail value.
Queue empty check assumes that head is always behind or
equal to tail (== ring empty).
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
-rw-r--r-- | platform/linux-generic/include/odp_ring_internal.h | 30 |
1 files changed, 24 insertions, 6 deletions
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h index 03c0817e7..130d74cc6 100644 --- a/platform/linux-generic/include/odp_ring_internal.h +++ b/platform/linux-generic/include/odp_ring_internal.h @@ -41,6 +41,16 @@ typedef struct ODP_ALIGNED_CACHE { uint32_t data[0]; } ring_t; +/* 32-bit CAS with memory order selection */ +static inline int cas_mo_u32(odp_atomic_u32_t *atom, uint32_t *old_val, + uint32_t new_val, int mo_success, int mo_failure) +{ + return __atomic_compare_exchange_n(&atom->v, old_val, new_val, + 0 /* strong */, + mo_success, + mo_failure); +} + /* Initialize ring */ static inline void ring_init(ring_t *ring) { @@ -56,7 +66,10 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask) uint32_t head, tail, new_head; uint32_t data; - head = odp_atomic_load_u32(&ring->r_head); + /* Load/CAS acquire of r_head ensures that w_tail load happens after + * r_head load, and thus head value is always behind or equal to tail + * value. */ + head = odp_atomic_load_acq_u32(&ring->r_head); /* Move reader head. This thread owns data at the new head. */ do { @@ -67,8 +80,9 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask) new_head = head + 1; - } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head, - new_head) == 0)); + } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head, + __ATOMIC_ACQUIRE, + __ATOMIC_ACQUIRE) == 0)); /* Read queue index */ data = ring->data[new_head & mask]; @@ -89,7 +103,10 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask, { uint32_t head, tail, new_head, i; - head = odp_atomic_load_u32(&ring->r_head); + /* Load/CAS acquire of r_head ensures that w_tail load happens after + * r_head load, and thus head value is always behind or equal to tail + * value. */ + head = odp_atomic_load_acq_u32(&ring->r_head); /* Move reader head. This thread owns data at the new head. */ do { @@ -105,8 +122,9 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask, new_head = head + num; - } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head, - new_head) == 0)); + } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head, + __ATOMIC_ACQUIRE, + __ATOMIC_ACQUIRE) == 0)); /* Read queue index */ for (i = 0; i < num; i++) |