ext: Run pre-commit run --files ext/testlib

Change-Id: Ic581132f6136dddb127e2a1c5a1ecc19876488c3
This commit is contained in:
Bobby R. Bruce
2023-09-04 23:51:14 -07:00
parent ff75e5b30e
commit 9e1afdecef
17 changed files with 952 additions and 709 deletions

View File

@@ -36,9 +36,9 @@ from .fixture import *
from .configuration import *
from .main import main
#TODO Remove this awkward bootstrap
#FIXME
# TODO Remove this awkward bootstrap
# FIXME
from gem5 import *
#TODO Remove this as an export, users should getcwd from os
# TODO Remove this as an export, users should getcwd from os
from os import getcwd

View File

@@ -38,7 +38,7 @@
#
# Authors: Sean Wilson
'''
"""
Global configuration module which exposes two types of configuration
variables:
@@ -76,7 +76,7 @@ found an AttributeError will be raised.
common string names used across the test framework.
:code:`_defaults.build_dir = None` Once this module has been imported
constants should not be modified and their base attributes are frozen.
'''
"""
import abc
import argparse
import copy
@@ -87,27 +87,33 @@ from pickle import HIGHEST_PROTOCOL as highest_pickle_protocol
from testlib.helper import absdirpath, AttrDict, FrozenAttrDict
class UninitialzedAttributeException(Exception):
'''
"""
Signals that an attribute in the config file was not initialized.
'''
"""
pass
class UninitializedConfigException(Exception):
'''
"""
Signals that the config was not initialized before trying to access an
attribute.
'''
"""
pass
class TagRegex(object):
def __init__(self, include, regex):
self.include = include
self.regex = re.compile(regex)
def __str__(self):
type_ = 'Include' if self.include else 'Remove'
return '%10s: %s' % (type_, self.regex.pattern)
type_ = "Include" if self.include else "Remove"
return "%10s: %s" % (type_, self.regex.pattern)
class _Config(object):
_initialized = False
@@ -131,14 +137,14 @@ class _Config(object):
self._initialized = True
def _add_post_processor(self, attr, post_processor):
'''
"""
:param attr: Attribute to pass to and recieve from the
:func:`post_processor`.
:param post_processor: A callback functions called in a chain to
perform additional setup for a config argument. Should return a
tuple containing the new value for the config attr.
'''
"""
if attr not in self._post_processors:
self._post_processors[attr] = []
self._post_processors[attr].append(post_processor)
@@ -153,7 +159,7 @@ class _Config(object):
for attr in dir(args):
# Ignore non-argument attributes.
if not attr.startswith('_'):
if not attr.startswith("_"):
self._config_file_args[attr] = getattr(args, attr)
self._config.update(self._config_file_args)
@@ -166,14 +172,13 @@ class _Config(object):
newval = newval[0]
self._set(attr, newval)
def _lookup_val(self, attr):
'''
"""
Get the attribute from the config or fallback to defaults.
:returns: If the value is not stored return None. Otherwise a tuple
containing the value.
'''
"""
if attr in self._config:
return (self._config[attr],)
elif hasattr(self._defaults, attr):
@@ -184,82 +189,87 @@ class _Config(object):
return getattr(super(_Config, self), attr)
elif not self._initialized:
raise UninitializedConfigException(
'Cannot directly access elements from the config before it is'
' initialized')
"Cannot directly access elements from the config before it is"
" initialized"
)
else:
val = self._lookup_val(attr)
if val is not None:
return val[0]
else:
raise UninitialzedAttributeException(
'%s was not initialzed in the config.' % attr)
"%s was not initialzed in the config." % attr
)
def get_tags(self):
d = {typ: set(self.__getattr__(typ))
for typ in self.constants.supported_tags}
d = {
typ: set(self.__getattr__(typ))
for typ in self.constants.supported_tags
}
if any(map(lambda vals: bool(vals), d.values())):
return d
else:
return {}
def define_defaults(defaults):
'''
"""
Defaults are provided by the config if the attribute is not found in the
config or commandline. For instance, if we are using the list command
fixtures might not be able to count on the build_dir being provided since
we aren't going to build anything.
'''
defaults.base_dir = os.path.abspath(os.path.join(absdirpath(__file__),
os.pardir,
os.pardir))
defaults.result_path = os.path.join(os.getcwd(), 'testing-results')
defaults.resource_url = 'http://dist.gem5.org/dist/develop'
defaults.resource_path = os.path.abspath(os.path.join(defaults.base_dir,
'tests',
'gem5',
'resources'))
"""
defaults.base_dir = os.path.abspath(
os.path.join(absdirpath(__file__), os.pardir, os.pardir)
)
defaults.result_path = os.path.join(os.getcwd(), "testing-results")
defaults.resource_url = "http://dist.gem5.org/dist/develop"
defaults.resource_path = os.path.abspath(
os.path.join(defaults.base_dir, "tests", "gem5", "resources")
)
def define_constants(constants):
'''
"""
'constants' are values not directly exposed by the config, but are attached
to the object for centralized access. These should be used for setting
common string names used across the test framework. A simple typo in
a string can take a lot of debugging to uncover the issue, attribute errors
are easier to notice and most autocompletion systems detect them.
'''
constants.system_out_name = 'system-out'
constants.system_err_name = 'system-err'
"""
constants.system_out_name = "system-out"
constants.system_err_name = "system-err"
constants.isa_tag_type = 'isa'
constants.x86_tag = 'X86'
constants.gcn3_x86_tag = 'GCN3_X86'
constants.vega_x86_tag = 'VEGA_X86'
constants.sparc_tag = 'SPARC'
constants.riscv_tag = 'RISCV'
constants.arm_tag = 'ARM'
constants.mips_tag = 'MIPS'
constants.power_tag = 'POWER'
constants.null_tag = 'NULL'
constants.all_compiled_tag = 'ALL'
constants.isa_tag_type = "isa"
constants.x86_tag = "X86"
constants.gcn3_x86_tag = "GCN3_X86"
constants.vega_x86_tag = "VEGA_X86"
constants.sparc_tag = "SPARC"
constants.riscv_tag = "RISCV"
constants.arm_tag = "ARM"
constants.mips_tag = "MIPS"
constants.power_tag = "POWER"
constants.null_tag = "NULL"
constants.all_compiled_tag = "ALL"
constants.variant_tag_type = 'variant'
constants.opt_tag = 'opt'
constants.debug_tag = 'debug'
constants.fast_tag = 'fast'
constants.variant_tag_type = "variant"
constants.opt_tag = "opt"
constants.debug_tag = "debug"
constants.fast_tag = "fast"
constants.length_tag_type = 'length'
constants.quick_tag = 'quick'
constants.long_tag = 'long'
constants.very_long_tag = 'very-long'
constants.length_tag_type = "length"
constants.quick_tag = "quick"
constants.long_tag = "long"
constants.very_long_tag = "very-long"
constants.host_isa_tag_type = 'host'
constants.host_x86_64_tag = 'x86_64'
constants.host_arm_tag = 'aarch64'
constants.host_isa_tag_type = "host"
constants.host_x86_64_tag = "x86_64"
constants.host_arm_tag = "aarch64"
constants.kvm_tag = 'kvm'
constants.kvm_tag = "kvm"
constants.supported_tags = {
constants.isa_tag_type : (
constants.isa_tag_type: (
constants.x86_tag,
constants.gcn3_x86_tag,
constants.vega_x86_tag,
@@ -290,41 +300,43 @@ def define_constants(constants):
# Binding target ISA with host ISA. This is useful for the
# case where host ISA and target ISA need to coincide
constants.target_host = {
constants.arm_tag : (constants.host_arm_tag,),
constants.x86_tag : (constants.host_x86_64_tag,),
constants.gcn3_x86_tag : (constants.host_x86_64_tag,),
constants.vega_x86_tag : (constants.host_x86_64_tag,),
constants.sparc_tag : (constants.host_x86_64_tag,),
constants.riscv_tag : (constants.host_x86_64_tag,),
constants.mips_tag : (constants.host_x86_64_tag,),
constants.power_tag : (constants.host_x86_64_tag,),
constants.null_tag : (None,),
constants.arm_tag: (constants.host_arm_tag,),
constants.x86_tag: (constants.host_x86_64_tag,),
constants.gcn3_x86_tag: (constants.host_x86_64_tag,),
constants.vega_x86_tag: (constants.host_x86_64_tag,),
constants.sparc_tag: (constants.host_x86_64_tag,),
constants.riscv_tag: (constants.host_x86_64_tag,),
constants.mips_tag: (constants.host_x86_64_tag,),
constants.power_tag: (constants.host_x86_64_tag,),
constants.null_tag: (None,),
constants.all_compiled_tag: (None,),
}
constants.supported_isas = constants.supported_tags['isa']
constants.supported_variants = constants.supported_tags['variant']
constants.supported_lengths = constants.supported_tags['length']
constants.supported_hosts = constants.supported_tags['host']
constants.supported_isas = constants.supported_tags["isa"]
constants.supported_variants = constants.supported_tags["variant"]
constants.supported_lengths = constants.supported_tags["length"]
constants.supported_hosts = constants.supported_tags["host"]
constants.tempdir_fixture_name = 'tempdir'
constants.gem5_simulation_stderr = 'simerr'
constants.gem5_simulation_stdout = 'simout'
constants.gem5_simulation_stats = 'stats.txt'
constants.gem5_simulation_config_ini = 'config.ini'
constants.gem5_simulation_config_json = 'config.json'
constants.gem5_returncode_fixture_name = 'gem5-returncode'
constants.gem5_binary_fixture_name = 'gem5'
constants.xml_filename = 'results.xml'
constants.pickle_filename = 'results.pickle'
constants.tempdir_fixture_name = "tempdir"
constants.gem5_simulation_stderr = "simerr"
constants.gem5_simulation_stdout = "simout"
constants.gem5_simulation_stats = "stats.txt"
constants.gem5_simulation_config_ini = "config.ini"
constants.gem5_simulation_config_json = "config.json"
constants.gem5_returncode_fixture_name = "gem5-returncode"
constants.gem5_binary_fixture_name = "gem5"
constants.xml_filename = "results.xml"
constants.pickle_filename = "results.pickle"
constants.pickle_protocol = highest_pickle_protocol
# The root directory which all test names will be based off of.
constants.testing_base = absdirpath(os.path.join(absdirpath(__file__),
os.pardir))
constants.testing_base = absdirpath(
os.path.join(absdirpath(__file__), os.pardir)
)
def define_post_processors(config):
'''
"""
post_processors are used to do final configuration of variables. This is
useful if there is a dynamically set default, or some function that needs
to be applied after parsing in order to set a configration value.
@@ -333,17 +345,17 @@ def define_post_processors(config):
containing the already set config value or ``None`` if the config value
has not been set to anything. They must return the modified value in the
same format.
'''
"""
def set_default_build_dir(build_dir):
'''
"""
Post-processor to set the default build_dir based on the base_dir.
.. seealso :func:`~_Config._add_post_processor`
'''
"""
if not build_dir or build_dir[0] is None:
base_dir = config._lookup_val('base_dir')[0]
build_dir = (os.path.join(base_dir, 'build'),)
base_dir = config._lookup_val("base_dir")[0]
build_dir = (os.path.join(base_dir, "build"),)
return build_dir
def fix_verbosity_hack(verbose):
@@ -381,6 +393,7 @@ def define_post_processors(config):
if not host[0]:
try:
import platform
host_machine = platform.machine()
if host_machine not in constants.supported_hosts:
raise ValueError("Invalid host machine")
@@ -398,87 +411,98 @@ def define_post_processors(config):
positional_tags = positional_tags[0]
for flag, regex in positional_tags:
if flag == 'exclude_tags':
if flag == "exclude_tags":
tag_regex = TagRegex(False, regex)
elif flag == 'include_tags':
elif flag == "include_tags":
tag_regex = TagRegex(True, regex)
else:
raise ValueError('Unsupported flag.')
raise ValueError("Unsupported flag.")
new_positional_tags_list.append(tag_regex)
return (new_positional_tags_list,)
config._add_post_processor('build_dir', set_default_build_dir)
config._add_post_processor('verbose', fix_verbosity_hack)
config._add_post_processor('isa', default_isa)
config._add_post_processor('variant', default_variant)
config._add_post_processor('length', default_length)
config._add_post_processor('host', default_host)
config._add_post_processor('threads', threads_as_int)
config._add_post_processor('test_threads', test_threads_as_int)
config._add_post_processor(StorePositionalTagsAction.position_kword,
compile_tag_regex)
config._add_post_processor("build_dir", set_default_build_dir)
config._add_post_processor("verbose", fix_verbosity_hack)
config._add_post_processor("isa", default_isa)
config._add_post_processor("variant", default_variant)
config._add_post_processor("length", default_length)
config._add_post_processor("host", default_host)
config._add_post_processor("threads", threads_as_int)
config._add_post_processor("test_threads", test_threads_as_int)
config._add_post_processor(
StorePositionalTagsAction.position_kword, compile_tag_regex
)
class Argument(object):
'''
"""
Class represents a cli argument/flag for a argparse parser.
:attr name: The long name of this object that will be stored in the arg
output by the final parser.
'''
"""
def __init__(self, *flags, **kwargs):
self.flags = flags
self.kwargs = kwargs
if len(flags) == 0:
raise ValueError("Need at least one argument.")
elif 'dest' in kwargs:
self.name = kwargs['dest']
elif len(flags) > 1 or flags[0].startswith('-'):
elif "dest" in kwargs:
self.name = kwargs["dest"]
elif len(flags) > 1 or flags[0].startswith("-"):
for flag in flags:
if not flag.startswith('-'):
raise ValueError("invalid option string %s: must start"
"with a character '-'" % flag)
if not flag.startswith("-"):
raise ValueError(
"invalid option string %s: must start"
"with a character '-'" % flag
)
if flag.startswith('--'):
if not hasattr(self, 'name'):
self.name = flag.lstrip('-')
if flag.startswith("--"):
if not hasattr(self, "name"):
self.name = flag.lstrip("-")
if not hasattr(self, 'name'):
self.name = flags[0].lstrip('-')
self.name = self.name.replace('-', '_')
if not hasattr(self, "name"):
self.name = flags[0].lstrip("-")
self.name = self.name.replace("-", "_")
def add_to(self, parser):
'''Add this argument to the given parser.'''
"""Add this argument to the given parser."""
parser.add_argument(*self.flags, **self.kwargs)
def copy(self):
'''Copy this argument so you might modify any of its kwargs.'''
"""Copy this argument so you might modify any of its kwargs."""
return copy.deepcopy(self)
class _StickyInt:
'''
"""
A class that is used to cheat the verbosity count incrementer by
pretending to be an int. This makes the int stay on the heap and eat other
real numbers when they are added to it.
We use this so we can allow the verbose flag to be provided before or after
the subcommand. This likely has no utility outside of this use case.
'''
"""
def __init__(self, val=0):
self.val = val
self.type = int
def __add__(self, other):
self.val += other
return self
common_args = NotImplemented
class StorePositionAction(argparse.Action):
'''Base class for classes wishing to create namespaces where
"""Base class for classes wishing to create namespaces where
arguments are stored in the order provided via the command line.
'''
position_kword = 'positional'
"""
position_kword = "positional"
def __call__(self, parser, namespace, values, option_string=None):
if not self.position_kword in namespace:
@@ -487,120 +511,134 @@ class StorePositionAction(argparse.Action):
previous.append((self.dest, values))
setattr(namespace, self.position_kword, previous)
class StorePositionalTagsAction(StorePositionAction):
position_kword = 'tag_filters'
position_kword = "tag_filters"
def define_common_args(config):
'''
"""
Common args are arguments which are likely to be simular between different
subcommands, so they are available to all by placing their definitions
here.
'''
"""
global common_args
parse_comma_separated_string = lambda st: st.split(',')
parse_comma_separated_string = lambda st: st.split(",")
# A list of common arguments/flags used across cli parsers.
common_args = [
Argument(
'directories',
nargs='*',
"directories",
nargs="*",
default=[os.getcwd()],
help='Space separated list of directories to start searching '
'for tests in'),
help="Space separated list of directories to start searching "
"for tests in",
),
Argument(
'--exclude-tags',
"--exclude-tags",
action=StorePositionalTagsAction,
help='A tag comparison used to select tests.'),
help="A tag comparison used to select tests.",
),
Argument(
'--include-tags',
"--include-tags",
action=StorePositionalTagsAction,
help='A tag comparison used to select tests.'),
help="A tag comparison used to select tests.",
),
Argument(
'--isa',
action='extend',
"--isa",
action="extend",
default=[],
type=parse_comma_separated_string,
help="Only tests that are valid with one of these ISAs. "
"Comma separated."),
"Comma separated.",
),
Argument(
'--variant',
action='extend',
"--variant",
action="extend",
default=[],
type=parse_comma_separated_string,
help="Only tests that are valid with one of these binary variants"
"(e.g., opt, debug). Comma separated."),
"(e.g., opt, debug). Comma separated.",
),
Argument(
'--length',
action='extend',
"--length",
action="extend",
default=[],
type=parse_comma_separated_string,
help="Only tests that are one of these lengths. Comma separated."),
help="Only tests that are one of these lengths. Comma separated.",
),
Argument(
'--host',
action='append',
"--host",
action="append",
default=[],
help="Only tests that are meant to runnable on the selected host"),
help="Only tests that are meant to runnable on the selected host",
),
Argument(
'--uid',
action='store',
"--uid",
action="store",
default=None,
help='UID of a specific test item to run.'),
help="UID of a specific test item to run.",
),
Argument(
'--build-dir',
action='store',
help='Build directory for SCons'),
"--build-dir", action="store", help="Build directory for SCons"
),
Argument(
'--base-dir',
action='store',
"--base-dir",
action="store",
default=config._defaults.base_dir,
help='Directory to change to in order to exec scons.'),
help="Directory to change to in order to exec scons.",
),
Argument(
'-j', '--threads',
action='store',
"-j",
"--threads",
action="store",
default=1,
help='Number of threads to run SCons with.'),
help="Number of threads to run SCons with.",
),
Argument(
'-t', '--test-threads',
action='store',
"-t",
"--test-threads",
action="store",
default=1,
help='Number of threads to spawn to run concurrent tests with.'),
help="Number of threads to spawn to run concurrent tests with.",
),
Argument(
'-v',
action='count',
dest='verbose',
"-v",
action="count",
dest="verbose",
default=_StickyInt(),
help='Increase verbosity'),
help="Increase verbosity",
),
Argument(
'--config-path',
action='store',
"--config-path",
action="store",
default=os.getcwd(),
help='Path to read a testing.ini config in'
help="Path to read a testing.ini config in",
),
Argument(
'--skip-build',
action='store_true',
"--skip-build",
action="store_true",
default=False,
help='Skip the building component of SCons targets.'
help="Skip the building component of SCons targets.",
),
Argument(
'--result-path',
action='store',
help='The path to store results in.'
"--result-path",
action="store",
help="The path to store results in.",
),
Argument(
'--bin-path',
action='store',
"--bin-path",
action="store",
default=config._defaults.resource_path,
help='Path where resources are stored (downloaded if not present)'
help="Path where resources are stored (downloaded if not present)",
),
Argument(
'--resource-url',
action='store',
"--resource-url",
action="store",
default=config._defaults.resource_url,
help='The URL where the resources reside.'
help="The URL where the resources reside.",
),
]
# NOTE: There is a limitation which arises due to this format. If you have
@@ -610,7 +648,8 @@ def define_common_args(config):
# e.g. if you have a -v argument which increments verbosity level and
# a separate --verbose flag which 'store's verbosity level. the final
# one in the list will be saved.
common_args = AttrDict({arg.name:arg for arg in common_args})
common_args = AttrDict({arg.name: arg for arg in common_args})
class ArgParser(object, metaclass=abc.ABCMeta):
class ExtendAction(argparse.Action):
@@ -622,10 +661,10 @@ class ArgParser(object, metaclass=abc.ABCMeta):
def __init__(self, parser):
# Copy public methods of the parser.
for attr in dir(parser):
if not attr.startswith('_'):
if not attr.startswith("_"):
setattr(self, attr, getattr(parser, attr))
self.parser = parser
self.parser.register('action', 'extend', ArgParser.ExtendAction)
self.parser.register("action", "extend", ArgParser.ExtendAction)
self.add_argument = self.parser.add_argument
# Argument will be added to all parsers and subparsers.
@@ -633,25 +672,24 @@ class ArgParser(object, metaclass=abc.ABCMeta):
class CommandParser(ArgParser):
'''
"""
Main parser which parses command strings and uses those to direct to
a subparser.
'''
"""
def __init__(self):
parser = argparse.ArgumentParser()
super(CommandParser, self).__init__(parser)
self.subparser = self.add_subparsers(dest='command')
self.subparser = self.add_subparsers(dest="command")
class RunParser(ArgParser):
'''
"""
Parser for the \'run\' command.
'''
"""
def __init__(self, subparser):
parser = subparser.add_parser(
'run',
help='''Run Tests.'''
)
parser = subparser.add_parser("run", help="""Run Tests.""")
super(RunParser, self).__init__(parser)
@@ -672,46 +710,46 @@ class RunParser(ArgParser):
class ListParser(ArgParser):
'''
"""
Parser for the \'list\' command.
'''
"""
def __init__(self, subparser):
parser = subparser.add_parser(
'list',
help='''List and query test metadata.'''
"list", help="""List and query test metadata."""
)
super(ListParser, self).__init__(parser)
Argument(
'--suites',
action='store_true',
"--suites",
action="store_true",
default=False,
help='List all test suites.'
help="List all test suites.",
).add_to(parser)
Argument(
'--tests',
action='store_true',
"--tests",
action="store_true",
default=False,
help='List all test cases.'
help="List all test cases.",
).add_to(parser)
Argument(
'--fixtures',
action='store_true',
"--fixtures",
action="store_true",
default=False,
help='List all fixtures.'
help="List all fixtures.",
).add_to(parser)
Argument(
'--all-tags',
action='store_true',
"--all-tags",
action="store_true",
default=False,
help='List all tags.'
help="List all tags.",
).add_to(parser)
Argument(
'-q',
dest='quiet',
action='store_true',
"-q",
dest="quiet",
action="store_true",
default=False,
help='Quiet output (machine readable).'
help="Quiet output (machine readable).",
).add_to(parser)
common_args.directories.add_to(parser)
@@ -726,10 +764,7 @@ class ListParser(ArgParser):
class RerunParser(ArgParser):
def __init__(self, subparser):
parser = subparser.add_parser(
'rerun',
help='''Rerun failed tests.'''
)
parser = subparser.add_parser("rerun", help="""Rerun failed tests.""")
super(RerunParser, self).__init__(parser)
common_args.skip_build.add_to(parser)
@@ -744,6 +779,7 @@ class RerunParser(ArgParser):
common_args.length.add_to(parser)
common_args.host.add_to(parser)
config = _Config()
define_constants(config.constants)
@@ -752,14 +788,16 @@ define_constants(config.constants)
config.constants = FrozenAttrDict(config.constants.__dict__)
constants = config.constants
'''
"""
This config object is the singleton config object available throughout the
framework.
'''
"""
def initialize_config():
'''
"""
Parse the commandline arguments and setup the config varibles.
'''
"""
global config
# Setup constants and defaults

