parts/django/tests/regressiontests/file_uploads/tests.py
changeset 307 c6bca38c1cbf
equal deleted inserted replaced
306:5ff1fc726848 307:c6bca38c1cbf
       
     1 #! -*- coding: utf-8 -*-
       
     2 import errno
       
     3 import os
       
     4 import shutil
       
     5 import unittest
       
     6 from StringIO import StringIO
       
     7 
       
     8 from django.core.files import temp as tempfile
       
     9 from django.core.files.uploadedfile import SimpleUploadedFile
       
    10 from django.test import TestCase, client
       
    11 from django.utils import simplejson
       
    12 from django.utils.hashcompat import sha_constructor
       
    13 from django.http.multipartparser import MultiPartParser
       
    14 
       
    15 from models import FileModel, temp_storage, UPLOAD_TO
       
    16 import uploadhandler
       
    17 
       
    18 
       
    19 UNICODE_FILENAME = u'test-0123456789_中文_Orléans.jpg'
       
    20 
       
    21 class FileUploadTests(TestCase):
       
    22     def test_simple_upload(self):
       
    23         post_data = {
       
    24             'name': 'Ringo',
       
    25             'file_field': open(__file__),
       
    26         }
       
    27         response = self.client.post('/file_uploads/upload/', post_data)
       
    28         self.assertEqual(response.status_code, 200)
       
    29 
       
    30     def test_large_upload(self):
       
    31         tdir = tempfile.gettempdir()
       
    32 
       
    33         file1 = tempfile.NamedTemporaryFile(suffix=".file1", dir=tdir)
       
    34         file1.write('a' * (2 ** 21))
       
    35         file1.seek(0)
       
    36 
       
    37         file2 = tempfile.NamedTemporaryFile(suffix=".file2", dir=tdir)
       
    38         file2.write('a' * (10 * 2 ** 20))
       
    39         file2.seek(0)
       
    40 
       
    41         post_data = {
       
    42             'name': 'Ringo',
       
    43             'file_field1': file1,
       
    44             'file_field2': file2,
       
    45             }
       
    46 
       
    47         for key in post_data.keys():
       
    48             try:
       
    49                 post_data[key + '_hash'] = sha_constructor(post_data[key].read()).hexdigest()
       
    50                 post_data[key].seek(0)
       
    51             except AttributeError:
       
    52                 post_data[key + '_hash'] = sha_constructor(post_data[key]).hexdigest()
       
    53 
       
    54         response = self.client.post('/file_uploads/verify/', post_data)
       
    55 
       
    56         self.assertEqual(response.status_code, 200)
       
    57 
       
    58     def test_unicode_file_name(self):
       
    59         tdir = tempfile.gettempdir()
       
    60 
       
    61         # This file contains chinese symbols and an accented char in the name.
       
    62         file1 = open(os.path.join(tdir, UNICODE_FILENAME.encode('utf-8')), 'w+b')
       
    63         file1.write('b' * (2 ** 10))
       
    64         file1.seek(0)
       
    65 
       
    66         post_data = {
       
    67             'file_unicode': file1,
       
    68             }
       
    69 
       
    70         response = self.client.post('/file_uploads/unicode_name/', post_data)
       
    71 
       
    72         file1.close()
       
    73         try:
       
    74             os.unlink(file1.name)
       
    75         except:
       
    76             pass
       
    77 
       
    78         self.assertEqual(response.status_code, 200)
       
    79 
       
    80     def test_dangerous_file_names(self):
       
    81         """Uploaded file names should be sanitized before ever reaching the view."""
       
    82         # This test simulates possible directory traversal attacks by a
       
    83         # malicious uploader We have to do some monkeybusiness here to construct
       
    84         # a malicious payload with an invalid file name (containing os.sep or
       
    85         # os.pardir). This similar to what an attacker would need to do when
       
    86         # trying such an attack.
       
    87         scary_file_names = [
       
    88             "/tmp/hax0rd.txt",          # Absolute path, *nix-style.
       
    89             "C:\\Windows\\hax0rd.txt",  # Absolute path, win-syle.
       
    90             "C:/Windows/hax0rd.txt",    # Absolute path, broken-style.
       
    91             "\\tmp\\hax0rd.txt",        # Absolute path, broken in a different way.
       
    92             "/tmp\\hax0rd.txt",         # Absolute path, broken by mixing.
       
    93             "subdir/hax0rd.txt",        # Descendant path, *nix-style.
       
    94             "subdir\\hax0rd.txt",       # Descendant path, win-style.
       
    95             "sub/dir\\hax0rd.txt",      # Descendant path, mixed.
       
    96             "../../hax0rd.txt",         # Relative path, *nix-style.
       
    97             "..\\..\\hax0rd.txt",       # Relative path, win-style.
       
    98             "../..\\hax0rd.txt"         # Relative path, mixed.
       
    99         ]
       
   100 
       
   101         payload = []
       
   102         for i, name in enumerate(scary_file_names):
       
   103             payload.extend([
       
   104                 '--' + client.BOUNDARY,
       
   105                 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
       
   106                 'Content-Type: application/octet-stream',
       
   107                 '',
       
   108                 'You got pwnd.'
       
   109             ])
       
   110         payload.extend([
       
   111             '--' + client.BOUNDARY + '--',
       
   112             '',
       
   113         ])
       
   114 
       
   115         payload = "\r\n".join(payload)
       
   116         r = {
       
   117             'CONTENT_LENGTH': len(payload),
       
   118             'CONTENT_TYPE':   client.MULTIPART_CONTENT,
       
   119             'PATH_INFO':      "/file_uploads/echo/",
       
   120             'REQUEST_METHOD': 'POST',
       
   121             'wsgi.input':     client.FakePayload(payload),
       
   122         }
       
   123         response = self.client.request(**r)
       
   124 
       
   125         # The filenames should have been sanitized by the time it got to the view.
       
   126         recieved = simplejson.loads(response.content)
       
   127         for i, name in enumerate(scary_file_names):
       
   128             got = recieved["file%s" % i]
       
   129             self.assertEqual(got, "hax0rd.txt")
       
   130 
       
   131     def test_filename_overflow(self):
       
   132         """File names over 256 characters (dangerous on some platforms) get fixed up."""
       
   133         name = "%s.txt" % ("f"*500)
       
   134         payload = "\r\n".join([
       
   135             '--' + client.BOUNDARY,
       
   136             'Content-Disposition: form-data; name="file"; filename="%s"' % name,
       
   137             'Content-Type: application/octet-stream',
       
   138             '',
       
   139             'Oops.'
       
   140             '--' + client.BOUNDARY + '--',
       
   141             '',
       
   142         ])
       
   143         r = {
       
   144             'CONTENT_LENGTH': len(payload),
       
   145             'CONTENT_TYPE':   client.MULTIPART_CONTENT,
       
   146             'PATH_INFO':      "/file_uploads/echo/",
       
   147             'REQUEST_METHOD': 'POST',
       
   148             'wsgi.input':     client.FakePayload(payload),
       
   149         }
       
   150         got = simplejson.loads(self.client.request(**r).content)
       
   151         self.assert_(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file']))
       
   152 
       
   153     def test_custom_upload_handler(self):
       
   154         # A small file (under the 5M quota)
       
   155         smallfile = tempfile.NamedTemporaryFile()
       
   156         smallfile.write('a' * (2 ** 21))
       
   157         smallfile.seek(0)
       
   158 
       
   159         # A big file (over the quota)
       
   160         bigfile = tempfile.NamedTemporaryFile()
       
   161         bigfile.write('a' * (10 * 2 ** 20))
       
   162         bigfile.seek(0)
       
   163 
       
   164         # Small file posting should work.
       
   165         response = self.client.post('/file_uploads/quota/', {'f': smallfile})
       
   166         got = simplejson.loads(response.content)
       
   167         self.assert_('f' in got)
       
   168 
       
   169         # Large files don't go through.
       
   170         response = self.client.post("/file_uploads/quota/", {'f': bigfile})
       
   171         got = simplejson.loads(response.content)
       
   172         self.assert_('f' not in got)
       
   173 
       
   174     def test_broken_custom_upload_handler(self):
       
   175         f = tempfile.NamedTemporaryFile()
       
   176         f.write('a' * (2 ** 21))
       
   177         f.seek(0)
       
   178 
       
   179         # AttributeError: You cannot alter upload handlers after the upload has been processed.
       
   180         self.assertRaises(
       
   181             AttributeError,
       
   182             self.client.post,
       
   183             '/file_uploads/quota/broken/',
       
   184             {'f': f}
       
   185         )
       
   186 
       
   187     def test_fileupload_getlist(self):
       
   188         file1 = tempfile.NamedTemporaryFile()
       
   189         file1.write('a' * (2 ** 23))
       
   190         file1.seek(0)
       
   191 
       
   192         file2 = tempfile.NamedTemporaryFile()
       
   193         file2.write('a' * (2 * 2 ** 18))
       
   194         file2.seek(0)
       
   195 
       
   196         file2a = tempfile.NamedTemporaryFile()
       
   197         file2a.write('a' * (5 * 2 ** 20))
       
   198         file2a.seek(0)
       
   199 
       
   200         response = self.client.post('/file_uploads/getlist_count/', {
       
   201             'file1': file1,
       
   202             'field1': u'test',
       
   203             'field2': u'test3',
       
   204             'field3': u'test5',
       
   205             'field4': u'test6',
       
   206             'field5': u'test7',
       
   207             'file2': (file2, file2a)
       
   208         })
       
   209         got = simplejson.loads(response.content)
       
   210 
       
   211         self.assertEqual(got.get('file1'), 1)
       
   212         self.assertEqual(got.get('file2'), 2)
       
   213 
       
   214     def test_file_error_blocking(self):
       
   215         """
       
   216         The server should not block when there are upload errors (bug #8622).
       
   217         This can happen if something -- i.e. an exception handler -- tries to
       
   218         access POST while handling an error in parsing POST. This shouldn't
       
   219         cause an infinite loop!
       
   220         """
       
   221         class POSTAccessingHandler(client.ClientHandler):
       
   222             """A handler that'll access POST during an exception."""
       
   223             def handle_uncaught_exception(self, request, resolver, exc_info):
       
   224                 ret = super(POSTAccessingHandler, self).handle_uncaught_exception(request, resolver, exc_info)
       
   225                 p = request.POST
       
   226                 return ret
       
   227         
       
   228         post_data = {
       
   229             'name': 'Ringo',
       
   230             'file_field': open(__file__),
       
   231         }
       
   232         # Maybe this is a little more complicated that it needs to be; but if
       
   233         # the django.test.client.FakePayload.read() implementation changes then
       
   234         # this test would fail.  So we need to know exactly what kind of error
       
   235         # it raises when there is an attempt to read more than the available bytes:
       
   236         try:
       
   237             client.FakePayload('a').read(2)
       
   238         except Exception, reference_error:
       
   239             pass
       
   240 
       
   241         # install the custom handler that tries to access request.POST
       
   242         self.client.handler = POSTAccessingHandler()
       
   243 
       
   244         try:
       
   245             response = self.client.post('/file_uploads/upload_errors/', post_data)
       
   246         except reference_error.__class__, err:
       
   247             self.failIf(
       
   248                 str(err) == str(reference_error), 
       
   249                 "Caught a repeated exception that'll cause an infinite loop in file uploads."
       
   250             )
       
   251         except Exception, err:
       
   252             # CustomUploadError is the error that should have been raised
       
   253             self.assertEqual(err.__class__, uploadhandler.CustomUploadError)
       
   254 
       
   255 class DirectoryCreationTests(unittest.TestCase):
       
   256     """
       
   257     Tests for error handling during directory creation
       
   258     via _save_FIELD_file (ticket #6450)
       
   259     """
       
   260     def setUp(self):
       
   261         self.obj = FileModel()
       
   262         if not os.path.isdir(temp_storage.location):
       
   263             os.makedirs(temp_storage.location)
       
   264         if os.path.isdir(UPLOAD_TO):
       
   265             os.chmod(UPLOAD_TO, 0700)
       
   266             shutil.rmtree(UPLOAD_TO)
       
   267 
       
   268     def tearDown(self):
       
   269         os.chmod(temp_storage.location, 0700)
       
   270         shutil.rmtree(temp_storage.location)
       
   271 
       
   272     def test_readonly_root(self):
       
   273         """Permission errors are not swallowed"""
       
   274         os.chmod(temp_storage.location, 0500)
       
   275         try:
       
   276             self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', 'x'))
       
   277         except OSError, err:
       
   278             self.assertEquals(err.errno, errno.EACCES)
       
   279         except Exception, err:
       
   280             self.fail("OSError [Errno %s] not raised." % errno.EACCES)
       
   281 
       
   282     def test_not_a_directory(self):
       
   283         """The correct IOError is raised when the upload directory name exists but isn't a directory"""
       
   284         # Create a file with the upload directory name
       
   285         fd = open(UPLOAD_TO, 'w')
       
   286         fd.close()
       
   287         try:
       
   288             self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', 'x'))
       
   289         except IOError, err:
       
   290             # The test needs to be done on a specific string as IOError
       
   291             # is raised even without the patch (just not early enough)
       
   292             self.assertEquals(err.args[0],
       
   293                               "%s exists and is not a directory." % UPLOAD_TO)
       
   294         except:
       
   295             self.fail("IOError not raised")
       
   296 
       
   297 class MultiParserTests(unittest.TestCase):
       
   298 
       
   299     def test_empty_upload_handlers(self):
       
   300         # We're not actually parsing here; just checking if the parser properly
       
   301         # instantiates with empty upload handlers.
       
   302         parser = MultiPartParser({
       
   303             'CONTENT_TYPE':     'multipart/form-data; boundary=_foo',
       
   304             'CONTENT_LENGTH':   '1'
       
   305         }, StringIO('x'), [], 'utf-8')