Ensure the `threaded_tests` module can be imported safely (#90)
On MacOS, running `multiprocessing.Manager()` spawns a new process. This
means it's not OK to run this in the global namespace, as that runs
while modules are being resolved, before main. The multiprocessing
guidelines [0], under "Safe importing of main module", indicate that
multiprocessing operations may have side-effects and mustn't run at that
point.
This turns the `Test.manager` global object into a local variable. The
manager's job is to handle shared state between processes and so its
lifetime is tied to the shared data. That data is then tied to the
`TestQueue` instance which runs tests in parallel and collects results.
So we can wrap the parallel test queue runner with a
`multiprocess.Manager()` context:
def Run(self, ...):
with multiprocessing.Manager() as manager:
# Run tests in parallel with manager
[0]: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
diff --git a/tools/threaded_tests.py b/tools/threaded_tests.py
index 0b83db1..0383c4f 100644
--- a/tools/threaded_tests.py
+++ b/tools/threaded_tests.py
@@ -42,79 +42,79 @@
n_tests_passed = multiprocessing.Value('i', 0)
n_tests_failed = multiprocessing.Value('i', 0)
n_tests_skipped = multiprocessing.Value('i', 0)
- manager = multiprocessing.Manager()
def __init__(self, name, shared, **kwargs):
- self.name = name
- self.shared = shared
- self.args = kwargs
+ self.name = name
+ self.shared = shared
+ self.args = kwargs
class TestQueue(object):
def __init__(self, prefix = ''):
self.progress_prefix = prefix
self.queue = []
- self.tests_skipped = Test.manager.dict()
+ self.tests_skipped = None
self.n_known_failures = 0
self.known_failures = collections.Counter()
def AddKnownFailures(self, reason, n_tests):
- self.n_known_failures += n_tests
- self.known_failures[reason] += n_tests
+ self.n_known_failures += n_tests
+ self.known_failures[reason] += n_tests
def AddTest(self, name, **kwargs):
self.queue.append(Test(name, self, **kwargs))
# Run the specified tests.
def Run(self, jobs, verbose, run_function):
- def InitGlobals():
- # Initialisation.
- self.start_time = time.time()
- self.n_tests = len(self.queue)
- if self.n_tests == 0:
- printer.Print('No tests to run.')
- return False
- Test.n_tests_passed.value = 0
- Test.n_tests_failed.value = 0
- Test.n_tests_skipped.value = 0
- self.tests_skipped.clear()
- return True
+ with multiprocessing.Manager() as manager:
+ def InitGlobals():
+ # Initialisation.
+ self.start_time = time.time()
+ self.n_tests = len(self.queue)
+ if self.n_tests == 0:
+ printer.Print('No tests to run.')
+ return False
+ Test.n_tests_passed.value = 0
+ Test.n_tests_failed.value = 0
+ Test.n_tests_skipped.value = 0
+ self.tests_skipped = manager.dict()
+ return True
- thread_pool.Multithread(run_function, self.queue, jobs, InitGlobals)
+ thread_pool.Multithread(run_function, self.queue, jobs, InitGlobals)
- printer.UpdateProgress(self.start_time,
- Test.n_tests_passed.value,
- Test.n_tests_failed.value,
- self.n_tests,
- Test.n_tests_skipped.value,
- self.n_known_failures,
- '== Done ==',
- prevent_next_overwrite = True,
- prefix = self.progress_prefix)
- n_tests_features = 0
- features = set()
- for reason, n_tests in self.tests_skipped.items():
- m = re.match(REGEXP_MISSING_FEATURES, reason)
- if m:
- if verbose:
- printer.Print("%d tests skipped because the following features are not "
- "available '%s'" % (n_tests, m.group(1)))
+ printer.UpdateProgress(self.start_time,
+ Test.n_tests_passed.value,
+ Test.n_tests_failed.value,
+ self.n_tests,
+ Test.n_tests_skipped.value,
+ self.n_known_failures,
+ '== Done ==',
+ prevent_next_overwrite = True,
+ prefix = self.progress_prefix)
+ n_tests_features = 0
+ features = set()
+ for reason, n_tests in self.tests_skipped.items():
+ m = re.match(REGEXP_MISSING_FEATURES, reason)
+ if m:
+ if verbose:
+ printer.Print("%d tests skipped because the following features are "
+ "not available '%s'" % (n_tests, m.group(1)))
+ else:
+ n_tests_features += n_tests
+ features.update(m.group(1).split(', '))
else:
- n_tests_features += n_tests
- features.update(m.group(1).split(', '))
- else:
- printer.Print("%d tests skipped because '%s'" % (n_tests, reason))
+ printer.Print("%d tests skipped because '%s'" % (n_tests, reason))
- n_tests_other = 0
- if n_tests_features > 0 :
- printer.Print("%d tests skipped because the CPU does not support "
- "the following features: '%s'" %
- (n_tests_features, ", ".join(features)))
+ n_tests_other = 0
+ if n_tests_features > 0 :
+ printer.Print("%d tests skipped because the CPU does not support "
+ "the following features: '%s'" %
+ (n_tests_features, ", ".join(features)))
- for reason, n_tests in self.known_failures.items():
- printer.Print("%d tests skipped because '%s'" % (n_tests, reason))
+ for reason, n_tests in self.known_failures.items():
+ printer.Print("%d tests skipped because '%s'" % (n_tests, reason))
- # Empty the queue now that the tests have been run.
- self.queue = []
- # `0` indicates success
- return Test.n_tests_failed.value
+ # Empty the queue now that the tests have been run.
+ self.queue = []
+ # `0` indicates success
+ return Test.n_tests_failed.value