aboutsummaryrefslogtreecommitdiff
path: root/helper/threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'helper/threads.c')
-rw-r--r--helper/threads.c578
1 files changed, 331 insertions, 247 deletions
diff --git a/helper/threads.c b/helper/threads.c
index cb747e5bf..5063725b2 100644
--- a/helper/threads.c
+++ b/helper/threads.c
@@ -1,44 +1,57 @@
-/* Copyright (c) 2013, Linaro Limited
- * All rights reserved.
- *
- * SPDX-License-Identifier: BSD-3-Clause
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2013-2018 Linaro Limited
+ * Copyright (c) 2019-2024 Nokia
*/
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <sched.h>
+#include <stdlib.h>
+#include <string.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/prctl.h>
#include <sys/syscall.h>
+#include <errno.h>
+#include <limits.h>
+#include <sys/mman.h>
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <inttypes.h>
#include <odp_api.h>
+#include <odp/helper/debug.h>
#include <odp/helper/threads.h>
-#include "odph_debug.h"
-static struct {
- int proc; /* true when process mode is required, false otherwise */
-} helper_options;
+#define FAILED_CPU -1
+
+/* Thread status codes */
+#define NOT_STARTED 0
+#define INIT_DONE 1
+#define STARTED 2
+
+static odph_helper_options_t helper_options;
/*
- * wrapper for odpthreads, either implemented as linux threads or processes.
- * (in process mode, if start_routine returns NULL, the process return FAILURE).
+ * Run a thread, either as Linux pthread or process.
+ * In process mode, if start_routine returns NULL, the process return FAILURE.
*/
-static void *_odph_thread_run_start_routine(void *arg)
+static void *run_thread(void *arg)
{
int status;
int ret;
- odph_odpthread_params_t *thr_params;
-
- odph_odpthread_start_args_t *start_args = arg;
+ odp_instance_t instance;
+ odph_thread_param_t *thr_params;
+ odph_thread_start_args_t *start_args = arg;
thr_params = &start_args->thr_params;
+ instance = start_args->instance;
/* ODP thread local init */
- if (odp_init_local(thr_params->instance, thr_params->thr_type)) {
+ if (odp_init_local(instance, thr_params->thr_type)) {
ODPH_ERR("Local init failed\n");
- if (start_args->linuxtype == ODPTHREAD_PROCESS)
+ if (start_args->mem_model == ODP_MEM_MODEL_PROCESS)
_exit(EXIT_FAILURE);
return (void *)-1;
}
@@ -46,10 +59,13 @@ static void *_odph_thread_run_start_routine(void *arg)
ODPH_DBG("helper: ODP %s thread started as linux %s. (pid=%d)\n",
thr_params->thr_type == ODP_THREAD_WORKER ?
"worker" : "control",
- (start_args->linuxtype == ODPTHREAD_PTHREAD) ?
+ (start_args->mem_model == ODP_MEM_MODEL_THREAD) ?
"pthread" : "process",
(int)getpid());
+ if (start_args->init_status)
+ odp_atomic_store_rel_u32(start_args->init_status, INIT_DONE);
+
status = thr_params->start(thr_params->arg);
ret = odp_term_local();
@@ -57,7 +73,7 @@ static void *_odph_thread_run_start_routine(void *arg)
ODPH_ERR("Local term failed\n");
/* for process implementation of odp threads, just return status... */
- if (start_args->linuxtype == ODPTHREAD_PROCESS)
+ if (start_args->mem_model == ODP_MEM_MODEL_PROCESS)
_exit(status);
/* threads implementation return void* pointers: cast status to that. */
@@ -65,11 +81,9 @@ static void *_odph_thread_run_start_routine(void *arg)
}
/*
- * Create a single ODPthread as a linux process
+ * Create a single linux process
*/
-static int _odph_linux_process_create(odph_odpthread_t *thread_tbl,
- int cpu,
- const odph_odpthread_params_t *thr_params)
+static int create_process(odph_thread_t *thread, int cpu, uint64_t stack_size)
{
cpu_set_t cpu_set;
pid_t pid;
@@ -77,20 +91,19 @@ static int _odph_linux_process_create(odph_odpthread_t *thread_tbl,
CPU_ZERO(&cpu_set);
CPU_SET(cpu, &cpu_set);
- thread_tbl->start_args.thr_params = *thr_params; /* copy */
- thread_tbl->start_args.linuxtype = ODPTHREAD_PROCESS;
- thread_tbl->cpu = cpu;
+ thread->start_args.mem_model = ODP_MEM_MODEL_PROCESS;
+ thread->cpu = cpu;
pid = fork();
if (pid < 0) {
ODPH_ERR("fork() failed\n");
- thread_tbl->start_args.linuxtype = ODPTHREAD_NOT_STARTED;
+ thread->cpu = FAILED_CPU;
return -1;
}
/* Parent continues to fork */
if (pid > 0) {
- thread_tbl->proc.pid = pid;
+ thread->proc.pid = pid;
return 0;
}
@@ -107,17 +120,73 @@ static int _odph_linux_process_create(odph_odpthread_t *thread_tbl,
return -2;
}
- _odph_thread_run_start_routine(&thread_tbl->start_args);
+ if (stack_size) {
+ struct rlimit rlimit;
+
+ if (getrlimit(RLIMIT_STACK, &rlimit)) {
+ ODPH_ERR("getrlimit() failed: %s\n", strerror(errno));
+ return -3;
+ }
+
+ rlimit.rlim_cur = stack_size;
+
+ if (setrlimit(RLIMIT_STACK, &rlimit)) {
+ ODPH_ERR("setrlimit() failed: %s\n", strerror(errno));
+ return -4;
+ }
+ }
+
+ run_thread(&thread->start_args);
return 0; /* never reached */
}
/*
- * Create a single ODPthread as a linux thread
+ * Wait single process to exit
*/
-static int odph_linux_thread_create(odph_odpthread_t *thread_tbl,
- int cpu,
- const odph_odpthread_params_t *thr_params)
+static int wait_process(odph_thread_t *thread, odph_thread_join_result_t *res)
+{
+ pid_t pid;
+ int status = 0, estatus;
+
+ pid = waitpid(thread->proc.pid, &status, 0);
+
+ if (pid < 0) {
+ ODPH_ERR("waitpid() failed\n");
+ return -1;
+ }
+
+ /* Examine the child process' termination status */
+ if (WIFEXITED(status)) {
+ estatus = WEXITSTATUS(status);
+
+ if (res != NULL) {
+ res->is_sig = false;
+ res->ret = estatus;
+ } else if (estatus != EXIT_SUCCESS) {
+ ODPH_ERR("Child exit status:%d (pid:%d)\n", estatus, (int)pid);
+ return -1;
+ }
+ } else {
+ int signo = WTERMSIG(status);
+
+ if (res != NULL) {
+ res->is_sig = true;
+ res->ret = signo;
+ } else {
+ ODPH_ERR("Child term signo:%d - %s (pid:%d)\n", signo, strsignal(signo),
+ (int)pid);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Create a single linux pthread
+ */
+static int create_pthread(odph_thread_t *thread, int cpu, uint64_t stack_size)
{
int ret;
cpu_set_t cpu_set;
@@ -125,23 +194,40 @@ static int odph_linux_thread_create(odph_odpthread_t *thread_tbl,
CPU_ZERO(&cpu_set);
CPU_SET(cpu, &cpu_set);
- pthread_attr_init(&thread_tbl->thread.attr);
+ pthread_attr_init(&thread->thread.attr);
- thread_tbl->cpu = cpu;
+ thread->cpu = cpu;
- pthread_attr_setaffinity_np(&thread_tbl->thread.attr,
+ pthread_attr_setaffinity_np(&thread->thread.attr,
sizeof(cpu_set_t), &cpu_set);
- thread_tbl->start_args.thr_params = *thr_params; /* copy */
- thread_tbl->start_args.linuxtype = ODPTHREAD_PTHREAD;
+ if (stack_size) {
+ /*
+ * Round up to page size. "On some systems,
+ * pthread_attr_setstacksize() can fail with the error EINVAL if
+ * stacksize is not a multiple of the system page size." (man
+ * page)
+ */
+ stack_size = (stack_size + ODP_PAGE_SIZE - 1) & ~(ODP_PAGE_SIZE - 1);
+
+ if (stack_size < (uint64_t)PTHREAD_STACK_MIN)
+ stack_size = PTHREAD_STACK_MIN;
+
+ if (pthread_attr_setstacksize(&thread->thread.attr, stack_size)) {
+ ODPH_ERR("pthread_attr_setstacksize() failed\n");
+ return -1;
+ }
+ }
+
+ thread->start_args.mem_model = ODP_MEM_MODEL_THREAD;
- ret = pthread_create(&thread_tbl->thread.thread_id,
- &thread_tbl->thread.attr,
- _odph_thread_run_start_routine,
- &thread_tbl->start_args);
+ ret = pthread_create(&thread->thread.thread_id,
+ &thread->thread.attr,
+ run_thread,
+ &thread->start_args);
if (ret != 0) {
- ODPH_ERR("Failed to start thread on cpu #%d\n", cpu);
- thread_tbl->start_args.linuxtype = ODPTHREAD_NOT_STARTED;
+ ODPH_ERR("Failed to start thread on CPU #%d: %d\n", cpu, ret);
+ thread->cpu = FAILED_CPU;
return ret;
}
@@ -149,129 +235,213 @@ static int odph_linux_thread_create(odph_odpthread_t *thread_tbl,
}
/*
- * create an odpthread set (as linux processes or linux threads or both)
+ * Wait single pthread to exit
*/
-int odph_odpthreads_create(odph_odpthread_t *thread_tbl,
- const odp_cpumask_t *mask,
- const odph_odpthread_params_t *thr_params)
+static int wait_pthread(odph_thread_t *thread, odph_thread_join_result_t *res)
{
- int i;
- int num;
- int cpu_count;
- int cpu;
+ int ret;
+ void *thread_ret = NULL;
+
+ /* Wait thread to exit */
+ ret = pthread_join(thread->thread.thread_id, &thread_ret);
+
+ if (ret) {
+ ODPH_ERR("pthread_join failed (%i) from cpu #%i\n",
+ ret, thread->cpu);
+ return -1;
+ }
- num = odp_cpumask_count(mask);
+ if (res != NULL) {
+ res->is_sig = false;
+ res->ret = (int)(intptr_t)thread_ret;
+ } else if (thread_ret) {
+ ODPH_ERR("Bad exit status cpu #%i %p\n", thread->cpu, thread_ret);
+ return -1;
+ }
- memset(thread_tbl, 0, num * sizeof(odph_odpthread_t));
+ ret = pthread_attr_destroy(&thread->thread.attr);
- cpu_count = odp_cpu_count();
+ if (ret) {
+ ODPH_ERR("pthread_attr_destroy failed (%i) from cpu #%i\n",
+ ret, thread->cpu);
+
+ if (res == NULL)
+ return -1;
+ }
+
+ return 0;
+}
- if (num < 1 || num > cpu_count) {
- ODPH_ERR("Invalid number of odpthreads:%d"
- " (%d cores available)\n",
- num, cpu_count);
+void odph_thread_param_init(odph_thread_param_t *param)
+{
+ memset(param, 0, sizeof(*param));
+}
+
+void odph_thread_common_param_init(odph_thread_common_param_t *param)
+{
+ memset(param, 0, sizeof(*param));
+ param->sync_timeout = ODP_TIME_SEC_IN_NS;
+}
+
+int odph_thread_create(odph_thread_t thread[],
+ const odph_thread_common_param_t *param,
+ const odph_thread_param_t thr_param[],
+ int num)
+{
+ int i, num_cpu, cpu;
+ const odp_cpumask_t *cpumask = param->cpumask;
+ int use_pthread = 1;
+ odp_atomic_u32_t *init_status = NULL;
+
+ if (param->thread_model == 1)
+ use_pthread = 0;
+
+ if (helper_options.mem_model == ODP_MEM_MODEL_PROCESS)
+ use_pthread = 0;
+
+ if (num < 1) {
+ ODPH_ERR("Bad number of threads (%i)\n", num);
+ return -1;
+ }
+
+ num_cpu = odp_cpumask_count(cpumask);
+
+ if (num_cpu != num) {
+ ODPH_ERR("Number of threads (%i) and CPUs (%i) does not match"
+ "\n", num, num_cpu);
return -1;
}
- cpu = odp_cpumask_first(mask);
+ memset(thread, 0, num * sizeof(odph_thread_t));
+
+ if (param->sync) {
+ init_status = mmap(NULL, sizeof(odp_atomic_u32_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+
+ if (init_status == MAP_FAILED) {
+ ODPH_ERR("mmap() failed: %s\n", strerror(errno));
+ return -1;
+ }
+ }
+
+ cpu = odp_cpumask_first(cpumask);
for (i = 0; i < num; i++) {
- if (!helper_options.proc) {
- if (odph_linux_thread_create(&thread_tbl[i],
- cpu,
- thr_params))
+ odph_thread_start_args_t *start_args = &thread[i].start_args;
+
+ /* Copy thread parameters */
+ if (param->share_param)
+ start_args->thr_params = thr_param[0];
+ else
+ start_args->thr_params = thr_param[i];
+
+ start_args->instance = param->instance;
+ start_args->status = NOT_STARTED;
+ start_args->init_status = init_status;
+ if (init_status)
+ odp_atomic_init_u32(init_status, NOT_STARTED);
+
+ if (use_pthread) {
+ if (create_pthread(&thread[i], cpu, start_args->thr_params.stack_size))
break;
- } else {
- if (_odph_linux_process_create(&thread_tbl[i],
- cpu,
- thr_params))
+ } else {
+ if (create_process(&thread[i], cpu, start_args->thr_params.stack_size))
break;
}
- cpu = odp_cpumask_next(mask, cpu);
+ /* Wait newly created thread to update status */
+ if (init_status) {
+ odp_time_t t1, t2;
+ uint64_t diff_ns;
+ uint32_t status;
+ int timeout = 0;
+ uint64_t timeout_ns = param->sync_timeout;
+
+ if (!timeout_ns)
+ timeout_ns = ODP_TIME_SEC_IN_NS;
+
+ t1 = odp_time_local();
+
+ do {
+ odp_cpu_pause();
+ t2 = odp_time_local();
+ diff_ns = odp_time_diff_ns(t2, t1);
+ timeout = diff_ns > timeout_ns;
+ status = odp_atomic_load_acq_u32(init_status);
+
+ } while (status != INIT_DONE && timeout == 0);
+
+ if (timeout) {
+ ODPH_ERR("Thread (i:%i) start up timeout: sync timeout %" PRIu64 ""
+ " , t1 %" PRIu64 ", t2 %" PRIu64 "\n", i,
+ param->sync_timeout, odp_time_to_ns(t1),
+ odp_time_to_ns(t2));
+ break;
+ }
+ }
+
+ start_args->status = STARTED;
+
+ cpu = odp_cpumask_next(cpumask, cpu);
+ }
+
+ if (init_status) {
+ if (munmap(init_status, sizeof(odp_atomic_u32_t)))
+ ODPH_ERR("munmap() failed: %s\n", strerror(errno));
}
- thread_tbl[num - 1].last = 1;
return i;
}
-/*
- * wait for the odpthreads termination (linux processes and threads)
- */
-int odph_odpthreads_join(odph_odpthread_t *thread_tbl)
+static int join_threads(odph_thread_t thread[], odph_thread_join_result_t res[], int num)
{
- pid_t pid;
- int i = 0;
- int terminated = 0;
- /* child process return code (!=0 is error) */
- int status = 0;
- /* "child" thread return code (!NULL is error) */
- void *thread_ret = NULL;
- int ret;
- int retval = 0;
-
- /* joins linux threads or wait for processes */
- do {
- /* pthreads: */
- switch (thread_tbl[i].start_args.linuxtype) {
- case ODPTHREAD_PTHREAD:
- /* Wait thread to exit */
- ret = pthread_join(thread_tbl[i].thread.thread_id,
- &thread_ret);
- if (ret != 0) {
- ODPH_ERR("Failed to join thread from cpu #%d\n",
- thread_tbl[i].cpu);
- retval = -1;
- } else {
- terminated++;
- if (thread_ret != NULL) {
- ODPH_ERR("Bad exit status cpu #%d %p\n",
- thread_tbl[i].cpu, thread_ret);
- retval = -1;
- }
- }
- pthread_attr_destroy(&thread_tbl[i].thread.attr);
- break;
+ odph_thread_start_args_t *start_args;
+ int i;
- case ODPTHREAD_PROCESS:
+ for (i = 0; i < num; i++) {
+ start_args = &thread[i].start_args;
- /* processes: */
- pid = waitpid(thread_tbl[i].proc.pid, &status, 0);
+ if (start_args->status != STARTED) {
+ ODPH_ERR("Thread (i:%i) not started.\n", i);
+ break;
+ }
- if (pid < 0) {
- ODPH_ERR("waitpid() failed\n");
- retval = -1;
+ if (thread[i].start_args.mem_model == ODP_MEM_MODEL_THREAD) {
+ if (wait_pthread(&thread[i], res != NULL ? &res[i] : NULL))
break;
- }
+ } else {
+ if (wait_process(&thread[i], res != NULL ? &res[i] : NULL))
+ break;
+ }
- terminated++;
+ start_args->status = NOT_STARTED;
+ }
- /* Examine the child process' termination status */
- if (WIFEXITED(status) &&
- WEXITSTATUS(status) != EXIT_SUCCESS) {
- ODPH_ERR("Child exit status:%d (pid:%d)\n",
- WEXITSTATUS(status), (int)pid);
- retval = -1;
- }
- if (WIFSIGNALED(status)) {
- int signo = WTERMSIG(status);
+ return i;
+}
- ODPH_ERR("Child term signo:%d - %s (pid:%d)\n",
- signo, strsignal(signo), (int)pid);
- retval = -1;
- }
- break;
+int odph_thread_join(odph_thread_t thread[], int num)
+{
+ if (thread == NULL) {
+ ODPH_ERR("Bad thread table pointer\n");
+ return -1;
+ }
- case ODPTHREAD_NOT_STARTED:
- ODPH_DBG("No join done on not started ODPthread.\n");
- break;
- default:
- ODPH_DBG("Invalid case statement value!\n");
- break;
- }
+ return join_threads(thread, NULL, num);
+}
- } while (!thread_tbl[i++].last);
+int odph_thread_join_result(odph_thread_t thread[], odph_thread_join_result_t res[], int num)
+{
+ if (thread == NULL) {
+ ODPH_ERR("Bad thread table pointer\n");
+ return -1;
+ }
- return (retval < 0) ? retval : terminated;
+ if (res == NULL) {
+ ODPH_ERR("Bad result table pointer\n");
+ return -1;
+ }
+
+ return join_threads(thread, res, num);
}
/* man gettid() notes:
@@ -327,129 +497,43 @@ int odph_odpthread_getaffinity(void)
return -1;
}
-/*
- * return the number of elements in an array of getopt options, excluding the
- * terminating {0,0,0,0}
- */
-static int get_getopt_options_length(const struct option *longopts)
+int odph_parse_options(int argc, char *argv[])
{
- int l = 0;
+ char *env;
+ int i, j;
- if (!longopts)
- return 0;
+ helper_options.mem_model = ODP_MEM_MODEL_THREAD;
- while (longopts[l].name)
- l++;
+ /* Enable process mode using environment variable. Setting environment
+ * variable is easier for CI testing compared to command line
+ * argument. */
+ env = getenv("ODPH_PROC_MODE");
+ if (env && atoi(env))
+ helper_options.mem_model = ODP_MEM_MODEL_PROCESS;
- return l;
-}
+ /* Find and remove option */
+ for (i = 0; i < argc;) {
+ if (strcmp(argv[i], "--odph_proc") == 0) {
+ helper_options.mem_model = ODP_MEM_MODEL_PROCESS;
-/* Merge getopt options */
-int odph_merge_getopt_options(const char *shortopts1,
- const char *shortopts2,
- const struct option *longopts1,
- const struct option *longopts2,
- char **shortopts,
- struct option **longopts)
-{
- int shortopts1_len;
- int shortopts2_len;
- int longopts1_len;
- int longopts2_len;
- int index;
- int res_index = 0;
- struct option termination = {0, 0, 0, 0};
-
- /* merge short options: */
- if (shortopts) {
- shortopts1_len = (shortopts1) ? strlen(shortopts1) : 0;
- shortopts2_len = (shortopts2) ? strlen(shortopts2) : 0;
- *shortopts = malloc(shortopts1_len + shortopts2_len + 1);
- if (!*shortopts)
- return -1;
+ for (j = i; j < argc - 1; j++)
+ argv[j] = argv[j + 1];
- (*shortopts)[0] = 0;
-
- if (shortopts1)
- strcpy((*shortopts), shortopts1);
- if (shortopts2)
- strcat((*shortopts), shortopts2);
- }
-
- /* merge long options */
- if (!longopts)
- return 0;
+ argc--;
+ continue;
+ }
- longopts1_len = get_getopt_options_length(longopts1);
- longopts2_len = get_getopt_options_length(longopts2);
- *longopts = malloc(sizeof(struct option) *
- (longopts1_len + longopts2_len + 1));
- if (!*longopts) {
- if (shortopts)
- free(*shortopts);
- return -1;
+ i++;
}
- for (index = 0; (longopts1) && (longopts1[index].name); index++)
- (*longopts)[res_index++] = longopts1[index];
-
- for (index = 0; (longopts2) && (longopts2[index].name); index++)
- (*longopts)[res_index++] = longopts2[index];
-
- (*longopts)[res_index] = termination;
-
- return 0;
+ return argc;
}
-/*
- * Parse command line options to extract options affecting helpers.
- */
-int odph_parse_options(int argc, char *argv[],
- const char *caller_shortopts,
- const struct option *caller_longopts)
+int odph_options(odph_helper_options_t *options)
{
- int c;
- char *shortopts;
- struct option *longopts;
- int res = 0;
-
- static struct option helper_long_options[] = {
- /* These options set a flag. */
- {"odph_proc", no_argument, &helper_options.proc, 1},
- {0, 0, 0, 0}
- };
-
- static const char *helper_short_options = "";
-
- /* defaults: */
- helper_options.proc = false;
-
- /* merge caller's command line options descriptions with helper's: */
- if (odph_merge_getopt_options(caller_shortopts, helper_short_options,
- caller_longopts, helper_long_options,
- &shortopts, &longopts) < 0)
- return -1;
+ memset(options, 0, sizeof(odph_helper_options_t));
- while (1) {
- /* getopt_long stores the option index here. */
- int option_index = 0;
+ options->mem_model = helper_options.mem_model;
- c = getopt_long (argc, argv,
- shortopts, longopts, &option_index);
-
- /* Detect the end of the options. */
- if (c == -1)
- break;
-
- /* check for unknown options or missing arguments */
- if (c == '?' || c == ':')
- res = -1;
- }
-
- optind = 0; /* caller expects this to be zero if it parses too*/
-
- free(shortopts);
- free(longopts);
-
- return res;
+ return 0;
}