Diffstat (limited to 'platform/linux-generic/odp_queue.c')
-rw-r--r--  platform/linux-generic/odp_queue.c  70
1 file changed, 44 insertions(+), 26 deletions(-)
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 9fae71084..a636a439f 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -53,14 +53,17 @@ queue_entry_t *get_qentry(uint32_t queue_id)
 	return &queue_tbl->queue[queue_id];
 }
 
-static void queue_init(queue_entry_t *queue, const char *name,
-		       odp_queue_type_t type, odp_queue_param_t *param)
+static int queue_init(queue_entry_t *queue, const char *name,
+		      odp_queue_type_t type, odp_queue_param_t *param)
 {
 	strncpy(queue->s.name, name, ODP_QUEUE_NAME_LEN - 1);
 	queue->s.type = type;
 
 	if (param) {
 		memcpy(&queue->s.param, param, sizeof(odp_queue_param_t));
+		if (queue->s.param.sched.lock_count >
+		    ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE)
+			return -1;
 	} else {
 		/* Defaults */
 		memset(&queue->s.param, 0, sizeof(odp_queue_param_t));
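
The bounds check above is the first thing callers will notice: queue parameters that ask for more ordered locks than the platform supports now make queue_init() fail instead of silently misbehaving. A minimal caller-side sketch (the parameter name qparam is illustrative, not from this patch):

/* Sketch: request two ordered locks on a scheduled queue; queue_init()
 * above rejects lock_count values above
 * ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE. */
odp_queue_param_t qparam;

memset(&qparam, 0, sizeof(qparam));
qparam.sched.sync       = ODP_SCHED_SYNC_ORDERED;
qparam.sched.lock_count = 2;
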
@@ -98,12 +101,13 @@ static void queue_init(queue_entry_t *queue, const char *name,
 	queue->s.pri_queue = ODP_QUEUE_INVALID;
 	queue->s.cmd_ev    = ODP_EVENT_INVALID;
+	return 0;
 }
 
 int odp_queue_init_global(void)
 {
-	uint32_t i;
+	uint32_t i, j;
 	odp_shm_t shm;
 
 	ODP_DBG("Queue init ... ");
@@ -123,8 +127,10 @@ int odp_queue_init_global(void)
 		/* init locks */
 		queue_entry_t *queue = get_qentry(i);
 		LOCK_INIT(&queue->s.lock);
-		odp_atomic_init_u64(&queue->s.sync_in, 0);
-		odp_atomic_init_u64(&queue->s.sync_out, 0);
+		for (j = 0; j < ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE; j++) {
+			odp_atomic_init_u64(&queue->s.sync_in[j], 0);
+			odp_atomic_init_u64(&queue->s.sync_out[j], 0);
+		}
 		queue->s.handle = queue_from_id(i);
 	}
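
Behind this loop, the single sync_in/sync_out pair in queue_entry_t becomes one pair per ordered lock. The shape of the affected fields is roughly the following (an assumption inferred from this file's usage; the real declaration lives in the internal queue header):

/* One ticket dispenser (sync_in) and one "now serving" counter
 * (sync_out) per ordered lock: */
odp_atomic_u64_t sync_in[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
odp_atomic_u64_t sync_out[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
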
@@ -201,6 +207,14 @@ odp_schedule_group_t odp_queue_sched_group(odp_queue_t handle)
 	return queue->s.param.sched.group;
 }
 
+int odp_queue_lock_count(odp_queue_t handle)
+{
+	queue_entry_t *queue = queue_to_qentry(handle);
+
+	return queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED ?
+		(int)queue->s.param.sched.lock_count : -1;
+}
+
 odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type,
 			     odp_queue_param_t *param)
 {
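
The new odp_queue_lock_count() lets an application discover how many ordered locks a queue carries; non-ordered queues report -1. A hedged usage sketch (q is an illustrative queue handle):

/* Sketch: only touch an ordered lock the queue actually provides. */
int count = odp_queue_lock_count(q);

if (count > 0) {
	odp_schedule_order_lock(0);	/* valid indexes: 0 .. count - 1 */
	/* ... code that must run in dequeue order ... */
	odp_schedule_order_unlock(0);
}
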
@@ -216,7 +230,10 @@ odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type,
 		LOCK(&queue->s.lock);
 		if (queue->s.status == QUEUE_STATUS_FREE) {
-			queue_init(queue, name, type, param);
+			if (queue_init(queue, name, type, param)) {
+				UNLOCK(&queue->s.lock);
+				return handle;
+			}
 
 			if (type == ODP_QUEUE_TYPE_SCHED ||
 			    type == ODP_QUEUE_TYPE_PKTIN)
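
With queue_init() now able to fail, odp_queue_create() unwinds cleanly: it unlocks the entry and returns handle, which at this point in the function should still hold its initial ODP_QUEUE_INVALID value (that initialization is outside this hunk, so treat it as an assumption). Callers see an ordinary creation failure:

/* Sketch: an oversized lock_count now surfaces as an invalid handle
 * (qparam as in the earlier sketch). */
odp_queue_t q = odp_queue_create("ordered_q", ODP_QUEUE_TYPE_SCHED, &qparam);

if (q == ODP_QUEUE_INVALID) {
	/* creation failed, e.g. lock_count exceeded the configured max */
}
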
@@ -574,13 +591,14 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
 int queue_enq_internal(odp_buffer_hdr_t *buf_hdr)
 {
-	return buf_hdr->origin_qe->s.enqueue(buf_hdr->target_qe, buf_hdr,
+	return buf_hdr->target_qe->s.enqueue(buf_hdr->target_qe, buf_hdr,
 					     buf_hdr->flags.sustain);
 }
 
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 {
 	odp_buffer_hdr_t *buf_hdr;
+	uint32_t i;
 
 	LOCK(&queue->s.lock);
@@ -604,7 +622,10 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 	if (queue_is_ordered(queue)) {
 		buf_hdr->origin_qe = queue;
 		buf_hdr->order = queue->s.order_in++;
-		buf_hdr->sync = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
+		for (i = 0; i < queue->s.param.sched.lock_count; i++) {
+			buf_hdr->sync[i] =
+				odp_atomic_fetch_inc_u64(&queue->s.sync_in[i]);
+		}
 		buf_hdr->flags.sustain = 0;
 	} else {
 		buf_hdr->origin_qe = NULL;
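
Each dequeued ordered event now draws one ticket per configured lock, and odp_schedule_order_lock() (later in this patch) spins until the matching sync_out counter catches up. Collapsed to a single lock index idx, the protocol is essentially a ticket lock; a conceptual sketch, not the literal code path:

uint64_t ticket = odp_atomic_fetch_inc_u64(&sync_in[idx]);  /* at dequeue */

while (ticket > odp_atomic_load_u64(&sync_out[idx]))        /* order_lock */
	odp_spin();
/* ... section that must run in dequeue order ... */
odp_atomic_fetch_inc_u64(&sync_out[idx]);                   /* order_unlock */
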
@@ -625,6 +646,7 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
 	odp_buffer_hdr_t *hdr;
 	int i;
+	uint32_t j;
 
 	LOCK(&queue->s.lock);
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
@@ -652,8 +674,11 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 		if (queue_is_ordered(queue)) {
 			buf_hdr[i]->origin_qe = queue;
 			buf_hdr[i]->order = queue->s.order_in++;
-			buf_hdr[i]->sync =
-				odp_atomic_fetch_inc_u64(&queue->s.sync_in);
+			for (j = 0; j < queue->s.param.sched.lock_count; j++) {
+				buf_hdr[i]->sync[j] =
+					odp_atomic_fetch_inc_u64
+					(&queue->s.sync_in[j]);
+			}
 			buf_hdr[i]->flags.sustain = 0;
 		} else {
 			buf_hdr[i]->origin_qe = NULL;
@@ -762,7 +787,7 @@ int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
 		return -1;
 	} else {
 		rc = pktout_enqueue(queue, buf_hdr);
-		if (!rc)
+		if (rc)
 			return rc;
 	}
 
@@ -970,39 +995,32 @@ int release_order(queue_entry_t *origin_qe, uint64_t order,
 	return 0;
 }
 
-/* This routine is a no-op in linux-generic */
-int odp_schedule_order_lock_init(odp_schedule_order_lock_t *lock ODP_UNUSED,
-				 odp_queue_t queue ODP_UNUSED)
-{
-	return 0;
-}
-
-void odp_schedule_order_lock(odp_schedule_order_lock_t *lock ODP_UNUSED)
+void odp_schedule_order_lock(unsigned lock_index)
 {
 	queue_entry_t *origin_qe;
 	uint64_t *sync;
 
-	get_sched_sync(&origin_qe, &sync);
-	if (!origin_qe)
+	get_sched_sync(&origin_qe, &sync, lock_index);
+	if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count)
 		return;
 
 	/* Wait until we are in order. Note that sync_out will be incremented
 	 * both by unlocks as well as order resolution, so we're OK if only
 	 * some events in the ordered flow need to lock.
 	 */
-	while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+	while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index]))
 		odp_spin();
 }
 
-void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock ODP_UNUSED)
+void odp_schedule_order_unlock(unsigned lock_index)
 {
 	queue_entry_t *origin_qe;
 	uint64_t *sync;
 
-	get_sched_sync(&origin_qe, &sync);
-	if (!origin_qe)
+	get_sched_sync(&origin_qe, &sync, lock_index);
+	if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count)
 		return;
 
 	/* Release the ordered lock */
-	odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+	odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out[lock_index]);
 }
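
Since locks are now addressed purely by index into the queue's configured set, the per-object odp_schedule_order_lock_init() routine has nothing left to initialize and is dropped. An end-to-end sketch of a worker using the revised API (the loop structure and event handling are illustrative, not part of this patch):

/* Sketch: serialize only the critical section guarded by lock index 0;
 * everything else runs in parallel across workers. */
for (;;) {
	odp_event_t ev = odp_schedule(NULL, ODP_SCHED_WAIT);

	/* ... parallel, unordered processing of ev ... */

	odp_schedule_order_lock(0);	/* wait for this event's turn */
	/* ... updates that must follow dequeue order ... */
	odp_schedule_order_unlock(0);

	/* ... enqueue ev downstream or free it ... */
}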