test_win32file.py 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960
  1. from __future__ import print_function
  2. import unittest
  3. from pywin32_testutil import str2bytes, TestSkipped, testmain
  4. import win32api, win32file, win32pipe, pywintypes, winerror, win32event
  5. import win32con, ntsecuritycon
  6. import sys
  7. import os
  8. import tempfile
  9. import threading
  10. import time
  11. import shutil
  12. import socket
  13. import datetime
  14. import random
  15. import win32timezone
  16. try:
  17. set
  18. except NameError:
  19. from sets import Set as set
  20. class TestReadBuffer(unittest.TestCase):
  21. def testLen(self):
  22. buffer = win32file.AllocateReadBuffer(1)
  23. self.failUnlessEqual(len(buffer), 1)
  24. def testSimpleIndex(self):
  25. val = str2bytes('\xFF')
  26. buffer = win32file.AllocateReadBuffer(1)
  27. buffer[0] = val
  28. self.failUnlessEqual(buffer[0], val)
  29. def testSimpleSlice(self):
  30. buffer = win32file.AllocateReadBuffer(2)
  31. val = str2bytes('\0\0')
  32. buffer[:2] = val
  33. self.failUnlessEqual(buffer[0:2], val)
  34. class TestSimpleOps(unittest.TestCase):
  35. def testSimpleFiles(self):
  36. fd, filename = tempfile.mkstemp()
  37. os.close(fd)
  38. os.unlink(filename)
  39. handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None)
  40. test_data = str2bytes("Hello\0there")
  41. try:
  42. win32file.WriteFile(handle, test_data)
  43. handle.Close()
  44. # Try and open for read
  45. handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
  46. rc, data = win32file.ReadFile(handle, 1024)
  47. self.assertEquals(data, test_data)
  48. finally:
  49. handle.Close()
  50. try:
  51. os.unlink(filename)
  52. except os.error:
  53. pass
  54. # A simple test using normal read/write operations.
  55. def testMoreFiles(self):
  56. # Create a file in the %TEMP% directory.
  57. testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
  58. desiredAccess = win32file.GENERIC_READ | win32file.GENERIC_WRITE
  59. # Set a flag to delete the file automatically when it is closed.
  60. fileFlags = win32file.FILE_FLAG_DELETE_ON_CLOSE
  61. h = win32file.CreateFile( testName, desiredAccess, win32file.FILE_SHARE_READ, None, win32file.CREATE_ALWAYS, fileFlags, 0)
  62. # Write a known number of bytes to the file.
  63. data = str2bytes("z") * 1025
  64. win32file.WriteFile(h, data)
  65. self.failUnless(win32file.GetFileSize(h) == len(data), "WARNING: Written file does not have the same size as the length of the data in it!")
  66. # Ensure we can read the data back.
  67. win32file.SetFilePointer(h, 0, win32file.FILE_BEGIN)
  68. hr, read_data = win32file.ReadFile(h, len(data)+10) # + 10 to get anything extra
  69. self.failUnless(hr==0, "Readfile returned %d" % hr)
  70. self.failUnless(read_data == data, "Read data is not what we wrote!")
  71. # Now truncate the file at 1/2 its existing size.
  72. newSize = len(data)//2
  73. win32file.SetFilePointer(h, newSize, win32file.FILE_BEGIN)
  74. win32file.SetEndOfFile(h)
  75. self.failUnlessEqual(win32file.GetFileSize(h), newSize)
  76. # GetFileAttributesEx/GetFileAttributesExW tests.
  77. self.failUnlessEqual(win32file.GetFileAttributesEx(testName), win32file.GetFileAttributesExW(testName))
  78. attr, ct, at, wt, size = win32file.GetFileAttributesEx(testName)
  79. self.failUnless(size==newSize,
  80. "Expected GetFileAttributesEx to return the same size as GetFileSize()")
  81. self.failUnless(attr==win32file.GetFileAttributes(testName),
  82. "Expected GetFileAttributesEx to return the same attributes as GetFileAttributes")
  83. h = None # Close the file by removing the last reference to the handle!
  84. self.failUnless(not os.path.isfile(testName), "After closing the file, it still exists!")
  85. def testFilePointer(self):
  86. # via [ 979270 ] SetFilePointer fails with negative offset
  87. # Create a file in the %TEMP% directory.
  88. filename = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
  89. f = win32file.CreateFile(filename,
  90. win32file.GENERIC_READ|win32file.GENERIC_WRITE,
  91. 0,
  92. None,
  93. win32file.CREATE_ALWAYS,
  94. win32file.FILE_ATTRIBUTE_NORMAL,
  95. 0)
  96. try:
  97. #Write some data
  98. data = str2bytes('Some data')
  99. (res, written) = win32file.WriteFile(f, data)
  100. self.failIf(res)
  101. self.assertEqual(written, len(data))
  102. #Move at the beginning and read the data
  103. win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN)
  104. (res, s) = win32file.ReadFile(f, len(data))
  105. self.failIf(res)
  106. self.assertEqual(s, data)
  107. #Move at the end and read the data
  108. win32file.SetFilePointer(f, -len(data), win32file.FILE_END)
  109. (res, s) = win32file.ReadFile(f, len(data))
  110. self.failIf(res)
  111. self.failUnlessEqual(s, data)
  112. finally:
  113. f.Close()
  114. os.unlink(filename)
  115. def testFileTimesTimezones(self):
  116. if not issubclass(pywintypes.TimeType, datetime.datetime):
  117. # maybe should report 'skipped', but that's not quite right as
  118. # there is nothing you can do to avoid it being skipped!
  119. return
  120. filename = tempfile.mktemp("-testFileTimes")
  121. # now() is always returning a timestamp with microseconds but the
  122. # file APIs all have zero microseconds, so some comparisons fail.
  123. now_utc = win32timezone.utcnow().replace(microsecond=0)
  124. now_local = now_utc.astimezone(win32timezone.TimeZoneInfo.local())
  125. h = win32file.CreateFile(filename,
  126. win32file.GENERIC_READ|win32file.GENERIC_WRITE,
  127. 0, None, win32file.CREATE_ALWAYS, 0, 0)
  128. try:
  129. win32file.SetFileTime(h, now_utc, now_utc, now_utc)
  130. ct, at, wt = win32file.GetFileTime(h)
  131. self.failUnlessEqual(now_local, ct)
  132. self.failUnlessEqual(now_local, at)
  133. self.failUnlessEqual(now_local, wt)
  134. # and the reverse - set local, check against utc
  135. win32file.SetFileTime(h, now_local, now_local, now_local)
  136. ct, at, wt = win32file.GetFileTime(h)
  137. self.failUnlessEqual(now_utc, ct)
  138. self.failUnlessEqual(now_utc, at)
  139. self.failUnlessEqual(now_utc, wt)
  140. finally:
  141. h.close()
  142. os.unlink(filename)
  143. def testFileTimes(self):
  144. if issubclass(pywintypes.TimeType, datetime.datetime):
  145. from win32timezone import TimeZoneInfo
  146. # now() is always returning a timestamp with microseconds but the
  147. # file APIs all have zero microseconds, so some comparisons fail.
  148. now = datetime.datetime.now(tz=TimeZoneInfo.utc()).replace(microsecond=0)
  149. nowish = now + datetime.timedelta(seconds=1)
  150. later = now + datetime.timedelta(seconds=120)
  151. else:
  152. rc, tzi = win32api.GetTimeZoneInformation()
  153. bias = tzi[0]
  154. if rc==2: # daylight-savings is in effect.
  155. bias += tzi[-1]
  156. bias *= 60 # minutes to seconds...
  157. tick = int(time.time())
  158. now = pywintypes.Time(tick+bias)
  159. nowish = pywintypes.Time(tick+bias+1)
  160. later = pywintypes.Time(tick+bias+120)
  161. filename = tempfile.mktemp("-testFileTimes")
  162. # Windows docs the 'last time' isn't valid until the last write
  163. # handle is closed - so create the file, then re-open it to check.
  164. open(filename,"w").close()
  165. f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE,
  166. 0, None,
  167. win32con.OPEN_EXISTING, 0, None)
  168. try:
  169. ct, at, wt = win32file.GetFileTime(f)
  170. self.failUnless(ct >= now, "File was created in the past - now=%s, created=%s" % (now, ct))
  171. self.failUnless( now <= ct <= nowish, (now, ct))
  172. self.failUnless(wt >= now, "File was written-to in the past now=%s, written=%s" % (now,wt))
  173. self.failUnless( now <= wt <= nowish, (now, wt))
  174. # Now set the times.
  175. win32file.SetFileTime(f, later, later, later, UTCTimes=True)
  176. # Get them back.
  177. ct, at, wt = win32file.GetFileTime(f)
  178. # XXX - the builtin PyTime type appears to be out by a dst offset.
  179. # just ignore that type here...
  180. self.failUnlessEqual(ct, later)
  181. self.failUnlessEqual(at, later)
  182. self.failUnlessEqual(wt, later)
  183. finally:
  184. f.Close()
  185. os.unlink(filename)
  186. class TestGetFileInfoByHandleEx(unittest.TestCase):
  187. __handle = __filename = None
  188. def setUp(self):
  189. fd, self.__filename = tempfile.mkstemp()
  190. os.close(fd)
  191. def tearDown(self):
  192. if self.__handle is not None:
  193. self.__handle.Close()
  194. if self.__filename is not None:
  195. try:
  196. os.unlink(self.__filename)
  197. except OSError:
  198. pass
  199. self.__handle = self.__filename = None
  200. def testFileBasicInfo(self):
  201. attr = win32file.GetFileAttributes(self.__filename)
  202. f = win32file.CreateFile(self.__filename, win32file.GENERIC_READ, 0, None,
  203. win32con.OPEN_EXISTING, 0, None)
  204. self.__handle = f
  205. ct, at, wt = win32file.GetFileTime(f)
  206. # bug #752: this throws ERROR_BAD_LENGTH (24) in x86 binaries of build 221
  207. basic_info = win32file.GetFileInformationByHandleEx(f, win32file.FileBasicInfo)
  208. self.assertEqual(ct, basic_info['CreationTime'])
  209. self.assertEqual(at, basic_info['LastAccessTime'])
  210. self.assertEqual(wt, basic_info['LastWriteTime'])
  211. self.assertEqual(attr, basic_info['FileAttributes'])
  212. class TestOverlapped(unittest.TestCase):
  213. def testSimpleOverlapped(self):
  214. # Create a file in the %TEMP% directory.
  215. import win32event
  216. testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
  217. desiredAccess = win32file.GENERIC_WRITE
  218. overlapped = pywintypes.OVERLAPPED()
  219. evt = win32event.CreateEvent(None, 0, 0, None)
  220. overlapped.hEvent = evt
  221. # Create the file and write shit-loads of data to it.
  222. h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.CREATE_ALWAYS, 0, 0)
  223. chunk_data = str2bytes("z") * 0x8000
  224. num_loops = 512
  225. expected_size = num_loops * len(chunk_data)
  226. for i in range(num_loops):
  227. win32file.WriteFile(h, chunk_data, overlapped)
  228. win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
  229. overlapped.Offset = overlapped.Offset + len(chunk_data)
  230. h.Close()
  231. # Now read the data back overlapped
  232. overlapped = pywintypes.OVERLAPPED()
  233. evt = win32event.CreateEvent(None, 0, 0, None)
  234. overlapped.hEvent = evt
  235. desiredAccess = win32file.GENERIC_READ
  236. h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.OPEN_EXISTING, 0, 0)
  237. buffer = win32file.AllocateReadBuffer(0xFFFF)
  238. while 1:
  239. try:
  240. hr, data = win32file.ReadFile(h, buffer, overlapped)
  241. win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
  242. overlapped.Offset = overlapped.Offset + len(data)
  243. if not data is buffer:
  244. self.fail("Unexpected result from ReadFile - should be the same buffer we passed it")
  245. except win32api.error:
  246. break
  247. h.Close()
  248. def testCompletionPortsMultiple(self):
  249. # Mainly checking that we can "associate" an existing handle. This
  250. # failed in build 203.
  251. ioport = win32file.CreateIoCompletionPort(win32file.INVALID_HANDLE_VALUE,
  252. 0, 0, 0)
  253. socks = []
  254. for PORT in range(9123, 9125):
  255. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  256. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  257. sock.bind(('', PORT))
  258. sock.listen(1)
  259. socks.append(sock)
  260. new = win32file.CreateIoCompletionPort(sock.fileno(), ioport, PORT, 0)
  261. assert new is ioport
  262. for s in socks:
  263. s.close()
  264. hv = int(ioport)
  265. ioport = new = None
  266. # The handle itself should be closed now (unless we leak references!)
  267. # Check that.
  268. try:
  269. win32file.CloseHandle(hv)
  270. raise RuntimeError("Expected close to fail!")
  271. except win32file.error as details:
  272. self.failUnlessEqual(details.winerror, winerror.ERROR_INVALID_HANDLE)
  273. def testCompletionPortsQueued(self):
  274. class Foo: pass
  275. io_req_port = win32file.CreateIoCompletionPort(-1, None, 0, 0)
  276. overlapped = pywintypes.OVERLAPPED()
  277. overlapped.object = Foo()
  278. win32file.PostQueuedCompletionStatus(io_req_port, 0, 99, overlapped)
  279. errCode, bytes, key, overlapped = \
  280. win32file.GetQueuedCompletionStatus(io_req_port, win32event.INFINITE)
  281. self.failUnlessEqual(errCode, 0)
  282. self.failUnless(isinstance(overlapped.object, Foo))
  283. def _IOCPServerThread(self, handle, port, drop_overlapped_reference):
  284. overlapped = pywintypes.OVERLAPPED()
  285. win32pipe.ConnectNamedPipe(handle, overlapped)
  286. if drop_overlapped_reference:
  287. # Be naughty - the overlapped object is now dead, but
  288. # GetQueuedCompletionStatus will still find it. Our check of
  289. # reference counting should catch that error.
  290. overlapped = None
  291. # even if we fail, be sure to close the handle; prevents hangs
  292. # on Vista 64...
  293. try:
  294. self.failUnlessRaises(RuntimeError,
  295. win32file.GetQueuedCompletionStatus, port, -1)
  296. finally:
  297. handle.Close()
  298. return
  299. result = win32file.GetQueuedCompletionStatus(port, -1)
  300. ol2 = result[-1]
  301. self.failUnless(ol2 is overlapped)
  302. data = win32file.ReadFile(handle, 512)[1]
  303. win32file.WriteFile(handle, data)
  304. def testCompletionPortsNonQueued(self, test_overlapped_death = 0):
  305. # In 204 we had a reference count bug when OVERLAPPED objects were
  306. # associated with a completion port other than via
  307. # PostQueuedCompletionStatus. This test is based on the reproduction
  308. # reported with that bug.
  309. # Create the pipe.
  310. BUFSIZE = 512
  311. pipe_name = r"\\.\pipe\pywin32_test_pipe"
  312. handle = win32pipe.CreateNamedPipe(pipe_name,
  313. win32pipe.PIPE_ACCESS_DUPLEX|
  314. win32file.FILE_FLAG_OVERLAPPED,
  315. win32pipe.PIPE_TYPE_MESSAGE|
  316. win32pipe.PIPE_READMODE_MESSAGE|
  317. win32pipe.PIPE_WAIT,
  318. 1, BUFSIZE, BUFSIZE,
  319. win32pipe.NMPWAIT_WAIT_FOREVER,
  320. None)
  321. # Create an IOCP and associate it with the handle.
  322. port = win32file.CreateIoCompletionPort(-1, 0, 0, 0)
  323. win32file.CreateIoCompletionPort(handle, port, 1, 0)
  324. t = threading.Thread(target=self._IOCPServerThread, args=(handle,port, test_overlapped_death))
  325. t.setDaemon(True) # avoid hanging entire test suite on failure.
  326. t.start()
  327. try:
  328. time.sleep(0.1) # let thread do its thing.
  329. try:
  330. win32pipe.CallNamedPipe(r"\\.\pipe\pywin32_test_pipe", str2bytes("Hello there"), BUFSIZE, 0)
  331. except win32pipe.error:
  332. # Testing for overlapped death causes this
  333. if not test_overlapped_death:
  334. raise
  335. finally:
  336. if not test_overlapped_death:
  337. handle.Close()
  338. t.join(3)
  339. self.failIf(t.isAlive(), "thread didn't finish")
  340. def testCompletionPortsNonQueuedBadReference(self):
  341. self.testCompletionPortsNonQueued(True)
  342. def testHashable(self):
  343. overlapped = pywintypes.OVERLAPPED()
  344. d = {}
  345. d[overlapped] = "hello"
  346. self.failUnlessEqual(d[overlapped], "hello")
  347. def testComparable(self):
  348. overlapped = pywintypes.OVERLAPPED()
  349. self.failUnlessEqual(overlapped, overlapped)
  350. # ensure we explicitly test the operators.
  351. self.failUnless(overlapped == overlapped)
  352. self.failIf(overlapped != overlapped)
  353. def testComparable2(self):
  354. # 2 overlapped objects compare equal if their contents are the same.
  355. overlapped1 = pywintypes.OVERLAPPED()
  356. overlapped2 = pywintypes.OVERLAPPED()
  357. self.failUnlessEqual(overlapped1, overlapped2)
  358. # ensure we explicitly test the operators.
  359. self.failUnless(overlapped1 == overlapped2)
  360. self.failIf(overlapped1 != overlapped2)
  361. # now change something in one of them - should no longer be equal.
  362. overlapped1.hEvent = 1
  363. self.failIfEqual(overlapped1, overlapped2)
  364. # ensure we explicitly test the operators.
  365. self.failIf(overlapped1 == overlapped2)
  366. self.failUnless(overlapped1 != overlapped2)
  367. class TestSocketExtensions(unittest.TestCase):
  368. def acceptWorker(self, port, running_event, stopped_event):
  369. listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  370. listener.bind(('', port))
  371. listener.listen(200)
  372. # create accept socket
  373. accepter = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  374. # An overlapped
  375. overlapped = pywintypes.OVERLAPPED()
  376. overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
  377. # accept the connection.
  378. # We used to allow strings etc to be passed here, and they would be
  379. # modified! Obviously this is evil :)
  380. buffer = " " * 1024 # EVIL - SHOULD NOT BE ALLOWED.
  381. self.assertRaises(TypeError, win32file.AcceptEx, listener, accepter, buffer, overlapped)
  382. # This is the correct way to allocate the buffer...
  383. buffer = win32file.AllocateReadBuffer(1024)
  384. rc = win32file.AcceptEx(listener, accepter, buffer, overlapped)
  385. self.failUnlessEqual(rc, winerror.ERROR_IO_PENDING)
  386. # Set the event to say we are all ready
  387. running_event.set()
  388. # and wait for the connection.
  389. rc = win32event.WaitForSingleObject(overlapped.hEvent, 2000)
  390. if rc == win32event.WAIT_TIMEOUT:
  391. self.fail("timed out waiting for a connection")
  392. nbytes = win32file.GetOverlappedResult(listener.fileno(), overlapped, False)
  393. #fam, loc, rem = win32file.GetAcceptExSockaddrs(accepter, buffer)
  394. accepter.send(buffer[:nbytes])
  395. # NOT set in a finally - this means *successfully* stopped!
  396. stopped_event.set()
  397. def testAcceptEx(self):
  398. port = 4680
  399. running = threading.Event()
  400. stopped = threading.Event()
  401. t = threading.Thread(target=self.acceptWorker, args=(port, running,stopped))
  402. t.start()
  403. running.wait(2)
  404. if not running.isSet():
  405. self.fail("AcceptEx Worker thread failed to start")
  406. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  407. s.connect(('127.0.0.1', port))
  408. win32file.WSASend(s, str2bytes("hello"), None)
  409. overlapped = pywintypes.OVERLAPPED()
  410. overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
  411. # Like above - WSARecv used to allow strings as the receive buffer!!
  412. buffer = " " * 10
  413. self.assertRaises(TypeError, win32file.WSARecv, s, buffer, overlapped)
  414. # This one should work :)
  415. buffer = win32file.AllocateReadBuffer(10)
  416. win32file.WSARecv(s, buffer, overlapped)
  417. nbytes = win32file.GetOverlappedResult(s.fileno(), overlapped, True)
  418. got = buffer[:nbytes]
  419. self.failUnlessEqual(got, str2bytes("hello"))
  420. # thread should have stopped
  421. stopped.wait(2)
  422. if not stopped.isSet():
  423. self.fail("AcceptEx Worker thread failed to successfully stop")
  424. class TestFindFiles(unittest.TestCase):
  425. def testIter(self):
  426. dir = os.path.join(os.getcwd(), "*")
  427. files = win32file.FindFilesW(dir)
  428. set1 = set()
  429. set1.update(files)
  430. set2 = set()
  431. for file in win32file.FindFilesIterator(dir):
  432. set2.add(file)
  433. assert len(set2) > 5, "This directory has less than 5 files!?"
  434. self.failUnlessEqual(set1, set2)
  435. def testBadDir(self):
  436. dir = os.path.join(os.getcwd(), "a dir that doesnt exist", "*")
  437. self.assertRaises(win32file.error, win32file.FindFilesIterator, dir)
  438. def testEmptySpec(self):
  439. spec = os.path.join(os.getcwd(), "*.foo_bar")
  440. num = 0
  441. for i in win32file.FindFilesIterator(spec):
  442. num += 1
  443. self.failUnlessEqual(0, num)
  444. def testEmptyDir(self):
  445. test_path = os.path.join(win32api.GetTempPath(), "win32file_test_directory")
  446. try:
  447. # Note: previously used shutil.rmtree, but when looking for
  448. # reference count leaks, that function showed leaks! os.rmdir
  449. # doesn't have that problem.
  450. os.rmdir(test_path)
  451. except os.error:
  452. pass
  453. os.mkdir(test_path)
  454. try:
  455. num = 0
  456. for i in win32file.FindFilesIterator(os.path.join(test_path, "*")):
  457. num += 1
  458. # Expecting "." and ".." only
  459. self.failUnlessEqual(2, num)
  460. finally:
  461. os.rmdir(test_path)
  462. class TestDirectoryChanges(unittest.TestCase):
  463. num_test_dirs = 1
  464. def setUp(self):
  465. self.watcher_threads = []
  466. self.watcher_thread_changes = []
  467. self.dir_names = []
  468. self.dir_handles = []
  469. for i in range(self.num_test_dirs):
  470. td = tempfile.mktemp("-test-directory-changes-%d" % i)
  471. os.mkdir(td)
  472. self.dir_names.append(td)
  473. hdir = win32file.CreateFile(td,
  474. ntsecuritycon.FILE_LIST_DIRECTORY,
  475. win32con.FILE_SHARE_READ,
  476. None, # security desc
  477. win32con.OPEN_EXISTING,
  478. win32con.FILE_FLAG_BACKUP_SEMANTICS |
  479. win32con.FILE_FLAG_OVERLAPPED,
  480. None)
  481. self.dir_handles.append(hdir)
  482. changes = []
  483. t = threading.Thread(target=self._watcherThreadOverlapped,
  484. args=(td, hdir, changes))
  485. t.start()
  486. self.watcher_threads.append(t)
  487. self.watcher_thread_changes.append(changes)
  488. def _watcherThread(self, dn, dh, changes):
  489. # A synchronous version:
  490. # XXX - not used - I was having a whole lot of problems trying to
  491. # get this to work. Specifically:
  492. # * ReadDirectoryChangesW without an OVERLAPPED blocks infinitely.
  493. # * If another thread attempts to close the handle while
  494. # ReadDirectoryChangesW is waiting on it, the ::CloseHandle() method
  495. # blocks (which has nothing to do with the GIL - it is correctly
  496. # managed)
  497. # Which ends up with no way to kill the thread!
  498. flags = win32con.FILE_NOTIFY_CHANGE_FILE_NAME
  499. while 1:
  500. try:
  501. print("waiting", dh)
  502. changes = win32file.ReadDirectoryChangesW(dh,
  503. 8192,
  504. False, #sub-tree
  505. flags)
  506. print("got", changes)
  507. except:
  508. raise
  509. changes.extend(changes)
  510. def _watcherThreadOverlapped(self, dn, dh, changes):
  511. flags = win32con.FILE_NOTIFY_CHANGE_FILE_NAME
  512. buf = win32file.AllocateReadBuffer(8192)
  513. overlapped = pywintypes.OVERLAPPED()
  514. overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
  515. while 1:
  516. win32file.ReadDirectoryChangesW(dh,
  517. buf,
  518. False, #sub-tree
  519. flags,
  520. overlapped)
  521. # Wait for our event, or for 5 seconds.
  522. rc = win32event.WaitForSingleObject(overlapped.hEvent, 5000)
  523. if rc == win32event.WAIT_OBJECT_0:
  524. # got some data! Must use GetOverlappedResult to find out
  525. # how much is valid! 0 generally means the handle has
  526. # been closed. Blocking is OK here, as the event has
  527. # already been set.
  528. nbytes = win32file.GetOverlappedResult(dh, overlapped, True)
  529. if nbytes:
  530. bits = win32file.FILE_NOTIFY_INFORMATION(buf, nbytes)
  531. changes.extend(bits)
  532. else:
  533. # This is "normal" exit - our 'tearDown' closes the
  534. # handle.
  535. # print "looks like dir handle was closed!"
  536. return
  537. else:
  538. print("ERROR: Watcher thread timed-out!")
  539. return # kill the thread!
  540. def tearDown(self):
  541. # be careful about raising errors at teardown!
  542. for h in self.dir_handles:
  543. # See comments in _watcherThread above - this appears to
  544. # deadlock if a synchronous ReadDirectoryChangesW is waiting...
  545. # (No such problems with an asynch ReadDirectoryChangesW)
  546. h.Close()
  547. for dn in self.dir_names:
  548. try:
  549. shutil.rmtree(dn)
  550. except OSError:
  551. print("FAILED to remove directory", dn)
  552. for t in self.watcher_threads:
  553. # closing dir handle should have killed threads!
  554. t.join(5)
  555. if t.isAlive():
  556. print("FAILED to wait for thread termination")
  557. def stablize(self):
  558. time.sleep(0.5)
  559. def testSimple(self):
  560. self.stablize()
  561. for dn in self.dir_names:
  562. fn = os.path.join(dn, "test_file")
  563. open(fn, "w").close()
  564. self.stablize()
  565. changes = self.watcher_thread_changes[0]
  566. self.failUnlessEqual(changes, [(1, "test_file")])
  567. def testSmall(self):
  568. self.stablize()
  569. for dn in self.dir_names:
  570. fn = os.path.join(dn, "x")
  571. open(fn, "w").close()
  572. self.stablize()
  573. changes = self.watcher_thread_changes[0]
  574. self.failUnlessEqual(changes, [(1, "x")])
  575. class TestEncrypt(unittest.TestCase):
  576. def testEncrypt(self):
  577. fname = tempfile.mktemp("win32file_test")
  578. f = open(fname, "wb")
  579. f.write(str2bytes("hello"))
  580. f.close()
  581. f = None
  582. try:
  583. try:
  584. win32file.EncryptFile(fname)
  585. except win32file.error as details:
  586. if details.winerror != winerror.ERROR_ACCESS_DENIED:
  587. raise
  588. print("It appears this is not NTFS - cant encrypt/decrypt")
  589. win32file.DecryptFile(fname)
  590. finally:
  591. if f is not None:
  592. f.close()
  593. os.unlink(fname)
  594. class TestConnect(unittest.TestCase):
  595. def connect_thread_runner(self, expect_payload, giveup_event):
  596. # As Windows 2000 doesn't do ConnectEx, we need to use a non-blocking
  597. # accept, as our test connection may never come. May as well use
  598. # AcceptEx for this...
  599. listener = socket.socket()
  600. self.addr = ('localhost', random.randint(10000,64000))
  601. listener.bind(self.addr)
  602. listener.listen(1)
  603. # create accept socket
  604. accepter = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  605. # An overlapped
  606. overlapped = pywintypes.OVERLAPPED()
  607. overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
  608. # accept the connection.
  609. if expect_payload:
  610. buf_size = 1024
  611. else:
  612. # when we don't expect data we must be careful to only pass the
  613. # exact number of bytes for the endpoint data...
  614. buf_size = win32file.CalculateSocketEndPointSize(listener)
  615. buffer = win32file.AllocateReadBuffer(buf_size)
  616. win32file.AcceptEx(listener, accepter, buffer, overlapped)
  617. # wait for the connection or our test to fail.
  618. events = giveup_event, overlapped.hEvent
  619. rc = win32event.WaitForMultipleObjects(events, False, 2000)
  620. if rc == win32event.WAIT_TIMEOUT:
  621. self.fail("timed out waiting for a connection")
  622. if rc == win32event.WAIT_OBJECT_0:
  623. # Our main thread running the test failed and will never connect.
  624. return
  625. # must be a connection.
  626. nbytes = win32file.GetOverlappedResult(listener.fileno(), overlapped, False)
  627. if expect_payload:
  628. self.request = buffer[:nbytes]
  629. accepter.send(str2bytes('some expected response'))
  630. def test_connect_with_payload(self):
  631. giveup_event = win32event.CreateEvent(None, 0, 0, None)
  632. t = threading.Thread(target=self.connect_thread_runner,
  633. args=(True, giveup_event))
  634. t.start()
  635. time.sleep(0.1)
  636. s2 = socket.socket()
  637. ol = pywintypes.OVERLAPPED()
  638. s2.bind(('0.0.0.0', 0)) # connectex requires the socket be bound beforehand
  639. try:
  640. win32file.ConnectEx(s2, self.addr, ol, str2bytes("some expected request"))
  641. except win32file.error as exc:
  642. win32event.SetEvent(giveup_event)
  643. if exc.winerror == 10022: # WSAEINVAL
  644. raise TestSkipped("ConnectEx is not available on this platform")
  645. raise # some error error we don't expect.
  646. win32file.GetOverlappedResult(s2.fileno(), ol, 1)
  647. ol = pywintypes.OVERLAPPED()
  648. buff = win32file.AllocateReadBuffer(1024)
  649. win32file.WSARecv(s2, buff, ol, 0)
  650. length = win32file.GetOverlappedResult(s2.fileno(), ol, 1)
  651. self.response = buff[:length]
  652. self.assertEqual(self.response, str2bytes('some expected response'))
  653. self.assertEqual(self.request, str2bytes('some expected request'))
  654. t.join(5)
  655. self.failIf(t.isAlive(), "worker thread didn't terminate")
  656. def test_connect_without_payload(self):
  657. giveup_event = win32event.CreateEvent(None, 0, 0, None)
  658. t = threading.Thread(target=self.connect_thread_runner,
  659. args=(False, giveup_event))
  660. t.start()
  661. time.sleep(0.1)
  662. s2 = socket.socket()
  663. ol = pywintypes.OVERLAPPED()
  664. s2.bind(('0.0.0.0', 0)) # connectex requires the socket be bound beforehand
  665. try:
  666. win32file.ConnectEx(s2, self.addr, ol)
  667. except win32file.error as exc:
  668. win32event.SetEvent(giveup_event)
  669. if exc.winerror == 10022: # WSAEINVAL
  670. raise TestSkipped("ConnectEx is not available on this platform")
  671. raise # some error error we don't expect.
  672. win32file.GetOverlappedResult(s2.fileno(), ol, 1)
  673. ol = pywintypes.OVERLAPPED()
  674. buff = win32file.AllocateReadBuffer(1024)
  675. win32file.WSARecv(s2, buff, ol, 0)
  676. length = win32file.GetOverlappedResult(s2.fileno(), ol, 1)
  677. self.response = buff[:length]
  678. self.assertEqual(self.response, str2bytes('some expected response'))
  679. t.join(5)
  680. self.failIf(t.isAlive(), "worker thread didn't terminate")
  681. class TestTransmit(unittest.TestCase):
  682. def test_transmit(self):
  683. import binascii
  684. bytes = os.urandom(1024*1024)
  685. val = binascii.hexlify(bytes)
  686. val_length = len(val)
  687. f = tempfile.TemporaryFile()
  688. f.write(val)
  689. def runner():
  690. s1 = socket.socket()
  691. self.addr = ('localhost', random.randint(10000,64000))
  692. s1.bind(self.addr)
  693. s1.listen(1)
  694. cli, addr = s1.accept()
  695. buf = 1
  696. self.request = []
  697. while buf:
  698. buf = cli.recv(1024*100)
  699. self.request.append(buf)
  700. th = threading.Thread(target=runner)
  701. th.start()
  702. time.sleep(0.5)
  703. s2 = socket.socket()
  704. s2.connect(self.addr)
  705. length = 0
  706. aaa = str2bytes("[AAA]")
  707. bbb = str2bytes("[BBB]")
  708. ccc = str2bytes("[CCC]")
  709. ddd = str2bytes("[DDD]")
  710. empty = str2bytes("")
  711. ol = pywintypes.OVERLAPPED()
  712. f.seek(0)
  713. win32file.TransmitFile(s2, win32file._get_osfhandle(f.fileno()), val_length, 0, ol, 0)
  714. length += win32file.GetOverlappedResult(s2.fileno(), ol, 1)
  715. ol = pywintypes.OVERLAPPED()
  716. f.seek(0)
  717. win32file.TransmitFile(s2, win32file._get_osfhandle(f.fileno()), val_length, 0, ol, 0, aaa, bbb)
  718. length += win32file.GetOverlappedResult(s2.fileno(), ol, 1)
  719. ol = pywintypes.OVERLAPPED()
  720. f.seek(0)
  721. win32file.TransmitFile(s2, win32file._get_osfhandle(f.fileno()), val_length, 0, ol, 0, empty, empty)
  722. length += win32file.GetOverlappedResult(s2.fileno(), ol, 1)
  723. ol = pywintypes.OVERLAPPED()
  724. f.seek(0)
  725. win32file.TransmitFile(s2, win32file._get_osfhandle(f.fileno()), val_length, 0, ol, 0, None, ccc)
  726. length += win32file.GetOverlappedResult(s2.fileno(), ol, 1)
  727. ol = pywintypes.OVERLAPPED()
  728. f.seek(0)
  729. win32file.TransmitFile(s2, win32file._get_osfhandle(f.fileno()), val_length, 0, ol, 0, ddd)
  730. length += win32file.GetOverlappedResult(s2.fileno(), ol, 1)
  731. s2.close()
  732. th.join()
  733. buf = str2bytes('').join(self.request)
  734. self.assertEqual(length, len(buf))
  735. expected = val + aaa + val + bbb + val + val + ccc + ddd + val
  736. self.assertEqual(type(expected), type(buf))
  737. self.assert_(expected == buf)
  738. class TestWSAEnumNetworkEvents(unittest.TestCase):
  739. def test_basics(self):
  740. s = socket.socket()
  741. e = win32event.CreateEvent(None, 1, 0, None)
  742. win32file.WSAEventSelect(s, e, 0)
  743. self.assertEquals(win32file.WSAEnumNetworkEvents(s), {})
  744. self.assertEquals(win32file.WSAEnumNetworkEvents(s, e), {})
  745. self.assertRaises(TypeError, win32file.WSAEnumNetworkEvents, s, e, 3)
  746. self.assertRaises(TypeError, win32file.WSAEnumNetworkEvents, s, "spam")
  747. self.assertRaises(TypeError, win32file.WSAEnumNetworkEvents, "spam", e)
  748. self.assertRaises(TypeError, win32file.WSAEnumNetworkEvents, "spam")
  749. f = open("NUL")
  750. h = win32file._get_osfhandle(f.fileno())
  751. self.assertRaises(win32file.error, win32file.WSAEnumNetworkEvents, h)
  752. self.assertRaises(win32file.error, win32file.WSAEnumNetworkEvents, s, h)
  753. try:
  754. win32file.WSAEnumNetworkEvents(h)
  755. except win32file.error as e:
  756. self.assertEquals(e.winerror, win32file.WSAENOTSOCK)
  757. try:
  758. win32file.WSAEnumNetworkEvents(s, h)
  759. except win32file.error as e:
  760. # According to the docs it would seem reasonable that
  761. # this would fail with WSAEINVAL, but it doesn't.
  762. self.assertEquals(e.winerror, win32file.WSAENOTSOCK)
  763. def test_functional(self):
  764. # This is not really a unit test, but it does exercise the code
  765. # quite well and can serve as an example of WSAEventSelect and
  766. # WSAEnumNetworkEvents usage.
  767. port = socket.socket()
  768. port.setblocking(0)
  769. port_event = win32event.CreateEvent(None, 0, 0, None)
  770. win32file.WSAEventSelect(port, port_event,
  771. win32file.FD_ACCEPT |
  772. win32file.FD_CLOSE)
  773. port.bind(("127.0.0.1", 0))
  774. port.listen(10)
  775. client = socket.socket()
  776. client.setblocking(0)
  777. client_event = win32event.CreateEvent(None, 0, 0, None)
  778. win32file.WSAEventSelect(client, client_event,
  779. win32file.FD_CONNECT |
  780. win32file.FD_READ |
  781. win32file.FD_WRITE |
  782. win32file.FD_CLOSE)
  783. err = client.connect_ex(port.getsockname())
  784. self.assertEquals(err, win32file.WSAEWOULDBLOCK)
  785. res = win32event.WaitForSingleObject(port_event, 1000)
  786. self.assertEquals(res, win32event.WAIT_OBJECT_0)
  787. events = win32file.WSAEnumNetworkEvents(port, port_event)
  788. self.assertEquals(events, {win32file.FD_ACCEPT: 0})
  789. server, addr = port.accept()
  790. server.setblocking(0)
  791. server_event = win32event.CreateEvent(None, 1, 0, None)
  792. win32file.WSAEventSelect(server, server_event,
  793. win32file.FD_READ |
  794. win32file.FD_WRITE |
  795. win32file.FD_CLOSE)
  796. res = win32event.WaitForSingleObject(server_event, 1000)
  797. self.assertEquals(res, win32event.WAIT_OBJECT_0)
  798. events = win32file.WSAEnumNetworkEvents(server, server_event)
  799. self.assertEquals(events, {win32file.FD_WRITE: 0})
  800. res = win32event.WaitForSingleObject(client_event, 1000)
  801. self.assertEquals(res, win32event.WAIT_OBJECT_0)
  802. events = win32file.WSAEnumNetworkEvents(client, client_event)
  803. self.assertEquals(events, {win32file.FD_CONNECT: 0,
  804. win32file.FD_WRITE: 0})
  805. sent = 0
  806. data = str2bytes("x") * 16 * 1024
  807. while sent < 16 * 1024 * 1024:
  808. try:
  809. sent += client.send(data)
  810. except socket.error as e:
  811. if e.args[0] == win32file.WSAEINTR:
  812. continue
  813. elif e.args[0] in (win32file.WSAEWOULDBLOCK, win32file.WSAENOBUFS):
  814. break
  815. else:
  816. raise
  817. else:
  818. self.fail("could not find socket buffer limit")
  819. events = win32file.WSAEnumNetworkEvents(client)
  820. self.assertEquals(events, {})
  821. res = win32event.WaitForSingleObject(server_event, 1000)
  822. self.assertEquals(res, win32event.WAIT_OBJECT_0)
  823. events = win32file.WSAEnumNetworkEvents(server, server_event)
  824. self.assertEquals(events, {win32file.FD_READ: 0})
  825. received = 0
  826. while received < sent:
  827. try:
  828. received += len(server.recv(16 * 1024))
  829. except socket.error as e:
  830. if e.args[0] in [win32file.WSAEINTR, win32file.WSAEWOULDBLOCK]:
  831. continue
  832. else:
  833. raise
  834. self.assertEquals(received, sent)
  835. events = win32file.WSAEnumNetworkEvents(server)
  836. self.assertEquals(events, {})
  837. res = win32event.WaitForSingleObject(client_event, 1000)
  838. self.assertEquals(res, win32event.WAIT_OBJECT_0)
  839. events = win32file.WSAEnumNetworkEvents(client, client_event)
  840. self.assertEquals(events, {win32file.FD_WRITE: 0})
  841. client.shutdown(socket.SHUT_WR)
  842. res = win32event.WaitForSingleObject(server_event, 1000)
  843. self.assertEquals(res, win32event.WAIT_OBJECT_0)
  844. # strange timing issues...
  845. for i in range(5):
  846. events = win32file.WSAEnumNetworkEvents(server, server_event)
  847. if events: break
  848. win32api.Sleep(100)
  849. else:
  850. raise AssertionError("failed to get events")
  851. self.assertEquals(events, {win32file.FD_CLOSE: 0})
  852. events = win32file.WSAEnumNetworkEvents(client)
  853. self.assertEquals(events, {})
  854. server.close()
  855. res = win32event.WaitForSingleObject(client_event, 1000)
  856. self.assertEquals(res, win32event.WAIT_OBJECT_0)
  857. events = win32file.WSAEnumNetworkEvents(client, client_event)
  858. self.assertEquals(events, {win32file.FD_CLOSE: 0})
  859. client.close()
  860. events = win32file.WSAEnumNetworkEvents(port)
  861. self.assertEquals(events, {})
  862. if __name__ == '__main__':
  863. testmain()