aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatias Elo <matias.elo@nokia.com>2019-04-10 15:31:23 +0300
committerMatias Elo <matias.elo@nokia.com>2019-04-17 11:10:46 +0300
commit0a5eff079dfa17ecb5bc67244eeb6983e1bcf296 (patch)
treef04aab5c5c649b056dbbc0ac69678bb54a9342af
parentb3cacf62c188e2d2696bb6993a9328127274b9b0 (diff)
downloadodp-0a5eff079dfa17ecb5bc67244eeb6983e1bcf296.tar.gz
linux-gen: ring: enable storing pointers in ring data
Added new ring type ring_ptr_t, which uses the same implementation as old ring_t, but stores pointers instead of indices. The old ring_t type is renamed to ring_u32_t. Signed-off-by: Matias Elo <matias.elo@nokia.com> Reviewed-by: Petri Savolainen <petri.savolainen@nokia.com>
-rw-r--r--platform/linux-generic/Makefile.am3
-rw-r--r--platform/linux-generic/include/odp_pool_internal.h4
-rw-r--r--platform/linux-generic/include/odp_ring_common.h21
-rw-r--r--platform/linux-generic/include/odp_ring_internal.h115
-rw-r--r--platform/linux-generic/include/odp_ring_ptr_internal.h25
-rw-r--r--platform/linux-generic/include/odp_ring_u32_internal.h25
-rw-r--r--platform/linux-generic/odp_pool.c22
-rw-r--r--platform/linux-generic/odp_schedule_basic.c34
-rw-r--r--platform/linux-generic/odp_schedule_sp.c11
9 files changed, 193 insertions, 67 deletions
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 08b6a671f..517ea258b 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -126,10 +126,13 @@ noinst_HEADERS = \
include/odp_queue_basic_internal.h \
include/odp_queue_lf.h \
include/odp_queue_scalable_internal.h \
+ include/odp_ring_common.h \
include/odp_ring_internal.h \
include/odp_ring_mpmc_internal.h \
+ include/odp_ring_ptr_internal.h \
include/odp_ring_spsc_internal.h \
include/odp_ring_st_internal.h \
+ include/odp_ring_u32_internal.h \
include/odp_schedule_if.h \
include/odp_schedule_scalable_config.h \
include/odp_schedule_scalable.h \
diff --git a/platform/linux-generic/include/odp_pool_internal.h b/platform/linux-generic/include/odp_pool_internal.h
index 67d7e7b7b..a44ad94f5 100644
--- a/platform/linux-generic/include/odp_pool_internal.h
+++ b/platform/linux-generic/include/odp_pool_internal.h
@@ -23,7 +23,7 @@ extern "C" {
#include <odp_buffer_internal.h>
#include <odp_config_internal.h>
-#include <odp_ring_internal.h>
+#include <odp_ring_u32_internal.h>
#include <odp/api/plat/strong_types.h>
typedef struct ODP_ALIGNED_CACHE pool_cache_t {
@@ -38,7 +38,7 @@ typedef struct ODP_ALIGNED_CACHE pool_cache_t {
/* Buffer header ring */
typedef struct ODP_ALIGNED_CACHE {
/* Ring header */
- ring_t hdr;
+ ring_u32_t hdr;
/* Ring data: buffer handles */
uint32_t buf[CONFIG_POOL_MAX_NUM + 1];
diff --git a/platform/linux-generic/include/odp_ring_common.h b/platform/linux-generic/include/odp_ring_common.h
new file mode 100644
index 000000000..88e6bf880
--- /dev/null
+++ b/platform/linux-generic/include/odp_ring_common.h
@@ -0,0 +1,21 @@
+/* Copyright (c) 2019, Nokia
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_RING_COMMON_H_
+#define ODP_RING_COMMON_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define _ODP_RING_TYPE_U32 1
+#define _ODP_RING_TYPE_PTR 2
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h
index ad2f37ef2..af6b3294e 100644
--- a/platform/linux-generic/include/odp_ring_internal.h
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -1,9 +1,13 @@
/* Copyright (c) 2016-2018, Linaro Limited
+ * Copyright (c) 2019, Nokia
* All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
+/* This header should NOT be included directly. There are no include guards for
+ * the function definitions! */
+
#ifndef ODP_RING_INTERNAL_H_
#define ODP_RING_INTERNAL_H_
@@ -17,8 +21,9 @@ extern "C" {
#include <odp_align_internal.h>
#include <odp/api/plat/atomic_inlines.h>
#include <odp/api/plat/cpu_inlines.h>
+#include <odp_ring_common.h>
-/* Ring of uint32_t data
+/* Generic ring implementation
*
* Ring stores head and tail counters. Ring indexes are formed from these
* counters with a mask (mask = ring_size - 1), which requires that ring size
@@ -26,7 +31,8 @@ extern "C" {
* number of data items that will be stored on it as write operations are
* assumed to succeed eventually (after readers complete their current
* operations). */
-typedef struct ODP_ALIGNED_CACHE {
+
+struct ring_common {
/* Writer head and tail */
odp_atomic_u32_t w_head;
odp_atomic_u32_t w_tail;
@@ -35,9 +41,17 @@ typedef struct ODP_ALIGNED_CACHE {
/* Reader head and tail */
odp_atomic_u32_t r_head;
odp_atomic_u32_t r_tail;
+};
+typedef struct ODP_ALIGNED_CACHE {
+ struct ring_common r;
uint32_t data[0];
-} ring_t;
+} ring_u32_t;
+
+typedef struct ODP_ALIGNED_CACHE {
+ struct ring_common r;
+ void *data[0];
+} ring_ptr_t;
/* 32-bit CAS with memory order selection */
static inline int cas_mo_u32(odp_atomic_u32_t *atom, uint32_t *old_val,
@@ -49,35 +63,69 @@ static inline int cas_mo_u32(odp_atomic_u32_t *atom, uint32_t *old_val,
mo_failure);
}
+#endif /* End of include guards */
+
+#undef _ring_gen_t
+#undef _ring_data_t
+#undef _RING_INIT
+#undef _RING_DEQ
+#undef _RING_DEQ_MULTI
+#undef _RING_ENQ
+#undef _RING_ENQ_MULTI
+
+/* Remap generic types and function names to ring data type specific ones. One
+ * should never use the generic names (e.g. _RING_INIT) directly. */
+
+#if _ODP_RING_TYPE == _ODP_RING_TYPE_U32
+ #define _ring_gen_t ring_u32_t
+ #define _ring_data_t uint32_t
+
+ #define _RING_INIT ring_u32_init
+ #define _RING_DEQ ring_u32_deq
+ #define _RING_DEQ_MULTI ring_u32_deq_multi
+ #define _RING_ENQ ring_u32_enq
+ #define _RING_ENQ_MULTI ring_u32_enq_multi
+#elif _ODP_RING_TYPE == _ODP_RING_TYPE_PTR
+ #define _ring_gen_t ring_ptr_t
+ #define _ring_data_t void *
+
+ #define _RING_INIT ring_ptr_init
+ #define _RING_DEQ ring_ptr_deq
+ #define _RING_DEQ_MULTI ring_ptr_deq_multi
+ #define _RING_ENQ ring_ptr_enq
+ #define _RING_ENQ_MULTI ring_ptr_enq_multi
+#endif
+
/* Initialize ring */
-static inline void ring_init(ring_t *ring)
+static inline void _RING_INIT(_ring_gen_t *ring)
{
- odp_atomic_init_u32(&ring->w_head, 0);
- odp_atomic_init_u32(&ring->w_tail, 0);
- odp_atomic_init_u32(&ring->r_head, 0);
- odp_atomic_init_u32(&ring->r_tail, 0);
+ odp_atomic_init_u32(&ring->r.w_head, 0);
+ odp_atomic_init_u32(&ring->r.w_tail, 0);
+ odp_atomic_init_u32(&ring->r.r_head, 0);
+ odp_atomic_init_u32(&ring->r.r_tail, 0);
}
/* Dequeue data from the ring head */
-static inline uint32_t ring_deq(ring_t *ring, uint32_t mask, uint32_t *data)
+static inline uint32_t _RING_DEQ(_ring_gen_t *ring, uint32_t mask,
+ _ring_data_t *data)
{
uint32_t head, tail, new_head;
/* Load/CAS acquire of r_head ensures that w_tail load happens after
* r_head load, and thus head value is always behind or equal to tail
* value. */
- head = odp_atomic_load_acq_u32(&ring->r_head);
+ head = odp_atomic_load_acq_u32(&ring->r.r_head);
/* Move reader head. This thread owns data at the new head. */
do {
- tail = odp_atomic_load_acq_u32(&ring->w_tail);
+ tail = odp_atomic_load_acq_u32(&ring->r.w_tail);
if (head == tail)
return 0;
new_head = head + 1;
- } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head,
+ } while (odp_unlikely(cas_mo_u32(&ring->r.r_head, &head, new_head,
__ATOMIC_ACQUIRE,
__ATOMIC_ACQUIRE) == 0));
@@ -85,29 +133,29 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask, uint32_t *data)
*data = ring->data[new_head & mask];
/* Wait until other readers have updated the tail */
- while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != head))
+ while (odp_unlikely(odp_atomic_load_u32(&ring->r.r_tail) != head))
odp_cpu_pause();
/* Update the tail. Writers acquire it. */
- odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+ odp_atomic_store_rel_u32(&ring->r.r_tail, new_head);
return 1;
}
/* Dequeue multiple data from the ring head. Num is smaller than ring size. */
-static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
- uint32_t data[], uint32_t num)
+static inline uint32_t _RING_DEQ_MULTI(_ring_gen_t *ring, uint32_t mask,
+ _ring_data_t data[], uint32_t num)
{
uint32_t head, tail, new_head, i;
/* Load/CAS acquire of r_head ensures that w_tail load happens after
* r_head load, and thus head value is always behind or equal to tail
* value. */
- head = odp_atomic_load_acq_u32(&ring->r_head);
+ head = odp_atomic_load_acq_u32(&ring->r.r_head);
/* Move reader head. This thread owns data at the new head. */
do {
- tail = odp_atomic_load_acq_u32(&ring->w_tail);
+ tail = odp_atomic_load_acq_u32(&ring->r.w_tail);
/* Ring is empty */
if (head == tail)
@@ -119,7 +167,7 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
new_head = head + num;
- } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head,
+ } while (odp_unlikely(cas_mo_u32(&ring->r.r_head, &head, new_head,
__ATOMIC_ACQUIRE,
__ATOMIC_ACQUIRE) == 0));
@@ -128,29 +176,30 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
data[i] = ring->data[(head + 1 + i) & mask];
/* Wait until other readers have updated the tail */
- while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != head))
+ while (odp_unlikely(odp_atomic_load_u32(&ring->r.r_tail) != head))
odp_cpu_pause();
/* Update the tail. Writers acquire it. */
- odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+ odp_atomic_store_rel_u32(&ring->r.r_tail, new_head);
return num;
}
/* Enqueue data into the ring tail */
-static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
+static inline void _RING_ENQ(_ring_gen_t *ring, uint32_t mask,
+ _ring_data_t data)
{
uint32_t old_head, new_head;
uint32_t size = mask + 1;
/* Reserve a slot in the ring for writing */
- old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
+ old_head = odp_atomic_fetch_inc_u32(&ring->r.w_head);
new_head = old_head + 1;
/* Wait for the last reader to finish. This prevents overwrite when
* a reader has been left behind (e.g. due to an interrupt) and is
* still reading the same slot. */
- while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r_tail)
+ while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r.r_tail)
>= size))
odp_cpu_pause();
@@ -158,28 +207,28 @@ static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
ring->data[new_head & mask] = data;
/* Wait until other writers have updated the tail */
- while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head))
+ while (odp_unlikely(odp_atomic_load_u32(&ring->r.w_tail) != old_head))
odp_cpu_pause();
/* Release the new writer tail, readers acquire it. */
- odp_atomic_store_rel_u32(&ring->w_tail, new_head);
+ odp_atomic_store_rel_u32(&ring->r.w_tail, new_head);
}
/* Enqueue multiple data into the ring tail. Num is smaller than ring size. */
-static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[],
- uint32_t num)
+static inline void _RING_ENQ_MULTI(_ring_gen_t *ring, uint32_t mask,
+ _ring_data_t data[], uint32_t num)
{
uint32_t old_head, new_head, i;
uint32_t size = mask + 1;
/* Reserve a slot in the ring for writing */
- old_head = odp_atomic_fetch_add_u32(&ring->w_head, num);
+ old_head = odp_atomic_fetch_add_u32(&ring->r.w_head, num);
new_head = old_head + 1;
/* Wait for the last reader to finish. This prevents overwrite when
* a reader has been left behind (e.g. due to an interrupt) and is
* still reading these slots. */
- while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r_tail)
+ while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r.r_tail)
>= size))
odp_cpu_pause();
@@ -188,15 +237,13 @@ static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[],
ring->data[(new_head + i) & mask] = data[i];
/* Wait until other writers have updated the tail */
- while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head))
+ while (odp_unlikely(odp_atomic_load_u32(&ring->r.w_tail) != old_head))
odp_cpu_pause();
/* Release the new writer tail, readers acquire it. */
- odp_atomic_store_rel_u32(&ring->w_tail, old_head + num);
+ odp_atomic_store_rel_u32(&ring->r.w_tail, old_head + num);
}
#ifdef __cplusplus
}
#endif
-
-#endif
diff --git a/platform/linux-generic/include/odp_ring_ptr_internal.h b/platform/linux-generic/include/odp_ring_ptr_internal.h
new file mode 100644
index 000000000..13b2b2fbf
--- /dev/null
+++ b/platform/linux-generic/include/odp_ring_ptr_internal.h
@@ -0,0 +1,25 @@
+/* Copyright (c) 2019, Nokia
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_RING_PTR_INTERNAL_H_
+#define ODP_RING_PTR_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp_ring_common.h>
+
+#undef _ODP_RING_TYPE
+#define _ODP_RING_TYPE _ODP_RING_TYPE_PTR
+
+#include <odp_ring_internal.h>
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/include/odp_ring_u32_internal.h b/platform/linux-generic/include/odp_ring_u32_internal.h
new file mode 100644
index 000000000..baa02e4ca
--- /dev/null
+++ b/platform/linux-generic/include/odp_ring_u32_internal.h
@@ -0,0 +1,25 @@
+/* Copyright (c) 2019, Nokia
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_RING_U32_INTERNAL_H_
+#define ODP_RING_U32_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp_ring_common.h>
+
+#undef _ODP_RING_TYPE
+#define _ODP_RING_TYPE _ODP_RING_TYPE_U32
+
+#include <odp_ring_internal.h>
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
index 832991d32..136f82c5e 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -20,7 +20,7 @@
#include <odp_packet_dpdk.h>
#include <odp_config_internal.h>
#include <odp_debug_internal.h>
-#include <odp_ring_internal.h>
+#include <odp_ring_u32_internal.h>
#include <odp_global_data.h>
#include <odp_libconfig_internal.h>
#include <odp_shm_internal.h>
@@ -132,14 +132,14 @@ static inline void cache_push(pool_cache_t *cache, odp_buffer_hdr_t *buf_hdr[],
static void cache_flush(pool_cache_t *cache, pool_t *pool)
{
odp_buffer_hdr_t *buf_hdr;
- ring_t *ring;
+ ring_u32_t *ring;
uint32_t mask;
ring = &pool->ring->hdr;
mask = pool->ring_mask;
while (cache_pop(cache, &buf_hdr, 1))
- ring_enq(ring, mask, buf_hdr->index.buffer);
+ ring_u32_enq(ring, mask, buf_hdr->index.buffer);
}
static int read_config_file(pool_table_t *pool_tbl)
@@ -350,7 +350,7 @@ static void init_buffers(pool_t *pool)
void *uarea = NULL;
uint8_t *data;
uint32_t offset;
- ring_t *ring;
+ ring_u32_t *ring;
uint32_t mask;
int type;
uint64_t page_size;
@@ -431,7 +431,7 @@ static void init_buffers(pool_t *pool)
/* Store buffer index into the global pool */
if (!skip)
- ring_enq(ring, mask, i);
+ ring_u32_enq(ring, mask, i);
}
pool->skipped_blocks = skipped_blocks;
}
@@ -654,7 +654,7 @@ static odp_pool_t pool_create(const char *name, odp_pool_param_t *params,
pool->uarea_base_addr = odp_shm_addr(pool->uarea_shm);
}
- ring_init(&pool->ring->hdr);
+ ring_u32_init(&pool->ring->hdr);
init_buffers(pool);
/* Create zero-copy DPDK memory pool. NOP if zero-copy is disabled. */
@@ -890,7 +890,7 @@ int odp_pool_info(odp_pool_t pool_hdl, odp_pool_info_t *info)
int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int max_num)
{
pool_cache_t *cache = local.cache[pool->pool_idx];
- ring_t *ring;
+ ring_u32_t *ring;
odp_buffer_hdr_t *hdr;
uint32_t mask, num_ch, i;
uint32_t num_deq = 0;
@@ -914,7 +914,7 @@ int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int max_num)
* uintptr_t and not uint32_t. */
ring = &pool->ring->hdr;
mask = pool->ring_mask;
- burst = ring_deq_multi(ring, mask, data, burst);
+ burst = ring_u32_deq_multi(ring, mask, data, burst);
cache_num = burst - num_deq;
if (odp_unlikely(burst < num_deq)) {
@@ -951,7 +951,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
odp_buffer_hdr_t *buf_hdr[], int num)
{
pool_cache_t *cache = local.cache[pool->pool_idx];
- ring_t *ring;
+ ring_u32_t *ring;
int i;
uint32_t cache_num, mask;
uint32_t cache_size = cache->size;
@@ -966,7 +966,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
for (i = 0; i < num; i++)
buf_index[i] = buf_hdr[i]->index.buffer;
- ring_enq_multi(ring, mask, buf_index, num);
+ ring_u32_enq_multi(ring, mask, buf_index, num);
return;
}
@@ -996,7 +996,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
for (i = 0; i < burst; i++)
data[i] = buf_hdr[i]->index.buffer;
- ring_enq_multi(ring, mask, data, burst);
+ ring_u32_enq_multi(ring, mask, data, burst);
}
cache_push(cache, buf_hdr, num);
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index d20fd7356..4b054b86c 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -1,4 +1,5 @@
/* Copyright (c) 2013-2018, Linaro Limited
+ * Copyright (c) 2019, Nokia
* All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
@@ -24,7 +25,7 @@
#include <odp_align_internal.h>
#include <odp/api/sync.h>
#include <odp/api/packet_io.h>
-#include <odp_ring_internal.h>
+#include <odp_ring_u32_internal.h>
#include <odp_timer_internal.h>
#include <odp_queue_basic_internal.h>
#include <odp_libconfig_internal.h>
@@ -125,7 +126,7 @@ typedef struct ODP_ALIGNED_CACHE {
uint16_t ev_index;
uint32_t qi;
odp_queue_t queue;
- ring_t *ring;
+ ring_u32_t *ring;
odp_event_t ev[STASH_SIZE];
} stash;
@@ -151,7 +152,7 @@ typedef struct ODP_ALIGNED_CACHE {
/* Priority queue */
typedef struct ODP_ALIGNED_CACHE {
/* Ring header */
- ring_t ring;
+ ring_u32_t ring;
/* Ring data: queue indexes */
uint32_t queue_index[MAX_RING_SIZE];
@@ -423,7 +424,7 @@ static int schedule_init_global(void)
prio_queue_t *prio_q;
prio_q = &sched->prio_q[grp][i][j];
- ring_init(&prio_q->ring);
+ ring_u32_init(&prio_q->ring);
}
}
}
@@ -461,10 +462,12 @@ static int schedule_term_global(void)
for (grp = 0; grp < NUM_SCHED_GRPS; grp++) {
for (i = 0; i < NUM_PRIO; i++) {
for (j = 0; j < MAX_SPREAD; j++) {
- ring_t *ring = &sched->prio_q[grp][i][j].ring;
+ ring_u32_t *ring;
uint32_t qi;
- while (ring_deq(ring, ring_mask, &qi)) {
+ ring = &sched->prio_q[grp][i][j].ring;
+
+ while (ring_u32_deq(ring, ring_mask, &qi)) {
odp_event_t events[1];
int num;
@@ -651,9 +654,9 @@ static int schedule_sched_queue(uint32_t queue_index)
int grp = sched->queue[queue_index].grp;
int prio = sched->queue[queue_index].prio;
int spread = sched->queue[queue_index].spread;
- ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
+ ring_u32_t *ring = &sched->prio_q[grp][prio][spread].ring;
- ring_enq(ring, sched->ring_mask, queue_index);
+ ring_u32_enq(ring, sched->ring_mask, queue_index);
return 0;
}
@@ -682,10 +685,10 @@ static void schedule_pktio_start(int pktio_index, int num_pktin,
static inline void release_atomic(void)
{
uint32_t qi = sched_local.stash.qi;
- ring_t *ring = sched_local.stash.ring;
+ ring_u32_t *ring = sched_local.stash.ring;
/* Release current atomic queue */
- ring_enq(ring, sched->ring_mask, qi);
+ ring_u32_enq(ring, sched->ring_mask, qi);
/* We don't hold sync context anymore */
sched_local.sync_ctx = NO_SYNC_CONTEXT;
@@ -980,7 +983,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
int num;
uint8_t sync_ctx, ordered;
odp_queue_t handle;
- ring_t *ring;
+ ring_u32_t *ring;
int pktin;
uint16_t max_deq = burst_def;
int stashed = 1;
@@ -1000,7 +1003,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
/* Get queue index from the priority queue */
ring = &sched->prio_q[grp][prio][id].ring;
- if (ring_deq(ring, ring_mask, &qi) == 0) {
+ if (ring_u32_deq(ring, ring_mask, &qi) == 0) {
/* Priority queue empty */
i++;
id++;
@@ -1053,7 +1056,8 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
continue;
if (num_pkt == 0 || !direct_recv) {
- ring_enq(ring, ring_mask, qi);
+ ring_u32_enq(ring, ring_mask,
+ qi);
break;
}
@@ -1079,7 +1083,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
sched_local.ordered.src_queue = qi;
/* Continue scheduling ordered queues */
- ring_enq(ring, ring_mask, qi);
+ ring_u32_enq(ring, ring_mask, qi);
sched_local.sync_ctx = sync_ctx;
} else if (sync_ctx == ODP_SCHED_SYNC_ATOMIC) {
@@ -1089,7 +1093,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
sched_local.sync_ctx = sync_ctx;
} else {
/* Continue scheduling the queue */
- ring_enq(ring, ring_mask, qi);
+ ring_u32_enq(ring, ring_mask, qi);
}
handle = queue_from_index(qi);
diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
index e7b378950..1c52cc18d 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -1,4 +1,5 @@
/* Copyright (c) 2016-2018, Linaro Limited
+ * Copyright (c) 2019, Nokia
* All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
@@ -18,7 +19,7 @@
#include <odp_debug_internal.h>
#include <odp_align_internal.h>
#include <odp_config_internal.h>
-#include <odp_ring_internal.h>
+#include <odp_ring_u32_internal.h>
#include <odp_timer_internal.h>
#include <odp_queue_basic_internal.h>
@@ -74,7 +75,7 @@ typedef struct ODP_ALIGNED_CACHE sched_cmd_t {
typedef struct ODP_ALIGNED_CACHE {
/* Ring header */
- ring_t ring;
+ ring_u32_t ring;
/* Ring data: queue indexes */
uint32_t ring_idx[RING_SIZE];
@@ -189,7 +190,7 @@ static int init_global(void)
for (i = 0; i < NUM_GROUP; i++)
for (j = 0; j < NUM_PRIO; j++)
- ring_init(&sched_global->prio_queue[i][j].ring);
+ ring_u32_init(&sched_global->prio_queue[i][j].ring);
sched_group = &sched_global->sched_group;
odp_ticketlock_init(&sched_group->s.lock);
@@ -411,7 +412,7 @@ static inline void add_tail(sched_cmd_t *cmd)
uint32_t idx = cmd->s.ring_idx;
prio_queue = &sched_global->prio_queue[group][prio];
- ring_enq(&prio_queue->ring, RING_MASK, idx);
+ ring_u32_enq(&prio_queue->ring, RING_MASK, idx);
}
static inline sched_cmd_t *rem_head(int group, int prio)
@@ -422,7 +423,7 @@ static inline sched_cmd_t *rem_head(int group, int prio)
prio_queue = &sched_global->prio_queue[group][prio];
- if (ring_deq(&prio_queue->ring, RING_MASK, &ring_idx) == 0)
+ if (ring_u32_deq(&prio_queue->ring, RING_MASK, &ring_idx) == 0)
return NULL;
pktio = index_from_ring_idx(&index, ring_idx);