iostream_test.py 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454
  1. from __future__ import absolute_import, division, print_function
  2. from tornado.concurrent import Future
  3. from tornado import gen
  4. from tornado import netutil
  5. from tornado.iostream import IOStream, SSLIOStream, PipeIOStream, StreamClosedError, _StreamBuffer
  6. from tornado.httputil import HTTPHeaders
  7. from tornado.locks import Condition, Event
  8. from tornado.log import gen_log, app_log
  9. from tornado.netutil import ssl_wrap_socket
  10. from tornado.stack_context import NullContext
  11. from tornado.tcpserver import TCPServer
  12. from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, bind_unused_port, ExpectLog, gen_test # noqa: E501
  13. from tornado.test.util import (unittest, skipIfNonUnix, refusing_port, skipPypy3V58,
  14. ignore_deprecation)
  15. from tornado.web import RequestHandler, Application
  16. import errno
  17. import hashlib
  18. import os
  19. import platform
  20. import random
  21. import socket
  22. import ssl
  23. import sys
  24. try:
  25. from unittest import mock # type: ignore
  26. except ImportError:
  27. try:
  28. import mock # type: ignore
  29. except ImportError:
  30. mock = None
  31. def _server_ssl_options():
  32. return dict(
  33. certfile=os.path.join(os.path.dirname(__file__), 'test.crt'),
  34. keyfile=os.path.join(os.path.dirname(__file__), 'test.key'),
  35. )
  36. class HelloHandler(RequestHandler):
  37. def get(self):
  38. self.write("Hello")
  39. class TestIOStreamWebMixin(object):
  40. def _make_client_iostream(self):
  41. raise NotImplementedError()
  42. def get_app(self):
  43. return Application([('/', HelloHandler)])
  44. def test_connection_closed(self):
  45. # When a server sends a response and then closes the connection,
  46. # the client must be allowed to read the data before the IOStream
  47. # closes itself. Epoll reports closed connections with a separate
  48. # EPOLLRDHUP event delivered at the same time as the read event,
  49. # while kqueue reports them as a second read/write event with an EOF
  50. # flag.
  51. response = self.fetch("/", headers={"Connection": "close"})
  52. response.rethrow()
  53. @gen_test
  54. def test_read_until_close(self):
  55. stream = self._make_client_iostream()
  56. yield stream.connect(('127.0.0.1', self.get_http_port()))
  57. stream.write(b"GET / HTTP/1.0\r\n\r\n")
  58. data = yield stream.read_until_close()
  59. self.assertTrue(data.startswith(b"HTTP/1.1 200"))
  60. self.assertTrue(data.endswith(b"Hello"))
  61. @gen_test
  62. def test_read_zero_bytes(self):
  63. self.stream = self._make_client_iostream()
  64. yield self.stream.connect(("127.0.0.1", self.get_http_port()))
  65. self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
  66. # normal read
  67. data = yield self.stream.read_bytes(9)
  68. self.assertEqual(data, b"HTTP/1.1 ")
  69. # zero bytes
  70. data = yield self.stream.read_bytes(0)
  71. self.assertEqual(data, b"")
  72. # another normal read
  73. data = yield self.stream.read_bytes(3)
  74. self.assertEqual(data, b"200")
  75. self.stream.close()
  76. @gen_test
  77. def test_write_while_connecting(self):
  78. stream = self._make_client_iostream()
  79. connect_fut = stream.connect(("127.0.0.1", self.get_http_port()))
  80. # unlike the previous tests, try to write before the connection
  81. # is complete.
  82. write_fut = stream.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n")
  83. self.assertFalse(connect_fut.done())
  84. # connect will always complete before write.
  85. it = gen.WaitIterator(connect_fut, write_fut)
  86. resolved_order = []
  87. while not it.done():
  88. yield it.next()
  89. resolved_order.append(it.current_future)
  90. self.assertEqual(resolved_order, [connect_fut, write_fut])
  91. data = yield stream.read_until_close()
  92. self.assertTrue(data.endswith(b"Hello"))
  93. stream.close()
  94. @gen_test
  95. def test_future_interface(self):
  96. """Basic test of IOStream's ability to return Futures."""
  97. stream = self._make_client_iostream()
  98. connect_result = yield stream.connect(
  99. ("127.0.0.1", self.get_http_port()))
  100. self.assertIs(connect_result, stream)
  101. yield stream.write(b"GET / HTTP/1.0\r\n\r\n")
  102. first_line = yield stream.read_until(b"\r\n")
  103. self.assertEqual(first_line, b"HTTP/1.1 200 OK\r\n")
  104. # callback=None is equivalent to no callback.
  105. header_data = yield stream.read_until(b"\r\n\r\n", callback=None)
  106. headers = HTTPHeaders.parse(header_data.decode('latin1'))
  107. content_length = int(headers['Content-Length'])
  108. body = yield stream.read_bytes(content_length)
  109. self.assertEqual(body, b'Hello')
  110. stream.close()
  111. @gen_test
  112. def test_future_close_while_reading(self):
  113. stream = self._make_client_iostream()
  114. yield stream.connect(("127.0.0.1", self.get_http_port()))
  115. yield stream.write(b"GET / HTTP/1.0\r\n\r\n")
  116. with self.assertRaises(StreamClosedError):
  117. yield stream.read_bytes(1024 * 1024)
  118. stream.close()
  119. @gen_test
  120. def test_future_read_until_close(self):
  121. # Ensure that the data comes through before the StreamClosedError.
  122. stream = self._make_client_iostream()
  123. yield stream.connect(("127.0.0.1", self.get_http_port()))
  124. yield stream.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n")
  125. yield stream.read_until(b"\r\n\r\n")
  126. body = yield stream.read_until_close()
  127. self.assertEqual(body, b"Hello")
  128. # Nothing else to read; the error comes immediately without waiting
  129. # for yield.
  130. with self.assertRaises(StreamClosedError):
  131. stream.read_bytes(1)
  132. class TestReadWriteMixin(object):
  133. # Tests where one stream reads and the other writes.
  134. # These should work for BaseIOStream implementations.
  135. def make_iostream_pair(self, **kwargs):
  136. raise NotImplementedError
  137. @gen_test
  138. def test_write_zero_bytes(self):
  139. # Attempting to write zero bytes should run the callback without
  140. # going into an infinite loop.
  141. rs, ws = yield self.make_iostream_pair()
  142. yield ws.write(b'')
  143. ws.close()
  144. rs.close()
  145. @gen_test
  146. def test_streaming_callback(self):
  147. rs, ws = yield self.make_iostream_pair()
  148. try:
  149. chunks = []
  150. cond = Condition()
  151. def streaming_callback(data):
  152. chunks.append(data)
  153. cond.notify()
  154. with ignore_deprecation():
  155. fut = rs.read_bytes(6, streaming_callback=streaming_callback)
  156. ws.write(b"1234")
  157. while not chunks:
  158. yield cond.wait()
  159. ws.write(b"5678")
  160. final_data = yield(fut)
  161. self.assertFalse(final_data)
  162. self.assertEqual(chunks, [b"1234", b"56"])
  163. # the rest of the last chunk is still in the buffer
  164. data = yield rs.read_bytes(2)
  165. self.assertEqual(data, b"78")
  166. finally:
  167. rs.close()
  168. ws.close()
  169. @gen_test
  170. def test_streaming_callback_with_final_callback(self):
  171. rs, ws = yield self.make_iostream_pair()
  172. try:
  173. chunks = []
  174. final_called = []
  175. cond = Condition()
  176. def streaming_callback(data):
  177. chunks.append(data)
  178. cond.notify()
  179. def final_callback(data):
  180. self.assertFalse(data)
  181. final_called.append(True)
  182. cond.notify()
  183. with ignore_deprecation():
  184. rs.read_bytes(6, callback=final_callback,
  185. streaming_callback=streaming_callback)
  186. ws.write(b"1234")
  187. while not chunks:
  188. yield cond.wait()
  189. ws.write(b"5678")
  190. while not final_called:
  191. yield cond.wait()
  192. self.assertEqual(chunks, [b"1234", b"56"])
  193. # the rest of the last chunk is still in the buffer
  194. data = yield rs.read_bytes(2)
  195. self.assertEqual(data, b"78")
  196. finally:
  197. rs.close()
  198. ws.close()
  199. @gen_test
  200. def test_streaming_callback_with_data_in_buffer(self):
  201. rs, ws = yield self.make_iostream_pair()
  202. ws.write(b"abcd\r\nefgh")
  203. data = yield rs.read_until(b"\r\n")
  204. self.assertEqual(data, b"abcd\r\n")
  205. streaming_fut = Future()
  206. with ignore_deprecation():
  207. rs.read_until_close(streaming_callback=streaming_fut.set_result)
  208. data = yield streaming_fut
  209. self.assertEqual(data, b"efgh")
  210. rs.close()
  211. ws.close()
  212. @gen_test
  213. def test_streaming_until_close(self):
  214. rs, ws = yield self.make_iostream_pair()
  215. try:
  216. chunks = []
  217. closed = [False]
  218. cond = Condition()
  219. def streaming_callback(data):
  220. chunks.append(data)
  221. cond.notify()
  222. def close_callback(data):
  223. assert not data, data
  224. closed[0] = True
  225. cond.notify()
  226. with ignore_deprecation():
  227. rs.read_until_close(callback=close_callback,
  228. streaming_callback=streaming_callback)
  229. ws.write(b"1234")
  230. while len(chunks) != 1:
  231. yield cond.wait()
  232. yield ws.write(b"5678")
  233. ws.close()
  234. while not closed[0]:
  235. yield cond.wait()
  236. self.assertEqual(chunks, [b"1234", b"5678"])
  237. finally:
  238. ws.close()
  239. rs.close()
  240. @gen_test
  241. def test_streaming_until_close_future(self):
  242. rs, ws = yield self.make_iostream_pair()
  243. try:
  244. chunks = []
  245. @gen.coroutine
  246. def rs_task():
  247. with ignore_deprecation():
  248. yield rs.read_until_close(streaming_callback=chunks.append)
  249. @gen.coroutine
  250. def ws_task():
  251. yield ws.write(b"1234")
  252. yield gen.sleep(0.01)
  253. yield ws.write(b"5678")
  254. ws.close()
  255. yield [rs_task(), ws_task()]
  256. self.assertEqual(chunks, [b"1234", b"5678"])
  257. finally:
  258. ws.close()
  259. rs.close()
  260. @gen_test
  261. def test_delayed_close_callback(self):
  262. # The scenario: Server closes the connection while there is a pending
  263. # read that can be served out of buffered data. The client does not
  264. # run the close_callback as soon as it detects the close, but rather
  265. # defers it until after the buffered read has finished.
  266. rs, ws = yield self.make_iostream_pair()
  267. try:
  268. event = Event()
  269. rs.set_close_callback(event.set)
  270. ws.write(b"12")
  271. chunks = []
  272. def callback1(data):
  273. chunks.append(data)
  274. with ignore_deprecation():
  275. rs.read_bytes(1, callback2)
  276. ws.close()
  277. def callback2(data):
  278. chunks.append(data)
  279. with ignore_deprecation():
  280. rs.read_bytes(1, callback1)
  281. yield event.wait() # stopped by close_callback
  282. self.assertEqual(chunks, [b"1", b"2"])
  283. finally:
  284. ws.close()
  285. rs.close()
  286. @gen_test
  287. def test_future_delayed_close_callback(self):
  288. # Same as test_delayed_close_callback, but with the future interface.
  289. rs, ws = yield self.make_iostream_pair()
  290. try:
  291. ws.write(b"12")
  292. chunks = []
  293. chunks.append((yield rs.read_bytes(1)))
  294. ws.close()
  295. chunks.append((yield rs.read_bytes(1)))
  296. self.assertEqual(chunks, [b"1", b"2"])
  297. finally:
  298. ws.close()
  299. rs.close()
  300. @gen_test
  301. def test_close_buffered_data(self):
  302. # Similar to the previous test, but with data stored in the OS's
  303. # socket buffers instead of the IOStream's read buffer. Out-of-band
  304. # close notifications must be delayed until all data has been
  305. # drained into the IOStream buffer. (epoll used to use out-of-band
  306. # close events with EPOLLRDHUP, but no longer)
  307. #
  308. # This depends on the read_chunk_size being smaller than the
  309. # OS socket buffer, so make it small.
  310. rs, ws = yield self.make_iostream_pair(read_chunk_size=256)
  311. try:
  312. ws.write(b"A" * 512)
  313. data = yield rs.read_bytes(256)
  314. self.assertEqual(b"A" * 256, data)
  315. ws.close()
  316. # Allow the close to propagate to the `rs` side of the
  317. # connection. Using add_callback instead of add_timeout
  318. # doesn't seem to work, even with multiple iterations
  319. yield gen.sleep(0.01)
  320. data = yield rs.read_bytes(256)
  321. self.assertEqual(b"A" * 256, data)
  322. finally:
  323. ws.close()
  324. rs.close()
  325. @gen_test
  326. def test_read_until_close_after_close(self):
  327. # Similar to test_delayed_close_callback, but read_until_close takes
  328. # a separate code path so test it separately.
  329. rs, ws = yield self.make_iostream_pair()
  330. try:
  331. ws.write(b"1234")
  332. ws.close()
  333. # Read one byte to make sure the client has received the data.
  334. # It won't run the close callback as long as there is more buffered
  335. # data that could satisfy a later read.
  336. data = yield rs.read_bytes(1)
  337. self.assertEqual(data, b"1")
  338. data = yield rs.read_until_close()
  339. self.assertEqual(data, b"234")
  340. finally:
  341. ws.close()
  342. rs.close()
  343. @gen_test
  344. def test_streaming_read_until_close_after_close(self):
  345. # Same as the preceding test but with a streaming_callback.
  346. # All data should go through the streaming callback,
  347. # and the final read callback just gets an empty string.
  348. rs, ws = yield self.make_iostream_pair()
  349. try:
  350. ws.write(b"1234")
  351. ws.close()
  352. data = yield rs.read_bytes(1)
  353. self.assertEqual(data, b"1")
  354. streaming_data = []
  355. final_future = Future()
  356. with ignore_deprecation():
  357. rs.read_until_close(final_future.set_result,
  358. streaming_callback=streaming_data.append)
  359. final_data = yield final_future
  360. self.assertEqual(b'', final_data)
  361. self.assertEqual(b''.join(streaming_data), b"234")
  362. finally:
  363. ws.close()
  364. rs.close()
  365. @gen_test
  366. def test_large_read_until(self):
  367. # Performance test: read_until used to have a quadratic component
  368. # so a read_until of 4MB would take 8 seconds; now it takes 0.25
  369. # seconds.
  370. rs, ws = yield self.make_iostream_pair()
  371. try:
  372. # This test fails on pypy with ssl. I think it's because
  373. # pypy's gc defeats moves objects, breaking the
  374. # "frozen write buffer" assumption.
  375. if (isinstance(rs, SSLIOStream) and
  376. platform.python_implementation() == 'PyPy'):
  377. raise unittest.SkipTest(
  378. "pypy gc causes problems with openssl")
  379. NUM_KB = 4096
  380. for i in range(NUM_KB):
  381. ws.write(b"A" * 1024)
  382. ws.write(b"\r\n")
  383. data = yield rs.read_until(b"\r\n")
  384. self.assertEqual(len(data), NUM_KB * 1024 + 2)
  385. finally:
  386. ws.close()
  387. rs.close()
  388. @gen_test
  389. def test_close_callback_with_pending_read(self):
  390. # Regression test for a bug that was introduced in 2.3
  391. # where the IOStream._close_callback would never be called
  392. # if there were pending reads.
  393. OK = b"OK\r\n"
  394. rs, ws = yield self.make_iostream_pair()
  395. event = Event()
  396. rs.set_close_callback(event.set)
  397. try:
  398. ws.write(OK)
  399. res = yield rs.read_until(b"\r\n")
  400. self.assertEqual(res, OK)
  401. ws.close()
  402. rs.read_until(b"\r\n")
  403. # If _close_callback (self.stop) is not called,
  404. # an AssertionError: Async operation timed out after 5 seconds
  405. # will be raised.
  406. yield event.wait()
  407. finally:
  408. ws.close()
  409. rs.close()
  410. @gen_test
  411. def test_future_close_callback(self):
  412. # Regression test for interaction between the Future read interfaces
  413. # and IOStream._maybe_add_error_listener.
  414. rs, ws = yield self.make_iostream_pair()
  415. closed = [False]
  416. cond = Condition()
  417. def close_callback():
  418. closed[0] = True
  419. cond.notify()
  420. rs.set_close_callback(close_callback)
  421. try:
  422. ws.write(b'a')
  423. res = yield rs.read_bytes(1)
  424. self.assertEqual(res, b'a')
  425. self.assertFalse(closed[0])
  426. ws.close()
  427. yield cond.wait()
  428. self.assertTrue(closed[0])
  429. finally:
  430. rs.close()
  431. ws.close()
  432. @gen_test
  433. def test_write_memoryview(self):
  434. rs, ws = yield self.make_iostream_pair()
  435. try:
  436. fut = rs.read_bytes(4)
  437. ws.write(memoryview(b"hello"))
  438. data = yield fut
  439. self.assertEqual(data, b"hell")
  440. finally:
  441. ws.close()
  442. rs.close()
  443. @gen_test
  444. def test_read_bytes_partial(self):
  445. rs, ws = yield self.make_iostream_pair()
  446. try:
  447. # Ask for more than is available with partial=True
  448. fut = rs.read_bytes(50, partial=True)
  449. ws.write(b"hello")
  450. data = yield fut
  451. self.assertEqual(data, b"hello")
  452. # Ask for less than what is available; num_bytes is still
  453. # respected.
  454. fut = rs.read_bytes(3, partial=True)
  455. ws.write(b"world")
  456. data = yield fut
  457. self.assertEqual(data, b"wor")
  458. # Partial reads won't return an empty string, but read_bytes(0)
  459. # will.
  460. data = yield rs.read_bytes(0, partial=True)
  461. self.assertEqual(data, b'')
  462. finally:
  463. ws.close()
  464. rs.close()
  465. @gen_test
  466. def test_read_until_max_bytes(self):
  467. rs, ws = yield self.make_iostream_pair()
  468. closed = Event()
  469. rs.set_close_callback(closed.set)
  470. try:
  471. # Extra room under the limit
  472. fut = rs.read_until(b"def", max_bytes=50)
  473. ws.write(b"abcdef")
  474. data = yield fut
  475. self.assertEqual(data, b"abcdef")
  476. # Just enough space
  477. fut = rs.read_until(b"def", max_bytes=6)
  478. ws.write(b"abcdef")
  479. data = yield fut
  480. self.assertEqual(data, b"abcdef")
  481. # Not enough space, but we don't know it until all we can do is
  482. # log a warning and close the connection.
  483. with ExpectLog(gen_log, "Unsatisfiable read"):
  484. fut = rs.read_until(b"def", max_bytes=5)
  485. ws.write(b"123456")
  486. yield closed.wait()
  487. finally:
  488. ws.close()
  489. rs.close()
  490. @gen_test
  491. def test_read_until_max_bytes_inline_legacy(self):
  492. rs, ws = yield self.make_iostream_pair()
  493. closed = Event()
  494. rs.set_close_callback(closed.set)
  495. try:
  496. # Similar to the error case in the previous test, but the
  497. # ws writes first so rs reads are satisfied
  498. # inline. For consistency with the out-of-line case, we
  499. # do not raise the error synchronously.
  500. ws.write(b"123456")
  501. with ExpectLog(gen_log, "Unsatisfiable read"):
  502. with ignore_deprecation():
  503. rs.read_until(b"def", callback=lambda x: self.fail(), max_bytes=5)
  504. yield closed.wait()
  505. finally:
  506. ws.close()
  507. rs.close()
  508. @gen_test
  509. def test_read_until_max_bytes_inline(self):
  510. rs, ws = yield self.make_iostream_pair()
  511. closed = Event()
  512. rs.set_close_callback(closed.set)
  513. try:
  514. # Similar to the error case in the previous test, but the
  515. # ws writes first so rs reads are satisfied
  516. # inline. For consistency with the out-of-line case, we
  517. # do not raise the error synchronously.
  518. ws.write(b"123456")
  519. with ExpectLog(gen_log, "Unsatisfiable read"):
  520. with self.assertRaises(StreamClosedError):
  521. yield rs.read_until(b"def", max_bytes=5)
  522. yield closed.wait()
  523. finally:
  524. ws.close()
  525. rs.close()
  526. @gen_test
  527. def test_read_until_max_bytes_ignores_extra(self):
  528. rs, ws = yield self.make_iostream_pair()
  529. closed = Event()
  530. rs.set_close_callback(closed.set)
  531. try:
  532. # Even though data that matches arrives the same packet that
  533. # puts us over the limit, we fail the request because it was not
  534. # found within the limit.
  535. ws.write(b"abcdef")
  536. with ExpectLog(gen_log, "Unsatisfiable read"):
  537. rs.read_until(b"def", max_bytes=5)
  538. yield closed.wait()
  539. finally:
  540. ws.close()
  541. rs.close()
  542. @gen_test
  543. def test_read_until_regex_max_bytes(self):
  544. rs, ws = yield self.make_iostream_pair()
  545. closed = Event()
  546. rs.set_close_callback(closed.set)
  547. try:
  548. # Extra room under the limit
  549. fut = rs.read_until_regex(b"def", max_bytes=50)
  550. ws.write(b"abcdef")
  551. data = yield fut
  552. self.assertEqual(data, b"abcdef")
  553. # Just enough space
  554. fut = rs.read_until_regex(b"def", max_bytes=6)
  555. ws.write(b"abcdef")
  556. data = yield fut
  557. self.assertEqual(data, b"abcdef")
  558. # Not enough space, but we don't know it until all we can do is
  559. # log a warning and close the connection.
  560. with ExpectLog(gen_log, "Unsatisfiable read"):
  561. rs.read_until_regex(b"def", max_bytes=5)
  562. ws.write(b"123456")
  563. yield closed.wait()
  564. finally:
  565. ws.close()
  566. rs.close()
  567. @gen_test
  568. def test_read_until_regex_max_bytes_inline(self):
  569. rs, ws = yield self.make_iostream_pair()
  570. closed = Event()
  571. rs.set_close_callback(closed.set)
  572. try:
  573. # Similar to the error case in the previous test, but the
  574. # ws writes first so rs reads are satisfied
  575. # inline. For consistency with the out-of-line case, we
  576. # do not raise the error synchronously.
  577. ws.write(b"123456")
  578. with ExpectLog(gen_log, "Unsatisfiable read"):
  579. rs.read_until_regex(b"def", max_bytes=5)
  580. yield closed.wait()
  581. finally:
  582. ws.close()
  583. rs.close()
  584. @gen_test
  585. def test_read_until_regex_max_bytes_ignores_extra(self):
  586. rs, ws = yield self.make_iostream_pair()
  587. closed = Event()
  588. rs.set_close_callback(closed.set)
  589. try:
  590. # Even though data that matches arrives the same packet that
  591. # puts us over the limit, we fail the request because it was not
  592. # found within the limit.
  593. ws.write(b"abcdef")
  594. with ExpectLog(gen_log, "Unsatisfiable read"):
  595. rs.read_until_regex(b"def", max_bytes=5)
  596. yield closed.wait()
  597. finally:
  598. ws.close()
  599. rs.close()
  600. @gen_test
  601. def test_small_reads_from_large_buffer(self):
  602. # 10KB buffer size, 100KB available to read.
  603. # Read 1KB at a time and make sure that the buffer is not eagerly
  604. # filled.
  605. rs, ws = yield self.make_iostream_pair(max_buffer_size=10 * 1024)
  606. try:
  607. ws.write(b"a" * 1024 * 100)
  608. for i in range(100):
  609. data = yield rs.read_bytes(1024)
  610. self.assertEqual(data, b"a" * 1024)
  611. finally:
  612. ws.close()
  613. rs.close()
  614. @gen_test
  615. def test_small_read_untils_from_large_buffer(self):
  616. # 10KB buffer size, 100KB available to read.
  617. # Read 1KB at a time and make sure that the buffer is not eagerly
  618. # filled.
  619. rs, ws = yield self.make_iostream_pair(max_buffer_size=10 * 1024)
  620. try:
  621. ws.write((b"a" * 1023 + b"\n") * 100)
  622. for i in range(100):
  623. data = yield rs.read_until(b"\n", max_bytes=4096)
  624. self.assertEqual(data, b"a" * 1023 + b"\n")
  625. finally:
  626. ws.close()
  627. rs.close()
  628. @gen_test
  629. def test_flow_control(self):
  630. MB = 1024 * 1024
  631. rs, ws = yield self.make_iostream_pair(max_buffer_size=5 * MB)
  632. try:
  633. # Client writes more than the rs will accept.
  634. ws.write(b"a" * 10 * MB)
  635. # The rs pauses while reading.
  636. yield rs.read_bytes(MB)
  637. yield gen.sleep(0.1)
  638. # The ws's writes have been blocked; the rs can
  639. # continue to read gradually.
  640. for i in range(9):
  641. yield rs.read_bytes(MB)
  642. finally:
  643. rs.close()
  644. ws.close()
  645. @gen_test
  646. def test_read_into(self):
  647. rs, ws = yield self.make_iostream_pair()
  648. def sleep_some():
  649. self.io_loop.run_sync(lambda: gen.sleep(0.05))
  650. try:
  651. buf = bytearray(10)
  652. fut = rs.read_into(buf)
  653. ws.write(b"hello")
  654. yield gen.sleep(0.05)
  655. self.assertTrue(rs.reading())
  656. ws.write(b"world!!")
  657. data = yield fut
  658. self.assertFalse(rs.reading())
  659. self.assertEqual(data, 10)
  660. self.assertEqual(bytes(buf), b"helloworld")
  661. # Existing buffer is fed into user buffer
  662. fut = rs.read_into(buf)
  663. yield gen.sleep(0.05)
  664. self.assertTrue(rs.reading())
  665. ws.write(b"1234567890")
  666. data = yield fut
  667. self.assertFalse(rs.reading())
  668. self.assertEqual(data, 10)
  669. self.assertEqual(bytes(buf), b"!!12345678")
  670. # Existing buffer can satisfy read immediately
  671. buf = bytearray(4)
  672. ws.write(b"abcdefghi")
  673. data = yield rs.read_into(buf)
  674. self.assertEqual(data, 4)
  675. self.assertEqual(bytes(buf), b"90ab")
  676. data = yield rs.read_bytes(7)
  677. self.assertEqual(data, b"cdefghi")
  678. finally:
  679. ws.close()
  680. rs.close()
  681. @gen_test
  682. def test_read_into_partial(self):
  683. rs, ws = yield self.make_iostream_pair()
  684. try:
  685. # Partial read
  686. buf = bytearray(10)
  687. fut = rs.read_into(buf, partial=True)
  688. ws.write(b"hello")
  689. data = yield fut
  690. self.assertFalse(rs.reading())
  691. self.assertEqual(data, 5)
  692. self.assertEqual(bytes(buf), b"hello\0\0\0\0\0")
  693. # Full read despite partial=True
  694. ws.write(b"world!1234567890")
  695. data = yield rs.read_into(buf, partial=True)
  696. self.assertEqual(data, 10)
  697. self.assertEqual(bytes(buf), b"world!1234")
  698. # Existing buffer can satisfy read immediately
  699. data = yield rs.read_into(buf, partial=True)
  700. self.assertEqual(data, 6)
  701. self.assertEqual(bytes(buf), b"5678901234")
  702. finally:
  703. ws.close()
  704. rs.close()
  705. @gen_test
  706. def test_read_into_zero_bytes(self):
  707. rs, ws = yield self.make_iostream_pair()
  708. try:
  709. buf = bytearray()
  710. fut = rs.read_into(buf)
  711. self.assertEqual(fut.result(), 0)
  712. finally:
  713. ws.close()
  714. rs.close()
  715. @gen_test
  716. def test_many_mixed_reads(self):
  717. # Stress buffer handling when going back and forth between
  718. # read_bytes() (using an internal buffer) and read_into()
  719. # (using a user-allocated buffer).
  720. r = random.Random(42)
  721. nbytes = 1000000
  722. rs, ws = yield self.make_iostream_pair()
  723. produce_hash = hashlib.sha1()
  724. consume_hash = hashlib.sha1()
  725. @gen.coroutine
  726. def produce():
  727. remaining = nbytes
  728. while remaining > 0:
  729. size = r.randint(1, min(1000, remaining))
  730. data = os.urandom(size)
  731. produce_hash.update(data)
  732. yield ws.write(data)
  733. remaining -= size
  734. assert remaining == 0
  735. @gen.coroutine
  736. def consume():
  737. remaining = nbytes
  738. while remaining > 0:
  739. if r.random() > 0.5:
  740. # read_bytes()
  741. size = r.randint(1, min(1000, remaining))
  742. data = yield rs.read_bytes(size)
  743. consume_hash.update(data)
  744. remaining -= size
  745. else:
  746. # read_into()
  747. size = r.randint(1, min(1000, remaining))
  748. buf = bytearray(size)
  749. n = yield rs.read_into(buf)
  750. assert n == size
  751. consume_hash.update(buf)
  752. remaining -= size
  753. assert remaining == 0
  754. try:
  755. yield [produce(), consume()]
  756. assert produce_hash.hexdigest() == consume_hash.hexdigest()
  757. finally:
  758. ws.close()
  759. rs.close()
  760. class TestIOStreamMixin(TestReadWriteMixin):
  761. def _make_server_iostream(self, connection, **kwargs):
  762. raise NotImplementedError()
  763. def _make_client_iostream(self, connection, **kwargs):
  764. raise NotImplementedError()
  765. @gen.coroutine
  766. def make_iostream_pair(self, **kwargs):
  767. listener, port = bind_unused_port()
  768. server_stream_fut = Future()
  769. def accept_callback(connection, address):
  770. server_stream_fut.set_result(self._make_server_iostream(connection, **kwargs))
  771. netutil.add_accept_handler(listener, accept_callback)
  772. client_stream = self._make_client_iostream(socket.socket(), **kwargs)
  773. connect_fut = client_stream.connect(('127.0.0.1', port))
  774. server_stream, client_stream = yield [server_stream_fut, connect_fut]
  775. self.io_loop.remove_handler(listener.fileno())
  776. listener.close()
  777. raise gen.Return((server_stream, client_stream))
  778. def test_connection_refused_legacy(self):
  779. # When a connection is refused, the connect callback should not
  780. # be run. (The kqueue IOLoop used to behave differently from the
  781. # epoll IOLoop in this respect)
  782. cleanup_func, port = refusing_port()
  783. self.addCleanup(cleanup_func)
  784. stream = IOStream(socket.socket())
  785. self.connect_called = False
  786. def connect_callback():
  787. self.connect_called = True
  788. self.stop()
  789. stream.set_close_callback(self.stop)
  790. # log messages vary by platform and ioloop implementation
  791. with ExpectLog(gen_log, ".*", required=False):
  792. with ignore_deprecation():
  793. stream.connect(("127.0.0.1", port), connect_callback)
  794. self.wait()
  795. self.assertFalse(self.connect_called)
  796. self.assertTrue(isinstance(stream.error, socket.error), stream.error)
  797. if sys.platform != 'cygwin':
  798. _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
  799. if hasattr(errno, "WSAECONNREFUSED"):
  800. _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
  801. # cygwin's errnos don't match those used on native windows python
  802. self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)
  803. @gen_test
  804. def test_connection_refused(self):
  805. # When a connection is refused, the connect callback should not
  806. # be run. (The kqueue IOLoop used to behave differently from the
  807. # epoll IOLoop in this respect)
  808. cleanup_func, port = refusing_port()
  809. self.addCleanup(cleanup_func)
  810. stream = IOStream(socket.socket())
  811. stream.set_close_callback(self.stop)
  812. # log messages vary by platform and ioloop implementation
  813. with ExpectLog(gen_log, ".*", required=False):
  814. with self.assertRaises(StreamClosedError):
  815. yield stream.connect(("127.0.0.1", port))
  816. self.assertTrue(isinstance(stream.error, socket.error), stream.error)
  817. if sys.platform != 'cygwin':
  818. _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
  819. if hasattr(errno, "WSAECONNREFUSED"):
  820. _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
  821. # cygwin's errnos don't match those used on native windows python
  822. self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)
  823. @unittest.skipIf(mock is None, 'mock package not present')
  824. @gen_test
  825. def test_gaierror(self):
  826. # Test that IOStream sets its exc_info on getaddrinfo error.
  827. # It's difficult to reliably trigger a getaddrinfo error;
  828. # some resolvers own't even return errors for malformed names,
  829. # so we mock it instead. If IOStream changes to call a Resolver
  830. # before sock.connect, the mock target will need to change too.
  831. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
  832. stream = IOStream(s)
  833. stream.set_close_callback(self.stop)
  834. with mock.patch('socket.socket.connect',
  835. side_effect=socket.gaierror(errno.EIO, 'boom')):
  836. with self.assertRaises(StreamClosedError):
  837. yield stream.connect(('localhost', 80))
  838. self.assertTrue(isinstance(stream.error, socket.gaierror))
  839. @gen_test
  840. def test_read_callback_error(self):
  841. # Test that IOStream sets its exc_info when a read callback throws
  842. server, client = yield self.make_iostream_pair()
  843. try:
  844. closed = Event()
  845. server.set_close_callback(closed.set)
  846. with ExpectLog(
  847. app_log, "(Uncaught exception|Exception in callback)"
  848. ):
  849. # Clear ExceptionStackContext so IOStream catches error
  850. with NullContext():
  851. with ignore_deprecation():
  852. server.read_bytes(1, callback=lambda data: 1 / 0)
  853. client.write(b"1")
  854. yield closed.wait()
  855. self.assertTrue(isinstance(server.error, ZeroDivisionError))
  856. finally:
  857. server.close()
  858. client.close()
  859. @unittest.skipIf(mock is None, 'mock package not present')
  860. @gen_test
  861. def test_read_until_close_with_error(self):
  862. server, client = yield self.make_iostream_pair()
  863. try:
  864. with mock.patch('tornado.iostream.BaseIOStream._try_inline_read',
  865. side_effect=IOError('boom')):
  866. with self.assertRaisesRegexp(IOError, 'boom'):
  867. client.read_until_close()
  868. finally:
  869. server.close()
  870. client.close()
  871. @skipIfNonUnix
  872. @skipPypy3V58
  873. @gen_test
  874. def test_inline_read_error(self):
  875. # An error on an inline read is raised without logging (on the
  876. # assumption that it will eventually be noticed or logged further
  877. # up the stack).
  878. #
  879. # This test is posix-only because windows os.close() doesn't work
  880. # on socket FDs, but we can't close the socket object normally
  881. # because we won't get the error we want if the socket knows
  882. # it's closed.
  883. server, client = yield self.make_iostream_pair()
  884. try:
  885. os.close(server.socket.fileno())
  886. with self.assertRaises(socket.error):
  887. server.read_bytes(1)
  888. finally:
  889. server.close()
  890. client.close()
  891. @skipPypy3V58
  892. @gen_test
  893. def test_async_read_error_logging(self):
  894. # Socket errors on asynchronous reads should be logged (but only
  895. # once).
  896. server, client = yield self.make_iostream_pair()
  897. closed = Event()
  898. server.set_close_callback(closed.set)
  899. try:
  900. # Start a read that will be fulfilled asynchronously.
  901. server.read_bytes(1)
  902. client.write(b'a')
  903. # Stub out read_from_fd to make it fail.
  904. def fake_read_from_fd():
  905. os.close(server.socket.fileno())
  906. server.__class__.read_from_fd(server)
  907. server.read_from_fd = fake_read_from_fd
  908. # This log message is from _handle_read (not read_from_fd).
  909. with ExpectLog(gen_log, "error on read"):
  910. yield closed.wait()
  911. finally:
  912. server.close()
  913. client.close()
  914. @gen_test
  915. def test_future_write(self):
  916. """
  917. Test that write() Futures are never orphaned.
  918. """
  919. # Run concurrent writers that will write enough bytes so as to
  920. # clog the socket buffer and accumulate bytes in our write buffer.
  921. m, n = 5000, 1000
  922. nproducers = 10
  923. total_bytes = m * n * nproducers
  924. server, client = yield self.make_iostream_pair(max_buffer_size=total_bytes)
  925. @gen.coroutine
  926. def produce():
  927. data = b'x' * m
  928. for i in range(n):
  929. yield server.write(data)
  930. @gen.coroutine
  931. def consume():
  932. nread = 0
  933. while nread < total_bytes:
  934. res = yield client.read_bytes(m)
  935. nread += len(res)
  936. try:
  937. yield [produce() for i in range(nproducers)] + [consume()]
  938. finally:
  939. server.close()
  940. client.close()
  941. class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
  942. def _make_client_iostream(self):
  943. return IOStream(socket.socket())
  944. class TestIOStreamWebHTTPS(TestIOStreamWebMixin, AsyncHTTPSTestCase):
  945. def _make_client_iostream(self):
  946. return SSLIOStream(socket.socket(),
  947. ssl_options=dict(cert_reqs=ssl.CERT_NONE))
  948. class TestIOStream(TestIOStreamMixin, AsyncTestCase):
  949. def _make_server_iostream(self, connection, **kwargs):
  950. return IOStream(connection, **kwargs)
  951. def _make_client_iostream(self, connection, **kwargs):
  952. return IOStream(connection, **kwargs)
  953. class TestIOStreamSSL(TestIOStreamMixin, AsyncTestCase):
  954. def _make_server_iostream(self, connection, **kwargs):
  955. connection = ssl.wrap_socket(connection,
  956. server_side=True,
  957. do_handshake_on_connect=False,
  958. **_server_ssl_options())
  959. return SSLIOStream(connection, **kwargs)
  960. def _make_client_iostream(self, connection, **kwargs):
  961. return SSLIOStream(connection,
  962. ssl_options=dict(cert_reqs=ssl.CERT_NONE),
  963. **kwargs)
  964. # This will run some tests that are basically redundant but it's the
  965. # simplest way to make sure that it works to pass an SSLContext
  966. # instead of an ssl_options dict to the SSLIOStream constructor.
  967. class TestIOStreamSSLContext(TestIOStreamMixin, AsyncTestCase):
  968. def _make_server_iostream(self, connection, **kwargs):
  969. context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
  970. context.load_cert_chain(
  971. os.path.join(os.path.dirname(__file__), 'test.crt'),
  972. os.path.join(os.path.dirname(__file__), 'test.key'))
  973. connection = ssl_wrap_socket(connection, context,
  974. server_side=True,
  975. do_handshake_on_connect=False)
  976. return SSLIOStream(connection, **kwargs)
  977. def _make_client_iostream(self, connection, **kwargs):
  978. context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
  979. return SSLIOStream(connection, ssl_options=context, **kwargs)
  980. class TestIOStreamStartTLS(AsyncTestCase):
  981. def setUp(self):
  982. try:
  983. super(TestIOStreamStartTLS, self).setUp()
  984. self.listener, self.port = bind_unused_port()
  985. self.server_stream = None
  986. self.server_accepted = Future()
  987. netutil.add_accept_handler(self.listener, self.accept)
  988. self.client_stream = IOStream(socket.socket())
  989. self.io_loop.add_future(self.client_stream.connect(
  990. ('127.0.0.1', self.port)), self.stop)
  991. self.wait()
  992. self.io_loop.add_future(self.server_accepted, self.stop)
  993. self.wait()
  994. except Exception as e:
  995. print(e)
  996. raise
  997. def tearDown(self):
  998. if self.server_stream is not None:
  999. self.server_stream.close()
  1000. if self.client_stream is not None:
  1001. self.client_stream.close()
  1002. self.listener.close()
  1003. super(TestIOStreamStartTLS, self).tearDown()
  1004. def accept(self, connection, address):
  1005. if self.server_stream is not None:
  1006. self.fail("should only get one connection")
  1007. self.server_stream = IOStream(connection)
  1008. self.server_accepted.set_result(None)
  1009. @gen.coroutine
  1010. def client_send_line(self, line):
  1011. self.client_stream.write(line)
  1012. recv_line = yield self.server_stream.read_until(b"\r\n")
  1013. self.assertEqual(line, recv_line)
  1014. @gen.coroutine
  1015. def server_send_line(self, line):
  1016. self.server_stream.write(line)
  1017. recv_line = yield self.client_stream.read_until(b"\r\n")
  1018. self.assertEqual(line, recv_line)
  1019. def client_start_tls(self, ssl_options=None, server_hostname=None):
  1020. client_stream = self.client_stream
  1021. self.client_stream = None
  1022. return client_stream.start_tls(False, ssl_options, server_hostname)
  1023. def server_start_tls(self, ssl_options=None):
  1024. server_stream = self.server_stream
  1025. self.server_stream = None
  1026. return server_stream.start_tls(True, ssl_options)
  1027. @gen_test
  1028. def test_start_tls_smtp(self):
  1029. # This flow is simplified from RFC 3207 section 5.
  1030. # We don't really need all of this, but it helps to make sure
  1031. # that after realistic back-and-forth traffic the buffers end up
  1032. # in a sane state.
  1033. yield self.server_send_line(b"220 mail.example.com ready\r\n")
  1034. yield self.client_send_line(b"EHLO mail.example.com\r\n")
  1035. yield self.server_send_line(b"250-mail.example.com welcome\r\n")
  1036. yield self.server_send_line(b"250 STARTTLS\r\n")
  1037. yield self.client_send_line(b"STARTTLS\r\n")
  1038. yield self.server_send_line(b"220 Go ahead\r\n")
  1039. client_future = self.client_start_tls(dict(cert_reqs=ssl.CERT_NONE))
  1040. server_future = self.server_start_tls(_server_ssl_options())
  1041. self.client_stream = yield client_future
  1042. self.server_stream = yield server_future
  1043. self.assertTrue(isinstance(self.client_stream, SSLIOStream))
  1044. self.assertTrue(isinstance(self.server_stream, SSLIOStream))
  1045. yield self.client_send_line(b"EHLO mail.example.com\r\n")
  1046. yield self.server_send_line(b"250 mail.example.com welcome\r\n")
  1047. @gen_test
  1048. def test_handshake_fail(self):
  1049. server_future = self.server_start_tls(_server_ssl_options())
  1050. # Certificates are verified with the default configuration.
  1051. client_future = self.client_start_tls(server_hostname="localhost")
  1052. with ExpectLog(gen_log, "SSL Error"):
  1053. with self.assertRaises(ssl.SSLError):
  1054. yield client_future
  1055. with self.assertRaises((ssl.SSLError, socket.error)):
  1056. yield server_future
  1057. @gen_test
  1058. def test_check_hostname(self):
  1059. # Test that server_hostname parameter to start_tls is being used.
  1060. # The check_hostname functionality is only available in python 2.7 and
  1061. # up and in python 3.4 and up.
  1062. server_future = self.server_start_tls(_server_ssl_options())
  1063. client_future = self.client_start_tls(
  1064. ssl.create_default_context(),
  1065. server_hostname='127.0.0.1')
  1066. with ExpectLog(gen_log, "SSL Error"):
  1067. with self.assertRaises(ssl.SSLError):
  1068. # The client fails to connect with an SSL error.
  1069. yield client_future
  1070. with self.assertRaises(Exception):
  1071. # The server fails to connect, but the exact error is unspecified.
  1072. yield server_future
  1073. class WaitForHandshakeTest(AsyncTestCase):
  1074. @gen.coroutine
  1075. def connect_to_server(self, server_cls):
  1076. server = client = None
  1077. try:
  1078. sock, port = bind_unused_port()
  1079. server = server_cls(ssl_options=_server_ssl_options())
  1080. server.add_socket(sock)
  1081. client = SSLIOStream(socket.socket(),
  1082. ssl_options=dict(cert_reqs=ssl.CERT_NONE))
  1083. yield client.connect(('127.0.0.1', port))
  1084. self.assertIsNotNone(client.socket.cipher())
  1085. finally:
  1086. if server is not None:
  1087. server.stop()
  1088. if client is not None:
  1089. client.close()
  1090. @gen_test
  1091. def test_wait_for_handshake_callback(self):
  1092. test = self
  1093. handshake_future = Future()
  1094. class TestServer(TCPServer):
  1095. def handle_stream(self, stream, address):
  1096. # The handshake has not yet completed.
  1097. test.assertIsNone(stream.socket.cipher())
  1098. self.stream = stream
  1099. with ignore_deprecation():
  1100. stream.wait_for_handshake(self.handshake_done)
  1101. def handshake_done(self):
  1102. # Now the handshake is done and ssl information is available.
  1103. test.assertIsNotNone(self.stream.socket.cipher())
  1104. handshake_future.set_result(None)
  1105. yield self.connect_to_server(TestServer)
  1106. yield handshake_future
  1107. @gen_test
  1108. def test_wait_for_handshake_future(self):
  1109. test = self
  1110. handshake_future = Future()
  1111. class TestServer(TCPServer):
  1112. def handle_stream(self, stream, address):
  1113. test.assertIsNone(stream.socket.cipher())
  1114. test.io_loop.spawn_callback(self.handle_connection, stream)
  1115. @gen.coroutine
  1116. def handle_connection(self, stream):
  1117. yield stream.wait_for_handshake()
  1118. handshake_future.set_result(None)
  1119. yield self.connect_to_server(TestServer)
  1120. yield handshake_future
  1121. @gen_test
  1122. def test_wait_for_handshake_already_waiting_error(self):
  1123. test = self
  1124. handshake_future = Future()
  1125. class TestServer(TCPServer):
  1126. @gen.coroutine
  1127. def handle_stream(self, stream, address):
  1128. fut = stream.wait_for_handshake()
  1129. test.assertRaises(RuntimeError, stream.wait_for_handshake)
  1130. yield fut
  1131. handshake_future.set_result(None)
  1132. yield self.connect_to_server(TestServer)
  1133. yield handshake_future
  1134. @gen_test
  1135. def test_wait_for_handshake_already_connected(self):
  1136. handshake_future = Future()
  1137. class TestServer(TCPServer):
  1138. @gen.coroutine
  1139. def handle_stream(self, stream, address):
  1140. yield stream.wait_for_handshake()
  1141. yield stream.wait_for_handshake()
  1142. handshake_future.set_result(None)
  1143. yield self.connect_to_server(TestServer)
  1144. yield handshake_future
  1145. @skipIfNonUnix
  1146. class TestPipeIOStream(TestReadWriteMixin, AsyncTestCase):
  1147. @gen.coroutine
  1148. def make_iostream_pair(self, **kwargs):
  1149. r, w = os.pipe()
  1150. return PipeIOStream(r, **kwargs), PipeIOStream(w, **kwargs)
  1151. @gen_test
  1152. def test_pipe_iostream(self):
  1153. rs, ws = yield self.make_iostream_pair()
  1154. ws.write(b"hel")
  1155. ws.write(b"lo world")
  1156. data = yield rs.read_until(b' ')
  1157. self.assertEqual(data, b"hello ")
  1158. data = yield rs.read_bytes(3)
  1159. self.assertEqual(data, b"wor")
  1160. ws.close()
  1161. data = yield rs.read_until_close()
  1162. self.assertEqual(data, b"ld")
  1163. rs.close()
  1164. @gen_test
  1165. def test_pipe_iostream_big_write(self):
  1166. rs, ws = yield self.make_iostream_pair()
  1167. NUM_BYTES = 1048576
  1168. # Write 1MB of data, which should fill the buffer
  1169. ws.write(b"1" * NUM_BYTES)
  1170. data = yield rs.read_bytes(NUM_BYTES)
  1171. self.assertEqual(data, b"1" * NUM_BYTES)
  1172. ws.close()
  1173. rs.close()
  1174. class TestStreamBuffer(unittest.TestCase):
  1175. """
  1176. Unit tests for the private _StreamBuffer class.
  1177. """
  1178. def setUp(self):
  1179. self.random = random.Random(42)
  1180. def to_bytes(self, b):
  1181. if isinstance(b, (bytes, bytearray)):
  1182. return bytes(b)
  1183. elif isinstance(b, memoryview):
  1184. return b.tobytes() # For py2
  1185. else:
  1186. raise TypeError(b)
  1187. def make_streambuffer(self, large_buf_threshold=10):
  1188. buf = _StreamBuffer()
  1189. assert buf._large_buf_threshold
  1190. buf._large_buf_threshold = large_buf_threshold
  1191. return buf
  1192. def check_peek(self, buf, expected):
  1193. size = 1
  1194. while size < 2 * len(expected):
  1195. got = self.to_bytes(buf.peek(size))
  1196. self.assertTrue(got) # Not empty
  1197. self.assertLessEqual(len(got), size)
  1198. self.assertTrue(expected.startswith(got), (expected, got))
  1199. size = (size * 3 + 1) // 2
  1200. def check_append_all_then_skip_all(self, buf, objs, input_type):
  1201. self.assertEqual(len(buf), 0)
  1202. expected = b''
  1203. for o in objs:
  1204. expected += o
  1205. buf.append(input_type(o))
  1206. self.assertEqual(len(buf), len(expected))
  1207. self.check_peek(buf, expected)
  1208. while expected:
  1209. n = self.random.randrange(1, len(expected) + 1)
  1210. expected = expected[n:]
  1211. buf.advance(n)
  1212. self.assertEqual(len(buf), len(expected))
  1213. self.check_peek(buf, expected)
  1214. self.assertEqual(len(buf), 0)
  1215. def test_small(self):
  1216. objs = [b'12', b'345', b'67', b'89a', b'bcde', b'fgh', b'ijklmn']
  1217. buf = self.make_streambuffer()
  1218. self.check_append_all_then_skip_all(buf, objs, bytes)
  1219. buf = self.make_streambuffer()
  1220. self.check_append_all_then_skip_all(buf, objs, bytearray)
  1221. buf = self.make_streambuffer()
  1222. self.check_append_all_then_skip_all(buf, objs, memoryview)
  1223. # Test internal algorithm
  1224. buf = self.make_streambuffer(10)
  1225. for i in range(9):
  1226. buf.append(b'x')
  1227. self.assertEqual(len(buf._buffers), 1)
  1228. for i in range(9):
  1229. buf.append(b'x')
  1230. self.assertEqual(len(buf._buffers), 2)
  1231. buf.advance(10)
  1232. self.assertEqual(len(buf._buffers), 1)
  1233. buf.advance(8)
  1234. self.assertEqual(len(buf._buffers), 0)
  1235. self.assertEqual(len(buf), 0)
  1236. def test_large(self):
  1237. objs = [b'12' * 5,
  1238. b'345' * 2,
  1239. b'67' * 20,
  1240. b'89a' * 12,
  1241. b'bcde' * 1,
  1242. b'fgh' * 7,
  1243. b'ijklmn' * 2]
  1244. buf = self.make_streambuffer()
  1245. self.check_append_all_then_skip_all(buf, objs, bytes)
  1246. buf = self.make_streambuffer()
  1247. self.check_append_all_then_skip_all(buf, objs, bytearray)
  1248. buf = self.make_streambuffer()
  1249. self.check_append_all_then_skip_all(buf, objs, memoryview)
  1250. # Test internal algorithm
  1251. buf = self.make_streambuffer(10)
  1252. for i in range(3):
  1253. buf.append(b'x' * 11)
  1254. self.assertEqual(len(buf._buffers), 3)
  1255. buf.append(b'y')
  1256. self.assertEqual(len(buf._buffers), 4)
  1257. buf.append(b'z')
  1258. self.assertEqual(len(buf._buffers), 4)
  1259. buf.advance(33)
  1260. self.assertEqual(len(buf._buffers), 1)
  1261. buf.advance(2)
  1262. self.assertEqual(len(buf._buffers), 0)
  1263. self.assertEqual(len(buf), 0)