diff options
author | James Tunnicliffe <james.tunnicliffe@linaro.org> | 2012-06-28 14:10:07 +0100 |
---|---|---|
committer | James Tunnicliffe <james.tunnicliffe@linaro.org> | 2012-06-28 14:10:07 +0100 |
commit | db8c964a499b711a8642d47d759fd9211e810be6 (patch) | |
tree | 15049e0d734d07ac64ccdde4fb409ad782936a45 /tests | |
parent | 9d5a18425b487b6c6ede38dfc70fbeffdaa56e2d (diff) |
Updated testr config.
Moved license_protected_file_downloader.py back to where it should be for production doctests.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/__init__.py | 2 | ||||
-rw-r--r-- | tests/license_protected_file_downloader.py | 311 | ||||
-rw-r--r-- | tests/test_click_through_license.py | 330 |
3 files changed, 311 insertions, 332 deletions
diff --git a/tests/__init__.py b/tests/__init__.py index 56f6d60..10c6f6b 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -3,9 +3,7 @@ import unittest def test_suite(): module_names = [ - 'tests.test_click_through_license.TestLicense', 'tests.test_publish_to_snapshots.TestSnapshotsPublisher', - 'tests.test_php_unit.PhpUnitTest', ] loader = unittest.TestLoader() suite = loader.loadTestsFromNames(module_names) diff --git a/tests/license_protected_file_downloader.py b/tests/license_protected_file_downloader.py new file mode 100644 index 0000000..dd7e890 --- /dev/null +++ b/tests/license_protected_file_downloader.py @@ -0,0 +1,311 @@ +#!/usr/bin/env python + +import argparse +import os +import pycurl +import re +import urllib +import urlparse +import html2text +from BeautifulSoup import BeautifulSoup + + +class LicenseProtectedFileFetcher: + """Fetch a file from the web that may be protected by a license redirect + + This is designed to run on snapshots.linaro.org. License HTML file are in + the form: + + <vendor>.html has a link to <vendor>-accept.html + + If self.get is pointed at a file that has to go through one of these + licenses, it should be able to automatically accept the license and + download the file. + + Once a license has been accepted, it will be used for all following + downloads. + + If self.close() is called before the object is deleted, cURL will store + the license accept cookie to cookies.txt, so it can be used for later + downloads. 
+ + """ + def __init__(self, cookie_file="cookies.txt"): + """Set up cURL""" + self.curl = pycurl.Curl() + self.curl.setopt(pycurl.WRITEFUNCTION, self._write_body) + self.curl.setopt(pycurl.HEADERFUNCTION, self._write_header) + self.curl.setopt(pycurl.FOLLOWLOCATION, 1) + self.curl.setopt(pycurl.COOKIEFILE, cookie_file) + self.curl.setopt(pycurl.COOKIEJAR, cookie_file) + self.file_out = None + self.file_number = 0 + + def _pre_curl(self, url): + url = url.encode("ascii") + self.curl.setopt(pycurl.URL, url) + + self.body = "" + self.header = "" + + file_name = self.file_name + if self.file_number > 0: + file_name += str(self.file_number) + + # When debugging it is useful to have all intermediate files saved. + # If you want them, uncomment this line: + # self.file_number += 1 + + if self.file_name: + self.file_out = open(file_name, 'w') + else: + self.file_out = None + + return url + + def _post_curl(self, url): + self._parse_headers(url) + + if self.file_out: + self.file_out.close() + + def _get(self, url): + """Clear out header and body storage, fetch URL, filling them in.""" + self._pre_curl(url) + self.curl.perform() + self._post_curl(url) + + def _post(self, url, args, form): + """Clear out header and body storage, post to URL, filling them in.""" + + # Prep the URL. + # For some reason I can't get the python built in functions to do this + # for me in a way that actually works. + # args = urllib.urlencode(args) encodes the values as arrays + # Quoting of the string is unnecessary. + url = url + "?" + for k, v in args.items(): + url += k + "=" + v[0] + "&" + url = url[0:-1] # Trim the final & + + self._pre_curl(url) + self.curl.setopt(pycurl.HTTPPOST, form) + self.curl.perform() + self._post_curl(url) + + def _parse_headers(self, url): + header = {} + for line in self.header.splitlines(): + # Header lines typically are of the form thing: value... 
+ test_line = re.search("^(.*?)\s*:\s*(.*)$", line) + + if test_line: + header[test_line.group(1)] = test_line.group(2) + + # The location attribute is sometimes relative, but we would + # like to have it as always absolute... + if 'Location' in header: + parsed_location = urlparse.urlparse(header["Location"]) + + # If not an absolute location... + if not parsed_location.netloc: + parsed_source_url = urlparse.urlparse(url) + new_location = ["", "", "", "", ""] + + new_location[0] = parsed_source_url.scheme + new_location[1] = parsed_source_url.netloc + new_location[2] = header["Location"] + + # Update location with absolute URL + header["Location"] = urlparse.urlunsplit(new_location) + + self.header_text = self.header + self.header = header + + def get_headers(self, url): + url = url.encode("ascii") + self.curl.setopt(pycurl.URL, url) + + self.body = "" + self.header = "" + + # Setting NOBODY causes CURL to just fetch the header. + self.curl.setopt(pycurl.NOBODY, True) + self.curl.perform() + self.curl.setopt(pycurl.NOBODY, False) + + self._parse_headers(url) + + return self.header + + def get_or_return_license(self, url, file_name=None): + """Get file at the requested URL or, if behind a license, return that. + + If the URL provided does not redirect us to a license, then return the + body of that file. If we are redirected to a license click through + then return (the license as plain text, url to accept the license). + + If the user of this function accepts the license, then they should + call get_protected_file.""" + + self.file_name = file_name + + # Get the license details. If this returns None, the file isn't license + # protected and we can just return the file we started to get in the + # function (self.body). 
+ license_details = self._get_license(url) + + if license_details: + return license_details + + return self.body + + def get(self, url, file_name=None, ignore_license=False, + accept_license=True): + """Fetch the requested URL, accepting licenses + + Fetches the file at url. If a redirect is encountered, it is + expected to be to a license that has an accept link. Follow that link, + then download the original file. Returns the first 1MB of the file + (see _write_body). + + """ + + self.file_name = file_name + if ignore_license: + self._get(url) + return self.body + + license_details = self._get_license(url) + + if license_details: + # Found a license. + if accept_license: + # Accept the license without looking at it and + # start fetching the file we originally wanted. + accept_url = license_details[1] + accept_query = license_details[2] + form = license_details[3] + self.get_protected_file(accept_url, accept_query, url, form) + + else: + # If we got here, there wasn't a license protecting the file + # so we just fetch it. + self._get(url) + + return self.body + + def _get_license(self, url): + """Return (license, accept URL, decline URL) if found, + else return None. + + """ + + self.get_headers(url) + + text_license = None + submit_url = None + + if "Location" in self.header and self.header["Location"] != url: + # We have been redirected to a new location - the license file + location = self.header["Location"] + + # Fetch the license HTML + self._get(location) + + soup = BeautifulSoup(self.body) + for form in soup.findAll("form"): + action = form.get("action") + if action and re.search("""/accept-license\?lic=""", action): + # This form is what we need to accept the license + submit_url = action + + # The license is in a div with the ID license-text, so we + # use this to pull just the license out of the HTML. 
+ html_license = u"" + for chunk in soup.findAll(id="license-text"): + # Output of chunk.prettify is UTF8, but comes back + # as a str, so convert it here. + html_license += chunk.prettify().decode("utf-8") + + text_license = html2text.html2text(html_license) + + if text_license and submit_url: + # Currently accept_url contains the arguments we want to send. Split. + parsed = urlparse.urlparse(submit_url) + accept_url = urlparse.urljoin(url, parsed[2]) + args = urlparse.parse_qs(parsed[4]) + csrftoken = soup.findAll("input", + attrs={"name": "csrfmiddlewaretoken"})[0]["value"] + csrftoken = csrftoken.encode("ascii") + + form = [('accept', 'accept'), ("csrfmiddlewaretoken", csrftoken)] + + return text_license, accept_url, args, form + + return None + + def get_protected_file(self, accept_url, accept_query, url, form): + """Gets the file redirected to by the accept_url""" + + self._post(accept_url, accept_query, form) + + # The server returns us to an HTML file that redirects to the real + # download (in order to return the user to the directory listing + # after accepting a license). We don't parse the HTML. Just re- + # request the file. + self._get(url) # Download the target file + + return self.body + + def _write_body(self, buf): + """Used by curl as a sink for body content""" + + # If we have a target file to write to, write to it + if self.file_out and not self.file_out.closed: + self.file_out.write(buf) + + # Only buffer first 1MB of body. This should be plenty for anything + # we wish to parse internally. + if len(self.body) < 1024 * 1024 * 1024: + # XXX Would be nice to stop keeping the file in RAM at all and + # passing large buffers around. Perhaps only keep in RAM if + # file_name == None? (used for getting directory listings + # normally). 
+ self.body += buf + + def _write_header(self, buf): + """Used by curl as a sink for header content""" + self.header += buf + + def register_progress_callback(self, callback): + self.curl.setopt(pycurl.NOPROGRESS, 0) + self.curl.setopt(pycurl.PROGRESSFUNCTION, callback) + + def close(self): + """Wrapper to close curl - this will allow curl to write out cookies""" + self.curl.close() + + +def main(): + """Download file specified on command line""" + parser = argparse.ArgumentParser(description="Download a file, accepting " + "any licenses required to do so.") + + parser.add_argument('url', metavar="URL", type=str, nargs=1, + help="URL of file to download.") + + args = parser.parse_args() + + fetcher = LicenseProtectedFileFetcher() + + # Get file name from URL + file_name = os.path.basename(urlparse.urlparse(args.url[0]).path) + if not file_name: + file_name = "downloaded" + fetcher.get(args.url[0], file_name) + + fetcher.close() + +if __name__ == "__main__": + main() diff --git a/tests/test_click_through_license.py b/tests/test_click_through_license.py deleted file mode 100644 index 4a0520d..0000000 --- a/tests/test_click_through_license.py +++ /dev/null @@ -1,330 +0,0 @@ -#!/usr/bin/env python - -import re -import os -import shutil -import shlex -import subprocess -import socket - -from testtools import TestCase -from testtools.matchers import Mismatch -from license_protected_file_downloader import LicenseProtectedFileFetcher - -fetcher = LicenseProtectedFileFetcher() -cwd = os.getcwd() -docroot = cwd -srvroot = os.path.abspath(os.path.join(*([cwd] + ['tests']))) -local_rewrite = 'RewriteCond %{REMOTE_ADDR} 127.0.0.1 [OR]' - -host = 'http://127.0.0.1' -port = '0' # 0 == Pick a free port. 
-samsung_license_path = '/licenses/license.php' -ste_license_path = '/licenses/license.php' -linaro_license_path = '/licenses/license.php' -samsung_test_file = '/android/~linaro-android/staging-origen/test.txt' -ste_test_file = ('/android/~linaro-android/staging-snowball' - '/173/target/product/snowball/test.txt') -ste_open_test_file = '/android/~linaro-android/staging-snowball/173/test.txt' -never_available = '/android/~linaro-android/staging-imx53/test.txt' -linaro_test_file = '/android/~linaro-android/staging-panda/test.txt' -not_protected_test_file = ('/android/~linaro-android/staging-vexpress-a9' - '/test.txt') -not_found_test_file = ('/android/~linaro-android/staging-vexpress-a9' - '/notfound.txt') -per_file_samsung_test_file = '/android/images/origen-blob.txt' -per_file_ste_test_file = '/android/images/snowball-blob.txt' -per_file_not_protected_test_file = '/android/images/MANIFEST' -dirs_only_dir = '/android/~linaro-android/' -build_info_samsung_test_file = '/android/build-info/origen-blob.txt' -build_info_ste_test_file = '/android/build-info/snowball-blob.txt' -build_info_not_protected_test_file = '/android/build-info/panda-open.txt' -build_info_openid_test_file = '/android/build-info/openid.txt' - - -class Contains(object): - '''Match if a string contains substring''' - def __init__(self, substr): - self.substr = substr - - def __str__(self): - return 'Contains(%s)' % (self.substr,) - - def match(self, actual): - for line in actual.splitlines(): - res = re.search(self.substr, line) - if res: - return None - return Mismatch("Initial string doesn't contain substring (%s)" % - self.substr) - - -class CommandNotFoundException(Exception): - ''' Unable to find command ''' - - -class NonZeroReturnValueException(Exception): - ''' Command exited with nonzero return value ''' - - -class TestLicense(TestCase): - '''Tests for accessing files and directories with license protection''' - - @classmethod - def setUpClass(cls): - global host - global port - if port == 
'0': - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind(('127.0.0.1', 0)) - port = str(s.getsockname()[1]) - s.close() - host = host + ':' + port - shutil.copy("%s/apache2.conf.tmpl" % srvroot, "%s/apache2.conf" % - srvroot) - shutil.copy("%s/.htaccess" % docroot, "%s/dothtaccess" % docroot) - subprocess.Popen(['sed', '-i', 's/ServerRoot \"\"/ServerRoot \"%s\"/' - % srvroot.replace('/', '\/'), '%s/apache2.conf' % srvroot], - stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait() - subprocess.Popen(['sed', '-i', 's/DocumentRoot \"\"/DocumentRoot ' - '\"%s\"/' % docroot.replace('/', '\/'), '%s/apache2.conf' - % srvroot], stdout=open('/dev/null', 'w'), - stderr=subprocess.STDOUT).wait() - subprocess.Popen(['sed', '-i', 's/Directory \"\"/Directory \"%s\"/' - % docroot.replace('/', '\/'), '%s/apache2.conf' % srvroot], - stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait() - subprocess.Popen(['sed', '-i', 's/Listen/Listen %s/' % port, - '%s/apache2.conf' % srvroot], stdout=open('/dev/null', 'w'), - stderr=subprocess.STDOUT).wait() - if subprocess.Popen(['which', 'apache2'], - stdout=open('/dev/null', 'w'), - stderr=subprocess.STDOUT).wait(): - raise CommandNotFoundException("apache2 command not found. 
Please " - "install apache2 web server and rerun tests.") - args = shlex.split('apache2 -d %s -f apache2.conf -k start' % srvroot) - rc = subprocess.Popen(args, stdout=open('/dev/null', 'w'), - stderr=subprocess.STDOUT).wait() - if rc: - raise NonZeroReturnValueException("apache2 server exited with " - "error %s" % rc) - - @classmethod - def tearDownClass(cls): - if os.path.exists("%s/cookies.txt" % docroot): - os.unlink("%s/cookies.txt" % docroot) - args = shlex.split('apache2 -d %s -f apache2.conf -k stop' % srvroot) - subprocess.Popen(args, stdout=open('/dev/null', 'w'), - stderr=subprocess.STDOUT).wait() - if os.path.exists("%s/apache2.conf" % srvroot): - os.unlink("%s/apache2.conf" % srvroot) - if os.path.exists("%s/click_through_license_access.log" % srvroot): - os.unlink("%s/click_through_license_access.log" % srvroot) - if os.path.exists("%s/click_through_license_error.log" % srvroot): - os.unlink("%s/click_through_license_error.log" % srvroot) - if os.path.exists("%s/rewrite.log" % srvroot): - os.unlink("%s/rewrite.log" % srvroot) - os.rename("%s/dothtaccess" % docroot, "%s/.htaccess" % docroot) - - def setUp(self): - super(TestLicense, self).setUp() - global fetcher - fetcher = LicenseProtectedFileFetcher() - - def tearDown(self): - super(TestLicense, self).tearDown() - if isinstance(fetcher, LicenseProtectedFileFetcher): - fetcher.close() - if os.path.exists("%s/cookies.txt" % docroot): - os.unlink("%s/cookies.txt" % docroot) - - def test_licensefile_directly_samsung(self): - search = "Index of /" - testfile = fetcher.get(host + samsung_license_path) - self.assertThat(testfile, Contains(search)) - - def test_licensefile_directly_ste(self): - search = "Index of /" - testfile = fetcher.get(host + ste_license_path) - self.assertThat(testfile, Contains(search)) - - def test_licensefile_directly_linaro(self): - search = "Index of /" - testfile = fetcher.get(host + linaro_license_path) - self.assertThat(testfile, Contains(search)) - - def 
test_redirect_to_license_samsung(self): - search = "PLEASE READ THE FOLLOWING AGREEMENT CAREFULLY" - testfile = fetcher.get_or_return_license(host + samsung_test_file) - self.assertThat(testfile[0], Contains(search)) - - def test_redirect_to_license_ste(self): - search = "PLEASE READ THE FOLLOWING AGREEMENT CAREFULLY" - testfile = fetcher.get_or_return_license(host + ste_test_file) - self.assertThat(testfile[0], Contains(search)) - - def test_redirect_to_license_linaro(self): - search = "Linaro license." - testfile = fetcher.get_or_return_license(host + linaro_test_file) - self.assertThat(testfile[0], Contains(search)) - - def test_decline_license_samsung(self): - search = "License has not been accepted" - testfile = fetcher.get(host + samsung_test_file, accept_license=False) - self.assertThat(testfile, Contains(search)) - - def test_decline_license_ste(self): - search = "License has not been accepted" - testfile = fetcher.get(host + ste_test_file, accept_license=False) - self.assertThat(testfile, Contains(search)) - - def test_decline_license_linaro(self): - search = "License has not been accepted" - testfile = fetcher.get(host + linaro_test_file, accept_license=False) - self.assertThat(testfile, Contains(search)) - - def test_non_protected_dirs(self): - search = "This is always available." - testfile = fetcher.get(host + not_protected_test_file) - self.assertThat(testfile, Contains(search)) - - def test_never_available_dirs(self): - search = "Forbidden" - testfile = fetcher.get(host + never_available) - self.assertThat(testfile, Contains(search)) - - def test_accept_license_samsung_file(self): - search = "This is protected with click-through Samsung license." 
- testfile = fetcher.get(host + samsung_test_file) - fetcher.close() - if os.path.exists("%s/cookies.txt" % docroot): - os.rename("%s/cookies.txt" % docroot, - "%s/cookies.samsung" % docroot) - self.assertThat(testfile, Contains(search)) - - def test_accept_license_samsung_dir(self): - search = "Index of /android/~linaro-android/staging-origen" - testfile = fetcher.get(host + os.path.dirname(samsung_test_file)) - self.assertThat(testfile, Contains(search)) - - def test_accept_license_ste_file(self): - search = "This is protected with click-through ST-E license." - testfile = fetcher.get(host + ste_test_file) - fetcher.close() - if os.path.exists("%s/cookies.txt" % docroot): - os.rename("%s/cookies.txt" % docroot, "%s/cookies.ste" % docroot) - self.assertThat(testfile, Contains(search)) - - def test_accept_license_ste_dir(self): - search = "Index of /android/~linaro-android/staging-snowball" - testfile = fetcher.get(host + os.path.dirname(ste_test_file)) - self.assertThat(testfile, Contains(search)) - - def test_license_accepted_samsung(self): - search = "This is protected with click-through Samsung license." - os.rename("%s/cookies.samsung" % docroot, "%s/cookies.txt" % docroot) - testfile = fetcher.get(host + samsung_test_file) - self.assertThat(testfile, Contains(search)) - - def test_license_accepted_ste(self): - search = "This is protected with click-through ST-E license." - os.rename("%s/cookies.ste" % docroot, "%s/cookies.txt" % docroot) - testfile = fetcher.get(host + ste_test_file) - self.assertThat(testfile, Contains(search)) - - def test_internal_host_samsung(self): - search = "This is protected with click-through Samsung license." 
- subprocess.Popen(['sed', '-i', '/## Let internal hosts through ' - 'always./ a %s' % local_rewrite, '%s/.htaccess' % docroot], - stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait() - testfile = fetcher.get(host + samsung_test_file, ignore_license=True) - shutil.copy("%s/dothtaccess" % docroot, "%s/.htaccess" % docroot) - self.assertThat(testfile, Contains(search)) - - def test_internal_host_ste(self): - search = "This is protected with click-through ST-E license." - subprocess.Popen(['sed', '-i', '/## Let internal hosts through ' - 'always./ a %s' % local_rewrite, '%s/.htaccess' % docroot], - stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait() - testfile = fetcher.get(host + ste_test_file, ignore_license=True) - shutil.copy("%s/dothtaccess" % docroot, "%s/.htaccess" % docroot) - self.assertThat(testfile, Contains(search)) - - def test_ste_open_file(self): - search = "This is always available." - testfile = fetcher.get(host + ste_open_test_file) - self.assertThat(testfile, Contains(search)) - - def test_per_file_accept_license_samsung_file(self): - search = "This is protected with click-through Samsung license." - testfile = fetcher.get(host + per_file_samsung_test_file) - fetcher.close() - if os.path.exists("%s/cookies.txt" % docroot): - os.rename("%s/cookies.txt" % docroot, - "%s/cookies.samsung" % docroot) - self.assertThat(testfile, Contains(search)) - - def test_per_file_accept_license_ste_file(self): - search = "This is protected with click-through ST-E license." - testfile = fetcher.get(host + per_file_ste_test_file) - fetcher.close() - if os.path.exists("%s/cookies.txt" % docroot): - os.rename("%s/cookies.txt" % docroot, "%s/cookies.ste" % docroot) - self.assertThat(testfile, Contains(search)) - - def test_per_file_license_accepted_samsung(self): - search = "This is protected with click-through Samsung license." 
- os.rename("%s/cookies.samsung" % docroot, "%s/cookies.txt" % docroot) - testfile = fetcher.get(host + per_file_samsung_test_file, - ignore_license=True) - self.assertThat(testfile, Contains(search)) - - def test_per_file_license_accepted_ste(self): - search = "This is protected with click-through ST-E license." - os.rename("%s/cookies.ste" % docroot, "%s/cookies.txt" % docroot) - testfile = fetcher.get(host + per_file_ste_test_file, - ignore_license=True) - self.assertThat(testfile, Contains(search)) - - def test_per_file_non_protected_dirs(self): - search = "MANIFEST" - testfile = fetcher.get(host + per_file_not_protected_test_file) - self.assertThat(testfile, Contains(search)) - - def test_dir_containing_only_dirs(self): - search = "Index of /android/~linaro-android" - testfile = fetcher.get(host + dirs_only_dir) - self.assertThat(testfile, Contains(search)) - - def test_not_found_file(self): - search = "Not Found" - testfile = fetcher.get(host + not_found_test_file) - self.assertThat(testfile, Contains(search)) - - def test_build_info_non_protected_file(self): - search = "This is always available." - testfile = fetcher.get(host + build_info_not_protected_test_file) - self.assertThat(testfile, Contains(search)) - - def test_build_info_accept_license_samsung_file(self): - search = "This is protected with click-through Samsung license." - testfile = fetcher.get(host + build_info_samsung_test_file) - fetcher.close() - if os.path.exists("%s/cookies.txt" % docroot): - os.rename("%s/cookies.txt" % docroot, - "%s/cookies.samsung" % docroot) - self.assertThat(testfile, Contains(search)) - - def test_build_info_accept_license_ste_file(self): - search = "This is protected with click-through ST-E license." 
- testfile = fetcher.get(host + build_info_ste_test_file) - fetcher.close() - if os.path.exists("%s/cookies.txt" % docroot): - os.rename("%s/cookies.txt" % docroot, "%s/cookies.ste" % docroot) - self.assertThat(testfile, Contains(search)) - - def test_build_info_openid_protection(self): - search = "This is protected with OpenID." - testfile = fetcher.get(host + build_info_openid_test_file) - fetcher.close() - self.assertThat(testfile, Contains(search)) - |