app/django/db/backends/oracle/creation.py
author Mario Ferraro <fadinlight@gmail.com>
Sun, 15 Nov 2009 22:12:20 +0100
changeset 3093 d1be59b6b627
parent 323 ff1a9aa48cfd
permissions -rw-r--r--
GMaps related JS changed to use new google namespace. Google is going to change permanently in the future the way to load its services, so better stay safe. Also this commit shows uses of the new melange.js module. Fixes Issue 634.

import sys, time
from django.conf import settings
from django.core import management
from django.db.backends.creation import BaseDatabaseCreation

# Prepended to settings.DATABASE_NAME to build the default test database,
# test user and test tablespace names (see the _test_database_* methods).
TEST_DATABASE_PREFIX = 'test_'
# Password given to the test user when settings.TEST_DATABASE_PASSWD is
# missing or empty.
PASSWORD = 'Im_a_lumberjack'

class DatabaseCreation(BaseDatabaseCreation):
    # This dictionary maps Field objects to their associated Oracle column
    # types, as strings. Column-type strings can contain format strings; they'll
    # be interpolated against the values of Field.__dict__ before being output.
    # If a column type is set to None, it won't be included in the output.
    #
    # Any format strings starting with "qn_" are quoted before being used in the
    # output (the "qn_" prefix is stripped before the lookup is performed).

    data_types = {
        'AutoField':                    'NUMBER(11)',
        'BooleanField':                 'NUMBER(1) CHECK (%(qn_column)s IN (0,1))',
        'CharField':                    'NVARCHAR2(%(max_length)s)',
        'CommaSeparatedIntegerField':   'VARCHAR2(%(max_length)s)',
        'DateField':                    'DATE',
        'DateTimeField':                'TIMESTAMP',
        'DecimalField':                 'NUMBER(%(max_digits)s, %(decimal_places)s)',
        'FileField':                    'NVARCHAR2(%(max_length)s)',
        'FilePathField':                'NVARCHAR2(%(max_length)s)',
        'FloatField':                   'DOUBLE PRECISION',
        'IntegerField':                 'NUMBER(11)',
        'IPAddressField':               'VARCHAR2(15)',
        'NullBooleanField':             'NUMBER(1) CHECK ((%(qn_column)s IN (0,1)) OR (%(qn_column)s IS NULL))',
        'OneToOneField':                'NUMBER(11)',
        'PositiveIntegerField':         'NUMBER(11) CHECK (%(qn_column)s >= 0)',
        'PositiveSmallIntegerField':    'NUMBER(11) CHECK (%(qn_column)s >= 0)',
        'SlugField':                    'NVARCHAR2(50)',
        'SmallIntegerField':            'NUMBER(11)',
        'TextField':                    'NCLOB',
        'TimeField':                    'TIMESTAMP',
        'URLField':                     'VARCHAR2(%(max_length)s)',
    }

    # Stores the original settings.DATABASE_USER / settings.DATABASE_PASSWORD
    # under the keys 'user' and 'passwd' while the test credentials are in
    # place, so that _destroy_test_db() can put them back.
    # NOTE: defined on the class, so this one dict is shared by all instances.
    remember = {}

    def _create_test_db(self, verbosity=1, autoclobber=False):
        """
        Create the Oracle test tablespaces and the test user.

        Each of the two steps runs only when the matching switch
        (settings.TEST_DATABASE_CREATE / settings.TEST_USER_CREATE) allows it.
        When a step fails, the existing object is dropped and created again:
        straight away if autoclobber is true, otherwise only after the user
        answers 'yes' at the prompt. Any other answer exits with status 1; a
        failure while recreating exits with status 2.

        The current settings.DATABASE_USER and settings.DATABASE_PASSWORD are
        saved in self.remember and then replaced by the test user's
        credentials. Returns settings.DATABASE_NAME.
        """
        TEST_DATABASE_NAME = self._test_database_name(settings)
        TEST_DATABASE_USER = self._test_database_user(settings)
        TEST_DATABASE_PASSWD = self._test_database_passwd(settings)
        TEST_DATABASE_TBLSPACE = self._test_database_tblspace(settings)
        TEST_DATABASE_TBLSPACE_TMP = self._test_database_tblspace_tmp(settings)

        parameters = {
            'dbname': TEST_DATABASE_NAME,
            'user': TEST_DATABASE_USER,
            'password': TEST_DATABASE_PASSWD,
            'tblspace': TEST_DATABASE_TBLSPACE,
            'tblspace_temp': TEST_DATABASE_TBLSPACE_TMP,
        }

        # Saved so that _destroy_test_db() can restore the real credentials.
        self.remember['user'] = settings.DATABASE_USER
        self.remember['passwd'] = settings.DATABASE_PASSWORD

        cursor = self.connection.cursor()
        if self._test_database_create(settings):
            self._create_or_recreate_test_object(
                'database', TEST_DATABASE_NAME,
                self._execute_test_db_creation,
                self._execute_test_db_destruction,
                cursor, parameters, verbosity, autoclobber)

        if self._test_user_create(settings):
            self._create_or_recreate_test_object(
                'user', TEST_DATABASE_USER,
                self._create_test_user,
                self._destroy_test_user,
                cursor, parameters, verbosity, autoclobber)

        settings.DATABASE_USER = TEST_DATABASE_USER
        settings.DATABASE_PASSWORD = TEST_DATABASE_PASSWD

        return settings.DATABASE_NAME

    def _create_or_recreate_test_object(self, kind, name, create, destroy,
                                        cursor, parameters, verbosity,
                                        autoclobber):
        """
        Run create(); if it fails, offer to destroy() and create() again.

        kind is the noun used in the messages ('database' or 'user') and name
        is the identifier shown in the confirmation prompt. create and destroy
        are both called as f(cursor, parameters, verbosity).
        """
        if verbosity >= 1:
            print("Creating test %s..." % kind)
        try:
            create(cursor, parameters, verbosity)
        except Exception:
            # sys.exc_info() is used instead of binding the exception in the
            # except clause so the handler parses under both the old
            # ("except X, e") and the new ("except X as e") grammar.
            sys.stderr.write("Got an error creating the test %s: %s\n" % (kind, sys.exc_info()[1]))
        else:
            return

        if not autoclobber:
            confirm = raw_input("It appears the test %s, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % (kind, name))
            if confirm != 'yes':
                print("Tests cancelled.")
                sys.exit(1)

        try:
            if verbosity >= 1:
                print("Destroying old test %s..." % kind)
            destroy(cursor, parameters, verbosity)
            if verbosity >= 1:
                print("Creating test %s..." % kind)
            create(cursor, parameters, verbosity)
        except Exception:
            sys.stderr.write("Got an error recreating the test %s: %s\n" % (kind, sys.exc_info()[1]))
            sys.exit(2)

    def _destroy_test_db(self, test_database_name, verbosity=1):
        """
        Drop the test user and the test tablespaces, then close the connection.

        settings.DATABASE_USER and settings.DATABASE_PASSWORD are first reset
        to the values saved in self.remember. The test user is dropped only
        when settings.TEST_USER_CREATE allows it, and the tablespaces only
        when settings.TEST_DATABASE_CREATE allows it. Nothing is prompted for
        and nothing is returned.

        test_database_name is accepted but not used here; all names are
        recomputed from settings.
        """
        TEST_DATABASE_NAME = self._test_database_name(settings)
        TEST_DATABASE_USER = self._test_database_user(settings)
        TEST_DATABASE_PASSWD = self._test_database_passwd(settings)
        TEST_DATABASE_TBLSPACE = self._test_database_tblspace(settings)
        TEST_DATABASE_TBLSPACE_TMP = self._test_database_tblspace_tmp(settings)

        # Switch back to the credentials that were active before the test
        # user was put in place; self.remember keeps holding the same values.
        settings.DATABASE_USER = self.remember['user']
        settings.DATABASE_PASSWORD = self.remember['passwd']

        parameters = {
            'dbname': TEST_DATABASE_NAME,
            'user': TEST_DATABASE_USER,
            'password': TEST_DATABASE_PASSWD,
            'tblspace': TEST_DATABASE_TBLSPACE,
            'tblspace_temp': TEST_DATABASE_TBLSPACE_TMP,
        }

        cursor = self.connection.cursor()
        time.sleep(1) # To avoid "database is being accessed by other users" errors.
        if self._test_user_create(settings):
            if verbosity >= 1:
                print('Destroying test user...')
            self._destroy_test_user(cursor, parameters, verbosity)
        if self._test_database_create(settings):
            if verbosity >= 1:
                print('Destroying test database tables...')
            self._execute_test_db_destruction(cursor, parameters, verbosity)
        self.connection.close()

    def _execute_test_db_creation(self, cursor, parameters, verbosity):
        if verbosity >= 2:
            print "_create_test_db(): dbname = %s" % parameters['dbname']
        statements = [
            """CREATE TABLESPACE %(tblspace)s
               DATAFILE '%(tblspace)s.dbf' SIZE 20M
               REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
            """,
            """CREATE TEMPORARY TABLESPACE %(tblspace_temp)s
               TEMPFILE '%(tblspace_temp)s.dbf' SIZE 20M
               REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
            """,
        ]
        self._execute_statements(cursor, statements, parameters, verbosity)

    def _create_test_user(self, cursor, parameters, verbosity):
        if verbosity >= 2:
            print "_create_test_user(): username = %s" % parameters['user']
        statements = [
            """CREATE USER %(user)s
               IDENTIFIED BY %(password)s
               DEFAULT TABLESPACE %(tblspace)s
               TEMPORARY TABLESPACE %(tblspace_temp)s
            """,
            """GRANT CONNECT, RESOURCE TO %(user)s""",
        ]
        self._execute_statements(cursor, statements, parameters, verbosity)

    def _execute_test_db_destruction(self, cursor, parameters, verbosity):
        if verbosity >= 2:
            print "_execute_test_db_destruction(): dbname=%s" % parameters['dbname']
        statements = [
            'DROP TABLESPACE %(tblspace)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
            'DROP TABLESPACE %(tblspace_temp)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
            ]
        self._execute_statements(cursor, statements, parameters, verbosity)

    def _destroy_test_user(self, cursor, parameters, verbosity):
        if verbosity >= 2:
            print "_destroy_test_user(): user=%s" % parameters['user']
            print "Be patient.  This can take some time..."
        statements = [
            'DROP USER %(user)s CASCADE',
        ]
        self._execute_statements(cursor, statements, parameters, verbosity)

    def _execute_statements(self, cursor, statements, parameters, verbosity):
        for template in statements:
            stmt = template % parameters
            if verbosity >= 2:
                print stmt
            try:
                cursor.execute(stmt)
            except Exception, err:
                sys.stderr.write("Failed (%s)\n" % (err))
                raise

    def _test_database_name(self, settings):
        """
        Return the name of the test database.

        settings.TEST_DATABASE_NAME is used when it exists and is true;
        otherwise the name is TEST_DATABASE_PREFIX + settings.DATABASE_NAME.
        """
        fallback = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
        configured = getattr(settings, 'TEST_DATABASE_NAME', None)
        if configured:
            return configured
        return fallback

    def _test_database_create(self, settings):
        name = True
        try:
            if settings.TEST_DATABASE_CREATE:
                name = True
            else:
                name = False
        except AttributeError:
            pass
        except:
            raise
        return name

    def _test_user_create(self, settings):
        name = True
        try:
            if settings.TEST_USER_CREATE:
                name = True
            else:
                name = False
        except AttributeError:
            pass
        except:
            raise
        return name

    def _test_database_user(self, ettings):
        name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
        try:
            if settings.TEST_DATABASE_USER:
                name = settings.TEST_DATABASE_USER
        except AttributeError:
            pass
        except:
            raise
        return name

    def _test_database_passwd(self, settings):
        """
        Return the password of the test user.

        settings.TEST_DATABASE_PASSWD is used when it exists and is true;
        otherwise the module-level PASSWORD default is returned.
        """
        fallback = PASSWORD
        configured = getattr(settings, 'TEST_DATABASE_PASSWD', None)
        if configured:
            return configured
        return fallback

    def _test_database_tblspace(self, settings):
        """
        Return the name of the permanent test tablespace.

        settings.TEST_DATABASE_TBLSPACE is used when it exists and is true;
        otherwise the name is TEST_DATABASE_PREFIX + settings.DATABASE_NAME.
        """
        fallback = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
        configured = getattr(settings, 'TEST_DATABASE_TBLSPACE', None)
        if configured:
            return configured
        return fallback

    def _test_database_tblspace_tmp(self, settings):
        """
        Return the name of the temporary test tablespace.

        settings.TEST_DATABASE_TBLSPACE_TMP is used when it exists and is
        true; otherwise the name is TEST_DATABASE_PREFIX +
        settings.DATABASE_NAME + '_temp'.
        """
        fallback = TEST_DATABASE_PREFIX + settings.DATABASE_NAME + '_temp'
        configured = getattr(settings, 'TEST_DATABASE_TBLSPACE_TMP', None)
        if configured:
            return configured
        return fallback