tests,ext: Add a new testing library proposal

The new test library is split into two parts: The framework which resides
in ext/, and the gem5 helping components in /tests/gem5.

Change-Id: Ib4f3ae8d7eb96a7306335a3e739b7e8041aa99b9
Signed-off-by: Sean Wilson <spwilson2@wisc.edu>
Reviewed-on: https://gem5-review.googlesource.com/4421
Reviewed-by: Giacomo Travaglini <giacomo.travaglini@arm.com>
Maintainer: Jason Lowe-Power <jason@lowepower.com>
This commit is contained in:
Sean Wilson
2017-08-03 11:28:49 -05:00
committed by Jason Lowe-Power
parent e726ced408
commit 07ce662bd2
25 changed files with 4944 additions and 0 deletions

44
ext/testlib/__init__.py Normal file
View File

@@ -0,0 +1,44 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
from .state import *
from .runner import *
from .test import *
from .suite import *
from .loader import *
from .fixture import *
from .config import *
from main import main
#TODO Remove this awkward bootstrap
#FIXME
from gem5 import *
#TODO Remove this as an export, users should getcwd from os
from os import getcwd

687
ext/testlib/config.py Normal file
View File

@@ -0,0 +1,687 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
'''
Global configuration module which exposes two types of configuration
variables:
1. config
2. constants (Also attached to the config variable as an attribute)
The main motivation for this module is to have a centralized location for
defaults and configuration by command line and files for the test framework.
A secondary goal is to reduce programming errors by providing common constant
strings and values as python attributes to simplify detection of typos.
A simple typo in a string can take a lot of debugging to uncover the issue,
attribute errors are easier to notice and most autocompletion systems detect
them.
The config variable is initialized by calling :func:`initialize_config`.
Before this point only ``constants`` will be available. This is to ensure
that library function writers never accidentally get stale config attributes.
Program arguments/flag arguments are available from the config as attributes.
If an attribute was not set by the command line or the optional config file,
then it will fallback to the `_defaults` value, if still the value is not
found an AttributeError will be raised.
:func define_defaults:
Provided by the config if the attribute is not found in the config or
commandline. For instance, if we are using the list command fixtures might
not be able to count on the build_dir being provided since we aren't going
to build anything.
:var constants:
Values not directly exposed by the config, but are attached to the object
for centralized access. I.E. you can reach them with
:code:`config.constants.attribute`. These should be used for setting
common string names used across the test framework.
:code:`_defaults.build_dir = None` Once this module has been imported
constants should not be modified and their base attributes are frozen.
'''
import abc
import argparse
import copy
import os
import re
from ConfigParser import ConfigParser
from pickle import HIGHEST_PROTOCOL as highest_pickle_protocol
from helper import absdirpath, AttrDict, FrozenAttrDict
class UninitialzedAttributeException(Exception):
    '''
    Signals that an attribute in the config file was not initialized.
    '''
    # NOTE: the class name misspells "Uninitialized"; it is kept as-is
    # because callers catch this exception by this exact name.
    pass
class UninitializedConfigException(Exception):
    '''
    Signals that the config was not initialized before trying to access an
    attribute.
    '''
    # Raised by _Config.__getattr__ when attribute lookup happens before
    # initialize_config() has run.
    pass
class TagRegex(object):
    '''
    Pairs a compiled tag-matching regex with whether matching tags are
    included (True) or removed (False) from the selection.
    '''
    def __init__(self, include, regex):
        self.include = include
        self.regex = re.compile(regex)

    def __str__(self):
        # Render as a right-aligned label followed by the raw pattern.
        if self.include:
            label = 'Include'
        else:
            label = 'Remove'
        return '%10s: %s' % (label, self.regex.pattern)
class _Config(object):
    # Borg pattern: every instance shares ``__shared_dict`` as its
    # ``__dict__``, so any ``_Config()`` yields the same effective state.
    _initialized = False
    __shared_dict = {}

    # Centralized constant strings/values; frozen after module import.
    constants = AttrDict()
    # Fallback values consulted when an attr is missing from ``_config``.
    _defaults = AttrDict()
    # Parsed configuration values keyed by attribute name.
    _config = {}
    _cli_args = {}
    # attr name -> list of callbacks chained over the parsed value.
    _post_processors = {}

    def __init__(self):
        # This object will act as if it were a singleton.
        self.__dict__ = self.__shared_dict

    def _init(self, parser):
        # Standard initialization path: parse CLI args, run the
        # post-processor chain, then allow attribute access.
        self._parse_commandline_args(parser)
        self._run_post_processors()
        self._initialized = True

    def _init_with_dicts(self, config, defaults):
        # Alternate initialization from prebuilt dicts with no argument
        # parsing -- presumably for recreating the config elsewhere
        # (e.g. a child process); TODO confirm against callers.
        self._config = config
        self._defaults = defaults
        self._initialized = True

    def _add_post_processor(self, attr, post_processor):
        '''
        Register ``post_processor`` to run on ``attr`` after parsing.

        :param attr: Attribute to pass to and recieve from the
            :func:`post_processor`.

        :param post_processor: A callback functions called in a chain to
            perform additional setup for a config argument. Should return a
            tuple containing the new value for the config attr.
        '''
        if attr not in self._post_processors:
            self._post_processors[attr] = []
        self._post_processors[attr].append(post_processor)

    def _set(self, name, value):
        # Directly store a (post-processed) config value.
        self._config[name] = value

    def _parse_commandline_args(self, parser):
        # Copy every public attribute of the parsed namespace into the
        # config dict.
        args = parser.parse_args()

        self._config_file_args = {}

        for attr in dir(args):
            # Ignore non-argument attributes.
            if not attr.startswith('_'):
                self._config_file_args[attr] = getattr(args, attr)
        self._config.update(self._config_file_args)

    def _run_post_processors(self):
        # Feed each registered attr through its callbacks in registration
        # order. Callbacks see and return the 1-tuple wrapping produced by
        # _lookup_val (or None when the value is unset).
        for attr, callbacks in self._post_processors.items():
            newval = self._lookup_val(attr)
            for callback in callbacks:
                newval = callback(newval)
            # Unwrap the 1-tuple before storing.
            if newval is not None:
                newval = newval[0]
            self._set(attr, newval)

    def _lookup_val(self, attr):
        '''
        Get the attribute from the config or fallback to defaults.

        :returns: If the value is not stored return None. Otherwise a tuple
            containing the value.
        '''
        if attr in self._config:
            return (self._config[attr],)
        elif hasattr(self._defaults, attr):
            return (getattr(self._defaults, attr),)

    def __getattr__(self, attr):
        # Resolution order: parent-object attributes, then parsed config,
        # then defaults. Access before initialization is an error.
        if attr in dir(super(_Config, self)):
            return getattr(super(_Config, self), attr)
        elif not self._initialized:
            raise UninitializedConfigException(
                'Cannot directly access elements from the config before it is'
                ' initialized')
        else:
            val = self._lookup_val(attr)
            if val is not None:
                return val[0]
            else:
                raise UninitialzedAttributeException(
                    '%s was not initialzed in the config.' % attr)

    def get_tags(self):
        # Collect the tags selected for each supported tag type; returns {}
        # when no tag filters of any type were supplied.
        d = {typ: set(self.__getattr__(typ))
             for typ in self.constants.supported_tags}
        if any(map(lambda vals: bool(vals), d.values())):
            return d
        else:
            return {}
def define_defaults(defaults):
    '''
    Populate ``defaults`` with fallback values.

    Defaults are provided by the config if the attribute is not found in the
    config or commandline. For instance, if we are using the list command
    fixtures might not be able to count on the build_dir being provided since
    we aren't going to build anything.
    '''
    # The repo root sits two directories above this file (ext/testlib/..)
    defaults.base_dir = os.path.abspath(
        os.path.join(absdirpath(__file__), os.pardir, os.pardir))
    # Results land under the invoking directory unless overridden.
    defaults.result_path = os.path.join(os.getcwd(), '.testing-results')
    defaults.list_only_failed = False
def define_constants(constants):
    '''
    'constants' are values not directly exposed by the config, but are attached
    to the object for centralized access. These should be used for setting
    common string names used across the test framework. A simple typo in
    a string can take a lot of debugging to uncover the issue, attribute errors
    are easier to notice and most autocompletion systems detect them.
    '''
    # Names of the per-test output streams in saved results.
    constants.system_out_name = 'system-out'
    constants.system_err_name = 'system-err'

    # ISA tags.
    constants.isa_tag_type = 'isa'
    constants.x86_tag = 'X86'
    constants.sparc_tag = 'SPARC'
    constants.alpha_tag = 'ALPHA'
    constants.riscv_tag = 'RISCV'
    constants.arm_tag = 'ARM'
    constants.mips_tag = 'MIPS'
    constants.power_tag = 'POWER'
    constants.null_tag = 'NULL'

    # Binary variant (compilation target) tags.
    constants.variant_tag_type = 'variant'
    constants.opt_tag = 'opt'
    constants.debug_tag = 'debug'
    constants.fast_tag = 'fast'

    # Test length tags.
    constants.length_tag_type = 'length'
    constants.quick_tag = 'quick'
    constants.long_tag = 'long'

    # tag type -> tuple of all tags of that type.
    constants.supported_tags = {
        constants.isa_tag_type : (
            constants.x86_tag,
            constants.sparc_tag,
            constants.alpha_tag,
            constants.riscv_tag,
            constants.arm_tag,
            constants.mips_tag,
            constants.power_tag,
            constants.null_tag,
            ),
        constants.variant_tag_type: (
            constants.opt_tag,
            constants.debug_tag,
            constants.fast_tag,
        ),
        constants.length_tag_type: (
            constants.quick_tag,
            constants.long_tag,
        ),
    }

    # Convenience aliases into supported_tags.
    constants.supported_isas = constants.supported_tags['isa']
    constants.supported_variants = constants.supported_tags['variant']
    constants.supported_lengths = constants.supported_tags['length']

    # Well-known fixture names.
    constants.tempdir_fixture_name = 'tempdir'

    # File names produced by a gem5 simulation run.
    constants.gem5_simulation_stderr = 'simerr'
    constants.gem5_simulation_stdout = 'simout'
    constants.gem5_simulation_stats = 'stats.txt'
    constants.gem5_simulation_config_ini = 'config.ini'
    constants.gem5_simulation_config_json = 'config.json'

    constants.gem5_returncode_fixture_name = 'gem5-returncode'
    constants.gem5_binary_fixture_name = 'gem5'

    # Result-file names and the pickle protocol used to write them.
    constants.xml_filename = 'results.xml'
    constants.pickle_filename = 'results.pickle'
    constants.pickle_protocol = highest_pickle_protocol

    # The root directory which all test names will be based off of.
    constants.testing_base = absdirpath(os.path.join(absdirpath(__file__),
                                                     os.pardir))
def define_post_processors(config):
    '''
    post_processors are used to do final configuration of variables. This is
    useful if there is a dynamically set default, or some function that needs
    to be applied after parsing in order to set a configuration value.

    Post processors must accept a single argument that will either be a tuple
    containing the already set config value or ``None`` if the config value
    has not been set to anything. They must return the modified value in the
    same format.
    '''

    def set_default_build_dir(build_dir):
        '''
        Post-processor to set the default build_dir based on the base_dir.

        .. seealso :func:`~_Config._add_post_processor`
        '''
        if not build_dir or build_dir[0] is None:
            base_dir = config._lookup_val('base_dir')[0]
            build_dir = (os.path.join(base_dir, 'build'),)
        return build_dir

    def fix_verbosity_hack(verbose):
        # Unwrap the _StickyInt accumulated by the '-v' count action into a
        # plain int verbosity level.
        return (verbose[0].val,)

    def threads_as_int(threads):
        # argparse stores the flag value as a string; coerce to int.
        if threads is not None:
            return (int(threads[0]),)

    def test_threads_as_int(test_threads):
        # argparse stores the flag value as a string; coerce to int.
        if test_threads is not None:
            return (int(test_threads[0]),)

    def default_isa(isa):
        # No --isa flags given: select every supported ISA tag.
        if not isa[0]:
            return [constants.supported_tags[constants.isa_tag_type]]
        else:
            return isa

    def default_variant(variant):
        if not variant[0]:
            # Default variant is only opt. No need to run tests with multiple
            # different compilation targets
            return [[constants.opt_tag]]
        else:
            return variant

    def default_length(length):
        # No --length flags given: only run quick tests by default.
        if not length[0]:
            return [[constants.quick_tag]]
        else:
            return length

    def compile_tag_regex(positional_tags):
        # Convert the ordered (flag, regex-string) pairs collected by
        # StorePositionalTagsAction into TagRegex objects, preserving the
        # order include/exclude flags were given on the command line.
        if not positional_tags:
            return positional_tags
        else:
            new_positional_tags_list = []
            positional_tags = positional_tags[0]

            for flag, regex in positional_tags:
                if flag == 'exclude_tags':
                    tag_regex = TagRegex(False, regex)
                elif flag == 'include_tags':
                    tag_regex = TagRegex(True, regex)
                else:
                    raise ValueError('Unsupported flag.')
                new_positional_tags_list.append(tag_regex)

            return (new_positional_tags_list,)

    # Register every processor on its corresponding config attribute.
    config._add_post_processor('build_dir', set_default_build_dir)
    config._add_post_processor('verbose', fix_verbosity_hack)
    config._add_post_processor('isa', default_isa)
    config._add_post_processor('variant', default_variant)
    config._add_post_processor('length', default_length)
    config._add_post_processor('threads', threads_as_int)
    config._add_post_processor('test_threads', test_threads_as_int)
    config._add_post_processor(StorePositionalTagsAction.position_kword,
                               compile_tag_regex)
class Argument(object):
    '''
    Class represents a cli argument/flag for a argparse parser.

    :attr name: The long name of this object that will be stored in the arg
        output by the final parser.

    ``name`` is derived from (in priority order): an explicit ``dest``
    kwarg, the first ``--long`` flag, or the first flag with dashes
    stripped; dashes are then normalized to underscores to match how
    argparse stores destinations.
    '''
    def __init__(self, *flags, **kwargs):
        self.flags = flags
        self.kwargs = kwargs

        if len(flags) == 0:
            raise ValueError("Need at least one argument.")
        elif 'dest' in kwargs:
            # An explicit dest always wins.
            self.name = kwargs['dest']
        elif len(flags) > 1 or flags[0].startswith('-'):
            # Optional argument: every flag must be dashed; prefer the
            # first '--long' flag as the name.
            for flag in flags:
                if not flag.startswith('-'):
                    # NOTE: fixed implicit string concatenation that
                    # previously produced "must startwith a character".
                    raise ValueError("invalid option string %s: must start "
                                     "with a character '-'" % flag)

                if flag.startswith('--'):
                    if not hasattr(self, 'name'):
                        self.name = flag.lstrip('-')

        if not hasattr(self, 'name'):
            self.name = flags[0].lstrip('-')
        # argparse stores dests with underscores instead of dashes.
        self.name = self.name.replace('-', '_')

    def add_to(self, parser):
        '''Add this argument to the given parser.'''
        parser.add_argument(*self.flags, **self.kwargs)

    def copy(self):
        '''Copy this argument so you might modify any of its kwargs.'''
        return copy.deepcopy(self)
class _StickyInt:
'''
A class that is used to cheat the verbosity count incrementer by
pretending to be an int. This makes the int stay on the heap and eat other
real numbers when they are added to it.
We use this so we can allow the verbose flag to be provided before or after
the subcommand. This likely has no utility outside of this use case.
'''
def __init__(self, val=0):
self.val = val
self.type = int
def __add__(self, other):
self.val += other
return self
# Module-level registry of shared Argument objects; populated by
# define_common_args(). NotImplemented until then so premature use fails
# loudly rather than silently.
common_args = NotImplemented
class StorePositionAction(argparse.Action):
    '''Base class for classes wishing to create namespaces where
    arguments are stored in the order provided via the command line.
    '''
    position_kword = 'positional'

    def __call__(self, parser, namespace, values, option_string=None):
        # Lazily create the shared ordered list on the namespace, then
        # record (dest, value) so relative flag order is preserved.
        if self.position_kword not in namespace:
            setattr(namespace, self.position_kword, [])

        ordered = getattr(namespace, self.position_kword)
        ordered.append((self.dest, values))
        setattr(namespace, self.position_kword, ordered)
class StorePositionalTagsAction(StorePositionAction):
    # All tag flags (--include-tags/--exclude-tags) share the 'tag_filters'
    # list so their relative command-line order is preserved for the
    # compile_tag_regex post-processor.
    position_kword = 'tag_filters'
def define_common_args(config):
    '''
    Common args are arguments which are likely to be similar between different
    subcommands, so they are available to all by placing their definitions
    here.

    Rebinds the module-level ``common_args`` from its NotImplemented
    sentinel to an AttrDict of Argument objects keyed by Argument.name.
    '''
    global common_args

    # A list of common arguments/flags used across cli parsers.
    common_args = [
        Argument(
            'directory',
            nargs='?',
            default=os.getcwd(),
            help='Directory to start searching for tests in'),
        Argument(
            '--exclude-tags',
            action=StorePositionalTagsAction,
            help='A tag comparison used to select tests.'),
        Argument(
            '--include-tags',
            action=StorePositionalTagsAction,
            help='A tag comparison used to select tests.'),
        Argument(
            '--isa',
            action='append',
            default=[],
            help="Only tests that are valid with one of these ISAs. "
                 "Comma separated."),
        Argument(
            '--variant',
            action='append',
            default=[],
            help="Only tests that are valid with one of these binary variants"
                 "(e.g., opt, debug). Comma separated."),
        Argument(
            '--length',
            action='append',
            default=[],
            help="Only tests that are one of these lengths. Comma separated."),
        Argument(
            '--uid',
            action='store',
            default=None,
            help='UID of a specific test item to run.'),
        Argument(
            '--build-dir',
            action='store',
            help='Build directory for SCons'),
        Argument(
            '--base-dir',
            action='store',
            default=config._defaults.base_dir,
            help='Directory to change to in order to exec scons.'),
        Argument(
            '-j', '--threads',
            action='store',
            default=1,
            help='Number of threads to run SCons with.'),
        Argument(
            '-t', '--test-threads',
            action='store',
            default=1,
            help='Number of threads to spawn to run concurrent tests with.'),
        Argument(
            '-v',
            action='count',
            dest='verbose',
            # _StickyInt lets '-v' be counted before or after the
            # subcommand (see _StickyInt docs).
            default=_StickyInt(),
            help='Increase verbosity'),
        Argument(
            '--config-path',
            action='store',
            default=os.getcwd(),
            help='Path to read a testing.ini config in'
        ),
        Argument(
            '--skip-build',
            action='store_true',
            default=False,
            help='Skip the building component of SCons targets.'
        ),
        Argument(
            '--result-path',
            action='store',
            help='The path to store results in.'
        ),
    ]

    # NOTE: There is a limitation which arises due to this format. If you have
    # multiple arguments with the same name only the final one in the list
    # will be saved.
    #
    # e.g. if you have a -v argument which increments verbosity level and
    # a separate --verbose flag which 'store's verbosity level. the final
    # one in the list will be saved.
    common_args = AttrDict({arg.name:arg for arg in common_args})
