# introspection.py (14 KB)
  1. from __future__ import absolute_import, unicode_literals
  2. from django.db.backends.base.introspection import (
  3. BaseDatabaseIntrospection, FieldInfo, TableInfo
  4. )
  5. from . import ado_consts
  # Sentinel type codes used in place of real ADO type constants.
  # get_table_description() writes them into a column's type_code slot and
  # DatabaseIntrospection.data_types_reverse maps them to Django field names.
  # NOTE(review): presumably negative so they cannot collide with values in
  # ado_consts -- confirm against that module.
  AUTO_FIELD_MARKER = -1000      # identity column (non-bigint)
  BIG_AUTO_FIELD_MARKER = -1001  # identity column whose type is bigint
  MONEY_FIELD_MARKER = -1002     # money / smallmoney column
  9. class DatabaseIntrospection(BaseDatabaseIntrospection):
  10. def get_field_type(self, data_type, description):
  11. field_type = self.data_types_reverse[data_type]
  12. if (field_type == 'CharField' and description.internal_size is not None and description.internal_size > 8000):
  13. field_type = 'TextField'
  14. return field_type
  def get_table_list(self, cursor):
      """
      List the tables and views of the current database.

      Returns a list of ``TableInfo`` entries; base tables carry the type
      flag 't' and views the flag 'v'.
      """
      query = """\
          SELECT TABLE_NAME, 't'
          FROM INFORMATION_SCHEMA.TABLES
          WHERE TABLE_TYPE = 'BASE TABLE'
          UNION
          SELECT TABLE_NAME, 'v'
          FROM INFORMATION_SCHEMA.VIEWS
          """
      cursor.execute(query)

      tables = []
      for row in cursor.fetchall():
          tables.append(TableInfo(row[0], row[1]))
      return tables
  26. def _is_auto_field(self, cursor, table_name, column_name):
  27. """Check if a column is an identity column.
  28. See: http://msdn2.microsoft.com/en-us/library/ms174968.aspx
  29. """
  30. sql = "SELECT COLUMNPROPERTY(OBJECT_ID(N'%s'), N'%s', 'IsIdentity')" % \
  31. (table_name, column_name)
  32. cursor.execute(sql)
  33. return cursor.fetchone()[0]
  34. def _get_table_field_type_map(self, cursor, table_name):
  35. """
  36. Return a dict mapping field name to data type. DB-API cursor description
  37. interprets the date columns as chars.
  38. """
  39. cursor.execute('''
  40. SELECT [COLUMN_NAME], [DATA_TYPE], [CHARACTER_MAXIMUM_LENGTH]
  41. FROM INFORMATION_SCHEMA.COLUMNS
  42. WHERE [TABLE_NAME] LIKE \'%s\'
  43. ''' % table_name)
  44. results = dict([(c[0], (c[1], c[2])) for c in cursor.fetchall()])
  45. return results
  def _datatype_to_ado_type(self, datatype):
      """
      Map a SQL Server data type name to an ADO type code.

      The lookup is case-insensitive. ``money`` and ``smallmoney`` map to
      ``MONEY_FIELD_MARKER`` rather than an ADO constant. Returns ``None``
      when the name has no entry.
      """
      ado_types = {
          'bigint': ado_consts.adBigInt,
          'binary': ado_consts.adBinary,
          'bit': ado_consts.adBoolean,
          'char': ado_consts.adChar,
          'date': ado_consts.adDBDate,
          'datetime': ado_consts.adDBTimeStamp,
          'datetime2': ado_consts.adDBTimeStamp,
          'datetimeoffset': ado_consts.adDBTimeStamp,
          'decimal': ado_consts.adDecimal,
          'float': ado_consts.adDouble,
          'image': ado_consts.adVarBinary,
          'int': ado_consts.adInteger,
          'money': MONEY_FIELD_MARKER,
          'numeric': ado_consts.adNumeric,
          'nchar': ado_consts.adWChar,
          'ntext': ado_consts.adLongVarWChar,
          'nvarchar': ado_consts.adVarWChar,
          # single-precision float; data_types_reverse already maps adSingle
          'real': ado_consts.adSingle,
          'smalldatetime': ado_consts.adDBTimeStamp,
          'smallint': ado_consts.adSmallInt,
          'smallmoney': MONEY_FIELD_MARKER,
          'text': ado_consts.adLongVarChar,
          'time': ado_consts.adDBTime,
          'tinyint': ado_consts.adTinyInt,
          'varbinary': ado_consts.adVarBinary,
          'varchar': ado_consts.adVarChar,
      }
      return ado_types.get(datatype.lower())
  def get_table_description(self, cursor, table_name, identity_check=True):
      """Return a description of the table, with DB-API cursor.description interface.

      Each column of the cursor description is returned as a ``FieldInfo``
      after these adjustments:

      * the type code is replaced by the ADO type derived from the column's
        INFORMATION_SCHEMA data type name (columns missing from that lookup
        keep the type code reported by the cursor);
      * when ``identity_check`` is true, each column is tested for the
        IDENTITY property (the MSSQL equivalent of an AutoField); identity
        columns get ``BIG_AUTO_FIELD_MARKER`` if they are bigint and
        ``AUTO_FIELD_MARKER`` otherwise;
      * money columns get a scale of 4;
      * varchar(max) / nvarchar(max) columns are reported with the type
        code of ``text``.
      """
      table_field_type_map = self._get_table_field_type_map(cursor, table_name)

      # Identifiers cannot be bound as parameters; double any closing bracket
      # so the name stays inside the [...] delimiters.
      quoted_name = table_name.replace(']', ']]')
      cursor.execute("SELECT * FROM [%s] where 1=0" % quoted_name)

      # Keep a reference now: the identity checks below run further
      # statements on this same cursor.
      columns = cursor.description

      text_type = self._datatype_to_ado_type('text')
      max_length_types = (ado_consts.adVarChar, ado_consts.adVarWChar)

      items = []
      for column in columns:
          column = list(column)  # Convert tuple to list so slots can be replaced

          # fix data type
          type_info = table_field_type_map.get(column[0])
          if type_info is None:
              # No metadata row for this column: leave the cursor's type code.
              char_length = None
          else:
              data_type, char_length = type_info
              column[1] = self._datatype_to_ado_type(data_type)

          if identity_check and self._is_auto_field(cursor, table_name, column[0]):
              if column[1] == ado_consts.adBigInt:
                  column[1] = BIG_AUTO_FIELD_MARKER
              else:
                  column[1] = AUTO_FIELD_MARKER

          if column[1] == MONEY_FIELD_MARKER:
              # force decimal_places=4 to match data type. Cursor description
              # thinks this column is a string
              column[5] = 4
          elif column[1] in max_length_types and char_length == -1:
              # a maximum length of -1 marks varchar(max)/nvarchar(max): treat as text
              column[1] = text_type

          items.append(FieldInfo(*column))
      return items
  107. def _name_to_index(self, cursor, table_name):
  108. """Return a dictionary of {field_name: field_index} for the given table.
  109. Indexes are 0-based.
  110. """
  111. return dict([(d[0], i) for i, d in enumerate(self.get_table_description(cursor, table_name, False))])
  112. def get_relations(self, cursor, table_name):
  113. """
  114. Returns a dictionary of {field_name: (field_name_other_table, other_table)}
  115. representing all relationships to the given table.
  116. """
  117. constraints = self.get_key_columns(cursor, table_name)
  118. relations = {}
  119. for my_fieldname, other_table, other_field in constraints:
  120. relations[my_fieldname] = (other_field, other_table)
  121. return relations
  122. def get_key_columns(self, cursor, table_name):
  123. """
  124. Backends can override this to return a list of (column_name, referenced_table_name,
  125. referenced_column_name) for all key columns in given table.
  126. """
  127. # source_field_dict = self._name_to_index(cursor, table_name)
  128. sql = """
  129. select
  130. COLUMN_NAME = fk_cols.COLUMN_NAME,
  131. REFERENCED_TABLE_NAME = pk.TABLE_NAME,
  132. REFERENCED_COLUMN_NAME = pk_cols.COLUMN_NAME
  133. from INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ref_const
  134. join INFORMATION_SCHEMA.TABLE_CONSTRAINTS fk
  135. on ref_const.CONSTRAINT_CATALOG = fk.CONSTRAINT_CATALOG
  136. and ref_const.CONSTRAINT_SCHEMA = fk.CONSTRAINT_SCHEMA
  137. and ref_const.CONSTRAINT_NAME = fk.CONSTRAINT_NAME
  138. and fk.CONSTRAINT_TYPE = 'FOREIGN KEY'
  139. join INFORMATION_SCHEMA.TABLE_CONSTRAINTS pk
  140. on ref_const.UNIQUE_CONSTRAINT_CATALOG = pk.CONSTRAINT_CATALOG
  141. and ref_const.UNIQUE_CONSTRAINT_SCHEMA = pk.CONSTRAINT_SCHEMA
  142. and ref_const.UNIQUE_CONSTRAINT_NAME = pk.CONSTRAINT_NAME
  143. And pk.CONSTRAINT_TYPE = 'PRIMARY KEY'
  144. join INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk_cols
  145. on ref_const.CONSTRAINT_NAME = fk_cols.CONSTRAINT_NAME
  146. join INFORMATION_SCHEMA.KEY_COLUMN_USAGE pk_cols
  147. on pk.CONSTRAINT_NAME = pk_cols.CONSTRAINT_NAME
  148. where
  149. fk.TABLE_NAME = %s"""
  150. cursor.execute(sql, [table_name])
  151. relations = cursor.fetchall()
  152. key_columns = []
  153. key_columns.extend([(source_column, target_table, target_column)
  154. for source_column, target_table, target_column in relations])
  155. return key_columns
  156. def get_indexes(self, cursor, table_name):
  157. # Returns a dictionary of fieldname -> infodict for the given table,
  158. # where each infodict is in the format:
  159. # {'primary_key': boolean representing whether it's the primary key,
  160. # 'unique': boolean representing whether it's a unique index}
  161. sql = """
  162. select
  163. C.name as [column_name],
  164. IX.is_unique as [unique],
  165. IX.is_primary_key as [primary_key]
  166. from
  167. sys.tables T
  168. join sys.index_columns IC on IC.object_id = T.object_id
  169. join sys.columns C on C.object_id = T.object_id and C.column_id = IC.column_id
  170. join sys.indexes IX on IX.object_id = T.object_id and IX.index_id = IC.index_id
  171. where
  172. T.name = %s
  173. -- Omit multi-column keys
  174. and not exists (
  175. select *
  176. from sys.index_columns cols
  177. where
  178. cols.object_id = T.object_id
  179. and cols.index_id = IC.index_id
  180. and cols.key_ordinal > 1
  181. )
  182. """
  183. cursor.execute(sql, [table_name])
  184. constraints = cursor.fetchall()
  185. indexes = dict()
  186. for column_name, unique, primary_key in constraints:
  187. indexes[column_name.lower()] = {"primary_key": primary_key, "unique": unique}
  188. return indexes
  # Reverse lookup used by get_field_type(): type code (an ADO constant or one
  # of the module-level marker values) -> Django field name.
  data_types_reverse = {
      # marker values
      AUTO_FIELD_MARKER: 'AutoField',
      BIG_AUTO_FIELD_MARKER: 'sqlserver_ado.fields.BigAutoField',
      MONEY_FIELD_MARKER: 'DecimalField',

      # boolean
      ado_consts.adBoolean: 'BooleanField',

      # integers
      ado_consts.adTinyInt: 'SmallIntegerField',
      ado_consts.adSmallInt: 'SmallIntegerField',
      ado_consts.adInteger: 'IntegerField',
      ado_consts.adBigInt: 'BigIntegerField',

      # fixed and floating point numbers
      ado_consts.adDecimal: 'DecimalField',
      ado_consts.adNumeric: 'DecimalField',
      ado_consts.adSingle: 'FloatField',
      ado_consts.adDouble: 'FloatField',

      # dates and times
      ado_consts.adDate: 'DateField',
      ado_consts.adDBDate: 'DateField',
      ado_consts.adDBTime: 'TimeField',
      ado_consts.adDBTimeStamp: 'DateTimeField',

      # character data
      ado_consts.adChar: 'CharField',
      ado_consts.adWChar: 'CharField',
      ado_consts.adVarChar: 'CharField',
      ado_consts.adVarWChar: 'CharField',
      ado_consts.adLongVarChar: 'TextField',
      ado_consts.adLongVarWChar: 'TextField',

      # binary data
      ado_consts.adBinary: 'BinaryField',
      ado_consts.adVarBinary: 'BinaryField',
  }
  215. def get_constraints(self, cursor, table_name):
  216. """
  217. Retrieves any constraints or keys (unique, pk, fk, check, index)
  218. across one or more columns.
  219. Returns a dict mapping constraint names to their attributes,
  220. where attributes is a dict with keys:
  221. * columns: List of columns this covers
  222. * primary_key: True if primary key, False otherwise
  223. * unique: True if this is a unique constraint, False otherwise
  224. * foreign_key: (table, column) of target, or None
  225. * check: True if check constraint, False otherwise
  226. * index: True if index, False otherwise.
  227. Some backends may return special constraint names that don't exist
  228. if they don't name constraints of a certain type (e.g. SQLite)
  229. """
  230. constraints = dict()
  231. # getting indexes (primary keys, unique, regular)
  232. sql = """
  233. select object_id, name, index_id, is_unique, is_primary_key
  234. from sys.indexes where object_id = OBJECT_ID(%s)
  235. """
  236. cursor.execute(sql, [table_name])
  237. for object_id, name, index_id, unique, primary_key in list(cursor.fetchall()):
  238. sql = """
  239. select name from sys.index_columns ic
  240. inner join sys.columns c on ic.column_id = c.column_id and ic.object_id = c.object_id
  241. where ic.object_id = %s and ic.index_id = %s
  242. """
  243. cursor.execute(sql, [object_id, index_id])
  244. columns = [row[0] for row in cursor.fetchall()]
  245. constraint = {"columns": list(columns),
  246. "primary_key": primary_key,
  247. "unique": unique,
  248. "index": True,
  249. "check": False,
  250. "foreign_key": None,
  251. }
  252. constraints[name] = constraint
  253. # getting foreign keys
  254. sql = """
  255. select fk.object_id, fk.name, rt.name from sys.foreign_keys fk
  256. inner join sys.tables rt on fk.referenced_object_id = rt.object_id
  257. where fk.parent_object_id = OBJECT_ID(%s)
  258. """
  259. cursor.execute(sql, [table_name])
  260. for id, name, ref_table_name in list(cursor.fetchall()):
  261. sql = """
  262. select cc.name, rc.name from sys.foreign_key_columns fkc
  263. inner join sys.columns rc on
  264. fkc.referenced_object_id = rc.object_id and fkc.referenced_column_id = rc.column_id
  265. inner join sys.columns cc on
  266. fkc.parent_object_id = cc.object_id and fkc.parent_column_id = cc.column_id
  267. where fkc.constraint_object_id = %s
  268. """
  269. cursor.execute(sql, [id])
  270. columns, fkcolumns = zip(*cursor.fetchall())
  271. constraint = {"columns": list(columns),
  272. "primary_key": False,
  273. "unique": False,
  274. "index": False,
  275. "check": False,
  276. "foreign_key": (ref_table_name, fkcolumns[0]),
  277. }
  278. constraints[name] = constraint
  279. # get check constraints
  280. sql = """
  281. SELECT kc.constraint_name, kc.column_name
  282. FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS kc
  283. JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS c ON
  284. kc.table_schema = c.table_schema AND
  285. kc.table_name = c.table_name AND
  286. kc.constraint_name = c.constraint_name
  287. WHERE
  288. c.constraint_type = 'CHECK'
  289. AND
  290. kc.table_name = %s
  291. """
  292. cursor.execute(sql, [table_name])
  293. for constraint, column in list(cursor.fetchall()):
  294. if column not in constraints:
  295. constraints[constraint] = {
  296. "columns": [],
  297. "primary_key": False,
  298. "unique": False,
  299. "index": False,
  300. "check": True,
  301. "foreign_key": None,
  302. }
  303. constraints[constraint]['columns'].append(column)
  304. return constraints