View File

@@ -28,16 +28,18 @@
import testlib.helper as helper
class SkipException(Exception):
def __init__(self, fixture, testitem):
self.msg = 'Fixture "%s" raised SkipException for "%s".' % (
fixture.name, testitem.name
fixture.name,
testitem.name,
)
super(SkipException, self).__init__(self.msg)
class Fixture(object):
'''
"""
Base Class for a test Fixture.
Fixtures are items which possibly require setup and/or tearing down after
@@ -50,7 +52,8 @@ class Fixture(object):
.. note:: In order for Fixtures to be enumerated by the test system this
class' :code:`__new__` method must be called.
'''
"""
collector = helper.InstanceCollector()
def __new__(klass, *args, **kwargs):

View File

@@ -26,11 +26,11 @@
#
# Authors: Sean Wilson
'''
"""
Handlers for the testlib Log.
'''
"""
import multiprocessing
import os
import sys
@@ -54,9 +54,10 @@ class _TestStreamManager(object):
def open_writer(self, test_result):
if test_result in self._writers:
raise ValueError('Cannot have multiple writters on a single test.')
self._writers[test_result] = _TestStreams(test_result.stdout,
test_result.stderr)
raise ValueError("Cannot have multiple writters on a single test.")
self._writers[test_result] = _TestStreams(
test_result.stdout, test_result.stderr
)
def get_writer(self, test_result):
if test_result not in self._writers:
@@ -73,89 +74,94 @@ class _TestStreamManager(object):
writer.close()
self._writers.clear()
class _TestStreams(object):
def __init__(self, stdout, stderr):
helper.mkdir_p(os.path.dirname(stdout))
helper.mkdir_p(os.path.dirname(stderr))
self.stdout = open(stdout, 'w')
self.stderr = open(stderr, 'w')
self.stdout = open(stdout, "w")
self.stderr = open(stderr, "w")
def close(self):
self.stdout.close()
self.stderr.close()
class ResultHandler(object):
'''
"""
Log handler which listens for test results and output saving data as
it is reported.
When the handler is closed it writes out test results in the python pickle
format.
'''
"""
def __init__(self, schedule, directory):
'''
"""
:param schedule: The entire schedule as a :class:`LoadedLibrary`
object.
:param directory: Directory to save test stdout/stderr and aggregate
results to.
'''
"""
self.directory = directory
self.internal_results = result.InternalLibraryResults(schedule,
directory)
self.internal_results = result.InternalLibraryResults(
schedule, directory
)
self.test_stream_manager = _TestStreamManager()
self._closed = False
self.mapping = {
log.LibraryStatus.type_id: self.handle_library_status,
log.SuiteResult.type_id: self.handle_suite_result,
log.TestResult.type_id: self.handle_test_result,
log.TestStderr.type_id: self.handle_stderr,
log.TestStdout.type_id: self.handle_stdout,
}
def handle(self, record):
if not self._closed:
self.mapping.get(record.type_id, lambda _:None)(record)
self.mapping.get(record.type_id, lambda _: None)(record)
def handle_library_status(self, record):
if record['status'] in (state.Status.Complete, state.Status.Avoided):
if record["status"] in (state.Status.Complete, state.Status.Avoided):
self.test_stream_manager.close()
def handle_suite_result(self, record):
suite_result = self.internal_results.get_suite_result(
record['metadata'].uid)
suite_result.result = record['result']
record["metadata"].uid
)
suite_result.result = record["result"]
def handle_test_result(self, record):
test_result = self._get_test_result(record)
test_result.result = record['result']
test_result.result = record["result"]
def handle_stderr(self, record):
self.test_stream_manager.get_writer(
self._get_test_result(record)
).stderr.write(record['buffer'])
).stderr.write(record["buffer"])
def handle_stdout(self, record):
self.test_stream_manager.get_writer(
self._get_test_result(record)
).stdout.write(record['buffer'])
).stdout.write(record["buffer"])
def _get_test_result(self, test_record):
return self.internal_results.get_test_result(
test_record['metadata'].uid,
test_record['metadata'].suite_uid)
test_record["metadata"].uid, test_record["metadata"].suite_uid
)
def _save(self):
#FIXME Hardcoded path name
# FIXME Hardcoded path name
result.InternalSavedResults.save(
self.internal_results,
os.path.join(self.directory, constants.pickle_filename))
os.path.join(self.directory, constants.pickle_filename),
)
result.JUnitSavedResults.save(
self.internal_results,
os.path.join(self.directory, constants.xml_filename))
os.path.join(self.directory, constants.xml_filename),
)
def close(self):
if self._closed:
@@ -164,11 +170,11 @@ class ResultHandler(object):
self._save()
def unsuccessful(self):
'''
"""
Performs an or reduce on all of the results.
Returns true if at least one test is unsuccessful, false when all tests
pass
'''
"""
for suite_result in self.internal_results:
if suite_result.unsuccessful:
return True
@@ -176,13 +182,14 @@ class ResultHandler(object):
return False
#TODO Change from a handler to an internal post processor so it can be used
# TODO Change from a handler to an internal post processor so it can be used
# to reprint results
class SummaryHandler(object):
'''
"""
A log handler which listens to the log for test results
and reports the aggregate results when closed.
'''
"""
color = terminal.get_termcap()
reset = color.Normal
colormap = {
@@ -201,24 +208,28 @@ class SummaryHandler(object):
self.results = []
def handle_library_status(self, record):
if record['status'] == state.Status.Building:
if record["status"] == state.Status.Building:
self._timer.restart()
def handle_testresult(self, record):
result = record['result'].value
if result in (state.Result.Skipped, state.Result.Failed,
state.Result.Passed, state.Result.Errored):
result = record["result"].value
if result in (
state.Result.Skipped,
state.Result.Failed,
state.Result.Passed,
state.Result.Errored,
):
self.results.append(result)
def handle(self, record):
self.mapping.get(record.type_id, lambda _:None)(record)
self.mapping.get(record.type_id, lambda _: None)(record)
def close(self):
print(self._display_summary())
def _display_summary(self):
most_severe_outcome = None
outcome_fmt = ' {count} {outcome}'
outcome_fmt = " {count} {outcome}"
strings = []
outcome_count = [0] * len(state.Result.enums)
@@ -230,20 +241,27 @@ class SummaryHandler(object):
outcome = getattr(state.Result, outcome)
count = outcome_count[outcome]
if count:
strings.append(outcome_fmt.format(count=count,
outcome=state.Result.enums[outcome]))
strings.append(
outcome_fmt.format(
count=count, outcome=state.Result.enums[outcome]
)
)
most_severe_outcome = outcome
string = ','.join(strings)
string = ",".join(strings)
if most_severe_outcome is None:
string = ' No testing done'
string = " No testing done"
most_severe_outcome = state.Result.Passed
else:
string = ' Results:' + string + ' in {:.2} seconds '.format(
self._timer.active_time())
string += ' '
string = (
" Results:"
+ string
+ " in {:.2} seconds ".format(self._timer.active_time())
)
string += " "
return terminal.insert_separator(
string,
color=self.colormap[most_severe_outcome] + self.color.Bold)
string, color=self.colormap[most_severe_outcome] + self.color.Bold
)
class TerminalHandler(object):
color = terminal.get_termcap()
@@ -268,75 +286,85 @@ class TerminalHandler(object):
}
def _display_outcome(self, name, outcome, reason=None):
print(self.color.Bold
print(
self.color.Bold
+ SummaryHandler.colormap[outcome]
+ name
+ ' '
+ " "
+ state.Result.enums[outcome]
+ SummaryHandler.reset)
+ SummaryHandler.reset
)
if reason is not None:
log.test_log.info('')
log.test_log.info('Reason:')
log.test_log.info("")
log.test_log.info("Reason:")
log.test_log.info(reason)
log.test_log.info(terminal.separator('-'))
log.test_log.info(terminal.separator("-"))
def handle_teststatus(self, record):
if record['status'] == state.Status.Running:
log.test_log.debug('Starting Test Case: %s' %\
record['metadata'].name)
if record["status"] == state.Status.Running:
log.test_log.debug(
"Starting Test Case: %s" % record["metadata"].name
)
def handle_testresult(self, record):
self._display_outcome(
'Test: %s' % record['metadata'].name,
record['result'].value)
"Test: %s" % record["metadata"].name, record["result"].value
)
def handle_suitestatus(self, record):
if record['status'] == state.Status.Running:
log.test_log.debug('Starting Test Suite: %s ' %\
record['metadata'].name)
if record["status"] == state.Status.Running:
log.test_log.debug(
"Starting Test Suite: %s " % record["metadata"].name
)
def handle_stderr(self, record):
if self.stream:
print(record.data['buffer'], file=sys.stderr, end='')
print(record.data["buffer"], file=sys.stderr, end="")
def handle_stdout(self, record):
if self.stream:
print(record.data['buffer'], file=sys.stdout, end='')
print(record.data["buffer"], file=sys.stdout, end="")
def handle_testmessage(self, record):
if self.stream:
print(self._colorize(record['message'], record['level']))
print(self._colorize(record["message"], record["level"]))
def handle_librarymessage(self, record):
if not self.machine_only or record.data.get('machine_readable', False):
print(self._colorize(record['message'], record['level'],
record['bold']))
if not self.machine_only or record.data.get("machine_readable", False):
print(
self._colorize(
record["message"], record["level"], record["bold"]
)
)
def _colorize(self, message, level, bold=False):
return '%s%s%s%s' % (
self.color.Bold if bold else '',
self.verbosity_mapping.get(level, ''),
return "%s%s%s%s" % (
self.color.Bold if bold else "",
self.verbosity_mapping.get(level, ""),
message,
self.default)
self.default,
)
def handle(self, record):
if record.data.get('level', self.verbosity) > self.verbosity:
if record.data.get("level", self.verbosity) > self.verbosity:
return
self.mapping.get(record.type_id, lambda _:None)(record)
self.mapping.get(record.type_id, lambda _: None)(record)
def close(self):
pass
class MultiprocessingHandlerWrapper(object):
'''
"""
A handler class which forwards log records to subhandlers, enabling
logging across multiprocessing python processes.
The 'parent' side of the handler should execute either
:func:`async_process` or :func:`process` to forward
log records to subhandlers.
'''
"""
def __init__(self, *subhandlers):
# Create thread to spin handing recipt of messages
# Create queue to push onto
@@ -350,7 +378,7 @@ class MultiprocessingHandlerWrapper(object):
def add_handler(self, handler):
self._handler_lock.acquire()
self._subhandlers = (handler, ) + self._subhandlers
self._subhandlers = (handler,) + self._subhandlers
self._handler_lock.release()
def _with_handlers(self, callback):
@@ -405,7 +433,7 @@ class MultiprocessingHandlerWrapper(object):
self.queue.put(record)
def _close(self):
if hasattr(self, 'thread'):
if hasattr(self, "thread"):
self.thread.join()
_wrap(self._drain)
self._with_handlers(lambda handler: _wrap(handler.close))
@@ -415,9 +443,9 @@ class MultiprocessingHandlerWrapper(object):
# This sleep adds some time for the sender threads on this process to
# finish pickling the object and complete shutdown after the queue is
# closed.
time.sleep(.2)
time.sleep(0.2)
self.queue.close()
time.sleep(.2)
time.sleep(0.2)
def close(self):
if not self._shutdown.is_set():

View File

@@ -38,9 +38,9 @@
#
# Authors: Sean Wilson
'''
"""
Helper classes for writing tests with this test library.
'''
"""
from collections import namedtuple
from collections.abc import MutableSet
@@ -55,6 +55,7 @@ import tempfile
import threading
import time
class TimedWaitPID(object):
"""Utility to monkey-patch os.waitpid() with os.wait4().
@@ -69,7 +70,8 @@ class TimedWaitPID(object):
it is read.
"""
TimeRecord = namedtuple( "_TimeRecord", "user_time system_time" )
TimeRecord = namedtuple("_TimeRecord", "user_time system_time")
class Wrapper(object):
def __init__(self):
@@ -79,11 +81,8 @@ class TimedWaitPID(object):
def __call__(self, pid, options):
pid, status, resource_usage = os.wait4(pid, options)
with self._access_lock:
self._time_for_pid[pid] = (
TimedWaitPID.TimeRecord(
resource_usage.ru_utime,
resource_usage.ru_stime
)
self._time_for_pid[pid] = TimedWaitPID.TimeRecord(
resource_usage.ru_utime, resource_usage.ru_stime
)
return (pid, status)
@@ -108,14 +107,14 @@ class TimedWaitPID(object):
with TimedWaitPID._wrapper_lock:
if TimedWaitPID._wrapper is None:
TimedWaitPID._wrapper = TimedWaitPID.Wrapper()
if TimedWaitPID._original_os_waitpid is None :
if TimedWaitPID._original_os_waitpid is None:
TimedWaitPID._original_os_waitpid = os.waitpid
os.waitpid = TimedWaitPID._wrapper
@staticmethod
def restore():
with TimedWaitPID._wrapper_lock:
if TimedWaitPID._original_os_waitpid is not None :
if TimedWaitPID._original_os_waitpid is not None:
os.waitpid = TimedWaitPID._original_os_waitpid
TimedWaitPID._original_os_waitpid = None
@@ -129,12 +128,13 @@ class TimedWaitPID(object):
with TimedWaitPID._wrapper_lock:
return TimedWaitPID._wrapper.get_time_for_pid(pid)
# Patch os.waitpid()
TimedWaitPID.install()
#TODO Tear out duplicate logic from the sandbox IOManager
# TODO Tear out duplicate logic from the sandbox IOManager
def log_call(logger, command, time, *popenargs, **kwargs):
'''
"""
Calls the given process and automatically logs the command and output.
If stdout or stderr are provided output will also be piped into those
@@ -145,7 +145,7 @@ def log_call(logger, command, time, *popenargs, **kwargs):
:params stderr: Iterable of items to write to as we read from the
subprocess.
'''
"""
if isinstance(command, str):
cmdstr = command
else:
@@ -159,33 +159,35 @@ def log_call(logger, command, time, *popenargs, **kwargs):
raise e
logger_callback = logger.trace
logger.trace('Logging call to command: %s' % cmdstr)
logger.trace("Logging call to command: %s" % cmdstr)
stdout_redirect = kwargs.get('stdout', tuple())
stderr_redirect = kwargs.get('stderr', tuple())
stdout_redirect = kwargs.get("stdout", tuple())
stderr_redirect = kwargs.get("stderr", tuple())
if hasattr(stdout_redirect, 'write'):
if hasattr(stdout_redirect, "write"):
stdout_redirect = (stdout_redirect,)
if hasattr(stderr_redirect, 'write'):
if hasattr(stderr_redirect, "write"):
stderr_redirect = (stderr_redirect,)
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.PIPE
kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.PIPE
p = subprocess.Popen(command, *popenargs, **kwargs)
def log_output(log_callback, pipe, redirects=tuple()):
# Read iteractively, don't allow input to fill the pipe.
for line in iter(pipe.readline, b''):
for line in iter(pipe.readline, b""):
line = line.decode("utf-8")
for r in redirects:
r.write(line)
log_callback(line.rstrip())
stdout_thread = threading.Thread(target=log_output,
args=(logger_callback, p.stdout, stdout_redirect))
stdout_thread = threading.Thread(
target=log_output, args=(logger_callback, p.stdout, stdout_redirect)
)
stdout_thread.setDaemon(True)
stderr_thread = threading.Thread(target=log_output,
args=(logger_callback, p.stderr, stderr_redirect))
stderr_thread = threading.Thread(
target=log_output, args=(logger_callback, p.stderr, stderr_redirect)
)
stderr_thread.setDaemon(True)
stdout_thread.start()
@@ -197,25 +199,26 @@ def log_call(logger, command, time, *popenargs, **kwargs):
if time is not None and TimedWaitPID.has_time_for_pid(p.pid):
resource_usage = TimedWaitPID.get_time_for_pid(p.pid)
time['user_time'] = resource_usage.user_time
time['system_time'] = resource_usage.system_time
time["user_time"] = resource_usage.user_time
time["system_time"] = resource_usage.system_time
# Return the return exit code of the process.
if retval != 0:
raise subprocess.CalledProcessError(retval, cmdstr)
# lru_cache stuff (Introduced in python 3.2+)
# Renamed and modified to cacheresult
class _HashedSeq(list):
'''
"""
This class guarantees that hash() will be called no more than once per
element. This is important because the cacheresult() will hash the key
multiple times on a cache miss.
.. note:: From cpython 3.7
'''
"""
__slots__ = 'hashvalue'
__slots__ = "hashvalue"
def __init__(self, tup, hash=hash):
self[:] = tup
@@ -224,11 +227,18 @@ class _HashedSeq(list):
def __hash__(self):
return self.hashvalue
def _make_key(args, kwds, typed,
kwd_mark = (object(),),
fasttypes = {int, str, frozenset, type(None)},
tuple=tuple, type=type, len=len):
'''
def _make_key(
args,
kwds,
typed,
kwd_mark=(object(),),
fasttypes={int, str, frozenset, type(None)},
tuple=tuple,
type=type,
len=len,
):
"""
Make a cache key from optionally typed positional and keyword arguments.
The key is constructed in a way that is flat as possible rather than as
a nested structure that would take more memory. If there is only a single
@@ -237,7 +247,7 @@ def _make_key(args, kwds, typed,
lookup speed.
.. note:: From cpython 3.7
'''
"""
key = args
if kwds:
key += kwd_mark
@@ -253,15 +263,16 @@ def _make_key(args, kwds, typed,
def cacheresult(function, typed=False):
'''
"""
:param typed: If typed is True, arguments of different types will be
cached separately. I.e. f(3.0) and f(3) will be treated as distinct
calls with distinct results.
.. note:: From cpython 3.7
'''
"""
sentinel = object() # unique object used to signal cache misses
cache = {}
def wrapper(*args, **kwds):
# Simple caching without ordering or size limit
key = _make_key(args, kwds, typed)
@@ -271,14 +282,16 @@ def cacheresult(function, typed=False):
result = function(*args, **kwds)
cache[key] = result
return result
return wrapper
class OrderedSet(MutableSet):
'''
"""
Maintain ordering of insertion in items to the set with quick iteration.
http://code.activestate.com/recipes/576694/
'''
"""
def __init__(self, iterable=None):
self.end = end = []
@@ -325,35 +338,38 @@ class OrderedSet(MutableSet):
def pop(self, last=True):
if not self:
raise KeyError('set is empty')
raise KeyError("set is empty")
key = self.end[1][0] if last else self.end[2][0]
self.discard(key)
return key
def __repr__(self):
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, list(self))
return "%s()" % (self.__class__.__name__,)
return "%s(%r)" % (self.__class__.__name__, list(self))
def __eq__(self, other):
if isinstance(other, OrderedSet):
return len(self) == len(other) and list(self) == list(other)
return set(self) == set(other)
def absdirpath(path):
'''
"""
Return the directory component of the absolute path of the given path.
'''
"""
return os.path.dirname(os.path.abspath(path))
joinpath = os.path.join
def mkdir_p(path):
'''
"""
Same thing as mkdir -p
https://stackoverflow.com/a/600612
'''
"""
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
@@ -364,12 +380,14 @@ def mkdir_p(path):
class FrozenSetException(Exception):
'''Signals one tried to set a value in a 'frozen' object.'''
"""Signals one tried to set a value in a 'frozen' object."""
pass
class AttrDict(object):
'''Object which exposes its own internal dictionary through attributes.'''
"""Object which exposes its own internal dictionary through attributes."""
def __init__(self, dict_={}):
self.update(dict_)
@@ -377,7 +395,7 @@ class AttrDict(object):
dict_ = self.__dict__
if attr in dict_:
return dict_[attr]
raise AttributeError('Could not find %s attribute' % attr)
raise AttributeError("Could not find %s attribute" % attr)
def __setattr__(self, attr, val):
self.__dict__[attr] = val
@@ -393,8 +411,10 @@ class AttrDict(object):
class FrozenAttrDict(AttrDict):
'''An AttrDict whose attributes cannot be modified directly.'''
"""An AttrDict whose attributes cannot be modified directly."""
__initialized = False
def __init__(self, dict_={}):
super(FrozenAttrDict, self).__init__(dict_)
self.__initialized = True
@@ -402,20 +422,22 @@ class FrozenAttrDict(AttrDict):
def __setattr__(self, attr, val):
if self.__initialized:
raise FrozenSetException(
'Cannot modify an attribute in a FozenAttrDict')
"Cannot modify an attribute in a FozenAttrDict"
)
else:
super(FrozenAttrDict, self).__setattr__(attr, val)
def update(self, items):
if self.__initialized:
raise FrozenSetException(
'Cannot modify an attribute in a FozenAttrDict')
"Cannot modify an attribute in a FozenAttrDict"
)
else:
super(FrozenAttrDict, self).update(items)
class InstanceCollector(object):
'''
"""
A class used to simplify collecting of Classes.
>> instance_list = collector.create()
@@ -423,7 +445,8 @@ class InstanceCollector(object):
>> # instance_list contains all instances created since
>> # collector.create was called
>> collector.remove(instance_list)
'''
"""
def __init__(self):
self.collectors = []
@@ -441,14 +464,15 @@ class InstanceCollector(object):
def append_dictlist(dict_, key, value):
'''
"""
Append the `value` to a list associated with `key` in `dict_`.
If `key` doesn't exist, create a new list in the `dict_` with value in it.
'''
"""
list_ = dict_.get(key, [])
list_.append(value)
dict_[key] = list_
def _filter_file(fname, filters):
with open(fname, "r") as file_:
for line in file_:
@@ -460,19 +484,19 @@ def _filter_file(fname, filters):
def _copy_file_keep_perms(source, target):
'''Copy a file keeping the original permisions of the target.'''
"""Copy a file keeping the original permisions of the target."""
st = os.stat(target)
shutil.copy2(source, target)
os.chown(target, st[stat.ST_UID], st[stat.ST_GID])
def _filter_file_inplace(fname, dir, filters):
'''
"""
Filter the given file writing filtered lines out to a temporary file, then
copy that tempfile back into the original file.
'''
"""
(_, tfname) = tempfile.mkstemp(dir=dir, text=True)
with open(tfname, 'w') as tempfile_:
with open(tfname, "w") as tempfile_:
for line in _filter_file(fname, filters):
tempfile_.write(line)
@@ -481,39 +505,45 @@ def _filter_file_inplace(fname, dir, filters):
def diff_out_file(ref_file, out_file, logger, ignore_regexes=tuple()):
'''Diff two files returning the diff as a string.'''
"""Diff two files returning the diff as a string."""
if not os.path.exists(ref_file):
raise OSError("%s doesn't exist in reference directory"\
% ref_file)
raise OSError("%s doesn't exist in reference directory" % ref_file)
if not os.path.exists(out_file):
raise OSError("%s doesn't exist in output directory" % out_file)
_filter_file_inplace(out_file, os.path.dirname(out_file), ignore_regexes)
_filter_file_inplace(ref_file, os.path.dirname(out_file), ignore_regexes)
#try :
# try :
(_, tfname) = tempfile.mkstemp(dir=os.path.dirname(out_file), text=True)
with open(tfname, 'r+') as tempfile_:
with open(tfname, "r+") as tempfile_:
try:
log_call(logger, ['diff', out_file, ref_file],
time=None, stdout=tempfile_)
log_call(
logger,
["diff", out_file, ref_file],
time=None,
stdout=tempfile_,
)
except OSError:
# Likely signals that diff does not exist on this system. fallback
# to difflib
with open(out_file, 'r') as outf, open(ref_file, 'r') as reff:
diff = difflib.unified_diff(iter(reff.readline, ''),
iter(outf.readline, ''),
with open(out_file, "r") as outf, open(ref_file, "r") as reff:
diff = difflib.unified_diff(
iter(reff.readline, ""),
iter(outf.readline, ""),
fromfile=ref_file,
tofile=out_file)
return ''.join(diff)
tofile=out_file,
)
return "".join(diff)
except subprocess.CalledProcessError:
tempfile_.seek(0)
return ''.join(tempfile_.readlines())
return "".join(tempfile_.readlines())
else:
return None
class Timer():
class Timer:
def __init__(self):
self.restart()

