aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPetri Savolainen <petri.savolainen@nokia.com>2016-09-15 16:39:20 +0300
committerMaxim Uvarov <maxim.uvarov@linaro.org>2016-09-22 19:02:28 +0300
commit92336dd2808af4826371d467588dcb81daafe4cf (patch)
tree6adef04ef801fd6fd6f1c8503e38b500d6eb2c54
parent7bb62b522a5f89d6d19a4c77254222b1c07ab44b (diff)
linux-gen: queue: burst enq and deq
Added support for a buffer header to carry a burst of buffer pointers. The buffer itself is the last one a burst. Burst are built with a enq_multi call, so single enq operations do not benefit from it. Signed-off-by: Petri Savolainen <petri.savolainen@nokia.com> Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
-rw-r--r--platform/linux-generic/include/odp_buffer_internal.h7
-rw-r--r--platform/linux-generic/odp_queue.c124
-rw-r--r--platform/linux-generic/odp_schedule_ordered.c19
3 files changed, 113 insertions, 37 deletions
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index 7b0ef8b0..69daf94b 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -103,6 +103,8 @@ typedef union odp_buffer_bits_t {
};
} odp_buffer_bits_t;
+#define BUFFER_BURST_SIZE 8
+
/* Common buffer header */
struct odp_buffer_hdr_t {
struct odp_buffer_hdr_t *next; /* next buf in a list--keep 1st */
@@ -111,6 +113,11 @@ struct odp_buffer_hdr_t {
struct odp_buffer_hdr_t *link;
};
odp_buffer_bits_t handle; /* handle */
+
+ int burst_num;
+ int burst_first;
+ struct odp_buffer_hdr_t *burst[BUFFER_BURST_SIZE];
+
union {
uint32_t all;
struct {
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 80d99e88..5b962e9f 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -388,20 +388,48 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
{
int sched = 0;
int i, ret;
- odp_buffer_hdr_t *tail;
-
- /* Chain input buffers together */
- for (i = 0; i < num - 1; i++)
- buf_hdr[i]->next = buf_hdr[i + 1];
-
- tail = buf_hdr[num - 1];
- buf_hdr[num - 1]->next = NULL;
+ odp_buffer_hdr_t *hdr, *tail, *next_hdr;
+ /* Ordered queues do not use bursts */
if (sched_fn->ord_enq_multi(queue->s.index, (void **)buf_hdr, num,
sustain, &ret))
return ret;
- /* Handle unordered enqueues */
+ /* Optimize the common case of single enqueue */
+ if (num == 1) {
+ tail = buf_hdr[0];
+ hdr = tail;
+ hdr->burst_num = 0;
+ } else {
+ int next;
+
+ /* Start from the last buffer header */
+ tail = buf_hdr[num - 1];
+ hdr = tail;
+ next = num - 2;
+
+ while (1) {
+ /* Build a burst. The buffer header carrying
+ * a burst is the last buffer of the burst. */
+ for (i = 0; next >= 0 && i < BUFFER_BURST_SIZE;
+ i++, next--)
+ hdr->burst[BUFFER_BURST_SIZE - 1 - i] =
+ buf_hdr[next];
+
+ hdr->burst_num = i;
+ hdr->burst_first = BUFFER_BURST_SIZE - i;
+
+ if (odp_likely(next < 0))
+ break;
+
+ /* Get another header and link it */
+ next_hdr = hdr;
+ hdr = buf_hdr[next];
+ hdr->next = next_hdr;
+ next--;
+ }
+ }
+
LOCK(&queue->s.lock);
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
@@ -411,9 +439,9 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
/* Empty queue */
if (queue->s.head == NULL)
- queue->s.head = buf_hdr[0];
+ queue->s.head = hdr;
else
- queue->s.tail->next = buf_hdr[0];
+ queue->s.tail->next = hdr;
queue->s.tail = tail;
@@ -483,9 +511,9 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
int num)
{
- odp_buffer_hdr_t *hdr;
- int i;
- uint32_t j;
+ odp_buffer_hdr_t *hdr, *next;
+ int i, j;
+ int updated = 0;
LOCK(&queue->s.lock);
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
@@ -506,33 +534,65 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
return 0;
}
- for (i = 0; i < num && hdr; i++) {
- buf_hdr[i] = hdr;
- hdr = hdr->next;
- buf_hdr[i]->next = NULL;
- if (queue_is_ordered(queue)) {
- buf_hdr[i]->origin_qe = queue;
- buf_hdr[i]->order = queue->s.order_in++;
- for (j = 0; j < queue->s.param.sched.lock_count; j++) {
- buf_hdr[i]->sync[j] =
+ for (i = 0; i < num && hdr; ) {
+ int burst_num = hdr->burst_num;
+ int first = hdr->burst_first;
+
+ /* First, get bursted buffers */
+ for (j = 0; j < burst_num && i < num; j++, i++) {
+ buf_hdr[i] = hdr->burst[first + j];
+ odp_prefetch(buf_hdr[i]);
+ }
+
+ if (burst_num) {
+ hdr->burst_num = burst_num - j;
+ hdr->burst_first = first + j;
+ }
+
+ if (i == num)
+ break;
+
+ /* When burst is empty, consume the current buffer header and
+ * move to the next header */
+ buf_hdr[i] = hdr;
+ next = hdr->next;
+ hdr->next = NULL;
+ hdr = next;
+ updated++;
+ i++;
+ }
+
+ /* Ordered queue book keeping inside the lock */
+ if (queue_is_ordered(queue)) {
+ for (j = 0; j < i; j++) {
+ uint32_t k;
+
+ buf_hdr[j]->origin_qe = queue;
+ buf_hdr[j]->order = queue->s.order_in++;
+ for (k = 0; k < queue->s.param.sched.lock_count; k++) {
+ buf_hdr[j]->sync[k] =
odp_atomic_fetch_inc_u64
- (&queue->s.sync_in[j]);
+ (&queue->s.sync_in[k]);
}
- buf_hdr[i]->flags.sustain = SUSTAIN_ORDER;
- } else {
- buf_hdr[i]->origin_qe = NULL;
+ buf_hdr[j]->flags.sustain = SUSTAIN_ORDER;
}
}
- queue->s.head = hdr;
+ /* Write head only if updated */
+ if (updated)
+ queue->s.head = hdr;
- if (hdr == NULL) {
- /* Queue is now empty */
+ /* Queue is empty */
+ if (hdr == NULL)
queue->s.tail = NULL;
- }
UNLOCK(&queue->s.lock);
+ /* Init origin_qe for non-ordered queues */
+ if (!queue_is_ordered(queue))
+ for (j = 0; j < i; j++)
+ buf_hdr[j]->origin_qe = NULL;
+
return i;
}
@@ -543,7 +603,7 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
{
- odp_buffer_hdr_t *buf_hdr;
+ odp_buffer_hdr_t *buf_hdr = NULL;
int ret;
ret = deq_multi(queue, &buf_hdr, 1);
diff --git a/platform/linux-generic/odp_schedule_ordered.c b/platform/linux-generic/odp_schedule_ordered.c
index 92a1cc85..84121838 100644
--- a/platform/linux-generic/odp_schedule_ordered.c
+++ b/platform/linux-generic/odp_schedule_ordered.c
@@ -457,15 +457,24 @@ int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[],
{
queue_entry_t *origin_qe;
uint64_t order;
- int rc;
+ int i, rc;
queue_entry_t *qe = get_qentry(queue_index);
- odp_buffer_hdr_t *buf_hdr = p_buf_hdr[0];
+ odp_buffer_hdr_t *first_hdr = p_buf_hdr[0];
+ odp_buffer_hdr_t **buf_hdr = (odp_buffer_hdr_t **)p_buf_hdr;
+
+ /* Chain input buffers together */
+ for (i = 0; i < num - 1; i++) {
+ buf_hdr[i]->next = buf_hdr[i + 1];
+ buf_hdr[i]->burst_num = 0;
+ }
+
+ buf_hdr[num - 1]->next = NULL;
/* Handle ordered enqueues commonly via links */
- get_queue_order(&origin_qe, &order, buf_hdr);
+ get_queue_order(&origin_qe, &order, first_hdr);
if (origin_qe) {
- buf_hdr->link = buf_hdr->next;
- rc = ordered_queue_enq(qe, buf_hdr, sustain,
+ first_hdr->link = first_hdr->next;
+ rc = ordered_queue_enq(qe, first_hdr, sustain,
origin_qe, order);
*ret = rc == 0 ? num : rc;
return 1;