/* Copyright (c) 2016, Linaro Limited
 * All rights reserved.
 *
 * SPDX-License-Identifier:     BSD-3-Clause
 */

#ifndef ODP_RING_INTERNAL_H_
#define ODP_RING_INTERNAL_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <odp/api/atomic.h>
#include <odp/api/hints.h>
#include <odp_align_internal.h>

/* Ring empty, not a valid data value. */
#define RING_EMPTY ((uint32_t)-1)

/* Ring of uint32_t data
 *
 * The ring stores free-running head and tail counters. Ring indexes are
 * formed from these counters with a mask (mask = ring_size - 1), which
 * requires the ring size to be a power of two. The ring size must also be
 * larger than the maximum number of data items that will ever be stored in
 * it at the same time, since there is no check against overwriting. */
typedef struct {
	/* Writer head and tail */
	odp_atomic_u32_t w_head;
	odp_atomic_u32_t w_tail;
	uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))];

	/* Reader head and tail */
	odp_atomic_u32_t r_head;
	odp_atomic_u32_t r_tail;

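	/* Flexible data array; the ring memory must be allocated with room
	 * for ring_size uint32_t slots directly after this struct. */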
	uint32_t data[0];
} ring_t ODP_ALIGNED_CACHE;

/* Initialize ring */
static inline void ring_init(ring_t *ring)
{
	odp_atomic_init_u32(&ring->w_head, 0);
	odp_atomic_init_u32(&ring->w_tail, 0);
	odp_atomic_init_u32(&ring->r_head, 0);
	odp_atomic_init_u32(&ring->r_tail, 0);
}
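
/* Usage sketch (illustrative, not part of this header): allocate and
 * initialize a ring. RING_SIZE, RING_MASK and the aligned_alloc() call are
 * assumptions for this example; in ODP the ring memory would normally come
 * from reserved shared memory. The data array follows the struct, so the
 * allocation must cover both.
 *
 *   #include <stdlib.h>
 *
 *   #define RING_SIZE 1024u            // must be a power of two
 *   #define RING_MASK (RING_SIZE - 1)  // index mask passed to enq/deq
 *
 *   ring_t *ring = aligned_alloc(ODP_CACHE_LINE_SIZE,
 *                                sizeof(ring_t) +
 *                                RING_SIZE * sizeof(uint32_t));
 *   ring_init(ring);                   // all four counters start at zero
 */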

/* Dequeue data from the ring head */
static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
{
	uint32_t head, tail, new_head;
	uint32_t data;

	head = odp_atomic_load_u32(&ring->r_head);

	/* Move reader head. This thread owns data at the new head. */
	do {
		tail = odp_atomic_load_u32(&ring->w_tail);

		if (head == tail)
			return RING_EMPTY;

		new_head = head + 1;

	} while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
			      new_head) == 0));

	/* Read data from the slot acquired above */
	data = ring->data[new_head & mask];

	/* Wait until other readers have updated the tail */
	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
		odp_cpu_pause();

	/* Now update the reader tail */
	odp_atomic_store_rel_u32(&ring->r_tail, new_head);

	return data;
}
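
/* Usage sketch: single-item consume. RING_MASK is the assumed index mask
 * from the allocation example above. RING_EMPTY doubles as the "nothing
 * available" return value, which is why it is not a valid data value.
 *
 *   uint32_t value = ring_deq(ring, RING_MASK);
 *
 *   if (value != RING_EMPTY)
 *       process(value);                // process() is a placeholder
 */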

/* Dequeue multiple data items from the ring head. num must be smaller than
 * the ring size. */
static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
				      uint32_t data[], uint32_t num)
{
	uint32_t head, tail, new_head, i;

	head = odp_atomic_load_u32(&ring->r_head);

	/* Move reader head. This thread owns data at the new head. */
	do {
		tail = odp_atomic_load_u32(&ring->w_tail);

		/* Ring is empty */
		if (head == tail)
			return 0;

		/* Try to take all available */
		if ((tail - head) < num)
			num = tail - head;

		new_head = head + num;

	} while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
			      new_head) == 0));

	/* Read data from the slots acquired above */
	for (i = 0; i < num; i++)
		data[i] = ring->data[(head + 1 + i) & mask];

	/* Wait until other readers have updated the tail */
	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
		odp_cpu_pause();

	/* Now update the reader tail */
	odp_atomic_store_rel_u32(&ring->r_tail, new_head);

	return num;
}
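
/* Usage sketch: burst consume. Up to BURST items are read and the return
 * value is the number actually dequeued (0 when the ring is empty). BURST
 * is an assumed constant and must be smaller than the ring size.
 *
 *   #define BURST 32u
 *
 *   uint32_t buf[BURST];
 *   uint32_t n = ring_deq_multi(ring, RING_MASK, buf, BURST);
 *
 *   for (uint32_t i = 0; i < n; i++)
 *       process(buf[i]);               // process() is a placeholder
 */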

/* Enqueue data into the ring tail */
static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
{
	uint32_t old_head, new_head;

	/* Reserve a slot in the ring for writing */
	old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
	new_head = old_head + 1;

	/* Ring is full. Wait for the last reader to finish. */
	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
		odp_cpu_pause();

	/* Write data */
	ring->data[new_head & mask] = data;

	/* Wait until other writers have updated the tail */
	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
		odp_cpu_pause();

	/* Now update the writer tail */
	odp_atomic_store_rel_u32(&ring->w_tail, new_head);
}
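
/* Usage sketch: single-item produce. Enqueue cannot fail and returns
 * nothing; per the sizing rule above, the caller must guarantee that the
 * ring never holds more items than it has room for.
 *
 *   ring_enq(ring, RING_MASK, value);  // value must not be RING_EMPTY
 */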

/* Enqueue multiple data items into the ring tail. num must be smaller than
 * the ring size. */
static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[],
				  uint32_t num)
{
	uint32_t old_head, new_head, i;

	/* Reserve num slots in the ring for writing */
	old_head = odp_atomic_fetch_add_u32(&ring->w_head, num);
	new_head = old_head + 1;

	/* Ring is full. Wait for the last reader to finish. */
	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
		odp_cpu_pause();

	/* Write data */
	for (i = 0; i < num; i++)
		ring->data[(new_head + i) & mask] = data[i];

	/* Wait until other writers have updated the tail */
	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
		odp_cpu_pause();

	/* Now update the writer tail */
	odp_atomic_store_rel_u32(&ring->w_tail, old_head + num);
}
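
/* Usage sketch: burst produce. All num items are written unconditionally,
 * so the producer must know there is room for them; num must be smaller
 * than the ring size.
 *
 *   uint32_t vals[4] = { 10, 20, 30, 40 };
 *
 *   ring_enq_multi(ring, RING_MASK, vals, 4);
 */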

#ifdef __cplusplus
}
#endif

#endif