app/django/db/backends/oracle/creation.py
author Pawel Solyga <Pawel.Solyga@gmail.com>
Fri, 19 Sep 2008 12:45:13 +0000
changeset 171 b62f1cf5e878
parent 54 03e267d67478
child 323 ff1a9aa48cfd
permissions -rw-r--r--
Bug fixes to revision r596. The User Account field in the Developer User Profile view needs to be an email address, not a login name. The header title of the User Profile Developer view now shows the correct value when form validation fails ("Modify existing ..." instead of "Create new user ...").

import sys, time
from django.core import management

# Oracle column definitions, keyed by Django field type name. A definition
# may contain %(...)s placeholders; these are interpolated against the values
# of Field.__dict__ before the SQL is output. A field type whose definition
# is None is left out of the output.
DATA_TYPES = {
    # NUMBER-based and floating point columns.
    'AutoField': 'NUMBER(11)',
    'IntegerField': 'NUMBER(11)',
    'SmallIntegerField': 'NUMBER(11)',
    'OneToOneField': 'NUMBER(11)',
    'PositiveIntegerField': 'NUMBER(11) CHECK (%(column)s >= 0)',
    'PositiveSmallIntegerField': 'NUMBER(11) CHECK (%(column)s >= 0)',
    'DecimalField': 'NUMBER(%(max_digits)s, %(decimal_places)s)',
    'FloatField': 'DOUBLE PRECISION',

    # Booleans are stored as NUMBER(1) guarded by a CHECK constraint.
    'BooleanField': 'NUMBER(1) CHECK (%(column)s IN (0,1))',
    'NullBooleanField': 'NUMBER(1) CHECK ((%(column)s IN (0,1)) OR (%(column)s IS NULL))',

    # NVARCHAR2 / NCLOB columns.
    'CharField': 'NVARCHAR2(%(max_length)s)',
    'FileField': 'NVARCHAR2(%(max_length)s)',
    'FilePathField': 'NVARCHAR2(%(max_length)s)',
    'ImageField': 'NVARCHAR2(%(max_length)s)',
    'SlugField': 'NVARCHAR2(50)',
    'TextField': 'NCLOB',

    # VARCHAR2 / CHAR columns.
    'CommaSeparatedIntegerField': 'VARCHAR2(%(max_length)s)',
    'IPAddressField': 'VARCHAR2(15)',
    'PhoneNumberField': 'VARCHAR2(20)',
    'URLField': 'VARCHAR2(%(max_length)s)',
    'USStateField': 'CHAR(2)',

    # Date and time columns.
    'DateField': 'DATE',
    'DateTimeField': 'TIMESTAMP',
    'TimeField': 'TIMESTAMP',
}

# Prefix put in front of settings.DATABASE_NAME to build the default test
# database name, test user name and test tablespace names.
TEST_DATABASE_PREFIX = 'test_'
# Default password for the test user; used when
# settings.TEST_DATABASE_PASSWD is missing or empty.
PASSWORD = 'Im_a_lumberjack'
# Holds the original settings.DATABASE_USER (key 'user') and
# settings.DATABASE_PASSWORD (key 'passwd'); written by create_test_db()
# and read back by destroy_test_db().
REMEMBER = {}

