#!/usr/bin/python
"""Helpers for talking to a Gerrit server over its HTTP REST API.

The module provides the LinaroGerrit client class (SSH keys, group members,
included groups) and two helpers for command-line tools: add_gerrit_args()
registers the common options on an argparse parser and apply_gerrit_conf()
fills unset options from a config file located next to this module.
"""
import json
import logging

import requests

logging.basicConfig()
logging.getLogger("requests").setLevel(logging.WARNING)
log = logging.getLogger("linaro_gerrit")

# Text type usable on both Python 2 (unicode) and Python 3 (str).
try:
    _text_type = unicode  # noqa: F821 - only defined on Python 2
except NameError:
    _text_type = str


class LinaroGerrit:
    """Small client for the authenticated ("/a/") Gerrit REST endpoints.

    Every mutating method honours ``dryrun``: when it is set, no request is
    sent and the method reports success.
    """

    def __init__(self, base, username, password, noverify=False,
                 loglevel="WARN", dryrun=False):
        """Store connection settings and build the common request kwargs.

        base     -- base URL of the Gerrit server, without trailing slash
        username -- HTTP API user name (digest authentication)
        password -- HTTP API password
        noverify -- when true, SSL certificate verification is disabled
        loglevel -- name of a ``logging`` level, case-insensitive
        dryrun   -- when true, mutating calls are skipped
        """
        self.base = base
        self.username = username
        self.password = password
        self.verify = not noverify
        self.dryrun = dryrun
        log.setLevel(getattr(logging, loglevel.upper()))
        try:
            from requests.auth import HTTPDigestAuth
        except ImportError:
            # Old python-requests: fall back to the tuple form of digest
            # auth; the "verify" option is not passed in that case.
            log.info("Using old version of python-requests "
                     "(--noverify not supported)")
            self.reqargs = {"auth": ('digest', username, password)}
        else:
            self.reqargs = {"auth": HTTPDigestAuth(username, password),
                            "verify": self.verify}

    def strip_gerrit_junk(self, string):
        """Return ``string`` without its first line.

        Gerrit puts a prefix line in front of its JSON output, see
        https://gerrit-review.googlesource.com/Documentation/rest-api.html#output

        ``string`` may be text or bytes (``requests`` exposes the raw body
        as bytes); bytes are decoded as UTF-8 so the result is always text.
        A UnicodeDecodeError raised here is a ValueError subclass.
        """
        if isinstance(string, bytes):
            string = string.decode("utf-8")
        # Everything after the first newline; empty when there is none.
        return string.partition('\n')[2]

    def list_keys(self, username):
        """Return {seq: ssh_public_key} for ``username``.

        Returns False when the server does not answer 200 or when the
        response body cannot be parsed.
        """
        log.info("Listing keys for user: %s", username)
        url = "%s/a/accounts/%s/sshkeys/" % (self.base, username)
        r = requests.get(url, **self.reqargs)
        if r.status_code != 200:
            log.info("user %s not found in gerrit", username)
            return False
        try:
            entries = json.loads(self.strip_gerrit_junk(r.content))
        except ValueError as e:
            log.warning(e)
            return False
        return {entry["seq"]: entry["ssh_public_key"].strip()
                for entry in entries}

    def add_key(self, pubkey, username):
        """Add SSH public key ``pubkey`` to ``username``.

        Returns True when the server answers 201 (or in dry-run mode),
        False otherwise.
        """
        log.debug("Adding pubkey %s to user %s", pubkey, username)
        url = "%s/a/accounts/%s/sshkeys/" % (self.base, username)
        if self.dryrun:
            log.debug("Not actually doing it because --dryrun")
            return True
        r = requests.post(url, data=pubkey.encode("utf-8"), **self.reqargs)
        return r.status_code == 201

    def del_key(self, username, key_id):
        """Delete the SSH key with integer id ``key_id`` from ``username``.

        Returns True when the server answers 204 (or in dry-run mode),
        False otherwise.
        """
        log.debug("Deleting key %s by id from user %s", key_id, username)
        url = "%s/a/accounts/%s/sshkeys/%i" % (self.base, username, key_id)
        if self.dryrun:
            log.debug("Not actually doing it because --dryrun")
            return True
        r = requests.delete(url, **self.reqargs)
        return r.status_code == 204

    def keysets_to_list(self, keysets):
        """Return the second element of every item in ``keysets`` as text."""
        return [_text_type(key[1]) for key in keysets]

    def list_group_members(self, groupname):
        """Return the usernames of the members of ``groupname``.

        Members without a "username" field are logged and skipped.
        Returns False when the group is not found (404), on any other
        non-200 status, or when the response body cannot be parsed.
        """
        log.info("Listing member of group: %s", groupname)
        url = "%s/a/groups/%s/members/" % (self.base, groupname)
        r = requests.get(url, **self.reqargs)
        if r.status_code == 404:
            log.info("Group %s not found in gerrit", groupname)
            return False
        if r.status_code != 200:
            log.error("list_group_members(%s): unexpected status code: %d",
                      groupname, r.status_code)
            return False
        try:
            entries = json.loads(self.strip_gerrit_junk(r.content))
        except ValueError as e:
            log.warning(e)
            return False
        members = []
        for data in entries:
            try:
                members.append(data["username"])
            except KeyError:
                # .get(): the entry may lack "email" as well, and reporting
                # the problem must not raise a second KeyError.
                log.error("ERROR: user %s has no username!",
                          data.get("email"))
        return members

    def add_group_member(self, groupname, username):
        """Add ``username`` to ``groupname``.

        Returns True when the server answers 200 or 201 (or in dry-run
        mode), False otherwise.
        """
        log.debug("Adding %s to group %s", username, groupname)
        url = "%s/a/groups/%s/members/%s" % (self.base, groupname, username)
        if self.dryrun:
            log.debug("Not actually doing it because --dryrun")
            return True
        r = requests.put(url, **self.reqargs)
        if r.status_code in (200, 201):
            return True
        log.info("Failed to add %s to group %s", username, groupname)
        return False

    def delete_group_member(self, groupname, username):
        """Remove ``username`` from ``groupname``.

        Returns True when the server answers 204 (or in dry-run mode),
        False otherwise.
        """
        log.debug("Deleting %s from group %s", username, groupname)
        url = "%s/a/groups/%s/members/%s" % (self.base, groupname, username)
        if self.dryrun:
            log.debug("Not actually doing it because --dryrun")
            return True
        r = requests.delete(url, **self.reqargs)
        if r.status_code == 204:
            return True
        log.error("Failed to del %s from group %s", username, groupname)
        return False

    def list_included_groups(self, groupname):
        """Return the names of the groups included in ``groupname``.

        Returns False when the server does not answer 200 or when the
        response body cannot be parsed.
        """
        log.debug("Listing included groups for %s", groupname)
        url = "%s/a/groups/%s/groups/" % (self.base, groupname)
        r = requests.get(url, **self.reqargs)
        if r.status_code != 200:
            log.info("Group %s not found in gerrit", groupname)
            return False
        try:
            entries = json.loads(self.strip_gerrit_junk(r.content))
        except ValueError as e:
            log.warning(e)
            return False
        return [data["name"] for data in entries]

    def add_included_group(self, groupname, include):
        """Include group ``include`` in ``groupname``.

        ``include`` is fully URL-quoted before it is put into the request
        path. Returns True when the server answers 200 or 201 (or in
        dry-run mode), False otherwise.
        """
        log.debug("Including group %s in %s", include, groupname)
        include = requests.utils.quote(include, safe='')
        url = "%s/a/groups/%s/groups/%s" % (self.base, groupname, include)
        if self.dryrun:
            log.debug("Not actually doing it because --dryrun")
            return True
        r = requests.put(url, **self.reqargs)
        if r.status_code in (200, 201):
            return True
        log.error("Failed to include %s to group %s", include, groupname)
        return False


