creation.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. import sys
  2. import time
  3. from django.conf import settings
  4. from django.db.backends.creation import BaseDatabaseCreation
  5. from django.utils.six.moves import input
  6. TEST_DATABASE_PREFIX = 'test_'
  7. PASSWORD = 'Im_a_lumberjack'
  8. class DatabaseCreation(BaseDatabaseCreation):
  9. # This dictionary maps Field objects to their associated Oracle column
  10. # types, as strings. Column-type strings can contain format strings; they'll
  11. # be interpolated against the values of Field.__dict__ before being output.
  12. # If a column type is set to None, it won't be included in the output.
  13. #
  14. # Any format strings starting with "qn_" are quoted before being used in the
  15. # output (the "qn_" prefix is stripped before the lookup is performed.
  16. data_types = {
  17. 'AutoField': 'NUMBER(11)',
  18. 'BinaryField': 'BLOB',
  19. 'BooleanField': 'NUMBER(1)',
  20. 'CharField': 'NVARCHAR2(%(max_length)s)',
  21. 'CommaSeparatedIntegerField': 'VARCHAR2(%(max_length)s)',
  22. 'DateField': 'DATE',
  23. 'DateTimeField': 'TIMESTAMP',
  24. 'DecimalField': 'NUMBER(%(max_digits)s, %(decimal_places)s)',
  25. 'FileField': 'NVARCHAR2(%(max_length)s)',
  26. 'FilePathField': 'NVARCHAR2(%(max_length)s)',
  27. 'FloatField': 'DOUBLE PRECISION',
  28. 'IntegerField': 'NUMBER(11)',
  29. 'BigIntegerField': 'NUMBER(19)',
  30. 'IPAddressField': 'VARCHAR2(15)',
  31. 'GenericIPAddressField': 'VARCHAR2(39)',
  32. 'NullBooleanField': 'NUMBER(1)',
  33. 'OneToOneField': 'NUMBER(11)',
  34. 'PositiveIntegerField': 'NUMBER(11)',
  35. 'PositiveSmallIntegerField': 'NUMBER(11)',
  36. 'SlugField': 'NVARCHAR2(%(max_length)s)',
  37. 'SmallIntegerField': 'NUMBER(11)',
  38. 'TextField': 'NCLOB',
  39. 'TimeField': 'TIMESTAMP',
  40. 'URLField': 'VARCHAR2(%(max_length)s)',
  41. }
  42. data_type_check_constraints = {
  43. 'BooleanField': '%(qn_column)s IN (0,1)',
  44. 'NullBooleanField': '(%(qn_column)s IN (0,1)) OR (%(qn_column)s IS NULL)',
  45. 'PositiveIntegerField': '%(qn_column)s >= 0',
  46. 'PositiveSmallIntegerField': '%(qn_column)s >= 0',
  47. }
  48. def __init__(self, connection):
  49. super(DatabaseCreation, self).__init__(connection)
  50. def _create_test_db(self, verbosity=1, autoclobber=False):
  51. TEST_NAME = self._test_database_name()
  52. TEST_USER = self._test_database_user()
  53. TEST_PASSWD = self._test_database_passwd()
  54. TEST_TBLSPACE = self._test_database_tblspace()
  55. TEST_TBLSPACE_TMP = self._test_database_tblspace_tmp()
  56. parameters = {
  57. 'dbname': TEST_NAME,
  58. 'user': TEST_USER,
  59. 'password': TEST_PASSWD,
  60. 'tblspace': TEST_TBLSPACE,
  61. 'tblspace_temp': TEST_TBLSPACE_TMP,
  62. }
  63. cursor = self.connection.cursor()
  64. if self._test_database_create():
  65. try:
  66. self._execute_test_db_creation(cursor, parameters, verbosity)
  67. except Exception as e:
  68. sys.stderr.write("Got an error creating the test database: %s\n" % e)
  69. if not autoclobber:
  70. confirm = input("It appears the test database, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_NAME)
  71. if autoclobber or confirm == 'yes':
  72. try:
  73. if verbosity >= 1:
  74. print("Destroying old test database '%s'..." % self.connection.alias)
  75. self._execute_test_db_destruction(cursor, parameters, verbosity)
  76. self._execute_test_db_creation(cursor, parameters, verbosity)
  77. except Exception as e:
  78. sys.stderr.write("Got an error recreating the test database: %s\n" % e)
  79. sys.exit(2)
  80. else:
  81. print("Tests cancelled.")
  82. sys.exit(1)
  83. if self._test_user_create():
  84. if verbosity >= 1:
  85. print("Creating test user...")
  86. try:
  87. self._create_test_user(cursor, parameters, verbosity)
  88. except Exception as e:
  89. sys.stderr.write("Got an error creating the test user: %s\n" % e)
  90. if not autoclobber:
  91. confirm = input("It appears the test user, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_USER)
  92. if autoclobber or confirm == 'yes':
  93. try:
  94. if verbosity >= 1:
  95. print("Destroying old test user...")
  96. self._destroy_test_user(cursor, parameters, verbosity)
  97. if verbosity >= 1:
  98. print("Creating test user...")
  99. self._create_test_user(cursor, parameters, verbosity)
  100. except Exception as e:
  101. sys.stderr.write("Got an error recreating the test user: %s\n" % e)
  102. sys.exit(2)
  103. else:
  104. print("Tests cancelled.")
  105. sys.exit(1)
  106. self.connection.close() # done with main user -- test user and tablespaces created
  107. real_settings = settings.DATABASES[self.connection.alias]
  108. real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = self.connection.settings_dict['USER']
  109. real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = self.connection.settings_dict['PASSWORD']
  110. real_test_settings = real_settings['TEST']
  111. test_settings = self.connection.settings_dict['TEST']
  112. real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = self.connection.settings_dict['USER'] = TEST_USER
  113. real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = TEST_PASSWD
  114. return self.connection.settings_dict['NAME']
  115. def _destroy_test_db(self, test_database_name, verbosity=1):
  116. """
  117. Destroy a test database, prompting the user for confirmation if the
  118. database already exists. Returns the name of the test database created.
  119. """
  120. TEST_NAME = self._test_database_name()
  121. TEST_USER = self._test_database_user()
  122. TEST_PASSWD = self._test_database_passwd()
  123. TEST_TBLSPACE = self._test_database_tblspace()
  124. TEST_TBLSPACE_TMP = self._test_database_tblspace_tmp()
  125. self.connection.settings_dict['USER'] = self.connection.settings_dict['SAVED_USER']
  126. self.connection.settings_dict['PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD']
  127. parameters = {
  128. 'dbname': TEST_NAME,
  129. 'user': TEST_USER,
  130. 'password': TEST_PASSWD,
  131. 'tblspace': TEST_TBLSPACE,
  132. 'tblspace_temp': TEST_TBLSPACE_TMP,
  133. }
  134. cursor = self.connection.cursor()
  135. time.sleep(1) # To avoid "database is being accessed by other users" errors.
  136. if self._test_user_create():
  137. if verbosity >= 1:
  138. print('Destroying test user...')
  139. self._destroy_test_user(cursor, parameters, verbosity)
  140. if self._test_database_create():
  141. if verbosity >= 1:
  142. print('Destroying test database tables...')
  143. self._execute_test_db_destruction(cursor, parameters, verbosity)
  144. self.connection.close()
  145. def _execute_test_db_creation(self, cursor, parameters, verbosity):
  146. if verbosity >= 2:
  147. print("_create_test_db(): dbname = %s" % parameters['dbname'])
  148. statements = [
  149. """CREATE TABLESPACE %(tblspace)s
  150. DATAFILE '%(tblspace)s.dbf' SIZE 20M
  151. REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 200M
  152. """,
  153. """CREATE TEMPORARY TABLESPACE %(tblspace_temp)s
  154. TEMPFILE '%(tblspace_temp)s.dbf' SIZE 20M
  155. REUSE AUTOEXTEND ON NEXT 10M MAXSIZE 100M
  156. """,
  157. ]
  158. self._execute_statements(cursor, statements, parameters, verbosity)
  159. def _create_test_user(self, cursor, parameters, verbosity):
  160. if verbosity >= 2:
  161. print("_create_test_user(): username = %s" % parameters['user'])
  162. statements = [
  163. """CREATE USER %(user)s
  164. IDENTIFIED BY %(password)s
  165. DEFAULT TABLESPACE %(tblspace)s
  166. TEMPORARY TABLESPACE %(tblspace_temp)s
  167. QUOTA UNLIMITED ON %(tblspace)s
  168. """,
  169. """GRANT CONNECT, RESOURCE TO %(user)s""",
  170. ]
  171. self._execute_statements(cursor, statements, parameters, verbosity)
  172. def _execute_test_db_destruction(self, cursor, parameters, verbosity):
  173. if verbosity >= 2:
  174. print("_execute_test_db_destruction(): dbname=%s" % parameters['dbname'])
  175. statements = [
  176. 'DROP TABLESPACE %(tblspace)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
  177. 'DROP TABLESPACE %(tblspace_temp)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
  178. ]
  179. self._execute_statements(cursor, statements, parameters, verbosity)
  180. def _destroy_test_user(self, cursor, parameters, verbosity):
  181. if verbosity >= 2:
  182. print("_destroy_test_user(): user=%s" % parameters['user'])
  183. print("Be patient. This can take some time...")
  184. statements = [
  185. 'DROP USER %(user)s CASCADE',
  186. ]
  187. self._execute_statements(cursor, statements, parameters, verbosity)
  188. def _execute_statements(self, cursor, statements, parameters, verbosity):
  189. for template in statements:
  190. stmt = template % parameters
  191. if verbosity >= 2:
  192. print(stmt)
  193. try:
  194. cursor.execute(stmt)
  195. except Exception as err:
  196. sys.stderr.write("Failed (%s)\n" % (err))
  197. raise
  198. def _test_settings_get(self, key, default=None, prefixed=None):
  199. """
  200. Return a value from the test settings dict,
  201. or a given default,
  202. or a prefixed entry from the main settings dict
  203. """
  204. settings_dict = self.connection.settings_dict
  205. val = settings_dict['TEST'].get(key, default)
  206. if val is None:
  207. val = TEST_DATABASE_PREFIX + settings_dict[prefixed]
  208. return val
  209. def _test_database_name(self):
  210. return self._test_settings_get('NAME', prefixed='NAME')
  211. def _test_database_create(self):
  212. return self._test_settings_get('CREATE_DB', default=True)
  213. def _test_user_create(self):
  214. return self._test_settings_get('CREATE_USER', default=True)
  215. def _test_database_user(self):
  216. return self._test_settings_get('USER', prefixed='USER')
  217. def _test_database_passwd(self):
  218. return self._test_settings_get('PASSWORD', default=PASSWORD)
  219. def _test_database_tblspace(self):
  220. return self._test_settings_get('TBLSPACE', prefixed='NAME')
  221. def _test_database_tblspace_tmp(self):
  222. settings_dict = self.connection.settings_dict
  223. return settings_dict['TEST'].get('TBLSPACE_TMP',
  224. TEST_DATABASE_PREFIX + settings_dict['NAME'] + '_temp')
  225. def _get_test_db_name(self):
  226. """
  227. We need to return the 'production' DB name to get the test DB creation
  228. machinery to work. This isn't a great deal in this case because DB
  229. names as handled by Django haven't real counterparts in Oracle.
  230. """
  231. return self.connection.settings_dict['NAME']
  232. def test_db_signature(self):
  233. settings_dict = self.connection.settings_dict
  234. return (
  235. settings_dict['HOST'],
  236. settings_dict['PORT'],
  237. settings_dict['ENGINE'],
  238. settings_dict['NAME'],
  239. self._test_database_user(),
  240. )