View File

@@ -26,7 +26,7 @@
#
# Authors: Sean Wilson
'''
"""
Contains the :class:`Loader` which is responsible for discovering and loading
tests.
@@ -63,7 +63,7 @@ a :class:`TestSuite` by the test writer will be placed into
a :class:`TestSuite` named after the module.
.. seealso:: :func:`load_file`
'''
"""
import os
import re
@@ -77,44 +77,52 @@ import testlib.fixture as fixture_mod
import testlib.wrappers as wrappers
import testlib.uid as uid
class DuplicateTestItemException(Exception):
'''
"""
Exception indicates multiple test items with the same UID
were discovered.
'''
"""
pass
# Match filenames that either begin or end with 'test' or tests and use
# - or _ to separate additional name components.
default_filepath_regex = re.compile(
r'(((.+[_])?tests?)|(tests?([-_].+)?))\.py$')
r"(((.+[_])?tests?)|(tests?([-_].+)?))\.py$"
)
def default_filepath_filter(filepath):
'''The default filter applied to filepaths to marks as test sources.'''
"""The default filter applied to filepaths to marks as test sources."""
filepath = os.path.basename(filepath)
if default_filepath_regex.match(filepath):
# Make sure doesn't start with .
return not filepath.startswith('.')
return not filepath.startswith(".")
return False
def path_as_modulename(filepath):
'''Return the given filepath as a module name.'''
"""Return the given filepath as a module name."""
# Remove the file extention (.py)
return os.path.splitext(os.path.basename(filepath))[0]
def path_as_suitename(filepath):
return os.path.split(os.path.dirname(os.path.abspath((filepath))))[-1]
def _assert_files_in_same_dir(files):
if __debug__:
if files:
directory = os.path.dirname(files[0])
for f in files:
assert(os.path.dirname(f) == directory)
assert os.path.dirname(f) == directory
class Loader(object):
'''
"""
Class for discovering tests.
Discovered :class:`TestCase` and :class:`TestSuite` objects are wrapped by
@@ -135,7 +143,8 @@ class Loader(object):
.. warn:: This class is extremely thread-unsafe.
It modifies the sys path and global config.
Use with care.
'''
"""
def __init__(self):
self.suites = []
self.suite_uids = {}
@@ -153,16 +162,15 @@ class Loader(object):
for file_ in files:
self.load_file(file_)
return wrappers.LoadedLibrary(
[self.suite_uids[id_] for id_ in uids])
return wrappers.LoadedLibrary([self.suite_uids[id_] for id_ in uids])
def _verify_no_duplicate_suites(self, new_suites):
new_suite_uids = self.suite_uids.copy()
for suite in new_suites:
if suite.uid in new_suite_uids:
raise DuplicateTestItemException(
"More than one suite with UID '%s' was defined" %\
suite.uid)
"More than one suite with UID '%s' was defined" % suite.uid
)
new_suite_uids[suite.uid] = suite
def _verify_no_duplicate_tests_in_suites(self, new_suites):
@@ -172,15 +180,15 @@ class Loader(object):
if test.uid in test_uids:
raise DuplicateTestItemException(
"More than one test with UID '%s' was defined"
" in suite '%s'"
% (test.uid, suite.uid))
" in suite '%s'" % (test.uid, suite.uid)
)
test_uids.add(test.uid)
def load_root(self, root):
'''
"""
Load files from the given root directory which match
`self.filepath_filter`.
'''
"""
for directory in self._discover_files(root):
directory = list(directory)
if directory:
@@ -193,17 +201,18 @@ class Loader(object):
if path in self._files:
if not self._files[path]:
raise Exception('Attempted to load a file which already'
' failed to load')
raise Exception(
"Attempted to load a file which already" " failed to load"
)
else:
log.test_log.debug('Tried to reload: %s' % path)
log.test_log.debug("Tried to reload: %s" % path)
return
# Create a custom dictionary for the loaded module.
newdict = {
'__builtins__':__builtins__,
'__name__': path_as_modulename(path),
'__file__': path,
"__builtins__": __builtins__,
"__name__": path_as_modulename(path),
"__file__": path,
}
# Add the file's containing directory to the system path. So it can do
@@ -223,8 +232,8 @@ class Loader(object):
log.test_log.debug(traceback.format_exc())
log.test_log.warn(
'Exception thrown while loading "%s"\n'
'Ignoring all tests in this file.'
% (path))
"Ignoring all tests in this file." % (path)
)
# Clean up
sys.path[:] = old_path
os.chdir(cwd)
@@ -247,27 +256,34 @@ class Loader(object):
# tests.
# NOTE: This is automatically collected (we still have the
# collector active.)
suite_mod.TestSuite(tests=orphan_tests,
name=path_as_suitename(path))
suite_mod.TestSuite(
tests=orphan_tests, name=path_as_suitename(path)
)
try:
loaded_suites = [wrappers.LoadedSuite(suite, path)
for suite in new_suites]
loaded_suites = [
wrappers.LoadedSuite(suite, path) for suite in new_suites
]
self._verify_no_duplicate_suites(loaded_suites)
self._verify_no_duplicate_tests_in_suites(loaded_suites)
except Exception as e:
log.test_log.warn('%s\n'
log.test_log.warn(
"%s\n"
'Exception thrown while loading "%s"\n'
'Ignoring all tests in this file.'
% (traceback.format_exc(), path))
"Ignoring all tests in this file."
% (traceback.format_exc(), path)
)
else:
log.test_log.info('Discovered %d tests and %d suites in %s'
'' % (len(new_tests), len(loaded_suites), path))
log.test_log.info(
"Discovered %d tests and %d suites in %s"
"" % (len(new_tests), len(loaded_suites), path)
)
self.suites.extend(loaded_suites)
self.suite_uids.update({suite.uid: suite
for suite in loaded_suites})
self.suite_uids.update(
{suite.uid: suite for suite in loaded_suites}
)
# Clean up
sys.path[:] = old_path
os.chdir(cwd)
@@ -276,18 +292,19 @@ class Loader(object):
fixture_mod.Fixture.collector.remove(new_fixtures)
def _discover_files(self, root):
'''
"""
Recurse down from the given root directory returning a list of
directories which contain a list of files matching
`self.filepath_filter`.
'''
"""
# Will probably want to order this traversal.
for root, dirnames, filenames in os.walk(root):
dirnames.sort()
if filenames:
filenames.sort()
filepaths = [os.path.join(root, filename) \
for filename in filenames]
filepaths = [
os.path.join(root, filename) for filename in filenames
]
filepaths = filter(self.filepath_filter, filepaths)
if filepaths:
yield filepaths

View File

@@ -26,13 +26,14 @@
#
# Authors: Sean Wilson
'''
"""
This module supplies the global `test_log` object which all testing
results and messages are reported through.
'''
"""
import testlib.wrappers as wrappers
class LogLevel():
class LogLevel:
Fatal = 0
Error = 1
Warn = 2
@@ -42,33 +43,36 @@ class LogLevel():
class RecordTypeCounterMetaclass(type):
'''
"""
Record type metaclass.
Adds a static integer value in addition to typeinfo so identifiers
are common across processes, networks and module reloads.
'''
"""
counter = 0
def __init__(cls, name, bases, dct):
cls.type_id = RecordTypeCounterMetaclass.counter
RecordTypeCounterMetaclass.counter += 1
class Record(object, metaclass=RecordTypeCounterMetaclass):
'''
"""
A generic object that is passed to the :class:`Log` and its handlers.
..note: Although not statically enforced, all items in the record should be
be pickleable. This enables logging accross multiple processes.
'''
"""
def __init__(self, **data):
self.data = data
def __getitem__(self, item):
if item not in self.data:
raise KeyError('%s not in record %s' %\
(item, self.__class__.__name__))
raise KeyError(
"%s not in record %s" % (item, self.__class__.__name__)
)
return self.data[item]
def __str__(self):
@@ -78,30 +82,52 @@ class Record(object, metaclass=RecordTypeCounterMetaclass):
class StatusRecord(Record):
def __init__(self, obj, status):
Record.__init__(self, metadata=obj.metadata, status=status)
class ResultRecord(Record):
def __init__(self, obj, result):
Record.__init__(self, metadata=obj.metadata, result=result)
#TODO Refactor this shit... Not ideal. Should just specify attributes.
# TODO Refactor this shit... Not ideal. Should just specify attributes.
class TestStatus(StatusRecord):
pass
class SuiteStatus(StatusRecord):
pass
class LibraryStatus(StatusRecord):
pass
class TestResult(ResultRecord):
pass
class SuiteResult(ResultRecord):
pass
class LibraryResult(ResultRecord):
pass
# Test Output Types
class TestStderr(Record):
pass
class TestStdout(Record):
pass
# Message (Raw String) Types
class TestMessage(Record):
pass
class LibraryMessage(Record):
pass
@@ -136,19 +162,25 @@ class Log(object):
if not self._opened:
self.finish_init()
if self._closed:
raise Exception('The log has been closed'
' and is no longer available.')
raise Exception(
"The log has been closed" " and is no longer available."
)
for handler in self.handlers:
handler.handle(record)
def message(self, message, level=LogLevel.Info, bold=False, **metadata):
if self.test:
record = TestMessage(message=message, level=level,
test_uid=self.test.uid, suite_uid=self.test.parent_suite.uid)
record = TestMessage(
message=message,
level=level,
test_uid=self.test.uid,
suite_uid=self.test.parent_suite.uid,
)
else:
record = LibraryMessage(message=message, level=level,
bold=bold, **metadata)
record = LibraryMessage(
message=message, level=level, bold=bold, **metadata
)
self.log(record)
@@ -168,20 +200,19 @@ class Log(object):
self.message(message, LogLevel.Trace)
def status_update(self, obj, status):
self.log(
self._status_typemap[obj.__class__.__name__](obj, status))
self.log(self._status_typemap[obj.__class__.__name__](obj, status))
def result_update(self, obj, result):
self.log(
self._result_typemap[obj.__class__.__name__](obj, result))
self.log(self._result_typemap[obj.__class__.__name__](obj, result))
def add_handler(self, handler):
if self._opened:
raise Exception('Unable to add a handler once the log is open.')
raise Exception("Unable to add a handler once the log is open.")
self.handlers.append(handler)
def close_handler(self, handler):
handler.close()
self.handlers.remove(handler)
test_log = Log()

View File

@@ -39,21 +39,26 @@ import testlib.runner as runner
import testlib.terminal as terminal
import testlib.uid as uid
def entry_message():
log.test_log.message("Running the new gem5 testing script.")
log.test_log.message("For more information see TESTING.md.")
log.test_log.message("To see details as the testing scripts are"
log.test_log.message(
"To see details as the testing scripts are"
" running, use the option"
" -v, -vv, or -vvv")
" -v, -vv, or -vvv"
)
class RunLogHandler():
class RunLogHandler:
def __init__(self):
term_handler = handlers.TerminalHandler(
verbosity=configuration.config.verbose+log.LogLevel.Info
verbosity=configuration.config.verbose + log.LogLevel.Info
)
summary_handler = handlers.SummaryHandler()
self.mp_handler = handlers.MultiprocessingHandlerWrapper(
summary_handler, term_handler)
summary_handler, term_handler
)
self.mp_handler.async_process()
log.test_log.add_handler(self.mp_handler)
entry_message()
@@ -61,7 +66,8 @@ class RunLogHandler():
def schedule_finalized(self, test_schedule):
# Create the result handler object.
self.result_handler = handlers.ResultHandler(
test_schedule, configuration.config.result_path)
test_schedule, configuration.config.result_path
)
self.mp_handler.add_handler(self.result_handler)
def finish_testing(self):
@@ -78,35 +84,43 @@ class RunLogHandler():
self.mp_handler.close()
def unsuccessful(self):
'''
"""
Performs an or reduce on all of the results.
Returns true if at least one test is unsuccessful, false when all tests
pass
'''
"""
return self.result_handler.unsuccessful()
def get_config_tags():
return getattr(configuration.config,
configuration.StorePositionalTagsAction.position_kword)
return getattr(
configuration.config,
configuration.StorePositionalTagsAction.position_kword,
)
def filter_with_config_tags(loaded_library):
tags = get_config_tags()
final_tags = []
regex_fmt = '^%s$'
regex_fmt = "^%s$"
cfg = configuration.config
def _append_inc_tag_filter(name):
if hasattr(cfg, name):
tag_opts = getattr(cfg, name)
for tag in tag_opts:
final_tags.append(configuration.TagRegex(True, regex_fmt % tag))
final_tags.append(
configuration.TagRegex(True, regex_fmt % tag)
)
def _append_rem_tag_filter(name):
if hasattr(cfg, name):
tag_opts = getattr(cfg, name)
for tag in cfg.constants.supported_tags[name]:
if tag not in tag_opts:
final_tags.append(configuration.TagRegex(False, regex_fmt % tag))
final_tags.append(
configuration.TagRegex(False, regex_fmt % tag)
)
# Append additional tags for the isa, length, and variant options.
# They apply last (they take priority)
@@ -114,7 +128,7 @@ def filter_with_config_tags(loaded_library):
cfg.constants.isa_tag_type,
cfg.constants.length_tag_type,
cfg.constants.host_isa_tag_type,
cfg.constants.variant_tag_type
cfg.constants.variant_tag_type,
)
for tagname in special_tags:
@@ -126,15 +140,15 @@ def filter_with_config_tags(loaded_library):
tags = tuple()
filters = list(itertools.chain(final_tags, tags))
string = 'Filtering suites with tags as follows:\n'
filter_string = '\t\n'.join((str(f) for f in filters))
string = "Filtering suites with tags as follows:\n"
filter_string = "\t\n".join((str(f) for f in filters))
log.test_log.trace(string + filter_string)
return filter_with_tags(loaded_library, filters)
def filter_with_tags(loaded_library, filters):
'''
"""
Filter logic supports two filter types:
--include-tags <regex>
--exclude-tags <regex>
@@ -168,7 +182,7 @@ def filter_with_tags(loaded_library, filters):
set() # Removed all suites which have tags
# Process --include-tags "X86"
set(suite_X86)
'''
"""
if not filters:
return
@@ -182,6 +196,7 @@ def filter_with_tags(loaded_library, filters):
def exclude(excludes):
return suites - excludes
def include(includes):
return suites | includes
@@ -189,32 +204,39 @@ def filter_with_tags(loaded_library, filters):
matched_tags = (tag for tag in tags if tag_regex.regex.search(tag))
for tag in matched_tags:
matched_suites = set(query_runner.suites_with_tag(tag))
suites = include(matched_suites) if tag_regex.include \
suites = (
include(matched_suites)
if tag_regex.include
else exclude(matched_suites)
)
# Set the library's suites to only those which where accepted by our filter
loaded_library.suites = [suite for suite in loaded_library.suites
if suite in suites]
loaded_library.suites = [
suite for suite in loaded_library.suites if suite in suites
]
# TODO Add results command for listing previous results.
def load_tests():
'''
"""
Create a TestLoader and load tests for the directory given by the config.
'''
"""
testloader = loader_mod.Loader()
log.test_log.message(terminal.separator())
log.test_log.message('Loading Tests', bold=True)
log.test_log.message("Loading Tests", bold=True)
for root in configuration.config.directories:
testloader.load_root(root)
return testloader
def do_list():
term_handler = handlers.TerminalHandler(
verbosity=configuration.config.verbose+log.LogLevel.Info,
machine_only=configuration.config.quiet
verbosity=configuration.config.verbose + log.LogLevel.Info,
machine_only=configuration.config.quiet,
)
log.test_log.add_handler(term_handler)
@@ -238,8 +260,9 @@ def do_list():
return 0
def run_schedule(test_schedule, log_handler):
'''
"""
Test Phases
-----------
* Test Collection
@@ -253,15 +276,18 @@ def run_schedule(test_schedule, log_handler):
* Test Fixture Teardown
* Suite Fixture Teardown
* Global Fixture Teardown
'''
"""
log_handler.schedule_finalized(test_schedule)
log.test_log.message(terminal.separator())
log.test_log.message('Running Tests from {} suites'
.format(len(test_schedule.suites)), bold=True)
log.test_log.message("Results will be stored in {}".format(
configuration.config.result_path))
log.test_log.message(
"Running Tests from {} suites".format(len(test_schedule.suites)),
bold=True,
)
log.test_log.message(
"Results will be stored in {}".format(configuration.config.result_path)
)
log.test_log.message(terminal.separator())
# Build global fixtures and exectute scheduled test suites.
@@ -278,16 +304,19 @@ def run_schedule(test_schedule, log_handler):
return 1 if failed else 0
def do_run():
# Initialize early parts of the log.
with RunLogHandler() as log_handler:
if configuration.config.uid:
uid_ = uid.UID.from_uid(configuration.config.uid)
if isinstance(uid_, uid.TestUID):
log.test_log.error('Unable to run a standalone test.\n'
'Gem5 expects test suites to be the smallest unit '
' of test.\n\n'
'Pass a SuiteUID instead.')
log.test_log.error(
"Unable to run a standalone test.\n"
"Gem5 expects test suites to be the smallest unit "
" of test.\n\n"
"Pass a SuiteUID instead."
)
return
test_schedule = loader_mod.Loader().load_schedule_for_suites(uid_)
if get_config_tags():
@@ -302,13 +331,17 @@ def do_run():
# Execute the tests
return run_schedule(test_schedule, log_handler)
def do_rerun():
# Init early parts of log
with RunLogHandler() as log_handler:
# Load previous results
results = result.InternalSavedResults.load(
os.path.join(configuration.config.result_path,
configuration.constants.pickle_filename))
os.path.join(
configuration.config.result_path,
configuration.constants.pickle_filename,
)
)
rerun_suites = (suite.uid for suite in results if suite.unsuccessful)
@@ -319,16 +352,17 @@ def do_rerun():
# Execute the tests
return run_schedule(test_schedule, log_handler)
def main():
'''
"""
Main entrypoint for the testlib test library.
Returns 0 on success and 1 otherwise so it can be used as a return code
for scripts.
'''
"""
configuration.initialize_config()
# 'do' the given command.
result = globals()['do_'+configuration.config.command]()
result = globals()["do_" + configuration.config.command]()
log.test_log.close()
return result

View File

@@ -49,7 +49,7 @@ class QueryRunner(object):
def list_tests(self):
log.test_log.message(terminal.separator())
log.test_log.message('Listing all Test Cases.', bold=True)
log.test_log.message("Listing all Test Cases.", bold=True)
log.test_log.message(terminal.separator())
for suite in self.schedule:
for test in suite:
@@ -57,14 +57,14 @@ class QueryRunner(object):
def list_suites(self):
log.test_log.message(terminal.separator())
log.test_log.message('Listing all Test Suites.', bold=True)
log.test_log.message("Listing all Test Suites.", bold=True)
log.test_log.message(terminal.separator())
for suite in self.suites():
log.test_log.message(suite.uid, machine_readable=True)
def list_tags(self):
log.test_log.message(terminal.separator())
log.test_log.message('Listing all Test Tags.', bold=True)
log.test_log.message("Listing all Test Tags.", bold=True)
log.test_log.message(terminal.separator())
for tag in self.tags():

View File

@@ -46,6 +46,7 @@ from testlib.configuration import config
import testlib.helper as helper
import testlib.state as state
def _create_uid_index(iterable):
index = {}
for item in iterable:
@@ -58,12 +59,15 @@ class _CommonMetadataMixin:
@property
def name(self):
return self._metadata.name
@property
def uid(self):
return self._metadata.uid
@property
def result(self):
return self._metadata.result
@result.setter
def result(self, result):
self._metadata.result = result
@@ -83,12 +87,10 @@ class InternalTestResult(_CommonMetadataMixin):
self.suite = suite
self.stderr = os.path.join(
InternalSavedResults.output_path(self.uid, suite.uid),
'stderr'
InternalSavedResults.output_path(self.uid, suite.uid), "stderr"
)
self.stdout = os.path.join(
InternalSavedResults.output_path(self.uid, suite.uid),
'stdout'
InternalSavedResults.output_path(self.uid, suite.uid), "stdout"
)
@@ -99,8 +101,9 @@ class InternalSuiteResult(_CommonMetadataMixin):
self._wrap_tests(obj)
def _wrap_tests(self, obj):
self._tests = [InternalTestResult(test, self, self.directory)
for test in obj]
self._tests = [
InternalTestResult(test, self, self.directory) for test in obj
]
self._tests_index = _create_uid_index(self._tests)
def get_test(self, uid):
@@ -129,13 +132,14 @@ class InternalLibraryResults(_CommonMetadataMixin):
return iter(self._suites)
def _wrap_suites(self, obj):
self._suites = [InternalSuiteResult(suite, self.directory)
for suite in obj]
self._suites = [
InternalSuiteResult(suite, self.directory) for suite in obj
]
self._suites_index = _create_uid_index(self._suites)
def add_suite(self, suite):
if suite.uid in self._suites:
raise ValueError('Cannot have duplicate suite UIDs.')
raise ValueError("Cannot have duplicate suite UIDs.")
self._suites[suite.uid] = suite
def get_suite_result(self, suite_uid):
@@ -151,19 +155,21 @@ class InternalLibraryResults(_CommonMetadataMixin):
helper.append_dictlist(results, test.result.value, test)
return results
class InternalSavedResults:
@staticmethod
def output_path(test_uid, suite_uid, base=None):
'''
"""
Return the path which results for a specific test case should be
stored.
'''
"""
if base is None:
base = config.result_path
return os.path.join(
base,
str(suite_uid).replace(os.path.sep, '-'),
str(test_uid).replace(os.path.sep, '-'))
str(suite_uid).replace(os.path.sep, "-"),
str(test_uid).replace(os.path.sep, "-"),
)
@staticmethod
def save(results, path, protocol=pickle.HIGHEST_PROTOCOL):
@@ -174,12 +180,12 @@ class InternalSavedResults:
if exc.errno != errno.EEXIST:
raise
with open(path, 'wb') as f:
with open(path, "wb") as f:
pickle.dump(results, f, protocol)
@staticmethod
def load(path):
with open(path, 'rb') as f:
with open(path, "rb") as f:
return pickle.load(f)
@@ -189,29 +195,29 @@ class XMLElement(object):
self.end(file_)
def begin(self, file_):
file_.write('<')
file_.write("<")
file_.write(self.name)
if hasattr(self, 'attributes'):
if hasattr(self, "attributes"):
for attr in self.attributes:
file_.write(' ')
file_.write(" ")
attr.write(file_)
file_.write('>')
file_.write(">")
self.body(file_)
def body(self, file_):
if hasattr(self, 'elements'):
if hasattr(self, "elements"):
for elem in self.elements:
file_.write('\n')
file_.write("\n")
elem.write(file_)
if hasattr(self, 'content'):
file_.write('\n')
file_.write(
xml.sax.saxutils.escape(self.content))
file_.write('\n')
if hasattr(self, "content"):
file_.write("\n")
file_.write(xml.sax.saxutils.escape(self.content))
file_.write("\n")
def end(self, file_):
file_.write('</%s>' % self.name)
file_.write("</%s>" % self.name)
class XMLAttribute(object):
def __init__(self, name, value):
@@ -219,16 +225,17 @@ class XMLAttribute(object):
self.value = value
def write(self, file_):
file_.write('%s=%s' % (self.name,
xml.sax.saxutils.quoteattr(self.value)))
file_.write(
"%s=%s" % (self.name, xml.sax.saxutils.quoteattr(self.value))
)
class JUnitTestSuites(XMLElement):
name = 'testsuites'
name = "testsuites"
result_map = {
state.Result.Errored: 'errors',
state.Result.Failed: 'failures',
state.Result.Passed: 'tests'
state.Result.Errored: "errors",
state.Result.Failed: "failures",
state.Result.Passed: "tests",
}
def __init__(self, internal_results):
@@ -236,8 +243,9 @@ class JUnitTestSuites(XMLElement):
self.attributes = []
for result, tests in results.items():
self.attributes.append(self.result_attribute(result,
str(len(tests))))
self.attributes.append(
self.result_attribute(result, str(len(tests)))
)
self.elements = []
for suite in internal_results:
@@ -246,24 +254,24 @@ class JUnitTestSuites(XMLElement):
def result_attribute(self, result, count):
return XMLAttribute(self.result_map[result], count)
class JUnitTestSuite(JUnitTestSuites):
name = 'testsuite'
name = "testsuite"
result_map = {
state.Result.Errored: 'errors',
state.Result.Failed: 'failures',
state.Result.Passed: 'tests',
state.Result.Skipped: 'skipped'
state.Result.Errored: "errors",
state.Result.Failed: "failures",
state.Result.Passed: "tests",
state.Result.Skipped: "skipped",
}
def __init__(self, suite_result):
results = suite_result.aggregate_test_results()
self.attributes = [
XMLAttribute('name', suite_result.name)
]
self.attributes = [XMLAttribute("name", suite_result.name)]
for result, tests in results.items():
self.attributes.append(self.result_attribute(result,
str(len(tests))))
self.attributes.append(
self.result_attribute(result, str(len(tests)))
)
self.elements = []
for test in suite_result:
@@ -272,40 +280,42 @@ class JUnitTestSuite(JUnitTestSuites):
def result_attribute(self, result, count):
return XMLAttribute(self.result_map[result], count)
class JUnitTestCase(XMLElement):
name = 'testcase'
name = "testcase"
def __init__(self, test_result):
self.attributes = [
XMLAttribute('name', test_result.name),
XMLAttribute("name", test_result.name),
# TODO JUnit expects class of test.. add as test metadata.
XMLAttribute('classname', str(test_result.uid)),
XMLAttribute('status', str(test_result.result)),
XMLAttribute('time', str(test_result.time["user_time"])),
XMLAttribute("classname", str(test_result.uid)),
XMLAttribute("status", str(test_result.result)),
XMLAttribute("time", str(test_result.time["user_time"])),
]
# TODO JUnit expects a message for the reason a test was
# skipped or errored, save this with the test metadata.
# http://llg.cubic.org/docs/junit/
self.elements = [
LargeFileElement('system-err', test_result.stderr),
LargeFileElement('system-out', test_result.stdout),
LargeFileElement("system-err", test_result.stderr),
LargeFileElement("system-out", test_result.stdout),
]
if str(test_result.result) == 'Failed':
self.elements.append(JUnitFailure(
'Test failed',
str(test_result.result.reason))
if str(test_result.result) == "Failed":
self.elements.append(
JUnitFailure("Test failed", str(test_result.result.reason))
)
class JUnitFailure(XMLElement):
name = 'failure'
name = "failure"
def __init__(self, message, cause):
self.attributes = [
XMLAttribute('message', message),
XMLAttribute("message", message),
]
cause_element = XMLElement()
cause_element.name = 'cause'
cause_element.name = "cause"
cause_element.content = cause
self.elements = [cause_element]
@@ -318,7 +328,7 @@ class LargeFileElement(XMLElement):
def body(self, file_):
try:
with open(self.filename, 'r') as f:
with open(self.filename, "r") as f:
for line in f:
file_.write(xml.sax.saxutils.escape(line))
except IOError:
@@ -330,15 +340,13 @@ class LargeFileElement(XMLElement):
pass
class JUnitSavedResults:
@staticmethod
def save(results, path):
'''
"""
Compile the internal results into JUnit format writting it to the
given file.
'''
"""
results = JUnitTestSuites(results)
with open(path, 'w') as f:
with open(path, "w") as f:
results.write(f)

