introspection.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. from __future__ import unicode_literals
  2. from django.db.backends import BaseDatabaseIntrospection, FieldInfo
  3. from django.utils.encoding import force_text
  4. class DatabaseIntrospection(BaseDatabaseIntrospection):
  5. # Maps type codes to Django Field types.
  6. data_types_reverse = {
  7. 16: 'BooleanField',
  8. 17: 'BinaryField',
  9. 20: 'BigIntegerField',
  10. 21: 'SmallIntegerField',
  11. 23: 'IntegerField',
  12. 25: 'TextField',
  13. 700: 'FloatField',
  14. 701: 'FloatField',
  15. 869: 'GenericIPAddressField',
  16. 1042: 'CharField', # blank-padded
  17. 1043: 'CharField',
  18. 1082: 'DateField',
  19. 1083: 'TimeField',
  20. 1114: 'DateTimeField',
  21. 1184: 'DateTimeField',
  22. 1266: 'TimeField',
  23. 1700: 'DecimalField',
  24. }
  25. ignored_tables = []
  26. def get_table_list(self, cursor):
  27. "Returns a list of table names in the current database."
  28. cursor.execute("""
  29. SELECT c.relname
  30. FROM pg_catalog.pg_class c
  31. LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
  32. WHERE c.relkind IN ('r', 'v', '')
  33. AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
  34. AND pg_catalog.pg_table_is_visible(c.oid)""")
  35. return [row[0] for row in cursor.fetchall() if row[0] not in self.ignored_tables]
  36. def get_table_description(self, cursor, table_name):
  37. "Returns a description of the table, with the DB-API cursor.description interface."
  38. # As cursor.description does not return reliably the nullable property,
  39. # we have to query the information_schema (#7783)
  40. cursor.execute("""
  41. SELECT column_name, is_nullable
  42. FROM information_schema.columns
  43. WHERE table_name = %s""", [table_name])
  44. null_map = dict(cursor.fetchall())
  45. cursor.execute("SELECT * FROM %s LIMIT 1" % self.connection.ops.quote_name(table_name))
  46. return [FieldInfo(*((force_text(line[0]),) + line[1:6] + (null_map[force_text(line[0])] == 'YES',)))
  47. for line in cursor.description]
  48. def get_relations(self, cursor, table_name):
  49. """
  50. Returns a dictionary of {field_index: (field_index_other_table, other_table)}
  51. representing all relationships to the given table. Indexes are 0-based.
  52. """
  53. cursor.execute("""
  54. SELECT con.conkey, con.confkey, c2.relname
  55. FROM pg_constraint con, pg_class c1, pg_class c2
  56. WHERE c1.oid = con.conrelid
  57. AND c2.oid = con.confrelid
  58. AND c1.relname = %s
  59. AND con.contype = 'f'""", [table_name])
  60. relations = {}
  61. for row in cursor.fetchall():
  62. # row[0] and row[1] are single-item lists, so grab the single item.
  63. relations[row[0][0] - 1] = (row[1][0] - 1, row[2])
  64. return relations
  65. def get_key_columns(self, cursor, table_name):
  66. key_columns = []
  67. cursor.execute("""
  68. SELECT kcu.column_name, ccu.table_name AS referenced_table, ccu.column_name AS referenced_column
  69. FROM information_schema.constraint_column_usage ccu
  70. LEFT JOIN information_schema.key_column_usage kcu
  71. ON ccu.constraint_catalog = kcu.constraint_catalog
  72. AND ccu.constraint_schema = kcu.constraint_schema
  73. AND ccu.constraint_name = kcu.constraint_name
  74. LEFT JOIN information_schema.table_constraints tc
  75. ON ccu.constraint_catalog = tc.constraint_catalog
  76. AND ccu.constraint_schema = tc.constraint_schema
  77. AND ccu.constraint_name = tc.constraint_name
  78. WHERE kcu.table_name = %s AND tc.constraint_type = 'FOREIGN KEY'""", [table_name])
  79. key_columns.extend(cursor.fetchall())
  80. return key_columns
  81. def get_indexes(self, cursor, table_name):
  82. # This query retrieves each index on the given table, including the
  83. # first associated field name
  84. cursor.execute("""
  85. SELECT attr.attname, idx.indkey, idx.indisunique, idx.indisprimary
  86. FROM pg_catalog.pg_class c, pg_catalog.pg_class c2,
  87. pg_catalog.pg_index idx, pg_catalog.pg_attribute attr
  88. WHERE c.oid = idx.indrelid
  89. AND idx.indexrelid = c2.oid
  90. AND attr.attrelid = c.oid
  91. AND attr.attnum = idx.indkey[0]
  92. AND c.relname = %s""", [table_name])
  93. indexes = {}
  94. for row in cursor.fetchall():
  95. # row[1] (idx.indkey) is stored in the DB as an array. It comes out as
  96. # a string of space-separated integers. This designates the field
  97. # indexes (1-based) of the fields that have indexes on the table.
  98. # Here, we skip any indexes across multiple fields.
  99. if ' ' in row[1]:
  100. continue
  101. if row[0] not in indexes:
  102. indexes[row[0]] = {'primary_key': False, 'unique': False}
  103. # It's possible to have the unique and PK constraints in separate indexes.
  104. if row[3]:
  105. indexes[row[0]]['primary_key'] = True
  106. if row[2]:
  107. indexes[row[0]]['unique'] = True
  108. return indexes
  109. def get_constraints(self, cursor, table_name):
  110. """
  111. Retrieves any constraints or keys (unique, pk, fk, check, index) across one or more columns.
  112. """
  113. constraints = {}
  114. # Loop over the key table, collecting things as constraints
  115. # This will get PKs, FKs, and uniques, but not CHECK
  116. cursor.execute("""
  117. SELECT
  118. kc.constraint_name,
  119. kc.column_name,
  120. c.constraint_type,
  121. array(SELECT table_name::text || '.' || column_name::text FROM information_schema.constraint_column_usage WHERE constraint_name = kc.constraint_name)
  122. FROM information_schema.key_column_usage AS kc
  123. JOIN information_schema.table_constraints AS c ON
  124. kc.table_schema = c.table_schema AND
  125. kc.table_name = c.table_name AND
  126. kc.constraint_name = c.constraint_name
  127. WHERE
  128. kc.table_schema = %s AND
  129. kc.table_name = %s
  130. ORDER BY kc.ordinal_position ASC
  131. """, ["public", table_name])
  132. for constraint, column, kind, used_cols in cursor.fetchall():
  133. # If we're the first column, make the record
  134. if constraint not in constraints:
  135. constraints[constraint] = {
  136. "columns": [],
  137. "primary_key": kind.lower() == "primary key",
  138. "unique": kind.lower() in ["primary key", "unique"],
  139. "foreign_key": tuple(used_cols[0].split(".", 1)) if kind.lower() == "foreign key" else None,
  140. "check": False,
  141. "index": False,
  142. }
  143. # Record the details
  144. constraints[constraint]['columns'].append(column)
  145. # Now get CHECK constraint columns
  146. cursor.execute("""
  147. SELECT kc.constraint_name, kc.column_name
  148. FROM information_schema.constraint_column_usage AS kc
  149. JOIN information_schema.table_constraints AS c ON
  150. kc.table_schema = c.table_schema AND
  151. kc.table_name = c.table_name AND
  152. kc.constraint_name = c.constraint_name
  153. WHERE
  154. c.constraint_type = 'CHECK' AND
  155. kc.table_schema = %s AND
  156. kc.table_name = %s
  157. """, ["public", table_name])
  158. for constraint, column in cursor.fetchall():
  159. # If we're the first column, make the record
  160. if constraint not in constraints:
  161. constraints[constraint] = {
  162. "columns": [],
  163. "primary_key": False,
  164. "unique": False,
  165. "foreign_key": None,
  166. "check": True,
  167. "index": False,
  168. }
  169. # Record the details
  170. constraints[constraint]['columns'].append(column)
  171. # Now get indexes
  172. cursor.execute("""
  173. SELECT
  174. c2.relname,
  175. ARRAY(
  176. SELECT (SELECT attname FROM pg_catalog.pg_attribute WHERE attnum = i AND attrelid = c.oid)
  177. FROM unnest(idx.indkey) i
  178. ),
  179. idx.indisunique,
  180. idx.indisprimary
  181. FROM pg_catalog.pg_class c, pg_catalog.pg_class c2,
  182. pg_catalog.pg_index idx
  183. WHERE c.oid = idx.indrelid
  184. AND idx.indexrelid = c2.oid
  185. AND c.relname = %s
  186. """, [table_name])
  187. for index, columns, unique, primary in cursor.fetchall():
  188. if index not in constraints:
  189. constraints[index] = {
  190. "columns": list(columns),
  191. "primary_key": primary,
  192. "unique": unique,
  193. "foreign_key": None,
  194. "check": False,
  195. "index": True,
  196. }
  197. return constraints