--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/app/django/test/utils.py Fri Jul 18 18:22:23 2008 +0000
@@ -0,0 +1,215 @@
+import sys, time, os
+from django.conf import settings
+from django.db import connection, get_creation_module
+from django.core import mail
+from django.core.management import call_command
+from django.dispatch import dispatcher
+from django.test import signals
+from django.template import Template
+from django.utils.translation import deactivate
+
+# The prefix to put on the default database name when creating
+# the test database.
+TEST_DATABASE_PREFIX = 'test_'
+
+def instrumented_test_render(self, context):
+ """
+ An instrumented Template render method, providing a signal
+ that can be intercepted by the test system Client
+ """
+ dispatcher.send(signal=signals.template_rendered, sender=self, template=self, context=context)
+ return self.nodelist.render(context)
+
+class TestSMTPConnection(object):
+ """A substitute SMTP connection for use during test sessions.
+ The test connection stores email messages in a dummy outbox,
+ rather than sending them out on the wire.
+
+ """
+ def __init__(*args, **kwargs):
+ pass
+ def open(self):
+ "Mock the SMTPConnection open() interface"
+ pass
+ def close(self):
+ "Mock the SMTPConnection close() interface"
+ pass
+ def send_messages(self, messages):
+ "Redirect messages to the dummy outbox"
+ mail.outbox.extend(messages)
+ return len(messages)
+
+def setup_test_environment():
+ """Perform any global pre-test setup. This involves:
+
+ - Installing the instrumented test renderer
+ - Diverting the email sending functions to a test buffer
+ - Setting the active locale to match the LANGUAGE_CODE setting.
+ """
+ Template.original_render = Template.render
+ Template.render = instrumented_test_render
+
+ mail.original_SMTPConnection = mail.SMTPConnection
+ mail.SMTPConnection = TestSMTPConnection
+
+ mail.outbox = []
+
+ deactivate()
+
+def teardown_test_environment():
+ """Perform any global post-test teardown. This involves:
+
+ - Restoring the original test renderer
+ - Restoring the email sending functions
+
+ """
+ Template.render = Template.original_render
+ del Template.original_render
+
+ mail.SMTPConnection = mail.original_SMTPConnection
+ del mail.original_SMTPConnection
+
+ del mail.outbox
+
+def _set_autocommit(connection):
+ "Make sure a connection is in autocommit mode."
+ if hasattr(connection.connection, "autocommit"):
+ connection.connection.autocommit(True)
+ elif hasattr(connection.connection, "set_isolation_level"):
+ connection.connection.set_isolation_level(0)
+
+def get_mysql_create_suffix():
+ suffix = []
+ if settings.TEST_DATABASE_CHARSET:
+ suffix.append('CHARACTER SET %s' % settings.TEST_DATABASE_CHARSET)
+ if settings.TEST_DATABASE_COLLATION:
+ suffix.append('COLLATE %s' % settings.TEST_DATABASE_COLLATION)
+ return ' '.join(suffix)
+
+def get_postgresql_create_suffix():
+ assert settings.TEST_DATABASE_COLLATION is None, "PostgreSQL does not support collation setting at database creation time."
+ if settings.TEST_DATABASE_CHARSET:
+ return "WITH ENCODING '%s'" % settings.TEST_DATABASE_CHARSET
+ return ''
+
+def create_test_db(verbosity=1, autoclobber=False):
+ """
+ Creates a test database, prompting the user for confirmation if the
+ database already exists. Returns the name of the test database created.
+ """
+ # If the database backend wants to create the test DB itself, let it
+ creation_module = get_creation_module()
+ if hasattr(creation_module, "create_test_db"):
+ creation_module.create_test_db(settings, connection, verbosity, autoclobber)
+ return
+
+ if verbosity >= 1:
+ print "Creating test database..."
+ # If we're using SQLite, it's more convenient to test against an
+ # in-memory database. Using the TEST_DATABASE_NAME setting you can still choose
+ # to run on a physical database.
+ if settings.DATABASE_ENGINE == "sqlite3":
+ if settings.TEST_DATABASE_NAME and settings.TEST_DATABASE_NAME != ":memory:":
+ TEST_DATABASE_NAME = settings.TEST_DATABASE_NAME
+ # Erase the old test database
+ if verbosity >= 1:
+ print "Destroying old test database..."
+ if os.access(TEST_DATABASE_NAME, os.F_OK):
+ if not autoclobber:
+ confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % TEST_DATABASE_NAME)
+ if autoclobber or confirm == 'yes':
+ try:
+ if verbosity >= 1:
+ print "Destroying old test database..."
+ os.remove(TEST_DATABASE_NAME)
+ except Exception, e:
+ sys.stderr.write("Got an error deleting the old test database: %s\n" % e)
+ sys.exit(2)
+ else:
+ print "Tests cancelled."
+ sys.exit(1)
+ if verbosity >= 1:
+ print "Creating test database..."
+ else:
+ TEST_DATABASE_NAME = ":memory:"
+ else:
+ suffix = {
+ 'postgresql': get_postgresql_create_suffix,
+ 'postgresql_psycopg2': get_postgresql_create_suffix,
+ 'mysql': get_mysql_create_suffix,
+ 'mysql_old': get_mysql_create_suffix,
+ }.get(settings.DATABASE_ENGINE, lambda: '')()
+ if settings.TEST_DATABASE_NAME:
+ TEST_DATABASE_NAME = settings.TEST_DATABASE_NAME
+ else:
+ TEST_DATABASE_NAME = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
+
+ qn = connection.ops.quote_name
+
+ # Create the test database and connect to it. We need to autocommit
+ # if the database supports it because PostgreSQL doesn't allow
+ # CREATE/DROP DATABASE statements within transactions.
+ cursor = connection.cursor()
+ _set_autocommit(connection)
+ try:
+ cursor.execute("CREATE DATABASE %s %s" % (qn(TEST_DATABASE_NAME), suffix))
+ except Exception, e:
+ sys.stderr.write("Got an error creating the test database: %s\n" % e)
+ if not autoclobber:
+ confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % TEST_DATABASE_NAME)
+ if autoclobber or confirm == 'yes':
+ try:
+ if verbosity >= 1:
+ print "Destroying old test database..."
+ cursor.execute("DROP DATABASE %s" % qn(TEST_DATABASE_NAME))
+ if verbosity >= 1:
+ print "Creating test database..."
+ cursor.execute("CREATE DATABASE %s %s" % (qn(TEST_DATABASE_NAME), suffix))
+ except Exception, e:
+ sys.stderr.write("Got an error recreating the test database: %s\n" % e)
+ sys.exit(2)
+ else:
+ print "Tests cancelled."
+ sys.exit(1)
+
+ connection.close()
+ settings.DATABASE_NAME = TEST_DATABASE_NAME
+
+ call_command('syncdb', verbosity=verbosity, interactive=False)
+
+ if settings.CACHE_BACKEND.startswith('db://'):
+ cache_name = settings.CACHE_BACKEND[len('db://'):]
+ call_command('createcachetable', cache_name)
+
+ # Get a cursor (even though we don't need one yet). This has
+ # the side effect of initializing the test database.
+ cursor = connection.cursor()
+
+ return TEST_DATABASE_NAME
+
+def destroy_test_db(old_database_name, verbosity=1):
+ # If the database wants to drop the test DB itself, let it
+ creation_module = get_creation_module()
+ if hasattr(creation_module, "destroy_test_db"):
+ creation_module.destroy_test_db(settings, connection, old_database_name, verbosity)
+ return
+
+ if verbosity >= 1:
+ print "Destroying test database..."
+ connection.close()
+ TEST_DATABASE_NAME = settings.DATABASE_NAME
+ settings.DATABASE_NAME = old_database_name
+ if settings.DATABASE_ENGINE == "sqlite3":
+ if TEST_DATABASE_NAME and TEST_DATABASE_NAME != ":memory:":
+ # Remove the SQLite database file
+ os.remove(TEST_DATABASE_NAME)
+ else:
+ # Remove the test database to clean up after
+ # ourselves. Connect to the previous database (not the test database)
+ # to do so, because it's not allowed to delete a database while being
+ # connected to it.
+ cursor = connection.cursor()
+ _set_autocommit(connection)
+ time.sleep(1) # To avoid "database is being accessed by other users" errors.
+ cursor.execute("DROP DATABASE %s" % connection.ops.quote_name(TEST_DATABASE_NAME))
+ connection.close()