View File

@@ -47,14 +47,15 @@ import testlib.log as log
from testlib.state import Status, Result
from testlib.fixture import SkipException
def compute_aggregate_result(iterable):
'''
"""
Status of the test suite by default is:
* Passed if all contained tests passed
* Errored if any contained tests errored
* Failed if no tests errored, but one or more failed.
* Skipped if all contained tests were skipped
'''
"""
failed = []
skipped = []
for testitem in iterable:
@@ -73,18 +74,18 @@ def compute_aggregate_result(iterable):
else:
return Result(Result.Passed)
class TestParameters(object):
def __init__(self, test, suite):
self.test = test
self.suite = suite
self.log = log.test_log
self.log.test = test
self.time = {
"user_time" : 0, "system_time" : 0}
self.time = {"user_time": 0, "system_time": 0}
@helper.cacheresult
def _fixtures(self):
fixtures = {fixture.name:fixture for fixture in self.suite.fixtures}
fixtures = {fixture.name: fixture for fixture in self.suite.fixtures}
for fixture in self.test.fixtures:
fixtures[fixture.name] = fixture
return fixtures
@@ -139,18 +140,18 @@ class RunnerPattern:
else:
self.testable.status = Status.Complete
class TestRunner(RunnerPattern):
def test(self):
test_params = TestParameters(
self.testable,
self.testable.parent_suite)
test_params = TestParameters(self.testable, self.testable.parent_suite)
try:
# Running the test
test_params.test.test(test_params)
except Exception:
self.testable.result = Result(Result.Failed,
traceback.format_exc())
self.testable.result = Result(
Result.Failed, traceback.format_exc()
)
else:
self.testable.result = Result(Result.Passed)
@@ -161,8 +162,7 @@ class SuiteRunner(RunnerPattern):
def test(self):
for test in self.testable:
test.runner(test).run()
self.testable.result = compute_aggregate_result(
iter(self.testable))
self.testable.result = compute_aggregate_result(iter(self.testable))
class LibraryRunner(SuiteRunner):
@@ -175,22 +175,22 @@ class LibraryParallelRunner(RunnerPattern):
def test(self):
pool = multiprocessing.dummy.Pool(self.threads)
pool.map(lambda suite : suite.runner(suite).run(), self.testable)
self.testable.result = compute_aggregate_result(
iter(self.testable))
pool.map(lambda suite: suite.runner(suite).run(), self.testable)
self.testable.result = compute_aggregate_result(iter(self.testable))
class BrokenFixtureException(Exception):
def __init__(self, fixture, testitem, trace):
self.trace = trace
self.msg = ('%s\n'
self.msg = (
"%s\n"
'Exception raised building "%s" raised SkipException'
' for "%s".' %
(trace, fixture.name, testitem.name)
' for "%s".' % (trace, fixture.name, testitem.name)
)
super(BrokenFixtureException, self).__init__(self.msg)
class FixtureBuilder(object):
def __init__(self, fixtures):
self.fixtures = fixtures
@@ -207,12 +207,15 @@ class FixtureBuilder(object):
raise
except Exception as e:
exc = traceback.format_exc()
msg = 'Exception raised while setting up fixture for %s' %\
testitem.uid
log.test_log.warn('%s\n%s' % (exc, msg))
msg = (
"Exception raised while setting up fixture for %s"
% testitem.uid
)
log.test_log.warn("%s\n%s" % (exc, msg))
raise BrokenFixtureException(fixture, testitem,
traceback.format_exc())
raise BrokenFixtureException(
fixture, testitem, traceback.format_exc()
)
def post_test_procedure(self, testitem):
for fixture in self.built_fixtures:
@@ -225,6 +228,8 @@ class FixtureBuilder(object):
except Exception:
# Log exception but keep cleaning up.
exc = traceback.format_exc()
msg = 'Exception raised while tearing down fixture for %s' %\
testitem.uid
log.test_log.warn('%s\n%s' % (exc, msg))
msg = (
"Exception raised while tearing down fixture for %s"
% testitem.uid
)
log.test_log.warn("%s\n%s" % (exc, msg))

View File

@@ -24,14 +24,15 @@
#
# Authors: Sean Wilson
class Result:
enums = '''
enums = """
NotRun
Skipped
Passed
Failed
Errored
'''.split()
""".split()
for idx, enum in enumerate(enums):
locals()[enum] = idx
@@ -46,15 +47,16 @@ class Result:
def __str__(self):
return self.name(self.value)
class Status:
enums = '''
enums = """
Unscheduled
Building
Running
TearingDown
Complete
Avoided
'''.split()
""".split()
for idx, enum in enumerate(enums):
locals()[enum] = idx

