summaryrefslogtreecommitdiff
path: root/apps/patchmetrics/crowd.py
blob: 7d3e7226fa79ea4f47ee2dcbdf0060f2a2fc3ecc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# Copyright (C) 2013 Linaro
#
# Author: Milo Casagrande <milo.casagrande@linaro.org>
# This file is part of the Patchmetrics package.
#
# Patchmetrics is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Patchmetrics is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Patchwork; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

import base64
import httplib
import urllib
import json
import types


class CrowdException(Exception):
    """Base class for Crowd exceptions."""


class CrowdNotFoundException(CrowdException):
    """An exception for 404 status."""


class CrowdForbiddenException(CrowdException):
    """An exception for 403 status."""


class CrowdUnauthorizedException(CrowdException):
    """ An exception for 401 status."""


class CrowdUser(object):
    """An object that depicts a user from the Crowd system.

    It has the following properties:
    name (str)
    display_name (str)
    teams (list)
    emails (list)
    """

    def __init__(self):
        self._display_name = None
        self._emails = None
        self._teams = None
        self._name = None

    @property
    def emails(self):
        return self._emails

    @emails.setter
    def emails(self, value):
        if isinstance(value, types.StringTypes):
            self._emails = [value]
        else:
            self._emails = list(value)

    @property
    def teams(self):
        return self._teams

    @teams.setter
    def teams(self, value):
        if isinstance(value, types.StringTypes):
            self._teams = [value]
        else:
            self._teams = list(value)

    @property
    def display_name(self):
        return self._display_name

    @display_name.setter
    def display_name(self, value):
        self._display_name = value

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @staticmethod
    def _user_from_json(data):
        user = CrowdUser()
        user.display_name = data['display-name']
        user.name = data['name']
        user.emails = data['email']
        return user

    @staticmethod
    def from_json(data):
        """Creates a CrowdUser from JSON data.

        The JSON data has to contain these fields:
        display-name
        name
        email

        :param data: The JSON file to load.
        """
        json_data = json.load(data)
        return CrowdUser._user_from_json(json_data)

    @staticmethod
    def from_json_s(string):
        """Creates a CrowdUser from a JSON string.

        The JSON data has to contain these fields:
        display-name
        name
        email

        :param string: The JSON string.
        :type str
        """
        json_data = json.loads(string)
        return CrowdUser._user_from_json(json_data)


class Crowd(object):
    """A Crowd object used to perform query operations."""

    def __init__(self, usr, pwd, url):
        self._usr = usr
        self._pwd = pwd
        assert url.startswith("https://")
        dummy, dummy, self._host, self._uri = url.split("/", 3)
        if ":" in self._host:
            self._host, self._port = self._host.split(":")
        else:
            self._port = 443

        self._auth = base64.encodestring('{0}:{1}'.format(self._usr,
                                                          self._pwd))
        self._headers = {
            "Authorization": "Basic {0}".format(self._auth),
            "Accept": "application/json"
        }

    def get_user(self, email):
        params = {"username": email}

        resource = "/user?{0}".format(urllib.urlencode(params))
        return CrowdUser.from_json_s(self._get_rest_usermanagement(resource))

    def get_user_with_groups(self, email):
        """Gets all the groups a user is member of.

        :param email: The user email.
        :return A CrowdUser object.
        """
        # First get the user, if it does not exist, we skip all the operations
        # here.
        user = self.get_user(email)

        params = {"username": email}

        resource = "/user/group/nested?{0}".format(
            urllib.urlencode(params))
        data = json.loads(self._get_rest_usermanagement(resource))

        teams = []
        if data["groups"]:
            teams = [x["name"] for x in data["groups"]]
        user.teams = teams

        return user

    def is_valid_user(self, email):
        """Handy function to check if a user exists or not.

        :param email: The user email.
        :return True or False.
        """
        params = {"username": email}

        resource = "/user?{0}".format(urllib.urlencode(params))
        api_url = "/crowd/rest/usermanagement/1{0}".format(resource)

        valid = True
        try:
            self._get(api_url)
        except CrowdNotFoundException:
            # In case of other exceptions, raise them.
            valid = False

        return valid

    def _get_rest_usermanagement(self, resource):
        api_url = "/{0}{1}".format(self._uri, resource)
        return self._get(api_url)

    def _get(self, api_url):
        """Performs a GET operation on the API URL.

        :param api_url: The URL of the API to use.
        :return The response data.
        :raise CrowdNotFoundException if the response status is 404,
            CrowdForbiddenException if status is 403,
            CrowdUnauthorizedException if status is 401, and CrowdException
            in other cases.
        """
        connection = httplib.HTTPSConnection(self._host, self._port)
        connection.request("GET", api_url, headers=self._headers)
        resp = connection.getresponse()

        if resp.status == 200:
            return resp.read()
        elif resp.status == 404:
            raise CrowdNotFoundException("Resource not found")
        elif resp.status == 401:
            raise CrowdUnauthorizedException(
                "Authorization not granted to fulfill the request")
        elif resp.status == 403:
            raise CrowdForbiddenException(
                "Access forbidden to fulfill the request")
        else:
            raise CrowdException(
                "Unknown Crowd status {0}".format(resp.status))