class ArgParser(object):
    '''
    Wrapper around an argparse parser which re-exports the parser's public
    API as attributes of this object and attaches the shared verbose flag.
    '''
    __metaclass__ = abc.ABCMeta

    def __init__(self, parser):
        # Copy public methods of the parser onto this wrapper.
        public_names = (a for a in dir(parser) if not a.startswith('_'))
        for name in public_names:
            setattr(self, name, getattr(parser, name))

        self.parser = parser
        self.add_argument = self.parser.add_argument

        # Argument will be added to all parsers and subparsers.
        common_args.verbose.add_to(parser)
class CommandParser(ArgParser):
    '''
    Main parser which parses command strings and uses those to direct to
    a subparser.
    '''
    def __init__(self):
        # Wrap a fresh root parser, then hang the 'command' subparsers
        # off of it.
        super(CommandParser, self).__init__(argparse.ArgumentParser())
        self.subparser = self.add_subparsers(dest='command')
class RunParser(ArgParser):
    '''
    Parser for the 'run' command.
    '''
    def __init__(self, subparser):
        parser = subparser.add_parser(
            'run',
            help='''Run Tests.'''
        )

        super(RunParser, self).__init__(parser)

        # Attach the shared flags; the sequence only affects --help order.
        for shared_arg in (
                common_args.uid,
                common_args.skip_build,
                common_args.directory,
                common_args.build_dir,
                common_args.base_dir,
                common_args.threads,
                common_args.test_threads,
                common_args.isa,
                common_args.variant,
                common_args.length,
                common_args.include_tags,
                common_args.exclude_tags):
            shared_arg.add_to(parser)
class ListParser(ArgParser):
    '''
    Parser for the 'list' command.
    '''
    def __init__(self, subparser):
        parser = subparser.add_parser(
            'list',
            help='''List and query test metadata.'''
        )
        super(ListParser, self).__init__(parser)

        # Flags specific to the list command.
        local_flags = (
            Argument(
                '--suites',
                action='store_true',
                default=False,
                help='List all test suites.'),
            Argument(
                '--tests',
                action='store_true',
                default=False,
                help='List all test cases.'),
            Argument(
                '--fixtures',
                action='store_true',
                default=False,
                help='List all fixtures.'),
            Argument(
                '--all-tags',
                action='store_true',
                default=False,
                help='List all tags.'),
            Argument(
                '-q',
                dest='quiet',
                action='store_true',
                default=False,
                help='Quiet output (machine readable).'),
        )
        for flag in local_flags:
            flag.add_to(parser)

        # Shared discovery/filtering flags.
        for shared_arg in (
                common_args.directory,
                common_args.isa,
                common_args.variant,
                common_args.length,
                common_args.include_tags,
                common_args.exclude_tags):
            shared_arg.add_to(parser)
class RerunParser(ArgParser):
    '''
    Parser for the 'rerun' command.
    '''
    def __init__(self, subparser):
        parser = subparser.add_parser(
            'rerun',
            help='''Rerun failed tests.'''
        )
        super(RerunParser, self).__init__(parser)

        # Same shared flags as 'run', minus uid and the tag filters.
        for shared_arg in (
                common_args.skip_build,
                common_args.directory,
                common_args.build_dir,
                common_args.base_dir,
                common_args.threads,
                common_args.test_threads,
                common_args.isa,
                common_args.variant,
                common_args.length):
            shared_arg.add_to(parser)
# Create the shared singleton and populate its constants immediately on
# import; argument parsing and defaults are deferred to initialize_config().
config = _Config()
define_constants(config.constants)

# Constants are directly exposed and available once this module is created.
# All constants MUST be defined before this point.
config.constants = FrozenAttrDict(config.constants.__dict__)
constants = config.constants

# NOTE: the bare string below is a no-op expression at runtime; it serves
# only as documentation for ``config``.
'''
This config object is the singleton config object available throughout the
framework.
'''
def initialize_config():
    '''
    Parse the commandline arguments and setup the config variables.

    Must be called before any config attribute (other than ``constants``)
    is accessed; see :class:`UninitializedConfigException`.
    '''
    global config

    # Setup constants and defaults
    define_defaults(config._defaults)
    define_post_processors(config)
    define_common_args(config)

    # Setup parser and subcommands. Constructing each subcommand parser
    # registers it on baseparser.subparser as a side effect; the local
    # names are otherwise unused.
    baseparser = CommandParser()
    runparser = RunParser(baseparser.subparser)
    listparser = ListParser(baseparser.subparser)
    rerunparser = RerunParser(baseparser.subparser)

    # Initialize the config by parsing args and running callbacks.
    config._init(baseparser)

108
ext/testlib/fixture.py Normal file
View File

@@ -0,0 +1,108 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import copy
import traceback
import helper
import log
global_fixtures = []
class SkipException(Exception):
    '''
    Raised by a fixture to signal that the test item depending on it should
    be skipped; carries the fixture and test item for reporting.
    '''
    def __init__(self, fixture, testitem):
        message = 'Fixture "%s" raised SkipException for "%s".' % (
            fixture.name, testitem.name
        )
        super(SkipException, self).__init__(message)
        self.msg = message
        self.fixture = fixture
        self.testitem = testitem
class Fixture(object):
    '''
    Base Class for a test Fixture.

    Fixtures are items which possibly require setup and/or tearing down after
    a TestCase, TestSuite, or the Library has completed.

    Fixtures are the prefered method of carrying incremental results or
    variables between TestCases in TestSuites. (Rather than using globals.)
    Using fixtures rather than globals ensures that state will be maintained
    when executing tests in parallel.

    .. note:: In order for Fixtures to be enumerated by the test system this
        class' :code:`__new__` method must be called.
    '''
    collector = helper.InstanceCollector()

    def __new__(klass, *args, **kwargs):
        # NOTE: constructor args are deliberately not forwarded to
        # object.__new__ -- it rejects extra arguments (TypeError under
        # Python 3 when __new__ is overridden); __init__ consumes them.
        obj = super(Fixture, klass).__new__(klass)
        Fixture.collector.collect(obj)
        return obj

    def __init__(self, name=None, **kwargs):
        # Default the fixture's name to the name of its concrete class.
        if name is None:
            name = self.__class__.__name__
        self.name = name

    def skip(self, testitem):
        '''Raise SkipException to skip ``testitem`` due to this fixture.'''
        # NOTE: pass the fixture object itself, not self.name --
        # SkipException reads ``.name`` from both of its arguments, and a
        # str has no ``.name`` attribute.
        raise SkipException(self, testitem.metadata)

    def schedule_finalized(self, schedule):
        '''
        This method is called once the schedule of for tests is known.
        To enable tests to use the same fixture defintion for each execution
        fixtures must return a copy of themselves in this method.

        :returns: a copy of this fixture which will be setup/torndown
            when the test item this object is tied to is about to execute.
        '''
        return self.copy()

    def init(self, *args, **kwargs):
        # Hook for subclasses; base implementation does nothing.
        pass

    def setup(self, testitem):
        # Hook run before the test item executes.
        pass

    def teardown(self, testitem):
        # Hook run after the test item completes.
        pass

    def copy(self):
        return copy.deepcopy(self)
def globalfixture(fixture):
    '''
    Store the given fixture as a global fixture. Its setup() method
    will be called before the first test is executed.

    Returns the fixture unchanged, so this can also be used as a
    decorator on a fixture instance expression.
    '''
    global_fixtures.append(fixture)
    return fixture

437
ext/testlib/handlers.py Normal file
View File

@@ -0,0 +1,437 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
'''
Handlers for the testlib Log.
'''
from __future__ import print_function
import multiprocessing
import os
import Queue
import sys
import threading
import time
import traceback
import helper
import log
import result
import state
import test
import terminal
from config import config, constants
class _TestStreamManager(object):
def __init__(self):
self._writers = {}
def open_writer(self, test_result):
if test_result in self._writers:
raise ValueError('Cannot have multiple writters on a single test.')
self._writers[test_result] = _TestStreams(test_result.stdout,
test_result.stderr)
def get_writer(self, test_result):
if test_result not in self._writers:
self.open_writer(test_result)
return self._writers[test_result]
def close_writer(self, test_result):
if test_result in self._writers:
writer = self._writers.pop(test_result)
writer.close()
def close(self):
for writer in self._writers.values():
writer.close()
self._writers.clear()
class _TestStreams(object):
    # Owns the pair of on-disk files a single test's output is written to.
    def __init__(self, stdout, stderr):
        # ``stdout``/``stderr`` are file *paths*; parent directories are
        # created as needed before opening for write.
        helper.mkdir_p(os.path.dirname(stdout))
        helper.mkdir_p(os.path.dirname(stderr))
        self.stdout = open(stdout, 'w')
        self.stderr = open(stderr, 'w')

    def close(self):
        self.stdout.close()
        self.stderr.close()
class ResultHandler(log.Handler):
    '''
    Log handler which listens for test results and output saving data as
    it is reported.

    When the handler is closed it writes out test results in the python pickle
    format.
    '''
    def __init__(self, schedule, directory):
        '''
        :param schedule: The entire schedule as a :class:`LoadedLibrary`
            object.

        :param directory: Directory to save test stdout/stderr and aggregate
            results to.
        '''
        self.directory = directory
        self.internal_results = result.InternalLibraryResults(schedule,
                                                              directory)
        self.test_stream_manager = _TestStreamManager()
        self._closed = False

        # Dispatch table: record type id -> handler method.
        self.mapping = {
            log.LibraryStatus.type_id: self.handle_library_status,
            log.SuiteResult.type_id: self.handle_suite_result,
            log.TestResult.type_id: self.handle_test_result,
            log.TestStderr.type_id: self.handle_stderr,
            log.TestStdout.type_id: self.handle_stdout,
        }

    def handle(self, record):
        # Records arriving after close() are dropped; unknown record types
        # fall through to the no-op default.
        if not self._closed:
            self.mapping.get(record.type_id, lambda _:None)(record)

    def handle_library_status(self, record):
        # Once the whole library is finished (or avoided), release all open
        # per-test output files.
        if record['status'] in (state.Status.Complete, state.Status.Avoided):
            self.test_stream_manager.close()

    def handle_suite_result(self, record):
        suite_result = self.internal_results.get_suite_result(
                record['metadata'].uid)
        suite_result.result = record['result']

    def handle_test_result(self, record):
        test_result = self._get_test_result(record)
        test_result.result = record['result']

    def handle_stderr(self, record):
        # Append this chunk of captured stderr to the test's stderr file.
        self.test_stream_manager.get_writer(
            self._get_test_result(record)
        ).stderr.write(record['buffer'])

    def handle_stdout(self, record):
        # Append this chunk of captured stdout to the test's stdout file.
        self.test_stream_manager.get_writer(
            self._get_test_result(record)
        ).stdout.write(record['buffer'])

    def _get_test_result(self, test_record):
        # Look up the mutable result object for the test this record refers
        # to (keyed by test uid within its suite).
        return self.internal_results.get_test_result(
                    test_record['metadata'].uid,
                    test_record['metadata'].suite_uid)

    def _save(self):
        #FIXME Hardcoded path name
        # Persist aggregate results both as a pickle and as JUnit XML.
        result.InternalSavedResults.save(
            self.internal_results,
            os.path.join(self.directory, constants.pickle_filename))
        result.JUnitSavedResults.save(
            self.internal_results,
            os.path.join(self.directory, constants.xml_filename))

    def close(self):
        '''Flush results to disk; idempotent.'''
        if self._closed:
            return
        self._closed = True
        self._save()
#TODO Change from a handler to an internal post processor so it can be used
# to reprint results
class SummaryHandler(log.Handler):
    '''
    A log handler which listens to the log for test results
    and reports the aggregate results when closed.
    '''
    color = terminal.get_termcap()
    reset = color.Normal
    # Display color per outcome for the summary separator line.
    colormap = {
            state.Result.Errored: color.Red,
            state.Result.Failed: color.Red,
            state.Result.Passed: color.Green,
            state.Result.Skipped: color.Cyan,
    }
    sep_fmtkey = 'separator'
    sep_fmtstr = '{%s}' % sep_fmtkey

    def __init__(self):
        self.mapping = {
            log.TestResult.type_id: self.handle_testresult,
            log.LibraryStatus.type_id: self.handle_library_status,
        }
        # Measures wall time from the start of building until the summary.
        self._timer = helper.Timer()
        # Flat list of every final test outcome observed.
        self.results = []

    def handle_library_status(self, record):
        if record['status'] == state.Status.Building:
            self._timer.restart()

    def handle_testresult(self, record):
        # NOTE: this local ``result`` shadows the module-level ``result``
        # import within this method.
        result = record['result'].value
        if result in (state.Result.Skipped, state.Result.Failed,
                state.Result.Passed, state.Result.Errored):
            self.results.append(result)

    def handle(self, record):
        self.mapping.get(record.type_id, lambda _:None)(record)

    def close(self):
        print(self._display_summary())

    def _display_summary(self):
        most_severe_outcome = None
        outcome_fmt = ' {count} {outcome}'
        strings = []

        # Tally outcomes; list index is the outcome's enum value.
        outcome_count = [0] * len(state.Result.enums)
        for result in self.results:
            outcome_count[result] += 1

        # Iterate over enums so they are in order of severity
        for outcome in state.Result.enums:
            outcome = getattr(state.Result, outcome)
            count = outcome_count[outcome]
            if count:
                strings.append(outcome_fmt.format(count=count,
                        outcome=state.Result.enums[outcome]))
                most_severe_outcome = outcome
        string = ','.join(strings)

        if most_severe_outcome is None:
            string = ' No testing done'
            most_severe_outcome = state.Result.Passed
        else:
            string = ' Results:' + string + ' in {:.2} seconds '.format(
                    self._timer.active_time())
        string += ' '
        # Color the separator by the worst outcome seen.
        return terminal.insert_separator(
                string,
                color=self.colormap[most_severe_outcome] + self.color.Bold)
class TerminalHandler(log.Handler):
    '''
    Log handler which prints test progress, results and messages to the
    console, colorized by severity/outcome.

    :param verbosity: Minimum :class:`LogLevel` worth printing.
    :param machine_only: If True, only print library messages flagged
        as machine readable.
    '''
    color = terminal.get_termcap()
    # Color applied to messages at each log level (others use `default`).
    verbosity_mapping = {
        log.LogLevel.Warn: color.Yellow,
        log.LogLevel.Error: color.Red,
    }
    default = color.Normal

    def __init__(self, verbosity=log.LogLevel.Info, machine_only=False):
        # Stream raw test stdout/stderr only at Trace verbosity or higher.
        self.stream = verbosity >= log.LogLevel.Trace
        self.verbosity = verbosity
        self.machine_only = machine_only
        # Dispatch table: record type id -> bound handler method.
        self.mapping = {
            log.TestResult.type_id: self.handle_testresult,
            log.SuiteStatus.type_id: self.handle_suitestatus,
            log.TestStatus.type_id: self.handle_teststatus,
            log.TestStderr.type_id: self.handle_stderr,
            log.TestStdout.type_id: self.handle_stdout,
            log.TestMessage.type_id: self.handle_testmessage,
            log.LibraryMessage.type_id: self.handle_librarymessage,
        }

    def _display_outcome(self, name, outcome, reason=None):
        '''Print a colorized '<name> <outcome>' line, plus any reason.'''
        print(self.color.Bold
                + SummaryHandler.colormap[outcome]
                + name
                + ' '
                + state.Result.enums[outcome]
                + SummaryHandler.reset)

        if reason is not None:
            log.test_log.info('')
            log.test_log.info('Reason:')
            log.test_log.info(reason)
            log.test_log.info(terminal.separator('-'))

    def handle_teststatus(self, record):
        if record['status'] == state.Status.Running:
            log.test_log.debug('Starting Test Case: %s' %\
                    record['metadata'].name)

    def handle_testresult(self, record):
        self._display_outcome(
            'Test: %s'  % record['metadata'].name,
            record['result'].value)

    def handle_suitestatus(self, record):
        if record['status'] == state.Status.Running:
            log.test_log.debug('Starting Test Suite: %s ' %\
                    record['metadata'].name)

    def handle_stderr(self, record):
        # Echo raw test stderr only in streaming (Trace) mode.
        if self.stream:
            print(record.data['buffer'], file=sys.stderr, end='')

    def handle_stdout(self, record):
        # Echo raw test stdout only in streaming (Trace) mode.
        if self.stream:
            print(record.data['buffer'], file=sys.stdout, end='')

    def handle_testmessage(self, record):
        if self.stream:
            print(self._colorize(record['message'], record['level']))

    def handle_librarymessage(self, record):
        # Library messages always print unless machine_only filtering
        # is enabled and the record is not machine readable.
        if not self.machine_only or record.data.get('machine_readable', False):
            print(self._colorize(record['message'], record['level'],
                    record['bold']))

    def _colorize(self, message, level, bold=False):
        '''Wrap *message* in the color escape codes for *level*.'''
        return '%s%s%s%s' % (
                self.color.Bold if bold else '',
                self.verbosity_mapping.get(level, ''),
                message,
                self.default)

    def handle(self, record):
        # Drop records above the configured verbosity; records without a
        # 'level' (results, status, output) are always dispatched.
        if record.data.get('level', self.verbosity) > self.verbosity:
            return
        self.mapping.get(record.type_id, lambda _:None)(record)

    def set_verbosity(self, verbosity):
        self.verbosity = verbosity
class PrintHandler(log.Handler):
    '''Minimal handler which prints the string form of every record.'''
    def __init__(self):
        pass

    def handle(self, record):
        # Records stringify to their data dict; strip the trailing newline.
        print(str(record).rstrip())

    def close(self):
        pass