View File

@@ -30,8 +30,9 @@
import testlib.helper as helper
import testlib.runner as runner_mod
class TestSuite(object):
'''
"""
An object grouping a collection of tests. It provides tags which enable
filtering during list and run selection. All tests held in the suite must
have a unique name.
@@ -44,7 +45,8 @@ class TestSuite(object):
To reduce test definition boilerplate, the :func:`init` method is
forwarded all `*args` and `**kwargs`. This means derived classes can
define init without boilerplate super().__init__(*args, **kwargs).
'''
"""
runner = runner_mod.SuiteRunner
collector = helper.InstanceCollector()
fixtures = []
@@ -56,8 +58,14 @@ class TestSuite(object):
TestSuite.collector.collect(obj)
return obj
def __init__(self, name=None, fixtures=tuple(), tests=tuple(),
tags=tuple(), **kwargs):
def __init__(
self,
name=None,
fixtures=tuple(),
tests=tuple(),
tags=tuple(),
**kwargs
):
self.fixtures = self.fixtures + list(fixtures)
self.tags = self.tags | set(tags)
self.tests = self.tests + list(tests)

View File

@@ -41,7 +41,7 @@ import struct
# ANSI color names in index order
color_names = "Black Red Green Yellow Blue Magenta Cyan White".split()
default_separator = '='
default_separator = "="
# Character attribute capabilities. Note that not all terminals
# support all of these capabilities, or support them
@@ -54,39 +54,46 @@ default_separator = '='
# Please feel free to add information about other terminals here.
#
capability_map = {
'Bold': 'bold',
'Dim': 'dim',
'Blink': 'blink',
'Underline': 'smul',
'Reverse': 'rev',
'Standout': 'smso',
'Normal': 'sgr0'
"Bold": "bold",
"Dim": "dim",
"Blink": "blink",
"Underline": "smul",
"Reverse": "rev",
"Standout": "smso",
"Normal": "sgr0",
}
capability_names = capability_map.keys()
def null_cap_string(s, *args):
return ''
return ""
try:
import curses
curses.setupterm()
def cap_string(s, *args):
cap = curses.tigetstr(s)
if cap:
return curses.tparm(cap, *args).decode("utf-8")
else:
return ''
return ""
except:
cap_string = null_cap_string
class ColorStrings(object):
def __init__(self, cap_string):
for i, c in enumerate(color_names):
setattr(self, c, cap_string('setaf', i))
setattr(self, c, cap_string("setaf", i))
for name, cap in capability_map.items():
setattr(self, name, cap_string(cap))
termcap = ColorStrings(cap_string)
no_termcap = ColorStrings(null_cap_string)
@@ -95,7 +102,8 @@ if sys.stdout.isatty():
else:
tty_termcap = no_termcap
def get_termcap(use_colors = None):
def get_termcap(use_colors=None):
if use_colors:
return termcap
elif use_colors is None:
@@ -104,12 +112,16 @@ def get_termcap(use_colors = None):
else:
return no_termcap
def terminal_size():
'''Return the (width, heigth) of the terminal screen.'''
"""Return the (width, heigth) of the terminal screen."""
try:
h, w, hp, wp = struct.unpack('HHHH',
fcntl.ioctl(0, termios.TIOCGWINSZ,
struct.pack('HHHH', 0, 0, 0, 0)))
h, w, hp, wp = struct.unpack(
"HHHH",
fcntl.ioctl(
0, termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0)
),
)
return w, h
except IOError:
# It's possible that in sandboxed environments the above ioctl is not
@@ -118,51 +130,60 @@ def terminal_size():
def separator(char=default_separator, color=None):
'''
"""
Return a separator of the given character that is the length of the full
width of the terminal screen.
'''
"""
(w, h) = terminal_size()
if color:
return color + char*w + termcap.Normal
return color + char * w + termcap.Normal
else:
return char*w
return char * w
def insert_separator(inside, char=default_separator,
min_barrier=3, color=None):
'''
def insert_separator(
inside, char=default_separator, min_barrier=3, color=None
):
"""
Place the given string inside of the separator. If it does not fit inside,
expand the separator to fit it with at least min_barrier.
.. seealso:: :func:`separator`
'''
"""
# Use a bytearray so it's efficient to manipulate
string = bytearray(separator(char, color=color), 'utf-8')
string = bytearray(separator(char, color=color), "utf-8")
# Check if we can fit inside with at least min_barrier.
gap = (len(string) - len(inside)) - min_barrier * 2
if gap > 0:
# We'll need to expand the string to fit us.
string.extend([ char for _ in range(-gap)])
string.extend([char for _ in range(-gap)])
# Emplace inside
middle = (len(string)-1)//2
start_idx = middle - len(inside)//2
string[start_idx:len(inside)+start_idx] = str.encode(inside)
middle = (len(string) - 1) // 2
start_idx = middle - len(inside) // 2
string[start_idx : len(inside) + start_idx] = str.encode(inside)
return str(string.decode("utf-8"))
if __name__ == '__main__':
if __name__ == "__main__":
def test_termcap(obj):
for c_name in color_names:
c_str = getattr(obj, c_name)
print(c_str + c_name + obj.Normal)
for attr_name in capability_names:
if attr_name == 'Normal':
if attr_name == "Normal":
continue
attr_str = getattr(obj, attr_name)
print(attr_str + c_str + attr_name + " " + c_name + obj.Normal)
print(obj.Bold + obj.Underline + \
c_name + "Bold Underline " + c_str + obj.Normal)
print(
obj.Bold
+ obj.Underline
+ c_name
+ "Bold Underline "
+ c_str
+ obj.Normal
)
print("=== termcap enabled ===")
test_termcap(termcap)

