aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBill Fischofer <bill.fischofer@linaro.org>2015-09-01 12:31:07 -0500
committerMaxim Uvarov <maxim.uvarov@linaro.org>2015-09-01 21:05:50 +0300
commit8d82a346f881648dd90bb672f89bf13fd64d8971 (patch)
tree08a535abf8e97859285c747321981a368c06a02d
parentaad66a68a86328be72bab7dc85128fe57dbe81f0 (diff)
linux-generic: schedule: implement ordered locks
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> Reviewed-by: Petri Savolainen <petri.savolainen@nokia.com> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
-rw-r--r--platform/linux-generic/include/odp_buffer_internal.h3
-rw-r--r--platform/linux-generic/include/odp_queue_internal.h4
-rw-r--r--platform/linux-generic/odp_queue.c48
-rw-r--r--platform/linux-generic/odp_schedule.c9
4 files changed, 64 insertions, 0 deletions
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index ca4d31454..6badebabf 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -140,7 +140,10 @@ typedef struct odp_buffer_hdr_t {
void *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
uint64_t order; /* sequence for ordered queues */
queue_entry_t *origin_qe; /* ordered queue origin */
+ union {
queue_entry_t *target_qe; /* ordered queue target */
+ uint64_t sync; /* for ordered synchronization */
+ };
} odp_buffer_hdr_t;
/** @internal Compile time assert that the
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 4bd926515..f285ea304 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -82,6 +82,8 @@ struct queue_entry_s {
uint64_t order_out;
odp_buffer_hdr_t *reorder_head;
odp_buffer_hdr_t *reorder_tail;
+ odp_atomic_u64_t sync_in;
+ odp_atomic_u64_t sync_out;
};
typedef union queue_entry_u {
@@ -120,6 +122,7 @@ int queue_sched_atomic(odp_queue_t handle);
int release_order(queue_entry_t *origin_qe, uint64_t order,
odp_pool_t pool, int enq_called);
void get_sched_order(queue_entry_t **origin_qe, uint64_t *order);
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync);
void sched_enq_called(void);
void sched_order_resolved(odp_buffer_hdr_t *buf_hdr);
@@ -191,6 +194,7 @@ static inline void reorder_enq(queue_entry_t *queue,
static inline void order_release(queue_entry_t *origin_qe, int count)
{
origin_qe->s.order_out += count;
+ odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count);
}
static inline int reorder_deq(queue_entry_t *queue,
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 09b0398c3..1bd0de6c0 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -123,6 +123,8 @@ int odp_queue_init_global(void)
/* init locks */
queue_entry_t *queue = get_qentry(i);
LOCK_INIT(&queue->s.lock);
+ odp_atomic_init_u64(&queue->s.sync_in, 0);
+ odp_atomic_init_u64(&queue->s.sync_out, 0);
queue->s.handle = queue_from_id(i);
}
@@ -599,6 +601,7 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
if (queue_is_ordered(queue)) {
buf_hdr->origin_qe = queue;
buf_hdr->order = queue->s.order_in++;
+ buf_hdr->sync = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
buf_hdr->flags.sustain = 0;
} else {
buf_hdr->origin_qe = NULL;
@@ -646,6 +649,8 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
if (queue_is_ordered(queue)) {
buf_hdr[i]->origin_qe = queue;
buf_hdr[i]->order = queue->s.order_in++;
+ buf_hdr[i]->sync =
+ odp_atomic_fetch_inc_u64(&queue->s.sync_in);
buf_hdr[i]->flags.sustain = 0;
} else {
buf_hdr[i]->origin_qe = NULL;
@@ -960,3 +965,46 @@ int release_order(queue_entry_t *origin_qe, uint64_t order,
UNLOCK(&origin_qe->s.lock);
return 0;
}
+
+/* This routine is a no-op in linux-generic */
+int odp_schedule_order_lock_init(odp_schedule_order_lock_t *lock ODP_UNUSED,
+ odp_queue_t queue ODP_UNUSED)
+{
+ return 0;
+}
+
+void odp_schedule_order_lock(odp_schedule_order_lock_t *lock ODP_UNUSED)
+{
+ queue_entry_t *origin_qe;
+ uint64_t *sync;
+
+ get_sched_sync(&origin_qe, &sync);
+ if (!origin_qe)
+ return;
+
+ /* Wait until we are in order. Note that sync_out will be incremented
+ * both by unlocks as well as order resolution, so we're OK if only
+ * some events in the ordered flow need to lock.
+ */
+ while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+ odp_spin();
+}
+
+void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock ODP_UNUSED)
+{
+ queue_entry_t *origin_qe;
+ uint64_t *sync;
+
+ get_sched_sync(&origin_qe, &sync);
+ if (!origin_qe)
+ return;
+
+ /* Get a new sync order for reusability, and release the lock. Note
+ * that this must be done in this sequence to prevent race conditions
+ * where the next waiter could lock and unlock before we're able to
+ * get a new sync order since that would cause order inversion on
+ * subsequent locks we may perform in this ordered context.
+ */
+ *sync = odp_atomic_fetch_inc_u64(&origin_qe->s.sync_in);
+ odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+}
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index b78a38aa8..dc2d75f5b 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -86,6 +86,7 @@ typedef struct {
queue_entry_t *qe;
queue_entry_t *origin_qe;
uint64_t order;
+ uint64_t sync;
odp_pool_t pool;
int enq_called;
int num;
@@ -546,6 +547,8 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
sched_local.origin_qe = qe;
sched_local.order =
sched_local.buf_hdr[0]->order;
+ sched_local.sync =
+ sched_local.buf_hdr[0]->sync;
sched_local.enq_called = 0;
if (odp_queue_enq(pri_q, ev))
ODP_ABORT("schedule failed\n");
@@ -797,6 +800,12 @@ void get_sched_order(queue_entry_t **origin_qe, uint64_t *order)
*order = sched_local.order;
}
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync)
+{
+ *origin_qe = sched_local.origin_qe;
+ *sync = &sched_local.sync;
+}
+
void sched_order_resolved(odp_buffer_hdr_t *buf_hdr)
{
if (buf_hdr)