def create_test_db(settings, connection, verbosity=1, autoclobber=False):
    TEST_DATABASE_NAME = _test_database_name(settings)
    TEST_DATABASE_USER = _test_database_user(settings)
    TEST_DATABASE_PASSWD = _test_database_passwd(settings)
    TEST_DATABASE_TBLSPACE = _test_database_tblspace(settings)
    TEST_DATABASE_TBLSPACE_TMP = _test_database_tblspace_tmp(settings)

    parameters = {
        'dbname': TEST_DATABASE_NAME,
        'user': TEST_DATABASE_USER,
        'password': TEST_DATABASE_PASSWD,
        'tblspace': TEST_DATABASE_TBLSPACE,
        'tblspace_temp': TEST_DATABASE_TBLSPACE_TMP,
 	}

    REMEMBER['user'] = settings.DATABASE_USER
    REMEMBER['passwd'] = settings.DATABASE_PASSWORD

    cursor = connection.cursor()
    if _test_database_create(settings):
        if verbosity >= 1:
            print 'Creating test database...'
        try:
            _create_test_db(cursor, parameters, verbosity)
        except Exception, e:
            sys.stderr.write("Got an error creating the test database: %s\n" % e)
            if not autoclobber:
                confirm = raw_input("It appears the test database, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_DATABASE_NAME)
            if autoclobber or confirm == 'yes':
                try:
                    if verbosity >= 1:
                        print "Destroying old test database..."
                    _destroy_test_db(cursor, parameters, verbosity)
                    if verbosity >= 1:
                        print "Creating test database..."
                    _create_test_db(cursor, parameters, verbosity)
                except Exception, e:
                    sys.stderr.write("Got an error recreating the test database: %s\n" % e)
                    sys.exit(2)
            else:
                print "Tests cancelled."
                sys.exit(1)

    if _test_user_create(settings):
        if verbosity >= 1:
            print "Creating test user..."
        try:
            _create_test_user(cursor, parameters, verbosity)
        except Exception, e:
            sys.stderr.write("Got an error creating the test user: %s\n" % e)
            if not autoclobber:
                confirm = raw_input("It appears the test user, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_DATABASE_USER)
            if autoclobber or confirm == 'yes':
                try:
                    if verbosity >= 1:
                        print "Destroying old test user..."
                    _destroy_test_user(cursor, parameters, verbosity)
                    if verbosity >= 1:
                        print "Creating test user..."
                    _create_test_user(cursor, parameters, verbosity)
                except Exception, e:
                    sys.stderr.write("Got an error recreating the test user: %s\n" % e)
                    sys.exit(2)
            else:
                print "Tests cancelled."
                sys.exit(1)

    connection.close()
    settings.DATABASE_USER = TEST_DATABASE_USER
    settings.DATABASE_PASSWORD = TEST_DATABASE_PASSWD

    management.call_command('syncdb', verbosity=verbosity, interactive=False)

    # Get a cursor (even though we don't need one yet). This has
    # the side effect of initializing the test database.
    cursor = connection.cursor()

def destroy_test_db(settings, connection, old_database_name, verbosity=1):
    connection.close()

    TEST_DATABASE_NAME = _test_database_name(settings)
    TEST_DATABASE_USER = _test_database_user(settings)
    TEST_DATABASE_PASSWD = _test_database_passwd(settings)
    TEST_DATABASE_TBLSPACE = _test_database_tblspace(settings)
    TEST_DATABASE_TBLSPACE_TMP = _test_database_tblspace_tmp(settings)

    settings.DATABASE_NAME = old_database_name
    settings.DATABASE_USER = REMEMBER['user']
    settings.DATABASE_PASSWORD = REMEMBER['passwd']

    parameters = {
        'dbname': TEST_DATABASE_NAME,
        'user': TEST_DATABASE_USER,
        'password': TEST_DATABASE_PASSWD,
        'tblspace': TEST_DATABASE_TBLSPACE,
        'tblspace_temp': TEST_DATABASE_TBLSPACE_TMP,
 	}

    REMEMBER['user'] = settings.DATABASE_USER
    REMEMBER['passwd'] = settings.DATABASE_PASSWORD

    cursor = connection.cursor()
    time.sleep(1) # To avoid "database is being accessed by other users" errors.
    if _test_user_create(settings):
        if verbosity >= 1:
            print 'Destroying test user...'
        _destroy_test_user(cursor, parameters, verbosity)
    if _test_database_create(settings):
        if verbosity >= 1:
            print 'Destroying test database...'
        _destroy_test_db(cursor, parameters, verbosity)
    connection.close()

def _create_test_db(cursor, parameters, verbosity):
    if verbosity >= 2:
        print "_create_test_db(): dbname = %s" % parameters['dbname']
    statements = [
        """CREATE TABLESPACE %(tblspace)s
           DATAFILE '%(tblspace)s.dbf' SIZE 20M
           REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
        """,
        """CREATE TEMPORARY TABLESPACE %(tblspace_temp)s
           TEMPFILE '%(tblspace_temp)s.dbf' SIZE 20M
           REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
        """,
    ]
    _execute_statements(cursor, statements, parameters, verbosity)

def _create_test_user(cursor, parameters, verbosity):
    if verbosity >= 2:
        print "_create_test_user(): username = %s" % parameters['user']
    statements = [
        """CREATE USER %(user)s
           IDENTIFIED BY %(password)s
           DEFAULT TABLESPACE %(tblspace)s
           TEMPORARY TABLESPACE %(tblspace_temp)s
        """,
        """GRANT CONNECT, RESOURCE TO %(user)s""",
    ]
    _execute_statements(cursor, statements, parameters, verbosity)

def _destroy_test_db(cursor, parameters, verbosity):
    if verbosity >= 2:
        print "_destroy_test_db(): dbname=%s" % parameters['dbname']
    statements = [
        'DROP TABLESPACE %(tblspace)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
        'DROP TABLESPACE %(tblspace_temp)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
        ]
    _execute_statements(cursor, statements, parameters, verbosity)

def _destroy_test_user(cursor, parameters, verbosity):
    if verbosity >= 2:
        print "_destroy_test_user(): user=%s" % parameters['user']
        print "Be patient.  This can take some time..."
    statements = [
        'DROP USER %(user)s CASCADE',
    ]
    _execute_statements(cursor, statements, parameters, verbosity)

def _execute_statements(cursor, statements, parameters, verbosity):
    for template in statements:
        stmt = template % parameters
        if verbosity >= 2:
            print stmt
        try:
            cursor.execute(stmt)
        except Exception, err:
            sys.stderr.write("Failed (%s)\n" % (err))
            raise

def _test_database_name(settings):
    """Return the name of the test database.

    settings.TEST_DATABASE_NAME wins when it is defined and non-empty;
    otherwise the name is TEST_DATABASE_PREFIX + settings.DATABASE_NAME.
    """
    fallback = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
    return getattr(settings, 'TEST_DATABASE_NAME', None) or fallback

def _test_database_create(settings):
    """Return whether the test tablespaces are created and dropped here.

    The result is the truth value of settings.TEST_DATABASE_CREATE, or
    True when that setting is not defined.
    """
    return bool(getattr(settings, 'TEST_DATABASE_CREATE', True))

def _test_user_create(settings):
    """Return whether the test user is created and dropped here.

    The result is the truth value of settings.TEST_USER_CREATE, or True
    when that setting is not defined.
    """
    return bool(getattr(settings, 'TEST_USER_CREATE', True))

def _test_database_user(settings):
    """Return the name of the test user.

    settings.TEST_DATABASE_USER wins when it is defined and non-empty;
    otherwise the name is TEST_DATABASE_PREFIX + settings.DATABASE_NAME.
    """
    fallback = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
    return getattr(settings, 'TEST_DATABASE_USER', None) or fallback

def _test_database_passwd(settings):
    """Return the password of the test user.

    settings.TEST_DATABASE_PASSWD wins when it is defined and non-empty;
    otherwise the module-level PASSWORD default is returned.
    """
    default_passwd = PASSWORD
    return getattr(settings, 'TEST_DATABASE_PASSWD', None) or default_passwd

def _test_database_tblspace(settings):
    """Return the name of the permanent test tablespace.

    settings.TEST_DATABASE_TBLSPACE wins when it is defined and non-empty;
    otherwise the name is TEST_DATABASE_PREFIX + settings.DATABASE_NAME.
    """
    fallback = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
    return getattr(settings, 'TEST_DATABASE_TBLSPACE', None) or fallback

def _test_database_tblspace_tmp(settings):
    """Return the name of the temporary test tablespace.

    settings.TEST_DATABASE_TBLSPACE_TMP wins when it is defined and
    non-empty; otherwise the name is
    TEST_DATABASE_PREFIX + settings.DATABASE_NAME + '_temp'.
    """
    fallback = TEST_DATABASE_PREFIX + settings.DATABASE_NAME + '_temp'
    return getattr(settings, 'TEST_DATABASE_TBLSPACE_TMP', None) or fallback