View File

@@ -29,14 +29,16 @@
import testlib.helper as helper
import testlib.runner as runner_mod
class TestCase(object):
'''
"""
Base class for all tests.
..note::
The :func:`__new__` method enables collection of test cases, it must
be called in order for test cases to be collected.
'''
"""
fixtures = []
# TODO, remove explicit dependency. Use the loader to set the
@@ -55,10 +57,12 @@ class TestCase(object):
name = self.__class__.__name__
self.name = name
class TestFunction(TestCase):
'''
"""
TestCase implementation which uses a callable object as a test.
'''
"""
def __init__(self, function, name=None, **kwargs):
self.test_function = function
if name is None:

View File

@@ -31,8 +31,9 @@ import itertools
import testlib.configuration as configuration
class UID(object):
sep = ':'
sep = ":"
type_idx, path_idx = range(2)
def __init__(self, path, *args):
@@ -41,9 +42,10 @@ class UID(object):
@staticmethod
def _shorten_path(path):
return os.path.relpath(path,
os.path.commonprefix((configuration.constants.testing_base,
path)))
return os.path.relpath(
path,
os.path.commonprefix((configuration.constants.testing_base, path)),
)
@staticmethod
def _full_path(short_path):
@@ -75,11 +77,11 @@ class UID(object):
def __str__(self):
common_opts = {
self.path_idx: self.path,
self.type_idx: self.__class__.__name__
self.type_idx: self.__class__.__name__,
}
return self.sep.join(itertools.chain(
[common_opts[0], common_opts[1]],
self.attributes))
return self.sep.join(
itertools.chain([common_opts[0], common_opts[1]], self.attributes)
)
def __hash__(self):
return hash(str(self))

View File

@@ -38,16 +38,17 @@
#
# Authors: Sean Wilson
'''
"""
Module contains wrappers for test items that have been
loaded by the testlib :class:`testlib.loader.Loader`.
'''
"""
import itertools
import testlib.uid as uid
from testlib.state import Status, Result
class TestCaseMetadata():
class TestCaseMetadata:
def __init__(self, name, uid, path, result, status, suite_uid):
self.name = name
self.uid = uid
@@ -57,7 +58,7 @@ class TestCaseMetadata():
self.suite_uid = suite_uid
class TestSuiteMetadata():
class TestSuiteMetadata:
def __init__(self, name, uid, tags, path, status, result):
self.name = name
self.uid = uid
@@ -67,7 +68,7 @@ class TestSuiteMetadata():
self.result = result
class LibraryMetadata():
class LibraryMetadata:
def __init__(self, name, result, status):
self.name = name
self.result = result
@@ -75,13 +76,14 @@ class LibraryMetadata():
class LoadedTestable(object):
'''
"""
Base class for loaded test items.
:property:`result` and :property:`status` setters
notify testlib via the :func:`log_result` and :func:`log_status`
of the updated status.
'''
"""
def __init__(self, obj):
self.obj = obj
self.metadata = self._generate_metadata()
@@ -135,10 +137,12 @@ class LoadedTestable(object):
# TODO Change log to provide status_update, result_update for all types.
def log_status(self, status):
import testlib.log as log
log.test_log.status_update(self, status)
def log_result(self, result):
import testlib.log as log
log.test_log.result_update(self, result)
def __iter__(self):
@@ -155,16 +159,18 @@ class LoadedTest(LoadedTestable):
self.obj.test(*args, **kwargs)
def _generate_metadata(self):
return TestCaseMetadata( **{
'name':self.obj.name,
'path': self._path,
'uid': uid.TestUID(self._path,
self.obj.name,
self.parent_suite.name),
'status': Status.Unscheduled,
'result': Result(Result.NotRun),
'suite_uid': self.parent_suite.metadata.uid
})
return TestCaseMetadata(
**{
"name": self.obj.name,
"path": self._path,
"uid": uid.TestUID(
self._path, self.obj.name, self.parent_suite.name
),
"status": Status.Unscheduled,
"result": Result(Result.NotRun),
"suite_uid": self.parent_suite.metadata.uid,
}
)
class LoadedSuite(LoadedTestable):
@@ -174,18 +180,21 @@ class LoadedSuite(LoadedTestable):
self.tests = self._wrap_children(suite_obj)
def _wrap_children(self, suite_obj):
return [LoadedTest(test, self, self.metadata.path)
for test in suite_obj]
return [
LoadedTest(test, self, self.metadata.path) for test in suite_obj
]
def _generate_metadata(self):
return TestSuiteMetadata( **{
'name': self.obj.name,
'tags':self.obj.tags,
'path': self._path,
'uid': uid.SuiteUID(self._path, self.obj.name),
'status': Status.Unscheduled,
'result': Result(Result.NotRun)
})
return TestSuiteMetadata(
**{
"name": self.obj.name,
"tags": self.obj.tags,
"path": self._path,
"uid": uid.SuiteUID(self._path, self.obj.name),
"status": Status.Unscheduled,
"result": Result(Result.NotRun),
}
)
def __iter__(self):
return iter(self.tests)
@@ -196,41 +205,44 @@ class LoadedSuite(LoadedTestable):
class LoadedLibrary(LoadedTestable):
'''
"""
Wraps a collection of all loaded test suites and
provides utility functions for accessing fixtures.
'''
"""
def __init__(self, suites):
LoadedTestable.__init__(self, suites)
def _generate_metadata(self):
return LibraryMetadata( **{
'name': 'Test Library',
'status': Status.Unscheduled,
'result': Result(Result.NotRun)
})
return LibraryMetadata(
**{
"name": "Test Library",
"status": Status.Unscheduled,
"result": Result(Result.NotRun),
}
)
def __iter__(self):
'''
"""
:returns: an iterator over contained :class:`TestSuite` objects.
'''
"""
return iter(self.obj)
def all_fixtures(self):
'''
"""
:returns: an interator overall all global, suite,
and test fixtures
'''
return itertools.chain(itertools.chain(
*(suite.fixtures for suite in self.obj)),
"""
return itertools.chain(
itertools.chain(*(suite.fixtures for suite in self.obj)),
*(self.test_fixtures(suite) for suite in self.obj)
)
def test_fixtures(self, suite):
'''
"""
:returns: an interator over all fixtures of each
test contained in the given suite
'''
"""
return itertools.chain(*(test.fixtures for test in suite))
@property