test_retry_eintr.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # -*- coding: utf8 -*-
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import signal
  5. import time
  6. from threading import Thread
  7. from pytest import mark
  8. import zmq
  9. from zmq.tests import (
  10. BaseZMQTestCase, SkipTest, skip_pypy
  11. )
  12. from zmq.utils.strtypes import b
  13. # Partially based on EINTRBaseTest from CPython 3.5 eintr_tester
  14. class TestEINTRSysCall(BaseZMQTestCase):
  15. """ Base class for EINTR tests. """
  16. # delay for initial signal delivery
  17. signal_delay = 0.1
  18. # timeout for tests. Must be > signal_delay
  19. timeout = .25
  20. timeout_ms = int(timeout * 1e3)
  21. def alarm(self, t=None):
  22. """start a timer to fire only once
  23. like signal.alarm, but with better resolution than integer seconds.
  24. """
  25. if not hasattr(signal, 'setitimer'):
  26. raise SkipTest('EINTR tests require setitimer')
  27. if t is None:
  28. t = self.signal_delay
  29. self.timer_fired = False
  30. self.orig_handler = signal.signal(signal.SIGALRM, self.stop_timer)
  31. # signal_period ignored, since only one timer event is allowed to fire
  32. signal.setitimer(signal.ITIMER_REAL, t, 1000)
  33. def stop_timer(self, *args):
  34. self.timer_fired = True
  35. signal.setitimer(signal.ITIMER_REAL, 0, 0)
  36. signal.signal(signal.SIGALRM, self.orig_handler)
  37. @mark.skipif(not hasattr(zmq, 'RCVTIMEO'), reason="requires RCVTIMEO")
  38. def test_retry_recv(self):
  39. pull = self.socket(zmq.PULL)
  40. pull.rcvtimeo = self.timeout_ms
  41. self.alarm()
  42. self.assertRaises(zmq.Again, pull.recv)
  43. assert self.timer_fired
  44. @mark.skipif(not hasattr(zmq, 'SNDTIMEO'), reason="requires SNDTIMEO")
  45. def test_retry_send(self):
  46. push = self.socket(zmq.PUSH)
  47. push.sndtimeo = self.timeout_ms
  48. self.alarm()
  49. self.assertRaises(zmq.Again, push.send, b('buf'))
  50. assert self.timer_fired
  51. def test_retry_poll(self):
  52. x, y = self.create_bound_pair()
  53. poller = zmq.Poller()
  54. poller.register(x, zmq.POLLIN)
  55. self.alarm()
  56. def send():
  57. time.sleep(2 * self.signal_delay)
  58. y.send(b('ping'))
  59. t = Thread(target=send)
  60. t.start()
  61. evts = dict(poller.poll(2 * self.timeout_ms))
  62. t.join()
  63. assert x in evts
  64. assert self.timer_fired
  65. x.recv()
  66. def test_retry_term(self):
  67. push = self.socket(zmq.PUSH)
  68. push.linger = self.timeout_ms
  69. push.connect('tcp://127.0.0.1:5555')
  70. push.send(b('ping'))
  71. time.sleep(0.1)
  72. self.alarm()
  73. self.context.destroy()
  74. assert self.timer_fired
  75. assert self.context.closed
  76. def test_retry_getsockopt(self):
  77. raise SkipTest("TODO: find a way to interrupt getsockopt")
  78. def test_retry_setsockopt(self):
  79. raise SkipTest("TODO: find a way to interrupt setsockopt")