test_ioloop.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. # Copyright (C) PyZMQ Developers
  2. # Distributed under the terms of the Modified BSD License.
  3. from __future__ import absolute_import
  4. try:
  5. import asyncio
  6. except ImportError:
  7. asyncio = None
  8. import time
  9. import os
  10. import threading
  11. import pytest
  12. import zmq
  13. from zmq.tests import BaseZMQTestCase, have_gevent
  14. try:
  15. from tornado.ioloop import IOLoop as BaseIOLoop
  16. from zmq.eventloop import ioloop
  17. _tornado = True
  18. except ImportError:
  19. _tornado = False
  20. # tornado 5 with asyncio disables custom IOLoop implementations
  21. t5asyncio = False
  22. if _tornado:
  23. import tornado
  24. if tornado.version_info >= (5,) and asyncio:
  25. t5asyncio = True
  26. def printer():
  27. os.system("say hello")
  28. raise Exception
  29. print (time.time())
  30. class Delay(threading.Thread):
  31. def __init__(self, f, delay=1):
  32. self.f=f
  33. self.delay=delay
  34. self.aborted=False
  35. self.cond=threading.Condition()
  36. super(Delay, self).__init__()
  37. def run(self):
  38. self.cond.acquire()
  39. self.cond.wait(self.delay)
  40. self.cond.release()
  41. if not self.aborted:
  42. self.f()
  43. def abort(self):
  44. self.aborted=True
  45. self.cond.acquire()
  46. self.cond.notify()
  47. self.cond.release()
  48. class TestIOLoop(BaseZMQTestCase):
  49. if _tornado:
  50. IOLoop = ioloop.IOLoop
  51. def setUp(self):
  52. if not _tornado:
  53. pytest.skip("tornado required")
  54. super(TestIOLoop, self).setUp()
  55. if asyncio:
  56. asyncio.set_event_loop(asyncio.new_event_loop())
  57. def tearDown(self):
  58. super(TestIOLoop, self).tearDown()
  59. BaseIOLoop.clear_current()
  60. BaseIOLoop.clear_instance()
  61. def test_simple(self):
  62. """simple IOLoop creation test"""
  63. loop = self.IOLoop()
  64. loop.make_current()
  65. dc = ioloop.PeriodicCallback(loop.stop, 200)
  66. pc = ioloop.PeriodicCallback(lambda : None, 10)
  67. pc.start()
  68. dc.start()
  69. t = Delay(loop.stop,1)
  70. t.start()
  71. loop.start()
  72. if t.is_alive():
  73. t.abort()
  74. else:
  75. self.fail("IOLoop failed to exit")
  76. def test_instance(self):
  77. """IOLoop.instance returns the right object"""
  78. loop = self.IOLoop.instance()
  79. if not t5asyncio:
  80. assert isinstance(loop, self.IOLoop)
  81. base_loop = BaseIOLoop.instance()
  82. assert base_loop is loop
  83. def test_current(self):
  84. """IOLoop.current returns the right object"""
  85. loop = ioloop.IOLoop.current()
  86. if not t5asyncio:
  87. assert isinstance(loop, self.IOLoop)
  88. base_loop = BaseIOLoop.current()
  89. assert base_loop is loop
  90. def test_close_all(self):
  91. """Test close(all_fds=True)"""
  92. loop = self.IOLoop.current()
  93. req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
  94. loop.add_handler(req, lambda msg: msg, ioloop.IOLoop.READ)
  95. loop.add_handler(rep, lambda msg: msg, ioloop.IOLoop.READ)
  96. self.assertEqual(req.closed, False)
  97. self.assertEqual(rep.closed, False)
  98. loop.close(all_fds=True)
  99. self.assertEqual(req.closed, True)
  100. self.assertEqual(rep.closed, True)
  101. if have_gevent and _tornado:
  102. import zmq.green.eventloop.ioloop as green_ioloop
  103. class TestIOLoopGreen(TestIOLoop):
  104. IOLoop = green_ioloop.IOLoop
  105. def xtest_instance(self):
  106. """Green IOLoop.instance returns the right object"""
  107. loop = self.IOLoop.instance()
  108. if not t5asyncio:
  109. assert isinstance(loop, self.IOLoop)
  110. base_loop = BaseIOLoop.instance()
  111. assert base_loop is loop
  112. def xtest_current(self):
  113. """Green IOLoop.current returns the right object"""
  114. loop = self.IOLoop.current()
  115. if not t5asyncio:
  116. assert isinstance(loop, self.IOLoop)
  117. base_loop = BaseIOLoop.current()
  118. assert base_loop is loop