def add_gerrit_args(parser):
    """Register the common Gerrit command-line options on ``parser``.

    ``parser`` is an argparse-style parser; the options added are
    --username, --password, --base, --noverify, --dryrun and --loglevel.
    """
    parser.add_argument('--username', help="Gerrit HTTP API Username")
    parser.add_argument('--password', help="Gerrit HTTP API Password")
    parser.add_argument('--base', help="Gerrit BASE URL ("
                        "https://review.linaro.org)")
    parser.add_argument('--noverify', action="store_true",
                        help="Disable SSL certificate verficiation")
    parser.add_argument('--dryrun', action="store_true",
                        help="Do not perform any actions, just report")
    parser.add_argument('--loglevel', default="WARNING",
                        help="Setting logging level, default: %(default)s")


def apply_gerrit_conf(args):
    """Fill None-valued attributes of ``args`` from the module's .conf file.

    The config file has the same path as this module with the extension
    replaced by ".conf" and uses a simple ``arg=val`` format; blank lines
    and lines starting with "#" are ignored. A value is applied only when
    ``args`` already has an attribute of that name whose value is None, so
    options given on the command line take precedence.

    The usecase is: let the password be specified in a config file, so it
    does not show up in the process list, cron emails, etc. Different or
    more advanced usecases may not be handled by this function.

    Lines without "=" are skipped with a warning. Failure to open the
    config file propagates to the caller as the exception raised by
    open().
    """
    conf_path = __file__.rsplit(".", 1)[0] + ".conf"
    with open(conf_path) as f:
        for lineno, line in enumerate(f, 1):
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, sep, value = line.partition("=")
            if not sep:
                # Report only the position: the line itself may hold a
                # secret that must not end up in logs.
                log.warning("Ignoring malformed line %d in %s",
                            lineno, conf_path)
                continue
            key = key.strip()
            value = value.strip()
            if hasattr(args, key) and getattr(args, key) is None:
                setattr(args, key, value)