class MultiprocessingHandlerWrapper(log.Handler):
    '''
    A handler class which forwards log records to subhandlers, enabling
    logging across multiprocessing python processes.

    The 'parent' side of the handler should execute either
    :func:`async_process` or :func:`process` to forward
    log records to subhandlers.
    '''
    def __init__(self, *subhandlers):
        # Records from any process are pushed onto this queue; a thread in
        # the parent process drains it and forwards to the subhandlers.
        self.queue = multiprocessing.Queue()
        # Don't block interpreter exit waiting for unflushed records.
        self.queue.cancel_join_thread()
        self._shutdown = threading.Event()

        # subhandlers should be accessed with the _handler_lock
        self._handler_lock = threading.Lock()
        self._subhandlers = subhandlers

    def add_handler(self, handler):
        '''Prepend a subhandler; new handlers receive subsequent records.'''
        self._handler_lock.acquire()
        self._subhandlers = (handler, ) + self._subhandlers
        self._handler_lock.release()

    def _with_handlers(self, callback):
        '''Run *callback* on each subhandler while holding the lock.'''
        exception = None
        self._handler_lock.acquire()
        for handler in self._subhandlers:
            # Prevent deadlock when using this handler by delaying
            # exception raise until we get a chance to unlock.
            try:
                callback(handler)
            except Exception as e:
                exception = e
                break
        self._handler_lock.release()

        if exception is not None:
            raise exception

    def async_process(self):
        '''Start a daemon thread running :func:`process`.'''
        self.thread = threading.Thread(target=self.process)
        self.thread.daemon = True
        self.thread.start()

    def process(self):
        '''Forward queued records to subhandlers until shutdown is set.'''
        while not self._shutdown.is_set():
            try:
                # Short timeout so the shutdown flag is re-checked often.
                item = self.queue.get(timeout=0.1)
                self._handle(item)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                return
            except Queue.Empty:
                continue

    def _drain(self):
        '''Forward whatever is still queued, without blocking.'''
        while True:
            try:
                item = self.queue.get(block=False)
                self._handle(item)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                return
            except Queue.Empty:
                return

    def _handle(self, record):
        self._with_handlers(lambda handler: handler.handle(record))

    def handle(self, record):
        # May be called from any process; the record must be pickleable.
        self.queue.put(record)

    def _close(self):
        # Stop the forwarding thread (if started), flush remaining
        # records, then close each subhandler; failures are printed but
        # don't abort the rest of the shutdown (see _wrap).
        if hasattr(self, 'thread'):
            self.thread.join()
        _wrap(self._drain)
        self._with_handlers(lambda handler: _wrap(handler.close))

        # NOTE Python2 has an known bug which causes IOErrors to be raised
        # if this shutdown doesn't go cleanly on both ends.
        # This sleep adds some time for the sender threads on this process to
        # finish pickling the object and complete shutdown after the queue is
        # closed.
        time.sleep(.2)
        self.queue.close()
        time.sleep(.2)

    def close(self):
        '''Idempotent shutdown: flush, close subhandlers, close queue.'''
        if not self._shutdown.is_set():
            self._shutdown.set()
            self._close()
def _wrap(callback, *args, **kwargs):
    '''
    Invoke ``callback``, printing — but otherwise swallowing — anything
    it raises.

    Used during handler shutdown so a failure in one handler cannot
    prevent the remaining handlers from being drained and closed.
    '''
    try:
        callback(*args, **kwargs)
    except:
        traceback.print_exc()

460
ext/testlib/helper.py Normal file
View File

@@ -0,0 +1,460 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
'''
Helper classes for writing tests with this test library.
'''
from collections import MutableSet, OrderedDict
import difflib
import errno
import os
import Queue
import re
import shutil
import stat
import subprocess
import tempfile
import threading
import time
import traceback
#TODO Tear out duplicate logic from the sandbox IOManager
def log_call(logger, command, *popenargs, **kwargs):
    '''
    Calls the given process and automatically logs the command and output.

    If stdout or stderr are provided output will also be piped into those
    streams as well.

    :params stdout: Iterable of items to write to as we read from the
        subprocess.

    :params stderr: Iterable of items to write to as we read from the
        subprocess.

    :raises subprocess.CalledProcessError: If the command exits non-zero.
    '''
    if isinstance(command, str):
        cmdstr = command
    else:
        cmdstr = ' '.join(command)

    logger_callback = logger.trace
    logger.trace('Logging call to command: %s' % cmdstr)

    # Redirect targets may be a single file-like object or an iterable
    # of them; normalize to a tuple.
    stdout_redirect = kwargs.get('stdout', tuple())
    stderr_redirect = kwargs.get('stderr', tuple())

    if hasattr(stdout_redirect, 'write'):
        stdout_redirect = (stdout_redirect,)
    if hasattr(stderr_redirect, 'write'):
        stderr_redirect = (stderr_redirect,)

    # Always capture through pipes so output can be teed to the logger.
    kwargs['stdout'] = subprocess.PIPE
    kwargs['stderr'] = subprocess.PIPE
    p = subprocess.Popen(command, *popenargs, **kwargs)

    def log_output(log_callback, pipe, redirects=tuple()):
        # Read iteratively, don't allow input to fill the pipe.
        for line in iter(pipe.readline, ''):
            for r in redirects:
                r.write(line)
            log_callback(line.rstrip())

    # Drain stdout and stderr concurrently so a full pipe buffer on one
    # stream cannot deadlock the child.
    stdout_thread = threading.Thread(target=log_output,
                        args=(logger_callback, p.stdout, stdout_redirect))
    stdout_thread.setDaemon(True)
    stderr_thread = threading.Thread(target=log_output,
                        args=(logger_callback, p.stderr, stderr_redirect))
    stderr_thread.setDaemon(True)

    stdout_thread.start()
    stderr_thread.start()

    retval = p.wait()
    stdout_thread.join()
    stderr_thread.join()
    # Return the return exit code of the process.
    # NOTE(review): despite the comment above, the function returns None
    # on success and only raises on failure — confirm intended.
    if retval != 0:
        raise subprocess.CalledProcessError(retval, cmdstr)
# lru_cache stuff (Introduced in python 3.2+)
# Renamed and modified to cacheresult
class _HashedSeq(list):
'''
This class guarantees that hash() will be called no more than once per
element. This is important because the cacheresult() will hash the key
multiple times on a cache miss.
.. note:: From cpython 3.7
'''
__slots__ = 'hashvalue'
def __init__(self, tup, hash=hash):
self[:] = tup
self.hashvalue = hash(tup)
def __hash__(self):
return self.hashvalue
def _make_key(args, kwds, typed,
             kwd_mark = (object(),),
             fasttypes = {int, str, frozenset, type(None)},
             tuple=tuple, type=type, len=len):
    '''
    Make a cache key from optionally typed positional and keyword arguments.

    The key is constructed in a way that is flat as possible rather than as
    a nested structure that would take more memory. If there is only a single
    argument and its data type is known to cache its hash value, then that
    argument is returned without a wrapper. This saves space and improves
    lookup speed.

    .. note:: From cpython 3.7
    '''
    key = args
    if kwds:
        # kwd_mark separates positionals from the flattened kwarg pairs.
        key += kwd_mark
        for pair in kwds.items():
            key += pair
    if typed:
        key += tuple(type(value) for value in args)
        if kwds:
            key += tuple(type(value) for value in kwds.values())
    elif len(key) == 1 and type(key[0]) in fasttypes:
        # A lone fast-type argument serves as its own key.
        return key[0]
    return _HashedSeq(key)
def cacheresult(function, typed=False):
    '''
    Decorator which memoizes a function's results, keyed on its arguments.

    :param function: The function whose results should be cached.
    :param typed: If typed is True, arguments of different types will be
        cached separately. I.e. f(3.0) and f(3) will be treated as distinct
        calls with distinct results.

    .. note:: From cpython 3.7
    '''
    sentinel = object() # unique object used to signal cache misses
    cache = {}
    def wrapper(*args, **kwds):
        # Simple caching without ordering or size limit
        key = _make_key(args, kwds, typed)
        result = cache.get(key, sentinel)
        if result is not sentinel:
            return result
        result = function(*args, **kwds)
        cache[key] = result
        return result
    return wrapper
class OrderedSet(MutableSet):
    '''
    Maintain ordering of insertion in items to the set with quick iteration.

    Implemented as a dict mapping each key to a node of a circular doubly
    linked list (http://code.activestate.com/recipes/576694/).
    '''

    def __init__(self, iterable=None):
        # ``end`` is the sentinel node; every node is [key, prev, next].
        self.end = end = []
        end += [None, end, end] # sentinel node for doubly linked list
        self.map = {} # key --> [key, prev, next]
        if iterable is not None:
            self |= iterable

    def __len__(self):
        return len(self.map)

    def __contains__(self, key):
        return key in self.map

    def add(self, key):
        '''Append ``key`` at the end unless it is already present.'''
        if key in self.map:
            return
        sentinel = self.end
        tail = sentinel[1]
        node = [key, tail, sentinel]
        tail[2] = sentinel[1] = self.map[key] = node

    def update(self, keys):
        '''Add every key in ``keys``, preserving encounter order.'''
        for key in keys:
            self.add(key)

    def discard(self, key):
        '''Remove ``key`` if present by unlinking its list node.'''
        if key in self.map:
            node = self.map.pop(key)
            node[1][2] = node[2]
            node[2][1] = node[1]

    def __iter__(self):
        node = self.end[2]
        while node is not self.end:
            yield node[0]
            node = node[2]

    def __reversed__(self):
        node = self.end[1]
        while node is not self.end:
            yield node[0]
            node = node[1]

    def pop(self, last=True):
        '''Remove and return the most (or least) recently added key.'''
        if not self:
            raise KeyError('set is empty')
        node = self.end[1] if last else self.end[2]
        self.discard(node[0])
        return node[0]

    def __repr__(self):
        if not self:
            return '%s()' % (self.__class__.__name__,)
        return '%s(%r)' % (self.__class__.__name__, list(self))

    def __eq__(self, other):
        # Order matters only when comparing against another OrderedSet.
        if isinstance(other, OrderedSet):
            return len(self) == len(other) and list(self) == list(other)
        return set(self) == set(other)
def absdirpath(path):
'''
Return the directory component of the absolute path of the given path.
'''
return os.path.dirname(os.path.abspath(path))
# Shorthand alias for os.path.join used throughout the test library.
joinpath = os.path.join
def mkdir_p(path):
    '''
    Same thing as mkdir -p

    https://stackoverflow.com/a/600612
    '''
    try:
        os.makedirs(path)
    except OSError as exc: # Python >2.5
        # An already-existing directory is fine; anything else is not.
        if exc.errno != errno.EEXIST or not os.path.isdir(path):
            raise
class FrozenSetException(Exception):
    '''Signals one tried to set a value in a 'frozen' object.'''
    pass
class AttrDict(object):
    '''Object which exposes its own internal dictionary through attributes.'''
    def __init__(self, dict_={}):
        self.update(dict_)

    def __getattr__(self, attr):
        # Only invoked when regular attribute lookup has already failed.
        contents = self.__dict__
        if attr not in contents:
            raise AttributeError('Could not find %s attribute' % attr)
        return contents[attr]

    def __setattr__(self, attr, val):
        self.__dict__[attr] = val

    def __iter__(self):
        return iter(self.__dict__)

    def __getitem__(self, item):
        return self.__dict__[item]

    def update(self, items):
        '''Merge the mapping ``items`` into the attribute dictionary.'''
        self.__dict__.update(items)
class FrozenAttrDict(AttrDict):
    '''An AttrDict whose attributes cannot be modified directly.'''
    # Flipped to True at the end of __init__; after that every mutation
    # attempt raises FrozenSetException.
    __initialized = False

    def __init__(self, dict_={}):
        super(FrozenAttrDict, self).__init__(dict_)
        self.__initialized = True

    def __setattr__(self, attr, val):
        if self.__initialized:
            # (Error message typo fixed: 'FozenAttrDict'.)
            raise FrozenSetException(
                    'Cannot modify an attribute in a FrozenAttrDict')
        else:
            super(FrozenAttrDict, self).__setattr__(attr, val)

    def update(self, items):
        '''Bulk updates are likewise forbidden once frozen.'''
        if self.__initialized:
            raise FrozenSetException(
                    'Cannot modify an attribute in a FrozenAttrDict')
        else:
            super(FrozenAttrDict, self).update(items)
class InstanceCollector(object):
    '''
    A class used to simplify collecting of Classes.

    >> instance_list = collector.create()
    >> # Create a bunch of classes which call collector.collect(self)
    >> # instance_list contains all instances created since
    >> # collector.create was called
    >> collector.remove(instance_list)
    '''
    def __init__(self):
        self.collectors = []

    def create(self):
        '''Open a new collection list and return it.'''
        new_collection = []
        self.collectors.append(new_collection)
        return new_collection

    def remove(self, collector):
        '''Stop collecting into the given list.'''
        self.collectors.remove(collector)

    def collect(self, instance):
        '''Record ``instance`` in every currently-open collection.'''
        for open_collection in self.collectors:
            open_collection.append(instance)
def append_dictlist(dict_, key, value):
    '''
    Append the `value` to a list associated with `key` in `dict_`.

    If `key` doesn't exist, create a new list in the `dict_` with value in it.
    '''
    dict_.setdefault(key, []).append(value)
class ExceptionThread(threading.Thread):
    '''
    Wrapper around a python :class:`Thread` which will raise an
    exception on join if the child threw an unhandled exception.
    '''
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        # Holds either None (clean exit) or the child's formatted traceback.
        self._eq = Queue.Queue()

    def run(self, *args, **kwargs):
        try:
            threading.Thread.run(self, *args, **kwargs)
            self._eq.put(None)
        except:
            # Deliberately broad: forward *any* failure to the joiner.
            tb = traceback.format_exc()
            self._eq.put(tb)

    def join(self, *args, **kwargs):
        '''
        Join the thread; re-raise (as :class:`Exception` wrapping the
        formatted traceback) anything the thread body raised.

        BUGFIX: the unbound ``Thread.join`` call was missing ``self``,
        which made every join raise a TypeError.
        '''
        threading.Thread.join(self, *args, **kwargs)
        exception = self._eq.get()
        if exception:
            raise Exception(exception)
def _filter_file(fname, filters):
with open(fname, "r") as file_:
for line in file_:
for regex in filters:
if re.match(regex, line):
break
else:
yield line
def _copy_file_keep_perms(source, target):
'''Copy a file keeping the original permisions of the target.'''
st = os.stat(target)
shutil.copy2(source, target)
os.chown(target, st[stat.ST_UID], st[stat.ST_GID])
def _filter_file_inplace(fname, filters):
    '''
    Filter the given file writing filtered lines out to a temporary file, then
    copy that tempfile back into the original file.

    :param fname: Path of the file to filter in place.
    :param filters: Iterable of regexes; lines matching any are dropped.
    '''
    (fd, tfname) = tempfile.mkstemp(text=True)
    # Close mkstemp's raw descriptor; it would otherwise leak since we
    # reopen the file by name below.
    os.close(fd)
    with open(tfname, 'w') as tempfile_:
        for line in _filter_file(fname, filters):
            tempfile_.write(line)

    # Now filtered output is into tempfile_
    _copy_file_keep_perms(tfname, fname)
    # Clean up the scratch file once its contents are copied back.
    os.remove(tfname)
def diff_out_file(ref_file, out_file, logger, ignore_regexes=tuple()):
    '''
    Diff two files returning the diff as a string.

    Both files are first filtered *in place* to drop lines matching any
    regex in ``ignore_regexes``. Uses the external ``diff`` program when
    available, falling back to :mod:`difflib` otherwise.

    :returns: The diff text, or None if the files are identical.
    :raises OSError: If either file does not exist.
    '''
    if not os.path.exists(ref_file):
        raise OSError("%s doesn't exist in reference directory"\
                % ref_file)
    if not os.path.exists(out_file):
        raise OSError("%s doesn't exist in output directory" % out_file)

    _filter_file_inplace(out_file, ignore_regexes)
    _filter_file_inplace(ref_file, ignore_regexes)

    # NOTE(review): mkstemp's file descriptor is never closed and the
    # temporary file is never removed — confirm and clean up.
    (_, tfname) = tempfile.mkstemp(text=True)
    with open(tfname, 'r+') as tempfile_:
        try:
            log_call(logger, ['diff', out_file, ref_file], stdout=tempfile_)
        except OSError:
            # Likely signals that diff does not exist on this system. fallback
            # to difflib
            with open(out_file, 'r') as outf, open(ref_file, 'r') as reff:
                diff = difflib.unified_diff(iter(reff.readline, ''),
                                            iter(outf.readline, ''),
                                            fromfile=ref_file,
                                            tofile=out_file)
                return ''.join(diff)
        except subprocess.CalledProcessError:
            # Non-zero exit from diff: the files differ, return the output.
            tempfile_.seek(0)
            return ''.join(tempfile_.readlines())
        else:
            # diff exited 0: the files are identical.
            return None
class Timer():
    '''Simple wall-clock stopwatch built on :func:`time.time`.'''
    def __init__(self):
        self.restart()

    def restart(self):
        '''Start (or re-start) the stopwatch.'''
        self._start = self.timestamp()
        self._stop = None

    def stop(self):
        '''Stop the stopwatch and return the elapsed time in seconds.'''
        self._stop = self.timestamp()
        return self._stop - self._start

    def runtime(self):
        '''Elapsed time of the last completed start/stop interval.'''
        return self._stop - self._start

    def active_time(self):
        '''Time elapsed since the last (re)start, without stopping.'''
        return self.timestamp() - self._start

    @staticmethod
    def timestamp():
        '''Current wall-clock time in seconds.'''
        return time.time()

302
ext/testlib/loader.py Normal file
View File

@@ -0,0 +1,302 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
'''
Contains the :class:`Loader` which is responsible for discovering and loading
tests.
Loading typically follows the following stages.
1. Recurse down a given directory looking for tests which match a given regex.
The default regex used will match any python file (ending in .py) that has
a name starting or ending in test(s). If there are any additional
components of the name they must be connected with '-' or '_'. Lastly,
file names that begin with '.' will be ignored.
The following names would match:
- `tests.py`
- `test.py`
- `test-this.py`
- `tests-that.py`
- `these-test.py`
These would not match:
- `.test.py` - 'hidden' files are ignored.
- `test` - Must end in '.py'
- `test-.py` - Needs a character after the hyphen.
- `testthis.py` - Needs a hyphen or underscore to separate 'test' and 'this'
2. With all files discovered execute each file gathering its test items we
care about collecting. (`TestCase`, `TestSuite` and `Fixture` objects.)
As a final note, :class:`TestCase` instances which are not put into
a :class:`TestSuite` by the test writer will be placed into
a :class:`TestSuite` named after the module.
.. seealso:: :func:`load_file`
'''
import os
import re
import sys
import traceback
import config
import log
import suite as suite_mod
import test as test_mod
import fixture as fixture_mod
import wrappers
import uid
class DuplicateTestItemException(Exception):
    '''
    Exception indicates multiple test items with the same UID
    were discovered.
    '''
    pass
# Match filenames that either begin or end with 'test' or tests and use
# - or _ to separate additional name components.
# BUGFIX: the prefix alternative used '[_]', rejecting hyphen-separated
# names such as 'these-test.py' that the module docstring promises to
# accept; '[-_]' matches both separators.
default_filepath_regex = re.compile(
        r'(((.+[-_])?tests?)|(tests?([-_].+)?))\.py$')
def default_filepath_filter(filepath):
    '''The default filter applied to filepaths to marks as test sources.'''
    basename = os.path.basename(filepath)
    # 'Hidden' files are never considered test sources.
    if basename.startswith('.'):
        return False
    return bool(default_filepath_regex.match(basename))
