Diffstat (limited to 'platform/linux-generic/odp_schedule_iquery.c')
-rw-r--r--  platform/linux-generic/odp_schedule_iquery.c  1552
1 file changed, 0 insertions, 1552 deletions
diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c
deleted file mode 100644
index c38c7c01a..000000000
--- a/platform/linux-generic/odp_schedule_iquery.c
+++ /dev/null
@@ -1,1552 +0,0 @@
-/* Copyright (c) 2016, Linaro Limited
- * All rights reserved.
- *
- * SPDX-License-Identifier: BSD-3-Clause
- */
-
-#include <odp/api/schedule.h>
-#include <odp_schedule_if.h>
-#include <odp/api/align.h>
-#include <odp/api/queue.h>
-#include <odp/api/shared_memory.h>
-#include <odp_internal.h>
-#include <odp_debug_internal.h>
-#include <odp_ring_internal.h>
-#include <odp_buffer_internal.h>
-#include <odp_bitmap_internal.h>
-#include <odp/api/thread.h>
-#include <odp/api/time.h>
-#include <odp/api/rwlock.h>
-#include <odp/api/hints.h>
-#include <odp/api/cpu.h>
-#include <odp/api/thrmask.h>
-#include <odp/api/packet_io.h>
-#include <odp_config_internal.h>
-
-/* Should remove this dependency */
-#include <odp_queue_internal.h>
-
-/* Number of priority levels */
-#define NUM_SCHED_PRIO 8
-
-ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (NUM_SCHED_PRIO - 1),
- "lowest_prio_does_not_match_with_num_prios");
-
-ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
- (ODP_SCHED_PRIO_NORMAL < (NUM_SCHED_PRIO - 1)),
- "normal_prio_is_not_between_highest_and_lowest");
-
-/* Number of scheduling groups */
-#define NUM_SCHED_GRPS 256
-
-/* Start of named groups in group mask arrays */
-#define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)
-
-/* Instantiate a WAPL bitmap to be used as a queue index bitmap */
-typedef WAPL_BITMAP(ODP_CONFIG_QUEUES) queue_index_bitmap_t;
-
-typedef struct {
- odp_rwlock_t lock;
- queue_index_bitmap_t queues; /* queues in this priority level */
-} sched_prio_t;
-
-typedef struct {
- odp_rwlock_t lock;
- bool allocated;
- odp_thrmask_t threads; /* threads subscribed to this group */
- queue_index_bitmap_t queues; /* queues in this group */
- char name[ODP_SCHED_GROUP_NAME_LEN];
-} sched_group_t;
-
-/* Packet input poll command queues */
-#define PKTIO_CMD_QUEUES 4
-
-/* Maximum number of packet input queues per command */
-#define MAX_PKTIN 16
-
-/* Maximum number of packet IO interfaces */
-#define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
-
-/* Maximum number of pktio poll commands */
-#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO)
-
-/* Not a valid index */
-#define NULL_INDEX ((uint32_t)-1)
-/* Pktio command is free */
-#define PKTIO_CMD_FREE ((uint32_t)-1)
-
-/* Packet IO poll queue ring size. In the worst case, all pktios
- * have all pktins enabled and one poll command is created per
- * pktin queue. The ring size must be larger than or equal to
- * NUM_PKTIO_CMD / PKTIO_CMD_QUEUES, so that it can hold all
- * poll commands in the worst case.
- */
-#define PKTIO_RING_SIZE (NUM_PKTIO_CMD / PKTIO_CMD_QUEUES)
-
-/* Mask for wrapping around pktio poll command index */
-#define PKTIO_RING_MASK (PKTIO_RING_SIZE - 1)
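-
-/* Worked example of the sizing above (config values assumed, not
- * taken from this file): if ODP_CONFIG_PKTIO_ENTRIES were 64, then
- *   NUM_PKTIO_CMD   = MAX_PKTIN * NUM_PKTIO   = 16 * 64 = 1024
- *   PKTIO_RING_SIZE = 1024 / PKTIO_CMD_QUEUES = 1024 / 4 = 256
- *   PKTIO_RING_MASK = 256 - 1 = 0xFF
- * The mask-based wrap-around assumes the ring size is a power
- * of two.
- */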
-
-/* Maximum number of dequeues */
-#define MAX_DEQ CONFIG_BURST_SIZE
-
-/* Instantiate a RING data structure as a pktio command queue */
-typedef struct {
- /* Ring header */
- ring_t ring;
-
- /* Ring data: pktio poll command indexes */
- uint32_t cmd_index[PKTIO_RING_SIZE];
-} pktio_cmd_queue_t ODP_ALIGNED_CACHE;
-
-/* Packet IO poll command */
-typedef struct {
- int pktio;
- int count;
- int pktin[MAX_PKTIN];
- uint32_t index;
-} pktio_cmd_t;
-
-/* Collect the pktio poll resources */
-typedef struct {
- odp_rwlock_t lock;
- /* count active commands per pktio interface */
- int actives[NUM_PKTIO];
- pktio_cmd_t commands[NUM_PKTIO_CMD];
- pktio_cmd_queue_t queues[PKTIO_CMD_QUEUES];
-} pktio_poll_t;
-
-/* Forward declaration */
-typedef struct sched_thread_local sched_thread_local_t;
-
-/* Order context of a queue */
-typedef struct {
- /* Current ordered context id */
- odp_atomic_u64_t ctx ODP_ALIGNED_CACHE;
-
- /* Next unallocated context id */
- odp_atomic_u64_t next_ctx;
-
- /* Array of ordered locks */
- odp_atomic_u64_t lock[CONFIG_QUEUE_MAX_ORD_LOCKS];
-
-} order_context_t ODP_ALIGNED_CACHE;
-
-typedef struct {
- odp_shm_t selfie;
-
- /* Schedule priorities */
- sched_prio_t prios[NUM_SCHED_PRIO];
-
- /* Schedule groups */
- sched_group_t groups[NUM_SCHED_GRPS];
-
- /* Cache queue parameters for easy reference */
- odp_schedule_param_t queues[ODP_CONFIG_QUEUES];
-
- /* Poll pktio inputs in spare time */
- pktio_poll_t pktio_poll;
-
- /* Queues raise or withdraw their availability indications
- * for scheduling; the bool value also serves as the focal
- * point for atomic competition. */
- bool availables[ODP_CONFIG_QUEUES];
-
- /* Quick reference to per thread context */
- sched_thread_local_t *threads[ODP_THREAD_COUNT_MAX];
-
- order_context_t order[ODP_CONFIG_QUEUES];
-} sched_global_t;
-
-/* Per thread events cache */
-typedef struct {
- int count;
- odp_queue_t queue;
- odp_event_t stash[MAX_DEQ], *top;
-} event_cache_t;
-
-/* Ordered stash size */
-#define MAX_ORDERED_STASH 512
-
-/* Storage for stashed enqueue operation arguments */
-typedef struct {
- odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
- queue_entry_t *queue_entry;
- int num;
-} ordered_stash_t;
-
-/* Ordered lock states */
-typedef union {
- uint8_t u8[CONFIG_QUEUE_MAX_ORD_LOCKS];
- uint32_t all;
-} lock_called_t;
-
-ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t),
- "Lock_called_values_do_not_fit_in_uint32");
-
-/* Instantiate a sparse bitmap to store a thread's interested
- * queue indexes per priority.
- */
-typedef SPARSE_BITMAP(ODP_CONFIG_QUEUES) queue_index_sparse_t;
-
-struct sched_thread_local {
- int thread;
- bool pause;
-
- /* Cache events only for atomic queue */
- event_cache_t cache;
-
- /* Saved atomic context */
- bool *atomic;
-
- /* Count of the pktio polls this thread has done */
- uint16_t pktin_polls;
-
- /* Interested queue indexes to be checked by the thread
- * at each priority level for scheduling, and a round-robin
- * iterator to improve fairness between queues in the
- * same priority level.
- */
- odp_rwlock_t lock;
- queue_index_sparse_t indexes[NUM_SCHED_PRIO];
- sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO];
-
- struct {
- /* Source queue index */
- uint32_t src_queue;
- uint64_t ctx; /**< Ordered context id */
- int stash_num; /**< Number of stashed enqueue operations */
- uint8_t in_order; /**< Order status */
- lock_called_t lock_called; /**< States of ordered locks */
- /** Storage for stashed enqueue operations */
- ordered_stash_t stash[MAX_ORDERED_STASH];
- } ordered;
-};
-
-/* Global scheduler context */
-static sched_global_t *sched;
-
-/* Thread local scheduler context */
-static __thread sched_thread_local_t thread_local;
-
-static int schedule_init_global(void)
-{
- odp_shm_t shm;
- int i, k, prio, group;
-
- ODP_DBG("Schedule[iquery] init ... ");
-
- shm = odp_shm_reserve("odp_scheduler_iquery",
- sizeof(sched_global_t),
- ODP_CACHE_LINE_SIZE, 0);
-
- sched = odp_shm_addr(shm);
-
- if (sched == NULL) {
- ODP_ERR("Schedule[iquery] "
- "init: shm reserve.\n");
- return -1;
- }
-
- memset(sched, 0, sizeof(sched_global_t));
-
- sched->selfie = shm;
-
- for (prio = 0; prio < NUM_SCHED_PRIO; prio++)
- odp_rwlock_init(&sched->prios[prio].lock);
-
- for (group = 0; group < NUM_SCHED_GRPS; group++) {
- sched->groups[group].allocated = false;
- odp_rwlock_init(&sched->groups[group].lock);
- }
-
- odp_rwlock_init(&sched->pktio_poll.lock);
-
- for (i = 0; i < PKTIO_CMD_QUEUES; i++) {
- pktio_cmd_queue_t *queue =
- &sched->pktio_poll.queues[i];
-
- ring_init(&queue->ring);
-
- for (k = 0; k < PKTIO_RING_SIZE; k++)
- queue->cmd_index[k] = RING_EMPTY;
- }
-
- for (i = 0; i < NUM_PKTIO_CMD; i++)
- sched->pktio_poll.commands[i].index = PKTIO_CMD_FREE;
-
- ODP_DBG("done\n");
- return 0;
-}
-
-static int schedule_term_global(void)
-{
- uint32_t i;
- odp_shm_t shm = sched->selfie;
-
- for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
- int count = 0;
- odp_event_t events[1];
-
- if (sched->availables[i])
- count = sched_cb_queue_deq_multi(i, events, 1);
-
- if (count < 0)
- sched_cb_queue_destroy_finalize(i);
- else if (count > 0)
- ODP_ERR("Queue (%d) not empty\n", i);
- }
-
- memset(sched, 0, sizeof(sched_global_t));
-
- if (odp_shm_free(shm) < 0) {
- ODP_ERR("Schedule[iquery] "
- "term: shm release.\n");
- return -1;
- }
- return 0;
-}
-
-/*
- * These functions manipulate a thread's scheduling interests.
- */
-static void thread_set_interest(sched_thread_local_t *thread,
- unsigned int queue_index, int prio);
-
-static void thread_clear_interest(sched_thread_local_t *thread,
- unsigned int queue_index, int prio);
-
-static void thread_set_interests(sched_thread_local_t *thread,
- queue_index_bitmap_t *set);
-
-static void thread_clear_interests(sched_thread_local_t *thread,
- queue_index_bitmap_t *clear);
-
-static void sched_thread_local_reset(void)
-{
- int prio;
- queue_index_sparse_t *index;
- sparse_bitmap_iterator_t *iterator;
-
- memset(&thread_local, 0, sizeof(sched_thread_local_t));
-
- thread_local.thread = odp_thread_id();
- thread_local.cache.queue = ODP_QUEUE_INVALID;
- thread_local.ordered.src_queue = NULL_INDEX;
-
- odp_rwlock_init(&thread_local.lock);
-
- for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
- index = &thread_local.indexes[prio];
- iterator = &thread_local.iterators[prio];
-
- sparse_bitmap_zero(index);
- sparse_bitmap_iterator(iterator, index);
- }
-}
-
-static int schedule_init_local(void)
-{
- int group;
- sched_group_t *G;
- queue_index_bitmap_t collect;
-
- wapl_bitmap_zero(&collect);
- sched_thread_local_reset();
-
- /* Collect all queue indexes of the schedule groups
- * to which this thread has subscribed
- */
- for (group = 0; group < NUM_SCHED_GRPS; group++) {
- G = &sched->groups[group];
- odp_rwlock_read_lock(&G->lock);
-
- if ((group < SCHED_GROUP_NAMED || G->allocated) &&
- odp_thrmask_isset(&G->threads, thread_local.thread))
- wapl_bitmap_or(&collect, &collect, &G->queues);
-
- odp_rwlock_read_unlock(&G->lock);
- }
-
- /* Distribute the above collected queue indexes into
- * thread local interests per priority level.
- */
- thread_set_interests(&thread_local, &collect);
-
- /* "Night gathers, and now my watch begins..." */
- sched->threads[thread_local.thread] = &thread_local;
- return 0;
-}
-
-static inline void schedule_release_context(void);
-
-static int schedule_term_local(void)
-{
- int group;
- sched_group_t *G;
-
- if (thread_local.cache.count) {
- ODP_ERR("Locally pre-scheduled events exist.\n");
- return -1;
- }
-
- schedule_release_context();
-
- /* Unsubscribe all named schedule groups */
- for (group = SCHED_GROUP_NAMED;
- group < NUM_SCHED_GRPS; group++) {
- G = &sched->groups[group];
- odp_rwlock_write_lock(&G->lock);
-
- if (G->allocated && odp_thrmask_isset(
- &G->threads, thread_local.thread))
- odp_thrmask_clr(&G->threads, thread_local.thread);
-
- odp_rwlock_write_unlock(&G->lock);
- }
-
- /* "...for this night and all the nights to come." */
- sched->threads[thread_local.thread] = NULL;
- sched_thread_local_reset();
- return 0;
-}
-
-static int init_sched_queue(uint32_t queue_index,
- const odp_schedule_param_t *sched_param)
-{
- int prio, group, thread, i;
- sched_prio_t *P;
- sched_group_t *G;
- sched_thread_local_t *local;
-
- prio = sched_param->prio;
- group = sched_param->group;
-
- G = &sched->groups[group];
- odp_rwlock_write_lock(&G->lock);
-
- /* A named schedule group must be created prior
- * to creating queues in that group.
- */
- if (group >= SCHED_GROUP_NAMED && !G->allocated) {
- odp_rwlock_write_unlock(&G->lock);
- return -1;
- }
-
- /* Record the queue in its priority level globally */
- P = &sched->prios[prio];
-
- odp_rwlock_write_lock(&P->lock);
- wapl_bitmap_set(&P->queues, queue_index);
- odp_rwlock_write_unlock(&P->lock);
-
- /* Record the queue in its schedule group */
- wapl_bitmap_set(&G->queues, queue_index);
-
- /* Cache queue parameters for easy reference */
- memcpy(&sched->queues[queue_index],
- sched_param, sizeof(odp_schedule_param_t));
-
- odp_atomic_init_u64(&sched->order[queue_index].ctx, 0);
- odp_atomic_init_u64(&sched->order[queue_index].next_ctx, 0);
-
- for (i = 0; i < CONFIG_QUEUE_MAX_ORD_LOCKS; i++)
- odp_atomic_init_u64(&sched->order[queue_index].lock[i], 0);
-
- /* Update all threads in this schedule group to
- * start checking this queue index upon scheduling.
- */
- thread = odp_thrmask_first(&G->threads);
- while (thread >= 0) {
- local = sched->threads[thread];
- thread_set_interest(local, queue_index, prio);
- thread = odp_thrmask_next(&G->threads, thread);
- }
-
- odp_rwlock_write_unlock(&G->lock);
- return 0;
-}
-
-/*
- * Must be called with schedule group's rwlock held.
- * It is also used by schedule_group_destroy()
- * to destroy all orphan queues while destroying a whole
- * schedule group.
- */
-static void __destroy_sched_queue(
- sched_group_t *G, uint32_t queue_index)
-{
- int prio, thread;
- sched_prio_t *P;
- sched_thread_local_t *local;
-
- prio = sched->queues[queue_index].prio;
-
- /* Forget the queue in its schedule group */
- wapl_bitmap_clear(&G->queues, queue_index);
-
- /* Forget queue schedule parameters */
- memset(&sched->queues[queue_index],
- 0, sizeof(odp_schedule_param_t));
-
- /* Update all threads in this schedule group to
- * stop checking this queue index upon scheduling.
- */
- thread = odp_thrmask_first(&G->threads);
- while (thread >= 0) {
- local = sched->threads[thread];
- thread_clear_interest(local, queue_index, prio);
- thread = odp_thrmask_next(&G->threads, thread);
- }
-
- /* Forget the queue in its priority level globally */
- P = &sched->prios[prio];
-
- odp_rwlock_write_lock(&P->lock);
- wapl_bitmap_clear(&P->queues, queue_index);
- odp_rwlock_write_unlock(&P->lock);
-}
-
-static void destroy_sched_queue(uint32_t queue_index)
-{
- int group;
- sched_group_t *G;
-
- group = sched->queues[queue_index].group;
-
- G = &sched->groups[group];
- odp_rwlock_write_lock(&G->lock);
-
- /* The named schedule group could have been destroyed
- * earlier, leaving these orphan queues behind.
- */
- if (group >= SCHED_GROUP_NAMED && !G->allocated) {
- odp_rwlock_write_unlock(&G->lock);
- return;
- }
-
- __destroy_sched_queue(G, queue_index);
- odp_rwlock_write_unlock(&G->lock);
-
- if (sched->queues[queue_index].sync == ODP_SCHED_SYNC_ORDERED &&
- odp_atomic_load_u64(&sched->order[queue_index].ctx) !=
- odp_atomic_load_u64(&sched->order[queue_index].next_ctx))
- ODP_ERR("queue reorder incomplete\n");
-}
-
-static int pktio_cmd_queue_hash(int pktio, int pktin)
-{
- return (pktio ^ pktin) % PKTIO_CMD_QUEUES;
-}
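-
-/* For example, with the hash above, pktio 3 with pktin 5 maps to
- * command queue (3 ^ 5) % PKTIO_CMD_QUEUES = 6 % 4 = 2.
- */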
-
-static inline pktio_cmd_t *alloc_pktio_cmd(void)
-{
- int i;
- pktio_cmd_t *cmd = NULL;
-
- odp_rwlock_write_lock(&sched->pktio_poll.lock);
-
- /* Find next free command */
- for (i = 0; i < NUM_PKTIO_CMD; i++) {
- if (sched->pktio_poll.commands[i].index
- == PKTIO_CMD_FREE) {
- cmd = &sched->pktio_poll.commands[i];
- cmd->index = i;
- break;
- }
- }
-
- odp_rwlock_write_unlock(&sched->pktio_poll.lock);
- return cmd;
-}
-
-static inline void free_pktio_cmd(pktio_cmd_t *cmd)
-{
- odp_rwlock_write_lock(&sched->pktio_poll.lock);
-
- cmd->index = PKTIO_CMD_FREE;
-
- odp_rwlock_write_unlock(&sched->pktio_poll.lock);
-}
-
-static void schedule_pktio_start(int pktio, int count, int pktin[])
-{
- int i, index;
- pktio_cmd_t *cmd;
-
- if (count > MAX_PKTIN)
- ODP_ABORT("Too many input queues for scheduler\n");
-
- /* Record the number of active commands per pktio interface */
- sched->pktio_poll.actives[pktio] = count;
-
- /* Create a pktio poll command per pktin */
- for (i = 0; i < count; i++) {
- cmd = alloc_pktio_cmd();
-
- if (cmd == NULL)
- ODP_ABORT("Scheduler out of pktio commands\n");
-
- index = pktio_cmd_queue_hash(pktio, pktin[i]);
-
- cmd->pktio = pktio;
- cmd->count = 1;
- cmd->pktin[0] = pktin[i];
- ring_enq(&sched->pktio_poll.queues[index].ring,
- PKTIO_RING_MASK, cmd->index);
- }
-}
-
-static int schedule_pktio_stop(int pktio, int pktin ODP_UNUSED)
-{
- int remains;
-
- odp_rwlock_write_lock(&sched->pktio_poll.lock);
-
- sched->pktio_poll.actives[pktio]--;
- remains = sched->pktio_poll.actives[pktio];
-
- odp_rwlock_write_unlock(&sched->pktio_poll.lock);
- return remains;
-}
-
-#define DO_SCHED_LOCK() odp_rwlock_read_lock(&thread_local.lock)
-#define DO_SCHED_UNLOCK() odp_rwlock_read_unlock(&thread_local.lock)
-
-static inline bool do_schedule_prio(int prio);
-
-static inline int pop_cache_events(odp_event_t ev[], unsigned int max)
-{
- int k = 0;
- event_cache_t *cache;
-
- cache = &thread_local.cache;
- while (cache->count && max) {
- ev[k] = *cache->top++;
- k++;
- max--;
- cache->count--;
- }
-
- return k;
-}
-
-static inline void assign_queue_handle(odp_queue_t *handle)
-{
- if (handle)
- *handle = thread_local.cache.queue;
-}
-
-static inline void pktio_poll_input(void)
-{
- int i, hash;
- uint32_t index;
-
- ring_t *ring;
- pktio_cmd_t *cmd;
-
- /*
- * Each thread starts the search for a poll command
- * from the hash(threadID) queue to mitigate contentions.
- * If the queue is empty, it moves to other queues.
- *
- * Most of the time, the search stops at the first
- * command found, to optimize multi-threaded performance.
- * A small portion of polls has to do a full iteration to
- * avoid packet input starvation when there are fewer
- * threads than command queues.
- */
- hash = thread_local.thread % PKTIO_CMD_QUEUES;
-
- for (i = 0; i < PKTIO_CMD_QUEUES; i++,
- hash = (hash + 1) % PKTIO_CMD_QUEUES) {
- ring = &sched->pktio_poll.queues[hash].ring;
- index = ring_deq(ring, PKTIO_RING_MASK);
-
- if (odp_unlikely(index == RING_EMPTY))
- continue;
-
- cmd = &sched->pktio_poll.commands[index];
-
- /* Poll packet input */
- if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio,
- cmd->count,
- cmd->pktin))) {
- /* Pktio stopped or closed. Remove poll
- * command and call stop_finalize when all
- * commands of the pktio have been removed.
- */
- if (schedule_pktio_stop(cmd->pktio,
- cmd->pktin[0]) == 0)
- sched_cb_pktio_stop_finalize(cmd->pktio);
-
- free_pktio_cmd(cmd);
- } else {
- /* Continue scheduling the pktio */
- ring_enq(ring, PKTIO_RING_MASK, index);
-
- /* Do not iterate through all pktin poll
- * command queues every time; a full sweep
- * runs only on every 16th poll, when
- * (pktin_polls & 0xF) == 0.
- */
- if (odp_likely(thread_local.pktin_polls & 0xF))
- break;
- }
- }
-
- thread_local.pktin_polls++;
-}
-
-/*
- * Schedule queues
- */
-static int do_schedule(odp_queue_t *out_queue,
- odp_event_t out_ev[], unsigned int max_num)
-{
- int prio, count;
-
- /* Consume locally cached events */
- count = pop_cache_events(out_ev, max_num);
- if (count > 0) {
- assign_queue_handle(out_queue);
- return count;
- }
-
- schedule_release_context();
-
- if (odp_unlikely(thread_local.pause))
- return count;
-
- DO_SCHED_LOCK();
- /* Schedule events */
- for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
- /* Round-robin iterate over the interested queue
- * indexes at this priority level, competing for
- * and consuming available queues
- */
- if (!do_schedule_prio(prio))
- continue;
-
- count = pop_cache_events(out_ev, max_num);
- assign_queue_handle(out_queue);
- DO_SCHED_UNLOCK();
- return count;
- }
-
- DO_SCHED_UNLOCK();
-
- /* Poll packet input when there are no events */
- pktio_poll_input();
- return 0;
-}
-
-static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
- odp_event_t out_ev[], unsigned int max_num)
-{
- int count, first = 1;
- odp_time_t next, wtime;
-
- while (1) {
- count = do_schedule(out_queue, out_ev, max_num);
-
- if (count)
- break;
-
- if (wait == ODP_SCHED_WAIT)
- continue;
-
- if (wait == ODP_SCHED_NO_WAIT)
- break;
-
- if (first) {
- wtime = odp_time_local_from_ns(wait);
- next = odp_time_sum(odp_time_local(), wtime);
- first = 0;
- continue;
- }
-
- if (odp_time_cmp(next, odp_time_local()) < 0)
- break;
- }
-
- return count;
-}
-
-static odp_event_t schedule(odp_queue_t *out_queue, uint64_t wait)
-{
- odp_event_t ev;
-
- ev = ODP_EVENT_INVALID;
-
- schedule_loop(out_queue, wait, &ev, 1);
-
- return ev;
-}
-
-static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
- odp_event_t events[], int num)
-{
- return schedule_loop(out_queue, wait, events, num);
-}
-
-static void schedule_pause(void)
-{
- thread_local.pause = 1;
-}
-
-static void schedule_resume(void)
-{
- thread_local.pause = 0;
-}
-
-static uint64_t schedule_wait_time(uint64_t ns)
-{
- return ns;
-}
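-
-/* A minimal application-side sketch of the wait-time semantics
- * implemented by schedule_loop() (public ODP API; the 100 ms
- * value is illustrative):
- *
- *   uint64_t wait = odp_schedule_wait_time(100 * ODP_TIME_MSEC_IN_NS);
- *   odp_queue_t from;
- *   odp_event_t ev = odp_schedule(&from, wait);
- *
- * ODP_SCHED_WAIT blocks until an event arrives and
- * ODP_SCHED_NO_WAIT returns immediately.
- */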
-
-static int number_of_priorities(void)
-{
- return NUM_SCHED_PRIO;
-}
-
-/*
- * Create a named schedule group with a pre-defined
- * set of subscriber threads.
- *
- * Sched queues belonging to this group must be
- * created after the group creation. Upon creation
- * the group holds 0 sched queues.
- */
-static odp_schedule_group_t schedule_group_create(
- const char *name, const odp_thrmask_t *mask)
-{
- int group;
- sched_group_t *G;
-
- for (group = SCHED_GROUP_NAMED;
- group < NUM_SCHED_GRPS; group++) {
- G = &sched->groups[group];
-
- odp_rwlock_write_lock(&G->lock);
- if (!G->allocated) {
- strncpy(G->name, name ? name : "",
- ODP_SCHED_GROUP_NAME_LEN - 1);
- odp_thrmask_copy(&G->threads, mask);
- wapl_bitmap_zero(&G->queues);
-
- G->allocated = true;
- odp_rwlock_write_unlock(&G->lock);
- return (odp_schedule_group_t)group;
- }
- odp_rwlock_write_unlock(&G->lock);
- }
-
- return ODP_SCHED_GROUP_INVALID;
-}
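-
-/* A minimal usage sketch (public ODP API; the names are
- * illustrative): create the group first, then queues in it.
- *
- *   odp_thrmask_t mask;
- *   odp_thrmask_zero(&mask);
- *   odp_thrmask_set(&mask, odp_thread_id());
- *   odp_schedule_group_t grp =
- *           odp_schedule_group_create("rx-group", &mask);
- *
- *   odp_queue_param_t qp;
- *   odp_queue_param_init(&qp);
- *   qp.type = ODP_QUEUE_TYPE_SCHED;
- *   qp.sched.group = grp;
- *   odp_queue_t q = odp_queue_create("rx-queue", &qp);
- */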
-
-static inline void __destroy_group_queues(sched_group_t *group)
-{
- unsigned int index;
- queue_index_bitmap_t queues;
- wapl_bitmap_iterator_t it;
-
- /* Take a local snapshot of the group's queue bitmap */
- wapl_bitmap_zero(&queues);
- wapl_bitmap_copy(&queues, &group->queues);
- wapl_bitmap_iterator(&it, &queues);
-
- /* Walk through the queue index bitmap */
- for (it.start(&it); it.has_next(&it);) {
- index = it.next(&it);
- __destroy_sched_queue(group, index);
- }
-}
-
-/*
- * Destroy a named schedule group.
- */
-static int schedule_group_destroy(odp_schedule_group_t group)
-{
- int done = -1;
- sched_group_t *G;
-
- if (group < SCHED_GROUP_NAMED ||
- group >= NUM_SCHED_GRPS)
- return -1;
-
- G = &sched->groups[group];
- odp_rwlock_write_lock(&G->lock);
-
- if (G->allocated) {
- /* Destroy all queues in this schedule group
- * and leave no orphan queues.
- */
- __destroy_group_queues(G);
-
- done = 0;
- G->allocated = false;
- wapl_bitmap_zero(&G->queues);
- odp_thrmask_zero(&G->threads);
- memset(G->name, 0, ODP_SCHED_GROUP_NAME_LEN);
- }
-
- odp_rwlock_write_unlock(&G->lock);
- return done;
-}
-
-static odp_schedule_group_t schedule_group_lookup(const char *name)
-{
- int group;
- sched_group_t *G;
-
- for (group = SCHED_GROUP_NAMED;
- group < NUM_SCHED_GRPS; group++) {
- G = &sched->groups[group];
-
- odp_rwlock_read_lock(&G->lock);
- if (strcmp(name, G->name) == 0) {
- odp_rwlock_read_unlock(&G->lock);
- return (odp_schedule_group_t)group;
- }
- odp_rwlock_read_unlock(&G->lock);
- }
-
- return ODP_SCHED_GROUP_INVALID;
-}
-
-static int schedule_group_join(odp_schedule_group_t group,
- const odp_thrmask_t *mask)
-{
- int done = -1, thread;
- sched_group_t *G;
- sched_thread_local_t *local;
-
- /* Named schedule group only */
- if (group < SCHED_GROUP_NAMED ||
- group >= NUM_SCHED_GRPS)
- return done;
-
- G = &sched->groups[group];
- odp_rwlock_write_lock(&G->lock);
-
- if (G->allocated) {
- /* Make newly joined threads start checking
- * queue indexes in this schedule group
- */
- thread = odp_thrmask_first(mask);
- while (thread >= 0) {
- local = sched->threads[thread];
- thread_set_interests(local, &G->queues);
-
- odp_thrmask_set(&G->threads, thread);
- thread = odp_thrmask_next(mask, thread);
- }
- done = 0;
- }
-
- odp_rwlock_write_unlock(&G->lock);
- return done;
-}
-
-static int schedule_group_leave(odp_schedule_group_t group,
- const odp_thrmask_t *mask)
-{
- int done = -1, thread;
- sched_group_t *G;
- sched_thread_local_t *local;
-
- /* Named schedule group only */
- if (group < SCHED_GROUP_NAMED ||
- group >= NUM_SCHED_GRPS)
- return done;
-
- G = &sched->groups[group];
- odp_rwlock_write_lock(&G->lock);
-
- if (G->allocated) {
- /* Make leaving threads stop checking
- * queue indexes in this schedule group
- */
- thread = odp_thrmask_first(mask);
- while (thread >= 0) {
- local = sched->threads[thread];
- thread_clear_interests(local, &G->queues);
-
- odp_thrmask_clr(&G->threads, thread);
- thread = odp_thrmask_next(mask, thread);
- }
- done = 0;
- }
-
- odp_rwlock_write_unlock(&G->lock);
- return done;
-}
-
-static int schedule_group_thrmask(odp_schedule_group_t group,
- odp_thrmask_t *thrmask)
-{
- int done = -1;
- sched_group_t *G;
-
- /* Named schedule group only */
- if (group < SCHED_GROUP_NAMED ||
- group >= NUM_SCHED_GRPS)
- return done;
-
- G = &sched->groups[group];
- odp_rwlock_read_lock(&G->lock);
-
- if (G->allocated && thrmask != NULL) {
- done = 0;
- odp_thrmask_copy(thrmask, &G->threads);
- }
-
- odp_rwlock_read_unlock(&G->lock);
- return done;
-}
-
-static int schedule_group_info(odp_schedule_group_t group,
- odp_schedule_group_info_t *info)
-{
- int done = -1;
- sched_group_t *G;
-
- /* Named schedule group only */
- if (group < SCHED_GROUP_NAMED ||
- group >= NUM_SCHED_GRPS)
- return done;
-
- G = &sched->groups[group];
- odp_rwlock_read_lock(&G->lock);
-
- if (G->allocated && info != NULL) {
- done = 0;
- info->name = G->name;
- odp_thrmask_copy(&info->thrmask, &G->threads);
- }
-
- odp_rwlock_read_unlock(&G->lock);
- return done;
-}
-
-/* This function is a no-op */
-static void schedule_prefetch(int num ODP_UNUSED)
-{
-}
-
-/*
- * Limited to joining and leaving the pre-defined schedule
- * groups, before thread local initialization or after
- * thread local termination.
- */
-static int group_add_thread(odp_schedule_group_t group, int thread)
-{
- sched_group_t *G;
-
- if (group < 0 || group >= SCHED_GROUP_NAMED)
- return -1;
-
- G = &sched->groups[group];
-
- odp_rwlock_write_lock(&G->lock);
- odp_thrmask_set(&G->threads, thread);
- odp_rwlock_write_unlock(&G->lock);
- return 0;
-}
-
-static int group_remove_thread(odp_schedule_group_t group, int thread)
-{
- sched_group_t *G;
-
- if (group < 0 || group >= SCHED_GROUP_NAMED)
- return -1;
-
- G = &sched->groups[group];
-
- odp_rwlock_write_lock(&G->lock);
- odp_thrmask_clr(&G->threads, thread);
- odp_rwlock_write_unlock(&G->lock);
- return 0;
-}
-
-static int number_of_groups(void)
-{
- return NUM_SCHED_GRPS;
-}
-
-static int schedule_sched_queue(uint32_t queue_index)
-{
- /* Set available indications globally */
- sched->availables[queue_index] = true;
- return 0;
-}
-
-static int schedule_unsched_queue(uint32_t queue_index)
-{
- /* Clear available indications globally */
- sched->availables[queue_index] = false;
- return 0;
-}
-
-static void schedule_release_atomic(void)
-{
- unsigned int queue_index;
-
- if ((thread_local.atomic != NULL) &&
- (thread_local.cache.count == 0)) {
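- /* thread_local.atomic points into sched->availables[];
- * pointer subtraction recovers the queue index saved by
- * schedule_save_context().
- */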
- queue_index = thread_local.atomic - sched->availables;
- thread_local.atomic = NULL;
- sched->availables[queue_index] = true;
- }
-}
-
-static inline int ordered_own_turn(uint32_t queue_index)
-{
- uint64_t ctx;
-
- ctx = odp_atomic_load_acq_u64(&sched->order[queue_index].ctx);
-
- return ctx == thread_local.ordered.ctx;
-}
-
-static inline void wait_for_order(uint32_t queue_index)
-{
- /* Busy loop to synchronize ordered processing */
- while (1) {
- if (ordered_own_turn(queue_index))
- break;
- odp_cpu_pause();
- }
-}
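-
-/* The ordered context works like a ticket lock:
- * schedule_save_context() draws a ticket from next_ctx,
- * ordered_own_turn() compares it with the now-serving counter
- * (ctx), and release_ordered() increments ctx to admit the
- * next thread in order.
- */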
-
-/**
- * Perform stashed enqueue operations
- *
- * Should be called only when already in order.
- */
-static inline void ordered_stash_release(void)
-{
- int i;
-
- for (i = 0; i < thread_local.ordered.stash_num; i++) {
- queue_entry_t *queue_entry;
- odp_buffer_hdr_t **buf_hdr;
- int num;
-
- queue_entry = thread_local.ordered.stash[i].queue_entry;
- buf_hdr = thread_local.ordered.stash[i].buf_hdr;
- num = thread_local.ordered.stash[i].num;
-
- queue_fn->enq_multi(qentry_to_int(queue_entry), buf_hdr, num);
- }
- thread_local.ordered.stash_num = 0;
-}
-
-static inline void release_ordered(void)
-{
- uint32_t qi;
- unsigned i;
-
- qi = thread_local.ordered.src_queue;
-
- wait_for_order(qi);
-
- /* Release all ordered locks */
- for (i = 0; i < sched->queues[qi].lock_count; i++) {
- if (!thread_local.ordered.lock_called.u8[i])
- odp_atomic_store_rel_u64(&sched->order[qi].lock[i],
- thread_local.ordered.ctx + 1);
- }
-
- thread_local.ordered.lock_called.all = 0;
- thread_local.ordered.src_queue = NULL_INDEX;
- thread_local.ordered.in_order = 0;
-
- ordered_stash_release();
-
- /* Next thread can continue processing */
- odp_atomic_add_rel_u64(&sched->order[qi].ctx, 1);
-}
-
-static void schedule_release_ordered(void)
-{
- uint32_t queue_index;
-
- queue_index = thread_local.ordered.src_queue;
-
- if (odp_unlikely((queue_index == NULL_INDEX) ||
- thread_local.cache.count))
- return;
-
- release_ordered();
-}
-
-static inline void schedule_release_context(void)
-{
- if (thread_local.ordered.src_queue != NULL_INDEX)
- release_ordered();
- else
- schedule_release_atomic();
-}
-
-static int schedule_ord_enq_multi(queue_t q_int, void *buf_hdr[],
- int num, int *ret)
-{
- int i;
- uint32_t stash_num = thread_local.ordered.stash_num;
- queue_entry_t *dst_queue = qentry_from_int(q_int);
- uint32_t src_queue = thread_local.ordered.src_queue;
-
- if ((src_queue == NULL_INDEX) || thread_local.ordered.in_order)
- return 0;
-
- if (ordered_own_turn(src_queue)) {
- /* Own turn, so can do enqueue directly. */
- thread_local.ordered.in_order = 1;
- ordered_stash_release();
- return 0;
- }
-
- /* Pktout may drop packets, so the operation cannot be stashed. */
- if (dst_queue->s.pktout.pktio != ODP_PKTIO_INVALID ||
- odp_unlikely(stash_num >= MAX_ORDERED_STASH)) {
- /* If the local stash is full, wait until it is our turn and
- * then release the stash and do enqueue directly. */
- wait_for_order(src_queue);
-
- thread_local.ordered.in_order = 1;
-
- ordered_stash_release();
- return 0;
- }
-
- thread_local.ordered.stash[stash_num].queue_entry = dst_queue;
- thread_local.ordered.stash[stash_num].num = num;
- for (i = 0; i < num; i++)
- thread_local.ordered.stash[stash_num].buf_hdr[i] = buf_hdr[i];
-
- thread_local.ordered.stash_num++;
-
- *ret = num;
- return 1;
-}
-
-static void order_lock(void)
-{
- uint32_t queue_index;
-
- queue_index = thread_local.ordered.src_queue;
-
- if (queue_index == NULL_INDEX)
- return;
-
- wait_for_order(queue_index);
-}
-
-static void order_unlock(void)
-{
-}
-
-static void schedule_order_lock(unsigned lock_index)
-{
- odp_atomic_u64_t *ord_lock;
- uint32_t queue_index;
-
- queue_index = thread_local.ordered.src_queue;
-
- ODP_ASSERT(queue_index != NULL_INDEX &&
- lock_index <= sched->queues[queue_index].lock_count &&
- !thread_local.ordered.lock_called.u8[lock_index]);
-
- ord_lock = &sched->order[queue_index].lock[lock_index];
-
- /* Busy loop to synchronize ordered processing */
- while (1) {
- uint64_t lock_seq;
-
- lock_seq = odp_atomic_load_acq_u64(ord_lock);
-
- if (lock_seq == thread_local.ordered.ctx) {
- thread_local.ordered.lock_called.u8[lock_index] = 1;
- return;
- }
- odp_cpu_pause();
- }
-}
-
-static void schedule_order_unlock(unsigned lock_index)
-{
- odp_atomic_u64_t *ord_lock;
- uint32_t queue_index;
-
- queue_index = thread_local.ordered.src_queue;
-
- ODP_ASSERT(queue_index != NULL_INDEX &&
- lock_index <= sched->queues[queue_index].lock_count);
-
- ord_lock = &sched->order[queue_index].lock[lock_index];
-
- ODP_ASSERT(thread_local.ordered.ctx == odp_atomic_load_u64(ord_lock));
-
- odp_atomic_store_rel_u64(ord_lock, thread_local.ordered.ctx + 1);
-}
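-
-/* An application-side sketch of ordered locks (public ODP API;
- * lock_count must have been set at queue creation, e.g.
- * qp.sched.lock_count = 1):
- *
- *   ev = odp_schedule(&from, wait);
- *   odp_schedule_order_lock(0);
- *   ...update shared state in arrival order...
- *   odp_schedule_order_unlock(0);
- */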
-
-static unsigned schedule_max_ordered_locks(void)
-{
- return CONFIG_QUEUE_MAX_ORD_LOCKS;
-}
-
-static inline bool is_atomic_queue(unsigned int queue_index)
-{
- return (sched->queues[queue_index].sync == ODP_SCHED_SYNC_ATOMIC);
-}
-
-static inline bool is_ordered_queue(unsigned int queue_index)
-{
- return (sched->queues[queue_index].sync == ODP_SCHED_SYNC_ORDERED);
-}
-
-static void schedule_save_context(uint32_t queue_index)
-{
- if (is_atomic_queue(queue_index)) {
- thread_local.atomic = &sched->availables[queue_index];
- } else if (is_ordered_queue(queue_index)) {
- uint64_t ctx;
- odp_atomic_u64_t *next_ctx;
-
- next_ctx = &sched->order[queue_index].next_ctx;
- ctx = odp_atomic_fetch_inc_u64(next_ctx);
-
- thread_local.ordered.ctx = ctx;
- thread_local.ordered.src_queue = queue_index;
- }
-}
-
-/* Fill in scheduler interface */
-const schedule_fn_t schedule_iquery_fn = {
- .status_sync = 1,
- .pktio_start = schedule_pktio_start,
- .thr_add = group_add_thread,
- .thr_rem = group_remove_thread,
- .num_grps = number_of_groups,
- .init_queue = init_sched_queue,
- .destroy_queue = destroy_sched_queue,
- .sched_queue = schedule_sched_queue,
- .ord_enq_multi = schedule_ord_enq_multi,
- .init_global = schedule_init_global,
- .term_global = schedule_term_global,
- .init_local = schedule_init_local,
- .term_local = schedule_term_local,
- .order_lock = order_lock,
- .order_unlock = order_unlock,
- .max_ordered_locks = schedule_max_ordered_locks,
- .unsched_queue = schedule_unsched_queue,
- .save_context = schedule_save_context
-};
-
-/* Fill in scheduler API calls */
-const schedule_api_t schedule_iquery_api = {
- .schedule_wait_time = schedule_wait_time,
- .schedule = schedule,
- .schedule_multi = schedule_multi,
- .schedule_pause = schedule_pause,
- .schedule_resume = schedule_resume,
- .schedule_release_atomic = schedule_release_atomic,
- .schedule_release_ordered = schedule_release_ordered,
- .schedule_prefetch = schedule_prefetch,
- .schedule_num_prio = number_of_priorities,
- .schedule_group_create = schedule_group_create,
- .schedule_group_destroy = schedule_group_destroy,
- .schedule_group_lookup = schedule_group_lookup,
- .schedule_group_join = schedule_group_join,
- .schedule_group_leave = schedule_group_leave,
- .schedule_group_thrmask = schedule_group_thrmask,
- .schedule_group_info = schedule_group_info,
- .schedule_order_lock = schedule_order_lock,
- .schedule_order_unlock = schedule_order_unlock
-};
-
-static void thread_set_interest(sched_thread_local_t *thread,
- unsigned int queue_index, int prio)
-{
- queue_index_sparse_t *index;
-
- if (thread == NULL)
- return;
-
- if (prio >= NUM_SCHED_PRIO)
- return;
-
- index = &thread->indexes[prio];
-
- odp_rwlock_write_lock(&thread->lock);
- sparse_bitmap_set(index, queue_index);
- odp_rwlock_write_unlock(&thread->lock);
-}
-
-static void thread_clear_interest(sched_thread_local_t *thread,
- unsigned int queue_index, int prio)
-{
- queue_index_sparse_t *index;
-
- if (thread == NULL)
- return;
-
- if (prio >= NUM_SCHED_PRIO)
- return;
-
- index = &thread->indexes[prio];
-
- odp_rwlock_write_lock(&thread->lock);
- sparse_bitmap_clear(index, queue_index);
- odp_rwlock_write_unlock(&thread->lock);
-}
-
-static void thread_set_interests(sched_thread_local_t *thread,
- queue_index_bitmap_t *set)
-{
- int prio;
- sched_prio_t *P;
- unsigned int queue_index;
- queue_index_bitmap_t subset;
- wapl_bitmap_iterator_t it;
-
- if (thread == NULL || set == NULL)
- return;
-
- for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
- P = &sched->prios[prio];
- odp_rwlock_read_lock(&P->lock);
-
- /* The collection of queue indexes in 'set'
- * may belong to several priority levels.
- */
- wapl_bitmap_zero(&subset);
- wapl_bitmap_and(&subset, &P->queues, set);
-
- odp_rwlock_read_unlock(&P->lock);
-
- /* Add the subset to local indexes */
- wapl_bitmap_iterator(&it, &subset);
- for (it.start(&it); it.has_next(&it);) {
- queue_index = it.next(&it);
- thread_set_interest(thread, queue_index, prio);
- }
- }
-}
-
-static void thread_clear_interests(sched_thread_local_t *thread,
- queue_index_bitmap_t *clear)
-{
- int prio;
- sched_prio_t *P;
- unsigned int queue_index;
- queue_index_bitmap_t subset;
- wapl_bitmap_iterator_t it;
-
- if (thread == NULL || clear == NULL)
- return;
-
- for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
- P = &sched->prios[prio];
- odp_rwlock_read_lock(&P->lock);
-
- /* The collection of queue indexes in 'clear'
- * may belong to several priority levels.
- */
- wapl_bitmap_zero(&subset);
- wapl_bitmap_and(&subset, &P->queues, clear);
-
- odp_rwlock_read_unlock(&P->lock);
-
- /* Remove the subset from local indexes */
- wapl_bitmap_iterator(&it, &subset);
- for (it.start(&it); it.has_next(&it);) {
- queue_index = it.next(&it);
- thread_clear_interest(thread, queue_index, prio);
- }
- }
-}
-
-static inline bool compete_atomic_queue(unsigned int queue_index)
-{
- bool expected = sched->availables[queue_index];
-
- if (expected && is_atomic_queue(queue_index)) {
- expected = __atomic_compare_exchange_n(
- &sched->availables[queue_index],
- &expected, false, 0,
- __ATOMIC_RELEASE, __ATOMIC_RELAXED);
- }
-
- return expected;
-}
-
-static inline int consume_queue(int prio, unsigned int queue_index)
-{
- int count;
- unsigned int max = MAX_DEQ;
- event_cache_t *cache = &thread_local.cache;
-
- /* Low priorities use a smaller batch size to limit
- * head-of-line blocking latency.
- */
- if (odp_unlikely(MAX_DEQ > 1 && prio > ODP_SCHED_PRIO_DEFAULT))
- max = MAX_DEQ / 2;
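-
- /* For example, with CONFIG_BURST_SIZE = 16 (an assumed value),
- * queues below the default priority dequeue at most 8 events
- * per call.
- */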
-
- /* For ordered queues we want consecutive events to
- * be dispatched to separate threads, so do not cache
- * them locally.
- */
- if (is_ordered_queue(queue_index))
- max = 1;
-
- count = sched_cb_queue_deq_multi(
- queue_index, cache->stash, max);
-
- if (count < 0) {
- DO_SCHED_UNLOCK();
- sched_cb_queue_destroy_finalize(queue_index);
- DO_SCHED_LOCK();
- return 0;
- }
-
- if (count == 0)
- return 0;
-
- cache->top = &cache->stash[0];
- cache->count = count;
- cache->queue = sched_cb_queue_handle(queue_index);
- return count;
-}
-
-static inline bool do_schedule_prio(int prio)
-{
- int nbits, next, end;
- unsigned int queue_index;
- sparse_bitmap_iterator_t *it;
-
- it = &thread_local.iterators[prio];
- nbits = (int)*it->_base.last;
-
- /* No interests at all! */
- if (nbits <= 0)
- return false;
-
- /* In the critical path we cannot afford iterator calls;
- * do it manually using internal knowledge of the iterator.
- */
- it->_start = (it->_start + 1) % nbits;
- end = it->_start + nbits;
-
- for (next = it->_start; next < end; next++) {
- queue_index = it->_base.il[next % nbits];
-
- if (!compete_atomic_queue(queue_index))
- continue;
-
- if (!consume_queue(prio, queue_index))
- continue;
-
- return true;
- }
-
- return false;
-}