diff options
author | Bill Fischofer <bill.fischofer@linaro.org> | 2015-09-01 12:31:07 -0500 |
---|---|---|
committer | Maxim Uvarov <maxim.uvarov@linaro.org> | 2015-09-01 21:05:50 +0300 |
commit | 8d82a346f881648dd90bb672f89bf13fd64d8971 (patch) | |
tree | 08a535abf8e97859285c747321981a368c06a02d | |
parent | aad66a68a86328be72bab7dc85128fe57dbe81f0 (diff) |
linux-generic: schedule: implement ordered locks
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
Reviewed-by: Petri Savolainen <petri.savolainen@nokia.com>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
-rw-r--r-- | platform/linux-generic/include/odp_buffer_internal.h | 3 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_queue_internal.h | 4 | ||||
-rw-r--r-- | platform/linux-generic/odp_queue.c | 48 | ||||
-rw-r--r-- | platform/linux-generic/odp_schedule.c | 9 |
4 files changed, 64 insertions, 0 deletions
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index ca4d31454..6badebabf 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -140,7 +140,10 @@ typedef struct odp_buffer_hdr_t { void *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */ uint64_t order; /* sequence for ordered queues */ queue_entry_t *origin_qe; /* ordered queue origin */ + union { queue_entry_t *target_qe; /* ordered queue target */ + uint64_t sync; /* for ordered synchronization */ + }; } odp_buffer_hdr_t; /** @internal Compile time assert that the diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 4bd926515..f285ea304 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -82,6 +82,8 @@ struct queue_entry_s { uint64_t order_out; odp_buffer_hdr_t *reorder_head; odp_buffer_hdr_t *reorder_tail; + odp_atomic_u64_t sync_in; + odp_atomic_u64_t sync_out; }; typedef union queue_entry_u { @@ -120,6 +122,7 @@ int queue_sched_atomic(odp_queue_t handle); int release_order(queue_entry_t *origin_qe, uint64_t order, odp_pool_t pool, int enq_called); void get_sched_order(queue_entry_t **origin_qe, uint64_t *order); +void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync); void sched_enq_called(void); void sched_order_resolved(odp_buffer_hdr_t *buf_hdr); @@ -191,6 +194,7 @@ static inline void reorder_enq(queue_entry_t *queue, static inline void order_release(queue_entry_t *origin_qe, int count) { origin_qe->s.order_out += count; + odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count); } static inline int reorder_deq(queue_entry_t *queue, diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 09b0398c3..1bd0de6c0 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -123,6 +123,8 @@ int odp_queue_init_global(void) /* init locks */ queue_entry_t *queue = get_qentry(i); LOCK_INIT(&queue->s.lock); + odp_atomic_init_u64(&queue->s.sync_in, 0); + odp_atomic_init_u64(&queue->s.sync_out, 0); queue->s.handle = queue_from_id(i); } @@ -599,6 +601,7 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) if (queue_is_ordered(queue)) { buf_hdr->origin_qe = queue; buf_hdr->order = queue->s.order_in++; + buf_hdr->sync = odp_atomic_fetch_inc_u64(&queue->s.sync_in); buf_hdr->flags.sustain = 0; } else { buf_hdr->origin_qe = NULL; @@ -646,6 +649,8 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) if (queue_is_ordered(queue)) { buf_hdr[i]->origin_qe = queue; buf_hdr[i]->order = queue->s.order_in++; + buf_hdr[i]->sync = + odp_atomic_fetch_inc_u64(&queue->s.sync_in); buf_hdr[i]->flags.sustain = 0; } else { buf_hdr[i]->origin_qe = NULL; @@ -960,3 +965,46 @@ int release_order(queue_entry_t *origin_qe, uint64_t order, UNLOCK(&origin_qe->s.lock); return 0; } + +/* This routine is a no-op in linux-generic */ +int odp_schedule_order_lock_init(odp_schedule_order_lock_t *lock ODP_UNUSED, + odp_queue_t queue ODP_UNUSED) +{ + return 0; +} + +void odp_schedule_order_lock(odp_schedule_order_lock_t *lock ODP_UNUSED) +{ + queue_entry_t *origin_qe; + uint64_t *sync; + + get_sched_sync(&origin_qe, &sync); + if (!origin_qe) + return; + + /* Wait until we are in order. Note that sync_out will be incremented + * both by unlocks as well as order resolution, so we're OK if only + * some events in the ordered flow need to lock. + */ + while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out)) + odp_spin(); +} + +void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock ODP_UNUSED) +{ + queue_entry_t *origin_qe; + uint64_t *sync; + + get_sched_sync(&origin_qe, &sync); + if (!origin_qe) + return; + + /* Get a new sync order for reusability, and release the lock. Note + * that this must be done in this sequence to prevent race conditions + * where the next waiter could lock and unlock before we're able to + * get a new sync order since that would cause order inversion on + * subsequent locks we may perform in this ordered context. + */ + *sync = odp_atomic_fetch_inc_u64(&origin_qe->s.sync_in); + odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out); +} diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index b78a38aa8..dc2d75f5b 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -86,6 +86,7 @@ typedef struct { queue_entry_t *qe; queue_entry_t *origin_qe; uint64_t order; + uint64_t sync; odp_pool_t pool; int enq_called; int num; @@ -546,6 +547,8 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[], sched_local.origin_qe = qe; sched_local.order = sched_local.buf_hdr[0]->order; + sched_local.sync = + sched_local.buf_hdr[0]->sync; sched_local.enq_called = 0; if (odp_queue_enq(pri_q, ev)) ODP_ABORT("schedule failed\n"); @@ -797,6 +800,12 @@ void get_sched_order(queue_entry_t **origin_qe, uint64_t *order) *order = sched_local.order; } +void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync) +{ + *origin_qe = sched_local.origin_qe; + *sync = &sched_local.sync; +} + void sched_order_resolved(odp_buffer_hdr_t *buf_hdr) { if (buf_hdr) |