diff options
Diffstat (limited to 'helper/threads.c')
-rw-r--r-- | helper/threads.c | 578 |
1 files changed, 331 insertions, 247 deletions
diff --git a/helper/threads.c b/helper/threads.c index cb747e5bf..5063725b2 100644 --- a/helper/threads.c +++ b/helper/threads.c @@ -1,44 +1,57 @@ -/* Copyright (c) 2013, Linaro Limited - * All rights reserved. - * - * SPDX-License-Identifier: BSD-3-Clause +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright (c) 2013-2018 Linaro Limited + * Copyright (c) 2019-2024 Nokia */ #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include <sched.h> +#include <stdlib.h> +#include <string.h> #include <unistd.h> #include <sys/wait.h> #include <sys/prctl.h> #include <sys/syscall.h> +#include <errno.h> +#include <limits.h> +#include <sys/mman.h> +#include <sys/time.h> +#include <sys/resource.h> +#include <inttypes.h> #include <odp_api.h> +#include <odp/helper/debug.h> #include <odp/helper/threads.h> -#include "odph_debug.h" -static struct { - int proc; /* true when process mode is required, false otherwise */ -} helper_options; +#define FAILED_CPU -1 + +/* Thread status codes */ +#define NOT_STARTED 0 +#define INIT_DONE 1 +#define STARTED 2 + +static odph_helper_options_t helper_options; /* - * wrapper for odpthreads, either implemented as linux threads or processes. - * (in process mode, if start_routine returns NULL, the process return FAILURE). + * Run a thread, either as Linux pthread or process. + * In process mode, if start_routine returns NULL, the process return FAILURE. */ -static void *_odph_thread_run_start_routine(void *arg) +static void *run_thread(void *arg) { int status; int ret; - odph_odpthread_params_t *thr_params; - - odph_odpthread_start_args_t *start_args = arg; + odp_instance_t instance; + odph_thread_param_t *thr_params; + odph_thread_start_args_t *start_args = arg; thr_params = &start_args->thr_params; + instance = start_args->instance; /* ODP thread local init */ - if (odp_init_local(thr_params->instance, thr_params->thr_type)) { + if (odp_init_local(instance, thr_params->thr_type)) { ODPH_ERR("Local init failed\n"); - if (start_args->linuxtype == ODPTHREAD_PROCESS) + if (start_args->mem_model == ODP_MEM_MODEL_PROCESS) _exit(EXIT_FAILURE); return (void *)-1; } @@ -46,10 +59,13 @@ static void *_odph_thread_run_start_routine(void *arg) ODPH_DBG("helper: ODP %s thread started as linux %s. (pid=%d)\n", thr_params->thr_type == ODP_THREAD_WORKER ? "worker" : "control", - (start_args->linuxtype == ODPTHREAD_PTHREAD) ? + (start_args->mem_model == ODP_MEM_MODEL_THREAD) ? "pthread" : "process", (int)getpid()); + if (start_args->init_status) + odp_atomic_store_rel_u32(start_args->init_status, INIT_DONE); + status = thr_params->start(thr_params->arg); ret = odp_term_local(); @@ -57,7 +73,7 @@ static void *_odph_thread_run_start_routine(void *arg) ODPH_ERR("Local term failed\n"); /* for process implementation of odp threads, just return status... */ - if (start_args->linuxtype == ODPTHREAD_PROCESS) + if (start_args->mem_model == ODP_MEM_MODEL_PROCESS) _exit(status); /* threads implementation return void* pointers: cast status to that. */ @@ -65,11 +81,9 @@ static void *_odph_thread_run_start_routine(void *arg) } /* - * Create a single ODPthread as a linux process + * Create a single linux process */ -static int _odph_linux_process_create(odph_odpthread_t *thread_tbl, - int cpu, - const odph_odpthread_params_t *thr_params) +static int create_process(odph_thread_t *thread, int cpu, uint64_t stack_size) { cpu_set_t cpu_set; pid_t pid; @@ -77,20 +91,19 @@ static int _odph_linux_process_create(odph_odpthread_t *thread_tbl, CPU_ZERO(&cpu_set); CPU_SET(cpu, &cpu_set); - thread_tbl->start_args.thr_params = *thr_params; /* copy */ - thread_tbl->start_args.linuxtype = ODPTHREAD_PROCESS; - thread_tbl->cpu = cpu; + thread->start_args.mem_model = ODP_MEM_MODEL_PROCESS; + thread->cpu = cpu; pid = fork(); if (pid < 0) { ODPH_ERR("fork() failed\n"); - thread_tbl->start_args.linuxtype = ODPTHREAD_NOT_STARTED; + thread->cpu = FAILED_CPU; return -1; } /* Parent continues to fork */ if (pid > 0) { - thread_tbl->proc.pid = pid; + thread->proc.pid = pid; return 0; } @@ -107,17 +120,73 @@ static int _odph_linux_process_create(odph_odpthread_t *thread_tbl, return -2; } - _odph_thread_run_start_routine(&thread_tbl->start_args); + if (stack_size) { + struct rlimit rlimit; + + if (getrlimit(RLIMIT_STACK, &rlimit)) { + ODPH_ERR("getrlimit() failed: %s\n", strerror(errno)); + return -3; + } + + rlimit.rlim_cur = stack_size; + + if (setrlimit(RLIMIT_STACK, &rlimit)) { + ODPH_ERR("setrlimit() failed: %s\n", strerror(errno)); + return -4; + } + } + + run_thread(&thread->start_args); return 0; /* never reached */ } /* - * Create a single ODPthread as a linux thread + * Wait single process to exit */ -static int odph_linux_thread_create(odph_odpthread_t *thread_tbl, - int cpu, - const odph_odpthread_params_t *thr_params) +static int wait_process(odph_thread_t *thread, odph_thread_join_result_t *res) +{ + pid_t pid; + int status = 0, estatus; + + pid = waitpid(thread->proc.pid, &status, 0); + + if (pid < 0) { + ODPH_ERR("waitpid() failed\n"); + return -1; + } + + /* Examine the child process' termination status */ + if (WIFEXITED(status)) { + estatus = WEXITSTATUS(status); + + if (res != NULL) { + res->is_sig = false; + res->ret = estatus; + } else if (estatus != EXIT_SUCCESS) { + ODPH_ERR("Child exit status:%d (pid:%d)\n", estatus, (int)pid); + return -1; + } + } else { + int signo = WTERMSIG(status); + + if (res != NULL) { + res->is_sig = true; + res->ret = signo; + } else { + ODPH_ERR("Child term signo:%d - %s (pid:%d)\n", signo, strsignal(signo), + (int)pid); + return -1; + } + } + + return 0; +} + +/* + * Create a single linux pthread + */ +static int create_pthread(odph_thread_t *thread, int cpu, uint64_t stack_size) { int ret; cpu_set_t cpu_set; @@ -125,23 +194,40 @@ static int odph_linux_thread_create(odph_odpthread_t *thread_tbl, CPU_ZERO(&cpu_set); CPU_SET(cpu, &cpu_set); - pthread_attr_init(&thread_tbl->thread.attr); + pthread_attr_init(&thread->thread.attr); - thread_tbl->cpu = cpu; + thread->cpu = cpu; - pthread_attr_setaffinity_np(&thread_tbl->thread.attr, + pthread_attr_setaffinity_np(&thread->thread.attr, sizeof(cpu_set_t), &cpu_set); - thread_tbl->start_args.thr_params = *thr_params; /* copy */ - thread_tbl->start_args.linuxtype = ODPTHREAD_PTHREAD; + if (stack_size) { + /* + * Round up to page size. "On some systems, + * pthread_attr_setstacksize() can fail with the error EINVAL if + * stacksize is not a multiple of the system page size." (man + * page) + */ + stack_size = (stack_size + ODP_PAGE_SIZE - 1) & ~(ODP_PAGE_SIZE - 1); + + if (stack_size < (uint64_t)PTHREAD_STACK_MIN) + stack_size = PTHREAD_STACK_MIN; + + if (pthread_attr_setstacksize(&thread->thread.attr, stack_size)) { + ODPH_ERR("pthread_attr_setstacksize() failed\n"); + return -1; + } + } + + thread->start_args.mem_model = ODP_MEM_MODEL_THREAD; - ret = pthread_create(&thread_tbl->thread.thread_id, - &thread_tbl->thread.attr, - _odph_thread_run_start_routine, - &thread_tbl->start_args); + ret = pthread_create(&thread->thread.thread_id, + &thread->thread.attr, + run_thread, + &thread->start_args); if (ret != 0) { - ODPH_ERR("Failed to start thread on cpu #%d\n", cpu); - thread_tbl->start_args.linuxtype = ODPTHREAD_NOT_STARTED; + ODPH_ERR("Failed to start thread on CPU #%d: %d\n", cpu, ret); + thread->cpu = FAILED_CPU; return ret; } @@ -149,129 +235,213 @@ static int odph_linux_thread_create(odph_odpthread_t *thread_tbl, } /* - * create an odpthread set (as linux processes or linux threads or both) + * Wait single pthread to exit */ -int odph_odpthreads_create(odph_odpthread_t *thread_tbl, - const odp_cpumask_t *mask, - const odph_odpthread_params_t *thr_params) +static int wait_pthread(odph_thread_t *thread, odph_thread_join_result_t *res) { - int i; - int num; - int cpu_count; - int cpu; + int ret; + void *thread_ret = NULL; + + /* Wait thread to exit */ + ret = pthread_join(thread->thread.thread_id, &thread_ret); + + if (ret) { + ODPH_ERR("pthread_join failed (%i) from cpu #%i\n", + ret, thread->cpu); + return -1; + } - num = odp_cpumask_count(mask); + if (res != NULL) { + res->is_sig = false; + res->ret = (int)(intptr_t)thread_ret; + } else if (thread_ret) { + ODPH_ERR("Bad exit status cpu #%i %p\n", thread->cpu, thread_ret); + return -1; + } - memset(thread_tbl, 0, num * sizeof(odph_odpthread_t)); + ret = pthread_attr_destroy(&thread->thread.attr); - cpu_count = odp_cpu_count(); + if (ret) { + ODPH_ERR("pthread_attr_destroy failed (%i) from cpu #%i\n", + ret, thread->cpu); + + if (res == NULL) + return -1; + } + + return 0; +} - if (num < 1 || num > cpu_count) { - ODPH_ERR("Invalid number of odpthreads:%d" - " (%d cores available)\n", - num, cpu_count); +void odph_thread_param_init(odph_thread_param_t *param) +{ + memset(param, 0, sizeof(*param)); +} + +void odph_thread_common_param_init(odph_thread_common_param_t *param) +{ + memset(param, 0, sizeof(*param)); + param->sync_timeout = ODP_TIME_SEC_IN_NS; +} + +int odph_thread_create(odph_thread_t thread[], + const odph_thread_common_param_t *param, + const odph_thread_param_t thr_param[], + int num) +{ + int i, num_cpu, cpu; + const odp_cpumask_t *cpumask = param->cpumask; + int use_pthread = 1; + odp_atomic_u32_t *init_status = NULL; + + if (param->thread_model == 1) + use_pthread = 0; + + if (helper_options.mem_model == ODP_MEM_MODEL_PROCESS) + use_pthread = 0; + + if (num < 1) { + ODPH_ERR("Bad number of threads (%i)\n", num); + return -1; + } + + num_cpu = odp_cpumask_count(cpumask); + + if (num_cpu != num) { + ODPH_ERR("Number of threads (%i) and CPUs (%i) does not match" + "\n", num, num_cpu); return -1; } - cpu = odp_cpumask_first(mask); + memset(thread, 0, num * sizeof(odph_thread_t)); + + if (param->sync) { + init_status = mmap(NULL, sizeof(odp_atomic_u32_t), PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_ANONYMOUS, -1, 0); + + if (init_status == MAP_FAILED) { + ODPH_ERR("mmap() failed: %s\n", strerror(errno)); + return -1; + } + } + + cpu = odp_cpumask_first(cpumask); for (i = 0; i < num; i++) { - if (!helper_options.proc) { - if (odph_linux_thread_create(&thread_tbl[i], - cpu, - thr_params)) + odph_thread_start_args_t *start_args = &thread[i].start_args; + + /* Copy thread parameters */ + if (param->share_param) + start_args->thr_params = thr_param[0]; + else + start_args->thr_params = thr_param[i]; + + start_args->instance = param->instance; + start_args->status = NOT_STARTED; + start_args->init_status = init_status; + if (init_status) + odp_atomic_init_u32(init_status, NOT_STARTED); + + if (use_pthread) { + if (create_pthread(&thread[i], cpu, start_args->thr_params.stack_size)) break; - } else { - if (_odph_linux_process_create(&thread_tbl[i], - cpu, - thr_params)) + } else { + if (create_process(&thread[i], cpu, start_args->thr_params.stack_size)) break; } - cpu = odp_cpumask_next(mask, cpu); + /* Wait newly created thread to update status */ + if (init_status) { + odp_time_t t1, t2; + uint64_t diff_ns; + uint32_t status; + int timeout = 0; + uint64_t timeout_ns = param->sync_timeout; + + if (!timeout_ns) + timeout_ns = ODP_TIME_SEC_IN_NS; + + t1 = odp_time_local(); + + do { + odp_cpu_pause(); + t2 = odp_time_local(); + diff_ns = odp_time_diff_ns(t2, t1); + timeout = diff_ns > timeout_ns; + status = odp_atomic_load_acq_u32(init_status); + + } while (status != INIT_DONE && timeout == 0); + + if (timeout) { + ODPH_ERR("Thread (i:%i) start up timeout: sync timeout %" PRIu64 "" + " , t1 %" PRIu64 ", t2 %" PRIu64 "\n", i, + param->sync_timeout, odp_time_to_ns(t1), + odp_time_to_ns(t2)); + break; + } + } + + start_args->status = STARTED; + + cpu = odp_cpumask_next(cpumask, cpu); + } + + if (init_status) { + if (munmap(init_status, sizeof(odp_atomic_u32_t))) + ODPH_ERR("munmap() failed: %s\n", strerror(errno)); } - thread_tbl[num - 1].last = 1; return i; } -/* - * wait for the odpthreads termination (linux processes and threads) - */ -int odph_odpthreads_join(odph_odpthread_t *thread_tbl) +static int join_threads(odph_thread_t thread[], odph_thread_join_result_t res[], int num) { - pid_t pid; - int i = 0; - int terminated = 0; - /* child process return code (!=0 is error) */ - int status = 0; - /* "child" thread return code (!NULL is error) */ - void *thread_ret = NULL; - int ret; - int retval = 0; - - /* joins linux threads or wait for processes */ - do { - /* pthreads: */ - switch (thread_tbl[i].start_args.linuxtype) { - case ODPTHREAD_PTHREAD: - /* Wait thread to exit */ - ret = pthread_join(thread_tbl[i].thread.thread_id, - &thread_ret); - if (ret != 0) { - ODPH_ERR("Failed to join thread from cpu #%d\n", - thread_tbl[i].cpu); - retval = -1; - } else { - terminated++; - if (thread_ret != NULL) { - ODPH_ERR("Bad exit status cpu #%d %p\n", - thread_tbl[i].cpu, thread_ret); - retval = -1; - } - } - pthread_attr_destroy(&thread_tbl[i].thread.attr); - break; + odph_thread_start_args_t *start_args; + int i; - case ODPTHREAD_PROCESS: + for (i = 0; i < num; i++) { + start_args = &thread[i].start_args; - /* processes: */ - pid = waitpid(thread_tbl[i].proc.pid, &status, 0); + if (start_args->status != STARTED) { + ODPH_ERR("Thread (i:%i) not started.\n", i); + break; + } - if (pid < 0) { - ODPH_ERR("waitpid() failed\n"); - retval = -1; + if (thread[i].start_args.mem_model == ODP_MEM_MODEL_THREAD) { + if (wait_pthread(&thread[i], res != NULL ? &res[i] : NULL)) break; - } + } else { + if (wait_process(&thread[i], res != NULL ? &res[i] : NULL)) + break; + } - terminated++; + start_args->status = NOT_STARTED; + } - /* Examine the child process' termination status */ - if (WIFEXITED(status) && - WEXITSTATUS(status) != EXIT_SUCCESS) { - ODPH_ERR("Child exit status:%d (pid:%d)\n", - WEXITSTATUS(status), (int)pid); - retval = -1; - } - if (WIFSIGNALED(status)) { - int signo = WTERMSIG(status); + return i; +} - ODPH_ERR("Child term signo:%d - %s (pid:%d)\n", - signo, strsignal(signo), (int)pid); - retval = -1; - } - break; +int odph_thread_join(odph_thread_t thread[], int num) +{ + if (thread == NULL) { + ODPH_ERR("Bad thread table pointer\n"); + return -1; + } - case ODPTHREAD_NOT_STARTED: - ODPH_DBG("No join done on not started ODPthread.\n"); - break; - default: - ODPH_DBG("Invalid case statement value!\n"); - break; - } + return join_threads(thread, NULL, num); +} - } while (!thread_tbl[i++].last); +int odph_thread_join_result(odph_thread_t thread[], odph_thread_join_result_t res[], int num) +{ + if (thread == NULL) { + ODPH_ERR("Bad thread table pointer\n"); + return -1; + } - return (retval < 0) ? retval : terminated; + if (res == NULL) { + ODPH_ERR("Bad result table pointer\n"); + return -1; + } + + return join_threads(thread, res, num); } /* man gettid() notes: @@ -327,129 +497,43 @@ int odph_odpthread_getaffinity(void) return -1; } -/* - * return the number of elements in an array of getopt options, excluding the - * terminating {0,0,0,0} - */ -static int get_getopt_options_length(const struct option *longopts) +int odph_parse_options(int argc, char *argv[]) { - int l = 0; + char *env; + int i, j; - if (!longopts) - return 0; + helper_options.mem_model = ODP_MEM_MODEL_THREAD; - while (longopts[l].name) - l++; + /* Enable process mode using environment variable. Setting environment + * variable is easier for CI testing compared to command line + * argument. */ + env = getenv("ODPH_PROC_MODE"); + if (env && atoi(env)) + helper_options.mem_model = ODP_MEM_MODEL_PROCESS; - return l; -} + /* Find and remove option */ + for (i = 0; i < argc;) { + if (strcmp(argv[i], "--odph_proc") == 0) { + helper_options.mem_model = ODP_MEM_MODEL_PROCESS; -/* Merge getopt options */ -int odph_merge_getopt_options(const char *shortopts1, - const char *shortopts2, - const struct option *longopts1, - const struct option *longopts2, - char **shortopts, - struct option **longopts) -{ - int shortopts1_len; - int shortopts2_len; - int longopts1_len; - int longopts2_len; - int index; - int res_index = 0; - struct option termination = {0, 0, 0, 0}; - - /* merge short options: */ - if (shortopts) { - shortopts1_len = (shortopts1) ? strlen(shortopts1) : 0; - shortopts2_len = (shortopts2) ? strlen(shortopts2) : 0; - *shortopts = malloc(shortopts1_len + shortopts2_len + 1); - if (!*shortopts) - return -1; + for (j = i; j < argc - 1; j++) + argv[j] = argv[j + 1]; - (*shortopts)[0] = 0; - - if (shortopts1) - strcpy((*shortopts), shortopts1); - if (shortopts2) - strcat((*shortopts), shortopts2); - } - - /* merge long options */ - if (!longopts) - return 0; + argc--; + continue; + } - longopts1_len = get_getopt_options_length(longopts1); - longopts2_len = get_getopt_options_length(longopts2); - *longopts = malloc(sizeof(struct option) * - (longopts1_len + longopts2_len + 1)); - if (!*longopts) { - if (shortopts) - free(*shortopts); - return -1; + i++; } - for (index = 0; (longopts1) && (longopts1[index].name); index++) - (*longopts)[res_index++] = longopts1[index]; - - for (index = 0; (longopts2) && (longopts2[index].name); index++) - (*longopts)[res_index++] = longopts2[index]; - - (*longopts)[res_index] = termination; - - return 0; + return argc; } -/* - * Parse command line options to extract options affecting helpers. - */ -int odph_parse_options(int argc, char *argv[], - const char *caller_shortopts, - const struct option *caller_longopts) +int odph_options(odph_helper_options_t *options) { - int c; - char *shortopts; - struct option *longopts; - int res = 0; - - static struct option helper_long_options[] = { - /* These options set a flag. */ - {"odph_proc", no_argument, &helper_options.proc, 1}, - {0, 0, 0, 0} - }; - - static const char *helper_short_options = ""; - - /* defaults: */ - helper_options.proc = false; - - /* merge caller's command line options descriptions with helper's: */ - if (odph_merge_getopt_options(caller_shortopts, helper_short_options, - caller_longopts, helper_long_options, - &shortopts, &longopts) < 0) - return -1; + memset(options, 0, sizeof(odph_helper_options_t)); - while (1) { - /* getopt_long stores the option index here. */ - int option_index = 0; + options->mem_model = helper_options.mem_model; - c = getopt_long (argc, argv, - shortopts, longopts, &option_index); - - /* Detect the end of the options. */ - if (c == -1) - break; - - /* check for unknown options or missing arguments */ - if (c == '?' || c == ':') - res = -1; - } - - optind = 0; /* caller expects this to be zero if it parses too*/ - - free(shortopts); - free(longopts); - - return res; + return 0; } |