def path_as_modulename(filepath):
    '''Return the given filepath as a module name.'''
    # Strip the directory part and the '.py' extension.
    base = os.path.basename(filepath)
    return os.path.splitext(base)[0]
def path_as_suitename(filepath):
    '''Return the name of the directory directly containing the file.'''
    containing_dir = os.path.dirname(os.path.abspath(filepath))
    return os.path.split(containing_dir)[-1]
def _assert_files_in_same_dir(files):
if __debug__:
if files:
directory = os.path.dirname(files[0])
for f in files:
assert os.path.dirname(f) == directory
class Loader(object):
    '''
    Class for discovering tests.

    Discovered :class:`TestCase` and :class:`TestSuite` objects are wrapped by
    :class:`LoadedTest` and :class:`LoadedSuite` objects respectively.
    These objects provided additional methods and metadata about the loaded
    objects and are the internal representation used by testlib.

    To simply discover and load all tests using the default filter create an
    instance and `load_root`.

    >>> import os
    >>> tl = Loader()
    >>> tl.load_root(os.getcwd())

    .. note:: If tests are not contained in a TestSuite, they will
        automatically be placed into one for the module.

    .. warn:: This class is extremely thread-unsafe.
       It modifies the sys path and global config.
       Use with care.
    '''
    def __init__(self):
        self.suites = []
        self.suite_uids = {}
        self.filepath_filter = default_filepath_filter

        # filepath -> Successful | Failed to load
        # NOTE(review): nothing in this class ever writes to _files, so
        # the reload guard in load_file appears to be dead code — confirm.
        self._files = {}

    @property
    def schedule(self):
        # Everything discovered so far plus the globally-registered fixtures.
        return wrappers.LoadedLibrary(self.suites, fixture_mod.global_fixtures)

    def load_schedule_for_suites(self, *uids):
        '''Load only the files which can contain the requested suite UIDs.'''
        files = {uid.UID.uid_to_path(id_) for id_ in uids}
        for file_ in files:
            self.load_file(file_)

        return wrappers.LoadedLibrary(
                [self.suite_uids[id_] for id_ in uids],
                fixture_mod.global_fixtures)

    def _verify_no_duplicate_suites(self, new_suites):
        '''Raise if any new suite UID collides with an existing one.'''
        new_suite_uids = self.suite_uids.copy()
        for suite in new_suites:
            if suite.uid in new_suite_uids:
                raise DuplicateTestItemException(
                        "More than one suite with UID '%s' was defined" %\
                        suite.uid)
            new_suite_uids[suite.uid] = suite

    def _verify_no_duplicate_tests_in_suites(self, new_suites):
        '''Raise if a single suite contains two tests with the same UID.'''
        for suite in new_suites:
            test_uids = set()
            for test in suite:
                if test.uid in test_uids:
                    raise DuplicateTestItemException(
                        "More than one test with UID '%s' was defined"
                        " in suite '%s'"
                        % (test.uid, suite.uid))
                test_uids.add(test.uid)

    def load_root(self, root):
        '''
        Load files from the given root directory which match
        `self.filepath_filter`.
        '''
        if __debug__:
            self._loaded_a_file = True

        for directory in self._discover_files(root):
            if directory:
                _assert_files_in_same_dir(directory)
                for f in directory:
                    self.load_file(f)

    def load_dir(self, directory):
        '''Load every matching file found under ``directory``.'''
        for dir_ in self._discover_files(directory):
            _assert_files_in_same_dir(dir_)
            for f in dir_:
                self.load_file(f)

    def load_file(self, path):
        '''
        Execute the python file at ``path``, collecting the test items
        it defines.

        The file runs with its own directory as cwd and prepended to
        ``sys.path`` so it can use relative imports; this global state is
        restored afterwards by ``cleanup()``.
        '''
        path = os.path.abspath(path)

        if path in self._files:
            if not self._files[path]:
                raise Exception('Attempted to load a file which already'
                        ' failed to load')
            else:
                log.test_log.debug('Tried to reload: %s' % path)
                return

        # Create a custom dictionary for the loaded module.
        newdict = {
            '__builtins__':__builtins__,
            '__name__': path_as_modulename(path),
            '__file__': path,
        }

        # Add the file's containing directory to the system path. So it can do
        # relative imports naturally.
        old_path = sys.path[:]
        sys.path.insert(0, os.path.dirname(path))
        cwd = os.getcwd()
        os.chdir(os.path.dirname(path))
        config.config.file_under_load = path

        # Each collector list receives every test/suite/fixture that is
        # instantiated while the file executes.
        new_tests = test_mod.TestCase.collector.create()
        new_suites = suite_mod.TestSuite.collector.create()
        new_fixtures = fixture_mod.Fixture.collector.create()

        def cleanup():
            # Restore all global state mutated for the load.
            config.config.file_under_load = None
            sys.path[:] = old_path
            os.chdir(cwd)
            test_mod.TestCase.collector.remove(new_tests)
            suite_mod.TestSuite.collector.remove(new_suites)
            fixture_mod.Fixture.collector.remove(new_fixtures)

        try:
            execfile(path, newdict, newdict)
        except Exception as e:
            log.test_log.debug(traceback.format_exc())
            log.test_log.warn(
                    'Exception thrown while loading "%s"\n'
                    'Ignoring all tests in this file.'
                     % (path))
            cleanup()
            return

        # Create a module test suite for those not contained in a suite.
        orphan_tests = set(new_tests)
        for suite in new_suites:
            for test in suite:
                # Remove the test if it wasn't already removed.
                # (Suites may contain copies of tests.)
                if test in orphan_tests:
                    orphan_tests.remove(test)
        if orphan_tests:
            # Preserve definition order when grouping orphans.
            orphan_tests = sorted(orphan_tests, key=new_tests.index)
            # FIXME Use the config based default to group all uncollected
            # tests.
            # NOTE: This is automatically collected (we still have the
            # collector active.)
            suite_mod.TestSuite(tests=orphan_tests,
                    name=path_as_suitename(path))

        try:
            loaded_suites = [wrappers.LoadedSuite(suite, path)
                    for suite in new_suites]

            self._verify_no_duplicate_suites(loaded_suites)
            self._verify_no_duplicate_tests_in_suites(loaded_suites)
        except Exception as e:
            log.test_log.warn('%s\n'
                    'Exception thrown while loading "%s"\n'
                    'Ignoring all tests in this file.'
                    % (traceback.format_exc(), path))
        else:
            log.test_log.info('Discovered %d tests and %d suites in %s'
                    '' % (len(new_tests), len(loaded_suites), path))

            self.suites.extend(loaded_suites)
            self.suite_uids.update({suite.uid: suite
                    for suite in loaded_suites})
        cleanup()

    def _discover_files(self, root):
        '''
        Recurse down from the given root directory returning a list of
        directories which contain a list of files matching
        `self.filepath_filter`.
        '''
        # Will probably want to order this traversal.
        for root, dirnames, filenames in os.walk(root):
            # Sort for a deterministic traversal order.
            dirnames.sort()
            if filenames:
                filenames.sort()
                filepaths = [os.path.join(root, filename) \
                             for filename in filenames]
                filepaths = filter(self.filepath_filter, filepaths)
                if filepaths:
                    yield filepaths

256
ext/testlib/log.py Normal file
View File

@@ -0,0 +1,256 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
'''
This module supplies the global `test_log` object which all testing
results and messages are reported through.
'''
import wrappers
class LogLevel():
    '''Integer verbosity levels, ordered from most to least severe.'''
    Fatal = 0
    Error = 1
    Warn = 2
    Info = 3
    Debug = 4
    Trace = 5
class RecordTypeCounterMetaclass(type):
    '''
    Record type metaclass.

    Adds a static integer value in addition to typeinfo so identifiers
    are common across processes, networks and module reloads.
    '''
    counter = 0
    def __init__(cls, name, bases, dct):
        # Give the new class the next sequential id, then advance the
        # shared counter. Ids are stable as long as the classes are
        # defined in the same order everywhere.
        cls.type_id = RecordTypeCounterMetaclass.counter
        RecordTypeCounterMetaclass.counter += 1
class Record(object):
    '''
    A generic object that is passed to the :class:`Log` and its handlers.

    ..note: Although not statically enforced, all items in the record should be
        be pickleable. This enables logging across multiple processes.
    '''
    __metaclass__ = RecordTypeCounterMetaclass

    def __init__(self, **data):
        self.data = data

    def __getitem__(self, item):
        if item in self.data:
            return self.data[item]
        raise KeyError('%s not in record %s' %\
                (item, self.__class__.__name__))

    def __str__(self):
        return str(self.data)
class StatusRecord(Record):
    '''Record pairing an item's metadata with its new Status value.'''
    def __init__(self, obj, status):
        Record.__init__(self, metadata=obj.metadata, status=status)
class ResultRecord(Record):
    '''Record pairing an item's metadata with its Result value.'''
    def __init__(self, obj, result):
        Record.__init__(self, metadata=obj.metadata, result=result)
#TODO Refactor this shit... Not ideal. Should just specify attributes.
class TestStatus(StatusRecord):
    '''Status change (e.g. Running) of a single test case.'''
    pass
class SuiteStatus(StatusRecord):
    '''Status change of a test suite.'''
    pass
class LibraryStatus(StatusRecord):
    '''Status change of the whole test library (e.g. Building).'''
    pass
class TestResult(ResultRecord):
    '''Final result of a single test case.'''
    pass
class SuiteResult(ResultRecord):
    '''Final result of a test suite.'''
    pass
class LibraryResult(ResultRecord):
    '''Final result of the whole library run.'''
    pass
# Test Output Types
class TestStderr(Record):
    '''A chunk of a test's stderr; carries 'buffer' and 'metadata'.'''
    pass
class TestStdout(Record):
    '''A chunk of a test's stdout; carries 'buffer' and 'metadata'.'''
    pass
# Message (Raw String) Types
class TestMessage(Record):
    '''Raw string message emitted from within a running test.'''
    pass
class LibraryMessage(Record):
    '''Raw string message emitted by the test library itself.'''
    pass
class Log(object):
    '''
    Distributes log records to a set of registered handlers.

    Handlers must be registered before the first record is logged; once
    the log is closed no further records may be submitted.
    '''
    def __init__(self):
        self.handlers = []
        self._opened = False # TODO Guards to methods
        self._closed = False # TODO Guards to methods

    def finish_init(self):
        '''Mark the log open; no more handlers may be added after this.'''
        self._opened = True

    def close(self):
        '''Close the log and every registered handler.'''
        self._closed = True
        for handler in self.handlers:
            handler.close()

    def log(self, record):
        '''
        Forward *record* to every handler.

        Opens the log on first use; raises once the log has been closed.
        '''
        if not self._opened:
            self.finish_init()
        if self._closed:
            raise Exception('The log has been closed'
                ' and is no longer available.')

        # BUGFIX: this previously used map() for its side effects, which
        # silently does nothing under python3 where map() is lazy.
        for handler in self.handlers:
            handler.prehandle()
        for handler in self.handlers:
            handler.handle(record)
            handler.posthandle()

    def add_handler(self, handler):
        if self._opened:
            raise Exception('Unable to add a handler once the log is open.')
        self.handlers.append(handler)

    def close_handler(self, handler):
        handler.close()
        self.handlers.remove(handler)
class Handler(object):
    '''
    Empty implementation of the interface available to handlers which
    is expected by the :class:`Log`.
    '''
    def __init__(self):
        pass

    def handle(self, record):
        '''Process one record; invoked once per record logged.'''
        pass

    def close(self):
        '''Release resources; invoked when the log shuts down.'''
        pass

    def prehandle(self):
        '''Hook invoked on all handlers before a record is dispatched.'''
        pass

    def posthandle(self):
        '''Hook invoked right after this handler's handle() call.'''
        pass
class LogWrapper(object):
    '''
    Convenience interface around a :class:`Log`: message-level helpers
    (error/warn/info/...) plus status and result reporting which picks
    the record type matching the reported object's class.
    '''
    # Map wrapped-object class names to their result record types.
    _result_typemap = {
        wrappers.LoadedLibrary.__name__: LibraryResult,
        wrappers.LoadedSuite.__name__: SuiteResult,
        wrappers.LoadedTest.__name__: TestResult,
    }
    # Map wrapped-object class names to their status record types.
    _status_typemap = {
        wrappers.LoadedLibrary.__name__: LibraryStatus,
        wrappers.LoadedSuite.__name__: SuiteStatus,
        wrappers.LoadedTest.__name__: TestStatus,
    }
    def __init__(self, log):
        self.log_obj = log

    def log(self, *args, **kwargs):
        self.log_obj.log(*args, **kwargs)

    # Library Logging Methods
    # TODO Replace these methods in a test/create a wrapper?
    # That way they still can log like this it's just hidden that they
    # capture the current test.
    def message(self, message, level=LogLevel.Info, bold=False, **metadata):
        '''Log a raw library-level message record.'''
        self.log_obj.log(LibraryMessage(message=message, level=level,
            bold=bold, **metadata))

    def error(self, message):
        self.message(message, LogLevel.Error)

    def warn(self, message):
        self.message(message, LogLevel.Warn)

    def info(self, message):
        self.message(message, LogLevel.Info)

    def debug(self, message):
        self.message(message, LogLevel.Debug)

    def trace(self, message):
        self.message(message, LogLevel.Trace)

    # Ongoing Test Logging Methods
    def status_update(self, obj, status):
        '''Log a status record whose type matches obj's class.'''
        self.log_obj.log(
            self._status_typemap[obj.__class__.__name__](obj, status))

    def result_update(self, obj, result):
        '''Log a result record whose type matches obj's class.'''
        self.log_obj.log(
            self._result_typemap[obj.__class__.__name__](obj, result))

    def test_message(self, test, message, level):
        '''Log a message attributed to the given test.'''
        self.log_obj.log(TestMessage(message=message, level=level,
            test_uid=test.uid, suite_uid=test.parent_suite.uid))

    # NOTE If performance starts to drag on logging stdout/err
    # replace metadata with just test and suite uid tags.
    def test_stdout(self, test, suite, buf):
        '''Log a chunk of a test's stdout. (`suite` is currently unused.)'''
        self.log_obj.log(TestStdout(buffer=buf, metadata=test.metadata))

    def test_stderr(self, test, suite, buf):
        '''Log a chunk of a test's stderr. (`suite` is currently unused.)'''
        self.log_obj.log(TestStderr(buffer=buf, metadata=test.metadata))

    def close(self):
        self.log_obj.close()
class TestLogWrapper(object):
def __init__(self, log, test, suite):
self.log_obj = log
self.test = test
def test_message(self, message, level):
self.log_obj.test_message(test=self.test,
message=message, level=level)
def error(self, message):
self.test_message(message, LogLevel.Error)
def warn(self, message):
self.test_message(message, LogLevel.Warn)
def info(self, message):
self.test_message(message, LogLevel.Info)
def debug(self, message):
self.test_message(message, LogLevel.Debug)
def trace(self, message):
self.test_message(message, LogLevel.Trace)
# Module-level log instance shared by the whole test library.
test_log = LogWrapper(Log())

328
ext/testlib/main.py Normal file
View File

@@ -0,0 +1,328 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import os
import itertools
import config
import fixture as fixture_mod
import handlers
import loader as loader_mod
import log
import query
import result
import runner
import terminal
import uid
def entry_message():
    '''Emit the banner printed when the testing script starts.'''
    banner = (
        "Running the new gem5 testing script.",
        "For more information see TESTING.md.",
        "To see details as the testing scripts are running, use the option"
        " -v, -vv, or -vvv",
    )
    for line in banner:
        log.test_log.message(line)
class RunLogHandler():
    '''
    Context manager that wires up terminal and summary handlers for a
    test run, plus (once the schedule is known) the result handler.
    '''
    def __init__(self):
        terminal_handler = handlers.TerminalHandler(
            verbosity=config.config.verbose + log.LogLevel.Info
        )
        summary_handler = handlers.SummaryHandler()
        self.mp_handler = handlers.MultiprocessingHandlerWrapper(
            summary_handler, terminal_handler)
        # Start the background dispatch before registering with the log.
        self.mp_handler.async_process()
        log.test_log.log_obj.add_handler(self.mp_handler)
        entry_message()

    def schedule_finalized(self, test_schedule):
        # Create the result handler object.
        self.result_handler = handlers.ResultHandler(
            test_schedule, config.config.result_path)
        self.mp_handler.add_handler(self.result_handler)

    def finish_testing(self):
        self.result_handler.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
        return False

    def close(self):
        self.mp_handler.close()
def get_config_tags():
    '''Return the tag filters collected from positional config arguments.'''
    kword = config.StorePositionalTagsAction.position_kword
    return getattr(config.config, kword)
def filter_with_config_tags(loaded_library):
    '''
    Build tag filters from the command-line configuration and apply them
    to `loaded_library` via :func:`filter_with_tags`.

    Include/exclude regexes from the positional tag arguments are
    combined with filters derived from the special isa/length/variant
    options; the special filters are appended last so they take priority.
    '''
    tags = get_config_tags()
    final_tags = []
    regex_fmt = '^%s$'
    cfg = config.config

    def _append_inc_tag_filter(name):
        # Include every tag explicitly requested for this option.
        if hasattr(cfg, name):
            tag_opts = getattr(cfg, name)
            for tag in tag_opts:
                final_tags.append(config.TagRegex(True, regex_fmt % tag))

    def _append_rem_tag_filter(name):
        # Exclude every supported tag of this type that was not requested.
        if hasattr(cfg, name):
            tag_opts = getattr(cfg, name)
            for tag in cfg.constants.supported_tags[name]:
                if tag not in tag_opts:
                    final_tags.append(config.TagRegex(False, regex_fmt % tag))

    # Append additional tags for the isa, length, and variant options.
    # They apply last (they take priority)
    special_tags = (
        cfg.constants.isa_tag_type,
        cfg.constants.length_tag_type,
        cfg.constants.variant_tag_type
    )

    for tagname in special_tags:
        _append_inc_tag_filter(tagname)
    for tagname in special_tags:
        _append_rem_tag_filter(tagname)

    if tags is None:
        tags = tuple()

    filters = list(itertools.chain(tags, final_tags))
    string = 'Filtering suites with tags as follows:\n'
    # BUG FIX: the separator was '\t\n', which left a dangling tab at the
    # end of each line instead of indenting the next filter under the
    # heading; '\n\t' produces the intended indented listing.
    filter_string = '\n\t'.join((str(f) for f in filters))
    log.test_log.trace(string + filter_string)

    return filter_with_tags(loaded_library, filters)
