app/django/db/backends/oracle/creation.py
author Todd Larsen <tlarsen@google.com>
Thu, 25 Sep 2008 17:17:11 +0000
changeset 202 b8b4a83788d4
parent 54 03e267d67478
child 323 ff1a9aa48cfd
permissions -rw-r--r--
A key_name controller module to collect all of the name...() functions that compose Model entity key names, plus some minor changes to other controller modules to illustrate the proposed use. Patch by: Todd Larsen Review by: Pawel Solyga Review URL: http://codereviews.googleopensourceprograms.com/804 Review URL: http://codereviews.googleopensourceprograms.com/804

import sys, time
from django.core import management

# This dictionary maps Field objects to their associated Oracle column
# types, as strings. Column-type strings can contain format strings; they'll
# be interpolated against the values of Field.__dict__ before being output.
# If a column type is set to None, it won't be included in the output.
DATA_TYPES = {
    'AutoField':                    'NUMBER(11)',
    'BooleanField':                 'NUMBER(1) CHECK (%(column)s IN (0,1))',
    'CharField':                    'NVARCHAR2(%(max_length)s)',
    'CommaSeparatedIntegerField':   'VARCHAR2(%(max_length)s)',
    'DateField':                    'DATE',
    'DateTimeField':                'TIMESTAMP',
    'DecimalField':                 'NUMBER(%(max_digits)s, %(decimal_places)s)',
    'FileField':                    'NVARCHAR2(%(max_length)s)',
    'FilePathField':                'NVARCHAR2(%(max_length)s)',
    'FloatField':                   'DOUBLE PRECISION',
    'ImageField':                   'NVARCHAR2(%(max_length)s)',
    'IntegerField':                 'NUMBER(11)',
    'IPAddressField':               'VARCHAR2(15)',
    'NullBooleanField':             'NUMBER(1) CHECK ((%(column)s IN (0,1)) OR (%(column)s IS NULL))',
    'OneToOneField':                'NUMBER(11)',
    'PhoneNumberField':             'VARCHAR2(20)',
    'PositiveIntegerField':         'NUMBER(11) CHECK (%(column)s >= 0)',
    'PositiveSmallIntegerField':    'NUMBER(11) CHECK (%(column)s >= 0)',
    'SlugField':                    'NVARCHAR2(50)',
    'SmallIntegerField':            'NUMBER(11)',
    'TextField':                    'NCLOB',
    'TimeField':                    'TIMESTAMP',
    'URLField':                     'VARCHAR2(%(max_length)s)',
    'USStateField':                 'CHAR(2)',
}

TEST_DATABASE_PREFIX = 'test_'
PASSWORD = 'Im_a_lumberjack'
REMEMBER = {}

