diff options
author | Andy Doan <andy.doan@linaro.org> | 2014-09-03 19:10:53 +0000 |
---|---|---|
committer | Linaro Code Review <review@review.linaro.org> | 2014-09-03 19:10:53 +0000 |
commit | 9fb12a0392122d61e0000e08cf37123811359766 (patch) | |
tree | f522a0a4ab4389d62088d36ed4eb21c66f15b870 | |
parent | c47554a34010fb8b87f3c70f585aa1998d02595c (diff) | |
parent | e3af10ba45428217a0f528b52293a4dea422628f (diff) |
Merge "api: move get_license_api and list_files_api to v1.py"
-rw-r--r-- | license_protected_downloads/api/v1.py | 85 | ||||
-rw-r--r-- | license_protected_downloads/common.py | 243 | ||||
-rw-r--r-- | license_protected_downloads/tests/__init__.py | 5 | ||||
-rw-r--r-- | license_protected_downloads/tests/test_common.py | 34 | ||||
-rw-r--r-- | license_protected_downloads/tests/test_views.py | 25 | ||||
-rw-r--r-- | license_protected_downloads/views.py | 318 | ||||
-rw-r--r-- | urls.py | 4 |
7 files changed, 374 insertions, 340 deletions
diff --git a/license_protected_downloads/api/v1.py b/license_protected_downloads/api/v1.py index 5cc6fa1..79fcb15 100644 --- a/license_protected_downloads/api/v1.py +++ b/license_protected_downloads/api/v1.py @@ -1,17 +1,29 @@ +import json import os import random import shutil from django.views.decorators.csrf import csrf_exempt from django.http import ( + Http404, HttpResponse, HttpResponseForbidden, HttpResponseServerError ) from django.conf import settings +from django.utils.encoding import iri_to_uri -from license_protected_downloads.models import APIKeyStore, APILog -from license_protected_downloads.common import safe_path_join +from license_protected_downloads.models import ( + APIKeyStore, + APILog, + License +) +from license_protected_downloads.common import ( + dir_list, + is_protected, + safe_path_join, + test_path, +) def upload_target_path(path, key, public): @@ -133,3 +145,72 @@ def api_push_to_server(request): """ pass + + +def list_files_api(request, path): + path = iri_to_uri(path) + url = path + result = test_path(path, request) + if not result: + raise Http404 + + target_type = result[0] + path = result[1] + + if target_type: + if target_type == "file": + file_url = url + if file_url[0] != "/": + file_url = "/" + file_url + path = os.path.dirname(path) + url = os.path.dirname(url) + + listing = dir_list(url, path, human_readable=False) + + clean_listing = [] + for entry in listing: + if target_type == "file" and file_url != entry["url"]: + # If we are getting a listing for a single file, skip the rest + continue + + if len(entry["license_list"]) == 0: + entry["license_list"] = ["Open"] + + clean_listing.append({ + "name": entry["name"], + "size": entry["size"], + "type": entry["type"], + "mtime": entry["mtime"], + "url": entry["url"], + }) + + data = json.dumps({"files": clean_listing}) + else: + data = json.dumps({"files": ["File not found."]}) + + return HttpResponse(data, mimetype='application/json') + + +def get_license_api(request, path): + path = iri_to_uri(path) + result = test_path(path, request) + if not result: + raise Http404 + + target_type = result[0] + path = result[1] + + if target_type == "dir": + data = json.dumps({"licenses": + ["ERROR: License only shown for a single file."]}) + else: + license_digest_list = is_protected(path) + license_list = License.objects.all_with_hashes(license_digest_list) + if len(license_list) == 0: + license_list = ["Open"] + else: + license_list = [{"text": l.text, "digest": l.digest} + for l in license_list] + data = json.dumps({"licenses": license_list}) + + return HttpResponse(data, mimetype='application/json') diff --git a/license_protected_downloads/common.py b/license_protected_downloads/common.py index 74886bb..4178f2f 100644 --- a/license_protected_downloads/common.py +++ b/license_protected_downloads/common.py @@ -1,4 +1,22 @@ +import glob +import hashlib +import logging +import mimetypes import os +import re + +from datetime import datetime + +from django.conf import settings + +from license_protected_downloads import( + buildinfo, + models, +) + + +log = logging.getLogger("llp.views") + def safe_path_join(base_path, *paths): """os.path.join with check that result is inside base_path. @@ -16,3 +34,228 @@ def safe_path_join(base_path, *paths): return None return target_path + + +def _check_special_eula(path): + if glob.glob(path + ".EULA.txt.*"): + return True + + +def _get_theme(path): + eula = glob.glob(path + ".EULA.txt.*") + vendor = os.path.splitext(eula[0])[1] + return vendor[1:] + + +def _insert_license_into_db(digest, text, theme): + if not models.License.objects.filter(digest=digest): + l = models.License(digest=digest, text=text, theme=theme) + l.save() + + +def is_protected(path): + build_info = None + max_index = 1 + base_path = path + if not os.path.isdir(base_path): + base_path = os.path.dirname(base_path) + + buildinfo_path = os.path.join(base_path, "BUILD-INFO.txt") + open_eula_path = os.path.join(base_path, "OPEN-EULA.txt") + eula_path = os.path.join(base_path, "EULA.txt") + + if os.path.isfile(buildinfo_path): + try: + build_info = buildinfo.BuildInfo(path) + except buildinfo.IncorrectDataFormatException: + # If we can't parse the BuildInfo, return [], which indicates no + # license in dir_list and will trigger a 403 error in file_server. + return [] + + license_type = build_info.get("license-type") + license_text = build_info.get("license-text") + theme = build_info.get("theme") + auth_groups = build_info.get("auth-groups") + max_index = build_info.max_index + elif os.path.isfile(open_eula_path): + return "OPEN" + elif os.path.isfile(eula_path): + if re.search("snowball", path): + theme = "stericsson" + elif re.search("origen", path): + theme = "samsung" + else: + theme = "linaro" + license_type = "protected" + license_file = os.path.join(settings.PROJECT_ROOT, + 'templates/licenses/' + theme + '.txt') + auth_groups = False + with open(license_file, "r") as infile: + license_text = infile.read() + elif _check_special_eula(path): + theme = _get_theme(path) + license_type = "protected" + license_file = os.path.join(settings.PROJECT_ROOT, + 'templates/licenses/' + theme + '.txt') + auth_groups = False + with open(license_file, "r") as infile: + license_text = infile.read() + elif _check_special_eula(base_path + "/*"): + return "OPEN" + else: + return [] + + digests = [] + + if license_type: + if license_type == "open": + return "OPEN" + + if auth_groups and not license_text: + return "OPEN" + elif license_text: + for i in range(max_index): + if build_info: + license_text = build_info.get("license-text", i) + theme = build_info.get("theme", i) + digest = hashlib.md5(license_text).hexdigest() + digests.append(digest) + _insert_license_into_db(digest, license_text, theme) + else: + log.info("No license text or auth groups found: check the " + "BUILD-INFO file.") + + return digests + + +def test_path(path, request): + """Check that path points to something we can serve up. + """ + served_paths = settings.SERVED_PATHS + # if key is in request.GET["key"] then need to mod path and give + # access to a per-key directory. + if "key" in request.GET: + key_details = models.APIKeyStore.objects.filter(key=request.GET["key"]) + if key_details: + path = os.path.join(request.GET["key"], path) + + # Private uploads are in a separate path (or can be), so set + # served_paths as needed. + if not key_details[0].public: + served_paths = [settings.UPLOAD_PATH] + + for basepath in served_paths: + fullpath = safe_path_join(basepath, path) + if fullpath is None: + return None + if os.path.isfile(fullpath): + return ("file", fullpath) + if os.path.isdir(fullpath): + return ("dir", fullpath) + + +def _hidden_file(file_name): + hidden_files = ["BUILD-INFO.txt", "EULA.txt", "HEADER.html", + "HOWTO_", "textile", ".htaccess", "licenses"] + for pattern in hidden_files: + if re.search(pattern, file_name): + return True + return False + + +def _listdir(path): + '''Lists the contents of a directory sorted to our requirements. + + If the directory is all numbers it sorts them numerically. The "latest" + entry will always be the first entry. Else use standard sorting. + ''' + def _sort(a, b): + try: + return cmp(int(a), int(b)) + except: + pass + if a == 'latest': + return -1 + elif b == 'latest': + return 1 + + return cmp(a, b) + files = os.listdir(path) + files.sort(_sort) + return files + + +def _sizeof_fmt(num): + ''' Returns in human readable format for num. + ''' + if num < 1024 and num > -1024: + return str(num) + num /= 1024.0 + for x in ['K', 'M', 'G']: + if num < 1024.0 and num > -1024.0: + return "%3.1f%s" % (num, x) + num /= 1024.0 + return "%3.1f%s" % (num, 'T') + + +def dir_list(url, path, human_readable=True): + files = _listdir(path) + listing = [] + + for file_name in files: + if _hidden_file(file_name): + continue + + name = file_name + file_name = os.path.join(path, file_name) + + if os.path.exists(file_name): + mtime = os.path.getmtime(file_name) + else: + # If the file we are looking at doesn't exist (broken symlink for + # example), it doesn't have a mtime. + mtime = 0 + + if os.path.isdir(file_name): + target_type = "folder" + else: + target_type = mimetypes.guess_type(name)[0] + + if os.path.exists(file_name): + size = os.path.getsize(file_name) + else: + # If the file we are looking at doesn't exist (broken symlink for + # example), it doesn't have a size + size = 0 + + if not re.search(r'^/', url) and url != '': + url = '/' + url + + # Since the code below assume no trailing slash, make sure that + # there isn't one. + url = re.sub(r'/$', '', url) + + if human_readable: + if mtime: + mtime = datetime.fromtimestamp(mtime) + mtime = mtime.strftime('%d-%b-%Y %H:%M') + if target_type: + if target_type.split('/')[0] == "text": + target_type = "text" + else: + target_type = "other" + + size = _sizeof_fmt(size) + + pathname = os.path.join(path, name) + license_digest_list = is_protected(pathname) + license_list = models.License.objects.all_with_hashes( + license_digest_list) + listing.append({'name': name, + 'size': size, + 'type': target_type, + 'mtime': mtime, + 'license_digest_list': license_digest_list, + 'license_list': license_list, + 'url': url + '/' + name}) + return listing diff --git a/license_protected_downloads/tests/__init__.py b/license_protected_downloads/tests/__init__.py index 8d28529..65d5c4c 100644 --- a/license_protected_downloads/tests/__init__.py +++ b/license_protected_downloads/tests/__init__.py @@ -7,6 +7,10 @@ from license_protected_downloads.tests.test_splicebuildinfos \ from license_protected_downloads.tests.test_models import LicenseTestCase from license_protected_downloads.tests.test_pep8 import TestPep8 from license_protected_downloads.tests.test_pyflakes import TestPyflakes +from license_protected_downloads.tests.test_pyflakes import TestPyflakes +from license_protected_downloads.tests.test_common import ( + CommonTests, +) from license_protected_downloads.tests.test_views import ( FileViewTests, HowtoViewTests, @@ -24,6 +28,7 @@ from license_protected_downloads.tests.test_render_text_files \ #starts the test suite __test__ = { + 'CommonTests': CommonTests, 'APITests': APITests, 'BuildInfoTests': BuildInfoTests, 'SpliceBuildInfosTests': SpliceBuildInfosTests, diff --git a/license_protected_downloads/tests/test_common.py b/license_protected_downloads/tests/test_common.py new file mode 100644 index 0000000..aea0f00 --- /dev/null +++ b/license_protected_downloads/tests/test_common.py @@ -0,0 +1,34 @@ +__author__ = 'dooferlad' + +import os +import tempfile +import unittest +import shutil + +from license_protected_downloads import common + + +class CommonTests(unittest.TestCase): + def test_sizeof_fmt(self): + self.assertEqual(common._sizeof_fmt(1), '1') + self.assertEqual(common._sizeof_fmt(1234), '1.2K') + self.assertEqual(common._sizeof_fmt(1234567), '1.2M') + self.assertEqual(common._sizeof_fmt(1234567899), '1.1G') + self.assertEqual(common._sizeof_fmt(1234567899999), '1.1T') + + def test_listdir(self): + patterns = [ + (['b', 'a', 'latest', 'c'], ['latest', 'a', 'b', 'c']), + (['10', '1', '100', 'latest'], ['latest', '1', '10', '100']), + (['10', 'foo', '100', 'latest'], ['latest', '10', '100', 'foo']), + ] + for files, expected in patterns: + path = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, path) + for file in files: + with open(os.path.join(path, file), 'w') as f: + f.write(file) + self.assertEqual(expected, common._listdir(path)) + +if __name__ == '__main__': + unittest.main() diff --git a/license_protected_downloads/tests/test_views.py b/license_protected_downloads/tests/test_views.py index 41958fd..017ea89 100644 --- a/license_protected_downloads/tests/test_views.py +++ b/license_protected_downloads/tests/test_views.py @@ -6,7 +6,6 @@ import tempfile import unittest import urllib2 import urlparse -import shutil import mock @@ -15,12 +14,11 @@ from django.test import Client, TestCase from django.http import HttpResponse from license_protected_downloads.buildinfo import BuildInfo +from license_protected_downloads.common import _insert_license_into_db from license_protected_downloads.config import INTERNAL_HOSTS from license_protected_downloads.tests.helpers import temporary_directory from license_protected_downloads.tests.helpers import TestHttpServer -from license_protected_downloads.views import _insert_license_into_db from license_protected_downloads.views import _process_include_tags -from license_protected_downloads.views import _sizeof_fmt from license_protected_downloads.views import is_same_parent_dir from license_protected_downloads import views @@ -513,27 +511,6 @@ class ViewTests(BaseServeViewTest): # this test should not cause an exception. Anything else is a pass. self.assertEqual(response.status_code, 200) - def test_sizeof_fmt(self): - self.assertEqual(_sizeof_fmt(1), '1') - self.assertEqual(_sizeof_fmt(1234), '1.2K') - self.assertEqual(_sizeof_fmt(1234567), '1.2M') - self.assertEqual(_sizeof_fmt(1234567899), '1.1G') - self.assertEqual(_sizeof_fmt(1234567899999), '1.1T') - - def test_listdir(self): - patterns = [ - (['b', 'a', 'latest', 'c'], ['latest', 'a', 'b', 'c']), - (['10', '1', '100', 'latest'], ['latest', '1', '10', '100']), - (['10', 'foo', '100', 'latest'], ['latest', '10', '100', 'foo']), - ] - for files, expected in patterns: - path = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, path) - for file in files: - with open(os.path.join(path, file), 'w') as f: - f.write(file) - self.assertEqual(expected, views._listdir(path)) - def test_whitelisted_dirs(self): target_file = "precise/restricted/whitelisted.txt" url = urlparse.urljoin("http://testserver/", target_file) diff --git a/license_protected_downloads/views.py b/license_protected_downloads/views.py index 99f0017..db41561 100644 --- a/license_protected_downloads/views.py +++ b/license_protected_downloads/views.py @@ -1,13 +1,9 @@ import logging -import glob -import hashlib import json import mimetypes import os import re import urllib2 -from mimetypes import guess_type -from datetime import datetime from django.conf import settings from django.http import ( @@ -23,16 +19,20 @@ from django.views.decorators.csrf import csrf_exempt from buildinfo import BuildInfo, IncorrectDataFormatException from render_text_files import RenderTextFiles -from models import License, APIKeyStore +from models import License # Load group auth "plugin" dynamically import importlib group_auth_modules = [importlib.import_module(m) for m in settings.GROUP_AUTH_MODULES] from BeautifulSoup import BeautifulSoup import config -from common import safe_path_join from group_auth_common import GroupAuthError import xml.dom.minidom as dom +from license_protected_downloads.common import ( + dir_list, + is_protected, + test_path +) from license_protected_downloads.api.v1 import file_server_post @@ -44,168 +44,6 @@ LINARO_INCLUDE_FILE_RE1 = re.compile( log = logging.getLogger("llp.views") -def _hidden_file(file_name): - hidden_files = ["BUILD-INFO.txt", "EULA.txt", "HEADER.html", - "HOWTO_", "textile", ".htaccess", "licenses"] - for pattern in hidden_files: - if re.search(pattern, file_name): - return True - return False - - -def _sizeof_fmt(num): - ''' Returns in human readable format for num. - ''' - if num < 1024 and num > -1024: - return str(num) - num /= 1024.0 - for x in ['K', 'M', 'G']: - if num < 1024.0 and num > -1024.0: - return "%3.1f%s" % (num, x) - num /= 1024.0 - return "%3.1f%s" % (num, 'T') - - -def _listdir(path): - '''Lists the contents of a directory sorted to our requirements. - - If the directory is all numbers it sorts them numerically. The "latest" - entry will always be the first entry. Else use standard sorting. - ''' - def _sort(a, b): - try: - return cmp(int(a), int(b)) - except: - pass - if a == 'latest': - return -1 - elif b == 'latest': - return 1 - - return cmp(a, b) - files = os.listdir(path) - files.sort(_sort) - return files - - -def dir_list(url, path, human_readable=True): - files = _listdir(path) - listing = [] - - for file_name in files: - if _hidden_file(file_name): - continue - - name = file_name - file_name = os.path.join(path, file_name) - - if os.path.exists(file_name): - mtime = os.path.getmtime(file_name) - else: - # If the file we are looking at doesn't exist (broken symlink for - # example), it doesn't have a mtime. - mtime = 0 - - if os.path.isdir(file_name): - target_type = "folder" - else: - target_type = guess_type(name)[0] - - if os.path.exists(file_name): - size = os.path.getsize(file_name) - else: - # If the file we are looking at doesn't exist (broken symlink for - # example), it doesn't have a size - size = 0 - - if not re.search(r'^/', url) and url != '': - url = '/' + url - - # Since the code below assume no trailing slash, make sure that - # there isn't one. - url = re.sub(r'/$', '', url) - - if human_readable: - if mtime: - mtime = datetime.fromtimestamp(mtime).strftime( - '%d-%b-%Y %H:%M') - if target_type: - if target_type.split('/')[0] == "text": - target_type = "text" - else: - target_type = "other" - - size = _sizeof_fmt(size) - - pathname = os.path.join(path, name) - license_digest_list = is_protected(pathname) - license_list = License.objects.all_with_hashes(license_digest_list) - listing.append({'name': name, - 'size': size, - 'type': target_type, - 'mtime': mtime, - 'license_digest_list': license_digest_list, - 'license_list': license_list, - 'url': url + '/' + name}) - return listing - - -def test_path(path, request, served_paths=None): - """Check that path points to something we can serve up. - - served_paths can be provided to overwrite settings.SERVED_PATHS. This is - used for uploaded files, which may not be shared in the server root. - """ - - # if key is in request.GET["key"] then need to mod path and give - # access to a per-key directory. - if "key" in request.GET: - key_details = APIKeyStore.objects.filter(key=request.GET["key"]) - if key_details: - path = os.path.join(request.GET["key"], path) - - # Private uploads are in a separate path (or can be), so set - # served_paths as needed. - if key_details[0].public == False: - served_paths = settings.UPLOAD_PATH - - if served_paths is None: - served_paths = settings.SERVED_PATHS - else: - if not isinstance(served_paths, list): - served_paths = [served_paths] - - for basepath in served_paths: - fullpath = safe_path_join(basepath, path) - - if fullpath is None: - return None - - if os.path.isfile(fullpath): - return ("file", fullpath) - if os.path.isdir(fullpath): - return ("dir", fullpath) - - return None - - -def _insert_license_into_db(digest, text, theme): - if not License.objects.filter(digest=digest): - l = License(digest=digest, text=text, theme=theme) - l.save() - - -def _check_special_eula(path): - if glob.glob(path + ".EULA.txt.*"): - return True - - -def _get_theme(path): - eula = glob.glob(path + ".EULA.txt.*") - vendor = os.path.splitext(eula[0])[1] - return vendor[1:] - - def _get_header_html_content(path): """ Read HEADER.html in current directory if exists and return @@ -271,81 +109,6 @@ def _process_include_tags(content): return content -def is_protected(path): - build_info = None - max_index = 1 - base_path = path - if not os.path.isdir(base_path): - base_path = os.path.dirname(base_path) - - buildinfo_path = os.path.join(base_path, "BUILD-INFO.txt") - open_eula_path = os.path.join(base_path, "OPEN-EULA.txt") - eula_path = os.path.join(base_path, "EULA.txt") - - if os.path.isfile(buildinfo_path): - try: - build_info = BuildInfo(path) - except IncorrectDataFormatException: - # If we can't parse the BuildInfo, return [], which indicates no - # license in dir_list and will trigger a 403 error in file_server. - return [] - - license_type = build_info.get("license-type") - license_text = build_info.get("license-text") - theme = build_info.get("theme") - auth_groups = build_info.get("auth-groups") - max_index = build_info.max_index - elif os.path.isfile(open_eula_path): - return "OPEN" - elif os.path.isfile(eula_path): - if re.search("snowball", path): - theme = "stericsson" - elif re.search("origen", path): - theme = "samsung" - else: - theme = "linaro" - license_type = "protected" - license_file = os.path.join(settings.PROJECT_ROOT, - 'templates/licenses/' + theme + '.txt') - auth_groups = False - with open(license_file, "r") as infile: - license_text = infile.read() - elif _check_special_eula(path): - theme = _get_theme(path) - license_type = "protected" - license_file = os.path.join(settings.PROJECT_ROOT, - 'templates/licenses/' + theme + '.txt') - auth_groups = False - with open(license_file, "r") as infile: - license_text = infile.read() - elif _check_special_eula(base_path + "/*"): - return "OPEN" - else: - return [] - - digests = [] - - if license_type: - if license_type == "open": - return "OPEN" - - if auth_groups and not license_text: - return "OPEN" - elif license_text: - for i in range(max_index): - if build_info: - license_text = build_info.get("license-text", i) - theme = build_info.get("theme", i) - digest = hashlib.md5(license_text).hexdigest() - digests.append(digest) - _insert_license_into_db(digest, license_text, theme) - else: - log.info("No license text or auth groups found: check the " - "BUILD-INFO file.") - - return digests - - def get_client_ip(request): ip = request.META.get('REMOTE_ADDR') return ip @@ -619,75 +382,6 @@ def get_remote_static(request): return HttpResponse(data) -def list_files_api(request, path): - path = iri_to_uri(path) - url = path - result = test_path(path, request) - if not result: - raise Http404 - - target_type = result[0] - path = result[1] - - if target_type: - if target_type == "file": - file_url = url - if file_url[0] != "/": - file_url = "/" + file_url - path = os.path.dirname(path) - url = os.path.dirname(url) - - listing = dir_list(url, path, human_readable=False) - - clean_listing = [] - for entry in listing: - if target_type == "file" and file_url != entry["url"]: - # If we are getting a listing for a single file, skip the rest - continue - - if len(entry["license_list"]) == 0: - entry["license_list"] = ["Open"] - - clean_listing.append({ - "name": entry["name"], - "size": entry["size"], - "type": entry["type"], - "mtime": entry["mtime"], - "url": entry["url"], - }) - - data = json.dumps({"files": clean_listing}) - else: - data = json.dumps({"files": ["File not found."]}) - - return HttpResponse(data, mimetype='application/json') - - -def get_license_api(request, path): - path = iri_to_uri(path) - result = test_path(path, request) - if not result: - raise Http404 - - target_type = result[0] - path = result[1] - - if target_type == "dir": - data = json.dumps({"licenses": - ["ERROR: License only shown for a single file."]}) - else: - license_digest_list = is_protected(path) - license_list = License.objects.all_with_hashes(license_digest_list) - if len(license_list) == 0: - license_list = ["Open"] - else: - license_list = [{"text": l.text, "digest": l.digest} - for l in license_list] - data = json.dumps({"licenses": license_list}) - - return HttpResponse(data, mimetype='application/json') - - def render_descriptions(path): """ Extracts project name and its description from annotated source manifest @@ -47,10 +47,10 @@ urlpatterns = patterns('', name='get_textile_files'), url(r'^api/ls/(?P<path>.*)$', - 'license_protected_downloads.views.list_files_api'), + 'license_protected_downloads.api.v1.list_files_api'), url(r'^api/license/(?P<path>.*)$', - 'license_protected_downloads.views.get_license_api'), + 'license_protected_downloads.api.v1.get_license_api'), url(r'^api/request_key$', 'license_protected_downloads.api.v1.api_request_key'), |