1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
import contextlib
import os
import subprocess
import tempfile
import ldap
# To provide alternative ldap bind credentials, override the LDAP_CONF
# environment variable when calling your script that makes use of the this
# library
LDAP_CONF = os.environ.get('LDAP_CONF', '/etc/ldap.conf')
@contextlib.contextmanager
def ldap_client(config):
client = ldap.initialize(config["uri"], trace_level=0)
client.set_option(ldap.OPT_REFERRALS, 0)
client.simple_bind(config["binddn"], config["bindpw"])
try:
yield client
finally:
client.unbind()
def build_config():
config = {}
with open(LDAP_CONF) as f:
for line in f:
if line.startswith('binddn'):
if "binddn" not in config:
config["binddn"] = line.split(' ', 1)[1].strip()
elif line.startswith('bindpw'):
if "bindpw" not in config:
config["bindpw"] = line.split(' ', 1)[1].strip()
elif line.startswith('base'):
if "basedn" not in config:
config["basedn"] = line.split(' ', 1)[1].strip()
elif line.startswith('uri'):
if "uri" not in config:
config["uri"] = line.split(' ', 1)[1].strip()
return config
def validate_key(pubkey):
with tempfile.NamedTemporaryFile(delete=True) as f:
f.write(pubkey)
f.flush()
try:
args = ['ssh-keygen', '-l', '-f', f.name]
subprocess.check_output(args, stderr=subprocess.PIPE)
except:
return False
return True
def do_query(search_attr='uid', search_pat='*', attrlist=[]):
config = build_config()
with ldap_client(config) as client:
result = client.search_s(
config["basedn"],
ldap.SCOPE_SUBTREE,
'(%s=%s)' % (search_attr, search_pat),
attrlist)
return result
def get_users_and_keys(only_validated=False):
"""Gets all the users and their associated SSH key.
:return A list of tuples (uid, ssh_key), only if the user has an SSH
key.
"""
result = do_query(attrlist=['uid', 'sshPublicKey'])
all_users = {}
for row in result:
try:
# Just pick the first UID, it does not really matter how the
# user is called, it will access the git repository via the
# 'git' user.
uid = row[1]['uid'][0]
ssh_keys = row[1]['sshPublicKey']
for index, ssh_key in enumerate(ssh_keys):
if not only_validated or validate_key(ssh_key):
key_name = '{0}@key_{1}.pub'.format(uid, index)
all_users.setdefault(uid, []).append((key_name, ssh_key))
except KeyError:
# If there are no SSH keys, skip this user.
pass
return all_users
def get_groups_and_users(ignore_list=[]):
result = do_query(attrlist=['uid', 'memberOf'])
all_groups = {}
for row in result:
try:
uid = row[1]['uid'][0]
groups = row[1]['memberOf']
if (type(ignore_list) is list) and (uid in ignore_list):
continue
for group in groups:
group = group.split(",")[0].split("=")[1]
all_groups.setdefault(group, []).append((uid))
except KeyError:
# This user is not in any groups
pass
return all_groups
|