diff options
Diffstat (limited to 'platform/linux-generic/odp_queue.c')
-rw-r--r-- | platform/linux-generic/odp_queue.c | 70 |
1 files changed, 44 insertions, 26 deletions
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 9fae71084..a636a439f 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -53,14 +53,17 @@ queue_entry_t *get_qentry(uint32_t queue_id) return &queue_tbl->queue[queue_id]; } -static void queue_init(queue_entry_t *queue, const char *name, - odp_queue_type_t type, odp_queue_param_t *param) +static int queue_init(queue_entry_t *queue, const char *name, + odp_queue_type_t type, odp_queue_param_t *param) { strncpy(queue->s.name, name, ODP_QUEUE_NAME_LEN - 1); queue->s.type = type; if (param) { memcpy(&queue->s.param, param, sizeof(odp_queue_param_t)); + if (queue->s.param.sched.lock_count > + ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE) + return -1; } else { /* Defaults */ memset(&queue->s.param, 0, sizeof(odp_queue_param_t)); @@ -98,12 +101,13 @@ static void queue_init(queue_entry_t *queue, const char *name, queue->s.pri_queue = ODP_QUEUE_INVALID; queue->s.cmd_ev = ODP_EVENT_INVALID; + return 0; } int odp_queue_init_global(void) { - uint32_t i; + uint32_t i, j; odp_shm_t shm; ODP_DBG("Queue init ... "); @@ -123,8 +127,10 @@ int odp_queue_init_global(void) /* init locks */ queue_entry_t *queue = get_qentry(i); LOCK_INIT(&queue->s.lock); - odp_atomic_init_u64(&queue->s.sync_in, 0); - odp_atomic_init_u64(&queue->s.sync_out, 0); + for (j = 0; j < ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE; j++) { + odp_atomic_init_u64(&queue->s.sync_in[j], 0); + odp_atomic_init_u64(&queue->s.sync_out[j], 0); + } queue->s.handle = queue_from_id(i); } @@ -201,6 +207,14 @@ odp_schedule_group_t odp_queue_sched_group(odp_queue_t handle) return queue->s.param.sched.group; } +int odp_queue_lock_count(odp_queue_t handle) +{ + queue_entry_t *queue = queue_to_qentry(handle); + + return queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED ? + (int)queue->s.param.sched.lock_count : -1; +} + odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type, odp_queue_param_t *param) { @@ -216,7 +230,10 @@ odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type, LOCK(&queue->s.lock); if (queue->s.status == QUEUE_STATUS_FREE) { - queue_init(queue, name, type, param); + if (queue_init(queue, name, type, param)) { + UNLOCK(&queue->s.lock); + return handle; + } if (type == ODP_QUEUE_TYPE_SCHED || type == ODP_QUEUE_TYPE_PKTIN) @@ -574,13 +591,14 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev) int queue_enq_internal(odp_buffer_hdr_t *buf_hdr) { - return buf_hdr->origin_qe->s.enqueue(buf_hdr->target_qe, buf_hdr, + return buf_hdr->target_qe->s.enqueue(buf_hdr->target_qe, buf_hdr, buf_hdr->flags.sustain); } odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) { odp_buffer_hdr_t *buf_hdr; + uint32_t i; LOCK(&queue->s.lock); @@ -604,7 +622,10 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) if (queue_is_ordered(queue)) { buf_hdr->origin_qe = queue; buf_hdr->order = queue->s.order_in++; - buf_hdr->sync = odp_atomic_fetch_inc_u64(&queue->s.sync_in); + for (i = 0; i < queue->s.param.sched.lock_count; i++) { + buf_hdr->sync[i] = + odp_atomic_fetch_inc_u64(&queue->s.sync_in[i]); + } buf_hdr->flags.sustain = 0; } else { buf_hdr->origin_qe = NULL; @@ -625,6 +646,7 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { odp_buffer_hdr_t *hdr; int i; + uint32_t j; LOCK(&queue->s.lock); if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { @@ -652,8 +674,11 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) if (queue_is_ordered(queue)) { buf_hdr[i]->origin_qe = queue; buf_hdr[i]->order = queue->s.order_in++; - buf_hdr[i]->sync = - odp_atomic_fetch_inc_u64(&queue->s.sync_in); + for (j = 0; j < queue->s.param.sched.lock_count; j++) { + buf_hdr[i]->sync[j] = + odp_atomic_fetch_inc_u64 + (&queue->s.sync_in[j]); + } buf_hdr[i]->flags.sustain = 0; } else { buf_hdr[i]->origin_qe = NULL; @@ -762,7 +787,7 @@ int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, return -1; } else { rc = pktout_enqueue(queue, buf_hdr); - if (!rc) + if (rc) return rc; } @@ -970,39 +995,32 @@ int release_order(queue_entry_t *origin_qe, uint64_t order, return 0; } -/* This routine is a no-op in linux-generic */ -int odp_schedule_order_lock_init(odp_schedule_order_lock_t *lock ODP_UNUSED, - odp_queue_t queue ODP_UNUSED) -{ - return 0; -} - -void odp_schedule_order_lock(odp_schedule_order_lock_t *lock ODP_UNUSED) +void odp_schedule_order_lock(unsigned lock_index) { queue_entry_t *origin_qe; uint64_t *sync; - get_sched_sync(&origin_qe, &sync); - if (!origin_qe) + get_sched_sync(&origin_qe, &sync, lock_index); + if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count) return; /* Wait until we are in order. Note that sync_out will be incremented * both by unlocks as well as order resolution, so we're OK if only * some events in the ordered flow need to lock. */ - while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out)) + while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index])) odp_spin(); } -void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock ODP_UNUSED) +void odp_schedule_order_unlock(unsigned lock_index) { queue_entry_t *origin_qe; uint64_t *sync; - get_sched_sync(&origin_qe, &sync); - if (!origin_qe) + get_sched_sync(&origin_qe, &sync, lock_index); + if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count) return; /* Release the ordered lock */ - odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out); + odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out[lock_index]); } |