def filter_with_tags(loaded_library, filters):
    '''
    Apply include/exclude tag filters to the suites of `loaded_library`.

    Filter logic supports two filter types:

    --include-tags <regex>
    --exclude-tags <regex>

    The logic maintains a `set` of test suites. If an include regex
    matches a tag of a suite, the suite is added to the set; if an
    exclude regex matches, the suite is removed from the set. Suites can
    be added and removed multiple times.

    First Flag Special Case Logic:
    If include is the first flag, start with an empty set of suites.
    If exclude is the first flag, start with the set of all collected
    suites.

    Example: with suites set(suite_ARM64, suite_X86, suite_Other) and the
    flags ``--include-tags "ARM64" --exclude-tags ".*" --include-tags
    "X86"`` the set evolves as::

        set(suite_ARM64)   # after --include-tags "ARM64"
        set()              # after --exclude-tags ".*"
        set(suite_X86)     # after --include-tags "X86"
    '''
    if not filters:
        return

    query_runner = query.QueryRunner(loaded_library)
    tags = query_runner.tags()

    # An exclude as the first flag means we start from everything;
    # an include as the first flag means we start from nothing.
    if filters[0].include:
        suites = set()
    else:
        suites = set(query_runner.suites())

    for tag_regex in filters:
        for tag in tags:
            if not tag_regex.regex.search(tag):
                continue
            matched_suites = set(query_runner.suites_with_tag(tag))
            if tag_regex.include:
                suites |= matched_suites
            else:
                suites -= matched_suites

    # Set the library's suites to only those which were accepted by our
    # filter, preserving the original ordering.
    loaded_library.suites = [suite for suite in loaded_library.suites
                             if suite in suites]
# TODO Add results command for listing previous results.
def load_tests():
    '''
    Create a TestLoader and load tests for the directory given by the
    config.
    '''
    loader = loader_mod.Loader()
    log.test_log.message(terminal.separator())
    log.test_log.message('Loading Tests', bold=True)
    loader.load_root(config.config.directory)
    return loader
def do_list():
    '''List the loaded suites, tests and/or tags per the config flags.'''
    term_handler = handlers.TerminalHandler(
        verbosity=config.config.verbose + log.LogLevel.Info,
        machine_only=config.config.quiet
    )
    log.test_log.log_obj.add_handler(term_handler)
    entry_message()

    test_schedule = load_tests().schedule
    filter_with_config_tags(test_schedule)
    query_runner = query.QueryRunner(test_schedule)

    if config.config.suites:
        query_runner.list_suites()
    elif config.config.tests:
        query_runner.list_tests()
    elif config.config.all_tags:
        query_runner.list_tags()
    else:
        # Nothing specific requested: show everything.
        query_runner.list_suites()
        query_runner.list_tests()
        query_runner.list_tags()
def run_schedule(test_schedule, log_handler):
    '''
    Execute the given test schedule.

    Test Phases
    -----------
    * Test Collection
    * Fixture Parameterization
    * Global Fixture Setup
    * Iteratively run suites:
        * Suite Fixture Setup
        * Iteratively run tests:
            * Test Fixture Setup
            * Run Test
            * Test Fixture Teardown
        * Suite Fixture Teardown
    * Global Fixture Teardown
    '''
    log_handler.schedule_finalized(test_schedule)

    # Notify every suite- and test-level fixture of the final schedule,
    # replacing each with the (possibly copied) fixture it returns.
    for suite in test_schedule:
        suite.fixtures = [fixture.schedule_finalized(test_schedule)
                          for fixture in suite.fixtures]
        for test in suite:
            test.fixtures = [fixture.schedule_finalized(test_schedule)
                             for fixture in test.fixtures]

    log.test_log.message(terminal.separator())
    log.test_log.message('Running Tests from {} suites'
                         .format(len(test_schedule.suites)), bold=True)
    log.test_log.message("Results will be stored in {}".format(
                config.config.result_path))
    log.test_log.message(terminal.separator())

    # Build global fixtures and execute scheduled test suites.
    if config.config.test_threads > 1:
        library_runner = runner.LibraryParallelRunner(test_schedule)
        library_runner.set_threads(config.config.test_threads)
    else:
        library_runner = runner.LibraryRunner(test_schedule)
    library_runner.run()

    log_handler.finish_testing()
def do_run():
    '''Run the scheduled test suites, honoring the --uid flag if given.'''
    # Initialize early parts of the log.
    with RunLogHandler() as log_handler:
        requested_uid = config.config.uid
        if requested_uid:
            uid_ = uid.UID.from_uid(requested_uid)
            if isinstance(uid_, uid.TestUID):
                log.test_log.error(
                    'Unable to run a standalone test.\n'
                    'Gem5 expects test suites to be the smallest unit '
                    ' of test.\n\n'
                    'Pass a SuiteUID instead.')
                return
            test_schedule = \
                loader_mod.Loader().load_schedule_for_suites(uid_)
            if get_config_tags():
                log.test_log.warn(
                    "The '--uid' flag was supplied,"
                    " '--include-tags' and '--exclude-tags' will be ignored."
                )
        else:
            test_schedule = load_tests().schedule
            # Filter tests based on tags
            filter_with_config_tags(test_schedule)
        # Execute the tests
        run_schedule(test_schedule, log_handler)
def do_rerun():
    '''Re-run every suite that did not pass in the previous run.'''
    # Init early parts of log
    with RunLogHandler() as log_handler:
        # Load previous results
        saved_results = result.InternalSavedResults.load(
            os.path.join(config.config.result_path,
                         config.constants.pickle_filename))

        rerun_suites = [suite.uid for suite in saved_results
                        if suite.unsucessful]

        # Use loader to load suites
        loader = loader_mod.Loader()
        test_schedule = loader.load_schedule_for_suites(*rerun_suites)

        # Execute the tests
        run_schedule(test_schedule, log_handler)
def main():
    '''
    Main entrypoint for the testlib test library.

    Dispatches to the ``do_<command>`` function matching the configured
    command, then closes the log.
    '''
    config.initialize_config()

    # 'do' the given command.
    command_handler = globals()['do_' + config.config.command]
    command_handler()
    log.test_log.close()

71
ext/testlib/query.py Normal file
View File

@@ -0,0 +1,71 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import terminal
import log
# TODO Refactor print logic out of this so the objects
# created are separate from print logic.
class QueryRunner(object):
    '''
    Answers queries about a loaded test schedule (tags, suites, tests)
    and prints machine-readable listings of them.
    '''
    def __init__(self, test_schedule):
        self.schedule = test_schedule

    def tags(self):
        '''Return the set of all tags used by any suite in the schedule.'''
        all_tags = set()
        for suite in self.schedule:
            all_tags.update(suite.tags)
        return all_tags

    def suites(self):
        '''Return all suites in the schedule as a list.'''
        return list(self.schedule)

    def suites_with_tag(self, tag):
        '''Return the suites whose tags contain `tag`.'''
        return [suite for suite in self.suites() if tag in suite.tags]

    def _list_banner(self, title):
        # Shared banner used by the list_* methods.
        log.test_log.message(terminal.separator())
        log.test_log.message(title, bold=True)
        log.test_log.message(terminal.separator())

    def list_tests(self):
        '''Print the uid of every test case in the schedule.'''
        self._list_banner('Listing all Test Cases.')
        for suite in self.schedule:
            for test in suite:
                log.test_log.message(test.uid, machine_readable=True)

    def list_suites(self):
        '''Print the uid of every suite in the schedule.'''
        self._list_banner('Listing all Test Suites.')
        for suite in self.suites():
            log.test_log.message(suite.uid, machine_readable=True)

    def list_tags(self):
        '''Print every tag used in the schedule.'''
        self._list_banner('Listing all Test Tags.')
        for tag in self.tags():
            log.test_log.message(tag, machine_readable=True)

303
ext/testlib/result.py Normal file
View File

@@ -0,0 +1,303 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import errno
import os
import pickle
import xml.sax.saxutils

from config import config
import helper
import state
import log
def _create_uid_index(iterable):
index = {}
for item in iterable:
assert item.uid not in index
index[item.uid] = item
return index
class _CommonMetadataMixin:
    '''
    Mixin exposing the shared metadata fields (name, uid, result) of a
    wrapped test item via its ``_metadata`` attribute.
    '''
    @property
    def name(self):
        return self._metadata.name

    @property
    def uid(self):
        return self._metadata.uid

    @property
    def result(self):
        return self._metadata.result

    @result.setter
    def result(self, result):
        self._metadata.result = result

    @property
    def unsucessful(self):
        # NOTE(review): name is a typo for 'unsuccessful', but it is part
        # of the public interface (used by do_rerun in main.py), so it
        # cannot be renamed here without breaking callers.
        return self._metadata.result.value != state.Result.Passed
# BUG FIX: the bases were ``(object, _CommonMetadataMixin)``; with a
# new-style mixin (always the case under Python 3) that ordering cannot
# produce a consistent MRO and class creation raises TypeError. The mixin
# must precede ``object``.
class InternalTestResult(_CommonMetadataMixin, object):
    '''
    Result view of a single test: its metadata plus the paths where its
    stdout/stderr streams were saved.
    '''
    def __init__(self, obj, suite, directory):
        self._metadata = obj.metadata
        self.suite = suite

        self.stderr = os.path.join(
            InternalSavedResults.output_path(self.uid, suite.uid),
            'stderr'
        )
        self.stdout = os.path.join(
            InternalSavedResults.output_path(self.uid, suite.uid),
            'stdout'
        )
# BUG FIX: mixin must precede ``object`` in the bases (see
# InternalTestResult) or class creation fails under Python 3 with an MRO
# TypeError.
class InternalSuiteResult(_CommonMetadataMixin, object):
    '''
    Result view of a suite: wraps each of its tests in an
    :class:`InternalTestResult` and indexes them by uid.
    '''
    def __init__(self, obj, directory):
        self._metadata = obj.metadata
        self.directory = directory
        self._wrap_tests(obj)

    def _wrap_tests(self, obj):
        self._tests = [InternalTestResult(test, self, self.directory)
                       for test in obj]
        self._tests_index = _create_uid_index(self._tests)

    def get_test(self, uid):
        return self._tests_index[uid]

    def __iter__(self):
        return iter(self._tests)

    def get_test_result(self, uid):
        return self.get_test(uid)

    def aggregate_test_results(self):
        '''Group this suite's tests by their result value.'''
        results = {}
        for test in self:
            helper.append_dictlist(results, test.result.value, test)
        return results
# BUG FIX: mixin must precede ``object`` in the bases (see
# InternalTestResult) or class creation fails under Python 3 with an MRO
# TypeError.
class InternalLibraryResults(_CommonMetadataMixin, object):
    '''
    Result view of the whole library: wraps each suite in an
    :class:`InternalSuiteResult` and indexes them by uid.
    '''
    def __init__(self, obj, directory):
        self.directory = directory
        self._metadata = obj.metadata
        self._wrap_suites(obj)

    def __iter__(self):
        return iter(self._suites)

    def _wrap_suites(self, obj):
        self._suites = [InternalSuiteResult(suite, self.directory)
                        for suite in obj]
        self._suites_index = _create_uid_index(self._suites)

    def add_suite(self, suite):
        # BUG FIX: the original checked ``suite.uid in self._suites``
        # (membership of a uid in a *list of suites*, always False) and
        # then assigned ``self._suites[suite.uid] = suite`` into that
        # list, which raises TypeError. Register the suite in both the
        # list and the uid index.
        if suite.uid in self._suites_index:
            raise ValueError('Cannot have duplicate suite UIDs.')
        self._suites.append(suite)
        self._suites_index[suite.uid] = suite

    def get_suite_result(self, suite_uid):
        return self._suites_index[suite_uid]

    def get_test_result(self, test_uid, suite_uid):
        return self.get_suite_result(suite_uid).get_test_result(test_uid)

    def aggregate_test_results(self):
        '''Group every test in every suite by its result value.'''
        results = {}
        for suite in self._suites:
            for test in suite:
                helper.append_dictlist(results, test.result.value, test)
        return results
class InternalSavedResults:
    '''(Un)pickles library results to and from disk.'''
    @staticmethod
    def output_path(test_uid, suite_uid, base=None):
        '''
        Return the path which results for a specific test case should be
        stored.
        '''
        if base is None:
            base = config.result_path
        return os.path.join(
                base,
                str(suite_uid).replace(os.path.sep, '-'),
                str(test_uid).replace(os.path.sep, '-'))

    @staticmethod
    def save(results, path, protocol=pickle.HIGHEST_PROTOCOL):
        '''Pickle `results` to `path`, creating parent dirs as needed.'''
        if not os.path.exists(os.path.dirname(path)):
            try:
                os.makedirs(os.path.dirname(path))
            except OSError as exc:  # Guard against race condition
                # BUG FIX: ``errno`` was referenced here without being
                # imported, turning this race guard into a NameError.
                if exc.errno != errno.EEXIST:
                    raise

        # BUG FIX: pickle data is binary; open in binary mode so this
        # works on every platform (and under Python 3).
        with open(path, 'wb') as f:
            pickle.dump(results, f, protocol)

    @staticmethod
    def load(path):
        with open(path, 'rb') as f:
            return pickle.load(f)
class XMLElement(object):
    '''
    Minimal streaming XML writer. Subclasses provide ``name``,
    ``attributes`` and ``elements``; :meth:`write` serializes the element
    to a file-like object.
    '''
    def write(self, file_):
        self.begin(file_)
        self.end(file_)

    def begin(self, file_):
        # Opening tag with attributes, followed by the element content.
        file_.write('<' + self.name)
        for attr in self.attributes:
            file_.write(' ')
            attr.write(file_)
        file_.write('>')
        self.body(file_)

    def body(self, file_):
        # One child per line; a trailing newline before the closing tag.
        for elem in self.elements:
            file_.write('\n')
            elem.write(file_)
        file_.write('\n')

    def end(self, file_):
        file_.write('</' + self.name + '>')
class XMLAttribute(object):
    '''A single ``name=value`` XML attribute; value is quoted on write.'''
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def write(self, file_):
        quoted = xml.sax.saxutils.quoteattr(self.value)
        file_.write('%s=%s' % (self.name, quoted))
class JUnitTestSuites(XMLElement):
    '''Root ``<testsuites>`` element aggregating all suite results.'''
    name = 'testsuites'
    # Maps internal result values to JUnit count-attribute names.
    result_map = {
        state.Result.Errored: 'errors',
        state.Result.Failed: 'failures',
        state.Result.Passed: 'tests'
    }

    def __init__(self, internal_results):
        results = internal_results.aggregate_test_results()

        self.attributes = [
            self.result_attribute(result, str(len(tests)))
            for result, tests in results.items()
        ]
        self.elements = [JUnitTestSuite(suite)
                         for suite in internal_results]

    def result_attribute(self, result, count):
        return XMLAttribute(self.result_map[result], count)
class JUnitTestSuite(JUnitTestSuites):
    '''A ``<testsuite>`` element holding one suite's test cases.'''
    name = 'testsuite'
    # Suites additionally report a 'skipped' count.
    result_map = {
        state.Result.Errored: 'errors',
        state.Result.Failed: 'failures',
        state.Result.Passed: 'tests',
        state.Result.Skipped: 'skipped'
    }

    def __init__(self, suite_result):
        results = suite_result.aggregate_test_results()

        self.attributes = [XMLAttribute('name', suite_result.name)]
        self.attributes.extend(
            self.result_attribute(result, str(len(tests)))
            for result, tests in results.items())

        self.elements = [JUnitTestCase(test) for test in suite_result]

    def result_attribute(self, result, count):
        return XMLAttribute(self.result_map[result], count)
class JUnitTestCase(XMLElement):
    '''A ``<testcase>`` element for a single test result.'''
    name = 'testcase'

    def __init__(self, test_result):
        attrs = []
        attrs.append(XMLAttribute('name', test_result.name))
        # TODO JUnit expects class of test.. add as test metadata.
        attrs.append(XMLAttribute('classname', str(test_result.uid)))
        attrs.append(XMLAttribute('status', str(test_result.result)))
        self.attributes = attrs

        # TODO JUnit expects a message for the reason a test was
        # skipped or errored, save this with the test metadata.
        # http://llg.cubic.org/docs/junit/
        self.elements = [
            LargeFileElement(tag, path)
            for tag, path in (('system-err', test_result.stderr),
                              ('system-out', test_result.stdout))
        ]
class LargeFileElement(XMLElement):
    '''
    XML element whose body streams the XML-escaped contents of a file,
    line by line, rather than reading it all into memory.
    '''
    def __init__(self, name, filename):
        self.name = name
        self.filename = filename
        self.attributes = []

    def body(self, file_):
        escape = xml.sax.saxutils.escape
        try:
            with open(self.filename, 'r') as source:
                for line in source:
                    file_.write(escape(line))
        except IOError:
            # TODO Better error logic, this is sometimes O.K.
            # if there was no stdout/stderr captured for the test
            #
            # TODO If that was the case, the file should still be made and it
            # should just be empty instead of not existing.
            pass
class JUnitSavedResults:
    '''Serializes internal results into the JUnit XML format.'''
    @staticmethod
    def save(results, path):
        '''
        Compile the internal results into JUnit format, writing it to the
        given file.
        '''
        results = JUnitTestSuites(results)
        with open(path, 'w') as f:
            results.write(f)

216
ext/testlib/runner.py Normal file
View File

@@ -0,0 +1,216 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import multiprocessing.dummy
import threading
import traceback
import helper
import state
import log
import sandbox
from state import Status, Result
from fixture import SkipException
def compute_aggregate_result(iterable):
    '''
    Compute the aggregate result of a collection of test items.

    Status of the test suite by default is:
    * Passed if all contained tests passed
    * Errored if any contained tests errored
    * Failed if no tests errored, but one or more failed.
    * Skipped if all contained tests were skipped
    '''
    failed = []
    skipped = []
    for testitem in iterable:
        result = testitem.result

        if result.value == Result.Errored:
            # The first error short-circuits the aggregation.
            return Result(result.value, result.reason)
        elif result.value == Result.Failed:
            failed.append(result.reason)
        elif result.value == Result.Skipped:
            # CONSISTENCY FIX: was ``result.Skipped`` — an instance
            # attribute lookup that only worked by falling through to the
            # class. Reference the Result class explicitly, matching the
            # Errored/Failed branches above.
            skipped.append(result.reason)

    if failed:
        return Result(Result.Failed, failed)
    elif skipped:
        return Result(Result.Skipped, skipped)
    else:
        return Result(Result.Passed)
class TestParameters(object):
    '''
    Parameters handed to a test's entrypoint: the test, its suite, a
    test-scoped log wrapper, and the merged fixture mapping.
    '''
    def __init__(self, test, suite):
        self.test = test
        self.suite = suite
        self.log = log.TestLogWrapper(log.test_log, test, suite)

    @helper.cacheresult
    def _fixtures(self):
        # Suite-level fixtures first; test-level fixtures of the same
        # name override them.
        merged = {}
        for fixture in self.suite.fixtures:
            merged[fixture.name] = fixture
        for fixture in self.test.fixtures:
            merged[fixture.name] = fixture
        return merged

    @property
    def fixtures(self):
        return self._fixtures()
