aboutsummaryrefslogtreecommitdiff
path: root/platform/linux-dpdk/include/odp_ptr_ring_spsc_internal.h
diff options
context:
space:
mode:
Diffstat (limited to 'platform/linux-dpdk/include/odp_ptr_ring_spsc_internal.h')
-rw-r--r--platform/linux-dpdk/include/odp_ptr_ring_spsc_internal.h107
1 files changed, 107 insertions, 0 deletions
diff --git a/platform/linux-dpdk/include/odp_ptr_ring_spsc_internal.h b/platform/linux-dpdk/include/odp_ptr_ring_spsc_internal.h
new file mode 100644
index 000000000..3680f087a
--- /dev/null
+++ b/platform/linux-dpdk/include/odp_ptr_ring_spsc_internal.h
@@ -0,0 +1,107 @@
+/* Copyright (c) 2018, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_RING_SPSC_INTERNAL_H_
+#define ODP_RING_SPSC_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp/api/queue.h>
+
+#include <odp_debug_internal.h>
+
+#include <rte_config.h>
+#include <rte_ring.h>
+#include <rte_errno.h>
+
+/* Lock-free ring for single-producer / single-consumer usage.
+ *
+ * Thread doing an operation may be different each time, but the same operation
+ * (enq- or dequeue) must not be called concurrently. The next thread may call
+ * the same operation only when it's sure that the previous thread have returned
+ * from the call, or will never return back to finish the call when interrupted
+ * during the call.
+ *
+ * Enqueue and dequeue operations can be done concurrently.
+ */
+typedef struct rte_ring *ring_spsc_t;
+
+static void ring_spsc_name_to_mz_name(const char *name, char *ring_name)
+{
+ int i = 0;
+ int max_len = ODP_QUEUE_NAME_LEN < RTE_RING_NAMESIZE ?
+ ODP_QUEUE_NAME_LEN : RTE_RING_NAMESIZE;
+
+ do {
+ snprintf(ring_name, max_len, "%d-%s", i++, name);
+ ring_name[max_len - 1] = 0;
+ } while (rte_ring_lookup(ring_name) != NULL);
+}
+
+/* Initialize ring. Ring size must be a power of two. */
+static inline ring_spsc_t ring_spsc_create(const char *name, uint32_t size)
+{
+ struct rte_ring *rte_ring;
+ char ring_name[RTE_RING_NAMESIZE];
+
+ /* Ring name must be unique */
+ ring_spsc_name_to_mz_name(name, ring_name);
+
+ rte_ring = rte_ring_create(ring_name, size, rte_socket_id(),
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
+ if (rte_ring == NULL) {
+ _ODP_ERR("Creating DPDK ring failed: %s\n", rte_strerror(rte_errno));
+ return NULL;
+ }
+
+ return rte_ring;
+}
+
+/* Free all memory used by the ring. */
+static inline void ring_spsc_free(ring_spsc_t ring)
+{
+ rte_ring_free(ring);
+}
+
+/* Dequeue data from the ring head. Max_num is smaller than ring size.*/
+static inline uint32_t ring_spsc_deq_multi(ring_spsc_t ring, void **data,
+ uint32_t max_num)
+{
+ return rte_ring_sc_dequeue_burst(ring, data, max_num, NULL);
+}
+
+/* Enqueue data into the ring tail. Num_data is smaller than ring size. */
+static inline uint32_t ring_spsc_enq_multi(ring_spsc_t ring, void **data,
+ uint32_t num_data)
+{
+ return rte_ring_sp_enqueue_burst(ring, data, num_data, NULL);
+}
+
+/* Check if ring is empty */
+static inline int ring_spsc_is_empty(ring_spsc_t ring)
+{
+ return rte_ring_empty(ring);
+}
+
+/* Return current ring length */
+static inline int ring_spsc_length(ring_spsc_t ring)
+{
+ return rte_ring_count(ring);
+}
+
+/* Return maximum ring length */
+static inline int ring_spsc_max_length(ring_spsc_t ring)
+{
+ return rte_ring_get_capacity(ring);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif