#!/usr/bin/python
"""Small wrapper around Gerrit REST API calls made with ``requests``.

Provides the ``LinaroGerrit`` client class plus two helpers for command-line
scripts: ``add_gerrit_args`` (common options) and ``apply_gerrit_conf``
(fill unset options from a ``.conf`` file next to this module).
"""
import json
import logging
import os

import requests
from requests.auth import HTTPBasicAuth

logging.basicConfig()
logging.getLogger("requests").setLevel(logging.WARNING)
log = logging.getLogger("linaro_gerrit")

# Text type usable on both Python 2 and Python 3.
try:
    _text_type = unicode  # noqa: F821 - Python 2
except NameError:
    _text_type = str  # Python 3: str is already the text type


class LinaroGerrit:
    """Client for a subset of the Gerrit REST API, using HTTP basic auth.

    Modifying methods honour ``dryrun``: when it is set, the modifying
    request is not sent and the method reports success. Account lookups
    needed to build the URL are still performed.
    """

    def __init__(self, base, username, password, noverify=False,
                 loglevel="WARN", dryrun=False):
        """Store connection settings and set the module logger's level.

        base: Gerrit base URL, prepended to every REST path.
        username, password: credentials for HTTP basic authentication.
        noverify: when true, TLS certificate verification is disabled.
        loglevel: name of a ``logging`` level constant (case-insensitive).
        dryrun: when true, modifying methods skip their HTTP request.
        """
        self.base = base
        self.username = username
        self.password = password
        self.verify = not noverify
        self.dryrun = dryrun
        log.setLevel(getattr(logging, loglevel.upper()))
        # Keyword arguments shared by every requests call.
        self.reqargs = {
            "auth": HTTPBasicAuth(username, password),
            "verify": self.verify,
        }

    def strip_gerrit_junk(self, string):
        """Return *string* with its first line removed.

        Gerrit puts a non-JSON prefix line in front of JSON responses, see
        https://gerrit-review.googlesource.com/Documentation/rest-api.html#output

        Accepts text or bytes and returns the same type, so the raw
        ``Response.content`` can be passed in on Python 2 and Python 3.
        """
        # The separator must match the input type: bytes.split(str) raises
        # TypeError on Python 3.
        newline = b'\n' if isinstance(string, bytes) else u'\n'
        return newline.join(string.split(newline)[1:])

    def get_account(self, username, limit=1):
        """Look up the numeric account id for *username*.

        limit: maximum number of results requested from Gerrit.

        Returns the ``_account_id`` of the first match, ``False`` when the
        query yields no account, or ``None`` when the response body cannot
        be parsed as JSON.
        """
        url = '%s/a/accounts/' % self.base
        payload = {'q': 'username:' + username, 'limit': limit}
        r = requests.get(url, params=payload, **self.reqargs)
        log.info("Finding user: %s with url %s", username, r.url)
        try:
            accounts = json.loads(self.strip_gerrit_junk(r.content))
        except ValueError as e:
            log.warning(e)
            return None
        if not accounts:
            log.info("user %s not found in gerrit", username)
            return False
        # Only the first match is used.
        for data in accounts:
            log.info("Found user: %s", data["_account_id"])
            return data["_account_id"]

    def list_keys(self, username):
        """Return ``{seq: ssh_public_key}`` for *username*.

        Returns ``False`` when the request does not answer 200 or the
        response body is not valid JSON.
        """
        log.info("Listing keys for user: %s", username)
        url = "%s/a/accounts/%s/sshkeys/" % (self.base,
                                             self.get_account(username))
        r = requests.get(url, **self.reqargs)
        if r.status_code != 200:
            log.info("user %s not found in gerrit", username)
            return False
        try:
            keys = json.loads(self.strip_gerrit_junk(r.content))
        except ValueError as e:
            log.warning(e)
            return False
        return {key["seq"]: key["ssh_public_key"].strip() for key in keys}

    def add_key(self, pubkey, username):
        """Add SSH public key *pubkey* to *username*.

        Returns ``True`` on HTTP 201 (or in dry-run mode), else ``False``.
        """
        log.debug("Adding pubkey %s to user %s", pubkey, username)
        url = "%s/a/accounts/%s/sshkeys/" % (self.base,
                                             self.get_account(username))
        if self.dryrun:
            log.debug("Not actually doing it because --dryrun")
            return True
        r = requests.post(url, data=pubkey.encode("utf-8"), **self.reqargs)
        return r.status_code == 201

    def del_key(self, username, key_id):
        """Delete the SSH key with integer id *key_id* from *username*.

        Returns ``True`` on HTTP 204 (or in dry-run mode), else ``False``.
        """
        log.debug("Deleting key %s by id from user %s", key_id, username)
        url = "%s/a/accounts/%s/sshkeys/%i" % (self.base,
                                               self.get_account(username),
                                               key_id)
        if self.dryrun:
            log.debug("Not actually doing it because --dryrun")
            return True
        r = requests.delete(url, **self.reqargs)
        return r.status_code == 204

    def keysets_to_list(self, keysets):
        """Return the second element of every item in *keysets*, as text."""
        return [_text_type(key[1]) for key in keysets]

    def list_group_members(self, groupname):
        """Return the usernames of the members of *groupname*.

        Members without a ``username`` field are logged and skipped.
        Returns ``False`` when the group is not found, the status code is
        unexpected, or the response body is not valid JSON.
        """
        log.info("Listing member of group: %s", groupname)
        url = "%s/a/groups/%s/members/" % (self.base, groupname)
        r = requests.get(url, **self.reqargs)
        if r.status_code == 404:
            log.info("Group %s not found in gerrit", groupname)
            return False
        if r.status_code != 200:
            log.error("list_group_members(%s): unexpected status code: %d",
                      groupname, r.status_code)
            return False
        try:
            entries = json.loads(self.strip_gerrit_junk(r.content))
        except ValueError as e:
            log.warning(e)
            return False
        members = []
        for data in entries:
            try:
                members.append(data["username"])
            except KeyError:
                # .get: the entry may lack "email" too, and reporting the
                # problem must not raise a second KeyError.
                log.error("ERROR: user %s has no username!",
                          data.get("email"))
        return members

    def add_group_member(self, groupname, username):
        """Add *username* to *groupname*.

        Returns ``True`` on HTTP 200/201 (or in dry-run mode), else
        ``False``.
        """
        log.debug("Adding %s to group %s", username, groupname)
        url = "%s/a/groups/%s/members/%s" % (self.base, groupname,
                                             self.get_account(username))
        if self.dryrun:
            log.debug("Not actually doing it because --dryrun")
            return True
        r = requests.put(url, **self.reqargs)
        if r.status_code in (200, 201):
            return True
        log.info("Failed to add %s to group %s", username, groupname)
        return False

    def delete_group_member(self, groupname, username):
        """Remove *username* from *groupname*.

        Returns ``True`` on HTTP 204 (or in dry-run mode), else ``False``.
        """
        log.debug("Deleting %s from group %s", username, groupname)
        url = "%s/a/groups/%s/members/%s" % (self.base, groupname,
                                             self.get_account(username))
        if self.dryrun:
            log.debug("Not actually doing it because --dryrun")
            return True
        r = requests.delete(url, **self.reqargs)
        if r.status_code == 204:
            return True
        log.error("Failed to del %s from group %s", username, groupname)
        return False

    def list_included_groups(self, groupname):
        """Return the names of the groups included in *groupname*.

        Returns ``False`` when the request does not answer 200 or the
        response body is not valid JSON.
        """
        log.debug("Listing included groups for %s", groupname)
        url = "%s/a/groups/%s/groups/" % (self.base, groupname)
        r = requests.get(url, **self.reqargs)
        if r.status_code != 200:
            log.info("Group %s not found in gerrit", groupname)
            return False
        try:
            entries = json.loads(self.strip_gerrit_junk(r.content))
        except ValueError as e:
            log.warning(e)
            return False
        return [data["name"] for data in entries]

    def add_included_group(self, groupname, include):
        """Include group *include* in *groupname*.

        *include* is percent-encoded before being placed in the URL.
        Returns ``True`` on HTTP 200/201 (or in dry-run mode), else
        ``False``.
        """
        log.debug("Including group %s in %s", include, groupname)
        include = requests.utils.quote(include, safe='')
        url = "%s/a/groups/%s/groups/%s" % (self.base, groupname, include)
        if self.dryrun:
            log.debug("Not actually doing it because --dryrun")
            return True
        r = requests.put(url, **self.reqargs)
        if r.status_code in (200, 201):
            return True
        log.error("Failed to include %s to group %s", include, groupname)
        return False

    def list_projects(self, parents=False):
        """Return the decoded JSON project listing, or ``{}`` on failure.

        parents: when true, the ``t`` query parameter is sent with the
            request.

        Unlike the other listing methods, a body that is not valid JSON is
        not caught here and propagates as ``ValueError``.
        """
        log.debug("Listing projects")
        url = "%s/a/projects/" % self.base
        params = {}
        if parents:
            params["t"] = ""
        r = requests.get(url, params=params, **self.reqargs)
        if r.status_code != 200:
            log.error("Failed to list projects %s", r.status_code)
            return {}
        return json.loads(self.strip_gerrit_junk(r.content))

    def set_project_parent(self, project, parent, commit_msg=""):
        """Set *parent* as the parent of *project*.

        *project* is percent-encoded before being placed in the URL.
        Returns ``True`` on HTTP 200/201 (or in dry-run mode), else
        ``False``.
        """
        log.info("Setting project %s parent as %s", project, parent)
        project = requests.utils.quote(project, safe='')
        url = "%s/a/projects/%s/parent" % (self.base, project)
        body = {"parent": parent, "commit_message": commit_msg}
        headers = {"Content-Type": "application/json"}
        if self.dryrun:
            log.info("Not actually doing it because --dryrun")
            return True
        r = requests.put(url, data=json.dumps(body), headers=headers,
                         **self.reqargs)
        if r.status_code in (200, 201):
            return True
        log.error("Failed to set project %s parent as %s", project, parent)
        return False


