diff options
Diffstat (limited to 'apps/patchmetrics/utils.py')
-rw-r--r-- | apps/patchmetrics/utils.py | 151 |
1 files changed, 0 insertions, 151 deletions
diff --git a/apps/patchmetrics/utils.py b/apps/patchmetrics/utils.py deleted file mode 100644 index c900c41..0000000 --- a/apps/patchmetrics/utils.py +++ /dev/null @@ -1,151 +0,0 @@ -import logging -import re - -from patchmetrics.crowd import ( - CrowdException, - CrowdNotFoundException, -) -from django.contrib.auth.models import User -from django.db.models import Q -from patchmetrics.models import Team -from patchwork.models import Person - -logging.basicConfig() -logger = logging.getLogger() - - -def create_team_display_name(team): - """Very simple and hackish way to create a display name for a team. - - :param team: The team name. - """ - # XXX: since Crowd does not expose the displayName of a team, we hack one. - parts = re.split("-|_", team) - display_name = " ".join(parts) - return display_name.title() - - -def sync_crowd_memberships(memberships): - """Make sure the given memberships are represented in the database. - - :param memberships: A dict mapping each team to a list of members. Each - member is in turn represented by a CrowdUser object. - For example: {Team: [CrowdUser1, CrowdUser2, ...]} - """ - created_memberships = [] - for team, members in memberships.iteritems(): - team_display_name = create_team_display_name(team) - try: - db_team = Team.objects.get(name=team) - except Team.DoesNotExist: - logger.info( - 'Creating new team: {0} ({1})'.format(team, team_display_name)) - db_team = Team(name=team, display_name=team_display_name) - db_team.save() - - for member in members: - people = [] - for email in member.emails: - try: - person = Person.objects.get(email=email) - except Person.DoesNotExist: - logger.info("Creating new person: {0} " - "({1})".format(member.name, email)) - person = Person(name=member.display_name, email=email) - person.save() - people.append(person) - - # Get all Person entries that might represent other email - # addresses of this same user. - people.extend(Person.objects.filter(name=member.display_name)) - - user = get_user(member.name, member.emails) - if user is None: - user = User.objects.create_user( - member.name, member.emails[0], password=None) - - # Now link all the Person entries to the user account. - for person in people: - person.user = user - person.save() - - # And finally, make sure the user is a member of the team. - if user not in db_team.members: - logger.info("Adding {0} as a member " - "of {1}".format(member.name, db_team.name)) - membership = db_team.add_member(user) - membership.save() - created_memberships.append(membership) - - return created_memberships - - -def get_user(name, emails): - """Return the user linked to the person with one of the given emails. - - If there are no users linked to a person with any of the given emails, - return None. - """ - query = (Q(person__in=Person.objects.filter(email__in=emails)) - | Q(username=name)) - users = User.objects.filter(query).distinct() - if users.count() == 1: - return users[0] - elif users.count() > 1: - logger.info("Found more than one user for {0}; " - "using the first one".format(emails)) - return users[0] - else: - return None - - -def sync_user_memberships(email_addresses, crowd, whitelisted_groups=[]): - """If an input email matches one from Linaro Login, add them to the db. - - If an email address matches a user in Linaro Login, takes the CrowdUser - object and passes it to sync_crowd_memberships, where CrowdUser are - matched with User objects (many CrowdUser can map to a single User). - The tail call to `sync_openid_urls` then adds OpenID URLs to each User - object. - - :param: email_addresses: List of email addresses to look in Linaro Login. - :type list - :param crowd: The Crowd object instance to perform query to Linaro Login. - :type Crowd - :param whitelisted_groups: A list of valid groups/teams to create. - :type list - """ - memberships = {} - for email in email_addresses: - email = email.lower() - - user = None - try: - user = crowd.get_user_with_groups(email) - except CrowdNotFoundException: - # If there is not a user matching that email addess in Linaro, - # move on. - pass - except CrowdException: - # If something else went wrong, report it. - logger.warning("Error while searching email address " - "'{0}'.".format(email)) - - # No user with that email, or user is not part of any team. - if user is None or not user.teams: - continue - else: - for team in user.teams: - # If we have a list of valid groups, obey it. Otherwise, all - # groups are valid. - if whitelisted_groups: - if team in whitelisted_groups: - if not team in memberships: - memberships[team] = [] - memberships[team].append(user) - else: - if not team in memberships: - memberships[team] = [] - memberships[team].append(user) - - sync_crowd_memberships(memberships) |