blob: 7616bf1af513f6a21df3cc7106d680352f7c2705 [file] [log] [blame]
#!/usr/bin/env python3
import argparse
import csv
import cmd
import copy
import json
import logging
import netrc
import os
import re
import shlex
import shutil
import subprocess
import sys
import textwrap
import time
from uuid import uuid4
from datetime import datetime
from distutils.spawn import find_executable
try:
from squad_client.core.api import SquadApi
from squad_client.shortcuts import submit_results
from squad_client.core.models import Squad
from urllib.parse import urlparse
except ImportError as e:
logger = logging.getLogger("RUNNER")
logger.warning("squad_client is needed if you want to upload to qa-reports")
try:
import pexpect
import yaml
except ImportError as e:
print(e)
print("Please run the below command to install modules required")
print("pip3 install -r ${REPO_PATH}/automated/utils/requirements.txt")
sys.exit(1)
class StoreDictKeyPair(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, **kwargs):
self._nargs = nargs
super(StoreDictKeyPair, self).__init__(
option_strings, dest, nargs=nargs, **kwargs
)
def __call__(self, parser, namespace, values, option_string=None):
my_dict = {}
for kv in values:
if "=" in kv:
k, v = kv.split("=", 1)
my_dict[k] = v
else:
print("Invalid parameter: %s" % kv)
setattr(namespace, self.dest, my_dict)
# quit gracefully if the connection is closed by remote host
SSH_PARAMS = "-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o ServerAliveInterval=5"
def run_command(command, target=None):
"""Run a shell command. If target is specified, ssh to the given target first."""
run = command
if target:
run = 'ssh {} {} "{}"'.format(SSH_PARAMS, target, command)
logger = logging.getLogger("RUNNER.run_command")
logger.debug(run)
if sys.version_info[0] < 3:
return subprocess.check_output(shlex.split(run)).strip()
else:
return subprocess.check_output(shlex.split(run)).strip().decode("utf-8")
class TestPlan(object):
"""
Analysis args specified, then generate test plan.
"""
def __init__(self, args):
self.test_def = args.test_def
self.test_plan = args.test_plan
self.timeout = args.timeout
self.skip_install = args.skip_install
self.logger = logging.getLogger("RUNNER.TestPlan")
self.overlay = args.overlay
def apply_overlay(self, test_list):
fixed_test_list = copy.deepcopy(test_list)
logger = logging.getLogger("RUNNER.TestPlan.Overlay")
with open(self.overlay) as f:
data = yaml.load(f, Loader=yaml.SafeLoader)
if data.get("skip"):
skip_tests = data["skip"]
for test in test_list:
for skip_test in skip_tests:
if (
test["path"] == skip_test["path"]
and test["repository"] == skip_test["repository"]
):
fixed_test_list.remove(test)
logger.info("Skipped: {}".format(test))
else:
continue
if data.get("amend"):
amend_tests = data["amend"]
for test in fixed_test_list:
for amend_test in amend_tests:
if (
test["path"] == amend_test["path"]
and test["repository"] == amend_test["repository"]
):
if amend_test.get("parameters"):
if test.get("parameters"):
test["parameters"].update(amend_test["parameters"])
else:
test["parameters"] = amend_test["parameters"]
logger.info("Updated: {}".format(test))
else:
logger.warning(
"'parameters' not found in {}, nothing to amend.".format(
amend_test
)
)
if data.get("add"):
add_tests = data["add"]
unique_add_tests = []
for test in add_tests:
if test not in unique_add_tests:
unique_add_tests.append(test)
else:
logger.warning("Skipping duplicate test {}".format(test))
for test in test_list:
del test["uuid"]
for add_test in unique_add_tests:
if add_test in test_list:
logger.warning(
"{} already included in test plan, do nothing.".format(add_test)
)
else:
add_test["uuid"] = str(uuid4())
fixed_test_list.append(add_test)
logger.info("Added: {}".format(add_test))
return fixed_test_list
def test_list(self, kind="automated"):
if self.test_def:
if not os.path.exists(self.test_def):
self.logger.error(" %s NOT found, exiting..." % self.test_def)
sys.exit(1)
test_list = [{"path": self.test_def}]
test_list[0]["uuid"] = str(uuid4())
test_list[0]["timeout"] = self.timeout
test_list[0]["skip_install"] = self.skip_install
elif self.test_plan:
if not os.path.exists(self.test_plan):
self.logger.error(" %s NOT found, exiting..." % self.test_plan)
sys.exit(1)
with open(self.test_plan, "r") as f:
test_plan = yaml.safe_load(f)
try:
plan_version = test_plan["metadata"].get("format")
self.logger.info("Test plan version: {}".format(plan_version))
tests = []
if plan_version == "Linaro Test Plan v2":
tests = test_plan["tests"][kind]
elif plan_version == "Linaro Test Plan v1" or plan_version is None:
for requirement in test_plan["requirements"]:
if "tests" in requirement.keys():
if (
requirement["tests"]
and kind in requirement["tests"].keys()
and requirement["tests"][kind]
):
for test in requirement["tests"][kind]:
tests.append(test)
test_list = []
unique_tests = [] # List of test hashes
for test in tests:
test_hash = hash(json.dumps(test, sort_keys=True))
if test_hash in unique_tests:
# Test is already in the test_list; don't add it again.
self.logger.warning("Skipping duplicate test {}".format(test))
continue
unique_tests.append(test_hash)
test_list.append(test)
for test in test_list:
test["uuid"] = str(uuid4())
except KeyError as e:
self.logger.error("%s is missing from test plan" % str(e))
sys.exit(1)
else:
self.logger.error("Plese specify a test or test plan.")
sys.exit(1)
if self.overlay is None:
return test_list
else:
return self.apply_overlay(test_list)
class TestSetup(object):
"""
Create directories required, then copy files needed to these directories.
"""
def __init__(self, test, args):
self.test = test
self.args = args
self.logger = logging.getLogger("RUNNER.TestSetup")
self.test_kind = args.kind
self.test_version = test.get("version", None)
def validate_env(self):
# Inspect if environment set properly.
try:
self.repo_path = os.environ["REPO_PATH"]
except KeyError:
self.logger.error("KeyError: REPO_PATH")
self.logger.error(
"Please run '. ./bin/setenv.sh' to setup test environment"
)
sys.exit(1)
def create_dir(self):
if not os.path.exists(self.test["output"]):
os.makedirs(self.test["output"])
self.logger.info("Output directory created: %s" % self.test["output"])
def copy_test_repo(self):
self.validate_env()
shutil.rmtree(self.test["test_path"], ignore_errors=True)
if self.repo_path in self.test["test_path"]:
self.logger.error(
"Cannot copy repository into itself. Please choose output directory outside repository path"
)
sys.exit(1)
shutil.copytree(self.repo_path, self.test["test_path"], symlinks=True)
self.logger.info("Test repo copied to: %s" % self.test["test_path"])
def checkout_version(self):
if self.test_version:
path = os.getcwd()
os.chdir(self.test["test_path"])
subprocess.call("git checkout %s" % self.test_version, shell=True)
os.chdir(path)
def create_uuid_file(self):
with open("%s/uuid" % self.test["test_path"], "w") as f:
f.write(self.test["uuid"])
class TestDefinition(object):
"""
Convert test definition to testdef.yaml, testdef_metadata and run.sh.
"""
def __init__(self, test, args):
self.test = test
self.args = args
self.logger = logging.getLogger("RUNNER.TestDef")
self.skip_install = args.skip_install
self.is_manual = False
if "skip_install" in test:
self.skip_install = test["skip_install"]
self.custom_params = None
if "parameters" in test:
self.custom_params = test["parameters"]
if "params" in test:
self.custom_params = test["params"]
self.exists = False
if os.path.isfile(self.test["path"]):
self.exists = True
with open(self.test["path"], "r") as f:
self.testdef = yaml.safe_load(f)
if self.testdef["metadata"]["format"].startswith(
"Manual Test Definition"
):
self.is_manual = True
if self.is_manual:
self.runner = ManualTestRun(test, args)
elif self.args.target is not None:
self.runner = RemoteTestRun(test, args)
else:
self.runner = AutomatedTestRun(test, args)
def definition(self):
with open("%s/testdef.yaml" % self.test["test_path"], "wb") as f:
f.write(yaml.dump(self.testdef, encoding="utf-8", allow_unicode=True))
def metadata(self):
with open("%s/testdef_metadata" % self.test["test_path"], "wb") as f:
f.write(
yaml.dump(
self.testdef["metadata"], encoding="utf-8", allow_unicode=True
)
)
def mkrun(self):
if not self.is_manual:
with open("%s/run.sh" % self.test["test_path"], "a") as f:
f.write("#!/bin/sh\n")
self.parameters = self.handle_parameters()
if self.parameters:
for line in self.parameters:
f.write(line)
f.write("set -e\n")
f.write("set -x\n")
f.write("export TESTRUN_ID=%s\n" % self.testdef["metadata"]["name"])
if self.args.target is None:
f.write("cd %s\n" % (self.test["test_path"]))
else:
f.write("cd %s\n" % (self.test["target_test_path"]))
f.write("UUID=`cat uuid`\n")
f.write('echo "<STARTRUN $TESTRUN_ID $UUID>"\n')
f.write(
"export PATH=/usr/local/bin:/usr/bin:/bin:/usr/local/sbin:/usr/sbin:/sbin\n"
)
steps = self.testdef["run"].get("steps", [])
if steps:
for step in steps:
command = step
if "--cmd" in step or "--shell" in step:
command = re.sub(r"\$(\d+)\b", r"\\$\1", step)
f.write("%s\n" % command)
f.write('echo "<ENDRUN $TESTRUN_ID $UUID>"\n')
os.chmod("%s/run.sh" % self.test["test_path"], 0o755)
def run(self):
self.runner.run()
def handle_parameters(self):
ret_val = ["###default parameters from test definition###\n"]
if "params" in self.testdef:
for def_param_name, def_param_value in list(self.testdef["params"].items()):
# ?'yaml_line'
if def_param_name == "yaml_line":
continue
ret_val.append("%s='%s'\n" % (def_param_name, def_param_value))
elif "parameters" in self.testdef:
for def_param_name, def_param_value in list(
self.testdef["parameters"].items()
):
if def_param_name == "yaml_line":
continue
ret_val.append("%s='%s'\n" % (def_param_name, def_param_value))
else:
return None
ret_val.append("######\n")
ret_val.append("###custom parameters from test plan###\n")
if self.custom_params:
for param_name, param_value in list(self.custom_params.items()):
if param_name == "yaml_line":
continue
ret_val.append("%s='%s'\n" % (param_name, param_value))
if self.skip_install:
ret_val.append('SKIP_INSTALL="True"\n')
ret_val.append("######\n")
ret_val.append("###custom parameters from command line###\n")
if self.args.test_def_params:
for param_name, param_value in self.args.test_def_params.items():
ret_val.append("%s='%s'\n" % (param_name, param_value))
ret_val.append("######\n")
return ret_val
class TestRun(object):
def __init__(self, test, args):
self.test = test
self.args = args
self.logger = logging.getLogger("RUNNER.TestRun")
self.test_timeout = self.args.timeout
if "timeout" in test:
self.test_timeout = test["timeout"]
def run(self):
raise NotImplementedError
def check_result(self):
raise NotImplementedError
class AutomatedTestRun(TestRun):
def run(self):
self.logger.info("Executing %s/run.sh" % self.test["test_path"])
shell_cmd = "%s/run.sh 2>&1 | tee %s/stdout.log" % (
self.test["test_path"],
self.test["test_path"],
)
self.child = pexpect.spawnu("/bin/sh", ["-c", shell_cmd])
self.check_result()
def check_result(self):
if self.test_timeout:
self.logger.info("Test timeout: %s" % self.test_timeout)
test_end = time.time() + self.test_timeout
while self.child.isalive():
if self.test_timeout and time.time() > test_end:
self.logger.warning(
"%s test timed out, killing test process..."
% self.test["test_uuid"]
)
self.child.terminate(force=True)
break
try:
self.child.expect("\r\n")
print(self.child.before)
except pexpect.TIMEOUT:
continue
except pexpect.EOF:
self.logger.info("%s test finished.\n" % self.test["test_uuid"])
break
class RemoteTestRun(AutomatedTestRun):
def copy_to_target(self):
os.chdir(self.test["test_path"])
tarball_name = "target-test-files.tar"
self.logger.info("Archiving test files")
run_command(
"tar -caf %s run.sh uuid automated/lib automated/bin automated/utils %s"
% (tarball_name, self.test["tc_relative_dir"])
)
self.logger.info("Creating test path")
run_command("mkdir -p %s" % (self.test["target_test_path"]), self.args.target)
self.logger.info("Copying test archive to target host")
run_command(
"scp %s ./%s %s:%s"
% (
SSH_PARAMS,
tarball_name,
self.args.target,
self.test["target_test_path"],
)
)
self.logger.info("Unarchiving test files on target")
run_command(
"cd %s && tar -xf %s" % (self.test["target_test_path"], tarball_name),
self.args.target,
)
self.logger.info("Removing test file archive from target")
run_command(
"rm %s/%s" % (self.test["target_test_path"], tarball_name), self.args.target
)
def cleanup_target(self):
self.logger.info("Removing files from target after testing")
run_command("rm -rf %s" % (self.test["target_test_path"]), self.args.target)
def run(self):
self.copy_to_target()
self.logger.info(
"Executing %s/run.sh remotely on %s"
% (self.test["target_test_path"], self.args.target)
)
shell_cmd = 'ssh %s %s "%s/run.sh 2>&1"' % (
SSH_PARAMS,
self.args.target,
self.test["target_test_path"],
)
self.logger.debug("shell_cmd: %s" % shell_cmd)
output = open("%s/stdout.log" % self.test["test_path"], "w")
self.child = pexpect.spawnu(shell_cmd)
self.child.logfile = output
self.check_result()
self.cleanup_target()
class ManualTestShell(cmd.Cmd):
def __init__(self, test_dict, result_path, test_case_id):
cmd.Cmd.__init__(self)
self.test_dict = test_dict
self.test_case_id = test_case_id
self.result_path = result_path
self.current_step_index = 0
self.steps = self.test_dict["run"]["steps"]
self.expected = self.test_dict["run"]["expected"]
self.prompt = "%s[%s] > " % (
self.test_dict["metadata"]["name"],
self.test_case_id,
)
self.result = None
self.intro = """
Welcome to manual test executor. Type 'help' for available commands.
This shell is meant to be executed on your computer, not on the system
under test. Please execute the steps from the test case, compare to
expected result and record the test result as 'pass' or 'fail'. If there
is an issue that prevents from executing the step, please record the result
as 'skip'.
"""
def do_quit(self, line):
"""
Exit test execution
"""
if self.result is not None:
return True
if line.find("-f") >= 0:
self._record_result("skip")
return True
print(
"Test result not recorded. Use -f to force. Forced quit records result as 'skip'"
)
do_EOF = do_quit
def do_description(self, line):
"""
Prints current test overall description
"""
print(self.test_dict["metadata"]["description"])
def do_steps(self, line):
"""
Prints all steps of the current test case
"""
for index, step in enumerate(self.steps):
print("%s. %s" % (index, step))
def do_expected(self, line):
"""
Prints all expected results of the current test case
"""
for index, expected in enumerate(self.expected):
print("%s. %s" % (index, expected))
def do_current(self, line):
"""
Prints current test step
"""
self._print_step()
do_start = do_current
def do_next(self, line):
"""
Prints next test step
"""
if len(self.steps) > self.current_step_index + 1:
self.current_step_index += 1
self._print_step()
def _print_step(self):
print("%s. %s" % (self.current_step_index, self.steps[self.current_step_index]))
def _record_result(self, result):
print("Recording %s in %s/stdout.log" % (result, self.result_path))
with open("%s/stdout.log" % self.result_path, "a") as f:
f.write(
"<LAVA_SIGNAL_TESTCASE TEST_CASE_ID=%s RESULT=%s>"
% (self.test_case_id, result)
)
def do_pass(self, line):
"""
Records PASS as test result
"""
self.result = "pass"
self._record_result(self.result)
return True
def do_fail(self, line):
"""
Records FAIL as test result
"""
self.result = "fail"
self._record_result(self.result)
return True
def do_skip(self, line):
"""
Records SKIP as test result
"""
self.result = "skip"
self._record_result(self.result)
return True
class ManualTestRun(TestRun, cmd.Cmd):
def run(self):
print(self.test["test_name"])
with open("%s/testdef.yaml" % self.test["test_path"], "r") as f:
self.testdef = yaml.safe_load(f)
if "name" in self.test:
test_case_id = self.test["name"]
else:
test_case_id = self.testdef["metadata"]["name"]
ManualTestShell(self.testdef, self.test["test_path"], test_case_id).cmdloop()
def check_result(self):
pass
def get_packages(linux_distribution, target=None):
"""Return a list of installed packages with versions
linux_distribution is a string that may be 'debian',
'ubuntu', 'centos', or 'fedora'.
For example (ubuntu):
'packages': ['acl-2.2.52-2',
'adduser-3.113+nmu3',
...
'zlib1g:amd64-1:1.2.8.dfsg-2+b1',
'zlib1g-dev:amd64-1:1.2.8.dfsg-2+b1']
(centos):
"packages": ["acl-2.2.51-12.el7",
"apr-1.4.8-3.el7",
...
"zlib-1.2.7-17.el7",
"zlib-devel-1.2.7-17.el7"
]
"""
logger = logging.getLogger("RUNNER.get_packages")
packages = []
if linux_distribution in ["debian", "ubuntu"]:
# Debian (apt) based system
packages = run_command(
"dpkg-query -W -f '${package}-${version}\n'", target
).splitlines()
elif linux_distribution in ["centos", "fedora"]:
# RedHat (rpm) based system
packages = run_command(
"rpm -qa --qf '%{NAME}-%{VERSION}-%{RELEASE}\n'", target
).splitlines()
else:
logger.warning(
"Unknown linux distribution '{}'; package list not populated.".format(
linux_distribution
)
)
packages.sort()
return packages
def get_environment(target=None, skip_collection=False):
"""Return a dictionary with environmental information
target: optional ssh host string to gather environment remotely.
skip_collection: Skip data collection and return an empty dictionary.
For example (on a HiSilicon D03):
{
"bios_version": "Hisilicon D03 UEFI 16.12 Release",
"board_name": "D03",
"board_vendor": "Huawei",
"kernel": "4.9.0-20.gitedc2a1c.linaro.aarch64",
"linux_distribution": "centos",
"linux_distribution_version": "6.2",
"packages": [
"GeoIP-1.5.0-11.el7",
"NetworkManager-1.4.0-20.el7_3",
...
"yum-plugin-fastestmirror-1.1.31-40.el7",
"zlib-1.2.7-17.el7"
],
"uname": "Linux localhost.localdomain 4.9.0-20.gitedc2a1c.linaro.aarch64 #1 SMP Wed Dec 14 17:50:15 UTC 2016 aarch64 aarch64 aarch64 GNU/Linux"
}
"""
environment = {}
if skip_collection:
return environment
try:
environment["linux_distribution"] = (
run_command("grep ^ID= /etc/os-release", target)
.split("=")[-1]
.strip('"')
.lower()
)
except subprocess.CalledProcessError:
environment["linux_distribution"] = ""
try:
environment["linux_distribution_version"] = (
run_command("grep ^VERSION= /etc/os-release", target)
.split("=")[-1]
.strip('"')
)
except subprocess.CalledProcessError:
environment["linux_distribution_version"] = ""
try:
environment["linux_distribution_version_id"] = (
run_command("grep ^VERSION_ID= /etc/os-release", target)
.split("=")[-1]
.strip('"')
)
except subprocess.CalledProcessError:
environment["linux_distribution_version_id"] = ""
try:
environment["linux_distribution_version_codename"] = (
run_command("grep ^VERSION_CODENAME= /etc/os-release", target)
.split("=")[-1]
.strip('"')
)
except subprocess.CalledProcessError:
environment["linux_distribution_version_codename"] = ""
try:
environment["kernel"] = run_command("uname -r", target)
except subprocess.CalledProcessError:
environment["kernel"] = ""
try:
environment["uname"] = run_command("uname -a", target)
except subprocess.CalledProcessError:
environment["uname"] = ""
try:
environment["bios_version"] = run_command(
"cat /sys/devices/virtual/dmi/id/bios_version", target
)
except subprocess.CalledProcessError:
environment["bios_version"] = ""
try:
environment["board_vendor"] = run_command(
"cat /sys/devices/virtual/dmi/id/board_vendor", target
)
except subprocess.CalledProcessError:
environment["board_vendor"] = ""
try:
environment["board_name"] = run_command(
"cat /sys/devices/virtual/dmi/id/board_name", target
)
except subprocess.CalledProcessError:
environment["board_name"] = ""
try:
environment["packages"] = get_packages(
environment["linux_distribution"], target
)
except subprocess.CalledProcessError:
environment["packages"] = []
return environment
class ResultParser(object):
def __init__(self, test, args):
self.test = test
self.args = args
self.metrics = []
self.results = {}
self.results["test"] = test["test_name"]
self.results["id"] = test["test_uuid"]
self.results["test_plan"] = args.test_plan
self.results["environment"] = get_environment(
target=self.args.target, skip_collection=self.args.skip_environment
)
self.logger = logging.getLogger("RUNNER.ResultParser")
self.results["params"] = {}
self.pattern = None
self.fixup = None
self.qa_reports_server = args.qa_reports_server
if args.qa_reports_token is not None:
self.qa_reports_token = args.qa_reports_token
else:
self.qa_reports_token = os.environ.get(
"QA_REPORTS_TOKEN", get_token_from_netrc(self.qa_reports_server)
)
self.qa_reports_project = args.qa_reports_project
self.qa_reports_group = args.qa_reports_group
self.qa_reports_env = args.qa_reports_env
self.qa_reports_build_version = args.qa_reports_build_version
self.qa_reports_disable_metadata = args.qa_reports_disable_metadata
self.qa_reports_metadata = args.qa_reports_metadata
self.qa_reports_metadata_file = args.qa_reports_metadata_file
with open(os.path.join(self.test["test_path"], "testdef.yaml"), "r") as f:
self.testdef = yaml.safe_load(f)
self.results["name"] = ""
if (
"metadata" in self.testdef.keys()
and "name" in self.testdef["metadata"].keys()
):
self.results["name"] = self.testdef["metadata"]["name"]
if "params" in self.testdef.keys():
self.results["params"] = self.testdef["params"]
if self.args.test_def_params:
for param_name, param_value in self.args.test_def_params.items():
self.results["params"][param_name] = param_value
if (
"parse" in self.testdef.keys()
and "pattern" in self.testdef["parse"].keys()
):
self.pattern = self.testdef["parse"]["pattern"]
self.logger.info("Enabling log parse pattern: %s" % self.pattern)
if "fixupdict" in self.testdef["parse"].keys():
self.fixup = self.testdef["parse"]["fixupdict"]
self.logger.info(
"Enabling log parse pattern fixup: %s" % self.fixup
)
if "parameters" in test.keys():
self.results["params"].update(test["parameters"])
if "params" in test.keys():
self.results["params"].update(test["params"])
if "version" in test.keys():
self.results["version"] = test["version"]
else:
path = os.getcwd()
os.chdir(self.test["test_path"])
if sys.version_info[0] < 3:
test_version = subprocess.check_output("git rev-parse HEAD", shell=True)
else:
test_version = subprocess.check_output(
"git rev-parse HEAD", shell=True
).decode("utf-8")
self.results["version"] = test_version.rstrip()
os.chdir(path)
self.lava_run = args.lava_run
if self.lava_run and not find_executable("lava-test-case"):
self.logger.info(
"lava-test-case not found, '-l' or '--lava_run' option ignored'"
)
self.lava_run = False
def run(self):
self.parse_stdout()
if self.pattern:
self.parse_pattern()
# If 'metrics' is empty, add 'no-result-found fail'.
if not self.metrics:
self.metrics = [
{
"test_case_id": "no-result-found",
"result": "fail",
"measurement": "",
"units": "",
}
]
self.results["metrics"] = self.metrics
self.dict_to_json()
self.dict_to_csv()
self.send_to_qa_reports()
self.send_to_fiotest()
self.logger.info("Result files saved to: %s" % self.test["test_path"])
print("--- Printing result.csv ---")
with open("%s/result.csv" % self.test["test_path"]) as f:
print(f.read())
def parse_stdout(self):
with open("%s/stdout.log" % self.test["test_path"], "r") as f:
test_case_re = re.compile("TEST_CASE_ID=(.*)")
result_re = re.compile("RESULT=(.*)")
measurement_re = re.compile("MEASUREMENT=(.*)")
units_re = re.compile("UNITS=(.*)")
for line in f:
if re.match(r"\<(|LAVA_SIGNAL_TESTCASE )TEST_CASE_ID=.*", line):
line = line.strip("\n").strip("\r").strip("<>").split(" ")
data = {
"test_case_id": "",
"result": "",
"measurement": "",
"units": "",
}
for string in line:
test_case_match = test_case_re.match(string)
result_match = result_re.match(string)
measurement_match = measurement_re.match(string)
units_match = units_re.match(string)
if test_case_match:
data["test_case_id"] = test_case_match.group(1)
if result_match:
data["result"] = result_match.group(1)
if measurement_match:
try:
data["measurement"] = float(measurement_match.group(1))
except ValueError as e:
pass
if units_match:
data["units"] = units_match.group(1)
self.metrics.append(data.copy())
if self.lava_run:
self.send_to_lava(data)
def parse_pattern(self):
with open("%s/stdout.log" % self.test["test_path"], "r") as f:
rex_pattern = re.compile(r"%s" % self.pattern)
for line in f:
data = {}
m = rex_pattern.search(line)
if m:
data = m.groupdict()
for x in ["measurement", "units"]:
if x not in data:
data[x] = ""
if self.fixup and data["result"] in self.fixup:
data["result"] = self.fixup[data["result"]]
self.metrics.append(data.copy())
if self.lava_run:
self.send_to_lava(data)
def send_to_lava(self, data):
cmd = "lava-test-case {} --result {}".format(
data["test_case_id"], data["result"]
)
if data["measurement"]:
cmd = "{} --measurement {} --units {}".format(
cmd, data["measurement"], data["units"]
)
self.logger.debug("lava-run: cmd: {}".format(cmd))
subprocess.call(shlex.split(cmd))
def send_to_fiotest(self):
"""
This method saves results as filesystem tree. This is required by
fiotest: https://github.com/foundriesio/fiotest/
"""
# check if TEST_DIR variable is set
test_path = os.environ.get("TEST_DIR")
if not test_path:
self.logger.debug("TEST_DIR is not set")
self.logger.debug("NOT reporting result to fiotest")
return
# create directory with test name
try:
for metric in self.metrics:
local_ts = datetime.now()
dir_name = "{}-{}".format(local_ts.timestamp(), metric["test_case_id"])
os.makedirs(os.path.join(test_path, dir_name), exist_ok=True)
if metric["measurement"] != "":
metrics_dir = os.path.join(test_path, dir_name, "metrics")
os.makedirs(metrics_dir, exist_ok=True)
with open(os.path.join(metrics_dir, "value"), "w") as value_file:
value_file.write(metric["measurement"])
else:
if metric["result"] == "fail":
os.makedirs(
os.path.join(test_path, dir_name, "failed"), exist_ok=True
)
if metric["result"] == "skip":
os.makedirs(
os.path.join(test_path, dir_name, "skipped"), exist_ok=True
)
except PermissionError:
self.logger.error(
"Unable to prepare fiotest results due to lack of permissions"
)
def send_to_qa_reports(self):
if None in (
self.qa_reports_server,
self.qa_reports_token,
self.qa_reports_group,
self.qa_reports_project,
self.qa_reports_build_version,
self.qa_reports_env,
):
self.logger.warning(
"All parameters for qa reports are not set, results will not be pushed to qa reports"
)
return
SquadApi.configure(url=self.qa_reports_server, token=self.qa_reports_token)
tests = {}
metrics = {}
for metric in self.metrics:
if metric["measurement"] != "":
metrics[
"{}/{}".format(self.test["test_name"], metric["test_case_id"])
] = metric["measurement"]
else:
tests[
"{}/{}".format(self.test["test_name"], metric["test_case_id"])
] = metric["result"]
with open("{}/stdout.log".format(self.test["test_path"]), "r") as logfile:
log = logfile.read()
metadata = {}
if not self.qa_reports_disable_metadata:
if self.qa_reports_metadata:
metadata.update(self.qa_reports_metadata)
if self.qa_reports_metadata_file:
try:
with open(self.qa_reports_metadata_file, "r") as metadata_file:
loaded_metadata = yaml.load(
metadata_file, Loader=yaml.SafeLoader
)
# check if loaded metadata is key=value and both are strings
for key, value in loaded_metadata.items():
if type(key) == str and type(value) == str:
# only update metadata with simple keys
# ignore all other items in the dictionary
metadata.update({key: value})
else:
self.logger.warning("Ignoring key: %s" % key)
except FileNotFoundError:
self.logger.warning("Metadata file not found")
except PermissionError:
self.logger.warning(
"Insufficient permissions to open metadata file"
)
if submit_results(
group_project_slug="{}/{}".format(
self.qa_reports_group, self.qa_reports_project
),
build_version=self.qa_reports_build_version,
env_slug=self.qa_reports_env,
tests=tests,
metrics=metrics,
log=log,
metadata=metadata,
attachments=None,
):
self.logger.info("Results pushed to QA Reports")
else:
self.logger.warning("Results upload to QA Reports failed!")
def dict_to_json(self):
# Save test results to output/test_id/result.json
with open("%s/result.json" % self.test["test_path"], "w") as f:
json.dump([self.results], f, indent=4)
if not self.args.cleanup:
# Collect test results of all tests in output/result.json
feeds = []
if os.path.isfile("%s/result.json" % self.test["output"]):
with open("%s/result.json" % self.test["output"], "r") as f:
feeds = json.load(f)
feeds.append(self.results)
with open("%s/result.json" % self.test["output"], "w") as f:
json.dump(feeds, f, indent=4)
def dict_to_csv(self):
# Convert dict self.results['params'] to a string.
test_params = ""
if self.results["params"]:
params_dict = self.results["params"]
test_params = ";".join(["%s=%s" % (k, v) for k, v in params_dict.items()])
for metric in self.results["metrics"]:
metric["name"] = self.results["name"]
metric["test_params"] = test_params
# Save test results to output/test_id/result.csv
fieldnames = [
"name",
"test_case_id",
"result",
"measurement",
"units",
"test_params",
]
with open("%s/result.csv" % self.test["test_path"], "w") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for metric in self.results["metrics"]:
writer.writerow(metric)
if not self.args.cleanup:
# Collect test results of all tests in output/result.csv
if not os.path.isfile("%s/result.csv" % self.test["output"]):
with open("%s/result.csv" % self.test["output"], "w") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
with open("%s/result.csv" % self.test["output"], "a") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
for metric in self.results["metrics"]:
writer.writerow(metric)
def get_token_from_netrc(qa_reports_server):
if qa_reports_server is None:
return
parse = urlparse(qa_reports_server)
netrc_local = netrc.netrc()
authTokens = netrc_local.authenticators("{}".format(parse.netloc))
if authTokens is not None:
hostname, username, authToken = authTokens
return authToken
# Unable to find Token hence returning None
return
def get_args():
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(
"-o",
"--output",
default=os.getenv("HOME", "") + "/output",
dest="output",
help=textwrap.dedent(
"""\
specify a directory to store test and result files.
Default: $HOME/output
"""
),
)
parser.add_argument(
"-p",
"--test_plan",
default=None,
dest="test_plan",
help=textwrap.dedent(
"""\
specify a test plan file which has tests and related
params listed in yaml format.
"""
),
)
parser.add_argument(
"-d",
"--test_def",
default=None,
dest="test_def",
help=textwrap.dedent(
"""\
base on test definition repo location, specify relative
path to the test definition to run.
Format example: "ubuntu/smoke-tests-basic.yaml"
"""
),
)
parser.add_argument(
"-r",
"--test_def_params",
default={},
dest="test_def_params",
action=StoreDictKeyPair,
nargs="+",
metavar="KEY=VALUE",
help=textwrap.dedent(
"""\
Set additional parameters when using test definition without
a test plan. The name values are set similarily to environment
variables:
--test_def_params KEY1=VALUE1 KEY2=VALUE2 ...
"""
),
)
parser.add_argument(
"-k",
"--kind",
default="automated",
dest="kind",
choices=["automated", "manual"],
help=textwrap.dedent(
"""\
Selects type of tests to be executed from the test plan.
Possible options: automated, manual
"""
),
)
parser.add_argument(
"-t",
"--timeout",
type=int,
default=None,
dest="timeout",
help="Specify test timeout",
)
parser.add_argument(
"-g",
"--target",
default=None,
dest="target",
help=textwrap.dedent(
"""\
Specify SSH target to execute tests.
Format: user@host
Note: ssh authentication must be paswordless
"""
),
)
parser.add_argument(
"-s",
"--skip_install",
dest="skip_install",
default=False,
action="store_true",
help="skip install section defined in test definition.",
)
parser.add_argument(
"-e",
"--skip_environment",
dest="skip_environment",
default=False,
action="store_true",
help="skip environmental data collection (board name, distro, etc)",
)
parser.add_argument(
"-l",
"--lava_run",
dest="lava_run",
default=False,
action="store_true",
help="send test result to LAVA with lava-test-case.",
)
parser.add_argument(
"-O",
"--overlay",
default=None,
dest="overlay",
help=textwrap.dedent(
"""\
Specify test plan overlay file to:
* skip tests
* amend test parameters
* add new tests
"""
),
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
dest="verbose",
default=False,
help="Set log level to DEBUG.",
)
parser.add_argument(
"--qa-reports-server",
dest="qa_reports_server",
default=None,
help="qa reports server where the results have to be sent",
)
parser.add_argument(
"--qa-reports-token",
dest="qa_reports_token",
default=None,
help="qa reports token to upload the results to qa_reports_server",
)
parser.add_argument(
"--qa-reports-project",
dest="qa_reports_project",
default=None,
help="qa reports projects to which the results have to be uploaded",
)
parser.add_argument(
"--qa-reports-group",
dest="qa_reports_group",
default=None,
help="qa reports group in which the results have to be stored",
)
parser.add_argument(
"--qa-reports-env",
dest="qa_reports_env",
default=None,
help="qa reports environment for the results that have to be stored",
)
parser.add_argument(
"--qa-reports-build-version",
dest="qa_reports_build_version",
default=None,
help="qa reports build id for the result set",
)
parser.add_argument(
"--qa-reports-disable-metadata",
dest="qa_reports_disable_metadata",
default=False,
action="store_true",
help="Disable sending metadata to SQUAD. Default: false",
)
parser.add_argument(
"--qa-reports-metadata",
dest="qa_reports_metadata",
default={},
action=StoreDictKeyPair,
nargs="+",
metavar="KEY=VALUE",
help="List of metadata key=value pairs to be sent to SQUAD",
)
parser.add_argument(
"--qa-reports-metadata-file",
dest="qa_reports_metadata_file",
default=None,
help="YAML file that defines metadata to be reported to SQUAD",
)
parser.add_argument(
"--cleanup",
dest="cleanup",
default=False,
action="store_true",
help=textwrap.dedent(
"""\
If set to true, test-runner will remove all temporary files after
running the test. It includes all collected logs and test results. This
option should only be used if uploading results to SQUAD or LAVA.
Default: false
"""
),
)
args = parser.parse_args()
return args
def main():
args = get_args()
# Setup logger.
logger = logging.getLogger("RUNNER")
logger.setLevel(logging.INFO)
if args.verbose:
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s: %(levelname)s: %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.debug("Test job arguments: %s" % args)
if args.kind != "manual" and args.target is None:
if os.geteuid() != 0:
logger.error("Sorry, you need to run this as root")
sys.exit(1)
# Validate target argument format and connectivity.
if args.target:
rex = re.compile(".+@.+")
if not rex.match(args.target):
logger.error("Usage: -g username@host")
sys.exit(1)
if pexpect.which("ssh") is None:
logger.error("openssh client must be installed on the host.")
sys.exit(1)
try:
run_command("exit", args.target)
except subprocess.CalledProcessError as e:
logger.error("ssh login failed.")
print(e)
sys.exit(1)
# Generate test plan.
test_plan = TestPlan(args)
test_list = test_plan.test_list(args.kind)
logger.info("Tests to run:")
for test in test_list:
print(test)
# Run tests.
for test in test_list:
# Set and save test params to test dictionary.
test["test_name"] = os.path.splitext(test["path"].split("/")[-1])[0]
test["test_uuid"] = "%s_%s" % (test["test_name"], test["uuid"])
test["output"] = os.path.realpath(args.output)
if args.target is not None and "-o" not in sys.argv:
test["output"] = os.path.join(test["output"], args.target)
test["test_path"] = os.path.join(test["output"], test["test_uuid"])
if args.target is not None:
# Get relative directory path of yaml file for partial file copy.
# '-d' takes any relative paths to the yaml file, so get the realpath first.
tc_realpath = os.path.realpath(test["path"])
tc_dirname = os.path.dirname(tc_realpath)
test["tc_relative_dir"] = "%s%s" % (
args.kind,
tc_dirname.split(args.kind)[1],
)
target_user_home = run_command("echo $HOME", args.target)
test["target_test_path"] = "%s/output/%s" % (
target_user_home,
test["test_uuid"],
)
logger.debug("Test parameters: %s" % test)
# Create directories and copy files needed.
setup = TestSetup(test, args)
setup.create_dir()
setup.copy_test_repo()
setup.checkout_version()
setup.create_uuid_file()
# Convert test definition.
test_def = TestDefinition(test, args)
if test_def.exists:
test_def.definition()
test_def.metadata()
test_def.mkrun()
# Run test.
test_def.run()
# Parse test output, save results in json and csv format.
result_parser = ResultParser(test, args)
result_parser.run()
if args.cleanup:
# remove a copy of test-definitions
logger.warning("Removing a copy of test-definitions")
logger.warning("Removing all collected logs")
shutil.rmtree(test["test_path"])
else:
logger.warning("Requested test definition %s doesn't exist" % test["path"])
if __name__ == "__main__":
main()