test_rdb.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. from __future__ import absolute_import
  2. import errno
  3. import socket
  4. from mock import Mock, patch
  5. from celery.contrib.rdb import (
  6. Rdb,
  7. debugger,
  8. set_trace,
  9. )
  10. from celery.tests.case import Case, WhateverIO, skip_if_pypy
  11. class SockErr(socket.error):
  12. errno = None
  13. class test_Rdb(Case):
  14. @patch('celery.contrib.rdb.Rdb')
  15. def test_debugger(self, Rdb):
  16. x = debugger()
  17. self.assertTrue(x)
  18. self.assertIs(x, debugger())
  19. @patch('celery.contrib.rdb.debugger')
  20. @patch('celery.contrib.rdb._frame')
  21. def test_set_trace(self, _frame, debugger):
  22. self.assertTrue(set_trace(Mock()))
  23. self.assertTrue(set_trace())
  24. self.assertTrue(debugger.return_value.set_trace.called)
  25. @patch('celery.contrib.rdb.Rdb.get_avail_port')
  26. @skip_if_pypy
  27. def test_rdb(self, get_avail_port):
  28. sock = Mock()
  29. get_avail_port.return_value = (sock, 8000)
  30. sock.accept.return_value = (Mock(), ['helu'])
  31. out = WhateverIO()
  32. rdb = Rdb(out=out)
  33. self.assertTrue(get_avail_port.called)
  34. self.assertIn('helu', out.getvalue())
  35. # set_quit
  36. with patch('sys.settrace') as settrace:
  37. rdb.set_quit()
  38. settrace.assert_called_with(None)
  39. # set_trace
  40. with patch('celery.contrib.rdb.Pdb.set_trace') as pset:
  41. with patch('celery.contrib.rdb._frame'):
  42. rdb.set_trace()
  43. rdb.set_trace(Mock())
  44. pset.side_effect = SockErr
  45. pset.side_effect.errno = errno.ECONNRESET
  46. rdb.set_trace()
  47. pset.side_effect.errno = errno.ENOENT
  48. with self.assertRaises(SockErr):
  49. rdb.set_trace()
  50. # _close_session
  51. rdb._close_session()
  52. # do_continue
  53. rdb.set_continue = Mock()
  54. rdb.do_continue(Mock())
  55. rdb.set_continue.assert_called_with()
  56. # do_quit
  57. rdb.set_quit = Mock()
  58. rdb.do_quit(Mock())
  59. rdb.set_quit.assert_called_with()
  60. @patch('socket.socket')
  61. @skip_if_pypy
  62. def test_get_avail_port(self, sock):
  63. out = WhateverIO()
  64. sock.return_value.accept.return_value = (Mock(), ['helu'])
  65. Rdb(out=out)
  66. with patch('celery.contrib.rdb.current_process') as curproc:
  67. curproc.return_value.name = 'PoolWorker-10'
  68. Rdb(out=out)
  69. err = sock.return_value.bind.side_effect = SockErr()
  70. err.errno = errno.ENOENT
  71. with self.assertRaises(SockErr):
  72. Rdb(out=out)
  73. err.errno = errno.EADDRINUSE
  74. with self.assertRaises(Exception):
  75. Rdb(out=out)
  76. called = [0]
  77. def effect(*a, **kw):
  78. try:
  79. if called[0] > 50:
  80. return True
  81. raise err
  82. finally:
  83. called[0] += 1
  84. sock.return_value.bind.side_effect = effect
  85. Rdb(out=out)