Linting: Black
Let's enforce Black across the repo. This change reformats the code to Black's
style so the CI formatting check passes.
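
A minimal sketch of the commands involved, assuming the CI job simply invokes
Black in check mode (the actual CI job definition is not part of this change):

    # CI-style check: exits non-zero if any file would be reformatted
    black --check .

    # Local fix-up: rewrite files in place to Black's style
    black .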
Signed-off-by: Benjamin Copeland <ben.copeland@linaro.org>
diff --git a/automated/android/apk-automation/andebenchpro2015.py b/automated/android/apk-automation/andebenchpro2015.py
index 43b7210..1c2d655 100755
--- a/automated/android/apk-automation/andebenchpro2015.py
+++ b/automated/android/apk-automation/andebenchpro2015.py
@@ -7,39 +7,41 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = "andebench-pro_2015.apk"
- self.config['apk_package'] = "com.eembc.andebench"
- self.config['activity'] = "com.eembc.andebench/.splash"
+ self.config["apk_file_name"] = "andebench-pro_2015.apk"
+ self.config["apk_package"] = "com.eembc.andebench"
+ self.config["activity"] = "com.eembc.andebench/.splash"
super(ApkRunnerImpl, self).__init__(self.config)
def setUp(self):
- self.call_adb('shell setenforce 0')
+ self.call_adb("shell setenforce 0")
super(ApkRunnerImpl, self).setUp()
def tearDown(self):
- self.call_adb('shell setenforce 1')
+ self.call_adb("shell setenforce 1")
super(ApkRunnerImpl, self).tearDown()
def parseResult(self):
- local_result_csv = "%s/andebench.log.csv" % self.config['output']
+ local_result_csv = "%s/andebench.log.csv" % self.config["output"]
remote_result_csv = "/mnt/sdcard/Download/andebench.log.csv"
self.call_adb("pull %s %s" % (remote_result_csv, local_result_csv))
- test_items = ["CoreMark-PRO (Base)",
- "CoreMark-PRO (Peak)",
- "Memory Bandwidth",
- "Memory Latency",
- "Storage",
- "Platform",
- "3D",
- "Overall Score",
- "Verify"]
+ test_items = [
+ "CoreMark-PRO (Base)",
+ "CoreMark-PRO (Peak)",
+ "Memory Bandwidth",
+ "Memory Latency",
+ "Storage",
+ "Platform",
+ "3D",
+ "Overall Score",
+ "Verify",
+ ]
pat_score = re.compile(r"^(?P<measurement>[\d\.]+)$")
pat_score_unit_str = r"^(?P<measurement>[\d\.]+)(?P<units>[^\d\.]+)$"
pat_score_unit = re.compile(pat_score_unit_str)
- with open(local_result_csv, 'r') as f:
+ with open(local_result_csv, "r") as f:
for line in f.readlines():
fields = line.split(",")
if fields[0] not in test_items:
@@ -49,8 +51,7 @@
test_name = fields[0].strip()
measurement = fields[1].strip()
elif len(fields) == 3:
- test_name = "_".join([fields[0].strip(),
- fields[1].strip()])
+ test_name = "_".join([fields[0].strip(), fields[1].strip()])
measurement = fields[2].strip()
else:
# not possible here
@@ -58,42 +59,44 @@
pass
test_name = test_name.replace(" ", "_")
- test_name = test_name.replace('(', '').replace(")", "")
+ test_name = test_name.replace("(", "").replace(")", "")
match = pat_score.match(measurement)
if not match:
match = pat_score_unit.match(measurement)
if not match:
- self.report_result("andebenchpro2015-%s" % test_name,
- "fail")
+ self.report_result("andebenchpro2015-%s" % test_name, "fail")
else:
data = match.groupdict()
- measurement = data.get('measurement')
+ measurement = data.get("measurement")
units = data.get("units")
if units is None:
units = "points"
- self.report_result("andebenchpro2015-%s" % test_name,
- "pass", measurement, units)
+ self.report_result(
+ "andebenchpro2015-%s" % test_name, "pass", measurement, units
+ )
def execute(self):
# Enable 64-bit
time.sleep(10)
self.dump_always()
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
if continue_btn:
continue_btn.touch()
self.dump_always()
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
if warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
self.dump_always()
- btn_license = self.vc.findViewWithText(u'I Agree')
+ btn_license = self.vc.findViewWithText(u"I Agree")
if btn_license:
btn_license.touch()
@@ -106,7 +109,7 @@
item.touch()
time.sleep(3)
self.dump_always()
- item = self.vc.findViewWithText(u'Options')
+ item = self.vc.findViewWithText(u"Options")
if item:
item.touch()
time.sleep(3)
@@ -115,17 +118,18 @@
opt_expandableListView1 = self.vc.findViewByIdOrRaise(opt_str)
if opt_expandableListView1:
for sub in opt_expandableListView1.children:
- if not self.vc.findViewWithText(u'Memory', sub):
+ if not self.vc.findViewWithText(u"Memory", sub):
cbx1_str = "com.eembc.andebench:id/cbx1"
self.vc.findViewByIdOrRaise(cbx1_str, sub).touch()
time.sleep(3)
self.dump_always()
self.vc.findViewByIdOrRaise(
- "com.eembc.andebench:id/ab_icon").touch()
+ "com.eembc.andebench:id/ab_icon"
+ ).touch()
time.sleep(3)
self.dump_always()
- self.vc.findViewWithTextOrRaise(u'Home').touch()
+ self.vc.findViewWithTextOrRaise(u"Home").touch()
while True:
try:
@@ -148,13 +152,13 @@
self.dump_always()
self.vc.findViewWithTextOrRaise("DEVICE SCORE")
- self.vc.findViewWithTextOrRaise(u'3D').touch()
- self.vc.findViewWithTextOrRaise(u'Platform').touch()
- self.vc.findViewWithTextOrRaise(u'Storage').touch()
- self.vc.findViewWithTextOrRaise(u'Memory Latency').touch()
- self.vc.findViewWithTextOrRaise(u'Memory Bandwidth').touch()
- self.vc.findViewWithTextOrRaise(u'CoreMark-PRO (Peak)').touch()
- self.vc.findViewWithTextOrRaise(u'CoreMark-PRO (Base)').touch()
+ self.vc.findViewWithTextOrRaise(u"3D").touch()
+ self.vc.findViewWithTextOrRaise(u"Platform").touch()
+ self.vc.findViewWithTextOrRaise(u"Storage").touch()
+ self.vc.findViewWithTextOrRaise(u"Memory Latency").touch()
+ self.vc.findViewWithTextOrRaise(u"Memory Bandwidth").touch()
+ self.vc.findViewWithTextOrRaise(u"CoreMark-PRO (Peak)").touch()
+ self.vc.findViewWithTextOrRaise(u"CoreMark-PRO (Base)").touch()
find_result = True
except ViewNotFoundException:
pass
diff --git a/automated/android/apk-automation/antutu6.py b/automated/android/apk-automation/antutu6.py
index 2ff4f53..d462e82 100755
--- a/automated/android/apk-automation/antutu6.py
+++ b/automated/android/apk-automation/antutu6.py
@@ -7,9 +7,9 @@
self.config = config
self.apk_3d_name = "antutu_benchmark_v6_3d_f1.apk"
self.apk_3d_pkg = "com.antutu.benchmark.full"
- self.config['apk_file_name'] = "AnTuTu6.0.4.apk"
- self.config['apk_package'] = "com.antutu.ABenchMark"
- self.config['activity'] = "com.antutu.ABenchMark/.ABenchMarkStart"
+ self.config["apk_file_name"] = "AnTuTu6.0.4.apk"
+ self.config["apk_package"] = "com.antutu.ABenchMark"
+ self.config["activity"] = "com.antutu.ABenchMark/.ABenchMarkStart"
super(ApkRunnerImpl, self).__init__(self.config)
def setUp(self):
@@ -23,12 +23,18 @@
self.uninstall_apk(self.apk_3d_pkg)
def parseResult(self):
- test_items = [u'3D', u'UX', u'CPU', u'RAM']
+ test_items = [u"3D", u"UX", u"CPU", u"RAM"]
test_subitems = {
- u'3D': [u'3D [Garden]', u'3D [Marooned]'],
- u'UX': [u'UX Data Secure', u'UX Data process', u'UX Strategy games', u'UX Image process', u'UX I/O performance'],
- u'CPU': [u'CPU Mathematics', u'CPU Common Use', u'CPU Multi-Core'],
- u'RAM': []
+ u"3D": [u"3D [Garden]", u"3D [Marooned]"],
+ u"UX": [
+ u"UX Data Secure",
+ u"UX Data process",
+ u"UX Strategy games",
+ u"UX Image process",
+ u"UX I/O performance",
+ ],
+ u"CPU": [u"CPU Mathematics", u"CPU Common Use", u"CPU Multi-Core"],
+ u"RAM": [],
}
antutu_sum = 0
for item in test_items:
@@ -43,7 +49,7 @@
else:
self.dump_always()
self.logger.info("Press DPAD_DOWN to find %s item" % item)
- self.device.press('DPAD_DOWN')
+ self.device.press("DPAD_DOWN")
time.sleep(2)
self.logger.info("Trying to find the score value for test suite: %s" % item)
@@ -51,51 +57,73 @@
while not found_view:
self.dump_always()
id_root = self.vc.findViewWithText(item)
- score_view = self.vc.findViewById("com.antutu.ABenchMark:id/tv_score_value",
- id_root.getParent())
+ score_view = self.vc.findViewById(
+ "com.antutu.ABenchMark:id/tv_score_value", id_root.getParent()
+ )
if score_view:
score = score_view.getText().strip()
self.logger.info("Found %s score: %s" % (item, score))
try:
score = int(score)
- self.report_result('antutu6-%s' % item.lower(), 'pass', score, 'points')
+ self.report_result(
+ "antutu6-%s" % item.lower(), "pass", score, "points"
+ )
antutu_sum = antutu_sum + int(score)
except ValueError:
- self.report_result('antutu6-%s' % item.lower(), 'fail')
+ self.report_result("antutu6-%s" % item.lower(), "fail")
found_view = True
- arrow_icon = self.vc.findViewById("com.antutu.ABenchMark:id/iv_arrow", id_root.getParent())
+ arrow_icon = self.vc.findViewById(
+ "com.antutu.ABenchMark:id/iv_arrow", id_root.getParent()
+ )
if arrow_icon:
arrow_icon.touch()
else:
- self.logger.info("Press DPAD_DOWN to find %s item value" % item.lower())
- self.device.press('DPAD_DOWN')
+ self.logger.info(
+ "Press DPAD_DOWN to find %s item value" % item.lower()
+ )
+ self.device.press("DPAD_DOWN")
time.sleep(2)
for sub_item in test_subitems[item]:
- self.logger.info("Trying to find score value for sub item: %s" % sub_item)
+ self.logger.info(
+ "Trying to find score value for sub item: %s" % sub_item
+ )
found_view = False
while not found_view:
self.dump_always()
subitem_obj = self.vc.findViewWithText(sub_item)
if subitem_obj:
- subitem_value_obj = self.vc.findViewByIdOrRaise("com.antutu.ABenchMark:id/tv_value", subitem_obj.getParent())
- subitem_key = sub_item.replace("[", '').replace("]", '')
- subitem_key = subitem_key.replace("/", '')
- subitem_key = subitem_key.replace(' ', '-')
+ subitem_value_obj = self.vc.findViewByIdOrRaise(
+ "com.antutu.ABenchMark:id/tv_value", subitem_obj.getParent()
+ )
+ subitem_key = sub_item.replace("[", "").replace("]", "")
+ subitem_key = subitem_key.replace("/", "")
+ subitem_key = subitem_key.replace(" ", "-")
subitem_score = subitem_value_obj.getText().strip()
- self.logger.info("Found %s score: %s" % (subitem_key, subitem_score))
+ self.logger.info(
+ "Found %s score: %s" % (subitem_key, subitem_score)
+ )
try:
subitem_score = int(subitem_score)
- self.report_result('antutu6-%s' % subitem_key.lower(), 'pass', subitem_score, 'points')
+ self.report_result(
+ "antutu6-%s" % subitem_key.lower(),
+ "pass",
+ subitem_score,
+ "points",
+ )
except ValueError:
- self.report_result('antutu6-%s' % subitem_key.lower(), 'fail')
+ self.report_result(
+ "antutu6-%s" % subitem_key.lower(), "fail"
+ )
found_view = True
else:
- self.logger.info("Press DPAD_DOWN to find sub item: %s" % sub_item)
- self.device.press('DPAD_DOWN')
+ self.logger.info(
+ "Press DPAD_DOWN to find sub item: %s" % sub_item
+ )
+ self.device.press("DPAD_DOWN")
time.sleep(2)
- self.report_result('antutu6-sum', 'pass', antutu_sum, 'points')
+ self.report_result("antutu6-sum", "pass", antutu_sum, "points")
def execute(self):
# Enable 64-bit
@@ -104,41 +132,46 @@
finished = False
while not finished:
self.dump_always()
- test_region = self.vc.findViewById("com.antutu.ABenchMark:"
- "id/start_test_region")
+ test_region = self.vc.findViewById(
+ "com.antutu.ABenchMark:" "id/start_test_region"
+ )
if test_region:
test_region.touch()
time.sleep(30)
self.dump_always()
- text_qr_code = self.vc.findViewWithText(u'QRCode of result')
+ text_qr_code = self.vc.findViewWithText(u"QRCode of result")
if text_qr_code:
finished = True
self.logger.info("Benchmark test finished!")
- stop_msg = 'Unfortunately, AnTuTu 3DBench has stopped.'
+ stop_msg = "Unfortunately, AnTuTu 3DBench has stopped."
msg_stopped = self.vc.findViewWithText(stop_msg)
if msg_stopped:
- btn_ok = self.vc.findViewWithTextOrRaise(u'OK') # nopep8
+ btn_ok = self.vc.findViewWithTextOrRaise(u"OK") # nopep8
btn_ok.touch()
# cancel the update
update_window = self.vc.findViewWithText("New update available")
- need_permission_msg = self.vc.findViewWithText("Please allow the permissions we need for test")
- allow_permission_btn = self.vc.findViewWithText('ALLOW')
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
+ need_permission_msg = self.vc.findViewWithText(
+ "Please allow the permissions we need for test"
+ )
+ allow_permission_btn = self.vc.findViewWithText("ALLOW")
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
if update_window:
- btn_cancel = self.vc.findViewWithTextOrRaise(u'Cancel')
+ btn_cancel = self.vc.findViewWithTextOrRaise(u"Cancel")
btn_cancel.touch()
elif need_permission_msg:
- btn_ok = self.vc.findViewWithTextOrRaise(u'OK')
+ btn_ok = self.vc.findViewWithTextOrRaise(u"OK")
btn_ok.touch()
elif allow_permission_btn:
allow_permission_btn.touch()
elif warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif continue_btn:
continue_btn.touch()
diff --git a/automated/android/apk-automation/benchmarkpi.py b/automated/android/apk-automation/benchmarkpi.py
index e1d444b..a29a8ae 100755
--- a/automated/android/apk-automation/benchmarkpi.py
+++ b/automated/android/apk-automation/benchmarkpi.py
@@ -7,9 +7,9 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = "gr.androiddev.BenchmarkPi-1.apk"
- self.config['apk_package'] = "gr.androiddev.BenchmarkPi"
- self.config['activity'] = "gr.androiddev.BenchmarkPi/.BenchmarkPi"
+ self.config["apk_file_name"] = "gr.androiddev.BenchmarkPi-1.apk"
+ self.config["apk_package"] = "gr.androiddev.BenchmarkPi"
+ self.config["activity"] = "gr.androiddev.BenchmarkPi/.BenchmarkPi"
super(ApkRunnerImpl, self).__init__(self.config)
def execute(self):
@@ -17,16 +17,20 @@
while not find_start_btn:
time.sleep(2)
self.dump_always()
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
if warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif continue_btn:
continue_btn.touch()
else:
- start_button = self.vc.findViewByIdOrRaise("gr.androiddev.BenchmarkPi:id/Button01")
+ start_button = self.vc.findViewByIdOrRaise(
+ "gr.androiddev.BenchmarkPi:id/Button01"
+ )
start_button.touch()
find_start_btn = True
@@ -34,17 +38,19 @@
while not finished:
time.sleep(1)
try:
- self.vc.dump(window='-1')
+ self.vc.dump(window="-1")
self.vc.findViewByIdOrRaise("android:id/message")
finished = True
except ViewNotFoundException:
pass
except RuntimeError as e:
self.logger.error(e)
- self.logger.info('benchmark pi finished')
+ self.logger.info("benchmark pi finished")
def parseResult(self):
- return_text = self.vc.findViewByIdOrRaise("android:id/message").getText().split(" ")
+ return_text = (
+ self.vc.findViewByIdOrRaise("android:id/message").getText().split(" ")
+ )
flagwordA = "calculated"
flagwordB = "Pi"
@@ -53,7 +59,9 @@
if return_text.index(flagwordB) == return_text.index(flagwordA) + 1:
score_number = return_text[return_text.index(flagwordA) + 3]
score_unit = return_text[return_text.index(flagwordA) + 4].split("!")[0]
- self.logger.info('Valid test result found: %s %s' % (score_number, score_unit))
+ self.logger.info(
+ "Valid test result found: %s %s" % (score_number, score_unit)
+ )
run_result = "pass"
else:
self.logger.error("Output string changed, parser need to be updated!")
@@ -62,4 +70,4 @@
self.logger.error("Can not find keyword which is supposed to show up!")
sys.exit(1)
- self.report_result('benchmarkpi', run_result, score_number, score_unit)
+ self.report_result("benchmarkpi", run_result, score_number, score_unit)
diff --git a/automated/android/apk-automation/caffeinemark.py b/automated/android/apk-automation/caffeinemark.py
index cf0f5a4..5852f83 100755
--- a/automated/android/apk-automation/caffeinemark.py
+++ b/automated/android/apk-automation/caffeinemark.py
@@ -6,17 +6,17 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = 'com.flexycore.caffeinemark-1.apk'
- self.config['apk_package'] = 'com.flexycore.caffeinemark'
- self.config['activity'] = 'com.flexycore.caffeinemark/.Application'
+ self.config["apk_file_name"] = "com.flexycore.caffeinemark-1.apk"
+ self.config["apk_package"] = "com.flexycore.caffeinemark"
+ self.config["activity"] = "com.flexycore.caffeinemark/.Application"
super(ApkRunnerImpl, self).__init__(self.config)
def setUp(self):
- self.call_adb('shell setenforce 0')
+ self.call_adb("shell setenforce 0")
super(ApkRunnerImpl, self).setUp()
def tearDown(self):
- self.call_adb('shell setenforce 1')
+ self.call_adb("shell setenforce 1")
super(ApkRunnerImpl, self).tearDown()
def execute(self):
@@ -24,16 +24,20 @@
while not find_start_btn:
time.sleep(2)
self.dump_always()
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
if warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif continue_btn:
continue_btn.touch()
else:
- start_button = self.vc.findViewByIdOrRaise("com.flexycore.caffeinemark:id/startButton")
+ start_button = self.vc.findViewByIdOrRaise(
+ "com.flexycore.caffeinemark:id/startButton"
+ )
start_button.touch()
find_start_btn = True
@@ -41,21 +45,31 @@
while not finished:
try:
self.dump_always()
- self.vc.findViewByIdOrRaise("com.flexycore.caffeinemark:id/testResultsCellOneTitle")
+ self.vc.findViewByIdOrRaise(
+ "com.flexycore.caffeinemark:id/testResultsCellOneTitle"
+ )
finished = True
except ViewNotFoundException:
- self.logger.info("ViewNotFoundException when tried to find com.flexycore.caffeinemark:id/testResultsCellOneTitle")
+ self.logger.info(
+ "ViewNotFoundException when tried to find com.flexycore.caffeinemark:id/testResultsCellOneTitle"
+ )
pass
except RuntimeError:
- self.logger.info("RuntimeError when tried to find com.flexycore.caffeinemark:id/testResultsCellOneTitle")
+ self.logger.info(
+ "RuntimeError when tried to find com.flexycore.caffeinemark:id/testResultsCellOneTitle"
+ )
pass
self.logger.info("benchmark finished")
def parseResult(self):
- total_score = self.vc.findViewByIdOrRaise("com.flexycore.caffeinemark:id/testResultEntryOverAllScore").getText()
- self.report_result("Caffeinemark-score", 'pass', total_score, 'points')
+ total_score = self.vc.findViewByIdOrRaise(
+ "com.flexycore.caffeinemark:id/testResultEntryOverAllScore"
+ ).getText()
+ self.report_result("Caffeinemark-score", "pass", total_score, "points")
- details_button = self.vc.findViewByIdOrRaise("com.flexycore.caffeinemark:id/testResultsDetailsButton")
+ details_button = self.vc.findViewByIdOrRaise(
+ "com.flexycore.caffeinemark:id/testResultsDetailsButton"
+ )
details_button.touch()
time.sleep(2)
@@ -63,24 +77,24 @@
sieve_name = self.vc.findViewByIdOrRaise("id/no_id/9").getText()
sieve_score = self.vc.findViewByIdOrRaise("id/no_id/10").getText()
- self.report_result("Caffeinemark-Sieve-score", 'pass', sieve_score, 'points')
+ self.report_result("Caffeinemark-Sieve-score", "pass", sieve_score, "points")
loop_name = self.vc.findViewByIdOrRaise("id/no_id/13").getText()
loop_score = self.vc.findViewByIdOrRaise("id/no_id/14").getText()
- self.report_result("Caffeinemark-Loop-score", 'pass', loop_score, 'points')
+ self.report_result("Caffeinemark-Loop-score", "pass", loop_score, "points")
logic_name = self.vc.findViewByIdOrRaise("id/no_id/17").getText()
logic_score = self.vc.findViewByIdOrRaise("id/no_id/18").getText()
- self.report_result("Caffeinemark-Collect-score", 'pass', logic_score, 'points')
+ self.report_result("Caffeinemark-Collect-score", "pass", logic_score, "points")
string_name = self.vc.findViewByIdOrRaise("id/no_id/21").getText()
string_score = self.vc.findViewByIdOrRaise("id/no_id/22").getText()
- self.report_result("Caffeinemark-String-score", 'pass', string_score, 'points')
+ self.report_result("Caffeinemark-String-score", "pass", string_score, "points")
float_name = self.vc.findViewByIdOrRaise("id/no_id/25").getText()
float_score = self.vc.findViewByIdOrRaise("id/no_id/26").getText()
- self.report_result("Caffeinemark-Float-score", 'pass', float_score, 'points')
+ self.report_result("Caffeinemark-Float-score", "pass", float_score, "points")
method_name = self.vc.findViewByIdOrRaise("id/no_id/29").getText()
method_score = self.vc.findViewByIdOrRaise("id/no_id/30").getText()
- self.report_result("Caffeinemark-Method-score", 'pass', method_score, 'points')
+ self.report_result("Caffeinemark-Method-score", "pass", method_score, "points")
diff --git a/automated/android/apk-automation/cf-bench.py b/automated/android/apk-automation/cf-bench.py
index f6646dd..24bab57 100755
--- a/automated/android/apk-automation/cf-bench.py
+++ b/automated/android/apk-automation/cf-bench.py
@@ -8,9 +8,9 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = 'CF-Bench-Pro-1.3.apk'
- self.config['apk_package'] = 'eu.chainfire.cfbench'
- self.config['activity'] = 'eu.chainfire.cfbench/.MainActivity'
+ self.config["apk_file_name"] = "CF-Bench-Pro-1.3.apk"
+ self.config["apk_package"] = "eu.chainfire.cfbench"
+ self.config["activity"] = "eu.chainfire.cfbench/.MainActivity"
super(ApkRunnerImpl, self).__init__(self.config)
def execute(self):
@@ -18,11 +18,13 @@
while not find_start_btn:
time.sleep(2)
self.dump_always()
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
if warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif continue_btn:
continue_btn.touch()
@@ -38,7 +40,9 @@
try:
time.sleep(5)
self.dump_always()
- self.vc.findViewByIdOrRaise("eu.chainfire.cfbench:id/admob_preference_layout")
+ self.vc.findViewByIdOrRaise(
+ "eu.chainfire.cfbench:id/admob_preference_layout"
+ )
finished = True
except ViewNotFoundException:
pass
@@ -54,7 +58,7 @@
while not found_score_view:
score_view = self.vc.findViewWithText(content_desc)
if not score_view:
- self.device.press('DPAD_DOWN')
+ self.device.press("DPAD_DOWN")
time.sleep(2)
try:
self.dump_always()
@@ -66,15 +70,25 @@
found_score_view = True
score_uid = score_view.getUniqueId()
- uid = int(re.search(r"id/no_id/(?P<uid>\d+)", score_uid).group('uid'))
+ uid = int(re.search(r"id/no_id/(?P<uid>\d+)", score_uid).group("uid"))
score = self.vc.findViewByIdOrRaise("id/no_id/%s" % (uid + offset))
score_text = score.getText()
if score_text.find("%") > 0:
score_value, units = score_text.split(" ")
- self.report_result("cfbench-" + content_desc.replace(" ", "-"), 'pass', score_value, units)
+ self.report_result(
+ "cfbench-" + content_desc.replace(" ", "-"),
+ "pass",
+ score_value,
+ units,
+ )
else:
- self.report_result("cfbench-" + content_desc.replace(" ", "-"), 'pass', score_text, 'points')
+ self.report_result(
+ "cfbench-" + content_desc.replace(" ", "-"),
+ "pass",
+ score_text,
+ "points",
+ )
except ViewNotFoundException:
self.logger.error("%s not found" % content_desc)
pass
diff --git a/automated/android/apk-automation/common/__init__.py b/automated/android/apk-automation/common/__init__.py
index db68292..29e671e 100755
--- a/automated/android/apk-automation/common/__init__.py
+++ b/automated/android/apk-automation/common/__init__.py
@@ -9,6 +9,7 @@
import subprocess
import sys
import time
+
try:
import urlparse
except ImportError:
@@ -20,49 +21,53 @@
def __init__(self, config):
self.config = config
- self.logger = logging.getLogger(self.config['name'])
+ self.logger = logging.getLogger(self.config["name"])
self.logger.setLevel(logging.INFO)
- if self.config.get('verbose') and self.config['verbose']:
+ if self.config.get("verbose") and self.config["verbose"]:
self.logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ formatter = logging.Formatter(
+ "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+ )
ch.setFormatter(formatter)
self.logger.addHandler(ch)
- self.config['output'] = os.getenv("OUTPUT", "./output/%s" % config['name'])
- if os.path.exists(self.config['output']):
- suffix = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
- shutil.move(self.config['output'], '%s-%s' % (self.config['output'], suffix))
- os.makedirs(self.config['output'])
+ self.config["output"] = os.getenv("OUTPUT", "./output/%s" % config["name"])
+ if os.path.exists(self.config["output"]):
+ suffix = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+ shutil.move(
+ self.config["output"], "%s-%s" % (self.config["output"], suffix)
+ )
+ os.makedirs(self.config["output"])
self.results = []
- serialno = os.getenv('ANDROID_SERIAL')
+ serialno = os.getenv("ANDROID_SERIAL")
if serialno is None:
- serialno = '.*'
- kwargs1 = {
- 'serialno': serialno,
- 'verbose': True,
- 'ignoresecuredevice': False}
- self.logger.debug('VC kwargs1: %s' % kwargs1)
+ serialno = ".*"
+ kwargs1 = {"serialno": serialno, "verbose": True, "ignoresecuredevice": False}
+ self.logger.debug("VC kwargs1: %s" % kwargs1)
self.device, self.serialno = ViewClient.connectToDeviceOrExit(**kwargs1)
kwargs2 = {
- 'startviewserver': True,
- 'forceviewserveruse': False,
- 'autodump': False,
- 'ignoreuiautomatorkilled': True,
- 'compresseddump': False}
- self.logger.debug('VC kwargs2: %s' % kwargs2)
+ "startviewserver": True,
+ "forceviewserveruse": False,
+ "autodump": False,
+ "ignoreuiautomatorkilled": True,
+ "compresseddump": False,
+ }
+ self.logger.debug("VC kwargs2: %s" % kwargs2)
self.vc = ViewClient(self.device, self.serialno, **kwargs2)
def run(self):
self.validate()
- for i in range(1, self.config['loops'] + 1):
+ for i in range(1, self.config["loops"] + 1):
try:
- self.logger.info('Running iteration [%s/%s]' % (i, self.config['loops']))
- self.config['itr'] = i
- self.logger.info('Test config: %s' % self.config)
+ self.logger.info(
+ "Running iteration [%s/%s]" % (i, self.config["loops"])
+ )
+ self.config["itr"] = i
+ self.logger.info("Test config: %s" % self.config)
self.setUp()
self.execute()
self.parseResult()
@@ -70,7 +75,7 @@
self.tearDown()
except Exception as e:
self.take_screencap()
- self.report_result(self.config['name'], 'fail')
+ self.report_result(self.config["name"], "fail")
self.logger.error(e, exc_info=True)
sys.exit(1)
@@ -84,49 +89,51 @@
units = str(units)
tc_name = str(name)
- if self.config['loops'] > 1 and self.config['itr'] != 'stats':
- tc_name = '%s-itr%s' % (name, self.config['itr'])
+ if self.config["loops"] > 1 and self.config["itr"] != "stats":
+ tc_name = "%s-itr%s" % (name, self.config["itr"])
- result_string = '%s %s %s %s' % (tc_name, result, score, units)
+ result_string = "%s %s %s %s" % (tc_name, result, score, units)
if score is None:
- result_string = '%s %s' % (tc_name, result)
+ result_string = "%s %s" % (tc_name, result)
if score is not None and units is None:
- result_string = '%s %s %s' % (tc_name, result, score)
+ result_string = "%s %s %s" % (tc_name, result, score)
- self.logger.info('TestResult: %s' % result_string)
- with open('%s/result.txt' % self.config['output'], 'a') as f:
- f.write('%s\n' % result_string)
+ self.logger.info("TestResult: %s" % result_string)
+ with open("%s/result.txt" % self.config["output"], "a") as f:
+ f.write("%s\n" % result_string)
# Save result to results for post processing.
- result = {'itr': self.config['itr'],
- 'test_case_id': str(name),
- 'result': str(result),
- 'measurement': score,
- 'units': units}
+ result = {
+ "itr": self.config["itr"],
+ "test_case_id": str(name),
+ "result": str(result),
+ "measurement": score,
+ "units": units,
+ }
self.results.append(result)
def statistics_result(self):
- if self.config['loops'] == 1:
+ if self.config["loops"] == 1:
return
- self.config['itr'] = 'stats'
+ self.config["itr"] = "stats"
tc_list = []
for result in self.results:
- if result['measurement'] is not None:
- tc_list.append(result['test_case_id'])
+ if result["measurement"] is not None:
+ tc_list.append(result["test_case_id"])
tc_list = set(tc_list)
for tc in tc_list:
ms_list = []
for result in self.results:
- if result['test_case_id'] == tc:
- ms_list.append(result['measurement'])
+ if result["test_case_id"] == tc:
+ ms_list.append(result["measurement"])
- units = ''
+ units = ""
for result in self.results:
- if result['test_case_id'] == tc:
- units = result['units']
+ if result["test_case_id"] == tc:
+ units = result["units"]
break
# Calculate and report population standard deviation and standard error.
@@ -134,28 +141,28 @@
variance = sum([(e - mean) ** 2 for e in ms_list]) / len(ms_list)
pstdev = math.sqrt(variance)
pstderr = pstdev / math.sqrt(len(ms_list))
- self.report_result('%s-min' % tc, 'pass', min(ms_list), units)
- self.report_result('%s-max' % tc, 'pass', max(ms_list), units)
- self.report_result('%s-mean' % tc, 'pass', mean, units)
- self.report_result('%s-sigma' % tc, 'pass', pstdev, units)
- self.report_result('%s-stderr' % tc, 'pass', pstderr, units)
+ self.report_result("%s-min" % tc, "pass", min(ms_list), units)
+ self.report_result("%s-max" % tc, "pass", max(ms_list), units)
+ self.report_result("%s-mean" % tc, "pass", mean, units)
+ self.report_result("%s-sigma" % tc, "pass", pstdev, units)
+ self.report_result("%s-stderr" % tc, "pass", pstderr, units)
def result_post_processing(self):
self.statistics_result()
# Save results to output/name/name-result.csv.
- fieldnames = ['itr', 'test_case_id', 'result', 'measurement', 'units']
- with open('%s/result.csv' % self.config['output'], 'w') as f:
+ fieldnames = ["itr", "test_case_id", "result", "measurement", "units"]
+ with open("%s/result.csv" % self.config["output"], "w") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for result in self.results:
writer.writerow(result)
- self.logger.info('Result saved to %s/result.csv' % self.config['output'])
+ self.logger.info("Result saved to %s/result.csv" % self.config["output"])
# Save results to output/name/name-result.json
- with open('%s/result.json' % self.config['output'], 'w') as f:
+ with open("%s/result.json" % self.config["output"], "w") as f:
json.dump([self.results], f, indent=4)
- self.logger.info('Result saved to %s/result.json' % self.config['output'])
+ self.logger.info("Result saved to %s/result.json" % self.config["output"])
def dump_always(self):
success = False
@@ -182,30 +189,29 @@
sys.exit(1)
def validate(self):
- if self.config['apk_file_name'] is None:
+ if self.config["apk_file_name"] is None:
self.logger.error("APK file name not set")
sys.exit(1)
- if self.config['apk_package'] is None:
+ if self.config["apk_package"] is None:
self.logger.error("APK package name not set")
sys.exit(1)
- if self.config['activity'] is None:
+ if self.config["activity"] is None:
self.logger.error("Activity name not set")
sys.exit(1)
def download_apk(self, apk_name):
# download APK if not already downloaded
- apk_path = os.path.join(os.path.abspath(self.config['apk_dir']),
- apk_name)
+ apk_path = os.path.join(os.path.abspath(self.config["apk_dir"]), apk_name)
if not os.path.isfile(apk_path):
# create directory for downloaded files
if not os.path.exists(os.path.dirname(apk_path)):
os.makedirs(os.path.dirname(apk_path))
- if self.config['base_url'].startswith("scp://"):
+ if self.config["base_url"].startswith("scp://"):
# like scp://user@host:/abs_path
- base_url = self.config['base_url']
+ base_url = self.config["base_url"]
remote_dir = base_url.split(":")[2]
user_host = base_url.split(":")[1].replace("/", "")
@@ -213,87 +219,97 @@
user = user_host.split("@")[0]
remote_path = "%s/%s" % (remote_dir, apk_name)
- scp_cmdline = "scp %s@%s:%s %s" % (user, host,
- remote_path, apk_path)
+ scp_cmdline = "scp %s@%s:%s %s" % (user, host, remote_path, apk_path)
ret = os.system(scp_cmdline)
if ret != 0:
- self.logger.info('Failed to run command: %s' % scp_cmdline)
+ self.logger.info("Failed to run command: %s" % scp_cmdline)
sys.exit(1)
else:
- apk_url = urlparse.urljoin(self.config['base_url'], apk_name)
- self.logger.info('Start downloading file: %s' % apk_url)
+ apk_url = urlparse.urljoin(self.config["base_url"], apk_name)
+ self.logger.info("Start downloading file: %s" % apk_url)
r = requests.get(apk_url, stream=True)
if r.status_code == 200:
- with open(apk_path, 'wb') as f:
+ with open(apk_path, "wb") as f:
r.raw.decode_content = True
shutil.copyfileobj(r.raw, f)
else:
- self.logger.info('Failed to download file: %s' % apk_url)
+ self.logger.info("Failed to download file: %s" % apk_url)
sys.exit(1)
else:
- self.logger.info('APK file already exists: %s' % apk_name)
+ self.logger.info("APK file already exists: %s" % apk_name)
def install_apk(self, apk_name):
- apk_path = os.path.join(os.path.abspath(self.config['apk_dir']), apk_name)
- self.logger.info('Installing %s' % os.path.basename(apk_path))
+ apk_path = os.path.join(os.path.abspath(self.config["apk_dir"]), apk_name)
+ self.logger.info("Installing %s" % os.path.basename(apk_path))
self.call_adb("install %s" % apk_path)
def uninstall_apk(self, package):
- install_packages = subprocess.check_output(['adb', 'shell', 'pm', 'list', 'packages'])
+ install_packages = subprocess.check_output(
+ ["adb", "shell", "pm", "list", "packages"]
+ )
if package in str(install_packages):
- self.logger.info('Stopping %s' % package)
+ self.logger.info("Stopping %s" % package)
self.call_adb("shell am force-stop %s" % package)
- self.logger.info('Uninstalling %s' % package)
+ self.logger.info("Uninstalling %s" % package)
self.call_adb("shell pm uninstall %s" % package)
def take_screencap(self):
- screencap_file = '/data/local/tmp/%s-itr%s.png' % (self.config['name'], self.config['itr'])
- self.call_adb('shell screencap %s' % screencap_file)
- self.logger.info('Pulling %s to output directory...' % screencap_file)
- self.call_adb('pull %s %s' % (screencap_file, self.config['output']))
+ screencap_file = "/data/local/tmp/%s-itr%s.png" % (
+ self.config["name"],
+ self.config["itr"],
+ )
+ self.call_adb("shell screencap %s" % screencap_file)
+ self.logger.info("Pulling %s to output directory..." % screencap_file)
+ self.call_adb("pull %s %s" % (screencap_file, self.config["output"]))
def collect_log(self):
- self.logger.info("Saving logcat.log, logcat-events.log and dmesg.log to output directory...")
- self.call_adb('logcat -d -v time > %s/logcat.log' % self.config['output'])
- self.call_adb('logcat -d -b events -v time > %s/logcat-events.log' % self.config['output'])
- self.call_adb('shell dmesg > %s/dmesg.log' % self.config['output'])
+ self.logger.info(
+ "Saving logcat.log, logcat-events.log and dmesg.log to output directory..."
+ )
+ self.call_adb("logcat -d -v time > %s/logcat.log" % self.config["output"])
+ self.call_adb(
+ "logcat -d -b events -v time > %s/logcat-events.log" % self.config["output"]
+ )
+ self.call_adb("shell dmesg > %s/dmesg.log" % self.config["output"])
def set_performance_governor(self, target_governor="performance"):
- if self.config.get('set_governor_policy') is not None \
- and self.config.get('set_governor_policy') is False:
+ if (
+ self.config.get("set_governor_policy") is not None
+ and self.config.get("set_governor_policy") is False
+ ):
return
- f_scaling_governor = ('/sys/devices/system/cpu/'
- 'cpu0/cpufreq/scaling_governor')
- f_governor_backup = '/data/local/tmp/scaling_governor'
- dir_sys_cpu = '/sys/devices/system/cpu/'
- self.call_adb('shell "cat %s>%s"' % (f_scaling_governor,
- f_governor_backup))
+ f_scaling_governor = "/sys/devices/system/cpu/" "cpu0/cpufreq/scaling_governor"
+ f_governor_backup = "/data/local/tmp/scaling_governor"
+ dir_sys_cpu = "/sys/devices/system/cpu/"
+ self.call_adb('shell "cat %s>%s"' % (f_scaling_governor, f_governor_backup))
- f_cpus_remote = '/data/local/tmp/cpus.txt'
- self.call_adb('shell "ls -d %s/cpu[0-9]* >%s"' % (dir_sys_cpu,
- f_cpus_remote))
- f_cpus_local = os.path.join(os.path.abspath(self.config['output']),
- 'cpus.txt')
- self.call_adb('pull %s %s' % (f_cpus_remote, f_cpus_local))
- with open(f_cpus_local, 'r') as f:
+ f_cpus_remote = "/data/local/tmp/cpus.txt"
+ self.call_adb('shell "ls -d %s/cpu[0-9]* >%s"' % (dir_sys_cpu, f_cpus_remote))
+ f_cpus_local = os.path.join(os.path.abspath(self.config["output"]), "cpus.txt")
+ self.call_adb("pull %s %s" % (f_cpus_remote, f_cpus_local))
+ with open(f_cpus_local, "r") as f:
for cpu in f.readlines():
- self.call_adb('shell "echo %s>%s/cpufreq/'
- 'scaling_governor"' % (target_governor,
- cpu.strip()))
+ self.call_adb(
+ 'shell "echo %s>%s/cpufreq/'
+ 'scaling_governor"' % (target_governor, cpu.strip())
+ )
def set_back_governor(self):
- if self.config.get('set_governor_policy') is not None \
- and self.config.get('set_governor_policy') is False:
+ if (
+ self.config.get("set_governor_policy") is not None
+ and self.config.get("set_governor_policy") is False
+ ):
return
- dir_sys_cpu = '/sys/devices/system/cpu/'
- f_governor_backup = '/data/local/tmp/scaling_governor'
- f_governor_local = os.path.join(os.path.abspath(self.config['output']),
- 'scaling_governor')
- self.call_adb('pull %s %s' % (f_governor_backup, f_governor_local))
- with open(f_governor_local, 'r') as f:
+ dir_sys_cpu = "/sys/devices/system/cpu/"
+ f_governor_backup = "/data/local/tmp/scaling_governor"
+ f_governor_local = os.path.join(
+ os.path.abspath(self.config["output"]), "scaling_governor"
+ )
+ self.call_adb("pull %s %s" % (f_governor_backup, f_governor_local))
+ with open(f_governor_local, "r") as f:
contents = f.readlines()
if len(contents) > 0:
gov_policy = contents[0].strip()
@@ -303,9 +319,9 @@
# set to peformance governor policay
self.set_performance_governor()
# Install APK.
- self.download_apk(self.config['apk_file_name'])
- self.uninstall_apk(self.config['apk_package'])
- self.install_apk(self.config['apk_file_name'])
+ self.download_apk(self.config["apk_file_name"])
+ self.uninstall_apk(self.config["apk_package"])
+ self.install_apk(self.config["apk_file_name"])
# Clear logcat buffer.
self.call_adb("logcat -c")
@@ -313,8 +329,8 @@
time.sleep(3)
# Start intent.
- self.logger.info('Starting %s' % self.config['apk_package'])
- self.call_adb("shell am start -W -S %s" % self.config['activity'])
+ self.logger.info("Starting %s" % self.config["apk_package"])
+ self.call_adb("shell am start -W -S %s" % self.config["activity"])
time.sleep(5)
def execute(self):
@@ -324,5 +340,5 @@
raise NotImplementedError
def tearDown(self):
- self.uninstall_apk(self.config['apk_package'])
+ self.uninstall_apk(self.config["apk_package"])
self.set_back_governor()
diff --git a/automated/android/apk-automation/gearses2eclair.py b/automated/android/apk-automation/gearses2eclair.py
index c0251f7..6cabb01 100755
--- a/automated/android/apk-automation/gearses2eclair.py
+++ b/automated/android/apk-automation/gearses2eclair.py
@@ -7,43 +7,52 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = "GearsES2eclair-20141021.apk"
- self.config['apk_package'] = "com.jeffboody.GearsES2eclair"
- self.config['activity'] = "com.jeffboody.GearsES2eclair/.GearsES2eclair"
+ self.config["apk_file_name"] = "GearsES2eclair-20141021.apk"
+ self.config["apk_package"] = "com.jeffboody.GearsES2eclair"
+ self.config["activity"] = "com.jeffboody.GearsES2eclair/.GearsES2eclair"
super(ApkRunnerImpl, self).__init__(self.config)
def execute(self):
- self.logger.info('Running GearsES2eclair for 60 seconds...')
+ self.logger.info("Running GearsES2eclair for 60 seconds...")
self.dump_always()
- message_obj = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
+ message_obj = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
if message_obj:
- button1 = self.vc.findViewWithTextOrRaise(u'OK')
+ button1 = self.vc.findViewWithTextOrRaise(u"OK")
button1.touch()
time.sleep(60)
def parseResult(self):
- raw_output_file = "%s/logcat-gearses2eclair-itr%s.log" % (self.config['output'], self.config['itr'])
- self.call_adb('logcat -d > %s' % raw_output_file)
+ raw_output_file = "%s/logcat-gearses2eclair-itr%s.log" % (
+ self.config["output"],
+ self.config["itr"],
+ )
+ self.call_adb("logcat -d > %s" % raw_output_file)
# 08-29 01:25:29.491 4704 4728 I a3d : a3d_GLES_dump@566 fps=58
- fps_pattern = re.compile(r'^.*\s+:\s+a3d_GLES_dump@\d+\s+fps=(?P<fps>\d+)\s*$')
+ fps_pattern = re.compile(r"^.*\s+:\s+a3d_GLES_dump@\d+\s+fps=(?P<fps>\d+)\s*$")
result_collector = []
with open(raw_output_file, "r") as logfile:
for line in logfile:
matches = fps_pattern.match(line)
if matches:
- result_collector.append(matches.group('fps'))
+ result_collector.append(matches.group("fps"))
- self.logger.info('result_collector: %s' % result_collector)
+ self.logger.info("result_collector: %s" % result_collector)
if len(result_collector) > 0:
- average_fps = sum(float(element) for element in result_collector) / len(result_collector)
+ average_fps = sum(float(element) for element in result_collector) / len(
+ result_collector
+ )
score_number = average_fps
run_result = "pass"
- self.logger.info("The average FPS in this test run is %s" % str(score_number))
+ self.logger.info(
+ "The average FPS in this test run is %s" % str(score_number)
+ )
else:
self.logger.error("The collector is empty, no actual result received!")
sys.exit(1)
- self.report_result('gearses2eclair', run_result, score_number, "fps")
+ self.report_result("gearses2eclair", run_result, score_number, "fps")
diff --git a/automated/android/apk-automation/geekbench3.py b/automated/android/apk-automation/geekbench3.py
index bcfb188..8f74ffa 100755
--- a/automated/android/apk-automation/geekbench3.py
+++ b/automated/android/apk-automation/geekbench3.py
@@ -10,21 +10,23 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = "com.primatelabs.geekbench3.apk"
- self.config['apk_package'] = "com.primatelabs.geekbench3"
- self.config['activity'] = "com.primatelabs.geekbench3/.HomeActivity"
+ self.config["apk_file_name"] = "com.primatelabs.geekbench3.apk"
+ self.config["apk_package"] = "com.primatelabs.geekbench3"
+ self.config["activity"] = "com.primatelabs.geekbench3/.HomeActivity"
super(ApkRunnerImpl, self).__init__(self.config)
def all_fail(self):
- self.report_result('geekbench-run', 'fail')
- self.report_result('geekbench-single-core', 'skip')
- self.report_result('geekbench-multi-core', 'skip')
+ self.report_result("geekbench-run", "fail")
+ self.report_result("geekbench-single-core", "skip")
+ self.report_result("geekbench-multi-core", "skip")
def execute(self):
try:
time.sleep(2)
self.dump_always()
- trigger = self.vc.findViewByIdOrRaise(self.config['apk_package'] + ":id/runBenchmarks")
+ trigger = self.vc.findViewByIdOrRaise(
+ self.config["apk_package"] + ":id/runBenchmarks"
+ )
trigger.touch()
self.logger.info("Geekbench 3 Test Started!")
except ViewNotFoundException:
@@ -33,7 +35,7 @@
sys.exit(1)
finished = False
- while (not finished):
+ while not finished:
time.sleep(10)
self.dump_always()
flag = self.vc.findViewWithText("RESULT")
@@ -44,12 +46,14 @@
elif in_progress:
self.logger.info("Geekbench 3 Test is still in progress...")
else:
- self.logger.error("Something goes wrong! It is unusual that the test has not been started after 10+ seconds! Please manually check it!")
+ self.logger.error(
+ "Something goes wrong! It is unusual that the test has not been started after 10+ seconds! Please manually check it!"
+ )
# self.all_fail()
# sys.exit(1)
# Generate the .gb3 file
- self.device.press('KEYCODE_MENU')
+ self.device.press("KEYCODE_MENU")
time.sleep(1)
self.dump_always()
share_button = self.vc.findViewWithText("Share")
@@ -57,16 +61,26 @@
share_button.touch()
time.sleep(5)
else:
- self.logger.error("Can not find the Share button to generate .gb3 file! Please check the screen!")
+ self.logger.error(
+ "Can not find the Share button to generate .gb3 file! Please check the screen!"
+ )
sys.exit(1)
def parseResult(self):
- raw_output_file = '%s/geekbench3-result-itr%s.gb3' % (self.config['output'], self.config['itr'])
- self.logger.info('Pulling /data/user/0/com.primatelabs.geekbench3/files to output directory...')
- self.call_adb('pull /data/user/0/com.primatelabs.geekbench3/files %s/files' % self.config['output'])
- db_file_list = glob.glob('%s/files/*.gb3' % self.config['output'])
+ raw_output_file = "%s/geekbench3-result-itr%s.gb3" % (
+ self.config["output"],
+ self.config["itr"],
+ )
+ self.logger.info(
+ "Pulling /data/user/0/com.primatelabs.geekbench3/files to output directory..."
+ )
+ self.call_adb(
+ "pull /data/user/0/com.primatelabs.geekbench3/files %s/files"
+ % self.config["output"]
+ )
+ db_file_list = glob.glob("%s/files/*.gb3" % self.config["output"])
if len(db_file_list) > 1:
- self.logger.error('More then one db file found...')
+ self.logger.error("More then one db file found...")
sys.exit(1)
db_file = db_file_list[0]
os.rename(db_file, raw_output_file)
@@ -84,13 +98,19 @@
# Find the ending point with the information we want
endpoint = line.find(endpoint_keyword)
if endpoint == -1:
- self.logger.error("Can not find %s in log file! Please manually check it!" % endpoint_keyword)
+ self.logger.error(
+ "Can not find %s in log file! Please manually check it!"
+ % endpoint_keyword
+ )
self.all_fail()
sys.exit(1)
else:
self.report_result("geekbench-run", "pass")
result_cut = line[0:endpoint].split(",")
- result_cut = [element.replace('"', '').replace(' ', '') for element in result_cut]
+ result_cut = [
+ element.replace('"', "").replace(" ", "")
+ for element in result_cut
+ ]
for item in result_cut:
if singlecore_keyword == item.split(":")[0]:
singlecore_result[singlecore_keyword] = item.split(":")[1]
@@ -98,18 +118,32 @@
multicore_result[multicore_keyword] = item.split(":")[1]
if len(singlecore_result) != 1:
run_result = "fail"
- self.logger.error("Incorrect value for single core test result! Please check the test result file!")
- self.report_result('geekbench-single-core', run_result)
+ self.logger.error(
+ "Incorrect value for single core test result! Please check the test result file!"
+ )
+ self.report_result("geekbench-single-core", run_result)
else:
run_result = "pass"
- self.report_result('geekbench-single-core', run_result, singlecore_result[singlecore_keyword], 'points')
+ self.report_result(
+ "geekbench-single-core",
+ run_result,
+ singlecore_result[singlecore_keyword],
+ "points",
+ )
if len(multicore_result) != 1:
run_result = "fail"
- self.logger.error("Incorrect value for multi core test result! Please check the test result file!")
- self.report_result('geekbench-multi-core', run_result)
+ self.logger.error(
+ "Incorrect value for multi core test result! Please check the test result file!"
+ )
+ self.report_result("geekbench-multi-core", run_result)
else:
run_result = "pass"
- self.report_result('geekbench-multi-core', run_result, multicore_result[multicore_keyword], 'points')
+ self.report_result(
+ "geekbench-multi-core",
+ run_result,
+ multicore_result[multicore_keyword],
+ "points",
+ )
logfile.close()
else:
@@ -118,4 +152,4 @@
def tearDown(self):
super(ApkRunnerImpl, self).tearDown()
- shutil.rmtree('%s/files/' % self.config['output'])
+ shutil.rmtree("%s/files/" % self.config["output"])
diff --git a/automated/android/apk-automation/geekbench4.py b/automated/android/apk-automation/geekbench4.py
index 829bd1c..ebcf005 100755
--- a/automated/android/apk-automation/geekbench4.py
+++ b/automated/android/apk-automation/geekbench4.py
@@ -19,41 +19,47 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = "geekbench-3-4-3-0.apk"
- self.config['apk_package'] = "com.primatelabs.geekbench"
- self.config['activity'] = "com.primatelabs.geekbench/.HomeActivity"
+ self.config["apk_file_name"] = "geekbench-3-4-3-0.apk"
+ self.config["apk_package"] = "com.primatelabs.geekbench"
+ self.config["activity"] = "com.primatelabs.geekbench/.HomeActivity"
super(ApkRunnerImpl, self).__init__(self.config)
def all_fail(self):
- self.report_result('geekbench-run', 'fail')
- self.report_result('geekbench-single-core', 'skip')
- self.report_result('geekbench-multi-core', 'skip')
+ self.report_result("geekbench-run", "fail")
+ self.report_result("geekbench-single-core", "skip")
+ self.report_result("geekbench-multi-core", "skip")
def execute(self):
find_run_btn = False
while not find_run_btn:
time.sleep(5)
self.dump_always()
- agreement = self.vc.findViewWithText(u'By using Geekbench you are agreeing to the terms of the Geekbench End User License Agreement and Privacy Policy.')
+ agreement = self.vc.findViewWithText(
+ u"By using Geekbench you are agreeing to the terms of the Geekbench End User License Agreement and Privacy Policy."
+ )
if agreement:
- accept_btn = self.vc.findViewWithTextOrRaise(u'ACCEPT')
+ accept_btn = self.vc.findViewWithTextOrRaise(u"ACCEPT")
accept_btn.touch()
continue
- no_internet = self.vc.findViewWithText(u'Geekbench encountered an error communicating with the Geekbench Browser. Geekbench requires an active internet connection in order to run benchmarks.')
+ no_internet = self.vc.findViewWithText(
+ u"Geekbench encountered an error communicating with the Geekbench Browser. Geekbench requires an active internet connection in order to run benchmarks."
+ )
if no_internet:
- self.logger.info("Geekbench requires an active internet connection in order to run benchmarks!")
+ self.logger.info(
+ "Geekbench requires an active internet connection in order to run benchmarks!"
+ )
self.all_fail()
sys.exit(1)
- runBench = self.vc.findViewWithText(u'RUN CPU BENCHMARK')
+ runBench = self.vc.findViewWithText(u"RUN CPU BENCHMARK")
if runBench:
runBench.touch()
find_run_btn = True
self.logger.info("Geekbench 4 Test Started!")
finished = False
- while (not finished):
+ while not finished:
time.sleep(10)
self.dump_always()
progress = self.vc.findViewById("android:id/progress")
@@ -62,23 +68,33 @@
self.logger.info("Geekbench 4 Test is still in progress...")
continue
- geekbench_score = self.vc.findViewWithText(u'Geekbench Score')
+ geekbench_score = self.vc.findViewWithText(u"Geekbench Score")
if geekbench_score:
self.logger.info("Geekbench 4 Test Finished!")
finished = True
continue
- self.logger.error("Something goes wrong! It is unusual that the test has not been started after 10+ seconds! Please manually check it!")
+ self.logger.error(
+ "Something goes wrong! It is unusual that the test has not been started after 10+ seconds! Please manually check it!"
+ )
# self.all_fail()
# sys.exit(1)
def parseResult(self):
- raw_output_file = '%s/geekbench3-result-itr%s.json' % (self.config['output'], self.config['itr'])
- self.logger.info('Pulling /data/user/0/com.primatelabs.geekbench/files to output directory...')
- self.call_adb('pull /data/user/0/com.primatelabs.geekbench/files %s/files' % self.config['output'])
- db_file_list = glob.glob('%s/files/*.gb4' % self.config['output'])
+ raw_output_file = "%s/geekbench3-result-itr%s.json" % (
+ self.config["output"],
+ self.config["itr"],
+ )
+ self.logger.info(
+ "Pulling /data/user/0/com.primatelabs.geekbench/files to output directory..."
+ )
+ self.call_adb(
+ "pull /data/user/0/com.primatelabs.geekbench/files %s/files"
+ % self.config["output"]
+ )
+ db_file_list = glob.glob("%s/files/*.gb4" % self.config["output"])
if len(db_file_list) > 1:
- self.logger.error('More then one db file found...')
+ self.logger.error("More then one db file found...")
sys.exit(1)
db_file = db_file_list[0]
os.rename(db_file, raw_output_file)
@@ -86,15 +102,23 @@
if os.path.exists(raw_output_file):
with open(raw_output_file, "r") as read_file:
res_data = json.load(read_file)
- for sec in res_data['sections']:
- self.report_result("Geekbench4-%s" % sec["name"], "pass", sec["score"], 'points')
- sub_testcases = sec['workloads']
+ for sec in res_data["sections"]:
+ self.report_result(
+ "Geekbench4-%s" % sec["name"], "pass", sec["score"], "points"
+ )
+ sub_testcases = sec["workloads"]
for sub_testcase in sub_testcases:
- self.report_result("Geekbench4-%s-%s" % (sec["name"], sub_testcase["name"].replace(' ', '_')), "pass", sub_testcase["score"], 'points')
+ self.report_result(
+ "Geekbench4-%s-%s"
+ % (sec["name"], sub_testcase["name"].replace(" ", "_")),
+ "pass",
+ sub_testcase["score"],
+ "points",
+ )
else:
self.logger.error("Result file does not exist: %s" % raw_output_file)
sys.exit(1)
def tearDown(self):
super(ApkRunnerImpl, self).tearDown()
- shutil.rmtree('%s/files/' % self.config['output'])
+ shutil.rmtree("%s/files/" % self.config["output"])
diff --git a/automated/android/apk-automation/glbenchmark25.py b/automated/android/apk-automation/glbenchmark25.py
index 22e4410..f05597c 100755
--- a/automated/android/apk-automation/glbenchmark25.py
+++ b/automated/android/apk-automation/glbenchmark25.py
@@ -7,25 +7,34 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = 'GLBenchmark_2.5.1.apk'
- self.config['apk_package'] = 'com.glbenchmark.glbenchmark25'
- self.config['activity'] = 'com.glbenchmark.glbenchmark25/com.glbenchmark.activities.GLBenchmarkDownloaderActivity'
+ self.config["apk_file_name"] = "GLBenchmark_2.5.1.apk"
+ self.config["apk_package"] = "com.glbenchmark.glbenchmark25"
+ self.config[
+ "activity"
+ ] = "com.glbenchmark.glbenchmark25/com.glbenchmark.activities.GLBenchmarkDownloaderActivity"
super(ApkRunnerImpl, self).__init__(self.config)
def setUp(self):
# set to peformance governor policay
# self.set_performance_governor()
# download apk related files
- self.download_apk('main.1.com.glbenchmark.glbenchmark25.obb')
- self.download_apk(self.config['apk_file_name'])
- self.uninstall_apk(self.config['apk_package'])
- self.install_apk(self.config['apk_file_name'])
+ self.download_apk("main.1.com.glbenchmark.glbenchmark25.obb")
+ self.download_apk(self.config["apk_file_name"])
+ self.uninstall_apk(self.config["apk_package"])
+ self.install_apk(self.config["apk_file_name"])
# Push data and config files.
- self.logger.info('Pushing main.1.com.glbenchmark.glbenchmark25.obb to target...')
- self.call_adb('push %s/main.1.com.glbenchmark.glbenchmark25.obb /sdcard/Android/obb/com.glbenchmark.glbenchmark25/main.1.com.glbenchmark.glbenchmark25.obb' % self.config['apk_dir'])
- self.logger.info('Pushing glbenchmark25-preferences.xml to target...')
- self.call_adb('push ./glbenchmark25-preferences.xml /data/data/com.glbenchmark.glbenchmark25/shared_prefs/com.glbenchmark.glbenchmark25_preferences.xml')
+ self.logger.info(
+ "Pushing main.1.com.glbenchmark.glbenchmark25.obb to target..."
+ )
+ self.call_adb(
+ "push %s/main.1.com.glbenchmark.glbenchmark25.obb /sdcard/Android/obb/com.glbenchmark.glbenchmark25/main.1.com.glbenchmark.glbenchmark25.obb"
+ % self.config["apk_dir"]
+ )
+ self.logger.info("Pushing glbenchmark25-preferences.xml to target...")
+ self.call_adb(
+ "push ./glbenchmark25-preferences.xml /data/data/com.glbenchmark.glbenchmark25/shared_prefs/com.glbenchmark.glbenchmark25_preferences.xml"
+ )
# Clear logcat buffer.
self.call_adb("logcat -c")
@@ -33,8 +42,8 @@
time.sleep(3)
# Start intent.
- self.logger.info('Starting %s' % self.config['apk_package'])
- self.call_adb("shell am start -W -S %s" % self.config['activity'])
+ self.logger.info("Starting %s" % self.config["apk_package"])
+ self.call_adb("shell am start -W -S %s" % self.config["activity"])
def execute(self):
selected_all = False
@@ -42,10 +51,14 @@
self.dump_always()
select_all_btn = self.vc.findViewWithText("All")
display_tests_menu = self.vc.findViewWithText("Performance Tests")
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
- attention_msg = self.vc.findViewWithText(u'''Network connection not found!
-Do you want to setup network connection? (If you can not upload the results you will not see it)''')
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
+ attention_msg = self.vc.findViewWithText(
+ u"""Network connection not found!
+Do you want to setup network connection? (If you can not upload the results you will not see it)"""
+ )
if select_all_btn:
select_all_btn.touch()
self.logger.info("All selected!")
@@ -55,12 +68,12 @@
self.logger.info("Display all tests to select all")
elif warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif continue_btn:
continue_btn.touch()
elif attention_msg:
- self.report_result('glbenchmark25-run', 'fail')
+ self.report_result("glbenchmark25-run", "fail")
self.logger.info("Network connection is required")
sys.exit(1)
else:
@@ -70,17 +83,22 @@
# Disable crashed test suites
self.dump_always()
crashed_test_name = "C24Z24MS4"
- self.logger.info('Test suite %s is going to be disabled!' % crashed_test_name)
+ self.logger.info("Test suite %s is going to be disabled!" % crashed_test_name)
crashed_test = self.vc.findViewWithText(crashed_test_name)
if crashed_test is not None:
crashed_test.touch()
- self.logger.info('Test suite %s has been excluded!' % crashed_test_name)
+ self.logger.info("Test suite %s has been excluded!" % crashed_test_name)
else:
- self.logger.info('Can not find test suite %s, please check the screen!' % crashed_test_name)
+ self.logger.info(
+ "Can not find test suite %s, please check the screen!"
+ % crashed_test_name
+ )
# Start selected test suites
self.dump_always()
- start_button = self.vc.findViewByIdOrRaise("com.glbenchmark.glbenchmark25:id/buttonStart")
+ start_button = self.vc.findViewByIdOrRaise(
+ "com.glbenchmark.glbenchmark25:id/buttonStart"
+ )
start_button.touch()
finished = False
@@ -89,7 +107,7 @@
self.dump_always()
flag = self.vc.findViewWithText("Result processing")
if flag is not None:
- self.logger.info('GLBenchmark Test Finished.')
+ self.logger.info("GLBenchmark Test Finished.")
finished = True
# Give up the result upload
cancel_button = self.vc.findViewWithText("Cancel")
@@ -97,9 +115,11 @@
cancel_button.touch()
time.sleep(5)
else:
- self.logger.error('Can not find cancel button! Please check the pop up window!')
+ self.logger.error(
+ "Can not find cancel button! Please check the pop up window!"
+ )
else:
- self.logger.info('GLBenchmark Test is still in progress...')
+ self.logger.info("GLBenchmark Test is still in progress...")
def getText(self, node):
children = node.childNodes
@@ -107,20 +127,24 @@
for node in children:
if node.nodeType == node.TEXT_NODE:
rc.append(node.data)
- return ''.join(rc)
+ return "".join(rc)
def logparser(self, cached_result_file):
- run_result = 'pass'
+ run_result = "pass"
dom = xml.dom.minidom.parse(cached_result_file)
- results = dom.getElementsByTagName('test_result')
+ results = dom.getElementsByTagName("test_result")
for test in results:
- title = self.getText(test.getElementsByTagName('title')[0])
- test_type = self.getText(test.getElementsByTagName('type')[0])
- score_number = self.getText(test.getElementsByTagName('score')[0])
- fps = self.getText(test.getElementsByTagName('fps')[0])
- score_unit = self.getText(test.getElementsByTagName('uom')[0])
- benchmark_name = title.replace(" ", "-").replace(":", "") + "-" + test_type.replace(" ", "-").replace(":", "")
+ title = self.getText(test.getElementsByTagName("title")[0])
+ test_type = self.getText(test.getElementsByTagName("type")[0])
+ score_number = self.getText(test.getElementsByTagName("score")[0])
+ fps = self.getText(test.getElementsByTagName("fps")[0])
+ score_unit = self.getText(test.getElementsByTagName("uom")[0])
+ benchmark_name = (
+ title.replace(" ", "-").replace(":", "")
+ + "-"
+ + test_type.replace(" ", "-").replace(":", "")
+ )
self.report_result(benchmark_name, run_result, score_number, score_unit)
if fps != "":
@@ -129,8 +153,16 @@
self.report_result(benchmark_name, run_result, score_number, score_unit)
def parseResult(self):
- cached_result_file = '%s/last-results-2.5.1-itr%s.xml' % (self.config['output'], self.config['itr'])
- self.logger.info('pull /data/data/com.glbenchmark.glbenchmark25/cache/last_results_2.5.1.xml to output directory...')
- self.call_adb('pull /data/data/com.glbenchmark.glbenchmark25/cache/last_results_2.5.1.xml %s' % cached_result_file)
+ cached_result_file = "%s/last-results-2.5.1-itr%s.xml" % (
+ self.config["output"],
+ self.config["itr"],
+ )
+ self.logger.info(
+ "pull /data/data/com.glbenchmark.glbenchmark25/cache/last_results_2.5.1.xml to output directory..."
+ )
+ self.call_adb(
+ "pull /data/data/com.glbenchmark.glbenchmark25/cache/last_results_2.5.1.xml %s"
+ % cached_result_file
+ )
self.logparser(cached_result_file)
diff --git a/automated/android/apk-automation/javawhetstone.py b/automated/android/apk-automation/javawhetstone.py
index a2ac10a..21b1d6d 100755
--- a/automated/android/apk-automation/javawhetstone.py
+++ b/automated/android/apk-automation/javawhetstone.py
@@ -7,17 +7,19 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = 'JavaBenchmark/pure-java-benchmarks/01-Java_Whetstone.apk'
- self.config['apk_package'] = 'com.roywhet'
- self.config['activity'] = 'com.roywhet/.JavaWhetstoneActivity'
+ self.config[
+ "apk_file_name"
+ ] = "JavaBenchmark/pure-java-benchmarks/01-Java_Whetstone.apk"
+ self.config["apk_package"] = "com.roywhet"
+ self.config["activity"] = "com.roywhet/.JavaWhetstoneActivity"
super(ApkRunnerImpl, self).__init__(self.config)
def setUp(self):
- self.call_adb('shell setenforce 0')
+ self.call_adb("shell setenforce 0")
super(ApkRunnerImpl, self).setUp()
def tearDown(self):
- self.call_adb('shell setenforce 1')
+ self.call_adb("shell setenforce 1")
super(ApkRunnerImpl, self).tearDown()
def execute(self):
@@ -25,10 +27,12 @@
while not find_start_btn:
time.sleep(2)
self.dump_always()
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
if warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
else:
btn_run = self.vc.findViewByIdOrRaise("com.roywhet:id/startButton")
@@ -41,10 +45,12 @@
try:
time.sleep(30)
self.dump_always()
- self.jws_results = self.vc.findViewByIdOrRaise("com.roywhet:id/displayDetails")
- if re.search('Total Elapsed Time', self.jws_results.getText()):
+ self.jws_results = self.vc.findViewByIdOrRaise(
+ "com.roywhet:id/displayDetails"
+ )
+ if re.search("Total Elapsed Time", self.jws_results.getText()):
finished = True
- self.logger.info('benchmark finished')
+ self.logger.info("benchmark finished")
except ViewNotFoundException:
pass
except RuntimeError:
@@ -62,20 +68,20 @@
"N6": "MFLOPS",
"N7": "MOPS",
"N8": "MOPS",
- "MWIPS": "MFLOPS"
+ "MWIPS": "MFLOPS",
}
- for line in self.jws_results.getText().split('\n'):
+ for line in self.jws_results.getText().split("\n"):
line = str(line.strip())
- elements = re.split(r'\s+', line)
- if line.startswith('MWIPS'):
- units = key_unit_hash['MWIPS']
+ elements = re.split(r"\s+", line)
+ if line.startswith("MWIPS"):
+ units = key_unit_hash["MWIPS"]
key = "MWIPS"
value = elements[1]
- elif line.startswith('N'):
+ elif line.startswith("N"):
units = key_unit_hash[elements[0]]
key = "%s-%s" % (elements[0], elements[1])
value = elements[2]
else:
continue
- self.report_result('javawhetstone-%s' % key, 'pass', value, units)
+ self.report_result("javawhetstone-%s" % key, "pass", value, units)
diff --git a/automated/android/apk-automation/jbench.py b/automated/android/apk-automation/jbench.py
index 33b8eb6..cab96ab 100755
--- a/automated/android/apk-automation/jbench.py
+++ b/automated/android/apk-automation/jbench.py
@@ -7,9 +7,11 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = 'JavaBenchmark/pure-java-benchmarks/03-JBench.apk'
- self.config['apk_package'] = 'it.JBench.bench'
- self.config['activity'] = 'it.JBench.bench/it.JBench.jbench.MainActivity'
+ self.config[
+ "apk_file_name"
+ ] = "JavaBenchmark/pure-java-benchmarks/03-JBench.apk"
+ self.config["apk_package"] = "it.JBench.bench"
+ self.config["activity"] = "it.JBench.bench/it.JBench.jbench.MainActivity"
super(ApkRunnerImpl, self).__init__(self.config)
def execute(self):
@@ -18,11 +20,13 @@
time.sleep(2)
self.dump_always()
btn_jbench = self.vc.findViewById("it.JBench.bench:id/button1")
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
if warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif continue_btn:
continue_btn.touch()
@@ -33,16 +37,20 @@
self.logger.info("Nothing found, need to check manually")
finished = False
- while (not finished):
+ while not finished:
try:
time.sleep(5)
self.dump_always()
- results = self.vc.findViewByIdOrRaise("it.JBench.bench:id/textViewResult")
- if re.search(r'^\d+$', results.getText()):
+ results = self.vc.findViewByIdOrRaise(
+ "it.JBench.bench:id/textViewResult"
+ )
+ if re.search(r"^\d+$", results.getText()):
finished = True
print("benchmark finished")
print("%s=%s" % ("JBench", results.getText().strip()))
- self.report_result("jbench", 'pass', results.getText().strip(), 'points')
+ self.report_result(
+ "jbench", "pass", results.getText().strip(), "points"
+ )
except ViewNotFoundException:
pass
except RuntimeError:
diff --git a/automated/android/apk-automation/linpack.py b/automated/android/apk-automation/linpack.py
index 9fba9bf..2f72a99 100755
--- a/automated/android/apk-automation/linpack.py
+++ b/automated/android/apk-automation/linpack.py
@@ -5,9 +5,9 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = "com.greenecomputing.linpack-1.apk"
- self.config['apk_package'] = "com.greenecomputing.linpack"
- self.config['activity'] = "com.greenecomputing.linpack/.Linpack"
+ self.config["apk_file_name"] = "com.greenecomputing.linpack-1.apk"
+ self.config["apk_package"] = "com.greenecomputing.linpack"
+ self.config["activity"] = "com.greenecomputing.linpack/.Linpack"
super(ApkRunnerImpl, self).__init__(self.config)
def execute(self):
@@ -16,16 +16,20 @@
while not find_start_btn:
time.sleep(2)
self.dump_always()
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
if warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif continue_btn:
continue_btn.touch()
else:
- start_single_button = self.vc.findViewByIdOrRaise("com.greenecomputing.linpack:id/btnsingle")
+ start_single_button = self.vc.findViewByIdOrRaise(
+ "com.greenecomputing.linpack:id/btnsingle"
+ )
start_single_button.touch()
find_start_btn = True
@@ -37,14 +41,24 @@
if self.vc.findViewById("com.greenecomputing.linpack:id/btnsingle"):
test_finished = True
- mflops_single_score = self.vc.findViewByIdOrRaise("com.greenecomputing.linpack:id/txtmflops_result")
- time_single_score = self.vc.findViewByIdOrRaise("com.greenecomputing.linpack:id/txttime_result")
- self.report_result('Linpack-MFLOPSSingleScore', 'pass', mflops_single_score.getText(), 'MFLOPS')
- self.report_result('Linpack-TimeSingleScore', 'pass', time_single_score.getText(), 'seconds')
+ mflops_single_score = self.vc.findViewByIdOrRaise(
+ "com.greenecomputing.linpack:id/txtmflops_result"
+ )
+ time_single_score = self.vc.findViewByIdOrRaise(
+ "com.greenecomputing.linpack:id/txttime_result"
+ )
+ self.report_result(
+ "Linpack-MFLOPSSingleScore", "pass", mflops_single_score.getText(), "MFLOPS"
+ )
+ self.report_result(
+ "Linpack-TimeSingleScore", "pass", time_single_score.getText(), "seconds"
+ )
# Multi core test.
self.dump_always()
- start_multi_button = self.vc.findViewByIdOrRaise("com.greenecomputing.linpack:id/btncalculate")
+ start_multi_button = self.vc.findViewByIdOrRaise(
+ "com.greenecomputing.linpack:id/btncalculate"
+ )
start_multi_button.touch()
# using start_single_button to check if the test finished
@@ -55,10 +69,18 @@
if self.vc.findViewById("com.greenecomputing.linpack:id/btnsingle"):
test_finished = True
- mflops_multi_score = self.vc.findViewByIdOrRaise("com.greenecomputing.linpack:id/txtmflops_result")
- time_multi_score = self.vc.findViewByIdOrRaise("com.greenecomputing.linpack:id/txttime_result")
- self.report_result('Linpack-MFLOPSMultiScore', 'pass', mflops_multi_score.getText(), 'MFLOPS')
- self.report_result('Linpack-TimeMultiScore', 'pass', time_multi_score.getText(), 'seconds')
+ mflops_multi_score = self.vc.findViewByIdOrRaise(
+ "com.greenecomputing.linpack:id/txtmflops_result"
+ )
+ time_multi_score = self.vc.findViewByIdOrRaise(
+ "com.greenecomputing.linpack:id/txttime_result"
+ )
+ self.report_result(
+ "Linpack-MFLOPSMultiScore", "pass", mflops_multi_score.getText(), "MFLOPS"
+ )
+ self.report_result(
+ "Linpack-TimeMultiScore", "pass", time_multi_score.getText(), "seconds"
+ )
def parseResult(self):
pass
diff --git a/automated/android/apk-automation/main.py b/automated/android/apk-automation/main.py
index 5320fad..f8b3d3b 100755
--- a/automated/android/apk-automation/main.py
+++ b/automated/android/apk-automation/main.py
@@ -2,22 +2,47 @@
import importlib
parser = ArgumentParser()
-parser.add_argument('-d', '--apk_dir', dest='apk_dir', default='./apks',
- help="Specify APK's directory.")
-parser.add_argument('-u', '--base_url', dest='base_url', default='http://testdata.validation.linaro.org/apks/',
- help="Specify APK's base url.")
-parser.add_argument('-n', '--name', dest='name', default='linpack',
- help='Specify test name.')
-parser.add_argument('-l', '--loops', type=int, dest='loops', default=1,
- help='Set the number of test loops.')
-parser.add_argument('-g', '--governor', action='store_true', dest='set_governor_policy', default=False,
- help='Specify if to set the governor policy to performance')
-parser.add_argument('-v', '--verbose', action='store_true', dest='verbose',
- default=False, help='Set the number of test loops.')
+parser.add_argument(
+ "-d", "--apk_dir", dest="apk_dir", default="./apks", help="Specify APK's directory."
+)
+parser.add_argument(
+ "-u",
+ "--base_url",
+ dest="base_url",
+ default="http://testdata.validation.linaro.org/apks/",
+ help="Specify APK's base url.",
+)
+parser.add_argument(
+ "-n", "--name", dest="name", default="linpack", help="Specify test name."
+)
+parser.add_argument(
+ "-l",
+ "--loops",
+ type=int,
+ dest="loops",
+ default=1,
+ help="Set the number of test loops.",
+)
+parser.add_argument(
+ "-g",
+ "--governor",
+ action="store_true",
+ dest="set_governor_policy",
+ default=False,
+ help="Specify whether to set the governor policy to performance",
+)
+parser.add_argument(
+ "-v",
+ "--verbose",
+ action="store_true",
+ dest="verbose",
+ default=False,
+ help="Enable verbose output.",
+)
args = parser.parse_args()
-print('Test job arguments: %s' % args)
+print("Test job arguments: %s" % args)
config = vars(args)
-mod = importlib.import_module(config['name'])
+mod = importlib.import_module(config["name"])
a = mod.ApkRunnerImpl(config)
a.run()
diff --git a/automated/android/apk-automation/quadrantpro.py b/automated/android/apk-automation/quadrantpro.py
index d6fd963..1f08915 100755
--- a/automated/android/apk-automation/quadrantpro.py
+++ b/automated/android/apk-automation/quadrantpro.py
@@ -5,17 +5,21 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = 'com.aurorasoftworks.quadrant.ui.professional-1.apk'
- self.config['apk_package'] = 'com.aurorasoftworks.quadrant.ui.professional'
- self.config['activity'] = 'com.aurorasoftworks.quadrant.ui.professional/.QuadrantProfessionalLauncherActivity'
+ self.config[
+ "apk_file_name"
+ ] = "com.aurorasoftworks.quadrant.ui.professional-1.apk"
+ self.config["apk_package"] = "com.aurorasoftworks.quadrant.ui.professional"
+ self.config[
+ "activity"
+ ] = "com.aurorasoftworks.quadrant.ui.professional/.QuadrantProfessionalLauncherActivity"
super(ApkRunnerImpl, self).__init__(self.config)
def setUp(self):
- self.call_adb('shell setenforce 0')
+ self.call_adb("shell setenforce 0")
super(ApkRunnerImpl, self).setUp()
def tearDown(self):
- self.call_adb('shell setenforce 1')
+ self.call_adb("shell setenforce 1")
super(ApkRunnerImpl, self).tearDown()
def execute(self):
@@ -23,12 +27,14 @@
while need_continue:
self.dump_always()
view_license_btn = self.vc.findViewWithText("View license")
- run_full_item = self.vc.findViewWithText(u'Run full benchmark')
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
+ run_full_item = self.vc.findViewWithText(u"Run full benchmark")
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
if warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif continue_btn:
continue_btn.touch()
@@ -44,13 +50,13 @@
# Hack workaround: kill the app after its first start-up;
# it will then work from the second launch
- self.call_adb("shell am force-stop %s" % self.config['apk_package'])
- self.call_adb("shell am start -W -S %s" % self.config['activity'])
+ self.call_adb("shell am force-stop %s" % self.config["apk_package"])
+ self.call_adb("shell am start -W -S %s" % self.config["activity"])
need_continue = True
while need_continue:
self.dump_always()
view_license_btn = self.vc.findViewWithText("View license")
- run_full_item = self.vc.findViewWithText(u'Run full benchmark')
+ run_full_item = self.vc.findViewWithText(u"Run full benchmark")
if view_license_btn:
ok_button = self.vc.findViewWithTextOrRaise("OK")
ok_button.touch()
@@ -65,9 +71,11 @@
while not finished:
try:
self.dump_always()
- self.vc.findViewByIdOrRaise("com.aurorasoftworks.quadrant.ui.professional:id/chart")
+ self.vc.findViewByIdOrRaise(
+ "com.aurorasoftworks.quadrant.ui.professional:id/chart"
+ )
finished = True
- self.logger.info('Benchmark finished')
+ self.logger.info("Benchmark finished")
except ViewNotFoundException:
pass
except RuntimeError:
@@ -76,12 +84,17 @@
pass
def parseResult(self):
- raw_output_file = "%s/logcat-quadrandpro-itr%s.log" % (self.config['output'], self.config['itr'])
- self.call_adb('logcat -d -v brief > %s' % raw_output_file)
+ raw_output_file = "%s/logcat-quadrandpro-itr%s.log" % (
+ self.config["output"],
+ self.config["itr"],
+ )
+ self.call_adb("logcat -d -v brief > %s" % raw_output_file)
with open(raw_output_file) as logfile:
for line in logfile:
- if 'aggregate score is' in line:
- tc_id = line.split()[3].replace('_', '-')
+ if "aggregate score is" in line:
+ tc_id = line.split()[3].replace("_", "-")
measurement = line.split()[-1]
- self.report_result('quadrandpro-%s' % tc_id, 'pass', measurement, 'points')
+ self.report_result(
+ "quadrandpro-%s" % tc_id, "pass", measurement, "points"
+ )
diff --git a/automated/android/apk-automation/rl-sqlite.py b/automated/android/apk-automation/rl-sqlite.py
index c16c201..d231bbb 100755
--- a/automated/android/apk-automation/rl-sqlite.py
+++ b/automated/android/apk-automation/rl-sqlite.py
@@ -5,29 +5,31 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = 'RL_Benchmark_SQLite_v1.3.apk'
- self.config['apk_package'] = 'com.redlicense.benchmark.sqlite'
- self.config['activity'] = 'com.redlicense.benchmark.sqlite/.Main'
+ self.config["apk_file_name"] = "RL_Benchmark_SQLite_v1.3.apk"
+ self.config["apk_package"] = "com.redlicense.benchmark.sqlite"
+ self.config["activity"] = "com.redlicense.benchmark.sqlite/.Main"
super(ApkRunnerImpl, self).__init__(self.config)
def execute(self):
find_start_btn = False
while not find_start_btn:
self.dump_always()
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- btn_start = self.vc.findViewWithText(u'Start')
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ btn_start = self.vc.findViewWithText(u"Start")
if warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif btn_start:
btn_start.touch()
find_start_btn = True
finished = False
- while(not finished):
+ while not finished:
self.dump_always()
- overall_result = self.vc.findViewWithText(u'Overall')
+ overall_result = self.vc.findViewWithText(u"Overall")
if overall_result:
finished = True
self.logger.info("benchmark finished")
@@ -35,21 +37,35 @@
def __get_score_with_text(self, text):
found_score_view = False
while not found_score_view:
- linear_layout = self.vc.findViewByIdOrRaise("com.redlicense.benchmark.sqlite:id/stats")
+ linear_layout = self.vc.findViewByIdOrRaise(
+ "com.redlicense.benchmark.sqlite:id/stats"
+ )
for ch in linear_layout.children:
subitem = self.vc.findViewWithText(text, ch)
if subitem:
- subitem_result = self.vc.findViewByIdOrRaise("com.redlicense.benchmark.sqlite:id/test_result", ch)
- score = subitem_result.getText().replace("sec", "").replace("Running", "").strip()
+ subitem_result = self.vc.findViewByIdOrRaise(
+ "com.redlicense.benchmark.sqlite:id/test_result", ch
+ )
+ score = (
+ subitem_result.getText()
+ .replace("sec", "")
+ .replace("Running", "")
+ .strip()
+ )
score_in_ms = float(score) * 1000
- self.report_result("RL-sqlite-" + text.replace(" ", "-"), 'pass', str(score_in_ms), "ms")
+ self.report_result(
+ "RL-sqlite-" + text.replace(" ", "-"),
+ "pass",
+ str(score_in_ms),
+ "ms",
+ )
found_score_view = True
break
if subitem is None:
self.logger.info("%s not found, need to pageup" % text)
- self.device.press('DPAD_UP')
+ self.device.press("DPAD_UP")
time.sleep(2)
- self.device.press('DPAD_UP')
+ self.device.press("DPAD_UP")
time.sleep(2)
self.dump_always()
@@ -65,6 +81,8 @@
self.__get_score_with_text("Creating an index")
self.__get_score_with_text("100 SELECTs on a string comparison")
self.__get_score_with_text("100 SELECTs without an index")
- self.__get_score_with_text("25000 INSERTs into an indexed table in a transaction")
+ self.__get_score_with_text(
+ "25000 INSERTs into an indexed table in a transaction"
+ )
self.__get_score_with_text("25000 INSERTs in a transaction")
self.__get_score_with_text("1000 INSERTs")
diff --git a/automated/android/apk-automation/scimark.py b/automated/android/apk-automation/scimark.py
index 9fcc96f..49d06b9 100755
--- a/automated/android/apk-automation/scimark.py
+++ b/automated/android/apk-automation/scimark.py
@@ -6,25 +6,29 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = 'JavaBenchmark/non-pure-java-benchmarks/03-SciMark.apk'
- self.config['apk_package'] = 'net.danielroggen.scimark'
- self.config['activity'] = 'net.danielroggen.scimark/.ActivityMain'
+ self.config[
+ "apk_file_name"
+ ] = "JavaBenchmark/non-pure-java-benchmarks/03-SciMark.apk"
+ self.config["apk_package"] = "net.danielroggen.scimark"
+ self.config["activity"] = "net.danielroggen.scimark/.ActivityMain"
super(ApkRunnerImpl, self).__init__(self.config)
def execute(self):
find_start_btn = False
while not find_start_btn:
self.dump_always()
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
if warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif continue_btn:
continue_btn.touch()
else:
- btn_java_bench = self.vc.findViewWithTextOrRaise(u'Java bench')
+ btn_java_bench = self.vc.findViewWithTextOrRaise(u"Java bench")
btn_java_bench.touch()
find_start_btn = True
@@ -33,7 +37,9 @@
try:
time.sleep(60)
self.dump_always()
- self.sci_results = self.vc.findViewByIdOrRaise("net.danielroggen.scimark:id/textViewResult")
+ self.sci_results = self.vc.findViewByIdOrRaise(
+ "net.danielroggen.scimark:id/textViewResult"
+ )
if self.sci_results.getText().find("Done") > 0:
finished = True
self.logger.info("benchmark finished")
@@ -45,13 +51,28 @@
pass
def parseResult(self):
- keys = ["FFT (1024)", "SOR (100x100)", "Monte Carlo",
- "Sparse matmult (N=1000, nz=5000)", "LU (100x100)", "Composite Score"]
+ keys = [
+ "FFT (1024)",
+ "SOR (100x100)",
+ "Monte Carlo",
+ "Sparse matmult (N=1000, nz=5000)",
+ "LU (100x100)",
+ "Composite Score",
+ ]
for line in self.sci_results.getText().replace(": \n", ":").split("\n"):
line = str(line.strip())
key_val = line.split(":")
if len(key_val) == 2:
if key_val[0].strip() in keys:
- key = key_val[0].strip().replace(' ', '-').replace('(', '').replace(')', '').replace(',', '')
- self.report_result("scimark-" + key, 'pass', key_val[1].strip(), 'Mflops')
+ key = (
+ key_val[0]
+ .strip()
+ .replace(" ", "-")
+ .replace("(", "")
+ .replace(")", "")
+ .replace(",", "")
+ )
+ self.report_result(
+ "scimark-" + key, "pass", key_val[1].strip(), "Mflops"
+ )
diff --git a/automated/android/apk-automation/vellamo3.py b/automated/android/apk-automation/vellamo3.py
index 3d763aa..ff0fce5 100755
--- a/automated/android/apk-automation/vellamo3.py
+++ b/automated/android/apk-automation/vellamo3.py
@@ -7,15 +7,15 @@
class ApkRunnerImpl(ApkTestRunner):
def __init__(self, config):
self.config = config
- self.config['apk_file_name'] = "com.quicinc.vellamo-3.apk"
- self.config['apk_package'] = "com.quicinc.vellamo"
- self.config['activity'] = "com.quicinc.vellamo/.main.MainActivity"
+ self.config["apk_file_name"] = "com.quicinc.vellamo-3.apk"
+ self.config["apk_package"] = "com.quicinc.vellamo"
+ self.config["activity"] = "com.quicinc.vellamo/.main.MainActivity"
super(ApkRunnerImpl, self).__init__(self.config)
def choose_chapter(self, chapter_name):
# ToDo: scroll screen if chapter is not found on the first screen
self.dump_always()
- scroll = self.vc.findViewWithText(u'''LET'S ROLL''')
+ scroll = self.vc.findViewWithText(u"""LET'S ROLL""")
if scroll:
print("Click LET'S ROLL")
scroll.touch()
@@ -23,7 +23,7 @@
chapter_tab = None
self.dump_always()
while chapter_tab is None:
- gotit_button = self.vc.findViewWithText(u'GOT IT')
+ gotit_button = self.vc.findViewWithText(u"GOT IT")
if gotit_button:
print("Click GOT IT")
gotit_button.touch()
@@ -37,7 +37,10 @@
for child in enclosing_tab.children:
if child.getClass() == "android.widget.FrameLayout":
for subchild in child.children:
- if subchild.getId() == "com.quicinc.vellamo:id/card_launcher_run_button":
+ if (
+ subchild.getId()
+ == "com.quicinc.vellamo:id/card_launcher_run_button"
+ ):
subchild.touch()
break
@@ -46,16 +49,22 @@
while need_continue:
self.dump_always()
btn_setup_1 = self.vc.findViewById("android:id/button1")
- btn_settings = self.vc.findViewById('com.quicinc.vellamo:id/main_toolbar_wheel')
- btn_animations = self.vc.findViewWithText(u'Make Vellamo even more beautiful')
- warn_msg = self.vc.findViewWithText(u'This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer.')
- continue_btn = self.vc.findViewWithText(u'CONTINUE')
+ btn_settings = self.vc.findViewById(
+ "com.quicinc.vellamo:id/main_toolbar_wheel"
+ )
+ btn_animations = self.vc.findViewWithText(
+ u"Make Vellamo even more beautiful"
+ )
+ warn_msg = self.vc.findViewWithText(
+ u"This app was built for an older version of Android and may not work properly. Try checking for updates, or contact the developer."
+ )
+ continue_btn = self.vc.findViewWithText(u"CONTINUE")
if btn_setup_1:
# Accept Vellamo EULA
btn_setup_1.touch()
elif warn_msg:
self.logger.info("Older version warning popped up")
- warning_ok_btn = self.vc.findViewWithTextOrRaise(u'OK')
+ warning_ok_btn = self.vc.findViewWithTextOrRaise(u"OK")
warning_ok_btn.touch()
elif continue_btn:
continue_btn.touch()
@@ -72,19 +81,21 @@
self.logger.info("Benchmark started now")
- chapters = ['Browser', 'Multicore', 'Metal']
+ chapters = ["Browser", "Multicore", "Metal"]
for chapter in chapters:
self.choose_chapter(chapter)
# Start benchmark
self.dump_always()
try:
- gotit_button = self.vc.findViewWithText(u'GOT IT')
+ gotit_button = self.vc.findViewWithText(u"GOT IT")
if gotit_button:
gotit_button.touch()
except ViewNotFoundException:
- self.report_result('vellamo3-%s' % chapter, 'fail')
- self.logger.error('Start button for chapter %s NOT found, moving to the next chapter...')
+ self.report_result("vellamo3-%s" % chapter, "fail")
+ self.logger.error(
+ "Start button for chapter %s NOT found, moving to the next chapter..."
+ )
continue
# Wait while Vellamo is running benchmark
@@ -93,7 +104,9 @@
time.sleep(1)
try:
self.dump_always()
- goback_btn = self.vc.findViewById("com.quicinc.vellamo:id/main_toolbar_goback_button")
+ goback_btn = self.vc.findViewById(
+ "com.quicinc.vellamo:id/main_toolbar_goback_button"
+ )
if goback_btn:
goback_btn.touch()
time.sleep(5)
@@ -111,43 +124,70 @@
self.device.press("KEYCODE_BACK")
def parseResult(self):
- raw_result_file = '%s/chapterscores-itr%s.json' % (self.config['output'], self.config['itr'])
- self.call_adb('pull /data/data/com.quicinc.vellamo/files/chapterscores.json %s' % raw_result_file)
- default_unit = 'Points'
+ raw_result_file = "%s/chapterscores-itr%s.json" % (
+ self.config["output"],
+ self.config["itr"],
+ )
+ self.call_adb(
+ "pull /data/data/com.quicinc.vellamo/files/chapterscores.json %s"
+ % raw_result_file
+ )
+ default_unit = "Points"
# This is a one-line file, so read it in whole
- fileopen = open(raw_result_file, 'r')
+ fileopen = open(raw_result_file, "r")
jsoncontent = json.load(fileopen)
- result_flag = 'benchmark_results'
- chapter_flag = 'chapter_name'
+ result_flag = "benchmark_results"
+ chapter_flag = "chapter_name"
total_score = 0
for item in jsoncontent:
if result_flag and chapter_flag in item.keys():
chapter = item[chapter_flag]
chapter_total = 0
- self.logger.info('%s test result found in category: %s' % (str(len(item[result_flag])), chapter))
+ self.logger.info(
+ "%s test result found in category: %s"
+ % (str(len(item[result_flag])), chapter)
+ )
for elem in item[result_flag]:
- if 'failed' in elem.keys() and 'id' in elem.keys() and 'score' in elem.keys():
+ if (
+ "failed" in elem.keys()
+ and "id" in elem.keys()
+ and "score" in elem.keys()
+ ):
# Pick up the result
- if elem['failed'] is False:
- result = 'pass'
+ if elem["failed"] is False:
+ result = "pass"
else:
- result = 'fail'
+ result = "fail"
# Pick up the full test name
- testcase = chapter + '-' + elem['id']
+ testcase = chapter + "-" + elem["id"]
# Pick up the test score
- score = elem['score']
+ score = elem["score"]
# Submit the result to LAVA
- self.report_result("vellamo3-" + testcase, result, str(score), default_unit)
+ self.report_result(
+ "vellamo3-" + testcase, result, str(score), default_unit
+ )
chapter_total = chapter_total + score
else:
- print('Corrupted test result found, please check it manually.')
- print('A valid test result must contain id, score and pass/fail status.')
+ print("Corrupted test result found, please check it manually.")
+ print(
+ "A valid test result must contain id, score and pass/fail status."
+ )
- self.report_result("vellamo3-" + chapter + "-total", "pass", str(chapter_total), default_unit)
+ self.report_result(
+ "vellamo3-" + chapter + "-total",
+ "pass",
+ str(chapter_total),
+ default_unit,
+ )
total_score = total_score + chapter_total
else:
- print('Cannot find %s or %s in test result dictionary. Please check it manually.' % (result_flag, chapter_flag))
+ print(
+ "Cannot find %s or %s in test result dictionary. Please check it manually."
+ % (result_flag, chapter_flag)
+ )
fileopen.close()
- self.report_result("vellamo3-total-score", "pass", str(total_score), default_unit)
+ self.report_result(
+ "vellamo3-total-score", "pass", str(total_score), default_unit
+ )
diff --git a/automated/android/multinode/tradefed/sts_util.py b/automated/android/multinode/tradefed/sts_util.py
index a90ce70..80bd27d 100644
--- a/automated/android/multinode/tradefed/sts_util.py
+++ b/automated/android/multinode/tradefed/sts_util.py
@@ -30,9 +30,7 @@
affect the results in any way.
"""
- def __init__(
- self, device_serial_or_address, logger, device_access_timeout_secs=60
- ):
+ def __init__(self, device_serial_or_address, logger, device_access_timeout_secs=60):
"""Construct a StsUtil instance for a TradeFed invocation.
Args:
@@ -134,9 +132,7 @@
test_result_tree.write(test_result_path)
# Fix the fingerprint in the failures overview HTML.
- with open(
- test_result_failures_path_orig, "r"
- ) as test_result_failures_file:
+ with open(test_result_failures_path_orig, "r") as test_result_failures_file:
test_result_failures = test_result_failures_file.read().replace(
manipulated_fingerprint, self.device_fingerprint
)
diff --git a/automated/android/multinode/tradefed/tradefed-runner-multinode.py b/automated/android/multinode/tradefed/tradefed-runner-multinode.py
index a28f5af..1b984d6 100755
--- a/automated/android/multinode/tradefed/tradefed-runner-multinode.py
+++ b/automated/android/multinode/tradefed/tradefed-runner-multinode.py
@@ -11,68 +11,111 @@
import time
-sys.path.insert(0, '../../../lib/')
-sys.path.insert(1, '../../')
-import py_test_lib # nopep8
-from py_util_lib import call_shell_lib # nopep8
-import tradefed.result_parser as result_parser # nopep8
-from multinode.tradefed.utils import * # nopep8
-from multinode.tradefed.sts_util import StsUtil # nopep8
+sys.path.insert(0, "../../../lib/")
+sys.path.insert(1, "../../")
+import py_test_lib # nopep8
+from py_util_lib import call_shell_lib # nopep8
+import tradefed.result_parser as result_parser # nopep8
+from multinode.tradefed.utils import * # nopep8
+from multinode.tradefed.sts_util import StsUtil # nopep8
-OUTPUT = '%s/output' % os.getcwd()
-RESULT_FILE = '%s/result.txt' % OUTPUT
-TRADEFED_STDOUT = '%s/tradefed-stdout.txt' % OUTPUT
-TRADEFED_LOGCAT = '%s/tradefed-logcat-%s.txt' % (OUTPUT, '%s')
+OUTPUT = "%s/output" % os.getcwd()
+RESULT_FILE = "%s/result.txt" % OUTPUT
+TRADEFED_STDOUT = "%s/tradefed-stdout.txt" % OUTPUT
+TRADEFED_LOGCAT = "%s/tradefed-logcat-%s.txt" % (OUTPUT, "%s")
parser = argparse.ArgumentParser()
-parser.add_argument('-t', dest='TEST_PARAMS', required=True,
- help="TradeFed shell test parameters")
-parser.add_argument('-u', dest='TEST_RETRY_PARAMS', required=False,
- help="TradeFed shell test parameters for TradeFed session retry")
-parser.add_argument('-i', dest='MAX_NUM_RUNS', required=False, default=10, type=int,
- help="Maximum number of TradeFed runs. Based on the first run, retries can be \
- triggered to stabilize the results of the test suite.")
-parser.add_argument('-n', dest='RUNS_IF_UNCHANGED', required=False, default=3, type=int,
- help="Number of runs while the number of failures and completed modules does \
- not change. Results are considered to be stable after this number of runs.")
-parser.add_argument('-p', dest='TEST_PATH', required=True,
- help="path to TradeFed package top directory")
-parser.add_argument('-s', dest='STATE_CHECK_FREQUENCY_SECS', required=False, default=60, type=int,
- help="Every STATE_CHECK_FREQUENCY_SECS seconds, the state of connected devices is \
+parser.add_argument(
+ "-t", dest="TEST_PARAMS", required=True, help="TradeFed shell test parameters"
+)
+parser.add_argument(
+ "-u",
+ dest="TEST_RETRY_PARAMS",
+ required=False,
+ help="TradeFed shell test parameters for TradeFed session retry",
+)
+parser.add_argument(
+ "-i",
+ dest="MAX_NUM_RUNS",
+ required=False,
+ default=10,
+ type=int,
+ help="Maximum number of TradeFed runs. Based on the first run, retries can be \
+ triggered to stabilize the results of the test suite.",
+)
+parser.add_argument(
+ "-n",
+ dest="RUNS_IF_UNCHANGED",
+ required=False,
+ default=3,
+ type=int,
+ help="Number of runs while the number of failures and completed modules does \
+ not change. Results are considered to be stable after this number of runs.",
+)
+parser.add_argument(
+ "-p", dest="TEST_PATH", required=True, help="path to TradeFed package top directory"
+)
+parser.add_argument(
+ "-s",
+ dest="STATE_CHECK_FREQUENCY_SECS",
+ required=False,
+ default=60,
+ type=int,
+ help="Every STATE_CHECK_FREQUENCY_SECS seconds, the state of connected devices is \
checked and the last few lines of TradeFed output are printed. Increase this time \
- for large test suite runs to reduce the noise in the LAVA logs.")
-parser.add_argument('-r', dest='RESULTS_FORMAT', required=False,
- default=result_parser.TradefedResultParser.AGGREGATED,
- choices=[result_parser.TradefedResultParser.AGGREGATED,
- result_parser.TradefedResultParser.ATOMIC],
- help="The format of the saved results. 'aggregated' means number of \
+ for large test suite runs to reduce the noise in the LAVA logs.",
+)
+parser.add_argument(
+ "-r",
+ dest="RESULTS_FORMAT",
+ required=False,
+ default=result_parser.TradefedResultParser.AGGREGATED,
+ choices=[
+ result_parser.TradefedResultParser.AGGREGATED,
+ result_parser.TradefedResultParser.ATOMIC,
+ ],
+ help="The format of the saved results. 'aggregated' means number of \
passed and failed tests are recorded for each module. 'atomic' means \
- each test result is recorded separately")
-parser.add_argument('-m', dest='DEVICE_WORKER_MAPPING_FILE', required=True,
- help="File listing adb devices to be used for testing. For devices connected \
+ each test result is recorded separately",
+)
+parser.add_argument(
+ "-m",
+ dest="DEVICE_WORKER_MAPPING_FILE",
+ required=True,
+ help='File listing adb devices to be used for testing. For devices connected \
via adb TCP/IP, the LAVA worker job id should be given as second column, \
separated by semicolon. Individual lines in that files will look like \
- \"some_device_serial\" or \"some_device_ip;worker_host_id\"")
+ "some_device_serial" or "some_device_ip;worker_host_id"',
+)
# The total number of failed test cases to be printed for this job
# Printing too many failures could cause the LAVA job to time out
# Default to not print any failures
-parser.add_argument('-f', dest='FAILURES_PRINTED', type=int,
- required=False, default=0,
- help="Specify the number of failed test cases to be\
- printed, 0 means not print any failures.")
-parser.add_argument('--userdata_image_file', dest='USERDATA_IMAGE_FILE',
- required=False, help="Userdata image file that will be \
+parser.add_argument(
+ "-f",
+ dest="FAILURES_PRINTED",
+ type=int,
+ required=False,
+ default=0,
+ help="Specify the number of failed test cases to be\
+ printed; 0 means do not print any failures.",
+)
+parser.add_argument(
+ "--userdata_image_file",
+ dest="USERDATA_IMAGE_FILE",
+ required=False,
+ help="Userdata image file that will be \
used to reset devices to a clean state before starting \
- TradeFed reruns.")
+ TradeFed reruns.",
+)
args = parser.parse_args()
if os.path.exists(OUTPUT):
- suffix = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
- shutil.move(OUTPUT, '%s_%s' % (OUTPUT, suffix))
+ suffix = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+ shutil.move(OUTPUT, "%s_%s" % (OUTPUT, suffix))
os.makedirs(OUTPUT)
# Setup logger.
@@ -83,7 +126,7 @@
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
-formatter = logging.Formatter('%(asctime)s - %(name)s: %(levelname)s: %(message)s')
+formatter = logging.Formatter("%(asctime)s - %(name)s: %(levelname)s: %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
@@ -93,8 +136,11 @@
for line in filter(None, (line.rstrip() for line in mappingFile)):
deviceToWorker = line.split(sep=";")
device_address = deviceToWorker[0]
- worker_job_id = (None if (len(deviceToWorker) == 1 or not deviceToWorker[1])
- else deviceToWorker[1])
+ worker_job_id = (
+ None
+ if (len(deviceToWorker) == 1 or not deviceToWorker[1])
+ else deviceToWorker[1]
+ )
devices.append(
Device(
serial_or_address=device_address,
@@ -107,12 +153,15 @@
logger.error("Mapping file cannot be opened: %s" % args.DEVICE_WORKER_MAPPING_FILE)
sys.exit(1)
-logger.info('Configured devices:')
+logger.info("Configured devices:")
for device in devices:
if device.worker_job_id is None:
logger.info("%s (locally connected via USB)" % device.serial_or_address)
else:
- logger.info("%s (remote worker job id: %s)" % (device.serial_or_address, device.worker_job_id))
+ logger.info(
+ "%s (remote worker job id: %s)"
+ % (device.serial_or_address, device.worker_job_id)
+ )
def release_all_devices():
@@ -127,44 +176,55 @@
sys.exit(exit_code)
-tradefed_stdout = open(TRADEFED_STDOUT, 'w')
+tradefed_stdout = open(TRADEFED_STDOUT, "w")
-logger.info('Test params: %s' % args.TEST_PARAMS)
-logger.info('Starting TradeFed shell and waiting for device detection...')
+logger.info("Test params: %s" % args.TEST_PARAMS)
+logger.info("Starting TradeFed shell and waiting for device detection...")
command = None
prompt = None
results_heading_re = None
results_line_re = None
-valid_test_paths = ['android-cts', 'android-gts', 'android-sts']
+valid_test_paths = ["android-cts", "android-gts", "android-sts"]
if args.TEST_PATH in valid_test_paths:
suite = args.TEST_PATH[-3:]
command = "android-%s/tools/%s-tradefed" % (suite, suite)
prompt = "%s-tf >" % suite
- results_heading_re = re.compile(r'Session\s+Pass\s+Fail\s+Modules\s+Complete\s+Result Directory\s+Test Plan\s+Device serial\(s\)\s+Build ID\s+Product')
- results_line_re_without_session = r'\s+(\d+\s+){3,3}(of)\s+\d+\s+'
+ results_heading_re = re.compile(
+ r"Session\s+Pass\s+Fail\s+Modules\s+Complete\s+Result Directory\s+Test Plan\s+Device serial\(s\)\s+Build ID\s+Product"
+ )
+ results_line_re_without_session = r"\s+(\d+\s+){3,3}(of)\s+\d+\s+"
if command is None:
cleanup_and_exit(1, "Not supported path: %s" % args.TEST_PATH)
-if args.TEST_PATH == 'android-sts':
+if args.TEST_PATH == "android-sts":
stsUtil = StsUtil(devices[0].serial_or_address, logger)
# Locate and parse test result.
-result_dir_parent = os.path.join(args.TEST_PATH, 'results')
+result_dir_parent = os.path.join(args.TEST_PATH, "results")
def last_result_dir():
- latest_subdir = next(reversed(sorted([
- d for d in os.listdir(result_dir_parent)
- if os.path.isdir(os.path.join(result_dir_parent, d))
- ])))
+ latest_subdir = next(
+ reversed(
+ sorted(
+ [
+ d
+ for d in os.listdir(result_dir_parent)
+ if os.path.isdir(os.path.join(result_dir_parent, d))
+ ]
+ )
+ )
+ )
return os.path.join(result_dir_parent, latest_subdir)
-device_detected_re = re.compile(r'DeviceManager: Detected new device ')
-device_detected_search_re = re.compile(r'DeviceManager: Detected new device .*$', flags=re.M)
+device_detected_re = re.compile(r"DeviceManager: Detected new device ")
+device_detected_search_re = re.compile(
+ r"DeviceManager: Detected new device .*$", flags=re.M
+)
tradefed_start_retry_count = 5
all_devices_names = set(device.serial_or_address for device in devices)
for tradefed_start_retry in range(tradefed_start_retry_count):
@@ -175,43 +235,56 @@
# Find and parse output lines following this pattern:
# 04-23 12:30:33 I/DeviceManager: Detected new device serial_or_address
child.expect(device_detected_re, timeout=30)
- output_lines = subprocess.check_output(['tail', TRADEFED_STDOUT]).decode("utf-8")
- matches = [match[1].strip() for match in
- (device_detected_re.split(line_match)
- for line_match in device_detected_search_re.findall(output_lines))
- if len(match) == 2 and match[1]]
+ output_lines = subprocess.check_output(["tail", TRADEFED_STDOUT]).decode(
+ "utf-8"
+ )
+ matches = [
+ match[1].strip()
+ for match in (
+ device_detected_re.split(line_match)
+ for line_match in device_detected_search_re.findall(output_lines)
+ )
+ if len(match) == 2 and match[1]
+ ]
for match in matches:
try:
devices_to_detect.remove(match)
except KeyError:
if match not in all_devices_names:
- logger.debug('Unexpected device detected: %s' % match)
+ logger.debug("Unexpected device detected: %s" % match)
except (pexpect.TIMEOUT, pexpect.EOF) as e:
- logger.warning('TradeFed did not detect all devices. Checking device availability and restarting TradeFed...')
+ logger.warning(
+ "TradeFed did not detect all devices. Checking device availability and restarting TradeFed..."
+ )
print(e)
child.terminate(force=True)
- missing_devices = [device for device in devices
- if device.serial_or_address in devices_to_detect]
+ missing_devices = [
+ device
+ for device in devices
+ if device.serial_or_address in devices_to_detect
+ ]
for device in missing_devices:
if not device.ensure_available(logger=logger):
cleanup_and_exit(
1,
- 'adb device %s is not available and reconnection attempts failed. Aborting.'
- % device.serial_or_address)
+ "adb device %s is not available and reconnection attempts failed. Aborting."
+ % device.serial_or_address,
+ )
if devices_to_detect:
cleanup_and_exit(
1,
- 'TradeFed did not detect all available devices after %s retries. Aborting.'
- % tradefed_start_retry_count)
+ "TradeFed did not detect all available devices after %s retries. Aborting."
+ % tradefed_start_retry_count,
+ )
-logger.info('Starting TradeFed shell test.')
+logger.info("Starting TradeFed shell test.")
try:
child.expect(prompt, timeout=60)
child.sendline(args.TEST_PARAMS)
except pexpect.TIMEOUT:
- result = 'lunch-tf-shell fail'
+ result = "lunch-tf-shell fail"
py_test_lib.add_result(RESULT_FILE, result)
retry_check = RetryCheck(args.MAX_NUM_RUNS, args.RUNS_IF_UNCHANGED)
@@ -229,51 +302,59 @@
tradefed_session_id = 0
result_summary = None
while child.isalive():
- subprocess.run('echo')
- subprocess.run(['echo', '--- line break ---'])
- logger.info('Checking adb connectivity...')
+ subprocess.run("echo")
+ subprocess.run(["echo", "--- line break ---"])
+ logger.info("Checking adb connectivity...")
for device in devices:
device.ensure_available(logger=logger)
num_available_devices = sum(device.is_available() for device in devices)
if num_available_devices < len(devices):
- logger.debug('Some devices are lost. Dumping state of adb/USB devices.')
- child.sendline('dump logs')
+ logger.debug("Some devices are lost. Dumping state of adb/USB devices.")
+ child.sendline("dump logs")
call_shell_lib("adb_debug_info")
logger.debug('"adb devices" output')
- subprocess.run(['adb', 'devices'])
+ subprocess.run(["adb", "devices"])
if num_available_devices == 0:
- logger.error('adb connection to all devices lost!! Will wait for 5 minutes and '
- 'terminating TradeFed shell test!')
+ logger.error(
+ "adb connection to all devices lost!! Will wait for 5 minutes and "
+ "terminating TradeFed shell test!"
+ )
time.sleep(300)
child.terminate(force=True)
- result = 'check-adb-connectivity fail'
+ result = "check-adb-connectivity fail"
py_test_lib.add_result(RESULT_FILE, result)
fail_to_complete = True
break
- logger.info("Currently available devices: %s" %
- [device.serial_or_address for device in devices if device.is_available()])
+ logger.info(
+ "Currently available devices: %s"
+ % [device.serial_or_address for device in devices if device.is_available()]
+ )
# Check if all tests finished every minute.
- m = child.expect(['ResultReporter: Full Result:',
- 'ConsoleReporter:.*Test run failed to complete.',
- pexpect.TIMEOUT],
- timeout=args.STATE_CHECK_FREQUENCY_SECS)
+ m = child.expect(
+ [
+ "ResultReporter: Full Result:",
+ "ConsoleReporter:.*Test run failed to complete.",
+ pexpect.TIMEOUT,
+ ],
+ timeout=args.STATE_CHECK_FREQUENCY_SECS,
+ )
# TradeFed run not finished yet, continue to wait.
if m == 2:
# Flush pexpect input buffer.
- child.expect(['.+', pexpect.TIMEOUT, pexpect.EOF], timeout=1)
- logger.info('Printing tradefed recent output...')
- subprocess.run(['tail', TRADEFED_STDOUT])
+ child.expect([".+", pexpect.TIMEOUT, pexpect.EOF], timeout=1)
+ logger.info("Printing tradefed recent output...")
+ subprocess.run(["tail", TRADEFED_STDOUT])
continue
# A module or test run failed to complete. This is a case for TradeFed retry
if m == 1:
fail_to_complete = True
- logger.warning('TradeFed reported failure to complete a module.')
+ logger.warning("TradeFed reported failure to complete a module.")
# TradeFed didn't report completion yet, so keep going.
continue
@@ -283,38 +364,53 @@
# Once all tests and reruns finished, exit from TradeFed shell to throw EOF,
# which sets child.isalive() to false.
try:
- logger.debug('Checking TradeFed session result...')
+ logger.debug("Checking TradeFed session result...")
child.expect(prompt, timeout=60)
- child.sendline('list results')
+ child.sendline("list results")
child.expect(results_heading_re, timeout=60)
- results_line_re = \
- re.compile('(%s)%s' %
- (str(tradefed_session_id), # Expect the current session ID in the output
- results_line_re_without_session))
+ results_line_re = re.compile(
+ "(%s)%s"
+ % (
+ str(tradefed_session_id), # Expect the current session ID in the output
+ results_line_re_without_session,
+ )
+ )
child.expect(results_line_re, timeout=60)
- output_lines = subprocess.check_output(['tail', TRADEFED_STDOUT])
+ output_lines = subprocess.check_output(["tail", TRADEFED_STDOUT])
output_lines_match = results_line_re.search(str(output_lines))
if output_lines_match is None:
cleanup_and_exit(
1,
- 'Unexpected TradeFed output. Could not find expected results line for the current '
- 'TradeFed session (%s)' % str(tradefed_session_id))
+ "Unexpected TradeFed output. Could not find expected results line for the current "
+ "TradeFed session (%s)" % str(tradefed_session_id),
+ )
# Expected column contents: see results_heading_re
- result_line_columns = re.split(r'\s+', output_lines_match.group())
+ result_line_columns = re.split(r"\s+", output_lines_match.group())
pass_count = result_line_columns[1]
failure_count = result_line_columns[2]
modules_completed = result_line_columns[3]
modules_total = result_line_columns[5]
timestamp = result_line_columns[6]
- result_summary = ResultSummary(failure_count, modules_completed, modules_total, timestamp)
+ result_summary = ResultSummary(
+ failure_count, modules_completed, modules_total, timestamp
+ )
retry_check.post_result(result_summary)
- logger.info('Finished TradeFed session %s. %s of %s modules completed with %s passed '
- 'tests and %s failures.'
- % (tradefed_session_id, str(modules_completed),
- str(modules_total), str(pass_count), str(failure_count)))
+ logger.info(
+ "Finished TradeFed session %s. %s of %s modules completed with %s passed "
+ "tests and %s failures."
+ % (
+ tradefed_session_id,
+ str(modules_completed),
+ str(modules_total),
+ str(pass_count),
+ str(failure_count),
+ )
+ )
except (pexpect.TIMEOUT, pexpect.EOF) as e:
- logger.error('Unexpected TradeFed output/behavior while trying to fetch test run results. '
- 'Printing the exception and killing the TradeFed process...')
+ logger.error(
+ "Unexpected TradeFed output/behavior while trying to fetch test run results. "
+ "Printing the exception and killing the TradeFed process..."
+ )
print(e)
child.terminate(force=True)
fail_to_complete = True
@@ -326,15 +422,17 @@
# with 'release-keys'.
# That actually breaks the TradeFed retry feature, as the stored fingerprint
# won't match anymore with the fingerprint reported by the device.
- if suite == 'sts':
+ if suite == "sts":
try:
stsUtil.fix_result_file_fingerprints(last_result_dir())
except subprocess.CalledProcessError as e:
fail_to_complete = True
print(e)
- logger.error('Could not apply workarounds for STS due to an '
- 'adb-related error. Cannot continue with TradeFed '
- 'reruns; results might be incomplete.')
+ logger.error(
+ "Could not apply workarounds for STS due to an "
+ "adb-related error. Cannot continue with TradeFed "
+ "reruns; results might be incomplete."
+ )
child.terminate(force=True)
break
@@ -343,12 +441,16 @@
# output parsing more reliable.
if not result_summary.was_successful() or fail_to_complete:
if args.TEST_RETRY_PARAMS is None:
- logger.debug('NOT retrying TradeFed session as TEST_RETRY_PARAMS is not defined.')
+ logger.debug(
+ "NOT retrying TradeFed session as TEST_RETRY_PARAMS is not defined."
+ )
elif not retry_check.should_continue():
- logger.info('NOT retrying TradeFed session as maximum number of retries is reached.')
+ logger.info(
+ "NOT retrying TradeFed session as maximum number of retries is reached."
+ )
else:
- logger.info('Retrying with results of session %s' % tradefed_session_id)
- logger.info('First resetting the devices to a clean state...')
+ logger.info("Retrying with results of session %s" % tradefed_session_id)
+ logger.info("First resetting the devices to a clean state...")
unavailable_devices = []
for device in devices:
@@ -356,19 +458,23 @@
unavailable_devices += [device.serial_or_address]
if unavailable_devices:
logger.warning(
- 'Following devices were not reset successfully '
- 'or are not yet available again: %s'
- % ', '.join(unavailable_devices)
+ "Following devices were not reset successfully "
+ "or are not yet available again: %s"
+ % ", ".join(unavailable_devices)
)
try:
child.expect(prompt, timeout=60)
- child.sendline('%s --retry %s' % (args.TEST_RETRY_PARAMS, str(tradefed_session_id)))
+ child.sendline(
+ "%s --retry %s" % (args.TEST_RETRY_PARAMS, str(tradefed_session_id))
+ )
tradefed_session_id += 1
- fail_to_complete = False # Reset as we have a new chance to complete.
+ fail_to_complete = False # Reset as we have a new chance to complete.
except pexpect.TIMEOUT as e:
print(e)
- logger.error('Timeout while starting a TradeFed retry. Force killing the child process...')
+ logger.error(
+ "Timeout while starting a TradeFed retry. Force killing the child process..."
+ )
child.terminate(force=True)
fail_to_complete = True
break
@@ -377,26 +483,28 @@
try:
child.expect(prompt, timeout=60)
logger.debug('Sending "exit" command to TF shell...')
- child.sendline('exit')
+ child.sendline("exit")
child.expect(pexpect.EOF, timeout=60)
- logger.debug('Child process ended properly.')
+ logger.debug("Child process ended properly.")
except pexpect.TIMEOUT as e:
# The Tradefed shell is hanging longer than expected for some reason.
# We need to kill it, but that most likely doesn't affect the results of
# previously finished test runs, so don't report failure.
print(e)
- logger.debug('Timeout while trying to exit cleanly, force killing child process...')
+ logger.debug(
+ "Timeout while trying to exit cleanly, force killing child process..."
+ )
child.terminate(force=True)
break
tradefed_stdout.close()
if fail_to_complete:
- py_test_lib.add_result(RESULT_FILE, 'tradefed-test-run fail')
+ py_test_lib.add_result(RESULT_FILE, "tradefed-test-run fail")
else:
- py_test_lib.add_result(RESULT_FILE, 'tradefed-test-run pass')
+ py_test_lib.add_result(RESULT_FILE, "tradefed-test-run pass")
-logger.info('Tradefed test finished')
+logger.info("Tradefed test finished")
# Log only results of the last run. It also lists all successful tests from previous runs.
parser = result_parser.TradefedResultParser(RESULT_FILE)
@@ -406,8 +514,9 @@
parser_success = parser.parse_recursively(last_result_dir())
if not parser_success:
logger.warning(
- 'Failed to parse the TradeFed logs. Test result listing in the LAVA '
- 'logs will be incomplete.')
+ "Failed to parse the TradeFed logs. Test result listing in the LAVA "
+ "logs will be incomplete."
+ )
# Report failure if not all test modules were completed, if the test result
# files seem broken or incomplete or if Tradefed ran into a unknown state.
diff --git a/automated/android/multinode/tradefed/utils.py b/automated/android/multinode/tradefed/utils.py
index 6fab4d3..35cc87f 100644
--- a/automated/android/multinode/tradefed/utils.py
+++ b/automated/android/multinode/tradefed/utils.py
@@ -12,9 +12,7 @@
class Device:
- tcpip_device_re = re.compile(
- r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}$"
- )
+ tcpip_device_re = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}$")
EXEC_IN_LAVA = shutil.which("lava-send") is not None
def __init__(
@@ -30,8 +28,7 @@
)
self.logcat_output_file = open(logcat_output_filename, "w")
self.logcat = subprocess.Popen(
- ["adb", "-s", serial_or_address, "logcat"],
- stdout=self.logcat_output_file,
+ ["adb", "-s", serial_or_address, "logcat"], stdout=self.logcat_output_file,
)
self.worker_job_id = worker_job_id
self.worker_handshake_iteration = 1
@@ -58,8 +55,7 @@
self._is_available = False
logger.debug(
- "adb connection to %s lost! Trying to reconnect..."
- % self.serial_or_address
+ "adb connection to %s lost! Trying to reconnect..." % self.serial_or_address
)
# Tell the hosting worker that something is broken
@@ -101,7 +97,8 @@
"OK",
],
timeout=timeout_secs,
- ).returncode == 0
+ ).returncode
+ == 0
)
except subprocess.TimeoutExpired as e:
print(e)
@@ -154,7 +151,8 @@
subprocess.run(
["adb", "connect", self.serial_or_address],
timeout=reconnectTimeoutSecs,
- ).returncode != 0
+ ).returncode
+ != 0
):
return False
except subprocess.TimeoutExpired:
@@ -195,13 +193,7 @@
else:
try:
subprocess.run(
- [
- "adb",
- "-s",
- self.serial_or_address,
- "reboot",
- "bootloader",
- ],
+ ["adb", "-s", self.serial_or_address, "reboot", "bootloader",],
timeout=commandTimeoutSecs,
)
except subprocess.TimeoutExpired:
@@ -260,10 +252,7 @@
[
"lava-wait",
"worker-sync-%s-%s"
- % (
- self.worker_job_id,
- str(self.worker_handshake_iteration),
- ),
+ % (self.worker_job_id, str(self.worker_handshake_iteration),),
]
)
# TODO could check result variable from MultiNode cache
@@ -301,14 +290,13 @@
def should_continue(self):
return (
- self.current_retry < self.total_max_retries and self.current_unchanged < self.retries_if_unchanged
+ self.current_retry < self.total_max_retries
+ and self.current_unchanged < self.retries_if_unchanged
)
class ResultSummary:
- def __init__(
- self, failure_count, modules_completed, modules_total, timestamp
- ):
+ def __init__(self, failure_count, modules_completed, modules_total, timestamp):
self.failure_count = int(failure_count)
self.modules_completed = int(modules_completed)
self.modules_total = int(modules_total)
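
For context on the retry bookkeeping above: should_continue() only bounds two counters, the total number of retries and the number of consecutive retries whose results did not change. A minimal self-contained sketch of that idea follows; the class and method names other than should_continue are invented for illustration and are not part of utils.py.

class RetryBudget:
    # Toy model of the two-counter retry bound used by the multinode runner.
    def __init__(self, total_max_retries, retries_if_unchanged):
        self.total_max_retries = total_max_retries
        self.retries_if_unchanged = retries_if_unchanged
        self.current_retry = 0
        self.current_unchanged = 0

    def record(self, result_changed):
        # Every retry counts; unchanged results also bump the second counter.
        self.current_retry += 1
        self.current_unchanged = 0 if result_changed else self.current_unchanged + 1

    def should_continue(self):
        return (
            self.current_retry < self.total_max_retries
            and self.current_unchanged < self.retries_if_unchanged
        )

budget = RetryBudget(total_max_retries=5, retries_if_unchanged=2)
budget.record(result_changed=False)
budget.record(result_changed=False)
print(budget.should_continue())  # False: two unchanged retries in a row
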
diff --git a/automated/android/noninteractive-tradefed/tradefed-runner.py b/automated/android/noninteractive-tradefed/tradefed-runner.py
index c2bce5c..889444c 100755
--- a/automated/android/noninteractive-tradefed/tradefed-runner.py
+++ b/automated/android/noninteractive-tradefed/tradefed-runner.py
@@ -12,21 +12,21 @@
import logging
import time
-sys.path.insert(0, '../../lib/')
+sys.path.insert(0, "../../lib/")
import py_test_lib # nopep8
-OUTPUT = '%s/output' % os.getcwd()
-RESULT_FILE = '%s/result.txt' % OUTPUT
-TRADEFED_STDOUT = '%s/tradefed-stdout.txt' % OUTPUT
-TRADEFED_LOGCAT = '%s/tradefed-logcat.txt' % OUTPUT
-TEST_PARAMS = ''
-AGGREGATED = 'aggregated'
-ATOMIC = 'atomic'
+OUTPUT = "%s/output" % os.getcwd()
+RESULT_FILE = "%s/result.txt" % OUTPUT
+TRADEFED_STDOUT = "%s/tradefed-stdout.txt" % OUTPUT
+TRADEFED_LOGCAT = "%s/tradefed-logcat.txt" % OUTPUT
+TEST_PARAMS = ""
+AGGREGATED = "aggregated"
+ATOMIC = "atomic"
def result_parser(xml_file, result_format):
- etree_file = open(xml_file, 'rb')
+ etree_file = open(xml_file, "rb")
etree_content = etree_file.read()
rx = re.compile("&#([0-9]+);|&#x([0-9a-fA-F]+);")
endpos = len(etree_content)
@@ -43,10 +43,12 @@
else:
num = int(m.group(2), 16)
# #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF]
- if not(num in (0x9, 0xA, 0xD)
- or 0x20 <= num <= 0xD7FF
- or 0xE000 <= num <= 0xFFFD
- or 0x10000 <= num <= 0x10FFFF):
+ if not (
+ num in (0x9, 0xA, 0xD)
+ or 0x20 <= num <= 0xD7FF
+ or 0xE000 <= num <= 0xFFFD
+ or 0x10000 <= num <= 0x10FFFF
+ ):
etree_content = etree_content[:mstart] + etree_content[mend:]
endpos = len(etree_content)
# next time search again from the same position as this time
@@ -59,67 +61,71 @@
try:
root = ET.fromstring(etree_content)
except ET.ParseError as e:
- logger.error('xml.etree.ElementTree.ParseError: %s' % e)
- logger.info('Please Check %s manually' % xml_file)
+ logger.error("xml.etree.ElementTree.ParseError: %s" % e)
+ logger.info("Please Check %s manually" % xml_file)
sys.exit(1)
- logger.info('Test modules in %s: %s'
- % (xml_file, str(len(root.findall('Module')))))
+ logger.info("Test modules in %s: %s" % (xml_file, str(len(root.findall("Module")))))
failures_count = 0
- for elem in root.findall('Module'):
+ for elem in root.findall("Module"):
# Naming: Module Name + Test Case Name + Test Name
- if 'abi' in elem.attrib.keys():
- module_name = '.'.join([elem.attrib['abi'], elem.attrib['name']])
+ if "abi" in elem.attrib.keys():
+ module_name = ".".join([elem.attrib["abi"], elem.attrib["name"]])
else:
- module_name = elem.attrib['name']
+ module_name = elem.attrib["name"]
if result_format == AGGREGATED:
- tests_executed = len(elem.findall('.//Test'))
+ tests_executed = len(elem.findall(".//Test"))
tests_passed = len(elem.findall('.//Test[@result="pass"]'))
tests_failed = len(elem.findall('.//Test[@result="fail"]'))
- result = '%s_executed pass %s' % (module_name, str(tests_executed))
+ result = "%s_executed pass %s" % (module_name, str(tests_executed))
py_test_lib.add_result(RESULT_FILE, result)
- result = '%s_passed pass %s' % (module_name, str(tests_passed))
+ result = "%s_passed pass %s" % (module_name, str(tests_passed))
py_test_lib.add_result(RESULT_FILE, result)
- failed_result = 'pass'
+ failed_result = "pass"
if tests_failed > 0:
- failed_result = 'fail'
- result = '%s_failed %s %s' % (module_name, failed_result,
- str(tests_failed))
+ failed_result = "fail"
+ result = "%s_failed %s %s" % (module_name, failed_result, str(tests_failed))
py_test_lib.add_result(RESULT_FILE, result)
# output result to show if the module is done or not
- tests_done = elem.get('done', 'false')
- if tests_done == 'false':
- result = '%s_done fail' % module_name
+ tests_done = elem.get("done", "false")
+ if tests_done == "false":
+ result = "%s_done fail" % module_name
else:
- result = '%s_done pass' % module_name
+ result = "%s_done pass" % module_name
py_test_lib.add_result(RESULT_FILE, result)
if args.FAILURES_PRINTED > 0 and failures_count < args.FAILURES_PRINTED:
# print failed test cases for debug
- test_cases = elem.findall('.//TestCase')
+ test_cases = elem.findall(".//TestCase")
for test_case in test_cases:
failed_tests = test_case.findall('.//Test[@result="fail"]')
for failed_test in failed_tests:
- test_name = '%s/%s.%s' % (module_name,
- test_case.get("name"),
- failed_test.get("name"))
- failures = failed_test.findall('.//Failure')
- failure_msg = ''
+ test_name = "%s/%s.%s" % (
+ module_name,
+ test_case.get("name"),
+ failed_test.get("name"),
+ )
+ failures = failed_test.findall(".//Failure")
+ failure_msg = ""
for failure in failures:
- failure_msg = '%s \n %s' % (failure_msg,
- failure.get('message'))
+ failure_msg = "%s \n %s" % (
+ failure_msg,
+ failure.get("message"),
+ )
- logger.info('%s %s' % (test_name, failure_msg.strip()))
+ logger.info("%s %s" % (test_name, failure_msg.strip()))
failures_count = failures_count + 1
if failures_count > args.FAILURES_PRINTED:
- logger.info('There are more than %d test cases '
- 'failed, the output for the rest '
- 'failed test cases will be '
- 'skipped.' % (args.FAILURES_PRINTED))
+ logger.info(
+ "There are more than %d test cases "
+ "failed, the output for the rest "
+ "failed test cases will be "
+ "skipped." % (args.FAILURES_PRINTED)
+ )
# break the for loop of failed_tests
break
if failures_count > args.FAILURES_PRINTED:
@@ -127,64 +133,78 @@
break
if result_format == ATOMIC:
- test_cases = elem.findall('.//TestCase')
+ test_cases = elem.findall(".//TestCase")
for test_case in test_cases:
- tests = test_case.findall('.//Test')
+ tests = test_case.findall(".//Test")
for atomic_test in tests:
atomic_test_result = atomic_test.get("result")
- atomic_test_name = "%s/%s.%s" % (module_name,
- test_case.get("name"),
- atomic_test.get("name"))
+ atomic_test_name = "%s/%s.%s" % (
+ module_name,
+ test_case.get("name"),
+ atomic_test.get("name"),
+ )
py_test_lib.add_result(
- RESULT_FILE, "%s %s" % (atomic_test_name,
- atomic_test_result))
+ RESULT_FILE, "%s %s" % (atomic_test_name, atomic_test_result)
+ )
parser = argparse.ArgumentParser()
-parser.add_argument('-t', dest='TEST_PARAMS', required=True,
- help="tradefed shell test parameters")
-parser.add_argument('-p', dest='TEST_PATH', required=True,
- help="path to tradefed package top directory")
-parser.add_argument('-r', dest='RESULTS_FORMAT', required=False,
- default=AGGREGATED, choices=[AGGREGATED, ATOMIC],
- help="The format of the saved results. 'aggregated' means number of \
+parser.add_argument(
+ "-t", dest="TEST_PARAMS", required=True, help="tradefed shell test parameters"
+)
+parser.add_argument(
+ "-p", dest="TEST_PATH", required=True, help="path to tradefed package top directory"
+)
+parser.add_argument(
+ "-r",
+ dest="RESULTS_FORMAT",
+ required=False,
+ default=AGGREGATED,
+ choices=[AGGREGATED, ATOMIC],
+ help="The format of the saved results. 'aggregated' means number of \
passed and failed tests are recorded for each module. 'atomic' means \
- each test result is recorded separately")
+ each test result is recorded separately",
+)
# The total number of failed test cases to be printed for this job
# Printing too many failures would cause the lava job to time out
# Default to not print any failures
-parser.add_argument('-f', dest='FAILURES_PRINTED', type=int,
- required=False, default=0,
- help="Speciy the number of failed test cases to be\
- printed, 0 means not print any failures.")
+parser.add_argument(
+ "-f",
+ dest="FAILURES_PRINTED",
+ type=int,
+ required=False,
+ default=0,
+ help="Speciy the number of failed test cases to be\
+ printed, 0 means not print any failures.",
+)
args = parser.parse_args()
# TEST_PARAMS = args.TEST_PARAMS
if os.path.exists(OUTPUT):
- suffix = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
- shutil.move(OUTPUT, '%s_%s' % (OUTPUT, suffix))
+ suffix = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+ shutil.move(OUTPUT, "%s_%s" % (OUTPUT, suffix))
os.makedirs(OUTPUT)
# Setup logger.
# There might be an issue in lava/local dispatcher, most likely problem of
# pexpect. It prints the messages from print() last, not by sequence.
# Use logging and subprocess.call() to work around this.
-logger = logging.getLogger('Tradefed')
+logger = logging.getLogger("Tradefed")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
-formatter = logging.Formatter('%(asctime)s - %(name)s: %(levelname)s: %(message)s')
+formatter = logging.Formatter("%(asctime)s - %(name)s: %(levelname)s: %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
-tradefed_stdout = open(TRADEFED_STDOUT, 'w')
-tradefed_logcat_out = open(TRADEFED_LOGCAT, 'w')
-tradefed_logcat = subprocess.Popen(['adb', 'logcat'], stdout=tradefed_logcat_out)
+tradefed_stdout = open(TRADEFED_STDOUT, "w")
+tradefed_logcat_out = open(TRADEFED_LOGCAT, "w")
+tradefed_logcat = subprocess.Popen(["adb", "logcat"], stdout=tradefed_logcat_out)
-logger.info('Test params: %s' % args.TEST_PARAMS)
-logger.info('Starting tradefed shell test...')
+logger.info("Test params: %s" % args.TEST_PARAMS)
+logger.info("Starting tradefed shell test...")
command = None
prompt = None
@@ -198,22 +218,24 @@
logger.error("Not supported path: %s" % args.TEST_PATH)
sys.exit(1)
-child = subprocess.Popen(shlex.split(command), stderr=subprocess.STDOUT, stdout=tradefed_stdout)
+child = subprocess.Popen(
+ shlex.split(command), stderr=subprocess.STDOUT, stdout=tradefed_stdout
+)
fail_to_complete = child.wait()
if fail_to_complete:
- py_test_lib.add_result(RESULT_FILE, 'tradefed-test-run fail')
+ py_test_lib.add_result(RESULT_FILE, "tradefed-test-run fail")
else:
- py_test_lib.add_result(RESULT_FILE, 'tradefed-test-run pass')
+ py_test_lib.add_result(RESULT_FILE, "tradefed-test-run pass")
-logger.info('Tradefed test finished')
+logger.info("Tradefed test finished")
tradefed_stdout.close()
tradefed_logcat.kill()
tradefed_logcat_out.close()
# Locate and parse test result.
-result_dir = '%s/results' % args.TEST_PATH
-test_result = 'test_result.xml'
+result_dir = "%s/results" % args.TEST_PATH
+test_result = "test_result.xml"
if os.path.exists(result_dir) and os.path.isdir(result_dir):
for root, dirs, files in os.walk(result_dir):
for name in files:
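
The character-reference loop reformatted in this runner strips numeric XML entities whose code points fall outside the XML 1.0 ranges before the report is handed to ElementTree. A standalone sketch of the same predicate, with an invented helper name:

def is_valid_xml_char(num):
    # XML 1.0: #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF]
    return (
        num in (0x9, 0xA, 0xD)
        or 0x20 <= num <= 0xD7FF
        or 0xE000 <= num <= 0xFFFD
        or 0x10000 <= num <= 0x10FFFF
    )

print(is_valid_xml_char(0x1B))  # False: ESC is not allowed in XML 1.0 text
print(is_valid_xml_char(0x41))  # True: 'A'
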
diff --git a/automated/android/tradefed/result_parser.py b/automated/android/tradefed/result_parser.py
index 477b068..1a4eb10 100644
--- a/automated/android/tradefed/result_parser.py
+++ b/automated/android/tradefed/result_parser.py
@@ -5,7 +5,7 @@
import sys
import xml.etree.ElementTree as ET
-sys.path.insert(0, '../../lib/')
+sys.path.insert(0, "../../lib/")
import py_test_lib # nopep8
@@ -18,7 +18,7 @@
self.logger = logging.getLogger()
self.failures_to_print = 0
self.results_format = TradefedResultParser.AGGREGATED
- self.test_result_file_name = 'test_result.xml'
+ self.test_result_file_name = "test_result.xml"
def parse_recursively(self, result_dir):
if not os.path.exists(result_dir) or not os.path.isdir(result_dir):
@@ -67,23 +67,20 @@
try:
root = ET.fromstring(etree_content)
except ET.ParseError as e:
- self.logger.error('xml.etree.ElementTree.ParseError: %s' % e)
- self.logger.info('Please Check %s manually' % xml_file)
+ self.logger.error("xml.etree.ElementTree.ParseError: %s" % e)
+ self.logger.info("Please Check %s manually" % xml_file)
return False
self.logger.info(
- 'Test modules in %s: %s'
- % (xml_file, str(len(root.findall('Module'))))
+ "Test modules in %s: %s" % (xml_file, str(len(root.findall("Module"))))
)
remaining_failures_to_print = self.failures_to_print
- for elem in root.findall('Module'):
+ for elem in root.findall("Module"):
# Naming: Module Name + Test Case Name + Test Name
- if 'abi' in elem.attrib.keys():
- module_name = '.'.join(
- [elem.attrib['abi'], elem.attrib['name']]
- )
+ if "abi" in elem.attrib.keys():
+ module_name = ".".join([elem.attrib["abi"], elem.attrib["name"]])
else:
- module_name = elem.attrib['name']
+ module_name = elem.attrib["name"]
if self.results_format == TradefedResultParser.AGGREGATED:
r = self.print_aggregated(
@@ -92,10 +89,10 @@
remaining_failures_to_print -= r.num_printed_failures
if r.failures_skipped:
self.logger.info(
- 'There are more than %d test cases '
- 'failed, the output for the rest '
- 'failed test cases will be '
- 'skipped.' % (self.failures_to_print)
+ "There are more than %d test cases "
+ "failed, the output for the rest "
+ "failed test cases will be "
+ "skipped." % (self.failures_to_print)
)
elif self.results_format == TradefedResultParser.ATOMIC:
@@ -103,36 +100,32 @@
return True
def print_aggregated(self, module_name, elem, failures_to_print):
- tests_executed = len(elem.findall('.//Test'))
+ tests_executed = len(elem.findall(".//Test"))
tests_passed = len(elem.findall('.//Test[@result="pass"]'))
tests_failed = len(elem.findall('.//Test[@result="fail"]'))
- result = '%s_executed pass %s' % (module_name, str(tests_executed))
+ result = "%s_executed pass %s" % (module_name, str(tests_executed))
py_test_lib.add_result(self.result_output_file, result)
- result = '%s_passed pass %s' % (module_name, str(tests_passed))
+ result = "%s_passed pass %s" % (module_name, str(tests_passed))
py_test_lib.add_result(self.result_output_file, result)
- failed_result = 'pass'
+ failed_result = "pass"
if tests_failed > 0:
- failed_result = 'fail'
- result = '%s_failed %s %s' % (
- module_name,
- failed_result,
- str(tests_failed),
- )
+ failed_result = "fail"
+ result = "%s_failed %s %s" % (module_name, failed_result, str(tests_failed),)
py_test_lib.add_result(self.result_output_file, result)
# output result to show if the module is done or not
- tests_done = elem.get('done', 'false')
- if tests_done == 'false':
- result = '%s_done fail' % module_name
+ tests_done = elem.get("done", "false")
+ if tests_done == "false":
+ result = "%s_done fail" % module_name
else:
- result = '%s_done pass' % module_name
+ result = "%s_done pass" % module_name
py_test_lib.add_result(self.result_output_file, result)
Result = collections.namedtuple(
- 'Result', ['num_printed_failures', 'failures_skipped']
+ "Result", ["num_printed_failures", "failures_skipped"]
)
if failures_to_print == 0:
@@ -140,34 +133,31 @@
# print failed test cases for debug
num_printed_failures = 0
- test_cases = elem.findall('.//TestCase')
+ test_cases = elem.findall(".//TestCase")
for test_case in test_cases:
failed_tests = test_case.findall('.//Test[@result="fail"]')
for failed_test in failed_tests:
if num_printed_failures == failures_to_print:
return Result(num_printed_failures, True)
- test_name = '%s/%s.%s' % (
+ test_name = "%s/%s.%s" % (
module_name,
test_case.get("name"),
failed_test.get("name"),
)
- failures = failed_test.findall('.//Failure')
- failure_msg = ''
+ failures = failed_test.findall(".//Failure")
+ failure_msg = ""
for failure in failures:
- failure_msg = '%s \n %s' % (
- failure_msg,
- failure.get('message'),
- )
+ failure_msg = "%s \n %s" % (failure_msg, failure.get("message"),)
- self.logger.info('%s %s' % (test_name, failure_msg.strip()))
+ self.logger.info("%s %s" % (test_name, failure_msg.strip()))
num_printed_failures += 1
return Result(num_printed_failures, False)
def print_atomic(self, module_name, elem):
- test_cases = elem.findall('.//TestCase')
+ test_cases = elem.findall(".//TestCase")
for test_case in test_cases:
- tests = test_case.findall('.//Test')
+ tests = test_case.findall(".//Test")
for atomic_test in tests:
atomic_test_result = atomic_test.get("result")
atomic_test_name = "%s/%s.%s" % (
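
As a reminder of what the aggregated format above records: for every Module element the parser simply counts Test nodes by their result attribute and emits executed/passed/failed/done lines. A small sketch against an invented Module snippet (the sample XML is illustrative, not taken from a real TradeFed report):

import xml.etree.ElementTree as ET

sample = (
    '<Module name="CtsExampleTestCases" done="true">'
    '  <TestCase name="ExampleTest">'
    '    <Test name="testA" result="pass"/>'
    '    <Test name="testB" result="fail"/>'
    '  </TestCase>'
    '</Module>'
)
module = ET.fromstring(sample)
executed = len(module.findall(".//Test"))
passed = len(module.findall('.//Test[@result="pass"]'))
failed = len(module.findall('.//Test[@result="fail"]'))
name = module.attrib["name"]
print("%s_executed pass %s" % (name, executed))
print("%s_passed pass %s" % (name, passed))
print("%s_failed %s %s" % (name, "fail" if failed > 0 else "pass", failed))
print("%s_done %s" % (name, "pass" if module.get("done", "false") != "false" else "fail"))
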
diff --git a/automated/android/tradefed/tradefed-runner.py b/automated/android/tradefed/tradefed-runner.py
index e86d3d5..f1203f3 100755
--- a/automated/android/tradefed/tradefed-runner.py
+++ b/automated/android/tradefed/tradefed-runner.py
@@ -13,64 +13,77 @@
import result_parser
-sys.path.insert(0, '../../lib/')
+sys.path.insert(0, "../../lib/")
import py_test_lib # nopep8
-OUTPUT = '%s/output' % os.getcwd()
-RESULT_FILE = '%s/result.txt' % OUTPUT
-TRADEFED_STDOUT = '%s/tradefed-stdout.txt' % OUTPUT
-TRADEFED_LOGCAT = '%s/tradefed-logcat.txt' % OUTPUT
-TEST_PARAMS = ''
+OUTPUT = "%s/output" % os.getcwd()
+RESULT_FILE = "%s/result.txt" % OUTPUT
+TRADEFED_STDOUT = "%s/tradefed-stdout.txt" % OUTPUT
+TRADEFED_LOGCAT = "%s/tradefed-logcat.txt" % OUTPUT
+TEST_PARAMS = ""
parser = argparse.ArgumentParser()
-parser.add_argument('-t', dest='TEST_PARAMS', required=True,
- help="tradefed shell test parameters")
-parser.add_argument('-p', dest='TEST_PATH', required=True,
- help="path to tradefed package top directory")
-parser.add_argument('-r', dest='RESULTS_FORMAT', required=False,
- default=result_parser.TradefedResultParser.AGGREGATED,
- choices=[result_parser.TradefedResultParser.AGGREGATED,
- result_parser.TradefedResultParser.ATOMIC],
- help="The format of the saved results. 'aggregated' means number of \
+parser.add_argument(
+ "-t", dest="TEST_PARAMS", required=True, help="tradefed shell test parameters"
+)
+parser.add_argument(
+ "-p", dest="TEST_PATH", required=True, help="path to tradefed package top directory"
+)
+parser.add_argument(
+ "-r",
+ dest="RESULTS_FORMAT",
+ required=False,
+ default=result_parser.TradefedResultParser.AGGREGATED,
+ choices=[
+ result_parser.TradefedResultParser.AGGREGATED,
+ result_parser.TradefedResultParser.ATOMIC,
+ ],
+ help="The format of the saved results. 'aggregated' means number of \
passed and failed tests are recorded for each module. 'atomic' means \
- each test result is recorded separately")
+ each test result is recorded separately",
+)
# The total number of failed test cases to be printed for this job
# Printing too many failures would cause the lava job to time out
# Default to not print any failures
-parser.add_argument('-f', dest='FAILURES_PRINTED', type=int,
- required=False, default=0,
- help="Speciy the number of failed test cases to be\
- printed, 0 means not print any failures.")
+parser.add_argument(
+ "-f",
+ dest="FAILURES_PRINTED",
+ type=int,
+ required=False,
+ default=0,
+ help="Speciy the number of failed test cases to be\
+ printed, 0 means not print any failures.",
+)
args = parser.parse_args()
# TEST_PARAMS = args.TEST_PARAMS
if os.path.exists(OUTPUT):
- suffix = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
- shutil.move(OUTPUT, '%s_%s' % (OUTPUT, suffix))
+ suffix = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+ shutil.move(OUTPUT, "%s_%s" % (OUTPUT, suffix))
os.makedirs(OUTPUT)
# Setup logger.
# There might be an issue in lava/local dispatcher, most likely problem of
# pexpect. It prints the messages from print() last, not by sequence.
# Use logging and subprocess.call() to work around this.
-logger = logging.getLogger('Tradefed')
+logger = logging.getLogger("Tradefed")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
-formatter = logging.Formatter('%(asctime)s - %(name)s: %(levelname)s: %(message)s')
+formatter = logging.Formatter("%(asctime)s - %(name)s: %(levelname)s: %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
-tradefed_stdout = open(TRADEFED_STDOUT, 'w')
-tradefed_logcat_out = open(TRADEFED_LOGCAT, 'w')
-tradefed_logcat = subprocess.Popen(['adb', 'logcat'], stdout=tradefed_logcat_out)
+tradefed_stdout = open(TRADEFED_STDOUT, "w")
+tradefed_logcat_out = open(TRADEFED_LOGCAT, "w")
+tradefed_logcat = subprocess.Popen(["adb", "logcat"], stdout=tradefed_logcat_out)
-logger.info('Test params: %s' % args.TEST_PARAMS)
-logger.info('Starting tradefed shell test...')
+logger.info("Test params: %s" % args.TEST_PARAMS)
+logger.info("Starting tradefed shell test...")
command = None
prompt = None
@@ -87,66 +100,85 @@
sys.exit(1)
vts_monitor_enabled = False
-if command == 'android-vts/tools/vts-tradefed' and \
- os.path.exists('android-vts/testcases/vts/script/monitor-runner-output.py'):
+if command == "android-vts/tools/vts-tradefed" and os.path.exists(
+ "android-vts/testcases/vts/script/monitor-runner-output.py"
+):
vts_monitor_enabled = True
- vts_run_details = open('{}/vts_run_details.txt'.format(OUTPUT), 'w')
- monitor_cmd = 'android-vts/testcases/vts/script/monitor-runner-output.py -m'
- monitor_vts_output = subprocess.Popen(shlex.split(monitor_cmd), stderr=subprocess.STDOUT, stdout=vts_run_details)
+ vts_run_details = open("{}/vts_run_details.txt".format(OUTPUT), "w")
+ monitor_cmd = "android-vts/testcases/vts/script/monitor-runner-output.py -m"
+ monitor_vts_output = subprocess.Popen(
+ shlex.split(monitor_cmd), stderr=subprocess.STDOUT, stdout=vts_run_details
+ )
child = pexpect.spawn(command, logfile=tradefed_stdout, searchwindowsize=1024)
try:
child.expect(prompt, timeout=60)
child.sendline(args.TEST_PARAMS)
except pexpect.TIMEOUT:
- result = 'lunch-tf-shell fail'
+ result = "lunch-tf-shell fail"
py_test_lib.add_result(RESULT_FILE, result)
fail_to_complete = False
while child.isalive():
- subprocess.call('echo')
- subprocess.call(['echo', '--- line break ---'])
- logger.info('Checking adb connectivity...')
+ subprocess.call("echo")
+ subprocess.call(["echo", "--- line break ---"])
+ logger.info("Checking adb connectivity...")
adb_command = "adb shell echo OK"
adb_check = subprocess.Popen(shlex.split(adb_command))
if adb_check.wait() != 0:
- logger.debug('adb connection lost! maybe device is rebooting. Lets check again in 5 minute')
+ logger.debug(
+ "adb connection lost! maybe device is rebooting. Lets check again in 5 minute"
+ )
time.sleep(300)
adb_check = subprocess.Popen(shlex.split(adb_command))
if adb_check.wait() != 0:
- logger.debug('adb connection lost! Trying to dump logs of all invocations...')
- child.sendline('d l')
+ logger.debug(
+ "adb connection lost! Trying to dump logs of all invocations..."
+ )
+ child.sendline("d l")
time.sleep(30)
- subprocess.call(['sh', '-c', '. ../../lib/sh-test-lib && . ../../lib/android-test-lib && adb_debug_info'])
+ subprocess.call(
+ [
+ "sh",
+ "-c",
+ ". ../../lib/sh-test-lib && . ../../lib/android-test-lib && adb_debug_info",
+ ]
+ )
logger.debug('"adb devices" output')
- subprocess.call(['adb', 'devices'])
- logger.error('adb connection lost!! Will wait for 5 minutes and terminating tradefed shell test as adb connection is lost!')
+ subprocess.call(["adb", "devices"])
+ logger.error(
+ "adb connection lost!! Will wait for 5 minutes and terminating tradefed shell test as adb connection is lost!"
+ )
time.sleep(300)
child.terminate(force=True)
- result = 'check-adb-connectivity fail'
+ result = "check-adb-connectivity fail"
py_test_lib.add_result(RESULT_FILE, result)
break
else:
- logger.info('adb device is alive')
+ logger.info("adb device is alive")
time.sleep(300)
# Check if all tests finished every minute.
- m = child.expect(['ResultReporter: Full Result:',
- 'ConsoleReporter:.*Test run failed to complete.',
- pexpect.TIMEOUT],
- searchwindowsize=1024,
- timeout=60)
+ m = child.expect(
+ [
+ "ResultReporter: Full Result:",
+ "ConsoleReporter:.*Test run failed to complete.",
+ pexpect.TIMEOUT,
+ ],
+ searchwindowsize=1024,
+ timeout=60,
+ )
# Once all tests finished, exit from tf shell to throw EOF, which sets child.isalive() to false.
if m == 0:
try:
child.expect(prompt, searchwindowsize=1024, timeout=60)
logger.debug('Sending "exit" command to TF shell...')
- child.sendline('exit')
+ child.sendline("exit")
child.expect(pexpect.EOF, timeout=60)
- logger.debug('Child process ended properly.')
+ logger.debug("Child process ended properly.")
except pexpect.TIMEOUT as e:
print(e)
- logger.debug('Unsuccessful clean exit, force killing child process...')
+ logger.debug("Unsuccessful clean exit, force killing child process...")
child.terminate(force=True)
break
# Mark test run as fail when a module or the whole run failed to complete.
@@ -155,16 +187,16 @@
# CTS not finished yet, continue to wait.
elif m == 2:
# Flush pexpect input buffer.
- child.expect(['.+', pexpect.TIMEOUT, pexpect.EOF], timeout=1)
- logger.info('Printing tradefed recent output...')
- subprocess.call(['tail', TRADEFED_STDOUT])
+ child.expect([".+", pexpect.TIMEOUT, pexpect.EOF], timeout=1)
+ logger.info("Printing tradefed recent output...")
+ subprocess.call(["tail", TRADEFED_STDOUT])
if fail_to_complete:
- py_test_lib.add_result(RESULT_FILE, 'tradefed-test-run fail')
+ py_test_lib.add_result(RESULT_FILE, "tradefed-test-run fail")
else:
- py_test_lib.add_result(RESULT_FILE, 'tradefed-test-run pass')
+ py_test_lib.add_result(RESULT_FILE, "tradefed-test-run pass")
-logger.info('Tradefed test finished')
+logger.info("Tradefed test finished")
tradefed_logcat.kill()
tradefed_logcat_out.close()
tradefed_stdout.close()
@@ -173,7 +205,7 @@
vts_run_details.close()
# Locate and parse test result.
-result_dir = '%s/results' % args.TEST_PATH
+result_dir = "%s/results" % args.TEST_PATH
parser = result_parser.TradefedResultParser(RESULT_FILE)
parser.logger = logger
parser.results_format = args.RESULTS_FORMAT
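
The runner above polls the TradeFed console through pexpect, matching either the final result banner, a failure-to-complete message, or a timeout, and dispatching on the index that expect() returns. A toy loop showing only that pattern (the spawned shell command is a stand-in, not the real tradefed shell):

import pexpect

# Fake console: prints a prompt, waits, then prints the result banner.
child = pexpect.spawn(
    "/bin/sh",
    ["-c", "echo 'tf >'; sleep 2; echo 'ResultReporter: Full Result:'"],
    encoding="utf-8",
)
child.expect("tf >", timeout=10)
while True:
    m = child.expect(
        [
            "ResultReporter: Full Result:",
            "Test run failed to complete.",
            pexpect.TIMEOUT,
            pexpect.EOF,
        ],
        timeout=1,
    )
    if m == 0:
        print("run finished")
        break
    if m == 1 or m == 3:
        print("run did not finish cleanly")
        break
    # m == 2: still running, poll again
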
diff --git a/automated/lib/android_adb_wrapper.py b/automated/lib/android_adb_wrapper.py
index 64fcbb1..15092aa 100644
--- a/automated/lib/android_adb_wrapper.py
+++ b/automated/lib/android_adb_wrapper.py
@@ -15,9 +15,7 @@
self.serial = serial
self.command = command
self.error_message = error_message
- message = "Command `{}` failed on {}: {}".format(
- command, serial, error_message
- )
+ message = "Command `{}` failed on {}: {}".format(command, serial, error_message)
super(DeviceCommandError, self).__init__(message)
@@ -54,9 +52,7 @@
)
if ret.returncode < 0:
if raise_on_error:
- raise DeviceCommandError(
- serial if serial else "??", str(args), ret.stderr
- )
+ raise DeviceCommandError(serial if serial else "??", str(args), ret.stderr)
else:
return None
@@ -73,9 +69,7 @@
)
if raise_on_error and ret.returncode < 0:
- raise DeviceCommandError(
- serial if serial else "??", str(args), ret.stderr
- )
+ raise DeviceCommandError(serial if serial else "??", str(args), ret.stderr)
return ret
diff --git a/automated/lib/android_ui_wifi.py b/automated/lib/android_ui_wifi.py
index dd32166..32e7dd8 100755
--- a/automated/lib/android_ui_wifi.py
+++ b/automated/lib/android_ui_wifi.py
@@ -26,12 +26,8 @@
)
# Check if there is an option to turn WiFi on or off
- wifi_enabler = dut(
- text="OFF", resourceId="com.android.settings:id/switch_widget"
- )
- wifi_disabler = dut(
- text="ON", resourceId="com.android.settings:id/switch_widget"
- )
+ wifi_enabler = dut(text="OFF", resourceId="com.android.settings:id/switch_widget")
+ wifi_disabler = dut(text="ON", resourceId="com.android.settings:id/switch_widget")
if not wifi_enabler.exists and not wifi_disabler.exists:
raise DeviceCommandError(
@@ -80,10 +76,7 @@
)
args = parser.parse_args()
- if args.ACTION[0] != "set_wifi_state" or args.ACTION[1] not in (
- "on",
- "off",
- ):
+ if args.ACTION[0] != "set_wifi_state" or args.ACTION[1] not in ("on", "off",):
print(
"ERROR: Specified ACTION is not supported: {}".format(args.ACTION),
file=sys.stderr,
diff --git a/automated/lib/parse_rt_tests_results.py b/automated/lib/parse_rt_tests_results.py
index 1d106a6..3f24870 100755
--- a/automated/lib/parse_rt_tests_results.py
+++ b/automated/lib/parse_rt_tests_results.py
@@ -30,20 +30,20 @@
def print_res(res, key):
- print('t{}-{}-latency pass {} us'.format(res['t'], key, res[key]))
+ print("t{}-{}-latency pass {} us".format(res["t"], key, res[key]))
def get_block(filename):
# Fetch a text block from the file iterating backwards. Each block
# starts with an escape sequence which starts with '\x1b'.
- with open(filename, 'rb') as f:
+ with open(filename, "rb") as f:
try:
f.seek(0, os.SEEK_END)
while True:
pe = f.tell()
f.seek(-2, os.SEEK_CUR)
- while f.read(1) != b'\x1b':
+ while f.read(1) != b"\x1b":
f.seek(-2, os.SEEK_CUR)
pa = f.tell()
@@ -51,7 +51,7 @@
# Remove escape sequence at the start of the block
# The control sequence ends in 'A'
- i = blk.find('A') + 1
+ i = blk.find("A") + 1
yield blk[i:]
# Jump back to next block
@@ -65,18 +65,18 @@
def get_lastlines(filename):
for b in get_block(filename):
# Ignore empty blocks
- if len(b.strip('\n')) == 0:
+ if len(b.strip("\n")) == 0:
continue
- return b.split('\n')
+ return b.split("\n")
def parse_cyclictest(filename):
- fields = ['t', 'min', 'avg', 'max']
+ fields = ["t", "min", "avg", "max"]
- r = re.compile('[ :\n]+')
+ r = re.compile("[ :\n]+")
for line in get_lastlines(filename):
- if not line.startswith('T:'):
+ if not line.startswith("T:"):
continue
data = [x.lower() for x in r.split(line)]
@@ -86,16 +86,16 @@
if e in fields:
res[e] = next(it)
- print_res(res, 'min')
- print_res(res, 'avg')
- print_res(res, 'max')
+ print_res(res, "min")
+ print_res(res, "avg")
+ print_res(res, "max")
def parse_pmqtest(filename):
- fields = ['min', 'avg', 'max']
+ fields = ["min", "avg", "max"]
- rl = re.compile('[ ,:\n]+')
- rt = re.compile('[ ,#]+')
+ rl = re.compile("[ ,:\n]+")
+ rt = re.compile("[ ,#]+")
for line in get_lastlines(filename):
data = [x.lower() for x in rl.split(line)]
res = {}
@@ -110,21 +110,21 @@
# The id is constructed from the '#FROM -> #TO' output, e.g.
# #1 -> #0, Min 1, Cur 3, Avg 4, Max 119
data = rt.split(line)
- res['t'] = '{}-{}'.format(data[1], data[3])
+ res["t"] = "{}-{}".format(data[1], data[3])
- print_res(res, 'min')
- print_res(res, 'avg')
- print_res(res, 'max')
+ print_res(res, "min")
+ print_res(res, "avg")
+ print_res(res, "max")
def main():
tool = sys.argv[1]
logfile = sys.argv[2]
- if tool in ['cyclictest', 'signaltest', 'cyclicdeadline']:
+ if tool in ["cyclictest", "signaltest", "cyclicdeadline"]:
parse_cyclictest(logfile)
- elif tool in ['pmqtest', 'ptsematest', 'sigwaittest', 'svsematest']:
+ elif tool in ["pmqtest", "ptsematest", "sigwaittest", "svsematest"]:
parse_pmqtest(logfile)
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
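
For reference, parse_cyclictest() above works on the final status line of cyclictest output: it splits the line on spaces and colons, lower-cases the tokens, and pairs each field name with the token that follows it. A tiny sketch on a made-up sample line in the usual 'T:' shape:

import re

line = "T: 0 ( 1234) P:99 I:1000 C: 100000 Min: 3 Act: 5 Avg: 4 Max: 57"
tokens = [x.lower() for x in re.split(r"[ :\n]+", line)]
res = {}
it = iter(tokens)
for token in it:
    if token in ("t", "min", "avg", "max"):
        # The value for a field is the token right after its name.
        res[token] = next(it)
print(res)  # {'t': '0', 'min': '3', 'avg': '4', 'max': '57'}
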
diff --git a/automated/lib/py_test_lib.py b/automated/lib/py_test_lib.py
index 0ccd963..0d57005 100755
--- a/automated/lib/py_test_lib.py
+++ b/automated/lib/py_test_lib.py
@@ -1,3 +1,3 @@
def add_result(result_file, result):
- with open(result_file, 'a') as f:
- f.write('%s\n' % result)
+ with open(result_file, "a") as f:
+ f.write("%s\n" % result)
diff --git a/automated/linux/aep-pre-post/postprocess_lisa_results.py b/automated/linux/aep-pre-post/postprocess_lisa_results.py
index 7cfb58b..b19c9bd 100644
--- a/automated/linux/aep-pre-post/postprocess_lisa_results.py
+++ b/automated/linux/aep-pre-post/postprocess_lisa_results.py
@@ -4,14 +4,12 @@
def main():
parser = argparse.ArgumentParser()
- parser.add_argument("-f",
- "--file",
- help="CSV file for postprocessing",
- dest="source_filename")
- parser.add_argument("-o",
- "--output-file",
- help="Results file",
- dest="results_filename")
+ parser.add_argument(
+ "-f", "--file", help="CSV file for postprocessing", dest="source_filename"
+ )
+ parser.add_argument(
+ "-o", "--output-file", help="Results file", dest="results_filename"
+ )
args = parser.parse_args()
row_index = 0
diff --git a/automated/linux/fuego-multinode/parser.py b/automated/linux/fuego-multinode/parser.py
index 27b99dd..82fc974 100755
--- a/automated/linux/fuego-multinode/parser.py
+++ b/automated/linux/fuego-multinode/parser.py
@@ -6,44 +6,55 @@
import sys
parser = argparse.ArgumentParser()
-parser.add_argument('-s', '--source', dest='source', required=True,
- help='path to fuego test result file run.json.')
-parser.add_argument('-d', '--dest', dest='dest', required=True,
- help='Path to plain test result file result.txt.')
+parser.add_argument(
+ "-s",
+ "--source",
+ dest="source",
+ required=True,
+ help="path to fuego test result file run.json.",
+)
+parser.add_argument(
+ "-d",
+ "--dest",
+ dest="dest",
+ required=True,
+ help="Path to plain test result file result.txt.",
+)
args = parser.parse_args()
with open(args.source) as f:
data = json.load(f)
-if 'test_sets' not in data.keys():
- print('test_sets NOT found in {}'.format(run_json))
+if "test_sets" not in data.keys():
+ print("test_sets NOT found in {}".format(run_json))
sys.exit(1)
result_lines = []
-for test_set in data['test_sets']:
- result_lines.append('lava-test-set start {}'.format(test_set['name']))
+for test_set in data["test_sets"]:
+ result_lines.append("lava-test-set start {}".format(test_set["name"]))
- for test_case in test_set['test_cases']:
+ for test_case in test_set["test_cases"]:
# Functional
- result_line = '{} {}'.format(test_case['name'],
- test_case['status'].lower())
+ result_line = "{} {}".format(test_case["name"], test_case["status"].lower())
result_lines.append(result_line)
# Benchmark
- if test_case.get('measurements'):
- for measurement in test_case['measurements']:
+ if test_case.get("measurements"):
+ for measurement in test_case["measurements"]:
# Use test_case_name plus measurement name as test_case_id so
# that it is readable and unique.
- result_line = '{}_{} {} {} {}'.format(test_case['name'],
- measurement['name'],
- measurement['status'].lower(),
- measurement['measure'],
- measurement.get('unit', ''))
+ result_line = "{}_{} {} {} {}".format(
+ test_case["name"],
+ measurement["name"],
+ measurement["status"].lower(),
+ measurement["measure"],
+ measurement.get("unit", ""),
+ )
result_lines.append(result_line)
- result_lines.append('lava-test-set stop {}'.format(test_set['name']))
+ result_lines.append("lava-test-set stop {}".format(test_set["name"]))
-with open(args.dest, 'w') as f:
+with open(args.dest, "w") as f:
for result_line in result_lines:
print(result_line)
- f.write('{}\n'.format(result_line))
+ f.write("{}\n".format(result_line))
diff --git a/automated/linux/glmark2/glmark2_lava_parse.py b/automated/linux/glmark2/glmark2_lava_parse.py
index 4e199c4..d98c3ee 100755
--- a/automated/linux/glmark2/glmark2_lava_parse.py
+++ b/automated/linux/glmark2/glmark2_lava_parse.py
@@ -31,32 +31,32 @@
">": "]",
}
-if __name__ == '__main__':
+if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: %s <result_file>" % sys.argv[0])
sys.exit(1)
rex = re.compile("(?P<test_case_id>.*): (?P<units>FPS): (?P<measurement>\\d+)")
score_rex = re.compile("(?P<test_case_id>glmark2 Score): (?P<measurement>\\d+)")
- with open(sys.argv[1], 'r') as f:
+ with open(sys.argv[1], "r") as f:
for line in f.readlines():
m = rex.search(line)
if m:
- case_id = m.group('test_case_id')
+ case_id = m.group("test_case_id")
for r in replaces.keys():
case_id = case_id.replace(r, replaces[r])
- result = 'pass'
- measurement = m.group('measurement')
- units = m.group('units')
+ result = "pass"
+ measurement = m.group("measurement")
+ units = m.group("units")
print("%s %s %s %s" % (case_id, result, measurement, units))
continue
m = score_rex.search(line)
if m:
- case_id = m.group('test_case_id')
+ case_id = m.group("test_case_id")
for r in replaces.keys():
case_id = case_id.replace(r, replaces[r])
- result = 'pass'
- measurement = m.group('measurement')
+ result = "pass"
+ measurement = m.group("measurement")
print("%s %s %s" % (case_id, result, measurement))
diff --git a/automated/linux/gst-validate/gst_validate_lava_parse.py b/automated/linux/gst-validate/gst_validate_lava_parse.py
index 70b93c9..b0697a9 100755
--- a/automated/linux/gst-validate/gst_validate_lava_parse.py
+++ b/automated/linux/gst-validate/gst_validate_lava_parse.py
@@ -28,35 +28,37 @@
def map_result_to_lava(result):
- if result == 'Passed':
- result = 'pass'
- elif result == 'Failed':
- result = 'fail'
- elif result == 'Skipped':
- result = 'skip'
- elif result == 'Timeout':
- result = 'fail'
+ if result == "Passed":
+ result = "pass"
+ elif result == "Failed":
+ result = "fail"
+ elif result == "Skipped":
+ result = "skip"
+ elif result == "Timeout":
+ result = "fail"
return result
-if __name__ == '__main__':
+if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: %s <result_file> [ignore_file]" % sys.argv[0])
sys.exit(1)
ignore_tests = []
if len(sys.argv) == 3:
- with open(sys.argv[2], 'r') as f:
+ with open(sys.argv[2], "r") as f:
ignore_tests = f.read().split()
- rex = re.compile(r'^(?P<test_case_id>validate\..*):\s+(?P<result>(Failed|Passed|Skipped|Timeout))')
- with open(sys.argv[1], 'r') as f:
+ rex = re.compile(
+ r"^(?P<test_case_id>validate\..*):\s+(?P<result>(Failed|Passed|Skipped|Timeout))"
+ )
+ with open(sys.argv[1], "r") as f:
for line in f.readlines():
s = rex.search(line)
if s:
- test_case_id = s.group('test_case_id')
- result = s.group('result')
+ test_case_id = s.group("test_case_id")
+ result = s.group("result")
if test_case_id not in ignore_tests:
print("%s %s" % (test_case_id, map_result_to_lava(result)))
diff --git a/automated/linux/igt/print-test-result.py b/automated/linux/igt/print-test-result.py
index 7fe353e..cf1bf2d 100755
--- a/automated/linux/igt/print-test-result.py
+++ b/automated/linux/igt/print-test-result.py
@@ -5,33 +5,47 @@
def print_result(results):
- for test, content in results['tests'].items():
- print('<LAVA_SIGNAL_STARTTC %s>' % test)
- print('************************************************************************************************************************************')
- print('%-15s %s' % ('Test:', test))
- print('%-15s %s' % ('Result:', content['result']))
+ for test, content in results["tests"].items():
+ print("<LAVA_SIGNAL_STARTTC %s>" % test)
+ print(
+ "************************************************************************************************************************************"
+ )
+ print("%-15s %s" % ("Test:", test))
+ print("%-15s %s" % ("Result:", content["result"]))
# Test result generated by igt_runner doesn't have the following values
try:
- print('%-15s %s' % ('Command:', content['command']))
- print('%-15s %s' % ('Environment:', content['environment']))
- print('%-15s %s' % ('Returncode:', content['returncode']))
+ print("%-15s %s" % ("Command:", content["command"]))
+ print("%-15s %s" % ("Environment:", content["environment"]))
+ print("%-15s %s" % ("Returncode:", content["returncode"]))
except KeyError:
pass
- print('%-15s %s' % ('Stdout:', content['out'].replace('\n', '\n ')))
- print('%-15s %s' % ('Stderr:', content['err'].replace('\n', '\n ')))
- print('%-15s %s' % ('dmesg:', content['dmesg'].replace('\n', '\n ')))
- print('<LAVA_SIGNAL_TESTCASE TEST_CASE_ID=%s RESULT=%s>' % (test, content['result']))
- print('<LAVA_SIGNAL_ENDTC %s>' % test)
+ print(
+ "%-15s %s" % ("Stdout:", content["out"].replace("\n", "\n "))
+ )
+ print(
+ "%-15s %s" % ("Stderr:", content["err"].replace("\n", "\n "))
+ )
+ print(
+ "%-15s %s"
+ % ("dmesg:", content["dmesg"].replace("\n", "\n "))
+ )
+ print(
+ "<LAVA_SIGNAL_TESTCASE TEST_CASE_ID=%s RESULT=%s>"
+ % (test, content["result"])
+ )
+ print("<LAVA_SIGNAL_ENDTC %s>" % test)
-if __name__ == '__main__':
+if __name__ == "__main__":
parser = argparse.ArgumentParser()
- parser.add_argument("-f",
- "--json-file",
- nargs='?',
- default=sys.stdin,
- type=argparse.FileType('r', encoding='UTF-8'),
- help="Test result file in json format")
+ parser.add_argument(
+ "-f",
+ "--json-file",
+ nargs="?",
+ default=sys.stdin,
+ type=argparse.FileType("r", encoding="UTF-8"),
+ help="Test result file in json format",
+ )
args = parser.parse_args()
with args.json_file as data:
diff --git a/automated/linux/kvm/wait-ip.py b/automated/linux/kvm/wait-ip.py
index 6ace09a..aea3939 100755
--- a/automated/linux/kvm/wait-ip.py
+++ b/automated/linux/kvm/wait-ip.py
@@ -21,5 +21,5 @@
self.returnIP()
-server = HTTPServer(('', 8080), MyHandler)
+server = HTTPServer(("", 8080), MyHandler)
server.handle_request()
diff --git a/automated/linux/ota-update/ota-update.py b/automated/linux/ota-update/ota-update.py
index 399c67d..57b46eb 100644
--- a/automated/linux/ota-update/ota-update.py
+++ b/automated/linux/ota-update/ota-update.py
@@ -6,25 +6,29 @@
import os
from argparse import ArgumentParser
-sys.path.insert(0, '../../lib/')
+sys.path.insert(0, "../../lib/")
import py_test_lib # nopep8
-OUTPUT = '%s/output' % os.getcwd()
-RESULT_FILE = '%s/result.txt' % OUTPUT
+OUTPUT = "%s/output" % os.getcwd()
+RESULT_FILE = "%s/result.txt" % OUTPUT
parser = ArgumentParser()
-parser.add_argument("-d", "--device", dest="devicename", default="hikey-r2-01",
- help="Device Name to be updated")
-parser.add_argument("-is", "--installed-sha", dest="installed_sha", default="",
- help="OTA update sha")
-parser.add_argument("-us", "--update-sha", dest="update_sha", default="",
- help="OTA update sha")
+parser.add_argument(
+ "-d",
+ "--device",
+ dest="devicename",
+ default="hikey-r2-01",
+ help="Device Name to be updated",
+)
+parser.add_argument(
+ "-is", "--installed-sha", dest="installed_sha", default="", help="OTA update sha"
+)
+parser.add_argument(
+ "-us", "--update-sha", dest="update_sha", default="", help="OTA update sha"
+)
args = parser.parse_args()
url = "http://api.ota-prototype.linaro.org/devices/%s/" % args.devicename
-headers = {
- "OTA-TOKEN": "BadT0ken5",
- "Content-type": "application/json"
-}
+headers = {"OTA-TOKEN": "BadT0ken5", "Content-type": "application/json"}
data = json.dumps({"image": {"hash": args.update_sha}})
@@ -33,7 +37,9 @@
while loop < 20:
r = requests.get(url, headers=headers)
resp = yaml.load(r.text)
- currentsha_on_server = resp.get("deviceImage").get("image").get("hash").get("sha256")
+ currentsha_on_server = (
+ resp.get("deviceImage").get("image").get("hash").get("sha256")
+ )
if currentsha_on_server == sha:
return 0
loop = loop + 1
diff --git a/automated/linux/piglit/piglit_lava_parse.py b/automated/linux/piglit/piglit_lava_parse.py
index 32e8e2a..81da2b5 100755
--- a/automated/linux/piglit/piglit_lava_parse.py
+++ b/automated/linux/piglit/piglit_lava_parse.py
@@ -28,12 +28,12 @@
def map_result_to_lava(result):
- if result == 'warn':
- result = 'pass'
- elif result == 'crash':
- result = 'fail'
- elif result == 'incomplete':
- result = 'fail'
+ if result == "warn":
+ result = "pass"
+ elif result == "crash":
+ result = "fail"
+ elif result == "incomplete":
+ result = "fail"
return result
@@ -47,43 +47,43 @@
def print_results(filename, ignore_tests):
- currentsuite = ''
- with open(filename, 'r') as f:
+ currentsuite = ""
+ with open(filename, "r") as f:
piglit_results = json.loads(f.read())
- for test in sorted(piglit_results['tests'].keys()):
+ for test in sorted(piglit_results["tests"].keys()):
if test in ignore_tests:
continue
- testname_parts = test.split('@')
- testname = testname_parts[-1].replace(' ', '_')
- suitename = '@'.join(testname_parts[0:-1])
+ testname_parts = test.split("@")
+ testname = testname_parts[-1].replace(" ", "_")
+ suitename = "@".join(testname_parts[0:-1])
if currentsuite != suitename:
if currentsuite:
- print('lava-test-set stop %s' % currentsuite)
+ print("lava-test-set stop %s" % currentsuite)
currentsuite = suitename
- print('lava-test-set start %s' % currentsuite)
+ print("lava-test-set start %s" % currentsuite)
- result = map_result_to_lava(piglit_results['tests'][test]['result'])
+ result = map_result_to_lava(piglit_results["tests"][test]["result"])
print("%s %s" % (testname, result))
- print('lava-test-set stop %s' % currentsuite)
+ print("lava-test-set stop %s" % currentsuite)
-if __name__ == '__main__':
+if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: %s <result_dir|result_file> [ignore_file]" % sys.argv[0])
sys.exit(1)
ignore_tests = []
if len(sys.argv) == 3:
- with open(sys.argv[2], 'r') as f:
+ with open(sys.argv[2], "r") as f:
ignore_tests = f.read().split()
if os.path.isdir(sys.argv[1]):
for root, dirs, files in os.walk(sys.argv[1]):
result_types = {}
for name in sorted(files, key=natural_keys):
- if name.endswith('.tmp'):
+ if name.endswith(".tmp"):
continue
piglit_result = None
full_f = os.path.join(root, name)
diff --git a/automated/linux/ptest/ptest.py b/automated/linux/ptest/ptest.py
index 90fc45d..82ef82d 100755
--- a/automated/linux/ptest/ptest.py
+++ b/automated/linux/ptest/ptest.py
@@ -27,17 +27,17 @@
import re
import os
-OUTPUT_LOG = os.path.join(os.getcwd(), 'result.txt')
+OUTPUT_LOG = os.path.join(os.getcwd(), "result.txt")
def get_ptest_dir():
- ptest_dirs = ['/usr/lib', '/usr/lib64', '/usr/lib32']
+ ptest_dirs = ["/usr/lib", "/usr/lib64", "/usr/lib32"]
for pdir in ptest_dirs:
try:
- ptests = subprocess.check_output('ptest-runner -l -d %s' %
- pdir, shell=True,
- stderr=subprocess.STDOUT)
+ ptests = subprocess.check_output(
+ "ptest-runner -l -d %s" % pdir, shell=True, stderr=subprocess.STDOUT
+ )
except subprocess.CalledProcessError:
continue
@@ -47,16 +47,16 @@
def get_available_ptests(ptest_dir):
- output = subprocess.check_output('ptest-runner -l -d %s' %
- ptest_dir, shell=True,
- stderr=subprocess.STDOUT)
+ output = subprocess.check_output(
+ "ptest-runner -l -d %s" % ptest_dir, shell=True, stderr=subprocess.STDOUT
+ )
ptests = []
ptest_rex = re.compile("^(?P<ptest_name>.*)\t")
- for line in output.decode('utf-8', errors="replace").split('\n'):
+ for line in output.decode("utf-8", errors="replace").split("\n"):
m = ptest_rex.search(line)
if m:
- ptests.append(m.group('ptest_name'))
+ ptests.append(m.group("ptest_name"))
return ptests
@@ -78,8 +78,7 @@
for request_ptest in requested_ptests.keys():
if not requested_ptests[request_ptest]:
- print("ERROR: Ptest %s was requested and isn't available" %
- request_ptest)
+ print("ERROR: Ptest %s was requested and isn't available" % request_ptest)
sys.exit(1)
return filter_ptests
@@ -87,9 +86,9 @@
def parse_line(line):
test_status_list = {
- 'pass': re.compile("^PASS:(.+)"),
- 'fail': re.compile("^FAIL:(.+)"),
- 'skip': re.compile("^SKIP:(.+)")
+ "pass": re.compile("^PASS:(.+)"),
+ "fail": re.compile("^FAIL:(.+)"),
+ "skip": re.compile("^SKIP:(.+)"),
}
for test_status, status_regex in test_status_list.items():
@@ -102,10 +101,9 @@
def run_ptest(command):
results = []
- process = subprocess.Popen(command,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ process = subprocess.Popen(
+ command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+ )
while True:
output = process.stdout.readline()
try:
@@ -125,34 +123,46 @@
def check_ptest(ptest_dir, ptest_name, output_log):
- log_name = os.path.join(os.getcwd(), '%s.log' % ptest_name)
- status, results = run_ptest('ptest-runner -d %s %s' % (ptest_dir, ptest_name))
+ log_name = os.path.join(os.getcwd(), "%s.log" % ptest_name)
+ status, results = run_ptest("ptest-runner -d %s %s" % (ptest_dir, ptest_name))
- with open(output_log, 'a+') as f:
+ with open(output_log, "a+") as f:
f.write("lava-test-set start %s\n" % ptest_name)
f.write("%s %s\n" % (ptest_name, "pass" if status == 0 else "fail"))
for test, test_status in results:
test = test.encode("ascii", errors="ignore").decode()
- f.write("%s %s\n" % (re.sub(r'[^\w-]', '', test), test_status))
+ f.write("%s %s\n" % (re.sub(r"[^\w-]", "", test), test_status))
f.write("lava-test-set stop %s\n" % ptest_name)
def main():
- parser = argparse.ArgumentParser(description="LAVA/OE ptest script",
- add_help=False)
- parser.add_argument('-t', '--tests', action='store', nargs='*',
- help='Ptests to run')
- parser.add_argument('-e', '--exclude', action='store', nargs='*',
- help='Ptests to exclude')
- parser.add_argument('-d', '--ptest-dir',
- help='Directory where ptests are stored (optional)',
- action='store')
- parser.add_argument('-o', '--output-log',
- help='File to output log (optional)', action='store',
- default=OUTPUT_LOG)
- parser.add_argument('-h', '--help', action='help',
- default=argparse.SUPPRESS,
- help='show this help message and exit')
+ parser = argparse.ArgumentParser(description="LAVA/OE ptest script", add_help=False)
+ parser.add_argument(
+ "-t", "--tests", action="store", nargs="*", help="Ptests to run"
+ )
+ parser.add_argument(
+ "-e", "--exclude", action="store", nargs="*", help="Ptests to exclude"
+ )
+ parser.add_argument(
+ "-d",
+ "--ptest-dir",
+ help="Directory where ptests are stored (optional)",
+ action="store",
+ )
+ parser.add_argument(
+ "-o",
+ "--output-log",
+ help="File to output log (optional)",
+ action="store",
+ default=OUTPUT_LOG,
+ )
+ parser.add_argument(
+ "-h",
+ "--help",
+ action="help",
+ default=argparse.SUPPRESS,
+ help="show this help message and exit",
+ )
args = parser.parse_args()
if args.ptest_dir:
@@ -186,7 +196,7 @@
return 0
-if __name__ == '__main__':
+if __name__ == "__main__":
try:
ret = main()
except SystemExit as e:
@@ -194,6 +204,7 @@
except Exception:
ret = 1
import traceback
+
traceback.print_exc()
sys.exit(ret)
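
The parse_line() helper above classifies ptest-runner output by matching each line against the PASS:/FAIL:/SKIP: prefixes. A self-contained sketch of the same classification (the classify name is illustrative, not part of ptest.py):

import re

STATUS_PATTERNS = {
    "pass": re.compile(r"^PASS:(.+)"),
    "fail": re.compile(r"^FAIL:(.+)"),
    "skip": re.compile(r"^SKIP:(.+)"),
}

def classify(line):
    # Return (status, test name) for a ptest-runner result line, else None.
    for status, regex in STATUS_PATTERNS.items():
        m = regex.search(line)
        if m:
            return status, m.group(1).strip()
    return None

print(classify("PASS: glib-2.0"))       # ('pass', 'glib-2.0')
print(classify("START: ptest-runner"))  # None
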
diff --git a/automated/linux/ui-browser-test/robot-results-parser.py b/automated/linux/ui-browser-test/robot-results-parser.py
index ad0e4d4..4ae6ce2 100755
--- a/automated/linux/ui-browser-test/robot-results-parser.py
+++ b/automated/linux/ui-browser-test/robot-results-parser.py
@@ -27,16 +27,16 @@
tree = ET.parse(input_file)
root = tree.getroot()
-for statistics in root.findall('statistics'):
- for suite in statistics.findall('suite'):
- for stat in suite.findall('stat'):
- name = stat.get('name')
- if 'Robot-Test-Scripts' == name:
- status = 'pass'
+for statistics in root.findall("statistics"):
+ for suite in statistics.findall("suite"):
+ for stat in suite.findall("stat"):
+ name = stat.get("name")
+ if "Robot-Test-Scripts" == name:
+ status = "pass"
print name, " ", status
else:
- if '1' == stat.get('pass'):
- status = 'pass'
+ if "1" == stat.get("pass"):
+ status = "pass"
else:
- status = 'fail'
+ status = "fail"
print name, " ", status
diff --git a/automated/utils/httperf/httperf-runner.py b/automated/utils/httperf/httperf-runner.py
index 90bba64..d2eafa0 100755
--- a/automated/utils/httperf/httperf-runner.py
+++ b/automated/utils/httperf/httperf-runner.py
@@ -35,11 +35,7 @@
HTTPERF_FINISHED = 2
HTTPERF_ERROR = 3
- def __init__(self,
- rate=10000,
- server='localhost',
- duration=5,
- timeout=1):
+ def __init__(self, rate=10000, server="localhost", duration=5, timeout=1):
self.state = httperf.HTTPERF_INIT
self.result = None
self.errors = {}
@@ -55,21 +51,30 @@
return 1
self.state = httperf.HTTPERF_RUNNING
- self.proc = subprocess.Popen(['httperf',
- '--hog',
- '--timeout', str(self.timeout),
- '--server', self.server,
- '--uri', '/index.html',
- '--rate', str(self.rate),
- '--num-conns', str(self.rate * self.duration)],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- universal_newlines=True)
+ self.proc = subprocess.Popen(
+ [
+ "httperf",
+ "--hog",
+ "--timeout",
+ str(self.timeout),
+ "--server",
+ self.server,
+ "--uri",
+ "/index.html",
+ "--rate",
+ str(self.rate),
+ "--num-conns",
+ str(self.rate * self.duration),
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=True,
+ )
self.stdout, self.stderr = self.proc.communicate()
if self.proc.returncode != 0:
- print('Error running httperf', file=sys.stderr)
+ print("Error running httperf", file=sys.stderr)
self.state = httperf.HTTPERF_ERROR
return 1
@@ -79,33 +84,33 @@
return 0
def __parse_output(self):
- re1 = re.compile('^Errors: total')
- re2 = re.compile('^Errors: fd')
- re3 = re.compile('^Request rate')
- for line in self.stdout.split('\n'):
+ re1 = re.compile("^Errors: total")
+ re2 = re.compile("^Errors: fd")
+ re3 = re.compile("^Request rate")
+ for line in self.stdout.split("\n"):
values = line.split()
if re1.match(line):
- self.errors['total'] = int(values[2])
- self.errors['client-timo'] = int(values[4])
- self.errors['socket-timo'] = int(values[6])
- self.errors['connrefused'] = int(values[8])
- self.errors['connreset'] = int(values[10])
+ self.errors["total"] = int(values[2])
+ self.errors["client-timo"] = int(values[4])
+ self.errors["socket-timo"] = int(values[6])
+ self.errors["connrefused"] = int(values[8])
+ self.errors["connreset"] = int(values[10])
elif re2.match(line):
- self.errors['fd-unavail'] = int(values[2])
- self.errors['addrunavail'] = int(values[4])
- self.errors['ftab-full'] = int(values[6])
- self.errors['other'] = int(values[8])
+ self.errors["fd-unavail"] = int(values[2])
+ self.errors["addrunavail"] = int(values[4])
+ self.errors["ftab-full"] = int(values[6])
+ self.errors["other"] = int(values[8])
elif re3.match(line):
self.request_rate = float(values[2])
def get_errors(self, kind):
if self.state != httperf.HTTPERF_FINISHED:
- print('get_errors: not finished', file=sys.stderr)
+ print("get_errors: not finished", file=sys.stderr)
# FIXME: raise exception
return 0
if kind not in self.errors:
- print('Error type %s not valid' % kind)
+ print("Error type %s not valid" % kind)
# FIXME: raise exception
return 0
@@ -118,7 +123,7 @@
if self.state != httperf.HTTPERF_FINISHED:
return 1
- with open(filename, 'w') as f:
+ with open(filename, "w") as f:
f.write(self.output())
return 0
@@ -137,8 +142,17 @@
FINISHED = 4
ERROR = 5
- def __init__(self, step=10000, rate=10000, min_step=200, duration=5,
- server='localhost', sleep_time=61, tolerance={}, attempts=1):
+ def __init__(
+ self,
+ step=10000,
+ rate=10000,
+ min_step=200,
+ duration=5,
+ server="localhost",
+ sleep_time=61,
+ tolerance={},
+ attempts=1,
+ ):
self.state = httperf_runner.IDLE
self.step = step
self.rate = rate
@@ -156,7 +170,7 @@
def __has_errors(self, cmd):
if cmd:
for kind in cmd.get_error_list():
- if kind == 'total':
+ if kind == "total":
continue
count = cmd.get_errors(kind)
if count == 0:
@@ -183,23 +197,23 @@
attempt += 1
if self.__has_errors(cmd):
- print('--- SLEEP', self.sleep_time, 'and RETRY')
+ print("--- SLEEP", self.sleep_time, "and RETRY")
sleep(self.sleep_time)
- print('--- RANGE: [%0.1f, %0.1f], STEP: %d' %
- (lower_limit, upper_limit, step))
- print('--- BEGIN', rate,
- ', ATTEMPT %d/%d' % (attempt, self.attempts))
+ print(
+ "--- RANGE: [%0.1f, %0.1f], STEP: %d"
+ % (lower_limit, upper_limit, step)
+ )
+ print("--- BEGIN", rate, ", ATTEMPT %d/%d" % (attempt, self.attempts))
self.state = httperf_runner.RUNNING
- cmd = httperf(rate=rate, duration=self.duration,
- server=self.server)
+ cmd = httperf(rate=rate, duration=self.duration, server=self.server)
cmd.run()
print(cmd.output())
- print('--- END')
+ print("--- END")
if self.__has_errors(cmd):
self.state = httperf_runner.FAILED
- print('--- ERRORS:', cmd.get_errors('total'))
+ print("--- ERRORS:", cmd.get_errors("total"))
else:
break
@@ -209,7 +223,7 @@
else:
# NO errors, we might have found a NEW HIGH
if cmd.request_rate > lower_limit:
- print('--- NEW HIGH:', cmd.request_rate)
+ print("--- NEW HIGH:", cmd.request_rate)
lower_limit = cmd.request_rate
# save this httperf object
self.max_run = cmd
@@ -243,7 +257,7 @@
if self.max_run:
return self.max_run.output()
- def write(self, filename='httperf.txt'):
+ def write(self, filename="httperf.txt"):
if self.max_run:
self.max_run.write(filename)
@@ -256,42 +270,100 @@
try:
ret[key] = int(ret[key])
except ValueError:
- print("Warning: Ignoring value", ret[key], "for", key,
- ": not an integer", file=sys.stderr)
+ print(
+ "Warning: Ignoring value",
+ ret[key],
+ "for",
+ key,
+ ": not an integer",
+ file=sys.stderr,
+ )
ret[key] = 0
setattr(namespace, self.dest, ret)
-parser = argparse.ArgumentParser(description='Find highest rate using httperf')
-parser.add_argument('--attempts', '-a', type=int, default=[2], nargs=1,
- help='Number of attempts for each rate under test (default 2)')
-parser.add_argument('--csv', nargs=1,
- help='Save the results in the given file. The file will '
- + 'have one column which is later easy to import in a '
- + 'spreadsheet. If the file exists, data will be '
- + 'appended to it.')
-parser.add_argument('--dir', '-d', nargs=1, default=None,
- help='Put all output files in this directory (default CWD)')
-parser.add_argument('--duration', nargs=1, default=[5], type=int,
- help='Duration of each httperf run (default 5)')
-parser.add_argument('--iterations', '-i', default=[1], nargs=1, type=int,
- help='Runs the script this amount of times (default 1)')
-parser.add_argument('--min-step', '-m', nargs=1, default=[200], type=int,
- help='The minimum step to consider (default 200)')
-parser.add_argument('--output', '-o', default='httperf_max_rate',
- help='Stores the result in the OUTPUT file, with the '
- + 'iteration number appended (default httperf_max_rate)')
-parser.add_argument('--rate', '-r', type=int, default=[10000], nargs=1,
- help='The initial request rate to try (default 10000)')
-parser.add_argument('--step', '-s', type=int, default=[10000], nargs=1,
- help='The initial step (default 10000)')
-parser.add_argument('--server', default='localhost',
- help='Server to connet to (defaut localhost)')
-parser.add_argument('--tolerance', nargs='+', action=ParseTolerance,
- default={'client-timo': 20},
- help='list of key value pairs of errors accepted by '
- + 'httperf. Ex: --tolerance client-timo 20 other 5')
+parser = argparse.ArgumentParser(description="Find highest rate using httperf")
+parser.add_argument(
+ "--attempts",
+ "-a",
+ type=int,
+ default=[2],
+ nargs=1,
+ help="Number of attempts for each rate under test (default 2)",
+)
+parser.add_argument(
+ "--csv",
+ nargs=1,
+ help="Save the results in the given file. The file will "
+ + "have one column which is later easy to import in a "
+ + "spreadsheet. If the file exists, data will be "
+ + "appended to it.",
+)
+parser.add_argument(
+ "--dir",
+ "-d",
+ nargs=1,
+ default=None,
+ help="Put all output files in this directory (default CWD)",
+)
+parser.add_argument(
+ "--duration",
+ nargs=1,
+ default=[5],
+ type=int,
+ help="Duration of each httperf run (default 5)",
+)
+parser.add_argument(
+ "--iterations",
+ "-i",
+ default=[1],
+ nargs=1,
+ type=int,
+ help="Runs the script this amount of times (default 1)",
+)
+parser.add_argument(
+ "--min-step",
+ "-m",
+ nargs=1,
+ default=[200],
+ type=int,
+ help="The minimum step to consider (default 200)",
+)
+parser.add_argument(
+ "--output",
+ "-o",
+ default="httperf_max_rate",
+ help="Stores the result in the OUTPUT file, with the "
+ + "iteration number appended (default httperf_max_rate)",
+)
+parser.add_argument(
+ "--rate",
+ "-r",
+ type=int,
+ default=[10000],
+ nargs=1,
+ help="The initial request rate to try (default 10000)",
+)
+parser.add_argument(
+ "--step",
+ "-s",
+ type=int,
+ default=[10000],
+ nargs=1,
+ help="The initial step (default 10000)",
+)
+parser.add_argument(
+ "--server", default="localhost", help="Server to connet to (defaut localhost)"
+)
+parser.add_argument(
+ "--tolerance",
+ nargs="+",
+ action=ParseTolerance,
+ default={"client-timo": 20},
+ help="list of key value pairs of errors accepted by "
+ + "httperf. Ex: --tolerance client-timo 20 other 5",
+)
args = parser.parse_args()
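The --tolerance option above feeds its KEY VALUE tokens through a custom argparse action (ParseTolerance, whose tail is visible in the first hunk). A minimal, self-contained sketch of that pattern, with an illustrative class name and no claim to match the original implementation exactly:

    import argparse
    import sys


    class ParseKeyValuePairs(argparse.Action):
        # Fold alternating "KEY VALUE KEY VALUE ..." tokens into a dict of ints,
        # warning about (and zeroing) anything that is not an integer.
        def __call__(self, parser, namespace, values, option_string=None):
            ret = dict(zip(values[0::2], values[1::2]))
            for key in ret:
                try:
                    ret[key] = int(ret[key])
                except ValueError:
                    print("Warning: ignoring non-integer value", ret[key], "for", key, file=sys.stderr)
                    ret[key] = 0
            setattr(namespace, self.dest, ret)


    demo = argparse.ArgumentParser()
    demo.add_argument("--tolerance", nargs="+", action=ParseKeyValuePairs, default={"client-timo": 20})
    print(demo.parse_args(["--tolerance", "client-timo", "20", "other", "5"]).tolerance)
    # {'client-timo': 20, 'other': 5}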
@@ -299,15 +371,15 @@
if not os.path.exists(args.dir[0]):
os.mkdir(args.dir[0])
elif not os.path.isdir(args.dir[0]):
- print("Error:", args.dir[0], "exists but it is not a directory",
- file=sys.stderr)
+ print(
+ "Error:", args.dir[0], "exists but it is not a directory", file=sys.stderr
+ )
exit(1)
else:
- args.dir = ['.']
+ args.dir = ["."]
if not os.access(args.dir[0], os.W_OK):
- print("Error: can not write to ", os.path.realpath(args.dir[0]),
- file=sys.stderr)
+ print("Error: can not write to ", os.path.realpath(args.dir[0]), file=sys.stderr)
exit(1)
if args.csv:
@@ -319,25 +391,28 @@
start_time = time()
for i in range(args.iterations[0]):
- print('\n--- ITERATION', i)
- runner = httperf_runner(step=args.step[0], rate=args.rate[0],
- server=args.server, min_step=args.min_step[0],
- duration=args.duration[0],
- tolerance=args.tolerance, attempts=args.attempts[0])
+ print("\n--- ITERATION", i)
+ runner = httperf_runner(
+ step=args.step[0],
+ rate=args.rate[0],
+ server=args.server,
+ min_step=args.min_step[0],
+ duration=args.duration[0],
+ tolerance=args.tolerance,
+ attempts=args.attempts[0],
+ )
if runner.run():
- print('There was an error, exiting.', file=sys.stderr)
+ print("There was an error, exiting.", file=sys.stderr)
exit(1)
- print('--- MAX RATE: %0.1f' % runner.max_rate)
- print('--- ELAPSED TIME:',
- str(datetime.timedelta(seconds=runner.elapsed_time)))
+ print("--- MAX RATE: %0.1f" % runner.max_rate)
+ print("--- ELAPSED TIME:", str(datetime.timedelta(seconds=runner.elapsed_time)))
runner.write(ofile + str(i))
if csv_file:
- with open(csv_file, 'a') as f:
+ with open(csv_file, "a") as f:
print(runner.max_rate, file=f)
-print('\n--- TOTAL ELAPSED TIME:',
- str(datetime.timedelta(seconds=time() - start_time)))
+print("\n--- TOTAL ELAPSED TIME:", str(datetime.timedelta(seconds=time() - start_time)))
diff --git a/automated/utils/post-to-squad.py b/automated/utils/post-to-squad.py
index 7ba4e0c..e52ac92 100755
--- a/automated/utils/post-to-squad.py
+++ b/automated/utils/post-to-squad.py
@@ -12,24 +12,58 @@
def parse_args():
parser = argparse.ArgumentParser()
- parser.add_argument('-r', '--result-file', dest='result_file',
- required=True, default='./result.json',
- help='Specify test result file.')
- parser.add_argument('-a', '--attachment', dest='attachment',
- action='append', help='Specify attachment file.')
- parser.add_argument('-t', '--team', dest='team', required=True,
- help='Team identifier. Defaults to "erp"')
- parser.add_argument('-p', '--project', dest='project',
- help='Project identifier. Defaults to the name of the Linux distribution.')
- parser.add_argument('-b', '--build', dest='build', required=True,
- help='Build identifier.')
- parser.add_argument('-e', '--test-env', dest='test_env',
- help='Environment identifier. Defaults to board name.')
- parser.add_argument('-u', '--url', dest='url',
- default='https://qa-reports.linaro.org',
- help='Dashboard URL. Defaults to https://qa-reports.linaro.org.')
- parser.add_argument('-v', '--verbose', action='store_true', dest='verbose',
- default=True, help='Set log level.')
+ parser.add_argument(
+ "-r",
+ "--result-file",
+ dest="result_file",
+ required=True,
+ default="./result.json",
+ help="Specify test result file.",
+ )
+ parser.add_argument(
+ "-a",
+ "--attachment",
+ dest="attachment",
+ action="append",
+ help="Specify attachment file.",
+ )
+ parser.add_argument(
+ "-t",
+ "--team",
+ dest="team",
+ required=True,
+ help='Team identifier. Defaults to "erp"',
+ )
+ parser.add_argument(
+ "-p",
+ "--project",
+ dest="project",
+ help="Project identifier. Defaults to the name of the Linux distribution.",
+ )
+ parser.add_argument(
+ "-b", "--build", dest="build", required=True, help="Build identifier."
+ )
+ parser.add_argument(
+ "-e",
+ "--test-env",
+ dest="test_env",
+ help="Environment identifier. Defaults to board name.",
+ )
+ parser.add_argument(
+ "-u",
+ "--url",
+ dest="url",
+ default="https://qa-reports.linaro.org",
+ help="Dashboard URL. Defaults to https://qa-reports.linaro.org.",
+ )
+ parser.add_argument(
+ "-v",
+ "--verbose",
+ action="store_true",
+ dest="verbose",
+ default=True,
+ help="Set log level.",
+ )
args = parser.parse_args()
return args
@@ -39,38 +73,40 @@
squad_tests = OrderedDict()
squad_metrics = OrderedDict()
for result in results:
- for metric in result['metrics']:
- key = '%s/%s' % (result['name'], metric['test_case_id'])
- if not metric['measurement']:
+ for metric in result["metrics"]:
+ key = "%s/%s" % (result["name"], metric["test_case_id"])
+ if not metric["measurement"]:
# Collect pass/fail test results.
- squad_tests[key] = metric['result']
+ squad_tests[key] = metric["result"]
else:
# Collect performance test results.
try:
- measurement = decimal.Decimal(metric['measurement'])
+ measurement = decimal.Decimal(metric["measurement"])
squad_metrics[key] = float(measurement)
except decimal.InvalidOperation:
- logger.info('Invalid measurement: %s' % metric['measurement'])
- logger.info('Skipped adding: %s' % metric)
- assert squad_tests or squad_metrics, 'No valid result found!'
+ logger.info("Invalid measurement: %s" % metric["measurement"])
+ logger.info("Skipped adding: %s" % metric)
+ assert squad_tests or squad_metrics, "No valid result found!"
return (squad_tests, squad_metrics)
def squad_metadata(results):
- test_plan = list(set(i['test_plan'] for i in results))
- test_version = list(set(i['version'] for i in results))
+ test_plan = list(set(i["test_plan"] for i in results))
+ test_version = list(set(i["version"] for i in results))
- assert len(test_plan) == 1, 'More then one test plan found!'
- assert len(test_version) == 1, 'More then one test version found!'
+ assert len(test_plan) == 1, "More than one test plan found!"
+ assert len(test_version) == 1, "More than one test version found!"
squad_metadata = OrderedDict()
test_plan = test_plan[0]
test_plan_name = os.path.splitext(os.path.basename(test_plan))[0]
- squad_metadata['job_id'] = '{}_{}'.format(test_plan_name, datetime.datetime.utcnow().isoformat())
- squad_metadata['test_plan'] = test_plan
- squad_metadata['test_version'] = test_version[0]
- for key, value in results[-1]['environment'].items():
- if key != 'packages':
+ squad_metadata["job_id"] = "{}_{}".format(
+ test_plan_name, datetime.datetime.utcnow().isoformat()
+ )
+ squad_metadata["test_plan"] = test_plan
+ squad_metadata["test_version"] = test_version[0]
+ for key, value in results[-1]["environment"].items():
+ if key != "packages":
squad_metadata[key] = value
return squad_metadata
@@ -79,31 +115,35 @@
auth_token = os.environ.get("SQUAD_AUTH_TOKEN")
assert auth_token, "SQUAD_AUTH_TOKEN not provided in environment"
- with open(args.result_file, 'r') as f:
+ with open(args.result_file, "r") as f:
results = json.load(f)
metadata = squad_metadata(results)
tests, metrics = squad_result(results)
- files = [('metadata', json.dumps(metadata)),
- ('tests', json.dumps(tests)),
- ('metrics', json.dumps(metrics)),
- ('attachment', open(args.result_file, 'rb'))]
+ files = [
+ ("metadata", json.dumps(metadata)),
+ ("tests", json.dumps(tests)),
+ ("metrics", json.dumps(metrics)),
+ ("attachment", open(args.result_file, "rb")),
+ ]
if args.attachment is not None:
for item in args.attachment:
if os.path.exists(item):
- logger.info('Adding {} to attachment list...'.format(item))
- files.append(tuple(['attachment', open(item, 'rb')]))
+ logger.info("Adding {} to attachment list...".format(item))
+ files.append(tuple(["attachment", open(item, "rb")]))
else:
- logger.info('Attachment %s Not found' % args.attachment)
- logger.info('Skipped uploading %s' % args.attachment)
- logger.debug('Data to post: %s' % files)
+ logger.info("Attachment %s Not found" % args.attachment)
+ logger.info("Skipped uploading %s" % args.attachment)
+ logger.debug("Data to post: %s" % files)
- project = args.project or metadata['linux_distribution']
- test_env = args.test_env or metadata['board_name']
- url = '{}/api/submit/{}/{}/{}/{}'.format(args.url, args.team, project, args.build, test_env)
- logger.info('Posting to {}'.format(url))
+ project = args.project or metadata["linux_distribution"]
+ test_env = args.test_env or metadata["board_name"]
+ url = "{}/api/submit/{}/{}/{}/{}".format(
+ args.url, args.team, project, args.build, test_env
+ )
+ logger.info("Posting to {}".format(url))
- headers = {'Auth-Token': auth_token}
+ headers = {"Auth-Token": auth_token}
r = requests.post(url, headers=headers, files=files)
print(r.text)
@@ -111,13 +151,15 @@
if __name__ == "__main__":
args = parse_args()
- logger = logging.getLogger('post-to-squad')
+ logger = logging.getLogger("post-to-squad")
logger.setLevel(logging.INFO)
if args.verbose:
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ formatter = logging.Formatter(
+ "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+ )
ch.setFormatter(formatter)
logger.addHandler(ch)
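post-to-squad.py ultimately performs a single multipart POST against the SQUAD submit endpoint shown above; a minimal sketch of that call, with placeholder team/project/build/environment values and made-up payload contents:

    import json
    import os

    import requests

    url = "https://qa-reports.linaro.org/api/submit/example-team/example-project/build-1/example-board"
    files = [
        ("metadata", json.dumps({"job_id": "example-plan_2024-01-01T00:00:00"})),
        ("tests", json.dumps({"example-plan/example-test": "pass"})),
        ("metrics", json.dumps({"example-plan/example-metric": 42.0})),
    ]
    headers = {"Auth-Token": os.environ.get("SQUAD_AUTH_TOKEN", "")}
    print(requests.post(url, headers=headers, files=files).text)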
diff --git a/automated/utils/test-runner.py b/automated/utils/test-runner.py
index f290ae8..c6d3a54 100755
--- a/automated/utils/test-runner.py
+++ b/automated/utils/test-runner.py
@@ -24,8 +24,8 @@
from squad_client.core.models import Squad
from urllib.parse import urlparse
except ImportError as e:
- logger = logging.getLogger('RUNNER')
- logger.warning('squad_client is needed if you want to upload to qa-reports')
+ logger = logging.getLogger("RUNNER")
+ logger.warning("squad_client is needed if you want to upload to qa-reports")
try:
@@ -33,15 +33,17 @@
import yaml
except ImportError as e:
print(e)
- print('Please run the below command to install modules required')
- print('pip3 install -r ${REPO_PATH}/automated/utils/requirements.txt')
+ print("Please run the below command to install modules required")
+ print("pip3 install -r ${REPO_PATH}/automated/utils/requirements.txt")
sys.exit(1)
class StoreDictKeyPair(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, **kwargs):
self._nargs = nargs
- super(StoreDictKeyPair, self).__init__(option_strings, dest, nargs=nargs, **kwargs)
+ super(StoreDictKeyPair, self).__init__(
+ option_strings, dest, nargs=nargs, **kwargs
+ )
def __call__(self, parser, namespace, values, option_string=None):
my_dict = {}
@@ -65,12 +67,12 @@
if target:
run = 'ssh {} {} "{}"'.format(SSH_PARAMS, target, command)
- logger = logging.getLogger('RUNNER.run_command')
+ logger = logging.getLogger("RUNNER.run_command")
logger.debug(run)
if sys.version_info[0] < 3:
return subprocess.check_output(shlex.split(run)).strip()
else:
- return subprocess.check_output(shlex.split(run)).strip().decode('utf-8')
+ return subprocess.check_output(shlex.split(run)).strip().decode("utf-8")
class TestPlan(object):
@@ -83,41 +85,51 @@
self.test_plan = args.test_plan
self.timeout = args.timeout
self.skip_install = args.skip_install
- self.logger = logging.getLogger('RUNNER.TestPlan')
+ self.logger = logging.getLogger("RUNNER.TestPlan")
self.overlay = args.overlay
def apply_overlay(self, test_list):
fixed_test_list = copy.deepcopy(test_list)
- logger = logging.getLogger('RUNNER.TestPlan.Overlay')
+ logger = logging.getLogger("RUNNER.TestPlan.Overlay")
with open(self.overlay) as f:
data = yaml.load(f)
- if data.get('skip'):
- skip_tests = data['skip']
+ if data.get("skip"):
+ skip_tests = data["skip"]
for test in test_list:
for skip_test in skip_tests:
- if test['path'] == skip_test['path'] and test['repository'] == skip_test['repository']:
+ if (
+ test["path"] == skip_test["path"]
+ and test["repository"] == skip_test["repository"]
+ ):
fixed_test_list.remove(test)
logger.info("Skipped: {}".format(test))
else:
continue
- if data.get('amend'):
- amend_tests = data['amend']
+ if data.get("amend"):
+ amend_tests = data["amend"]
for test in fixed_test_list:
for amend_test in amend_tests:
- if test['path'] == amend_test['path'] and test['repository'] == skip_test['repository']:
- if amend_test.get('parameters'):
- if test.get('parameters'):
- test['parameters'].update(amend_test['parameters'])
+ if (
+ test["path"] == amend_test["path"]
+ and test["repository"] == skip_test["repository"]
+ ):
+ if amend_test.get("parameters"):
+ if test.get("parameters"):
+ test["parameters"].update(amend_test["parameters"])
else:
- test['parameters'] = amend_test['parameters']
- logger.info('Updated: {}'.format(test))
+ test["parameters"] = amend_test["parameters"]
+ logger.info("Updated: {}".format(test))
else:
- logger.warning("'parameters' not found in {}, nothing to amend.".format(amend_test))
+ logger.warning(
+ "'parameters' not found in {}, nothing to amend.".format(
+ amend_test
+ )
+ )
- if data.get('add'):
- add_tests = data['add']
+ if data.get("add"):
+ add_tests = data["add"]
unique_add_tests = []
for test in add_tests:
if test not in unique_add_tests:
@@ -126,13 +138,15 @@
logger.warning("Skipping duplicate test {}".format(test))
for test in test_list:
- del test['uuid']
+ del test["uuid"]
for add_test in unique_add_tests:
if add_test in test_list:
- logger.warning("{} already included in test plan, do nothing.".format(add_test))
+ logger.warning(
+ "{} already included in test plan, do nothing.".format(add_test)
+ )
else:
- add_test['uuid'] = str(uuid4())
+ add_test["uuid"] = str(uuid4())
fixed_test_list.append(add_test)
logger.info("Added: {}".format(add_test))
@@ -141,33 +155,35 @@
def test_list(self, kind="automated"):
if self.test_def:
if not os.path.exists(self.test_def):
- self.logger.error(' %s NOT found, exiting...' % self.test_def)
+ self.logger.error(" %s NOT found, exiting..." % self.test_def)
sys.exit(1)
- test_list = [{'path': self.test_def}]
- test_list[0]['uuid'] = str(uuid4())
- test_list[0]['timeout'] = self.timeout
- test_list[0]['skip_install'] = self.skip_install
+ test_list = [{"path": self.test_def}]
+ test_list[0]["uuid"] = str(uuid4())
+ test_list[0]["timeout"] = self.timeout
+ test_list[0]["skip_install"] = self.skip_install
elif self.test_plan:
if not os.path.exists(self.test_plan):
- self.logger.error(' %s NOT found, exiting...' % self.test_plan)
+ self.logger.error(" %s NOT found, exiting..." % self.test_plan)
sys.exit(1)
- with open(self.test_plan, 'r') as f:
+ with open(self.test_plan, "r") as f:
test_plan = yaml.safe_load(f)
try:
- plan_version = test_plan['metadata'].get('format')
- self.logger.info('Test plan version: {}'.format(plan_version))
+ plan_version = test_plan["metadata"].get("format")
+ self.logger.info("Test plan version: {}".format(plan_version))
tests = []
if plan_version == "Linaro Test Plan v2":
- tests = test_plan['tests'][kind]
+ tests = test_plan["tests"][kind]
elif plan_version == "Linaro Test Plan v1" or plan_version is None:
- for requirement in test_plan['requirements']:
- if 'tests' in requirement.keys():
- if requirement['tests'] and \
- kind in requirement['tests'].keys() and \
- requirement['tests'][kind]:
- for test in requirement['tests'][kind]:
+ for requirement in test_plan["requirements"]:
+ if "tests" in requirement.keys():
+ if (
+ requirement["tests"]
+ and kind in requirement["tests"].keys()
+ and requirement["tests"][kind]
+ ):
+ for test in requirement["tests"][kind]:
tests.append(test)
test_list = []
@@ -181,12 +197,12 @@
unique_tests.append(test_hash)
test_list.append(test)
for test in test_list:
- test['uuid'] = str(uuid4())
+ test["uuid"] = str(uuid4())
except KeyError as e:
self.logger.error("%s is missing from test plan" % str(e))
sys.exit(1)
else:
- self.logger.error('Plese specify a test or test plan.')
+ self.logger.error("Plese specify a test or test plan.")
sys.exit(1)
if self.overlay is None:
@@ -203,43 +219,47 @@
def __init__(self, test, args):
self.test = test
self.args = args
- self.logger = logging.getLogger('RUNNER.TestSetup')
+ self.logger = logging.getLogger("RUNNER.TestSetup")
self.test_kind = args.kind
- self.test_version = test.get('version', None)
+ self.test_version = test.get("version", None)
def validate_env(self):
# Inspect if environment set properly.
try:
- self.repo_path = os.environ['REPO_PATH']
+ self.repo_path = os.environ["REPO_PATH"]
except KeyError:
- self.logger.error('KeyError: REPO_PATH')
- self.logger.error("Please run '. ./bin/setenv.sh' to setup test environment")
+ self.logger.error("KeyError: REPO_PATH")
+ self.logger.error(
+ "Please run '. ./bin/setenv.sh' to setup test environment"
+ )
sys.exit(1)
def create_dir(self):
- if not os.path.exists(self.test['output']):
- os.makedirs(self.test['output'])
- self.logger.info('Output directory created: %s' % self.test['output'])
+ if not os.path.exists(self.test["output"]):
+ os.makedirs(self.test["output"])
+ self.logger.info("Output directory created: %s" % self.test["output"])
def copy_test_repo(self):
self.validate_env()
- shutil.rmtree(self.test['test_path'], ignore_errors=True)
- if self.repo_path in self.test['test_path']:
- self.logger.error("Cannot copy repository into itself. Please choose output directory outside repository path")
+ shutil.rmtree(self.test["test_path"], ignore_errors=True)
+ if self.repo_path in self.test["test_path"]:
+ self.logger.error(
+ "Cannot copy repository into itself. Please choose output directory outside repository path"
+ )
sys.exit(1)
- shutil.copytree(self.repo_path, self.test['test_path'], symlinks=True)
- self.logger.info('Test repo copied to: %s' % self.test['test_path'])
+ shutil.copytree(self.repo_path, self.test["test_path"], symlinks=True)
+ self.logger.info("Test repo copied to: %s" % self.test["test_path"])
def checkout_version(self):
if self.test_version:
path = os.getcwd()
- os.chdir(self.test['test_path'])
+ os.chdir(self.test["test_path"])
subprocess.call("git checkout %s" % self.test_version, shell=True)
os.chdir(path)
def create_uuid_file(self):
- with open('%s/uuid' % self.test['test_path'], 'w') as f:
- f.write(self.test['uuid'])
+ with open("%s/uuid" % self.test["test_path"], "w") as f:
+ f.write(self.test["uuid"])
class TestDefinition(object):
@@ -250,22 +270,24 @@
def __init__(self, test, args):
self.test = test
self.args = args
- self.logger = logging.getLogger('RUNNER.TestDef')
+ self.logger = logging.getLogger("RUNNER.TestDef")
self.skip_install = args.skip_install
self.is_manual = False
- if 'skip_install' in test:
- self.skip_install = test['skip_install']
+ if "skip_install" in test:
+ self.skip_install = test["skip_install"]
self.custom_params = None
- if 'parameters' in test:
- self.custom_params = test['parameters']
- if 'params' in test:
- self.custom_params = test['params']
+ if "parameters" in test:
+ self.custom_params = test["parameters"]
+ if "params" in test:
+ self.custom_params = test["params"]
self.exists = False
- if os.path.isfile(self.test['path']):
+ if os.path.isfile(self.test["path"]):
self.exists = True
- with open(self.test['path'], 'r') as f:
+ with open(self.test["path"], "r") as f:
self.testdef = yaml.safe_load(f)
- if self.testdef['metadata']['format'].startswith("Manual Test Definition"):
+ if self.testdef["metadata"]["format"].startswith(
+ "Manual Test Definition"
+ ):
self.is_manual = True
if self.is_manual:
self.runner = ManualTestRun(test, args)
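TestDefinition above only reads a handful of keys from the YAML it loads: metadata format and name, params, run steps, and an optional parse section. A minimal made-up definition exercising those keys; the format string is a placeholder, and only formats starting with "Manual Test Definition" switch the runner into manual mode:

    import yaml

    TESTDEF_YAML = """
    metadata:
      format: Automated Test Definition (placeholder)
      name: example-test
    params:
      EXAMPLE_PARAM: "1"
    run:
      steps:
        - ./automated/linux/example/example.sh -p "${EXAMPLE_PARAM}"
    """

    testdef = yaml.safe_load(TESTDEF_YAML)
    print(testdef["metadata"]["name"], testdef["run"]["steps"])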
@@ -275,82 +297,90 @@
self.runner = AutomatedTestRun(test, args)
def definition(self):
- with open('%s/testdef.yaml' % self.test['test_path'], 'wb') as f:
- f.write(yaml.dump(self.testdef, encoding='utf-8', allow_unicode=True))
+ with open("%s/testdef.yaml" % self.test["test_path"], "wb") as f:
+ f.write(yaml.dump(self.testdef, encoding="utf-8", allow_unicode=True))
def metadata(self):
- with open('%s/testdef_metadata' % self.test['test_path'], 'wb') as f:
- f.write(yaml.dump(self.testdef['metadata'], encoding='utf-8', allow_unicode=True))
+ with open("%s/testdef_metadata" % self.test["test_path"], "wb") as f:
+ f.write(
+ yaml.dump(
+ self.testdef["metadata"], encoding="utf-8", allow_unicode=True
+ )
+ )
def mkrun(self):
if not self.is_manual:
- with open('%s/run.sh' % self.test['test_path'], 'a') as f:
- f.write('#!/bin/sh\n')
+ with open("%s/run.sh" % self.test["test_path"], "a") as f:
+ f.write("#!/bin/sh\n")
self.parameters = self.handle_parameters()
if self.parameters:
for line in self.parameters:
f.write(line)
- f.write('set -e\n')
- f.write('set -x\n')
- f.write('export TESTRUN_ID=%s\n' % self.testdef['metadata']['name'])
+ f.write("set -e\n")
+ f.write("set -x\n")
+ f.write("export TESTRUN_ID=%s\n" % self.testdef["metadata"]["name"])
if self.args.target is None:
- f.write('cd %s\n' % (self.test['test_path']))
+ f.write("cd %s\n" % (self.test["test_path"]))
else:
- f.write('cd %s\n' % (self.test['target_test_path']))
- f.write('UUID=`cat uuid`\n')
+ f.write("cd %s\n" % (self.test["target_test_path"]))
+ f.write("UUID=`cat uuid`\n")
f.write('echo "<STARTRUN $TESTRUN_ID $UUID>"\n')
- f.write('export PATH=/usr/local/bin:/usr/bin:/bin:/usr/local/sbin:/usr/sbin:/sbin\n')
- steps = self.testdef['run'].get('steps', [])
+ f.write(
+ "export PATH=/usr/local/bin:/usr/bin:/bin:/usr/local/sbin:/usr/sbin:/sbin\n"
+ )
+ steps = self.testdef["run"].get("steps", [])
if steps:
for step in steps:
command = step
- if '--cmd' in step or '--shell' in step:
- command = re.sub(r'\$(\d+)\b', r'\\$\1', step)
- f.write('%s\n' % command)
+ if "--cmd" in step or "--shell" in step:
+ command = re.sub(r"\$(\d+)\b", r"\\$\1", step)
+ f.write("%s\n" % command)
f.write('echo "<ENDRUN $TESTRUN_ID $UUID>"\n')
- os.chmod('%s/run.sh' % self.test['test_path'], 0o755)
+ os.chmod("%s/run.sh" % self.test["test_path"], 0o755)
def run(self):
self.runner.run()
def handle_parameters(self):
- ret_val = ['###default parameters from test definition###\n']
+ ret_val = ["###default parameters from test definition###\n"]
- if 'params' in self.testdef:
- for def_param_name, def_param_value in list(self.testdef['params'].items()):
+ if "params" in self.testdef:
+ for def_param_name, def_param_value in list(self.testdef["params"].items()):
# ?'yaml_line'
- if def_param_name == 'yaml_line':
+ if def_param_name == "yaml_line":
continue
- ret_val.append('%s=\'%s\'\n' % (def_param_name, def_param_value))
- elif 'parameters' in self.testdef:
- for def_param_name, def_param_value in list(self.testdef['parameters'].items()):
- if def_param_name == 'yaml_line':
+ ret_val.append("%s='%s'\n" % (def_param_name, def_param_value))
+ elif "parameters" in self.testdef:
+ for def_param_name, def_param_value in list(
+ self.testdef["parameters"].items()
+ ):
+ if def_param_name == "yaml_line":
continue
- ret_val.append('%s=\'%s\'\n' % (def_param_name, def_param_value))
+ ret_val.append("%s='%s'\n" % (def_param_name, def_param_value))
else:
return None
- ret_val.append('######\n')
+ ret_val.append("######\n")
- ret_val.append('###custom parameters from test plan###\n')
+ ret_val.append("###custom parameters from test plan###\n")
if self.custom_params:
for param_name, param_value in list(self.custom_params.items()):
- if param_name == 'yaml_line':
+ if param_name == "yaml_line":
continue
- ret_val.append('%s=\'%s\'\n' % (param_name, param_value))
+ ret_val.append("%s='%s'\n" % (param_name, param_value))
if self.skip_install:
ret_val.append('SKIP_INSTALL="True"\n')
- ret_val.append('######\n')
+ ret_val.append("######\n")
- ret_val.append('###custom parameters from command line###\n')
+ ret_val.append("###custom parameters from command line###\n")
if self.args.test_def_params:
for param_name, param_value in self.args.test_def_params.items():
- ret_val.append('%s=\'%s\'\n' % (param_name, param_value))
- ret_val.append('######\n')
+ ret_val.append("%s='%s'\n" % (param_name, param_value))
+ ret_val.append("######\n")
return ret_val
@@ -358,10 +388,10 @@
def __init__(self, test, args):
self.test = test
self.args = args
- self.logger = logging.getLogger('RUNNER.TestRun')
+ self.logger = logging.getLogger("RUNNER.TestRun")
self.test_timeout = self.args.timeout
- if 'timeout' in test:
- self.test_timeout = test['timeout']
+ if "timeout" in test:
+ self.test_timeout = test["timeout"]
def run(self):
raise NotImplementedError
@@ -372,62 +402,86 @@
class AutomatedTestRun(TestRun):
def run(self):
- self.logger.info('Executing %s/run.sh' % self.test['test_path'])
- shell_cmd = '%s/run.sh 2>&1 | tee %s/stdout.log' % (self.test['test_path'], self.test['test_path'])
- self.child = pexpect.spawnu('/bin/sh', ['-c', shell_cmd])
+ self.logger.info("Executing %s/run.sh" % self.test["test_path"])
+ shell_cmd = "%s/run.sh 2>&1 | tee %s/stdout.log" % (
+ self.test["test_path"],
+ self.test["test_path"],
+ )
+ self.child = pexpect.spawnu("/bin/sh", ["-c", shell_cmd])
self.check_result()
def check_result(self):
if self.test_timeout:
- self.logger.info('Test timeout: %s' % self.test_timeout)
+ self.logger.info("Test timeout: %s" % self.test_timeout)
test_end = time.time() + self.test_timeout
while self.child.isalive():
if self.test_timeout and time.time() > test_end:
- self.logger.warning('%s test timed out, killing test process...' % self.test['test_uuid'])
+ self.logger.warning(
+ "%s test timed out, killing test process..."
+ % self.test["test_uuid"]
+ )
self.child.terminate(force=True)
break
try:
- self.child.expect('\r\n')
+ self.child.expect("\r\n")
print(self.child.before)
except pexpect.TIMEOUT:
continue
except pexpect.EOF:
- self.logger.info('%s test finished.\n' % self.test['test_uuid'])
+ self.logger.info("%s test finished.\n" % self.test["test_uuid"])
break
class RemoteTestRun(AutomatedTestRun):
def copy_to_target(self):
- os.chdir(self.test['test_path'])
+ os.chdir(self.test["test_path"])
tarball_name = "target-test-files.tar"
self.logger.info("Archiving test files")
run_command(
- 'tar -caf %s run.sh uuid automated/lib automated/bin automated/utils %s' %
- (tarball_name, self.test['tc_relative_dir']))
+ "tar -caf %s run.sh uuid automated/lib automated/bin automated/utils %s"
+ % (tarball_name, self.test["tc_relative_dir"])
+ )
self.logger.info("Creating test path")
- run_command("mkdir -p %s" % (self.test['target_test_path']), self.args.target)
+ run_command("mkdir -p %s" % (self.test["target_test_path"]), self.args.target)
self.logger.info("Copying test archive to target host")
- run_command('scp %s ./%s %s:%s' % (SSH_PARAMS, tarball_name, self.args.target,
- self.test['target_test_path']))
+ run_command(
+ "scp %s ./%s %s:%s"
+ % (
+ SSH_PARAMS,
+ tarball_name,
+ self.args.target,
+ self.test["target_test_path"],
+ )
+ )
self.logger.info("Unarchiving test files on target")
- run_command("cd %s && tar -xf %s" % (self.test['target_test_path'],
- tarball_name), self.args.target)
+ run_command(
+ "cd %s && tar -xf %s" % (self.test["target_test_path"], tarball_name),
+ self.args.target,
+ )
self.logger.info("Removing test file archive from target")
- run_command("rm %s/%s" % (self.test['target_test_path'],
- tarball_name), self.args.target)
+ run_command(
+ "rm %s/%s" % (self.test["target_test_path"], tarball_name), self.args.target
+ )
def run(self):
self.copy_to_target()
- self.logger.info('Executing %s/run.sh remotely on %s' % (self.test['target_test_path'], self.args.target))
- shell_cmd = 'ssh %s %s "%s/run.sh 2>&1"' % (SSH_PARAMS, self.args.target, self.test['target_test_path'])
- self.logger.debug('shell_cmd: %s' % shell_cmd)
- output = open("%s/stdout.log" % self.test['test_path'], "w")
+ self.logger.info(
+ "Executing %s/run.sh remotely on %s"
+ % (self.test["target_test_path"], self.args.target)
+ )
+ shell_cmd = 'ssh %s %s "%s/run.sh 2>&1"' % (
+ SSH_PARAMS,
+ self.args.target,
+ self.test["target_test_path"],
+ )
+ self.logger.debug("shell_cmd: %s" % shell_cmd)
+ output = open("%s/stdout.log" % self.test["test_path"], "w")
self.child = pexpect.spawnu(shell_cmd)
self.child.logfile = output
self.check_result()
@@ -440,9 +494,12 @@
self.test_case_id = test_case_id
self.result_path = result_path
self.current_step_index = 0
- self.steps = self.test_dict['run']['steps']
- self.expected = self.test_dict['run']['expected']
- self.prompt = "%s[%s] > " % (self.test_dict['metadata']['name'], self.test_case_id)
+ self.steps = self.test_dict["run"]["steps"]
+ self.expected = self.test_dict["run"]["expected"]
+ self.prompt = "%s[%s] > " % (
+ self.test_dict["metadata"]["name"],
+ self.test_case_id,
+ )
self.result = None
self.intro = """
Welcome to manual test executor. Type 'help' for available commands.
@@ -462,7 +519,9 @@
if line.find("-f") >= 0:
self._record_result("skip")
return True
- print("Test result not recorded. Use -f to force. Forced quit records result as 'skip'")
+ print(
+ "Test result not recorded. Use -f to force. Forced quit records result as 'skip'"
+ )
do_EOF = do_quit
@@ -470,7 +529,7 @@
"""
Prints current test overall description
"""
- print(self.test_dict['metadata']['description'])
+ print(self.test_dict["metadata"]["description"])
def do_steps(self, line):
"""
@@ -508,8 +567,10 @@
def _record_result(self, result):
print("Recording %s in %s/stdout.log" % (result, self.result_path))
with open("%s/stdout.log" % self.result_path, "a") as f:
- f.write("<LAVA_SIGNAL_TESTCASE TEST_CASE_ID=%s RESULT=%s>" %
- (self.test_case_id, result))
+ f.write(
+ "<LAVA_SIGNAL_TESTCASE TEST_CASE_ID=%s RESULT=%s>"
+ % (self.test_case_id, result)
+ )
def do_pass(self, line):
"""
@@ -538,16 +599,16 @@
class ManualTestRun(TestRun, cmd.Cmd):
def run(self):
- print(self.test['test_name'])
- with open('%s/testdef.yaml' % self.test['test_path'], 'r') as f:
+ print(self.test["test_name"])
+ with open("%s/testdef.yaml" % self.test["test_path"], "r") as f:
self.testdef = yaml.safe_load(f)
- if 'name' in self.test:
- test_case_id = self.test['name']
+ if "name" in self.test:
+ test_case_id = self.test["name"]
else:
- test_case_id = self.testdef['metadata']['name']
+ test_case_id = self.testdef["metadata"]["name"]
- ManualTestShell(self.testdef, self.test['test_path'], test_case_id).cmdloop()
+ ManualTestShell(self.testdef, self.test["test_path"], test_case_id).cmdloop()
def check_result(self):
pass
@@ -575,17 +636,25 @@
]
"""
- logger = logging.getLogger('RUNNER.get_packages')
+ logger = logging.getLogger("RUNNER.get_packages")
packages = []
- if linux_distribution in ['debian', 'ubuntu']:
+ if linux_distribution in ["debian", "ubuntu"]:
# Debian (apt) based system
- packages = run_command("dpkg-query -W -f '${package}-${version}\n'", target).splitlines()
+ packages = run_command(
+ "dpkg-query -W -f '${package}-${version}\n'", target
+ ).splitlines()
- elif linux_distribution in ['centos', 'fedora']:
+ elif linux_distribution in ["centos", "fedora"]:
# RedHat (rpm) based system
- packages = run_command("rpm -qa --qf '%{NAME}-%{VERSION}-%{RELEASE}\n'", target).splitlines()
+ packages = run_command(
+ "rpm -qa --qf '%{NAME}-%{VERSION}-%{RELEASE}\n'", target
+ ).splitlines()
else:
- logger.warning("Unknown linux distribution '{}'; package list not populated.".format(linux_distribution))
+ logger.warning(
+ "Unknown linux distribution '{}'; package list not populated.".format(
+ linux_distribution
+ )
+ )
packages.sort()
return packages
@@ -619,43 +688,52 @@
if skip_collection:
return environment
try:
- environment['linux_distribution'] = run_command(
- "grep ^ID= /etc/os-release", target).split('=')[-1].strip('"').lower()
+ environment["linux_distribution"] = (
+ run_command("grep ^ID= /etc/os-release", target)
+ .split("=")[-1]
+ .strip('"')
+ .lower()
+ )
except subprocess.CalledProcessError:
- environment['linux_distribution'] = ""
+ environment["linux_distribution"] = ""
try:
- environment['kernel'] = run_command("uname -r", target)
+ environment["kernel"] = run_command("uname -r", target)
except subprocess.CalledProcessError:
- environment['kernel'] = ""
+ environment["kernel"] = ""
try:
- environment['uname'] = run_command("uname -a", target)
+ environment["uname"] = run_command("uname -a", target)
except subprocess.CalledProcessError:
- environment['uname'] = ""
+ environment["uname"] = ""
try:
- environment['bios_version'] = run_command(
- "cat /sys/devices/virtual/dmi/id/bios_version", target)
+ environment["bios_version"] = run_command(
+ "cat /sys/devices/virtual/dmi/id/bios_version", target
+ )
except subprocess.CalledProcessError:
- environment['bios_version'] = ""
+ environment["bios_version"] = ""
try:
- environment['board_vendor'] = run_command(
- "cat /sys/devices/virtual/dmi/id/board_vendor", target)
+ environment["board_vendor"] = run_command(
+ "cat /sys/devices/virtual/dmi/id/board_vendor", target
+ )
except subprocess.CalledProcessError:
- environment['board_vendor'] = ""
+ environment["board_vendor"] = ""
try:
- environment['board_name'] = run_command(
- "cat /sys/devices/virtual/dmi/id/board_name", target)
+ environment["board_name"] = run_command(
+ "cat /sys/devices/virtual/dmi/id/board_name", target
+ )
except subprocess.CalledProcessError:
- environment['board_name'] = ""
+ environment["board_name"] = ""
try:
- environment['packages'] = get_packages(environment['linux_distribution'], target)
+ environment["packages"] = get_packages(
+ environment["linux_distribution"], target
+ )
except subprocess.CalledProcessError:
- environment['packages'] = []
+ environment["packages"] = []
return environment
@@ -665,20 +743,23 @@
self.args = args
self.metrics = []
self.results = {}
- self.results['test'] = test['test_name']
- self.results['id'] = test['test_uuid']
- self.results['test_plan'] = args.test_plan
- self.results['environment'] = get_environment(
- target=self.args.target, skip_collection=self.args.skip_environment)
- self.logger = logging.getLogger('RUNNER.ResultParser')
- self.results['params'] = {}
+ self.results["test"] = test["test_name"]
+ self.results["id"] = test["test_uuid"]
+ self.results["test_plan"] = args.test_plan
+ self.results["environment"] = get_environment(
+ target=self.args.target, skip_collection=self.args.skip_environment
+ )
+ self.logger = logging.getLogger("RUNNER.ResultParser")
+ self.results["params"] = {}
self.pattern = None
self.fixup = None
self.qa_reports_server = args.qa_reports_server
if args.qa_reports_token is not None:
self.qa_reports_token = args.qa_reports_token
else:
- self.qa_reports_token = os.environ.get("QA_REPORTS_TOKEN", get_token_from_netrc(self.qa_reports_server))
+ self.qa_reports_token = os.environ.get(
+ "QA_REPORTS_TOKEN", get_token_from_netrc(self.qa_reports_server)
+ )
self.qa_reports_project = args.qa_reports_project
self.qa_reports_group = args.qa_reports_group
self.qa_reports_env = args.qa_reports_env
@@ -687,41 +768,52 @@
self.qa_reports_metadata = args.qa_reports_metadata
self.qa_reports_metadata_file = args.qa_reports_metadata_file
- with open(os.path.join(self.test['test_path'], "testdef.yaml"), "r") as f:
+ with open(os.path.join(self.test["test_path"], "testdef.yaml"), "r") as f:
self.testdef = yaml.safe_load(f)
- self.results['name'] = ""
- if 'metadata' in self.testdef.keys() and \
- 'name' in self.testdef['metadata'].keys():
- self.results['name'] = self.testdef['metadata']['name']
- if 'params' in self.testdef.keys():
- self.results['params'] = self.testdef['params']
+ self.results["name"] = ""
+ if (
+ "metadata" in self.testdef.keys()
+ and "name" in self.testdef["metadata"].keys()
+ ):
+ self.results["name"] = self.testdef["metadata"]["name"]
+ if "params" in self.testdef.keys():
+ self.results["params"] = self.testdef["params"]
if self.args.test_def_params:
for param_name, param_value in self.args.test_def_params.items():
- self.results['params'][param_name] = param_value
- if 'parse' in self.testdef.keys() and 'pattern' in self.testdef['parse'].keys():
- self.pattern = self.testdef['parse']['pattern']
+ self.results["params"][param_name] = param_value
+ if (
+ "parse" in self.testdef.keys()
+ and "pattern" in self.testdef["parse"].keys()
+ ):
+ self.pattern = self.testdef["parse"]["pattern"]
self.logger.info("Enabling log parse pattern: %s" % self.pattern)
- if 'fixupdict' in self.testdef['parse'].keys():
- self.fixup = self.testdef['parse']['fixupdict']
- self.logger.info("Enabling log parse pattern fixup: %s" % self.fixup)
- if 'parameters' in test.keys():
- self.results['params'].update(test['parameters'])
- if 'params' in test.keys():
- self.results['params'].update(test['params'])
- if 'version' in test.keys():
- self.results['version'] = test['version']
+ if "fixupdict" in self.testdef["parse"].keys():
+ self.fixup = self.testdef["parse"]["fixupdict"]
+ self.logger.info(
+ "Enabling log parse pattern fixup: %s" % self.fixup
+ )
+ if "parameters" in test.keys():
+ self.results["params"].update(test["parameters"])
+ if "params" in test.keys():
+ self.results["params"].update(test["params"])
+ if "version" in test.keys():
+ self.results["version"] = test["version"]
else:
path = os.getcwd()
- os.chdir(self.test['test_path'])
+ os.chdir(self.test["test_path"])
if sys.version_info[0] < 3:
test_version = subprocess.check_output("git rev-parse HEAD", shell=True)
else:
- test_version = subprocess.check_output("git rev-parse HEAD", shell=True).decode('utf-8')
- self.results['version'] = test_version.rstrip()
+ test_version = subprocess.check_output(
+ "git rev-parse HEAD", shell=True
+ ).decode("utf-8")
+ self.results["version"] = test_version.rstrip()
os.chdir(path)
self.lava_run = args.lava_run
- if self.lava_run and not find_executable('lava-test-case'):
- self.logger.info("lava-test-case not found, '-l' or '--lava_run' option ignored'")
+ if self.lava_run and not find_executable("lava-test-case"):
+ self.logger.info(
+ "lava-test-case not found, '-l' or '--lava_run' option ignored'"
+ )
self.lava_run = False
def run(self):
@@ -730,29 +822,38 @@
self.parse_pattern()
# If 'metrics' is empty, add 'no-result-found fail'.
if not self.metrics:
- self.metrics = [{'test_case_id': 'no-result-found', 'result': 'fail', 'measurement': '', 'units': ''}]
- self.results['metrics'] = self.metrics
+ self.metrics = [
+ {
+ "test_case_id": "no-result-found",
+ "result": "fail",
+ "measurement": "",
+ "units": "",
+ }
+ ]
+ self.results["metrics"] = self.metrics
self.dict_to_json()
self.dict_to_csv()
self.send_to_qa_reports()
- self.logger.info('Result files saved to: %s' % self.test['test_path'])
- print('--- Printing result.csv ---')
- with open('%s/result.csv' % self.test['test_path']) as f:
+ self.logger.info("Result files saved to: %s" % self.test["test_path"])
+ print("--- Printing result.csv ---")
+ with open("%s/result.csv" % self.test["test_path"]) as f:
print(f.read())
def parse_stdout(self):
- with open('%s/stdout.log' % self.test['test_path'], 'r') as f:
+ with open("%s/stdout.log" % self.test["test_path"], "r") as f:
test_case_re = re.compile("TEST_CASE_ID=(.*)")
result_re = re.compile("RESULT=(.*)")
measurement_re = re.compile("MEASUREMENT=(.*)")
units_re = re.compile("UNITS=(.*)")
for line in f:
- if re.match(r'\<(|LAVA_SIGNAL_TESTCASE )TEST_CASE_ID=.*', line):
- line = line.strip('\n').strip('\r').strip('<>').split(' ')
- data = {'test_case_id': '',
- 'result': '',
- 'measurement': '',
- 'units': ''}
+ if re.match(r"\<(|LAVA_SIGNAL_TESTCASE )TEST_CASE_ID=.*", line):
+ line = line.strip("\n").strip("\r").strip("<>").split(" ")
+ data = {
+ "test_case_id": "",
+ "result": "",
+ "measurement": "",
+ "units": "",
+ }
for string in line:
test_case_match = test_case_re.match(string)
@@ -760,16 +861,16 @@
measurement_match = measurement_re.match(string)
units_match = units_re.match(string)
if test_case_match:
- data['test_case_id'] = test_case_match.group(1)
+ data["test_case_id"] = test_case_match.group(1)
if result_match:
- data['result'] = result_match.group(1)
+ data["result"] = result_match.group(1)
if measurement_match:
try:
- data['measurement'] = float(measurement_match.group(1))
+ data["measurement"] = float(measurement_match.group(1))
except ValueError as e:
pass
if units_match:
- data['units'] = units_match.group(1)
+ data["units"] = units_match.group(1)
self.metrics.append(data.copy())
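parse_stdout() above recognises the signal lines that the generated run.sh and lava-test-case emit into stdout.log; a simplified illustration of the line shape and the key=value split (the real parser matches each token with a regex, and the test name and values here are made up):

    line = "<TEST_CASE_ID=example-case RESULT=pass MEASUREMENT=1.5 UNITS=seconds>"
    fields = line.strip("\n").strip("\r").strip("<>").split(" ")
    print(dict(field.split("=", 1) for field in fields))
    # {'TEST_CASE_ID': 'example-case', 'RESULT': 'pass', 'MEASUREMENT': '1.5', 'UNITS': 'seconds'}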
@@ -777,18 +878,18 @@
self.send_to_lava(data)
def parse_pattern(self):
- with open('%s/stdout.log' % self.test['test_path'], 'r') as f:
- rex_pattern = re.compile(r'%s' % self.pattern)
+ with open("%s/stdout.log" % self.test["test_path"], "r") as f:
+ rex_pattern = re.compile(r"%s" % self.pattern)
for line in f:
data = {}
m = rex_pattern.search(line)
if m:
data = m.groupdict()
- for x in ['measurement', 'units']:
+ for x in ["measurement", "units"]:
if x not in data:
- data[x] = ''
- if self.fixup and data['result'] in self.fixup:
- data['result'] = self.fixup[data['result']]
+ data[x] = ""
+ if self.fixup and data["result"] in self.fixup:
+ data["result"] = self.fixup[data["result"]]
self.metrics.append(data.copy())
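When a testdef carries a parse pattern, ResultParser applies it line by line with named groups and optionally maps results through fixupdict; a small self-contained example with a made-up pattern, fixup map and log line (real values come from the 'parse' section of testdef.yaml):

    import re

    pattern = r"^(?P<test_case_id>\S+):\s+(?P<result>PASS|FAIL)$"
    fixup = {"PASS": "pass", "FAIL": "fail"}

    line = "boot-test: PASS"
    m = re.compile(pattern).search(line)
    if m:
        data = m.groupdict()
        for x in ["measurement", "units"]:
            data.setdefault(x, "")            # same defaults the parser fills in
        data["result"] = fixup.get(data["result"], data["result"])
        print(data)
        # {'test_case_id': 'boot-test', 'result': 'pass', 'measurement': '', 'units': ''}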
@@ -796,29 +897,44 @@
self.send_to_lava(data)
def send_to_lava(self, data):
- cmd = 'lava-test-case {} --result {}'.format(data['test_case_id'], data['result'])
- if data['measurement']:
- cmd = '{} --measurement {} --units {}'.format(cmd, data['measurement'], data['units'])
- self.logger.debug('lava-run: cmd: {}'.format(cmd))
+ cmd = "lava-test-case {} --result {}".format(
+ data["test_case_id"], data["result"]
+ )
+ if data["measurement"]:
+ cmd = "{} --measurement {} --units {}".format(
+ cmd, data["measurement"], data["units"]
+ )
+ self.logger.debug("lava-run: cmd: {}".format(cmd))
subprocess.call(shlex.split(cmd))
def send_to_qa_reports(self):
- if None in (self.qa_reports_server, self.qa_reports_token, self.qa_reports_group, self.qa_reports_project, self.qa_reports_build_version, self.qa_reports_env):
- self.logger.warning("All parameters for qa reports are not set, results will not be pushed to qa reports")
+ if None in (
+ self.qa_reports_server,
+ self.qa_reports_token,
+ self.qa_reports_group,
+ self.qa_reports_project,
+ self.qa_reports_build_version,
+ self.qa_reports_env,
+ ):
+ self.logger.warning(
+ "All parameters for qa reports are not set, results will not be pushed to qa reports"
+ )
return
- SquadApi.configure(
- url=self.qa_reports_server, token=self.qa_reports_token
- )
+ SquadApi.configure(url=self.qa_reports_server, token=self.qa_reports_token)
tests = {}
metrics = {}
for metric in self.metrics:
- if metric['measurement'] != "":
- metrics["{}/{}".format(self.test['test_name'], metric['test_case_id'])] = metric['measurement']
+ if metric["measurement"] != "":
+ metrics[
+ "{}/{}".format(self.test["test_name"], metric["test_case_id"])
+ ] = metric["measurement"]
else:
- tests["{}/{}".format(self.test['test_name'], metric['test_case_id'])] = metric['result']
+ tests[
+ "{}/{}".format(self.test["test_name"], metric["test_case_id"])
+ ] = metric["result"]
- with open("{}/stdout.log".format(self.test['test_path']), "r") as logfile:
+ with open("{}/stdout.log".format(self.test["test_path"]), "r") as logfile:
log = logfile.read()
metadata = {}
@@ -828,7 +944,9 @@
if self.qa_reports_metadata_file:
try:
with open(self.qa_reports_metadata_file, "r") as metadata_file:
- loaded_metadata = yaml.load(metadata_file, Loader=yaml.SafeLoader)
+ loaded_metadata = yaml.load(
+ metadata_file, Loader=yaml.SafeLoader
+ )
# check if loaded metadata is key=value and both are strings
for key, value in loaded_metadata.items():
if type(key) == str and type(value) == str:
@@ -840,63 +958,75 @@
except FileNotFoundError:
self.logger.warning("Metadata file not found")
except PermissionError:
- self.logger.warning("Insufficient permissions to open metadata file")
+ self.logger.warning(
+ "Insufficient permissions to open metadata file"
+ )
if submit_results(
- group_project_slug="{}/{}".format(self.qa_reports_group, self.qa_reports_project),
- build_version=self.qa_reports_build_version,
- env_slug=self.qa_reports_env,
- tests=tests,
- metrics=metrics,
- log=log,
- metadata=metadata,
- attachments=None):
+ group_project_slug="{}/{}".format(
+ self.qa_reports_group, self.qa_reports_project
+ ),
+ build_version=self.qa_reports_build_version,
+ env_slug=self.qa_reports_env,
+ tests=tests,
+ metrics=metrics,
+ log=log,
+ metadata=metadata,
+ attachments=None,
+ ):
self.logger.info("Results pushed to QA Reports")
else:
self.logger.warning("Results upload to QA Reports failed!")
def dict_to_json(self):
# Save test results to output/test_id/result.json
- with open('%s/result.json' % self.test['test_path'], 'w') as f:
+ with open("%s/result.json" % self.test["test_path"], "w") as f:
json.dump([self.results], f, indent=4)
# Collect test results of all tests in output/result.json
feeds = []
- if os.path.isfile('%s/result.json' % self.test['output']):
- with open('%s/result.json' % self.test['output'], 'r') as f:
+ if os.path.isfile("%s/result.json" % self.test["output"]):
+ with open("%s/result.json" % self.test["output"], "r") as f:
feeds = json.load(f)
feeds.append(self.results)
- with open('%s/result.json' % self.test['output'], 'w') as f:
+ with open("%s/result.json" % self.test["output"], "w") as f:
json.dump(feeds, f, indent=4)
def dict_to_csv(self):
# Convert dict self.results['params'] to a string.
- test_params = ''
- if self.results['params']:
- params_dict = self.results['params']
- test_params = ';'.join(['%s=%s' % (k, v) for k, v in params_dict.items()])
+ test_params = ""
+ if self.results["params"]:
+ params_dict = self.results["params"]
+ test_params = ";".join(["%s=%s" % (k, v) for k, v in params_dict.items()])
- for metric in self.results['metrics']:
- metric['name'] = self.results['name']
- metric['test_params'] = test_params
+ for metric in self.results["metrics"]:
+ metric["name"] = self.results["name"]
+ metric["test_params"] = test_params
# Save test results to output/test_id/result.csv
- fieldnames = ['name', 'test_case_id', 'result', 'measurement', 'units', 'test_params']
- with open('%s/result.csv' % self.test['test_path'], 'w') as f:
+ fieldnames = [
+ "name",
+ "test_case_id",
+ "result",
+ "measurement",
+ "units",
+ "test_params",
+ ]
+ with open("%s/result.csv" % self.test["test_path"], "w") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
- for metric in self.results['metrics']:
+ for metric in self.results["metrics"]:
writer.writerow(metric)
# Collect test results of all tests in output/result.csv
- if not os.path.isfile('%s/result.csv' % self.test['output']):
- with open('%s/result.csv' % self.test['output'], 'w') as f:
+ if not os.path.isfile("%s/result.csv" % self.test["output"]):
+ with open("%s/result.csv" % self.test["output"], "w") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
- with open('%s/result.csv' % self.test['output'], 'a') as f:
+ with open("%s/result.csv" % self.test["output"], "a") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
- for metric in self.results['metrics']:
+ for metric in self.results["metrics"]:
writer.writerow(metric)
@@ -915,62 +1045,140 @@
def get_args():
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
- parser.add_argument('-o', '--output', default=os.getenv("HOME", "") + '/output', dest='output',
- help=textwrap.dedent('''\
+ parser.add_argument(
+ "-o",
+ "--output",
+ default=os.getenv("HOME", "") + "/output",
+ dest="output",
+ help=textwrap.dedent(
+ """\
specify a directory to store test and result files.
Default: $HOME/output
- '''))
- parser.add_argument('-p', '--test_plan', default=None, dest='test_plan',
- help=textwrap.dedent('''\
+ """
+ ),
+ )
+ parser.add_argument(
+ "-p",
+ "--test_plan",
+ default=None,
+ dest="test_plan",
+ help=textwrap.dedent(
+ """\
specify a test plan file which has tests and related
params listed in yaml format.
- '''))
- parser.add_argument('-d', '--test_def', default=None, dest='test_def',
- help=textwrap.dedent('''\
+ """
+ ),
+ )
+ parser.add_argument(
+ "-d",
+ "--test_def",
+ default=None,
+ dest="test_def",
+ help=textwrap.dedent(
+ """\
based on the test definition repo location, specify the relative
path to the test definition to run.
Format example: "ubuntu/smoke-tests-basic.yaml"
- '''))
- parser.add_argument('-r', '--test_def_params', default={}, dest='test_def_params',
- action=StoreDictKeyPair, nargs="+", metavar="KEY=VALUE",
- help=textwrap.dedent('''\
+ """
+ ),
+ )
+ parser.add_argument(
+ "-r",
+ "--test_def_params",
+ default={},
+ dest="test_def_params",
+ action=StoreDictKeyPair,
+ nargs="+",
+ metavar="KEY=VALUE",
+ help=textwrap.dedent(
+ """\
Set additional parameters when using test definition without
a test plan. The name values are set similarly to environment
variables:
--test_def_params KEY1=VALUE1 KEY2=VALUE2 ...
- '''))
- parser.add_argument('-k', '--kind', default="automated", dest='kind',
- choices=['automated', 'manual'],
- help=textwrap.dedent('''\
+ """
+ ),
+ )
+ parser.add_argument(
+ "-k",
+ "--kind",
+ default="automated",
+ dest="kind",
+ choices=["automated", "manual"],
+ help=textwrap.dedent(
+ """\
Selects type of tests to be executed from the test plan.
Possible options: automated, manual
- '''))
- parser.add_argument('-t', '--timeout', type=int, default=None,
- dest='timeout', help='Specify test timeout')
- parser.add_argument('-g', '--target', default=None,
- dest='target', help=textwrap.dedent('''\
+ """
+ ),
+ )
+ parser.add_argument(
+ "-t",
+ "--timeout",
+ type=int,
+ default=None,
+ dest="timeout",
+ help="Specify test timeout",
+ )
+ parser.add_argument(
+ "-g",
+ "--target",
+ default=None,
+ dest="target",
+ help=textwrap.dedent(
+ """\
Specify SSH target to execute tests.
Format: user@host
Note: ssh authentication must be passwordless
- '''))
- parser.add_argument('-s', '--skip_install', dest='skip_install',
- default=False, action='store_true',
- help='skip install section defined in test definition.')
- parser.add_argument('-e', '--skip_environment', dest='skip_environment',
- default=False, action='store_true',
- help='skip environmental data collection (board name, distro, etc)')
- parser.add_argument('-l', '--lava_run', dest='lava_run',
- default=False, action='store_true',
- help='send test result to LAVA with lava-test-case.')
- parser.add_argument('-O', '--overlay', default=None,
- dest='overlay', help=textwrap.dedent('''\
+ """
+ ),
+ )
+ parser.add_argument(
+ "-s",
+ "--skip_install",
+ dest="skip_install",
+ default=False,
+ action="store_true",
+ help="skip install section defined in test definition.",
+ )
+ parser.add_argument(
+ "-e",
+ "--skip_environment",
+ dest="skip_environment",
+ default=False,
+ action="store_true",
+ help="skip environmental data collection (board name, distro, etc)",
+ )
+ parser.add_argument(
+ "-l",
+ "--lava_run",
+ dest="lava_run",
+ default=False,
+ action="store_true",
+ help="send test result to LAVA with lava-test-case.",
+ )
+ parser.add_argument(
+ "-O",
+ "--overlay",
+ default=None,
+ dest="overlay",
+ help=textwrap.dedent(
+ """\
Specify test plan overlay file to:
* skip tests
* amend test parameters
* add new tests
- '''))
- parser.add_argument('-v', '--verbose', action='store_true', dest='verbose',
- default=False, help='Set log level to DEBUG.')
+ """
+ ),
+ )
+ parser.add_argument(
+ "-v",
+ "--verbose",
+ action="store_true",
+ dest="verbose",
+ default=False,
+ help="Set log level to DEBUG.",
+ )
parser.add_argument(
"--qa-reports-server",
dest="qa_reports_server",
@@ -1011,7 +1219,7 @@
"--qa-reports-disable-metadata",
dest="qa_reports_disable_metadata",
default=False,
- action='store_true',
+ action="store_true",
help="Disable sending metadata to SQUAD. Default: false",
)
parser.add_argument(
@@ -1038,17 +1246,17 @@
args = get_args()
# Setup logger.
- logger = logging.getLogger('RUNNER')
+ logger = logging.getLogger("RUNNER")
logger.setLevel(logging.INFO)
if args.verbose:
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
- formatter = logging.Formatter('%(asctime)s - %(name)s: %(levelname)s: %(message)s')
+ formatter = logging.Formatter("%(asctime)s - %(name)s: %(levelname)s: %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
- logger.debug('Test job arguments: %s' % args)
+ logger.debug("Test job arguments: %s" % args)
if args.kind != "manual" and args.target is None:
if os.geteuid() != 0:
logger.error("Sorry, you need to run this as root")
@@ -1056,45 +1264,51 @@
# Validate target argument format and connectivity.
if args.target:
- rex = re.compile('.+@.+')
+ rex = re.compile(".+@.+")
if not rex.match(args.target):
- logger.error('Usage: -g username@host')
+ logger.error("Usage: -g username@host")
sys.exit(1)
- if pexpect.which('ssh') is None:
- logger.error('openssh client must be installed on the host.')
+ if pexpect.which("ssh") is None:
+ logger.error("openssh client must be installed on the host.")
sys.exit(1)
try:
run_command("exit", args.target)
except subprocess.CalledProcessError as e:
- logger.error('ssh login failed.')
+ logger.error("ssh login failed.")
print(e)
sys.exit(1)
# Generate test plan.
test_plan = TestPlan(args)
test_list = test_plan.test_list(args.kind)
- logger.info('Tests to run:')
+ logger.info("Tests to run:")
for test in test_list:
print(test)
# Run tests.
for test in test_list:
# Set and save test params to test dictionary.
- test['test_name'] = os.path.splitext(test['path'].split('/')[-1])[0]
- test['test_uuid'] = '%s_%s' % (test['test_name'], test['uuid'])
- test['output'] = os.path.realpath(args.output)
- if args.target is not None and '-o' not in sys.argv:
- test['output'] = os.path.join(test['output'], args.target)
- test['test_path'] = os.path.join(test['output'], test['test_uuid'])
+ test["test_name"] = os.path.splitext(test["path"].split("/")[-1])[0]
+ test["test_uuid"] = "%s_%s" % (test["test_name"], test["uuid"])
+ test["output"] = os.path.realpath(args.output)
+ if args.target is not None and "-o" not in sys.argv:
+ test["output"] = os.path.join(test["output"], args.target)
+ test["test_path"] = os.path.join(test["output"], test["test_uuid"])
if args.target is not None:
# Get relative directory path of yaml file for partial file copy.
# '-d' takes any relative paths to the yaml file, so get the realpath first.
- tc_realpath = os.path.realpath(test['path'])
+ tc_realpath = os.path.realpath(test["path"])
tc_dirname = os.path.dirname(tc_realpath)
- test['tc_relative_dir'] = '%s%s' % (args.kind, tc_dirname.split(args.kind)[1])
+ test["tc_relative_dir"] = "%s%s" % (
+ args.kind,
+ tc_dirname.split(args.kind)[1],
+ )
target_user_home = run_command("echo $HOME", args.target)
- test['target_test_path'] = '%s/output/%s' % (target_user_home, test['test_uuid'])
- logger.debug('Test parameters: %s' % test)
+ test["target_test_path"] = "%s/output/%s" % (
+ target_user_home,
+ test["test_uuid"],
+ )
+ logger.debug("Test parameters: %s" % test)
# Create directories and copy files needed.
setup = TestSetup(test, args)
@@ -1117,7 +1331,7 @@
result_parser = ResultParser(test, args)
result_parser.run()
else:
- logger.warning("Requested test definition %s doesn't exist" % test['path'])
+ logger.warning("Requested test definition %s doesn't exist" % test["path"])
if __name__ == "__main__":