# Copyright 2014 Google Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import fnmatch import importlib import inspect import json import os import pdb import sys import unittest import traceback from collections import OrderedDict # This ensures that absolute imports of typ modules will work when # running typ/runner.py as a script even if typ is not installed. # We need this entry in addition to the one in __main__.py to ensure # that typ/runner.py works when invoked via subprocess on windows in # _spawn_main(). path_to_file = os.path.realpath(__file__) if path_to_file.endswith('.pyc'): # pragma: no cover path_to_file = path_to_file[:-1] dir_above_typ = os.path.dirname(os.path.dirname(path_to_file)) if dir_above_typ not in sys.path: # pragma: no cover sys.path.append(dir_above_typ) from typ import json_results from typ.arg_parser import ArgumentParser from typ.host import Host from typ.pool import make_pool from typ.stats import Stats from typ.printer import Printer from typ.test_case import TestCase as TypTestCase from typ.version import VERSION Result = json_results.Result ResultSet = json_results.ResultSet ResultType = json_results.ResultType def main(argv=None, host=None, win_multiprocessing=None, **defaults): host = host or Host() runner = Runner(host=host) if win_multiprocessing is not None: runner.win_multiprocessing = win_multiprocessing return runner.main(argv, **defaults) class TestInput(object): def __init__(self, name, msg='', timeout=None, expected=None): self.name = name self.msg = msg self.timeout = timeout self.expected = expected class TestSet(object): def __init__(self, parallel_tests=None, isolated_tests=None, tests_to_skip=None): def promote(tests): tests = tests or [] return [test if isinstance(test, TestInput) else TestInput(test) for test in tests] self.parallel_tests = promote(parallel_tests) self.isolated_tests = promote(isolated_tests) self.tests_to_skip = promote(tests_to_skip) class WinMultiprocessing(object): ignore = 'ignore' importable = 'importable' spawn = 'spawn' values = [ignore, importable, spawn] class _AddTestsError(Exception): pass class Runner(object): def __init__(self, host=None): self.args = None self.classifier = None self.cov = None self.context = None self.coverage_source = None self.host = host or Host() self.loader = unittest.loader.TestLoader() self.printer = None self.setup_fn = None self.stats = None self.teardown_fn = None self.top_level_dir = None self.win_multiprocessing = WinMultiprocessing.spawn self.final_responses = [] # initialize self.args to the defaults. parser = ArgumentParser(self.host) self.parse_args(parser, []) def main(self, argv=None, **defaults): parser = ArgumentParser(self.host) self.parse_args(parser, argv, **defaults) if parser.exit_status is not None: return parser.exit_status try: ret, _, _ = self.run() return ret except KeyboardInterrupt: self.print_("interrupted, exiting", stream=self.host.stderr) return 130 def parse_args(self, parser, argv, **defaults): for attrname in defaults: if not hasattr(self.args, attrname): parser.error("Unknown default argument name '%s'" % attrname, bailout=False) return parser.set_defaults(**defaults) self.args = parser.parse_args(args=argv) if parser.exit_status is not None: return def print_(self, msg='', end='\n', stream=None): self.host.print_(msg, end, stream=stream) def run(self, test_set=None): ret = 0 h = self.host if self.args.version: self.print_(VERSION) return ret, None, None should_spawn = self._check_win_multiprocessing() if should_spawn: return self._spawn(test_set) ret = self._set_up_runner() if ret: # pragma: no cover return ret, None, None find_start = h.time() if self.cov: # pragma: no cover self.cov.erase() self.cov.start() full_results = None result_set = ResultSet() if not test_set: ret, test_set = self.find_tests(self.args) find_end = h.time() if not ret: ret, full_results = self._run_tests(result_set, test_set) if self.cov: # pragma: no cover self.cov.stop() self.cov.save() test_end = h.time() trace = self._trace_from_results(result_set) if full_results: self._summarize(full_results) self._write(self.args.write_full_results_to, full_results) upload_ret = self._upload(full_results) if not ret: ret = upload_ret reporting_end = h.time() self._add_trace_event(trace, 'run', find_start, reporting_end) self._add_trace_event(trace, 'discovery', find_start, find_end) self._add_trace_event(trace, 'testing', find_end, test_end) self._add_trace_event(trace, 'reporting', test_end, reporting_end) self._write(self.args.write_trace_to, trace) self.report_coverage() else: upload_ret = 0 return ret, full_results, trace def _check_win_multiprocessing(self): wmp = self.win_multiprocessing ignore, importable, spawn = WinMultiprocessing.values if wmp not in WinMultiprocessing.values: raise ValueError('illegal value %s for win_multiprocessing' % wmp) h = self.host if wmp == ignore and h.platform == 'win32': # pragma: win32 raise ValueError('Cannot use WinMultiprocessing.ignore for ' 'win_multiprocessing when actually running ' 'on Windows.') if wmp == ignore or self.args.jobs == 1: return False if wmp == importable: if self._main_is_importable(): return False raise ValueError('The __main__ module (%s) ' # pragma: no cover 'may not be importable' % sys.modules['__main__'].__file__) assert wmp == spawn return True def _main_is_importable(self): # pragma: untested path = sys.modules['__main__'].__file__ if not path: return False if path.endswith('.pyc'): path = path[:-1] if not path.endswith('.py'): return False if path.endswith('__main__.py'): # main modules are not directly importable. return False path = self.host.realpath(path) for d in sys.path: if path.startswith(self.host.realpath(d)): return True return False # pragma: no cover def _spawn(self, test_set): # TODO: Handle picklable hooks, rather than requiring them to be None. assert self.classifier is None assert self.context is None assert self.setup_fn is None assert self.teardown_fn is None assert test_set is None h = self.host if self.args.write_trace_to: # pragma: untested should_delete_trace = False else: should_delete_trace = True fp = h.mktempfile(delete=False) fp.close() self.args.write_trace_to = fp.name if self.args.write_full_results_to: # pragma: untested should_delete_results = False else: should_delete_results = True fp = h.mktempfile(delete=False) fp.close() self.args.write_full_results_to = fp.name argv = ArgumentParser(h).argv_from_args(self.args) ret = h.call_inline([h.python_interpreter, path_to_file] + argv) trace = self._read_and_delete(self.args.write_trace_to, should_delete_trace) full_results = self._read_and_delete(self.args.write_full_results_to, should_delete_results) return ret, full_results, trace def _set_up_runner(self): h = self.host args = self.args self.stats = Stats(args.status_format, h.time, args.jobs) self.printer = Printer( self.print_, args.overwrite, args.terminal_width) self.top_level_dir = args.top_level_dir if not self.top_level_dir: if args.tests and h.isdir(args.tests[0]): # TODO: figure out what to do if multiple files are # specified and they don't all have the same correct # top level dir. d = h.realpath(h.dirname(args.tests[0])) if h.exists(d, '__init__.py'): top_dir = d else: top_dir = args.tests[0] else: top_dir = h.getcwd() while h.exists(top_dir, '__init__.py'): top_dir = h.dirname(top_dir) self.top_level_dir = h.realpath(top_dir) h.add_to_path(self.top_level_dir) for path in args.path: h.add_to_path(path) if args.coverage: # pragma: no cover try: import coverage except ImportError: h.print_("Error: coverage is not installed") return 1 source = self.args.coverage_source if not source: source = [self.top_level_dir] + self.args.path self.coverage_source = source self.cov = coverage.coverage(source=self.coverage_source, data_suffix=True) self.cov.erase() return 0 def find_tests(self, args): test_set = TestSet() orig_skip = unittest.skip orig_skip_if = unittest.skipIf if args.all: unittest.skip = lambda reason: lambda x: x unittest.skipIf = lambda condition, reason: lambda x: x try: names = self._name_list_from_args(args) classifier = self.classifier or _default_classifier(args) for name in names: try: self._add_tests_to_set(test_set, args.suffixes, self.top_level_dir, classifier, name) except (AttributeError, ImportError, SyntaxError) as e: self.print_('Failed to load "%s": %s' % (name, e)) return 1, None except _AddTestsError as e: self.print_(str(e)) return 1, None # TODO: Add support for discovering setupProcess/teardownProcess? test_set.parallel_tests = _sort_inputs(test_set.parallel_tests) test_set.isolated_tests = _sort_inputs(test_set.isolated_tests) test_set.tests_to_skip = _sort_inputs(test_set.tests_to_skip) return 0, test_set finally: unittest.skip = orig_skip unittest.skipIf = orig_skip_if def _name_list_from_args(self, args): if args.tests: names = args.tests elif args.file_list: if args.file_list == '-': s = self.host.stdin.read() else: s = self.host.read_text_file(args.file_list) names = [line.strip() for line in s.splitlines()] else: names = [self.top_level_dir] return names def _add_tests_to_set(self, test_set, suffixes, top_level_dir, classifier, name): h = self.host loader = self.loader add_tests = _test_adder(test_set, classifier) if h.isfile(name): rpath = h.relpath(name, top_level_dir) if rpath.endswith('.py'): rpath = rpath[:-3] module = rpath.replace(h.sep, '.') add_tests(loader.loadTestsFromName(module)) elif h.isdir(name): for suffix in suffixes: add_tests(loader.discover(name, suffix, top_level_dir)) else: possible_dir = name.replace('.', h.sep) if h.isdir(top_level_dir, possible_dir): for suffix in suffixes: path = h.join(top_level_dir, possible_dir) suite = loader.discover(path, suffix, top_level_dir) add_tests(suite) else: add_tests(loader.loadTestsFromName(name)) def _run_tests(self, result_set, test_set): h = self.host if not test_set.parallel_tests and not test_set.isolated_tests: self.print_('No tests to run.') return 1, None all_tests = [ti.name for ti in _sort_inputs(test_set.parallel_tests + test_set.isolated_tests + test_set.tests_to_skip)] if self.args.list_only: self.print_('\n'.join(all_tests)) return 0, None self._run_one_set(self.stats, result_set, test_set) failed_tests = sorted(json_results.failed_test_names(result_set)) retry_limit = self.args.retry_limit while retry_limit and failed_tests: if retry_limit == self.args.retry_limit: self.flush() self.args.overwrite = False self.printer.should_overwrite = False self.args.verbose = min(self.args.verbose, 1) self.print_('') self.print_('Retrying failed tests (attempt #%d of %d)...' % (self.args.retry_limit - retry_limit + 1, self.args.retry_limit)) self.print_('') stats = Stats(self.args.status_format, h.time, 1) stats.total = len(failed_tests) tests_to_retry = TestSet(isolated_tests=list(failed_tests)) retry_set = ResultSet() self._run_one_set(stats, retry_set, tests_to_retry) result_set.results.extend(retry_set.results) failed_tests = json_results.failed_test_names(retry_set) retry_limit -= 1 if retry_limit != self.args.retry_limit: self.print_('') full_results = json_results.make_full_results(self.args.metadata, int(h.time()), all_tests, result_set) return (json_results.exit_code_from_full_results(full_results), full_results) def _run_one_set(self, stats, result_set, test_set): stats.total = (len(test_set.parallel_tests) + len(test_set.isolated_tests) + len(test_set.tests_to_skip)) self._skip_tests(stats, result_set, test_set.tests_to_skip) self._run_list(stats, result_set, test_set.parallel_tests, self.args.jobs) self._run_list(stats, result_set, test_set.isolated_tests, 1) def _skip_tests(self, stats, result_set, tests_to_skip): for test_input in tests_to_skip: last = self.host.time() stats.started += 1 self._print_test_started(stats, test_input) now = self.host.time() result = Result(test_input.name, actual=ResultType.Skip, started=last, took=(now - last), worker=0, expected=[ResultType.Skip], out=test_input.msg) result_set.add(result) stats.finished += 1 self._print_test_finished(stats, result) def _run_list(self, stats, result_set, test_inputs, jobs): h = self.host running_jobs = set() jobs = min(len(test_inputs), jobs) if not jobs: return child = _Child(self) pool = make_pool(h, jobs, _run_one_test, child, _setup_process, _teardown_process) try: while test_inputs or running_jobs: while test_inputs and (len(running_jobs) < self.args.jobs): test_input = test_inputs.pop(0) stats.started += 1 pool.send(test_input) running_jobs.add(test_input.name) self._print_test_started(stats, test_input) result = pool.get() running_jobs.remove(result.name) result_set.add(result) stats.finished += 1 self._print_test_finished(stats, result) pool.close() finally: self.final_responses.extend(pool.join()) def _print_test_started(self, stats, test_input): if self.args.quiet: # Print nothing when --quiet was passed. return # If -vvv was passed, print when the test is queued to be run. # We don't actually know when the test picked up to run, because # that is handled by the child process (where we can't easily # print things). Otherwise, only print when the test is started # if we know we can overwrite the line, so that we do not # get multiple lines of output as noise (in -vvv, we actually want # the noise). test_start_msg = stats.format() + test_input.name if self.args.verbose > 2: self.update(test_start_msg + ' queued', elide=False) if self.args.overwrite: self.update(test_start_msg, elide=(not self.args.verbose)) def _print_test_finished(self, stats, result): stats.add_time() assert result.actual in [ResultType.Failure, ResultType.Skip, ResultType.Pass] if result.actual == ResultType.Failure: result_str = ' failed' elif result.actual == ResultType.Skip: result_str = ' was skipped' elif result.actual == ResultType.Pass: result_str = ' passed' if result.unexpected: result_str += ' unexpectedly' if self.args.timing: timing_str = ' %.4fs' % result.took else: timing_str = '' suffix = '%s%s' % (result_str, timing_str) out = result.out err = result.err if result.code: if out or err: suffix += ':\n' self.update(stats.format() + result.name + suffix, elide=False) for l in out.splitlines(): self.print_(' %s' % l) for l in err.splitlines(): self.print_(' %s' % l) elif not self.args.quiet: if self.args.verbose > 1 and (out or err): suffix += ':\n' self.update(stats.format() + result.name + suffix, elide=(not self.args.verbose)) if self.args.verbose > 1: for l in out.splitlines(): self.print_(' %s' % l) for l in err.splitlines(): self.print_(' %s' % l) if self.args.verbose: self.flush() def update(self, msg, elide): self.printer.update(msg, elide) def flush(self): self.printer.flush() def _summarize(self, full_results): num_tests = self.stats.finished num_failures = json_results.num_failures(full_results) if self.args.quiet and num_failures == 0: return if self.args.timing: timing_clause = ' in %.1fs' % (self.host.time() - self.stats.started_time) else: timing_clause = '' self.update('%d test%s run%s, %d failure%s.' % (num_tests, '' if num_tests == 1 else 's', timing_clause, num_failures, '' if num_failures == 1 else 's'), elide=False) self.print_() def _read_and_delete(self, path, delete): h = self.host obj = None if h.exists(path): contents = h.read_text_file(path) if contents: obj = json.loads(contents) if delete: h.remove(path) return obj def _write(self, path, obj): if path: self.host.write_text_file(path, json.dumps(obj, indent=2) + '\n') def _upload(self, full_results): h = self.host if not self.args.test_results_server: return 0 url, content_type, data = json_results.make_upload_request( self.args.test_results_server, self.args.builder_name, self.args.master_name, self.args.test_type, full_results) try: h.fetch(url, data, {'Content-Type': content_type}) return 0 except Exception as e: h.print_('Uploading the JSON results raised "%s"' % str(e)) return 1 def report_coverage(self): if self.args.coverage: # pragma: no cover self.host.print_() import coverage cov = coverage.coverage(data_suffix=True) cov.combine() cov.report(show_missing=self.args.coverage_show_missing, omit=self.args.coverage_omit) if self.args.coverage_annotate: cov.annotate(omit=self.args.coverage_omit) def _add_trace_event(self, trace, name, start, end): event = { 'name': name, 'ts': int((start - self.stats.started_time) * 1000000), 'dur': int((end - start) * 1000000), 'ph': 'X', 'pid': self.host.getpid(), 'tid': 0, } trace['traceEvents'].append(event) def _trace_from_results(self, result_set): trace = OrderedDict() trace['traceEvents'] = [] trace['otherData'] = {} for m in self.args.metadata: k, v = m.split('=') trace['otherData'][k] = v for result in result_set.results: started = int((result.started - self.stats.started_time) * 1000000) took = int(result.took * 1000000) event = OrderedDict() event['name'] = result.name event['dur'] = took event['ts'] = started event['ph'] = 'X' # "Complete" events event['pid'] = result.pid event['tid'] = result.worker args = OrderedDict() args['expected'] = sorted(str(r) for r in result.expected) args['actual'] = str(result.actual) args['out'] = result.out args['err'] = result.err args['code'] = result.code args['unexpected'] = result.unexpected args['flaky'] = result.flaky event['args'] = args trace['traceEvents'].append(event) return trace def _matches(name, globs): return any(fnmatch.fnmatch(name, glob) for glob in globs) def _default_classifier(args): def default_classifier(test_set, test): name = test.id() if not args.all and _matches(name, args.skip): test_set.tests_to_skip.append(TestInput(name, 'skipped by request')) elif _matches(name, args.isolate): test_set.isolated_tests.append(TestInput(name)) else: test_set.parallel_tests.append(TestInput(name)) return default_classifier def _test_adder(test_set, classifier): def add_tests(obj): if isinstance(obj, unittest.suite.TestSuite): for el in obj: add_tests(el) elif (obj.id().startswith('unittest.loader.LoadTestsFailure') or obj.id().startswith('unittest.loader.ModuleImportFailure')): # Access to protected member pylint: disable=W0212 module_name = obj._testMethodName try: method = getattr(obj, obj._testMethodName) method() except Exception as e: if 'LoadTests' in obj.id(): raise _AddTestsError('%s.load_tests() failed: %s' % (module_name, str(e))) else: raise _AddTestsError(str(e)) else: assert isinstance(obj, unittest.TestCase) classifier(test_set, obj) return add_tests class _Child(object): def __init__(self, parent): self.host = None self.worker_num = None self.all = parent.args.all self.debugger = parent.args.debugger self.coverage = parent.args.coverage and parent.args.jobs > 1 self.coverage_source = parent.coverage_source self.dry_run = parent.args.dry_run self.loader = parent.loader self.passthrough = parent.args.passthrough self.context = parent.context self.setup_fn = parent.setup_fn self.teardown_fn = parent.teardown_fn self.context_after_setup = None self.top_level_dir = parent.top_level_dir self.loaded_suites = {} self.cov = None def _setup_process(host, worker_num, child): child.host = host child.worker_num = worker_num # pylint: disable=protected-access if child.coverage: # pragma: no cover import coverage child.cov = coverage.coverage(source=child.coverage_source, data_suffix=True) child.cov._warn_no_data = False child.cov.start() if child.setup_fn: child.context_after_setup = child.setup_fn(child, child.context) else: child.context_after_setup = child.context return child def _teardown_process(child): res = None e = None if child.teardown_fn: try: res = child.teardown_fn(child, child.context_after_setup) except Exception as e: pass if child.cov: # pragma: no cover child.cov.stop() child.cov.save() return (child.worker_num, res, e) def _run_one_test(child, test_input): h = child.host pid = h.getpid() test_name = test_input.name start = h.time() # It is important to capture the output before loading the test # to ensure that # 1) the loader doesn't logs something we don't captured # 2) neither the loader nor the test case grab a reference to the # uncaptured stdout or stderr that later is used when the test is run. # This comes up when using the FakeTestLoader and testing typ itself, # but could come up when testing non-typ code as well. h.capture_output(divert=not child.passthrough) tb_str = '' try: orig_skip = unittest.skip orig_skip_if = unittest.skipIf if child.all: unittest.skip = lambda reason: lambda x: x unittest.skipIf = lambda condition, reason: lambda x: x try: suite = child.loader.loadTestsFromName(test_name) except Exception as e: try: suite = _load_via_load_tests(child, test_name) except Exception as e: # pragma: untested suite = [] tb_str = traceback.format_exc(e) finally: unittest.skip = orig_skip unittest.skipIf = orig_skip_if tests = list(suite) if len(tests) != 1: err = 'Failed to load %s' if tb_str: # pragma: untested err += (' (traceback follows):\n %s' % ' \n'.join(tb_str.splitlines())) h.restore_output() return Result(test_name, ResultType.Failure, start, 0, child.worker_num, unexpected=True, code=1, err=err, pid=pid) test_case = tests[0] if isinstance(test_case, TypTestCase): test_case.child = child test_case.context = child.context_after_setup test_result = unittest.TestResult() out = '' err = '' try: if child.dry_run: pass elif child.debugger: # pragma: no cover _run_under_debugger(h, test_case, suite, test_result) else: suite.run(test_result) finally: out, err = h.restore_output() took = h.time() - start return _result_from_test_result(test_result, test_name, start, took, out, err, child.worker_num, pid) def _run_under_debugger(host, test_case, suite, test_result): # pragma: no cover # Access to protected member pylint: disable=W0212 test_func = getattr(test_case, test_case._testMethodName) fname = inspect.getsourcefile(test_func) lineno = inspect.getsourcelines(test_func)[1] + 1 dbg = pdb.Pdb(stdout=host.stdout.stream) dbg.set_break(fname, lineno) dbg.runcall(suite.run, test_result) def _result_from_test_result(test_result, test_name, start, took, out, err, worker_num, pid): flaky = False if test_result.failures: expected = [ResultType.Pass] actual = ResultType.Failure code = 1 unexpected = True err = err + test_result.failures[0][1] elif test_result.errors: expected = [ResultType.Pass] actual = ResultType.Failure code = 1 unexpected = True err = err + test_result.errors[0][1] elif test_result.skipped: expected = [ResultType.Skip] actual = ResultType.Skip err = err + test_result.skipped[0][1] code = 0 unexpected = False elif test_result.expectedFailures: expected = [ResultType.Failure] actual = ResultType.Failure code = 1 err = err + test_result.expectedFailures[0][1] unexpected = False elif test_result.unexpectedSuccesses: expected = [ResultType.Failure] actual = ResultType.Pass code = 0 unexpected = True else: expected = [ResultType.Pass] actual = ResultType.Pass code = 0 unexpected = False return Result(test_name, actual, start, took, worker_num, expected, unexpected, flaky, code, out, err, pid) def _load_via_load_tests(child, test_name): # If we couldn't import a test directly, the test may be only loadable # via unittest's load_tests protocol. See if we can find a load_tests # entry point that will work for this test. loader = child.loader comps = test_name.split('.') new_suite = unittest.TestSuite() while comps: name = '.'.join(comps) module = None suite = None if name not in child.loaded_suites: try: module = importlib.import_module(name) except ImportError: pass if module: suite = loader.loadTestsFromModule(module) child.loaded_suites[name] = suite suite = child.loaded_suites[name] if suite: for test_case in suite: assert isinstance(test_case, unittest.TestCase) if test_case.id() == test_name: new_suite.addTest(test_case) break comps.pop() return new_suite def _sort_inputs(inps): return sorted(inps, key=lambda inp: inp.name) if __name__ == '__main__': # pragma: no cover sys.modules['__main__'].__file__ = path_to_file sys.exit(main(win_multiprocessing=WinMultiprocessing.importable))