summaryrefslogtreecommitdiff
path: root/apps/patchmetrics/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'apps/patchmetrics/utils.py')
-rw-r--r--apps/patchmetrics/utils.py151
1 files changed, 0 insertions, 151 deletions
diff --git a/apps/patchmetrics/utils.py b/apps/patchmetrics/utils.py
deleted file mode 100644
index c900c41..0000000
--- a/apps/patchmetrics/utils.py
+++ /dev/null
@@ -1,151 +0,0 @@
-import logging
-import re
-
-from patchmetrics.crowd import (
- CrowdException,
- CrowdNotFoundException,
-)
-from django.contrib.auth.models import User
-from django.db.models import Q
-from patchmetrics.models import Team
-from patchwork.models import Person
-
-logging.basicConfig()
-logger = logging.getLogger()
-
-
-def create_team_display_name(team):
- """Very simple and hackish way to create a display name for a team.
-
- :param team: The team name.
- """
- # XXX: since Crowd does not expose the displayName of a team, we hack one.
- parts = re.split("-|_", team)
- display_name = " ".join(parts)
- return display_name.title()
-
-
-def sync_crowd_memberships(memberships):
- """Make sure the given memberships are represented in the database.
-
- :param memberships: A dict mapping each team to a list of members. Each
- member is in turn represented by a CrowdUser object.
- For example: {Team: [CrowdUser1, CrowdUser2, ...]}
- """
- created_memberships = []
- for team, members in memberships.iteritems():
- team_display_name = create_team_display_name(team)
- try:
- db_team = Team.objects.get(name=team)
- except Team.DoesNotExist:
- logger.info(
- 'Creating new team: {0} ({1})'.format(team, team_display_name))
- db_team = Team(name=team, display_name=team_display_name)
- db_team.save()
-
- for member in members:
- people = []
- for email in member.emails:
- try:
- person = Person.objects.get(email=email)
- except Person.DoesNotExist:
- logger.info("Creating new person: {0} "
- "({1})".format(member.name, email))
- person = Person(name=member.display_name, email=email)
- person.save()
- people.append(person)
-
- # Get all Person entries that might represent other email
- # addresses of this same user.
- people.extend(Person.objects.filter(name=member.display_name))
-
- user = get_user(member.name, member.emails)
- if user is None:
- user = User.objects.create_user(
- member.name, member.emails[0], password=None)
-
- # Now link all the Person entries to the user account.
- for person in people:
- person.user = user
- person.save()
-
- # And finally, make sure the user is a member of the team.
- if user not in db_team.members:
- logger.info("Adding {0} as a member "
- "of {1}".format(member.name, db_team.name))
- membership = db_team.add_member(user)
- membership.save()
- created_memberships.append(membership)
-
- return created_memberships
-
-
-def get_user(name, emails):
- """Return the user linked to the person with one of the given emails.
-
- If there are no users linked to a person with any of the given emails,
- return None.
- """
- query = (Q(person__in=Person.objects.filter(email__in=emails))
- | Q(username=name))
- users = User.objects.filter(query).distinct()
- if users.count() == 1:
- return users[0]
- elif users.count() > 1:
- logger.info("Found more than one user for {0}; "
- "using the first one".format(emails))
- return users[0]
- else:
- return None
-
-
-def sync_user_memberships(email_addresses, crowd, whitelisted_groups=[]):
- """If an input email matches one from Linaro Login, add them to the db.
-
- If an email address matches a user in Linaro Login, takes the CrowdUser
- object and passes it to sync_crowd_memberships, where CrowdUser are
- matched with User objects (many CrowdUser can map to a single User).
- The tail call to `sync_openid_urls` then adds OpenID URLs to each User
- object.
-
- :param: email_addresses: List of email addresses to look in Linaro Login.
- :type list
- :param crowd: The Crowd object instance to perform query to Linaro Login.
- :type Crowd
- :param whitelisted_groups: A list of valid groups/teams to create.
- :type list
- """
- memberships = {}
- for email in email_addresses:
- email = email.lower()
-
- user = None
- try:
- user = crowd.get_user_with_groups(email)
- except CrowdNotFoundException:
- # If there is not a user matching that email addess in Linaro,
- # move on.
- pass
- except CrowdException:
- # If something else went wrong, report it.
- logger.warning("Error while searching email address "
- "'{0}'.".format(email))
-
- # No user with that email, or user is not part of any team.
- if user is None or not user.teams:
- continue
- else:
- for team in user.teams:
- # If we have a list of valid groups, obey it. Otherwise, all
- # groups are valid.
- if whitelisted_groups:
- if team in whitelisted_groups:
- if not team in memberships:
- memberships[team] = []
- memberships[team].append(user)
- else:
- if not team in memberships:
- memberships[team] = []
- memberships[team].append(user)
-
- sync_crowd_memberships(memberships)