1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
import datetime
import mimetypes
import os
import time
import boto
import boto.s3.key
from django.conf import settings
from django.http import HttpResponseRedirect
from license_protected_downloads.artifact.base import (
Artifact,
cached_prop,
)
class S3Artifact(Artifact):
bucket = None
@classmethod
def get_bucket(cls):
'''Keeps a single bucket object cached for the duration of a request'''
if not cls.bucket:
b = getattr(settings, 'S3_BUCKET', None)
if b:
c = boto.connect_s3(
settings.AWS_ACCESS_KEY_ID, settings.AWS_SECRET_ACCESS_KEY)
cls.bucket = c.get_bucket(settings.S3_BUCKET)
return cls.bucket
@staticmethod
def pathname2url(path):
'''There are certain characters S3 doesn't deal with for key names when
trying to generate signed urls. This url-encodes them in a way that
makes S3 happy.'''
# currently the "~" is the only character we know breaks things
return path.replace('~', '%7E')
@staticmethod
def url2pathname(path):
'''Reverses the changes done by pathname2url'''
return path.replace('%7E', '~')
def __init__(self, bucket, item, parent, human_readable):
base = item.name.replace(settings.S3_PREFIX_PATH, '')
if base:
base = '/' + os.path.dirname(base)
if hasattr(item, 'size'):
file_name = os.path.basename(item.name)
self.mtype = mimetypes.guess_type(item.name)[0]
dt = datetime.datetime.strptime(
item.last_modified, "%Y-%m-%dT%H:%M:%S.000Z")
item.last_modified = time.mktime(dt.timetuple())
self.item = item
else:
self.mtype = 'folder'
self.children = []
if base:
base = os.path.dirname(base)
file_name = os.path.basename(item.name[:-1])
else:
file_name = ''
item.size = 0
item.last_modified = '-'
file_name = self.url2pathname(file_name)
self.bucket = bucket
self.parent = parent
if parent and hasattr(self.parent, 'children'):
self.parent.children.append(self)
super(S3Artifact, self).__init__(
base, file_name, item.size, item.last_modified, human_readable)
def get_type(self):
if self.human_readable:
if self.mtype is None:
return 'other'
elif self.mtype.split('/')[0] == 'text':
return 'text'
return self.mtype
def get_file_download_response(self, method='GET', force_http=False):
"Return HttpResponse which will send path to user's browser."
assert not self.isdir()
return HttpResponseRedirect(
self.item.generate_url(90, method=method, force_http=force_http))
@cached_prop
def build_info_buffer(self):
if self.parent and not self.isdir():
return self.parent.build_info_buffer
if self.urlbase == '/':
key = settings.S3_PREFIX_PATH[:-1]
else:
key = settings.S3_PREFIX_PATH + self.urlbase[1:]
if self.isdir():
key += '/' + self.file_name
key += '/BUILD-INFO.txt'
try:
key = boto.s3.key.Key(self.bucket, key)
return key.get_contents_as_string()
except boto.exception.S3ResponseError:
pass # No build-info file, return None - its okay
@cached_prop
def _container_eulas(self):
if not self.isdir() and self.parent:
return self.parent._container_eulas
prefix = settings.S3_PREFIX_PATH + self.urlbase[1:]
if prefix[-1] != '/':
# s3 listing needs '/' to do a dir listing
prefix = prefix + '/'
if self.isdir():
prefix += self.file_name + '/'
eulas = []
for x in self.bucket.list(prefix=prefix, delimiter='/'):
if isinstance(x, boto.s3.key.Key) and 'EULA.txt' in x.name:
eulas.append(os.path.basename(x.name))
return eulas
def get_eulas(self):
'''find eulas for this artifact
if this is a file, it will use the parent container's eula which
we keep cached, so that we only hit s3 one time
'''
return self._container_eulas
def get_file_contents(self, fname):
if self.urlbase == '/':
key = settings.S3_PREFIX_PATH[:-1]
else:
key = settings.S3_PREFIX_PATH + self.urlbase[1:]
if self.isdir():
key += '/' + self.file_name + '/' + fname
else:
key += '/' + os.path.dirname(self.file_name) + fname
try:
key = boto.s3.key.Key(self.bucket, self.pathname2url(key))
return key.get_contents_as_string()
except boto.exception.S3ResponseError:
pass # return None - its okay
def get_textile_files(self):
assert self.isdir()
# NOTE: This logic is assuming some optimizations based on how files
# are currently published. Legacy publishing required more complex
# searching but all new publishing will work with this logic.
allowed = settings.ANDROID_FILES + settings.LINUX_FILES
for x in self.children:
if not x.isdir() and os.path.basename(x.item.name) in allowed:
yield (x.item.name, x.item)
def get_annotated_manifest(self):
assert self.isdir()
for x in self.children:
if not x.isdir() and \
os.path.basename(x.item.name) == settings.ANNOTATED_XML:
return x.item.read()
def isdir(self):
return self.mtype == 'folder'
def get_real_name(self):
url = self.url()
path = self.get_file_contents('.s3_linked_from')
if path:
path = path.replace(settings.S3_PREFIX_PATH, '/')
url = url.replace(os.path.dirname(url), path)
return url
|