# test_concurrency.py (3.1 KB)
  1. from __future__ import absolute_import
  2. import os
  3. from itertools import count
  4. from mock import Mock
  5. from celery.concurrency.base import apply_target, BasePool
  6. from celery.tests.case import AppCase
  7. class test_BasePool(AppCase):
  8. def test_apply_target(self):
  9. scratch = {}
  10. counter = count(0)
  11. def gen_callback(name, retval=None):
  12. def callback(*args):
  13. scratch[name] = (next(counter), args)
  14. return retval
  15. return callback
  16. apply_target(gen_callback('target', 42),
  17. args=(8, 16),
  18. callback=gen_callback('callback'),
  19. accept_callback=gen_callback('accept_callback'))
  20. self.assertDictContainsSubset(
  21. {'target': (1, (8, 16)), 'callback': (2, (42, ))},
  22. scratch,
  23. )
  24. pa1 = scratch['accept_callback']
  25. self.assertEqual(0, pa1[0])
  26. self.assertEqual(pa1[1][0], os.getpid())
  27. self.assertTrue(pa1[1][1])
  28. # No accept callback
  29. scratch.clear()
  30. apply_target(gen_callback('target', 42),
  31. args=(8, 16),
  32. callback=gen_callback('callback'),
  33. accept_callback=None)
  34. self.assertDictEqual(scratch,
  35. {'target': (3, (8, 16)),
  36. 'callback': (4, (42, ))})
  37. def test_does_not_debug(self):
  38. x = BasePool(10)
  39. x._does_debug = False
  40. x.apply_async(object)
  41. def test_num_processes(self):
  42. self.assertEqual(BasePool(7).num_processes, 7)
  43. def test_interface_on_start(self):
  44. BasePool(10).on_start()
  45. def test_interface_on_stop(self):
  46. BasePool(10).on_stop()
  47. def test_interface_on_apply(self):
  48. BasePool(10).on_apply()
  49. def test_interface_info(self):
  50. self.assertDictEqual(BasePool(10).info, {})
  51. def test_active(self):
  52. p = BasePool(10)
  53. self.assertFalse(p.active)
  54. p._state = p.RUN
  55. self.assertTrue(p.active)
  56. def test_restart(self):
  57. p = BasePool(10)
  58. with self.assertRaises(NotImplementedError):
  59. p.restart()
  60. def test_interface_on_terminate(self):
  61. p = BasePool(10)
  62. p.on_terminate()
  63. def test_interface_terminate_job(self):
  64. with self.assertRaises(NotImplementedError):
  65. BasePool(10).terminate_job(101)
  66. def test_interface_did_start_ok(self):
  67. self.assertTrue(BasePool(10).did_start_ok())
  68. def test_interface_register_with_event_loop(self):
  69. self.assertIsNone(
  70. BasePool(10).register_with_event_loop(Mock()),
  71. )
  72. def test_interface_on_soft_timeout(self):
  73. self.assertIsNone(BasePool(10).on_soft_timeout(Mock()))
  74. def test_interface_on_hard_timeout(self):
  75. self.assertIsNone(BasePool(10).on_hard_timeout(Mock()))
  76. def test_interface_close(self):
  77. p = BasePool(10)
  78. p.on_close = Mock()
  79. p.close()
  80. self.assertEqual(p._state, p.CLOSE)
  81. p.on_close.assert_called_with()
  82. def test_interface_no_close(self):
  83. self.assertIsNone(BasePool(10).on_close())