app/django/db/backends/oracle/creation.py
changeset 54 03e267d67478
child 323 ff1a9aa48cfd
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/app/django/db/backends/oracle/creation.py	Fri Jul 18 18:22:23 2008 +0000
@@ -0,0 +1,288 @@
+import sys, time
+from django.core import management
+
+# This dictionary maps Field objects to their associated Oracle column
+# types, as strings. Column-type strings can contain format strings; they'll
+# be interpolated against the values of Field.__dict__ before being output.
+# If a column type is set to None, it won't be included in the output.
+DATA_TYPES = {
+    'AutoField':                    'NUMBER(11)',
+    'BooleanField':                 'NUMBER(1) CHECK (%(column)s IN (0,1))',
+    'CharField':                    'NVARCHAR2(%(max_length)s)',
+    'CommaSeparatedIntegerField':   'VARCHAR2(%(max_length)s)',
+    'DateField':                    'DATE',
+    'DateTimeField':                'TIMESTAMP',
+    'DecimalField':                 'NUMBER(%(max_digits)s, %(decimal_places)s)',
+    'FileField':                    'NVARCHAR2(%(max_length)s)',
+    'FilePathField':                'NVARCHAR2(%(max_length)s)',
+    'FloatField':                   'DOUBLE PRECISION',
+    'ImageField':                   'NVARCHAR2(%(max_length)s)',
+    'IntegerField':                 'NUMBER(11)',
+    'IPAddressField':               'VARCHAR2(15)',
+    'NullBooleanField':             'NUMBER(1) CHECK ((%(column)s IN (0,1)) OR (%(column)s IS NULL))',
+    'OneToOneField':                'NUMBER(11)',
+    'PhoneNumberField':             'VARCHAR2(20)',
+    'PositiveIntegerField':         'NUMBER(11) CHECK (%(column)s >= 0)',
+    'PositiveSmallIntegerField':    'NUMBER(11) CHECK (%(column)s >= 0)',
+    'SlugField':                    'NVARCHAR2(50)',
+    'SmallIntegerField':            'NUMBER(11)',
+    'TextField':                    'NCLOB',
+    'TimeField':                    'TIMESTAMP',
+    'URLField':                     'VARCHAR2(%(max_length)s)',
+    'USStateField':                 'CHAR(2)',
+}
+
+TEST_DATABASE_PREFIX = 'test_'
+PASSWORD = 'Im_a_lumberjack'
+REMEMBER = {}
+
+def create_test_db(settings, connection, verbosity=1, autoclobber=False):
+    TEST_DATABASE_NAME = _test_database_name(settings)
+    TEST_DATABASE_USER = _test_database_user(settings)
+    TEST_DATABASE_PASSWD = _test_database_passwd(settings)
+    TEST_DATABASE_TBLSPACE = _test_database_tblspace(settings)
+    TEST_DATABASE_TBLSPACE_TMP = _test_database_tblspace_tmp(settings)
+
+    parameters = {
+        'dbname': TEST_DATABASE_NAME,
+        'user': TEST_DATABASE_USER,
+        'password': TEST_DATABASE_PASSWD,
+        'tblspace': TEST_DATABASE_TBLSPACE,
+        'tblspace_temp': TEST_DATABASE_TBLSPACE_TMP,
+ 	}
+
+    REMEMBER['user'] = settings.DATABASE_USER
+    REMEMBER['passwd'] = settings.DATABASE_PASSWORD
+
+    cursor = connection.cursor()
+    if _test_database_create(settings):
+        if verbosity >= 1:
+            print 'Creating test database...'
+        try:
+            _create_test_db(cursor, parameters, verbosity)
+        except Exception, e:
+            sys.stderr.write("Got an error creating the test database: %s\n" % e)
+            if not autoclobber:
+                confirm = raw_input("It appears the test database, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_DATABASE_NAME)
+            if autoclobber or confirm == 'yes':
+                try:
+                    if verbosity >= 1:
+                        print "Destroying old test database..."
+                    _destroy_test_db(cursor, parameters, verbosity)
+                    if verbosity >= 1:
+                        print "Creating test database..."
+                    _create_test_db(cursor, parameters, verbosity)
+                except Exception, e:
+                    sys.stderr.write("Got an error recreating the test database: %s\n" % e)
+                    sys.exit(2)
+            else:
+                print "Tests cancelled."
+                sys.exit(1)
+
+    if _test_user_create(settings):
+        if verbosity >= 1:
+            print "Creating test user..."
+        try:
+            _create_test_user(cursor, parameters, verbosity)
+        except Exception, e:
+            sys.stderr.write("Got an error creating the test user: %s\n" % e)
+            if not autoclobber:
+                confirm = raw_input("It appears the test user, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_DATABASE_USER)
+            if autoclobber or confirm == 'yes':
+                try:
+                    if verbosity >= 1:
+                        print "Destroying old test user..."
+                    _destroy_test_user(cursor, parameters, verbosity)
+                    if verbosity >= 1:
+                        print "Creating test user..."
+                    _create_test_user(cursor, parameters, verbosity)
+                except Exception, e:
+                    sys.stderr.write("Got an error recreating the test user: %s\n" % e)
+                    sys.exit(2)
+            else:
+                print "Tests cancelled."
+                sys.exit(1)
+
+    connection.close()
+    settings.DATABASE_USER = TEST_DATABASE_USER
+    settings.DATABASE_PASSWORD = TEST_DATABASE_PASSWD
+
+    management.call_command('syncdb', verbosity=verbosity, interactive=False)
+
+    # Get a cursor (even though we don't need one yet). This has
+    # the side effect of initializing the test database.
+    cursor = connection.cursor()
+
+def destroy_test_db(settings, connection, old_database_name, verbosity=1):
+    connection.close()
+
+    TEST_DATABASE_NAME = _test_database_name(settings)
+    TEST_DATABASE_USER = _test_database_user(settings)
+    TEST_DATABASE_PASSWD = _test_database_passwd(settings)
+    TEST_DATABASE_TBLSPACE = _test_database_tblspace(settings)
+    TEST_DATABASE_TBLSPACE_TMP = _test_database_tblspace_tmp(settings)
+
+    settings.DATABASE_NAME = old_database_name
+    settings.DATABASE_USER = REMEMBER['user']
+    settings.DATABASE_PASSWORD = REMEMBER['passwd']
+
+    parameters = {
+        'dbname': TEST_DATABASE_NAME,
+        'user': TEST_DATABASE_USER,
+        'password': TEST_DATABASE_PASSWD,
+        'tblspace': TEST_DATABASE_TBLSPACE,
+        'tblspace_temp': TEST_DATABASE_TBLSPACE_TMP,
+ 	}
+
+    REMEMBER['user'] = settings.DATABASE_USER
+    REMEMBER['passwd'] = settings.DATABASE_PASSWORD
+
+    cursor = connection.cursor()
+    time.sleep(1) # To avoid "database is being accessed by other users" errors.
+    if _test_user_create(settings):
+        if verbosity >= 1:
+            print 'Destroying test user...'
+        _destroy_test_user(cursor, parameters, verbosity)
+    if _test_database_create(settings):
+        if verbosity >= 1:
+            print 'Destroying test database...'
+        _destroy_test_db(cursor, parameters, verbosity)
+    connection.close()
+
+def _create_test_db(cursor, parameters, verbosity):
+    if verbosity >= 2:
+        print "_create_test_db(): dbname = %s" % parameters['dbname']
+    statements = [
+        """CREATE TABLESPACE %(tblspace)s
+           DATAFILE '%(tblspace)s.dbf' SIZE 20M
+           REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
+        """,
+        """CREATE TEMPORARY TABLESPACE %(tblspace_temp)s
+           TEMPFILE '%(tblspace_temp)s.dbf' SIZE 20M
+           REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
+        """,
+    ]
+    _execute_statements(cursor, statements, parameters, verbosity)
+
+def _create_test_user(cursor, parameters, verbosity):
+    if verbosity >= 2:
+        print "_create_test_user(): username = %s" % parameters['user']
+    statements = [
+        """CREATE USER %(user)s
+           IDENTIFIED BY %(password)s
+           DEFAULT TABLESPACE %(tblspace)s
+           TEMPORARY TABLESPACE %(tblspace_temp)s
+        """,
+        """GRANT CONNECT, RESOURCE TO %(user)s""",
+    ]
+    _execute_statements(cursor, statements, parameters, verbosity)
+
+def _destroy_test_db(cursor, parameters, verbosity):
+    if verbosity >= 2:
+        print "_destroy_test_db(): dbname=%s" % parameters['dbname']
+    statements = [
+        'DROP TABLESPACE %(tblspace)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
+        'DROP TABLESPACE %(tblspace_temp)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
+        ]
+    _execute_statements(cursor, statements, parameters, verbosity)
+
+def _destroy_test_user(cursor, parameters, verbosity):
+    if verbosity >= 2:
+        print "_destroy_test_user(): user=%s" % parameters['user']
+        print "Be patient.  This can take some time..."
+    statements = [
+        'DROP USER %(user)s CASCADE',
+    ]
+    _execute_statements(cursor, statements, parameters, verbosity)
+
+def _execute_statements(cursor, statements, parameters, verbosity):
+    for template in statements:
+        stmt = template % parameters
+        if verbosity >= 2:
+            print stmt
+        try:
+            cursor.execute(stmt)
+        except Exception, err:
+            sys.stderr.write("Failed (%s)\n" % (err))
+            raise
+
+def _test_database_name(settings):
+    name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
+    try:
+        if settings.TEST_DATABASE_NAME:
+            name = settings.TEST_DATABASE_NAME
+    except AttributeError:
+        pass
+    except:
+        raise
+    return name
+
+def _test_database_create(settings):
+    name = True
+    try:
+        if settings.TEST_DATABASE_CREATE:
+            name = True
+        else:
+            name = False
+    except AttributeError:
+        pass
+    except:
+        raise
+    return name
+
+def _test_user_create(settings):
+    name = True
+    try:
+        if settings.TEST_USER_CREATE:
+            name = True
+        else:
+            name = False
+    except AttributeError:
+        pass
+    except:
+        raise
+    return name
+
+def _test_database_user(settings):
+    name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
+    try:
+        if settings.TEST_DATABASE_USER:
+            name = settings.TEST_DATABASE_USER
+    except AttributeError:
+        pass
+    except:
+        raise
+    return name
+
+def _test_database_passwd(settings):
+    name = PASSWORD
+    try:
+        if settings.TEST_DATABASE_PASSWD:
+            name = settings.TEST_DATABASE_PASSWD
+    except AttributeError:
+        pass
+    except:
+        raise
+    return name
+
+def _test_database_tblspace(settings):
+    name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
+    try:
+        if settings.TEST_DATABASE_TBLSPACE:
+            name = settings.TEST_DATABASE_TBLSPACE
+    except AttributeError:
+        pass
+    except:
+        raise
+    return name
+
+def _test_database_tblspace_tmp(settings):
+    name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME + '_temp'
+    try:
+        if settings.TEST_DATABASE_TBLSPACE_TMP:
+            name = settings.TEST_DATABASE_TBLSPACE_TMP
+    except AttributeError:
+        pass
+    except:
+        raise
+    return name