class RunnerPattern:
    '''
    Template for running a testable item (test, suite, or library):
    build its fixtures, run it, then tear the fixtures down, updating
    the item's status and result along the way.
    '''
    def __init__(self, loaded_testable):
        self.testable = loaded_testable
        self.builder = FixtureBuilder(self.testable.fixtures)

    def handle_error(self, trace):
        # Mark this item errored and propagate the reason to children.
        self.testable.result = Result(Result.Errored, trace)
        self.avoid_children(trace)

    def handle_skip(self, trace):
        # Mark this item skipped and propagate the reason to children.
        self.testable.result = Result(Result.Skipped, trace)
        self.avoid_children(trace)

    def avoid_children(self, reason):
        # Children inherit this item's result value and are never run.
        for testable in self.testable:
            testable.result = Result(self.testable.result.value, reason)
            testable.status = Status.Avoided

    def test(self):
        # Hook for subclasses; the base pattern runs nothing itself.
        pass

    def run(self):
        '''
        Run the full lifecycle: fixture setup, the test() hook, fixture
        teardown. A skip or broken fixture during setup marks this item
        (and its children) avoided instead of running it.
        '''
        avoided = False
        try:
            self.testable.status = Status.Building
            self.builder.setup(self.testable)
        except SkipException:
            self.handle_skip(traceback.format_exc())
            avoided = True
        except BrokenFixtureException:
            self.handle_error(traceback.format_exc())
            avoided = True
        else:
            self.testable.status = Status.Running
            self.test()
        finally:
            # Teardown always runs, even when setup failed partway.
            self.testable.status = Status.TearingDown
            self.builder.teardown(self.testable)

        if avoided:
            self.testable.status = Status.Avoided
        else:
            self.testable.status = Status.Complete
class TestRunner(RunnerPattern):
    '''Runs a single test inside a sandboxed subprocess.'''
    def test(self):
        self.sandbox_test()

    def sandbox_test(self):
        try:
            parameters = TestParameters(self.testable,
                                        self.testable.parent_suite)
            sandbox.Sandbox(parameters)
        except sandbox.SubprocessException:
            # The child process died with an exception: the test failed.
            self.testable.result = Result(Result.Failed,
                                          traceback.format_exc())
        else:
            self.testable.result = Result(Result.Passed)
class SuiteRunner(RunnerPattern):
    '''Runs each of the suite's tests sequentially, then aggregates.'''
    def test(self):
        for test in self.testable:
            test.runner(test).run()
        self.testable.result = compute_aggregate_result(
                iter(self.testable))
class LibraryRunner(SuiteRunner):
    '''Runs a library's suites sequentially (same pattern as a suite).'''
    pass
class LibraryParallelRunner(RunnerPattern):
    '''Runs the library's suites in a thread pool of configurable size.'''
    def set_threads(self, threads):
        # Number of worker threads; must be set before run()/test().
        self.threads = threads

    def _entrypoint(self, suite):
        suite.runner(suite).run()

    def test(self):
        # CLEANUP: the original duplicated _entrypoint as a lambda and
        # leaked the pool's worker threads; reuse the helper and release
        # the pool once all suites have run.
        pool = multiprocessing.dummy.Pool(self.threads)
        pool.map(self._entrypoint, self.testable)
        pool.close()
        pool.join()
        self.testable.result = compute_aggregate_result(
                iter(self.testable))
class BrokenFixtureException(Exception):
    '''Raised when building a fixture for a test item fails.'''
    def __init__(self, fixture, testitem, trace):
        self.fixture = fixture
        self.testitem = testitem
        self.trace = trace

        message = ('%s\n'
                   'Exception raised building "%s" raised SkipException'
                   ' for "%s".' %
                   (trace, fixture.name, testitem.name)
                   )
        self.msg = message
        super(BrokenFixtureException, self).__init__(message)
class FixtureBuilder(object):
    '''
    Sets up and tears down a list of fixtures for a test item, tracking
    which fixtures were built so teardown also covers a partially-built
    state.
    '''
    def __init__(self, fixtures):
        self.fixtures = fixtures
        self.built_fixtures = []

    def setup(self, testitem):
        '''
        Set up each fixture in order.

        :raises SkipException: Propagated unchanged from a fixture.
        :raises BrokenFixtureException: Wraps any other setup failure.
        '''
        for fixture in self.fixtures:
            # Mark as built before, so if the build fails
            # we still try to tear it down.
            self.built_fixtures.append(fixture)
            try:
                fixture.setup(testitem)
            except SkipException:
                raise
            except Exception as e:
                exc = traceback.format_exc()
                msg = 'Exception raised while setting up fixture for %s' %\
                        testitem.uid
                log.test_log.warn('%s\n%s' % (exc, msg))

                raise BrokenFixtureException(fixture, testitem,
                        traceback.format_exc())

    def teardown(self, testitem):
        '''Tear down every built fixture, logging but ignoring failures.'''
        for fixture in self.built_fixtures:
            try:
                fixture.teardown(testitem)
            except Exception:
                # Log exception but keep cleaning up.
                exc = traceback.format_exc()
                msg = 'Exception raised while tearing down fixture for %s' %\
                        testitem.uid
                log.test_log.warn('%s\n%s' % (exc, msg))

193
ext/testlib/sandbox.py Normal file
View File

@@ -0,0 +1,193 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import multiprocessing
import pdb
import os
import sys
import threading
import traceback
import log
# Keep a reference to the original Pdb class; ForkedPdb subclasses it and
# IoManager.fixup_pdb later replaces pdb.Pdb with ForkedPdb.
pdb._Pdb = pdb.Pdb
class ForkedPdb(pdb._Pdb):
    '''
    A Pdb subclass that may be used from a forked multiprocessing child
    '''
    # Set by IoManager.fixup_pdb so interaction() can swap the redirected
    # pipes back to the real stdout/stderr while the debugger is active.
    io_manager = None

    def interaction(self, *args, **kwargs):
        _stdin = sys.stdin
        self.io_manager.restore_pipes()
        try:
            # Reattach the debugger to the controlling terminal's stdin.
            sys.stdin = open('/dev/stdin')
            pdb._Pdb.interaction(self, *args, **kwargs)
        finally:
            sys.stdin = _stdin
            self.io_manager.replace_pipes()
#TODO Refactor duplicate stdout, stderr logic
class IoManager(object):
    '''
    Redirects a test subprocess's stdout/stderr into pipes and forwards
    everything written to them to the test log from reader threads.
    '''
    def __init__(self, test, suite):
        self.test = test
        self.suite = suite
        self.log = log.test_log
        self._init_pipes()

    def _init_pipes(self):
        # One (read, write) pipe pair per stream; the child writes, the
        # parent's logger threads read.
        self.stdout_rp, self.stdout_wp = os.pipe()
        self.stderr_rp, self.stderr_wp = os.pipe()

    def close_parent_pipes(self):
        # Close the parent's copies of the write ends so the reader
        # threads see EOF once the child process exits.
        os.close(self.stdout_wp)
        os.close(self.stderr_wp)

    def setup(self):
        # Called inside the child process before the test runs.
        self.replace_pipes()
        self.fixup_pdb()

    def fixup_pdb(self):
        # Route pdb through ForkedPdb so breakpoints in the child can
        # temporarily restore the real terminal streams.
        ForkedPdb.io_manager = self
        pdb.Pdb = ForkedPdb

    def replace_pipes(self):
        # Save the real stdout/stderr fds, then point both the fds and
        # the Python-level stream objects at the pipe write ends. The
        # third fdopen argument (0) requests unbuffered streams
        # (Python 2 semantics).
        self.old_stderr = os.dup(sys.stderr.fileno())
        self.old_stdout = os.dup(sys.stdout.fileno())

        os.dup2(self.stderr_wp, sys.stderr.fileno())
        sys.stderr = os.fdopen(self.stderr_wp, 'w', 0)
        os.dup2(self.stdout_wp, sys.stdout.fileno())
        sys.stdout = os.fdopen(self.stdout_wp, 'w', 0)

    def restore_pipes(self):
        # Inverse of replace_pipes: keep fresh dups of the pipe fds and
        # put the saved stdout/stderr fds back in place.
        self.stderr_wp = os.dup(sys.stderr.fileno())
        self.stdout_wp = os.dup(sys.stdout.fileno())

        os.dup2(self.old_stderr, sys.stderr.fileno())
        sys.stderr = os.fdopen(self.old_stderr, 'w', 0)
        os.dup2(self.old_stdout, sys.stdout.fileno())
        sys.stdout = os.fdopen(self.old_stdout, 'w', 0)

    def start_loggers(self):
        self.log_ouput()

    def log_ouput(self):
        # NOTE(review): name is a typo for 'log_output'; kept because
        # start_loggers calls it by this name.
        def _log_output(pipe, log_callback):
            # Forward each line from the pipe to the log callback.
            with os.fdopen(pipe, 'r') as pipe:
                # Read iteractively, don't allow input to fill the pipe.
                for line in iter(pipe.readline, ''):
                    log_callback(line)

        # Don't keep a backpointer to self in the thread.
        log = self.log
        test = self.test
        suite = self.suite

        self.stdout_thread = threading.Thread(
                target=_log_output,
                args=(self.stdout_rp,
                      lambda buf: log.test_stdout(test, suite, buf))
        )
        self.stderr_thread = threading.Thread(
                target=_log_output,
                args=(self.stderr_rp,
                      lambda buf: log.test_stderr(test, suite, buf))
        )

        # Daemon + Join to not lock up main thread if something breaks
        # but provide consistent execution if nothing goes wrong.
        self.stdout_thread.daemon = True
        self.stderr_thread.daemon = True
        self.stdout_thread.start()
        self.stderr_thread.start()

    def join_loggers(self):
        self.stdout_thread.join()
        self.stderr_thread.join()
class SubprocessException(Exception):
    '''
    Raised in the parent when a sandboxed subprocess died with an
    exception.

    :param exception: The original exception object raised in the
        subprocess.
    :param trace: The formatted traceback string from the subprocess,
        used as this exception's message.
    '''
    def __init__(self, exception, trace):
        # Keep the original exception; previously the argument was
        # accepted but silently dropped.
        self.exception = exception
        super(SubprocessException, self).__init__(trace)
class ExceptionProcess(multiprocessing.Process):
    '''
    A multiprocessing.Process that forwards any exception raised in the
    child back to the parent through a Pipe, exposed together with the
    exit code via the :prop:`status` property.
    '''
    class Status():
        # Value object pairing the process exit code with the
        # (exception, formatted traceback) captured in the child, if any.
        def __init__(self, exitcode, exception_tuple):
            self.exitcode = exitcode
            if exception_tuple is not None:
                self.trace = exception_tuple[1]
                self.exception = exception_tuple[0]
            else:
                self.exception = None
                self.trace = None

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        # Parent/child connection ends used to ship exception info.
        self._pconn, self._cconn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        # Executed in the child: send None on success, or the exception
        # and its formatted traceback on failure, then re-raise so the
        # child still exits with a failing status code.
        try:
            super(ExceptionProcess, self).run()
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            raise

    @property
    def status(self):
        # Executed in the parent; poll avoids blocking when the child
        # sent nothing (e.g. it was killed before run() finished).
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self.Status(self.exitcode, self._exception)
class Sandbox(object):
    '''
    Runs a single test in a forked child process, capturing its
    stdout/stderr through an :class:`IoManager` and re-raising any
    child-side failure in the parent as a :class:`SubprocessException`.

    .. note:: Constructing a Sandbox runs the test to completion.
    '''
    def __init__(self, test_parameters):
        self.params = test_parameters

        self.io_manager = IoManager(self.params.test, self.params.suite)
        self.p = ExceptionProcess(target=self.entrypoint)
        # Daemon + Join to not lock up main thread if something breaks
        self.p.daemon = True

        # Start draining output before the child starts so a full pipe
        # can never stall the test.
        self.io_manager.start_loggers()
        self.p.start()
        # Drop our copies of the write ends so the logger threads see
        # EOF when the child exits and closes its copies.
        self.io_manager.close_parent_pipes()
        self.p.join()
        self.io_manager.join_loggers()

        status = self.p.status
        if status.exitcode:
            raise SubprocessException(status.exception, status.trace)

    def entrypoint(self):
        # Runs in the child: redirect I/O, then execute the test.
        self.io_manager.setup()
        self.params.test.test(self.params)

63
ext/testlib/state.py Normal file
View File

@@ -0,0 +1,63 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
class Result:
    '''
    Enumeration of possible test results, plus an instance form
    carrying a result value and an optional human-readable reason.
    '''
    # Enum names in value order; index in this list == enum value.
    enums = '''
    NotRun
    Skipped
    Passed
    Failed
    Errored
    '''.split()
    # Assign values explicitly rather than via a class-body loop: the
    # loop version leaked its 'idx' and 'enum' variables as stray class
    # attributes (Result.idx == 4, Result.enum == 'Errored').
    NotRun, Skipped, Passed, Failed, Errored = range(5)

    @classmethod
    def name(cls, enum):
        '''Return the printable name for an enum value.'''
        return cls.enums[enum]

    def __init__(self, value, reason=None):
        # value: one of the enum constants above.
        # reason: optional explanation for the result.
        self.value = value
        self.reason = reason

    def __str__(self):
        return self.name(self.value)
class Status:
    '''
    Enumeration of the lifecycle states of a test item.
    '''
    # Enum names in value order; index in this list == enum value.
    enums = '''
    Unscheduled
    Building
    Running
    TearingDown
    Complete
    Avoided
    '''.split()
    # Assign values explicitly rather than via a class-body loop: the
    # loop version leaked its 'idx' and 'enum' variables as stray class
    # attributes.
    Unscheduled, Building, Running, TearingDown, Complete, Avoided = range(6)

    @classmethod
    def name(cls, enum):
        '''Return the printable name for an enum value.'''
        return cls.enums[enum]

69
ext/testlib/suite.py Normal file
View File

@@ -0,0 +1,69 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import helper
import runner as runner_mod
class TestSuite(object):
    '''
    An object grouping a collection of tests. It provides tags which enable
    filtering during list and run selection. All tests held in the suite must
    have a unique name.

    ..note::
        The :func:`__new__` method enables collection of test cases, it must
        be called in order for test cases to be collected.

    ..note::
        To reduce test definition boilerplate, the :func:`init` method is
        forwarded all `*args` and `**kwargs`. This means derived classes can
        define init without boilerplate super().__init__(*args, **kwargs).
    '''
    runner = runner_mod.SuiteRunner
    collector = helper.InstanceCollector()

    # Class-level defaults; __init__ extends copies instead of mutating
    # these shared objects.
    fixtures = []
    tests = []
    tags = set()

    def __new__(klass, *args, **kwargs):
        # object.__new__ does not accept extra arguments; forwarding
        # *args/**kwargs to it raises TypeError on Python 3 (and is
        # deprecated on Python 2), so create the instance bare.
        obj = super(TestSuite, klass).__new__(klass)
        TestSuite.collector.collect(obj)
        return obj

    def __init__(self, name=None, fixtures=tuple(), tests=tuple(),
                 tags=tuple(), **kwargs):
        self.fixtures = self.fixtures + list(fixtures)
        self.tags = self.tags | set(tags)
        self.tests = self.tests + list(tests)
        if name is None:
            # Default the suite name to the subclass name.
            name = self.__class__.__name__
        self.name = name

    def __iter__(self):
        return iter(self.tests)

165
ext/testlib/terminal.py Normal file
View File

@@ -0,0 +1,165 @@
# Copyright (c) 2011 Advanced Micro Devices, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Author: Steve Reinhardt
import sys
import fcntl
import termios
import struct
# Intended usage example:
#
# if force_colors:
# from m5.util.terminal import termcap
# elif no_colors:
# from m5.util.terminal import no_termcap as termcap
# else:
# from m5.util.terminal import tty_termcap as termcap
# print termcap.Blue + "This could be blue!" + termcap.Normal
# ANSI color names in index order
color_names = "Black Red Green Yellow Blue Magenta Cyan White".split()

# Character used by separator() when no other is requested.
default_separator = '='

# Character attribute capabilities. Note that not all terminals
# support all of these capabilities, or support them
# differently/meaningfully. For example:
#
# - In PuTTY (with the default settings), Dim has no effect, Standout
#   is the same as Reverse, and Blink does not blink but switches to a
#   gray background.
#
# Please feel free to add information about other terminals here.
#
# Maps friendly attribute names to terminfo capability names.
capability_map = {
        'Bold': 'bold',
        'Dim': 'dim',
        'Blink': 'blink',
        'Underline': 'smul',
        'Reverse': 'rev',
        'Standout': 'smso',
        'Normal': 'sgr0'
        }

capability_names = capability_map.keys()
def null_cap_string(s, *args):
    '''Fallback capability lookup: always yields the empty string,
    regardless of the capability requested.'''
    return ''
try:
    import curses
    curses.setupterm()
    def cap_string(s, *args):
        '''Look up terminal capability *s* (parameterized with *args*)
        via curses; return '' if the terminal lacks it.'''
        cap = curses.tigetstr(s)
        if cap:
            return curses.tparm(cap, *args)
        else:
            return ''
except Exception:
    # Fall back to no-op strings when curses is unavailable or the
    # terminal cannot be set up. Previously a bare 'except:', which
    # would also have swallowed KeyboardInterrupt/SystemExit.
    cap_string = null_cap_string
class ColorStrings(object):
    '''
    Namespace whose attributes hold the escape strings for each ANSI
    color and character attribute, produced by the given
    capability-lookup function (cap_string or null_cap_string).
    '''
    def __init__(self, cap_string):
        for i, c in enumerate(color_names):
            setattr(self, c, cap_string('setaf', i))
        # items() instead of the Python-2-only iteritems(): identical
        # behavior on Python 2, and also works on Python 3.
        for name, cap in capability_map.items():
            setattr(self, name, cap_string(cap))
# Singleton instances: 'termcap' always emits escape codes,
# 'no_termcap' never does.
termcap = ColorStrings(cap_string)
no_termcap = ColorStrings(null_cap_string)

# 'tty_termcap' emits colors only when stdout is attached to a tty.
if sys.stdout.isatty():
    tty_termcap = termcap
else:
    tty_termcap = no_termcap
def get_termcap(use_colors = None):
    '''
    Select a termcap object from the tri-state *use_colors* flag:
    truthy forces colors on, None picks colors iff stdout is a tty,
    and any other falsy value disables colors.
    '''
    if use_colors:
        return termcap
    if use_colors is None:
        # option unspecified; default behavior is to use colors iff isatty
        return tty_termcap
    return no_termcap
def terminal_size():
    '''Return the (width, heigth) of the terminal screen.'''
    # Query the controlling terminal (fd 0) for its window size; the
    # kernel fills the four unsigned shorts in place.
    packed = fcntl.ioctl(0, termios.TIOCGWINSZ,
                         struct.pack('HHHH', 0, 0, 0, 0))
    rows, cols, _, _ = struct.unpack('HHHH', packed)
    return cols, rows
