#!/usr/bin/python3
"""Audit Bugzilla default contacts against the LDAP account directory.

For every active component of every active product returned by the Bugzilla
REST API, the default assignee and default QA contact are looked up in LDAP
by e-mail address.  Contacts without a matching LDAP entry are reported as
``MISSING``; with ``--debug`` the contacts that were found are listed too.
"""
import argparse
import json
import logging
import sys

import requests

from linaro_ldap import do_complex_query

log = logging.getLogger('bugzilla-audit-assigned')
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)

parser = argparse.ArgumentParser(
    description='Audit bugzilla to ensure default contacts are valid')
parser.add_argument(
    '--debug', '-d', action='store_true',
    help="Show all contact regardless of if the user is missing or not")
args = parser.parse_args()

# Imported after argument parsing so that ``--help`` still works when the
# configuration module is absent.
try:
    import bugzilla_conf
except ImportError:
    log.error('could not load bugzilla_conf.py... exiting')
    sys.exit(2)

# Memoises LDAP lookups: e-mail address -> True/False (entry found or not).
USER_CACHE = {}

# Characters with special meaning inside an LDAP search filter value and
# their escaped forms (RFC 4515, section 3).
_LDAP_FILTER_ESCAPES = str.maketrans({
    '\\': r'\5c',
    '*': r'\2a',
    '(': r'\28',
    ')': r'\29',
    '\x00': r'\00',
})


def _escape_ldap_filter_value(value):
    """Return *value* escaped for safe use as an LDAP filter assertion value."""
    return str(value).translate(_LDAP_FILTER_ESCAPES)


def user_exists(email):
    """Return True if an LDAP account entry has *email* as its ``mail``.

    Results are cached in ``USER_CACHE`` so each address is queried at most
    once per run.
    """
    try:
        return USER_CACHE[email]
    except KeyError:
        pass

    results = do_complex_query(
        # Escape the address so characters such as ``*`` or ``(`` cannot
        # turn the exact match into a wildcard or a malformed filter.
        search_filter='(mail=%s)' % _escape_ldap_filter_value(email),
        attrlist=['uid'],
        base='ou=accounts,dc=linaro,dc=org'
    )
    found = len(results) > 0
    USER_CACHE[email] = found
    return found


def _audit_contact(product, component, field, label):
    """Log whether the contact stored in ``component[field]`` exists.

    *label* is the word used in the log line (``assigned`` or ``qa``).
    Components with an empty contact are skipped silently.
    """
    contact = component[field]
    if not contact:
        return

    if user_exists(contact):
        # Only visible when the logger level is lowered to INFO (--debug).
        log.info("present: %s||%s || default %s: %s",
                 product['name'], component['name'], label, contact)
    else:
        log.warning("MISSING: %s||%s || default %s: %s",
                    product['name'], component['name'], label, contact)


def find_missing_users(results):
    """Report default contacts that have no matching LDAP account.

    *results* is the decoded product listing: a mapping with a ``products``
    list whose items carry ``name``, ``is_active`` and ``components``; each
    component carries ``name``, ``is_active``, ``default_assigned_to`` and
    ``default_qa_contact``.  Inactive products and components are ignored.
    """
    for product in results['products']:
        if not product['is_active']:
            continue
        for component in product['components']:
            if not component['is_active']:
                continue
            _audit_contact(product, component,
                           'default_assigned_to', 'assigned')
            _audit_contact(product, component,
                           'default_qa_contact', 'qa')


class Bugz:
    """Minimal client for the Bugzilla REST API."""

    server = None
    token = None
    user = None
    password = None
    # Seconds to wait for the server before giving up; without a timeout
    # ``requests`` would block indefinitely on an unresponsive host.
    timeout = 60

    def __init__(self, server):
        """Remember the base URL (*server*) that request paths are appended to."""
        self.server = server

    def auth(self, user, passwd):
        """Log in and store the session token returned by ``/rest/login``."""
        # Credentials go through ``params`` so they are percent-encoded;
        # interpolating them into the URL breaks on characters like ``&``.
        data = self.get('/rest/login',
                        params={'login': user, 'password': passwd})
        self.token = data['token']

    def get(self, req_url, auth=False, params=None):
        """GET ``server + req_url`` and return the decoded JSON body.

        *params* is an optional mapping of extra query parameters; the
        session token, when one has been obtained, is added automatically.
        *auth* is accepted for backward compatibility and is unused.

        Raises ``Exception`` when the response status is not 200.
        """
        url = self.server + req_url
        query = dict(params) if params else {}
        if self.token is not None:
            query['token'] = self.token

        r = requests.get(url, params=query, timeout=self.timeout)
        if r.status_code != 200:
            raise Exception("request error: %s" % r.status_code)
        return json.loads(r.text)


if __name__ == '__main__':
    b = Bugz(bugzilla_conf.BZ_SITE)
    b.auth(bugzilla_conf.BZ_USER, bugzilla_conf.BZ_PASSWORD)

    if args.debug:
        log.setLevel(logging.INFO)

    find_missing_users(b.get('/rest/product?type=accessible'))