import contextlib import os import subprocess import tempfile import ldap # To provide alternative ldap bind credentials, override the LDAP_CONF # environment variable when calling your script that makes use of the this # library LDAP_CONF = os.environ.get('LDAP_CONF', '/etc/ldap.conf') @contextlib.contextmanager def ldap_client(config): client = ldap.initialize(config["uri"], trace_level=0) client.set_option(ldap.OPT_REFERRALS, 0) client.simple_bind(config["binddn"], config["bindpw"]) try: yield client finally: client.unbind() def build_config(): config = {} with open(LDAP_CONF) as f: for line in f: if line.startswith('binddn'): if "binddn" not in config: config["binddn"] = line.split(' ', 1)[1].strip() elif line.startswith('bindpw'): if "bindpw" not in config: config["bindpw"] = line.split(' ', 1)[1].strip() elif line.startswith('base'): if "basedn" not in config: config["basedn"] = line.split(' ', 1)[1].strip() elif line.startswith('uri'): if "uri" not in config: config["uri"] = line.split(' ', 1)[1].strip() return config def validate_key(pubkey): with tempfile.NamedTemporaryFile(delete=True) as f: f.write(pubkey) f.flush() try: args = ['ssh-keygen', '-l', '-f', f.name] subprocess.check_output(args, stderr=subprocess.PIPE) except: return False return True def do_query(search_attr='uid', search_pat='*', attrlist=[]): config = build_config() with ldap_client(config) as client: result = client.search_s( config["basedn"], ldap.SCOPE_SUBTREE, '(%s=%s)' % (search_attr, search_pat), attrlist) return result def get_users_and_keys(only_validated=False): """Gets all the users and their associated SSH key. :return A list of tuples (uid, ssh_key), only if the user has an SSH key. """ result = do_query(attrlist=['uid', 'sshPublicKey']) all_users = {} for row in result: try: # Just pick the first UID, it does not really matter how the # user is called, it will access the git repository via the # 'git' user. uid = row[1]['uid'][0] ssh_keys = row[1]['sshPublicKey'] for index, ssh_key in enumerate(ssh_keys): if not only_validated or validate_key(ssh_key): key_name = '{0}@key_{1}.pub'.format(uid, index) all_users.setdefault(uid, []).append((key_name, ssh_key)) except KeyError: # If there are no SSH keys, skip this user. pass return all_users def get_groups_and_users(ignore_list=[]): result = do_query(attrlist=['uid', 'memberOf']) all_groups = {} for row in result: try: uid = row[1]['uid'][0] groups = row[1]['memberOf'] if (type(ignore_list) is list) and (uid in ignore_list): continue for group in groups: group = group.split(",")[0].split("=")[1] all_groups.setdefault(group, []).append((uid)) except KeyError: # This user is not in any groups pass return all_groups