def create_test_db(settings, connection, verbosity=1, autoclobber=False):
    TEST_DATABASE_NAME = _test_database_name(settings)
    TEST_DATABASE_USER = _test_database_user(settings)
    TEST_DATABASE_PASSWD = _test_database_passwd(settings)
    TEST_DATABASE_TBLSPACE = _test_database_tblspace(settings)
    TEST_DATABASE_TBLSPACE_TMP = _test_database_tblspace_tmp(settings)

    parameters = {
        'dbname': TEST_DATABASE_NAME,
        'user': TEST_DATABASE_USER,
        'password': TEST_DATABASE_PASSWD,
        'tblspace': TEST_DATABASE_TBLSPACE,
        'tblspace_temp': TEST_DATABASE_TBLSPACE_TMP,
 	}

    REMEMBER['user'] = settings.DATABASE_USER
    REMEMBER['passwd'] = settings.DATABASE_PASSWORD

    cursor = connection.cursor()
    if _test_database_create(settings):
        if verbosity >= 1:
            print 'Creating test database...'
        try:
            _create_test_db(cursor, parameters, verbosity)
        except Exception, e:
            sys.stderr.write("Got an error creating the test database: %s\n" % e)
            if not autoclobber:
                confirm = raw_input("It appears the test database, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_DATABASE_NAME)
            if autoclobber or confirm == 'yes':
                try:
                    if verbosity >= 1:
                        print "Destroying old test database..."
                    _destroy_test_db(cursor, parameters, verbosity)
                    if verbosity >= 1:
                        print "Creating test database..."
                    _create_test_db(cursor, parameters, verbosity)
                except Exception, e:
                    sys.stderr.write("Got an error recreating the test database: %s\n" % e)
                    sys.exit(2)
            else:
                print "Tests cancelled."
                sys.exit(1)

    if _test_user_create(settings):
        if verbosity >= 1:
            print "Creating test user..."
        try:
            _create_test_user(cursor, parameters, verbosity)
        except Exception, e:
            sys.stderr.write("Got an error creating the test user: %s\n" % e)
            if not autoclobber:
                confirm = raw_input("It appears the test user, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_DATABASE_USER)
            if autoclobber or confirm == 'yes':
                try:
                    if verbosity >= 1:
                        print "Destroying old test user..."
                    _destroy_test_user(cursor, parameters, verbosity)
                    if verbosity >= 1:
                        print "Creating test user..."
                    _create_test_user(cursor, parameters, verbosity)
                except Exception, e:
                    sys.stderr.write("Got an error recreating the test user: %s\n" % e)
                    sys.exit(2)
            else:
                print "Tests cancelled."
                sys.exit(1)

    connection.close()
    settings.DATABASE_USER = TEST_DATABASE_USER
    settings.DATABASE_PASSWORD = TEST_DATABASE_PASSWD

    management.call_command('syncdb', verbosity=verbosity, interactive=False)

    # Get a cursor (even though we don't need one yet). This has
    # the side effect of initializing the test database.
    cursor = connection.cursor()

def destroy_test_db(settings, connection, old_database_name, verbosity=1):
    connection.close()

    TEST_DATABASE_NAME = _test_database_name(settings)
    TEST_DATABASE_USER = _test_database_user(settings)
    TEST_DATABASE_PASSWD = _test_database_passwd(settings)
    TEST_DATABASE_TBLSPACE = _test_database_tblspace(settings)
    TEST_DATABASE_TBLSPACE_TMP = _test_database_tblspace_tmp(settings)

    settings.DATABASE_NAME = old_database_name
    settings.DATABASE_USER = REMEMBER['user']
    settings.DATABASE_PASSWORD = REMEMBER['passwd']

    parameters = {
        'dbname': TEST_DATABASE_NAME,
        'user': TEST_DATABASE_USER,
        'password': TEST_DATABASE_PASSWD,
        'tblspace': TEST_DATABASE_TBLSPACE,
        'tblspace_temp': TEST_DATABASE_TBLSPACE_TMP,
 	}

    REMEMBER['user'] = settings.DATABASE_USER
    REMEMBER['passwd'] = settings.DATABASE_PASSWORD

    cursor = connection.cursor()
    time.sleep(1) # To avoid "database is being accessed by other users" errors.
    if _test_user_create(settings):
        if verbosity >= 1:
            print 'Destroying test user...'
        _destroy_test_user(cursor, parameters, verbosity)
    if _test_database_create(settings):
        if verbosity >= 1:
            print 'Destroying test database...'
        _destroy_test_db(cursor, parameters, verbosity)
    connection.close()

def _create_test_db(cursor, parameters, verbosity):
    if verbosity >= 2:
        print "_create_test_db(): dbname = %s" % parameters['dbname']
    statements = [
        """CREATE TABLESPACE %(tblspace)s
           DATAFILE '%(tblspace)s.dbf' SIZE 20M
           REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
        """,
        """CREATE TEMPORARY TABLESPACE %(tblspace_temp)s
           TEMPFILE '%(tblspace_temp)s.dbf' SIZE 20M
           REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
        """,
    ]
    _execute_statements(cursor, statements, parameters, verbosity)

def _create_test_user(cursor, parameters, verbosity):
    if verbosity >= 2:
        print "_create_test_user(): username = %s" % parameters['user']
    statements = [
        """CREATE USER %(user)s
           IDENTIFIED BY %(password)s
           DEFAULT TABLESPACE %(tblspace)s
           TEMPORARY TABLESPACE %(tblspace_temp)s
        """,
        """GRANT CONNECT, RESOURCE TO %(user)s""",
    ]
    _execute_statements(cursor, statements, parameters, verbosity)

def _destroy_test_db(cursor, parameters, verbosity):
    if verbosity >= 2:
        print "_destroy_test_db(): dbname=%s" % parameters['dbname']
    statements = [
        'DROP TABLESPACE %(tblspace)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
        'DROP TABLESPACE %(tblspace_temp)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
        ]
    _execute_statements(cursor, statements, parameters, verbosity)

def _destroy_test_user(cursor, parameters, verbosity):
    if verbosity >= 2:
        print "_destroy_test_user(): user=%s" % parameters['user']
        print "Be patient.  This can take some time..."
    statements = [
        'DROP USER %(user)s CASCADE',
    ]
    _execute_statements(cursor, statements, parameters, verbosity)

def _execute_statements(cursor, statements, parameters, verbosity):
    for template in statements:
        stmt = template % parameters
        if verbosity >= 2:
            print stmt
        try:
            cursor.execute(stmt)
        except Exception, err:
            sys.stderr.write("Failed (%s)\n" % (err))
            raise

def _test_database_name(settings):
    name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
    try:
        if settings.TEST_DATABASE_NAME:
            name = settings.TEST_DATABASE_NAME
    except AttributeError:
        pass
    except:
        raise
    return name

def _test_database_create(settings):
    name = True
    try:
        if settings.TEST_DATABASE_CREATE:
            name = True
        else:
            name = False
    except AttributeError:
        pass
    except:
        raise
    return name

def _test_user_create(settings):
    name = True
    try:
        if settings.TEST_USER_CREATE:
            name = True
        else:
            name = False
    except AttributeError:
        pass
    except:
        raise
    return name

def _test_database_user(settings):
    name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
    try:
        if settings.TEST_DATABASE_USER:
            name = settings.TEST_DATABASE_USER
    except AttributeError:
        pass
    except:
        raise
    return name

def _test_database_passwd(settings):
    name = PASSWORD
    try:
        if settings.TEST_DATABASE_PASSWD:
            name = settings.TEST_DATABASE_PASSWD
    except AttributeError:
        pass
    except:
        raise
    return name

def _test_database_tblspace(settings):
    name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
    try:
        if settings.TEST_DATABASE_TBLSPACE:
            name = settings.TEST_DATABASE_TBLSPACE
    except AttributeError:
        pass
    except:
        raise
    return name

def _test_database_tblspace_tmp(settings):
    name = TEST_DATABASE_PREFIX + settings.DATABASE_NAME + '_temp'
    try:
        if settings.TEST_DATABASE_TBLSPACE_TMP:
            name = settings.TEST_DATABASE_TBLSPACE_TMP
    except AttributeError:
        pass
    except:
        raise
    return name