app/django/test/utils.py
changeset 323 ff1a9aa48cfd
parent 54 03e267d67478
equal deleted inserted replaced
322:6641e941ef1e 323:ff1a9aa48cfd
     1 import sys, time, os
     1 import sys, time, os
     2 from django.conf import settings
     2 from django.conf import settings
     3 from django.db import connection, get_creation_module
     3 from django.db import connection
     4 from django.core import mail
     4 from django.core import mail
     5 from django.core.management import call_command
       
     6 from django.dispatch import dispatcher
       
     7 from django.test import signals
     5 from django.test import signals
     8 from django.template import Template
     6 from django.template import Template
     9 from django.utils.translation import deactivate
     7 from django.utils.translation import deactivate
    10 
       
# The prefix to put on the default database name when creating
# the test database. create_test_db() only falls back to this for
# non-SQLite engines when settings.TEST_DATABASE_NAME is empty.
TEST_DATABASE_PREFIX = 'test_'
    14 
     8 
    15 def instrumented_test_render(self, context):
     9 def instrumented_test_render(self, context):
    16     """
    10     """
    17     An instrumented Template render method, providing a signal
    11     An instrumented Template render method, providing a signal
    18     that can be intercepted by the test system Client
    12     that can be intercepted by the test system Client
    19     """
    13     """
    20     dispatcher.send(signal=signals.template_rendered, sender=self, template=self, context=context)
    14     signals.template_rendered.send(sender=self, template=self, context=context)
    21     return self.nodelist.render(context)
    15     return self.nodelist.render(context)
    22 
    16 
    23 class TestSMTPConnection(object):
    17 class TestSMTPConnection(object):
    24     """A substitute SMTP connection for use during test sessions.
    18     """A substitute SMTP connection for use during test sessions.
    25     The test connection stores email messages in a dummy outbox,
    19     The test connection stores email messages in a dummy outbox,
    69     mail.SMTPConnection = mail.original_SMTPConnection
    63     mail.SMTPConnection = mail.original_SMTPConnection
    70     del mail.original_SMTPConnection
    64     del mail.original_SMTPConnection
    71 
    65 
    72     del mail.outbox
    66     del mail.outbox
    73 
    67 
    74 def _set_autocommit(connection):
       
    75     "Make sure a connection is in autocommit mode."
       
    76     if hasattr(connection.connection, "autocommit"):
       
    77         connection.connection.autocommit(True)
       
    78     elif hasattr(connection.connection, "set_isolation_level"):
       
    79         connection.connection.set_isolation_level(0)
       
    80 
       
    81 def get_mysql_create_suffix():
       
    82     suffix = []
       
    83     if settings.TEST_DATABASE_CHARSET:
       
    84         suffix.append('CHARACTER SET %s' % settings.TEST_DATABASE_CHARSET)
       
    85     if settings.TEST_DATABASE_COLLATION:
       
    86         suffix.append('COLLATE %s' % settings.TEST_DATABASE_COLLATION)
       
    87     return ' '.join(suffix)
       
    88 
       
    89 def get_postgresql_create_suffix():
       
    90     assert settings.TEST_DATABASE_COLLATION is None, "PostgreSQL does not support collation setting at database creation time."
       
    91     if settings.TEST_DATABASE_CHARSET:
       
    92         return "WITH ENCODING '%s'" % settings.TEST_DATABASE_CHARSET
       
    93     return ''
       
    94 
       
    95 def create_test_db(verbosity=1, autoclobber=False):
       
    96     """
       
    97     Creates a test database, prompting the user for confirmation if the
       
    98     database already exists. Returns the name of the test database created.
       
    99     """
       
   100     # If the database backend wants to create the test DB itself, let it
       
   101     creation_module = get_creation_module()
       
   102     if hasattr(creation_module, "create_test_db"):
       
   103         creation_module.create_test_db(settings, connection, verbosity, autoclobber)
       
   104         return
       
   105 
       
   106     if verbosity >= 1:
       
   107         print "Creating test database..."
       
   108     # If we're using SQLite, it's more convenient to test against an
       
   109     # in-memory database. Using the TEST_DATABASE_NAME setting you can still choose
       
   110     # to run on a physical database.
       
   111     if settings.DATABASE_ENGINE == "sqlite3":
       
   112         if settings.TEST_DATABASE_NAME and settings.TEST_DATABASE_NAME != ":memory:":
       
   113             TEST_DATABASE_NAME = settings.TEST_DATABASE_NAME
       
   114             # Erase the old test database
       
   115             if verbosity >= 1:
       
   116                 print "Destroying old test database..."
       
   117             if os.access(TEST_DATABASE_NAME, os.F_OK):
       
   118                 if not autoclobber:
       
   119                     confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % TEST_DATABASE_NAME)
       
   120                 if autoclobber or confirm == 'yes':
       
   121                   try:
       
   122                       if verbosity >= 1:
       
   123                           print "Destroying old test database..."
       
   124                       os.remove(TEST_DATABASE_NAME)
       
   125                   except Exception, e:
       
   126                       sys.stderr.write("Got an error deleting the old test database: %s\n" % e)
       
   127                       sys.exit(2)
       
   128                 else:
       
   129                     print "Tests cancelled."
       
   130                     sys.exit(1)
       
   131             if verbosity >= 1:
       
   132                 print "Creating test database..."
       
   133         else:
       
   134             TEST_DATABASE_NAME = ":memory:"
       
   135     else:
       
   136         suffix = {
       
   137             'postgresql': get_postgresql_create_suffix,
       
   138             'postgresql_psycopg2': get_postgresql_create_suffix,
       
   139             'mysql': get_mysql_create_suffix,
       
   140             'mysql_old': get_mysql_create_suffix,
       
   141         }.get(settings.DATABASE_ENGINE, lambda: '')()
       
   142         if settings.TEST_DATABASE_NAME:
       
   143             TEST_DATABASE_NAME = settings.TEST_DATABASE_NAME
       
   144         else:
       
   145             TEST_DATABASE_NAME = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
       
   146 
       
   147         qn = connection.ops.quote_name
       
   148 
       
   149         # Create the test database and connect to it. We need to autocommit
       
   150         # if the database supports it because PostgreSQL doesn't allow
       
   151         # CREATE/DROP DATABASE statements within transactions.
       
   152         cursor = connection.cursor()
       
   153         _set_autocommit(connection)
       
   154         try:
       
   155             cursor.execute("CREATE DATABASE %s %s" % (qn(TEST_DATABASE_NAME), suffix))
       
   156         except Exception, e:
       
   157             sys.stderr.write("Got an error creating the test database: %s\n" % e)
       
   158             if not autoclobber:
       
   159                 confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % TEST_DATABASE_NAME)
       
   160             if autoclobber or confirm == 'yes':
       
   161                 try:
       
   162                     if verbosity >= 1:
       
   163                         print "Destroying old test database..."
       
   164                     cursor.execute("DROP DATABASE %s" % qn(TEST_DATABASE_NAME))
       
   165                     if verbosity >= 1:
       
   166                         print "Creating test database..."
       
   167                     cursor.execute("CREATE DATABASE %s %s" % (qn(TEST_DATABASE_NAME), suffix))
       
   168                 except Exception, e:
       
   169                     sys.stderr.write("Got an error recreating the test database: %s\n" % e)
       
   170                     sys.exit(2)
       
   171             else:
       
   172                 print "Tests cancelled."
       
   173                 sys.exit(1)
       
   174 
       
   175     connection.close()
       
   176     settings.DATABASE_NAME = TEST_DATABASE_NAME
       
   177 
       
   178     call_command('syncdb', verbosity=verbosity, interactive=False)
       
   179 
       
   180     if settings.CACHE_BACKEND.startswith('db://'):
       
   181         cache_name = settings.CACHE_BACKEND[len('db://'):]
       
   182         call_command('createcachetable', cache_name)
       
   183 
       
   184     # Get a cursor (even though we don't need one yet). This has
       
   185     # the side effect of initializing the test database.
       
   186     cursor = connection.cursor()
       
   187 
       
   188     return TEST_DATABASE_NAME
       
   189 
       
   190 def destroy_test_db(old_database_name, verbosity=1):
       
   191     # If the database wants to drop the test DB itself, let it
       
   192     creation_module = get_creation_module()
       
   193     if hasattr(creation_module, "destroy_test_db"):
       
   194         creation_module.destroy_test_db(settings, connection, old_database_name, verbosity)
       
   195         return
       
   196 
       
   197     if verbosity >= 1:
       
   198         print "Destroying test database..."
       
   199     connection.close()
       
   200     TEST_DATABASE_NAME = settings.DATABASE_NAME
       
   201     settings.DATABASE_NAME = old_database_name
       
   202     if settings.DATABASE_ENGINE == "sqlite3":
       
   203         if TEST_DATABASE_NAME and TEST_DATABASE_NAME != ":memory:":
       
   204             # Remove the SQLite database file
       
   205             os.remove(TEST_DATABASE_NAME)
       
   206     else:
       
   207         # Remove the test database to clean up after
       
   208         # ourselves. Connect to the previous database (not the test database)
       
   209         # to do so, because it's not allowed to delete a database while being
       
   210         # connected to it.
       
   211         cursor = connection.cursor()
       
   212         _set_autocommit(connection)
       
   213         time.sleep(1) # To avoid "database is being accessed by other users" errors.
       
   214         cursor.execute("DROP DATABASE %s" % connection.ops.quote_name(TEST_DATABASE_NAME))
       
   215         connection.close()