def separator(char=default_separator, color=None):
    '''
    Return a separator of the given character that is the length of the full
    width of the terminal screen.
    '''
    width = terminal_size()[0]
    line = char * width
    if color:
        # Wrap in the color escape and reset back to normal afterwards.
        line = color + line + termcap.Normal
    return line
def insert_separator(inside, char=default_separator,
                     min_barrier=3, color=None):
    '''
    Place the given string inside of the separator. If it does not fit
    inside, expand the separator to fit it with at least min_barrier.

    .. seealso:: :func:`separator`
    '''
    string = separator(char, color=color)

    # A negative gap means 'inside' plus min_barrier on each side does
    # not fit in a screen-width separator.
    gap = (len(string) - len(inside)) - min_barrier * 2
    if gap < 0:
        # BUG FIX: the original tested 'gap > 0', in which case
        # range(-gap) was empty and the separator was never expanded;
        # expansion is needed precisely when the gap is negative.
        string += char * -gap

    # Emplace 'inside' centered within the separator. Floor division
    # keeps the indices integral on Python 3 as well.
    middle = (len(string) - 1) // 2
    start_idx = middle - len(inside) // 2
    return string[:start_idx] + inside + string[start_idx + len(inside):]
if __name__ == '__main__':
    # Manual smoke test: print every color/attribute combination so a
    # human can verify rendering in the current terminal.
    def test_termcap(obj):
        for c_name in color_names:
            c_str = getattr(obj, c_name)
            print c_str + c_name + obj.Normal
            for attr_name in capability_names:
                if attr_name == 'Normal':
                    continue
                attr_str = getattr(obj, attr_name)
                print attr_str + c_str + attr_name + " " + c_name + obj.Normal
            print obj.Bold + obj.Underline + \
                  c_name + "Bold Underline " + c_str + obj.Normal

    # Show both the enabled and disabled (no-op) capability sets.
    print "=== termcap enabled ==="
    test_termcap(termcap)
    print termcap.Normal
    print "=== termcap disabled ==="
    test_termcap(no_termcap)

91
ext/testlib/test.py Normal file
View File

@@ -0,0 +1,91 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import functools
import helper
import runner as runner_mod
class TestCase(object):
    '''
    Base class for all tests.

    ..note::
        The :func:`__new__` method enables collection of test cases, it must
        be called in order for test cases to be collected.
    '''
    fixtures = []

    # TODO, remove explicit dependency. Use the loader to set the
    # default runner
    runner = runner_mod.TestRunner
    collector = helper.InstanceCollector()

    def __new__(cls, *args, **kwargs):
        # object.__new__ does not accept extra arguments; forwarding
        # *args/**kwargs to it raises TypeError on Python 3 (and is
        # deprecated on Python 2), so create the instance bare.
        obj = super(TestCase, cls).__new__(cls)
        TestCase.collector.collect(obj)
        return obj

    def __init__(self, name=None, fixtures=tuple(), **kwargs):
        # Extend (not mutate) the class-level fixture list.
        self.fixtures = self.fixtures + list(fixtures)
        if name is None:
            name = self.__class__.__name__
        self.name = name
class TestFunction(TestCase):
    '''
    TestCase implementation which uses a callable object as a test.
    '''
    def __init__(self, function, name=None, **kwargs):
        self.test_function = function
        # Default the test name to the wrapped callable's name.
        TestCase.__init__(
                self,
                name=(name if name is not None else function.__name__),
                **kwargs)

    def test(self, *args, **kwargs):
        # Delegate execution directly to the wrapped callable.
        self.test_function(*args, **kwargs)
# TODO Change the decorator to make this easier to create copy tests.
# Good way to do so might be return by reference.
def testfunction(function=None, name=None, fixtures=tuple()):
    '''
    A decorator used to wrap a function as a TestFunction.

    Usable both bare (@testfunction) and parameterized
    (@testfunction(name=..., fixtures=...)).
    '''
    def testfunctiondecorator(function):
        '''Decorator used to mark a function as a test case.'''
        kwargs = {}
        if name is not None:
            kwargs['name'] = name
        if fixtures is not None:
            kwargs['fixtures'] = fixtures
        # Constructing the TestFunction registers it with the
        # collector; the original function is returned unchanged.
        TestFunction(function, **kwargs)
        return function

    return (testfunctiondecorator(function) if function is not None
            else testfunctiondecorator)

110
ext/testlib/uid.py Normal file
View File

@@ -0,0 +1,110 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import os
import itertools
import config
class UID(object):
    '''
    Unique identifier for a loaded test item, built from the item's
    (shortened) file path plus item-specific attributes joined by
    :attr:`sep`. The string form embeds the concrete UID class name so
    it can be round-tripped through :func:`from_uid`.
    '''
    sep = ':'
    # Field positions within the separated string form.
    type_idx, path_idx = range(2)

    def __init__(self, path, *args):
        self.path = self._shorten_path(path)
        self.attributes = args

    @staticmethod
    def _shorten_path(path):
        # Store paths relative to the configured testing base directory.
        return os.path.relpath(path,
                os.path.commonprefix((config.constants.testing_base,
                                      path)))

    @staticmethod
    def _full_path(short_path):
        return os.path.join(config.constants.testing_base, short_path)

    @classmethod
    def uid_to_path(cls, uid):
        split_path = str(uid).split(cls.sep)[cls.path_idx]
        return cls._full_path(split_path)

    @classmethod
    def uid_to_class(cls, uid):
        # Resolve the concrete UID class named in the uid string.
        return globals()[uid.split(cls.sep)[cls.type_idx]]

    @classmethod
    def from_suite(cls, suite, filepath):
        # First parameter renamed self -> cls: these are classmethods
        # and received the class, not an instance.
        return SuiteUID(filepath, suite.name)

    @classmethod
    def from_test(cls, test, filepath):
        return TestUID(filepath, test.name, test.parent_suite.name)

    @classmethod
    def from_uid(cls, uid):
        args = uid.split(cls.sep)
        del args[cls.type_idx]
        return cls.uid_to_class(uid)(*args)

    def __str__(self):
        common_opts = {
            self.path_idx: self.path,
            self.type_idx: self.__class__.__name__
        }
        return self.sep.join(itertools.chain(
            [common_opts[0], common_opts[1]],
            self.attributes))

    def __hash__(self):
        # Equal string forms hash equally, matching __eq__.
        return hash(str(self))

    def __eq__(self, other):
        return type(self) == type(other) and str(self) == str(other)
class TestUID(UID):
    '''UID for a single test: path plus (suite_name, test_name).'''
    def __init__(self, filename, test_name, suite_name):
        UID.__init__(self, filename, suite_name, test_name)

    @property
    def suite(self):
        # Suite name is stored first in the attribute tuple.
        return self.attributes[0]

    @property
    def test(self):
        # Test name is stored second.
        return self.attributes[1]
class SuiteUID(UID):
    # UID for a test suite: path plus the suite name.
    def __init__(self, filename, suite_name):
        UID.__init__(self, filename, suite_name)

    @property
    def suite(self):
        # Only attribute stored is the suite name.
        return self.attributes[0]

236
ext/testlib/wrappers.py Normal file
View File

@@ -0,0 +1,236 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
'''
Module contains wrappers for test items that have been
loaded by the testlib :class:`testlib.loader.Loader`.
'''
import itertools
import log
import uid
from state import Status, Result
class TestCaseMetadata():
    '''Plain record of a test case's identity and mutable run state.'''
    def __init__(self, name, uid, path, result, status, suite_uid):
        # Identity fields.
        self.name = name
        self.uid = uid
        self.path = path
        self.suite_uid = suite_uid
        # Mutable run state.
        self.status = status
        self.result = result
class TestSuiteMetadata():
    '''Plain record of a test suite's identity and mutable run state.'''
    def __init__(self, name, uid, tags, path, status, result):
        # Identity fields.
        self.name = name
        self.uid = uid
        self.path = path
        self.tags = tags
        # Mutable run state.
        self.status = status
        self.result = result
class LibraryMetadata():
    '''Plain record of the whole test library's name and run state.'''
    def __init__(self, name, result, status):
        self.name = name
        # Mutable run state.
        self.status = status
        self.result = result
class LoadedTestable(object):
    '''
    Base class for loaded test items.

    The :property:`result` and :property:`status` setters notify
    testlib of the update through :func:`log_result` and
    :func:`log_status` before recording the new state.
    '''
    def __init__(self, obj):
        self.obj = obj
        self.metadata = self._generate_metadata()

    # Identity accessors forward to the metadata record.
    @property
    def uid(self):
        return self.metadata.uid

    @property
    def name(self):
        return self.metadata.name

    @property
    def status(self):
        return self.metadata.status

    @status.setter
    def status(self, status):
        # Log first, then record, so the log sees the transition.
        self.log_status(status)
        self.metadata.status = status

    @property
    def result(self):
        return self.metadata.result

    @result.setter
    def result(self, result):
        self.log_result(result)
        self.metadata.result = result

    # Fixture/runner accessors forward to the wrapped object.
    @property
    def fixtures(self):
        return self.obj.fixtures

    @fixtures.setter
    def fixtures(self, fixtures):
        self.obj.fixtures = fixtures

    @property
    def runner(self):
        return self.obj.runner

    # TODO Change log to provide status_update, result_update for all types.
    def log_status(self, status):
        log.test_log.status_update(self, status)

    def log_result(self, result):
        log.test_log.result_update(self, result)

    def __iter__(self):
        # Base items contain no children.
        return iter(())
class LoadedTest(LoadedTestable):
    '''A single test case wrapped with its parent suite and file path.'''
    def __init__(self, test_obj, loaded_suite, path):
        self.parent_suite = loaded_suite
        self._path = path
        LoadedTestable.__init__(self, test_obj)

    def test(self, *args, **kwargs):
        # Forward execution to the wrapped test object.
        self.obj.test(*args, **kwargs)

    def _generate_metadata(self):
        return TestCaseMetadata(
                name=self.obj.name,
                path=self._path,
                uid=uid.TestUID(self._path,
                                self.obj.name,
                                self.parent_suite.name),
                status=Status.Unscheduled,
                result=Result(Result.NotRun),
                suite_uid=self.parent_suite.metadata.uid)
class LoadedSuite(LoadedTestable):
    '''A test suite wrapped with its path and its wrapped child tests.'''
    def __init__(self, suite_obj, path):
        self._path = path
        LoadedTestable.__init__(self, suite_obj)
        self.tests = self._wrap_children(suite_obj)

    def _wrap_children(self, suite_obj):
        # Wrap every contained test, recording this suite as parent.
        return [LoadedTest(child, self, self.metadata.path)
                for child in suite_obj]

    def _generate_metadata(self):
        return TestSuiteMetadata(
                name=self.obj.name,
                tags=self.obj.tags,
                path=self._path,
                uid=uid.SuiteUID(self._path, self.obj.name),
                status=Status.Unscheduled,
                result=Result(Result.NotRun))

    @property
    def tags(self):
        return self.metadata.tags

    def __iter__(self):
        return iter(self.tests)
class LoadedLibrary(LoadedTestable):
    '''
    Wraps a collection of all loaded test suites and
    provides utility functions for accessing fixtures.
    '''
    def __init__(self, suites, global_fixtures):
        LoadedTestable.__init__(self, suites)
        self.global_fixtures = global_fixtures

    def _generate_metadata(self):
        return LibraryMetadata(
                name='Test Library',
                status=Status.Unscheduled,
                result=Result(Result.NotRun))

    def __iter__(self):
        '''
        :returns: an iterator over contained :class:`TestSuite` objects.
        '''
        return iter(self.obj)

    def all_fixture_tuples(self):
        # Global fixtures followed by every suite's own fixtures.
        return itertools.chain(
                self.global_fixtures,
                *(suite.fixtures for suite in self.obj))

    def all_fixtures(self):
        '''
        :returns: an iterator over all global, suite, and test fixtures
        '''
        # Reuse all_fixture_tuples and append each test's fixtures.
        return itertools.chain(
                self.all_fixture_tuples(),
                *(self.test_fixtures(suite) for suite in self.obj))

    def test_fixtures(self, suite):
        '''
        :returns: an iterator over the fixtures of every test
            contained in the given suite
        '''
        return itertools.chain(*(test.fixtures for test in suite))

    @property
    def fixtures(self):
        return self.global_fixtures

    @property
    def uid(self):
        # The library has no file path; its name serves as the uid.
        return self.name

    @property
    def suites(self):
        return self.obj

    @suites.setter
    def suites(self, suites):
        self.obj = suites

1
tests/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.testing-results

33
tests/gem5/__init__.py Normal file
View File

@@ -0,0 +1,33 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import suite
import fixture
from suite import *
from fixture import *

258
tests/gem5/fixture.py Normal file
View File

@@ -0,0 +1,258 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import os
import tempfile
import shutil
from testlib.fixture import Fixture, globalfixture
from testlib.config import config, constants
from testlib.helper import log_call, cacheresult, joinpath, absdirpath
import testlib.log as log
class VariableFixture(Fixture):
    # Trivial fixture that simply holds a value for tests to read.
    def __init__(self, value=None, name=None):
        super(VariableFixture, self).__init__(name=name)
        self.value = value
class TempdirFixture(Fixture):
    # Provides a fresh temporary directory, removed again on teardown.
    def __init__(self):
        self.path = None
        super(TempdirFixture, self).__init__(
                name=constants.tempdir_fixture_name)

    def setup(self, testitem):
        # Prefix makes stray directories easy to identify in /tmp.
        self.path = tempfile.mkdtemp(prefix='gem5out')

    def teardown(self, testitem):
        # Guard against teardown without a prior successful setup.
        if self.path is not None:
            shutil.rmtree(self.path)
class SConsFixture(Fixture):
    '''
    Fixture will wait until all SCons targets are collected and tests are
    about to be ran, then will invocate a single instance of SCons for all
    targets.

    :param directory: The directory which scons will -C (cd) into before
        executing. If None is provided, will choose the config base_dir.
    '''
    def __init__(self, directory=None, target_class=None):
        self.directory = directory if directory else config.base_dir
        self.target_class = target_class if target_class else SConsTarget
        self.threads = config.threads
        self.targets = set()
        super(SConsFixture, self).__init__()

    def setup(self, testitem):
        if config.skip_build:
            return

        command = [
            'scons', '-C', self.directory,
            '-j', str(self.threads),
            '--ignore-style'
        ]

        if not self.targets:
            log.test_log.warn(
                'No SCons targets specified, this will'
                ' build the default all target.\n'
                'This is likely unintended, and you'
                ' may wish to kill testlib and reconfigure.')
        else:
            log.test_log.message(
                    'Building the following targets.'
                    ' This may take a while.')
            log.test_log.message('%s' % (', '.join(self.targets)))
            log.test_log.message(
                    # BUG FIX: the adjacent string literals were missing
                    # a separating space, printing "single ISA(--isa=)".
                    "You may want to run with only a single ISA "
                    "(--isa=), use --skip-build, or use 'rerun'.")

        command.extend(self.targets)
        log_call(log.test_log, command)
class SConsTarget(Fixture):
    # The singleton scons fixture we'll use for all targets.
    default_scons_invocation = None

    def __init__(self, target, build_dir=None, invocation=None):
        '''
        Represents a target to be built by an 'invocation' of scons.

        :param target: The target known to scons.

        :param build_dir: The 'build' directory path which will be prepended
            to the target name.

        :param invocation: Represents an invocation of scons which we will
            automatically attach this target to. If None provided, uses the
            main 'scons' invocation.
        '''
        self.target = os.path.join(
                config.build_dir if build_dir is None else build_dir,
                target)
        super(SConsTarget, self).__init__(name=target)

        if invocation is None:
            # Lazily create and globally register the shared scons
            # invocation on first use.
            if self.default_scons_invocation is None:
                SConsTarget.default_scons_invocation = SConsFixture()
                globalfixture(SConsTarget.default_scons_invocation)
            invocation = self.default_scons_invocation
        self.invocation = invocation

    def schedule_finalized(self, schedule):
        # Register our target with the shared invocation once the full
        # schedule is known.
        self.invocation.targets.add(self.target)
        return Fixture.schedule_finalized(self, schedule)
class Gem5Fixture(SConsTarget):
    '''SConsTarget that builds a gem5 binary for an ISA and variant.'''
    def __init__(self, isa, variant):
        super(Gem5Fixture, self).__init__(
                joinpath(isa.upper(), 'gem5.%s' % variant))
        # Expose the standard gem5-binary fixture name and metadata.
        self.name = constants.gem5_binary_fixture_name
        self.path = self.target
        self.isa = isa
        self.variant = variant
class MakeFixture(Fixture):
    '''
    Fixture that runs a single 'make -C <directory>' covering all
    targets which registered themselves against it.
    '''
    def __init__(self, directory, *args, **kwargs):
        name = 'make -C %s' % directory
        super(MakeFixture, self).__init__(build_once=True, lazy_init=False,
                                          name=name,
                                          *args, **kwargs)
        self.targets = []
        self.directory = directory

    def setup(self):
        super(MakeFixture, self).setup()
        # Deduplicate the registered dependents before building.
        targets = set(self.required_by)
        command = ['make', '-C', self.directory]
        command.extend([target.target for target in targets])
        # BUG FIX: pass the test log as first argument, matching the
        # log_call(log.test_log, command) usage in SConsFixture; the
        # bare log_call(command) form omitted the logger.
        log_call(log.test_log, command)
class MakeTarget(Fixture):
    def __init__(self, target, make_fixture=None, *args, **kwargs):
        '''
        :param make_fixture: The make invocation we will be attached to.
        Since we don't have a single global instance of make in gem5 like we do
        scons we need to know what invocation to attach to. If none given,
        creates its own.
        '''
        super(MakeTarget, self).__init__(name=target, *args, **kwargs)
        # The fixture name doubles as the make target name.
        self.target = self.name

        if make_fixture is None:
            make_fixture = MakeFixture(
                absdirpath(target),
                lazy_init=True,
                build_once=False)

        self.make_fixture = make_fixture

        # Add our self to the required targets of the main MakeFixture
        self.require(self.make_fixture)

    def setup(self, testitem):
        # NOTE(review): super().setup() is called without 'testitem'
        # although this method receives one — confirm the base
        # Fixture.setup signature accepts zero arguments.
        super(MakeTarget, self).setup()
        self.make_fixture.setup()
        return self
