aboutsummaryrefslogtreecommitdiff
path: root/helper/linux.c
blob: a2eacc199f041d8e7a5fef310753f1a93c2e0f0d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
/* Copyright (c) 2013, Linaro Limited
 * All rights reserved.
 *
 * SPDX-License-Identifier:     BSD-3-Clause
 */

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <sched.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

#include <stdlib.h>
#include <string.h>
#include <stdio.h>

#include <odp/helper/linux.h>
#include <odp/thread.h>
#include <odp/init.h>
#include <odp/system_info.h>
#include "odph_debug.h"

/*
 * Build a default CPU mask from the calling thread's affinity set.
 *
 * mask: output mask; it is cleared first and then filled in.
 * num:  requested number of CPUs. A value of 0, or a value larger than
 *       CPU_SETSIZE, selects every CPU in the caller's affinity set.
 *
 * CPUs are picked starting from the highest numbered one in the affinity
 * set and working downwards. A negative num selects nothing.
 *
 * Returns the number of CPUs actually set in mask, which may be smaller
 * than num when the affinity set holds fewer CPUs.
 */
int odph_linux_cpumask_default(odp_cpumask_t *mask, int num)
{
	cpu_set_t affinity;
	int picked = 0;
	int id;

	if (pthread_getaffinity_np(pthread_self(),
				   sizeof(cpu_set_t), &affinity) != 0)
		ODPH_ABORT("failed to read CPU affinity value\n");

	odp_cpumask_zero(mask);

	/* No explicit count, or an impossible one: take all usable CPUs */
	if (num == 0 || num > CPU_SETSIZE)
		num = CPU_COUNT(&affinity);

	/* Walk the affinity set from the top until enough CPUs are found */
	for (id = CPU_SETSIZE - 1; id >= 0 && picked < num; id--) {
		if (!CPU_ISSET(id, &affinity))
			continue;

		odp_cpumask_set(mask, id);
		picked++;
	}

	return picked;
}


/*
 * Thread entry trampoline used by odph_linux_pthread_create().
 *
 * arg points to an odp_start_args_t holding the user's start routine and
 * its argument. The thread performs ODP local init as a worker, runs the
 * user routine, then performs ODP local termination. When local
 * termination returns 0, global termination is attempted as well
 * (NOTE(review): presumably 0 means this was the last ODP thread -
 * confirm against the odp_term_local() documentation).
 *
 * Returns the user routine's return value, or NULL when local init fails
 * (in which case the user routine is never called).
 */
static void *odp_run_start_routine(void *arg)
{
	odp_start_args_t *ctx = arg;
	void *result;
	int term_rc;

	/* ODP thread local init */
	if (odp_init_local(ODP_THREAD_WORKER) != 0) {
		ODPH_ERR("Local init failed\n");
		return NULL;
	}

	result = ctx->start_routine(ctx->arg);

	term_rc = odp_term_local();
	if (term_rc < 0) {
		ODPH_ERR("Local term failed\n");
	} else if (term_rc == 0) {
		if (odp_term_global())
			ODPH_ERR("Global term failed\n");
	}

	return result;
}


/*
 * Create one pthread per CPU set in mask_in, each pinned to its CPU.
 *
 * thread_tbl:    output table; must have room for one entry per CPU set
 *                in mask_in.
 * mask_in:       CPUs to start threads on.
 * start_routine: function every thread runs (wrapped by
 *                odp_run_start_routine()).
 * arg:           argument handed to start_routine in every thread.
 *
 * Returns the number of threads actually started. 0 is returned, and
 * thread_tbl is left untouched, when the mask is empty or names more CPUs
 * than odp_cpu_count() reports. Only the first <return value> entries of
 * thread_tbl are valid for odph_linux_pthread_join(); when a thread fails
 * to start its slot is released and zeroed like the remaining unused ones.
 */
int odph_linux_pthread_create(odph_linux_pthread_t *thread_tbl,
			       const odp_cpumask_t *mask_in,
			       void *(*start_routine) (void *), void *arg)
{
	int i;
	int num;
	odp_cpumask_t mask;
	int cpu_count;
	int cpu;
	int ret;

	odp_cpumask_copy(&mask, mask_in);
	num = odp_cpumask_count(&mask);

	cpu_count = odp_cpu_count();

	/*
	 * Validate the thread count before touching thread_tbl, so that a
	 * rejected request never writes num entries into the caller's table.
	 */
	if (num < 1 || num > cpu_count) {
		ODPH_ERR("Invalid number of threads: %d (%d cores available)\n",
			 num, cpu_count);
		return 0;
	}

	memset(thread_tbl, 0, num * sizeof(odph_linux_pthread_t));

	cpu = odp_cpumask_first(&mask);
	for (i = 0; i < num; i++) {
		odp_cpumask_t thd_mask;

		/* Single-CPU mask used as this thread's affinity */
		odp_cpumask_zero(&thd_mask);
		odp_cpumask_set(&thd_mask, cpu);

		pthread_attr_init(&thread_tbl[i].attr);

		thread_tbl[i].cpu = cpu;

		pthread_attr_setaffinity_np(&thread_tbl[i].attr,
					    sizeof(cpu_set_t), &thd_mask.set);

		/* Freed in odph_linux_pthread_join() once the thread is done */
		thread_tbl[i].start_args = malloc(sizeof(odp_start_args_t));
		if (thread_tbl[i].start_args == NULL)
			ODPH_ABORT("Malloc failed");

		thread_tbl[i].start_args->start_routine = start_routine;
		thread_tbl[i].start_args->arg           = arg;

		ret = pthread_create(&thread_tbl[i].thread, &thread_tbl[i].attr,
			       odp_run_start_routine, thread_tbl[i].start_args);
		if (ret != 0) {
			ODPH_ERR("Failed to start thread on cpu #%d\n", cpu);
			/*
			 * Release everything this slot acquired and reset it,
			 * so no dangling start_args pointer or initialised
			 * attribute object is left behind in the table.
			 */
			free(thread_tbl[i].start_args);
			pthread_attr_destroy(&thread_tbl[i].attr);
			memset(&thread_tbl[i], 0, sizeof(thread_tbl[i]));
			break;
		}

		cpu = odp_cpumask_next(&mask, cpu);
	}

	return i;
}


