introspection.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. import re
  2. from django.db.backends import BaseDatabaseIntrospection, FieldInfo
  3. field_size_re = re.compile(r'^\s*(?:var)?char\s*\(\s*(\d+)\s*\)\s*$')
  4. def get_field_size(name):
  5. """ Extract the size number from a "varchar(11)" type name """
  6. m = field_size_re.search(name)
  7. return int(m.group(1)) if m else None
  8. # This light wrapper "fakes" a dictionary interface, because some SQLite data
  9. # types include variables in them -- e.g. "varchar(30)" -- and can't be matched
  10. # as a simple dictionary lookup.
  11. class FlexibleFieldLookupDict(object):
  12. # Maps SQL types to Django Field types. Some of the SQL types have multiple
  13. # entries here because SQLite allows for anything and doesn't normalize the
  14. # field type; it uses whatever was given.
  15. base_data_types_reverse = {
  16. 'bool': 'BooleanField',
  17. 'boolean': 'BooleanField',
  18. 'smallint': 'SmallIntegerField',
  19. 'smallint unsigned': 'PositiveSmallIntegerField',
  20. 'smallinteger': 'SmallIntegerField',
  21. 'int': 'IntegerField',
  22. 'integer': 'IntegerField',
  23. 'bigint': 'BigIntegerField',
  24. 'integer unsigned': 'PositiveIntegerField',
  25. 'decimal': 'DecimalField',
  26. 'real': 'FloatField',
  27. 'text': 'TextField',
  28. 'char': 'CharField',
  29. 'blob': 'BinaryField',
  30. 'date': 'DateField',
  31. 'datetime': 'DateTimeField',
  32. 'time': 'TimeField',
  33. }
  34. def __getitem__(self, key):
  35. key = key.lower()
  36. try:
  37. return self.base_data_types_reverse[key]
  38. except KeyError:
  39. size = get_field_size(key)
  40. if size is not None:
  41. return ('CharField', {'max_length': size})
  42. raise KeyError
  43. class DatabaseIntrospection(BaseDatabaseIntrospection):
  44. data_types_reverse = FlexibleFieldLookupDict()
  45. def get_table_list(self, cursor):
  46. "Returns a list of table names in the current database."
  47. # Skip the sqlite_sequence system table used for autoincrement key
  48. # generation.
  49. cursor.execute("""
  50. SELECT name FROM sqlite_master
  51. WHERE type in ('table', 'view') AND NOT name='sqlite_sequence'
  52. ORDER BY name""")
  53. return [row[0] for row in cursor.fetchall()]
  54. def get_table_description(self, cursor, table_name):
  55. "Returns a description of the table, with the DB-API cursor.description interface."
  56. return [FieldInfo(info['name'], info['type'], None, info['size'], None, None,
  57. info['null_ok']) for info in self._table_info(cursor, table_name)]
  58. def get_relations(self, cursor, table_name):
  59. """
  60. Returns a dictionary of {field_index: (field_index_other_table, other_table)}
  61. representing all relationships to the given table. Indexes are 0-based.
  62. """
  63. # Dictionary of relations to return
  64. relations = {}
  65. # Schema for this table
  66. cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s AND type = %s", [table_name, "table"])
  67. try:
  68. results = cursor.fetchone()[0].strip()
  69. except TypeError:
  70. # It might be a view, then no results will be returned
  71. return relations
  72. results = results[results.index('(') + 1:results.rindex(')')]
  73. # Walk through and look for references to other tables. SQLite doesn't
  74. # really have enforced references, but since it echoes out the SQL used
  75. # to create the table we can look for REFERENCES statements used there.
  76. for field_index, field_desc in enumerate(results.split(',')):
  77. field_desc = field_desc.strip()
  78. if field_desc.startswith("UNIQUE"):
  79. continue
  80. m = re.search('references (.*) \(["|](.*)["|]\)', field_desc, re.I)
  81. if not m:
  82. continue
  83. table, column = [s.strip('"') for s in m.groups()]
  84. cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s", [table])
  85. result = cursor.fetchall()[0]
  86. other_table_results = result[0].strip()
  87. li, ri = other_table_results.index('('), other_table_results.rindex(')')
  88. other_table_results = other_table_results[li + 1:ri]
  89. for other_index, other_desc in enumerate(other_table_results.split(',')):
  90. other_desc = other_desc.strip()
  91. if other_desc.startswith('UNIQUE'):
  92. continue
  93. name = other_desc.split(' ', 1)[0].strip('"')
  94. if name == column:
  95. relations[field_index] = (other_index, table)
  96. break
  97. return relations
  98. def get_key_columns(self, cursor, table_name):
  99. """
  100. Returns a list of (column_name, referenced_table_name, referenced_column_name) for all
  101. key columns in given table.
  102. """
  103. key_columns = []
  104. # Schema for this table
  105. cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s AND type = %s", [table_name, "table"])
  106. results = cursor.fetchone()[0].strip()
  107. results = results[results.index('(') + 1:results.rindex(')')]
  108. # Walk through and look for references to other tables. SQLite doesn't
  109. # really have enforced references, but since it echoes out the SQL used
  110. # to create the table we can look for REFERENCES statements used there.
  111. for field_index, field_desc in enumerate(results.split(',')):
  112. field_desc = field_desc.strip()
  113. if field_desc.startswith("UNIQUE"):
  114. continue
  115. m = re.search('"(.*)".*references (.*) \(["|](.*)["|]\)', field_desc, re.I)
  116. if not m:
  117. continue
  118. # This will append (column_name, referenced_table_name, referenced_column_name) to key_columns
  119. key_columns.append(tuple(s.strip('"') for s in m.groups()))
  120. return key_columns
  121. def get_indexes(self, cursor, table_name):
  122. indexes = {}
  123. for info in self._table_info(cursor, table_name):
  124. if info['pk'] != 0:
  125. indexes[info['name']] = {'primary_key': True,
  126. 'unique': False}
  127. cursor.execute('PRAGMA index_list(%s)' % self.connection.ops.quote_name(table_name))
  128. # seq, name, unique
  129. for index, unique in [(field[1], field[2]) for field in cursor.fetchall()]:
  130. cursor.execute('PRAGMA index_info(%s)' % self.connection.ops.quote_name(index))
  131. info = cursor.fetchall()
  132. # Skip indexes across multiple fields
  133. if len(info) != 1:
  134. continue
  135. name = info[0][2] # seqno, cid, name
  136. indexes[name] = {'primary_key': indexes.get(name, {}).get("primary_key", False),
  137. 'unique': unique}
  138. return indexes
  139. def get_primary_key_column(self, cursor, table_name):
  140. """
  141. Get the column name of the primary key for the given table.
  142. """
  143. # Don't use PRAGMA because that causes issues with some transactions
  144. cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s AND type = %s", [table_name, "table"])
  145. row = cursor.fetchone()
  146. if row is None:
  147. raise ValueError("Table %s does not exist" % table_name)
  148. results = row[0].strip()
  149. results = results[results.index('(') + 1:results.rindex(')')]
  150. for field_desc in results.split(','):
  151. field_desc = field_desc.strip()
  152. m = re.search('"(.*)".*PRIMARY KEY( AUTOINCREMENT)?$', field_desc)
  153. if m:
  154. return m.groups()[0]
  155. return None
  156. def _table_info(self, cursor, name):
  157. cursor.execute('PRAGMA table_info(%s)' % self.connection.ops.quote_name(name))
  158. # cid, name, type, notnull, dflt_value, pk
  159. return [{'name': field[1],
  160. 'type': field[2],
  161. 'size': get_field_size(field[2]),
  162. 'null_ok': not field[3],
  163. 'pk': field[5] # undocumented
  164. } for field in cursor.fetchall()]
  165. def get_constraints(self, cursor, table_name):
  166. """
  167. Retrieves any constraints or keys (unique, pk, fk, check, index) across one or more columns.
  168. """
  169. constraints = {}
  170. # Get the index info
  171. cursor.execute("PRAGMA index_list(%s)" % self.connection.ops.quote_name(table_name))
  172. for number, index, unique in cursor.fetchall():
  173. # Get the index info for that index
  174. cursor.execute('PRAGMA index_info(%s)' % self.connection.ops.quote_name(index))
  175. for index_rank, column_rank, column in cursor.fetchall():
  176. if index not in constraints:
  177. constraints[index] = {
  178. "columns": [],
  179. "primary_key": False,
  180. "unique": bool(unique),
  181. "foreign_key": False,
  182. "check": False,
  183. "index": True,
  184. }
  185. constraints[index]['columns'].append(column)
  186. # Get the PK
  187. pk_column = self.get_primary_key_column(cursor, table_name)
  188. if pk_column:
  189. # SQLite doesn't actually give a name to the PK constraint,
  190. # so we invent one. This is fine, as the SQLite backend never
  191. # deletes PK constraints by name, as you can't delete constraints
  192. # in SQLite; we remake the table with a new PK instead.
  193. constraints["__primary__"] = {
  194. "columns": [pk_column],
  195. "primary_key": True,
  196. "unique": False, # It's not actually a unique constraint.
  197. "foreign_key": False,
  198. "check": False,
  199. "index": False,
  200. }
  201. return constraints