class TestProgram(MakeTarget):
    # Builds a test program for a given ISA/OS under test-progs/<program>.
    def __init__(self, program, isa, os, recompile=False):
        # NOTE(review): the 'os' parameter shadows the 'os' module; this
        # __init__ only uses it as a path component, so no conflict here.
        make_dir = joinpath('test-progs', program)
        make_fixture = MakeFixture(make_dir)
        target = joinpath('bin', isa, os, program)
        super(TestProgram, self).__init__(target, make_fixture)
        self.path = joinpath(make_dir, target)
        self.recompile = recompile

    def setup(self, testitem):
        # Check if the program exists if it does then only compile if
        # recompile was given.
        # NOTE(review): super(MakeTarget, self) deliberately(?) skips
        # MakeTarget.setup and calls the base Fixture.setup directly —
        # confirm this is intentional.
        if self.recompile:
            super(MakeTarget, self).setup()
        elif not os.path.exists(self.path):
            super(MakeTarget, self).setup()
class DownloadedProgram(Fixture):
    """ Like TestProgram, but checks the version in the gem5 binary repository
    and downloads an updated version if it is needed.
    """
    urlbase = "http://gem5.org/dist/current/"

    def __init__(self, path, program, **kwargs):
        """
        :param path: Subdirectory of 'test-progs' the binary lives in.
        :param program: Name of the program/binary to download.
        """
        super(DownloadedProgram, self).__init__("download-" + program,
                                                build_once=True, **kwargs)

        self.program_dir = joinpath('test-progs', path)
        self.path = joinpath(self.program_dir, program)
        self.url = self.urlbase + self.path

    def _download(self):
        # Fetch the binary from the gem5 dist server into program_dir.
        import urllib
        log.test_log.debug("Downloading " + self.url + " to " + self.path)
        if not os.path.exists(self.program_dir):
            os.makedirs(self.program_dir)
        urllib.urlretrieve(self.url, self.path)

    def _getremotetime(self):
        # Return the server file's Last-Modified header as a Unix timestamp.
        import urllib2, datetime, time
        import _strptime # Needed for python threading bug

        u = urllib2.urlopen(self.url)
        return time.mktime(datetime.datetime.strptime( \
                    u.info().getheaders("Last-Modified")[0],
                    "%a, %d %b %Y %X GMT").timetuple())

    def setup(self, testitem):
        import urllib2
        # Check to see if there is a file downloaded
        if not os.path.exists(self.path):
            self._download()
        else:
            try:
                t = self._getremotetime()
            except urllib2.URLError:
                # Problem checking the server, use the old files.
                # BUGFIX: was `log.debug(...)`; this module's logger is
                # accessed as log.test_log (see _download above), so the old
                # call would raise AttributeError on the error path.
                log.test_log.debug("Could not contact server. "
                                   "Binaries may be old.")
                return
            # If the server version is more recent, download it
            if t > os.path.getmtime(self.path):
                self._download()

163
tests/gem5/suite.py Normal file
View File

@@ -0,0 +1,163 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
import os
import copy
import subprocess
from testlib.test import TestFunction
from testlib.suite import TestSuite
from testlib.helper import log_call
from testlib.config import constants, config
from fixture import TempdirFixture, Gem5Fixture, VariableFixture
import verifier
def gem5_verify_config(name,
                       config,
                       config_args,
                       verifiers,
                       gem5_args=tuple(),
                       fixtures=tuple(),
                       valid_isas=constants.supported_isas,
                       valid_variants=constants.supported_variants,
                       length=constants.supported_lengths[0]):
    '''
    Helper class to generate common gem5 tests using verifiers.

    The generated TestSuite will run gem5 with the provided config and
    config_args. After that it will run any provided verifiers to verify
    details about the gem5 run.

    .. seealso:: For the verifiers see :mod:`testlib.gem5.verifier`

    :param name: Name of the test.
    :param config: The config to give gem5.
    :param config_args: A list of arguments to pass to the given config.

    :param verifiers: An iterable with Verifier instances which will be placed
        into a suite that will be ran after a gem5 run.

    :param gem5_args: An iterable with arguments to give to gem5. (Arguments
        that would normally go before the config path.)

    :param fixtures: An iterable of extra fixtures attached to every
        generated suite.

    :param valid_isas: An iterable with the isas that this test can be ran
        for. If None given, will run for all supported_isas.

    :param valid_variants: An iterable with the variant levels that
        this test can be ran for. (E.g. opt, debug)

    :param length: Tag describing the expected runtime of the suites (one
        of constants.supported_lengths).

    :returns: A list with one TestSuite per (isa, variant) combination.
    '''
    # BUGFIX: the default for `fixtures` was a mutable list ([]); use an
    # immutable tuple.  The list() copy below keeps behavior identical.
    fixtures = list(fixtures)
    testsuites = []
    for opt in valid_variants:
        for isa in valid_isas:

            # Create a tempdir fixture to be shared throughout the test.
            tempdir = TempdirFixture()
            gem5_returncode = VariableFixture(
                    name=constants.gem5_returncode_fixture_name)

            # Common name of this generated testcase.
            _name = '{given_name}-{isa}-{opt}'.format(
                    given_name=name,
                    isa=isa,
                    opt=opt)

            # Create the running of gem5 subtest.
            # NOTE: We specifically create this test before our verifiers so
            # this is listed first.
            tests = []
            gem5_execution = TestFunction(
                    _create_test_run_gem5(config, config_args, gem5_args),
                    name=_name)
            tests.append(gem5_execution)

            # Create copies of the verifier subtests for this isa and
            # variant.  (Loop variable named so it does not shadow the
            # imported `verifier` module.)
            for verifier_inst in verifiers:
                tests.append(verifier_inst.instantiate_test(_name))

            # Add the isa and variant to tags list.
            tags = [isa, opt, length]

            # Create the gem5 target for the specific architecture and
            # variant.
            _fixtures = copy.copy(fixtures)
            _fixtures.append(Gem5Fixture(isa, opt))
            _fixtures.append(tempdir)
            _fixtures.append(gem5_returncode)

            # Finally construct the self contained TestSuite out of our
            # tests.
            testsuites.append(TestSuite(
                name=_name,
                fixtures=_fixtures,
                tags=tags,
                tests=tests))
    return testsuites
def _create_test_run_gem5(config, config_args, gem5_args):
    '''
    Build and return the closure used as the "run gem5" subtest for a
    generated suite.
    '''
    def test_run_gem5(params):
        '''
        Run gem5 with the captured config, redirecting all output into the
        test's tempdir and saving the returncode.

        NOTE: Requires fixtures: tempdir, gem5
        '''
        fixture_map = params.fixtures

        # Normalize gem5_args into an iterable of extra arguments.
        if gem5_args is None:
            extra_args = tuple()
        elif isinstance(gem5_args, str):
            # If just a single str, place it in an iterable
            extra_args = (gem5_args,)
        else:
            extra_args = gem5_args

        # FIXME/TODO: I don't like the idea of having to modify this test run
        # or always collect results even if not using a verifier. There should
        # be some configuration in here that only gathers certain results for
        # certain verifiers.
        #
        # I.E. Only the returncode verifier will use the gem5_returncode
        # fixture, but we always require it even if that verifier isn't being
        # ran.
        returncode = fixture_map[constants.gem5_returncode_fixture_name]
        tempdir = fixture_map[constants.tempdir_fixture_name].path
        gem5 = fixture_map[constants.gem5_binary_fixture_name].path

        command = [
            gem5,
            '-d',  # Set redirect dir to tempdir.
            tempdir,
            '-re', # TODO: Change to const. Redirect stdout and stderr
        ]
        command.extend(extra_args)
        command.append(config)
        # Config_args should set up the program args.
        command.extend(config_args)

        returncode.value = log_call(params.log, command)

    return test_run_gem5

205
tests/gem5/verifier.py Normal file
View File

@@ -0,0 +1,205 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
'''
Built in test cases that verify particular details about a gem5 run.
'''
import re
from testlib import test
from testlib.config import constants
from testlib.helper import joinpath, diff_out_file
class Verifier(object):
    '''
    Base class for checks that run after a gem5 simulation and validate
    some property of its output.
    '''
    def __init__(self, fixtures=tuple()):
        self.fixtures = fixtures

    def _test(self, *args, **kwargs):
        # Indirection keeps stack traces pointing at the subclass's
        # test() implementation.
        self.test(*args, **kwargs)

    def instantiate_test(self, name_pfx):
        # Wrap this verifier in a TestFunction named after its suite.
        test_name = '%s-%s' % (name_pfx, self.__class__.__name__)
        return test.TestFunction(self._test,
                                 name=test_name, fixtures=self.fixtures)

    def failed(self, fixtures):
        '''
        Called if this verifier fails to cleanup (or not) as needed.
        '''
        try:
            tempdir = fixtures[constants.tempdir_fixture_name]
        except KeyError:
            # No tempdir fixture present; nothing to preserve.
            return
        tempdir.skip_cleanup()
class MatchGoldStandard(Verifier):
    '''
    Compares a standard output to the test output and passes if they match,
    fails if they do not.
    '''
    def __init__(self, standard_filename, ignore_regex=None,
                 test_filename='simout'):
        '''
        :param standard_filename: The path of the standard file to compare
        output to.

        :param ignore_regex: A string, compiled regex, or iterable containing
        either which will be ignored in 'standard' and test output files when
        diffing.
        '''
        super(MatchGoldStandard, self).__init__()
        self.standard_filename = standard_filename
        self.test_filename = test_filename
        self.ignore_regex = _iterable_regex(ignore_regex)

    def test(self, params):
        fixture_map = params.fixtures
        # Resolve the test output file inside this test's tempdir.
        tempdir = fixture_map[constants.tempdir_fixture_name].path
        self.test_filename = joinpath(tempdir, self.test_filename)

        diff = diff_out_file(self.standard_filename,
                             self.test_filename,
                             ignore_regexes=self.ignore_regex,
                             logger=params.log)
        if diff is not None:
            self.failed(fixture_map)
            test.fail('Stdout did not match:\n%s\nSee %s for full results'
                      % (diff, tempdir))

    def _generic_instance_warning(self, kwargs):
        '''
        Tell users of the helper subclasses to use this more generic class
        when they want to override the test_filename param.
        '''
        if 'test_filename' in kwargs:
            raise ValueError('If you are setting test_filename use the more'
                             ' generic %s'
                             ' instead' % MatchGoldStandard.__name__)
class DerivedGoldStandard(MatchGoldStandard):
    '''
    Intermediate base for verifiers that compare one fixed gem5 output file
    against a gold standard.  Subclasses set _file and
    _default_ignore_regex.
    '''
    # Sentinel so callers may still explicitly pass ignore_regex=None.
    __ignore_regex_sentinel = object()
    _file = None
    _default_ignore_regex = []

    def __init__(self, standard_filename,
                 ignore_regex=__ignore_regex_sentinel, **kwargs):

        # BUGFIX: compare with the sentinel by identity (`is`), not `==`;
        # equality could be hijacked by a custom __eq__ on the argument.
        if ignore_regex is self.__ignore_regex_sentinel:
            ignore_regex = self._default_ignore_regex

        self._generic_instance_warning(kwargs)

        super(DerivedGoldStandard, self).__init__(
            standard_filename,
            test_filename=self._file,
            ignore_regex=ignore_regex,
            **kwargs)
class MatchStdout(DerivedGoldStandard):
    # Diffs gem5's simulation stdout against a gold-standard file.
    _file = constants.gem5_simulation_stdout
    _default_ignore_regex = [
            # Lines that vary per run/host and carry no test signal.
            re.compile('^Redirecting (stdout|stderr) to'),
            re.compile('^gem5 compiled '),
            re.compile('^gem5 started '),
            re.compile('^gem5 executing on '),
            re.compile('^command line:'),
            re.compile("^Couldn't import dot_parser,"),
            re.compile("^info: kernel located at:"),
            re.compile("^Couldn't unlink "),
            # BUGFIX: raw string; '\(' in a non-raw literal is an invalid
            # escape sequence (DeprecationWarning, an error in future
            # Python versions).  The resulting pattern is identical.
            re.compile(r"^Using GPU kernel code file\(s\) "),
        ]
class MatchStdoutNoPerf(MatchStdout):
    # Same as MatchStdout, but additionally ignores the 'Exiting @ tick'
    # line so runs with differing tick counts can still match.
    _file = constants.gem5_simulation_stdout
    _default_ignore_regex = MatchStdout._default_ignore_regex + [
            re.compile('^Exiting @ tick'),
        ]
class MatchStderr(DerivedGoldStandard):
    # Diffs gem5's simulation stderr against a gold standard; nothing is
    # ignored by default.
    _file = constants.gem5_simulation_stderr
    _default_ignore_regex = []
class MatchStats(DerivedGoldStandard):
    # Diffs the simulation stats output against a gold standard.
    # TODO: Likely will want to change this verifier since we have the weird
    # perl script right now. A simple diff probably isn't going to work.
    _file = constants.gem5_simulation_stats
    _default_ignore_regex = []
class MatchConfigINI(DerivedGoldStandard):
    # Diffs config.ini, ignoring keys whose values embed host-specific
    # paths and so differ between machines.
    _file = constants.gem5_simulation_config_ini
    _default_ignore_regex = (
            re.compile("^(executable|readfile|kernel|image_file)="),
            re.compile("^(cwd|input|codefile)="),
        )
class MatchConfigJSON(DerivedGoldStandard):
    # Diffs config.json, ignoring the same host-specific keys as
    # MatchConfigINI (JSON key syntax).
    _file = constants.gem5_simulation_config_json
    _default_ignore_regex = (
            re.compile(r'''^\s*"(executable|readfile|kernel|image_file)":'''),
            re.compile(r'''^\s*"(cwd|input|codefile)":'''),
        )
class MatchRegex(Verifier):
    '''
    Passes when any of the given regexes matches a line of gem5's stdout
    and/or stderr; fails otherwise.
    '''
    def __init__(self, regex, match_stderr=True, match_stdout=True):
        super(MatchRegex, self).__init__()
        self.regex = _iterable_regex(regex)
        self.match_stderr = match_stderr
        self.match_stdout = match_stdout

    def test(self, params):
        fixture_map = params.fixtures
        # Output files live in the tempdir of the test.
        tempdir = fixture_map[constants.tempdir_fixture_name].path

        def file_matches(fname):
            # True as soon as any regex matches any line of the file.
            with open(fname, 'r') as handle:
                for line in handle:
                    if any(re.match(rgx, line) for rgx in self.regex):
                        return True
            return False

        if self.match_stdout:
            if file_matches(joinpath(tempdir,
                                     constants.gem5_simulation_stdout)):
                return  # Success.
        if self.match_stderr:
            if file_matches(joinpath(tempdir,
                                     constants.gem5_simulation_stderr)):
                return  # Success.

        self.failed(fixture_map)
        test.fail('Could not match regex.')
_re_type = type(re.compile(''))
def _iterable_regex(regex):
if isinstance(regex, _re_type) or isinstance(regex, str):
regex = (regex,)
return regex

121
tests/legacy-configs/run.py Normal file
View File

@@ -0,0 +1,121 @@
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson
'''
New version of the run.py script. For this, all dependencies should be
handled outside of the script.
.. warning:: This script is NOT the recommended way to handle configurations
for new tests. This exists for legacy support only. New Tests should
either use configs from the normal gem5 configs or create their own for
a test.
'''
import argparse
import sys
import os
from os.path import abspath, join as joinpath, dirname
import m5
# Add the normal gem5 config path to system path.
# This requirement should be removed if possible from all legacy scripts, but
# I've left it here for now.
sys.path.insert(0, abspath(joinpath(dirname(__file__), '../../configs')))
# set default maxtick... script can override
# -1 means run forever
maxtick = m5.MaxTick
def run_test(root):
    """Default run_test implementation. Scripts can override it.

    Instantiates the configured system rooted at `root` and simulates
    until the workload terminates (or maxtick is reached).
    """
    # instantiate configuration
    m5.instantiate()

    # simulate until program terminates
    exit_event = m5.simulate(maxtick)

    # BUGFIX: single-argument, parenthesized print so the line is valid
    # under both Python 2 and Python 3; the printed text is unchanged.
    print('Exiting @ tick %d because %s'
          % (m5.curTick(), exit_event.getCause()))
# Location of pre-built test programs; may be overridden via M5_TEST_PROGS.
test_progs = os.environ.get('M5_TEST_PROGS', '/dist/m5/regression/test-progs')

# Since we're in batch mode, don't allow tcp socket connections
m5.disableAllListeners()

parser = argparse.ArgumentParser()
parser.add_argument('--cmd',
                    action='store',
                    type=str,
                    help='Command to pass to the test system')
parser.add_argument('--executable',
                    action='store',
                    type=str,
                    help='Executable to pass to the test system')
parser.add_argument('--config',
                    action='append',
                    type=str,
                    help='A config file to initialize the system with.'\
                    + ' If more than one given, loads them in order given.')
args = parser.parse_args()

executable = args.executable

# Each legacy config script is exec'd in this module's global namespace;
# the scripts are expected to define `root` (and may override run_test).
# NOTE: execfile is Python-2-only, matching this script's shebang.
for config in args.config:
    execfile(config)
# Initialize all CPUs in a system
def initCPUs(system):
    """Call createThreads() on every CPU attached to `system`.

    Tolerates objects that merely pretend to be CPUs (e.g. a MemTest) and
    systems without a `cpu` attribute (e.g. the Ruby testers).

    NOTE: the parameter was renamed from `sys`, which shadowed the imported
    sys module; all calls in this script are positional.
    """
    def initCPU(cpu):
        # We might actually have a MemTest object or something similar
        # here that just pretends to be a CPU.
        try:
            cpu.createThreads()
        except Exception:
            # BUGFIX: was a bare `except:`, which also swallowed
            # SystemExit/KeyboardInterrupt.  Still best-effort on purpose.
            pass

    # The CPU attribute doesn't exist in some cases, e.g. the Ruby testers.
    if not hasattr(system, "cpu"):
        return

    # The CPU can either be a list of CPUs or a single object.
    if isinstance(system.cpu, list):
        # Plain loop instead of a side-effect-only list comprehension.
        for cpu in system.cpu:
            initCPU(cpu)
    else:
        initCPU(system.cpu)
# TODO: Might want to automatically place the cmd and executable on the
# cpu[0].workload, although I think most legacy configs do this automatically
# or somewhere in their `test.py` config.

# We might be creating a single system or a dual system. Try
# initializing the CPUs in all known system attributes.
# `root` is defined by the execfile'd config script(s) above.
for sysattr in [ "system", "testsys", "drivesys" ]:
    if hasattr(root, sysattr):
        initCPUs(getattr(root, sysattr))

run_test(root)

24
tests/main.py Executable file
View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python2
'''
The main source for testlib. Ties together the default test runners and
loaders.
Discovers and runs all tests from a given root directory.
'''
from __future__ import print_function
import sys
import os
base_dir = os.path.dirname(os.path.abspath(__name__))
ext_path = os.path.join(base_dir, os.pardir, 'ext')
sys.path.insert(0, base_dir)
sys.path.insert(0, ext_path)
import testlib.main as testlib
import testlib.config as config
import testlib.helper as helper
config.basedir = helper.absdirpath(__file__)
testlib()