1 from django.db.backends.sqlite3.base import DatabaseOperations |
1 from django.db.backends import BaseDatabaseIntrospection |
2 |
|
3 quote_name = DatabaseOperations().quote_name |
|
4 |
|
def get_table_list(cursor):
    """Return the names of all tables in the current database, sorted by name.

    The names are read from ``sqlite_master``. The ``sqlite_sequence`` system
    table, which SQLite uses for autoincrement key generation, is left out.
    """
    cursor.execute("""
        SELECT name FROM sqlite_master
        WHERE type='table' AND NOT name='sqlite_sequence'
        ORDER BY name""")
    table_names = []
    for row in cursor.fetchall():
        # Each row holds a single column: the table name.
        table_names.append(row[0])
    return table_names
|
14 |
|
def get_table_description(cursor, table_name):
    """Describe a table using the DB-API ``cursor.description`` layout.

    Returns a list with one 7-tuple per column reported by ``_table_info``:
    the column name, its declared type, four ``None`` placeholders, and the
    column's ``null_ok`` flag.
    """
    description = []
    for column in _table_info(cursor, table_name):
        description.append((
            column['name'],
            column['type'],
            None, None, None, None,
            column['null_ok'],
        ))
    return description
|
19 |
|
def get_relations(cursor, table_name):
    """Relation introspection is not implemented here.

    Always raises NotImplementedError; both arguments are ignored.
    """
    raise NotImplementedError()
|
22 |
|
def get_indexes(cursor, table_name):
    """
    Return a dictionary of fieldname -> infodict for the given table.

    Every column reported by ``_table_info`` gets an entry of the form:
        {'primary_key': boolean representing whether it's the primary key,
         'unique': boolean representing whether it's a unique index}
    """
    # Start with every column marked as non-unique; 'primary_key' comes from
    # the pk flag of the table_info row.
    result = dict(
        (column['name'], {'primary_key': column['pk'] != 0, 'unique': False})
        for column in _table_info(cursor, table_name)
    )

    cursor.execute('PRAGMA index_list(%s)' % quote_name(table_name))
    # index_list rows start with: seq, name, unique. The rows are fetched in
    # full up front because the same cursor is reused for the queries below.
    unique_index_names = [row[1] for row in cursor.fetchall() if row[2]]

    for index_name in unique_index_names:
        cursor.execute('PRAGMA index_info(%s)' % quote_name(index_name))
        index_columns = cursor.fetchall()
        # Indexes across multiple fields are skipped.
        if len(index_columns) == 1:
            # index_info rows are: seqno, cid, name
            column_name = index_columns[0][2]
            result[column_name]['unique'] = True
    return result
|
47 |
|
def _table_info(cursor, name):
    """Run ``PRAGMA table_info`` on ``name`` and return one dict per column.

    Each dict has the keys 'name', 'type', 'null_ok' and 'pk'. The table name
    is passed through ``quote_name`` before being put into the statement.
    """
    cursor.execute('PRAGMA table_info(%s)' % quote_name(name))
    columns = []
    # Row layout: cid, name, type, notnull, dflt_value, pk
    for row in cursor.fetchall():
        columns.append({
            'name': row[1],
            'type': row[2],
            'null_ok': not row[3],
            'pk': row[5],  # undocumented
        })
    return columns
