#!/usr/bin/env python import sys import httplib import urllib import base64 import json import optparse import getpass import logging from pprint import pprint #from BeautifulSoup import BeautifulSoup BASE_URL = "https://login.linaro.org:8443/crowd/rest/usermanagement/1" AUTH = (None, None) log = logging.getLogger() class CrowdRequestError(Exception): pass def strip_html(html): return html html = BeautifulSoup(html, convertEntities=BeautifulSoup.HTML_ENTITIES) if not html.body: return html m = ' '.join(html.body.findAll(text=True)) return m def rest(uri, params): c = httplib.HTTPSConnection("login.linaro.org", 8443) auth = base64.encodestring('%s:%s' % AUTH) headers = {"Authorization": "Basic " + auth} url = uri + "?" + urllib.urlencode(params) c.request("GET", "/crowd/rest/usermanagement/1" + url, headers=headers) resp = c.getresponse() if resp.status != 200: data = resp.read() log.error("Non-successful response code %d from Crowd: %s", resp.status, strip_html(data)) raise CrowdRequestError(resp, data) data = json.load(resp) return data if __name__ == "__main__": logging.basicConfig() optparser = optparse.OptionParser(usage="%prog user|alises|group|usergroups|ismember|groupusers|groupgroups|members") optparser.add_option("-u", "--user", default="rest-test", help="Crowd username") optparser.add_option("-p", "--passwd", help="Crowd password") optparser.add_option("-P", "--ask-passwd", action="store_true", help="Ask Crowd password") optparser.add_option("-n", "--nested", action="store_true", help="Process nested groups") optparser.add_option("-r", "--raw", action="store_true", help="Show raw JSON response") options, args = optparser.parse_args(sys.argv[1:]) if len(args) < 1: optparser.error("Wrong number of arguments") if options.ask_passwd: options.passwd = getpass.getpass("Crowd password: ") AUTH = (options.user, options.passwd) if args[0] == "user": pprint(rest("/user.json", {"username": args[1], "expand": "attributes"})) elif args[0] == "aliases": pprint(rest("/aliases.json", {"username": args[1]})) elif args[0] == "group": pprint(rest("/group.json", {"groupname": args[1], "expand": "attributes"})) elif args[0] == "usergroups": url = "/user/group/nested.json" if options.nested else "/user/group/direct.json" data = rest(url, {"username": args[1]}) if options.raw: pprint(data) else: names = [x["name"] for x in data["groups"]] names.sort() for n in names: print n elif args[0] == "groupusers": url = "/group/user/nested.json" if options.nested else "/group/user/direct.json" data = rest(url, {"groupname": args[1]}) if options.raw: pprint(data) else: names = [x["name"] for x in data["users"]] names.sort() for n in names: print n elif args[0] == "groupgroups": url = "/group/child-group/nested.json" if options.nested else "/group/child-group/direct.json" data = rest(url, {"groupname": args[1]}) if options.raw: pprint(data) else: names = [x["name"] for x in data["groups"]] names.sort() for n in names: print n elif args[0] == "members": def print_names(header, list): if list: print header for n in list: print "\t" + n data = rest("/group/user/direct.json", {"groupname": args[1]}) names = sorted([x["name"] for x in data["users"]]) print_names("Users:", names) data = rest("/group/child-group/direct.json", {"groupname": args[1]}) names = sorted([x["name"] for x in data["groups"]]) print_names("Groups:", names) elif args[0] == "ismember": url = "/user/group/nested.json" if options.nested else "/user/group/direct.json" try: data = rest(url, {"username": args[1], "groupname": args[2]}) assert data["name"] == args[2] print "yes" except CrowdRequestError, e: print e.args[1] print "no"