def add_gerrit_args(parser, def_loglevel="WARNING"):
    """Register the common Gerrit command-line options on *parser*.

    parser: object with an argparse-style ``add_argument`` method.
    def_loglevel: default value for ``--loglevel``.
    """
    parser.add_argument('--username', help="Gerrit HTTP API Username")
    parser.add_argument('--password', help="Gerrit HTTP API Password")
    parser.add_argument('--base', required=True,
                        help="Gerrit BASE URL (e.g. "
                             "https://review.linaro.org)")
    parser.add_argument('--noverify', action="store_true",
                        help="Disable SSL certificate verficiation")
    parser.add_argument('--dryrun', action="store_true",
                        help="Do not perform any actions, just report")
    parser.add_argument('--loglevel', default=def_loglevel,
                        help="Setting logging level, default: %(default)s")


def apply_gerrit_conf(args):
    """Try to load linaro_gerrit.conf file of simple arg=val format and
    use options there for None-valued arguments in args.

    The usecase is: let the password be specified in a config file, so it
    does not show in the process list, cron emails, etc. If the password
    is specified on the command line however, it is used instead of the
    one in the config file. Different/more advanced usecases may not be
    handled by this function.

    The config file is looked up next to this module, with the module's
    extension replaced by ".conf". Blank lines and lines starting with "#"
    are ignored; lines without "=" are skipped with a warning. An error
    opening the file is not handled here and propagates to the caller.
    """
    # splitext only strips a real extension of the last path component,
    # so a dot in a directory name cannot truncate the path.
    conf_path = os.path.splitext(__file__)[0] + ".conf"
    with open(conf_path) as f:
        for lineno, line in enumerate(f, 1):
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, sep, value = line.partition("=")
            if not sep:
                # The line content is not logged: it may hold a secret.
                log.warning("%s:%d: ignoring line without '='",
                            conf_path, lineno)
                continue
            key = key.strip()
            value = value.strip()
            # Only fill in options that exist and were left unset.
            if hasattr(args, key) and getattr(args, key) is None:
                setattr(args, key, value)