# adodbapitest.py (55 KB)


  1. """ Unit tests version 2.6.1.0 for adodbapi"""
  2. from __future__ import print_function
  3. """
  4. adodbapi - A python DB API 2.0 interface to Microsoft ADO
  5. Copyright (C) 2002 Henrik Ekelund
  6. This library is free software; you can redistribute it and/or
  7. modify it under the terms of the GNU Lesser General Public
  8. License as published by the Free Software Foundation; either
  9. version 2.1 of the License, or (at your option) any later version.
  10. This library is distributed in the hope that it will be useful,
  11. but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. Lesser General Public License for more details.
  14. You should have received a copy of the GNU Lesser General Public
  15. License along with this library; if not, write to the Free Software
  16. Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  17. Updates by Vernon Cole
  18. """
  19. import unittest
  20. import sys
  21. import datetime
  22. import decimal
  23. import copy
  24. import random
  25. import string
  26. try:
  27. import win32com.client
  28. win32 = True
  29. except ImportError:
  30. win32 = False
  31. # run the configuration module.
  32. import adodbapitestconfig as config # will set sys.path to find correct version of adodbapi
  33. # in our code below, all our switches are from config.whatever
  34. import tryconnection
  35. import adodbapi
  36. import adodbapi.apibase as api
  37. try:
  38. import adodbapi.ado_consts as ado_consts
  39. except ImportError: #we are doing a shortcut import as a module -- so
  40. try:
  41. import ado_consts
  42. except ImportError:
  43. from adodbapi import ado_consts
  44. if sys.version_info >= (3,0):
  45. def str2bytes(sval):
  46. return sval.encode("latin1")
  47. unicode = str
  48. long = int
  49. else:
  50. def str2bytes(sval):
  51. if isinstance(sval, str):
  52. return sval
  53. return sval.encode("latin1")
  54. try:
  55. bytes
  56. except NameError:
  57. bytes = str
  58. def randomstring(length):
  59. return ''.join([random.choice(string.ascii_letters) for n in range(32)])
  60. class CommonDBTests(unittest.TestCase):
  61. "Self contained super-simple tests in easy syntax, should work on everything between mySQL and Oracle"
  62. def setUp(self):
  63. self.engine = 'unknown'
  64. def getEngine(self):
  65. return self.engine
  66. def getConnection(self):
  67. raise NotImplementedError #"This method must be overriden by a subclass"
  68. def getCursor(self):
  69. return self.getConnection().cursor()
  70. def testConnection(self):
  71. crsr=self.getCursor()
  72. assert crsr.__class__.__name__ == 'Cursor'
  73. def testErrorHandlerInherits(self):
  74. if not self.remote:
  75. conn=self.getConnection()
  76. mycallable=lambda connection,cursor,errorclass,errorvalue: 1
  77. conn.errorhandler=mycallable
  78. crsr=conn.cursor()
  79. assert crsr.errorhandler==mycallable,"Error handler on crsr should be same as on connection"
  80. def testDefaultErrorHandlerConnection(self):
  81. if not self.remote:
  82. conn=self.getConnection()
  83. del conn.messages[:]
  84. try:
  85. conn.close()
  86. conn.commit() #Should not be able to use connection after it is closed
  87. except:
  88. assert len(conn.messages)==1
  89. assert len(conn.messages[0])==2
  90. assert conn.messages[0][0]==api.ProgrammingError
  91. def testOwnErrorHandlerConnection(self):
  92. if self.remote: # ToDo: use "skip"
  93. return
  94. mycallable=lambda connection,cursor,errorclass,errorvalue: 1 #does not raise anything
  95. conn=self.getConnection()
  96. conn.errorhandler=mycallable
  97. conn.close()
  98. conn.commit() #Should not be able to use connection after it is closed
  99. assert len(conn.messages)==0
  100. conn.errorhandler=None #This should bring back the standard error handler
  101. try:
  102. conn.close()
  103. conn.commit() #Should not be able to use connection after it is closed
  104. except:
  105. pass
  106. #The Standard errorhandler appends error to messages attribute
  107. assert len(conn.messages)>0,"Setting errorhandler to none should bring back the standard error handler"
  108. def testDefaultErrorHandlerCursor(self):
  109. crsr=self.getConnection().cursor()
  110. if not self.remote:
  111. del crsr.messages[:]
  112. try:
  113. crsr.execute("SELECT abbtytddrf FROM dasdasd")
  114. except:
  115. assert len(crsr.messages)==1
  116. assert len(crsr.messages[0])==2
  117. assert crsr.messages[0][0]==api.DatabaseError
  118. def testOwnErrorHandlerCursor(self):
  119. if self.remote: # ToDo: should be a "skip"
  120. return
  121. mycallable=lambda connection,cursor,errorclass,errorvalue: 1 #does not raise anything
  122. crsr=self.getConnection().cursor()
  123. crsr.errorhandler=mycallable
  124. crsr.execute("SELECT abbtytddrf FROM dasdasd")
  125. assert len(crsr.messages)==0
  126. crsr.errorhandler=None #This should bring back the standard error handler
  127. try:
  128. crsr.execute("SELECT abbtytddrf FROM dasdasd")
  129. except:
  130. pass
  131. #The Standard errorhandler appends error to messages attribute
  132. assert len(crsr.messages)>0,"Setting errorhandler to none should bring back the standard error handler"
  133. def testUserDefinedConversions(self):
  134. if self.remote: ## Todo: should be a "skip"
  135. return
  136. try:
  137. duplicatingConverter=lambda aStringField: aStringField*2
  138. assert duplicatingConverter(u'gabba') == u'gabbagabba'
  139. self.helpForceDropOnTblTemp()
  140. conn=self.getConnection()
  141. # the variantConversions attribute should not exist on a normal connection object
  142. self.assertRaises(AttributeError, lambda x:conn.variantConversions[x],[2])
  143. if not self.remote:
  144. # create a variantConversions attribute on the connection
  145. conn.variantConversions = copy.copy(api.variantConversions)
  146. crsr=conn.cursor()
  147. tabdef = "CREATE TABLE xx_%s (fldData VARCHAR(100) NOT NULL, fld2 VARCHAR(20))" % config.tmp
  148. crsr.execute(tabdef)
  149. crsr.execute("INSERT INTO xx_%s(fldData,fld2) VALUES('gabba','booga')" % config.tmp)
  150. crsr.execute("INSERT INTO xx_%s(fldData,fld2) VALUES('hey','yo')" % config.tmp)
  151. # change converter for ALL adoStringTypes columns
  152. conn.variantConversions[api.adoStringTypes]=duplicatingConverter
  153. crsr.execute("SELECT fldData,fld2 FROM xx_%s ORDER BY fldData" % config.tmp)
  154. rows=crsr.fetchall()
  155. row = rows[0]
  156. self.assertEqual(row[0],'gabbagabba')
  157. row = rows[1]
  158. self.assertEqual(row[0],'heyhey')
  159. self.assertEqual(row[1],'yoyo')
  160. upcaseConverter=lambda aStringField: aStringField.upper()
  161. assert upcaseConverter(u'upThis') == u'UPTHIS'
  162. # now use a single column converter
  163. rows.converters[1] = upcaseConverter # convert second column
  164. self.assertEqual(row[0],'heyhey') # first will be unchanged
  165. self.assertEqual(row[1],'YO') # second will convert to upper case
  166. finally:
  167. try:
  168. del conn.variantConversions #Restore the default
  169. except: pass
  170. self.helpRollbackTblTemp()
  171. def testUserDefinedConversionForExactNumericTypes(self):
  172. # variantConversions is a dictionary of conversion functions
  173. # held internally in adodbapi.apibase
  174. #
  175. # !!! this test intentionally alters the value of what should be constant in the module
  176. # !!! no new code should use this example, to is only a test to see that the
  177. # !!! deprecated way of doing this still works. (use connection.variantConversions)
  178. #
  179. if not self.remote and sys.version_info < (3,0): ### Py3 need different test
  180. oldconverter = adodbapi.variantConversions[ado_consts.adNumeric] #keep old function to restore later
  181. # By default decimal and "numbers" are returned as decimals.
  182. # Instead, make numbers return as floats
  183. try:
  184. adodbapi.variantConversions[ado_consts.adNumeric] = adodbapi.cvtFloat
  185. self.helpTestDataType("decimal(18,2)",'NUMBER',3.45,compareAlmostEqual=1)
  186. self.helpTestDataType("numeric(18,2)",'NUMBER',3.45,compareAlmostEqual=1)
  187. # now return strings
  188. adodbapi.variantConversions[ado_consts.adNumeric] = adodbapi.cvtString
  189. self.helpTestDataType("numeric(18,2)",'NUMBER','3.45')
  190. # now a completly weird user defined convertion
  191. adodbapi.variantConversions[ado_consts.adNumeric] = lambda x: u'!!This function returns a funny unicode string %s!!'%x
  192. self.helpTestDataType("numeric(18,2)",'NUMBER','3.45',
  193. allowedReturnValues=[u'!!This function returns a funny unicode string 3.45!!'])
  194. finally:
  195. # now reset the converter to its original function
  196. adodbapi.variantConversions[ado_consts.adNumeric]=oldconverter #Restore the original convertion function
  197. def helpTestDataType(self,sqlDataTypeString,
  198. DBAPIDataTypeString,
  199. pyData,
  200. pyDataInputAlternatives=None,
  201. compareAlmostEqual=None,
  202. allowedReturnValues=None):
  203. self.helpForceDropOnTblTemp()
  204. conn=self.getConnection()
  205. crsr=conn.cursor()
  206. tabdef= """
  207. CREATE TABLE xx_%s (
  208. fldId integer NOT NULL,
  209. fldData """ % config.tmp + sqlDataTypeString + ")\n"
  210. crsr.execute(tabdef)
  211. #Test Null values mapped to None
  212. crsr.execute("INSERT INTO xx_%s (fldId) VALUES (1)" % config.tmp)
  213. crsr.execute("SELECT fldId,fldData FROM xx_%s" % config.tmp)
  214. rs=crsr.fetchone()
  215. self.assertEqual(rs[1],None) #Null should be mapped to None
  216. assert rs[0]==1
  217. #Test description related
  218. descTuple=crsr.description[1]
  219. assert descTuple[0] in ['fldData','flddata'], 'was "%s" expected "%s"'%(descTuple[0],'fldData')
  220. if DBAPIDataTypeString=='STRING':
  221. assert descTuple[1] == api.STRING, 'was "%s" expected "%s"'%(descTuple[1],api.STRING.values)
  222. elif DBAPIDataTypeString == 'NUMBER':
  223. assert descTuple[1] == api.NUMBER, 'was "%s" expected "%s"'%(descTuple[1],api.NUMBER.values)
  224. elif DBAPIDataTypeString == 'BINARY':
  225. assert descTuple[1] == api.BINARY, 'was "%s" expected "%s"'%(descTuple[1],api.BINARY.values)
  226. elif DBAPIDataTypeString == 'DATETIME':
  227. assert descTuple[1] == api.DATETIME, 'was "%s" expected "%s"'%(descTuple[1],api.DATETIME.values)
  228. elif DBAPIDataTypeString == 'ROWID':
  229. assert descTuple[1] == api.ROWID, 'was "%s" expected "%s"'%(descTuple[1],api.ROWID.values)
  230. elif DBAPIDataTypeString == 'UUID':
  231. assert descTuple[1] == api.OTHER, 'was "%s" expected "%s"'%(descTuple[1],api.OTHER.values)
  232. else:
  233. raise NotImplementedError #"DBAPIDataTypeString not provided"
  234. #Test data binding
  235. inputs=[pyData]
  236. if pyDataInputAlternatives:
  237. inputs.extend(pyDataInputAlternatives)
  238. if str is unicode:
  239. inputs = set(inputs) # removes redundant string==unicode tests
  240. fldId=1
  241. for inParam in inputs:
  242. fldId+=1
  243. try:
  244. crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (?,?)" % config.tmp, (fldId, inParam))
  245. except:
  246. if self.remote:
  247. for message in crsr.messages:
  248. print(message)
  249. else:
  250. conn.printADOerrors()
  251. raise
  252. crsr.execute("SELECT fldData FROM xx_%s WHERE ?=fldID" % config.tmp, [fldId])
  253. rs=crsr.fetchone()
  254. if allowedReturnValues:
  255. allowedTypes = tuple([type(aRV) for aRV in allowedReturnValues])
  256. assert isinstance(rs[0],allowedTypes), \
  257. 'result type "%s" must be one of %s'%(type(rs[0]),allowedTypes)
  258. else:
  259. assert isinstance(rs[0] ,type(pyData)), \
  260. 'result type "%s" must be instance of %s'%(type(rs[0]),type(pyData))
  261. if compareAlmostEqual and DBAPIDataTypeString == 'DATETIME':
  262. iso1=adodbapi.dateconverter.DateObjectToIsoFormatString(rs[0])
  263. iso2=adodbapi.dateconverter.DateObjectToIsoFormatString(pyData)
  264. self.assertEqual(iso1, iso2)
  265. elif compareAlmostEqual:
  266. s = float(pyData)
  267. v = float(rs[0])
  268. assert abs(v-s)/s < 0.00001, \
  269. "Values not almost equal recvd=%s, expected=%f" %(rs[0],s)
  270. else:
  271. if allowedReturnValues:
  272. ok=False
  273. self.assertTrue(rs[0] in allowedReturnValues,
  274. 'Value "%s" not in %s' % (repr(rs[0]), allowedReturnValues))
  275. else:
  276. self.assertEqual(rs[0], pyData,
  277. 'Values are not equal recvd="%s", expected="%s"' %(rs[0],pyData))
  278. def testDataTypeFloat(self):
  279. self.helpTestDataType("real",'NUMBER',3.45,compareAlmostEqual=True)
  280. self.helpTestDataType("float",'NUMBER',1.79e37,compareAlmostEqual=True)
  281. def testDataTypeDecmal(self):
  282. self.helpTestDataType("decimal(18,2)",'NUMBER',3.45,
  283. allowedReturnValues=[u'3.45',u'3,45',decimal.Decimal('3.45')])
  284. self.helpTestDataType("numeric(18,2)",'NUMBER',3.45,
  285. allowedReturnValues=[u'3.45',u'3,45',decimal.Decimal('3.45')])
  286. self.helpTestDataType("decimal(20,2)",'NUMBER',444444444444444444,
  287. allowedReturnValues=[u'444444444444444444.00', u'444444444444444444,00',
  288. decimal.Decimal('444444444444444444')])
  289. if self.getEngine() == 'MSSQL':
  290. self.helpTestDataType("uniqueidentifier",'UUID','{71A4F49E-39F3-42B1-A41E-48FF154996E6}',
  291. allowedReturnValues=[u'{71A4F49E-39F3-42B1-A41E-48FF154996E6}'])
  292. def testDataTypeMoney(self): #v2.1 Cole -- use decimal for money
  293. if self.getEngine() == 'MySQL':
  294. self.helpTestDataType("DECIMAL(20,4)",'NUMBER',decimal.Decimal('-922337203685477.5808'))
  295. elif self.getEngine() == 'PostgreSQL':
  296. self.helpTestDataType("money",'NUMBER',decimal.Decimal('-922337203685477.5808'),
  297. compareAlmostEqual=True,
  298. allowedReturnValues=[-922337203685477.5808,
  299. decimal.Decimal('-922337203685477.5808')])
  300. else:
  301. self.helpTestDataType("smallmoney",'NUMBER',decimal.Decimal('214748.02'))
  302. self.helpTestDataType("money",'NUMBER',decimal.Decimal('-922337203685477.5808'))
  303. def testDataTypeInt(self):
  304. if self.getEngine() != 'PostgreSQL':
  305. self.helpTestDataType("tinyint",'NUMBER',115)
  306. self.helpTestDataType("smallint",'NUMBER',-32768)
  307. if self.getEngine() not in ['ACCESS','PostgreSQL']:
  308. self.helpTestDataType("bit",'NUMBER',1) #Does not work correctly with access
  309. if self.getEngine() in ['MSSQL','PostgreSQL']:
  310. self.helpTestDataType("bigint",'NUMBER',3000000000,
  311. allowedReturnValues=[3000000000, long(3000000000)])
  312. self.helpTestDataType("int",'NUMBER',2147483647)
  313. def testDataTypeChar(self):
  314. for sqlDataType in ("char(6)","nchar(6)"):
  315. self.helpTestDataType(sqlDataType,'STRING',u'spam ',allowedReturnValues=[u'spam','spam',u'spam ','spam '])
  316. def testDataTypeVarChar(self):
  317. if self.getEngine() == 'MySQL':
  318. stringKinds = ["varchar(10)","text"]
  319. elif self.getEngine() == 'PostgreSQL':
  320. stringKinds = ["varchar(10)","text","character varying"]
  321. else:
  322. stringKinds = ["varchar(10)","nvarchar(10)","text","ntext"] #,"varchar(max)"]
  323. for sqlDataType in stringKinds:
  324. self.helpTestDataType(sqlDataType,'STRING',u'spam',['spam'])
  325. def testDataTypeDate(self):
  326. if self.getEngine() == 'PostgreSQL':
  327. dt = "timestamp"
  328. else:
  329. dt = "datetime"
  330. self.helpTestDataType(dt,'DATETIME',adodbapi.Date(2002,10,28),
  331. compareAlmostEqual=True)
  332. if self.getEngine() not in ['MySQL','PostgreSQL']:
  333. self.helpTestDataType("smalldatetime",'DATETIME',adodbapi.Date(2002,10,28),
  334. compareAlmostEqual=True)
  335. if tag != 'pythontime' and self.getEngine() not in ['MySQL','PostgreSQL']: # fails when using pythonTime
  336. self.helpTestDataType(dt,'DATETIME', adodbapi.Timestamp(2002,10,28,12,15,1),
  337. compareAlmostEqual=True)
  338. def testDataTypeBinary(self):
  339. binfld = str2bytes('\x07\x00\xE2\x40*')
  340. arv = [binfld, adodbapi.Binary(binfld), bytes(binfld)]
  341. if self.getEngine() == 'PostgreSQL':
  342. self.helpTestDataType("bytea",'BINARY',adodbapi.Binary(binfld),
  343. allowedReturnValues=arv)
  344. else:
  345. self.helpTestDataType("binary(5)",'BINARY',adodbapi.Binary(binfld),
  346. allowedReturnValues=arv)
  347. self.helpTestDataType("varbinary(100)",'BINARY',adodbapi.Binary(binfld),
  348. allowedReturnValues=arv)
  349. if self.getEngine() != 'MySQL':
  350. self.helpTestDataType("image",'BINARY',adodbapi.Binary(binfld),
  351. allowedReturnValues=arv)
  352. def helpRollbackTblTemp(self):
  353. self.helpForceDropOnTblTemp()
  354. def helpForceDropOnTblTemp(self):
  355. conn=self.getConnection()
  356. with conn.cursor() as crsr:
  357. try:
  358. crsr.execute("DROP TABLE xx_%s" % config.tmp)
  359. if not conn.autocommit:
  360. conn.commit()
  361. except:
  362. pass
  363. def helpCreateAndPopulateTableTemp(self,crsr):
  364. tabdef= """
  365. CREATE TABLE xx_%s (
  366. fldData INTEGER
  367. )
  368. """ % config.tmp
  369. try: #EAFP
  370. crsr.execute(tabdef)
  371. except api.DatabaseError: # was not dropped before
  372. self.helpForceDropOnTblTemp() # so drop it now
  373. crsr.execute(tabdef)
  374. for i in range(9): # note: this poor SQL code, but a valid test
  375. crsr.execute("INSERT INTO xx_%s (fldData) VALUES (%i)" % (config.tmp, i))
  376. # NOTE: building the test table without using parameter substitution
  377. def testFetchAll(self):
  378. crsr=self.getCursor()
  379. self.helpCreateAndPopulateTableTemp(crsr)
  380. crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
  381. rs=crsr.fetchall()
  382. assert len(rs)==9
  383. #test slice of rows
  384. i = 3
  385. for row in rs[3:-2]: #should have rowid 3..6
  386. assert row[0]==i
  387. i+=1
  388. self.helpRollbackTblTemp()
  389. def testPreparedStatement(self):
  390. crsr=self.getCursor()
  391. self.helpCreateAndPopulateTableTemp(crsr)
  392. crsr.prepare("SELECT fldData FROM xx_%s" % config.tmp)
  393. crsr.execute(crsr.command) # remembes the one that was prepared
  394. rs=crsr.fetchall()
  395. assert len(rs)==9
  396. assert rs[2][0]==2
  397. self.helpRollbackTblTemp()
  398. def testWrongPreparedStatement(self):
  399. crsr=self.getCursor()
  400. self.helpCreateAndPopulateTableTemp(crsr)
  401. crsr.prepare("SELECT * FROM nowhere")
  402. crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) # should execute this one, not the prepared one
  403. rs=crsr.fetchall()
  404. assert len(rs)==9
  405. assert rs[2][0]==2
  406. self.helpRollbackTblTemp()
  407. def testIterator(self):
  408. crsr=self.getCursor()
  409. self.helpCreateAndPopulateTableTemp(crsr)
  410. crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
  411. for i,row in enumerate(crsr): # using cursor as an iterator, rather than fetchxxx
  412. assert row[0]==i
  413. self.helpRollbackTblTemp()
  414. def testExecuteMany(self):
  415. crsr=self.getCursor()
  416. self.helpCreateAndPopulateTableTemp(crsr)
  417. seq_of_values = [ (111,) , (222,) ]
  418. crsr.executemany("INSERT INTO xx_%s (fldData) VALUES (?)" % config.tmp, seq_of_values)
  419. if crsr.rowcount==-1:
  420. print(self.getEngine()+" Provider does not support rowcount (on .executemany())")
  421. else:
  422. self.assertEqual( crsr.rowcount,2)
  423. crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
  424. rs=crsr.fetchall()
  425. assert len(rs)==11
  426. self.helpRollbackTblTemp()
  427. def testRowCount(self):
  428. crsr=self.getCursor()
  429. self.helpCreateAndPopulateTableTemp(crsr)
  430. crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
  431. if crsr.rowcount == -1:
  432. #print("provider does not support rowcount on select")
  433. pass
  434. else:
  435. self.assertEqual( crsr.rowcount,9)
  436. self.helpRollbackTblTemp()
  437. def testRowCountNoRecordset(self):
  438. crsr=self.getCursor()
  439. self.helpCreateAndPopulateTableTemp(crsr)
  440. crsr.execute("DELETE FROM xx_%s WHERE fldData >= 5" % config.tmp)
  441. if crsr.rowcount==-1:
  442. print(self.getEngine()+" Provider does not support rowcount (on DELETE)")
  443. else:
  444. self.assertEqual( crsr.rowcount,4)
  445. self.helpRollbackTblTemp()
  446. def testFetchMany(self):
  447. crsr=self.getCursor()
  448. self.helpCreateAndPopulateTableTemp(crsr)
  449. crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
  450. rs=crsr.fetchmany(3)
  451. assert len(rs)==3
  452. rs=crsr.fetchmany(5)
  453. assert len(rs)==5
  454. rs=crsr.fetchmany(5)
  455. assert len(rs)==1 #Asked for five, but there is only one left
  456. self.helpRollbackTblTemp()
  457. def testFetchManyWithArraySize(self):
  458. crsr=self.getCursor()
  459. self.helpCreateAndPopulateTableTemp(crsr)
  460. crsr.execute("SELECT fldData FROM xx_%s" % config.tmp)
  461. rs=crsr.fetchmany()
  462. assert len(rs)==1 #arraysize Defaults to one
  463. crsr.arraysize=4
  464. rs=crsr.fetchmany()
  465. assert len(rs)==4
  466. rs=crsr.fetchmany()
  467. assert len(rs)==4
  468. rs=crsr.fetchmany()
  469. assert len(rs)==0
  470. self.helpRollbackTblTemp()
  471. def testErrorConnect(self):
  472. conn = self.getConnection()
  473. kw = {}
  474. if 'proxy_host' in conn.kwargs:
  475. kw['proxy_host'] = conn.kwargs['proxy_host']
  476. conn.close()
  477. self.assertRaises(api.DatabaseError, self.db, 'not a valid connect string', kw)
  478. def testRowIterator(self):
  479. self.helpForceDropOnTblTemp()
  480. conn=self.getConnection()
  481. crsr=conn.cursor()
  482. tabdef= """
  483. CREATE TABLE xx_%s (
  484. fldId integer NOT NULL,
  485. fldTwo integer,
  486. fldThree integer,
  487. fldFour integer)
  488. """ % config.tmp
  489. crsr.execute(tabdef)
  490. inputs = [(2,3,4),(102,103,104)]
  491. fldId=1
  492. for inParam in inputs:
  493. fldId+=1
  494. try:
  495. crsr.execute("INSERT INTO xx_%s (fldId,fldTwo,fldThree,fldFour) VALUES (?,?,?,?)" % config.tmp,
  496. (fldId,inParam[0],inParam[1],inParam[2]))
  497. except:
  498. if self.remote:
  499. for message in crsr.messages:
  500. print(message)
  501. else:
  502. conn.printADOerrors()
  503. raise
  504. crsr.execute("SELECT fldTwo,fldThree,fldFour FROM xx_%s WHERE ?=fldID" % config.tmp, [fldId])
  505. rec = crsr.fetchone()
  506. # check that stepping through an emulated row works
  507. for j in range(len(inParam)):
  508. assert rec[j] == inParam[j], 'returned value:"%s" != test value:"%s"'%(rec[j],inParam[j])
  509. # check that we can get a complete tuple from a row
  510. assert tuple(rec) == inParam, 'returned value:"%s" != test value:"%s"'%(repr(rec),repr(inParam))
  511. # test that slices of rows work
  512. slice1 = tuple(rec[:-1])
  513. slice2 = tuple(inParam[0:2])
  514. assert slice1 == slice2, 'returned value:"%s" != test value:"%s"'%(repr(slice1),repr(slice2))
  515. # now test named column retrieval
  516. assert rec['fldTwo'] == inParam[0]
  517. assert rec.fldThree == inParam[1]
  518. assert rec.fldFour == inParam[2]
  519. # test array operation
  520. # note that the fields vv vv vv are out of order
  521. crsr.execute("select fldThree,fldFour,fldTwo from xx_%s" % config.tmp)
  522. recs = crsr.fetchall()
  523. assert recs[1][0] == 103
  524. assert recs[0][1] == 4
  525. assert recs[1]['fldFour'] == 104
  526. assert recs[0,0] == 3
  527. assert recs[0,'fldTwo'] == 2
  528. assert recs[1,2] == 102
  529. for i in range(1):
  530. for j in range(2):
  531. assert recs[i][j] == recs[i,j]
  532. def testFormatParamstyle(self):
  533. self.helpForceDropOnTblTemp()
  534. conn=self.getConnection()
  535. conn.paramstyle = 'format' #test nonstandard use of paramstyle
  536. crsr=conn.cursor()
  537. tabdef= """
  538. CREATE TABLE xx_%s (
  539. fldId integer NOT NULL,
  540. fldData varchar(10),
  541. fldConst varchar(30))
  542. """ % config.tmp
  543. crsr.execute(tabdef)
  544. inputs = [u'one',u'two',u'three']
  545. fldId=2
  546. for inParam in inputs:
  547. fldId+=1
  548. sql = "INSERT INTO xx_" + \
  549. config.tmp + \
  550. " (fldId,fldConst,fldData) VALUES (%s,'thi%s :may cause? trouble', %s)"
  551. try:
  552. crsr.execute(sql, (fldId,inParam))
  553. except:
  554. if self.remote:
  555. for message in crsr.messages:
  556. print(message)
  557. else:
  558. conn.printADOerrors()
  559. raise
  560. crsr.execute("SELECT fldData, fldConst FROM xx_" + config.tmp + " WHERE %s=fldID", [fldId])
  561. rec = crsr.fetchone()
  562. self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"' % (rec[0],inParam))
  563. self.assertEqual(rec[1], u"thi%s :may cause? trouble")
  564. # now try an operation with a "%s" as part of a literal
  565. sel = "insert into xx_" + config.tmp + " (fldId,fldData) VALUES (%s,'four%sfive')"
  566. params = (20,)
  567. crsr.execute(sel,params)
  568. #test the .query implementation
  569. assert '(?,' in crsr.query, 'expected:"%s" in "%s"'%('(?,',crsr.query)
  570. #test the .command attribute
  571. assert crsr.command == sel, 'expected:"%s" but found "%s"' % (sel, crsr.command)
  572. #test the .parameters attribute
  573. if not self.remote: # parameter list will be altered in transit
  574. self.assertEqual(crsr.parameters, params)
  575. #now make sure the data made it
  576. crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=20" % config.tmp)
  577. rec = crsr.fetchone()
  578. self.assertEqual(rec[0], 'four%sfive')
  579. def testNamedParamstyle(self):
  580. self.helpForceDropOnTblTemp()
  581. conn=self.getConnection()
  582. crsr=conn.cursor()
  583. crsr.paramstyle = 'named' #test nonstandard use of paramstyle
  584. tabdef= """
  585. CREATE TABLE xx_%s (
  586. fldId integer NOT NULL,
  587. fldData varchar(10))
  588. """ % config.tmp
  589. crsr.execute(tabdef)
  590. inputs = [u'four',u'five',u'six']
  591. fldId=10
  592. for inParam in inputs:
  593. fldId+=1
  594. try:
  595. crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (:Id,:f_Val)" % config.tmp,
  596. {"f_Val":inParam,'Id':fldId})
  597. except:
  598. if self.remote:
  599. for message in crsr.messages:
  600. print(message)
  601. else:
  602. conn.printADOerrors()
  603. raise
  604. crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=:Id" % config.tmp, {'Id':fldId})
  605. rec = crsr.fetchone()
  606. self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"'%(rec[0],inParam))
  607. # now a test with a ":" as part of a literal
  608. crsr.execute("insert into xx_%s (fldId,fldData) VALUES (:xyz,'six:five')" % config.tmp,{'xyz':30})
  609. crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp)
  610. rec = crsr.fetchone()
  611. self.assertEqual(rec[0], 'six:five')
  612. def testPyformatParamstyle(self):
  613. self.helpForceDropOnTblTemp()
  614. conn=self.getConnection()
  615. crsr=conn.cursor()
  616. crsr.paramstyle = 'pyformat' #test nonstandard use of paramstyle
  617. tabdef= """
  618. CREATE TABLE xx_%s (
  619. fldId integer NOT NULL,
  620. fldData varchar(10))
  621. """ % config.tmp
  622. crsr.execute(tabdef)
  623. inputs = [u'four', u'five', u'six']
  624. fldId=10
  625. for inParam in inputs:
  626. fldId+=1
  627. try:
  628. crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (%%(Id)s,%%(f_Val)s)" % config.tmp,
  629. {"f_Val": inParam, 'Id': fldId})
  630. except:
  631. if self.remote:
  632. for message in crsr.messages:
  633. print(message)
  634. else:
  635. conn.printADOerrors()
  636. raise
  637. crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=%%(Id)s" % config.tmp, {'Id':fldId})
  638. rec = crsr.fetchone()
  639. self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"'%(rec[0],inParam))
  640. # now a test with a "%" as part of a literal
  641. crsr.execute("insert into xx_%s (fldId,fldData) VALUES (%%(xyz)s,'six%%five')" % config.tmp,{'xyz': 30})
  642. crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp)
  643. rec = crsr.fetchone()
  644. self.assertEqual(rec[0], 'six%five')
  645. def testAutomaticParamstyle(self):
  646. self.helpForceDropOnTblTemp()
  647. conn=self.getConnection()
  648. conn.paramstyle = 'dynamic' #test nonstandard use of paramstyle
  649. crsr=conn.cursor()
  650. tabdef= """
  651. CREATE TABLE xx_%s (
  652. fldId integer NOT NULL,
  653. fldData varchar(10),
  654. fldConst varchar(30))
  655. """ % config.tmp
  656. crsr.execute(tabdef)
  657. inputs = [u'one', u'two', u'three']
  658. fldId=2
  659. for inParam in inputs:
  660. fldId+=1
  661. try:
  662. crsr.execute("INSERT INTO xx_" + config.tmp + \
  663. " (fldId,fldConst,fldData) VALUES (?,'thi%s :may cause? troub:1e', ?)", (fldId,inParam))
  664. except:
  665. if self.remote:
  666. for message in crsr.messages:
  667. print(message)
  668. else:
  669. conn.printADOerrors()
  670. raise
  671. trouble = 'thi%s :may cause? troub:1e'
  672. crsr.execute("SELECT fldData, fldConst FROM xx_" + config.tmp + " WHERE ?=fldID", [fldId])
  673. rec = crsr.fetchone()
  674. self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"'%(rec[0],inParam))
  675. self.assertEqual(rec[1], trouble)
  676. # inputs = [u'four',u'five',u'six']
  677. fldId=10
  678. for inParam in inputs:
  679. fldId+=1
  680. try:
  681. crsr.execute("INSERT INTO xx_%s (fldId,fldData) VALUES (:Id,:f_Val)" % config.tmp,
  682. {"f_Val":inParam,'Id':fldId})
  683. except:
  684. if self.remote:
  685. for message in crsr.messages:
  686. print(message)
  687. else:
  688. conn.printADOerrors()
  689. raise
  690. crsr.execute("SELECT fldData FROM xx_%s WHERE :Id=fldID" % config.tmp, {'Id':fldId})
  691. rec = crsr.fetchone()
  692. self.assertEqual(rec[0], inParam, 'returned value:"%s" != test value:"%s"'%(rec[0],inParam))
  693. # now a test with a ":" as part of a literal -- and use a prepared query
  694. ppdcmd = "insert into xx_%s (fldId,fldData) VALUES (:xyz,'six:five')" % config.tmp
  695. crsr.prepare(ppdcmd)
  696. crsr.execute(ppdcmd, {'xyz':30})
  697. crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp)
  698. rec = crsr.fetchone()
  699. self.assertEqual(rec[0], 'six:five')
  700. def testRollBack(self):
  701. conn = self.getConnection()
  702. crsr = conn.cursor()
  703. assert not crsr.connection.autocommit, 'Unexpected beginning condition'
  704. self.helpCreateAndPopulateTableTemp(crsr)
  705. crsr.connection.commit() # commit the first bunch
  706. crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
  707. selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp
  708. crsr.execute(selectSql)
  709. rs = crsr.fetchall()
  710. assert len(rs) == 1
  711. self.conn.rollback()
  712. crsr.execute(selectSql)
  713. assert crsr.fetchone() == None, 'cursor.fetchone should return None if a query retrieves no rows'
  714. crsr.execute('SELECT fldData from xx_%s' % config.tmp)
  715. rs = crsr.fetchall()
  716. assert len(rs) == 9, 'the original records should still be present'
  717. self.helpRollbackTblTemp()
  718. def testCommit(self):
  719. try:
  720. con2 = self.getAnotherConnection()
  721. except NotImplementedError:
  722. return # should be "SKIP" for ACCESS
  723. assert not con2.autocommit, 'default should be manual commit'
  724. crsr = con2.cursor()
  725. self.helpCreateAndPopulateTableTemp(crsr)
  726. crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
  727. con2.commit()
  728. selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp
  729. crsr.execute(selectSql)
  730. rs = crsr.fetchall()
  731. assert len(rs) == 1
  732. crsr.close()
  733. con2.close()
  734. conn = self.getConnection()
  735. crsr = self.getCursor()
  736. with conn.cursor() as crsr:
  737. crsr.execute(selectSql)
  738. rs = crsr.fetchall()
  739. assert len(rs) == 1
  740. assert rs[0][0] == 100
  741. self.helpRollbackTblTemp()
  def testAutoRollback(self):
      # Verify that closing a connection with uncommitted work discards that
      # work: the row inserted through con2 must not be visible afterwards.
      try:
          con2 = self.getAnotherConnection()
      except NotImplementedError:
          return  # should be "SKIP" for ACCESS
      try:
          assert not con2.autocommit, 'unexpected beginning condition'
          crsr = con2.cursor()
          self.helpCreateAndPopulateTableTemp(crsr)
          crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
          selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp
          crsr.execute(selectSql)
          rs = crsr.fetchall()
          assert len(rs) == 1
          crsr.close()
      finally:
          # Always release the extra connection, also when a check above fails.
          con2.close()
      crsr = self.getCursor()
      try:
          crsr.execute(selectSql)  # closing the connection should have forced rollback
          row = crsr.fetchone()
      except api.DatabaseError:
          row = None  # if the entire table disappeared the rollback was perfect and the test passed
      assert row is None, 'cursor.fetchone should return None if a query retrieves no rows. Got %s' % repr(row)
      self.helpRollbackTblTemp()
  def testAutoCommit(self):
      # Verify that rows written through a connection opened with
      # autocommit=True are visible from the primary connection although
      # commit() is never called.
      try:
          ac_conn = self.getAnotherConnection({'autocommit': True})
      except NotImplementedError:
          return  # should be "SKIP" for ACCESS
      try:
          crsr = ac_conn.cursor()
          self.helpCreateAndPopulateTableTemp(crsr)
          crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
          crsr.close()
          with self.getCursor() as crsr:
              selectSql = 'SELECT fldData from xx_%s' % config.tmp
              # ac_conn is still open at this point; the data must be visible
              # without any explicit commit.
              crsr.execute(selectSql)
              rs = crsr.fetchall()
              assert len(rs) == 10, 'all records should still be present'
      finally:
          # Release the extra connection even when a check above fails.
          ac_conn.close()
      self.helpRollbackTblTemp()
  def testSwitchedAutoCommit(self):
      # Verify that switching an open connection to autocommit makes its
      # writes permanent: they must survive closing that connection.
      try:
          ac_conn = self.getAnotherConnection()
      except NotImplementedError:
          return  # should be "SKIP" for ACCESS
      try:
          ac_conn.autocommit = True
          crsr = ac_conn.cursor()
          self.helpCreateAndPopulateTableTemp(crsr)
          crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp)
          crsr.close()
      finally:
          # Closed before the verification below on purpose, and also when a
          # step above fails so the extra connection is never leaked.
          ac_conn.close()
      with self.getCursor() as crsr:
          selectSql = 'SELECT fldData from xx_%s' % config.tmp
          crsr.execute(selectSql)  # closing the connection should _not_ have forced rollback
          rs = crsr.fetchall()
          assert len(rs) == 10, 'all records should still be present'
      self.helpRollbackTblTemp()
  def testExtendedTypeHandling(self):
      # Verify that instances of user-defined subclasses of str, int and float
      # are accepted as query parameters and read back equal to what was sent.
      class XtendString(str):
          pass

      class XtendInt(int):
          pass

      class XtendFloat(float):
          pass

      xs = XtendString(randomstring(30))
      xi = XtendInt(random.randint(-100, 500))
      xf = XtendFloat(random.random())
      self.helpForceDropOnTblTemp()
      conn = self.getConnection()
      crsr = conn.cursor()
      tabdef = """
          CREATE TABLE xx_%s (
              s VARCHAR(40) NOT NULL,
              i INTEGER NOT NULL,
              f REAL NOT NULL)""" % config.tmp
      crsr.execute(tabdef)
      crsr.execute("INSERT INTO xx_%s (s, i, f) VALUES (?, ?, ?)" % config.tmp, (xs, xi, xf))
      crsr.close()
      with self.getCursor() as crsr:
          selectSql = 'SELECT s, i, f from xx_%s' % config.tmp
          crsr.execute(selectSql)  # read the single inserted row back through a fresh cursor
          row = crsr.fetchone()
      self.assertEqual(row.s, xs)
      self.assertEqual(row.i, xi)
      # The column is REAL, so compare approximately rather than exactly.
      self.assertAlmostEqual(row.f, xf)
      self.helpRollbackTblTemp()
  class TestADOwithSQLServer(CommonDBTests):
      """Run the common DB tests, plus stored-procedure tests, against SQL Server.

      Connection arguments come from ``config.connStrSQLServer``: element 0 is
      used as positional arguments, element 1 as keyword arguments and
      element 2 is stored as ``self.remote``.
      """

      def setUp(self):
          self.conn = config.dbSqlServerconnect(*config.connStrSQLServer[0], **config.connStrSQLServer[1])
          self.conn.timeout = 30  # turn timeout back up
          self.engine = 'MSSQL'
          self.db = config.dbSqlServerconnect
          self.remote = config.connStrSQLServer[2]

      def tearDown(self):
          # Best-effort cleanup: a failure to roll back or close must not mask
          # the outcome of the test itself, but KeyboardInterrupt/SystemExit
          # are no longer swallowed.
          try:
              self.conn.rollback()
          except Exception:
              pass
          try:
              self.conn.close()
          except Exception:
              pass
          self.conn = None

      def getConnection(self):
          return self.conn

      def getAnotherConnection(self, addkeys=None):
          # Copy the configured keyword arguments so the shared config is not
          # modified by the per-call overrides.
          keys = dict(config.connStrSQLServer[1])
          if addkeys:
              keys.update(addkeys)
          return config.dbSqlServerconnect(*config.connStrSQLServer[0], **keys)

      def _dropProcedure(self, crsr, procName):
          # Remove a stored procedure possibly left over from an earlier run so
          # that the following CREATE PROCEDURE starts clean. Any error is
          # ignored (presumably it just means the procedure did not exist).
          try:
              crsr.execute("DROP PROCEDURE %s" % procName)
              self.conn.commit()
          except Exception:  # Make sure it is empty
              pass

      def testVariableReturningStoredProcedure(self):
          # callproc() must hand back the input parameters unchanged and the
          # OUTPUT parameter filled in by the procedure.
          crsr = self.conn.cursor()
          spdef = """
              CREATE PROCEDURE sp_DeleteMeOnlyForTesting
                  @theInput varchar(50),
                  @theOtherInput varchar(50),
                  @theOutput varchar(100) OUTPUT
              AS
                  SET @theOutput=@theInput+@theOtherInput
                      """
          self._dropProcedure(crsr, 'sp_DeleteMeOnlyForTesting')
          crsr.execute(spdef)

          retvalues = crsr.callproc('sp_DeleteMeOnlyForTesting', ('Dodsworth', 'Anne', '              '))
          assert retvalues[0] == 'Dodsworth', '%s is not "Dodsworth"' % repr(retvalues[0])
          assert retvalues[1] == 'Anne', '%s is not "Anne"' % repr(retvalues[1])
          assert retvalues[2] == 'DodsworthAnne', '%s is not "DodsworthAnne"' % repr(retvalues[2])
          self.conn.rollback()

      def testMultipleSetReturn(self):
          # A procedure with three SELECTs must expose three result sets via
          # nextset(): ascending data, an empty set, then descending data.
          crsr = self.getCursor()
          self.helpCreateAndPopulateTableTemp(crsr)

          spdef = """
              CREATE PROCEDURE sp_DeleteMe_OnlyForTesting
              AS
                  SELECT fldData FROM xx_%s ORDER BY fldData ASC
                  SELECT fldData From xx_%s where fldData = -9999
                  SELECT fldData FROM xx_%s ORDER BY fldData DESC
                      """ % (config.tmp, config.tmp, config.tmp)
          self._dropProcedure(crsr, 'sp_DeleteMe_OnlyForTesting')
          crsr.execute(spdef)

          crsr.callproc('sp_DeleteMe_OnlyForTesting')
          row = crsr.fetchone()
          self.assertEqual(row[0], 0)
          # DB-API only promises "a true value" when another set is available.
          assert crsr.nextset(), 'Operation should succeed'
          assert not crsr.fetchall(), 'Should be an empty second set'
          assert crsr.nextset(), 'third set should be present'
          rowdesc = crsr.fetchall()
          self.assertEqual(rowdesc[0][0], 8)
          assert crsr.nextset() is None, 'No more return sets, should return None'
          self.helpRollbackTblTemp()

      def testDatetimeProcedureParameter(self):
          # A Timestamp passed as a DATETIME parameter must reach the server
          # intact; the procedure echoes it back formatted as text.
          crsr = self.conn.cursor()
          spdef = """
              CREATE PROCEDURE sp_DeleteMeOnlyForTesting
                  @theInput DATETIME,
                  @theOtherInput varchar(50),
                  @theOutput varchar(100) OUTPUT
              AS
                  SET @theOutput = CONVERT(CHARACTER(20), @theInput, 0) + @theOtherInput
                      """
          self._dropProcedure(crsr, 'sp_DeleteMeOnlyForTesting')
          crsr.execute(spdef)

          result = crsr.callproc('sp_DeleteMeOnlyForTesting', [adodbapi.Timestamp(2014, 12, 25, 0, 1, 0), 'Beep', ' ' * 30])
          assert result[2] == 'Dec 25 2014 12:01AM Beep', 'value was="%s"' % result[2]
          self.conn.rollback()

      def testIncorrectStoredProcedureParameter(self):
          # Passing a plain string where the procedure expects a DATETIME must
          # raise DataError or DatabaseError.
          crsr = self.conn.cursor()
          spdef = """
              CREATE PROCEDURE sp_DeleteMeOnlyForTesting
                  @theInput DATETIME,
                  @theOtherInput varchar(50),
                  @theOutput varchar(100) OUTPUT
              AS
                  SET @theOutput = CONVERT(CHARACTER(20), @theInput) + @theOtherInput
                      """
          self._dropProcedure(crsr, 'sp_DeleteMeOnlyForTesting')
          crsr.execute(spdef)

          # calling the sproc with a string for the first parameter where a DateTime is expected
          # NOTE(review): result is used below as (expected-exception-raised, detail);
          # confirm against tryconnection.try_operation_with_expected_exception.
          result = tryconnection.try_operation_with_expected_exception(
              (api.DataError, api.DatabaseError),
              crsr.callproc,
              ['sp_DeleteMeOnlyForTesting'],
              {'parameters': ['this is wrong', 'Anne', 'not Alice']}
          )
          if result[0]:  # the expected exception was raised
              assert '@theInput' in str(result[1]) or 'DatabaseError' in str(result), \
                  'Identifies the wrong erroneous parameter'
          else:
              assert result[0], result[1]  # incorrect or no exception
          self.conn.rollback()
  class TestADOwithAccessDB(CommonDBTests):
      """Run the common DB tests against a Microsoft Access (Jet) database.

      Connection arguments come from ``config.connStrAccess``: element 0 is
      used as positional arguments, element 1 as keyword arguments and
      element 2 is stored as ``self.remote``.
      """

      def setUp(self):
          self.conn = config.dbAccessconnect(*config.connStrAccess[0], **config.connStrAccess[1])
          self.conn.timeout = 30  # turn timeout back up
          self.engine = 'ACCESS'
          self.db = config.dbAccessconnect
          self.remote = config.connStrAccess[2]

      def tearDown(self):
          # Best-effort cleanup: errors while rolling back or closing are
          # ignored, but KeyboardInterrupt/SystemExit are no longer swallowed.
          try:
              self.conn.rollback()
          except Exception:
              pass
          try:
              self.conn.close()
          except Exception:
              pass
          self.conn = None

      def getConnection(self):
          return self.conn

      def getAnotherConnection(self, addkeys=None):
          # Tests that need a second connection catch this and return early.
          raise NotImplementedError('Jet cannot use a second connection to the database')

      def testOkConnect(self):
          # A fresh connection with the configured arguments must succeed.
          c = self.db(*config.connStrAccess[0], **config.connStrAccess[1])
          assert c is not None
          c.close()
  class TestADOwithMySql(CommonDBTests):
      """Run the common DB tests against a MySQL database.

      Connection arguments come from ``config.connStrMySql``: element 0 is
      used as positional arguments, element 1 as keyword arguments and
      element 2 is stored as ``self.remote``.
      """

      def setUp(self):
          self.conn = config.dbMySqlconnect(*config.connStrMySql[0], **config.connStrMySql[1])
          self.conn.timeout = 30  # turn timeout back up
          self.engine = 'MySQL'
          self.db = config.dbMySqlconnect
          self.remote = config.connStrMySql[2]

      def tearDown(self):
          # Best-effort cleanup: errors while rolling back or closing are
          # ignored, but KeyboardInterrupt/SystemExit are no longer swallowed.
          try:
              self.conn.rollback()
          except Exception:
              pass
          try:
              self.conn.close()
          except Exception:
              pass
          self.conn = None

      def getConnection(self):
          return self.conn

      def getAnotherConnection(self, addkeys=None):
          # Copy the configured keyword arguments so the shared config is not
          # modified by the per-call overrides.
          keys = dict(config.connStrMySql[1])
          if addkeys:
              keys.update(addkeys)
          return config.dbMySqlconnect(*config.connStrMySql[0], **keys)

      def testOkConnect(self):
          # A fresh connection with the configured arguments must succeed.
          c = self.db(*config.connStrMySql[0], **config.connStrMySql[1])
          assert c is not None
          c.close()  # do not leave the extra connection open after the test
  1000. # def testStoredProcedure(self):
  1001. # crsr=self.conn.cursor()
  1002. # try:
  1003. # crsr.execute("DROP PROCEDURE DeleteMeOnlyForTesting")
  1004. # self.conn.commit()
  1005. # except: #Make sure it is empty
  1006. # pass
  1007. # spdef= """
  1008. # DELIMITER $$
  1009. # CREATE PROCEDURE DeleteMeOnlyForTesting (onein CHAR(10), twoin CHAR(10), OUT theout CHAR(20))
  1010. # DETERMINISTIC
  1011. # BEGIN
  1012. # SET theout = onein //|| twoin;
  1013. # /* (SELECT 'a small string' as result; */
  1014. # END $$
  1015. # """
  1016. #
  1017. # crsr.execute(spdef)
  1018. #
  1019. # retvalues=crsr.callproc('DeleteMeOnlyForTesting',('Dodsworth','Anne',' '))
  1020. # print 'return value (mysql)=',repr(crsr.returnValue) ###
  1021. # assert retvalues[0]=='Dodsworth', '%s is not "Dodsworth"'%repr(retvalues[0])
  1022. # assert retvalues[1]=='Anne','%s is not "Anne"'%repr(retvalues[1])
  1023. # assert retvalues[2]=='DodsworthAnne','%s is not "DodsworthAnne"'%repr(retvalues[2])
  1024. #
  1025. # try:
  1026. # crsr.execute("DROP PROCEDURE, DeleteMeOnlyForTesting")
  1027. # self.conn.commit()
  1028. # except: #Make sure it is empty
  1029. # pass
  class TestADOwithPostgres(CommonDBTests):
      """Run the common DB tests against a PostgreSQL database.

      Connection arguments come from ``config.connStrPostgres``: element 0 is
      used as positional arguments, element 1 as keyword arguments and
      element 2 is stored as ``self.remote``.
      """

      def setUp(self):
          self.conn = config.dbPostgresConnect(*config.connStrPostgres[0], **config.connStrPostgres[1])
          self.conn.timeout = 30  # turn timeout back up
          self.engine = 'PostgreSQL'
          self.db = config.dbPostgresConnect
          self.remote = config.connStrPostgres[2]

      def tearDown(self):
          # Best-effort cleanup: errors while rolling back or closing are
          # ignored, but KeyboardInterrupt/SystemExit are no longer swallowed.
          try:
              self.conn.rollback()
          except Exception:
              pass
          try:
              self.conn.close()
          except Exception:
              pass
          self.conn = None

      def getConnection(self):
          return self.conn

      def getAnotherConnection(self, addkeys=None):
          # Copy the configured keyword arguments so the shared config is not
          # modified by the per-call overrides.
          keys = dict(config.connStrPostgres[1])
          if addkeys:
              keys.update(addkeys)
          return config.dbPostgresConnect(*config.connStrPostgres[0], **keys)

      def testOkConnect(self):
          # A fresh connection with the configured arguments must succeed.
          c = self.db(*config.connStrPostgres[0], **config.connStrPostgres[1])
          assert c is not None
          c.close()  # do not leave the extra connection open after the test
  1057. # def testStoredProcedure(self):
  1058. # crsr=self.conn.cursor()
  1059. # spdef= """
  1060. # CREATE OR REPLACE FUNCTION DeleteMeOnlyForTesting (text, text)
  1061. # RETURNS text AS $funk$
  1062. # BEGIN
  1063. # RETURN $1 || $2;
  1064. # END;
  1065. # $funk$
  1066. # LANGUAGE SQL;
  1067. # """
  1068. #
  1069. # crsr.execute(spdef)
  1070. # retvalues = crsr.callproc('DeleteMeOnlyForTesting',('Dodsworth','Anne',' '))
  1071. # ### print 'return value (pg)=',repr(crsr.returnValue) ###
  1072. # assert retvalues[0]=='Dodsworth', '%s is not "Dodsworth"'%repr(retvalues[0])
  1073. # assert retvalues[1]=='Anne','%s is not "Anne"'%repr(retvalues[1])
  1074. # assert retvalues[2]=='Dodsworth Anne','%s is not "Dodsworth Anne"'%repr(retvalues[2])
  1075. # self.conn.rollback()
  1076. # try:
  1077. # crsr.execute("DROP PROCEDURE, DeleteMeOnlyForTesting")
  1078. # self.conn.commit()
  1079. # except: #Make sure it is empty
  1080. # pass
  1081. class TimeConverterInterfaceTest(unittest.TestCase):
  1082. def testIDate(self):
  1083. assert self.tc.Date(1990,2,2)
  1084. def testITime(self):
  1085. assert self.tc.Time(13,2,2)
  1086. def testITimestamp(self):
  1087. assert self.tc.Timestamp(1990,2,2,13,2,1)
  1088. def testIDateObjectFromCOMDate(self):
  1089. assert self.tc.DateObjectFromCOMDate(37435.7604282)
  1090. def testICOMDate(self):
  1091. assert hasattr(self.tc,'COMDate')
  1092. def testExactDate(self):
  1093. d=self.tc.Date(1994,11,15)
  1094. comDate=self.tc.COMDate(d)
  1095. correct=34653.0
  1096. assert comDate == correct,comDate
  1097. def testExactTimestamp(self):
  1098. d=self.tc.Timestamp(1994,11,15,12,0,0)
  1099. comDate=self.tc.COMDate(d)
  1100. correct=34653.5
  1101. self.assertEqual( comDate ,correct)
  1102. d=self.tc.Timestamp(2003,5,6,14,15,17)
  1103. comDate=self.tc.COMDate(d)
  1104. correct=37747.593946759262
  1105. self.assertEqual( comDate ,correct)
  1106. def testIsoFormat(self):
  1107. d=self.tc.Timestamp(1994,11,15,12,3,10)
  1108. iso=self.tc.DateObjectToIsoFormatString(d)
  1109. self.assertEqual(str(iso[:19]) , '1994-11-15 12:03:10')
  1110. dt=self.tc.Date(2003,5,2)
  1111. iso=self.tc.DateObjectToIsoFormatString(dt)
  1112. self.assertEqual(str(iso[:10]), '2003-05-02')
  if config.doMxDateTimeTest:
      # mx.DateTime is only imported, and this class only defined, when the
      # configuration asks for the mx tests.
      import mx.DateTime

      class TestMXDateTimeConverter(TimeConverterInterfaceTest):
          """Checks api.mxDateTimeConverter against mx.DateTime directly."""

          def setUp(self):
              self.tc = api.mxDateTimeConverter()

          def testCOMDate(self):
              moment = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 2)
              converted = self.tc.COMDate(moment)
              assert converted == moment.COMDate()

          def testDateObjectFromCOMDate(self):
              converted = self.tc.DateObjectFromCOMDate(37435.7604282)
              lower = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 0)
              upper = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 2)
              # The result must fall strictly inside the two-second window.
              assert upper > converted > lower

          def testDate(self):
              assert mx.DateTime.Date(1980, 11, 4) == self.tc.Date(1980, 11, 4)

          def testTime(self):
              assert mx.DateTime.Time(13, 11, 4) == self.tc.Time(13, 11, 4)

          def testTimestamp(self):
              expected = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 1)
              actual = self.tc.Timestamp(2002, 6, 28, 18, 15, 1)
              assert expected == actual
  1135. import time
  class TestPythonTimeConverter(TimeConverterInterfaceTest):
      """Checks api.pythonTimeConverter using the ``time`` module."""

      def setUp(self):
          self.tc = api.pythonTimeConverter()

      def testCOMDate(self):
          seconds = time.mktime((2002, 6, 28, 18, 15, 1, 4, 31+28+31+30+31+28, -1))
          local = time.localtime(seconds)
          # Fri, 28 Jun 2002 18:15:01 +0000
          cmd = self.tc.COMDate(local)
          # Allow up to an hour of slack around the reference COM date.
          assert abs(cmd - 37435.7604282) < 1.0/24, "%f more than an hour wrong" % cmd

      def testDateObjectFromCOMDate(self):
          cmd = self.tc.DateObjectFromCOMDate(37435.7604282)
          earliest = time.gmtime(time.mktime((2002, 6, 28, 0, 14, 1, 4, 31+28+31+30+31+28, -1)))
          #there are errors in the implementation of gmtime which we ignore
          latest = time.gmtime(time.mktime((2002, 6, 29, 12, 14, 2, 4, 31+28+31+30+31+28, -1)))
          assert earliest < cmd < latest, '"%s" should be about 2002-6-28 12:15:01' % repr(cmd)

      def testDate(self):
          before = time.mktime((2002, 6, 28, 18, 15, 1, 4, 31+28+31+30+31+30, 0))
          after = time.mktime((2002, 6, 30, 18, 15, 1, 4, 31+28+31+30+31+28, 0))
          obj = self.tc.Date(2002, 6, 29)
          # The converted date must lie between the day before and the day after.
          assert before < time.mktime(obj) < after, obj

      def testTime(self):
          actual = self.tc.Time(18, 15, 2)
          self.assertEqual(actual, time.gmtime(18*60*60 + 15*60 + 2))

      def testTimestamp(self):
          before = time.localtime(time.mktime((2002, 6, 28, 18, 14, 1, 4, 31+28+31+30+31+28, -1)))
          after = time.localtime(time.mktime((2002, 6, 28, 18, 16, 1, 4, 31+28+31+30+31+28, -1)))
          obj = self.tc.Timestamp(2002, 6, 28, 18, 15, 2)
          assert before < obj < after, obj
  class TestPythonDateTimeConverter(TimeConverterInterfaceTest):
      """Checks api.pythonDateTimeConverter using the ``datetime`` module."""

      def setUp(self):
          self.tc = api.pythonDateTimeConverter()

      def testCOMDate(self):
          moment = datetime.datetime(2002, 6, 28, 18, 15, 1)
          # Fri, 28 Jun 2002 18:15:01 +0000
          cmd = self.tc.COMDate(moment)
          # Allow up to an hour of slack around the reference COM date.
          assert abs(cmd - 37435.7604282) < 1.0/24, "more than an hour wrong"

      def testDateObjectFromCOMDate(self):
          cmd = self.tc.DateObjectFromCOMDate(37435.7604282)
          lower = datetime.datetime(2002, 6, 28, 18, 14, 1)
          upper = datetime.datetime(2002, 6, 28, 18, 16, 1)
          assert lower < cmd < upper, cmd

          # testing that microseconds don't become milliseconds
          withMicros = datetime.datetime(2002, 6, 28, 18, 14, 1, 900000)
          roundTripped = self.tc.DateObjectFromCOMDate(self.tc.COMDate(withMicros))
          assert lower < roundTripped < upper, roundTripped

      def testDate(self):
          dayBefore = datetime.date(2002, 6, 28)
          dayAfter = datetime.date(2002, 6, 30)
          obj = self.tc.Date(2002, 6, 29)
          assert dayBefore < obj < dayAfter, obj

      def testTime(self):
          clock = self.tc.Time(18, 15, 2)
          # Only the "HH:MM:SS" prefix of the ISO text is compared.
          self.assertEqual(clock.isoformat()[:8], '18:15:02')

      def testTimestamp(self):
          lower = datetime.datetime(2002, 6, 28, 18, 14, 1)
          upper = datetime.datetime(2002, 6, 28, 18, 16, 1)
          obj = self.tc.Timestamp(2002, 6, 28, 18, 15, 2)
          assert lower < obj < upper, obj
  # Collect one suite per enabled test class. The datetime converter tests
  # always run; every other class is switched on by its config flag.
  # unittest.makeSuite() was deprecated in Python 3.11 and removed in 3.13;
  # TestLoader.loadTestsFromTestCase() collects the same 'test*' methods and
  # is also available on the older Python versions this file supports.
  suites = []
  suites.append(unittest.defaultTestLoader.loadTestsFromTestCase(TestPythonDateTimeConverter))
  if config.doMxDateTimeTest:
      suites.append(unittest.defaultTestLoader.loadTestsFromTestCase(TestMXDateTimeConverter))
  if config.doTimeTest:
      suites.append(unittest.defaultTestLoader.loadTestsFromTestCase(TestPythonTimeConverter))

  if config.doAccessTest:
      suites.append(unittest.defaultTestLoader.loadTestsFromTestCase(TestADOwithAccessDB))
  if config.doSqlServerTest:
      suites.append(unittest.defaultTestLoader.loadTestsFromTestCase(TestADOwithSQLServer))
  if config.doMySqlTest:
      suites.append(unittest.defaultTestLoader.loadTestsFromTestCase(TestADOwithMySql))
  if config.doPostgresTest:
      suites.append(unittest.defaultTestLoader.loadTestsFromTestCase(TestADOwithPostgres))
  class cleanup_manager(object):
      """Context manager that runs the configured cleanup when the block ends.

      Entering does nothing and yields None. Leaving the block, normally or
      through an exception, calls
      ``config.cleanup(config.testfolder, config.mdb_name)``; exceptions are
      not suppressed.
      """

      def __enter__(self):
          return None

      def __exit__(self, exc_type, exc_val, exc_tb):
          folder, mdbName = config.testfolder, config.mdb_name
          config.cleanup(folder, mdbName)
  # Single suite combining everything collected in `suites`.
  suite = unittest.TestSuite(suites)

  if __name__ == '__main__':
      # Each run gets its own deep copy of the suite (see the work-around note
      # in the loop below).
      mysuite = copy.deepcopy(suite)
      with cleanup_manager():
          # First pass: run everything with the date converter adodbapi starts with.
          defaultDateConverter = adodbapi.dateconverter
          print(__doc__)
          print("Default Date Converter is %s" % (defaultDateConverter,))
          dateconverter = defaultDateConverter
          tag = 'datetime'
          unittest.TextTestRunner().run(mysuite)

          # Optional extra passes: repeat the whole suite once per enabled
          # alternative date converter.
          if config.iterateOverTimeTests:
              for test, dateconverter, tag in (
                      (config.doTimeTest, api.pythonTimeConverter, 'pythontime'),
                      (config.doMxDateTimeTest, api.mxDateTimeConverter, 'mx')):
                  if not test:
                      continue
                  mysuite = copy.deepcopy(suite)  # work around a side effect of unittest.TextTestRunner
                  adodbapi.adodbapi.dateconverter = dateconverter()
                  print("Changed dateconverter to ")
                  print(adodbapi.adodbapi.dateconverter)
                  unittest.TextTestRunner().run(mysuite)