author     Brian Brooks <brian.brooks@arm.com>     2017-06-23 16:04:40 -0500
committer  Maxim Uvarov <maxim.uvarov@linaro.org>  2017-06-28 23:04:09 +0300
commit     759a54bf66317fee539f2b2649c283be937d832a (patch)
tree       e201dcc433bf0e7188fdc723a9fde6ecef9a6609
parent     9fd48a9215a7831ca951839b7187bd1eb3f7bb06 (diff)
linux-gen: sched scalable: add scalable scheduler
Signed-off-by: Brian Brooks <brian.brooks@arm.com>
Signed-off-by: Kevin Wang <kevin.wang@arm.com>
Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Signed-off-by: Ola Liljedahl <ola.liljedahl@arm.com>
Reviewed-and-tested-by: Yi He <yi.he@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
-rw-r--r--  platform/linux-generic/Makefile.am                               |    7
-rw-r--r--  platform/linux-generic/include/odp/api/plat/schedule_types.h     |    4
-rw-r--r--  platform/linux-generic/include/odp_config_internal.h             |   15
-rw-r--r--  platform/linux-generic/include/odp_queue_scalable_internal.h     |  104
-rw-r--r--  platform/linux-generic/include/odp_schedule_if.h                 |    2
-rw-r--r--  platform/linux-generic/include/odp_schedule_scalable.h           |  139
-rw-r--r--  platform/linux-generic/include/odp_schedule_scalable_config.h    |   52
-rw-r--r--  platform/linux-generic/include/odp_schedule_scalable_ordered.h   |  132
-rw-r--r--  platform/linux-generic/m4/odp_schedule.m4                        |   55
-rw-r--r--  platform/linux-generic/odp_queue_if.c                            |    8
-rw-r--r--  platform/linux-generic/odp_queue_scalable.c                      | 1022
-rw-r--r--  platform/linux-generic/odp_schedule_if.c                         |    6
-rw-r--r--  platform/linux-generic/odp_schedule_scalable.c                   | 1980
-rw-r--r--  platform/linux-generic/odp_schedule_scalable_ordered.c           |  345
14 files changed, 3849 insertions(+), 22 deletions(-)
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 19e2241bf..82ab46426 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -185,9 +185,13 @@ noinst_HEADERS = \
${srcdir}/include/odp_pool_internal.h \
${srcdir}/include/odp_posix_extensions.h \
${srcdir}/include/odp_queue_internal.h \
+ ${srcdir}/include/odp_queue_scalable_internal.h \
${srcdir}/include/odp_ring_internal.h \
${srcdir}/include/odp_queue_if.h \
${srcdir}/include/odp_schedule_if.h \
+ ${srcdir}/include/odp_schedule_scalable.h \
+ ${srcdir}/include/odp_schedule_scalable_config.h \
+ ${srcdir}/include/odp_schedule_scalable_ordered.h \
${srcdir}/include/odp_sorted_list_internal.h \
${srcdir}/include/odp_shm_internal.h \
${srcdir}/include/odp_time_internal.h \
@@ -259,12 +263,15 @@ __LIB__libodp_linux_la_SOURCES = \
odp_pool.c \
odp_queue.c \
odp_queue_if.c \
+ odp_queue_scalable.c \
odp_rwlock.c \
odp_rwlock_recursive.c \
odp_schedule.c \
odp_schedule_if.c \
odp_schedule_sp.c \
odp_schedule_iquery.c \
+ odp_schedule_scalable.c \
+ odp_schedule_scalable_ordered.c \
odp_shared_memory.c \
odp_sorted_list.c \
odp_spinlock.c \
diff --git a/platform/linux-generic/include/odp/api/plat/schedule_types.h b/platform/linux-generic/include/odp/api/plat/schedule_types.h
index 535fd6d05..4e75f9eec 100644
--- a/platform/linux-generic/include/odp/api/plat/schedule_types.h
+++ b/platform/linux-generic/include/odp/api/plat/schedule_types.h
@@ -18,6 +18,8 @@
extern "C" {
#endif
+#include <odp/api/std_types.h>
+
/** @addtogroup odp_scheduler
* @{
*/
@@ -44,7 +46,7 @@ typedef int odp_schedule_sync_t;
typedef int odp_schedule_group_t;
/* These must be kept in sync with thread_globals_t in odp_thread.c */
-#define ODP_SCHED_GROUP_INVALID -1
+#define ODP_SCHED_GROUP_INVALID ((odp_schedule_group_t)-1)
#define ODP_SCHED_GROUP_ALL 0
#define ODP_SCHED_GROUP_WORKER 1
#define ODP_SCHED_GROUP_CONTROL 2
diff --git a/platform/linux-generic/include/odp_config_internal.h b/platform/linux-generic/include/odp_config_internal.h
index dadd59e71..3cff00452 100644
--- a/platform/linux-generic/include/odp_config_internal.h
+++ b/platform/linux-generic/include/odp_config_internal.h
@@ -7,10 +7,6 @@
#ifndef ODP_CONFIG_INTERNAL_H_
#define ODP_CONFIG_INTERNAL_H_
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/*
* Maximum number of pools
*/
@@ -22,6 +18,13 @@ extern "C" {
#define ODP_CONFIG_QUEUES 1024
/*
+ * Maximum queue depth. Maximum number of elements that can be stored in a
+ * queue. This value is used only when the size is not explicitly provided
+ * during queue creation.
+ */
+#define CONFIG_QUEUE_SIZE 4096
+
+/*
* Maximum number of ordered locks per queue
*/
#define CONFIG_QUEUE_MAX_ORD_LOCKS 4
@@ -139,8 +142,4 @@ extern "C" {
*/
#define CONFIG_POOL_CACHE_SIZE 256
-#ifdef __cplusplus
-}
-#endif
-
#endif
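Note: as a standalone illustration of how the new CONFIG_QUEUE_SIZE default is applied, queue_init() later in this patch rounds an explicitly requested size up to the next power of two and falls back to CONFIG_QUEUE_SIZE when no size is given. The helper below is a local stand-in for the internal ROUNDUP_POWER2_U32 macro; it is a sketch, not code from this patch.

#include <stdint.h>
#include <stdio.h>

#define CONFIG_QUEUE_SIZE 4096  /* default from odp_config_internal.h */

/* Round up to the next power of two, as queue_init() does for param->size */
static uint32_t roundup_power2_u32(uint32_t x)
{
	x--;
	x |= x >> 1;  x |= x >> 2;  x |= x >> 4;
	x |= x >> 8;  x |= x >> 16;
	return x + 1;
}

int main(void)
{
	uint32_t requested = 1000;      /* e.g. odp_queue_param_t size field */
	uint32_t ring_size = requested > 0 ?
		roundup_power2_u32(requested) : CONFIG_QUEUE_SIZE;

	printf("ring size: %u\n", (unsigned)ring_size);  /* prints 1024 */
	return 0;
}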
diff --git a/platform/linux-generic/include/odp_queue_scalable_internal.h b/platform/linux-generic/include/odp_queue_scalable_internal.h
new file mode 100644
index 000000000..f15314b23
--- /dev/null
+++ b/platform/linux-generic/include/odp_queue_scalable_internal.h
@@ -0,0 +1,104 @@
+/* Copyright (c) 2017, ARM Limited. All rights reserved.
+ *
+ * Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_QUEUE_SCALABLE_INTERNAL_H_
+#define ODP_QUEUE_SCALABLE_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp/api/queue.h>
+#include <odp_forward_typedefs_internal.h>
+#include <odp_queue_if.h>
+#include <odp_buffer_internal.h>
+#include <odp_align_internal.h>
+#include <odp/api/packet_io.h>
+#include <odp/api/align.h>
+#include <odp/api/hints.h>
+#include <odp/api/ticketlock.h>
+#include <odp_config_internal.h>
+#include <odp_schedule_scalable.h>
+#include <odp_schedule_scalable_ordered.h>
+
+#define QUEUE_STATUS_FREE 0
+#define QUEUE_STATUS_DESTROYED 1
+#define QUEUE_STATUS_READY 2
+
+struct queue_entry_s {
+ sched_elem_t sched_elem;
+
+ odp_ticketlock_t lock ODP_ALIGNED_CACHE;
+ int status;
+
+ queue_enq_fn_t enqueue ODP_ALIGNED_CACHE;
+ queue_deq_fn_t dequeue;
+ queue_enq_multi_fn_t enqueue_multi;
+ queue_deq_multi_fn_t dequeue_multi;
+
+ uint32_t index;
+ odp_queue_t handle;
+ odp_queue_type_t type;
+ odp_queue_param_t param;
+ odp_pktin_queue_t pktin;
+ odp_pktout_queue_t pktout;
+ char name[ODP_QUEUE_NAME_LEN];
+};
+
+union queue_entry_u {
+ struct queue_entry_s s;
+ uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))];
+};
+
+int _odp_queue_deq(sched_elem_t *q, odp_buffer_hdr_t *buf_hdr[], int num);
+int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num);
+int _odp_queue_deq_mc(sched_elem_t *q, odp_event_t *evp, int num);
+
+/* Round up memory size to next cache line size to
+ * align all memory addresses on cache line boundary.
+ */
+static inline void *shm_pool_alloc_align(_odp_ishm_pool_t *pool, uint32_t size)
+{
+ void *addr;
+
+ addr = _odp_ishm_pool_alloc(pool, ROUNDUP_CACHE_LINE(size));
+ ODP_ASSERT(((uintptr_t)addr & (ODP_CACHE_LINE_SIZE - 1)) == 0);
+
+ return addr;
+}
+
+static inline uint32_t queue_to_id(odp_queue_t handle)
+{
+ return _odp_typeval(handle) - 1;
+}
+
+static inline queue_entry_t *qentry_from_int(queue_t handle)
+{
+ return (queue_entry_t *)(void *)(handle);
+}
+
+static inline queue_t qentry_to_int(queue_entry_t *qentry)
+{
+ return (queue_t)(qentry);
+}
+
+static inline odp_queue_t queue_get_handle(queue_entry_t *queue)
+{
+ return queue->s.handle;
+}
+
+static inline reorder_window_t *queue_get_rwin(queue_entry_t *queue)
+{
+ return queue->s.sched_elem.rwin;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
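Note on the handle mapping above: queue_to_id() subtracts one from the handle value, and queue_from_id() in odp_queue_scalable.c adds one, so table index 0 maps to handle value 1 and back; presumably this keeps the all-zero handle value free for ODP_QUEUE_INVALID.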
diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h
index 5d10cd377..5877a1cda 100644
--- a/platform/linux-generic/include/odp_schedule_if.h
+++ b/platform/linux-generic/include/odp_schedule_if.h
@@ -12,7 +12,7 @@ extern "C" {
#endif
#include <odp/api/queue.h>
-#include <odp_queue_internal.h>
+#include <odp_queue_if.h>
#include <odp/api/schedule.h>
typedef void (*schedule_pktio_start_fn_t)(int pktio_index, int num_in_queue,
diff --git a/platform/linux-generic/include/odp_schedule_scalable.h b/platform/linux-generic/include/odp_schedule_scalable.h
new file mode 100644
index 000000000..8a2d70da0
--- /dev/null
+++ b/platform/linux-generic/include/odp_schedule_scalable.h
@@ -0,0 +1,139 @@
+/* Copyright (c) 2017, ARM Limited. All rights reserved.
+ *
+ * Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_SCHEDULE_SCALABLE_H
+#define ODP_SCHEDULE_SCALABLE_H
+
+#include <odp/api/align.h>
+#include <odp/api/schedule.h>
+#include <odp/api/ticketlock.h>
+
+#include <odp_schedule_scalable_config.h>
+#include <odp_schedule_scalable_ordered.h>
+#include <odp_llqueue.h>
+
+/*
+ * ODP_SCHED_PRIO_HIGHEST/NORMAL/LOWEST/DEFAULT are compile time
+ * constants, but not ODP_SCHED_PRIO_NUM. The current API for this
+ * is odp_schedule_num_prio(). The other schedulers also define
+ * this internally as NUM_PRIO.
+ */
+#define ODP_SCHED_PRIO_NUM 8
+
+typedef struct {
+ union {
+ struct {
+ struct llqueue llq;
+ uint32_t prio;
+ };
+ char line[ODP_CACHE_LINE_SIZE];
+ };
+} sched_queue_t ODP_ALIGNED_CACHE;
+
+#define TICKET_INVALID (uint16_t)(~0U)
+
+typedef struct {
+ int32_t numevts;
+ uint16_t wrr_budget;
+ uint8_t cur_ticket;
+ uint8_t nxt_ticket;
+} qschedstate_t ODP_ALIGNED(sizeof(uint64_t));
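Worth spelling out for the structure above: numevts (4 bytes), wrr_budget (2) and the two tickets (1 + 1) add up to exactly eight bytes, and the uint64_t alignment lets sched_update_enq() further down in this patch update the whole scheduler-side queue state, event count plus tickets, with a single 8-byte compare-and-swap.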
+
+typedef uint32_t ringidx_t;
+
+#ifdef CONFIG_SPLIT_PRODCONS
+#define SPLIT_PC ODP_ALIGNED_CACHE
+#else
+#define SPLIT_PC
+#endif
+
+#define ODP_NO_SCHED_QUEUE (ODP_SCHED_SYNC_ORDERED + 1)
+
+typedef struct {
+ struct llnode node; /* must be first */
+ sched_queue_t *schedq;
+#ifdef CONFIG_QSCHST_LOCK
+ odp_ticketlock_t qschlock;
+#endif
+ qschedstate_t qschst;
+ uint16_t pop_deficit;
+ uint16_t qschst_type;
+ ringidx_t prod_read SPLIT_PC;
+ ringidx_t prod_write;
+ ringidx_t prod_mask;
+ odp_buffer_hdr_t **prod_ring;
+ ringidx_t cons_write SPLIT_PC;
+ ringidx_t cons_read;
+ reorder_window_t *rwin;
+ void *user_ctx;
+#ifdef CONFIG_SPLIT_PRODCONS
+ odp_buffer_hdr_t **cons_ring;
+ ringidx_t cons_mask;
+ uint16_t cons_type;
+#else
+#define cons_mask prod_mask
+#define cons_ring prod_ring
+#define cons_type qschst_type
+#endif
+} sched_elem_t ODP_ALIGNED_CACHE;
+
+/* Number of scheduling groups */
+#define MAX_SCHED_GROUP (sizeof(sched_group_mask_t) * CHAR_BIT)
+
+typedef bitset_t sched_group_mask_t;
+
+typedef struct {
+ /* Threads currently associated with the sched group */
+ bitset_t thr_actual[ODP_SCHED_PRIO_NUM] ODP_ALIGNED_CACHE;
+ bitset_t thr_wanted;
+ /* Used to spread queues over schedq's */
+ uint32_t xcount[ODP_SCHED_PRIO_NUM];
+ /* Number of schedq's per prio */
+ uint32_t xfactor;
+ char name[ODP_SCHED_GROUP_NAME_LEN];
+ /* ODP_SCHED_PRIO_NUM * xfactor. Must be last. */
+ sched_queue_t schedq[1] ODP_ALIGNED_CACHE;
+} sched_group_t;
+
+/* Number of reorder contexts per thread */
+#define TS_RVEC_SIZE 16
+
+typedef struct {
+ /* Atomic queue currently being processed or NULL */
+ sched_elem_t *atomq;
+ /* Current reorder context or NULL */
+ reorder_context_t *rctx;
+ uint8_t pause;
+ uint8_t out_of_order;
+ uint8_t tidx;
+ uint8_t pad;
+ uint32_t dequeued; /* Number of events dequeued from atomic queue */
+ uint16_t pktin_next; /* Next pktin tag to poll */
+ uint16_t pktin_poll_cnts;
+ uint16_t ticket; /* Ticket for atomic queue or TICKET_INVALID */
+ uint16_t num_schedq;
+ uint16_t sg_sem; /* Set when sg_wanted is modified by other thread */
+#define SCHEDQ_PER_THREAD (MAX_SCHED_GROUP * ODP_SCHED_PRIO_NUM)
+ sched_queue_t *schedq_list[SCHEDQ_PER_THREAD];
+ /* Current sched_group membership */
+ sched_group_mask_t sg_actual[ODP_SCHED_PRIO_NUM];
+ /* Future sched_group membership. */
+ sched_group_mask_t sg_wanted[ODP_SCHED_PRIO_NUM];
+ bitset_t priv_rvec_free;
+ /* Bitset of free entries in rvec[] */
+ bitset_t rvec_free ODP_ALIGNED_CACHE;
+ /* Reordering contexts to allocate from */
+ reorder_context_t rvec[TS_RVEC_SIZE] ODP_ALIGNED_CACHE;
+} sched_scalable_thread_state_t ODP_ALIGNED_CACHE;
+
+void sched_update_enq(sched_elem_t *q, uint32_t actual);
+void sched_update_enq_sp(sched_elem_t *q, uint32_t actual);
+sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t prio);
+void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio);
+
+#endif /* ODP_SCHEDULE_SCALABLE_H */
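Since schedq[] must be the last member of sched_group_t, a group is allocated with room for ODP_SCHED_PRIO_NUM * xfactor scheduler queues and xcount[] spreads queues of one priority over the xfactor schedqs. The standalone sketch below shows one plausible sizing and indexing scheme; the actual logic is schedq_from_sched_group() in odp_schedule_scalable.c, so treat the index formula here as an illustrative assumption.

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define PRIO_NUM 8                  /* stands in for ODP_SCHED_PRIO_NUM */

typedef struct { uint32_t prio; } sched_queue_t;   /* simplified */

typedef struct {
	uint32_t xcount[PRIO_NUM];  /* queues added per priority */
	uint32_t xfactor;           /* schedqs per priority */
	sched_queue_t schedq[1];    /* PRIO_NUM * xfactor entries, must be last */
} sched_group_t;

int main(void)
{
	uint32_t xfactor = 4;       /* e.g. CONFIG_DEFAULT_XFACTOR */
	size_t sz = sizeof(sched_group_t) +
		    (PRIO_NUM * xfactor - 1) * sizeof(sched_queue_t);
	sched_group_t *sg = calloc(1, sz);

	sg->xfactor = xfactor;
	/* Spread the n:th queue of priority 'prio' over the xfactor schedqs */
	uint32_t prio = 2;
	uint32_t x = sg->xcount[prio]++;            /* per-prio queue counter */
	sched_queue_t *schedq = &sg->schedq[prio * xfactor + x % xfactor];

	printf("schedq index: %td\n", schedq - sg->schedq);  /* prints 8 */
	free(sg);
	return 0;
}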
diff --git a/platform/linux-generic/include/odp_schedule_scalable_config.h b/platform/linux-generic/include/odp_schedule_scalable_config.h
new file mode 100644
index 000000000..b9a9a55f8
--- /dev/null
+++ b/platform/linux-generic/include/odp_schedule_scalable_config.h
@@ -0,0 +1,52 @@
+/* Copyright (c) 2017, ARM Limited. All rights reserved.
+ *
+ * Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_SCHEDULE_SCALABLE_CONFIG_H_
+#define ODP_SCHEDULE_SCALABLE_CONFIG_H_
+
+/*
+ * Default scaling factor for the scheduler group
+ *
+ * This scaling factor is used when the application creates a scheduler
+ * group with no worker threads.
+ */
+#define CONFIG_DEFAULT_XFACTOR 4
+
+/*
+ * Default weight (in events) for WRR in scalable scheduler
+ *
+ * This controls the per-queue weight for WRR between queues of the same
+ * priority in the scalable scheduler
+ * A higher value improves throughput while a lower value increases fairness
+ * and thus likely decreases latency
+ *
+ * If WRR is undesired, set the value to ~0 which will use the largest possible
+ * weight
+ *
+ * Note: an API for specifying this on a per-queue basis would be useful but is
+ * not yet available
+ */
+#define CONFIG_WRR_WEIGHT 64
+
+/*
+ * Split queue producer/consumer metadata into separate cache lines.
+ * This is beneficial on e.g. Cortex-A57 but not so much on A53.
+ */
+#define CONFIG_SPLIT_PRODCONS
+
+/*
+ * Use locks to protect queue (ring buffer) and scheduler state updates
+ * On x86, this decreases overhead noticeably.
+ */
+#if !defined(__arm__) && !defined(__aarch64__)
+#define CONFIG_QSCHST_LOCK
+/* Keep all ring buffer/qschst data together when using locks */
+#undef CONFIG_SPLIT_PRODCONS
+#endif
+
+#endif /* ODP_SCHEDULE_SCALABLE_CONFIG_H_ */
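As a rough illustration of the throughput/fairness trade-off described for CONFIG_WRR_WEIGHT, a per-queue budget of this kind is typically consumed per dequeued batch and refilled when it runs out, at which point the queue yields its slot. The sketch below is a standalone toy model under those assumptions, not the scheduler's actual accounting (which lives in odp_schedule_scalable.c).

#include <stdint.h>
#include <stdio.h>

#define CONFIG_WRR_WEIGHT 64

typedef struct {
	uint16_t wrr_budget;   /* remaining weight for this queue */
} queue_state_t;

/* Returns nonzero when the queue should yield its schedq slot */
static int wrr_account(queue_state_t *qs, uint16_t num_deq)
{
	if (qs->wrr_budget <= num_deq) {
		qs->wrr_budget = CONFIG_WRR_WEIGHT;  /* refill and rotate */
		return 1;
	}
	qs->wrr_budget -= num_deq;
	return 0;
}

int main(void)
{
	queue_state_t qs = { .wrr_budget = CONFIG_WRR_WEIGHT };
	int rotations = 0;

	for (int i = 0; i < 10; i++)            /* ten bursts of 16 events */
		rotations += wrr_account(&qs, 16);

	printf("rotations: %d\n", rotations);   /* 2 with weight 64 */
	return 0;
}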
diff --git a/platform/linux-generic/include/odp_schedule_scalable_ordered.h b/platform/linux-generic/include/odp_schedule_scalable_ordered.h
new file mode 100644
index 000000000..941304b79
--- /dev/null
+++ b/platform/linux-generic/include/odp_schedule_scalable_ordered.h
@@ -0,0 +1,132 @@
+/* Copyright (c) 2017, ARM Limited. All rights reserved.
+ *
+ * Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_SCHEDULE_SCALABLE_ORDERED_H
+#define ODP_SCHEDULE_SCALABLE_ORDERED_H
+
+#include <odp/api/shared_memory.h>
+
+#include <odp_internal.h>
+#include <odp_align_internal.h>
+#include <odp_bitset.h>
+#include <_ishmpool_internal.h>
+
+/* High level functioning of reordering
+ * Data structures -
+ * Reorder Window - Every ordered queue is associated with a reorder window.
+ * Reorder window stores reorder contexts from threads that
+ * have completed processing out-of-order.
+ * Reorder Context - Reorder context consists of events that a thread
+ * wants to enqueue while processing a batch of events
+ * from an ordered queue.
+ *
+ * Algorithm -
+ * 1) Thread identifies the ordered queue.
+ * 2) It 'reserves a slot in the reorder window and dequeues the
+ * events' atomically. Atomicity is achieved by using a ticket-lock
+ * like design where the reorder window slot is the ticket.
+ * 3a) Upon order-release/next schedule call, the thread
+ * checks if its slot (ticket) equals the head of the reorder window.
+ * If yes, enqueues the events to the destination queue till
+ * i) the reorder window is empty or
+ * ii) there is a gap in the reorder window
+ * If no, the reorder context is stored in the reorder window at
+ * the reserved slot.
+ * 3b) Upon the first enqueue, the thread checks if its slot (ticket)
+ * equals the head of the reorder window.
+ * If yes, enqueues the events immediately to the destination queue
+ * If no, these (and subsequent) events are stored in the reorder context
+ * (in the application given order)
+ */
+
+/* Head and change indicator variables are used to synchronise between
+ * concurrent insert operations in the reorder window. A thread performing
+ * an in-order insertion must be notified about the newly inserted
+ * reorder contexts so that it doesn’t halt the retire process too early.
+ * A thread performing an out-of-order insertion must correspondingly
+ * notify the thread doing in-order insertion of the new waiting reorder
+ * context, which may need to be handled by that thread.
+ *
+ * Also, an out-of-order insertion may become an in-order insertion if the
+ * thread doing an in-order insertion completes before this thread completes.
+ * We need a point of synchronisation where this knowledge and potential state
+ * change can be transferred between threads.
+ */
+typedef struct hc {
+ /* First missing context */
+ uint32_t head;
+ /* Change indicator */
+ uint32_t chgi;
+} hc_t ODP_ALIGNED(sizeof(uint64_t));
+
+/* Number of reorder contexts in the reorder window.
+ * Should be at least one per CPU.
+ */
+#define RWIN_SIZE 32
+ODP_STATIC_ASSERT(CHECK_IS_POWER2(RWIN_SIZE), "RWIN_SIZE is not a power of 2");
+
+typedef struct reorder_context reorder_context_t;
+
+typedef struct reorder_window {
+ /* head and change indicator */
+ hc_t hc;
+ uint32_t winmask;
+ uint32_t tail;
+ uint32_t turn;
+ uint32_t olock[CONFIG_QUEUE_MAX_ORD_LOCKS];
+ uint16_t lock_count;
+ /* Reorder contexts in this window */
+ reorder_context_t *ring[RWIN_SIZE];
+} reorder_window_t;
+
+/* Number of events that can be stored in a reorder context.
+ * This size is chosen so that there is no space left unused at the end
+ * of the last cache line (for 64b architectures and 64b handles).
+ */
+#define RC_EVT_SIZE 18
+
+struct reorder_context {
+ /* Reorder window to which this context belongs */
+ reorder_window_t *rwin;
+ /* Pointer to TS->rvec_free */
+ bitset_t *rvec_free;
+ /* Our slot number in the reorder window */
+ uint32_t sn;
+ uint8_t olock_flags;
+ /* Our index in thread_state rvec array */
+ uint8_t idx;
+ /* Used to link reorder contexts together */
+ uint8_t next_idx;
+ /* Current reorder context to save events in */
+ uint8_t cur_idx;
+ /* Number of events stored in this reorder context */
+ uint8_t numevts;
+ /* Events stored in this context */
+ odp_buffer_hdr_t *events[RC_EVT_SIZE];
+ queue_entry_t *destq[RC_EVT_SIZE];
+} ODP_ALIGNED_CACHE;
+
+reorder_window_t *rwin_alloc(_odp_ishm_pool_t *pool,
+ unsigned lock_count);
+int rwin_free(_odp_ishm_pool_t *pool, reorder_window_t *rwin);
+bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn);
+void rwin_insert(reorder_window_t *rwin,
+ reorder_context_t *rctx,
+ uint32_t sn,
+ void (*callback)(reorder_context_t *));
+void rctx_init(reorder_context_t *rctx, uint16_t idx,
+ reorder_window_t *rwin, uint32_t sn);
+void rctx_free(const reorder_context_t *rctx);
+void olock_unlock(const reorder_context_t *rctx, reorder_window_t *rwin,
+ uint32_t lock_index);
+void olock_release(const reorder_context_t *rctx);
+void rctx_retire(reorder_context_t *first);
+void rctx_release(reorder_context_t *rctx);
+int rctx_save(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
+
+#endif /* ODP_SCHEDULE_SCALABLE_ORDERED_H */
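The algorithm comment at the top of this header condenses into a small single-threaded toy model: reserve a slot (the ticket), park out-of-order contexts in the window, and retire in order from the head. Everything below is a simplified stand-in for reorder_window_t and reorder_context_t, without the hc_t atomics or olock handling, and is illustrative only.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define RWIN_SIZE 8             /* must be a power of two */

typedef struct {
	uint32_t head;           /* first missing (not yet retired) slot */
	uint32_t tail;           /* next slot to hand out */
	int ring[RWIN_SIZE];     /* parked "reorder contexts" (0 = empty) */
} rwin_t;

/* Reserve a slot ("take a ticket") when dequeueing from an ordered queue.
 * Always succeeds in this toy version; the real one can fail when full. */
static bool toy_rwin_reserve(rwin_t *rw, uint32_t *sn)
{
	*sn = rw->tail++;
	return true;
}

/* Insert a finished context; retire in order while the head is complete */
static void toy_rwin_insert(rwin_t *rw, uint32_t sn, int ctx)
{
	rw->ring[sn % RWIN_SIZE] = ctx;
	while (rw->ring[rw->head % RWIN_SIZE] != 0) {
		printf("retire ctx %d (sn %u)\n",
		       rw->ring[rw->head % RWIN_SIZE], (unsigned)rw->head);
		rw->ring[rw->head % RWIN_SIZE] = 0;
		rw->head++;
	}
}

int main(void)
{
	rwin_t rw = { 0 };
	uint32_t a, b, c;

	toy_rwin_reserve(&rw, &a);      /* sn 0 */
	toy_rwin_reserve(&rw, &b);      /* sn 1 */
	toy_rwin_reserve(&rw, &c);      /* sn 2 */

	toy_rwin_insert(&rw, b, 20);    /* out of order: waits in the window */
	toy_rwin_insert(&rw, c, 30);    /* still blocked behind sn 0 */
	toy_rwin_insert(&rw, a, 10);    /* in order: retires 10, 20, 30 */
	return 0;
}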
diff --git a/platform/linux-generic/m4/odp_schedule.m4 b/platform/linux-generic/m4/odp_schedule.m4
index 91c19f21a..d862b8b21 100644
--- a/platform/linux-generic/m4/odp_schedule.m4
+++ b/platform/linux-generic/m4/odp_schedule.m4
@@ -1,13 +1,44 @@
-AC_ARG_ENABLE([schedule-sp],
- [ --enable-schedule-sp enable strict priority scheduler],
- [if test x$enableval = xyes; then
- schedule_sp_enabled=yes
- ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"
- fi])
+# Checks for --enable-schedule-sp and defines ODP_SCHEDULE_SP and adds
+# -DODP_SCHEDULE_SP to CFLAGS.
+AC_ARG_ENABLE(
+ [schedule_sp],
+ [AC_HELP_STRING([--enable-schedule-sp],
+ [enable strict priority scheduler])],
+ [if test "x$enableval" = xyes; then
+ schedule_sp=true
+ ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"
+ else
+ schedule_sp=false
+ fi],
+ [schedule_sp=false])
+AM_CONDITIONAL([ODP_SCHEDULE_SP], [test x$schedule_sp = xtrue])
-AC_ARG_ENABLE([schedule-iquery],
- [ --enable-schedule-iquery enable interests query (sparse bitmap) scheduler],
- [if test x$enableval = xyes; then
- schedule_iquery_enabled=yes
- ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"
- fi])
+# Checks for --enable-schedule-iquery and defines ODP_SCHEDULE_IQUERY and adds
+# -DODP_SCHEDULE_IQUERY to CFLAGS.
+AC_ARG_ENABLE(
+ [schedule_iquery],
+ [AC_HELP_STRING([--enable-schedule-iquery],
+ [enable interests query (sparse bitmap) scheduler])],
+ [if test "x$enableval" = xyes; then
+ schedule_iquery=true
+ ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"
+ else
+ schedule_iquery=false
+ fi],
+ [schedule_iquery=false])
+AM_CONDITIONAL([ODP_SCHEDULE_IQUERY], [test x$schedule_iquery = xtrue])
+
+# Checks for --enable-schedule-scalable and defines ODP_SCHEDULE_SCALABLE and
+# adds -DODP_SCHEDULE_SCALABLE to CFLAGS.
+AC_ARG_ENABLE(
+ [schedule_scalable],
+ [AC_HELP_STRING([--enable-schedule-scalable],
+ [enable scalable scheduler])],
+ [if test "x$enableval" = xyes; then
+ schedule_scalable=true
+ ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SCALABLE"
+ else
+ schedule_scalable=false
+ fi],
+ [schedule_scalable=false])
+AM_CONDITIONAL([ODP_SCHEDULE_SCALABLE], [test x$schedule_scalable = xtrue])
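With the reworked m4 fragment above, each scheduler implementation is chosen at configure time: for example, ./configure --enable-schedule-scalable adds -DODP_SCHEDULE_SCALABLE to ODP_CFLAGS (and sets the matching Automake conditional), and that define is what makes odp_queue_if.c and odp_schedule_if.c below select the scalable queue and scheduler function tables.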
diff --git a/platform/linux-generic/odp_queue_if.c b/platform/linux-generic/odp_queue_if.c
index c91f00eb2..d7471dfc7 100644
--- a/platform/linux-generic/odp_queue_if.c
+++ b/platform/linux-generic/odp_queue_if.c
@@ -6,11 +6,19 @@
#include <odp_queue_if.h>
+extern const queue_api_t queue_scalable_api;
+extern const queue_fn_t queue_scalable_fn;
+
extern const queue_api_t queue_default_api;
extern const queue_fn_t queue_default_fn;
+#ifdef ODP_SCHEDULE_SCALABLE
+const queue_api_t *queue_api = &queue_scalable_api;
+const queue_fn_t *queue_fn = &queue_scalable_fn;
+#else
const queue_api_t *queue_api = &queue_default_api;
const queue_fn_t *queue_fn = &queue_default_fn;
+#endif
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param)
{
diff --git a/platform/linux-generic/odp_queue_scalable.c b/platform/linux-generic/odp_queue_scalable.c
new file mode 100644
index 000000000..f95f5f930
--- /dev/null
+++ b/platform/linux-generic/odp_queue_scalable.c
@@ -0,0 +1,1022 @@
+/* Copyright (c) 2017, ARM Limited. All rights reserved.
+ *
+ * Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include <odp/api/hints.h>
+#include <odp/api/plat/ticketlock_inlines.h>
+#include <odp/api/queue.h>
+#include <odp/api/schedule.h>
+#include <odp/api/shared_memory.h>
+#include <odp/api/sync.h>
+#include <odp/api/traffic_mngr.h>
+
+#include <odp_internal.h>
+#include <odp_config_internal.h>
+#include <odp_debug_internal.h>
+
+#include <odp_buffer_inlines.h>
+#include <odp_packet_io_internal.h>
+#include <odp_packet_io_queue.h>
+#include <odp_pool_internal.h>
+#include <odp_queue_scalable_internal.h>
+#include <odp_schedule_if.h>
+#include <_ishm_internal.h>
+#include <_ishmpool_internal.h>
+
+#include <string.h>
+#include <inttypes.h>
+
+#define NUM_INTERNAL_QUEUES 64
+
+#define MIN(a, b) \
+ ({ \
+ __typeof__(a) tmp_a = (a); \
+ __typeof__(b) tmp_b = (b); \
+ tmp_a < tmp_b ? tmp_a : tmp_b; \
+ })
+
+#define LOCK(a) _odp_ticketlock_lock(a)
+#define UNLOCK(a) _odp_ticketlock_unlock(a)
+#define LOCK_INIT(a) odp_ticketlock_init(a)
+
+extern __thread sched_scalable_thread_state_t *sched_ts;
+
+typedef struct queue_table_t {
+ queue_entry_t queue[ODP_CONFIG_QUEUES];
+} queue_table_t;
+
+static queue_table_t *queue_tbl;
+_odp_ishm_pool_t *queue_shm_pool;
+
+static inline odp_queue_t queue_from_id(uint32_t queue_id)
+{
+ return _odp_cast_scalar(odp_queue_t, queue_id + 1);
+}
+
+static queue_t queue_from_ext(odp_queue_t handle);
+static int _queue_enq(queue_t handle, odp_buffer_hdr_t *buf_hdr);
+static odp_buffer_hdr_t *_queue_deq(queue_t handle);
+static int _queue_enq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[],
+ int num);
+static int _queue_deq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[],
+ int num);
+
+static queue_entry_t *get_qentry(uint32_t queue_id)
+{
+ return &queue_tbl->queue[queue_id];
+}
+
+static int _odp_queue_disable_enq(sched_elem_t *q)
+{
+ ringidx_t old_read, old_write, new_write;
+ uint32_t size;
+
+ old_write = q->prod_write;
+ size = q->prod_mask + 1;
+ do {
+ /* Need __atomic_load to avoid compiler reordering */
+ old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
+ if (old_write != old_read) {
+ /* Queue is not empty, cannot claim all elements
+ * Cannot disable enqueue.
+ */
+ return -1;
+ }
+ /* Claim all elements in ring */
+ new_write = old_write + size;
+ } while (!__atomic_compare_exchange_n(&q->prod_write,
+ &old_write, /* Updated on failure */
+ new_write,
+ true,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED));
+ /* All remaining elements claimed, no one else can enqueue */
+ return 0;
+}
+
+static int queue_init(queue_entry_t *queue, const char *name,
+ const odp_queue_param_t *param)
+{
+ ringidx_t ring_idx;
+ sched_elem_t *sched_elem;
+ uint32_t ring_size;
+ odp_buffer_hdr_t **ring;
+ uint32_t size;
+
+ sched_elem = &queue->s.sched_elem;
+ ring_size = param->size > 0 ?
+ ROUNDUP_POWER2_U32(param->size) : CONFIG_QUEUE_SIZE;
+ strncpy(queue->s.name, name ? name : "", ODP_QUEUE_NAME_LEN - 1);
+ queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0;
+ memcpy(&queue->s.param, param, sizeof(odp_queue_param_t));
+
+ size = ring_size * sizeof(odp_buffer_hdr_t *);
+ ring = (odp_buffer_hdr_t **)shm_pool_alloc_align(queue_shm_pool, size);
+ if (NULL == ring)
+ return -1;
+
+ for (ring_idx = 0; ring_idx < ring_size; ring_idx++)
+ ring[ring_idx] = NULL;
+
+ queue->s.type = queue->s.param.type;
+ queue->s.enqueue = _queue_enq;
+ queue->s.dequeue = _queue_deq;
+ queue->s.enqueue_multi = _queue_enq_multi;
+ queue->s.dequeue_multi = _queue_deq_multi;
+ queue->s.pktin = PKTIN_INVALID;
+
+ sched_elem->node.next = NULL;
+#ifdef CONFIG_QSCHST_LOCK
+ LOCK_INIT(&sched_elem->qschlock);
+#endif
+ sched_elem->qschst.numevts = 0;
+ sched_elem->qschst.wrr_budget = CONFIG_WRR_WEIGHT;
+ sched_elem->qschst.cur_ticket = 0;
+ sched_elem->qschst.nxt_ticket = 0;
+ sched_elem->pop_deficit = 0;
+ if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
+ sched_elem->qschst_type = queue->s.param.sched.sync;
+ else
+ sched_elem->qschst_type = ODP_NO_SCHED_QUEUE;
+ /* 2nd cache line - enqueue */
+ sched_elem->prod_read = 0;
+ sched_elem->prod_write = 0;
+ sched_elem->prod_ring = ring;
+ sched_elem->prod_mask = ring_size - 1;
+ /* 3rd cache line - dequeue */
+ sched_elem->cons_read = 0;
+ sched_elem->cons_write = 0;
+ sched_elem->rwin = NULL;
+ sched_elem->schedq = NULL;
+ sched_elem->user_ctx = queue->s.param.context;
+#ifdef CONFIG_SPLIT_PRODCONS
+ sched_elem->cons_ring = ring;
+ sched_elem->cons_mask = ring_size - 1;
+ sched_elem->cons_type = sched_elem->qschst_type;
+#endif
+
+ /* Queue initialized successfully, add it to the sched group */
+ if (queue->s.type == ODP_QUEUE_TYPE_SCHED) {
+ if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+ sched_elem->rwin =
+ rwin_alloc(queue_shm_pool,
+ queue->s.param.sched.lock_count);
+ if (sched_elem->rwin == NULL) {
+ ODP_ERR("Reorder window not created\n");
+ goto rwin_create_failed;
+ }
+ }
+ sched_elem->schedq =
+ schedq_from_sched_group(param->sched.group,
+ param->sched.prio);
+ }
+
+ return 0;
+
+rwin_create_failed:
+ _odp_ishm_pool_free(queue_shm_pool, ring);
+
+ return -1;
+}
+
+static int queue_init_global(void)
+{
+ uint32_t i;
+ uint64_t pool_size;
+ uint64_t min_alloc;
+ uint64_t max_alloc;
+
+ ODP_DBG("Queue init ... ");
+
+ /* Attach to the pool if it exists */
+ queue_shm_pool = _odp_ishm_pool_lookup("queue_shm_pool");
+ if (queue_shm_pool == NULL) {
+ /* Create shared memory pool to allocate shared memory for the
+ * queues. Use the default queue size.
+ */
+ /* Add size of the array holding the queues */
+ pool_size = sizeof(queue_table_t);
+ /* Add storage required for queues */
+ pool_size += (CONFIG_QUEUE_SIZE * sizeof(odp_buffer_hdr_t *)) *
+ ODP_CONFIG_QUEUES;
+ /* Add the reorder window size */
+ pool_size += sizeof(reorder_window_t) * ODP_CONFIG_QUEUES;
+ /* Choose min_alloc and max_alloc such that the buddy allocator
+ * is selected.
+ */
+ min_alloc = 0;
+ max_alloc = CONFIG_QUEUE_SIZE * sizeof(odp_buffer_hdr_t *);
+ queue_shm_pool = _odp_ishm_pool_create("queue_shm_pool",
+ pool_size,
+ min_alloc, max_alloc,
+ _ODP_ISHM_SINGLE_VA);
+ if (queue_shm_pool == NULL) {
+ ODP_ERR("Failed to allocate shared memory pool for"
+ " queues\n");
+ goto queue_shm_pool_create_failed;
+ }
+ }
+
+ queue_tbl = (queue_table_t *)
+ shm_pool_alloc_align(queue_shm_pool,
+ sizeof(queue_table_t));
+ if (queue_tbl == NULL) {
+ ODP_ERR("Failed to reserve shared memory for queue table\n");
+ goto queue_tbl_ishm_alloc_failed;
+ }
+
+ memset(queue_tbl, 0, sizeof(queue_table_t));
+
+ for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
+ /* init locks */
+ queue_entry_t *queue;
+
+ queue = get_qentry(i);
+ LOCK_INIT(&queue->s.lock);
+ queue->s.index = i;
+ queue->s.handle = queue_from_id(i);
+ }
+
+ ODP_DBG("done\n");
+ ODP_DBG("Queue init global\n");
+ ODP_DBG(" struct queue_entry_s size %zu\n",
+ sizeof(struct queue_entry_s));
+ ODP_DBG(" queue_entry_t size %zu\n",
+ sizeof(queue_entry_t));
+ ODP_DBG("\n");
+
+ return 0;
+
+queue_shm_pool_create_failed:
+
+queue_tbl_ishm_alloc_failed:
+ _odp_ishm_pool_destroy(queue_shm_pool);
+
+ return -1;
+}
+
+static int queue_term_global(void)
+{
+ int ret = 0;
+ int rc = 0;
+ queue_entry_t *queue;
+ int i;
+
+ for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
+ queue = &queue_tbl->queue[i];
+ if (__atomic_load_n(&queue->s.status,
+ __ATOMIC_RELAXED) != QUEUE_STATUS_FREE) {
+ ODP_ERR("Not destroyed queue: %s\n", queue->s.name);
+ rc = -1;
+ }
+ }
+
+ _odp_ishm_pool_free(queue_shm_pool, queue_tbl);
+
+ ret = _odp_ishm_pool_destroy(queue_shm_pool);
+ if (ret < 0) {
+ ODP_ERR("Failed to destroy shared memory pool for queues\n");
+ rc = -1;
+ }
+
+ return rc;
+}
+
+static int queue_init_local(void)
+{
+ return 0;
+}
+
+static int queue_term_local(void)
+{
+ return 0;
+}
+
+static int queue_capability(odp_queue_capability_t *capa)
+{
+ memset(capa, 0, sizeof(odp_queue_capability_t));
+
+ /* Reserve some queues for internal use */
+ capa->max_queues = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES;
+ capa->max_ordered_locks = sched_fn->max_ordered_locks();
+ capa->max_sched_groups = sched_fn->num_grps();
+ capa->sched_prios = odp_schedule_num_prio();
+ capa->plain.max_num = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES;
+ capa->plain.max_size = 0;
+ capa->sched.max_num = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES;
+ capa->sched.max_size = 0;
+
+ return 0;
+}
+
+static odp_queue_type_t queue_type(odp_queue_t handle)
+{
+ return qentry_from_int(queue_from_ext(handle))->s.type;
+}
+
+static odp_schedule_sync_t queue_sched_type(odp_queue_t handle)
+{
+ return qentry_from_int(queue_from_ext(handle))->s.param.sched.sync;
+}
+
+static odp_schedule_prio_t queue_sched_prio(odp_queue_t handle)
+{
+ return qentry_from_int(queue_from_ext(handle))->s.param.sched.prio;
+}
+
+static odp_schedule_group_t queue_sched_group(odp_queue_t handle)
+{
+ return qentry_from_int(queue_from_ext(handle))->s.param.sched.group;
+}
+
+static int queue_lock_count(odp_queue_t handle)
+{
+ queue_entry_t *queue = qentry_from_int(queue_from_ext(handle));
+
+ return queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED ?
+ (int)queue->s.param.sched.lock_count : -1;
+}
+
+static odp_queue_t queue_create(const char *name,
+ const odp_queue_param_t *param)
+{
+ int queue_idx;
+ odp_queue_t handle = ODP_QUEUE_INVALID;
+ queue_entry_t *queue;
+ odp_queue_param_t default_param;
+
+ if (param == NULL) {
+ odp_queue_param_init(&default_param);
+ param = &default_param;
+ }
+
+ for (queue_idx = 0; queue_idx < ODP_CONFIG_QUEUES; queue_idx++) {
+ queue = &queue_tbl->queue[queue_idx];
+
+ if (queue->s.status != QUEUE_STATUS_FREE)
+ continue;
+
+ LOCK(&queue->s.lock);
+ if (queue->s.status == QUEUE_STATUS_FREE) {
+ if (queue_init(queue, name, param)) {
+ UNLOCK(&queue->s.lock);
+ return handle;
+ }
+ queue->s.status = QUEUE_STATUS_READY;
+ handle = queue->s.handle;
+ UNLOCK(&queue->s.lock);
+ break;
+ }
+ UNLOCK(&queue->s.lock);
+ }
+ return handle;
+}
+
+static int queue_destroy(odp_queue_t handle)
+{
+ queue_entry_t *queue;
+ sched_elem_t *q;
+
+ if (handle == ODP_QUEUE_INVALID)
+ return -1;
+
+ queue = qentry_from_int(queue_from_ext(handle));
+ LOCK(&queue->s.lock);
+ if (queue->s.status != QUEUE_STATUS_READY) {
+ UNLOCK(&queue->s.lock);
+ return -1;
+ }
+ q = &queue->s.sched_elem;
+
+#ifdef CONFIG_QSCHST_LOCK
+ LOCK(&q->qschlock);
+#endif
+ if (_odp_queue_disable_enq(q)) {
+ /* Producer side not empty */
+#ifdef CONFIG_QSCHST_LOCK
+ UNLOCK(&q->qschlock);
+#endif
+ UNLOCK(&queue->s.lock);
+ return -1;
+ }
+ /* Enqueue is now disabled */
+ if (q->cons_read != q->cons_write) {
+ /* Consumer side is not empty
+ * Roll back previous change, enable enqueue again.
+ */
+ uint32_t size;
+
+ size = q->prod_mask + 1;
+ __atomic_fetch_sub(&q->prod_write, size, __ATOMIC_RELAXED);
+#ifdef CONFIG_QSCHST_LOCK
+ UNLOCK(&q->qschlock);
+#endif
+ UNLOCK(&queue->s.lock);
+ return -1;
+ }
+#ifdef CONFIG_QSCHST_LOCK
+ UNLOCK(&q->qschlock);
+#endif
+ /* Producer and consumer sides empty, enqueue disabled
+ * Now wait until schedq state is empty and no outstanding tickets
+ */
+ while (__atomic_load_n(&q->qschst.numevts, __ATOMIC_RELAXED) != 0 ||
+ __atomic_load_n(&q->qschst.cur_ticket, __ATOMIC_RELAXED) !=
+ __atomic_load_n(&q->qschst.nxt_ticket, __ATOMIC_RELAXED)) {
+ sevl();
+ while (wfe() && monitor32((uint32_t *)&q->qschst.numevts,
+ __ATOMIC_RELAXED) != 0)
+ doze();
+ }
+
+ /* Adjust the spread factor for the queues in the schedule group */
+ if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
+ sched_group_xcount_dec(queue->s.param.sched.group,
+ queue->s.param.sched.prio);
+
+ _odp_ishm_pool_free(queue_shm_pool, q->prod_ring);
+
+ if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+ if (rwin_free(queue_shm_pool, q->rwin) < 0) {
+ ODP_ERR("Failed to free reorder window\n");
+ UNLOCK(&queue->s.lock);
+ return -1;
+ }
+ }
+ queue->s.status = QUEUE_STATUS_FREE;
+ UNLOCK(&queue->s.lock);
+ return 0;
+}
+
+static int queue_context_set(odp_queue_t handle, void *context,
+ uint32_t len ODP_UNUSED)
+{
+ odp_mb_full();
+ qentry_from_int(queue_from_ext(handle))->s.param.context = context;
+ odp_mb_full();
+ return 0;
+}
+
+static void *queue_context(odp_queue_t handle)
+{
+ return qentry_from_int(queue_from_ext(handle))->s.param.context;
+}
+
+static odp_queue_t queue_lookup(const char *name)
+{
+ uint32_t i;
+
+ for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
+ queue_entry_t *queue = &queue_tbl->queue[i];
+
+ if (queue->s.status == QUEUE_STATUS_FREE ||
+ queue->s.status == QUEUE_STATUS_DESTROYED)
+ continue;
+
+ LOCK(&queue->s.lock);
+ if (strcmp(name, queue->s.name) == 0) {
+ /* found it */
+ UNLOCK(&queue->s.lock);
+ return queue->s.handle;
+ }
+ UNLOCK(&queue->s.lock);
+ }
+
+ return ODP_QUEUE_INVALID;
+}
+
+#ifndef CONFIG_QSCHST_LOCK
+static inline int _odp_queue_enq(sched_elem_t *q,
+ odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ ringidx_t old_read;
+ ringidx_t old_write;
+ ringidx_t new_write;
+ int actual;
+ uint32_t mask;
+ odp_buffer_hdr_t **ring;
+
+ mask = q->prod_mask;
+ ring = q->prod_ring;
+
+ /* Load producer ring state (read & write index) */
+ old_write = __atomic_load_n(&q->prod_write, __ATOMIC_RELAXED);
+ do {
+ /* Consumer does store-release prod_read, we need
+ * load-acquire.
+ */
+ old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
+
+ actual = MIN(num, (int)((mask + 1) - (old_write - old_read)));
+ if (odp_unlikely(actual <= 0))
+ return 0;
+
+ new_write = old_write + actual;
+ } while (!__atomic_compare_exchange_n(&q->prod_write,
+ &old_write, /* Updated on failure */
+ new_write,
+ true,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED));
+
+#ifdef CONFIG_SPLIT_PRODCONS
+ __builtin_prefetch(&q->cons_write, 0, 0);
+#endif
+ /* Store our event(s) in the ring */
+ do {
+ ring[old_write & mask] = *buf_hdr++;
+ } while (++old_write != new_write);
+ old_write -= actual;
+
+#ifdef CONFIG_SPLIT_PRODCONS
+ __builtin_prefetch(&q->node, 1, 0);
+#endif
+ /* Wait for our turn to signal consumers */
+ if (odp_unlikely(__atomic_load_n(&q->cons_write,
+ __ATOMIC_RELAXED) != old_write)) {
+ sevl();
+ while (wfe() && monitor32(&q->cons_write,
+ __ATOMIC_RELAXED) != old_write)
+ doze();
+ }
+
+ /* Signal consumers that events are available (release events)
+ * Enable other producers to continue
+ */
+ /* Wait for writes (to ring slots) to complete */
+ atomic_store_release(&q->cons_write, new_write, /*readonly=*/false);
+
+ return actual;
+}
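For the free-slot computation above, unsigned wrap-around does the right thing: with a ring of mask + 1 = 8 slots, old_write = 10 and old_read = 5, the queue holds old_write - old_read = 5 events, so (mask + 1) - (old_write - old_read) = 3 slots are free and at most three events are claimed in this iteration; the same arithmetic stays correct after the 32-bit indices wrap.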
+
+#else
+
+static inline int _odp_queue_enq_sp(sched_elem_t *q,
+ odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ ringidx_t old_read;
+ ringidx_t old_write;
+ ringidx_t new_write;
+ int actual;
+ uint32_t mask;
+ odp_buffer_hdr_t **ring;
+
+ mask = q->prod_mask;
+ ring = q->prod_ring;
+
+ /* Load producer ring state (read & write index) */
+ old_write = q->prod_write;
+ /* Consumer does store-release prod_read, we need load-acquire */
+ old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
+ actual = MIN(num, (int)((mask + 1) - (old_write - old_read)));
+ if (odp_unlikely(actual <= 0))
+ return 0;
+
+ new_write = old_write + actual;
+ q->prod_write = new_write;
+
+ /* Store our event(s) in the ring */
+ do {
+ ring[old_write & mask] = *buf_hdr++;
+ } while (++old_write != new_write);
+ old_write -= actual;
+
+#ifdef CONFIG_SPLIT_PRODCONS
+ __builtin_prefetch(&q->node, 1, 0);
+#endif
+
+ /* Signal consumers that events are available (release events)
+ * Enable other producers to continue
+ */
+#ifdef CONFIG_QSCHST_LOCK
+ q->cons_write = new_write;
+#else
+ atomic_store_release(&q->cons_write, new_write, /*readonly=*/false);
+#endif
+
+ return actual;
+}
+#endif
+
+static int _queue_enq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ int actual;
+ queue_entry_t *queue;
+ sched_scalable_thread_state_t *ts;
+
+ queue = qentry_from_int(handle);
+ ts = sched_ts;
+ if (ts && odp_unlikely(ts->out_of_order)) {
+ actual = rctx_save(queue, buf_hdr, num);
+ return actual;
+ }
+
+#ifdef CONFIG_QSCHST_LOCK
+ LOCK(&queue->s.sched_elem.qschlock);
+ actual = _odp_queue_enq_sp(&queue->s.sched_elem, buf_hdr, num);
+#else
+ actual = _odp_queue_enq(&queue->s.sched_elem, buf_hdr, num);
+#endif
+
+ if (odp_likely(queue->s.sched_elem.schedq != NULL && actual != 0)) {
+ /* Perform scheduler related updates. */
+#ifdef CONFIG_QSCHST_LOCK
+ sched_update_enq_sp(&queue->s.sched_elem, actual);
+#else
+ sched_update_enq(&queue->s.sched_elem, actual);
+#endif
+ }
+
+#ifdef CONFIG_QSCHST_LOCK
+ UNLOCK(&queue->s.sched_elem.qschlock);
+#endif
+ return actual;
+}
+
+static int _queue_enq(queue_t handle, odp_buffer_hdr_t *buf_hdr)
+{
+ return odp_likely(
+ _queue_enq_multi(handle, &buf_hdr, 1) == 1) ? 0 : -1;
+}
+
+static int queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
+{
+ odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
+ queue_entry_t *queue;
+ int i;
+
+ if (num > QUEUE_MULTI_MAX)
+ num = QUEUE_MULTI_MAX;
+
+ queue = qentry_from_int(queue_from_ext(handle));
+
+ for (i = 0; i < num; i++)
+ buf_hdr[i] = buf_hdl_to_hdr(odp_buffer_from_event(ev[i]));
+
+ return queue->s.enqueue_multi(qentry_to_int(queue), buf_hdr, num);
+}
+
+static int queue_enq(odp_queue_t handle, odp_event_t ev)
+{
+ odp_buffer_hdr_t *buf_hdr;
+ queue_entry_t *queue;
+
+ queue = qentry_from_int(queue_from_ext(handle));
+ buf_hdr = buf_hdl_to_hdr(odp_buffer_from_event(ev));
+
+ return queue->s.enqueue(qentry_to_int(queue), buf_hdr);
+}
+
+/* Single-consumer dequeue. */
+int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num)
+{
+ int actual;
+ ringidx_t old_read;
+ ringidx_t old_write;
+ ringidx_t new_read;
+ uint32_t mask;
+ odp_buffer_hdr_t **ring;
+
+ /* Load consumer ring state (read & write index). */
+ old_read = q->cons_read;
+ /* Producer does store-release cons_write, we need load-acquire */
+ old_write = __atomic_load_n(&q->cons_write, __ATOMIC_ACQUIRE);
+ actual = MIN(num, (int)(old_write - old_read));
+
+ if (odp_unlikely(actual <= 0))
+ return 0;
+
+#ifdef CONFIG_SPLIT_PRODCONS
+ __builtin_prefetch(&q->node, 1, 0);
+#endif
+ new_read = old_read + actual;
+ q->cons_read = new_read;
+
+ mask = q->cons_mask;
+ ring = q->cons_ring;
+ do {
+ *evp++ = odp_buffer_to_event(
+ odp_hdr_to_buf(ring[old_read & mask]));
+ } while (++old_read != new_read);
+
+ /* Signal producers that empty slots are available
+ * (release ring slots). Enable other consumers to continue.
+ */
+#ifdef CONFIG_QSCHST_LOCK
+ q->prod_read = new_read;
+#else
+ /* Wait for loads (from ring slots) to complete. */
+ atomic_store_release(&q->prod_read, new_read, /*readonly=*/true);
+#endif
+ return actual;
+}
+
+inline int _odp_queue_deq(sched_elem_t *q, odp_buffer_hdr_t *buf_hdr[], int num)
+{
+ int actual;
+ ringidx_t old_read;
+ ringidx_t old_write;
+ ringidx_t new_read;
+ uint32_t mask;
+ odp_buffer_hdr_t **ring;
+ odp_buffer_hdr_t **p_buf_hdr;
+
+ mask = q->cons_mask;
+ ring = q->cons_ring;
+
+ /* Load consumer ring state (read & write index) */
+ old_read = __atomic_load_n(&q->cons_read, __ATOMIC_RELAXED);
+ do {
+ /* Need __atomic_load to avoid compiler reordering
+ * Producer does store-release cons_write, we need
+ * load-acquire.
+ */
+ old_write = __atomic_load_n(&q->cons_write, __ATOMIC_ACQUIRE);
+ /* Prefetch ring buffer array */
+ __builtin_prefetch(&q->cons_ring[old_read & mask], 0, 0);
+
+ actual = MIN(num, (int)(old_write - old_read));
+ if (odp_unlikely(actual <= 0))
+ return 0;
+
+ /* Attempt to free ring slot(s) */
+ new_read = old_read + actual;
+ } while (!__atomic_compare_exchange_n(&q->cons_read,
+ &old_read, /* Updated on failure */
+ new_read,
+ true,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED));
+#ifdef CONFIG_SPLIT_PRODCONS
+ __builtin_prefetch(&q->prod_read, 0, 0);
+#endif
+ p_buf_hdr = buf_hdr;
+ do {
+ *p_buf_hdr++ = ring[old_read & mask];
+ } while (++old_read != new_read);
+ old_read -= actual;
+
+#ifdef CONFIG_SPLIT_PRODCONS
+ __builtin_prefetch(&q->node, 1, 0);
+#endif
+ /* Wait for our turn to signal producers */
+ if (odp_unlikely(__atomic_load_n(&q->prod_read, __ATOMIC_RELAXED) !=
+ old_read)) {
+ sevl();
+ while (wfe() && monitor32(&q->prod_read,
+ __ATOMIC_RELAXED) != old_read)
+ doze();
+ }
+
+ /* Signal producers that empty slots are available
+ * (release ring slots)
+ * Enable other consumers to continue
+ */
+ /* Wait for loads (from ring slots) to complete */
+ atomic_store_release(&q->prod_read, new_read, /*readonly=*/true);
+
+ return actual;
+}
+
+inline int _odp_queue_deq_mc(sched_elem_t *q, odp_event_t *evp, int num)
+{
+ int ret, evt_idx;
+ odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
+
+ if (num > QUEUE_MULTI_MAX)
+ num = QUEUE_MULTI_MAX;
+
+ ret = _odp_queue_deq(q, hdr_tbl, num);
+ if (odp_likely(ret != 0)) {
+ for (evt_idx = 0; evt_idx < num; evt_idx++)
+ evp[evt_idx] = odp_buffer_to_event(
+ odp_hdr_to_buf(hdr_tbl[evt_idx]));
+ }
+
+ return ret;
+}
+
+static int _queue_deq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ sched_elem_t *q;
+ queue_entry_t *queue;
+
+ queue = qentry_from_int(handle);
+ q = &queue->s.sched_elem;
+ return _odp_queue_deq(q, buf_hdr, num);
+}
+
+static odp_buffer_hdr_t *_queue_deq(queue_t handle)
+{
+ sched_elem_t *q;
+ odp_buffer_hdr_t *buf_hdr;
+ queue_entry_t *queue;
+
+ queue = qentry_from_int(handle);
+ q = &queue->s.sched_elem;
+ if (_odp_queue_deq(q, &buf_hdr, 1) == 1)
+ return buf_hdr;
+ else
+ return NULL;
+}
+
+static int queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num)
+{
+ queue_entry_t *queue;
+ odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
+ int i, ret;
+
+ if (num > QUEUE_MULTI_MAX)
+ num = QUEUE_MULTI_MAX;
+
+ queue = qentry_from_int(queue_from_ext(handle));
+
+ ret = queue->s.dequeue_multi(qentry_to_int(queue), buf_hdr, num);
+
+ for (i = 0; i < ret; i++)
+ events[i] = odp_buffer_to_event(buf_hdr[i]->handle.handle);
+
+ return ret;
+}
+
+static odp_event_t queue_deq(odp_queue_t handle)
+{
+ queue_entry_t *queue;
+ odp_buffer_hdr_t *buf_hdr;
+
+ queue = qentry_from_int(queue_from_ext(handle));
+ buf_hdr = queue->s.dequeue(qentry_to_int(queue));
+
+ if (buf_hdr)
+ return odp_buffer_to_event(buf_hdr->handle.handle);
+
+ return ODP_EVENT_INVALID;
+}
+
+static void queue_param_init(odp_queue_param_t *params)
+{
+ memset(params, 0, sizeof(odp_queue_param_t));
+ params->type = ODP_QUEUE_TYPE_PLAIN;
+ params->enq_mode = ODP_QUEUE_OP_MT;
+ params->deq_mode = ODP_QUEUE_OP_MT;
+ params->sched.prio = ODP_SCHED_PRIO_DEFAULT;
+ params->sched.sync = ODP_SCHED_SYNC_PARALLEL;
+ params->sched.group = ODP_SCHED_GROUP_ALL;
+}
+
+static int queue_info(odp_queue_t handle, odp_queue_info_t *info)
+{
+ uint32_t queue_id;
+ queue_entry_t *queue;
+ int status;
+
+ if (odp_unlikely(info == NULL)) {
+ ODP_ERR("Unable to store info, NULL ptr given\n");
+ return -1;
+ }
+
+ queue_id = queue_to_id(handle);
+
+ if (odp_unlikely(queue_id >= ODP_CONFIG_QUEUES)) {
+ ODP_ERR("Invalid queue handle:%" PRIu64 "\n",
+ odp_queue_to_u64(handle));
+ return -1;
+ }
+
+ queue = get_qentry(queue_id);
+
+ LOCK(&queue->s.lock);
+ status = queue->s.status;
+
+ if (odp_unlikely(status == QUEUE_STATUS_FREE ||
+ status == QUEUE_STATUS_DESTROYED)) {
+ UNLOCK(&queue->s.lock);
+ ODP_ERR("Invalid queue status:%d\n", status);
+ return -1;
+ }
+
+ info->name = queue->s.name;
+ info->param = queue->s.param;
+
+ UNLOCK(&queue->s.lock);
+
+ return 0;
+}
+
+static uint64_t queue_to_u64(odp_queue_t hdl)
+{
+ return _odp_pri(hdl);
+}
+
+static odp_pktout_queue_t queue_get_pktout(queue_t handle)
+{
+ return qentry_from_int(handle)->s.pktout;
+}
+
+static void queue_set_pktout(queue_t handle, odp_pktio_t pktio, int index)
+{
+ qentry_from_int(handle)->s.pktout.pktio = pktio;
+ qentry_from_int(handle)->s.pktout.index = index;
+}
+
+static odp_pktin_queue_t queue_get_pktin(queue_t handle)
+{
+ return qentry_from_int(handle)->s.pktin;
+}
+
+static void queue_set_pktin(queue_t handle, odp_pktio_t pktio, int index)
+{
+ qentry_from_int(handle)->s.pktin.pktio = pktio;
+ qentry_from_int(handle)->s.pktin.index = index;
+}
+
+static void queue_set_enq_func(queue_t handle, queue_enq_fn_t func)
+{
+ qentry_from_int(handle)->s.enqueue = func;
+}
+
+static void queue_set_enq_multi_func(queue_t handle, queue_enq_multi_fn_t func)
+{
+ qentry_from_int(handle)->s.enqueue_multi = func;
+}
+
+static void queue_set_deq_func(queue_t handle, queue_deq_fn_t func)
+{
+ qentry_from_int(handle)->s.dequeue = func;
+}
+
+static void queue_set_deq_multi_func(queue_t handle, queue_deq_multi_fn_t func)
+{
+ qentry_from_int(handle)->s.dequeue_multi = func;
+}
+
+static void queue_set_type(queue_t handle, odp_queue_type_t type)
+{
+ qentry_from_int(handle)->s.type = type;
+}
+
+static queue_t queue_from_ext(odp_queue_t handle)
+{
+ uint32_t queue_id;
+
+ queue_id = queue_to_id(handle);
+ return qentry_to_int(get_qentry(queue_id));
+}
+
+static odp_queue_t queue_to_ext(queue_t handle)
+{
+ return qentry_from_int(handle)->s.handle;
+}
+
+/* API functions */
+queue_api_t queue_scalable_api = {
+ .queue_create = queue_create,
+ .queue_destroy = queue_destroy,
+ .queue_lookup = queue_lookup,
+ .queue_capability = queue_capability,
+ .queue_context_set = queue_context_set,
+ .queue_context = queue_context,
+ .queue_enq = queue_enq,
+ .queue_enq_multi = queue_enq_multi,
+ .queue_deq = queue_deq,
+ .queue_deq_multi = queue_deq_multi,
+ .queue_type = queue_type,
+ .queue_sched_type = queue_sched_type,
+ .queue_sched_prio = queue_sched_prio,
+ .queue_sched_group = queue_sched_group,
+ .queue_lock_count = queue_lock_count,
+ .queue_to_u64 = queue_to_u64,
+ .queue_param_init = queue_param_init,
+ .queue_info = queue_info
+};
+
+/* Functions towards internal components */
+queue_fn_t queue_scalable_fn = {
+ .init_global = queue_init_global,
+ .term_global = queue_term_global,
+ .init_local = queue_init_local,
+ .term_local = queue_term_local,
+ .from_ext = queue_from_ext,
+ .to_ext = queue_to_ext,
+ .enq = _queue_enq,
+ .enq_multi = _queue_enq_multi,
+ .deq = _queue_deq,
+ .deq_multi = _queue_deq_multi,
+ .get_pktout = queue_get_pktout,
+ .set_pktout = queue_set_pktout,
+ .get_pktin = queue_get_pktin,
+ .set_pktin = queue_set_pktin,
+ .set_enq_fn = queue_set_enq_func,
+ .set_enq_multi_fn = queue_set_enq_multi_func,
+ .set_deq_fn = queue_set_deq_func,
+ .set_deq_multi_fn = queue_set_deq_multi_func,
+ .set_type = queue_set_type
+};
diff --git a/platform/linux-generic/odp_schedule_if.c b/platform/linux-generic/odp_schedule_if.c
index a9ede98d3..2f07aafe9 100644
--- a/platform/linux-generic/odp_schedule_if.c
+++ b/platform/linux-generic/odp_schedule_if.c
@@ -15,12 +15,18 @@ extern const schedule_api_t schedule_default_api;
extern const schedule_fn_t schedule_iquery_fn;
extern const schedule_api_t schedule_iquery_api;
+extern const schedule_fn_t schedule_scalable_fn;
+extern const schedule_api_t schedule_scalable_api;
+
#ifdef ODP_SCHEDULE_SP
const schedule_fn_t *sched_fn = &schedule_sp_fn;
const schedule_api_t *sched_api = &schedule_sp_api;
#elif defined(ODP_SCHEDULE_IQUERY)
const schedule_fn_t *sched_fn = &schedule_iquery_fn;
const schedule_api_t *sched_api = &schedule_iquery_api;
+#elif defined(ODP_SCHEDULE_SCALABLE)
+const schedule_fn_t *sched_fn = &schedule_scalable_fn;
+const schedule_api_t *sched_api = &schedule_scalable_api;
#else
const schedule_fn_t *sched_fn = &schedule_default_fn;
const schedule_api_t *sched_api = &schedule_default_api;
diff --git a/platform/linux-generic/odp_schedule_scalable.c b/platform/linux-generic/odp_schedule_scalable.c
new file mode 100644
index 000000000..78159b530
--- /dev/null
+++ b/platform/linux-generic/odp_schedule_scalable.c
@@ -0,0 +1,1980 @@
+/* Copyright (c) 2017, ARM Limited. All rights reserved.
+ *
+ * Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include <odp/api/align.h>
+#include <odp/api/atomic.h>
+#include <odp/api/cpu.h>
+#include <odp/api/hints.h>
+#include <odp/api/schedule.h>
+#include <odp/api/shared_memory.h>
+#include <odp/api/sync.h>
+#include <odp/api/thread.h>
+#include <odp/api/thrmask.h>
+#include <odp/api/time.h>
+
+#include <odp_internal.h>
+#include <odp_config_internal.h>
+#include <odp_debug_internal.h>
+#include <_ishm_internal.h>
+#include <_ishmpool_internal.h>
+
+#include <odp_align_internal.h>
+#include <odp_buffer_inlines.h>
+#include <odp_llqueue.h>
+#include <odp_queue_scalable_internal.h>
+#include <odp_schedule_if.h>
+#include <odp_bitset.h>
+#include <odp_packet_io_internal.h>
+
+#include <limits.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <odp/api/plat/ticketlock_inlines.h>
+#define LOCK(a) _odp_ticketlock_lock((a))
+#define UNLOCK(a) _odp_ticketlock_unlock((a))
+
+#define TAG_EMPTY 0U
+#define TAG_USED (1U << 15)
+#define TAG_BUSY (1U << 31)
+#define PKTIO_QUEUE_2_TAG(p, q) ((p) << 16 | (q) | TAG_USED)
+#define TAG_2_PKTIO(t) (((t) >> 16) & 0x7FFF)
+#define TAG_2_QUEUE(t) ((t) & 0x7FFF)
+#define TAG_IS_READY(t) (((t) & (TAG_USED | TAG_BUSY)) == TAG_USED)
+#define PKTIN_MAX (ODP_CONFIG_PKTIO_ENTRIES * PKTIO_MAX_QUEUES)
+#define MAXTHREADS ATOM_BITSET_SIZE
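To make the tag encoding above concrete: PKTIO_QUEUE_2_TAG(3, 5) yields 0x38005 (pktio index 3 in bits 16..30, queue 5 in bits 0..14, TAG_USED in bit 15); TAG_2_PKTIO() and TAG_2_QUEUE() recover 3 and 5, and TAG_IS_READY() holds until a polling thread sets the TAG_BUSY bit.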
+
+static _odp_ishm_pool_t *sched_shm_pool;
+static uint32_t pktin_num;
+static uint32_t pktin_hi;
+static uint16_t pktin_count[ODP_CONFIG_PKTIO_ENTRIES];
+static uint32_t pktin_tags[PKTIN_MAX] ODP_ALIGNED_CACHE;
+
+#define __atomic_fetch_max(var, v, mo) do { \
+ /* Evaluate 'v' once */ \
+ __typeof__(v) tmp_v = (v); \
+ __typeof__(*var) old_var = \
+ __atomic_load_n((var), __ATOMIC_RELAXED); \
+ while (tmp_v > old_var) { \
+ /* Attempt to store 'v' in '*var' */ \
+ if (__atomic_compare_exchange_n((var), &old_var, \
+ tmp_v, true, (mo), \
+ (mo))) \
+ break; \
+ } \
+ /* v <= old_var, nothing to do */ \
+ } while (0)
+
+ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (ODP_SCHED_PRIO_NUM - 1),
+ "lowest_prio_does_not_match_with_num_prios");
+
+ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
+ (ODP_SCHED_PRIO_NORMAL < (ODP_SCHED_PRIO_NUM - 1)),
+ "normal_prio_is_not_between_highest_and_lowest");
+
+ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES),
+ "Number_of_queues_is_not_power_of_two");
+
+/*
+ * Scheduler group related variables.
+ */
+/* Currently used scheduler groups */
+static sched_group_mask_t sg_free;
+static sched_group_t *sg_vec[MAX_SCHED_GROUP];
+/* Group lock for MT-safe APIs */
+odp_spinlock_t sched_grp_lock;
+
+#define SCHED_GROUP_JOIN 0
+#define SCHED_GROUP_LEAVE 1
+
+/*
+ * Per thread state
+ */
+static sched_scalable_thread_state_t thread_state[MAXTHREADS];
+__thread sched_scalable_thread_state_t *sched_ts;
+
+/*
+ * Forward declarations.
+ */
+static int thread_state_init(int tidx)
+{
+ sched_scalable_thread_state_t *ts;
+ uint32_t i;
+
+ ODP_ASSERT(tidx < MAXTHREADS);
+ ts = &thread_state[tidx];
+ ts->atomq = NULL;
+ ts->rctx = NULL;
+ ts->pause = false;
+ ts->out_of_order = false;
+ ts->tidx = tidx;
+ ts->dequeued = 0;
+ ts->pktin_next = 0;
+ ts->pktin_poll_cnts = 0;
+ ts->ticket = TICKET_INVALID;
+ ts->priv_rvec_free = 0;
+ ts->rvec_free = (1ULL << TS_RVEC_SIZE) - 1;
+ ts->num_schedq = 0;
+ ts->sg_sem = 1; /* Start with sched group semaphore changed */
+ memset(ts->sg_actual, 0, sizeof(ts->sg_actual));
+ for (i = 0; i < TS_RVEC_SIZE; i++) {
+ ts->rvec[i].rvec_free = &ts->rvec_free;
+ ts->rvec[i].idx = i;
+ }
+ sched_ts = ts;
+
+ return 0;
+}
+
+static void insert_schedq_in_list(sched_scalable_thread_state_t *ts,
+ sched_queue_t *schedq)
+{
+ /* Find slot for schedq */
+ for (uint32_t i = 0; i < ts->num_schedq; i++) {
+ /* Lower value is higher priority and closer to start of list */
+ if (schedq->prio <= ts->schedq_list[i]->prio) {
+ /* This is the slot! */
+ sched_queue_t *tmp;
+
+ tmp = ts->schedq_list[i];
+ ts->schedq_list[i] = schedq;
+ schedq = tmp;
+ /* Continue the insertion procedure with the
+ * new schedq.
+ */
+ }
+ }
+ if (ts->num_schedq == SCHEDQ_PER_THREAD)
+ ODP_ABORT("Too many schedqs\n");
+ ts->schedq_list[ts->num_schedq++] = schedq;
+}
+
+static void remove_schedq_from_list(sched_scalable_thread_state_t *ts,
+ sched_queue_t *schedq)
+{
+ /* Find schedq */
+ for (uint32_t i = 0; i < ts->num_schedq; i++)
+ if (ts->schedq_list[i] == schedq) {
+ /* Move remaining schedqs */
+ for (uint32_t j = i + 1; j < ts->num_schedq; j++)
+ ts->schedq_list[j - 1] = ts->schedq_list[j];
+ ts->num_schedq--;
+ return;
+ }
+ ODP_ABORT("Cannot find schedq\n");
+}
+
+/*******************************************************************************
+ * Scheduler queues
+ ******************************************************************************/
+#ifndef odp_container_of
+#define odp_container_of(pointer, type, member) \
+ ((type *)(void *)(((char *)pointer) - offsetof(type, member)))
+#endif
+
+static inline void schedq_init(sched_queue_t *schedq, uint32_t prio)
+{
+ llqueue_init(&schedq->llq);
+ schedq->prio = prio;
+}
+
+static inline sched_elem_t *schedq_peek(sched_queue_t *schedq)
+{
+ struct llnode *ptr;
+
+ ptr = llq_head(&schedq->llq);
+ return odp_container_of(ptr, sched_elem_t, node);
+}
+
+static inline odp_bool_t schedq_cond_pop(sched_queue_t *schedq,
+ sched_elem_t *elem)
+{
+ return llq_dequeue_cond(&schedq->llq, &elem->node);
+}
+
+static inline void schedq_push(sched_queue_t *schedq, sched_elem_t *elem)
+{
+ llq_enqueue(&schedq->llq, &elem->node);
+}
+
+static inline odp_bool_t schedq_cond_rotate(sched_queue_t *schedq,
+ sched_elem_t *elem)
+{
+ return llq_cond_rotate(&schedq->llq, &elem->node);
+}
+
+static inline bool schedq_elem_on_queue(sched_elem_t *elem)
+{
+ return llq_on_queue(&elem->node);
+}
+
+/*******************************************************************************
+ * Shared metadata between scheduler and queue
+ ******************************************************************************/
+
+void sched_update_enq(sched_elem_t *q, uint32_t actual)
+{
+ qschedstate_t oss, nss;
+ uint32_t ticket;
+
+ oss = q->qschst;
+ /* Update event counter, optionally taking a ticket. */
+ do {
+ ticket = TICKET_INVALID;
+ nss = oss;
+ nss.numevts += actual;
+ if (odp_unlikely(oss.numevts <= 0 && nss.numevts > 0))
+ /* E -> NE transition */
+ if (q->qschst_type != ODP_SCHED_SYNC_ATOMIC ||
+ oss.cur_ticket == oss.nxt_ticket)
+ /* Parallel or ordered queues: always take
+ * ticket.
+ * Atomic queue: only take ticket if one is
+ * immediately available.
+ * Otherwise ticket already taken => queue
+ * processed by some thread.
+ */
+ ticket = nss.nxt_ticket++;
+ /* Else queue already was non-empty. */
+ /* Attempt to update numevts counter and optionally take ticket. */
+ } while (!__atomic_compare_exchange(
+ &q->qschst, &oss, &nss,
+ true, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+ if (odp_unlikely(ticket != TICKET_INVALID)) {
+ /* Wait for our turn to update schedq. */
+ if (odp_unlikely(
+ __atomic_load_n(&q->qschst.cur_ticket,
+ __ATOMIC_ACQUIRE) != ticket)) {
+ sevl();
+ while (wfe() &&
+ monitor8(&q->qschst.cur_ticket,
+ __ATOMIC_ACQUIRE) != ticket)
+ doze();
+ }
+ /* Enqueue at end of scheduler queue */
+ /* We are here because of empty-to-non-empty transition
+ * This means queue must be pushed to schedq if possible
+ * but we can't do that if it already is on the schedq
+ */
+ if (odp_likely(!schedq_elem_on_queue(q) &&
+ q->pop_deficit == 0)) {
+ /* Queue not already on schedq and no pop deficit means
+ * we can push queue to schedq */
+ schedq_push(q->schedq, q);
+ } else {
+ /* Missed push => cancels one missed pop */
+ q->pop_deficit--;
+ }
+ atomic_store_release(&q->qschst.cur_ticket, ticket + 1,
+ /*readonly=*/false);
+ }
+ /* Else queue was not empty or atomic queue already busy. */
+}
+
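The ticket handling above packs an event counter and ticket fields into one CAS-able state: the enqueuer that triggers the empty-to-non-empty transition takes a ticket (for atomic queues only when one is immediately available), waits for its turn and is then the only thread allowed to push the queue onto the schedq before releasing cur_ticket. A stand-alone sketch of that serialisation pattern (hypothetical names, a plain two-field struct instead of the packed qschedstate_t, and busy-waiting in place of the ARM sevl/wfe/monitor idiom):

#include <stdint.h>
#include <stdio.h>

struct ticket_state {
	uint32_t nxt_ticket; /* next ticket to hand out */
	uint32_t cur_ticket; /* ticket currently allowed to run */
};

static struct ticket_state st = { 0, 0 };
static int shared_counter; /* stands in for the schedq update */

static void serialised_update(void)
{
	uint32_t ticket;

	ticket = __atomic_fetch_add(&st.nxt_ticket, 1, __ATOMIC_RELAXED);
	/* Wait for our turn (the real code uses WFE/monitor on ARM) */
	while (__atomic_load_n(&st.cur_ticket, __ATOMIC_ACQUIRE) != ticket)
		;
	shared_counter++; /* e.g. schedq_push() plus pop-deficit bookkeeping */
	/* Release: let the next ticket holder proceed */
	__atomic_store_n(&st.cur_ticket, ticket + 1, __ATOMIC_RELEASE);
}

int main(void)
{
	serialised_update();
	serialised_update();
	printf("updates: %d\n", shared_counter);
	return 0;
}
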
+void sched_update_enq_sp(sched_elem_t *q, uint32_t actual)
+{
+ qschedstate_t oss, nss;
+ uint32_t ticket;
+
+ oss = q->qschst;
+ /* Update event counter, optionally taking a ticket. */
+ ticket = TICKET_INVALID;
+ nss = oss;
+ nss.numevts += actual;
+ if (odp_unlikely(oss.numevts <= 0 && nss.numevts > 0)) {
+ /* E -> NE transition */
+ if (q->qschst_type != ODP_SCHED_SYNC_ATOMIC ||
+ oss.cur_ticket == oss.nxt_ticket) {
+ /* Parallel or ordered queues: always take
+ * ticket.
+ * Atomic queue: only take ticket if one is
+ * immediately available. Otherwise ticket already
+ * taken => queue owned/processed by some thread
+ */
+ ticket = nss.nxt_ticket++;
+ }
+ }
+ /* Else queue already was non-empty. */
+ /* Attempt to update numevts counter and optionally take ticket. */
+ q->qschst = nss;
+
+ if (odp_unlikely(ticket != TICKET_INVALID)) {
+ /* Enqueue at end of scheduler queue */
+ /* We are here because of empty-to-non-empty transition
+ * This means queue must be pushed to schedq if possible
+ * but we can't do that if it already is on the schedq
+ */
+ if (odp_likely(!schedq_elem_on_queue(q) &&
+ q->pop_deficit == 0)) {
+ /* Queue not already on schedq and no pop deficit means
+ * we can push queue to schedq */
+ schedq_push(q->schedq, q);
+ } else {
+ /* Missed push => cancels one missed pop */
+ q->pop_deficit--;
+ }
+ q->qschst.cur_ticket = ticket + 1;
+ }
+ /* Else queue was not empty or atomic queue already busy. */
+}
+
+#ifndef CONFIG_QSCHST_LOCK
+/* The scheduler is the only entity that performs the dequeue from a queue. */
+static void
+sched_update_deq(sched_elem_t *q,
+ uint32_t actual,
+ bool atomic) __attribute__((always_inline));
+static inline void
+sched_update_deq(sched_elem_t *q,
+ uint32_t actual, bool atomic)
+{
+ qschedstate_t oss, nss;
+ uint32_t ticket;
+
+ if (atomic) {
+ bool pushed = false;
+
+ /* We own this atomic queue, only we can dequeue from it and
+ * thus decrease numevts. Other threads may enqueue and thus
+ * increase numevts.
+ * This means that numevts can't unexpectedly become 0 and
+ * invalidate a push operation already performed
+ */
+ oss = q->qschst;
+ do {
+ ODP_ASSERT(oss.cur_ticket == sched_ts->ticket);
+ nss = oss;
+ nss.numevts -= actual;
+ if (nss.numevts > 0 && !pushed) {
+ schedq_push(q->schedq, q);
+ pushed = true;
+ }
+ /* Attempt to release ticket expecting our view of
+ * numevts to be correct
+ * Unfortunately nxt_ticket will also be included in
+ * the CAS operation
+ */
+ nss.cur_ticket = sched_ts->ticket + 1;
+ } while (odp_unlikely(!__atomic_compare_exchange(
+ &q->qschst,
+ &oss, &nss,
+ true,
+ __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED)));
+ return;
+ }
+
+ oss = q->qschst;
+ do {
+ ticket = TICKET_INVALID;
+ nss = oss;
+ nss.numevts -= actual;
+ nss.wrr_budget -= actual;
+ if ((oss.numevts > 0 && nss.numevts <= 0) ||
+ oss.wrr_budget <= actual) {
+			/* If we have emptied the parallel/ordered queue or
+			 * exhausted its WRR budget, we need a ticket
+			 * for a later pop.
+ */
+ ticket = nss.nxt_ticket++;
+ /* Reset wrr_budget as we might also push the
+ * queue to the schedq.
+ */
+ nss.wrr_budget = CONFIG_WRR_WEIGHT;
+ }
+ /* Attempt to update numevts and optionally take ticket. */
+ } while (!__atomic_compare_exchange(
+ &q->qschst, &oss, &nss,
+ true, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+ if (odp_unlikely(ticket != TICKET_INVALID)) {
+ ODP_ASSERT(q->qschst_type != ODP_SCHED_SYNC_ATOMIC);
+ /* Wait for our turn to update schedq. */
+ if (odp_unlikely(
+ __atomic_load_n(&q->qschst.cur_ticket,
+ __ATOMIC_ACQUIRE) != ticket)) {
+ sevl();
+ while (wfe() &&
+ monitor8(&q->qschst.cur_ticket,
+ __ATOMIC_ACQUIRE) != ticket)
+ doze();
+ }
+ /* We are here because of non-empty-to-empty transition or
+ * WRR budget exhausted
+ * This means the queue must be popped from the schedq, now or
+ * later
+ * If there was no NE->E transition but instead the WRR budget
+ * was exhausted, the queue needs to be moved (popped and
+ * pushed) to the tail of the schedq
+ */
+ if (oss.numevts > 0 && nss.numevts <= 0) {
+ /* NE->E transition, need to pop */
+ if (!schedq_elem_on_queue(q) ||
+ !schedq_cond_pop(q->schedq, q)) {
+ /* Queue not at head, failed to dequeue
+ * Missed a pop.
+ */
+ q->pop_deficit++;
+ }
+ } else {
+ /* WRR budget exhausted
+ * Need to move queue to tail of schedq if possible
+ */
+ if (odp_likely(schedq_elem_on_queue(q))) {
+ /* Queue is on schedq, try to move it to
+ * the tail
+ */
+ (void)schedq_cond_rotate(q->schedq, q);
+ }
+ /* Else queue not on schedq or not at head of schedq
+ * No pop => no push
+ */
+ }
+ atomic_store_release(&q->qschst.cur_ticket, ticket + 1,
+ /*readonly=*/false);
+ }
+}
+#endif
+
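sched_update_deq() above also charges every dequeue against a weighted round-robin budget: when the budget is exhausted the queue takes a ticket, resets the budget to CONFIG_WRR_WEIGHT and is rotated to the tail of its schedq so that other queues of the same priority get served. A simplified, single-threaded model of just that budget decision (hypothetical names, WRR_WEIGHT standing in for CONFIG_WRR_WEIGHT):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define WRR_WEIGHT 64 /* stand-in for CONFIG_WRR_WEIGHT */

struct q_state {
	int32_t numevts;    /* events currently in the queue */
	int32_t wrr_budget; /* events left before the queue must yield */
};

/* Returns true when the queue has to be popped/rotated on the schedq,
 * i.e. it became empty or its WRR budget ran out.
 */
static bool consume(struct q_state *q, int32_t dequeued)
{
	bool was_nonempty = q->numevts > 0;
	bool budget_hit = q->wrr_budget <= dequeued;

	q->numevts -= dequeued;
	q->wrr_budget -= dequeued;
	if ((was_nonempty && q->numevts <= 0) || budget_hit) {
		q->wrr_budget = WRR_WEIGHT; /* reset for the next round */
		return true;
	}
	return false;
}

int main(void)
{
	struct q_state q = { .numevts = 200, .wrr_budget = WRR_WEIGHT };
	int rounds = 0;

	while (q.numevts > 0)
		rounds += consume(&q, 32); /* dequeue 32 events per call */
	printf("queue yielded %d times\n", rounds);
	return 0;
}
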
+#ifdef CONFIG_QSCHST_LOCK
+static void
+sched_update_deq_sc(sched_elem_t *q,
+ uint32_t actual,
+ bool atomic) __attribute__((always_inline));
+static inline void
+sched_update_deq_sc(sched_elem_t *q,
+ uint32_t actual, bool atomic)
+{
+ qschedstate_t oss, nss;
+ uint32_t ticket;
+
+ if (atomic) {
+ ODP_ASSERT(q->qschst.cur_ticket == sched_ts->ticket);
+ ODP_ASSERT(q->qschst.cur_ticket != q->qschst.nxt_ticket);
+ q->qschst.numevts -= actual;
+ q->qschst.cur_ticket = sched_ts->ticket + 1;
+ if (q->qschst.numevts > 0)
+ schedq_push(q->schedq, q);
+ return;
+ }
+
+ oss = q->qschst;
+ ticket = TICKET_INVALID;
+ nss = oss;
+ nss.numevts -= actual;
+ nss.wrr_budget -= actual;
+ if ((oss.numevts > 0 && nss.numevts <= 0) || oss.wrr_budget <= actual) {
+ /* If we emptied the queue or
+ * if we have served the maximum number of events
+ * then we need a ticket for a later pop.
+ */
+ ticket = nss.nxt_ticket++;
+ /* Also reset wrr_budget as we might also push the
+ * queue to the schedq.
+ */
+ nss.wrr_budget = CONFIG_WRR_WEIGHT;
+ }
+ q->qschst = nss;
+
+ if (ticket != TICKET_INVALID) {
+ if (oss.numevts > 0 && nss.numevts <= 0) {
+ /* NE->E transition, need to pop */
+ if (!schedq_elem_on_queue(q) ||
+ !schedq_cond_pop(q->schedq, q)) {
+ /* Queue not at head, failed to dequeue.
+ * Missed a pop.
+ */
+ q->pop_deficit++;
+ }
+ } else {
+ /* WRR budget exhausted
+ * Need to move queue to tail of schedq if possible
+ */
+ if (odp_likely(schedq_elem_on_queue(q))) {
+ /* Queue is on schedq, try to move it to
+ * the tail
+ */
+ (void)schedq_cond_rotate(q->schedq, q);
+ }
+ /* Else queue not on schedq or not at head of schedq
+ * No pop => no push
+ */
+ }
+ q->qschst.cur_ticket = ticket + 1;
+ }
+}
+#endif
+
+static inline void sched_update_popd_sc(sched_elem_t *elem)
+{
+ if (elem->pop_deficit != 0 &&
+ schedq_elem_on_queue(elem) &&
+ schedq_cond_pop(elem->schedq, elem))
+ elem->pop_deficit--;
+}
+
+#ifndef CONFIG_QSCHST_LOCK
+static inline void sched_update_popd(sched_elem_t *elem)
+{
+ uint32_t ticket = __atomic_fetch_add(&elem->qschst.nxt_ticket,
+ 1,
+ __ATOMIC_RELAXED);
+ if (odp_unlikely(__atomic_load_n(&elem->qschst.cur_ticket,
+ __ATOMIC_ACQUIRE) != ticket)) {
+ sevl();
+ while (wfe() && monitor8(&elem->qschst.cur_ticket,
+ __ATOMIC_ACQUIRE) != ticket)
+ doze();
+ }
+ sched_update_popd_sc(elem);
+ atomic_store_release(&elem->qschst.cur_ticket, ticket + 1,
+ /*readonly=*/false);
+}
+#endif
+
+sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t prio)
+{
+ uint32_t sgi;
+ sched_group_t *sg;
+ uint32_t x;
+
+ ODP_ASSERT(grp >= 0 && grp < (odp_schedule_group_t)MAX_SCHED_GROUP);
+ ODP_ASSERT((sg_free & (1ULL << grp)) == 0);
+ ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM);
+
+ sgi = grp;
+ sg = sg_vec[sgi];
+
+ /* Use xcount to spread queues over the xfactor schedq's
+ * per priority.
+ */
+ x = __atomic_fetch_add(&sg->xcount[prio], 1, __ATOMIC_RELAXED);
+ if (x == 0) {
+ /* First ODP queue for this priority
+ * Notify all threads in sg->thr_wanted that they
+ * should join.
+ */
+ sched_group_mask_t thrds = sg->thr_wanted;
+
+ while (!bitset_is_null(thrds)) {
+ uint32_t thr;
+
+ thr = bitset_ffs(thrds) - 1;
+ thrds = bitset_clr(thrds, thr);
+ /* Notify the thread about membership in this
+ * group/priority.
+ */
+ atom_bitset_set(&thread_state[thr].sg_wanted[prio],
+ sgi, __ATOMIC_RELEASE);
+ __atomic_store_n(&thread_state[thr].sg_sem, 1,
+ __ATOMIC_RELEASE);
+ }
+ }
+ return &sg->schedq[prio * sg->xfactor + x % sg->xfactor];
+}
+
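schedq_from_sched_group() above spreads the ODP queues of one priority over the group's xfactor scheduler queues using the running xcount, so that the threads serving the group do not all contend on a single schedq. The index arithmetic in isolation (hypothetical names, values chosen only for illustration):

#include <stdint.h>
#include <stdio.h>

/* The n:th queue created at priority 'prio' lands on one of the 'xfactor'
 * schedqs reserved for that priority, in round-robin fashion.
 */
static uint32_t schedq_index(uint32_t prio, uint32_t xfactor, uint32_t n)
{
	return prio * xfactor + n % xfactor;
}

int main(void)
{
	uint32_t xfactor = 4; /* e.g. number of threads in the group */
	uint32_t n;

	for (n = 0; n < 6; n++)
		printf("queue %u at prio 2 -> schedq %u\n",
		       n, schedq_index(2, xfactor, n));
	return 0;
}
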
+void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio)
+{
+ uint32_t sgi;
+ sched_group_t *sg;
+ uint32_t x;
+
+ ODP_ASSERT(grp >= 0 && grp < (odp_schedule_group_t)MAX_SCHED_GROUP);
+ ODP_ASSERT((sg_free & (1ULL << grp)) == 0);
+ ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM);
+
+ sgi = grp;
+ sg = sg_vec[sgi];
+ x = __atomic_sub_fetch(&sg->xcount[prio], 1, __ATOMIC_RELAXED);
+
+ if (x == 0) {
+ /* Last ODP queue for this priority
+ * Notify all threads in sg->thr_wanted that they
+ * should leave.
+ */
+ sched_group_mask_t thrds = sg->thr_wanted;
+
+ while (!bitset_is_null(thrds)) {
+ uint32_t thr;
+
+ thr = bitset_ffs(thrds) - 1;
+ thrds = bitset_clr(thrds, thr);
+			/* Notify the thread that it should leave this
+			 * group/priority.
+ */
+ atom_bitset_clr(&thread_state[thr].sg_wanted[prio],
+ sgi, __ATOMIC_RELEASE);
+ __atomic_store_n(&thread_state[thr].sg_sem, 1,
+ __ATOMIC_RELEASE);
+ }
+ }
+}
+
+static void update_sg_membership(sched_scalable_thread_state_t *ts)
+{
+ uint32_t p;
+ sched_group_mask_t sg_wanted;
+ sched_group_mask_t added;
+ sched_group_mask_t removed;
+ uint32_t sgi;
+ sched_group_t *sg;
+ uint32_t x;
+
+ for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
+ sg_wanted = atom_bitset_load(&ts->sg_wanted[p],
+ __ATOMIC_ACQUIRE);
+ if (!bitset_is_eql(ts->sg_actual[p], sg_wanted)) {
+ /* Our sched_group membership has changed */
+ added = bitset_andn(sg_wanted, ts->sg_actual[p]);
+ while (!bitset_is_null(added)) {
+ sgi = bitset_ffs(added) - 1;
+ sg = sg_vec[sgi];
+ for (x = 0; x < sg->xfactor; x++) {
+ /* Include our thread index to shift
+ * (rotate) the order of schedq's
+ */
+ insert_schedq_in_list
+ (ts,
+ &sg->schedq[p * sg->xfactor +
+ (x + ts->tidx) % sg->xfactor]);
+ }
+ atom_bitset_set(&sg->thr_actual[p], ts->tidx,
+ __ATOMIC_RELAXED);
+ added = bitset_clr(added, sgi);
+ }
+ removed = bitset_andn(ts->sg_actual[p], sg_wanted);
+ while (!bitset_is_null(removed)) {
+ sgi = bitset_ffs(removed) - 1;
+ sg = sg_vec[sgi];
+ for (x = 0; x < sg->xfactor; x++) {
+ remove_schedq_from_list
+ (ts,
+ &sg->schedq[p *
+ sg->xfactor + x]);
+ }
+ atom_bitset_clr(&sg->thr_actual[p], ts->tidx,
+ __ATOMIC_RELAXED);
+ removed = bitset_clr(removed, sgi);
+ }
+ ts->sg_actual[p] = sg_wanted;
+ }
+ }
+}
+
+/*******************************************************************************
+ * Scheduler
+ ******************************************************************************/
+
+static inline void _schedule_release_atomic(sched_scalable_thread_state_t *ts)
+{
+#ifdef CONFIG_QSCHST_LOCK
+ sched_update_deq_sc(ts->atomq, ts->dequeued, true);
+ ODP_ASSERT(ts->atomq->qschst.cur_ticket != ts->ticket);
+ ODP_ASSERT(ts->atomq->qschst.cur_ticket ==
+ ts->atomq->qschst.nxt_ticket);
+#else
+ sched_update_deq(ts->atomq, ts->dequeued, true);
+#endif
+ ts->atomq = NULL;
+ ts->ticket = TICKET_INVALID;
+}
+
+static inline void _schedule_release_ordered(sched_scalable_thread_state_t *ts)
+{
+ ts->out_of_order = false;
+ rctx_release(ts->rctx);
+ ts->rctx = NULL;
+}
+
+static void pktin_poll(sched_scalable_thread_state_t *ts)
+{
+ uint32_t i, tag, hi, npolls = 0;
+ int pktio_index, queue_index;
+
+ hi = __atomic_load_n(&pktin_hi, __ATOMIC_RELAXED);
+ if (hi == 0)
+ return;
+
+ for (i = ts->pktin_next; npolls != hi; i = (i + 1) % hi, npolls++) {
+ tag = __atomic_load_n(&pktin_tags[i], __ATOMIC_RELAXED);
+ if (!TAG_IS_READY(tag))
+ continue;
+ if (!__atomic_compare_exchange_n(&pktin_tags[i], &tag,
+ tag | TAG_BUSY,
+ true,
+ __ATOMIC_ACQUIRE,
+ __ATOMIC_RELAXED))
+ continue;
+ /* Tag grabbed */
+ pktio_index = TAG_2_PKTIO(tag);
+ queue_index = TAG_2_QUEUE(tag);
+ if (odp_unlikely(sched_cb_pktin_poll(pktio_index,
+ 1, &queue_index))) {
+ /* Pktio stopped or closed
+ * Remove tag from pktin_tags
+ */
+ __atomic_store_n(&pktin_tags[i],
+ TAG_EMPTY, __ATOMIC_RELAXED);
+ __atomic_fetch_sub(&pktin_num,
+ 1, __ATOMIC_RELEASE);
+ /* Call stop_finalize when all queues
+ * of the pktio have been removed
+ */
+ if (__atomic_sub_fetch(&pktin_count[pktio_index], 1,
+ __ATOMIC_RELAXED) == 0)
+ sched_cb_pktio_stop_finalize(pktio_index);
+ } else {
+			/* We don't know whether any packets were found and
+			 * enqueued. Write back the original tag value to
+			 * release the pktin queue.
+			 */
+ __atomic_store_n(&pktin_tags[i], tag, __ATOMIC_RELAXED);
+ /* Do not iterate through all pktin queues every time */
+ if ((ts->pktin_poll_cnts & 0xf) != 0)
+ break;
+ }
+ }
+ ODP_ASSERT(i < hi);
+ ts->pktin_poll_cnts++;
+ ts->pktin_next = i;
+}
+
+static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts)
+{
+ sched_scalable_thread_state_t *ts;
+ sched_elem_t *atomq;
+ int num;
+ uint32_t i;
+
+ ts = sched_ts;
+ atomq = ts->atomq;
+
+ /* Once an atomic queue has been scheduled to a thread, it will stay
+ * on that thread until empty or 'rotated' by WRR
+ */
+ if (atomq != NULL) {
+ ODP_ASSERT(ts->ticket != TICKET_INVALID);
+#ifdef CONFIG_QSCHST_LOCK
+ LOCK(&atomq->qschlock);
+#endif
+dequeue_atomic:
+ ODP_ASSERT(ts->ticket == atomq->qschst.cur_ticket);
+ ODP_ASSERT(ts->ticket != atomq->qschst.nxt_ticket);
+ /* Atomic queues can be dequeued without lock since this thread
+ * has the only reference to the atomic queue being processed.
+ */
+ if (ts->dequeued < atomq->qschst.wrr_budget) {
+ num = _odp_queue_deq_sc(atomq, ev, num_evts);
+ if (odp_likely(num != 0)) {
+#ifdef CONFIG_QSCHST_LOCK
+ UNLOCK(&atomq->qschlock);
+#endif
+ ts->dequeued += num;
+ /* Allow this thread to continue to 'own' this
+ * atomic queue until all events have been
+ * processed and the thread re-invokes the
+ * scheduler.
+ */
+ if (from)
+ *from = queue_get_handle(
+ (queue_entry_t *)atomq);
+ return num;
+ }
+ }
+ /* Atomic queue was empty or interrupted by WRR, release it. */
+ _schedule_release_atomic(ts);
+#ifdef CONFIG_QSCHST_LOCK
+ UNLOCK(&atomq->qschlock);
+#endif
+ }
+
+ /* Release any previous reorder context. */
+ if (ts->rctx != NULL)
+ _schedule_release_ordered(ts);
+
+ /* Check for and perform any scheduler group updates. */
+ if (odp_unlikely(__atomic_load_n(&ts->sg_sem, __ATOMIC_RELAXED) != 0)) {
+ (void)__atomic_load_n(&ts->sg_sem, __ATOMIC_ACQUIRE);
+ ts->sg_sem = 0;
+ update_sg_membership(ts);
+ }
+
+ /* Scan our schedq list from beginning to end */
+ for (i = 0; i < ts->num_schedq; i++) {
+ sched_queue_t *schedq = ts->schedq_list[i];
+ sched_elem_t *elem;
+restart_same:
+ elem = schedq_peek(schedq);
+ if (odp_unlikely(elem == NULL)) {
+ /* Schedq empty, look at next one. */
+ continue;
+ }
+
+ if (elem->cons_type == ODP_SCHED_SYNC_ATOMIC) {
+ /* Dequeue element only if it is still at head
+ * of schedq.
+ */
+ if (odp_unlikely(!schedq_cond_pop(schedq, elem))) {
+ /* Queue not at head of schedq anymore, some
+ * other thread popped it.
+ */
+ goto restart_same;
+ }
+ ts->atomq = elem;
+ atomq = elem;
+ ts->dequeued = 0;
+#ifdef CONFIG_QSCHST_LOCK
+ LOCK(&atomq->qschlock);
+ ts->ticket = atomq->qschst.nxt_ticket++;
+ ODP_ASSERT(atomq->qschst.cur_ticket == ts->ticket);
+#else
+ /* Dequeued atomic queue from the schedq, only we
+ * can process it and any qschst updates are our
+ * responsibility.
+ */
+ /* The ticket taken below will signal producers */
+ ts->ticket = __atomic_fetch_add(
+ &atomq->qschst.nxt_ticket, 1, __ATOMIC_RELAXED);
+ while (__atomic_load_n(
+ &atomq->qschst.cur_ticket,
+ __ATOMIC_ACQUIRE) != ts->ticket) {
+ /* No need to use WFE, spinning here seems
+ * very infrequent.
+ */
+ odp_cpu_pause();
+ }
+#endif
+ goto dequeue_atomic;
+ } else if (elem->cons_type == ODP_SCHED_SYNC_PARALLEL) {
+#ifdef CONFIG_QSCHST_LOCK
+ LOCK(&elem->qschlock);
+ num = _odp_queue_deq_sc(elem, ev, num_evts);
+ if (odp_likely(num != 0)) {
+ sched_update_deq_sc(elem, num, false);
+ UNLOCK(&elem->qschlock);
+ if (from)
+ *from =
+ queue_get_handle((queue_entry_t *)elem);
+ return num;
+ }
+ UNLOCK(&elem->qschlock);
+#else
+ num = _odp_queue_deq_mc(elem, ev, num_evts);
+ if (odp_likely(num != 0)) {
+ sched_update_deq(elem, num, false);
+ if (from)
+ *from =
+ queue_get_handle((queue_entry_t *)elem);
+ return num;
+ }
+#endif
+ } else if (elem->cons_type == ODP_SCHED_SYNC_ORDERED) {
+ reorder_window_t *rwin;
+ reorder_context_t *rctx;
+ uint32_t sn;
+ uint32_t idx;
+
+ /* The ordered queue has a reorder window so requires
+ * order restoration. We must use a reorder context to
+ * collect all outgoing events. Ensure there is at least
+ * one available reorder context.
+ */
+ if (odp_unlikely(bitset_is_null(ts->priv_rvec_free))) {
+ ts->priv_rvec_free = atom_bitset_xchg(
+ &ts->rvec_free, 0,
+ __ATOMIC_RELAXED);
+ if (odp_unlikely(bitset_is_null(
+ ts->priv_rvec_free))) {
+ /* No free reorder contexts for
+ * this thread. Look at next schedq,
+ * hope we find non-ordered queue.
+ */
+ continue;
+ }
+ }
+ /* rwin_reserve and odp_queue_deq must be atomic or
+ * there will be a potential race condition.
+ * Allocate a slot in the reorder window.
+ */
+ rwin = queue_get_rwin((queue_entry_t *)elem);
+ ODP_ASSERT(rwin != NULL);
+ if (odp_unlikely(!rwin_reserve(rwin, &sn))) {
+ /* Reorder window full */
+ /* Look at next schedq, find other queue */
+ continue;
+ }
+ /* Wait for our turn to dequeue */
+ if (odp_unlikely(__atomic_load_n(&rwin->turn,
+ __ATOMIC_ACQUIRE)
+ != sn)) {
+ sevl();
+ while (wfe() &&
+ monitor32(&rwin->turn, __ATOMIC_ACQUIRE)
+ != sn)
+ doze();
+ }
+#ifdef CONFIG_QSCHST_LOCK
+ LOCK(&elem->qschlock);
+#endif
+ num = _odp_queue_deq_sc(elem, ev, num_evts);
+			/* Wait for the prod_read write in _odp_queue_deq_sc()
+			 * to complete before we signal the next consumer
+ */
+ atomic_store_release(&rwin->turn, sn + 1,
+ /*readonly=*/false);
+ /* Find and initialise an unused reorder context. */
+ idx = bitset_ffs(ts->priv_rvec_free) - 1;
+ ts->priv_rvec_free =
+ bitset_clr(ts->priv_rvec_free, idx);
+ rctx = &ts->rvec[idx];
+ /* Need to initialise reorder context or we can't
+ * release it later.
+ */
+ rctx_init(rctx, idx, rwin, sn);
+
+ /* Was dequeue successful? */
+ if (odp_likely(num != 0)) {
+ /* Perform scheduler related updates */
+#ifdef CONFIG_QSCHST_LOCK
+ sched_update_deq_sc(elem, num,
+ /*atomic=*/false);
+ UNLOCK(&elem->qschlock);
+#else
+ sched_update_deq(elem, num, /*atomic=*/false);
+#endif
+
+ /* Are we in-order or out-of-order? */
+ ts->out_of_order = sn != rwin->hc.head;
+
+ ts->rctx = rctx;
+ if (from)
+ *from = queue_get_handle(
+ (queue_entry_t *)elem);
+ return num;
+ }
+#ifdef CONFIG_QSCHST_LOCK
+ UNLOCK(&elem->qschlock);
+#endif
+ /* Since a slot was reserved in the reorder window,
+ * the reorder context needs to be released and
+ * inserted into the reorder window.
+ */
+ rctx_release(rctx);
+ ODP_ASSERT(ts->rctx == NULL);
+ }
+ /* Dequeue from parallel/ordered queue failed
+ * Check if we have a queue at the head of the schedq that needs
+ * to be popped
+ */
+ if (odp_unlikely(__atomic_load_n(&elem->pop_deficit,
+ __ATOMIC_RELAXED) != 0)) {
+#ifdef CONFIG_QSCHST_LOCK
+ LOCK(&elem->qschlock);
+ sched_update_popd_sc(elem);
+ UNLOCK(&elem->qschlock);
+#else
+ sched_update_popd(elem);
+#endif
+ }
+ }
+
+ pktin_poll(ts);
+ return 0;
+}
+
+/******************************************************************************/
+
+static void schedule_order_lock(unsigned lock_index)
+{
+ struct reorder_context *rctx = sched_ts->rctx;
+
+ if (odp_unlikely(rctx == NULL ||
+ rctx->rwin == NULL ||
+ lock_index >= rctx->rwin->lock_count)) {
+ ODP_ERR("Invalid call to odp_schedule_order_lock\n");
+ return;
+ }
+ if (odp_unlikely(__atomic_load_n(&rctx->rwin->olock[lock_index],
+ __ATOMIC_ACQUIRE) != rctx->sn)) {
+ sevl();
+ while (wfe() &&
+ monitor32(&rctx->rwin->olock[lock_index],
+ __ATOMIC_ACQUIRE) != rctx->sn)
+ doze();
+ }
+}
+
+static void schedule_order_unlock(unsigned lock_index)
+{
+ struct reorder_context *rctx;
+
+ rctx = sched_ts->rctx;
+ if (odp_unlikely(rctx == NULL ||
+ rctx->rwin == NULL ||
+ lock_index >= rctx->rwin->lock_count ||
+ rctx->rwin->olock[lock_index] != rctx->sn)) {
+ ODP_ERR("Invalid call to odp_schedule_order_unlock\n");
+ return;
+ }
+ atomic_store_release(&rctx->rwin->olock[lock_index],
+ rctx->sn + 1,
+ /*readonly=*/false);
+ rctx->olock_flags |= 1U << lock_index;
+}
+
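The two functions above implement the odp_schedule_order_lock()/odp_schedule_order_unlock() API: the lock spins until olock[lock_index] reaches the context's sequence number, and the unlock publishes sn + 1 for the next context. A rough application-side fragment showing the intended usage (assumes an initialised ODP instance and an ordered queue created with sched.lock_count >= 1; not a complete program):

#include <odp_api.h>

/* Fragment: worker loop for an ordered queue whose odp_queue_param_t had
 * sched.lock_count = 1. Event processing runs in parallel and possibly
 * out-of-order; the section between lock/unlock runs in enqueue order.
 */
static void worker_loop(void)
{
	odp_queue_t from;
	odp_event_t ev;

	for (;;) {
		ev = odp_schedule(&from, ODP_SCHED_WAIT);
		/* ... parallel, possibly out-of-order processing ... */
		odp_schedule_order_lock(0);   /* wait until we are in-order */
		/* ... in-order critical section (e.g. sequence numbering) ... */
		odp_schedule_order_unlock(0); /* hand over to the next context */
		odp_event_free(ev);
	}
}
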
+static void schedule_release_atomic(void)
+{
+ sched_scalable_thread_state_t *ts;
+
+ ts = sched_ts;
+ if (odp_likely(ts->atomq != NULL)) {
+#ifdef CONFIG_QSCHST_LOCK
+ sched_elem_t *atomq;
+
+ atomq = ts->atomq;
+ LOCK(&atomq->qschlock);
+#endif
+ _schedule_release_atomic(ts);
+#ifdef CONFIG_QSCHST_LOCK
+ UNLOCK(&atomq->qschlock);
+#endif
+ }
+}
+
+static void schedule_release_ordered(void)
+{
+ sched_scalable_thread_state_t *ts;
+
+ ts = sched_ts;
+ if (ts->rctx != NULL)
+ _schedule_release_ordered(ts);
+}
+
+static int schedule_multi(odp_queue_t *from, uint64_t wait, odp_event_t ev[],
+ int num)
+{
+ sched_scalable_thread_state_t *ts;
+ int n;
+ odp_time_t start;
+ odp_time_t delta;
+ odp_time_t deadline;
+
+ ts = sched_ts;
+ if (odp_unlikely(ts->pause)) {
+ if (ts->atomq != NULL) {
+#ifdef CONFIG_QSCHST_LOCK
+ sched_elem_t *atomq;
+
+ atomq = ts->atomq;
+ LOCK(&atomq->qschlock);
+#endif
+ _schedule_release_atomic(ts);
+#ifdef CONFIG_QSCHST_LOCK
+ UNLOCK(&atomq->qschlock);
+#endif
+ } else if (ts->rctx != NULL) {
+ _schedule_release_ordered(ts);
+ }
+ return 0;
+ }
+
+ if (wait == ODP_SCHED_NO_WAIT)
+ return _schedule(from, ev, num);
+
+ if (wait == ODP_SCHED_WAIT) {
+ for (;;) {
+ n = _schedule(from, ev, num);
+ if (odp_likely(n > 0))
+ return n;
+ }
+ }
+
+ start = odp_time_local();
+
+ n = _schedule(from, ev, num);
+ if (odp_likely(n > 0))
+ return n;
+
+ delta = odp_time_local_from_ns(wait);
+ deadline = odp_time_sum(start, delta);
+
+ while (odp_time_cmp(deadline, odp_time_local()) > 0) {
+ n = _schedule(from, ev, num);
+ if (odp_likely(n > 0))
+ return n;
+ }
+
+ return 0;
+}
+
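The wait handling above follows the standard ODP convention: ODP_SCHED_NO_WAIT polls once, ODP_SCHED_WAIT blocks until an event is available, and any other value is a timeout expressed in the unit returned by odp_schedule_wait_time(). A small caller-side fragment (assumes an initialised ODP instance; BURST and poll_burst are illustrative names):

#include <odp_api.h>

#define BURST 8 /* illustrative burst size */

/* Fragment: fetch up to BURST events, giving up after ~100 ms without work. */
static int poll_burst(odp_event_t ev[BURST])
{
	odp_queue_t from;
	uint64_t wait = odp_schedule_wait_time(100 * ODP_TIME_MSEC_IN_NS);

	return odp_schedule_multi(&from, wait, ev, BURST);
}
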
+static odp_event_t schedule(odp_queue_t *from, uint64_t wait)
+{
+ odp_event_t ev = ODP_EVENT_INVALID;
+ const int num = 1;
+ sched_scalable_thread_state_t *ts;
+ int n;
+ odp_time_t start;
+ odp_time_t delta;
+ odp_time_t deadline;
+
+ ts = sched_ts;
+ if (odp_unlikely(ts->pause)) {
+ if (ts->atomq != NULL) {
+#ifdef CONFIG_QSCHST_LOCK
+ sched_elem_t *atomq;
+
+ atomq = ts->atomq;
+ LOCK(&atomq->qschlock);
+#endif
+ _schedule_release_atomic(ts);
+#ifdef CONFIG_QSCHST_LOCK
+ UNLOCK(&atomq->qschlock);
+#endif
+ } else if (ts->rctx != NULL) {
+ _schedule_release_ordered(ts);
+ }
+ return ev;
+ }
+
+ if (wait == ODP_SCHED_NO_WAIT) {
+ (void)_schedule(from, &ev, num);
+ return ev;
+ }
+
+ if (wait == ODP_SCHED_WAIT) {
+ for (;;) {
+ n = _schedule(from, &ev, num);
+ if (odp_likely(n > 0))
+ return ev;
+ }
+ }
+
+ start = odp_time_local();
+
+ n = _schedule(from, &ev, num);
+ if (odp_likely(n > 0))
+ return ev;
+
+ delta = odp_time_local_from_ns(wait);
+ deadline = odp_time_sum(start, delta);
+
+ while (odp_time_cmp(deadline, odp_time_local()) > 0) {
+ n = _schedule(from, &ev, num);
+ if (odp_likely(n > 0))
+ return ev;
+ }
+
+ return ev;
+}
+
+static void schedule_pause(void)
+{
+ sched_ts->pause = true;
+}
+
+static void schedule_resume(void)
+{
+ sched_ts->pause = false;
+}
+
+static uint64_t schedule_wait_time(uint64_t ns)
+{
+ return ns;
+}
+
+static int schedule_num_prio(void)
+{
+ return ODP_SCHED_PRIO_NUM;
+}
+
+static int schedule_group_update(sched_group_t *sg,
+ uint32_t sgi,
+ const odp_thrmask_t *mask,
+ int join_leave)
+{
+ int thr;
+ uint32_t p;
+
+ /* Internal function, do not validate inputs */
+
+ /* Notify relevant threads about the change */
+ thr = odp_thrmask_first(mask);
+ while (0 <= thr) {
+ /* Add thread to scheduler group's wanted thread mask */
+ if (join_leave == SCHED_GROUP_JOIN)
+ atom_bitset_set(&sg->thr_wanted, thr, __ATOMIC_RELAXED);
+ else
+ atom_bitset_clr(&sg->thr_wanted, thr, __ATOMIC_RELAXED);
+ for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
+ if (sg->xcount[p] != 0) {
+				/* This priority level has ODP queues.
+				 * Notify the thread about the membership
+				 * change for this group/priority.
+ */
+ if (join_leave == SCHED_GROUP_JOIN)
+ atom_bitset_set(
+ &thread_state[thr].sg_wanted[p],
+ sgi,
+ __ATOMIC_RELEASE);
+ else
+ atom_bitset_clr(
+ &thread_state[thr].sg_wanted[p],
+ sgi,
+ __ATOMIC_RELEASE);
+ __atomic_store_n(&thread_state[thr].sg_sem,
+ 1,
+ __ATOMIC_RELEASE);
+ }
+ }
+ thr = odp_thrmask_next(mask, thr);
+ }
+
+ return 0;
+}
+
+static int _schedule_group_thrmask(sched_group_t *sg, odp_thrmask_t *mask)
+{
+ bitset_t bs;
+ uint32_t bit;
+
+ /* Internal function, do not validate inputs */
+
+ odp_thrmask_zero(mask);
+ bs = sg->thr_wanted;
+ while (!bitset_is_null(bs)) {
+ bit = bitset_ffs(bs) - 1;
+ bs = bitset_clr(bs, bit);
+ odp_thrmask_set(mask, bit);
+ }
+
+ return 0;
+}
+
+static odp_schedule_group_t schedule_group_create(const char *name,
+ const odp_thrmask_t *mask)
+{
+ uint32_t sgi;
+ sched_group_mask_t free;
+ uint32_t xfactor;
+ sched_group_t *sg;
+ uint32_t p;
+ uint32_t x;
+ uint32_t size;
+
+ /* Validate inputs */
+ if (mask == NULL)
+ ODP_ABORT("mask is NULL\n");
+
+ odp_spinlock_lock(&sched_grp_lock);
+
+ /* Allocate a scheduler group */
+ free = atom_bitset_load(&sg_free, __ATOMIC_RELAXED);
+ do {
+ /* All sched_groups in use */
+ if (bitset_is_null(free))
+ goto no_free_sched_group;
+
+ sgi = bitset_ffs(free) - 1;
+ /* All sched_groups in use */
+ if (sgi >= MAX_SCHED_GROUP)
+ goto no_free_sched_group;
+ } while (!atom_bitset_cmpxchg(&sg_free,
+ &free,
+ bitset_clr(free, sgi),
+ true,
+ __ATOMIC_ACQUIRE,
+ __ATOMIC_ACQUIRE));
+
+ /* Compute xfactor (spread factor) from the number of threads
+	 * present in the thread mask. Preferably this would be an
+ * explicit parameter.
+ */
+ xfactor = odp_thrmask_count(mask);
+ if (xfactor < 1)
+ xfactor = CONFIG_DEFAULT_XFACTOR;
+
+ size = sizeof(sched_group_t) +
+ (ODP_SCHED_PRIO_NUM * xfactor - 1) * sizeof(sched_queue_t);
+ sg = (sched_group_t *)shm_pool_alloc_align(sched_shm_pool, size);
+ if (sg == NULL)
+ goto shm_pool_alloc_failed;
+
+ strncpy(sg->name, name ? name : "", ODP_SCHED_GROUP_NAME_LEN - 1);
+ sg_vec[sgi] = sg;
+ memset(sg->thr_actual, 0, sizeof(sg->thr_actual));
+ sg->thr_wanted = bitset_null();
+ sg->xfactor = xfactor;
+ for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
+ sg->xcount[p] = 0;
+ for (x = 0; x < xfactor; x++)
+ schedq_init(&sg->schedq[p * xfactor + x], p);
+ }
+ if (odp_thrmask_count(mask) != 0)
+ schedule_group_update(sg, sgi, mask, SCHED_GROUP_JOIN);
+
+ odp_spinlock_unlock(&sched_grp_lock);
+
+ return (odp_schedule_group_t)(sgi);
+
+shm_pool_alloc_failed:
+ /* Free the allocated group index */
+ atom_bitset_set(&sg_free, sgi, __ATOMIC_RELAXED);
+
+no_free_sched_group:
+ odp_spinlock_unlock(&sched_grp_lock);
+
+ return ODP_SCHED_GROUP_INVALID;
+}
+
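From the application side the function above is reached through odp_schedule_group_create(); note that the thread mask both seeds the wanted-thread set and, per the comment above, currently doubles as the spread factor. A minimal caller fragment (assumes an initialised ODP instance; names are illustrative):

#include <odp_api.h>

/* Fragment: create a scheduler group containing only the calling thread.
 * Queues created with this group are then scheduled only to that thread.
 */
static odp_schedule_group_t make_private_group(void)
{
	odp_thrmask_t mask;

	odp_thrmask_zero(&mask);
	odp_thrmask_set(&mask, odp_thread_id());
	return odp_schedule_group_create("private_group", &mask);
}
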
+static int schedule_group_destroy(odp_schedule_group_t group)
+{
+ uint32_t sgi;
+ sched_group_t *sg;
+ uint32_t p;
+ int ret = 0;
+
+ /* Validate inputs */
+ if (group < 0 || group >= (odp_schedule_group_t)MAX_SCHED_GROUP) {
+ ret = -1;
+ goto invalid_group;
+ }
+
+ if (sched_ts &&
+ odp_unlikely(__atomic_load_n(&sched_ts->sg_sem,
+ __ATOMIC_RELAXED) != 0)) {
+ (void)__atomic_load_n(&sched_ts->sg_sem,
+ __ATOMIC_ACQUIRE);
+ sched_ts->sg_sem = 0;
+ update_sg_membership(sched_ts);
+ }
+ odp_spinlock_lock(&sched_grp_lock);
+
+ sgi = (uint32_t)group;
+ if (bitset_is_set(sg_free, sgi)) {
+ ret = -1;
+ goto group_not_found;
+ }
+
+ sg = sg_vec[sgi];
+ /* First ensure all threads have processed group_join/group_leave
+ * requests.
+ */
+ for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
+ if (sg->xcount[p] != 0) {
+ bitset_t wanted = atom_bitset_load(
+ &sg->thr_wanted, __ATOMIC_RELAXED);
+
+ sevl();
+ while (wfe() &&
+ !bitset_is_eql(wanted,
+ bitset_monitor(&sg->thr_actual[p],
+ __ATOMIC_RELAXED)))
+ doze();
+ }
+ /* Else ignore because no ODP queues on this prio */
+ }
+
+ /* Check if all threads/queues have left the group */
+ for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
+ if (!bitset_is_null(sg->thr_actual[p])) {
+ ODP_ERR("Group has threads\n");
+ ret = -1;
+ goto thrd_q_present_in_group;
+ }
+ if (sg->xcount[p] != 0) {
+ ODP_ERR("Group has queues\n");
+ ret = -1;
+ goto thrd_q_present_in_group;
+ }
+ }
+
+ _odp_ishm_pool_free(sched_shm_pool, sg);
+ sg_vec[sgi] = NULL;
+ atom_bitset_set(&sg_free, sgi, __ATOMIC_RELEASE);
+
+ odp_spinlock_unlock(&sched_grp_lock);
+
+ return ret;
+
+thrd_q_present_in_group:
+
+group_not_found:
+ odp_spinlock_unlock(&sched_grp_lock);
+
+invalid_group:
+
+ return ret;
+}
+
+static odp_schedule_group_t schedule_group_lookup(const char *name)
+{
+ uint32_t sgi;
+ odp_schedule_group_t group;
+
+ /* Validate inputs */
+ if (name == NULL)
+		ODP_ABORT("name is NULL\n");
+
+ group = ODP_SCHED_GROUP_INVALID;
+
+ odp_spinlock_lock(&sched_grp_lock);
+
+ /* Scan through the schedule group array */
+ for (sgi = 0; sgi < MAX_SCHED_GROUP; sgi++) {
+ if ((sg_vec[sgi] != NULL) &&
+ (strncmp(name, sg_vec[sgi]->name,
+ ODP_SCHED_GROUP_NAME_LEN) == 0)) {
+ group = (odp_schedule_group_t)sgi;
+ break;
+ }
+ }
+
+ odp_spinlock_unlock(&sched_grp_lock);
+
+ return group;
+}
+
+static int schedule_group_join(odp_schedule_group_t group,
+ const odp_thrmask_t *mask)
+{
+ uint32_t sgi;
+ sched_group_t *sg;
+ int ret;
+
+ /* Validate inputs */
+ if (group < 0 || group >= ((odp_schedule_group_t)MAX_SCHED_GROUP))
+ return -1;
+
+ if (mask == NULL)
+		ODP_ABORT("mask is NULL\n");
+
+ odp_spinlock_lock(&sched_grp_lock);
+
+ sgi = (uint32_t)group;
+ if (bitset_is_set(sg_free, sgi)) {
+ odp_spinlock_unlock(&sched_grp_lock);
+ return -1;
+ }
+
+ sg = sg_vec[sgi];
+ ret = schedule_group_update(sg, sgi, mask, SCHED_GROUP_JOIN);
+
+ odp_spinlock_unlock(&sched_grp_lock);
+
+ return ret;
+}
+
+static int schedule_group_leave(odp_schedule_group_t group,
+ const odp_thrmask_t *mask)
+{
+ uint32_t sgi;
+ sched_group_t *sg;
+ int ret = 0;
+
+ /* Validate inputs */
+ if (group < 0 || group >= (odp_schedule_group_t)MAX_SCHED_GROUP) {
+ ret = -1;
+ goto invalid_group;
+ }
+
+ if (mask == NULL)
+		ODP_ABORT("mask is NULL\n");
+
+ odp_spinlock_lock(&sched_grp_lock);
+
+ sgi = (uint32_t)group;
+ if (bitset_is_set(sg_free, sgi)) {
+ ret = -1;
+ goto group_not_found;
+ }
+
+ sg = sg_vec[sgi];
+
+ ret = schedule_group_update(sg, sgi, mask, SCHED_GROUP_LEAVE);
+
+ odp_spinlock_unlock(&sched_grp_lock);
+
+ return ret;
+
+group_not_found:
+ odp_spinlock_unlock(&sched_grp_lock);
+
+invalid_group:
+ return ret;
+}
+
+static int schedule_group_thrmask(odp_schedule_group_t group,
+ odp_thrmask_t *mask)
+{
+ uint32_t sgi;
+ sched_group_t *sg;
+ int ret = 0;
+
+ /* Validate inputs */
+ if (group < 0 || group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) {
+ ret = -1;
+ goto invalid_group;
+ }
+
+ if (mask == NULL)
+		ODP_ABORT("mask is NULL\n");
+
+ odp_spinlock_lock(&sched_grp_lock);
+
+ sgi = (uint32_t)group;
+ if (bitset_is_set(sg_free, sgi)) {
+ ret = -1;
+ goto group_not_found;
+ }
+
+ sg = sg_vec[sgi];
+ ret = _schedule_group_thrmask(sg, mask);
+
+ odp_spinlock_unlock(&sched_grp_lock);
+
+ return ret;
+
+group_not_found:
+ odp_spinlock_unlock(&sched_grp_lock);
+
+invalid_group:
+ return ret;
+}
+
+static int schedule_group_info(odp_schedule_group_t group,
+ odp_schedule_group_info_t *info)
+{
+ uint32_t sgi;
+ sched_group_t *sg;
+ int ret = 0;
+
+ /* Validate inputs */
+ if (group < 0 || group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) {
+ ret = -1;
+ goto invalid_group;
+ }
+
+ if (info == NULL)
+		ODP_ABORT("info is NULL\n");
+
+ odp_spinlock_lock(&sched_grp_lock);
+
+ sgi = (uint32_t)group;
+ if (bitset_is_set(sg_free, sgi)) {
+ ret = -1;
+ goto group_not_found;
+ }
+
+ sg = sg_vec[sgi];
+
+ ret = _schedule_group_thrmask(sg, &info->thrmask);
+
+ info->name = sg->name;
+
+ odp_spinlock_unlock(&sched_grp_lock);
+
+ return ret;
+
+group_not_found:
+ odp_spinlock_unlock(&sched_grp_lock);
+
+invalid_group:
+ return ret;
+}
+
+static int schedule_init_global(void)
+{
+ odp_thrmask_t mask;
+ odp_schedule_group_t tmp_all;
+ odp_schedule_group_t tmp_wrkr;
+ odp_schedule_group_t tmp_ctrl;
+ uint32_t bits;
+ uint32_t pool_size;
+ uint64_t min_alloc;
+ uint64_t max_alloc;
+
+ /* Attach to the pool if it exists */
+ sched_shm_pool = _odp_ishm_pool_lookup("sched_shm_pool");
+ if (sched_shm_pool == NULL) {
+ /* Add storage required for sched groups. Assume worst case
+ * xfactor of MAXTHREADS.
+ */
+ pool_size = (sizeof(sched_group_t) +
+ (ODP_SCHED_PRIO_NUM * MAXTHREADS - 1) *
+ sizeof(sched_queue_t)) * MAX_SCHED_GROUP;
+ /* Choose min_alloc and max_alloc such that slab allocator
+ * is selected.
+ */
+ min_alloc = sizeof(sched_group_t) +
+ (ODP_SCHED_PRIO_NUM * MAXTHREADS - 1) *
+ sizeof(sched_queue_t);
+ max_alloc = min_alloc;
+ sched_shm_pool = _odp_ishm_pool_create("sched_shm_pool",
+ pool_size,
+ min_alloc, max_alloc,
+ _ODP_ISHM_SINGLE_VA);
+ if (sched_shm_pool == NULL) {
+ ODP_ERR("Failed to allocate shared memory pool "
+ "for sched\n");
+ goto failed_sched_shm_pool_create;
+ }
+ }
+
+ odp_spinlock_init(&sched_grp_lock);
+
+ bits = MAX_SCHED_GROUP;
+ if (MAX_SCHED_GROUP == sizeof(sg_free) * CHAR_BIT)
+ sg_free = ~0;
+ else
+ sg_free = (1 << bits) - 1;
+
+ for (uint32_t i = 0; i < MAX_SCHED_GROUP; i++)
+ sg_vec[i] = NULL;
+ for (uint32_t i = 0; i < MAXTHREADS; i++) {
+ thread_state[i].sg_sem = 0;
+ for (uint32_t j = 0; j < ODP_SCHED_PRIO_NUM; j++)
+ thread_state[i].sg_wanted[j] = bitset_null();
+ }
+
+ /* Create sched groups for default GROUP_ALL, GROUP_WORKER and
+ * GROUP_CONTROL groups.
+ */
+ odp_thrmask_zero(&mask);
+ tmp_all = odp_schedule_group_create("__group_all", &mask);
+ if (tmp_all != ODP_SCHED_GROUP_ALL) {
+		ODP_ERR("Could not create ODP_SCHED_GROUP_ALL\n");
+ goto failed_create_group_all;
+ }
+
+ tmp_wrkr = odp_schedule_group_create("__group_worker", &mask);
+ if (tmp_wrkr != ODP_SCHED_GROUP_WORKER) {
+		ODP_ERR("Could not create ODP_SCHED_GROUP_WORKER\n");
+ goto failed_create_group_worker;
+ }
+
+ tmp_ctrl = odp_schedule_group_create("__group_control", &mask);
+ if (tmp_ctrl != ODP_SCHED_GROUP_CONTROL) {
+		ODP_ERR("Could not create ODP_SCHED_GROUP_CONTROL\n");
+ goto failed_create_group_control;
+ }
+
+ return 0;
+
+failed_create_group_control:
+ if (tmp_ctrl != ODP_SCHED_GROUP_INVALID)
+ odp_schedule_group_destroy(ODP_SCHED_GROUP_CONTROL);
+
+failed_create_group_worker:
+ if (tmp_wrkr != ODP_SCHED_GROUP_INVALID)
+ odp_schedule_group_destroy(ODP_SCHED_GROUP_WORKER);
+
+failed_create_group_all:
+ if (tmp_all != ODP_SCHED_GROUP_INVALID)
+ odp_schedule_group_destroy(ODP_SCHED_GROUP_ALL);
+
+failed_sched_shm_pool_create:
+
+ return -1;
+}
+
+static int schedule_term_global(void)
+{
+ /* Destroy sched groups for default GROUP_ALL, GROUP_WORKER and
+ * GROUP_CONTROL groups.
+ */
+ if (odp_schedule_group_destroy(ODP_SCHED_GROUP_ALL) != 0)
+ ODP_ERR("Failed to destroy ODP_SCHED_GROUP_ALL\n");
+ if (odp_schedule_group_destroy(ODP_SCHED_GROUP_WORKER) != 0)
+ ODP_ERR("Failed to destroy ODP_SCHED_GROUP_WORKER\n");
+ if (odp_schedule_group_destroy(ODP_SCHED_GROUP_CONTROL) != 0)
+ ODP_ERR("Failed to destroy ODP_SCHED_GROUP_CONTROL\n");
+
+ _odp_ishm_pool_destroy(sched_shm_pool);
+
+ return 0;
+}
+
+static int schedule_init_local(void)
+{
+ int thr_id;
+ odp_thread_type_t thr_type;
+ odp_thrmask_t mask;
+
+ thr_id = odp_thread_id();
+ if (thread_state_init(thr_id))
+ goto failed_to_init_ts;
+
+ /* Add this thread to default schedule groups */
+ thr_type = odp_thread_type();
+ odp_thrmask_zero(&mask);
+ odp_thrmask_set(&mask, thr_id);
+
+ if (odp_schedule_group_join(ODP_SCHED_GROUP_ALL, &mask) != 0) {
+ ODP_ERR("Failed to join ODP_SCHED_GROUP_ALL\n");
+ goto failed_to_join_grp_all;
+ }
+ if (thr_type == ODP_THREAD_CONTROL) {
+ if (odp_schedule_group_join(ODP_SCHED_GROUP_CONTROL,
+ &mask) != 0) {
+ ODP_ERR("Failed to join ODP_SCHED_GROUP_CONTROL\n");
+ goto failed_to_join_grp_ctrl;
+ }
+ } else {
+ if (odp_schedule_group_join(ODP_SCHED_GROUP_WORKER,
+ &mask) != 0) {
+ ODP_ERR("Failed to join ODP_SCHED_GROUP_WORKER\n");
+ goto failed_to_join_grp_wrkr;
+ }
+ }
+
+ return 0;
+
+failed_to_join_grp_wrkr:
+
+failed_to_join_grp_ctrl:
+ odp_schedule_group_leave(ODP_SCHED_GROUP_ALL, &mask);
+
+failed_to_join_grp_all:
+failed_to_init_ts:
+
+ return -1;
+}
+
+static int schedule_term_local(void)
+{
+ int thr_id;
+ odp_thread_type_t thr_type;
+ odp_thrmask_t mask;
+ int rc = 0;
+
+ /* Remove this thread from default schedule groups */
+ thr_id = odp_thread_id();
+ thr_type = odp_thread_type();
+ odp_thrmask_zero(&mask);
+ odp_thrmask_set(&mask, thr_id);
+
+ if (odp_schedule_group_leave(ODP_SCHED_GROUP_ALL, &mask) != 0)
+ ODP_ERR("Failed to leave ODP_SCHED_GROUP_ALL\n");
+ if (thr_type == ODP_THREAD_CONTROL) {
+ if (odp_schedule_group_leave(ODP_SCHED_GROUP_CONTROL,
+ &mask) != 0)
+ ODP_ERR("Failed to leave ODP_SCHED_GROUP_CONTROL\n");
+ } else {
+ if (odp_schedule_group_leave(ODP_SCHED_GROUP_WORKER,
+ &mask) != 0)
+ ODP_ERR("Failed to leave ODP_SCHED_GROUP_WORKER\n");
+ }
+
+ update_sg_membership(sched_ts);
+
+ /* Check if the thread is still part of any groups */
+ if (sched_ts->num_schedq != 0) {
+ ODP_ERR("Thread %d still part of scheduler group(s)\n",
+ sched_ts->tidx);
+ rc = -1;
+ }
+
+ return rc;
+}
+
+static void pktio_start(int pktio_index, int num_in_queue, int in_queue_idx[])
+{
+ int i;
+ uint32_t old, tag, j;
+
+ for (i = 0; i < num_in_queue; i++) {
+ /* Try to reserve a slot */
+ if (__atomic_fetch_add(&pktin_num,
+ 1, __ATOMIC_RELAXED) >= PKTIN_MAX) {
+ __atomic_fetch_sub(&pktin_num, 1, __ATOMIC_RELAXED);
+ ODP_ABORT("Too many pktio in queues for scheduler\n");
+ }
+ /* A slot has been reserved, now we need to find an empty one */
+ for (j = 0; ; j = (j + 1) % PKTIN_MAX) {
+ if (__atomic_load_n(&pktin_tags[j],
+ __ATOMIC_RELAXED) != TAG_EMPTY)
+ /* Slot used, continue with next */
+ continue;
+ /* Empty slot found */
+ old = TAG_EMPTY;
+ tag = PKTIO_QUEUE_2_TAG(pktio_index, in_queue_idx[i]);
+ if (__atomic_compare_exchange_n(&pktin_tags[j],
+ &old,
+ tag,
+ true,
+ __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED)) {
+				/* Successfully grabbed slot, update high
+				 * watermark
+ */
+ __atomic_fetch_max(&pktin_hi,
+ j + 1, __ATOMIC_RELAXED);
+ /* One more tag (queue) for this pktio
+ * instance
+ */
+ __atomic_fetch_add(&pktin_count[pktio_index],
+ 1, __ATOMIC_RELAXED);
+ /* Continue with next RX queue */
+ break;
+ }
+ /* Failed to grab slot */
+ }
+ }
+}
+
+static int num_grps(void)
+{
+ return MAX_SCHED_GROUP;
+}
+
+/*
+ * Stubs for the internal scheduler abstraction layer. They are needed
+ * because callers do not NULL-check these function pointers before use.
+ */
+
+static int thr_add(odp_schedule_group_t group, int thr)
+{
+ /* This function is a schedule_init_local duplicate. */
+ (void)group;
+ (void)thr;
+ return 0;
+}
+
+static int thr_rem(odp_schedule_group_t group, int thr)
+{
+ /* This function is a schedule_term_local duplicate. */
+ (void)group;
+ (void)thr;
+ return 0;
+}
+
+static int init_queue(uint32_t queue_index,
+ const odp_schedule_param_t *sched_param)
+{
+ /* Not used in scalable scheduler. */
+ (void)queue_index;
+ (void)sched_param;
+ return 0;
+}
+
+static void destroy_queue(uint32_t queue_index)
+{
+ /* Not used in scalable scheduler. */
+ (void)queue_index;
+}
+
+static int sched_queue(uint32_t queue_index)
+{
+ /* Not used in scalable scheduler. */
+ (void)queue_index;
+ return 0;
+}
+
+static int ord_enq_multi(queue_t handle, void *buf_hdr[], int num,
+ int *ret)
+
+{
+ queue_entry_t *queue;
+ sched_scalable_thread_state_t *ts;
+ int actual;
+
+ ts = sched_ts;
+ if (ts && odp_unlikely(ts->out_of_order)) {
+ queue = qentry_from_int(handle);
+ actual = rctx_save(queue, (odp_buffer_hdr_t **)buf_hdr, num);
+ *ret = actual;
+ return 1;
+ }
+ return 0;
+}
+
+static void schedule_prefetch(int num)
+{
+ (void)num;
+}
+
+/* Wait until we are in-order (when processing an ordered queue).
+ * Note: this function may also be called when processing other queue types.
+ */
+static void order_lock(void)
+{
+ sched_scalable_thread_state_t *ts;
+ reorder_window_t *rwin;
+ uint32_t sn;
+
+ ts = sched_ts;
+ if (odp_unlikely(ts->out_of_order)) {
+		/* We are processing an ordered queue and are currently
+ * out-of-order.
+ * We are in-order when our reorder window slot number (sn)
+ * equals the head of the reorder window.
+ */
+ ODP_ASSERT(ts->rctx != NULL);
+ rwin = ts->rctx->rwin;
+ sn = ts->rctx->sn;
+ sevl();
+ /* Use acquire ordering to be on the safe side even if
+ * this isn't an acquire/release situation (aka lock).
+ */
+ while (wfe() &&
+ monitor32(&rwin->hc.head, __ATOMIC_ACQUIRE) != sn)
+ doze();
+ }
+}
+
+/* This function is unnecessary.
+ * The next thread becomes in-order when we release our reorder context
+ * (i.e. when odp_schedule() is called again).
+ */
+static void order_unlock(void)
+{
+}
+
+static unsigned schedule_max_ordered_locks(void)
+{
+ return CONFIG_QUEUE_MAX_ORD_LOCKS;
+}
+
+const schedule_fn_t schedule_scalable_fn = {
+ .pktio_start = pktio_start,
+ .thr_add = thr_add,
+ .thr_rem = thr_rem,
+ .num_grps = num_grps,
+ .init_queue = init_queue,
+ .destroy_queue = destroy_queue,
+ .sched_queue = sched_queue,
+ .ord_enq_multi = ord_enq_multi,
+ .init_global = schedule_init_global,
+ .term_global = schedule_term_global,
+ .init_local = schedule_init_local,
+ .term_local = schedule_term_local,
+ .order_lock = order_lock,
+ .order_unlock = order_unlock,
+ .max_ordered_locks = schedule_max_ordered_locks,
+};
+
+const schedule_api_t schedule_scalable_api = {
+ .schedule_wait_time = schedule_wait_time,
+ .schedule = schedule,
+ .schedule_multi = schedule_multi,
+ .schedule_pause = schedule_pause,
+ .schedule_resume = schedule_resume,
+ .schedule_release_atomic = schedule_release_atomic,
+ .schedule_release_ordered = schedule_release_ordered,
+ .schedule_prefetch = schedule_prefetch,
+ .schedule_num_prio = schedule_num_prio,
+ .schedule_group_create = schedule_group_create,
+ .schedule_group_destroy = schedule_group_destroy,
+ .schedule_group_lookup = schedule_group_lookup,
+ .schedule_group_join = schedule_group_join,
+ .schedule_group_leave = schedule_group_leave,
+ .schedule_group_thrmask = schedule_group_thrmask,
+ .schedule_group_info = schedule_group_info,
+ .schedule_order_lock = schedule_order_lock,
+ .schedule_order_unlock = schedule_order_unlock,
+};
diff --git a/platform/linux-generic/odp_schedule_scalable_ordered.c b/platform/linux-generic/odp_schedule_scalable_ordered.c
new file mode 100644
index 000000000..90ddb61c9
--- /dev/null
+++ b/platform/linux-generic/odp_schedule_scalable_ordered.c
@@ -0,0 +1,345 @@
+/* Copyright (c) 2017, ARM Limited. All rights reserved.
+ *
+ * Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include <odp/api/shared_memory.h>
+#include <odp_queue_scalable_internal.h>
+#include <odp_schedule_if.h>
+#include <odp_bitset.h>
+
+#include <string.h>
+
+extern __thread sched_scalable_thread_state_t *sched_ts;
+
+reorder_window_t *rwin_alloc(_odp_ishm_pool_t *pool, unsigned lock_count)
+{
+ reorder_window_t *rwin;
+ uint32_t i;
+
+ rwin = (reorder_window_t *)
+ shm_pool_alloc_align(pool, sizeof(reorder_window_t));
+ if (rwin == NULL)
+ return NULL;
+
+ rwin->hc.head = 0;
+ rwin->hc.chgi = 0;
+ rwin->winmask = RWIN_SIZE - 1;
+ rwin->tail = 0;
+ rwin->turn = 0;
+ rwin->lock_count = (uint16_t)lock_count;
+ memset(rwin->olock, 0, sizeof(rwin->olock));
+ for (i = 0; i < RWIN_SIZE; i++)
+ rwin->ring[i] = NULL;
+
+ return rwin;
+}
+
+int rwin_free(_odp_ishm_pool_t *pool, reorder_window_t *rwin)
+{
+ return _odp_ishm_pool_free(pool, rwin);
+}
+
+bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn)
+{
+ uint32_t head;
+ uint32_t oldt;
+ uint32_t newt;
+ uint32_t winmask;
+
+ /* Read head and tail separately */
+ oldt = rwin->tail;
+ winmask = rwin->winmask;
+ do {
+ /* Need __atomic_load to avoid compiler reordering */
+ head = __atomic_load_n(&rwin->hc.head, __ATOMIC_RELAXED);
+ if (odp_unlikely(oldt - head >= winmask))
+ return false;
+
+ newt = oldt + 1;
+ } while (!__atomic_compare_exchange(&rwin->tail,
+ &oldt,
+ &newt,
+ true,
+ __ATOMIC_RELAXED,
+ __ATOMIC_RELAXED));
+ *sn = oldt;
+
+ return true;
+}
+
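rwin_reserve() above hands out strictly increasing sequence numbers from tail while refusing to run more than winmask slots ahead of head, i.e. ahead of the oldest in-flight ordered context. A single-threaded model of that bound (hypothetical names; the real code uses a CAS on tail so multiple threads can reserve concurrently):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define WIN_SIZE 8 /* power of two, stand-in for RWIN_SIZE */

struct window {
	uint32_t head; /* oldest in-flight slot */
	uint32_t tail; /* next slot to hand out */
};

/* Hand out the next slot number unless the window is full. Sequence
 * numbers only ever increase; head/tail wrap naturally in uint32_t.
 */
static bool reserve(struct window *w, uint32_t *sn)
{
	if (w->tail - w->head >= WIN_SIZE - 1)
		return false; /* window full */
	*sn = w->tail++;
	return true;
}

int main(void)
{
	struct window w = { 0, 0 };
	uint32_t sn;

	while (reserve(&w, &sn))
		printf("reserved slot %u\n", sn);
	printf("window full at head=%u tail=%u\n", w.head, w.tail);
	return 0;
}
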
+void rwin_insert(reorder_window_t *rwin,
+ reorder_context_t *rctx,
+ uint32_t sn,
+ void (*callback)(reorder_context_t *))
+{
+ /* Initialise to silence scan-build */
+ hc_t old = {0, 0};
+ hc_t new;
+ uint32_t winmask;
+
+ __atomic_load(&rwin->hc, &old, __ATOMIC_ACQUIRE);
+ winmask = rwin->winmask;
+ if (old.head != sn) {
+ /* We are out-of-order. Store context in reorder window,
+ * releasing its content.
+ */
+ ODP_ASSERT(rwin->ring[sn & winmask] == NULL);
+ atomic_store_release(&rwin->ring[sn & winmask],
+ rctx,
+ /*readonly=*/false);
+ rctx = NULL;
+ do {
+ hc_t new;
+
+ new.head = old.head;
+ new.chgi = old.chgi + 1; /* Changed value */
+ /* Update head & chgi, fail if any has changed */
+ if (__atomic_compare_exchange(&rwin->hc,
+ /* Updated on fail */
+ &old,
+ &new,
+ true,
+ /* Rel our ring update */
+ __ATOMIC_RELEASE,
+ __ATOMIC_ACQUIRE))
+ /* CAS succeeded => head same (we are not
+ * in-order), chgi updated.
+ */
+ return;
+ /* CAS failed => head and/or chgi changed.
+ * We might not be out-of-order anymore.
+ */
+ } while (old.head != sn);
+ }
+
+ /* old.head == sn => we are now in-order! */
+ ODP_ASSERT(old.head == sn);
+	/* We are in-order so it is our responsibility to retire contexts */
+ new.head = old.head;
+ new.chgi = old.chgi + 1;
+
+ /* Retire our in-order context (if we still have it) */
+ if (rctx != NULL) {
+ callback(rctx);
+ new.head++;
+ }
+
+ /* Retire in-order contexts in the ring
+ * The first context might actually be ours (if we were originally
+ * out-of-order)
+ */
+ do {
+ for (;;) {
+ rctx = __atomic_load_n(&rwin->ring[new.head & winmask],
+ __ATOMIC_ACQUIRE);
+ if (rctx == NULL)
+ break;
+			/* We are the only thread that is in-order
+			 * (until head is updated) so we don't have to use
+			 * an atomic load-and-clear (exchange)
+ */
+ rwin->ring[new.head & winmask] = NULL;
+ callback(rctx);
+ new.head++;
+ }
+		/* Update head & chgi, fail if chgi has changed
+		 * (head cannot change)
+		 */
+ } while (!__atomic_compare_exchange(&rwin->hc,
+ &old, /* Updated on failure */
+ &new,
+ false, /* weak */
+ __ATOMIC_RELEASE, /* Release our ring updates */
+ __ATOMIC_ACQUIRE));
+}
+
+void rctx_init(reorder_context_t *rctx, uint16_t idx,
+ reorder_window_t *rwin, uint32_t sn)
+{
+ /* rctx->rvec_free and rctx->idx already initialised in
+ * thread_state_init function.
+ */
+ ODP_ASSERT(rctx->idx == idx);
+ rctx->rwin = rwin;
+ rctx->sn = sn;
+ rctx->olock_flags = 0;
+ /* First => no next reorder context */
+ rctx->next_idx = idx;
+ /* Where to store next event */
+ rctx->cur_idx = idx;
+ rctx->numevts = 0;
+}
+
+inline void rctx_free(const reorder_context_t *rctx)
+{
+ const reorder_context_t *const base = &rctx[-(int)rctx->idx];
+ const uint32_t first = rctx->idx;
+ uint32_t next_idx;
+
+ next_idx = rctx->next_idx;
+
+ ODP_ASSERT(rctx->rwin != NULL);
+ /* Set free bit */
+ if (rctx->rvec_free == &sched_ts->rvec_free)
+ /* Since it is our own reorder context, we can instead
+ * perform a non-atomic and relaxed update on our private
+ * rvec_free.
+ */
+ sched_ts->priv_rvec_free =
+ bitset_set(sched_ts->priv_rvec_free, rctx->idx);
+ else
+ atom_bitset_set(rctx->rvec_free, rctx->idx, __ATOMIC_RELEASE);
+
+ /* Can't dereference rctx after the corresponding free bit is set */
+ while (next_idx != first) {
+ rctx = &base[next_idx];
+ next_idx = rctx->next_idx;
+ /* Set free bit */
+ if (rctx->rvec_free == &sched_ts->rvec_free)
+ sched_ts->priv_rvec_free =
+ bitset_set(sched_ts->priv_rvec_free, rctx->idx);
+ else
+ atom_bitset_set(rctx->rvec_free, rctx->idx,
+ __ATOMIC_RELEASE);
+ }
+}
+
+inline void olock_unlock(const reorder_context_t *rctx, reorder_window_t *rwin,
+ uint32_t lock_index)
+{
+ if ((rctx->olock_flags & (1U << lock_index)) == 0) {
+ /* Use relaxed ordering, we are not releasing any updates */
+ rwin->olock[lock_index] = rctx->sn + 1;
+ }
+}
+
+void olock_release(const reorder_context_t *rctx)
+{
+ reorder_window_t *rwin;
+ int i;
+
+ rwin = rctx->rwin;
+
+ for (i = 0; i < rwin->lock_count; i++)
+ olock_unlock(rctx, rwin, i);
+}
+
+static void blocking_enqueue(queue_entry_t *q, odp_buffer_hdr_t **evts, int num)
+{
+ int actual;
+
+ /* Iterate until all events have been successfully enqueued */
+ for (;;) {
+ /* Attempt to enqueue remaining events */
+ actual = q->s.enqueue_multi(qentry_to_int(q), evts, num);
+ if (odp_unlikely(actual < 0))
+ ODP_ERR("Failed to enqueue deferred events\n");
+ /* Update for potential partial success */
+ evts += actual;
+ num -= actual;
+ if (num == 0)
+ break;
+ /* Back-off to decrease load on the system */
+ odp_cpu_pause();
+ }
+}
+
+void rctx_retire(reorder_context_t *first)
+{
+ reorder_context_t *rctx;
+ queue_entry_t *q;
+ uint32_t i;
+ uint32_t j;
+ uint32_t num;
+
+ rctx = first;
+ do {
+ /* Process all events in this reorder context */
+ for (i = 0; i < rctx->numevts;) {
+ q = rctx->destq[i];
+ /* Find index of next different destq */
+ j = i + 1;
+ while (j < rctx->numevts && rctx->destq[j] == q)
+ j++;
+ num = j - i;
+ /* Blocking enqueue of events to this destq */
+ blocking_enqueue(q, &rctx->events[i], num);
+ i += num;
+ }
+ /* Update rctx pointer to point to 'next_idx' element */
+ rctx += (int)rctx->next_idx - (int)rctx->idx;
+ } while (rctx != first);
+ olock_release(first);
+ rctx_free(first);
+}
+
+void rctx_release(reorder_context_t *rctx)
+{
+ /* Insert reorder context into reorder window, potentially calling the
+ * rctx_retire function for all pending reorder_contexts.
+ */
+ rwin_insert(rctx->rwin, rctx, rctx->sn, rctx_retire);
+}
+
+/* Save destination queue and events in the reorder context for deferred
+ * enqueue.
+ */
+int rctx_save(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
+{
+ int i;
+ sched_scalable_thread_state_t *ts;
+ reorder_context_t *first;
+ reorder_context_t *cur;
+ bitset_t next_idx;
+
+ ts = sched_ts;
+ first = ts->rctx;
+ ODP_ASSERT(ts->rctx != NULL);
+ cur = &first[(int)first->cur_idx - (int)first->idx];
+ for (i = 0; i < num; i++) {
+ if (odp_unlikely(cur->numevts == RC_EVT_SIZE)) {
+ /* No more space in current reorder context
+ * Try to allocate another.
+ */
+ if (odp_unlikely(
+ bitset_is_null(ts->priv_rvec_free))) {
+ ts->priv_rvec_free =
+ atom_bitset_xchg(
+ &ts->rvec_free,
+ 0,
+ __ATOMIC_RELAXED);
+ if (odp_unlikely(bitset_is_null(
+ ts->priv_rvec_free)))
+ /* Out of reorder contexts.
+ * Return the number of events
+ * stored so far.
+ */
+ return i;
+ }
+ next_idx = bitset_ffs(ts->priv_rvec_free) - 1;
+ ts->priv_rvec_free =
+ bitset_clr(ts->priv_rvec_free,
+ next_idx);
+ /* Link current to next (for eventual
+ * retiring)
+ */
+ cur->next_idx = next_idx;
+ /* Link first to next (for next call to
+ * queue_enq_multi())
+ */
+ first->cur_idx = next_idx;
+ /* Update current to next */
+ cur = &ts->rvec[next_idx];
+ rctx_init(cur, next_idx, NULL, 0);
+ /* The last rctx (so far) */
+ cur->next_idx = first->idx;
+ }
+ cur->events[cur->numevts] = buf_hdr[i];
+ cur->destq[cur->numevts] = queue;
+ cur->numevts++;
+ }
+ /* All events stored. */
+ return num;
+}