aboutsummaryrefslogtreecommitdiff
path: root/platform/linux-generic/odp_queue_spsc.c
blob: bb2a7ce9f6a2a2a0ad0aa83bffe82b9eb2fffb28 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018 Linaro Limited
 * Copyright (c) 2021 Nokia
 */

#include <odp_debug_internal.h>
#include <odp_event_internal.h>
#include <odp_pool_internal.h>
#include <odp_queue_basic_internal.h>

#include <string.h>
#include <stdio.h>

/*
 * Extract the 32-bit index stored in each event header.
 *
 * event_index: output array, receives 'num' index values
 * event_hdr:   input array of 'num' event header pointers
 * num:         number of entries to convert; nothing is done when num <= 0
 */
static inline void event_index_from_hdr(uint32_t event_index[],
					_odp_event_hdr_t *event_hdr[], int num)
{
	int n = 0;

	while (n < num) {
		event_index[n] = event_hdr[n]->index.u32;
		n++;
	}
}

/*
 * Translate 32-bit event indexes back into event header pointers.
 *
 * event_hdr:   output array, receives 'num' header pointers
 * event_index: input array of 'num' index values
 * num:         number of entries to convert; nothing is done when num <= 0
 *
 * A prefetch is issued for every header right after it has been looked up.
 */
static inline void event_index_to_hdr(_odp_event_hdr_t *event_hdr[],
				      uint32_t event_index[], int num)
{
	int n;

	for (n = 0; n < num; n++) {
		_odp_event_hdr_t *hdr = _odp_event_hdr_from_index_u32(event_index[n]);

		event_hdr[n] = hdr;
		odp_prefetch(hdr);
	}
}

/*
 * Enqueue up to 'num' events into the SPSC ring of a queue.
 *
 * handle:    queue handle, resolved with qentry_from_handle()
 * event_hdr: array of 'num' event header pointers to enqueue
 * num:       number of events offered
 *
 * Returns -1 (after logging an error) when the queue status is below
 * QUEUE_STATUS_READY, otherwise the value returned by ring_spsc_enq_multi().
 *
 * The queue status is validated before any event header is dereferenced, so
 * a rejected call does no conversion work and does not touch the caller's
 * events.
 *
 * NOTE(review): buf_idx is a variable length array sized by 'num'; callers
 * are presumably expected to pass num > 0 - confirm against the call sites.
 */
static inline int spsc_enq_multi(odp_queue_t handle,
				 _odp_event_hdr_t *event_hdr[], int num)
{
	queue_entry_t *queue;
	ring_spsc_t *ring_spsc;
	uint32_t buf_idx[num];

	queue = qentry_from_handle(handle);

	/* Reject the call first: no point in converting headers for a bad queue */
	if (odp_unlikely(queue->status < QUEUE_STATUS_READY)) {
		_ODP_ERR("Bad queue status\n");
		return -1;
	}

	ring_spsc = &queue->ring_spsc;

	/* The ring stores 32-bit event indexes, not header pointers */
	event_index_from_hdr(buf_idx, event_hdr, num);

	return ring_spsc_enq_multi(ring_spsc, queue->ring_data,
				   queue->ring_mask, buf_idx, num);
}

/*
 * Dequeue up to 'num' events from the SPSC ring of a queue.
 *
 * handle:    queue handle, resolved with qentry_from_handle()
 * event_hdr: output array with room for 'num' event header pointers
 * num:       maximum number of events to dequeue
 *
 * Returns -1 when the queue status is below QUEUE_STATUS_READY, otherwise
 * the count reported by ring_spsc_deq_multi(). The first 'count' entries of
 * event_hdr[] are filled in when that count is non-zero.
 */
static inline int spsc_deq_multi(odp_queue_t handle,
				 _odp_event_hdr_t *event_hdr[], int num)
{
	uint32_t idx_buf[num];
	queue_entry_t *qentry = qentry_from_handle(handle);
	int count;

	/* Queue is bad or has already been destroyed */
	if (odp_unlikely(qentry->status < QUEUE_STATUS_READY))
		return -1;

	count = ring_spsc_deq_multi(&qentry->ring_spsc, qentry->ring_data,
				    qentry->ring_mask, idx_buf, num);

	/* The ring holds indexes; map them back to header pointers */
	if (count != 0)
		event_index_to_hdr(event_hdr, idx_buf, count);

	return count;
}

/*
 * Multi-event enqueue entry point installed by _odp_queue_spsc_init().
 * Forwards to spsc_enq_multi() and returns its result unchanged.
 */
static int queue_spsc_enq_multi(odp_queue_t handle, _odp_event_hdr_t *event_hdr[],
				int num)
{
	int num_enq = spsc_enq_multi(handle, event_hdr, num);

	return num_enq;
}

/*
 * Single-event enqueue entry point installed by _odp_queue_spsc_init().
 *
 * Returns 0 when exactly one event was enqueued by spsc_enq_multi(),
 * -1 for any other result.
 */
static int queue_spsc_enq(odp_queue_t handle, _odp_event_hdr_t *event_hdr)
{
	return (spsc_enq_multi(handle, &event_hdr, 1) == 1) ? 0 : -1;
}

/*
 * Multi-event dequeue entry point installed by _odp_queue_spsc_init().
 * Forwards to spsc_deq_multi() and returns its result unchanged.
 */
static int queue_spsc_deq_multi(odp_queue_t handle, _odp_event_hdr_t *event_hdr[],
				int num)
{
	int num_deq = spsc_deq_multi(handle, event_hdr, num);

	return num_deq;
}

/*
 * Single-event dequeue entry point installed by _odp_queue_spsc_init().
 *
 * Returns the dequeued event header when spsc_deq_multi() delivered exactly
 * one event, NULL for any other result.
 */
static _odp_event_hdr_t *queue_spsc_deq(odp_queue_t handle)
{
	_odp_event_hdr_t *hdr = NULL;

	if (spsc_deq_multi(handle, &hdr, 1) != 1)
		return NULL;

	return hdr;
}

/*
 * Set up a queue entry to use the SPSC ring implementation.
 *
 * queue:      queue entry to initialize
 * queue_size: ring size in entries; ring_mask is set to queue_size - 1
 *             (presumably a power of two - confirm against the caller)
 *
 * Installs the SPSC enqueue/dequeue handlers, points ring_data at this
 * queue's slice of the global ring data area and initializes the ring.
 */
void _odp_queue_spsc_init(queue_entry_t *queue, uint32_t queue_size)
{
	/*
	 * Each queue owns config.max_queue_size consecutive entries of the
	 * global ring data array. The product is computed in 64 bits.
	 */
	uint64_t first_slot = queue->index *
			      (uint64_t)_odp_queue_glb->config.max_queue_size;

	/* Single event handlers */
	queue->enqueue = queue_spsc_enq;
	queue->dequeue = queue_spsc_deq;

	/* Multi event handlers */
	queue->enqueue_multi = queue_spsc_enq_multi;
	queue->dequeue_multi = queue_spsc_deq_multi;
	queue->orig_dequeue_multi = queue_spsc_deq_multi;

	/* Ring storage and state */
	queue->ring_data = &_odp_queue_glb->ring_data[first_slot];
	queue->ring_mask = queue_size - 1;
	ring_spsc_init(&queue->ring_spsc);
}