aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatias Elo <matias.elo@nokia.com>2018-07-11 15:30:07 +0300
committerMatias Elo <matias.elo@nokia.com>2018-07-13 10:30:04 +0300
commit66fd19ddff3e3bfcfe6369714297f3fea8417318 (patch)
treec369f591cb8c9f46625de8724264d6e9bf08efe5
parent32b033ccb0824ce20989bf312b40c9533ce7c664 (diff)
Port eb021ca3 "linux-gen: queue_spsc: single-producer, single-consumer queue"
Includes also: 3c335400 "linux-gen: queue: fix queue empty check" Signed-off-by: Matias Elo <matias.elo@nokia.com>
-rw-r--r--platform/linux-dpdk/Makefile.am1
-rw-r--r--platform/linux-dpdk/include/odp_ring_spsc_internal.h4
-rw-r--r--platform/linux-dpdk/odp_queue_basic.c49
-rw-r--r--platform/linux-dpdk/odp_queue_spsc.c93
4 files changed, 133 insertions, 14 deletions
diff --git a/platform/linux-dpdk/Makefile.am b/platform/linux-dpdk/Makefile.am
index 5223252ac..62134db9c 100644
--- a/platform/linux-dpdk/Makefile.am
+++ b/platform/linux-dpdk/Makefile.am
@@ -168,6 +168,7 @@ __LIB__libodp_linux_la_SOURCES = \
odp_queue_basic.c \
odp_queue_if.c \
../linux-generic/odp_queue_lf.c \
+ odp_queue_spsc.c \
../linux-generic/odp_rwlock.c \
../linux-generic/odp_rwlock_recursive.c \
../linux-generic/odp_schedule_basic.c \
diff --git a/platform/linux-dpdk/include/odp_ring_spsc_internal.h b/platform/linux-dpdk/include/odp_ring_spsc_internal.h
index e54f30cbb..99dfebfa8 100644
--- a/platform/linux-dpdk/include/odp_ring_spsc_internal.h
+++ b/platform/linux-dpdk/include/odp_ring_spsc_internal.h
@@ -52,8 +52,8 @@ static inline ring_spsc_t ring_spsc_create(const char *name, uint32_t size)
/* Ring name must be unique */
ring_spsc_name_to_mz_name(name, ring_name);
- rte_ring = rte_ring_create(ring_name, size, rte_socket_id(),
- RING_F_SP_ENQ | RING_F_SC_DEQ);
+ rte_ring = rte_ring_create(ring_name, size, rte_socket_id(),
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
if (rte_ring == NULL) {
ODP_ERR("Creating DPDK ring failed: %s\n",
rte_strerror(rte_errno));
diff --git a/platform/linux-dpdk/odp_queue_basic.c b/platform/linux-dpdk/odp_queue_basic.c
index 68249ee1d..1d20d5af5 100644
--- a/platform/linux-dpdk/odp_queue_basic.c
+++ b/platform/linux-dpdk/odp_queue_basic.c
@@ -292,7 +292,8 @@ static odp_queue_t queue_create(const char *name,
return ODP_QUEUE_INVALID;
}
- if (param->nonblocking == ODP_NONBLOCKING_LF) {
+ if (!queue->s.spsc &&
+ param->nonblocking == ODP_NONBLOCKING_LF) {
queue_lf_func_t *lf_func;
lf_func = &queue_glb->queue_lf_func;
@@ -366,6 +367,7 @@ void sched_cb_queue_set_status(uint32_t queue_index, int status)
static int queue_destroy(odp_queue_t handle)
{
+ int empty;
queue_entry_t *queue;
queue = qentry_from_handle(handle);
@@ -383,12 +385,21 @@ static int queue_destroy(odp_queue_t handle)
ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name);
return -1;
}
- if (ring_st_is_empty(queue->s.ring_st) == 0) {
+
+ if (queue->s.spsc)
+ empty = ring_spsc_is_empty(queue->s.ring_spsc);
+ else
+ empty = ring_st_is_empty(queue->s.ring_st);
+
+ if (!empty) {
UNLOCK(queue);
ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
return -1;
}
- ring_st_free(queue->s.ring_st);
+ if (queue->s.spsc)
+ ring_spsc_free(queue->s.ring_st);
+ else
+ ring_st_free(queue->s.ring_st);
switch (queue->s.status) {
case QUEUE_STATUS_READY:
@@ -406,7 +417,7 @@ static int queue_destroy(odp_queue_t handle)
ODP_ABORT("Unexpected queue status\n");
}
- if (queue->s.param.nonblocking == ODP_NONBLOCKING_LF)
+ if (queue->s.queue_lf)
queue_lf_destroy(queue->s.queue_lf);
UNLOCK(queue);
@@ -639,6 +650,7 @@ static int queue_init(queue_entry_t *queue, const char *name,
const odp_queue_param_t *param)
{
uint32_t queue_size;
+ int spsc;
if (name == NULL) {
queue->s.name[0] = 0;
@@ -655,11 +667,6 @@ static int queue_init(queue_entry_t *queue, const char *name,
queue->s.type = queue->s.param.type;
- queue->s.enqueue = queue_int_enq;
- queue->s.dequeue = queue_int_deq;
- queue->s.enqueue_multi = queue_int_enq_multi;
- queue->s.dequeue_multi = queue_int_deq_multi;
-
queue->s.pktin = PKTIN_INVALID;
queue->s.pktout = PKTOUT_INVALID;
@@ -677,9 +684,27 @@ static int queue_init(queue_entry_t *queue, const char *name,
return -1;
}
- queue->s.ring_st = ring_st_create(queue->s.name, queue_size);
- if (queue->s.ring_st == NULL)
- return -1;
+ /* Single-producer / single-consumer plain queue has simple and
+ * lock-free implementation */
+ spsc = (param->type == ODP_QUEUE_TYPE_PLAIN) &&
+ (param->enq_mode == ODP_QUEUE_OP_MT_UNSAFE) &&
+ (param->deq_mode == ODP_QUEUE_OP_MT_UNSAFE);
+
+ queue->s.spsc = spsc;
+ queue->s.queue_lf = NULL;
+
+ if (spsc) {
+ queue_spsc_init(queue, queue_size);
+ } else {
+ queue->s.enqueue = queue_int_enq;
+ queue->s.dequeue = queue_int_deq;
+ queue->s.enqueue_multi = queue_int_enq_multi;
+ queue->s.dequeue_multi = queue_int_deq_multi;
+
+ queue->s.ring_st = ring_st_create(queue->s.name, queue_size);
+ if (queue->s.ring_st == NULL)
+ return -1;
+ }
return 0;
}
diff --git a/platform/linux-dpdk/odp_queue_spsc.c b/platform/linux-dpdk/odp_queue_spsc.c
new file mode 100644
index 000000000..9ee6f5b03
--- /dev/null
+++ b/platform/linux-dpdk/odp_queue_spsc.c
@@ -0,0 +1,93 @@
+/* Copyright (c) 2018, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+#include <odp/api/hints.h>
+#include <odp_queue_internal.h>
+
+#include "config.h"
+#include <odp_debug_internal.h>
+
+static inline int spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ queue_entry_t *queue;
+ ring_spsc_t ring_spsc;
+
+ queue = q_int;
+ ring_spsc = queue->s.ring_spsc;
+
+ if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+ ODP_ERR("Bad queue status\n");
+ return -1;
+ }
+
+ return ring_spsc_enq_multi(ring_spsc, (void **)buf_hdr, num);
+}
+
+static inline int spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ queue_entry_t *queue;
+ ring_spsc_t ring_spsc;
+
+ queue = q_int;
+ ring_spsc = queue->s.ring_spsc;
+
+ if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+ /* Bad queue, or queue has been destroyed. */
+ return -1;
+ }
+
+ return ring_spsc_deq_multi(ring_spsc, (void **)buf_hdr, num);
+}
+
+static int queue_spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ return spsc_enq_multi(q_int, buf_hdr, num);
+}
+
+static int queue_spsc_enq(void *q_int, odp_buffer_hdr_t *buf_hdr)
+{
+ int ret;
+
+ ret = spsc_enq_multi(q_int, &buf_hdr, 1);
+
+ if (ret == 1)
+ return 0;
+ else
+ return -1;
+}
+
+static int queue_spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ return spsc_deq_multi(q_int, buf_hdr, num);
+}
+
+static odp_buffer_hdr_t *queue_spsc_deq(void *q_int)
+{
+ odp_buffer_hdr_t *buf_hdr = NULL;
+ int ret;
+
+ ret = spsc_deq_multi(q_int, &buf_hdr, 1);
+
+ if (ret == 1)
+ return buf_hdr;
+ else
+ return NULL;
+}
+
+void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size)
+{
+ queue->s.enqueue = queue_spsc_enq;
+ queue->s.dequeue = queue_spsc_deq;
+ queue->s.enqueue_multi = queue_spsc_enq_multi;
+ queue->s.dequeue_multi = queue_spsc_deq_multi;
+
+ queue->s.ring_spsc = ring_spsc_create(queue->s.name, queue_size);
+ if (queue->s.ring_spsc == NULL)
+ ODP_ABORT("Creating SPSC ring failed\n");
+}