|
56 |
|
# Maps SQL types to Django Field types. Some of the SQL types have multiple
# entries here because SQLite allows for anything and doesn't normalize the
# field type; it uses whatever was given.
#
# Keys are lower-case; FlexibleFieldLookupDict lower-cases the requested type
# before looking it up. The unsigned, decimal and real entries keep this table
# in step with the class-level table on FlexibleFieldLookupDict.
BASE_DATA_TYPES_REVERSE = {
    'bool': 'BooleanField',
    'boolean': 'BooleanField',
    'smallint': 'SmallIntegerField',
    'smallint unsigned': 'PositiveSmallIntegerField',
    'smallinteger': 'SmallIntegerField',
    'int': 'IntegerField',
    'integer': 'IntegerField',
    'integer unsigned': 'PositiveIntegerField',
    'decimal': 'DecimalField',
    'real': 'FloatField',
    'text': 'TextField',
    'char': 'CharField',
    'date': 'DateField',
    'datetime': 'DateTimeField',
    'time': 'TimeField',
}
|
73 |
2 |
# This light wrapper "fakes" a dictionary interface, because some SQLite data
# types include variables in them -- e.g. "varchar(30)" -- and can't be matched
# as a simple dictionary lookup.
class FlexibleFieldLookupDict(object):
    """Dict-like lookup from a SQLite column type to a Django field type.

    Only ``__getitem__`` is provided. Plain type names are resolved through
    ``base_data_types_reverse``; ``char(N)`` / ``varchar(N)`` declarations are
    matched with a regular expression so the length can be extracted.
    """

    # Maps SQL types to Django Field types. Some of the SQL types have multiple
    # entries here because SQLite allows for anything and doesn't normalize the
    # field type; it uses whatever was given.
    base_data_types_reverse = {
        'bool': 'BooleanField',
        'boolean': 'BooleanField',
        'smallint': 'SmallIntegerField',
        'smallint unsigned': 'PositiveSmallIntegerField',
        'smallinteger': 'SmallIntegerField',
        'int': 'IntegerField',
        'integer': 'IntegerField',
        'integer unsigned': 'PositiveIntegerField',
        'decimal': 'DecimalField',
        'real': 'FloatField',
        'text': 'TextField',
        'char': 'CharField',
        'date': 'DateField',
        'datetime': 'DateTimeField',
        'time': 'TimeField',
    }

    def __getitem__(self, key):
        """Return the Django field type for the SQL type ``key``.

        The lookup is case-insensitive. A known plain type returns the field
        class name as a string. A ``char(N)`` or ``varchar(N)`` declaration
        returns the tuple ``('CharField', {'max_length': N})``.

        Raises KeyError, carrying the requested ``key``, when the type is not
        recognised.
        """
        # Imported locally so the class does not rely on a file-level import.
        import re

        normalized = key.lower()
        try:
            return self.base_data_types_reverse[normalized]
        except KeyError:
            # Not a plain type name; fall through to the sized-char pattern.
            pass

        m = re.search(r'^\s*(?:var)?char\s*\(\s*(\d+)\s*\)\s*$', normalized)
        if m:
            return ('CharField', {'max_length': int(m.group(1))})
        # Raised outside the handler, with the key attached, so the error
        # names the type that could not be mapped.
        raise KeyError(key)


DATA_TYPES_REVERSE = FlexibleFieldLookupDict()
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """Introspection helpers built on ``sqlite_master`` and SQLite PRAGMAs.

    Identifiers placed into PRAGMA statements are quoted with
    ``self.connection.ops.quote_name``.
    """

    # Lookup object mapping SQL column types to Django field types.
    data_types_reverse = FlexibleFieldLookupDict()

    def get_table_list(self, cursor):
        """Return the names of all tables in the current database, sorted by name.

        The ``sqlite_sequence`` system table, used for autoincrement key
        generation, is left out.
        """
        cursor.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND NOT name='sqlite_sequence'
            ORDER BY name""")
        table_names = []
        for row in cursor.fetchall():
            table_names.append(row[0])
        return table_names

    def get_table_description(self, cursor, table_name):
        """Describe a table using the DB-API ``cursor.description`` layout.

        Returns one 7-tuple per column: name, declared type, four ``None``
        placeholders, and the column's ``null_ok`` flag.
        """
        description = []
        for column in self._table_info(cursor, table_name):
            description.append((
                column['name'],
                column['type'],
                None, None, None, None,
                column['null_ok'],
            ))
        return description

    def get_relations(self, cursor, table_name):
        """Relation introspection is not implemented; always raises NotImplementedError."""
        raise NotImplementedError()

    def get_indexes(self, cursor, table_name):
        """
        Return a dictionary of fieldname -> infodict for the given table.

        Every column reported by ``_table_info`` gets an entry of the form:
            {'primary_key': boolean representing whether it's the primary key,
             'unique': boolean representing whether it's a unique index}
        """
        quote = self.connection.ops.quote_name
        # Start with every column marked as non-unique.
        result = dict(
            (column['name'], {'primary_key': column['pk'] != 0, 'unique': False})
            for column in self._table_info(cursor, table_name)
        )

        cursor.execute('PRAGMA index_list(%s)' % quote(table_name))
        # index_list rows start with: seq, name, unique. Fetched in full up
        # front because the same cursor is reused for the queries below.
        unique_index_names = [row[1] for row in cursor.fetchall() if row[2]]

        for index_name in unique_index_names:
            cursor.execute('PRAGMA index_info(%s)' % quote(index_name))
            index_columns = cursor.fetchall()
            # Indexes across multiple fields are skipped.
            if len(index_columns) == 1:
                # index_info rows are: seqno, cid, name
                column_name = index_columns[0][2]
                result[column_name]['unique'] = True
        return result

    def _table_info(self, cursor, name):
        """Run ``PRAGMA table_info`` on ``name`` and return one dict per column.

        Each dict has the keys 'name', 'type', 'null_ok' and 'pk'.
        """
        cursor.execute('PRAGMA table_info(%s)' % self.connection.ops.quote_name(name))
        columns = []
        # Row layout: cid, name, type, notnull, dflt_value, pk
        for row in cursor.fetchall():
            columns.append({
                'name': row[1],
                'type': row[2],
                'null_ok': not row[3],
                'pk': row[5],  # undocumented
            })
        return columns
|
93 |