aboutsummaryrefslogtreecommitdiff
path: root/platform/linux-generic/odp_queue_spsc.c
blob: f8b04ca2206cfe1c4964a2819cc728f100d935b5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/* Copyright (c) 2018, Linaro Limited
 * All rights reserved.
 *
 * SPDX-License-Identifier:     BSD-3-Clause
 */
#include <string.h>
#include <stdio.h>

#include <odp_queue_internal.h>
#include <odp_pool_internal.h>

#include "config.h"
#include <odp_debug_internal.h>

/*
 * Convert buffer header pointers into 32-bit buffer indexes.
 *
 * For each of the first 'num' entries, buf_hdr[i]->index.u32 is written to
 * buffer_index[i]. Nothing is written when 'num' is not positive.
 */
static inline void buffer_index_from_buf(uint32_t buffer_index[],
					 odp_buffer_hdr_t *buf_hdr[], int num)
{
	int pos = 0;

	while (pos < num) {
		buffer_index[pos] = buf_hdr[pos]->index.u32;
		pos++;
	}
}

/*
 * Convert 32-bit buffer indexes back into buffer header pointers.
 *
 * Each of the first 'num' indexes is looked up with buf_hdr_from_index_u32()
 * and stored to buf_hdr[i]. A prefetch is issued on every resulting header.
 */
static inline void buffer_index_to_buf(odp_buffer_hdr_t *buf_hdr[],
				       uint32_t buffer_index[], int num)
{
	int pos = 0;

	while (pos < num) {
		odp_buffer_hdr_t *hdr;

		hdr = buf_hdr_from_index_u32(buffer_index[pos]);
		buf_hdr[pos] = hdr;
		odp_prefetch(hdr);
		pos++;
	}
}

/*
 * Enqueue up to 'num' buffers into the SPSC ring of a queue.
 *
 * Buffers are stored in the ring as their 32-bit indexes
 * (buf_hdr[i]->index.u32).
 *
 * q_int    Queue handle, used as a pointer to queue_entry_t
 * buf_hdr  Buffer headers to enqueue
 * num      Number of entries in buf_hdr[]
 *
 * Returns -1 when the queue status is below QUEUE_STATUS_READY, 0 when 'num'
 * is not positive, and otherwise the return value of ring_spsc_enq_multi().
 */
static inline int spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
				 int num)
{
	queue_entry_t *queue;
	ring_spsc_t *ring_spsc;
	/* A variable length array must have a size greater than zero, so
	 * clamp the size here. Non-positive 'num' is rejected below. */
	uint32_t buf_idx[num > 0 ? num : 1];

	queue = q_int;
	ring_spsc = &queue->s.ring_spsc;

	/* Check the queue first: no buffer header is read for a bad queue */
	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
		ODP_ERR("Bad queue status\n");
		return -1;
	}

	/* Nothing to enqueue */
	if (odp_unlikely(num <= 0))
		return 0;

	buffer_index_from_buf(buf_idx, buf_hdr, num);

	return ring_spsc_enq_multi(ring_spsc, buf_idx, num);
}

/*
 * Dequeue up to 'num' buffers from the SPSC ring of a queue.
 *
 * The ring holds 32-bit buffer indexes; the dequeued indexes are converted
 * back to buffer header pointers before returning.
 *
 * q_int    Queue handle, used as a pointer to queue_entry_t
 * buf_hdr  Output array for the dequeued buffer headers
 * num      Capacity of buf_hdr[]
 *
 * Returns -1 when the queue status is below QUEUE_STATUS_READY, 0 when 'num'
 * is not positive or nothing was dequeued, and otherwise the number of
 * buffer headers written to buf_hdr[].
 */
static inline int spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
				 int num)
{
	queue_entry_t *queue;
	int num_deq;
	ring_spsc_t *ring_spsc;
	/* A variable length array must have a size greater than zero, so
	 * clamp the size here. Non-positive 'num' is rejected below. */
	uint32_t buf_idx[num > 0 ? num : 1];

	queue = q_int;
	ring_spsc = &queue->s.ring_spsc;

	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
		/* Bad queue, or queue has been destroyed. */
		return -1;
	}

	/* No room for output */
	if (odp_unlikely(num <= 0))
		return 0;

	num_deq = ring_spsc_deq_multi(ring_spsc, buf_idx, num);

	if (num_deq == 0)
		return 0;

	buffer_index_to_buf(buf_hdr, buf_idx, num_deq);

	return num_deq;
}

/*
 * Multi-buffer enqueue entry point installed by queue_spsc_init().
 *
 * Forwards all arguments to spsc_enq_multi() and returns its result.
 */
static int queue_spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
				int num)
{
	int num_enq = spsc_enq_multi(q_int, buf_hdr, num);

	return num_enq;
}

/*
 * Single-buffer enqueue entry point installed by queue_spsc_init().
 *
 * Returns 0 when exactly one buffer was enqueued, otherwise -1.
 */
static int queue_spsc_enq(void *q_int, odp_buffer_hdr_t *buf_hdr)
{
	/* Pass the single header as a one element array */
	return (spsc_enq_multi(q_int, &buf_hdr, 1) == 1) ? 0 : -1;
}

/*
 * Multi-buffer dequeue entry point installed by queue_spsc_init().
 *
 * Forwards all arguments to spsc_deq_multi() and returns its result.
 */
static int queue_spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
				int num)
{
	int num_deq = spsc_deq_multi(q_int, buf_hdr, num);

	return num_deq;
}

/*
 * Single-buffer dequeue entry point installed by queue_spsc_init().
 *
 * Returns the dequeued buffer header, or NULL when spsc_deq_multi() did not
 * return exactly one buffer.
 */
static odp_buffer_hdr_t *queue_spsc_deq(void *q_int)
{
	odp_buffer_hdr_t *hdr = NULL;

	if (spsc_deq_multi(q_int, &hdr, 1) != 1)
		return NULL;

	return hdr;
}

/*
 * Set up a queue entry for SPSC operation.
 *
 * Initializes the queue's SPSC ring over its region of the global ring data
 * array and installs the SPSC enqueue/dequeue functions into the entry.
 *
 * queue       Queue entry to initialize
 * queue_size  Size passed on to ring_spsc_init()
 */
void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size)
{
	/* Region start: queue index times max_queue_size, as a 64-bit product */
	uint64_t offset = queue->s.index *
			  (uint64_t)queue_glb->config.max_queue_size;

	ring_spsc_init(&queue->s.ring_spsc, &queue_glb->ring_data[offset],
		       queue_size);

	queue->s.enqueue_multi = queue_spsc_enq_multi;
	queue->s.dequeue_multi = queue_spsc_deq_multi;
	queue->s.enqueue = queue_spsc_enq;
	queue->s.dequeue = queue_spsc_deq;
}