diff options
author | Matias Elo <matias.elo@nokia.com> | 2018-07-11 15:30:07 +0300 |
---|---|---|
committer | Matias Elo <matias.elo@nokia.com> | 2018-07-13 10:30:04 +0300 |
commit | 66fd19ddff3e3bfcfe6369714297f3fea8417318 (patch) | |
tree | c369f591cb8c9f46625de8724264d6e9bf08efe5 | |
parent | 32b033ccb0824ce20989bf312b40c9533ce7c664 (diff) |
Port eb021ca3 "linux-gen: queue_spsc: single-producer, single-consumer queue"
Includes also:
3c335400 "linux-gen: queue: fix queue empty check"
Signed-off-by: Matias Elo <matias.elo@nokia.com>
-rw-r--r-- | platform/linux-dpdk/Makefile.am | 1 | ||||
-rw-r--r-- | platform/linux-dpdk/include/odp_ring_spsc_internal.h | 4 | ||||
-rw-r--r-- | platform/linux-dpdk/odp_queue_basic.c | 49 | ||||
-rw-r--r-- | platform/linux-dpdk/odp_queue_spsc.c | 93 |
4 files changed, 133 insertions, 14 deletions
diff --git a/platform/linux-dpdk/Makefile.am b/platform/linux-dpdk/Makefile.am index 5223252ac..62134db9c 100644 --- a/platform/linux-dpdk/Makefile.am +++ b/platform/linux-dpdk/Makefile.am @@ -168,6 +168,7 @@ __LIB__libodp_linux_la_SOURCES = \ odp_queue_basic.c \ odp_queue_if.c \ ../linux-generic/odp_queue_lf.c \ + odp_queue_spsc.c \ ../linux-generic/odp_rwlock.c \ ../linux-generic/odp_rwlock_recursive.c \ ../linux-generic/odp_schedule_basic.c \ diff --git a/platform/linux-dpdk/include/odp_ring_spsc_internal.h b/platform/linux-dpdk/include/odp_ring_spsc_internal.h index e54f30cbb..99dfebfa8 100644 --- a/platform/linux-dpdk/include/odp_ring_spsc_internal.h +++ b/platform/linux-dpdk/include/odp_ring_spsc_internal.h @@ -52,8 +52,8 @@ static inline ring_spsc_t ring_spsc_create(const char *name, uint32_t size) /* Ring name must be unique */ ring_spsc_name_to_mz_name(name, ring_name); - rte_ring = rte_ring_create(ring_name, size, rte_socket_id(), - RING_F_SP_ENQ | RING_F_SC_DEQ); + rte_ring = rte_ring_create(ring_name, size, rte_socket_id(), + RING_F_SP_ENQ | RING_F_SC_DEQ); if (rte_ring == NULL) { ODP_ERR("Creating DPDK ring failed: %s\n", rte_strerror(rte_errno)); diff --git a/platform/linux-dpdk/odp_queue_basic.c b/platform/linux-dpdk/odp_queue_basic.c index 68249ee1d..1d20d5af5 100644 --- a/platform/linux-dpdk/odp_queue_basic.c +++ b/platform/linux-dpdk/odp_queue_basic.c @@ -292,7 +292,8 @@ static odp_queue_t queue_create(const char *name, return ODP_QUEUE_INVALID; } - if (param->nonblocking == ODP_NONBLOCKING_LF) { + if (!queue->s.spsc && + param->nonblocking == ODP_NONBLOCKING_LF) { queue_lf_func_t *lf_func; lf_func = &queue_glb->queue_lf_func; @@ -366,6 +367,7 @@ void sched_cb_queue_set_status(uint32_t queue_index, int status) static int queue_destroy(odp_queue_t handle) { + int empty; queue_entry_t *queue; queue = qentry_from_handle(handle); @@ -383,12 +385,21 @@ static int queue_destroy(odp_queue_t handle) ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name); return -1; } - if (ring_st_is_empty(queue->s.ring_st) == 0) { + + if (queue->s.spsc) + empty = ring_spsc_is_empty(queue->s.ring_spsc); + else + empty = ring_st_is_empty(queue->s.ring_st); + + if (!empty) { UNLOCK(queue); ODP_ERR("queue \"%s\" not empty\n", queue->s.name); return -1; } - ring_st_free(queue->s.ring_st); + if (queue->s.spsc) + ring_spsc_free(queue->s.ring_st); + else + ring_st_free(queue->s.ring_st); switch (queue->s.status) { case QUEUE_STATUS_READY: @@ -406,7 +417,7 @@ static int queue_destroy(odp_queue_t handle) ODP_ABORT("Unexpected queue status\n"); } - if (queue->s.param.nonblocking == ODP_NONBLOCKING_LF) + if (queue->s.queue_lf) queue_lf_destroy(queue->s.queue_lf); UNLOCK(queue); @@ -639,6 +650,7 @@ static int queue_init(queue_entry_t *queue, const char *name, const odp_queue_param_t *param) { uint32_t queue_size; + int spsc; if (name == NULL) { queue->s.name[0] = 0; @@ -655,11 +667,6 @@ static int queue_init(queue_entry_t *queue, const char *name, queue->s.type = queue->s.param.type; - queue->s.enqueue = queue_int_enq; - queue->s.dequeue = queue_int_deq; - queue->s.enqueue_multi = queue_int_enq_multi; - queue->s.dequeue_multi = queue_int_deq_multi; - queue->s.pktin = PKTIN_INVALID; queue->s.pktout = PKTOUT_INVALID; @@ -677,9 +684,27 @@ static int queue_init(queue_entry_t *queue, const char *name, return -1; } - queue->s.ring_st = ring_st_create(queue->s.name, queue_size); - if (queue->s.ring_st == NULL) - return -1; + /* Single-producer / single-consumer plain queue has simple and + * lock-free implementation */ + spsc = (param->type == ODP_QUEUE_TYPE_PLAIN) && + (param->enq_mode == ODP_QUEUE_OP_MT_UNSAFE) && + (param->deq_mode == ODP_QUEUE_OP_MT_UNSAFE); + + queue->s.spsc = spsc; + queue->s.queue_lf = NULL; + + if (spsc) { + queue_spsc_init(queue, queue_size); + } else { + queue->s.enqueue = queue_int_enq; + queue->s.dequeue = queue_int_deq; + queue->s.enqueue_multi = queue_int_enq_multi; + queue->s.dequeue_multi = queue_int_deq_multi; + + queue->s.ring_st = ring_st_create(queue->s.name, queue_size); + if (queue->s.ring_st == NULL) + return -1; + } return 0; } diff --git a/platform/linux-dpdk/odp_queue_spsc.c b/platform/linux-dpdk/odp_queue_spsc.c new file mode 100644 index 000000000..9ee6f5b03 --- /dev/null +++ b/platform/linux-dpdk/odp_queue_spsc.c @@ -0,0 +1,93 @@ +/* Copyright (c) 2018, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ +#include <odp/api/hints.h> +#include <odp_queue_internal.h> + +#include "config.h" +#include <odp_debug_internal.h> + +static inline int spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + queue_entry_t *queue; + ring_spsc_t ring_spsc; + + queue = q_int; + ring_spsc = queue->s.ring_spsc; + + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + ODP_ERR("Bad queue status\n"); + return -1; + } + + return ring_spsc_enq_multi(ring_spsc, (void **)buf_hdr, num); +} + +static inline int spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + queue_entry_t *queue; + ring_spsc_t ring_spsc; + + queue = q_int; + ring_spsc = queue->s.ring_spsc; + + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + /* Bad queue, or queue has been destroyed. */ + return -1; + } + + return ring_spsc_deq_multi(ring_spsc, (void **)buf_hdr, num); +} + +static int queue_spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + return spsc_enq_multi(q_int, buf_hdr, num); +} + +static int queue_spsc_enq(void *q_int, odp_buffer_hdr_t *buf_hdr) +{ + int ret; + + ret = spsc_enq_multi(q_int, &buf_hdr, 1); + + if (ret == 1) + return 0; + else + return -1; +} + +static int queue_spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + return spsc_deq_multi(q_int, buf_hdr, num); +} + +static odp_buffer_hdr_t *queue_spsc_deq(void *q_int) +{ + odp_buffer_hdr_t *buf_hdr = NULL; + int ret; + + ret = spsc_deq_multi(q_int, &buf_hdr, 1); + + if (ret == 1) + return buf_hdr; + else + return NULL; +} + +void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size) +{ + queue->s.enqueue = queue_spsc_enq; + queue->s.dequeue = queue_spsc_deq; + queue->s.enqueue_multi = queue_spsc_enq_multi; + queue->s.dequeue_multi = queue_spsc_deq_multi; + + queue->s.ring_spsc = ring_spsc_create(queue->s.name, queue_size); + if (queue->s.ring_spsc == NULL) + ODP_ABORT("Creating SPSC ring failed\n"); +} |