|
1 #! -*- coding: utf-8 -*- |
|
2 import errno |
|
3 import os |
|
4 import shutil |
|
5 import unittest |
|
6 from StringIO import StringIO |
|
7 |
|
8 from django.core.files import temp as tempfile |
|
9 from django.core.files.uploadedfile import SimpleUploadedFile |
|
10 from django.test import TestCase, client |
|
11 from django.utils import simplejson |
|
12 from django.utils.hashcompat import sha_constructor |
|
13 from django.http.multipartparser import MultiPartParser |
|
14 |
|
15 from models import FileModel, temp_storage, UPLOAD_TO |
|
16 import uploadhandler |
|
17 |
|
18 |
|
19 UNICODE_FILENAME = u'test-0123456789_中文_Orléans.jpg' |
|
20 |
|
21 class FileUploadTests(TestCase): |
|
22 def test_simple_upload(self): |
|
23 post_data = { |
|
24 'name': 'Ringo', |
|
25 'file_field': open(__file__), |
|
26 } |
|
27 response = self.client.post('/file_uploads/upload/', post_data) |
|
28 self.assertEqual(response.status_code, 200) |
|
29 |
|
30 def test_large_upload(self): |
|
31 tdir = tempfile.gettempdir() |
|
32 |
|
33 file1 = tempfile.NamedTemporaryFile(suffix=".file1", dir=tdir) |
|
34 file1.write('a' * (2 ** 21)) |
|
35 file1.seek(0) |
|
36 |
|
37 file2 = tempfile.NamedTemporaryFile(suffix=".file2", dir=tdir) |
|
38 file2.write('a' * (10 * 2 ** 20)) |
|
39 file2.seek(0) |
|
40 |
|
41 post_data = { |
|
42 'name': 'Ringo', |
|
43 'file_field1': file1, |
|
44 'file_field2': file2, |
|
45 } |
|
46 |
|
47 for key in post_data.keys(): |
|
48 try: |
|
49 post_data[key + '_hash'] = sha_constructor(post_data[key].read()).hexdigest() |
|
50 post_data[key].seek(0) |
|
51 except AttributeError: |
|
52 post_data[key + '_hash'] = sha_constructor(post_data[key]).hexdigest() |
|
53 |
|
54 response = self.client.post('/file_uploads/verify/', post_data) |
|
55 |
|
56 self.assertEqual(response.status_code, 200) |
|
57 |
|
58 def test_unicode_file_name(self): |
|
59 tdir = tempfile.gettempdir() |
|
60 |
|
61 # This file contains chinese symbols and an accented char in the name. |
|
62 file1 = open(os.path.join(tdir, UNICODE_FILENAME.encode('utf-8')), 'w+b') |
|
63 file1.write('b' * (2 ** 10)) |
|
64 file1.seek(0) |
|
65 |
|
66 post_data = { |
|
67 'file_unicode': file1, |
|
68 } |
|
69 |
|
70 response = self.client.post('/file_uploads/unicode_name/', post_data) |
|
71 |
|
72 file1.close() |
|
73 try: |
|
74 os.unlink(file1.name) |
|
75 except: |
|
76 pass |
|
77 |
|
78 self.assertEqual(response.status_code, 200) |
|
79 |
|
80 def test_dangerous_file_names(self): |
|
81 """Uploaded file names should be sanitized before ever reaching the view.""" |
|
82 # This test simulates possible directory traversal attacks by a |
|
83 # malicious uploader We have to do some monkeybusiness here to construct |
|
84 # a malicious payload with an invalid file name (containing os.sep or |
|
85 # os.pardir). This similar to what an attacker would need to do when |
|
86 # trying such an attack. |
|
87 scary_file_names = [ |
|
88 "/tmp/hax0rd.txt", # Absolute path, *nix-style. |
|
89 "C:\\Windows\\hax0rd.txt", # Absolute path, win-syle. |
|
90 "C:/Windows/hax0rd.txt", # Absolute path, broken-style. |
|
91 "\\tmp\\hax0rd.txt", # Absolute path, broken in a different way. |
|
92 "/tmp\\hax0rd.txt", # Absolute path, broken by mixing. |
|
93 "subdir/hax0rd.txt", # Descendant path, *nix-style. |
|
94 "subdir\\hax0rd.txt", # Descendant path, win-style. |
|
95 "sub/dir\\hax0rd.txt", # Descendant path, mixed. |
|
96 "../../hax0rd.txt", # Relative path, *nix-style. |
|
97 "..\\..\\hax0rd.txt", # Relative path, win-style. |
|
98 "../..\\hax0rd.txt" # Relative path, mixed. |
|
99 ] |
|
100 |
|
101 payload = [] |
|
102 for i, name in enumerate(scary_file_names): |
|
103 payload.extend([ |
|
104 '--' + client.BOUNDARY, |
|
105 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name), |
|
106 'Content-Type: application/octet-stream', |
|
107 '', |
|
108 'You got pwnd.' |
|
109 ]) |
|
110 payload.extend([ |
|
111 '--' + client.BOUNDARY + '--', |
|
112 '', |
|
113 ]) |
|
114 |
|
115 payload = "\r\n".join(payload) |
|
116 r = { |
|
117 'CONTENT_LENGTH': len(payload), |
|
118 'CONTENT_TYPE': client.MULTIPART_CONTENT, |
|
119 'PATH_INFO': "/file_uploads/echo/", |
|
120 'REQUEST_METHOD': 'POST', |
|
121 'wsgi.input': client.FakePayload(payload), |
|
122 } |
|
123 response = self.client.request(**r) |
|
124 |
|
125 # The filenames should have been sanitized by the time it got to the view. |
|
126 recieved = simplejson.loads(response.content) |
|
127 for i, name in enumerate(scary_file_names): |
|
128 got = recieved["file%s" % i] |
|
129 self.assertEqual(got, "hax0rd.txt") |
|
130 |
|
131 def test_filename_overflow(self): |
|
132 """File names over 256 characters (dangerous on some platforms) get fixed up.""" |
|
133 name = "%s.txt" % ("f"*500) |
|
134 payload = "\r\n".join([ |
|
135 '--' + client.BOUNDARY, |
|
136 'Content-Disposition: form-data; name="file"; filename="%s"' % name, |
|
137 'Content-Type: application/octet-stream', |
|
138 '', |
|
139 'Oops.' |
|
140 '--' + client.BOUNDARY + '--', |
|
141 '', |
|
142 ]) |
|
143 r = { |
|
144 'CONTENT_LENGTH': len(payload), |
|
145 'CONTENT_TYPE': client.MULTIPART_CONTENT, |
|
146 'PATH_INFO': "/file_uploads/echo/", |
|
147 'REQUEST_METHOD': 'POST', |
|
148 'wsgi.input': client.FakePayload(payload), |
|
149 } |
|
150 got = simplejson.loads(self.client.request(**r).content) |
|
151 self.assert_(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file'])) |
|
152 |
|
153 def test_custom_upload_handler(self): |
|
154 # A small file (under the 5M quota) |
|
155 smallfile = tempfile.NamedTemporaryFile() |
|
156 smallfile.write('a' * (2 ** 21)) |
|
157 smallfile.seek(0) |
|
158 |
|
159 # A big file (over the quota) |
|
160 bigfile = tempfile.NamedTemporaryFile() |
|
161 bigfile.write('a' * (10 * 2 ** 20)) |
|
162 bigfile.seek(0) |
|
163 |
|
164 # Small file posting should work. |
|
165 response = self.client.post('/file_uploads/quota/', {'f': smallfile}) |
|
166 got = simplejson.loads(response.content) |
|
167 self.assert_('f' in got) |
|
168 |
|
169 # Large files don't go through. |
|
170 response = self.client.post("/file_uploads/quota/", {'f': bigfile}) |
|
171 got = simplejson.loads(response.content) |
|
172 self.assert_('f' not in got) |
|
173 |
|
174 def test_broken_custom_upload_handler(self): |
|
175 f = tempfile.NamedTemporaryFile() |
|
176 f.write('a' * (2 ** 21)) |
|
177 f.seek(0) |
|
178 |
|
179 # AttributeError: You cannot alter upload handlers after the upload has been processed. |
|
180 self.assertRaises( |
|
181 AttributeError, |
|
182 self.client.post, |
|
183 '/file_uploads/quota/broken/', |
|
184 {'f': f} |
|
185 ) |
|
186 |
|
187 def test_fileupload_getlist(self): |
|
188 file1 = tempfile.NamedTemporaryFile() |
|
189 file1.write('a' * (2 ** 23)) |
|
190 file1.seek(0) |
|
191 |
|
192 file2 = tempfile.NamedTemporaryFile() |
|
193 file2.write('a' * (2 * 2 ** 18)) |
|
194 file2.seek(0) |
|
195 |
|
196 file2a = tempfile.NamedTemporaryFile() |
|
197 file2a.write('a' * (5 * 2 ** 20)) |
|
198 file2a.seek(0) |
|
199 |
|
200 response = self.client.post('/file_uploads/getlist_count/', { |
|
201 'file1': file1, |
|
202 'field1': u'test', |
|
203 'field2': u'test3', |
|
204 'field3': u'test5', |
|
205 'field4': u'test6', |
|
206 'field5': u'test7', |
|
207 'file2': (file2, file2a) |
|
208 }) |
|
209 got = simplejson.loads(response.content) |
|
210 |
|
211 self.assertEqual(got.get('file1'), 1) |
|
212 self.assertEqual(got.get('file2'), 2) |
|
213 |
|
214 def test_file_error_blocking(self): |
|
215 """ |
|
216 The server should not block when there are upload errors (bug #8622). |
|
217 This can happen if something -- i.e. an exception handler -- tries to |
|
218 access POST while handling an error in parsing POST. This shouldn't |
|
219 cause an infinite loop! |
|
220 """ |
|
221 class POSTAccessingHandler(client.ClientHandler): |
|
222 """A handler that'll access POST during an exception.""" |
|
223 def handle_uncaught_exception(self, request, resolver, exc_info): |
|
224 ret = super(POSTAccessingHandler, self).handle_uncaught_exception(request, resolver, exc_info) |
|
225 p = request.POST |
|
226 return ret |
|
227 |
|
228 post_data = { |
|
229 'name': 'Ringo', |
|
230 'file_field': open(__file__), |
|
231 } |
|
232 # Maybe this is a little more complicated that it needs to be; but if |
|
233 # the django.test.client.FakePayload.read() implementation changes then |
|
234 # this test would fail. So we need to know exactly what kind of error |
|
235 # it raises when there is an attempt to read more than the available bytes: |
|
236 try: |
|
237 client.FakePayload('a').read(2) |
|
238 except Exception, reference_error: |
|
239 pass |
|
240 |
|
241 # install the custom handler that tries to access request.POST |
|
242 self.client.handler = POSTAccessingHandler() |
|
243 |
|
244 try: |
|
245 response = self.client.post('/file_uploads/upload_errors/', post_data) |
|
246 except reference_error.__class__, err: |
|
247 self.failIf( |
|
248 str(err) == str(reference_error), |
|
249 "Caught a repeated exception that'll cause an infinite loop in file uploads." |
|
250 ) |
|
251 except Exception, err: |
|
252 # CustomUploadError is the error that should have been raised |
|
253 self.assertEqual(err.__class__, uploadhandler.CustomUploadError) |
|
254 |
|
255 class DirectoryCreationTests(unittest.TestCase): |
|
256 """ |
|
257 Tests for error handling during directory creation |
|
258 via _save_FIELD_file (ticket #6450) |
|
259 """ |
|
260 def setUp(self): |
|
261 self.obj = FileModel() |
|
262 if not os.path.isdir(temp_storage.location): |
|
263 os.makedirs(temp_storage.location) |
|
264 if os.path.isdir(UPLOAD_TO): |
|
265 os.chmod(UPLOAD_TO, 0700) |
|
266 shutil.rmtree(UPLOAD_TO) |
|
267 |
|
268 def tearDown(self): |
|
269 os.chmod(temp_storage.location, 0700) |
|
270 shutil.rmtree(temp_storage.location) |
|
271 |
|
272 def test_readonly_root(self): |
|
273 """Permission errors are not swallowed""" |
|
274 os.chmod(temp_storage.location, 0500) |
|
275 try: |
|
276 self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', 'x')) |
|
277 except OSError, err: |
|
278 self.assertEquals(err.errno, errno.EACCES) |
|
279 except Exception, err: |
|
280 self.fail("OSError [Errno %s] not raised." % errno.EACCES) |
|
281 |
|
282 def test_not_a_directory(self): |
|
283 """The correct IOError is raised when the upload directory name exists but isn't a directory""" |
|
284 # Create a file with the upload directory name |
|
285 fd = open(UPLOAD_TO, 'w') |
|
286 fd.close() |
|
287 try: |
|
288 self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', 'x')) |
|
289 except IOError, err: |
|
290 # The test needs to be done on a specific string as IOError |
|
291 # is raised even without the patch (just not early enough) |
|
292 self.assertEquals(err.args[0], |
|
293 "%s exists and is not a directory." % UPLOAD_TO) |
|
294 except: |
|
295 self.fail("IOError not raised") |
|
296 |
|
297 class MultiParserTests(unittest.TestCase): |
|
298 |
|
299 def test_empty_upload_handlers(self): |
|
300 # We're not actually parsing here; just checking if the parser properly |
|
301 # instantiates with empty upload handlers. |
|
302 parser = MultiPartParser({ |
|
303 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo', |
|
304 'CONTENT_LENGTH': '1' |
|
305 }, StringIO('x'), [], 'utf-8') |