1 import sys, time, os |
1 import sys, time, os |
2 from django.conf import settings |
2 from django.conf import settings |
3 from django.db import connection, get_creation_module |
3 from django.db import connection |
4 from django.core import mail |
4 from django.core import mail |
5 from django.core.management import call_command |
|
6 from django.dispatch import dispatcher |
|
7 from django.test import signals |
5 from django.test import signals |
8 from django.template import Template |
6 from django.template import Template |
9 from django.utils.translation import deactivate |
7 from django.utils.translation import deactivate |
10 |
|
11 # The prefix to put on the default database name when creating |
|
12 # the test database. |
|
13 TEST_DATABASE_PREFIX = 'test_' |
|
14 |
8 |
15 def instrumented_test_render(self, context): |
9 def instrumented_test_render(self, context): |
16 """ |
10 """ |
17 An instrumented Template render method, providing a signal |
11 An instrumented Template render method, providing a signal |
18 that can be intercepted by the test system Client |
12 that can be intercepted by the test system Client |
19 """ |
13 """ |
20 dispatcher.send(signal=signals.template_rendered, sender=self, template=self, context=context) |
14 signals.template_rendered.send(sender=self, template=self, context=context) |
21 return self.nodelist.render(context) |
15 return self.nodelist.render(context) |
22 |
16 |
23 class TestSMTPConnection(object): |
17 class TestSMTPConnection(object): |
24 """A substitute SMTP connection for use during test sessions. |
18 """A substitute SMTP connection for use during test sessions. |
25 The test connection stores email messages in a dummy outbox, |
19 The test connection stores email messages in a dummy outbox, |
69 mail.SMTPConnection = mail.original_SMTPConnection |
63 mail.SMTPConnection = mail.original_SMTPConnection |
70 del mail.original_SMTPConnection |
64 del mail.original_SMTPConnection |
71 |
65 |
72 del mail.outbox |
66 del mail.outbox |
73 |
67 |
74 def _set_autocommit(connection): |
|
75 "Make sure a connection is in autocommit mode." |
|
76 if hasattr(connection.connection, "autocommit"): |
|
77 connection.connection.autocommit(True) |
|
78 elif hasattr(connection.connection, "set_isolation_level"): |
|
79 connection.connection.set_isolation_level(0) |
|
80 |
|
81 def get_mysql_create_suffix(): |
|
82 suffix = [] |
|
83 if settings.TEST_DATABASE_CHARSET: |
|
84 suffix.append('CHARACTER SET %s' % settings.TEST_DATABASE_CHARSET) |
|
85 if settings.TEST_DATABASE_COLLATION: |
|
86 suffix.append('COLLATE %s' % settings.TEST_DATABASE_COLLATION) |
|
87 return ' '.join(suffix) |
|
88 |
|
89 def get_postgresql_create_suffix(): |
|
90 assert settings.TEST_DATABASE_COLLATION is None, "PostgreSQL does not support collation setting at database creation time." |
|
91 if settings.TEST_DATABASE_CHARSET: |
|
92 return "WITH ENCODING '%s'" % settings.TEST_DATABASE_CHARSET |
|
93 return '' |
|
94 |
|
95 def create_test_db(verbosity=1, autoclobber=False): |
|
96 """ |
|
97 Creates a test database, prompting the user for confirmation if the |
|
98 database already exists. Returns the name of the test database created. |
|
99 """ |
|
100 # If the database backend wants to create the test DB itself, let it |
|
101 creation_module = get_creation_module() |
|
102 if hasattr(creation_module, "create_test_db"): |
|
103 creation_module.create_test_db(settings, connection, verbosity, autoclobber) |
|
104 return |
|
105 |
|
106 if verbosity >= 1: |
|
107 print "Creating test database..." |
|
108 # If we're using SQLite, it's more convenient to test against an |
|
109 # in-memory database. Using the TEST_DATABASE_NAME setting you can still choose |
|
110 # to run on a physical database. |
|
111 if settings.DATABASE_ENGINE == "sqlite3": |
|
112 if settings.TEST_DATABASE_NAME and settings.TEST_DATABASE_NAME != ":memory:": |
|
113 TEST_DATABASE_NAME = settings.TEST_DATABASE_NAME |
|
114 # Erase the old test database |
|
115 if verbosity >= 1: |
|
116 print "Destroying old test database..." |
|
117 if os.access(TEST_DATABASE_NAME, os.F_OK): |
|
118 if not autoclobber: |
|
119 confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % TEST_DATABASE_NAME) |
|
120 if autoclobber or confirm == 'yes': |
|
121 try: |
|
122 if verbosity >= 1: |
|
123 print "Destroying old test database..." |
|
124 os.remove(TEST_DATABASE_NAME) |
|
125 except Exception, e: |
|
126 sys.stderr.write("Got an error deleting the old test database: %s\n" % e) |
|
127 sys.exit(2) |
|
128 else: |
|
129 print "Tests cancelled." |
|
130 sys.exit(1) |
|
131 if verbosity >= 1: |
|
132 print "Creating test database..." |
|
133 else: |
|
134 TEST_DATABASE_NAME = ":memory:" |
|
135 else: |
|
136 suffix = { |
|
137 'postgresql': get_postgresql_create_suffix, |
|
138 'postgresql_psycopg2': get_postgresql_create_suffix, |
|
139 'mysql': get_mysql_create_suffix, |
|
140 'mysql_old': get_mysql_create_suffix, |
|
141 }.get(settings.DATABASE_ENGINE, lambda: '')() |
|
142 if settings.TEST_DATABASE_NAME: |
|
143 TEST_DATABASE_NAME = settings.TEST_DATABASE_NAME |
|
144 else: |
|
145 TEST_DATABASE_NAME = TEST_DATABASE_PREFIX + settings.DATABASE_NAME |
|
146 |
|
147 qn = connection.ops.quote_name |
|
148 |
|
149 # Create the test database and connect to it. We need to autocommit |
|
150 # if the database supports it because PostgreSQL doesn't allow |
|
151 # CREATE/DROP DATABASE statements within transactions. |
|
152 cursor = connection.cursor() |
|
153 _set_autocommit(connection) |
|
154 try: |
|
155 cursor.execute("CREATE DATABASE %s %s" % (qn(TEST_DATABASE_NAME), suffix)) |
|
156 except Exception, e: |
|
157 sys.stderr.write("Got an error creating the test database: %s\n" % e) |
|
158 if not autoclobber: |
|
159 confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % TEST_DATABASE_NAME) |
|
160 if autoclobber or confirm == 'yes': |
|
161 try: |
|
162 if verbosity >= 1: |
|
163 print "Destroying old test database..." |
|
164 cursor.execute("DROP DATABASE %s" % qn(TEST_DATABASE_NAME)) |
|
165 if verbosity >= 1: |
|
166 print "Creating test database..." |
|
167 cursor.execute("CREATE DATABASE %s %s" % (qn(TEST_DATABASE_NAME), suffix)) |
|
168 except Exception, e: |
|
169 sys.stderr.write("Got an error recreating the test database: %s\n" % e) |
|
170 sys.exit(2) |
|
171 else: |
|
172 print "Tests cancelled." |
|
173 sys.exit(1) |
|
174 |
|
175 connection.close() |
|
176 settings.DATABASE_NAME = TEST_DATABASE_NAME |
|
177 |
|
178 call_command('syncdb', verbosity=verbosity, interactive=False) |
|
179 |
|
180 if settings.CACHE_BACKEND.startswith('db://'): |
|
181 cache_name = settings.CACHE_BACKEND[len('db://'):] |
|
182 call_command('createcachetable', cache_name) |
|
183 |
|
184 # Get a cursor (even though we don't need one yet). This has |
|
185 # the side effect of initializing the test database. |
|
186 cursor = connection.cursor() |
|
187 |
|
188 return TEST_DATABASE_NAME |
|
189 |
|
190 def destroy_test_db(old_database_name, verbosity=1): |
|
191 # If the database wants to drop the test DB itself, let it |
|
192 creation_module = get_creation_module() |
|
193 if hasattr(creation_module, "destroy_test_db"): |
|
194 creation_module.destroy_test_db(settings, connection, old_database_name, verbosity) |
|
195 return |
|
196 |
|
197 if verbosity >= 1: |
|
198 print "Destroying test database..." |
|
199 connection.close() |
|
200 TEST_DATABASE_NAME = settings.DATABASE_NAME |
|
201 settings.DATABASE_NAME = old_database_name |
|
202 if settings.DATABASE_ENGINE == "sqlite3": |
|
203 if TEST_DATABASE_NAME and TEST_DATABASE_NAME != ":memory:": |
|
204 # Remove the SQLite database file |
|
205 os.remove(TEST_DATABASE_NAME) |
|
206 else: |
|
207 # Remove the test database to clean up after |
|
208 # ourselves. Connect to the previous database (not the test database) |
|
209 # to do so, because it's not allowed to delete a database while being |
|
210 # connected to it. |
|
211 cursor = connection.cursor() |
|
212 _set_autocommit(connection) |
|
213 time.sleep(1) # To avoid "database is being accessed by other users" errors. |
|
214 cursor.execute("DROP DATABASE %s" % connection.ops.quote_name(TEST_DATABASE_NAME)) |
|
215 connection.close() |
|