test_adbapi.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for twisted.enterprise.adbapi.
  5. """
  6. from twisted.trial import unittest
  7. import os
  8. import stat
  9. from twisted.enterprise.adbapi import ConnectionPool, ConnectionLost
  10. from twisted.enterprise.adbapi import Connection, Transaction
  11. from twisted.internet import reactor, defer, interfaces
  12. from twisted.python.failure import Failure
  13. from twisted.python.reflect import requireModule
# DDL for the single-column scratch table named "simple" that the pool tests
# below create, fill, query and finally drop in their tearDown.
simple_table_schema = """
CREATE TABLE simple (
x integer
)
"""
  19. class ADBAPITestBase(object):
  20. """
  21. Test the asynchronous DB-API code.
  22. """
  23. openfun_called = {}
  24. if interfaces.IReactorThreads(reactor, None) is None:
  25. skip = "ADB-API requires threads, no way to test without them"
  26. def extraSetUp(self):
  27. """
  28. Set up the database and create a connection pool pointing at it.
  29. """
  30. self.startDB()
  31. self.dbpool = self.makePool(cp_openfun=self.openfun)
  32. self.dbpool.start()
  33. def tearDown(self):
  34. d = self.dbpool.runOperation('DROP TABLE simple')
  35. d.addCallback(lambda res: self.dbpool.close())
  36. d.addCallback(lambda res: self.stopDB())
  37. return d
  38. def openfun(self, conn):
  39. self.openfun_called[conn] = True
  40. def checkOpenfunCalled(self, conn=None):
  41. if not conn:
  42. self.assertTrue(self.openfun_called)
  43. else:
  44. self.assertIn(conn, self.openfun_called)
  45. def test_pool(self):
  46. d = self.dbpool.runOperation(simple_table_schema)
  47. if self.test_failures:
  48. d.addCallback(self._testPool_1_1)
  49. d.addCallback(self._testPool_1_2)
  50. d.addCallback(self._testPool_1_3)
  51. d.addCallback(self._testPool_1_4)
  52. d.addCallback(lambda res: self.flushLoggedErrors())
  53. d.addCallback(self._testPool_2)
  54. d.addCallback(self._testPool_3)
  55. d.addCallback(self._testPool_4)
  56. d.addCallback(self._testPool_5)
  57. d.addCallback(self._testPool_6)
  58. d.addCallback(self._testPool_7)
  59. d.addCallback(self._testPool_8)
  60. d.addCallback(self._testPool_9)
  61. return d
  62. def _testPool_1_1(self, res):
  63. d = defer.maybeDeferred(self.dbpool.runQuery, "select * from NOTABLE")
  64. d.addCallbacks(lambda res: self.fail('no exception'),
  65. lambda f: None)
  66. return d
  67. def _testPool_1_2(self, res):
  68. d = defer.maybeDeferred(self.dbpool.runOperation,
  69. "deletexxx from NOTABLE")
  70. d.addCallbacks(lambda res: self.fail('no exception'),
  71. lambda f: None)
  72. return d
  73. def _testPool_1_3(self, res):
  74. d = defer.maybeDeferred(self.dbpool.runInteraction,
  75. self.bad_interaction)
  76. d.addCallbacks(lambda res: self.fail('no exception'),
  77. lambda f: None)
  78. return d
  79. def _testPool_1_4(self, res):
  80. d = defer.maybeDeferred(self.dbpool.runWithConnection,
  81. self.bad_withConnection)
  82. d.addCallbacks(lambda res: self.fail('no exception'),
  83. lambda f: None)
  84. return d
  85. def _testPool_2(self, res):
  86. # verify simple table is empty
  87. sql = "select count(1) from simple"
  88. d = self.dbpool.runQuery(sql)
  89. def _check(row):
  90. self.assertTrue(int(row[0][0]) == 0, "Interaction not rolled back")
  91. self.checkOpenfunCalled()
  92. d.addCallback(_check)
  93. return d
  94. def _testPool_3(self, res):
  95. sql = "select count(1) from simple"
  96. inserts = []
  97. # add some rows to simple table (runOperation)
  98. for i in range(self.num_iterations):
  99. sql = "insert into simple(x) values(%d)" % i
  100. inserts.append(self.dbpool.runOperation(sql))
  101. d = defer.gatherResults(inserts)
  102. def _select(res):
  103. # make sure they were added (runQuery)
  104. sql = "select x from simple order by x";
  105. d = self.dbpool.runQuery(sql)
  106. return d
  107. d.addCallback(_select)
  108. def _check(rows):
  109. self.assertTrue(len(rows) == self.num_iterations,
  110. "Wrong number of rows")
  111. for i in range(self.num_iterations):
  112. self.assertTrue(len(rows[i]) == 1, "Wrong size row")
  113. self.assertTrue(rows[i][0] == i, "Values not returned.")
  114. d.addCallback(_check)
  115. return d
  116. def _testPool_4(self, res):
  117. # runInteraction
  118. d = self.dbpool.runInteraction(self.interaction)
  119. d.addCallback(lambda res: self.assertEqual(res, "done"))
  120. return d
  121. def _testPool_5(self, res):
  122. # withConnection
  123. d = self.dbpool.runWithConnection(self.withConnection)
  124. d.addCallback(lambda res: self.assertEqual(res, "done"))
  125. return d
  126. def _testPool_6(self, res):
  127. # Test a withConnection cannot be closed
  128. d = self.dbpool.runWithConnection(self.close_withConnection)
  129. return d
  130. def _testPool_7(self, res):
  131. # give the pool a workout
  132. ds = []
  133. for i in range(self.num_iterations):
  134. sql = "select x from simple where x = %d" % i
  135. ds.append(self.dbpool.runQuery(sql))
  136. dlist = defer.DeferredList(ds, fireOnOneErrback=True)
  137. def _check(result):
  138. for i in range(self.num_iterations):
  139. self.assertTrue(result[i][1][0][0] == i, "Value not returned")
  140. dlist.addCallback(_check)
  141. return dlist
  142. def _testPool_8(self, res):
  143. # now delete everything
  144. ds = []
  145. for i in range(self.num_iterations):
  146. sql = "delete from simple where x = %d" % i
  147. ds.append(self.dbpool.runOperation(sql))
  148. dlist = defer.DeferredList(ds, fireOnOneErrback=True)
  149. return dlist
  150. def _testPool_9(self, res):
  151. # verify simple table is empty
  152. sql = "select count(1) from simple"
  153. d = self.dbpool.runQuery(sql)
  154. def _check(row):
  155. self.assertTrue(int(row[0][0]) == 0,
  156. "Didn't successfully delete table contents")
  157. self.checkConnect()
  158. d.addCallback(_check)
  159. return d
  160. def checkConnect(self):
  161. """Check the connect/disconnect synchronous calls."""
  162. conn = self.dbpool.connect()
  163. self.checkOpenfunCalled(conn)
  164. curs = conn.cursor()
  165. curs.execute("insert into simple(x) values(1)")
  166. curs.execute("select x from simple")
  167. res = curs.fetchall()
  168. self.assertEqual(len(res), 1)
  169. self.assertEqual(len(res[0]), 1)
  170. self.assertEqual(res[0][0], 1)
  171. curs.execute("delete from simple")
  172. curs.execute("select x from simple")
  173. self.assertEqual(len(curs.fetchall()), 0)
  174. curs.close()
  175. self.dbpool.disconnect(conn)
  176. def interaction(self, transaction):
  177. transaction.execute("select x from simple order by x")
  178. for i in range(self.num_iterations):
  179. row = transaction.fetchone()
  180. self.assertTrue(len(row) == 1, "Wrong size row")
  181. self.assertTrue(row[0] == i, "Value not returned.")
  182. self.assertIsNone(transaction.fetchone(), "Too many rows")
  183. return "done"
  184. def bad_interaction(self, transaction):
  185. if self.can_rollback:
  186. transaction.execute("insert into simple(x) values(0)")
  187. transaction.execute("select * from NOTABLE")
  188. def withConnection(self, conn):
  189. curs = conn.cursor()
  190. try:
  191. curs.execute("select x from simple order by x")
  192. for i in range(self.num_iterations):
  193. row = curs.fetchone()
  194. self.assertTrue(len(row) == 1, "Wrong size row")
  195. self.assertTrue(row[0] == i, "Value not returned.")
  196. finally:
  197. curs.close()
  198. return "done"
  199. def close_withConnection(self, conn):
  200. conn.close()
  201. def bad_withConnection(self, conn):
  202. curs = conn.cursor()
  203. try:
  204. curs.execute("select * from NOTABLE")
  205. finally:
  206. curs.close()
  207. class ReconnectTestBase(object):
  208. """
  209. Test the asynchronous DB-API code with reconnect.
  210. """
  211. if interfaces.IReactorThreads(reactor, None) is None:
  212. skip = "ADB-API requires threads, no way to test without them"
  213. def extraSetUp(self):
  214. """
  215. Skip the test if C{good_sql} is unavailable. Otherwise, set up the
  216. database, create a connection pool pointed at it, and set up a simple
  217. schema in it.
  218. """
  219. if self.good_sql is None:
  220. raise unittest.SkipTest('no good sql for reconnect test')
  221. self.startDB()
  222. self.dbpool = self.makePool(cp_max=1, cp_reconnect=True,
  223. cp_good_sql=self.good_sql)
  224. self.dbpool.start()
  225. return self.dbpool.runOperation(simple_table_schema)
  226. def tearDown(self):
  227. d = self.dbpool.runOperation('DROP TABLE simple')
  228. d.addCallback(lambda res: self.dbpool.close())
  229. d.addCallback(lambda res: self.stopDB())
  230. return d
  231. def test_pool(self):
  232. d = defer.succeed(None)
  233. d.addCallback(self._testPool_1)
  234. d.addCallback(self._testPool_2)
  235. if not self.early_reconnect:
  236. d.addCallback(self._testPool_3)
  237. d.addCallback(self._testPool_4)
  238. d.addCallback(self._testPool_5)
  239. return d
  240. def _testPool_1(self, res):
  241. sql = "select count(1) from simple"
  242. d = self.dbpool.runQuery(sql)
  243. def _check(row):
  244. self.assertTrue(int(row[0][0]) == 0, "Table not empty")
  245. d.addCallback(_check)
  246. return d
  247. def _testPool_2(self, res):
  248. # reach in and close the connection manually
  249. list(self.dbpool.connections.values())[0].close()
  250. def _testPool_3(self, res):
  251. sql = "select count(1) from simple"
  252. d = defer.maybeDeferred(self.dbpool.runQuery, sql)
  253. d.addCallbacks(lambda res: self.fail('no exception'),
  254. lambda f: None)
  255. return d
  256. def _testPool_4(self, res):
  257. sql = "select count(1) from simple"
  258. d = self.dbpool.runQuery(sql)
  259. def _check(row):
  260. self.assertTrue(int(row[0][0]) == 0, "Table not empty")
  261. d.addCallback(_check)
  262. return d
  263. def _testPool_5(self, res):
  264. self.flushLoggedErrors()
  265. sql = "select * from NOTABLE" # bad sql
  266. d = defer.maybeDeferred(self.dbpool.runQuery, sql)
  267. d.addCallbacks(lambda res: self.fail('no exception'),
  268. lambda f: self.assertFalse(f.check(ConnectionLost)))
  269. return d
  270. class DBTestConnector(object):
  271. """
  272. A class which knows how to test for the presence of
  273. and establish a connection to a relational database.
  274. To enable test cases which use a central, system database,
  275. you must create a database named DB_NAME with a user DB_USER
  276. and password DB_PASS with full access rights to database DB_NAME.
  277. """
  278. TEST_PREFIX = None # used for creating new test cases
  279. DB_NAME = "twisted_test"
  280. DB_USER = 'twisted_test'
  281. DB_PASS = 'twisted_test'
  282. DB_DIR = None # directory for database storage
  283. nulls_ok = True # nulls supported
  284. trailing_spaces_ok = True # trailing spaces in strings preserved
  285. can_rollback = True # rollback supported
  286. test_failures = True # test bad sql?
  287. escape_slashes = True # escape \ in sql?
  288. good_sql = ConnectionPool.good_sql
  289. early_reconnect = True # cursor() will fail on closed connection
  290. can_clear = True # can try to clear out tables when starting
  291. num_iterations = 50 # number of iterations for test loops
  292. # (lower this for slow db's)
  293. def setUp(self):
  294. self.DB_DIR = self.mktemp()
  295. os.mkdir(self.DB_DIR)
  296. if not self.can_connect():
  297. raise unittest.SkipTest('%s: Cannot access db' % self.TEST_PREFIX)
  298. return self.extraSetUp()
  299. def can_connect(self):
  300. """Return true if this database is present on the system
  301. and can be used in a test."""
  302. raise NotImplementedError()
  303. def startDB(self):
  304. """Take any steps needed to bring database up."""
  305. pass
  306. def stopDB(self):
  307. """Bring database down, if needed."""
  308. pass
  309. def makePool(self, **newkw):
  310. """Create a connection pool with additional keyword arguments."""
  311. args, kw = self.getPoolArgs()
  312. kw = kw.copy()
  313. kw.update(newkw)
  314. return ConnectionPool(*args, **kw)
  315. def getPoolArgs(self):
  316. """Return a tuple (args, kw) of list and keyword arguments
  317. that need to be passed to ConnectionPool to create a connection
  318. to this database."""
  319. raise NotImplementedError()
  320. class SQLite3Connector(DBTestConnector):
  321. """
  322. Connector that uses the stdlib SQLite3 database support.
  323. """
  324. TEST_PREFIX = 'SQLite3'
  325. escape_slashes = False
  326. num_iterations = 1 # slow
  327. def can_connect(self):
  328. if requireModule('sqlite3') is None:
  329. return False
  330. else:
  331. return True
  332. def startDB(self):
  333. self.database = os.path.join(self.DB_DIR, self.DB_NAME)
  334. if os.path.exists(self.database):
  335. os.unlink(self.database)
  336. def getPoolArgs(self):
  337. args = ('sqlite3',)
  338. kw = {'database': self.database,
  339. 'cp_max': 1,
  340. 'check_same_thread': False}
  341. return args, kw
  342. class PySQLite2Connector(DBTestConnector):
  343. """
  344. Connector that uses pysqlite's SQLite database support.
  345. """
  346. TEST_PREFIX = 'pysqlite2'
  347. escape_slashes = False
  348. num_iterations = 1 # slow
  349. def can_connect(self):
  350. if requireModule('pysqlite2.dbapi2') is None:
  351. return False
  352. else:
  353. return True
  354. def startDB(self):
  355. self.database = os.path.join(self.DB_DIR, self.DB_NAME)
  356. if os.path.exists(self.database):
  357. os.unlink(self.database)
  358. def getPoolArgs(self):
  359. args = ('pysqlite2.dbapi2',)
  360. kw = {'database': self.database,
  361. 'cp_max': 1,
  362. 'check_same_thread': False}
  363. return args, kw
  364. class PyPgSQLConnector(DBTestConnector):
  365. TEST_PREFIX = "PyPgSQL"
  366. def can_connect(self):
  367. try: from pyPgSQL import PgSQL
  368. except: return False
  369. try:
  370. conn = PgSQL.connect(database=self.DB_NAME, user=self.DB_USER,
  371. password=self.DB_PASS)
  372. conn.close()
  373. return True
  374. except:
  375. return False
  376. def getPoolArgs(self):
  377. args = ('pyPgSQL.PgSQL',)
  378. kw = {'database': self.DB_NAME, 'user': self.DB_USER,
  379. 'password': self.DB_PASS, 'cp_min': 0}
  380. return args, kw
  381. class PsycopgConnector(DBTestConnector):
  382. TEST_PREFIX = 'Psycopg'
  383. def can_connect(self):
  384. try: import psycopg
  385. except: return False
  386. try:
  387. conn = psycopg.connect(database=self.DB_NAME, user=self.DB_USER,
  388. password=self.DB_PASS)
  389. conn.close()
  390. return True
  391. except:
  392. return False
  393. def getPoolArgs(self):
  394. args = ('psycopg',)
  395. kw = {'database': self.DB_NAME, 'user': self.DB_USER,
  396. 'password': self.DB_PASS, 'cp_min': 0}
  397. return args, kw
  398. class MySQLConnector(DBTestConnector):
  399. TEST_PREFIX = 'MySQL'
  400. trailing_spaces_ok = False
  401. can_rollback = False
  402. early_reconnect = False
  403. def can_connect(self):
  404. try: import MySQLdb
  405. except: return False
  406. try:
  407. conn = MySQLdb.connect(db=self.DB_NAME, user=self.DB_USER,
  408. passwd=self.DB_PASS)
  409. conn.close()
  410. return True
  411. except:
  412. return False
  413. def getPoolArgs(self):
  414. args = ('MySQLdb',)
  415. kw = {'db': self.DB_NAME, 'user': self.DB_USER, 'passwd': self.DB_PASS}
  416. return args, kw
  417. class FirebirdConnector(DBTestConnector):
  418. TEST_PREFIX = 'Firebird'
  419. test_failures = False # failure testing causes problems
  420. escape_slashes = False
  421. good_sql = None # firebird doesn't handle failed sql well
  422. can_clear = False # firebird is not so good
  423. num_iterations = 5 # slow
  424. def can_connect(self):
  425. if requireModule('kinterbasdb') is None:
  426. return False
  427. try:
  428. self.startDB()
  429. self.stopDB()
  430. return True
  431. except:
  432. return False
  433. def startDB(self):
  434. import kinterbasdb
  435. self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME)
  436. os.chmod(self.DB_DIR, stat.S_IRWXU + stat.S_IRWXG + stat.S_IRWXO)
  437. sql = 'create database "%s" user "%s" password "%s"'
  438. sql %= (self.DB_NAME, self.DB_USER, self.DB_PASS);
  439. conn = kinterbasdb.create_database(sql)
  440. conn.close()
  441. def getPoolArgs(self):
  442. args = ('kinterbasdb',)
  443. kw = {'database': self.DB_NAME, 'host': '127.0.0.1',
  444. 'user': self.DB_USER, 'password': self.DB_PASS}
  445. return args, kw
  446. def stopDB(self):
  447. import kinterbasdb
  448. conn = kinterbasdb.connect(database=self.DB_NAME,
  449. host='127.0.0.1', user=self.DB_USER,
  450. password=self.DB_PASS)
  451. conn.drop_database()
  452. def makeSQLTests(base, suffix, globals):
  453. """
  454. Make a test case for every db connector which can connect.
  455. @param base: Base class for test case. Additional base classes
  456. will be a DBConnector subclass and unittest.TestCase
  457. @param suffix: A suffix used to create test case names. Prefixes
  458. are defined in the DBConnector subclasses.
  459. """
  460. connectors = [PySQLite2Connector, SQLite3Connector, PyPgSQLConnector,
  461. PsycopgConnector, MySQLConnector, FirebirdConnector]
  462. tests = {}
  463. for connclass in connectors:
  464. name = connclass.TEST_PREFIX + suffix
  465. class testcase(connclass, base, unittest.TestCase):
  466. __module__ = connclass.__module__
  467. testcase.__name__ = name
  468. if hasattr(connclass, "__qualname__"):
  469. testcase.__qualname__ = ".".join(
  470. connclass.__qualname__.split()[0:-1] + [name])
  471. tests[name] = testcase
  472. globals.update(tests)
# Generated names are <connector TEST_PREFIX> + <suffix>, which gives:
# pysqlite2ADBAPITests SQLite3ADBAPITests PyPgSQLADBAPITests
# PsycopgADBAPITests MySQLADBAPITests FirebirdADBAPITests
makeSQLTests(ADBAPITestBase, 'ADBAPITests', globals())
# pysqlite2ReconnectTests SQLite3ReconnectTests PyPgSQLReconnectTests
# PsycopgReconnectTests MySQLReconnectTests FirebirdReconnectTests
makeSQLTests(ReconnectTestBase, 'ReconnectTests', globals())
  479. class FakePool(object):
  480. """
  481. A fake L{ConnectionPool} for tests.
  482. @ivar connectionFactory: factory for making connections returned by the
  483. C{connect} method.
  484. @type connectionFactory: any callable
  485. """
  486. reconnect = True
  487. noisy = True
  488. def __init__(self, connectionFactory):
  489. self.connectionFactory = connectionFactory
  490. def connect(self):
  491. """
  492. Return an instance of C{self.connectionFactory}.
  493. """
  494. return self.connectionFactory()
  495. def disconnect(self, connection):
  496. """
  497. Do nothing.
  498. """
  499. class ConnectionTests(unittest.TestCase):
  500. """
  501. Tests for the L{Connection} class.
  502. """
  503. def test_rollbackErrorLogged(self):
  504. """
  505. If an error happens during rollback, L{ConnectionLost} is raised but
  506. the original error is logged.
  507. """
  508. class ConnectionRollbackRaise(object):
  509. def rollback(self):
  510. raise RuntimeError("problem!")
  511. pool = FakePool(ConnectionRollbackRaise)
  512. connection = Connection(pool)
  513. self.assertRaises(ConnectionLost, connection.rollback)
  514. errors = self.flushLoggedErrors(RuntimeError)
  515. self.assertEqual(len(errors), 1)
  516. self.assertEqual(errors[0].value.args[0], "problem!")
  517. class TransactionTests(unittest.TestCase):
  518. """
  519. Tests for the L{Transaction} class.
  520. """
  521. def test_reopenLogErrorIfReconnect(self):
  522. """
  523. If the cursor creation raises an error in L{Transaction.reopen}, it
  524. reconnects but log the error occurred.
  525. """
  526. class ConnectionCursorRaise(object):
  527. count = 0
  528. def reconnect(self):
  529. pass
  530. def cursor(self):
  531. if self.count == 0:
  532. self.count += 1
  533. raise RuntimeError("problem!")
  534. pool = FakePool(None)
  535. transaction = Transaction(pool, ConnectionCursorRaise())
  536. transaction.reopen()
  537. errors = self.flushLoggedErrors(RuntimeError)
  538. self.assertEqual(len(errors), 1)
  539. self.assertEqual(errors[0].value.args[0], "problem!")
  540. class NonThreadPool(object):
  541. def callInThreadWithCallback(self, onResult, f, *a, **kw):
  542. success = True
  543. try:
  544. result = f(*a, **kw)
  545. except Exception:
  546. success = False
  547. result = Failure()
  548. onResult(success, result)
  549. class DummyConnectionPool(ConnectionPool):
  550. """
  551. A testable L{ConnectionPool};
  552. """
  553. threadpool = NonThreadPool()
  554. def __init__(self):
  555. """
  556. Don't forward init call.
  557. """
  558. self.reactor = reactor
  559. class EventReactor(object):
  560. """
  561. Partial L{IReactorCore} implementation with simple event-related
  562. methods.
  563. @ivar _running: A C{bool} indicating whether the reactor is pretending
  564. to have been started already or not.
  565. @ivar triggers: A C{list} of pending system event triggers.
  566. """
  567. def __init__(self, running):
  568. self._running = running
  569. self.triggers = []
  570. def callWhenRunning(self, function):
  571. if self._running:
  572. function()
  573. else:
  574. return self.addSystemEventTrigger('after', 'startup', function)
  575. def addSystemEventTrigger(self, phase, event, trigger):
  576. handle = (phase, event, trigger)
  577. self.triggers.append(handle)
  578. return handle
  579. def removeSystemEventTrigger(self, handle):
  580. self.triggers.remove(handle)
  581. class ConnectionPoolTests(unittest.TestCase):
  582. """
  583. Unit tests for L{ConnectionPool}.
  584. """
  585. def test_runWithConnectionRaiseOriginalError(self):
  586. """
  587. If rollback fails, L{ConnectionPool.runWithConnection} raises the
  588. original exception and log the error of the rollback.
  589. """
  590. class ConnectionRollbackRaise(object):
  591. def __init__(self, pool):
  592. pass
  593. def rollback(self):
  594. raise RuntimeError("problem!")
  595. def raisingFunction(connection):
  596. raise ValueError("foo")
  597. pool = DummyConnectionPool()
  598. pool.connectionFactory = ConnectionRollbackRaise
  599. d = pool.runWithConnection(raisingFunction)
  600. d = self.assertFailure(d, ValueError)
  601. def cbFailed(ignored):
  602. errors = self.flushLoggedErrors(RuntimeError)
  603. self.assertEqual(len(errors), 1)
  604. self.assertEqual(errors[0].value.args[0], "problem!")
  605. d.addCallback(cbFailed)
  606. return d
  607. def test_closeLogError(self):
  608. """
  609. L{ConnectionPool._close} logs exceptions.
  610. """
  611. class ConnectionCloseRaise(object):
  612. def close(self):
  613. raise RuntimeError("problem!")
  614. pool = DummyConnectionPool()
  615. pool._close(ConnectionCloseRaise())
  616. errors = self.flushLoggedErrors(RuntimeError)
  617. self.assertEqual(len(errors), 1)
  618. self.assertEqual(errors[0].value.args[0], "problem!")
  619. def test_runWithInteractionRaiseOriginalError(self):
  620. """
  621. If rollback fails, L{ConnectionPool.runInteraction} raises the
  622. original exception and log the error of the rollback.
  623. """
  624. class ConnectionRollbackRaise(object):
  625. def __init__(self, pool):
  626. pass
  627. def rollback(self):
  628. raise RuntimeError("problem!")
  629. class DummyTransaction(object):
  630. def __init__(self, pool, connection):
  631. pass
  632. def raisingFunction(transaction):
  633. raise ValueError("foo")
  634. pool = DummyConnectionPool()
  635. pool.connectionFactory = ConnectionRollbackRaise
  636. pool.transactionFactory = DummyTransaction
  637. d = pool.runInteraction(raisingFunction)
  638. d = self.assertFailure(d, ValueError)
  639. def cbFailed(ignored):
  640. errors = self.flushLoggedErrors(RuntimeError)
  641. self.assertEqual(len(errors), 1)
  642. self.assertEqual(errors[0].value.args[0], "problem!")
  643. d.addCallback(cbFailed)
  644. return d
  645. def test_unstartedClose(self):
  646. """
  647. If L{ConnectionPool.close} is called without L{ConnectionPool.start}
  648. having been called, the pool's startup event is cancelled.
  649. """
  650. reactor = EventReactor(False)
  651. pool = ConnectionPool('twisted.test.test_adbapi', cp_reactor=reactor)
  652. # There should be a startup trigger waiting.
  653. self.assertEqual(reactor.triggers, [('after', 'startup', pool._start)])
  654. pool.close()
  655. # But not anymore.
  656. self.assertFalse(reactor.triggers)
  657. def test_startedClose(self):
  658. """
  659. If L{ConnectionPool.close} is called after it has been started, but
  660. not by its shutdown trigger, the shutdown trigger is cancelled.
  661. """
  662. reactor = EventReactor(True)
  663. pool = ConnectionPool('twisted.test.test_adbapi', cp_reactor=reactor)
  664. # There should be a shutdown trigger waiting.
  665. self.assertEqual(reactor.triggers,
  666. [('during', 'shutdown', pool.finalClose)])
  667. pool.close()
  668. # But not anymore.
  669. self.assertFalse(reactor.triggers)