diff options
Diffstat (limited to 'platform/linux-generic/odp_queue_basic.c')
-rw-r--r-- | platform/linux-generic/odp_queue_basic.c | 1298 |
1 files changed, 1298 insertions, 0 deletions
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c new file mode 100644 index 000000000..ec51b65f2 --- /dev/null +++ b/platform/linux-generic/odp_queue_basic.c @@ -0,0 +1,1298 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright (c) 2013-2018 Linaro Limited + * Copyright (c) 2021-2023 Nokia + */ + +#include <odp/api/align.h> +#include <odp/api/hints.h> +#include <odp/api/packet_io.h> +#include <odp/api/queue.h> +#include <odp/api/schedule.h> +#include <odp/api/shared_memory.h> +#include <odp/api/std_types.h> +#include <odp/api/sync.h> +#include <odp/api/ticketlock.h> +#include <odp/api/traffic_mngr.h> + +#include <odp/api/plat/queue_inline_types.h> +#include <odp/api/plat/sync_inlines.h> +#include <odp/api/plat/ticketlock_inlines.h> + +#include <odp_config_internal.h> +#include <odp_debug_internal.h> +#include <odp_event_internal.h> +#include <odp_global_data.h> +#include <odp_init_internal.h> +#include <odp_libconfig_internal.h> +#include <odp_macros_internal.h> +#include <odp_packet_io_internal.h> +#include <odp_pool_internal.h> +#include <odp_queue_basic_internal.h> +#include <odp_queue_if.h> +#include <odp_schedule_if.h> +#include <odp_timer_internal.h> +#include <odp_string_internal.h> + +#include <inttypes.h> +#include <string.h> + +#define LOCK(queue_ptr) odp_ticketlock_lock(&((queue_ptr)->lock)) +#define UNLOCK(queue_ptr) odp_ticketlock_unlock(&((queue_ptr)->lock)) +#define LOCK_INIT(queue_ptr) odp_ticketlock_init(&((queue_ptr)->lock)) + +#define MIN_QUEUE_SIZE 32 +#define MAX_QUEUE_SIZE (1 * 1024 * 1024) + +static int queue_init(queue_entry_t *queue, const char *name, + const odp_queue_param_t *param); + +queue_global_t *_odp_queue_glb; +extern _odp_queue_inline_offset_t _odp_queue_inline_offset; + +static int queue_capa(odp_queue_capability_t *capa, int sched ODP_UNUSED) +{ + memset(capa, 0, sizeof(odp_queue_capability_t)); + + /* Reserve some queues for internal use */ + capa->max_queues = CONFIG_MAX_QUEUES - CONFIG_INTERNAL_QUEUES; + capa->plain.max_num = CONFIG_MAX_PLAIN_QUEUES; + capa->plain.max_size = _odp_queue_glb->config.max_queue_size; + capa->plain.lockfree.max_num = _odp_queue_glb->queue_lf_num; + capa->plain.lockfree.max_size = _odp_queue_glb->queue_lf_size; + + return 0; +} + +static int read_config_file(queue_global_t *_odp_queue_glb) +{ + const char *str; + uint32_t val_u32; + int val = 0; + + _ODP_PRINT("Queue config:\n"); + + str = "queue_basic.max_queue_size"; + if (!_odp_libconfig_lookup_int(str, &val)) { + _ODP_ERR("Config option '%s' not found.\n", str); + return -1; + } + + val_u32 = val; + + if (val_u32 > MAX_QUEUE_SIZE || val_u32 < MIN_QUEUE_SIZE || + !_ODP_CHECK_IS_POWER2(val_u32)) { + _ODP_ERR("Bad value %s = %u\n", str, val_u32); + return -1; + } + + _odp_queue_glb->config.max_queue_size = val_u32; + _ODP_PRINT(" %s: %u\n", str, val_u32); + + str = "queue_basic.default_queue_size"; + if (!_odp_libconfig_lookup_int(str, &val)) { + _ODP_ERR("Config option '%s' not found.\n", str); + return -1; + } + + val_u32 = val; + + if (val_u32 > _odp_queue_glb->config.max_queue_size || + val_u32 < MIN_QUEUE_SIZE || + !_ODP_CHECK_IS_POWER2(val_u32)) { + _ODP_ERR("Bad value %s = %u\n", str, val_u32); + return -1; + } + + _odp_queue_glb->config.default_queue_size = val_u32; + _ODP_PRINT(" %s: %u\n\n", str, val_u32); + + return 0; +} + +static int queue_init_global(void) +{ + uint32_t i; + odp_shm_t shm; + uint32_t lf_size = 0; + queue_lf_func_t *lf_func; + odp_queue_capability_t capa; + uint64_t mem_size; + + _ODP_DBG("Starts...\n"); + + /* Fill in queue entry field offsets for inline functions */ + memset(&_odp_queue_inline_offset, 0, + sizeof(_odp_queue_inline_offset_t)); + _odp_queue_inline_offset.context = offsetof(queue_entry_t, + param.context); + + shm = odp_shm_reserve("_odp_queue_basic_global", + sizeof(queue_global_t), + sizeof(queue_entry_t), + 0); + if (shm == ODP_SHM_INVALID) + return -1; + + _odp_queue_glb = odp_shm_addr(shm); + + memset(_odp_queue_glb, 0, sizeof(queue_global_t)); + + for (i = 0; i < CONFIG_MAX_QUEUES; i++) { + /* init locks */ + queue_entry_t *queue = qentry_from_index(i); + + LOCK_INIT(queue); + queue->index = i; + queue->handle = (odp_queue_t)queue; + } + + if (read_config_file(_odp_queue_glb)) { + odp_shm_free(shm); + return -1; + } + + _odp_queue_glb->queue_gbl_shm = shm; + mem_size = sizeof(uint32_t) * CONFIG_MAX_QUEUES * + (uint64_t)_odp_queue_glb->config.max_queue_size; + + shm = odp_shm_reserve("_odp_queue_basic_rings", mem_size, + ODP_CACHE_LINE_SIZE, + 0); + + if (shm == ODP_SHM_INVALID) { + odp_shm_free(_odp_queue_glb->queue_gbl_shm); + return -1; + } + + _odp_queue_glb->queue_ring_shm = shm; + _odp_queue_glb->ring_data = odp_shm_addr(shm); + + lf_func = &_odp_queue_glb->queue_lf_func; + _odp_queue_glb->queue_lf_num = _odp_queue_lf_init_global(&lf_size, lf_func); + _odp_queue_glb->queue_lf_size = lf_size; + + queue_capa(&capa, 0); + + _ODP_DBG("... done.\n"); + _ODP_DBG(" queue_entry_t size %zu\n", sizeof(queue_entry_t)); + _ODP_DBG(" max num queues %u\n", capa.max_queues); + _ODP_DBG(" max queue size %u\n", capa.plain.max_size); + _ODP_DBG(" max num lockfree %u\n", capa.plain.lockfree.max_num); + _ODP_DBG(" max lockfree size %u\n\n", capa.plain.lockfree.max_size); + + return 0; +} + +static int queue_init_local(void) +{ + return 0; +} + +static int queue_term_local(void) +{ + return 0; +} + +static int queue_term_global(void) +{ + int ret = 0; + queue_entry_t *queue; + int i; + + for (i = 0; i < CONFIG_MAX_QUEUES; i++) { + queue = qentry_from_index(i); + LOCK(queue); + if (queue->status != QUEUE_STATUS_FREE) { + _ODP_ERR("Not destroyed queue: %s\n", queue->name); + ret = -1; + } + UNLOCK(queue); + } + + _odp_queue_lf_term_global(); + + if (odp_shm_free(_odp_queue_glb->queue_ring_shm)) { + _ODP_ERR("shm free failed"); + ret = -1; + } + + if (odp_shm_free(_odp_queue_glb->queue_gbl_shm)) { + _ODP_ERR("shm free failed"); + ret = -1; + } + + return ret; +} + +static int queue_capability(odp_queue_capability_t *capa) +{ + return queue_capa(capa, 1); +} + +static odp_queue_type_t queue_type(odp_queue_t handle) +{ + return qentry_from_handle(handle)->type; +} + +static odp_schedule_sync_t queue_sched_type(odp_queue_t handle) +{ + return qentry_from_handle(handle)->param.sched.sync; +} + +static odp_schedule_prio_t queue_sched_prio(odp_queue_t handle) +{ + return qentry_from_handle(handle)->param.sched.prio; +} + +static odp_schedule_group_t queue_sched_group(odp_queue_t handle) +{ + return qentry_from_handle(handle)->param.sched.group; +} + +static uint32_t queue_lock_count(odp_queue_t handle) +{ + queue_entry_t *queue = qentry_from_handle(handle); + + return queue->param.sched.sync == ODP_SCHED_SYNC_ORDERED ? + queue->param.sched.lock_count : 0; +} + +static odp_queue_t queue_create(const char *name, + const odp_queue_param_t *param) +{ + uint32_t i; + uint32_t max_idx; + queue_entry_t *queue; + void *queue_lf; + odp_queue_type_t type; + odp_queue_param_t default_param; + odp_queue_t handle = ODP_QUEUE_INVALID; + + if (param == NULL) { + odp_queue_param_init(&default_param); + param = &default_param; + } + + type = param->type; + + if (type == ODP_QUEUE_TYPE_SCHED) { + if (param->sched.prio < odp_schedule_min_prio() || + param->sched.prio > odp_schedule_max_prio()) { + _ODP_ERR("Bad queue priority: %i\n", param->sched.prio); + return ODP_QUEUE_INVALID; + } + } + + if (param->nonblocking == ODP_BLOCKING) { + if (param->size > _odp_queue_glb->config.max_queue_size) + return ODP_QUEUE_INVALID; + } else if (param->nonblocking == ODP_NONBLOCKING_LF) { + /* Only plain type lock-free queues supported */ + if (type != ODP_QUEUE_TYPE_PLAIN) + return ODP_QUEUE_INVALID; + if (param->size > _odp_queue_glb->queue_lf_size) + return ODP_QUEUE_INVALID; + } else { + /* Wait-free queues not supported */ + return ODP_QUEUE_INVALID; + } + + if (type == ODP_QUEUE_TYPE_SCHED) { + /* Start scheduled queue indices from zero to enable direct + * mapping to scheduler implementation indices. */ + i = 0; + max_idx = CONFIG_MAX_SCHED_QUEUES; + } else { + i = CONFIG_MAX_SCHED_QUEUES; + /* All internal queues are of type plain */ + max_idx = CONFIG_MAX_QUEUES; + } + + for (; i < max_idx; i++) { + queue = qentry_from_index(i); + + if (queue->status != QUEUE_STATUS_FREE) + continue; + + LOCK(queue); + if (queue->status == QUEUE_STATUS_FREE) { + if (queue_init(queue, name, param)) { + UNLOCK(queue); + return ODP_QUEUE_INVALID; + } + + if (!queue->spsc && + param->nonblocking == ODP_NONBLOCKING_LF) { + queue_lf_func_t *lf_fn; + + lf_fn = &_odp_queue_glb->queue_lf_func; + + queue_lf = _odp_queue_lf_create(queue); + + if (queue_lf == NULL) { + UNLOCK(queue); + return ODP_QUEUE_INVALID; + } + queue->queue_lf = queue_lf; + + queue->enqueue = lf_fn->enq; + queue->enqueue_multi = lf_fn->enq_multi; + queue->dequeue = lf_fn->deq; + queue->dequeue_multi = lf_fn->deq_multi; + queue->orig_dequeue_multi = lf_fn->deq_multi; + } + + if (type == ODP_QUEUE_TYPE_SCHED) + queue->status = QUEUE_STATUS_NOTSCHED; + else + queue->status = QUEUE_STATUS_READY; + + handle = queue->handle; + UNLOCK(queue); + break; + } + UNLOCK(queue); + } + + if (handle == ODP_QUEUE_INVALID) + return ODP_QUEUE_INVALID; + + if (type == ODP_QUEUE_TYPE_SCHED) { + if (_odp_sched_fn->create_queue(queue->index, + &queue->param.sched)) { + queue->status = QUEUE_STATUS_FREE; + _ODP_ERR("schedule queue init failed\n"); + return ODP_QUEUE_INVALID; + } + } + + return handle; +} + +static int queue_create_multi(const char *name[], const odp_queue_param_t param[], + odp_bool_t share_param, odp_queue_t queue[], int num) +{ + int i; + + _ODP_ASSERT(param != NULL); + _ODP_ASSERT(queue != NULL); + _ODP_ASSERT(num > 0); + + for (i = 0; i < num; i++) { + odp_queue_t cur_queue; + const char *cur_name = name != NULL ? name[i] : NULL; + const odp_queue_param_t *cur_param = share_param ? ¶m[0] : ¶m[i]; + + cur_queue = queue_create(cur_name, cur_param); + if (cur_queue == ODP_QUEUE_INVALID) + return (i == 0) ? -1 : i; + + queue[i] = cur_queue; + } + return i; +} + +void _odp_sched_queue_set_status(uint32_t queue_index, int status) +{ + queue_entry_t *queue = qentry_from_index(queue_index); + + LOCK(queue); + + queue->status = status; + + UNLOCK(queue); +} + +static int queue_destroy(odp_queue_t handle) +{ + int empty; + queue_entry_t *queue; + + queue = qentry_from_handle(handle); + + if (handle == ODP_QUEUE_INVALID) + return -1; + + LOCK(queue); + if (queue->status == QUEUE_STATUS_FREE) { + UNLOCK(queue); + _ODP_ERR("queue \"%s\" already free\n", queue->name); + return -1; + } + if (queue->status == QUEUE_STATUS_DESTROYED) { + UNLOCK(queue); + _ODP_ERR("queue \"%s\" already destroyed\n", queue->name); + return -1; + } + + if (queue->spsc) + empty = ring_spsc_is_empty(&queue->ring_spsc); + else if (queue->type == ODP_QUEUE_TYPE_SCHED) + empty = ring_st_is_empty(&queue->ring_st); + else + empty = ring_mpmc_u32_is_empty(&queue->ring_mpmc); + + if (!empty) { + UNLOCK(queue); + _ODP_ERR("queue \"%s\" not empty\n", queue->name); + return -1; + } + + switch (queue->status) { + case QUEUE_STATUS_READY: + queue->status = QUEUE_STATUS_FREE; + break; + case QUEUE_STATUS_NOTSCHED: + queue->status = QUEUE_STATUS_FREE; + _odp_sched_fn->destroy_queue(queue->index); + break; + case QUEUE_STATUS_SCHED: + /* Queue is still in scheduling */ + queue->status = QUEUE_STATUS_DESTROYED; + break; + default: + _ODP_ABORT("Unexpected queue status\n"); + } + + if (queue->queue_lf) + _odp_queue_lf_destroy(queue->queue_lf); + + UNLOCK(queue); + + return 0; +} + +static int queue_destroy_multi(odp_queue_t handle[], int num) +{ + int i; + + _ODP_ASSERT(handle != NULL); + _ODP_ASSERT(num > 0); + + for (i = 0; i < num; i++) { + int ret = queue_destroy(handle[i]); + + if (ret) + return (i == 0) ? ret : i; + } + + return i; +} + +static int queue_context_set(odp_queue_t handle, void *context, + uint32_t len ODP_UNUSED) +{ + odp_mb_full(); + qentry_from_handle(handle)->param.context = context; + odp_mb_full(); + return 0; +} + +static odp_queue_t queue_lookup(const char *name) +{ + uint32_t i; + + for (i = 0; i < CONFIG_MAX_QUEUES; i++) { + queue_entry_t *queue = qentry_from_index(i); + + if (queue->status == QUEUE_STATUS_FREE || + queue->status == QUEUE_STATUS_DESTROYED) + continue; + + LOCK(queue); + if (strcmp(name, queue->name) == 0) { + /* found it */ + UNLOCK(queue); + return queue->handle; + } + UNLOCK(queue); + } + + return ODP_QUEUE_INVALID; +} + +static inline void event_index_from_hdr(uint32_t event_index[], + _odp_event_hdr_t *event_hdr[], int num) +{ + int i; + + for (i = 0; i < num; i++) + event_index[i] = event_hdr[i]->index.u32; +} + +static inline void event_index_to_hdr(_odp_event_hdr_t *event_hdr[], + uint32_t event_index[], int num) +{ + int i; + + for (i = 0; i < num; i++) { + event_hdr[i] = _odp_event_hdr_from_index_u32(event_index[i]); + odp_prefetch(event_hdr[i]); + } +} + +static inline int _plain_queue_enq_multi(odp_queue_t handle, + _odp_event_hdr_t *event_hdr[], int num) +{ + queue_entry_t *queue; + int ret, num_enq; + ring_mpmc_u32_t *ring_mpmc; + uint32_t event_idx[num]; + + queue = qentry_from_handle(handle); + ring_mpmc = &queue->ring_mpmc; + + if (_odp_sched_fn->ord_enq_multi(handle, (void **)event_hdr, num, &ret)) + return ret; + + event_index_from_hdr(event_idx, event_hdr, num); + + num_enq = ring_mpmc_u32_enq_multi(ring_mpmc, queue->ring_data, + queue->ring_mask, event_idx, num); + + return num_enq; +} + +static inline int _plain_queue_deq_multi(odp_queue_t handle, + _odp_event_hdr_t *event_hdr[], int num) +{ + int num_deq; + queue_entry_t *queue; + ring_mpmc_u32_t *ring_mpmc; + uint32_t event_idx[num]; + + queue = qentry_from_handle(handle); + ring_mpmc = &queue->ring_mpmc; + + num_deq = ring_mpmc_u32_deq_multi(ring_mpmc, queue->ring_data, + queue->ring_mask, event_idx, num); + + if (num_deq == 0) + return 0; + + event_index_to_hdr(event_hdr, event_idx, num_deq); + + return num_deq; +} + +static int plain_queue_enq_multi(odp_queue_t handle, + _odp_event_hdr_t *event_hdr[], int num) +{ + return _plain_queue_enq_multi(handle, event_hdr, num); +} + +static int plain_queue_enq(odp_queue_t handle, _odp_event_hdr_t *event_hdr) +{ + int ret; + + ret = _plain_queue_enq_multi(handle, &event_hdr, 1); + + if (ret == 1) + return 0; + else + return -1; +} + +static int plain_queue_deq_multi(odp_queue_t handle, + _odp_event_hdr_t *event_hdr[], int num) +{ + return _plain_queue_deq_multi(handle, event_hdr, num); +} + +static _odp_event_hdr_t *plain_queue_deq(odp_queue_t handle) +{ + _odp_event_hdr_t *event_hdr = NULL; + int ret; + + ret = _plain_queue_deq_multi(handle, &event_hdr, 1); + + if (ret == 1) + return event_hdr; + else + return NULL; +} + +static int error_enqueue(odp_queue_t handle, _odp_event_hdr_t *event_hdr) +{ + (void)event_hdr; + + _ODP_ERR("Enqueue not supported (0x%" PRIx64 ")\n", odp_queue_to_u64(handle)); + + return -1; +} + +static int error_enqueue_multi(odp_queue_t handle, + _odp_event_hdr_t *event_hdr[], int num) +{ + (void)event_hdr; + (void)num; + + _ODP_ERR("Enqueue multi not supported (0x%" PRIx64 ")\n", odp_queue_to_u64(handle)); + + return -1; +} + +static _odp_event_hdr_t *error_dequeue(odp_queue_t handle) +{ + _ODP_ERR("Dequeue not supported (0x%" PRIx64 ")\n", odp_queue_to_u64(handle)); + + return NULL; +} + +static int error_dequeue_multi(odp_queue_t handle, + _odp_event_hdr_t *event_hdr[], int num) +{ + (void)event_hdr; + (void)num; + + _ODP_ERR("Dequeue multi not supported (0x%" PRIx64 ")\n", odp_queue_to_u64(handle)); + + return -1; +} + +static void queue_param_init(odp_queue_param_t *params) +{ + memset(params, 0, sizeof(odp_queue_param_t)); + params->type = ODP_QUEUE_TYPE_PLAIN; + params->enq_mode = ODP_QUEUE_OP_MT; + params->deq_mode = ODP_QUEUE_OP_MT; + params->nonblocking = ODP_BLOCKING; + params->order = ODP_QUEUE_ORDER_KEEP; + params->sched.prio = odp_schedule_default_prio(); + params->sched.sync = ODP_SCHED_SYNC_PARALLEL; + params->sched.group = ODP_SCHED_GROUP_ALL; +} + +static int queue_info(odp_queue_t handle, odp_queue_info_t *info) +{ + uint32_t queue_id; + queue_entry_t *queue; + int status; + + if (odp_unlikely(info == NULL)) { + _ODP_ERR("Unable to store info, NULL ptr given\n"); + return -1; + } + + queue_id = queue_to_index(handle); + + if (odp_unlikely(queue_id >= CONFIG_MAX_QUEUES)) { + _ODP_ERR("Invalid queue handle: 0x%" PRIx64 "\n", odp_queue_to_u64(handle)); + return -1; + } + + queue = qentry_from_index(queue_id); + + LOCK(queue); + status = queue->status; + + if (odp_unlikely(status == QUEUE_STATUS_FREE || + status == QUEUE_STATUS_DESTROYED)) { + UNLOCK(queue); + _ODP_ERR("Invalid queue status:%d\n", status); + return -1; + } + + info->name = queue->name; + info->param = queue->param; + + UNLOCK(queue); + + return 0; +} + +static void queue_print(odp_queue_t handle) +{ + odp_pktio_info_t pktio_info; + queue_entry_t *queue; + uint32_t queue_id; + int status, prio; + int max_prio = odp_schedule_max_prio(); + + queue_id = queue_to_index(handle); + + if (odp_unlikely(queue_id >= CONFIG_MAX_QUEUES)) { + _ODP_ERR("Invalid queue handle: 0x%" PRIx64 "\n", odp_queue_to_u64(handle)); + return; + } + + queue = qentry_from_index(queue_id); + + LOCK(queue); + status = queue->status; + + if (odp_unlikely(status == QUEUE_STATUS_FREE || + status == QUEUE_STATUS_DESTROYED)) { + UNLOCK(queue); + _ODP_ERR("Invalid queue status:%d\n", status); + return; + } + _ODP_PRINT("\nQueue info\n"); + _ODP_PRINT("----------\n"); + _ODP_PRINT(" handle %p\n", (void *)queue->handle); + _ODP_PRINT(" index %" PRIu32 "\n", queue_id); + _ODP_PRINT(" name %s\n", queue->name); + _ODP_PRINT(" enq mode %s\n", + queue->param.enq_mode == ODP_QUEUE_OP_MT ? "ODP_QUEUE_OP_MT" : + (queue->param.enq_mode == ODP_QUEUE_OP_MT_UNSAFE ? "ODP_QUEUE_OP_MT_UNSAFE" : + (queue->param.enq_mode == ODP_QUEUE_OP_DISABLED ? "ODP_QUEUE_OP_DISABLED" : + "unknown"))); + _ODP_PRINT(" deq mode %s\n", + queue->param.deq_mode == ODP_QUEUE_OP_MT ? "ODP_QUEUE_OP_MT" : + (queue->param.deq_mode == ODP_QUEUE_OP_MT_UNSAFE ? "ODP_QUEUE_OP_MT_UNSAFE" : + (queue->param.deq_mode == ODP_QUEUE_OP_DISABLED ? "ODP_QUEUE_OP_DISABLED" : + "unknown"))); + _ODP_PRINT(" non-blocking %s\n", + queue->param.nonblocking == ODP_BLOCKING ? "ODP_BLOCKING" : + (queue->param.nonblocking == ODP_NONBLOCKING_LF ? "ODP_NONBLOCKING_LF" : + (queue->param.nonblocking == ODP_NONBLOCKING_WF ? "ODP_NONBLOCKING_WF" : + "unknown"))); + _ODP_PRINT(" type %s\n", + queue->type == ODP_QUEUE_TYPE_PLAIN ? "ODP_QUEUE_TYPE_PLAIN" : + (queue->type == ODP_QUEUE_TYPE_SCHED ? "ODP_QUEUE_TYPE_SCHED" : "unknown")); + if (queue->type == ODP_QUEUE_TYPE_SCHED) { + _ODP_PRINT(" sync %s\n", + queue->param.sched.sync == ODP_SCHED_SYNC_PARALLEL ? + "ODP_SCHED_SYNC_PARALLEL" : + (queue->param.sched.sync == ODP_SCHED_SYNC_ATOMIC ? + "ODP_SCHED_SYNC_ATOMIC" : + (queue->param.sched.sync == ODP_SCHED_SYNC_ORDERED ? + "ODP_SCHED_SYNC_ORDERED" : "unknown"))); + prio = queue->param.sched.prio; + _ODP_PRINT(" priority %i (%i in API)\n", max_prio - prio, prio); + _ODP_PRINT(" group %i\n", queue->param.sched.group); + if (_odp_sched_id == _ODP_SCHED_ID_BASIC) + _ODP_PRINT(" spread %i\n", _odp_sched_basic_get_spread(queue_id)); + } + if (queue->pktin.pktio != ODP_PKTIO_INVALID) { + if (!odp_pktio_info(queue->pktin.pktio, &pktio_info)) + _ODP_PRINT(" pktin %s\n", pktio_info.name); + } + if (queue->pktout.pktio != ODP_PKTIO_INVALID) { + if (!odp_pktio_info(queue->pktout.pktio, &pktio_info)) + _ODP_PRINT(" pktout %s\n", pktio_info.name); + } + _ODP_PRINT(" timers %" PRIu64 "\n", odp_atomic_load_u64(&queue->num_timers)); + _ODP_PRINT(" status %s\n", + queue->status == QUEUE_STATUS_READY ? "ready" : + (queue->status == QUEUE_STATUS_NOTSCHED ? "not scheduled" : + (queue->status == QUEUE_STATUS_SCHED ? "scheduled" : "unknown"))); + _ODP_PRINT(" param.size %" PRIu32 "\n", queue->param.size); + if (queue->queue_lf) { + _ODP_PRINT(" implementation queue_lf\n"); + _ODP_PRINT(" length %" PRIu32 "/%" PRIu32 "\n", + _odp_queue_lf_length(queue->queue_lf), _odp_queue_lf_max_length()); + } else if (queue->spsc) { + _ODP_PRINT(" implementation ring_spsc\n"); + _ODP_PRINT(" length %" PRIu32 "/%" PRIu32 "\n", + ring_spsc_length(&queue->ring_spsc), queue->ring_mask + 1); + } else if (queue->type == ODP_QUEUE_TYPE_SCHED) { + _ODP_PRINT(" implementation ring_st\n"); + _ODP_PRINT(" length %" PRIu32 "/%" PRIu32 "\n", + ring_st_length(&queue->ring_st), queue->ring_mask + 1); + } else { + _ODP_PRINT(" implementation ring_mpmc\n"); + _ODP_PRINT(" length %" PRIu32 "/%" PRIu32 "\n", + ring_mpmc_u32_len(&queue->ring_mpmc), queue->ring_mask + 1); + } + _ODP_PRINT("\n"); + + UNLOCK(queue); +} + +static void queue_print_all(void) +{ + uint32_t i, index, len, max_len; + const char *name; + int status; + odp_queue_type_t type; + odp_nonblocking_t blocking; + odp_queue_op_mode_t enq_mode; + odp_queue_op_mode_t deq_mode; + odp_queue_order_t order; + const char *status_str; + const char *bl_str; + char type_c, enq_c, deq_c, order_c, sync_c; + const int col_width = 24; + int prio = 0; + int spr = 0; + odp_schedule_sync_t sync = ODP_SCHED_SYNC_PARALLEL; + odp_schedule_group_t grp = ODP_SCHED_GROUP_INVALID; + + _ODP_PRINT("\nList of all queues\n"); + _ODP_PRINT("------------------\n"); + _ODP_PRINT(" idx %-*s type stat blk enq deq ord len max_len sync prio grp", col_width, "name"); + if (_odp_sched_id == _ODP_SCHED_ID_BASIC) + _ODP_PRINT(" spr\n"); + else + _ODP_PRINT("\n"); + + for (i = 0; i < CONFIG_MAX_QUEUES; i++) { + queue_entry_t *queue = qentry_from_index(i); + + if (queue->status < QUEUE_STATUS_READY) + continue; + + LOCK(queue); + + status = queue->status; + index = queue->index; + name = queue->name; + type = queue->type; + blocking = queue->param.nonblocking; + enq_mode = queue->param.enq_mode; + deq_mode = queue->param.deq_mode; + order = queue->param.order; + + if (queue->queue_lf) { + len = _odp_queue_lf_length(queue->queue_lf); + max_len = _odp_queue_lf_max_length(); + } else if (queue->spsc) { + len = ring_spsc_length(&queue->ring_spsc); + max_len = queue->ring_mask + 1; + } else if (type == ODP_QUEUE_TYPE_SCHED) { + len = ring_st_length(&queue->ring_st); + max_len = queue->ring_mask + 1; + prio = queue->param.sched.prio; + grp = queue->param.sched.group; + sync = queue->param.sched.sync; + if (_odp_sched_id == _ODP_SCHED_ID_BASIC) + spr = _odp_sched_basic_get_spread(index); + } else { + len = ring_mpmc_u32_len(&queue->ring_mpmc); + max_len = queue->ring_mask + 1; + } + + UNLOCK(queue); + + if (status < QUEUE_STATUS_READY) + continue; + + status_str = (status == QUEUE_STATUS_READY) ? "R" : + ((status == QUEUE_STATUS_SCHED) ? "S" : "NS"); + + type_c = (type == ODP_QUEUE_TYPE_PLAIN) ? 'P' : 'S'; + + bl_str = (blocking == ODP_BLOCKING) ? "B" : + ((blocking == ODP_NONBLOCKING_LF) ? "LF" : "WF"); + + enq_c = (enq_mode == ODP_QUEUE_OP_MT) ? 'S' : + ((enq_mode == ODP_QUEUE_OP_MT_UNSAFE) ? 'U' : 'D'); + + deq_c = (deq_mode == ODP_QUEUE_OP_MT) ? 'S' : + ((deq_mode == ODP_QUEUE_OP_MT_UNSAFE) ? 'U' : 'D'); + + order_c = (order == ODP_QUEUE_ORDER_KEEP) ? 'K' : 'I'; + + _ODP_PRINT("%4u %-*s %c %2s %2s", index, col_width, name, type_c, + status_str, bl_str); + _ODP_PRINT(" %c %c %c %6u %6u", enq_c, deq_c, order_c, len, max_len); + + if (type == ODP_QUEUE_TYPE_SCHED) { + sync_c = (sync == ODP_SCHED_SYNC_PARALLEL) ? 'P' : + ((sync == ODP_SCHED_SYNC_ATOMIC) ? 'A' : 'O'); + /* Print prio level matching odp_schedule_print() output */ + prio = odp_schedule_max_prio() - prio; + + _ODP_PRINT(" %c %4i %3i", sync_c, prio, grp); + + if (_odp_sched_id == _ODP_SCHED_ID_BASIC) + _ODP_PRINT(" %3i", spr); + } + + _ODP_PRINT("\n"); + } + + _ODP_PRINT("\n"); +} + +static inline int _sched_queue_enq_multi(odp_queue_t handle, + _odp_event_hdr_t *event_hdr[], int num) +{ + int sched = 0; + int ret; + queue_entry_t *queue; + int num_enq; + ring_st_t *ring_st; + uint32_t event_idx[num]; + + queue = qentry_from_handle(handle); + ring_st = &queue->ring_st; + + if (_odp_sched_fn->ord_enq_multi(handle, (void **)event_hdr, num, &ret)) + return ret; + + event_index_from_hdr(event_idx, event_hdr, num); + + LOCK(queue); + + num_enq = ring_st_enq_multi(ring_st, queue->ring_data, + queue->ring_mask, event_idx, num); + + if (odp_unlikely(num_enq == 0)) { + UNLOCK(queue); + return 0; + } + + if (queue->status == QUEUE_STATUS_NOTSCHED) { + queue->status = QUEUE_STATUS_SCHED; + sched = 1; + } + + UNLOCK(queue); + + /* Add queue to scheduling */ + if (sched && _odp_sched_fn->sched_queue(queue->index)) + _ODP_ABORT("schedule_queue failed\n"); + + return num_enq; +} + +int _odp_sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num, + int update_status) +{ + int num_deq, status; + ring_st_t *ring_st; + queue_entry_t *queue = qentry_from_index(queue_index); + uint32_t event_idx[max_num]; + + ring_st = &queue->ring_st; + + LOCK(queue); + + status = queue->status; + + if (odp_unlikely(status < QUEUE_STATUS_READY)) { + /* Bad queue, or queue has been destroyed. + * Inform scheduler about a destroyed queue. */ + if (queue->status == QUEUE_STATUS_DESTROYED) { + queue->status = QUEUE_STATUS_FREE; + _odp_sched_fn->destroy_queue(queue_index); + } + + UNLOCK(queue); + return -1; + } + + num_deq = ring_st_deq_multi(ring_st, queue->ring_data, + queue->ring_mask, event_idx, max_num); + + if (num_deq == 0) { + /* Already empty queue */ + if (update_status && status == QUEUE_STATUS_SCHED) + queue->status = QUEUE_STATUS_NOTSCHED; + + UNLOCK(queue); + + return 0; + } + + UNLOCK(queue); + + event_index_to_hdr((_odp_event_hdr_t **)ev, event_idx, num_deq); + + return num_deq; +} + +static int sched_queue_enq_multi(odp_queue_t handle, + _odp_event_hdr_t *event_hdr[], int num) +{ + return _sched_queue_enq_multi(handle, event_hdr, num); +} + +static int sched_queue_enq(odp_queue_t handle, _odp_event_hdr_t *event_hdr) +{ + int ret; + + ret = _sched_queue_enq_multi(handle, &event_hdr, 1); + + if (ret == 1) + return 0; + else + return -1; +} + +int _odp_sched_queue_empty(uint32_t queue_index) +{ + queue_entry_t *queue = qentry_from_index(queue_index); + int ret = 0; + + LOCK(queue); + + if (odp_unlikely(queue->status < QUEUE_STATUS_READY)) { + /* Bad queue, or queue has been destroyed. */ + UNLOCK(queue); + return -1; + } + + if (ring_st_is_empty(&queue->ring_st)) { + /* Already empty queue. Update status. */ + if (queue->status == QUEUE_STATUS_SCHED) + queue->status = QUEUE_STATUS_NOTSCHED; + + ret = 1; + } + + UNLOCK(queue); + + return ret; +} + +static int queue_init(queue_entry_t *queue, const char *name, + const odp_queue_param_t *param) +{ + uint64_t offset; + uint32_t queue_size; + odp_queue_type_t queue_type; + int spsc; + + queue_type = param->type; + + if (name == NULL) + queue->name[0] = 0; + else + _odp_strcpy(queue->name, name, ODP_QUEUE_NAME_LEN); + memcpy(&queue->param, param, sizeof(odp_queue_param_t)); + if (queue->param.sched.lock_count > _odp_sched_fn->max_ordered_locks()) + return -1; + + if (queue_type == ODP_QUEUE_TYPE_SCHED) + queue->param.deq_mode = ODP_QUEUE_OP_DISABLED; + + queue->type = queue_type; + odp_atomic_init_u64(&queue->num_timers, 0); + + queue->pktin = PKTIN_INVALID; + queue->pktout = PKTOUT_INVALID; + + queue_size = param->size; + if (queue_size == 0) + queue_size = _odp_queue_glb->config.default_queue_size; + + if (queue_size < MIN_QUEUE_SIZE) + queue_size = MIN_QUEUE_SIZE; + + /* Round up if not already a power of two */ + queue_size = _ODP_ROUNDUP_POWER2_U32(queue_size); + + if (queue_size > _odp_queue_glb->config.max_queue_size) { + _ODP_ERR("Too large queue size %u\n", queue_size); + return -1; + } + + offset = queue->index * (uint64_t)_odp_queue_glb->config.max_queue_size; + + /* Single-producer / single-consumer plain queue has simple and + * lock-free implementation */ + spsc = (queue_type == ODP_QUEUE_TYPE_PLAIN) && + (param->enq_mode == ODP_QUEUE_OP_MT_UNSAFE) && + (param->deq_mode == ODP_QUEUE_OP_MT_UNSAFE); + + queue->spsc = spsc; + queue->queue_lf = NULL; + + /* Default to error functions */ + queue->enqueue = error_enqueue; + queue->enqueue_multi = error_enqueue_multi; + queue->dequeue = error_dequeue; + queue->dequeue_multi = error_dequeue_multi; + queue->orig_dequeue_multi = error_dequeue_multi; + + if (spsc) { + _odp_queue_spsc_init(queue, queue_size); + } else { + if (queue_type == ODP_QUEUE_TYPE_PLAIN) { + queue->enqueue = plain_queue_enq; + queue->enqueue_multi = plain_queue_enq_multi; + queue->dequeue = plain_queue_deq; + queue->dequeue_multi = plain_queue_deq_multi; + queue->orig_dequeue_multi = plain_queue_deq_multi; + + queue->ring_data = &_odp_queue_glb->ring_data[offset]; + queue->ring_mask = queue_size - 1; + ring_mpmc_u32_init(&queue->ring_mpmc); + + } else { + queue->enqueue = sched_queue_enq; + queue->enqueue_multi = sched_queue_enq_multi; + + queue->ring_data = &_odp_queue_glb->ring_data[offset]; + queue->ring_mask = queue_size - 1; + ring_st_init(&queue->ring_st); + } + } + + return 0; +} + +static uint64_t queue_to_u64(odp_queue_t hdl) +{ + return _odp_pri(hdl); +} + +static odp_pktout_queue_t queue_get_pktout(odp_queue_t handle) +{ + queue_entry_t *qentry = qentry_from_handle(handle); + + return qentry->pktout; +} + +static void queue_set_pktout(odp_queue_t handle, odp_pktio_t pktio, int index) +{ + queue_entry_t *qentry = qentry_from_handle(handle); + + qentry->pktout.pktio = pktio; + qentry->pktout.index = index; +} + +static odp_pktin_queue_t queue_get_pktin(odp_queue_t handle) +{ + queue_entry_t *qentry = qentry_from_handle(handle); + + return qentry->pktin; +} + +static void queue_set_pktin(odp_queue_t handle, odp_pktio_t pktio, int index) +{ + queue_entry_t *qentry = qentry_from_handle(handle); + + qentry->pktin.pktio = pktio; + qentry->pktin.index = index; +} + +static void queue_set_enq_deq_func(odp_queue_t handle, + queue_enq_fn_t enq, + queue_enq_multi_fn_t enq_multi, + queue_deq_fn_t deq, + queue_deq_multi_fn_t deq_multi) +{ + queue_entry_t *qentry = qentry_from_handle(handle); + + if (enq) + qentry->enqueue = enq; + + if (enq_multi) + qentry->enqueue_multi = enq_multi; + + if (deq) + qentry->dequeue = deq; + + if (deq_multi) + qentry->dequeue_multi = deq_multi; +} + +static int queue_orig_multi(odp_queue_t handle, + _odp_event_hdr_t **event_hdr, int num) +{ + queue_entry_t *queue = qentry_from_handle(handle); + + return queue->orig_dequeue_multi(handle, event_hdr, num); +} + +static int queue_api_enq_multi(odp_queue_t handle, + const odp_event_t ev[], int num) +{ + queue_entry_t *queue = qentry_from_handle(handle); + + if (odp_unlikely(num == 0)) + return 0; + + if (num > QUEUE_MULTI_MAX) + num = QUEUE_MULTI_MAX; + + return queue->enqueue_multi(handle, + (_odp_event_hdr_t **)(uintptr_t)ev, num); +} + +static void queue_timer_add(odp_queue_t handle) +{ + queue_entry_t *queue = qentry_from_handle(handle); + + odp_atomic_inc_u64(&queue->num_timers); +} + +static void queue_timer_rem(odp_queue_t handle) +{ + queue_entry_t *queue = qentry_from_handle(handle); + + odp_atomic_dec_u64(&queue->num_timers); +} + +static int queue_api_enq(odp_queue_t handle, odp_event_t ev) +{ + queue_entry_t *queue = qentry_from_handle(handle); + + return queue->enqueue(handle, + (_odp_event_hdr_t *)(uintptr_t)ev); +} + +static int queue_api_deq_multi(odp_queue_t handle, odp_event_t ev[], int num) +{ + queue_entry_t *queue = qentry_from_handle(handle); + int ret; + + if (num > QUEUE_MULTI_MAX) + num = QUEUE_MULTI_MAX; + + ret = queue->dequeue_multi(handle, (_odp_event_hdr_t **)ev, num); + + if (odp_global_rw->inline_timers && + odp_atomic_load_u64(&queue->num_timers)) + timer_run(ret ? 2 : 1); + + return ret; +} + +static odp_event_t queue_api_deq(odp_queue_t handle) +{ + queue_entry_t *queue = qentry_from_handle(handle); + odp_event_t ev = (odp_event_t)queue->dequeue(handle); + + if (odp_global_rw->inline_timers && + odp_atomic_load_u64(&queue->num_timers)) + timer_run(ev != ODP_EVENT_INVALID ? 2 : 1); + + return ev; +} + +/* API functions */ +_odp_queue_api_fn_t _odp_queue_basic_api = { + .queue_create = queue_create, + .queue_create_multi = queue_create_multi, + .queue_destroy = queue_destroy, + .queue_destroy_multi = queue_destroy_multi, + .queue_lookup = queue_lookup, + .queue_capability = queue_capability, + .queue_context_set = queue_context_set, + .queue_enq = queue_api_enq, + .queue_enq_multi = queue_api_enq_multi, + .queue_deq = queue_api_deq, + .queue_deq_multi = queue_api_deq_multi, + .queue_type = queue_type, + .queue_sched_type = queue_sched_type, + .queue_sched_prio = queue_sched_prio, + .queue_sched_group = queue_sched_group, + .queue_lock_count = queue_lock_count, + .queue_to_u64 = queue_to_u64, + .queue_param_init = queue_param_init, + .queue_info = queue_info, + .queue_print = queue_print, + .queue_print_all = queue_print_all + +}; + +/* Functions towards internal components */ +queue_fn_t _odp_queue_basic_fn = { + .init_global = queue_init_global, + .term_global = queue_term_global, + .init_local = queue_init_local, + .term_local = queue_term_local, + .get_pktout = queue_get_pktout, + .set_pktout = queue_set_pktout, + .get_pktin = queue_get_pktin, + .set_pktin = queue_set_pktin, + .set_enq_deq_fn = queue_set_enq_deq_func, + .orig_deq_multi = queue_orig_multi, + .timer_add = queue_timer_add, + .timer_rem = queue_timer_rem +}; |