/*
 * Wait for the threads recorded in thread_tbl and release their resources.
 *
 * thread_tbl: table filled in by odph_linux_pthread_create().
 * num:        number of leading entries to process.
 *
 * Each entry is joined (a join failure is logged but does not stop the
 * loop), then its attribute object is destroyed and its start_args
 * allocation is freed.
 */
void odph_linux_pthread_join(odph_linux_pthread_t *thread_tbl, int num)
{
	int idx = 0;

	while (idx < num) {
		odph_linux_pthread_t *entry = &thread_tbl[idx];

		/* Block until this thread has exited */
		if (pthread_join(entry->thread, NULL) != 0) {
			ODPH_ERR("Failed to join thread from cpu #%d\n",
				 entry->cpu);
		}

		pthread_attr_destroy(&entry->attr);
		free(entry->start_args);

		idx++;
	}
}


/*
 * Fork one child process per CPU set in mask_in.
 *
 * proc_tbl: output table; must have room for one entry per CPU set in
 *           mask_in. The parent records each child's pid and CPU here.
 * mask_in:  CPUs to fork children for.
 *
 * Return value depends on which process is returning:
 *   parent:  1 after all children were forked,
 *           -1 when the mask is empty or names more CPUs than
 *              odp_cpu_count() reports (proc_tbl is left untouched),
 *              or when fork() fails (children forked before the failure
 *              keep running and stay recorded in proc_tbl);
 *   child:   0 after pinning itself to its CPU and completing ODP local
 *              init as a worker,
 *           -2 when sched_setaffinity() or odp_init_local() fails.
 */
int odph_linux_process_fork_n(odph_linux_process_t *proc_tbl,
			      const odp_cpumask_t *mask_in)
{
	odp_cpumask_t mask;
	pid_t pid;
	int num;
	int cpu_count;
	int cpu;
	int i;

	odp_cpumask_copy(&mask, mask_in);
	num = odp_cpumask_count(&mask);

	cpu_count = odp_cpu_count();

	/*
	 * Validate the process count before touching proc_tbl, so that a
	 * rejected request never writes num entries into the caller's table.
	 */
	if (num < 1 || num > cpu_count) {
		ODPH_ERR("Bad num\n");
		return -1;
	}

	memset(proc_tbl, 0, num * sizeof(odph_linux_process_t));

	cpu = odp_cpumask_first(&mask);
	for (i = 0; i < num; i++) {
		odp_cpumask_t proc_mask;

		/* Single-CPU mask the child will pin itself to */
		odp_cpumask_zero(&proc_mask);
		odp_cpumask_set(&proc_mask, cpu);

		pid = fork();

		if (pid < 0) {
			ODPH_ERR("fork() failed\n");
			return -1;
		}

		/* Parent continues to fork */
		if (pid > 0) {
			proc_tbl[i].pid  = pid;
			proc_tbl[i].cpu = cpu;

			cpu = odp_cpumask_next(&mask, cpu);
			continue;
		}

		/* Child process: leaves the loop by returning to the caller */
		if (sched_setaffinity(0, sizeof(cpu_set_t), &proc_mask.set)) {
			ODPH_ERR("sched_setaffinity() failed\n");
			return -2;
		}

		if (odp_init_local(ODP_THREAD_WORKER)) {
			ODPH_ERR("Local init failed\n");
			return -2;
		}

		return 0;
	}

	return 1;
}


/*
 * Fork a single child process for the given CPU.
 *
 * proc: output entry describing the child (filled in by the parent).
 * cpu:  CPU to fork the child for.
 *
 * Builds a mask containing only cpu and delegates to
 * odph_linux_process_fork_n(); the return value is passed through
 * unchanged.
 */
int odph_linux_process_fork(odph_linux_process_t *proc, int cpu)
{
	odp_cpumask_t single_cpu;

	/* Mask with exactly one CPU selected */
	odp_cpumask_zero(&single_cpu);
	odp_cpumask_set(&single_cpu, cpu);

	return odph_linux_process_fork_n(proc, &single_cpu);
}


/*
 * Wait for num child processes and record their exit status.
 *
 * proc_tbl: table of child processes; each reaped pid is looked up in the
 *           first num entries and its wait status stored in that entry.
 * num:      number of children to wait for (and number of table entries).
 *
 * Returns 0 once num children have been reaped, or -1 when wait() fails
 * or returns a pid that is not present in proc_tbl.
 */
int odph_linux_process_wait_n(odph_linux_process_t *proc_tbl, int num)
{
	int reaped;

	for (reaped = 0; reaped < num; reaped++) {
		int exit_status;
		int slot = 0;
		pid_t child = wait(&exit_status);

		if (child < 0) {
			ODPH_ERR("wait() failed\n");
			return -1;
		}

		/* Find the table entry that belongs to the reaped child */
		while (slot < num && proc_tbl[slot].pid != child)
			slot++;

		if (slot == num) {
			ODPH_ERR("Bad pid\n");
			return -1;
		}

		proc_tbl[slot].status = exit_status;
	}

	return 0;
}