import csv
import logging
import multiprocessing
import requests
import statistics
import traceback
from datetime import datetime, timedelta
from multiprocessing import Pool, Lock
from urllib.parse import urljoin, urlsplit
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.DEBUG)
# log_to_stderr() already attaches a stderr handler; reuse it rather than adding a
# second StreamHandler, which would print every message twice
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
for handler in logger.handlers:
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(formatter)
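# Overview: for each qa-reports.linaro.org project listed below, this script fetches the
# builds of the last 26 weeks and their test jobs, asks the LAVA backend of each job for
# submit/start/end times, and writes per-job rows to <group>_<project>.csv plus per-build
# aggregates (wait time, execution time, failed jobs, priority) to total.csv. Listener
# processes own the CSV files and receive rows over multiprocessing queues.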
def process_testjob_list(testjob_list, lava_auth, build, writer_queue, global_writer_queue, build_queue):
    # get job details from LAVA
    print("starting fetch for build %s" % build['version'])
    print(len(testjob_list))
    wait_time_list = []
    execution_time_list = []
    total_time_list = []
    job_priority_list = []
    first_job_submission_time = now
    last_job_end_time = datetime.strptime(build['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ")
    failed_jobs = 0
    for testjob in testjob_list:
        # check job status
        if testjob['job_status'] != "Complete":
            failed_jobs += 1
        # get testjob backend and ask for details
        # assuming all backends are LAVA type
        backend = backend_dict.get(testjob['backend'].rsplit("/", 2)[1])
        if backend is not None:
            backend_url_parts = urlsplit(backend['url'])
            backend_rest_url = "%s://%s/api/v0.2/" % (backend_url_parts.scheme, backend_url_parts.netloc)
            job_details_request = requests.get(
                urljoin(backend_rest_url, "jobs/%s/" % testjob['job_id']),
                auth=NullAuth(),
                headers=lava_auth
            )
            if job_details_request.status_code == 200:
                job_details = job_details_request.json()
                if job_details['start_time']:
                    submit_time = datetime.strptime(job_details['submit_time'], "%Y-%m-%dT%H:%M:%S.%fZ")
                    start_time = datetime.strptime(job_details['start_time'], "%Y-%m-%dT%H:%M:%S.%fZ")
                    end_time = datetime.strptime(job_details['end_time'], "%Y-%m-%dT%H:%M:%S.%fZ")
                    wait_time = start_time - submit_time
                    wait_time_list.append(wait_time.total_seconds())
                    execution_time = end_time - start_time
                    execution_time_list.append(execution_time.total_seconds())
                    job_priority_list.append(job_details['priority'])
                    if first_job_submission_time > submit_time:
                        first_job_submission_time = submit_time
                    if last_job_end_time < end_time:
                        last_job_end_time = end_time
                    build_queue.put(
                        {
                            'build_version': build['version'],
                            'job_id': testjob['job_id'],
                            'priority': job_details['priority'],
                            'submit_time': submit_time,
                            'wait_time': wait_time,
                            'execution_time': execution_time
                        }
                    )
        else:
            continue
    row = {}
    if wait_time_list:
        row = {
            'date': datetime.strptime(build['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d %H:%M:%S"),
            'build version': build['version'],
            'no of jobs': len(testjob_list),
            'no of failed jobs': failed_jobs,
            'avg job priority': statistics.mean(job_priority_list),
            'avg wait time': statistics.mean(wait_time_list),
            'avg execution time': statistics.mean(execution_time_list),
            'total execution time': (last_job_end_time - first_job_submission_time).total_seconds()
        }
        global_writer_queue.put(row)
    return row
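# note: process_testjob_list runs in a worker process and reads the module-level globals
# now, backend_dict and NullAuth defined further down; this relies on the workers being
# started only after those globals exist and on the default fork start method (Linux) so
# children inherit them. Under the spawn start method the script would also need an
# `if __name__ == "__main__"` guard.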
def listener_process(queue):
    with open("total.csv", "w") as global_csv_stats:
        global_writer = csv.DictWriter(global_csv_stats, fieldnames)
        global_writer.writeheader()
        while True:
            try:
                record = queue.get()
                if record is None:  # We send this as a sentinel to tell the listener to quit.
                    break
                global_writer.writerow(record)
            except Exception:
                import sys, traceback
                print('Whoops! Problem:', file=sys.stderr)
                traceback.print_exc(file=sys.stderr)
def build_listener_process(build_name, queue):
    with open("%s.csv" % (build_name), "w") as csv_file:
        fieldnames = ['build_version', 'job_id', 'priority', 'submit_time', 'wait_time', 'execution_time']
        writer = csv.DictWriter(csv_file, fieldnames)
        writer.writeheader()
        while True:
            try:
                record = queue.get()
                if record is None:  # We send this as a sentinel to tell the listener to quit.
                    break
                writer.writerow(record)
            except Exception:
                import sys, traceback
                print('Whoops! Problem:', file=sys.stderr)
                traceback.print_exc(file=sys.stderr)
def log_results(results):
    print(results)
    logger.debug(results)
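# log_results was the result callback for the pool.apply_async variant that is now
# commented out below; it is unused with the explicit multiprocessing.Process approach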
# if wait_time_list:
# with writer_lock:
# writer.writerow({
# 'date': datetime.strptime(build['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d %H:%M:%S"),
# 'build version': build['version'],
# 'no of jobs': len(testjob_list),
# 'no of failed jobs': failed_jobs,
# 'avg job priority': statistics.mean(job_priority_list),
# 'avg wait time': statistics.mean(wait_time_list),
# 'avg execution time':statistics.mean(execution_time_list),
# 'total execution time': (last_job_end_time - first_job_submission_time).total_seconds()
# })
# with global_writer_lock:
# global_writer.writerow({
# 'date': datetime.strptime(build['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d %H:%M:%S"),
# 'build version': build['version'],
# 'no of jobs': len(testjob_list),
# 'no of failed jobs': failed_jobs,
# 'avg job priority': statistics.mean(job_priority_list),
# 'avg wait time': statistics.mean(wait_time_list),
# 'avg execution time':statistics.mean(execution_time_list),
# 'total execution time': (last_job_end_time - first_job_submission_time).total_seconds()
# })
# create CSV with the following format
# date, build version, # of jobs, # of failed jobs, avg job priority, avg wait time, avg execution time, time from first submission to last result
fieldnames = [
    'date',
    'build version',
    'no of jobs',
    'no of failed jobs',
    'avg job priority',
    'avg wait time',
    'avg execution time',
    'total execution time'
]
class NullAuth(requests.auth.AuthBase):
    '''force requests to ignore the ``.netrc``

    Some sites do not support regular authentication, but we still
    want to store credentials in the ``.netrc`` file and submit them
    as form elements. Without this, requests would otherwise use the
    .netrc which leads, on some sites, to a 401 error.

    Use with::

        requests.get(url, auth=NullAuth())

    Copied from: https://github.com/psf/requests/issues/2773#issuecomment-174312831
    '''
    def __call__(self, r):
        return r
android_list = [
    "android-lkft/4.14-10.0-gsi-hikey",
    "android-lkft/4.14-10.0-gsi-hikey960",
    "android-lkft/4.14-master-hikey",
    "android-lkft/4.14-master-hikey960",
    "android-lkft/4.14-master-hikey960-lkft",
    "android-lkft/4.14-master-hikey-lkft",
    "android-lkft/4.14p-10.0-gsi-hikey",
    "android-lkft/4.14p-10.0-gsi-hikey960",
    "android-lkft/4.14-stable-master-hikey960-lkft",
    "android-lkft/4.14-stable-master-hikey-lkft",
    "android-lkft/4.19-10.0-gsi-hikey",
    "android-lkft/4.19-10.0-gsi-hikey960",
    "android-lkft/4.19-9.0-hikey960-auto",
    "android-lkft/4.19-9.0-hikey-auto",
    "android-lkft/4.19-master-hikey",
    "android-lkft/4.19-master-hikey960",
    "android-lkft/4.19-master-hikey960-lkft",
    "android-lkft/4.19-master-hikey-lkft",
    "android-lkft/4.19-stable-master-hikey960-lkft",
    "android-lkft/4.19-stable-master-hikey-lkft",
    "android-lkft/4.4o-10.0-gsi-hikey",
    "android-lkft/4.4o-9.0-lcr-hikey",
    "android-lkft/4.4p-10.0-gsi-hikey",
    "android-lkft/4.4p-rc-10.0-gsi-hikey",
    "android-lkft/4.4p-rc-9.0-hikey",
    "android-lkft/4.9-10.0-gsi-hikey",
    "android-lkft/4.9-10.0-gsi-hikey960",
    "android-lkft/4.9o-10.0-gsi-hikey",
    "android-lkft/4.9o-10.0-gsi-hikey960",
    "android-lkft/4.9o-9.0-lcr-hikey",
    "android-lkft/4.9p-10.0-gsi-hikey",
    "android-lkft/4.9p-10.0-gsi-hikey960",
]
lkft_list = [
    # "warp7/warp7-bsp",
    # "lkft/linux-stable-rc-linux-4.4.y",
    # "lkft/linux-stable-rc-linux-4.4.y-sanity",
    # "lkft/linux-stable-rc-linux-4.9.y",
    # "lkft/linux-stable-rc-linux-4.9.y-sanity",
    # "lkft/linux-stable-rc-linux-4.14.y",
    # "lkft/linux-stable-rc-linux-4.14.y-sanity",
    # "lkft/linux-stable-rc-linux-4.19.y",
    # "lkft/linux-stable-rc-linux-4.19.y-sanity",
    "lkft/linux-stable-rc-linux-5.4.y",
    "lkft/linux-stable-rc-linux-5.4.y-sanity",
    "lkft/linux-stable-rc-linux-5.8.y",
    "lkft/linux-stable-rc-linux-5.8.y-sanity"
]
project_list = lkft_list
auth = {
    "Authorization": "Token 86dda92be75dd6929b97e3bce4d4f660afedd0dd"
}
lava_auth = {
    "Authorization": "Token d3xjq5twg9hs562bcqpiezgx1yknrfp4ojhf0470hga0cg3k8qu1bndf3mzwbbu02us5e3zxbor8m2ezwst7405hk4ob4n6nurk71el1nmb10699lo8lszozzkqxaqp8"
}
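# auth carries the qa-reports API token and lava_auth the LAVA token; both are sent as
# plain Authorization headers, while NullAuth() stops requests from overriding them with
# credentials from ~/.netrc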
qa_reports_url = "https://qa-reports.linaro.org/api/"
now = datetime.now()
delta = timedelta(weeks=26)
# get backend list
backend_dict = {}
backend_list_request = requests.get(
    urljoin(qa_reports_url, "backends/"),
    auth=NullAuth(),
    headers=auth
)
if backend_list_request.status_code == 200:
    backend_list = backend_list_request.json()['results']
    for backend in backend_list:
        backend_dict.update({str(backend['id']): backend})
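# backend_dict now maps a backend id (as a string) to the full backend record; workers
# use it to resolve each testjob's backend and build the LAVA REST API URL from its 'url'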
global_writer_queue = multiprocessing.Queue()
workers = []
writers = []
writer_queues = []
#pool = Pool()
listener = multiprocessing.Process(target=listener_process, args=(global_writer_queue,))
listener.start()
for project in project_list:
    print(project)
    group_slug, project_slug = project.split("/")
    query_params = {
        "project__slug": project_slug,
        "project__group__slug": group_slug,
        "created_at__gte": now - delta,
        "ordering": "created_at"
    }
    # get list of builds for last 6 months
    build_list_request = requests.get(
        urljoin(qa_reports_url, "builds"),
        params=query_params,
        headers=auth,
        auth=NullAuth()
    )
    build_list = []
    if build_list_request.status_code == 200:
        build_list = build_list_request.json()['results']
    else:
        print(build_list_request.status_code)
        print(build_list_request.text)
    if not build_list:
        # no builds, go to the next project
        continue
    while build_list_request.json()['next']:
        build_list_request = requests.get(
            build_list_request.json()['next'],
            headers=auth,
            auth=NullAuth()
        )
        if build_list_request.status_code == 200:
            build_list = build_list + build_list_request.json()['results']
    writer_queue = multiprocessing.Queue()
    writer_queues.append(writer_queue)
    writer_listener = multiprocessing.Process(target=build_listener_process, args=("%s_%s" % (group_slug, project_slug), writer_queue,))
    writer_listener.start()
    writers.append(writer_listener)
    # for each build, get list of LAVA jobs
    for build in build_list:
        testjob_list_request = requests.get(
            build['testjobs'],
            auth=NullAuth(),
            headers=auth
        )
        testjob_list = []
        if testjob_list_request.status_code == 200:
            testjob_list = testjob_list_request.json()['results']
            while testjob_list_request.json()['next']:
                try:
                    testjob_list_request = requests.get(
                        testjob_list_request.json()['next'],
                        auth=NullAuth(),
                        headers=auth
                    )
                    if testjob_list_request.status_code == 200:
                        testjob_list = testjob_list + testjob_list_request.json()['results']
                except requests.exceptions.ConnectionError:
                    pass
        logger.debug("processing jobs for: %s" % build['version'])
        #p = pool.apply_async(process_testjob_list, [testjob_list, lava_auth, build, writer_queue, global_writer_queue], callback=log_results)
        # the per-project writer_queue is passed both as writer_queue (unused inside) and as build_queue
        w = multiprocessing.Process(target=process_testjob_list, args=(testjob_list, lava_auth, build, writer_queue, global_writer_queue, writer_queue))
        workers.append(w)
        w.start()
#global_csv_stats.close()
#pool.close()
#pool.join()
print(global_writer_queue.qsize())
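# shutdown order: join the workers first so every row has been queued, then send the
# None sentinel to each per-project listener and finally to the global listener so the
# CSV files are fully written before their processes exit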
for w in workers:
    w.join()
for wq in writer_queues:
    wq.put(None)
for w in writers:
    w.join()
global_writer_queue.put(None)
listener.join()