aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPetri Savolainen <petri.savolainen@linaro.org>2018-06-15 16:48:26 +0300
committerMaxim Uvarov <maxim.uvarov@linaro.org>2018-06-19 15:48:36 +0300
commit492390a79f20c7aaaf16f232fbd5ecf0d9b700e7 (patch)
treec6e9fa0178dc7ff142b499a8eb6087eb77decb38
parent53f3baf58256c085fa230992eb03c896276fc874 (diff)
downloadodp-api-next.tar.gz
validation: queue: pair testapi-next
Test a pair of queues with two threads. This simple multi-thread test can be executed in all enqueue/dequeue modes, also in single-producer / single-consumer mode. Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org> Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
-rw-r--r--test/validation/api/queue/queue.c218
1 files changed, 218 insertions, 0 deletions
diff --git a/test/validation/api/queue/queue.c b/test/validation/api/queue/queue.c
index 20408da..f009a24 100644
--- a/test/validation/api/queue/queue.c
+++ b/test/validation/api/queue/queue.c
@@ -25,6 +25,17 @@ typedef struct {
odp_atomic_u32_t num_event;
struct {
+ odp_queue_t queue_a;
+ odp_queue_t queue_b;
+ int passed_a;
+ int passed_b;
+ int burst;
+ odp_pool_t pool;
+ odp_barrier_t barrier;
+ odp_atomic_u32_t counter;
+ } pair;
+
+ struct {
uint32_t num_event;
} thread[ODP_THREAD_COUNT_MAX];
@@ -349,6 +360,208 @@ static void queue_test_burst_lf_spsc(void)
ODP_QUEUE_OP_MT_UNSAFE);
}
+static int queue_pair_work_loop(void *arg)
+{
+ uint32_t i, events, burst, retry, max_retry;
+ odp_buffer_t buf;
+ odp_event_t ev;
+ uint32_t *data;
+ odp_queue_t src_queue, dst_queue;
+ odp_pool_t pool;
+ int passed;
+ int thread_a;
+ test_globals_t *globals = arg;
+
+ burst = globals->pair.burst;
+ pool = globals->pair.pool;
+
+ /* Select which thread is A */
+ thread_a = odp_atomic_fetch_inc_u32(&globals->pair.counter);
+
+ if (thread_a) {
+ src_queue = globals->pair.queue_a;
+ dst_queue = globals->pair.queue_b;
+ } else {
+ src_queue = globals->pair.queue_b;
+ dst_queue = globals->pair.queue_a;
+ }
+
+ for (i = 0; i < burst; i++) {
+ buf = odp_buffer_alloc(pool);
+ CU_ASSERT(buf != ODP_BUFFER_INVALID);
+
+ if (buf == ODP_BUFFER_INVALID)
+ return -1;
+
+ data = odp_buffer_addr(buf);
+ *data = i;
+ ev = odp_buffer_to_event(buf);
+ CU_ASSERT(odp_queue_enq(dst_queue, ev) == 0);
+ }
+
+ /* Wait until both threads are ready */
+ odp_barrier_wait(&globals->pair.barrier);
+ events = 0;
+ retry = 0;
+ max_retry = 0;
+ i = 0;
+ while (events < 10000 && retry < 300) {
+ ev = odp_queue_deq(src_queue);
+ if (ev == ODP_EVENT_INVALID) {
+ retry++;
+ /* Slow down polling period after 100 retries. This
+ * gives time for the other thread to answer, if it
+ * was e.g. interrupted by the OS. We give up if
+ * the source queue stays empty for about 100ms. */
+ if (retry > 200)
+ odp_time_wait_ns(ODP_TIME_MSEC_IN_NS);
+ else if (retry > 100)
+ odp_time_wait_ns(ODP_TIME_USEC_IN_NS);
+
+ if (retry > max_retry)
+ max_retry = retry;
+
+ continue;
+ }
+
+ events++;
+ retry = 0;
+ buf = odp_buffer_from_event(ev);
+ data = odp_buffer_addr(buf);
+ CU_ASSERT(*data == i);
+ i++;
+ if (i == burst)
+ i = 0;
+
+ CU_ASSERT(odp_queue_enq(dst_queue, ev) == 0);
+ }
+
+ passed = (events == 10000);
+
+ if (thread_a) {
+ globals->pair.passed_a = passed;
+ if (max_retry > 100)
+ printf("\n thread_a max_retry %u\n", max_retry);
+ } else {
+ globals->pair.passed_b = passed;
+ if (max_retry > 100)
+ printf("\n thread_b max_retry %u\n", max_retry);
+ }
+
+ return 0;
+}
+
+static void test_pair(odp_nonblocking_t nonblocking,
+ odp_queue_op_mode_t enq_mode,
+ odp_queue_op_mode_t deq_mode)
+{
+ odp_queue_param_t param;
+ odp_queue_t queue;
+ odp_queue_capability_t capa;
+ uint32_t max_burst;
+ odp_pool_t pool;
+ odp_event_t ev;
+ odp_shm_t shm;
+ test_globals_t *globals;
+
+ shm = odp_shm_lookup(GLOBALS_NAME);
+ CU_ASSERT_FATAL(shm != ODP_SHM_INVALID);
+ globals = odp_shm_addr(shm);
+
+ CU_ASSERT_FATAL(odp_queue_capability(&capa) == 0);
+
+ max_burst = 2 * BURST_SIZE;
+
+ if (nonblocking == ODP_NONBLOCKING_LF) {
+ if (capa.plain.lockfree.max_num == 0) {
+ printf(" NO LOCKFREE QUEUES. Test skipped.\n");
+ return;
+ }
+
+ if (capa.plain.lockfree.max_size < max_burst)
+ max_burst = capa.plain.lockfree.max_size;
+ } else {
+ if (capa.plain.max_size && capa.plain.max_size < max_burst)
+ max_burst = capa.plain.max_size;
+ }
+
+ globals->pair.burst = max_burst / 2;
+
+ pool = odp_pool_lookup("msg_pool");
+ CU_ASSERT_FATAL(pool != ODP_POOL_INVALID);
+ globals->pair.pool = pool;
+
+ odp_queue_param_init(&param);
+ param.type = ODP_QUEUE_TYPE_PLAIN;
+ param.nonblocking = nonblocking;
+ param.size = max_burst;
+ param.enq_mode = enq_mode;
+ param.deq_mode = deq_mode;
+
+ queue = odp_queue_create("queue_a", &param);
+ CU_ASSERT_FATAL(queue != ODP_QUEUE_INVALID);
+ globals->pair.queue_a = queue;
+ CU_ASSERT(odp_queue_deq(queue) == ODP_EVENT_INVALID);
+
+ queue = odp_queue_create("queue_b", &param);
+ CU_ASSERT_FATAL(queue != ODP_QUEUE_INVALID);
+ globals->pair.queue_b = queue;
+ CU_ASSERT(odp_queue_deq(queue) == ODP_EVENT_INVALID);
+
+ odp_barrier_init(&globals->pair.barrier, 2);
+ globals->pair.passed_a = 0;
+ globals->pair.passed_b = 0;
+ odp_atomic_init_u32(&globals->pair.counter, 0);
+
+ /* Create one worker thread */
+ globals->cu_thr.numthrds = 1;
+ odp_cunit_thread_create(queue_pair_work_loop, (pthrd_arg *)globals);
+
+ /* Run this thread as the second thread */
+ CU_ASSERT(queue_pair_work_loop(globals) == 0);
+
+ /* Wait worker to terminate */
+ odp_cunit_thread_exit((pthrd_arg *)globals);
+
+ CU_ASSERT(globals->pair.passed_a);
+ CU_ASSERT(globals->pair.passed_b);
+
+ while ((ev = dequeue_event(globals->pair.queue_a)) != ODP_EVENT_INVALID)
+ odp_event_free(ev);
+
+ while ((ev = dequeue_event(globals->pair.queue_b)) != ODP_EVENT_INVALID)
+ odp_event_free(ev);
+
+ CU_ASSERT(odp_queue_destroy(globals->pair.queue_a) == 0);
+ CU_ASSERT(odp_queue_destroy(globals->pair.queue_b) == 0);
+}
+
+static void queue_test_pair(void)
+{
+ test_pair(ODP_BLOCKING, ODP_QUEUE_OP_MT, ODP_QUEUE_OP_MT);
+}
+
+static void queue_test_pair_spmc(void)
+{
+ test_pair(ODP_BLOCKING, ODP_QUEUE_OP_MT_UNSAFE, ODP_QUEUE_OP_MT);
+}
+
+static void queue_test_pair_mpsc(void)
+{
+ test_pair(ODP_BLOCKING, ODP_QUEUE_OP_MT, ODP_QUEUE_OP_MT_UNSAFE);
+}
+
+static void queue_test_pair_spsc(void)
+{
+ test_pair(ODP_BLOCKING, ODP_QUEUE_OP_MT_UNSAFE, ODP_QUEUE_OP_MT_UNSAFE);
+}
+
+static void queue_test_pair_lf_spsc(void)
+{
+ test_pair(ODP_NONBLOCKING_LF, ODP_QUEUE_OP_MT_UNSAFE,
+ ODP_QUEUE_OP_MT_UNSAFE);
+}
+
static void queue_test_param(void)
{
odp_queue_t queue, null_queue;
@@ -727,6 +940,11 @@ odp_testinfo_t queue_suite[] = {
ODP_TEST_INFO(queue_test_burst_lf_spmc),
ODP_TEST_INFO(queue_test_burst_lf_mpsc),
ODP_TEST_INFO(queue_test_burst_lf_spsc),
+ ODP_TEST_INFO(queue_test_pair),
+ ODP_TEST_INFO(queue_test_pair_spmc),
+ ODP_TEST_INFO(queue_test_pair_mpsc),
+ ODP_TEST_INFO(queue_test_pair_spsc),
+ ODP_TEST_INFO(queue_test_pair_lf_spsc),
ODP_TEST_INFO(queue_test_param),
ODP_TEST_INFO(queue_test_info),
ODP_TEST_INFO(queue_test_mt_plain_block),