app/django/db/backends/oracle/creation.py
author Sverre Rabbelier <srabbelier@gmail.com>
Mon, 20 Apr 2009 22:52:38 +0000
changeset 2264 c8697d928065
parent 323 ff1a9aa48cfd
permissions -rw-r--r--
Email address is actually private, not public; make this obvious. Patch by: Sverre Rabbelier

import sys, time
from django.conf import settings
from django.core import management
from django.db.backends.creation import BaseDatabaseCreation

# Prefix prepended to settings.DATABASE_NAME to build the default names of the
# test database, test user and test tablespaces (see the _test_database_*
# helpers on DatabaseCreation).
TEST_DATABASE_PREFIX = 'test_'
# Default password for the test user; used when settings.TEST_DATABASE_PASSWD
# is missing or empty.
PASSWORD = 'Im_a_lumberjack'

class DatabaseCreation(BaseDatabaseCreation):
    # This dictionary maps Field objects to their associated Oracle column
    # types, as strings. Column-type strings can contain format strings; they'll
    # be interpolated against the values of Field.__dict__ before being output.
    # If a column type is set to None, it won't be included in the output.
    #
    # Any format strings starting with "qn_" are quoted before being used in the
    # output (the "qn_" prefix is stripped before the lookup is performed).

    data_types = {
        'AutoField':                    'NUMBER(11)',
        'BooleanField':                 'NUMBER(1) CHECK (%(qn_column)s IN (0,1))',
        'CharField':                    'NVARCHAR2(%(max_length)s)',
        'CommaSeparatedIntegerField':   'VARCHAR2(%(max_length)s)',
        'DateField':                    'DATE',
        'DateTimeField':                'TIMESTAMP',
        'DecimalField':                 'NUMBER(%(max_digits)s, %(decimal_places)s)',
        'FileField':                    'NVARCHAR2(%(max_length)s)',
        'FilePathField':                'NVARCHAR2(%(max_length)s)',
        'FloatField':                   'DOUBLE PRECISION',
        'IntegerField':                 'NUMBER(11)',
        'IPAddressField':               'VARCHAR2(15)',
        'NullBooleanField':             'NUMBER(1) CHECK ((%(qn_column)s IN (0,1)) OR (%(qn_column)s IS NULL))',
        'OneToOneField':                'NUMBER(11)',
        'PositiveIntegerField':         'NUMBER(11) CHECK (%(qn_column)s >= 0)',
        'PositiveSmallIntegerField':    'NUMBER(11) CHECK (%(qn_column)s >= 0)',
        'SlugField':                    'NVARCHAR2(50)',
        'SmallIntegerField':            'NUMBER(11)',
        'TextField':                    'NCLOB',
        'TimeField':                    'TIMESTAMP',
        'URLField':                     'VARCHAR2(%(max_length)s)',
    }

    # Scratch storage for the original settings.DATABASE_USER and
    # settings.DATABASE_PASSWORD (keys 'user' and 'passwd'). _create_test_db()
    # fills it in before switching settings to the test user, and
    # _destroy_test_db() reads it back to restore them. Note that this is a
    # class-level dict, so it is shared by every instance of this class.
    remember = {}

    def _create_test_db(self, verbosity=1, autoclobber=False):
        TEST_DATABASE_NAME = self._test_database_name(settings)
        TEST_DATABASE_USER = self._test_database_user(settings)
        TEST_DATABASE_PASSWD = self._test_database_passwd(settings)
        TEST_DATABASE_TBLSPACE = self._test_database_tblspace(settings)
        TEST_DATABASE_TBLSPACE_TMP = self._test_database_tblspace_tmp(settings)

        parameters = {
            'dbname': TEST_DATABASE_NAME,
            'user': TEST_DATABASE_USER,
            'password': TEST_DATABASE_PASSWD,
            'tblspace': TEST_DATABASE_TBLSPACE,
            'tblspace_temp': TEST_DATABASE_TBLSPACE_TMP,
        }

        self.remember['user'] = settings.DATABASE_USER
        self.remember['passwd'] = settings.DATABASE_PASSWORD

        cursor = self.connection.cursor()
        if self._test_database_create(settings):
            if verbosity >= 1:
                print 'Creating test database...'
            try:
                self._execute_test_db_creation(cursor, parameters, verbosity)
            except Exception, e:
                sys.stderr.write("Got an error creating the test database: %s\n" % e)
                if not autoclobber:
                    confirm = raw_input("It appears the test database, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_DATABASE_NAME)
                if autoclobber or confirm == 'yes':
                    try:
                        if verbosity >= 1:
                            print "Destroying old test database..."
                        self._execute_test_db_destruction(cursor, parameters, verbosity)
                        if verbosity >= 1:
                            print "Creating test database..."
                        self._execute_test_db_creation(cursor, parameters, verbosity)
                    except Exception, e:
                        sys.stderr.write("Got an error recreating the test database: %s\n" % e)
                        sys.exit(2)
                else:
                    print "Tests cancelled."
                    sys.exit(1)

        if self._test_user_create(settings):
            if verbosity >= 1:
                print "Creating test user..."
            try:
                self._create_test_user(cursor, parameters, verbosity)
            except Exception, e:
                sys.stderr.write("Got an error creating the test user: %s\n" % e)
                if not autoclobber:
                    confirm = raw_input("It appears the test user, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_DATABASE_USER)
                if autoclobber or confirm == 'yes':
                    try:
                        if verbosity >= 1:
                            print "Destroying old test user..."
                        self._destroy_test_user(cursor, parameters, verbosity)
                        if verbosity >= 1:
                            print "Creating test user..."
                        self._create_test_user(cursor, parameters, verbosity)
                    except Exception, e:
                        sys.stderr.write("Got an error recreating the test user: %s\n" % e)
                        sys.exit(2)
                else:
                    print "Tests cancelled."
                    sys.exit(1)

        settings.DATABASE_USER = TEST_DATABASE_USER
        settings.DATABASE_PASSWORD = TEST_DATABASE_PASSWD

        return settings.DATABASE_NAME

    def _destroy_test_db(self, test_database_name, verbosity=1):
        """
        Destroy a test database, prompting the user for confirmation if the
        database already exists. Returns the name of the test database created.
        """
        TEST_DATABASE_NAME = self._test_database_name(settings)
        TEST_DATABASE_USER = self._test_database_user(settings)
        TEST_DATABASE_PASSWD = self._test_database_passwd(settings)
        TEST_DATABASE_TBLSPACE = self._test_database_tblspace(settings)
        TEST_DATABASE_TBLSPACE_TMP = self._test_database_tblspace_tmp(settings)

        settings.DATABASE_USER = self.remember['user']
        settings.DATABASE_PASSWORD = self.remember['passwd']

        parameters = {
            'dbname': TEST_DATABASE_NAME,
            'user': TEST_DATABASE_USER,
            'password': TEST_DATABASE_PASSWD,
            'tblspace': TEST_DATABASE_TBLSPACE,
            'tblspace_temp': TEST_DATABASE_TBLSPACE_TMP,
        }

        self.remember['user'] = settings.DATABASE_USER
        self.remember['passwd'] = settings.DATABASE_PASSWORD

        cursor = self.connection.cursor()
        time.sleep(1) # To avoid "database is being accessed by other users" errors.
        if self._test_user_create(settings):
            if verbosity >= 1:
                print 'Destroying test user...'
            self._destroy_test_user(cursor, parameters, verbosity)
        if self._test_database_create(settings):
            if verbosity >= 1:
                print 'Destroying test database tables...'
            self._execute_test_db_destruction(cursor, parameters, verbosity)
        self.connection.close()

    def _execute_test_db_creation(self, cursor, parameters, verbosity):
        if verbosity >= 2:
            print "_create_test_db(): dbname = %s" % parameters['dbname']
        statements = [
            """CREATE TABLESPACE %(tblspace)s
               DATAFILE '%(tblspace)s.dbf' SIZE 20M
               REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
            """,
            """CREATE TEMPORARY TABLESPACE %(tblspace_temp)s
               TEMPFILE '%(tblspace_temp)s.dbf' SIZE 20M
               REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
            """,
        ]
        self._execute_statements(cursor, statements, parameters, verbosity)

    def _create_test_user(self, cursor, parameters, verbosity):
        if verbosity >= 2:
            print "_create_test_user(): username = %s" % parameters['user']
        statements = [
            """CREATE USER %(user)s
               IDENTIFIED BY %(password)s
               DEFAULT TABLESPACE %(tblspace)s
               TEMPORARY TABLESPACE %(tblspace_temp)s
            """,
            """GRANT CONNECT, RESOURCE TO %(user)s""",
        ]
        self._execute_statements(cursor, statements, parameters, verbosity)

    def _execute_test_db_destruction(self, cursor, parameters, verbosity):
        if verbosity >= 2:
            print "_execute_test_db_destruction(): dbname=%s" % parameters['dbname']
        statements = [
            'DROP TABLESPACE %(tblspace)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
            'DROP TABLESPACE %(tblspace_temp)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
            ]
        self._execute_statements(cursor, statements, parameters, verbosity)

    def _destroy_test_user(self, cursor, parameters, verbosity):
        if verbosity >= 2:
            print "_destroy_test_user(): user=%s" % parameters['user']
            print "Be patient.  This can take some time..."
        statements = [
            'DROP USER %(user)s CASCADE',
        ]
        self._execute_statements(cursor, statements, parameters, verbosity)

    def _execute_statements(self, cursor, statements, parameters, verbosity):
        for template in statements:
            stmt = template % parameters
            if verbosity >= 2:
                print stmt
            try:
                cursor.execute(stmt)
            except Exception, err:
                sys.stderr.write("Failed (%s)\n" % (err))
                raise

    def _test_database_name(self, settings):
        """
        Return the name of the test database.

        This is settings.TEST_DATABASE_NAME when that attribute exists and is
        non-empty; otherwise it is TEST_DATABASE_PREFIX followed by
        settings.DATABASE_NAME.
        """
        default_name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
        try:
            return settings.TEST_DATABASE_NAME or default_name
        except AttributeError:
            # Setting not defined at all: use the prefixed name.
            return default_name

    def _test_database_create(self, settings):
        """
        Return whether the test tablespaces should be created and destroyed.

        Reflects the truth value of settings.TEST_DATABASE_CREATE, and
        defaults to True when that attribute is not defined.
        """
        try:
            return bool(settings.TEST_DATABASE_CREATE)
        except AttributeError:
            # Setting not defined at all: creation is enabled by default.
            return True

    def _test_user_create(self, settings):
        """
        Return whether the test user should be created and destroyed.

        Reflects the truth value of settings.TEST_USER_CREATE, and defaults
        to True when that attribute is not defined.
        """
        try:
            return bool(settings.TEST_USER_CREATE)
        except AttributeError:
            # Setting not defined at all: creation is enabled by default.
            return True

    def _test_database_user(self, settings):
        """
        Return the name of the test database user.

        This is settings.TEST_DATABASE_USER when that attribute exists and is
        non-empty; otherwise it is TEST_DATABASE_PREFIX followed by
        settings.DATABASE_NAME.

        The parameter was previously misspelled "ettings", which made the
        method ignore its argument and read the module-level settings object
        instead; it now uses the object it is given, like its sibling helpers.
        """
        name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
        try:
            if settings.TEST_DATABASE_USER:
                name = settings.TEST_DATABASE_USER
        except AttributeError:
            # Setting not defined at all: keep the prefixed default.
            pass
        return name

    def _test_database_passwd(self, settings):
        """
        Return the password of the test database user.

        This is settings.TEST_DATABASE_PASSWD when that attribute exists and
        is non-empty; otherwise it is the module-level PASSWORD default.
        """
        try:
            return settings.TEST_DATABASE_PASSWD or PASSWORD
        except AttributeError:
            # Setting not defined at all: use the built-in default.
            return PASSWORD

    def _test_database_tblspace(self, settings):
        """
        Return the name of the test tablespace.

        This is settings.TEST_DATABASE_TBLSPACE when that attribute exists
        and is non-empty; otherwise it is TEST_DATABASE_PREFIX followed by
        settings.DATABASE_NAME.
        """
        default_tblspace = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
        try:
            return settings.TEST_DATABASE_TBLSPACE or default_tblspace
        except AttributeError:
            # Setting not defined at all: use the prefixed name.
            return default_tblspace

    def _test_database_tblspace_tmp(self, settings):
        """
        Return the name of the temporary test tablespace.

        This is settings.TEST_DATABASE_TBLSPACE_TMP when that attribute
        exists and is non-empty; otherwise it is TEST_DATABASE_PREFIX followed
        by settings.DATABASE_NAME and the suffix '_temp'.
        """
        default_tblspace = TEST_DATABASE_PREFIX + settings.DATABASE_NAME + '_temp'
        try:
            return settings.TEST_DATABASE_TBLSPACE_TMP or default_tblspace
        except AttributeError:
            # Setting not defined at all: use the prefixed, suffixed name.
            return default_tblspace